栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot 操作 Elasticsearch

Springboot 操作 Elasticsearch

Springboot 操作 Elasticsearch

pom.xml


    org.springframework.boot
    spring-boot-starter-data-elasticsearch


    org.springframework.boot
    spring-boot-starter-web

配置文件

@Configuration
public class ElasticSearchClient extends AbstractElasticsearchConfiguration {
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo("127.0.0.1:9200")
                .withBasicAuth("elastic", "123456").build();
        return RestClients.create(clientConfiguration).rest();
    }
}
ElasticsearchOperations
@Autowired
private ElasticsearchOperations elasticsearchOperations;
@Data
@AllArgsConstructor
@NoArgsConstructor
@document(indexName = "book")
public class Book {
    @Id
    private Integer id;
    @Field(type = FieldType.Keyword)
    private String name;
    @Field(type = FieldType.Text)
    private String desc;
}
添加文档
@Test
void createDoc() {
    Book book1 = new Book(1, "《史记》", "中国历史上第一部纪传体通史");
    Book book2 = new Book(2, "《三国演义》", "长篇章回体历史演义小说");
    Book book3 = new Book(3, "《红楼梦》", "中国古代章回体长篇小说");
    Book result = elasticsearchOperations.save(book1);
    // Book(id=1, name=《史记》, desc=中国历史上第一部纪传体通史)
    System.out.println(result);
}
删除文档
@Test
void deleteDoc() {
    Book book = new Book();
    book.setId(1);
    String result = elasticsearchOperations.delete(book);
    // 1
    System.out.println(result);
}
修改文档
@Test
void updateDoc() {
    // 全量覆盖
    Book book = new Book();
    book.setId(3);
    book.setName("《水浒传》");
    book.setDesc("歌颂农民起义的长篇章回体版块结构小说");
    elasticsearchOperations.save(book);
}
查询文档
@Test
void queryAllDoc() {
    SearchHits search = elasticsearchOperations.search(Query.findAll(), Book.class);
    List> searchHits = search.getSearchHits();
    for (SearchHit searchHit : searchHits) {
        Book content = searchHit.getContent();
        System.out.println(content);
    }
}
RestHighLevelClient
@Autowired
private RestHighLevelClient restHighLevelClient;
索引和映射 创建索引 + 映射
@Test
void createIndex() throws IOException {
    // 指定索引名称
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("books");
    // 指定配置
    createIndexRequest.settings("{"number_of_shards":1,"number_of_replicas":0}", XContentType.JSON);
    // 指定映射
    createIndexRequest.mapping("{"properties":{"id":{"type":"integer"},"name":{"type":"keyword"},"desc":{"type":"text","analyzer":"ik_max_word"}}}", XContentType.JSON);
    // 创建索引
    CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    System.out.println(createIndexResponse.isAcknowledged());
}
删除索引
@Test
void deleteIndex() throws IOException {
    // 指定索引名称
    DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest("books");
    AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
    System.out.println(acknowledgedResponse.isAcknowledged());
}
文档 添加文档
@Test
void addDoc() throws IOException {
    IndexRequest indexRequest = new IndexRequest("books");
    indexRequest.id("1").source("{"id": 1,"name": "《史记》","desc": "中国历史上第一部纪传体通史"}", XContentType.JSON);
    IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
}
修改文档
@Test
void editDoc() throws IOException {
    UpdateRequest updateRequest = new UpdateRequest("books", "1");
    updateRequest.doc("{"name": "《史记》"}", XContentType.JSON);
    UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
}
删除文档
@Test
void removeDoc() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest("books", "1");
    DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
}
查询文档 根据 id 查询
@Test
void queryDocById() throws IOException {
    GetRequest getRequest = new GetRequest("books", "1");
    GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
}
查询所有
void query(QueryBuilder queryBuilder) throws IOException {
    // 拿到要查询的索引
    SearchRequest searchRequest = new SearchRequest("products");
    
    // 构建查询条件
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(QueryBuilders.matchAllQuery());
    
    searchRequest.source(sourceBuilder);
    // 进行查询
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHit[] hits = searchResponse.getHits().getHits();
    for (SearchHit hit : hits) {
        System.out.println(hit);
    }
}
term 查询
sourceBuilder.query(QueryBuilders.termQuery("description","薯片"));
range 查询
sourceBuilder.query(QueryBuilders.rangeQuery("price").gt(0).lt(10));
prefix 查询
sourceBuilder.query(QueryBuilders.prefixQuery("title","小"));
通配符查询
sourceBuilder.query(QueryBuilders.wildcardQuery("title", "小*"));
多个id查询
sourceBuilder.query(QueryBuilders.idsQuery().addIds("1","2"));
多字段查询
sourceBuilder.query(QueryBuilders.multiMatchQuery("小辣","title","description"));
高亮
// 构建高亮条件
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("title").field("description").preTags("").postTags("");
// 构建查询条件
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("description", "小")).highlighter(highlightBuilder);
// 进行查询
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
    System.out.println(hit.getSourceAsString());
    Map highlightFields = hit.getHighlightFields();
    highlightFields.forEach((key, value) -> {
        Text fragment = value.getFragments()[0];
        System.out.println(fragment);
    });
}
分页
sourceBuilder.query(QueryBuilders.termQuery("description", "小")).from(0).size(5);
排序
sourceBuilder.query(QueryBuilders.termQuery("description", "小")).sort("price", SortOrder.DESC);
指定字段返回 [_source]

fetchSource:第一个数组是希望包含的字段;第二个数组是希望不包含的字段。

ourceBuilder.query(QueryBuilders.termQuery("description", "小")).fetchSource(new String[]{"title", "price", "description"}, new String[]{});
filter 查询
sourceBuilder.query(QueryBuilders.matchAllQuery()).postFilter(QueryBuilders.termQuery("description", "小"));
聚合查询
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.range("group_price").field("price").addRange(0, 1).addRange(1, 5).addRange(5, 10));

ParsedRange terms = aggregations.get("group_price");
List buckets = terms.getBuckets();
for (Range.Bucket bucket : buckets) {
    System.out.println(bucket.getKey() + ":" + bucket.getDocCount());
}
avg
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.avg("avg_price").field("price"));

ParsedAvg avgPrice = aggregations.get("avg_price");
sum
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.sum("sum_price").field("price"));

ParsedSum sumPrice = aggregations.get("sum_price");
max
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.max("max_price").field("price"));

ParsedMax maxPrice = aggregations.get("max_price");
min
sourceBuilder.query(QueryBuilders.matchAllQuery()).aggregation(AggregationBuilders.min("min_price").field("price"));

ParsedMin minPrice = aggregations.get("min_price");
对象操作文档

new ObjectMapper().writevalueAsString(book)

new ObjectMapper().readValue(hit.getSourceAsString(), Book.class)

@Test
void ObjectSave() throws IOException {
    Book book = new Book(1, "《史记》", "中国历史上第一部纪传体通史");
    IndexRequest indexRequest = new IndexRequest("books");
    indexRequest.id(book.getId().toString())
        .source(new ObjectMapper().writevalueAsString(book), XContentType.JSON);
    IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
    System.out.println(indexResponse.getId());
}
@Test
void ObjectQuery() throws IOException {
    SearchRequest searchRequest = new SearchRequest("books");
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    sourceBuilder.query(QueryBuilders.matchAllQuery());
    searchRequest.source(sourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHit[] hits = searchResponse.getHits().getHits();
    for (SearchHit hit : hits) {
        Book book = new ObjectMapper().readValue(hit.getSourceAsString(), Book.class);
        System.out.println(book);
    }
}
MySQL 数据批量导入 ES

    mysql
    mysql-connector-java

利用 bulk 操作

private BulkProcessor getBulkProcessor() {
    BulkProcessor bulkProcessor = null;

    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest bulkRequest) {
            System.out.println("executionId:" + executionId + "," + "Try to insert data number:" + bulkRequest.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
            System.out.println("executionId:" + executionId + "," + "Success insert data number:" + bulkRequest.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable failure) {
            System.out.println("executionId:" + executionId + "," + "Bulk is unSuccess:" + failure);
        }
    };

    BiConsumer> bulkConsumer = (request, bulkListener) ->
        restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

    BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener, "books");
    builder.setBulkActions(5000);
    builder.setBulkSize(new ByteSizevalue(100L, ByteSizeUnit.MB));
    builder.setConcurrentRequests(10);
    builder.setFlushInterval(Timevalue.timevalueSeconds(100L));
    builder.setBackoffPolicy(BackoffPolicy.constantBackoff(Timevalue.timevalueSeconds(1L), 3));
    bulkProcessor = builder.build();

    return bulkProcessor;
}

利用 IndexRequest 中 source 传 Map 类型的数据进行批量导入

@Test
void importFromDBToEs() throws Exception {

    BulkProcessor bulkProcessor = getBulkProcessor();

    final String url = "jdbc:mysql:///bank?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC";
    final String user = "root";
    final String password = "123456";

    Connection connection = DriverManager.getConnection(url, user, password);
    PreparedStatement preparedStatement = connection.prepareStatement("select * from books");
    preparedStatement.setFetchSize(5);

    ResultSet resultSet = preparedStatement.executeQuery();
    ResultSetmetaData metaData = resultSet.getmetaData();

    ArrayList> dataList = new ArrayList<>();
    HashMap map;
    int count = 0;
    while (resultSet.next()) {
        count++;
        int columnCount = metaData.getColumnCount();
        map = new HashMap<>();
        for (int i = 1; i <= columnCount; i++) {
            String columnName = metaData.getColumnName(i);
            map.put(columnName, resultSet.getObject(columnName));
        }
        dataList.add(map);
        if (count % 10000 == 0) {
            for (HashMap dataMap : dataList) {
                bulkProcessor.add(new IndexRequest("books").source(dataMap));
            }
            map.clear();
            dataList.clear();
        }
    }
    for (HashMap dataMap : dataList) {
        bulkProcessor.add(new IndexRequest("books").source(dataMap));
    }
    bulkProcessor.flush();
    bulkProcessor.awaitClose(100, TimeUnit.SECONDS);
}

参考git:https://gitee.com/zhangyizhou/learning-elasticsearch-demo.git

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775788.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号