配置
/**
 * Spring configuration that builds the Elasticsearch {@link RestHighLevelClient} bean.
 *
 * <p>Node addresses and credentials are injected from the {@code elasticsearch.nodes},
 * {@code elasticsearch.username} and {@code elasticsearch.password} properties. Each node
 * entry is expected in {@code host:port} form.
 */
@Configuration
@Slf4j
public class ElasticSearchConfig {
    /** URI scheme used for every configured node. */
    final static String schema = "http";
    /** Value passed to {@code RequestConfig.Builder#setConnectTimeout}. */
    final static int connectTimeOut = 6000;
    /** Value passed to {@code RequestConfig.Builder#setSocketTimeout}. */
    final static int socketTimeOut = 30000;
    /** Value passed to {@code RequestConfig.Builder#setConnectionRequestTimeout}. */
    final static int connectionRequestTimeOut = 1000;
    /** Upper bound on total pooled connections. */
    final static int maxConnectNum = 100;
    /** Upper bound on pooled connections per route. */
    final static int maxConnectPerRoute = 100;

    @Value("${elasticsearch.nodes}")
    String[] nodes;
    @Value("${elasticsearch.username}")
    String userName;
    @Value("${elasticsearch.password}")
    String password;

    /**
     * Converts the configured {@code host:port} entries into {@link HttpHost} instances.
     *
     * @return one {@link HttpHost} per configured node, in configuration order
     * @throws IllegalArgumentException if an entry has no {@code :port} part or the port
     *         is not a valid integer
     */
    private List<HttpHost> hostList() {
        List<HttpHost> nodeList = Lists.newArrayList();
        for (String node : nodes) {
            String[] parts = node.split(":");
            if (parts.length < 2) {
                // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                        "Invalid elasticsearch node '" + node + "', expected host:port");
            }
            nodeList.add(new HttpHost(parts[0].trim(), Integer.parseInt(parts[1].trim()), schema));
        }
        return nodeList;
    }

    /**
     * Creates the {@link RestHighLevelClient} with basic-auth credentials, request timeouts
     * and connection-pool limits applied.
     *
     * @return the configured client, registered as bean {@code restHighLevelClient}
     */
    @Bean(name = "restHighLevelClient")
    public RestHighLevelClient client() {
        RestClientBuilder builder = RestClient.builder(hostList().toArray(new HttpHost[0]));

        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        builder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(connectTimeOut)
                .setSocketTimeout(socketTimeOut)
                .setConnectionRequestTimeout(connectionRequestTimeOut));

        // The builder keeps only one HTTP client config callback: a second
        // setHttpClientConfigCallback call would replace the first and silently drop the
        // credentials. Authentication and pool limits are therefore configured together.
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.disableAuthCaching();
            return httpClientBuilder
                    .setDefaultCredentialsProvider(credentialsProvider)
                    .setMaxConnTotal(maxConnectNum)
                    .setMaxConnPerRoute(maxConnectPerRoute);
        });

        return new RestHighLevelClient(builder);
    }
}
使用方法
@Slf4j
@Component
public class EsUtil {
@Autowired
private RestHighLevelClient restHighLevelClient;
public void addToEs(String indexName, String json){
IndexRequest indexRequest = new IndexRequest(indexName);
indexRequest = indexRequest.id(UUID.randomUUID().toString().replace("-", ""));
indexRequest = indexRequest.source(json, XContentType.JSON);
indexRequest = indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
try {
restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("数据插入es失败: " + e.getMessage());
}
}
public void scrollId(long lMin, long lMax, String tableName) {
if (null != restHighLevelClient) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder qbs = QueryBuilders.boolQuery();
QueryBuilder qb = QueryBuilders.termQuery("gender", 1);
QueryBuilder qb1 = QueryBuilders.rangeQuery("create_time").from(lMin).to(lMax);
qbs.must(qb);
qbs.must(qb1);
sourceBuilder.size(10000);
sourceBuilder.trackTotalHits(true);
sourceBuilder.query(qbs);
SearchRequest searchRequest = new SearchRequest(tableName + "*");
searchRequest.source(sourceBuilder);
searchRequest.scroll(Timevalue.timevalueMinutes(10));
try {
SearchResponse res = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = null;
if(res != null && res.getHits().getHits().length > 0){
res.getHits().forEach(e -> {
JSONObject obj = JSONObject.parseObject(e.getSourceAsString());
String name = obj.getString("name");
log.info("name:" + name);
});
scrollId = res.getScrollId();
}
while (true){
if(scrollId == null){
break;
}
SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId).scroll(Timevalue.timevalueMinutes(10));
res = restHighLevelClient.searchScroll(searchScrollRequest,RequestOptions.DEFAULT);
if(res != null && res.getHits().getHits().length > 0){
res.getHits().forEach(e -> {
JSONObject obj = JSONObject.parseObject(e.getSourceAsString());
String name = obj.getString("name");
log.info("name:" + name);
});
scrollId = res.getScrollId();
}else {
break;
}
}
//完成滚动后,清除滚动上下文
if(res != null && res.getScrollId() != null && !"".equals(res.getScrollId())){
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
if(!clearScrollResponse.isSucceeded()){
log.error("清除ScrollId报错: " + clearScrollResponse.toString());
}
}
} catch (Exception e) {
log.error("es查询出错,错误信息:" + e.getMessage());
}
}
}
}