如何实现elasticsearch导入mysql数据

技术如何实现elasticsearch导入mysql数据这篇文章主要讲解了“如何实现elasticsearch导入mysql数据”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学

这篇文章主要讲解了"如何实现弹性搜索导入关系型数据库数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现弹性搜索导入关系型数据库数据"吧!

一、基于elasticsearch的官方API批量导入

引入maven依赖

家长

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-parent/artifactId

version2.3.2.RELEASE/version

relativePath/!- lookupparentfromrepository -

/家长

属国

属国

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-web/artifactId

/依赖性

属国

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-data-elastic search/artifactId

/依赖性

属国

groupIdmysql/groupId

artifactIdmysql-连接器-java/artifactId

scoperuntime/scope

/依赖性

/dependenciesjdbc连接类

publicclassDBHelper{

publicationstatifindringurl=

jdbc:mysql

://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";
    public static Connection conn = null;
    public static Connection getConn() {
        try {
            Class.forName(name);
            conn = DriverManager.getConnection(url, user, password);//获取连接
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }
}

导入逻辑
@Service("positionService")
public class PositionService {
    @Autowired
    ElasticsearchRestTemplate elasticsearchTemplate;
    @Autowired
    RestHighLevelClient client;
    private static final String POSITIOIN_INDEX = "position";
    public void importAll() throws IOException {
        writeMysqlDataToES(POSITIOIN_INDEX);
    }
    /** 讲数据批量写入ES中 */
    private void writeMysqlDataToES(String tableName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection conn = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            conn = DBHelper.getConn();
            System.out.println("Start handle data :" + tableName);
            String sql = "SELECT * from " + tableName;
            ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
                    ResultSet.CONCUR_READ_ONLY);
            // 根据自己需要 设置
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            ArrayList<HashMap<String, String>> dataList = new
                    ArrayList<HashMap<String, String>>();
            // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式
            HashMap<String, String> map = null;
            int count = 0;
            String c = null;
            String v = null;
            while (rs.next()) {
                count++;
                map = new HashMap<String, String>(128);
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    c = colData.getColumnName(i);
                    v = rs.getString(c);
                    map.put(c, v);
                }
                dataList.add(map);
                // 每1万条写一次,不足的批次的最后再一并提交
                if (count % 10000 == 0) {
                    System.out.println("Mysql handle data number : " + count);
                    // 将数据添加到 bulkProcessor 中
                    for (HashMap<String, String> hashMap2 : dataList) {
                        bulkProcessor.add(
                                new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                    }
                    // 每提交一次便将map与list清空
                    map.clear();
                    dataList.clear();
                }
            }
            // 处理未提交的数据
            for (HashMap<String, String> hashMap2 : dataList) {
                bulkProcessor.add(
                        new IndexRequest(POSITIOIN_INDEX).source(hashMap2));
                System.out.println(hashMap2);
            }
            System.out.println("-------------------------- Finally insert number total: " + count);
            // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间
            bulkProcessor.flush();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            try {
                rs.close();
                ps.close();
                conn.close();
                boolean terminatedFlag = bulkProcessor.awaitClose(150L,
                        TimeUnit.SECONDS);
                System.out.println(terminatedFlag);
            } catch (Exception e) {
                System.out.println(e.getMessage());
            }
        }
    }
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor bulkProcessor = null;
        try {
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    System.out.println("Try to insert data number : " + request.numberOfActions());
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId);
                }
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
                    System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId);
                }
            };
            BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> client
                            .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer,
                    listener);
            builder.setBulkActions(5000);
            builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
            builder.setConcurrentRequests(10);
            builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
            // 注意点:让参数设置生效
            bulkProcessor = builder.build();
        } catch (Exception e) {
            e.printStackTrace();
            try {
                bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
            } catch (Exception e1) {
                System.out.println(e1.getMessage());
            }
        }
        return bulkProcessor;
    }
}
调用入口
@RestController
public class PositionController {
    @Autowired
    PositionService positionService;
    @RequestMapping("query")
    public List<Map> query(String positionName) {
        if(positionName == null){
            return null;
        }
        return positionService.queryPositions(positionName);
    }
    @RequestMapping("/importAll")
    public String importAll(){
        try {
            positionService.importAll();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "success";
    }
}
导入的数据表
public class Position implements Serializable {
    //主键
    private String id;
    //公司名称
    private String companyName;
    //职位名称
    private String positionName;
    //职位诱惑
    private String positionAdvantage;
    //薪资
    private String salary;
    //薪资下限
    private int salaryMin;
    //薪资上限
    private int salaryMax;
    //学历
    private String education;
    //工作年限
    private String workYear;
    //发布时间
    private String publishTime;
    //工作城市
    private String city;
    //工作地点
    private String workAddress;
    //发布时间
    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;
    //工作模式
    private String jobNature;
}

二、基于logstash导入

前提:安装好logstash

import.conf
input {
    stdin {
    }
    jdbc {
   
      jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai"
 
      jdbc_user => "root"
      jdbc_password => "root" 
      jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "1000"
   
      statement_filepath => "D:/import.sql"
  
 
    }
}
 
filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}
 
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "position"
        document_type => "_doc"
 
    }
    stdout {
        codec => json_lines
    }
}
import.sql
select * from position
启动logstash
logstash -f ../import.conf

感谢各位的阅读,以上就是“如何实现elasticsearch导入mysql数据”的内容了,经过本文的学习后,相信大家对如何实现elasticsearch导入mysql数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/133544.html

(0)

相关推荐

  • html5如何使一段代码无效

    技术html5怎么让一段代码无效小编给大家分享一下html5怎么让一段代码无效,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

    攻略 2021年12月19日
  • Hive中静态分区与动态分区的示例分析

    技术Hive中静态分区与动态分区的示例分析这篇文章给大家分享的是有关Hive中静态分区与动态分区的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。  分区是hive存放数据的一种方式。将

    攻略 2021年12月10日
  • Android事件分发机制

    技术Android事件分发机制 Android事件分发机制原文链接:https://juejin.im/post/5eb3e0d6f265da7c002028cd这次说下Android中的事件分发机制从

    礼包 2021年12月7日
  • ios怎么整个手机截屏(ios有哪几种截屏方式)

    技术IOS如何实现手机截屏小编给大家分享一下IOS如何实现手机截屏 ,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!IOS手机截屏 主要步骤1

    攻略 2021年12月24日
  • macbookpro怎么保养屏幕(macbook 电脑屏幕怎么养护)

    技术怎么保养macbook的屏幕这篇文章给大家分享的是有关怎么保养macbook的屏幕的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在电子市场,苹果产品可以说受到更多人的追捧和喜爱。与iPho

    攻略 2021年12月24日
  • 基于Python的算法数据集怎么实现

    技术基于Python的算法数据集怎么实现这篇文章主要介绍“基于Python的算法数据集怎么实现”,在日常操作中,相信很多人在基于Python的算法数据集怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法

    攻略 2021年11月23日