因为关系型数据库的查询速度有限,正好学习了一下es,想采用mysql与es实时数据同步的方案,查询统一从es进行。这里记录一下canal如何配置并实现数据同步,具体步骤如下:
下载地址链接:https://pan.baidu.com/s/135JUWTpIvvkFHBgR5bvk_g
提取码:0s14
–来自百度网盘超级会员V4的分享
修改mysql配置文件,开启binlog并使用ROW格式:
log_bin=mysql-bin
binlog-format=ROW
# 重启mysql
将下载好的canal解压后,进入到配置文件目录
cd /www/canal/conf/example
vim instance.properties
具体修改的配置如下(其它信息可以不用改)
instance.properties配置
# position info
canal.instance.master.address=127.0.0.1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
# mq config (主题,可以自己定义,也可以使用默认的)
canal.mq.topic=example
修改canal的配置文件
vim /www/canal/conf/canal.properties
canal.properties配置如下
canal.zkServers = 192.168.11.252:2181
# tcp, kafka, RocketMQ
canal.serverMode = kafka
canal.mq.servers = 192.168.11.252:9092
启动canal
cd /www/canal/bin
./startup.sh
启动好canal以后,我采用springboot集成kafka进行消息消费
添加pom.xml依赖
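<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>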
添加kafka配置文件
# kafka相关配置
spring:
  kafka:
    bootstrap-servers: 192.168.11.252:9092 # kafka服务器地址(可以多个用逗号分开)
    consumer:
      group-id: test-consumer-group
      enable-auto-commit: false
      auto-commit-interval: 3000
    listener:
      ack-mode: MANUAL_IMMEDIATE
添加es配置文件
#elasticsearch 配置:
elasticsearch:
  address: 192.168.11.252:9200 #如果是集群,用逗号隔开
  connect-timeout: 1000 #连接超时时间
  socket-timeout: 30000 #socket读取超时时间
  connection-request-timeout: 500
  max-connect-num: 100
  max-connect-per-route: 100
kafka消费类,采用监听模式
package com.wstx.oa.crm.handler;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.wstx.common.util.CollectionUtils;
import com.wstx.oa.crm.defines.BinlogConstants;
import com.wstx.oa.crm.utils.ElasticsearchUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.elasticsearch.action.index.IndexResponse;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.springframework.util.ClassUtils.getMethod;
@Component
@Slf4j
public class MessageHandler {
@Resource
ElasticsearchUtils elasticsearchUtils;
@KafkaListener(topics = {"example"})
public void handleMessage(ConsumerRecord record, Acknowledgment acknowledgment) {
try {
String message = (String) record.value();
log.info("收到消息Str: {}", message);
Map map = JSON.parseObject(message);
List
BinlogConstants
package com.wstx.oa.crm.defines;
/**
 * String constants used when handling binlog messages: lookup keys and event type names.
 *
 * <p>All fields are {@code final} so they cannot be reassigned at runtime and can be used
 * as compile-time constants (for example in {@code switch} labels).
 */
public class BinlogConstants {

    /** Key {@code "data"}; presumably the changed-rows field of the message (confirm against the consumer). */
    public static final String BINLOG_DATA_KEY = "data";

    /** Key {@code "table"}; presumably the source table name field of the message. */
    public static final String BINLOG_TBL_KEY = "table";

    /** Key {@code "type"}; presumably the field holding one of the {@code *_EVENT} values below. */
    public static final String BINLOG_TYPE_KEY = "type";

    /** Key {@code "pkNames"}; presumably the field listing primary-key column names. */
    public static final String BINLOG_PK_KEY = "pkNames";

    /** Event type name for row inserts. */
    public static final String INSERT_EVENT = "INSERT";

    /** Event type name for row updates. */
    public static final String UPDATE_EVENT = "UPDATE";

    /** Event type name for row deletes. */
    public static final String DELETE_EVENT = "DELETE";
}
ElasticsearchUtils
package com.wstx.oa.crm.utils;
import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * Helper around {@link RestHighLevelClient} for checking index existence and for
 * indexing, upserting and deleting single documents.
 */
@Component
public class ElasticsearchUtils {

    private static final Logger LOG = Logger.getLogger(ElasticsearchUtils.class.getName());

    @Resource
    private RestHighLevelClient restHighLevelClient;

    /**
     * Checks whether an index exists.
     *
     * @param indexName name of the index to look up
     * @return {@code true} if the client reports that the index exists; {@code false} if it
     *         does not, or if the request failed with an {@link IOException} (the failure is
     *         logged, not rethrown, because this method declares no checked exception)
     */
    public boolean isIndexExists(String indexName) {
        GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
        getIndexRequest.humanReadable(true);
        try {
            return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            LOG.log(Level.WARNING, "Failed to check existence of index: " + indexName, e);
            return false;
        }
    }

    /**
     * Serializes {@code object} to JSON and indexes it as document {@code id} in index
     * {@code table}.
     *
     * @param table  target index name
     * @param object payload, serialized with fastjson
     * @param id     document id
     * @return the response returned by the client
     * @throws Exception if the index request fails; I/O failures are propagated to the caller
     *                   (like {@link #updateDoc} and {@link #deleteDoc}) instead of being
     *                   swallowed and turned into a {@code null} return value
     */
    public IndexResponse saveEs(String table, Object object, String id) throws Exception {
        IndexRequest request = new IndexRequest(table);
        request.source(JSON.toJSONString(object), XContentType.JSON);
        request.id(id);
        return restHighLevelClient.index(request, RequestOptions.DEFAULT);
    }

    /**
     * Updates document {@code docId} in {@code indexName} with the JSON form of
     * {@code object}, inserting it if it does not exist yet ({@code docAsUpsert}).
     *
     * @param indexName target index name
     * @param docId     document id
     * @param object    partial or full document, serialized with fastjson
     * @return {@code true} if the result is {@code CREATED} or {@code UPDATED};
     *         {@code false} for any other result, including {@code NOOP}
     * @throws IOException if the request fails
     */
    public boolean updateDoc(String indexName, String docId, Object object) throws IOException {
        UpdateRequest request = new UpdateRequest(indexName, docId);
        request.doc(JSON.toJSONString(object), XContentType.JSON);
        request.docAsUpsert(true);
        UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
        DocWriteResponse.Result result = updateResponse.getResult();
        return result == DocWriteResponse.Result.CREATED
                || result == DocWriteResponse.Result.UPDATED;
    }

    /**
     * Deletes document {@code docId} from {@code indexName}.
     *
     * @param indexName target index name
     * @param docId     document id
     * @return always {@code true} when the request completes without throwing; shard-level
     *         failures are only logged and do not change the return value
     * @throws IOException if the request fails
     */
    public boolean deleteDoc(String indexName, String docId) throws IOException {
        DeleteRequest request = new DeleteRequest(indexName, docId);
        DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
        if (shardInfo.getFailed() > 0) {
            for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                LOG.warning("删除失败,原因为 :" + failure.reason());
            }
        }
        return true;
    }
}



