导入数据到Elasticsearch中
前言写入的几种方案具体操作实现
版本: 测试
es入门使用
es索引使用
es分词使用
es聚合使用
java操作es
我们在使用es的时候,需要手动将数据导入到es中。导入数据无非就是给es写入数据,可以从mysql、其他db或者Excel中导入,中间需要做一层转换,然后使用es的相关api批量写入es中。
写入的几种方案写入方案有很多种:
业务代码中异步写入
如我们创单成功时,异步将订单数据写入es中数据同步到mq中然后mq在写入es中
如项目日志操作,kafka + es 收集日志操作将mysql指定表中数据写入到es
这里我们演示将某个表中数据写入es中订阅mysql binlog 异步导入es
canal 伪装成 MySQL 从库订阅 binlog,将解析后的数据导入到es中
具体操作
一般随着业务的发展,db查询到一定程度就显得力不从心(当然这里指的是加索引,分库分表以及各种优化之后也无济于事,数据量还是很大),此时我们就需要使用es来提高查询效率,此时就需要从db将数据导入到es中。
读取db数据批量写入es依次循环,直到数据都被写完 实现
写入es方法有很多种,我们可以使用es的api,一条一条写,也可以批量操作。
这里我们使用es官方推荐的一个工具类Using Bulk Processor操作
官方文档地址
jdk: 11
es:7.1
sb : 2.1
mysql: 5.7
首先导入依赖
org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-test org.elasticsearch.client elasticsearch-rest-high-level-client 7.3.0 org.elasticsearch elasticsearch org.elasticsearch elasticsearch 7.3.0 org.springframework.boot spring-boot-starter-thymeleaf org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-devtools runtime true org.springframework.boot spring-boot-configuration-processor true org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.apache.httpcomponents httpclient 4.5.3 com.alibaba fastjson 1.2.58 org.projectlombok lombok true mysql mysql-connector-java 5.1.38 compile org.apache.commons commons-lang3 3.9 junit junit 4.12 test
yml配置
spring:
  devtools:
    restart:
      enabled: true # enable hot redeploy
      additional-paths: src/main/java # directory watched to trigger a restart
      exclude: WEB-INF/**
  freemarker:
    cache: false # do not cache templates, so edits take effect immediately
  elasticsearch:
    rest:
      uris: 127.0.0.1:9200
server:
  port: 8080
logging:
  level:
    root: info
    com.xdclass.search: debug
我们需要初始化db,以及es(这里可以集群,也可以单机),然后写相关api
首先是db操作,这里直接单个db实例,简单一点,当然也可以使用其他的mybatis,jpa,jdbc等
package com.example.esdemo.config;
import java.sql.Connection;
import java.sql.DriverManager;
/**
 * Minimal JDBC helper for the demo: opens a connection to the local MySQL
 * instance. Each call returns a fresh connection; the caller is responsible
 * for closing it.
 */
public class DBHelper {
    /** JDBC URL for the demo database (UTF-8, Asia/Shanghai timezone). */
    public static final String url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai";
    // public static final String name = "com.mysql.cj.jdbc.Driver";
    public static final String name = "com.mysql.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "root";

    /**
     * Opens and returns a new database connection.
     *
     * BUG FIX: the original cached the connection in a shared mutable static
     * field, so concurrent callers could observe (and close) each other's
     * connection. A local variable removes that race; the method signature
     * and its "null on failure" behavior are unchanged.
     *
     * @return a new {@link Connection}, or {@code null} if the driver is
     *         missing or the connection attempt fails
     */
    public static Connection getConn() {
        Connection connection = null;
        try {
            Class.forName(name);
            connection = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return connection;
    }
}
es配置
package com.example.esdemo.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {

    /** Comma-separated "host:port" list, e.g. "127.0.0.1:9200,127.0.0.2:9200". */
    @Value("${spring.elasticsearch.rest.uris}")
    private String hostlist;

    /**
     * Parses {@link #hostlist} into an {@link HttpHost} array.
     * Extracted because the same parsing loop was duplicated verbatim in
     * both bean factory methods.
     */
    private HttpHost[] buildHttpHosts() {
        String[] nodes = hostlist.split(",");
        HttpHost[] httpHosts = new HttpHost[nodes.length];
        for (int i = 0; i < nodes.length; i++) {
            String[] parts = nodes[i].split(":");
            httpHosts[i] = new HttpHost(parts[0], Integer.parseInt(parts[1]), "http");
        }
        return httpHosts;
    }

    /** High-level client; this is the one the project mainly uses. */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(RestClient.builder(buildHttpHosts()));
    }

    /** Low-level client, kept for completeness; currently unused. */
    @Bean
    public RestClient restClient() {
        return RestClient.builder(buildHttpHosts()).build();
    }
}
db 和 es 相关的表名和 index 名
/**
 * Request/config object pairing a MySQL table name with the Elasticsearch
 * index it should be imported into.
 *
 * NOTE(review): class name should be UpperCamelCase (ImportDb2Es) by Java
 * convention, but renaming would break the interface/impl/controller that
 * already reference it, so the name is kept.
 */
public class importDb2Es {
    private String dbTableName;
    private String esIndexName;

    // Accessors were "omitted" in the original listing, but importServiceImpl
    // and the controller call them, so they must exist for the code to compile.
    public String getDbTableName() {
        return dbTableName;
    }

    public void setDbTableName(String dbTableName) {
        this.dbTableName = dbTableName;
    }

    public String getEsIndexName() {
        return esIndexName;
    }

    public void setEsIndexName(String esIndexName) {
        this.esIndexName = esIndexName;
    }
}
然后是相关service,导入service
public interface importService {
void importDb2Es(String dbName,String esIndexName);
}
实现
package com.example.esdemo.service.impl;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.example.esdemo.config.DBHelper;
import com.example.esdemo.imports.importDb2Es;
import com.example.esdemo.service.importService;
@Component
public class importServiceImpl implements importService {

    private static final Logger logger = LogManager.getLogger(importServiceImpl.class);

    /** Rows are handed to the BulkProcessor once this many have been buffered. */
    private static final int BATCH_SIZE = 10000;

    @Autowired
    private RestHighLevelClient client;

    /**
     * Imports every row of the configured MySQL table into the configured
     * Elasticsearch index.
     */
    @Override
    public void importDb2Es(importDb2Es importDb2Es) {
        // BUG FIX: the original passed getDbTableName() twice, so the ES index
        // name supplied by the caller was silently ignored.
        writeMySQLDataToES(importDb2Es.getDbTableName(), importDb2Es.getEsIndexName());
    }

    /**
     * Streams all rows of {@code tableName} into index {@code esIndexName}
     * via a BulkProcessor, in batches of {@link #BATCH_SIZE}.
     *
     * NOTE(review): the table name is concatenated into the SQL text; it is
     * assumed to come from trusted configuration, not end users — confirm.
     */
    private void writeMySQLDataToES(String tableName, String esIndexName) {
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        Connection connection = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            connection = DBHelper.getConn();
            logger.info("start handle data:" + tableName);
            String sql = "select * from " + tableName;
            ps = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // Stream rows instead of materializing the whole table; tune as needed.
            ps.setFetchSize(20);
            rs = ps.executeQuery();
            ResultSetMetaData colData = rs.getMetaData();
            List<HashMap<String, String>> dataList = new ArrayList<>();
            int count = 0;
            while (rs.next()) {
                count++;
                // One document per row: column name -> string value.
                HashMap<String, String> row = new HashMap<>(128);
                // BUG FIX: JDBC column indexes are 1-based and INCLUSIVE of
                // getColumnCount(); the original "<" dropped the last column.
                for (int i = 1; i <= colData.getColumnCount(); i++) {
                    String column = colData.getColumnName(i);
                    row.put(column, rs.getString(column));
                }
                dataList.add(row);
                // Hand a full batch to the BulkProcessor, then reuse the buffer.
                if (count % BATCH_SIZE == 0) {
                    logger.info("mysql handle data number:" + count);
                    for (HashMap<String, String> doc : dataList) {
                        bulkProcessor.add(new IndexRequest(esIndexName).source(doc));
                    }
                    // Only the list is cleared; clearing the last row map (as the
                    // original did) served no purpose.
                    dataList.clear();
                }
            }
            // Submit whatever is left of the final, partial batch.
            for (HashMap<String, String> doc : dataList) {
                bulkProcessor.add(new IndexRequest(esIndexName).source(doc));
            }
            bulkProcessor.flush();
        } catch (SQLException e) {
            logger.error("import of table " + tableName + " failed", e);
        } finally {
            // BUG FIX: the original dereferenced rs/ps/connection unconditionally,
            // throwing NPE whenever an earlier step failed before they were set.
            closeQuietly(rs);
            closeQuietly(ps);
            closeQuietly(connection);
            try {
                boolean terminated = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                logger.info("bulkProcessor terminated cleanly: " + terminated);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                logger.error("interrupted while closing bulkProcessor", e);
            }
        }
    }

    /** Closes a resource, tolerating null and logging (not rethrowing) errors. */
    private static void closeQuietly(AutoCloseable resource) {
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                logger.error("error closing resource", e);
            }
        }
    }

    /**
     * Builds a BulkProcessor bound to {@code client}: flushes at 5000 actions
     * or 100MB or every 100s, allows 10 concurrent bulk requests, and retries
     * failed bulks 3 times with a constant 1s backoff.
     *
     * BUG FIX: the original wrapped construction in a try/catch whose handler
     * called awaitClose() on a still-null processor (guaranteed NPE) and could
     * return null; nothing here throws a checked exception, so the guard is gone.
     */
    private BulkProcessor getBulkProcessor(RestHighLevelClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Try to insert data number : "
                        + request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request,
                                  BulkResponse response) {
                logger.info("************** Success insert data number : "
                        + request.numberOfActions() + " , id: " + executionId);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
            }
        };
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(5000);
        builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
        builder.setConcurrentRequests(10);
        builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // build() is what makes the settings above take effect.
        return builder.build();
    }
}
这里我们提供一个http 测试
@Autowired
private importService importService;

/**
 * HTTP entry point: imports rows of {@code dbTableName} into
 * {@code esIndexName} (bound from request params onto importDb2Es).
 * Runs synchronously; returns {code:200, msg:"成功"} once the import finishes.
 */
@RequestMapping("api/import")
public Map<String, Object> imports(importDb2Es importDb2Es) {
    // FIX: original used raw Map/HashMap (generics lost); parameterize them.
    Map<String, Object> map = new HashMap<>();
    importService.importDb2Es(importDb2Es);
    map.put("code", 200);
    map.put("msg", "成功");
    return map;
}
这里我们准备了一个表一些数据
我们调用http接口
http://127.0.0.1:8080/api/import?dbTableName=position&esIndexName=position_index
可以看到我们的后台导入日志50条数据
我们去es中查询如下100条(这里我请求了2次,所以是100条)



