栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

乐忧商城项目总结-4

乐忧商城项目总结-4

乐忧商城

13.搜索过滤

13.1 生成分类和品牌过滤13.3 生成规格参数过滤13.4 过滤条件的筛选13.5 页面展示选择的过滤项13.6 取消过滤项13.7 优化 14.thymeleaf及其静态化

14.1 商品详情14.2 页面静态化 15.RabbitMQ

15.1 RabbitMQ简介15.2 五种消息模型15.3 Spring AMQP15.4 项目改造

13.搜索过滤 13.1 生成分类和品牌过滤

对于过滤功能,先看一下想要实现的效果:

整个过滤部分有3块:

顶部的导航,已经选择的过滤条件展示:

商品分类面包屑,根据用户选择的商品分类变化其它已选择过滤参数 过滤条件展示,又包含3部分

商品分类展示品牌展示其它规格参数 展开或收起的过滤条件的按钮

顶部导航要展示的内容跟用户选择的过滤条件有关。

比如用户选择了某个商品分类,则面包屑中才会展示具体的分类比如用户选择了某个品牌,列表中才会有品牌信息。

所以,这部分需要依赖第二部分:过滤条件的展示和选择。因此我们先不着急去做。展开或收起的按钮是否显示,取决于过滤条件有多少,如果很少,那么就没必要展示。所以也是跟第二部分的过滤条件有关。

这样分析来看,我们必须先做第二部分:过滤条件展示。

先来看分类和品牌。在我们的数据库中已经有所有的分类和品牌信息。在这个位置,是不是把所有的分类和品牌信息都展示出来呢?显然不是,用户搜索的条件会对商品进行过滤,而在搜索结果中,不一定包含所有的分类和品牌,直接展示出所有商品分类,让用户选择显然是不合适的。无论是分类信息,还是品牌信息,都应该从搜索的结果商品中进行聚合得到。

原来,我们返回的结果是PageResult对象,里面只有total、totalPage、items3个属性。但是现在要对商品分类和品牌进行聚合,数据显然不够用,我们需要对返回的结果进行扩展,添加分类和品牌的数据,由于后面也要返回聚合后的规格参数,因此这里统一扩展SearchResult类。

package com.leyou.search.pojo;

import com.leyou.common.pojo.PageResult;
import com.leyou.item.pojo.Brand;

import java.util.List;
import java.util.Map;

public class SearchResult extends PageResult {
    private List brands;
    private List> categories;
    private List> specs;

    public List> getSpecs() {
        return specs;
    }

    public void setSpecs(List> specs) {
        this.specs = specs;
    }

    public SearchResult(Long total, List items, List brands, List> categories, List> specs) {
        super(total, items);
        this.brands = brands;
        this.categories = categories;
        this.specs = specs;
    }

    public SearchResult(Long total, Integer totalPage, List items, List brands, List> categories, List> specs) {
        super(total, totalPage, items);
        this.brands = brands;
        this.categories = categories;
        this.specs = specs;
    }

    public SearchResult(List brands, List> categories, List> specs) {
        this.brands = brands;
        this.categories = categories;
        this.specs = specs;
    }

    public List getBrands() {
        return brands;
    }

    public void setBrands(List brands) {
        this.brands = brands;
    }

    public List> getCategories() {
        return categories;
    }

    public void setCategories(List> categories) {
        this.categories = categories;
    }
}

修改SearchServiceImpl:

public SearchResult search(SearchRequest request) {

    // 判断查询条件
    if (StringUtils.isBlank(request.getKey())) {
        // 返回默认结果集
        return null;
    }

    // 初始化自定义查询构建器
    NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
    // 添加查询条件
    queryBuilder.withQuery(QueryBuilders.matchQuery("all", request.getKey()).operator(Operator.AND));
    // 添加结果集过滤,只需要:id,subTitle, skus
    queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{"id", "subTitle", "skus"}, null));

    // 获取分页参数
    Integer page = request.getPage();
    Integer size = request.getSize();
    // 添加分页
    queryBuilder.withPageable(PageRequest.of(page - 1, size));

    String categoryAggName = "categories";
    String brandAggName = "brands";
    queryBuilder.addAggregation(AggregationBuilders.terms(categoryAggName).field("cid3"));
    queryBuilder.addAggregation(AggregationBuilders.terms(brandAggName).field("brandId"));

    // 执行搜索,获取搜索的结果集
    AggregatedPage goodsPage = (AggregatedPage)this.goodsReponsitory.search(queryBuilder.build());

    // 解析聚合结果集
    List> categories = getCategoryAggResult(goodsPage.getAggregation(categoryAggName));
    List brands = getBrandAggResult(goodsPage.getAggregation(brandAggName));

    // 封装成需要的返回结果集
    return new SearchResult(goodsPage.getContent(), goodsPage.getTotalElements(), goodsPage.getTotalPages(), categories, brands);
}

private List getBrandAggResult(Aggregation aggregation) {
    // 处理聚合结果集
    LongTerms terms = (LongTerms)aggregation;
    // 获取所有的品牌id桶
    List buckets = terms.getBuckets();
    // 定义一个品牌集合,搜集所有的品牌对象
    List brands = new ArrayList<>();
    // 解析所有的id桶,查询品牌
    buckets.forEach(bucket -> {
        Brand brand = this.brandClient.queryBrandById(bucket.getKeyAsNumber().longValue());
        brands.add(brand);
    });
    return brands;
    // 解析聚合结果集中的桶,把桶的集合转化成id的集合
    // List brandIds = terms.getBuckets().stream().map(bucket -> bucket.getKeyAsNumber().longValue()).collect(Collectors.toList());
    // 根据ids查询品牌
    //return brandIds.stream().map(id -> this.brandClient.queryBrandById(id)).collect(Collectors.toList());
    // return terms.getBuckets().stream().map(bucket -> this.brandClient.queryBrandById(bucket.getKeyAsNumber().longValue())).collect(Collectors.toList());
}


private List> getCategoryAggResult(Aggregation aggregation) {
    // 处理聚合结果集
    LongTerms terms = (LongTerms)aggregation;
    // 获取所有的分类id桶
    List buckets = terms.getBuckets();
    // 定义一个品牌集合,搜集所有的品牌对象
    List> categories = new ArrayList<>();
    List cids = new ArrayList<>();
    // 解析所有的id桶,查询品牌
    buckets.forEach(bucket -> {
        cids.add(bucket.getKeyAsNumber().longValue());
    });
    List names = this.categoryClient.queryNamesByIds(cids);
    for (int i = 0; i < cids.size(); i++) {
        Map map = new HashMap<>();
        map.put("id", cids.get(i));
        map.put("name", names.get(i));
        categories.add(map);
    }
    return categories;
}

页面渲染数据结构
我们可以把所有的过滤条件放入一个数组中,然后在页面利用v-for遍历一次生成。
其基本结构是这样的:

[
    {
        k:"过滤字段名",
        options:[{},{}]
    }
]

我们先在data中定义数组:filters,等待组装过滤参数:

data: {
    ly,
    search:{
        key: "",
        page: 1
    },
    goodsList:[], // 接收搜索得到的结果
    total: 0, // 总条数
    totalPage: 0, // 总页数
    filters:[] // 过滤参数集合
},

然后在查询搜索结果的回调函数中,对过滤参数进行封装:

页面渲染数据
我们注意到,虽然页面元素是一样的,但是品牌会比其它搜索条件多出一些样式,因为品牌是以图片展示。需要进行特殊处理。数据展示是一致的,我们采用v-for处理:

{{f.k}}
13.3 生成规格参数过滤

有四个问题需要先思考清楚:

什么时候显示规格参数过滤? 分类只有一个如何知道哪些规格需要过滤?要过滤的参数,其可选值是如何获取的?规格过滤的可选值,其数据格式怎样的?

1.什么情况下显示有关规格参数的过滤?
如果用户尚未选择商品分类,或者聚合得到的分类数大于1,那么就没必要进行规格参数的聚合。因为不同分类的商品,其规格是不同的。因此,我们在后台需要对聚合得到的商品分类数量进行判断,如果等于1,我们才继续进行规格参数的聚合。
**2.如何知道哪些规格需要过滤? **
我们不能把数据库中的所有规格参数都拿来过滤。因为并不是所有的规格参数都可以用来过滤,参数的值是不确定的。我们在设计规格参数时,已经标记了某些规格可搜索,某些不可搜索。因此,一旦商品分类确定,我们就可以根据商品分类查询到其对应的规格,从而知道哪些规格要进行搜索。
3.要过滤的参数,其可选值是如何获取的?
虽然数据库中有所有的规格参数,但是不能把一切数据都用来供用户选择。与商品分类和品牌一样,应该是从用户搜索得到的结果中聚合,得到与结果品牌的规格参数可选值。
4.要过滤的可选值,其数据格式怎样的?

具体怎么实现呢?总结为以下五步:

1)用户搜索得到商品,并聚合出商品分类2)判断分类数量是否等于1,如果是则进行规格参数聚合3)先根据分类,查找可以用来搜索的规格4)对规格参数进行聚合5)将规格参数聚合结果整理后返回

具体的实现过程就不一一分析了,直接贴上代码即可:
GoodsServiceImpl类:

package com.leyou.search.service.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.leyou.common.pojo.PageResult;
import com.leyou.item.pojo.*;
import com.leyou.search.client.BrandClient;
import com.leyou.search.client.CategoryClient;
import com.leyou.search.client.GoodsClient;
import com.leyou.search.client.SpecificationClient;
import com.leyou.search.pojo.Goods;
import com.leyou.search.pojo.SearchRequest;
import com.leyou.search.pojo.SearchResult;
import com.leyou.search.repository.GoodsRepository;
import com.leyou.search.service.SearchService;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.elasticsearch.core.aggregation.AggregatedPage;
import org.springframework.data.elasticsearch.core.query.FetchSourceFilter;
import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Service
public class SearchServiceImpl implements SearchService {
    @Autowired
    private BrandClient brandClient;

    @Autowired
    private CategoryClient categoryClient;

    @Autowired
    private GoodsClient goodsClient;

    @Autowired
    private SpecificationClient specificationClient;

    @Autowired
    private GoodsRepository goodsRepository;

    private static final ObjectMapper MAPPER = new ObjectMapper();
    
    @Override
    public Goods spuToGoods(Spu spu) throws IOException {
        Goods goods = new Goods();
        goods.setId(spu.getId());
        List ids = Arrays.asList(spu.getCid1(), spu.getCid2(), spu.getCid3());
        List names = this.categoryClient.queryNamesById(ids);
        String brandName = this.brandClient.queryByid(spu.getBrandId()).getName();
        String all = spu.getTitle() + " " + StringUtils.join(names, " ") + " " + brandName;
        goods.setAll(all);
        goods.setSubTitle(spu.getSubTitle());
        goods.setBrandId(spu.getBrandId());
        goods.setCid1(spu.getCid1());
        goods.setCid2(spu.getCid2());
        goods.setCid3(spu.getCid3());
        goods.setCreateTime(spu.getCreateTime());
        List prices = new ArrayList<>();
        List> skus = new ArrayList<>();
        List skusList = this.goodsClient.querySkusBySpuId(spu.getId());
        skusList.forEach(sku -> {
            prices.add(sku.getPrice());
            Map map = new HashMap<>();
            map.put("id",sku.getId());
            map.put("title",sku.getTitle());
            String images = sku.getImages();
            map.put("images",StringUtils.isEmpty(images)?"":images.split(",")[0]);
            map.put("price",sku.getPrice());
            skus.add(map);
        });
        goods.setPrice(prices);
        goods.setSkus(MAPPER.writevalueAsString(skus));

        Map specs = new HashMap<>();
        List params = this.specificationClient.queryParams(null, spu.getCid3(), null, true);
        SpuDetail spuDetail = this.goodsClient.querySpuDetailBySpuId(spu.getId());
        String genericSpec = spuDetail.getGenericSpec();
        String specialSpec = spuDetail.getSpecialSpec();
        Map genericSpecMap = MAPPER.readValue(genericSpec, new TypeReference>() {
        });
        Map> specialSpecMap = MAPPER.readValue(specialSpec, new TypeReference>>() {
        });
        params.forEach(param -> {
            if(param.getGeneric()){
                String value = genericSpecMap.get(param.getId().toString()).toString();
                if(param.getNumeric()){
                    value = chooseSegment(value, param);
                }
                specs.put(param.getName(),value);
            }
            else{
                String value = specialSpecMap.get(param.getId().toString()).toString();
                specs.put(param.getName(),value);
            }
        });
        goods.setSpecs(specs);
        return goods;
    }

    
    @Override
    public SearchResult search(SearchRequest searchRequest) {
        //获得搜索的关键字
        String key = searchRequest.getKey();
        if(StringUtils.isEmpty(key)){
            return null;
        }

        NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
        //QueryBuilder basicQuery = QueryBuilders.matchQuery("all", searchRequest.getKey()).operator(Operator.AND);
        QueryBuilder basicQuery = buildBoolQueryBuilder(searchRequest);
        queryBuilder.withQuery(basicQuery);
        queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{"id","subTitle","skus"},null));
        queryBuilder.withPageable(PageRequest.of(searchRequest.getPage()-1,searchRequest.getSize()));
        //添加品牌聚合
        String brandAggName="brands";
        queryBuilder.addAggregation(AggregationBuilders.terms(brandAggName).field("brandId"));
        //添加分类聚合
        String categoryAggName="categories";
        queryBuilder.addAggregation(AggregationBuilders.terms(categoryAggName).field("cid3"));
        //获取查询结果
        AggregatedPage goods = (AggregatedPage) this.goodsRepository.search(queryBuilder.build());

        //将查询结果转化成需要的形式
        List brands = getBrandAggResult(goods.getAggregation(brandAggName));
        List> categories = getCategoryAggResult(goods.getAggregation(categoryAggName));
        List> specs = null;
        //如果分类只有一个就对相应的规格参数进行聚合,如果分类有多个则对所有规格参数进行聚合没有意义且浪费时间
        if(!CollectionUtils.isEmpty(categories) && categories.size() == 1){
            specs = getSpecsAggResult((Long)categories.get(0).get("id"), basicQuery);
        }
        SearchResult result = new SearchResult(goods.getTotalElements(), goods.getTotalPages(), goods.getContent(),brands,categories,specs);
        return result;
    }

    
    private String chooseSegment(String value, SpecParam p) {
        double val = NumberUtils.toDouble(value);
        String result = "其它";
        // 保存数值段
        for (String segment : p.getSegments().split(",")) {
            String[] segs = segment.split("-");
            // 获取数值范围
            double begin = NumberUtils.toDouble(segs[0]);
            double end = Double.MAX_VALUE;
            if(segs.length == 2){
                end = NumberUtils.toDouble(segs[1]);
            }
            // 判断是否在范围内
            if(val >= begin && val < end){
                if(segs.length == 1){
                    result = segs[0] + p.getUnit() + "以上";
                }else if(begin == 0){
                    result = segs[1] + p.getUnit() + "以下";
                }else{
                    result = segment + p.getUnit();
                }
                break;
            }
        }
        return result;
    }

    
    private List getBrandAggResult(Aggregation aggregation){
        // 强制转换
        LongTerms longTerms = (LongTerms) aggregation;
        return longTerms.getBuckets().stream().map(bucket -> this.brandClient.queryByid(bucket.getKeyAsNumber().longValue())
        ).collect(Collectors.toList());
    }

    
    private List> getCategoryAggResult(Aggregation aggregation){
        LongTerms longTerms = (LongTerms) aggregation;
        return longTerms.getBuckets().stream().map(bucket -> {
            Map map = new HashMap<>();
            Long id = bucket.getKeyAsNumber().longValue();
            List names = this.categoryClient.queryNamesById(Arrays.asList(id));
            map.put("id",id);
            map.put("name",names.get(0));
            return map;
        }).collect(Collectors.toList());
    }

    
    private List> getSpecsAggResult(Long cid, QueryBuilder basicQuery){
        List> specs = new ArrayList<>();
        //根据cid查询所有的规格参数
        List params = this.specificationClient.queryParams(null, cid, null, true);
        //构造新的查询
        NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder();
        //传入查询条件
        queryBuilder.withQuery(basicQuery);
        //添加过滤集
        queryBuilder.withSourceFilter(new FetchSourceFilter(new String[]{},null));
        //添加聚合查询
        params.forEach(param -> {
            //将规格参数名称作为聚合名称
            queryBuilder.addAggregation(AggregationBuilders.terms(param.getName()).field("specs." + param.getName() + ".keyword"));
        });
        //执行查询
        AggregatedPage search = (AggregatedPage)this.goodsRepository.search(queryBuilder.build());
        //处理查询结果
        Map aggregationMap = search.getAggregations().asMap();
        for (Map.Entry entry : aggregationMap.entrySet()) {
            Map map = new HashMap<>();
            String key = entry.getKey();
            StringTerms value = (StringTerms)entry.getValue();
            map.put("key", key);
            List collect = value.getBuckets().stream().map(bucket -> bucket.getKeyAsString()).collect(Collectors.toList());
            map.put("options", collect);
            specs.add(map);
        }
        return specs;
    }

    
    private QueryBuilder buildBoolQueryBuilder(SearchRequest searchRequest){
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // 添加must条件
        boolQueryBuilder.must(QueryBuilders.matchQuery("all",searchRequest.getKey()).operator(Operator.AND));
        // 添加filter条件
        Map filter = searchRequest.getFilter();
        if(filter != null) {
            for (Map.Entry entry : filter.entrySet()) {
                String key = entry.getKey();
                String term;
                //将key转换成相应的查询字段
                if (StringUtils.equals("品牌", key)) {
                    term = "brandId";
                } else if (StringUtils.equals("分类", key)) {
                    term = "cid3";
                } else {
                    term = "specs." + key + ".keyword";
                }
                boolQueryBuilder.filter(QueryBuilders.termQuery(term, entry.getValue()));
            }
        }
        return boolQueryBuilder;
    }
}

页面渲染
首先把后台传递过来的specs添加到filters数组:要注意:分类、品牌的option选项是对象,里面有name属性,而specs中的option是简单的字符串,所以需要进行封装,变为相同的结构:

data.specs.forEach(spec => {
                       spec.options = spec.options.map(o => ({name: o}));
                       _this.filters.push(spec);
                    });

最后渲染的结果如下:

展示或收起过滤条件
如果感觉显示的太多了,我们可以通过按钮点击来展开和隐藏部分内容:


我们在data中定义变量,记录展开或隐藏的状态:

然后在按钮绑定点击事件,以改变show的取值:

在展示规格时,对show进行判断:

13.4 过滤条件的筛选

当我们点击页面的过滤项,要做哪些事情?

把过滤条件保存在search对象中(watch监控到search变化后就会发送到后台)在页面顶部展示已选择的过滤项把商品分类展示到顶部面包屑
保存过滤项
我们把已选择的过滤项保存在search中:

这里有一个很隐蔽的坑要注意,在created构造函数中会对search进行初始化,所以要在构造函数中对filter进行初始化,否则又会出现filter条件变了但是监听不生效的情况,因为钩子函数初始化search时没有filter对象,导致后面即使filter发生变化也不会被监听到

search.filter是一个对象,结构:

{
    "过滤项名":"过滤项值"
}

然后给所有的过滤项绑定点击事件:

要注意,点击事件传2个参数:

k:过滤项的keyoption:当前过滤项对象

在点击事件中,保存过滤项到selectedFilter:

selectFilter(k, o){
    const obj = {};
    Object.assign(obj, this.search);
    if(k === '分类' || k === '品牌'){
        o = o.id;
    }
    obj.filter[k] = o.name || o;
    this.search = obj;
}

另外,这里search对象中嵌套了filter对象,请求参数格式化时需要进行特殊处理,修改common.js中的一段代码:

关于后台代码的实现,已经粘贴过了,具体的实现步骤就直接看代码吧!

13.5 页面展示选择的过滤项

当用户选择一个商品分类以后,我们应该在过滤模块的上方展示一个面包屑,把三级商品分类都显示出来。用户选择的商品分类就存放在search.filter中,但是里面只有第三级分类的id:cid3,我们需要根据它查询出所有三级分类的id及名称。于是我们需要在后台提供相应的接口。

在页面重新加载完毕后,此时因为过滤条件中加入了商品分类的条件,所以查询的结果中只有1个分类。我们判断商品分类是否只有1个,如果是,则查询三级商品分类,添加到面包屑即可。

然后渲染:

其它过滤项
接下来,我们需要在页面展示用户已选择的过滤项,如图:

我们知道,所有已选择过滤项都保存在search.filter中,因此在页面遍历并展示即可。
基本有四类数据:

商品分类:这个不需要展示,分类展示在面包屑位置品牌:这个要展示,但是其key和值不合适,我们不能显示一个id在页面。需要找到其name值数值类型规格:这个展示的时候,需要把单位查询出来非数值类型规格:这个直接展示其值即可

具体怎么实现也并不难,这里就略了。
隐藏已经选择的过滤项
现在,我们已经实现了已选择过滤项的展示,但是你会发现一个问题:已经选择的过滤项,在过滤列表中依然存在:这些已经选择的过滤项,应该从列表中移除。怎么做呢?你必须先知道用户选择了什么。用户选择的项保存在search.filter中:我们可以编写一个计算属性,把filters中的 已经被选择的key过滤掉:

computed:{
    remainFilters(){
        const keys = Object.keys(this.search.filter);
        if(this.search.filter.cid3){
            keys.push("分类")
        }
        if(this.search.filter.brandId){
            keys.push("品牌")
        }
        return this.filters.filter(f => !keys.includes(f.k));
    }
}

然后页面不再直接遍历filters,而是遍历remainFilters

最后发现,还剩下一堆没选过的。但是都只有一个可选项,此时再过滤没有任何意义,应该隐藏,所以,在刚才的过滤条件中,还应该添加一条:如果只剩下一个可选项,不显示

13.6 取消过滤项

我们能够看到,每个过滤项后面都有一个小叉,当点击后,应该取消对应条件的过滤。
思路非常简单:

给小叉绑定点击事件点击后把过滤项从search.filter中移除,页面会自动刷新,OK

绑定点击事件时,把k传递过去,方便删除

removeFilter(k){
    this.search.filter[k] = null;
}
13.7 优化

搜索系统需要优化的点:

查询规格参数部分可以添加缓存聚合计算interval变化频率极低,所以可以设计为定时任务计算(周期为天),然后缓存起来。elasticsearch本身有查询缓存,可以不进行优化商品图片应该采用缩略图,减少流量,提高页面加载速度图片采用延迟加载图片还可以采用CDN服务器sku信息应该在页面异步加载,而不是放到索引库 14.thymeleaf及其静态化 14.1 商品详情

当用户搜索到商品,肯定会点击查看,就会进入商品详情页,接下来我们完成商品详情页的展示。在商品详情页中,我们会使用到Thymeleaf来渲染页面,所以需要先了解Thymeleaf的语法。其实我觉得Thymeleaf和jsp语法也差不多,学了不用就忘,然后再学,唉。

商品详情浏览量比较大,并发高,我们会独立开启一个微服务,用来展示商品详情,该微服务命名为leyou-goods-web,依然三步走。
1.引入依赖:



    
        leyou
        com.leyou.parent
        1.0.0-SNAPSHOT
    
    4.0.0

    com.leyou.goods
    leyou-goods-web

    
        11
        11
    

    
        
            org.springframework.cloud
            spring-cloud-starter-netflix-eureka-client
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-thymeleaf
        
        
            org.springframework.cloud
            spring-cloud-starter-openfeign
        
        
            com.leyou.item
            leyou-item-interface
            1.0.0-SNAPSHOT
        
        
            com.leyou.common
            leyou-common
            1.0.0-SNAPSHOT
        
        
            org.springframework.boot
            spring-boot-starter-test
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    


2.添加配置文件application.yml

server:
  port: 8084
spring:
  application:
    name: goods-web
  thymeleaf:
    cache: false
  rabbitmq:
    host: 192.168.124.121
    virtual-host: /leyou
    username: leyou
    password: leyou
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:10086/eureka
    registry-fetch-interval-seconds: 5

3.编写引导类:

package com.leyou;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class LeyouGoodsWebApplication {
    public static void main(String[] args) {
        SpringApplication.run(LeyouGoodsWebApplication.class,args);
    }
}

修改页面跳转路径

在zuul网关组件中添加路由:

zuul:
  prefix: /api
  routes:
    item-service: /item
    @Override
    public Map queryDetailsOfItem(Long spuId) {
        Map resultMap = new HashMap<>();
        //查询spu
        Spu spu = this.goodsClient.querySpuBySpuId(spuId);
        resultMap.put("spu",spu);

        //查询spuDetail
        SpuDetail spuDetail = this.goodsClient.querySpuDetailBySpuId(spuId);
        resultMap.put("spuDetail",spuDetail);

        //查询skus
        List skus = this.goodsClient.querySkusBySpuId(spuId);
        resultMap.put("skus",skus);

        //查询品牌
        Brand brand = this.brandClient.queryByid(spu.getBrandId());
        resultMap.put("brand",brand);

        //查询分类
        List ids = Arrays.asList(spu.getCid1(), spu.getCid2(), spu.getCid3());
        List names = this.categoryClient.queryNamesById(ids);
        List> categories = new ArrayList<>();
        for (int i = 0; i < ids.size(); i++) {
            Map map = new HashMap<>();
            map.put("id",ids.get(i));
            map.put("name",names.get(i));
            categories.add(map);
        }
        resultMap.put("categories",categories);

        //查询规格参数组
        List specGroups = this.specificationClient.querySpecGroupsByCid(spu.getCid3());
        resultMap.put("groups",specGroups);

        //查询特殊规格参数
        List params = this.specificationClient.queryParams(null, spu.getCid3(), false, null);
        Map paramMap = new HashMap<>();
        params.forEach(param -> {
            paramMap.put(param.getId(),param.getName());
        });
        resultMap.put("paramMap",paramMap);
        //返回结果
        return resultMap;
    }

}

渲染商品列表

这个部分需要渲染的数据有5块:

sku图片sku标题副标题sku价格特有规格属性列表

其中,sku 的图片、标题、价格,都必须在用户选中一个具体sku后,才能渲染。而特有规格属性列表可以在spuDetail中查询到。而副标题则是在spu中,直接可以在页面渲染。因此,我们先对特有规格属性列表进行渲染。等用户选择一个sku,再通过js对其它sku属性渲染。

渲染规格属性列表
规格属性列表将来会有事件和动态效果。我们需要有js代码参与,不能使用Thymeleaf来渲染了。因此,这里我们用vue,不过需要先把数据放到js对象中,方便vue使用

我们在页面的head中,定义一个js标签,然后在里面定义变量,保存与sku相关的一些数据:


specialSpec:这是SpuDetail中唯一与Sku相关的数据
因此我们并没有保存整个spuDetail,而是只保留了这个属性,而且需要手动转为js对象。paramMap:规格参数的id和name键值对,方便页面根据id获取参数名skus:sku集合

通过Vue渲染
我们把刚才获得的几个变量保存在Vue实例中:

然后在页面中渲染:

{{paramMap[k]}}
{{str}} 

数据成功渲染了。不过我们发现所有的规格都被勾选了。这是因为现在,每一个规格都有式:selected,我们应该只选中一个,让它的class样式为selected才对!

规格属性的筛选
每一个规格项是数组中的一个元素,因此我们只要保存被选择的规格项的索引,就能判断哪个是用户选择的了!我们需要一个对象来保存用户选择的索引,格式如下:

{
    "4":0,
    "12":0,
    "13":0
}

但问题是,第一次进入页面时,用户并未选择任何参数。因此索引应该有一个默认值,我们将默认值设置为0。我们在head的script标签中,对索引对象进行初始化:

然后将其保存在vue之中。
页面改造
我们在页面中,通过判断indexes的值来判断当前规格是否被选中,并且给规格绑定点击事件,点击规格项后,修改indexes中的对应值:

{{paramMap[k]}}
{{str}} 

确定SKU
在我们设计sku数据的时候,就已经添加了一个字段:indexes,这其实就是规格参数的索引组合。而我们在页面中,用户点击选择规格后,就会把对应的索引保存起来,因此,我们可以根据这个indexes来确定用户要选择的sku。我们在vue中定义一个计算属性,来计算与索引匹配的sku:

computed:{
    sku(){
        const index = Object.values(this.indexes).join("_");
        return this.skus.find(s => s.indexes == index);
    }
}

渲染sku列表
既然已经拿到了用户选中的sku,接下来,就可以在页面渲染数据了

商品图片是一个字符串,以,分割,页面展示比较麻烦,所以我们编写一个计算属性:images(),将图片字符串变成数组:

computed: {
    sku(){
        const index = Object.values(this.indexes).join("_");
        return this.skus.find(s=>s.indexes==index);
    },
    images(){
        return this.sku.images ? this.sku.images.split(",") : [''];
    }
},

将页面改造成如下即可:

   
   
      
         
      
   
   
   
      <
      
      
         

  • >
  • 注意商品详情是HTML代码,我们不能使用 th:text,应该使用th:utext

    
    

    规格包装
    规格包装分成两部分:

    规格参数包装列表

    而且规格参数需要按照组来显示
    规格参数的值分为两部分:

    通用规格参数:保存在SpuDetail中的genericSpec中特有规格参数:保存在sku的ownSpec中

    所以我们需要把这两部分值取出来,放到groups中。
    从spuDetail中取出genericSpec并取出groups:

    把genericSpec引入到Vue实例:

    因为sku是动态的,所以我们编写一个计算属性,来进行值的组合:

    groups(){
        groups.forEach(group => {
            group.params.forEach(param => {
                if(param.generic){
                    // 通用属性,去spu的genericSpec中获取
                    param.v = this.genericSpec[param.id] || '其它';
                }else{
                    // 特有属性值,去SKU中获取
                    param.v = JSON.parse(this.sku.ownSpec)[param.id]
                }
            })
        })
        return groups;
    }
    

    然后页面渲染:

    {{group.name}}

    {{p.name}}
    {{p.v + (p.unit || '')}}

    包装列表
    包装列表在商品详情中,我们一开始并没有赋值到Vue实例中,但是可以通过Thymeleaf来渲染

    包装清单

    售后服务
    售后服务也可以通过Thymeleaf进行渲染:

    售后保障

    14.2 页面静态化

    问题分析
    现在,我们的页面是通过Thymeleaf模板引擎渲染后返回到客户端。在后台需要大量的数据查询,而后渲染得到HTML页面。会对数据库造成压力,并且请求的响应时间过长,并发能力不高。那么怎么解决呢?首先我们能想到的就是缓存技术,比如之前学习过的Redis。不过Redis适合数据规模比较小的情况。假如数据量比较大,例如我们的商品详情页。每个页面如果10kb,100万商品,就是10GB空间,对内存占用比较大。此时就给缓存系统带来极大压力,如果缓存崩溃,接下来倒霉的就是数据库了。所以缓存并不是万能的,某些场景需要其它技术来解决,比如静态化。
    什么是静态化
    静态化是指把动态生成的HTML页面变为静态内容保存,以后用户的请求到来,直接访问静态页面,不再经过服务的渲染。而静态的HTML页面可以部署在nginx中,从而大大提高并发能力,减小tomcat压力。
    如何实现静态化
    目前,静态化页面都是通过模板引擎来生成,而后保存到nginx服务器来部署。常用的模板引擎比如:

    FreemarkerVelocityThymeleaf

    我们之前就使用的Thymeleaf,来渲染html返回给用户。Thymeleaf除了可以把渲染结果写入Response,也可以写到本地文件,从而实现静态化。

    Thymeleaf实现静态化
    先说下Thymeleaf中的几个概念:

    Context:运行上下文TemplateResolver:模板解析器TemplateEngine:模板引擎

    1.Context
    上下文: 用来保存模型数据,当模板引擎渲染时,可以从Context上下文中获取数据用于渲染。当与SpringBoot结合使用时,我们放入Model的数据就会被处理到Context,作为模板渲染的数据使用。
    2.TemplateResolver
    模板解析器:用来读取模板相关的配置,例如:模板存放的位置信息,模板文件名称,模板文件的类型等等。当与SpringBoot结合时,TemplateResolver已经由其创建完成,并且各种配置也都有默认值,比如模板存放位置,其默认值就是:templates。比如模板文件类型,其默认值就是html。
    3.TemplateEngine
    模板引擎:用来解析模板的引擎,需要使用到上下文、模板解析器。分别从两者中获取模板中需要的数据,模板文件。然后利用内置的语法规则解析,从而输出解析后的文件。来看下模板引擎进行处理的函数:

    templateEngine.process("模板名", context, writer);
    

    三个参数:

    模板名称上下文:里面包含模型数据writer:输出目的地的流

    在输出时,我们可以指定输出的目的地,如果目的地是Response的流,那就是网络响应。如果目的地是本地文件,那就实现静态化了。而在SpringBoot中已经自动配置了模板引擎,因此我们不需要关心这个。现在我们做静态化,就是把输出的目的地改成本地文件即可!
    GoodsHtmlServiceImpl类:

    package com.leyou.goods.service.impl;
    
    import com.leyou.goods.service.GoodsService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.thymeleaf.TemplateEngine;
    import org.thymeleaf.context.Context;
    
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.PrintWriter;
    
    @Service
    public class GoodsHtmlServiceImpl {
        // thymeleaf启动器已经自动注入了模板引擎
        @Autowired
        private TemplateEngine engine;
    
        @Autowired
        private GoodsService goodsService;
    
        public void createHtml(Long spuId){
            //初始化运行环境
            Context context = new Context();
            //设置变量
            context.setVariables(this.goodsService.queryDetailsOfItem(spuId));
            PrintWriter printWriter = null;
            try {
                File file = new File("D:\ApplicationProgram\nginx-1.14.0\html\api\goods\item\"+spuId+".html");
                printWriter = new PrintWriter(file);
                engine.process("item",context,printWriter);
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } finally {
                if(printWriter != null){
                    printWriter.close();
                }
            }
        }
    
        public void asyncExcute(Long spuId) {
            ThreadUtils.execute(()->createHtml(spuId));
            
        }
    }
    
    

    线程工具类:

    public class ThreadUtils {
    
        private static final ExecutorService es = Executors.newFixedThreadPool(10);
    
        public static void execute(Runnable runnable) {
            es.submit(runnable);
        }
    }
    

    什么时候创建静态文件
    我们编写好了创建静态文件的service,那么问题来了:什么时候去调用它呢?想想这样的场景:假如大部分的商品都有了静态页面。那么用户的请求都会被nginx拦截下来,根本不会到达我们的leyou-goods-web服务。只有那些还没有页面的请求,才可能会到达这里。因此,如果请求到达了这里,我们除了返回页面视图外,还应该创建一个静态页面,那么下次就不会再来麻烦我们了。所以,我们在GoodsController中添加逻辑,去生成静态html文件:

    @GetMapping("{id}.html")
    public String toItemPage(@PathVariable("id")Long id, Model model){
    
        // 加载所需的数据
        Map map = this.goodsService.loadModel(id);
        // 把数据放入数据模型
        model.addAllAttributes(map);
    
        // 页面静态化
        this.goodsHtmlService.asyncExcute(id);
    
        return "item";
    }
    

    注意:生成html 的代码不能对用户请求产生影响,所以这里我们使用额外的线程进行异步创建。
    nginx代理静态页面
    接下来,我们修改nginx,让它对商品请求进行监听,指向本地静态页面,如果本地没找到,才进行反向代理:

    server {
            listen       80;
            server_name  www.leyou.com;
    
            proxy_set_header X-Forwarded-Host $host;
    		proxy_set_header X-Forwarded-Server $host;
    		proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    		
    		location /api/goods/item {
    			# 先找本地
    			# root html;
    			# if (!-f $request_filename) { #请求的文件不存在,就反向代理
    				proxy_pass http://127.0.0.1:10010;
    			#	break;
    			# }
    		}
    		
    		location /api/goods {
    			proxy_pass http://127.0.0.1:10010;
    			proxy_connect_timeout 600;
    			proxy_read_timeout 600;
    		}
    		
    		location / {
    			proxy_pass http://127.0.0.1:9002;
    			proxy_connect_timeout 600;
    			proxy_read_timeout 600;
    		}
        }
    

    实际测试时发现静态化之后访问速度大大增加。

    15.RabbitMQ 15.1 RabbitMQ简介

    目前我们已经完成了商品详情和搜索系统的开发。我们思考一下,是否存在问题?

    商品的原始数据保存在数据库中,增删改查都在数据库中完成。搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新。商品详情做了页面静态化,静态页面数据也不会随着数据库商品发生变化。

    如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
    这里有两种解决方案:

    方案1:每当后台对商品做增删改操作,同时要修改索引库数据及静态页面方案2:搜索服务和商品页面服务对外提供操作接口,后台在商品增删改后,调用接口

    以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。

    所以,我们会通过另外一种方式来解决这个问题:消息队列

    什么是消息队列
    消息队列是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

    结合前面所说的问题:

    商品服务对商品增删改以后,无需去操作索引库或静态页面,只是发送一条消息,也不关心消息被谁接收。搜索服务和静态页面服务接收消息,分别去处理索引库和静态页面。

    如果以后有其它系统也依赖商品服务的数据,同样监听消息即可,商品服务无需任何代码修改。

    MQ是消息通信的模型,并不是具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
    两者间的区别和联系:

    JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。JMS规定了两种消息模型;而AMQP的消息模型更加丰富

    常见MQ产品

    ActiveMQ:基于JMSRabbitMQ:基于AMQP协议,erlang语言开发,稳定性好RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会Kafka:分布式消息系统,高吞吐量

    我将我的rabbitMQ安装在虚拟机上。

    15.2 五种消息模型

    创建一个新工程,并导入相关依赖:

    
    	4.0.0
    	cn.itcast.rabbitmq
    	itcast-rabbitmq
    	0.0.1-SNAPSHOT
    	
    		org.springframework.boot
    		spring-boot-starter-parent
    		2.0.2.RELEASE
    	
    	
    		1.8
    	
    	
    		
    			org.apache.commons
    			commons-lang3
    			3.3.2
    		
    		
    			org.springframework.boot
    			spring-boot-starter-amqp
    		
    		
    			org.springframework.boot
    			spring-boot-starter-test
    		
    	
    
    

    然后添加配置文件:

    spring:
      rabbitmq:
        host: 192.168.124.121
        username: leyou
        password: leyou
        virtual-host: /leyou
    

    编写一个工具类,用于建立与rabbitMQ的连接:

    package cn.itcast.rabbitmq.util;
    
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Connection;
    
    public class ConnectionUtil {
        
        public static Connection getConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost("192.168.124.121");
            //端口
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setVirtualHost("/leyou");
            factory.setUsername("leyou");
            factory.setPassword("leyou");
            // 通过工程获取连接
            Connection connection = factory.newConnection();
            return connection;
        }
    
    }
    

    RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。

    1.基本消息模型
    RabbitMQ是一个消息代理:它接受和转发消息。 你可以把它想象成一个邮局:当你把邮件放在邮箱里时,你可以确定邮差先生最终会把邮件发送给你的收件人。 在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。

    P(producer/ publisher):生产者,一个发送消息的用户应用程序。
    C(consumer):消费者,消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序
    队列(红色区域):rabbitmq内部类似于邮箱的一个概念。虽然消息流经rabbitmq和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。
    总之:
    生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。
    生产者:

    package cn.itcast.rabbitmq.direct;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 消息内容
            String message = "商品删除了, id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    消费者1:

    package cn.itcast.rabbitmq.direct;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
        private final static String QUEUE_NAME = "direct_exchange_queue_1";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    消费者2:

    package cn.itcast.rabbitmq.direct;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
        private final static String QUEUE_NAME = "direct_exchange_queue_2";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    消息确认机制(ACK)
    消息一旦被消费者接收,队列中的消息就会被删除。那么问题来了:RabbitMQ怎么知道消息被接收了呢?如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

    自动ACK:消息一旦被接收,消费者自动发送ACK手动ACK:消息接收后,不会发送ACK,需要手动调用

    哪种更好取决于消息的重要性:

    如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
    如何手动ACK呢?

    public class Recv2 {
        private final static String QUEUE_NAME = "simple_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 创建通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [x] received : " + msg + "!");
                    // 手动进行ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列,第二个参数false,手动进行ACK
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    

    注意最后一行代码

    channel.basicConsume(QUEUE_NAME, false, consumer);
    

    如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。

    2.work消息模型
    工作队列或者竞争消费者模式

    在第一篇教程中,我们编写了一个程序,从一个命名队列中发送并接受消息。在这里,我们将创建一个工作队列,在多个工作者之间分配耗时任务。工作队列,又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。相反我们稍后完成任务,我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。当你运行许多消费者时,任务将在他们之间共享,但是一个消息只能被一个消费者获取。这个概念在Web应用程序中特别有用,因为在短的HTTP请求窗口中无法处理复杂的任务。
    所以避免消息堆积的方法有两种:

    1)采用workqueue,多个消费者监听同一队列。2)接收到消息以后,通过线程池,异步消费。

    接下来我们来模拟这个流程:
    生产者:

    package cn.itcast.rabbitmq.work;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    // 生产者
    public class Send {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 循环发布任务
            for (int i = 0; i < 50; i++) {
                // 消息内容
                String message = "task .. " + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
    
                Thread.sleep(i * 2);
            }
            // 关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    

    消费者1:

    package cn.itcast.rabbitmq.work;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    // 消费者1
    public class Recv {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 设置每个消费者同时只能处理一条消息
            channel.basicQos(1);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                    try {
                        // 模拟完成任务的耗时:1000ms
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                    // 手动ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列。
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    

    消费者2:

    package cn.itcast.rabbitmq.work;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    //消费者2
    public class Recv2 {
        private final static String QUEUE_NAME = "test_work_queue";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            final Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // 设置每个消费者同时只能处理一条消息
            channel.basicQos(1);
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                    // 手动ACK
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 监听队列。
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
    

    能者多劳

    消费者1比消费者2的效率要低,一次任务的耗时较长然而两人最终消费的消息数量是一样的消费者2大量时间处于空闲状态,消费者1一直忙碌

    现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

    怎么实现呢?

    我们可以使用basicQos方法和prefetchCount = 1设置。 这告诉RabbitMQ一次不要向工作人员发送多于一条消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

    订阅模型分类
    在之前的模式中,我们创建了一个工作队列。 工作队列背后的假设是:每个任务只被传递给一个工作人员。 在这一部分,我们将做一些完全不同的事情 - 我们将会传递一个信息给多个消费者。 这种模式被称为“发布/订阅”。

    1、1个生产者,多个消费者
    2、每一个消费者都有自己的一个队列
    3、生产者没有将消息直接发送到队列,而是发送到了交换机
    4、每个队列都要绑定到交换机
    5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的
    X(Exchanges):交换机一方面接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
    Exchange类型有以下几种:

    Fanout:广播,将消息交给所有绑定到交换机的队列
    Direct:定向,把消息交给符合指定routing key 的队列 
    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
    3.订阅模型-Fanout

    在广播模式下,消息发送流程是这样的:

    1) 可以有多个消费者2) 每个消费者有自己的queue(队列)3) 每个队列都要绑定到Exchange(交换机)4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。5) 交换机把消息发送给绑定过的所有队列6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

    生产者:
    两个变化:

    1) 声明Exchange,不再声明Queue2) 发送消息到Exchange,不再发送到Queue

    package cn.itcast.rabbitmq.fanout;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            
            // 声明exchange,指定类型为fanout
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            
            // 消息内容
            String message = "Hello everyone";
            // 发布消息到Exchange
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [生产者] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    消费者1:

    package cn.itcast.rabbitmq.fanout;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    //消费者1
    public class Recv {
        private final static String QUEUE_NAME = "fanout_exchange_queue_1";
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    消费者2:

    package cn.itcast.rabbitmq.fanout;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    // 消费者2
    public class Recv2 {
        private final static String QUEUE_NAME = "fanout_exchange_queue_2";
    
        private final static String EXCHANGE_NAME = "fanout_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            // 绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,手动返回完成
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    4.订阅模型-Direct
    有选择性的接收消息.在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
    在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。 例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
    但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
    在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)。消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。

    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
    X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
    C1:消费者,其所在队列指定了需要routing key 为 error 的消息
    C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
    生产者:

    package cn.itcast.rabbitmq.direct;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为direct
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // 消息内容
            String message = "商品删除了, id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            channel.basicPublish(EXCHANGE_NAME, "delete", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    消费者1:

    package cn.itcast.rabbitmq.direct;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
        private final static String QUEUE_NAME = "direct_exchange_queue_1";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    消费者2:

    package cn.itcast.rabbitmq.direct;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
        private final static String QUEUE_NAME = "direct_exchange_queue_2";
        private final static String EXCHANGE_NAME = "direct_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    5.订阅模型-Topic
    Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符!

    Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如:item.insert
    通配符规则:

    `#`:匹配一个或多个词
    `*`:匹配不多不少恰好1个词
    

    audit.#:能够匹配audit.irs.corporate 或者 audit.irs
    audit.*:只能匹配audit.irs
    生产者:
    使用topic类型的Exchange,发送消息的routing key有3种: item.isnert、item.update、item.delete:

    package cn.itcast.rabbitmq.topic;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    public class Send {
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明exchange,指定类型为topic
            // 第三个参数表明需要持久化
            channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
            // 消息内容
            String message = "新增商品 : id = 1001";
            // 发送消息,并且指定routing key 为:insert ,代表新增商品
            // 第三个参数可以写成MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化消息
            channel.basicPublish(EXCHANGE_NAME, "item.insert", null, message.getBytes());
            System.out.println(" [商品服务:] Sent '" + message + "'");
    
            channel.close();
            connection.close();
        }
    }
    

    消费者1:

    package cn.itcast.rabbitmq.topic;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    public class Recv {
        private final static String QUEUE_NAME = "topic_exchange_queue_1";
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            // 第二个参数表示队列的持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。需要 update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者1] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    消费者2:

    package cn.itcast.rabbitmq.topic;
    
    import java.io.IOException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import cn.itcast.rabbitmq.util.ConnectionUtil;
    
    public class Recv2 {
        private final static String QUEUE_NAME = "topic_exchange_queue_2";
        private final static String EXCHANGE_NAME = "topic_exchange_test";
    
        public static void main(String[] argv) throws Exception {
            // 获取到连接
            Connection connection = ConnectionUtil.getConnection();
            // 获取通道
            Channel channel = connection.createChannel();
            // 声明队列
            // 第二个参数表示队列的持久化
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.*");
    
            // 定义队列的消费者
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                        byte[] body) throws IOException {
                    // body 即消息体
                    String msg = new String(body);
                    System.out.println(" [消费者2] received : " + msg + "!");
                }
            };
            // 监听队列,自动ACK
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }
    

    持久化
    如何避免消息丢失?
    1) 消费者的ACK机制。可以防止消费者丢失消息。
    2) 但是,如果在消费者消费之前,MQ就宕机了,消息就没了。

    是可以将消息进行持久化呢?
    要将消息持久化,前提是:队列、Exchange都持久化
    交换机持久化

    队列持久化

    消息持久化

    15.3 Spring AMQP

    Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。如何使用Spring-amqp呢?
    添加依赖:

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    

    在配置文件中添加:

    spring:
      rabbitmq:
        host: 192.168.124.121
        username: leyou
        password: leyou
        virtual-host: /leyou
    

    添加监听者:
    在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

    @Component
    public class Listener {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "spring.test.queue", durable = "true"),
                exchange = @Exchange(
                        value = "spring.test.exchange",
                        ignoreDeclarationExceptions = "true",
                        type = ExchangeTypes.TOPIC
                ),
                key = {"#.#"}))
        public void listen(String msg){
            System.out.println("接收到消息:" + msg);
        }
    }
    

    @Componet:类上的注解,注册到Spring容器@RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

    bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

    value:这个消费者关联的队列。值是@Queue,代表一个队列exchange:队列所绑定的交换机,值是@Exchange类型key:队列和交换机绑定的RoutingKey

    类似listen这样的方法在一个类中可以写多个,就代表多个消费者。
    AmqpTemplate
    Spring最擅长的事情就是封装,把他人的框架进行封装和整合。Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

    红框圈起来的是比较常用的3个方法,分别是:

    指定交换机、RoutingKey和消息体指定消息指定RoutingKey和消息,会向默认的交换机发送消息

    测试代码

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = Application.class)
    public class MqDemo {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        @Test
        public void testSend() throws InterruptedException {
            String msg = "hello, Spring boot amqp";
            this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
            // 等待10秒后再结束
            Thread.sleep(10000);
        }
    }
    
    15.4 项目改造

    接下来,我们就改造项目,实现搜索服务、商品静态页的数据同步。
    思路分析
    发送方:商品微服务

    什么时候发?
    当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。发送什么内容?
    对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。

    接收方:搜索微服务、静态页面微服务
    接收消息后如何处理?

    搜索微服务:

    增/改:添加新的数据到索引库删:删除索引库数据 静态页微服务:

    增/改:创建新的静态页删:删除原来的静态页

    商品服务发送消息
    1.引入依赖

    
        org.springframework.boot
        spring-boot-starter-amqp
    
    

    2.配置文件

    spring:
      rabbitmq:
        host: 192.168.124.121
        username: leyou
        password: leyou
        virtual-host: /leyou
        template:
          exchange: leyou.item.exchange
        publisher-/confirm/is: true
    

    template:有关AmqpTemplate的配置

    exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个 publisher-/confirm/is:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

    3.改造GoodsService
    在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)

    private void sendMessage(Long id, String type){
        // 发送消息
        try {
            this.amqpTemplate.convertAndSend("item." + type, id);
        } catch (Exception e) {
            logger.error("{}商品消息发送异常,商品id:{}", type, id, e);
        }
    }
    

    这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange
    注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑
    然后在新增的时候调用:

    修改的时候调用:

    搜索服务接收消息
    搜索服务接收到消息后要做的事情:

    增:添加新的数据到索引库删:删除索引库数据改:修改索引库数据

    因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。
    引入依赖和添加配置我们跳过,直接进入监听器的编写:

    package com.leyou.search.listener;
    
    import com.leyou.item.pojo.Spu;
    import com.leyou.search.client.GoodsClient;
    import com.leyou.search.pojo.Goods;
    import com.leyou.search.repository.GoodsRepository;
    import com.leyou.search.service.SearchService;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    //必须将listener注入spring容器
    @Component
    public class GoodsListener {
        @Autowired
        private GoodsRepository goodsRepository;
    
        @Autowired
        private GoodsClient goodsClient;
    
        @Autowired
        private SearchService searchService;
        //添加rabbitMQ注解,指定交换机的类型,名称,队列名称以及其绑定的routingKey
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "LEYOU_SEARCH_SAVE_QUEUE", durable = "true"),
                exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
                key = {"item.insert","item.update"}))
        // 异常一定要抛出,因为spring会根据该方法是否抛出异常来决定是否发送ACK确认,若没有抛出异常则spring自动帮我们发送确认,若抛出异常,则消息仍然会继续存在于
        // 消息队列之中
        public void save(Long id) throws IOException {
            if(id == null){
                return;
            }
            Spu spu = this.goodsClient.querySpuBySpuId(id);
            Goods goods = this.searchService.spuToGoods(spu);
            this.goodsRepository.save(goods);
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "LEYOU_SEARCH_DELETE_QUEUE", durable = "true"),
                exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
                key = {"item.delete"}))
        public void delete(Long id){
            if(id == null){
                return;
            }
            this.goodsRepository.deleteById(id);
        }
    }
    
    

    静态页面服务接收消息
    商品静态页服务接收到消息后的处理:

    增:创建新的静态页删:删除原来的静态页改:创建新的静态页并覆盖原来的

    不过,我们编写的创建静态页的方法也具备覆盖以前页面的功能,因此:增和改的消息可以放在一个方法中处理,删除消息放在另一个方法处理。

    package com.leyou.goods.listener;
    
    import com.leyou.goods.service.impl.GoodsHtmlServiceImpl;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.io.File;
    
    @Component
    public class GoodsListener {
        @Autowired
        private GoodsHtmlServiceImpl goodsHtmlService;
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "LEYOU_GOODS_SAVE_QUEUE",durable = "true"),
                exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
                key = {"item.insert","item.update"}
        ))
        public void save(Long id){
            if(id == null){
                return;
            }
            this.goodsHtmlService.createHtml(id);
        }
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = "LEYOU_GOODS_DELETE_QUEUE",durable = "true"),
                exchange = @Exchange(value = "LEYOU_ITEM_EXCHANGE", ignoreDeclarationExceptions = "true",type = ExchangeTypes.TOPIC),
                key = {"item.delete"}
        ))
        public void delete(Long id){
            if(id == null){
                return;
            }
            File file = new File("D:\ApplicationProgram\nginx-1.14.0\html\api\goods\item\" + id + ".html");
            file.deleteOnExit();
        }
    }
    
    
    转载请注明:文章转载自 www.mshxw.com
    本文地址:https://www.mshxw.com/it/758606.html
    我们一直用心在做
    关于我们 文章归档 网站地图 联系我们

    版权所有 (c)2021-2022 MSHXW.COM

    ICP备案号:晋ICP备2021003244-6号