这篇文章主要讲解了"如何实现弹性搜索导入关系型数据库数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"如何实现弹性搜索导入关系型数据库数据"吧!
一、基于elasticsearch的官方API批量导入
引入maven依赖
家长
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-parent/artifactId
version2.3.2.RELEASE/version
relativePath/!- lookupparentfromrepository -
/家长
属国
属国
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-web/artifactId
/依赖性
属国
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-data-elastic search/artifactId
/依赖性
属国
groupIdmysql/groupId
artifactIdmysql-连接器-java/artifactId
scoperuntime/scope
/依赖性
/dependenciesjdbc连接类
publicclassDBHelper{
publicationstatifindringurl=
jdbc:mysql
://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
public static final String name = "com.mysql.cj.jdbc.Driver";
public static final String user = "root";
public static final String password = "root";
public static Connection conn = null;
public static Connection getConn() {
try {
Class.forName(name);
conn = DriverManager.getConnection(url, user, password);//获取连接
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
}
导入逻辑
@Service("positionService") public class PositionService { @Autowired ElasticsearchRestTemplate elasticsearchTemplate; @Autowired RestHighLevelClient client; private static final String POSITIOIN_INDEX = "position"; public void importAll() throws IOException { writeMysqlDataToES(POSITIOIN_INDEX); } /** 讲数据批量写入ES中 */ private void writeMysqlDataToES(String tableName) { BulkProcessor bulkProcessor = getBulkProcessor(client); Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = DBHelper.getConn(); System.out.println("Start handle data :" + tableName); String sql = "SELECT * from " + tableName; ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); // 根据自己需要 设置 ps.setFetchSize(20); rs = ps.executeQuery(); ResultSetMetaData colData = rs.getMetaData(); ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>(); // bulkProcessor 添加的数据支持的方式并不多,查看其api发现其支持map键值对的方式,故笔者在此将查出来的数据转换成hashMap方式 HashMap<String, String> map = null; int count = 0; String c = null; String v = null; while (rs.next()) { count++; map = new HashMap<String, String>(128); for (int i = 1; i <= colData.getColumnCount(); i++) { c = colData.getColumnName(i); v = rs.getString(c); map.put(c, v); } dataList.add(map); // 每1万条写一次,不足的批次的最后再一并提交 if (count % 10000 == 0) { System.out.println("Mysql handle data number : " + count); // 将数据添加到 bulkProcessor 中 for (HashMap<String, String> hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); } // 每提交一次便将map与list清空 map.clear(); dataList.clear(); } } // 处理未提交的数据 for (HashMap<String, String> hashMap2 : dataList) { bulkProcessor.add( new IndexRequest(POSITIOIN_INDEX).source(hashMap2)); System.out.println(hashMap2); } System.out.println("-------------------------- Finally insert number total: " + count); // 将数据刷新到es, 注意这一步执行后并不会立即生效,取决于bulkProcessor设置的刷新时间 bulkProcessor.flush(); } catch (Exception e) { System.out.println(e.getMessage()); } finally { try { rs.close(); ps.close(); conn.close(); boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); System.out.println(terminatedFlag); } catch (Exception e) { System.out.println(e.getMessage()); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { System.out.println("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { System.out.println("************** Success insert data number : "+ request.numberOfActions() + " , id: " +executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { System.out.println("Bulk is unsuccess : " + failure + ",executionId: " + executionId); } }; BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000); builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB)); builder.setConcurrentRequests(10); builder.setFlushInterval(TimeValue.timeValueSeconds(100L)); builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // 注意点:让参数设置生效 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { System.out.println(e1.getMessage()); } } return bulkProcessor; } }
调用入口
@RestController public class PositionController { @Autowired PositionService positionService; @RequestMapping("query") public List<Map> query(String positionName) { if(positionName == null){ return null; } return positionService.queryPositions(positionName); } @RequestMapping("/importAll") public String importAll(){ try { positionService.importAll(); } catch (IOException e) { e.printStackTrace(); } return "success"; } }
导入的数据表
public class Position implements Serializable { //主键 private String id; //公司名称 private String companyName; //职位名称 private String positionName; //职位诱惑 private String positionAdvantage; //薪资 private String salary; //薪资下限 private int salaryMin; //薪资上限 private int salaryMax; //学历 private String education; //工作年限 private String workYear; //发布时间 private String publishTime; //工作城市 private String city; //工作地点 private String workAddress; //发布时间 @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss") private Date createTime; //工作模式 private String jobNature; }
二、基于logstash导入
前提:安装好logstash
import.conf
input { stdin { } jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/lagou_db?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai" jdbc_user => "root" jdbc_password => "root" jdbc_driver_library => "D:/mysql-connector-java-5.1.10.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "1000" statement_filepath => "D:/import.sql" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { hosts => ["localhost:9200"] index => "position" document_type => "_doc" } stdout { codec => json_lines } }
import.sql
select * from position
启动logstash
logstash -f ../import.conf
感谢各位的阅读,以上就是“如何实现elasticsearch导入mysql数据”的内容了,经过本文的学习后,相信大家对如何实现elasticsearch导入mysql数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/133544.html