13.搜索过滤
13.1 生成分类和品牌过滤13.3 生成规格参数过滤13.4 过滤条件的筛选13.5 页面展示选择的过滤项13.6 取消过滤项13.7 优化 14.thymeleaf及其静态化
14.1 商品详情14.2 页面静态化 15.RabbitMQ
15.1 RabbitMQ简介15.2 五种消息模型15.3 Spring AMQP15.4 项目改造
13.搜索过滤 13.1 生成分类和品牌过滤对于过滤功能,先看一下想要实现的效果:
整个过滤部分有3块:
顶部的导航,已经选择的过滤条件展示:
商品分类面包屑,根据用户选择的商品分类变化其它已选择过滤参数 过滤条件展示,又包含3部分
商品分类展示品牌展示其它规格参数 展开或收起的过滤条件的按钮
顶部导航要展示的内容跟用户选择的过滤条件有关。
比如用户选择了某个商品分类,则面包屑中才会展示具体的分类比如用户选择了某个品牌,列表中才会有品牌信息。
所以,这部分需要依赖第二部分:过滤条件的展示和选择。因此我们先不着急去做。展开或收起的按钮是否显示,取决于过滤条件有多少,如果很少,那么就没必要展示。所以也是跟第二部分的过滤条件有关。
这样分析来看,我们必须先做第二部分:过滤条件展示。
先来看分类和品牌。在我们的数据库中已经有所有的分类和品牌信息。在这个位置,是不是把所有的分类和品牌信息都展示出来呢?显然不是,用户搜索的条件会对商品进行过滤,而在搜索结果中,不一定包含所有的分类和品牌,直接展示出所有商品分类,让用户选择显然是不合适的。无论是分类信息,还是品牌信息,都应该从搜索的结果商品中进行聚合得到。
原来,我们返回的结果是PageResult对象,里面只有total、totalPage、items3个属性。但是现在要对商品分类和品牌进行聚合,数据显然不够用,我们需要对返回的结果进行扩展,添加分类和品牌的数据,由于后面也要返回聚合后的规格参数,因此这里统一扩展SearchResult类。
package com.leyou.search.pojo; import com.leyou.common.pojo.PageResult; import com.leyou.item.pojo.Brand; import java.util.List; import java.util.Map; public class SearchResult extends PageResult{ private List brands; private List
修改SearchServiceImpl:
public SearchResult search(SearchRequest request) {
// 判断查询条件
if (StringUtils.isBlank(request.getKey())) {
// 返回默认结果集
return null;
}
// 初始化自定义查询构建器
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
// 添加查询条件
queryBuilder.withQuery(QueryBuilders.matchQuery("all", request.getKey()).operator(Operator.AND));
// 添加结果集过滤,只需要:id,subTitle, skus
queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{"id", "subTitle", "skus"}, null));
// 获取分页参数
Integer page = request.getPage();
Integer size = request.getSize();
// 添加分页
queryBuilder.withPageable(PageRequest.of(page - 1, size));
String categoryAggName = "categories";
String brandAggName = "brands";
queryBuilder.addAggregation(AggregationBuilders.terms(categoryAggName).field("cid3"));
queryBuilder.addAggregation(AggregationBuilders.terms(brandAggName).field("brandId"));
// 执行搜索,获取搜索的结果集
AggregatedPage goodsPage = (AggregatedPage)this.goodsReponsitory.search(queryBuilder.build());
// 解析聚合结果集
List> categories = getCategoryAggResult(goodsPage.getAggregation(categoryAggName));
List brands = getBrandAggResult(goodsPage.getAggregation(brandAggName));
// 封装成需要的返回结果集
return new SearchResult(goodsPage.getContent(), goodsPage.getTotalElements(), goodsPage.getTotalPages(), categories, brands);
}
private List getBrandAggResult(Aggregation aggregation) {
// 处理聚合结果集
LongTerms terms = (LongTerms)aggregation;
// 获取所有的品牌id桶
List buckets = terms.getBuckets();
// 定义一个品牌集合,搜集所有的品牌对象
List brands = new ArrayList<>();
// 解析所有的id桶,查询品牌
buckets.forEach(bucket -> {
Brand brand = this.brandClient.queryBrandById(bucket.getKeyAsNumber().longValue());
brands.add(brand);
});
return brands;
// 解析聚合结果集中的桶,把桶的集合转化成id的集合
// List brandIds = terms.getBuckets().stream().map(bucket -> bucket.getKeyAsNumber().longValue()).collect(Collectors.toList());
// 根据ids查询品牌
//return brandIds.stream().map(id -> this.brandClient.queryBrandById(id)).collect(Collectors.toList());
// return terms.getBuckets().stream().map(bucket -> this.brandClient.queryBrandById(bucket.getKeyAsNumber().longValue())).collect(Collectors.toList());
}
private List> getCategoryAggResult(Aggregation aggregation) {
// 处理聚合结果集
LongTerms terms = (LongTerms)aggregation;
// 获取所有的分类id桶
List buckets = terms.getBuckets();
// 定义一个品牌集合,搜集所有的品牌对象
List> categories = new ArrayList<>();
List cids = new ArrayList<>();
// 解析所有的id桶,查询品牌
buckets.forEach(bucket -> {
cids.add(bucket.getKeyAsNumber().longValue());
});
List names = this.categoryClient.queryNamesByIds(cids);
for (int i = 0; i < cids.size(); i++) {
Map map = new HashMap<>();
map.put("id", cids.get(i));
map.put("name", names.get(i));
categories.add(map);
}
return categories;
}
页面渲染数据结构
我们可以把所有的过滤条件放入一个数组中,然后在页面利用v-for遍历一次生成。
其基本结构是这样的:
[
{
k:"过滤字段名",
options:[{},{}]
}
]
我们先在data中定义数组:filters,等待组装过滤参数:
data: {
ly,
search:{
key: "",
page: 1
},
goodsList:[], // 接收搜索得到的结果
total: 0, // 总条数
totalPage: 0, // 总页数
filters:[] // 过滤参数集合
},
然后在查询搜索结果的回调函数中,对过滤参数进行封装:
页面渲染数据
我们注意到,虽然页面元素是一样的,但是品牌会比其它搜索条件多出一些样式,因为品牌是以图片展示。需要进行特殊处理。数据展示是一致的,我们采用v-for处理:
13.3 生成规格参数过滤{{f.k}}
有四个问题需要先思考清楚:
什么时候显示规格参数过滤? 分类只有一个如何知道哪些规格需要过滤?要过滤的参数,其可选值是如何获取的?规格过滤的可选值,其数据格式怎样的?
1.什么情况下显示有关规格参数的过滤?
如果用户尚未选择商品分类,或者聚合得到的分类数大于1,那么就没必要进行规格参数的聚合。因为不同分类的商品,其规格是不同的。因此,我们在后台需要对聚合得到的商品分类数量进行判断,如果等于1,我们才继续进行规格参数的聚合。
**2.如何知道哪些规格需要过滤? **
我们不能把数据库中的所有规格参数都拿来过滤。因为并不是所有的规格参数都可以用来过滤,参数的值是不确定的。我们在设计规格参数时,已经标记了某些规格可搜索,某些不可搜索。因此,一旦商品分类确定,我们就可以根据商品分类查询到其对应的规格,从而知道哪些规格要进行搜索。
3.要过滤的参数,其可选值是如何获取的?
虽然数据库中有所有的规格参数,但是不能把一切数据都用来供用户选择。与商品分类和品牌一样,应该是从用户搜索得到的结果中聚合,得到与结果品牌的规格参数可选值。
4.要过滤的可选值,其数据格式怎样的?
具体怎么实现呢?总结为以下五步:
1)用户搜索得到商品,并聚合出商品分类2)判断分类数量是否等于1,如果是则进行规格参数聚合3)先根据分类,查找可以用来搜索的规格4)对规格参数进行聚合5)将规格参数聚合结果整理后返回
具体的实现过程就不一一分析了,直接贴上代码即可:
GoodsServiceImpl类:
package com.leyou.search.service.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.leyou.common.pojo.PageResult;
import com.leyou.item.pojo.*;
import com.leyou.search.client.BrandClient;
import com.leyou.search.client.CategoryClient;
import com.leyou.search.client.GoodsClient;
import com.leyou.search.client.SpecificationClient;
import com.leyou.search.pojo.Goods;
import com.leyou.search.pojo.SearchRequest;
import com.leyou.search.pojo.SearchResult;
import com.leyou.search.repository.GoodsRepository;
import com.leyou.search.service.SearchService;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilter;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@Service
public class SearchServiceImpl implements SearchService {
@Autowired
private BrandClient brandClient;
@Autowired
private CategoryClient categoryClient;
@Autowired
private GoodsClient goodsClient;
@Autowired
private SpecificationClient specificationClient;
@Autowired
private GoodsRepository goodsRepository;
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public Goods spuToGoods(Spu spu) throws IOException {
Goods goods = new Goods();
goods.setId(spu.getId());
List ids = Arrays.asList(spu.getCid1(), spu.getCid2(), spu.getCid3());
List names = this.categoryClient.queryNamesById(ids);
String brandName = this.brandClient.queryByid(spu.getBrandId()).getName();
String all = spu.getTitle() + " " + StringUtils.join(names, " ") + " " + brandName;
goods.setAll(all);
goods.setSubTitle(spu.getSubTitle());
goods.setBrandId(spu.getBrandId());
goods.setCid1(spu.getCid1());
goods.setCid2(spu.getCid2());
goods.setCid3(spu.getCid3());
goods.setCreateTime(spu.getCreateTime());
List prices = new ArrayList<>();
List> skus = new ArrayList<>();
List skusList = this.goodsClient.querySkusBySpuId(spu.getId());
skusList.forEach(sku -> {
prices.add(sku.getPrice());
Map map = new HashMap<>();
map.put("id",sku.getId());
map.put("title",sku.getTitle());
String images = sku.getImages();
map.put("images",StringUtils.isEmpty(images)?"":images.split(",")[0]);
map.put("price",sku.getPrice());
skus.add(map);
});
goods.setPrice(prices);
goods.setSkus(MAPPER.writevalueAsString(skus));
Map specs = new HashMap<>();
List params = this.specificationClient.queryParams(null, spu.getCid3(), null, true);
SpuDetail spuDetail = this.goodsClient.querySpuDetailBySpuId(spu.getId());
String genericSpec = spuDetail.getGenericSpec();
String specialSpec = spuDetail.getSpecialSpec();
Map genericSpecMap = MAPPER.readValue(genericSpec, new TypeReference>() {
});
Map> specialSpecMap = MAPPER.readValue(specialSpec, new TypeReference>>() {
});
params.forEach(param -> {
if(param.getGeneric()){
String value = genericSpecMap.get(param.getId().toString()).toString();
if(param.getNumeric()){
value = chooseSegment(value, param);
}
specs.put(param.getName(),value);
}
else{
String value = specialSpecMap.get(param.getId().toString()).toString();
specs.put(param.getName(),value);
}
});
goods.setSpecs(specs);
return goods;
}
@Override
public SearchResult search(SearchRequest searchRequest) {
//获得搜索的关键字
String key = searchRequest.getKey();
if(StringUtils.isEmpty(key)){
return null;
}
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
//QueryBuilder basicQuery = QueryBuilders.matchQuery("all", searchRequest.getKey()).operator(Operator.AND);
QueryBuilder basicQuery = buildBoolQueryBuilder(searchRequest);
queryBuilder.withQuery(basicQuery);
queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{"id","subTitle","skus"},null));
queryBuilder.withPageable(PageRequest.of(searchRequest.getPage()-1,searchRequest.getSize()));
//添加品牌聚合
String brandAggName="brands";
queryBuilder.addAggregation(AggregationBuilders.terms(brandAggName).field("brandId"));
//添加分类聚合
String categoryAggName="categories";
queryBuilder.addAggregation(AggregationBuilders.terms(categoryAggName).field("cid3"));
//获取查询结果
AggregatedPage goods = (AggregatedPage) this.goodsRepository.search(queryBuilder.build());
//将查询结果转化成需要的形式
List brands = getBrandAggResult(goods.getAggregation(brandAggName));
List> categories = getCategoryAggResult(goods.getAggregation(categoryAggName));
List> specs = null;
//如果分类只有一个就对相应的规格参数进行聚合,如果分类有多个则对所有规格参数进行聚合没有意义且浪费时间
if(!CollectionUtils.isEmpty(categories) && categories.size() == 1){
specs = getSpecsAggResult((Long)categories.get(0).get("id"), basicQuery);
}
SearchResult result = new SearchResult(goods.getTotalElements(), goods.getTotalPages(), goods.getContent(),brands,categories,specs);
return result;
}
private String chooseSegment(String value, SpecParam p) {
double val = NumberUtils.toDouble(value);
String result = "其它";
// 保存数值段
for (String segment : p.getSegments().split(",")) {
String[] segs = segment.split("-");
// 获取数值范围
double begin = NumberUtils.toDouble(segs[0]);
double end = Double.MAX_VALUE;
if(segs.length == 2){
end = NumberUtils.toDouble(segs[1]);
}
// 判断是否在范围内
if(val >= begin && val < end){
if(segs.length == 1){
result = segs[0] + p.getUnit() + "以上";
}else if(begin == 0){
result = segs[1] + p.getUnit() + "以下";
}else{
result = segment + p.getUnit();
}
break;
}
}
return result;
}
private List getBrandAggResult(Aggregation aggregation){
// 强制转换
LongTerms longTerms = (LongTerms) aggregation;
return longTerms.getBuckets().stream().map(bucket -> this.brandClient.queryByid(bucket.getKeyAsNumber().longValue())
).collect(Collectors.toList());
}
private List> getCategoryAggResult(Aggregation aggregation){
LongTerms longTerms = (LongTerms) aggregation;
return longTerms.getBuckets().stream().map(bucket -> {
Map map = new HashMap<>();
Long id = bucket.getKeyAsNumber().longValue();
List names = this.categoryClient.queryNamesById(Arrays.asList(id));
map.put("id",id);
map.put("name",names.get(0));
return map;
}).collect(Collectors.toList());
}
private List> getSpecsAggResult(Long cid, QueryBuilder basicQuery){
List> specs = new ArrayList<>();
//根据cid查询所有的规格参数
List params = this.specificationClient.queryParams(null, cid, null, true);
//构造新的查询
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
//传入查询条件
queryBuilder.withQuery(basicQuery);
//添加过滤集
queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{},null));
//添加聚合查询
params.forEach(param -> {
//将规格参数名称作为聚合名称
queryBuilder.addAggregation(AggregationBuilders.terms(param.getName()).field("specs." + param.getName() + ".keyword"));
});
//执行查询
AggregatedPage search = (AggregatedPage)this.goodsRepository.search(queryBuilder.build());
//处理查询结果
Map aggregationMap = search.getAggregations().asMap();
for (Map.Entry entry : aggregationMap.entrySet()) {
Map map = new HashMap<>();
String key = entry.getKey();
StringTerms value = (StringTerms)entry.getValue();
map.put("key", key);
List collect = value.getBuckets().stream().map(bucket -> bucket.getKeyAsString()).collect(Collectors.toList());
map.put("options", collect);
specs.add(map);
}
return specs;
}
private QueryBuilder buildBoolQueryBuilder(SearchRequest searchRequest){
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 添加must条件
boolQueryBuilder.must(QueryBuilders.matchQuery("all",searchRequest.getKey()).operator(Operator.AND));
// 添加filter条件
Map filter = searchRequest.getFilter();
if(filter != null) {
for (Map.Entry entry : filter.entrySet()) {
String key = entry.getKey();
String term;
//将key转换成相应的查询字段
if (StringUtils.equals("品牌", key)) {
term = "brandId";
} else if (StringUtils.equals("分类", key)) {
term = "cid3";
} else {
term = "specs." + key + ".keyword";
}
boolQueryBuilder.filter(QueryBuilders.termQuery(term, entry.getValue()));
}
}
return boolQueryBuilder;
}
}
页面渲染
首先把后台传递过来的specs添加到filters数组:要注意:分类、品牌的option选项是对象,里面有name属性,而specs中的option是简单的字符串,所以需要进行封装,变为相同的结构:
data.specs.forEach(spec => {
spec.options = spec.options.map(o => ({name: o}));
_this.filters.push(spec);
});
最后渲染的结果如下:
展示或收起过滤条件
如果感觉显示的太多了,我们可以通过按钮点击来展开和隐藏部分内容:
我们在data中定义变量,记录展开或隐藏的状态:
然后在按钮绑定点击事件,以改变show的取值:
在展示规格时,对show进行判断:
当我们点击页面的过滤项,要做哪些事情?
把过滤条件保存在search对象中(watch监控到search变化后就会发送到后台)在页面顶部展示已选择的过滤项把商品分类展示到顶部面包屑
保存过滤项
我们把已选择的过滤项保存在search中:
这里有一个很隐蔽的坑要注意,在created构造函数中会对search进行初始化,所以要在构造函数中对filter进行初始化,否则又会出现filter条件变了但是监听不生效的情况,因为钩子函数初始化search时没有filter对象,导致后面即使filter发生变化也不会被监听到:
search.filter是一个对象,结构:
{
"过滤项名":"过滤项值"
}
然后给所有的过滤项绑定点击事件:
要注意,点击事件传2个参数:
k:过滤项的keyoption:当前过滤项对象
在点击事件中,保存过滤项到selectedFilter:
selectFilter(k, o){
const obj = {};
Object.assign(obj, this.search);
if(k === '分类' || k === '品牌'){
o = o.id;
}
obj.filter[k] = o.name || o;
this.search = obj;
}
另外,这里search对象中嵌套了filter对象,请求参数格式化时需要进行特殊处理,修改common.js中的一段代码:
关于后台代码的实现,已经粘贴过了,具体的实现步骤就直接看代码吧!
当用户选择一个商品分类以后,我们应该在过滤模块的上方展示一个面包屑,把三级商品分类都显示出来。用户选择的商品分类就存放在search.filter中,但是里面只有第三级分类的id:cid3,我们需要根据它查询出所有三级分类的id及名称。于是我们需要在后台提供相应的接口。
在页面重新加载完毕后,此时因为过滤条件中加入了商品分类的条件,所以查询的结果中只有1个分类。我们判断商品分类是否只有1个,如果是,则查询三级商品分类,添加到面包屑即可。
然后渲染:
其它过滤项
接下来,我们需要在页面展示用户已选择的过滤项,如图:
我们知道,所有已选择过滤项都保存在search.filter中,因此在页面遍历并展示即可。
基本有四类数据:
商品分类:这个不需要展示,分类展示在面包屑位置品牌:这个要展示,但是其key和值不合适,我们不能显示一个id在页面。需要找到其name值数值类型规格:这个展示的时候,需要把单位查询出来非数值类型规格:这个直接展示其值即可
具体怎么实现也并不难,这里就略了。
隐藏已经选择的过滤项
现在,我们已经实现了已选择过滤项的展示,但是你会发现一个问题:已经选择的过滤项,在过滤列表中依然存在:这些已经选择的过滤项,应该从列表中移除。怎么做呢?你必须先知道用户选择了什么。用户选择的项保存在search.filter中:我们可以编写一个计算属性,把filters中的 已经被选择的key过滤掉:
computed:{
remainFilters(){
const keys = Object.keys(this.search.filter);
if(this.search.filter.cid3){
keys.push("分类")
}
if(this.search.filter.brandId){
keys.push("品牌")
}
return this.filters.filter(f => !keys.includes(f.k));
}
}
然后页面不再直接遍历filters,而是遍历remainFilters
最后发现,还剩下一堆没选过的。但是都只有一个可选项,此时再过滤没有任何意义,应该隐藏,所以,在刚才的过滤条件中,还应该添加一条:如果只剩下一个可选项,不显示
我们能够看到,每个过滤项后面都有一个小叉,当点击后,应该取消对应条件的过滤。
思路非常简单:
给小叉绑定点击事件点击后把过滤项从search.filter中移除,页面会自动刷新,OK
绑定点击事件时,把k传递过去,方便删除
removeFilter(k){
this.search.filter[k] = null;
}
13.7 优化
搜索系统需要优化的点:
查询规格参数部分可以添加缓存聚合计算interval变化频率极低,所以可以设计为定时任务计算(周期为天),然后缓存起来。elasticsearch本身有查询缓存,可以不进行优化商品图片应该采用缩略图,减少流量,提高页面加载速度图片采用延迟加载图片还可以采用CDN服务器sku信息应该在页面异步加载,而不是放到索引库 14.thymeleaf及其静态化 14.1 商品详情
当用户搜索到商品,肯定会点击查看,就会进入商品详情页,接下来我们完成商品详情页的展示。在商品详情页中,我们会使用到Thymeleaf来渲染页面,所以需要先了解Thymeleaf的语法。其实我觉得Thymeleaf和jsp语法也差不多,学了不用就忘,然后再学,唉。
商品详情浏览量比较大,并发高,我们会独立开启一个微服务,用来展示商品详情,该微服务命名为leyou-goods-web,依然三步走。
1.引入依赖:
leyou com.leyou.parent 1.0.0-SNAPSHOT 4.0.0 com.leyou.goods leyou-goods-web11 11 org.springframework.cloud spring-cloud-starter-netflix-eureka-clientorg.springframework.boot spring-boot-starter-weborg.springframework.boot spring-boot-starter-thymeleaforg.springframework.cloud spring-cloud-starter-openfeigncom.leyou.item leyou-item-interface1.0.0-SNAPSHOT com.leyou.common leyou-common1.0.0-SNAPSHOT org.springframework.boot spring-boot-starter-testorg.springframework.boot spring-boot-starter-amqp
2.添加配置文件application.yml
server:
port: 8084
spring:
application:
name: goods-web
thymeleaf:
cache: false
rabbitmq:
host: 192.168.124.121
virtual-host: /leyou
username: leyou
password: leyou
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:10086/eureka
registry-fetch-interval-seconds: 5
3.编写引导类:
package com.leyou;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class LeyouGoodsWebApplication {
public static void main(String[] args) {
SpringApplication.run(LeyouGoodsWebApplication.class,args);
}
}
修改页面跳转路径
在zuul网关组件中添加路由:
zuul:
prefix: /api
routes:
item-service: /item
@Override
public Map queryDetailsOfItem(Long spuId) {
Map resultMap = new HashMap<>();
//查询spu
Spu spu = this.goodsClient.querySpuBySpuId(spuId);
resultMap.put("spu",spu);
//查询spuDetail
SpuDetail spuDetail = this.goodsClient.querySpuDetailBySpuId(spuId);
resultMap.put("spuDetail",spuDetail);
//查询skus
List skus = this.goodsClient.querySkusBySpuId(spuId);
resultMap.put("skus",skus);
//查询品牌
Brand brand = this.brandClient.queryByid(spu.getBrandId());
resultMap.put("brand",brand);
//查询分类
List ids = Arrays.asList(spu.getCid1(), spu.getCid2(), spu.getCid3());
List names = this.categoryClient.queryNamesById(ids);
List> categories = new ArrayList<>();
for (int i = 0; i < ids.size(); i++) {
Map map = new HashMap<>();
map.put("id",ids.get(i));
map.put("name",names.get(i));
categories.add(map);
}
resultMap.put("categories",categories);
//查询规格参数组
List specGroups = this.specificationClient.querySpecGroupsByCid(spu.getCid3());
resultMap.put("groups",specGroups);
//查询特殊规格参数
List params = this.specificationClient.queryParams(null, spu.getCid3(), false, null);
Map paramMap = new HashMap<>();
params.forEach(param -> {
paramMap.put(param.getId(),param.getName());
});
resultMap.put("paramMap",paramMap);
//返回结果
return resultMap;
}
}
渲染商品列表
这个部分需要渲染的数据有5块:
sku图片sku标题副标题sku价格特有规格属性列表
其中,sku 的图片、标题、价格,都必须在用户选中一个具体sku后,才能渲染。而特有规格属性列表可以在spuDetail中查询到。而副标题则是在spu中,直接可以在页面渲染。因此,我们先对特有规格属性列表进行渲染。等用户选择一个sku,再通过js对其它sku属性渲染。
渲染规格属性列表
规格属性列表将来会有事件和动态效果。我们需要有js代码参与,不能使用Thymeleaf来渲染了。因此,这里我们用vue,不过需要先把数据放到js对象中,方便vue使用
我们在页面的head中,定义一个js标签,然后在里面定义变量,保存与sku相关的一些数据:
specialSpec:这是SpuDetail中唯一与Sku相关的数据
因此我们并没有保存整个spuDetail,而是只保留了这个属性,而且需要手动转为js对象。paramMap:规格参数的id和name键值对,方便页面根据id获取参数名skus:sku集合
通过Vue渲染
我们把刚才获得的几个变量保存在Vue实例中:
然后在页面中渲染:
{{paramMap[k]}}- {{str}}
数据成功渲染了。不过我们发现所有的规格都被勾选了。这是因为现在,每一个规格都有式:selected,我们应该只选中一个,让它的class样式为selected才对!
规格属性的筛选
每一个规格项是数组中的一个元素,因此我们只要保存被选择的规格项的索引,就能判断哪个是用户选择的了!我们需要一个对象来保存用户选择的索引,格式如下:
{
"4":0,
"12":0,
"13":0
}
但问题是,第一次进入页面时,用户并未选择任何参数。因此索引应该有一个默认值,我们将默认值设置为0。我们在head的script标签中,对索引对象进行初始化:
然后将其保存在vue之中。
页面改造
我们在页面中,通过判断indexes的值来判断当前规格是否被选中,并且给规格绑定点击事件,点击规格项后,修改indexes中的对应值:
{{paramMap[k]}}- {{str}}
确定SKU
在我们设计sku数据的时候,就已经添加了一个字段:indexes,这其实就是规格参数的索引组合。而我们在页面中,用户点击选择规格后,就会把对应的索引保存起来,因此,我们可以根据这个indexes来确定用户要选择的sku。我们在vue中定义一个计算属性,来计算与索引匹配的sku:
computed:{
sku(){
const index = Object.values(this.indexes).join("_");
return this.skus.find(s => s.indexes == index);
}
}
渲染sku列表
既然已经拿到了用户选中的sku,接下来,就可以在页面渲染数据了
商品图片是一个字符串,以,分割,页面展示比较麻烦,所以我们编写一个计算属性:images(),将图片字符串变成数组:
computed: {
sku(){
const index = Object.values(this.indexes).join("_");
return this.skus.find(s=>s.indexes==index);
},
images(){
return this.sku.images ? this.sku.images.split(",") : [''];
}
},
将页面改造成如下即可:
<
注意商品详情是HTML代码,我们不能使用 th:text,应该使用th:utext
规格包装
规格包装分成两部分:
规格参数包装列表
而且规格参数需要按照组来显示
规格参数的值分为两部分:
通用规格参数:保存在SpuDetail中的genericSpec中特有规格参数:保存在sku的ownSpec中
所以我们需要把这两部分值取出来,放到groups中。
从spuDetail中取出genericSpec并取出groups:
把genericSpec引入到Vue实例:
因为sku是动态的,所以我们编写一个计算属性,来进行值的组合:
groups(){
groups.forEach(group => {
group.params.forEach(param => {
if(param.generic){
// 通用属性,去spu的genericSpec中获取
param.v = this.genericSpec[param.id] || '其它';
}else{
// 特有属性值,去SKU中获取
param.v = JSON.parse(this.sku.ownSpec)[param.id]
}
})
})
return groups;
}
然后页面渲染:
{{group.name}}
- {{p.name}}
- {{p.v + (p.unit || '')}}
包装列表
包装列表在商品详情中,我们一开始并没有赋值到Vue实例中,但是可以通过Thymeleaf来渲染
包装清单
售后服务
售后服务也可以通过Thymeleaf进行渲染:
14.2 页面静态化售后保障
问题分析
现在,我们的页面是通过Thymeleaf模板引擎渲染后返回到客户端。在后台需要大量的数据查询,而后渲染得到HTML页面。会对数据库造成压力,并且请求的响应时间过长,并发能力不高。那么怎么解决呢?首先我们能想到的就是缓存技术,比如之前学习过的Redis。不过Redis适合数据规模比较小的情况。假如数据量比较大,例如我们的商品详情页。每个页面如果10kb,100万商品,就是10GB空间,对内存占用比较大。此时就给缓存系统带来极大压力,如果缓存崩溃,接下来倒霉的就是数据库了。所以缓存并不是万能的,某些场景需要其它技术来解决,比如静态化。
什么是静态化
静态化是指把动态生成的HTML页面变为静态内容保存,以后用户的请求到来,直接访问静态页面,不再经过服务的渲染。而静态的HTML页面可以部署在nginx中,从而大大提高并发能力,减小tomcat压力。
如何实现静态化
目前,静态化页面都是通过模板引擎来生成,而后保存到nginx服务器来部署。常用的模板引擎比如:
FreemarkerVelocityThymeleaf
我们之前就使用的Thymeleaf,来渲染html返回给用户。Thymeleaf除了可以把渲染结果写入Response,也可以写到本地文件,从而实现静态化。
Thymeleaf实现静态化
先说下Thymeleaf中的几个概念:
Context:运行上下文TemplateResolver:模板解析器TemplateEngine:模板引擎
1.Context
上下文: 用来保存模型数据,当模板引擎渲染时,可以从Context上下文中获取数据用于渲染。当与SpringBoot结合使用时,我们放入Model的数据就会被处理到Context,作为模板渲染的数据使用。
2.TemplateResolver
模板解析器:用来读取模板相关的配置,例如:模板存放的位置信息,模板文件名称,模板文件的类型等等。当与SpringBoot结合时,TemplateResolver已经由其创建完成,并且各种配置也都有默认值,比如模板存放位置,其默认值就是:templates。比如模板文件类型,其默认值就是html。
3.TemplateEngine
模板引擎:用来解析模板的引擎,需要使用到上下文、模板解析器。分别从两者中获取模板中需要的数据,模板文件。然后利用内置的语法规则解析,从而输出解析后的文件。来看下模板引擎进行处理的函数:
templateEngine.process("模板名", context, writer);
三个参数:
模板名称上下文:里面包含模型数据writer:输出目的地的流
在输出时,我们可以指定输出的目的地,如果目的地是Response的流,那就是网络响应。如果目的地是本地文件,那就实现静态化了。而在SpringBoot中已经自动配置了模板引擎,因此我们不需要关心这个。现在我们做静态化,就是把输出的目的地改成本地文件即可!
GoodsHtmlServiceImpl类:
package com.leyou.goods.service.impl;
import com.leyou.goods.service.GoodsService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thymeleaf.TemplateEngine;
import org.thymeleaf.context.Context;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
@Service
public class GoodsHtmlServiceImpl {
// thymeleaf启动器已经自动注入了模板引擎
@Autowired
private TemplateEngine engine;
@Autowired
private GoodsService goodsService;
public void createHtml(Long spuId){
//初始化运行环境
Context context = new Context();
//设置变量
context.setVariables(this.goodsService.queryDetailsOfItem(spuId));
PrintWriter printWriter = null;
try {
File file = new File("D:\ApplicationProgram\nginx-1.14.0\html\api\goods\item\"+spuId+".html");
printWriter = new PrintWriter(file);
engine.process("item",context,printWriter);
} catch (FileNotFoundException e) {
e.printStackTrace();
} finally {
if(printWriter != null){
printWriter.close();
}
}
}
public void asyncExcute(Long spuId) {
ThreadUtils.execute(()->createHtml(spuId));
}
}
线程工具类:
public class ThreadUtils {
private static final ExecutorService es = Executors.newFixedThreadPool(10);
public static void execute(Runnable runnable) {
es.submit(runnable);
}
}
什么时候创建静态文件
我们编写好了创建静态文件的service,那么问题来了:什么时候去调用它呢?想想这样的场景:假如大部分的商品都有了静态页面。那么用户的请求都会被nginx拦截下来,根本不会到达我们的leyou-goods-web服务。只有那些还没有页面的请求,才可能会到达这里。因此,如果请求到达了这里,我们除了返回页面视图外,还应该创建一个静态页面,那么下次就不会再来麻烦我们了。所以,我们在GoodsController中添加逻辑,去生成静态html文件:
@GetMapping("{id}.html")
public String toItemPage(@PathVariable("id")Long id, Model model){
// 加载所需的数据
Map map = this.goodsService.loadModel(id);
// 把数据放入数据模型
model.addAllAttributes(map);
// 页面静态化
this.goodsHtmlService.asyncExcute(id);
return "item";
}
注意:生成html 的代码不能对用户请求产生影响,所以这里我们使用额外的线程进行异步创建。
nginx代理静态页面
接下来,我们修改nginx,让它对商品请求进行监听,指向本地静态页面,如果本地没找到,才进行反向代理:
server {
listen 80;
server_name www.leyou.com;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Forwarded-Server $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
location /api/goods/item {
# 先找本地
# root html;
# if (!-f $request_filename) { #请求的文件不存在,就反向代理
proxy_pass http://127.0.0.1:10010;
# break;
# }
}
location /api/goods {
proxy_pass http://127.0.0.1:10010;
proxy_connect_timeout 600;
proxy_read_timeout 600;
}
location / {
proxy_pass http://127.0.0.1:9002;
proxy_connect_timeout 600;
proxy_read_timeout 600;
}
}
实际测试时发现静态化之后访问速度大大增加。
15.RabbitMQ 15.1 RabbitMQ简介目前我们已经完成了商品详情和搜索系统的开发。我们思考一下,是否存在问题?
商品的原始数据保存在数据库中,增删改查都在数据库中完成。搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新。商品详情做了页面静态化,静态页面数据也不会随着数据库商品发生变化。
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
这里有两种解决方案:
方案1:每当后台对商品做增删改操作,同时要修改索引库数据及静态页面方案2:搜索服务和商品页面服务对外提供操作接口,后台在商品增删改后,调用接口
以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。
所以,我们会通过另外一种方式来解决这个问题:消息队列
什么是消息队列
消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
结合前面所说的问题:
商品服务对商品增删改以后,无需去操作索引库或静态页面,只是发送一条消息,也不关心消息被谁接收。搜索服务和静态页面服务接收消息,分别去处理索引库和静态页面。
如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。
MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。JMS规定了两种消息模型;而AMQP的消息模型更加丰富
常见MQ产品
ActiveMQ:基于JMSRabbitMQ:基于AMQP协议,erlang语言开发,稳定性好RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会Kafka:分布式消息系统,高吞吐量
我将我的rabbitMQ安装在虚拟机上。
15.2 五种消息模型创建一个新工程,并导入相关依赖:
4.0.0 cn.itcast.rabbitmq itcast-rabbitmq0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent2.0.2.RELEASE 1.8 org.apache.commons commons-lang33.3.2 org.springframework.boot spring-boot-starter-amqporg.springframework.boot spring-boot-starter-test
然后添加配置文件:
spring:
rabbitmq:
host: 192.168.124.121
username: leyou
password: leyou
virtual-host: /leyou
编写一个工具类,用于建立与rabbitMQ的连接:
package cn.itcast.rabbitmq.util;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.124.121");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/leyou");
factory.setUsername("leyou");
factory.setPassword("leyou");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
1.基本消息模型
RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
P(producer/ publisher):生产者,一个发送消息的用户应用程序。
C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
总之:
生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
生产者:
package cn.itcast.rabbitmq.direct;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "商品删除了, id = 1001";
// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" [商品服务:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
package cn.itcast.rabbitmq.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
public class Recv {
private final static String QUEUE_NAME = "direct_exchange_queue_1";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2:
package cn.itcast.rabbitmq.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消息确认机制(ACK)
消息一旦被消费者接收,队列中的消息就会被删除。那么问题来了:RabbitMQ怎么知道消息被接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
自动ACK:消息一旦被接收,消费者自动发送ACK手动ACK:消息接收后,不会发送ACK,需要手动调用
哪种更好取决于消息的重要性:
如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
如何手动ACK呢?
public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
// 手动进行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列,第二个参数false,手动进行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
注意最后一行代码
channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。
2.work消息模型
工作队列或者竞争消费者模式
在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
所以避免消息堆积的方法有两种:
1)采用workqueue,多个消费者监听同一队列。2)接收到消息以后,通过线程池,异步消费。
接下来我们来模拟这个流程:
生产者:
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 生产者
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 2);
}
// 关闭通道和连接
channel.close();
connection.close();
}
}
消费者1:
package cn.itcast.rabbitmq.work;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
// 消费者1
public class Recv {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
try {
// 模拟完成任务的耗时:1000ms
Thread.sleep(1000);
} catch (InterruptedException e) {
}
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
消费者2:
package cn.itcast.rabbitmq.work;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
//消费者2
public class Recv2 {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列。
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
能者多劳
消费者1比消费者2的效率要低,一次任务的耗时较长然而两人最终消费的消息数量是一样的消费者2大量时间处于空闲状态,消费者1一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。
订阅模型分类
在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
X(Exchanges):交换机一方面接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange类型有以下几种:
Fanout:广播,将消息交给所有绑定到交换机的队列 Direct:定向,把消息交给符合指定routing key 的队列 Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
3.订阅模型-Fanout
在广播模式下,消息发送流程是这样的:
1) 可以有多个消费者2) 每个消费者有自己的queue(队列)3) 每个队列都要绑定到Exchange(交换机)4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。5) 交换机把消息发送给绑定过的所有队列6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
生产者:
两个变化:
1) 声明Exchange,不再声明Queue2) 发送消息到Exchange,不再发送到Queue
package cn.itcast.rabbitmq.fanout;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为fanout
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 消息内容
String message = "Hello everyone";
// 发布消息到Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
package cn.itcast.rabbitmq.fanout;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
//消费者1
public class Recv {
private final static String QUEUE_NAME = "fanout_exchange_queue_1";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2:
package cn.itcast.rabbitmq.fanout;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
// 消费者2
public class Recv2 {
private final static String QUEUE_NAME = "fanout_exchange_queue_2";
private final static String EXCHANGE_NAME = "fanout_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4.订阅模型-Direct
有选择性的接收消息.在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
生产者:
package cn.itcast.rabbitmq.direct;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 消息内容
String message = "商品删除了, id = 1001";
// 发送消息,并且指定routing key 为:insert ,代表新增商品
channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
System.out.println(" [商品服务:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
package cn.itcast.rabbitmq.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
public class Recv {
private final static String QUEUE_NAME = "direct_exchange_queue_1";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2:
package cn.itcast.rabbitmq.direct;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
public class Recv2 {
private final static String QUEUE_NAME = "direct_exchange_queue_2";
private final static String EXCHANGE_NAME = "direct_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
5.订阅模型-Topic
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!
Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert
通配符规则:
`#`:匹配一个或多个词 `*`:匹配不多不少恰好1个词
audit.#:能够匹配audit.irs.corporate 或者 audit.irs
audit.*:只能匹配audit.irs
生产者:
使用topic类型的Exchange,发送消息的routing key有3种: item.isnert、item.update、item.delete:
package cn.itcast.rabbitmq.topic;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明exchange,指定类型为topic
// 第三个参数表明需要持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
// 消息内容
String message = "新增商品 : id = 1001";
// 发送消息,并且指定routing key 为:insert ,代表新增商品
// 第三个参数可以写成MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化消息
channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
System.out.println(" [商品服务:] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
package cn.itcast.rabbitmq.topic;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
public class Recv {
private final static String QUEUE_NAME = "topic_exchange_queue_1";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
// 第二个参数表示队列的持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者1] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
消费者2:
package cn.itcast.rabbitmq.topic;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
public class Recv2 {
private final static String QUEUE_NAME = "topic_exchange_queue_2";
private final static String EXCHANGE_NAME = "topic_exchange_test";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
// 第二个参数表示队列的持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [消费者2] received : " + msg + "!");
}
};
// 监听队列,自动ACK
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
持久化
如何避免消息丢失?
1) 消费者的ACK机制。可以防止消费者丢失消息。
2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。
是可以将消息进行持久化呢?
要将消息持久化,前提是:队列、Exchange都持久化
交换机持久化
队列持久化
消息持久化
Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。如何使用Spring-amqp呢?
添加依赖:
org.springframework.boot spring-boot-starter-amqp
在配置文件中添加:
spring:
rabbitmq:
host: 192.168.124.121
username: leyou
password: leyou
virtual-host: /leyou
添加监听者:
在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。
@Component
public class Listener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "spring.test.queue", durable = "true"),
exchange = @Exchange(
value = "spring.test.exchange",
ignoreDeclarationExceptions = "true",
type = ExchangeTypes.TOPIC
),
key = {"#.#"}))
public void listen(String msg){
System.out.println("接收到消息:" + msg);
}
}
@Componet:类上的注解,注册到Spring容器@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
value:这个消费者关联的队列。值是@Queue,代表一个队列exchange:队列所绑定的交换机,值是@Exchange类型key:队列和交换机绑定的RoutingKey
类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
AmqpTemplate
Spring最擅长的事情就是封装,把他人的框架进行封装和整合。Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:
红框圈起来的是比较常用的3个方法,分别是:
指定交换机、RoutingKey和消息体指定消息指定RoutingKey和消息,会向默认的交换机发送消息
测试代码
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSend() throws InterruptedException {
String msg = "hello, Spring boot amqp";
this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
// 等待10秒后再结束
Thread.sleep(10000);
}
}
15.4 项目改造
接下来,我们就改造项目,实现搜索服务、商品静态页的数据同步。
思路分析
发送方:商品微服务
什么时候发?
当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。发送什么内容?
对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。
接收方:搜索微服务、静态页面微服务
接收消息后如何处理?
搜索微服务:
增/改:添加新的数据到索引库删:删除索引库数据 静态页微服务:
增/改:创建新的静态页删:删除原来的静态页
商品服务发送消息
1.引入依赖
org.springframework.boot spring-boot-starter-amqp
2.配置文件
spring:
rabbitmq:
host: 192.168.124.121
username: leyou
password: leyou
virtual-host: /leyou
template:
exchange: leyou.item.exchange
publisher-/confirm/is: true
template:有关AmqpTemplate的配置
exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个 publisher-/confirm/is:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试
3.改造GoodsService
在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)
private void sendMessage(Long id, String type){
// 发送消息
try {
this.amqpTemplate.convertAndSend("item." + type, id);
} catch (Exception e) {
logger.error("{}商品消息发送异常,商品id:{}", type, id, e);
}
}
这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange
注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑
然后在新增的时候调用:
修改的时候调用:
搜索服务接收消息
搜索服务接收到消息后要做的事情:
增:添加新的数据到索引库删:删除索引库数据改:修改索引库数据
因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。
引入依赖和添加配置我们跳过,直接进入监听器的编写:
package com.leyou.search.listener;
import com.leyou.item.pojo.Spu;
import com.leyou.search.client.GoodsClient;
import com.leyou.search.pojo.Goods;
import com.leyou.search.repository.GoodsRepository;
import com.leyou.search.service.SearchService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
//必须将listener注入spring容器
@Component
public class GoodsListener {
@Autowired
private GoodsRepository goodsRepository;
@Autowired
private GoodsClient goodsClient;
@Autowired
private SearchService searchService;
//添加rabbitMQ注解,指定交换机的类型,名称,队列名称以及其绑定的routingKey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "LEYOU_SEARCH_SAVE_QUEUE", durable = "true"),
exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
key = {"item.insert","item.update"}))
// 异常一定要抛出,因为spring会根据该方法是否抛出异常来决定是否发送ACK确认,若没有抛出异常则spring自动帮我们发送确认,若抛出异常,则消息仍然会继续存在于
// 消息队列之中
public void save(Long id) throws IOException {
if(id == null){
return;
}
Spu spu = this.goodsClient.querySpuBySpuId(id);
Goods goods = this.searchService.spuToGoods(spu);
this.goodsRepository.save(goods);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "LEYOU_SEARCH_DELETE_QUEUE", durable = "true"),
exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
key = {"item.delete"}))
public void delete(Long id){
if(id == null){
return;
}
this.goodsRepository.deleteById(id);
}
}
静态页面服务接收消息
商品静态页服务接收到消息后的处理:
增:创建新的静态页删:删除原来的静态页改:创建新的静态页并覆盖原来的
不过,我们编写的创建静态页的方法也具备覆盖以前页面的功能,因此:增和改的消息可以放在一个方法中处理,删除消息放在另一个方法处理。
package com.leyou.goods.listener;
import com.leyou.goods.service.impl.GoodsHtmlServiceImpl;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
@Component
public class GoodsListener {
@Autowired
private GoodsHtmlServiceImpl goodsHtmlService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "LEYOU_GOODS_SAVE_QUEUE",durable = "true"),
exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
key = {"item.insert","item.update"}
))
public void save(Long id){
if(id == null){
return;
}
this.goodsHtmlService.createHtml(id);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "LEYOU_GOODS_DELETE_QUEUE",durable = "true"),
exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
key = {"item.delete"}
))
public void delete(Long id){
if(id == null){
return;
}
File file = new File("D:\ApplicationProgram\nginx-1.14.0\html\api\goods\item\" + id + ".html");
file.deleteOnExit();
}
}



