一.基于Canal, 开发数据监控微服务
1. Canal: 阿里开源, java开发, 监控数据库 -> 双机房数据同步
(1) 工作原理: canal -> 伪装成mysql备份机 -> mysql数据改变 -> 数据写入 binary log -> 日志发送到Canal -> canal提取mysql更新数据
(2) Canal 环境部署 - LINUX虚拟机:
1) mysql开启binlog模式, 值ON已开启, OFF未开启
SHOW VARIABLES LIKE '%log_bin%'
2) 修改 /etc/my.cnf 开启binlog模式
[mysql]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
3) 重启mysql: mysql -h localhost -u root -p
4) 使用root账号 -> 创建用户 -> 授予权限
create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION, SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
5) canal服务端安装:
a. 下载地址: https://github.com/alibaba/canal/releases/tag/canal-1.0.24
b. 上传到linux系统, 解压到目录 /usr/local/canal
c. 修改exmaple下的实例配置:
#position info
canal.instance.master.address=192.168.200.128:3306
#username/password
canal.instance.dbusername=canal
canal.instance.dbpassword=canal
#注释掉这个参数, 开启扫描全库
#canal.instance.defaultDatabaseName=
6) 启动服务: [root@localhost canal]# ./bin/startup.sh
7) 查看日志: cat /usr/local/canal/logs/canal/canal.log
(3) 整合Canal+springboot源码包 -> 添加到maven仓库
1) 下载canal整合包spring-boot-start-canal:
https://github.com/chengqian56131/spring-starter-canal
2) 解压进入starter-canal: cmd
3) cd spring-boot-starter-canal-master/starter-canal
4) mvn指令: mvn clean install -DskipTest
5) 进入本地仓库 -> 搜索xpand -> 已安装在目录 repository/com
2. 数据监控微服务 changgou_canal:
(1) 添加依赖: start-canal / spring-rabbit
(2) 启动类CanalApplication:
@EnableCanalClient //声明是canal的客户端
@SpringBootApplication
(3) 配置application.properties:
canal.client.instances.example.host=192.168.200.128 //连接主机
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000 //批大小,每次数据库交互处理1000条数据
spring.rabbitmq.host=192.168.200.128 //连接rabbitmq
(4) 测试监听类 -> com.changgou.canal.listener.BusinessListener
@CanalEventListener //声明当前类是canal监听类
public class BusinessListener{
@ListenPoint(schema="changgou_business", table="tb_ad")//监听方法注解, 监听的库名和表名
public void addUpdate(CanalEntry.EventType e, CanalEntry.RowData r) //监听方法
//eventType: 当前操作数据库的类型
//rowdata: 当前操作数据库的数据
rowData.getBeforeColumnslist(); //改变前的数据
rowData.getAfterColumnsList(); //改变后的数据
二. 基于Canal, 更新首页广告缓存
1. 需求: 广告表tb_ad数据改变 -> canal获取binlog数据 -> changgou_canal微服务获取字段"position" -> 发送消息到rabbitmq -> 运营微服务获取消息"position" -> 调用nginx中的ad_update.lua脚本 -> 更新缓存
2. 第一部分代码: tb_ad数据改变 -> canal监控 -> 发送到mq
(1) canal微服务->添加 config包 ->创建 com.changgou.canal.config.RabbitMQConfig.class
定义队列名psfs AD_UPDATE_QUEUE="ad_update_queue"-> 声明队列@bean(AD_UPDATE_QUEUE)
(2) 监听类BusinessListener.java:
//注入rabbit
@Autowired
private RabbitTemplate rabbitTemplate;
//遍历改变后的数据, 查询字段"position"
for(canalEntry.Column : rowData.getAfterColumsList()){
if("position".equals(column.getName())){
//发送消息
rabbitTemplate.convertAndSend("", RabbitMQConfig.AD_UPDATE_QUEUE, colunm.getValue());
}
}
3. 第二部分: changgou_service_business提取mq消息 -> 远程调用nginx -> 执行ad_update.lua -> 存储并更新到redis
(1) 在changgou_service_business微服务添加依赖: spring-boot-starter-amop / okhttp
(2) 修改yml文件: rabbitmq.host: 192.168.200.128
(3) 监听类 business/listener/AdListener.java
@Component //声明当前类是bean
public class AdListener{
@RabbitListener(queues="ad_update_queue") //指定要监听的队列
public void receiveMessage(String message){
//发起远程调用
OkHttpClient okHttpClient = new OkHttpClient();
//请求路径, 虚拟机中nginx的ad_update.lua文件
String url = "http://192.168.200.128/add_update?position="+message;
//OKHttp3 构建请求
Request request = new Request().Builder().url(url).build();
//设置新的访问
Call call = okHttpClient.newCall(request);
//发起请求
call.enqueue(new Callback(){
//重写回调方法
@Override
public void onFailure(Call call, IOException e){
//请求失败
e.printStackTrace();
}
@Override
public void onResponse(Call call, Response response) throws IOException{
//请求成功
System.out.println("请求成功"+response.message());
}
}
三. 商品上架索引 -> 导入数据 -> 全文搜索引擎库ElasticSearch
1. 需求: 商品上架 -> skuList导入索引库
2. 步骤: canal微服务监控tb_spu表更改isMarketable -> 发送spuId到rabbitmq -> RabbitMQ创建fanout交换机 -> search微服务接收消息spuId -> feign调用goods微服务 -> 获取商品skuList -> 调用ES的restAPI -> 导入skulist到索引库
3. 实现第一部分 -> 发送上架消息:
(1)添加RabbitMQConfig:
//定义交换机
public static final String GOODS_UP_EXCHANGE="goods_up_exchange";
//定义队列
psfs SEARCH_ADD_QUEUE="search_add_queue";
//声明队列
@Bean(SEARCH_ADD_QUEUE)
public Queue SEARCH_ADD_QUEUE(){
return new Queue (SEARCH_ADD_QUEUE);
}
//声明交换机
@Bean(GOODS_UP_EXCHANGE)
public Exchange GOODS_UP_EXCHANGE(){
return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();
}
//交换机和队列的绑定
@Bean
public Binding GOOGDS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue, @Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
(2) changgou_canal中添加监听类-> listener/SpuListener.java
@CanalEventListener
Public class SpuListener{
@ListenPoint(schema="changgou_goods", table="tb_spu")
public void goodsUp(CanalEntry.EventType eventType, CanalEntry.RowData rowData){
//获取改变前的数据 -> 转换为map
rowData.getBeforeColumnsList().forEach(c)->new HashMap().put(c.getName(), c.getValue());
//获取改变后的数据 -> 转换为map
rowData.getBeforeColumnsList().forEach(c)->new HashMap().put(c.getName(), c.getValue());
//获取最新上架的商品 状态由 oldData"0" -> newData"1"
if("0".equals(oldData.get("is_marketable"))&&"1".equals(newData.get("is_marketable"))){
//商品spuId发送到mq
redisTemplate.convertAndSend(RabbitMQConfig.GOODS_UP_EXCAHNGE, "", newData.get("id");
}
}
}
4. 实现第二部分 -> 更新商品分词索引库
(1) ES访问方式:
1) 端口: 192:168:200:128:9200
2) Chrome插件 -> 更多工具 -> 扩展程序 -> 发送ES-head文件
(2) 创建 service_search_api
1) 依赖: changgou_common / spring-boot-starter-data
2) com.changgou.search.feign:
a.
3) com.changgou.search.pojo
a. SkuInfo.java -> 索引库的映射实体类 -> java代码操作实体类 -> 更新索引库数据
b. @document(indexName="skuinfo", type="docs") //定义index和type
(3) 创建搜索微服务-> changgou_service_search
1) 依赖: common / eureka-client / elasticsearch / goods-api / search-api / amqp
2) yml配置: #省略了格式, 节省空间
server.port: 9009
spring.application.name: search
rabbitmq.host: 192.168.200.128
redis.host: 192.168.200.128
main: allow-bean-definition-overriding: true #允许重名覆盖
data.elasticsearch:
cluster-name: elasticsearch
cluster-nodes: 192.168.200.128:9300 #集群节点端口
eureka:
client.service-url.defaultZone: http://127.0.0.1.6868/eureka
instance: prefer-ip-address: true #将IP注册到Eureka Server上
feign:
hystrix.enabled: true
client.config.default:
connectTimeout: 600000 #消费者超时连接生产者
readTimeout: 600000 #调用生产者连接超时
hystrix.command.default.execution:
timeout.enabled: false #超时交给ribbon控制
isolation.strategy: SEMAPHORE
3) 启动类SearchApplication: @SpringBootApplication @EnableEurekaClient @EnableFeignClients(basePackages="com.changgou.goods.feign")
4) 创建包com.changgou.search.config.RabbitMQConfig.java
5) 商品服务提供接口 -> 给search微服务调用 -> 通过spuId查询SkuList
1) com.changgou.goods.controller.SkuController.java
//定义远程接口
@GetMapping("/spu/{skuID}")
public List findSkuListBySpuId(@PathVariable("spuId")String spuId){
//封装条件的map
Map searchMap = new HashMap<>();
//规定传递的spuId不是特定的意义"all", 则携带spuId值查询
if(!"all".equals(spuId){
searchMap.put("spuId", spuId);
}
//传递的spuId值为"all"时, 只携带一个参数status="1", 返回全部审核通过的skuList
searchMap.put("status", "1");
//返回集合
return skuService.findList(searchMap);
}
2) 定义feign接口: com.changgou.goods.feign.SkuFeign.java
a. 添加依赖: changgou_common
@FeignClient(name="goods")
public interface SkuFeign{
GetMapping("/sku/spu/{spuId}")
List findSkuListBySpuId(@PathVariable("spuId")String spuId);
}
6) 在search微服务中更新ES索引数据:
1) com.changgou.search.dao.ESManagerMapper.java
//参数1: 当前操作的实体类; 参数2: 当前主键数据类型
public interface ESManagerMapper Extends ElasticsearchRepository{}
2) search.service.impl.ESManagerServiceImpl.java
@Service
public class ESManagerServiceImpl implements ESManagerService{
//创建索引库结构
@Override
public void createMappingAndIndex(){
//创建索引
elasticsearchTemplate.createTndex(SkuInfo.class);
//创建映射
elasticsearchTemplate.putMapping(SkuInfo.class);
}
//导入全部sku集合到索引库
@Override
public void importAll(){
//查询skuList
List skuList = skuFeign.findSkuListBySpuId("all");
if(skuList==null||skuList.size()<=0){
throw new RuntimeException("当前数据未查询到,无法导入索引库");
}
//skuList转换为json [{key:value}, {}, ...]
String JSonSkuList = JSON.toJSonString(skuList);
//将JSON串转换为SkuInfo
List skuInfoList = JSON.parseArray(jasonSkuLlist, SkuInfo.class);
//转换规格信息 -> map
for(SkuInfo skuInfo : skuInfoList){
skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class));
}
//导入索引库
esManagerMapper.saveAll(skuInfoList);
}
//根据spu查询skuList, 添加索引库
@Override
public void importDataBySpuId(String spuId){
-> 代码同上...
}
}
3) ESManagerController.java
@RestController
@RequestMapping("/manager")
public class ESMnagerController{
@Autrowired
private ESManagerService esManagerService;
@GetMapping("/create")
pubic Result reate(){
//创建索引映射
esManagerService.createMappingAndIndex();
return new Result(true, StatusCode.OK, "创建索引结构成功");
}
//导入全部数据
@GetMapping("/importAll")
public Result importAll(){
esManagerService.importALL();
return new Result(true, StatusCode.OK, "创建索引结构成功");
}
}
7) search 微服务中添加消息监听类 -> 获取上架的spuId
1) serach微服务: com.changgou.search.listener.GoodsUpListener.java
//监听mq 获取spuId
@Component
public class GoodsUpListener{
@Autowired
private ESManagerService esManagerService;
@RabbitListener(queues=RabbitMQConfig.SEARCH_ADD_QUEUE)
public void receiveMessage(String spuId){
//接收到的消息spuId
//查询skuList, 并导入suoyinku
esManagerService.importDataBySpuId(spuId);
}
}
四. 商品下架索引 -> 删除数据 -> Canal+RabbitMQ+ES
1.需求: 商品下架 -> is_market="0" -> 索引库移除
2. canal微服务监控tb_spu -> 商品下架后 -> 发送spuId到rabbitMQ -> 新建商品下架exchange+索引删除queque -> 发送spuId到search微服务 -> 删除ES数据
3. 实现:
1) 修改数据监控服务
(1) 修改canal微服务: com.changgou.canal.config.RabbitMQConfig.java
//声明交换机
psfs GOODS_DOWN_EXCHANGE="goods_down_exchange";
//声明交换机
@Bean(GOODS_DOWN_EXCHANGE)
public Exchange exchange(){ //方法见名知意
return ExchangeBuilder.fanoutExchange (GOODS_DOWN_EXCHANGE).durable(true).build(); //持久化
}
//声明队列
public static final String SEARCH_DELETE_QUEUE="search_delete_queue";
//定义队列
@Bean(SEARCH_DELETE_QUEUE)
public Queue queue(){ //方法见名知意
return new Queue(SEARCH_DELETE_QUEUE);
}
//绑定交换机
@Bean
public Binding GDEBinding(@Qualifier(GOODS_DOWN_EXCHANGE)Exchange exchange, @Qualifier(SEARCH_DELETE_QUEUE)Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("").noargs();
}
(2) 修改监听类: com.changgou.canal.listener.SpuListener.java
//获取下架商品, is_marketable 原转态"1" -> 新状态"1"
if("1".equals(oldData.get("is_marketable"))&&"0".equals.(newData.get("is_marketable"))){
rabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_DOWN_EXCHANGE,"", newData.get("id"));
2) 在search微服务中监听消息, 删除索引库
(1) search微服务业务层添加删除方法: com.changgou.search.service.ESManagerServiceImpl.java
//根据spuId删除索引库数据
@Override
public void delDataBySpuId(String spuId){
List skuList = skuFeign.findSkuListBySpuId(spuId);
//判断无数据, throw new RuntimeException();
//遍历skuList
for(Sku sku:skuList){
//删除
esManagerService.deleteById(Long.parse(sku.getId()));
}
}
(2) search微服务添加监听类: com.changgou.search.listener.GoodsDeleteListener.java
@Component
public class GoodsDeleteListener{
@rabbitListener(queues="RrabbitMQConfig.SEARCH_DELETE_QUEUE)
public void void receiveMmessage(String spuId){
//接收消息队列spuId, 删除索引
//调用业务层,删除索引库
esManagerService.deleteDataBbySpuId(spuId);
}
}



