栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

分布式电商项目 谷粒商城 学习笔记<3>

分布式电商项目 谷粒商城 学习笔记<3>

文章目录
  • 十、ES
    • 7.进阶--聚合
      • 聚合
      • 子聚合
    • 8.Mapping字段映射
      • 创建索引并指定映射
      • 不能更新映射
    • 9.分词
      • 安装ik分词器
      • 补充:linux命令行编辑
      • 自定义词库
    • 10.elasticsearch-Rest-Client
      • 导入依赖
      • 不需要数据源的微服务 依赖的父工程中有数据源相关配置处理
      • 配置类
      • 测试类
        • 保存/修改
        • 检索及聚合
  • 十一、安装nginx
  • 十二、product-es准备
    • 1.确定索引模型
    • 2.nested嵌入式对象
  • 十三、商品上架
    • 1.基本思路
    • 2.批量查询sku是否有库存
    • 3.给一个skuEsModels 批量上传ES
    • 4.根据spuId封装上架数据
    • 5.一个小高端的写法
  • 十四、Nginx
    • 1.简述
    • 2.Nginx+网关的逻辑
    • 3.Nginx配置文件解析
    • 4.Nginx+网关配置

十、ES 7.进阶–聚合

聚合提供了从数据中分组和提取数据的能力。

最简单的聚合方法大致等于SQL Group by(分组统计)和SQL聚合函数(求平均 求最大 求最小)

聚合

是对查询出的数据做一些处理

# 分别为包含mill、平均年龄、
GET bank/_search
{
  "query": { # 查询出包含mill的
    "match": {
      "address": "Mill"
    }
  },
  "aggs": { #基于查询聚合
    "ageAgg": {  # 聚合的名字,随便起
      "terms": { # 看值的可能性分布 就是分组统计 类似于group by
        "field": "age",
        "size": 10
      }
    },
    "ageAvg": { 
      "avg": { # 看age值的平均
        "field": "age"
      }
    },
    "balanceAvg": {
      "avg": { # 看balance的平均
        "field": "balance"
      }
    }
  },
  "size": 0  # 不看详情
}
子聚合

也就是在aggs里再写一个aggs

GET bank/_search
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "ageAgg": {
      "terms": { # 看分布 对年龄分组
        "field": "age",
        "size": 100
      },
      "aggs": { # 与terms并列
        "ageAvg": { #平均  对已根据年龄分组的数据 进行求平均 比如 求20岁所有人的平均薪资
          "avg": {
            "field": "balance"
          }
        }
      }
    }
  },
  "size": 0
}
8.Mapping字段映射

是直接定义在索引下的,因为不同类型下、同名文档的处理方式是没有区别的。所以使用mapping映射,相当于屏蔽了类型,把文档直接放在了索引的下一级。

ElasticSearch7-去掉type概念

创建索引并指定映射
PUT /my_index  #相当于mysql创建一个表  指定各个字段的类型
{
  "mappings": {
    "properties": {
      "age": {
        "type": "integer"
      },
      "email": {
        "type": "keyword" # 指定为keyword
      },
      "name": {
        "type": "text" # 全文检索。保存时候分词,检索时候进行分词匹配
      }
    }
  }
}


PUT /my_index/_mapping  #相当于往表里填数据
{
  "properties": {
    "employee-id": {
      "type": "keyword",
      "index": false # 字段不能被检索。检索  表明新增的字段不能被检索,只是一个冗余字段。不能更新映射
对于已经存在的字段映射,我们不能更新。更新必须创建新的索引,进行数据迁移。
    }
  }
}



GET /my_index #查看映射
不能更新映射

对于已经存在的字段映射,我们不能更新。更新必须创建新的索引,进行数据迁移。

相当于不能 更改表的属性 (类比于mysql)

必须重新新建一个索引,然后把旧数据迁移过来。

创建新索引

PUT /newbank
{
  "mappings": {
    "properties": {
      "account_number": {
        "type": "long"
      },
      "address": {
        "type": "text"
      },
      "age": {
        "type": "integer"
      },
      "balance": {
        "type": "long"
      },
      "city": {
        "type": "keyword"
      },
      "email": {
        "type": "keyword"
      },
      "employer": {
        "type": "keyword"
      },
      "firstname": {
        "type": "text"
      },
      "gender": {
        "type": "keyword"
      },
      "lastname": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "state": {
        "type": "keyword"
      }
    }
  }
}

将bank中的数据迁移到newbank中

POST _reindex
{
  "source": {
    "index": "bank",
    "type": "account" #原索引的名称和类型
  },
  "dest": {
    "index": "newbank"  #新索引的名称
  }
}

新索引的type会变为_doc (默认的索引类型 ,旧索引是account)

9.分词

一个tokenizer(分词器)接收一个字符流,将之分割为独立的tokens(词元,通常是独立的单词),然后输出tokens流。

POST _analyze
{
  "analyzer": "standard",
  "text": "The 2 Brown-Foxes bone."
}

会按单词分开,但是 对于中文,就不合适,因为它会把每个字都当成一个词 去分

所以我们需要使用其他分词器

安装ik分词器

在前面安装的elasticsearch时,我们已经将elasticsearch容器的“/usr/share/elasticsearch/plugins”目录,映射到宿主机的“ /mydata/elasticsearch/plugins”目录下,所以比较方便的做法就是下载“/elasticsearch-analysis-ik-7.4.2.zip”文件,然后解压到该文件夹下即可。安装完毕后,需要重启elasticsearch容器。

就是下载然后解压 然后放到/mydata/elasticsearch/plugins目录下 然后重启容器。

GET _analyze
{
   "analyzer": "ik_smart", 
   "text":"我是中国人"
}

GET _analyze
{
   "analyzer": "ik_max_word", 
   "text":"我是中国人"
}
补充:linux命令行编辑

vi 文件名

i 进入插入模式

esc退出插入模式

:wq退出并保存

自定义词库

修改/usr/share/elasticsearch/plugins/ik/config中的IKAnalyzer.cfg.xml




	IK Analyzer 扩展配置
	
	
	 
	
	
	http://192.168.56.10/es/fenci.txt  #在这里配置自定义分词文件的路径
	
	

具体可以看笔记,这里就不深究了

https://blog.csdn.net/hancoder/article/details/113922398

10.elasticsearch-Rest-Client 导入依赖
    org.elasticsearch.client    elasticsearch-rest-high-level-client    7.4.2

由于 springboot已经整合了ES 版本为6.8.5 所以要把spring-boot-dependencies中所依赖的ES版本改掉


    1.8
    7.4.2 #这里原来是6.8.5

不需要数据源的微服务 依赖的父工程中有数据源相关配置处理

启动类加注解

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
配置类
@Configuration
public class GulimallElasticSearchConfig {

    public static final RequestOptions COMMON_OPTIONS;

    static {
        RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();

        COMMON_OPTIONS = builder.build();
    }

    @Bean
    public RestHighLevelClient esRestClient() {

        RestClientBuilder builder = null;
        // 可以指定多个es
        builder = RestClient.builder(new HttpHost(host, 9200, "http"));

        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }
}
测试类 保存/修改
@Testpublic void indexData() throws IOException {        // 设置索引    IndexRequest indexRequest = new IndexRequest ("users");    indexRequest.id("1");    User user = new User();    user.setUserName("张三");    user.setAge(20);    user.setGender("男");    String jsonString = JSON.toJSonString(user);        //设置要保存的内容,指定数据和类型    indexRequest.source(jsonString, XContentType.JSON);        //执行创建索引和保存数据    IndexResponse index = client.index(indexRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);    System.out.println(index);}

保存语句再次发送就会变成修改操作。

检索及聚合
	@Test    public void find() throws IOException {        // 1 创建检索请求        SearchRequest searchRequest = new SearchRequest();        searchRequest.indices("bank");  //填充索引        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();        // 构造检索条件//        sourceBuilder.query();//        sourceBuilder.from();//        sourceBuilder.size();//        sourceBuilder.aggregation();        sourceBuilder.query(QueryBuilders.matchQuery("address","mill")); //查询address包含mill的        System.out.println(sourceBuilder.toString());  //sourceBuilder就是一个查询语句的json串 //任何的查询条件 match matchall agg  都用各自的构造器构造好了放到SearchSourceBuilder中,然后塞入searchRequest                //构建第一个聚合条件:按照年龄的值分布          TermsAggregationBuilder agg1 = AggregationBuilders.terms("agg1").field("age").size(10);// 聚合名称		// 参数为AggregationBuilder        sourceBuilder.aggregation(agg1);        // 构建第二个聚合条件:平均薪资        AvgAggregationBuilder agg2 = AggregationBuilders.avg("agg2").field("balance");        sourceBuilder.aggregation(agg2);                        searchRequest.source(sourceBuilder);        // 2 执行检索        SearchResponse response = client.search(searchRequest, GulimallElasticSearchConfig.COMMON_OPTIONS);        // 3 分析响应结果        System.out.println(response.toString());  //response也是一个结果json串                        // 3.1 获取java bean        SearchHits hits = response.getHits();        SearchHit[] hits1 = hits.getHits();        for (SearchHit hit : hits1) {            hit.getId();            hit.getIndex();            String sourceAsString = hit.getSourceAsString(); //真正的查询结果在source中,获取到source,用json转换工具转成bean对象 这个bean需要自己创建VO类(或者已有的PO)            Account account = JSON.parseObject(sourceAsString, Account.class);            System.out.println(account);        }                // 3.2 获取聚合的结果        Aggregations aggregations = response.getAggregations();        Terms agg21 = aggregations.get("agg2");  //根据聚合名称获取        for (Terms.Bucket bucket : agg21.getBuckets()) {            String keyAsString = bucket.getKeyAsString();            System.out.println(keyAsString);        }    }

一般一个类名加s 代表这个类的构造器

十一、安装nginx

nginx可以理解为tomcat 是一个web服务器

随便启动一个nginx实例,只是为了复制出配置

docker run -p80:80 --name nginx -d nginx:1.10  

将容器内的配置文件拷贝到/mydata/nginx/conf/ 下

mkdir -p /mydata/nginx/htmlmkdir -p /mydata/nginx/logsmkdir -p /mydata/nginx/confdocker container cp nginx:/etc/nginx@PostMapping("/hasStock")public R getSkuHasStock(@RequestBody List SkuIds){    List vos = wareSkuService.getSkuHasStock(SkuIds);    return R.ok().setData(vos);}
3.给一个skuEsModels 批量上传ES
@PostMapping("/product") // ElasticSaveController
public R productStatusUp(@RequestBody List skuEsModels){

    boolean status;
    try {
        status = productSaveService.productStatusUp(skuEsModels);
    } catch (IOException e) {
        log.error("ElasticSaveController商品上架错误: {}", e);
        return R.error(BizCodeEnum.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnum.PRODUCT_UP_EXCEPTION.getMsg());
    }
    if(!status){
        return R.ok();
    }
    return R.error(BizCodeEnum.PRODUCT_UP_EXCEPTION.getCode(), BizCodeEnum.PRODUCT_UP_EXCEPTION.getMsg());
}


public boolean productStatusUp(List skuEsModels) throws IOException {
    // 1.给ES建立一个索引 product
    BulkRequest bulkRequest = new BulkRequest();
    // 2.构造保存请求
    for (SkuEsModel esModel : skuEsModels) {
        // 设置索引
        IndexRequest indexRequest = new IndexRequest(EsConstant.PRODUCT_INDEX);
        // 设置索引id
        indexRequest.id(esModel.getSkuId().toString());
        String jsonString = JSON.toJSONString(esModel);
        indexRequest.source(jsonString, XContentType.JSON);
        // add
        bulkRequest.add(indexRequest);
    }
    // bulk批量保存
    BulkResponse bulk = client.bulk(bulkRequest, GuliESConfig.COMMON_OPTIONS);
    // TODO 是否拥有错误
    boolean hasFailures = bulk.hasFailures();
    if(hasFailures){
        List collect = Arrays.stream(bulk.getItems()).map(item -> item.getId()).collect(Collectors.toList());
        log.error("商品上架错误:{}",collect);
    }
    return hasFailures;
}
4.根据spuId封装上架数据

也就是根据spuid封装ESModels

// SpuInfoServiceImpl 
public void upSpuForSearch(Long spuId) {
        //1、查出当前spuId对应的所有sku信息,品牌的名字
        List skuInfoEntities=skuInfoService.getSkusBySpuId(spuId);
        //TODO 4、根据spu查出当前sku的所有可以被用来检索的规格属性
        List productAttrValueEntities = productAttrValueService.list(new QueryWrapper().eq("spu_id", spuId));
        List attrIds = productAttrValueEntities.stream().map(attr -> {
            return attr.getAttrId();
        }).collect(Collectors.toList());
        List searchIds=attrService.selectSearchAttrIds(attrIds); #这里是根据id查询数据库 下面有sql语句
        Set ids = new HashSet<>(searchIds);
        List searchAttrs = productAttrValueEntities.stream().filter(entity -> {
            return ids.contains(entity.getAttrId());
        }).map(entity -> {
            SkuEsModel.Attr attr = new SkuEsModel.Attr();
            BeanUtils.copyProperties(entity, attr);
            return attr;
        }).collect(Collectors.toList());


        //TODO 1、发送远程调用,库存系统查询是否有库存
        Map stockMap = null;
        try {
            List longList = skuInfoEntities.stream().map(SkuInfoEntity::getSkuId).collect(Collectors.toList());
            List skuHasStocks = wareFeignService.getSkuHasStocks(longList);
            stockMap = skuHasStocks.stream().collect(Collectors.toMap(SkuHasStockVo::getSkuId, SkuHasStockVo::getHasStock));
        }catch (Exception e){
            log.error("远程调用库存服务失败,原因{}",e);
        }

        //2、封装每个sku的信息
        Map finalStockMap = stockMap;
        List skuEsModels = skuInfoEntities.stream().map(sku -> {
            SkuEsModel skuEsModel = new SkuEsModel();
            BeanUtils.copyProperties(sku, skuEsModel);
            skuEsModel.setSkuPrice(sku.getPrice());
            skuEsModel.setSkuImg(sku.getSkuDefaultImg());
            //TODO 2、热度评分。0
            skuEsModel.setHotScore(0L);
            //TODO 3、查询品牌和分类的名字信息
            BrandEntity brandEntity = brandService.getById(sku.getBrandId());
            skuEsModel.setBrandName(brandEntity.getName());
            skuEsModel.setBrandImg(brandEntity.getLogo());
            CategoryEntity categoryEntity = categoryService.getById(sku.getCatalogId());
            skuEsModel.setCatalogName(categoryEntity.getName());
            //设置可搜索属性
            skuEsModel.setAttrs(searchAttrs);
            //设置是否有库存
            skuEsModel.setHasStock(finalStockMap==null?false:finalStockMap.get(sku.getSkuId()));
            return skuEsModel;
        }).collect(Collectors.toList());

    
        //TODO 5、将数据发给es进行保存:gulimall-search
        R r = searchFeignService.saveProductAsIndices(skuEsModels);
        if (r.getCode()==0){
            this.baseMapper.upSpuStatus(spuId, ProductConstant.ProductStatusEnum.SPU_UP.getCode());
        }else {
            log.error("商品远程es保存失败");
        }
    }

selectSearchAttrIds 对应的持久层sql


		
		
		
		
		
		
		
		
		
		
< / resu1tMap>