Without further ado, let's go straight to the code.
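import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticSearchDataSourceConfigurer {
    private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);

    @Bean
    public TransportClient getESClient() {
        // Set the cluster name; sniffing lets the client discover the other nodes
        Settings settings = Settings.builder()
                .put("cluster.name", "bigData-cluster")
                .put("client.transport.sniff", true)
                .build();
        // Create the client
        TransportClient client = null;
        try {
            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(""), 9300)); // cluster IP
            LOG.info("ESClient connection established");
        } catch (UnknownHostException e) {
            // Log the failure at error level together with the stack trace
            LOG.error("Failed to establish ESClient connection", e);
        }
        return client;
    }
}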
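import java.util.UUID;
import javax.annotation.Resource;
import org.elasticsearch.client.transport.TransportClient;
import org.springframework.stereotype.Repository;
// JsonUtil, User and userDao are the author's own classes (not shown here).
// JsonProcessingException is assumed to be Jackson's com.fasterxml.jackson.core.JsonProcessingException.

@Repository
public class UserDaoImpl implements userDao {
    private static final String INDEXNAME = "user"; // index names must be lowercase
    private static final String TYPENAME = "info";

    @Resource
    TransportClient transportClient;

    @Override
    public int addUser(User[] user) {
        int successNum = 0;
        for (int i = 0; i < user.length; i++) {
            // Use a random UUID as the document id
            String id = UUID.randomUUID().toString();
            try {
                String jsonValue = JsonUtil.object2JsonString(user[i]);
                if (jsonValue != null) {
                    // Index one document and block until the response arrives
                    transportClient.prepareIndex(INDEXNAME, TYPENAME, id).setSource(jsonValue)
                            .execute().actionGet();
                    successNum++;
                }
            } catch (JsonProcessingException e) {
                // Skip users that cannot be serialized; they are not counted as successes
                e.printStackTrace();
            }
        }
        return successNum;
    }
}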
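// Needs java.util.List, java.util.UUID and org.elasticsearch.action.bulk.BulkRequestBuilder
public static void batchAddUser(TransportClient client, List<User> users) {
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    for (int i = 0; i < users.size(); i++) {
        String id = UUID.randomUUID().toString();
        String jsonValue = null;
        try {
            jsonValue = JsonUtil.object2JsonString(users.get(i));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        // Skip documents that could not be serialized
        if (jsonValue != null) {
            bulkRequest.add(client.prepareIndex("user", "info", id).setSource(jsonValue));
        }
        // Send one bulk request per 10,000 documents, then start a fresh one
        if (bulkRequest.numberOfActions() >= 10000) {
            bulkRequest.execute().actionGet();
            bulkRequest = client.prepareBulk();
        }
        System.out.println("Added document " + (i + 1) + " to the bulk request");
    }
    // Send whatever is left after the last full batch
    if (bulkRequest.numberOfActions() > 0) {
        bulkRequest.execute().actionGet();
    }
}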
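The "flush every 10,000 documents" logic can be looked at separately from Elasticsearch. Below is a minimal sketch, using only the JDK, of splitting a list into batches of at most a given size so that the final partial batch is not lost. `BatchSplitter` and `split` are hypothetical names chosen for illustration and are not part of any Elasticsearch API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Elasticsearch): splits a list into
// consecutive batches so that each batch can be sent as one bulk request.
class BatchSplitter {

    // Returns batches of at most batchSize elements, preserving order.
    // The last batch holds the remainder and may be smaller.
    static <T> List<List<T>> split(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the sub-list view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            ids.add(i);
        }
        for (List<Integer> batch : split(ids, 10)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

With 25 items and a batch size of 10, this yields batches of 10, 10 and 5 elements; each batch would then be turned into one bulk request.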
Supplementary: creating an ES (ElasticSearch) connection pool in Java
1. First, we need a factory class that creates the connections
package com.aly.util;

import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> {

    @Override
    public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        System.out.println("activateObject");
    }

    @Override
    public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        // Close the client when the pool discards it
        RestHighLevelClient highLevelClient = pooledObject.getObject();
        highLevelClient.close();
    }

    @Override
    public PooledObject<RestHighLevelClient> makeObject() throws Exception {
        // Build a client that knows every node; failures propagate instead of pooling a null client
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(
                new HttpHost("192.168.1.121", 9200, "http"),
                new HttpHost("192.168.1.122", 9200, "http"),
                new HttpHost("192.168.1.123", 9200, "http"),
                new HttpHost("192.168.1.125", 9200, "http"),
                new HttpHost("192.168.1.126", 9200, "http"),
                new HttpHost("192.168.1.127", 9200, "http")));
        return new DefaultPooledObject<>(client);
    }

    @Override
    public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        System.out.println("passivateObject");
    }

    @Override
    public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
        return true;
    }
}
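The factory hard-codes its six node addresses. One possible way to move such a list into configuration is to parse a comma-separated `host:port` string. The sketch below does this with JDK classes only; `NodeListParser`, its `parse` method and the input format are assumptions made for illustration, not something the original code uses. Each resulting host and port could then be passed to `new HttpHost(host, port, "http")`.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns "host1:9200,host2:9200" into host/port pairs.
// It handles host names and IPv4 addresses only (no bracketed IPv6 literals).
class NodeListParser {

    static List<InetSocketAddress> parse(String nodes, int defaultPort) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String part : nodes.split(",")) {
            String node = part.trim();
            if (node.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            int colon = node.lastIndexOf(':');
            if (colon < 0) {
                // No explicit port: fall back to the default
                result.add(InetSocketAddress.createUnresolved(node, defaultPort));
            } else {
                String host = node.substring(0, colon);
                int port = Integer.parseInt(node.substring(colon + 1));
                // createUnresolved avoids a DNS lookup at parse time
                result.add(InetSocketAddress.createUnresolved(host, port));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress a : parse("192.168.1.121:9200, 192.168.1.122", 9200)) {
            System.out.println(a.getHostString() + " -> " + a.getPort());
        }
    }
}
```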
2. Then we write the connection pool utility class
package com.aly.util;

import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.elasticsearch.client.RestHighLevelClient;

public class ElasticSearchPoolUtil {
    // Pool configuration; optional, since the defaults are used when it is omitted.
    // On commons-pool2 versions where the config class is not generic, drop the type argument.
    private static final GenericObjectPoolConfig<RestHighLevelClient> poolConfig = new GenericObjectPoolConfig<>();

    // maxTotal is 8 by default, so the pool holds at most 8 clients
    static {
        poolConfig.setMaxTotal(8);
    }

    // Factory for the pooled objects; this is the class we implemented ourselves
    private static final EsClientPoolFactory esClientPoolFactory = new EsClientPoolFactory();

    // Build the object pool from the factory and the configuration
    private static final GenericObjectPool<RestHighLevelClient> clientPool =
            new GenericObjectPool<>(esClientPoolFactory, poolConfig);

    public static RestHighLevelClient getClient() throws Exception {
        // Borrow a client from the pool
        return clientPool.borrowObject();
    }

    public static void returnClient(RestHighLevelClient client) {
        // Return the client to the pool once we are done with it
        clientPool.returnObject(client);
    }
}
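A borrowed client should always go back to the pool, even when the request fails; otherwise the pool runs out of clients once maxTotal of them have been borrowed. The sketch below shows that borrow / try / finally / return pattern with a tiny JDK-only pool. `TinyPool` is a hypothetical stand-in for `GenericObjectPool`, used only so the example runs without Elasticsearch or commons-pool2 on the classpath; unlike the real pool, it fails immediately when exhausted instead of waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for GenericObjectPool: a fixed-size, non-blocking pool.
class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();

    TinyPool(int size, Supplier<T> factory) {
        for (int i = 0; i < size; i++) {
            idle.push(factory.get());
        }
    }

    // Takes an idle object, or fails fast if every object is already borrowed.
    synchronized T borrow() {
        T obj = idle.poll();
        if (obj == null) {
            throw new IllegalStateException("pool exhausted");
        }
        return obj;
    }

    synchronized void giveBack(T obj) {
        idle.push(obj);
    }

    synchronized int idleCount() {
        return idle.size();
    }

    // Runs the action with a borrowed object and returns the object in finally,
    // so it goes back to the pool whether the action succeeds or throws.
    <R> R withObject(Function<T, R> action) {
        T obj = borrow();
        try {
            return action.apply(obj);
        } finally {
            giveBack(obj);
        }
    }

    public static void main(String[] args) {
        TinyPool<StringBuilder> pool = new TinyPool<>(2, StringBuilder::new);
        Integer length = pool.withObject(sb -> sb.append("ok").length());
        System.out.println(length + " chars written, " + pool.idleCount() + " objects idle");
    }
}
```

Applied to the utility class above, the same shape would be: call `getClient()`, use the client inside `try`, and call `returnClient(client)` in `finally`.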



