目录
1.什么是canal
2.canal工作原理
3.下载安装
4.连接配置kafka
1.打开config目录下的canal.properties文件
2.修改相关配置
3.mysql相关配置
3.1验证binary log日志是否开启
4.测试kafka
5.canal实现同步Elasticsearch
1.添加依赖
2.编写代码
1.什么是canal
canal是阿里开源针对mysql的一个数据实时同步的一个框架,可以实现无代码,简单配置即可完成数据库数据的同步到kafka,rabbitMQ等消息,增加,修改,删除
2.canal工作原理
canal的工作原理:其实就是mysql的主从复制,只不过canal不是真正的从节点,而是伪装成mysql的slave从节点,模拟mysql从节点的交互方式,给主节点发送dump请求,主节点接收到了之后,会把binary log日志推送给canal,然后canal进行解析,从而发送到存储目的地
3.下载安装
canal是阿里开源针对mysql的一个数据实时同步的一个框架,可以实现无代码,简单配置即可完成数据库数据的同步到kafka,rabbitMQ等消息,增加,修改,删除
canal的工作原理:其实就是mysql的主从复制,只不过canal不是真正的从节点,而是伪装成mysql的slave从节点,模拟mysql从节点的交互方式,给主节点发送dump请求,主节点接收到了之后,会把binary log日志推送给canal,然后canal进行解析,从而发送到存储目的地
3.下载安装
下载canal.deployer服务端
canal下载地址,下载1.14,1.15都可以,但是1.15才能支持es7.x以上的版本注意一下
这里以1.15为例,下载解压后
bin目录下有个startup.bat 双击即可打开
4.连接配置kafka
首先要修改几个配置
1.打开config目录下的canal.properties文件
2.修改相关配置
2.1 往下拉会看见关于kafka相关的配置
2.2 在config目录下有个example文件,点击后会看见instance.properties文件,打开配置一下kafka的主题,以下操作都是在instance.properties文件里面
2.3如果数据库不在本地,记得修改下这里
2.4如果没有其他的设置,这里的username/password要修改为数据库的账户和密码
2.5这里的slaveId要打开,值不要和mysql配置的一样就好
3.mysql相关配置
这里以mysql5.7.20解压版本为例
解压版本目录下是没有my.ini文件,自行创建一个就好,具体解压版要如何安装自己百度看下,这里只说一下canal相关的配置
因为mysql默认是不开binary log日志的,所以第一步要开启binary log日志,在my.ini配置一下就好,打开my.ini,务必在[mysqld]下面添加这四行,不在mysqld下面添加不生效,server-id=1如果有可以不加
log-bin=mysql-bin # 开启Binlog 一般只需要修改这一行即可 binlog-format=ROW # 设置格式 此行可以不加 命令设置即可 详见下方拓展 binlog-do-db=test # 监控mysql中test这个库下面的所有表 server_id=1 # 配置serverID
3.1验证binary log日志是否开启
要重启mysql不然配置的不生效,再mysql的bin路径下
D:mysql-5.7.20-winx64bin 输入如下命令,要用管理员的方式进cmd命令
首先应该先关闭mysql服务
net stop mysql
然后再开启mysql服务
net start mysql
要重启mysql不然配置的不生效,再mysql的bin路径下
D:mysql-5.7.20-winx64bin 输入如下命令,要用管理员的方式进cmd命令
首先应该先关闭mysql服务
net stop mysql
然后再开启mysql服务
net start mysql
然后登录mysql看下日志是否开启
mysql -uroot -p show variables like 'log_bin';
开启之后就可以开始测试了!
4.测试kafka
首先要下载安装kafka,然后打开kafka,进入之前在canal配置文件里面配置的topic主题监控
需要在kafka的路径下输入,D:kafka_2.11-2.4.1
打开zookeeper .binwindowszookeeper-server-start.bat configzookeeper.properties 打开kafka .binwindowskafka-server-start.bat .configserver.properties 进入topic .binwindowskafka-console-consumer.bat --bootstrap-server 10.88.40.156:9092 --topic topic2 --from-beginning
在数据库添加一条数据
去topic看下是否发送到了
ok结束,canal数据同步到kafka结束
5.canal实现同步Elasticsearch
canal1.15才支持es7.x以上
记得修改canal的config目录下的canal.properties文件
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp #如果是es修改为tcp模式
1.添加依赖
org.springframework.boot
spring-boot-starter-data-elasticsearch
com.alibaba
fastjson
1.2.76
org.jsoup
jsoup
1.13.1
com.alibaba.otter
canal.client
1.1.4
2.编写代码
canal实现数据库发生增加,修改和删除就会被获取到
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mybatisplus.entity.Content;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@Component
public class CanalClient {
//@PostConstruct项目启动是就会加载这个方法,用这个注解可以实现此方法一直属于运行的状态,只要数据库发生增加,修改和删除就会被获取到
@PostConstruct
public List canal() throws InvalidProtocolBufferException {
List contentList = new ArrayList<>();
//获取连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111),
"example","","");
//连接
canalConnector.connect();
//订阅数据库
canalConnector.subscribe("test.*");
//获取数据,一次抓取100条,没有100条就全部抓完,有多少抓多少,并不会堵塞
Message message = canalConnector.get(100);
//获取Entry集合
List entries = message.getEntries();
//如果集合为空,就等待几秒
if (!entries.isEmpty()){
//遍历entries逐个解析
for (CanalEntry.Entry entry : entries) {
//1.获取表名
String tableName = entry.getHeader().getTableName();
//2.获取获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//3.获取序列化后的数据
ByteString storevalue = entry.getStorevalue();
//4.判断当前的数据类型是否为rowData类型
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
//获取当前事件的类型
CanalEntry.EventType eventType = rowChange.getEventType();
//获取数据集
List rowDataList = rowChange.getRowDatasList();
//遍历rowDataList数据集并打印
for (CanalEntry.RowData rowdata: rowDataList) {
//修改之后的数据
JSonObject afterData = new JSonObject();
List afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
//name字段名,value是字段名对应的值
afterData.put(column.getName(),column.getValue());
//将JSONObject类型的数据转化为content对象
Content content = afterData.toJavaObject(Content.class);
contentList.add(content);
}
System.out.println("表名:"+tableName+",类型"+eventType+",after:"+contentList);
}
}
}
}
return contentList;
}
}
连接Elasticsearch,写个config类
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient(){
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost",9200,"http"))
);
return restHighLevelClient;
}
}
解析网页数据,拿到数据放到数据库,从而实现数据库变化并实时同步到es索引库中
import com.mybatisplus.entity.Content;
import org.jsoup.Jsoup;
import org.jsoup.nodes.document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@Component
public class HtmlParseUtil {
public List parseJD(String keyword) throws IOException {
List contentList = new ArrayList<>();
//获得请求
String url = "https://search.xxxxx.com/Search?keyword="+keyword;
//解析地址,如果30s内解析不到就会报错
// jsoup返回的 document 对象就是 js浏览器里面的document对象
document parse = Jsoup.parse(new URL(url), 30000);
//先获取id=J_goodsList的大的一个商品div
Element elementById = parse.getElementById("J_goodsList");
//再获取包含一个个商品的li标签
Elements li = elementById.getElementsByTag("li");
//循环这个li,这里的li包含的就是一个个商品,商品有价格,名称,店铺名,图片地址等等
for (Element element : li) {
//为了用户体验,一般大厂的网页图片都懒加载,刚开始只会加载一个默认图片,正在的图片地址在这个data-lazy-img 里面,而不是在src里面
String img = element.getElementsByTag("img").eq(0).attr("data-lazy-img");
String price = element.getElementsByClass("p-price").eq(0).text();
String name = element.getElementsByClass("p-name").eq(0).text();
String shop = element.getElementsByClass("p-shopnum").eq(0).text();
Content content = new Content();
content.setImg(img);
content.setPrice(price);
content.setTitle(name);
content.setShop(shop);
contentList.add(content);
}
return contentList;
}
实体类
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;
@Data
public class Content {
//myBatis-plus 的id自增
@TableId(type = IdType.AUTO)
private Integer id;
private String title;
private String price;
private String img;
private String shop;
}
mapper层代码
canal1.15才支持es7.x以上
记得修改canal的config目录下的canal.properties文件
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp #如果是es修改为tcp模式
org.springframework.boot
spring-boot-starter-data-elasticsearch
com.alibaba
fastjson
1.2.76
org.jsoup
jsoup
1.13.1
com.alibaba.otter
canal.client
1.1.4
2.编写代码
canal实现数据库发生增加,修改和删除就会被获取到
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mybatisplus.entity.Content;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@Component
public class CanalClient {
//@PostConstruct项目启动是就会加载这个方法,用这个注解可以实现此方法一直属于运行的状态,只要数据库发生增加,修改和删除就会被获取到
@PostConstruct
public List canal() throws InvalidProtocolBufferException {
List contentList = new ArrayList<>();
//获取连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",11111),
"example","","");
//连接
canalConnector.connect();
//订阅数据库
canalConnector.subscribe("test.*");
//获取数据,一次抓取100条,没有100条就全部抓完,有多少抓多少,并不会堵塞
Message message = canalConnector.get(100);
//获取Entry集合
List entries = message.getEntries();
//如果集合为空,就等待几秒
if (!entries.isEmpty()){
//遍历entries逐个解析
for (CanalEntry.Entry entry : entries) {
//1.获取表名
String tableName = entry.getHeader().getTableName();
//2.获取获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//3.获取序列化后的数据
ByteString storevalue = entry.getStorevalue();
//4.判断当前的数据类型是否为rowData类型
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
//获取当前事件的类型
CanalEntry.EventType eventType = rowChange.getEventType();
//获取数据集
List rowDataList = rowChange.getRowDatasList();
//遍历rowDataList数据集并打印
for (CanalEntry.RowData rowdata: rowDataList) {
//修改之后的数据
JSonObject afterData = new JSonObject();
List afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
//name字段名,value是字段名对应的值
afterData.put(column.getName(),column.getValue());
//将JSONObject类型的数据转化为content对象
Content content = afterData.toJavaObject(Content.class);
contentList.add(content);
}
System.out.println("表名:"+tableName+",类型"+eventType+",after:"+contentList);
}
}
}
}
return contentList;
}
}
连接Elasticsearch,写个config类
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient(){
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost",9200,"http"))
);
return restHighLevelClient;
}
}
解析网页数据,拿到数据放到数据库,从而实现数据库变化并实时同步到es索引库中
import com.mybatisplus.entity.Content;
import org.jsoup.Jsoup;
import org.jsoup.nodes.document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@Component
public class HtmlParseUtil {
public List parseJD(String keyword) throws IOException {
List contentList = new ArrayList<>();
//获得请求
String url = "https://search.xxxxx.com/Search?keyword="+keyword;
//解析地址,如果30s内解析不到就会报错
// jsoup返回的 document 对象就是 js浏览器里面的document对象
document parse = Jsoup.parse(new URL(url), 30000);
//先获取id=J_goodsList的大的一个商品div
Element elementById = parse.getElementById("J_goodsList");
//再获取包含一个个商品的li标签
Elements li = elementById.getElementsByTag("li");
//循环这个li,这里的li包含的就是一个个商品,商品有价格,名称,店铺名,图片地址等等
for (Element element : li) {
//为了用户体验,一般大厂的网页图片都懒加载,刚开始只会加载一个默认图片,正在的图片地址在这个data-lazy-img 里面,而不是在src里面
String img = element.getElementsByTag("img").eq(0).attr("data-lazy-img");
String price = element.getElementsByClass("p-price").eq(0).text();
String name = element.getElementsByClass("p-name").eq(0).text();
String shop = element.getElementsByClass("p-shopnum").eq(0).text();
Content content = new Content();
content.setImg(img);
content.setPrice(price);
content.setTitle(name);
content.setShop(shop);
contentList.add(content);
}
return contentList;
}
实体类
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;
@Data
public class Content {
//myBatis-plus 的id自增
@TableId(type = IdType.AUTO)
private Integer id;
private String title;
private String price;
private String img;
private String shop;
}
mapper层代码
canal实现数据库发生增加,修改和删除就会被获取到
连接Elasticsearch,写个config类
解析网页数据,拿到数据放到数据库,从而实现数据库变化并实时同步到es索引库中
实体类
mapper层代码
import com.baomidou.mybatisplus.core.mapper.baseMapper; import com.mybatisplus.entity.Content; import org.springframework.stereotype.Repository; @Repository public interface ContentMapper extends baseMapper{ }
service代码
import com.alibaba.fastjson.JSON;
import com.mybatisplus.canal.CanalClient;
import com.mybatisplus.entity.Content;
import com.mybatisplus.mapper.ContentMapper;
import com.mybatisplus.until.HtmlParseUtil;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.Timevalue;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Service
public class ContentService {
@Autowired
private RestHighLevelClient client;
@Autowired
private ContentMapper contentMapper;
@Autowired
private HtmlParseUtil htmlParseUtil;
@Autowired
private CanalClient canalClient;
//将解析的网页数据保存到数据库中
public void insertData(String keyword) throws IOException {
List contentList = htmlParseUtil.parseJD(keyword);
for (Content content : contentList) {
contentMapper.insert(content);
}
}
//将数据库中的数据更新到es索引库中
public boolean saveES(String keyword) throws IOException {
//网页解析存到数据库
insertData(keyword);
//通过canal拿到新增的数据
List canal = canalClient.canal();
//数据库批量插入到es索引库中
BulkRequest bulkRequest = new BulkRequest("jd","jd_goods");
bulkRequest.timeout();
for (Content content : canal) {
bulkRequest.add(new IndexRequest("jd").id(String.valueOf(content.getId())).source(JSON.toJSonString(content), XContentType.JSON));
}
BulkResponse bulk = client.bulk(bulkRequest, RequestOptions.DEFAULT);
//批量插入成功返回false,所以加个非!
return !bulk.hasFailures();
}
//查询数据,分页
public List
controller层
import com.mybatisplus.service.ContentService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@RestController
public class ContentController {
@Autowired
private ContentService contentService;
//添加数据
@GetMapping("/add/{keyword}")
public boolean add(@PathVariable("keyword") String keyword) throws IOException {
return contentService.saveES(keyword);
}
//查询数据
@GetMapping("/search/{keyword}/{pageNo}/{pageSize}")
public List> search(@PathVariable("keyword") String keyword,
@PathVariable("pageNo") Integer pageNo,
@PathVariable("pageSize") Integer pageSize) throws IOException {
return contentService.search(keyword, pageNo, pageSize);
}
}


![[会写代码的健身爱好者成长史]之阿里canal数据实时同步-附kafka和Elasticsearch实现 [会写代码的健身爱好者成长史]之阿里canal数据实时同步-附kafka和Elasticsearch实现](http://www.mshxw.com/aiimages/31/708801.png)
