Elasticsearch Series: Java Client Code Demo


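Preface

The previous 33 articles in this series issued every Elasticsearch request as a RESTful call from Kibana, and no Java or Python client code was published. RESTful requests are the most direct way to use and understand the core features of ES, but real projects send requests and process data from Java or Python. With the core features understood, the Java API is straightforward to use; for APIs not covered here, consult the official documentation.

Overview

This article walks through examples of developing against the Elasticsearch client API, mainly in Java, covering the most commonly used core APIs.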

Code examples

Adding dependencies

Taking a Maven project as an example, add the following dependencies:


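<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>6.3.1</version>
</dependency>
<dependency>
	<groupId>org.elasticsearch.client</groupId>
	<artifactId>transport</artifactId>
	<version>6.3.1</version>
</dependency>
<dependency>
	<groupId>log4j</groupId>
	<artifactId>log4j</artifactId>
	<version>1.2.17</version>
</dependency>
<dependency>
	<groupId>org.apache.logging.log4j</groupId>
	<artifactId>log4j-core</artifactId>
	<version>2.12.1</version>
</dependency>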

Connecting to ES
  1. Create a Settings object and specify the cluster name.
  2. Create a TransportClient object and specify the IP address and port manually.
Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();
		
// In the 6.x client the address type is TransportAddress (InetSocketTransportAddress was removed in 6.0)
TransportClient client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

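When the cluster has many nodes, specifying the IP and port of each node by hand is impractical. The client can instead discover the cluster nodes automatically (sniffing):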

// Setting client.transport.sniff to true enables automatic discovery of cluster nodes
Settings settings = Settings.builder().put("client.transport.sniff", true).put("cluster.name", "elasticsearch").build();

// Only one node needs to be specified
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
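
When nodes are listed by hand rather than discovered, the addresses usually come from configuration. The following is a minimal JDK-only sketch of parsing a comma-separated `host:port` list. The `SeedNodes` class, its `HostPort` type, and the `parse` method are hypothetical helpers for illustration and are not part of the Elasticsearch API; the default of 9300 matches the transport port used in the examples above.

```java
import java.util.ArrayList;
import java.util.List;

class SeedNodes {
    // Transport port used in this article's examples; assumed as the default here.
    static final int DEFAULT_TRANSPORT_PORT = 9300;

    /** Simple host/port pair; a hypothetical helper type for this sketch. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Parses "host1:9300, host2" into host/port pairs.
     * Entries without a port get DEFAULT_TRANSPORT_PORT. IPv6 literals are not handled.
     */
    static List<HostPort> parse(String csv) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : csv.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // skip empty entries such as a trailing comma
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                result.add(new HostPort(trimmed, DEFAULT_TRANSPORT_PORT));
            } else {
                String host = trimmed.substring(0, colon);
                int port = Integer.parseInt(trimmed.substring(colon + 1));
                result.add(new HostPort(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("192.168.17.137:9300, localhost")) {
            System.out.println(hp.host + ":" + hp.port);
        }
    }
}
```

Each parsed pair could then be passed to the client in the same way as above, with `new TransportAddress(InetAddress.getByName(host), port)`.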

Basic CRUD

The most basic CRUD code, which works well as an introductory demo:


	private static void createEmployee(TransportClient client) throws Exception {
		IndexResponse response = client.prepareIndex("company", "employee", "1")
				.setSource(XContentFactory.jsonBuilder()
						.startObject()
							.field("name", "jack")
							.field("age", 27)
							.field("position", "technique")
							.field("country", "china")
							.field("join_date", "2017-01-01")
							.field("salary", 10000)
						.endObject())
				.get();
		System.out.println(response.getResult()); 
	}
	
	
	private static void getEmployee(TransportClient client) throws Exception {
		GetResponse response = client.prepareGet("company", "employee", "1").get();
		System.out.println(response.getSourceAsString()); 
	}
	
	
	private static void updateEmployee(TransportClient client) throws Exception {
		UpdateResponse response = client.prepareUpdate("company", "employee", "1") 
				.setDoc(XContentFactory.jsonBuilder()
							.startObject()
								.field("position", "technique manager")
							.endObject())
				.get();
		System.out.println(response.getResult());  
 	}
	
	
	private static void deleteEmployee(TransportClient client) throws Exception {
		DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
		System.out.println(response.getResult());  
	}
Search

We previously ran searches through RESTful requests; here the same search is implemented in Java. The original RESTful example:

GET /company/employee/_search
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "position": "technique"
          }
        }
      ],
      "filter": {
        "range": {
          "age": {
            "gte": 30,
            "lte": 40
          }
        }
      }
    }
  },
  "from": 0,
  "size": 1
}

The equivalent Java code:

SearchResponse response = client.prepareSearch("company")
 .setTypes("employee")
 .setQuery(QueryBuilders.boolQuery()
     .must(QueryBuilders.matchQuery("position", "technique"))      // bool.must: match query
     .filter(QueryBuilders.rangeQuery("age").gte(30).lte(40)))     // bool.filter: range filter
 .setFrom(0).setSize(1)
 .get();
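Aggregation queries

Aggregation queries are a little more involved: both building the request and parsing the response follow the structure of what is actually returned. Take the following query as an example.

Requirements:

  1. Group by country.
  2. Within each country group, group again by the year of joining.
  3. Finally, compute the average salary within each group.

The RESTful request: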

GET /company/employee/_search
{
  "size": 0,
  "aggs": {
    "group_by_country": {
      "terms": {
        "field": "country"
      },
      "aggs": {
        "group_by_join_date": {
          "date_histogram": {
            "field": "join_date",
            "interval": "year"
          },
          "aggs": {
            "avg_salary": {
              "avg": {
                "field": "salary"
              }
            }
          }
        }
      }
    }
  }
}

The same request written in Java:

SearchResponse searchResponse = client.prepareSearch("company")
    .setTypes("employee")
    .setSize(0)
    .addAggregation(
        AggregationBuilders.terms("group_by_country").field("country")
            .subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
                .field("join_date")
                .dateHistogramInterval(DateHistogramInterval.YEAR)
                .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))
            )
    )
    .execute().actionGet();

Handling the response means unwrapping the data one level at a time:

// Level 1: one bucket per country
Terms groupByCountry = searchResponse.getAggregations().get("group_by_country");

for (Terms.Bucket groupByCountryBucket : groupByCountry.getBuckets()) {
	System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());

	// Level 2: one bucket per join year within the country
	Histogram groupByJoinDate = groupByCountryBucket.getAggregations().get("group_by_join_date");

	for (Histogram.Bucket groupByJoinDateBucket : groupByJoinDate.getBuckets()) {
		System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());

		// Level 3: average salary within the country/year bucket
		Avg avgSalary = groupByJoinDateBucket.getAggregations().get("avg_salary");
		System.out.println(avgSalary.getValue());
	}
}

client.close();
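
To make the three nested levels concrete, here is a JDK-only sketch that computes the same grouping over an in-memory list: country, then join year, then average salary. It is an analogue of what the aggregation returns, not Elasticsearch code; the `NestedAggregationSketch` class, the `Employee` type, and the sample data are assumptions made up for illustration.

```java
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class NestedAggregationSketch {
    /** Hypothetical in-memory stand-in for an employee document. */
    static final class Employee {
        final String country;
        final LocalDate joinDate;
        final double salary;

        Employee(String country, String joinDate, double salary) {
            this.country = country;
            this.joinDate = LocalDate.parse(joinDate);
            this.salary = salary;
        }
    }

    /** Sample data, invented for this sketch. */
    static List<Employee> sample() {
        return Arrays.asList(
                new Employee("china", "2017-01-01", 10000),
                new Employee("china", "2017-06-15", 12000),
                new Employee("china", "2016-03-01", 9000),
                new Employee("usa", "2016-09-10", 20000));
    }

    /** country -> join year -> average salary, mirroring terms > date_histogram(year) > avg. */
    static Map<String, Map<Integer, Double>> avgSalaryByCountryAndYear(List<Employee> employees) {
        return employees.stream().collect(
                Collectors.groupingBy(e -> e.country, TreeMap::new,
                        Collectors.groupingBy(e -> e.joinDate.getYear(), TreeMap::new,
                                Collectors.averagingDouble(e -> e.salary))));
    }

    public static void main(String[] args) {
        // Walk the nested result the same way the response is unwrapped: level by level.
        avgSalaryByCountryAndYear(sample()).forEach((country, byYear) ->
                byYear.forEach((year, avg) ->
                        System.out.println(country + "\t" + year + "\t" + avg)));
    }
}
```

The nested map has the same shape as the bucket tree in the response: each outer key corresponds to a country bucket, each inner key to a year bucket, and each value to the `avg_salary` metric.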
Upsert requests
private static void upsert(TransportClient transport) {
	try {
		IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
				XContentFactory.jsonBuilder().startObject()
						.field("name", "mysql从入门到删库跑路")
						.field("tags", "mysql")
						.field("price", 32.8)
						.endObject());

		UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
				.doc(XContentFactory.jsonBuilder()
						.startObject().field("price", 31.8)
						.endObject())
				.upsert(index);
		UpdateResponse response = transport.update(update).get();
		System.out.println(response.getVersion());
	} catch (IOException e) {
		e.printStackTrace();
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	}
}
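
The request above pairs a partial document (`doc`) with a full document (`upsert`). If document 2 does not exist yet, the upsert document is indexed as-is; if it already exists, only the partial document is merged into it. The JDK-only sketch below imitates that branching with an in-memory map. The `UpsertSketch` class and its methods are hypothetical and only illustrate the semantics; they are not the Elasticsearch implementation.

```java
import java.util.HashMap;
import java.util.Map;

class UpsertSketch {
    /** Hypothetical in-memory "index": document id -> source fields. */
    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    /**
     * If the id is absent, stores upsertDoc as a new document (partialDoc is not applied);
     * otherwise merges partialDoc into the existing document.
     * Returns true when a new document was created.
     */
    boolean upsert(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            docs.put(id, new HashMap<>(upsertDoc));
            return true;
        }
        existing.putAll(partialDoc);
        return false;
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        UpsertSketch index = new UpsertSketch();

        Map<String, Object> full = new HashMap<>();
        full.put("name", "example book"); // placeholder value for this sketch
        full.put("price", 32.8);

        Map<String, Object> partial = new HashMap<>();
        partial.put("price", 31.8);

        // First call: the document is missing, so the full upsert document is stored.
        System.out.println(index.upsert("2", partial, full) + " " + index.get("2").get("price"));
        // Second call: the document exists, so only the partial update is applied.
        System.out.println(index.upsert("2", partial, full) + " " + index.get("2").get("price"));
    }
}
```

Run twice against the same id, the first call creates the document with price 32.8 and the second lowers it to 31.8, which is why the upsert demo prints a different version on repeated runs.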
Mget requests
public static void mget(TransportClient transport) {
	MultiGetResponse res = transport.prepareMultiGet()
			.add("book_shop", "books", "1")
			.add("book_shop", "books", "2")
			.get();
	for (MultiGetItemResponse item : res.getResponses()) {
		System.out.println(item.getResponse());
	}
}
Bulk requests
public static void bulk(TransportClient transport) {
	try {
		BulkRequestBuilder bulk = transport.prepareBulk();
		bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
				XContentFactory.jsonBuilder().startObject()
						.field("name", "设计模式从入门到拷贝代码")
						.field("tags", "设计模式")
						.field("price", 55.9)
						.endObject()));
		bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
				XContentFactory.jsonBuilder().startObject()
						.field("name", "架构设计从入门到google搜索")
						.field("tags", "架构设计")
						.field("price", 68.9)
						.endObject()));
		bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc((XContentFactory.jsonBuilder()
				.startObject().field("price", 32.8)
				.endObject())));

		BulkResponse bulkRes = bulk.get();
		if (bulkRes.hasFailures()) {
			System.out.println("Error...");
		}
	} catch (IOException e) {
		e.printStackTrace();
	}
}
Scroll requests
public static void scroll(TransportClient client) {
	// Open a scroll context that is kept alive for 60 seconds, fetching one document per batch
	SearchResponse bookShop = client.prepareSearch("book_shop").setScroll(new TimeValue(60000)).setSize(1).get();

	int batchCnt = 0;
	do {
		// Keep reading with the scroll ID until a batch comes back empty
		for (SearchHit hit : bookShop.getHits().getHits()) {
			System.out.println("batchCnt:" + ++batchCnt);
			System.out.println(hit.getSourceAsString());
		}
		bookShop = client.prepareSearchScroll(bookShop.getScrollId()).setScroll(new TimeValue(60000)).execute().actionGet();
	} while (bookShop.getHits().getHits().length != 0);
}
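
The control flow of a scroll is simply "fetch a batch, process it, fetch the next, stop at the first empty batch". The JDK-only sketch below isolates that loop using a hypothetical `Cursor` class that stands in for the server-side scroll context; neither `ScrollSketch` nor `Cursor` is part of the Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ScrollSketch {
    /** Hypothetical stand-in for a scroll context: returns fixed-size batches, then an empty one. */
    static final class Cursor {
        private final List<String> docs;
        private final int batchSize;
        private int position = 0;

        Cursor(List<String> docs, int batchSize) {
            this.docs = docs;
            this.batchSize = batchSize;
        }

        List<String> nextBatch() {
            if (position >= docs.size()) {
                return Collections.emptyList();
            }
            int end = Math.min(position + batchSize, docs.size());
            List<String> batch = docs.subList(position, end);
            position = end;
            return batch;
        }
    }

    /** Drains the cursor: keep fetching until a batch comes back empty. */
    static List<String> drain(Cursor cursor) {
        List<String> all = new ArrayList<>();
        List<String> batch = cursor.nextBatch();
        while (!batch.isEmpty()) {
            all.addAll(batch);
            batch = cursor.nextBatch();
        }
        return all;
    }

    public static void main(String[] args) {
        Cursor cursor = new Cursor(Arrays.asList("doc1", "doc2", "doc3"), 2);
        System.out.println(drain(cursor));
    }
}
```

Note that the loop always makes one extra fetch at the end: the empty batch is the only signal that the data is exhausted.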

Search templates
public static void searchTemplates(TransportClient client) {
	// Parameters substituted into the stored template
	Map<String, Object> params = new HashMap<>(10);
	params.put("from",0);
	params.put("size",10);
	params.put("tags","java");

	// Run the stored template named "page_query_by_tags"
	SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
			.setScript("page_query_by_tags")
			.setScriptType(ScriptType.STORED)
			.setScriptParams(params)
			.setRequest(new SearchRequest())
			.get();

	for(SearchHit hit:str.getResponse().getHits().getHits()) {
		System.out.println(hit.getSourceAsString());
	}
}
Combining multiple query conditions
public static void otherSearch(TransportClient client) {
	SearchResponse response1 = client.prepareSearch("book_shop").setQuery(QueryBuilders.termQuery("tags", "java")).get();
	SearchResponse response2 = client.prepareSearch("book_shop").setQuery(QueryBuilders.multiMatchQuery("32.8", "price","tags")).get();
	SearchResponse response3 = client.prepareSearch("book_shop").setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();
	SearchResponse response4 = client.prepareSearch("book_shop").setQuery(QueryBuilders.prefixQuery("name", "java")).get();

	System.out.println(response1.getHits().getHits()[0].getSourceAsString());
	System.out.println(response2.getHits().getHits()[0].getSourceAsString());
	System.out.println(response3.getHits().getHits()[0].getSourceAsString());
	System.out.println(response4.getHits().getHits()[0].getSourceAsString());

	// Combine several conditions in one bool query
	SearchResponse response5 = client.prepareSearch("book_shop").setQuery(QueryBuilders.boolQuery()
			.must(QueryBuilders.termQuery("tags", "java"))
			.mustNot(QueryBuilders.matchQuery("name", "跑路"))
			.should(QueryBuilders.matchQuery("name", "入门"))
			.filter(QueryBuilders.rangeQuery("price").gte(23).lte(55))).get();

	System.out.println(response5.getHits().getHits()[0].getSourceAsString());
}
Geo queries
public static void geo(TransportClient client) {
	GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location").setCorners(23, 112, 21, 114);

	List<GeoPoint> points = new ArrayList<>();
	points.add(new GeoPoint(23,115));
	points.add(new GeoPoint(25,113));
	points.add(new GeoPoint(21,112));
	GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location",points);

	GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location").point(22.523375, 113.911231).distance(500, DistanceUnit.METERS);


	SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
	for(SearchHit hit:response.getHits().getHits()) {
		System.out.println(hit.getSourceAsString());
	}
}
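
The geo distance query used above matches documents whose `location` lies within 500 meters of the given point. As a rough illustration of that condition, the JDK-only sketch below computes great-circle distance with the haversine formula. The `GeoDistanceSketch` class and the mean Earth radius of 6,371,000 m are assumptions of this sketch; Elasticsearch's own calculation may differ in detail.

```java
class GeoDistanceSketch {
    // Mean Earth radius in meters; an approximation assumed for this sketch.
    static final double EARTH_RADIUS_METERS = 6_371_000.0;

    /** Great-circle distance in meters between two lat/lon points (haversine formula). */
    static double haversineMeters(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_METERS * Math.asin(Math.sqrt(a));
    }

    /** True when the second point lies within maxMeters of the first. */
    static boolean withinDistance(double lat1, double lon1, double lat2, double lon2, double maxMeters) {
        return haversineMeters(lat1, lon1, lat2, lon2) <= maxMeters;
    }

    public static void main(String[] args) {
        double centerLat = 22.523375;
        double centerLon = 113.911231;
        // A point 0.001 degrees of latitude to the north (roughly 111 m away)
        System.out.println(withinDistance(centerLat, centerLon, centerLat + 0.001, centerLon, 500));
        // A point 0.01 degrees of latitude to the north (roughly 1.1 km away)
        System.out.println(withinDistance(centerLat, centerLon, centerLat + 0.01, centerLon, 500));
    }
}
```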
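Summary

The demos above are meant for a quick read-through. If you are already developing an ES-based project, rely on the official API documentation (https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.3/index.html), which has detailed API descriptions and usage demos.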
