栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

es的基本使用

es的基本使用

配置
@Configuration
@Slf4j
public class ElasticSearchConfig {

    
    final static String schema = "http";

    
    final static int connectTimeOut = 6000;

    
    final static int socketTimeOut = 30000;

    
    final static int connectionRequestTimeOut = 1000;

    
    final static int maxConnectNum = 100;

    
    final static int maxConnectPerRoute = 100;

    @Value("${elasticsearch.nodes}")
    String[] nodes;
    @Value("${elasticsearch.username}")
    String userName;
    @Value("${elasticsearch.password}")
    String password;

    private List hostList() {
        List nodeList = Lists.newArrayList();
        Arrays.asList(nodes).stream().forEach(each -> {
            String[] array = each.split(":");
            nodeList.add(new HttpHost(array[0], Integer.valueOf(array[1]), schema));
        });
        return nodeList;
    }

    @Bean(name="restHighLevelClient")
    public RestHighLevelClient client() {
        RestClientBuilder builder = RestClient.builder(hostList().toArray(new HttpHost[0]));
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        builder.setHttpClientConfigCallback(httpClientBuilder ->{
           httpClientBuilder.disableAuthCaching();
           return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        });

        builder.setRequestConfigCallback(requestConfigBuilder->{
            requestConfigBuilder.setConnectTimeout(connectTimeOut);
            requestConfigBuilder.setSocketTimeout(socketTimeOut);
            requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
            return requestConfigBuilder;
        });

        builder.setHttpClientConfigCallback(httpClientBuilder ->{
            httpClientBuilder.setMaxConnTotal(maxConnectNum);
            httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
            return httpClientBuilder;
        });
        return new RestHighLevelClient(builder);
    }
}

使用方法
@Slf4j
@Component
public class EsUtil {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    public void addToEs(String indexName, String json){
        IndexRequest indexRequest = new IndexRequest(indexName);
        indexRequest  = indexRequest.id(UUID.randomUUID().toString().replace("-", ""));
        indexRequest = indexRequest.source(json, XContentType.JSON);
        indexRequest = indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        try {
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("数据插入es失败: " + e.getMessage());
        }
    }

    public void scrollId(long lMin, long lMax, String tableName) {
        if (null != restHighLevelClient) {
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            BoolQueryBuilder qbs = QueryBuilders.boolQuery();
            QueryBuilder qb = QueryBuilders.termQuery("gender", 1);
            QueryBuilder qb1 = QueryBuilders.rangeQuery("create_time").from(lMin).to(lMax);
            qbs.must(qb);
            qbs.must(qb1);
            sourceBuilder.size(10000);
            sourceBuilder.trackTotalHits(true);
            sourceBuilder.query(qbs);

            SearchRequest searchRequest = new SearchRequest(tableName + "*");
            searchRequest.source(sourceBuilder);
            searchRequest.scroll(Timevalue.timevalueMinutes(10));
            try {
                SearchResponse res = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
                String scrollId = null;
                if(res != null && res.getHits().getHits().length > 0){
                    res.getHits().forEach(e -> {
                        JSONObject obj = JSONObject.parseObject(e.getSourceAsString());
                        String name = obj.getString("name");
                        log.info("name:" + name);
                    });
                    scrollId = res.getScrollId();
                }

                while (true){
                    if(scrollId == null){
                        break;
                    }
                    SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(Timevalue.timevalueMinutes(10));
                    res = restHighLevelClient.searchScroll(searchScrollRequest,RequestOptions.DEFAULT);
                    if(res != null && res.getHits().getHits().length > 0){
                        res.getHits().forEach(e -> {
                            JSONObject obj = JSONObject.parseObject(e.getSourceAsString());
                            String name = obj.getString("name");
                            log.info("name:" + name);
                        });
                        scrollId = res.getScrollId();
                    }else {
                        break;
                    }
                }

                //完成滚动后,清除滚动上下文
                if(res != null && res.getScrollId() != null && !"".equals(res.getScrollId())){
                    ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                    clearScrollRequest.addScrollId(scrollId);
                    ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                    if(!clearScrollResponse.isSucceeded()){
                        log.error("清除ScrollId报错: " + clearScrollResponse.toString());
                    }
                }

            } catch (Exception e) {
                log.error("es查询出错,错误信息:" + e.getMessage());
            }
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/722867.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号