pom.xml
org.springframework.boot spring-boot-starter-data-elasticsearch org.springframework.boot spring-boot-starter-web
配置文件
@Configuration
public class ElasticSearchClient extends AbstractElasticsearchConfiguration {
@Override
@Bean
public RestHighLevelClient elasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo("127.0.0.1:9200")
.withBasicAuth("elastic", "123456").build();
return RestClients.create(clientConfiguration).rest();
}
}
ElasticsearchOperations
@Autowired private ElasticsearchOperations elasticsearchOperations;
@Data
@AllArgsConstructor
@NoArgsConstructor
@document(indexName = "book")
public class Book {
@Id
private Integer id;
@Field(type = FieldType.Keyword)
private String name;
@Field(type = FieldType.Text)
private String desc;
}
添加文档
@Test
void createDoc() {
Book book1 = new Book(1, "《史记》", "中国历史上第一部纪传体通史");
Book book2 = new Book(2, "《三国演义》", "长篇章回体历史演义小说");
Book book3 = new Book(3, "《红楼梦》", "中国古代章回体长篇小说");
Book result = elasticsearchOperations.save(book1);
// Book(id=1, name=《史记》, desc=中国历史上第一部纪传体通史)
System.out.println(result);
}
删除文档
@Test
void deleteDoc() {
Book book = new Book();
book.setId(1);
String result = elasticsearchOperations.delete(book);
// 1
System.out.println(result);
}
修改文档
@Test
void updateDoc() {
// 全量覆盖
Book book = new Book();
book.setId(3);
book.setName("《水浒传》");
book.setDesc("歌颂农民起义的长篇章回体版块结构小说");
elasticsearchOperations.save(book);
}
查询文档
@Test
void queryAllDoc() {
SearchHits search = elasticsearchOperations.search(Query.findAll(), Book.class);
List> searchHits = search.getSearchHits();
for (SearchHit searchHit : searchHits) {
Book content = searchHit.getContent();
System.out.println(content);
}
}
RestHighLevelClient
@Autowired private RestHighLevelClient restHighLevelClient;索引和映射 创建索引 + 映射
@Test
void createIndex() throws IOException {
// 指定索引名称
CreateIndexRequest createIndexRequest = new CreateIndexRequest("books");
// 指定配置
createIndexRequest.settings("{"number_of_shards":1,"number_of_replicas":0}", XContentType.JSON);
// 指定映射
createIndexRequest.mapping("{"properties":{"id":{"type":"integer"},"name":{"type":"keyword"},"desc":{"type":"text","analyzer":"ik_max_word"}}}", XContentType.JSON);
// 创建索引
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
System.out.println(createIndexResponse.isAcknowledged());
}
删除索引
@Test
void deleteIndex() throws IOException {
// 指定索引名称
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("books");
AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
System.out.println(acknowledgedResponse.isAcknowledged());
}
文档
添加文档
@Test
void addDoc() throws IOException {
IndexRequest indexRequest = new IndexRequest("books");
indexRequest.id("1").source("{"id": 1,"name": "《史记》","desc": "中国历史上第一部纪传体通史"}", XContentType.JSON);
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
修改文档
@Test
void editDoc() throws IOException {
UpdateRequest updateRequest = new UpdateRequest("books", "1");
updateRequest.doc("{"name": "《史记》"}", XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
}
删除文档
@Test
void removeDoc() throws IOException {
DeleteRequest deleteRequest = new DeleteRequest("books", "1");
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
}
查询文档
根据 id 查询
@Test
void queryDocById() throws IOException {
GetRequest getRequest = new GetRequest("books", "1");
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
}
查询所有
void query(QueryBuilder queryBuilder) throws IOException {
// 拿到要查询的索引
SearchRequest searchRequest = new SearchRequest("products");
// 构建查询条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(sourceBuilder);
// 进行查询
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit);
}
}
term 查询
sourceBuilder.query(QueryBuilders.termQuery("description","薯片"));
range 查询
sourceBuilder.query(QueryBuilders.rangeQuery("price").gt(0).lt(10));
prefix 查询
sourceBuilder.query(QueryBuilders.prefixQuery("title","小"));
通配符查询
sourceBuilder.query(QueryBuilders.wildcardQuery("title", "小*"));
多个id查询
sourceBuilder.query(QueryBuilders.idsQuery().addIds("1","2"));
多字段查询
sourceBuilder.query(QueryBuilders.multiMatchQuery("小辣","title","description"));
高亮
// 构建高亮条件
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("title").field("description").preTags("").postTags("");
// 构建查询条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("description", "小")).highlighter(highlightBuilder);
// 进行查询
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
System.out.println(hit.getSourceAsString());
Map highlightFields = hit.getHighlightFields();
highlightFields.forEach((key, value) -> {
Text fragment = value.getFragments()[0];
System.out.println(fragment);
});
}
分页
sourceBuilder.query(QueryBuilders.termQuery("description", "小")).from(0).size(5);
排序
sourceBuilder.query(QueryBuilders.termQuery("description", "小")).sort("price", SortOrder.DESC);
指定字段返回 [_source]
fetchSource:第一个数组是希望包含的字段;第二个数组是希望不包含的字段。
ourceBuilder.query(QueryBuilders.termQuery("description", "小")).fetchSource(new String[]{"title", "price", "description"}, new String[]{});
filter 查询
sourceBuilder.query(QueryBuilders.matchAllQuery()).postFilter(QueryBuilders.termQuery("description", "小"));
聚合查询
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.range("group_price").field("price").addRange(0, 1).addRange(1, 5).addRange(5, 10));
ParsedRange terms = aggregations.get("group_price");
List extends Range.Bucket> buckets = terms.getBuckets();
for (Range.Bucket bucket : buckets) {
System.out.println(bucket.getKey() + ":" + bucket.getDocCount());
}
avg
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.avg("avg_price").field("price"));
ParsedAvg avgPrice = aggregations.get("avg_price");
sum
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.sum("sum_price").field("price"));
ParsedSum sumPrice = aggregations.get("sum_price");
max
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.max("max_price").field("price"));
ParsedMax maxPrice = aggregations.get("max_price");
min
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.min("min_price").field("price"));
ParsedMin minPrice = aggregations.get("min_price");
对象操作文档
new ObjectMapper().writevalueAsString(book)
new ObjectMapper().readValue(hit.getSourceAsString(), Book.class)
@Test
void ObjectSave() throws IOException {
Book book = new Book(1, "《史记》", "中国历史上第一部纪传体通史");
IndexRequest indexRequest = new IndexRequest("books");
indexRequest.id(book.getId().toString())
.source(new ObjectMapper().writevalueAsString(book), XContentType.JSON);
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
System.out.println(indexResponse.getId());
}
@Test
void ObjectQuery() throws IOException {
SearchRequest searchRequest = new SearchRequest("books");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
Book book = new ObjectMapper().readValue(hit.getSourceAsString(), Book.class);
System.out.println(book);
}
}
MySQL 数据批量导入 ES
mysql mysql-connector-java
利用 bulk 操作
private BulkProcessor getBulkProcessor() {
BulkProcessor bulkProcessor = null;
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest bulkRequest) {
System.out.println("executionId:" + executionId + "," + "Try to insert data number:" + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
System.out.println("executionId:" + executionId + "," + "Success insert data number:" + bulkRequest.numberOfActions());
}
@Override
public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) {
System.out.println("executionId:" + executionId + "," + "Bulk is unSuccess:" + failure);
}
};
BiConsumer> bulkConsumer = (request, bulkListener) ->
restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener, "books");
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizevalue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(10);
builder.setFlushInterval(Timevalue.timevalueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(Timevalue.timevalueSeconds(1L), 3));
bulkProcessor = builder.build();
return bulkProcessor;
}
利用 IndexRequest 中 source 传 Map 类型的数据进行批量导入
@Test
void importFromDBToEs() throws Exception {
BulkProcessor bulkProcessor = getBulkProcessor();
final String url = "jdbc:mysql:///bank?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC";
final String user = "root";
final String password = "123456";
Connection connection = DriverManager.getConnection(url, user, password);
PreparedStatement preparedStatement = connection.prepareStatement("select * from books");
preparedStatement.setFetchSize(5);
ResultSet resultSet = preparedStatement.executeQuery();
ResultSetmetaData metaData = resultSet.getmetaData();
ArrayList> dataList = new ArrayList<>();
HashMap map;
int count = 0;
while (resultSet.next()) {
count++;
int columnCount = metaData.getColumnCount();
map = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
map.put(columnName, resultSet.getObject(columnName));
}
dataList.add(map);
if (count % 10000 == 0) {
for (HashMap dataMap : dataList) {
bulkProcessor.add(new IndexRequest("books").source(dataMap));
}
map.clear();
dataList.clear();
}
}
for (HashMap dataMap : dataList) {
bulkProcessor.add(new IndexRequest("books").source(dataMap));
}
bulkProcessor.flush();
bulkProcessor.awaitClose(100, TimeUnit.SECONDS);
}
参考git:https://gitee.com/zhangyizhou/learning-elasticsearch-demo.git



