如何实现elasticsearch导入mysql数据

技术如何实现elasticsearch导入mysql数据这篇文章主要讲解了“如何实现elasticsearch导入mysql数据”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学

这篇文章主要讲解了"如何实现弹性搜索导入关系型数据库数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现弹性搜索导入关系型数据库数据"吧!

一、基于elasticsearch的官方API批量导入

引入maven依赖

家长

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-parent/artifactId

version2.3.2.RELEASE/version

relativePath/!- lookupparentfromrepository -

/家长

属国

属国

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-web/artifactId

/依赖性

属国

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-data-elastic search/artifactId

/依赖性

属国

groupIdmysql/groupId

artifactIdmysql-连接器-java/artifactId

scoperuntime/scope

/依赖性

/dependenciesjdbc连接类

publicclassDBHelper{

publicationstatifindringurl=

jdbc:mysql

://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";
    public static Connection conn = null;
    public static Connection getConn() {
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);//获取连接
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}

导入逻辑
@Service("positionService")
public class PositionService {
    @Autowired
    ElasticsearchRestTemplate elasticsearchTemplate;
    @Autowired
    RestHighLevelClient client;
    private static final String POSITIOIN_INDEX = "position";
    public void importAll() throws IOException {
        writeMysqlDataToES(POSITIOIN_INDEX);
    }
    /** 讲数据批量写入ES中 */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = DBHelper.getConn();
            System.out.println("Start handle data :" + tableName);
            String sql = "SELECT * from " + tableName;
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY);
            // 根据自己需要 设置
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new
                    ArrayList<HashMap<String, String>>();
            // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式
            HashMap<String, String> map = null;
            int count = 0;
            String c = null;
            String v = null;
            while (rs.next()) {
                count++;
                map = new HashMap<String, String>(128);
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // 每1万条写一次,不足的批次的最后再一并提交
                if (count % 10000 == 0) {
                    System.out.println("Mysql handle data number : " + count);
                    // 将数据添加到 bulkProcessor 中
                    for (HashMap<String, String> hashMap2 : dataList) {
                        bulkProcessor.add(
                                new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                    }
                    // 每提交一次便将map与list清空
                    map.clear();
                    dataList.clear();
                }
            }
            // 处理未提交的数据
            for (HashMap<String, String> hashMap2 : dataList) {
                bulkProcessor.add(
                        new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                System.out.println(hashMap2);
            }
            System.out.println("-------------------------- Finally insert number total: " + count);
            // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间
            bulkProcessor.flush();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            try {
                rs.close();
                ps.close();
                conn.close();
                boolean terminatedFlag = bulkProcessor.awaitClose(150L,
                        TimeUnit.SECONDS);
                System.out.println(terminatedFlag);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("Try to insert data number : " + request.numberOfActions());
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId);
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
                    System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client
                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            // 注意点:让参数设置生效
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                System.out.println(e1.getMessage());
            }
        }
        return bulkProcessor;
    }
}
调用入口
@RestController
public class PositionController {
    @Autowired
    PositionService positionService;
    @RequestMapping("query")
    public List<Map> query(String positionName) {
        if(positionName == null){
            return null;
        }
        return positionService.queryPositions(positionName);
    }
    @RequestMapping("/importAll")
    public String importAll(){
        try {
            positionService.importAll();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
导入的数据表
public class Position implements Serializable {
    //主键
    private String id;
    //公司名称
    private String companyName;
    //职位名称
    private String positionName;
    //职位诱惑
    private String positionAdvantage;
    //薪资
    private String salary;
    //薪资下限
    private int salaryMin;
    //薪资上限
    private int salaryMax;
    //学历
    private String education;
    //工作年限
    private String workYear;
    //发布时间
    private String publishTime;
    //工作城市
    private String city;
    //工作地点
    private String workAddress;
    //发布时间
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    //工作模式
    private String jobNature;
}

二、基于logstash导入

前提:安装好logstash

import.conf
input {
    stdin {
    }
    jdbc {
   
      jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
 
      jdbc_user => "root"
      jdbc_password => "root" 
      jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "1000"
   
      statement_filepath => "D:/import.sql"
  
 
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "position"
        document_type => "_doc"
 
    }
    stdout {
        codec => json_lines
    }
}
import.sql
select * from position
启动logstash
logstash -f ../import.conf

感谢各位的阅读,以上就是“如何实现elasticsearch导入mysql数据”的内容了,经过本文的学习后,相信大家对如何实现elasticsearch导入mysql数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/133544.html

(0)

相关推荐

  • spring成神之路第四十八篇:@Transaction 事务源码解析

    技术spring成神之路第四十八篇:@Transaction 事务源码解析 spring成神之路第四十八篇:@Transaction 事务源码解析大家好,今天咱们通过源码来了解一下spring中@Tra

    礼包 2021年11月7日
  • mysql​升级过程中的mysql Cannot add foreign key constraint错误怎么解决

    技术mysql​升级过程中的mysql Cannot add foreign key constraint错误怎么解决本篇内容介绍了“mysql升级过程中的mysql Cannot add foreign key con

    攻略 2021年12月4日
  • Git版本思路是什么

    技术Git版本思路是什么这篇文章给大家介绍Git版本思路是什么,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。简单的说,git的管理策略目前有两大流派。平时和同事聊天或和别的公司的朋友交流时也能够感觉

    攻略 2021年10月20日
  • sqlite3基本操作(sqlite3怎么创建数据表)

    技术SQLite3如何实现数据库全文搜索这篇文章主要为大家展示了“SQLite3如何实现数据库全文搜索”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“SQLite3如何实现数据

    攻略 2021年12月18日
  • ELK安装是怎样的

    技术ELK安装是怎样的这篇文章给大家介绍ELK安装是怎样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。一:安装环境: 1. VMware Workstation Pro 15 下载及安装

    攻略 2021年10月20日
  • java基础知识回顾之java Thread类学习,二)--java多线程安全问题,锁)

    技术java基础知识回顾之java Thread类学习,二)--java多线程安全问题,锁) java基础知识回顾之java Thread类学习(二)--java多线程安全问题(锁)上一节售票系统中我们

    礼包 2021年12月23日