栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 其他 > Hadoop

【博文推荐】Hadoop中MapReduce多种join实现实例分析

Hadoop 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【博文推荐】Hadoop中MapReduce多种join实现实例分析

本博文出自51CTO博客 zengzhaozheng博主,有任何问题请进入博主页面互动讨论!
博文地址:http://zengzhaozheng.blog.51cto.com/8219051/1392961

一、概述

对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。

二、实现原理

1、在Reudce端进行连接。

在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:

Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,***进行输出。

reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,***进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:

(1)自定义一个value返回类型:

  1. package com.mr.reduceSizeJoin;   import java.io.DataInput;   
  2. import java.io.DataOutput;   import java.io.IOException;   
  3. import org.apache.hadoop.io.Text;   import org.apache.hadoop.io.WritableComparable;   
  4. public class Combinevalues implements WritableComparable{       //private static final Logger logger = LoggerFactory.getLogger(Combinevalues.class);   
  5.     private Text joinKey;//链接关键字       private Text flag;//文件来源标志   
  6.     private Text secondPart;//除了链接键外的其他部分       public void setJoinKey(Text joinKey) {   
  7.         this.joinKey = joinKey;       }   
  8.     public void setFlag(Text flag) {           this.flag = flag;   
  9.     }       public void setSecondPart(Text secondPart) {   
  10.         this.secondPart = secondPart;       }   
  11.     public Text getFlag() {           return flag;   
  12.     }       public Text getSecondPart() {   
  13.         return secondPart;       }   
  14.     public Text getJoinKey() {           return joinKey;   
  15.     }       public Combinevalues() {   
  16.         this.joinKey =  new Text();           this.flag = new Text();   
  17.         this.secondPart = new Text();       }     @Override 
  18.     public void write(DataOutput out) throws IOException {           this.joinKey.write(out);   
  19.         this.flag.write(out);           this.secondPart.write(out);   
  20.     }       @Override 
  21.     public void readFields(DataInput in) throws IOException {           this.joinKey.readFields(in);   
  22.         this.flag.readFields(in);           this.secondPart.readFields(in);   
  23.     }       @Override 
  24.     public int compareTo(Combinevalues o) {           return this.joinKey.compareTo(o.getJoinKey());   
  25.     }       @Override 
  26.     public String toString() {           // TODO Auto-generated method stub   
  27.         return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";       }   

(2)map、reduce主体代码

  1. package com.mr.reduceSizeJoin;   import java.io.IOException;   
  2. import java.util.ArrayList;   import org.apache.hadoop.conf.Configuration;   
  3. import org.apache.hadoop.conf.Configured;   import org.apache.hadoop.fs.Path;   
  4. import org.apache.hadoop.io.Text;   import org.apache.hadoop.mapreduce.Job;   
  5. import org.apache.hadoop.mapreduce.Mapper;   import org.apache.hadoop.mapreduce.Reducer;   
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  7. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  8. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   import org.apache.hadoop.util.Tool;   
  9. import org.apache.hadoop.util.ToolRunner;   import org.slf4j.Logger;   
  10. import org.slf4j.LoggerFactory;    public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   
  11.     private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);       public static class LeftOutJoinMapper extends Mapper {   
  12.         private Combinevalues combinevalues = new Combinevalues();           private Text flag = new Text();   
  13.         private Text joinKey = new Text();           private Text secondPart = new Text();   
  14.         @Override         protected void map(Object key, Text value, Context context)   
  15.                 throws IOException, InterruptedException {               //获得文件输入路径   
  16.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();               //数据来自tb_dim_city.dat文件,标志即为"0"   
  17.             if(pathName.endsWith("tb_dim_city.dat")){                   String[] valueItems = value.toString().split("\|");   
  18.                 //过滤格式错误的记录                   if(valueItems.length != 5){   
  19.                     return;                   }   
  20.                 flag.set("0");                   joinKey.set(valueItems[0]);   
  21.                 secondPart.set(valueItems[1]+"t"+valueItems[2]+"t"+valueItems[3]+"t"+valueItems[4]);                   combinevalues.setFlag(flag);   
  22.                 combinevalues.setJoinKey(joinKey);                   combinevalues.setSecondPart(secondPart);   
  23.                 context.write(combinevalues.getJoinKey(), combinevalues);
  24.  
  25.                 }//数据来自于tb_user_profiles.dat,标志即为"1"               else if(pathName.endsWith("tb_user_profiles.dat")){   
  26.                 String[] valueItems = value.toString().split("\|");                   //过滤格式错误的记录   
  27.                 if(valueItems.length != 4){                       return;   
  28.                 }                   flag.set("1");   
  29.                 joinKey.set(valueItems[3]);                   secondPart.set(valueItems[0]+"t"+valueItems[1]+"t"+valueItems[2]);   
  30.                 combinevalues.setFlag(flag);                   combinevalues.setJoinKey(joinKey);   
  31.                 combinevalues.setSecondPart(secondPart);                   context.write(combinevalues.getJoinKey(), combinevalues);   
  32.             }           }   
  33.     }       public static class LeftOutJoinReducer extends Reducer {   
  34.         //存储一个分组中的左表信息           private ArrayList leftTable = new ArrayList();   
  35.         //存储一个分组中的右表信息           private ArrayList rightTable = new ArrayList();   
  36.         private Text secondPar = null;           private Text output = new Text();   
  37.                  @Override 
  38.         protected void reduce(Text key, Iterable value, Context context)                   throws IOException, InterruptedException {   
  39.             leftTable.clear();               rightTable.clear();   
  40.                          for(Combinevalues cv : value){   
  41.                 secondPar = new Text(cv.getSecondPart().toString());                   //左表tb_dim_city   
  42.                 if("0".equals(cv.getFlag().toString().trim())){                       leftTable.add(secondPar);   
  43.                 }                   //右表tb_user_profiles   
  44.                 else if("1".equals(cv.getFlag().toString().trim())){                       rightTable.add(secondPar);   
  45.                 }               }   
  46.             logger.info("tb_dim_city:"+leftTable.toString());               logger.info("tb_user_profiles:"+rightTable.toString());   
  47.             for(Text leftPart : leftTable){                   for(Text rightPart : rightTable){   
  48.                     output.set(leftPart+ "t" + rightPart);                       context.write(key, output);   
  49.                 }               }   
  50.         }       }   
  51.     @Override     public int run(String[] args) throws Exception {   
  52.           Configuration conf=getConf(); //获得配置文件对象               Job job=new Job(conf,"LeftOutJoinMR");   
  53.             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
  54.             FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径               FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径            job.setMapperClass(LeftOutJoinMapper.class);   
  55.             job.setReducerClass(LeftOutJoinReducer.class);
  56.             job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式               job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式             //设置map的输出key和value类型   
  57.             job.setMapOutputKeyClass(Text.class);               job.setMapOutputValueClass(Combinevalues.class);             //设置reduce的输出key和value类型   
  58.             job.setOutputKeyClass(Text.class);               job.setOutputValueClass(Text.class);   
  59.             job.waitForCompletion(true);               return job.isSuccessful()?0:1;   
  60.     }       public static void main(String[] args) throws IOException,   
  61.             ClassNotFoundException, InterruptedException {           try {   
  62.             int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);               System.exit(returnCode);   
  63.         } catch (Exception e) {               // TODO Auto-generated catch block   
  64.             logger.error(e.getMessage());           }   
  65.     }   } 

其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。

2、在Map端进行连接。

使用场景:一张表十分小、一张表很大。

用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。

直接上代码,比较简单:

  1. package com.mr.mapSideJoin;   import java.io.BufferedReader;   
  2. import java.io.FileReader;   import java.io.IOException;   
  3. import java.util.HashMap;   import org.apache.hadoop.conf.Configuration;   
  4. import org.apache.hadoop.conf.Configured;   import org.apache.hadoop.filecache.DistributedCache;   
  5. import org.apache.hadoop.fs.Path;   import org.apache.hadoop.io.Text;   
  6. import org.apache.hadoop.mapreduce.Job;   import org.apache.hadoop.mapreduce.Mapper;   
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   
  9. import org.apache.hadoop.util.Tool;   import org.apache.hadoop.util.ToolRunner;   
  10. import org.slf4j.Logger;   import org.slf4j.LoggerFactory;   
  11.  
  12. public class MapSideJoinMain extends Configured implements Tool{       private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   
  13.     public static class LeftOutJoinMapper extends Mapper {
  14.  
  15.         private HashMap city_info = new HashMap();           private Text outPutKey = new Text();   
  16.         private Text outPutValue = new Text();           private String mapInputStr = null;   
  17.         private String mapInputSpit[] = null;           private String city_secondPart = null;   
  18.          
  19.         @Override         protected void setup(Context context)   
  20.                 throws IOException, InterruptedException {               BufferedReader br = null;   
  21.             //获得当前作业的DistributedCache相关文件               Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   
  22.             String cityInfo = null;               for(Path p : distributePaths){   
  23.                 if(p.toString().endsWith("tb_dim_city.dat")){                       //读缓存文件,并放到mem中   
  24.                     br = new BufferedReader(new FileReader(p.toString()));                       while(null!=(cityInfo=br.readLine())){   
  25.                         String[] cityPart = cityInfo.split("\|",5);                           if(cityPart.length ==5){   
  26.                             city_info.put(cityPart[0], cityPart[1]+"t"+cityPart[2]+"t"+cityPart[3]+"t"+cityPart[4]);                           }   
  27.                     }                   }   
  28.             }           }                  @Override 
  29.         protected void map(Object key, Text value, Context context)                   throws IOException, InterruptedException {   
  30.             //排掉空行               if(value == null || value.toString().equals("")){   
  31.                 return;               }   
  32.             mapInputStr = value.toString();               mapInputSpit = mapInputStr.split("\|",4);   
  33.             //过滤非法记录               if(mapInputSpit.length != 4){   
  34.                 return;               }   
  35.             //判断链接字段是否在map中存在               city_secondPart = city_info.get(mapInputSpit[3]);   
  36.             if(city_secondPart != null){                   this.outPutKey.set(mapInputSpit[3]);   
  37.                 this.outPutValue.set(city_secondPart+"t"+mapInputSpit[0]+"t"+mapInputSpit[1]+"t"+mapInputSpit[2]);                   context.write(outPutKey, outPutValue);   
  38.             }           }   
  39.     }       @Override 
  40.     public int run(String[] args) throws Exception {               Configuration conf=getConf(); //获得配置文件对象   
  41.             DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件               Job job=new Job(conf,"MapJoinMR");   
  42.             job.setNumReduceTasks(0);
  43.  
  44.             FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径               FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径             job.setJarByClass(MapSideJoinMain.class);   
  45.             job.setMapperClass(LeftOutJoinMapper.class);
  46.  
  47.             job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式               job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式             //设置map的输出key和value类型   
  48.             job.setMapOutputKeyClass(Text.class);
  49.  
  50.             //设置reduce的输出key和value类型               job.setOutputKeyClass(Text.class);   
  51.             job.setOutputValueClass(Text.class);               job.waitForCompletion(true);   
  52.             return job.isSuccessful()?0:1;       }   
  53.     public static void main(String[] args) throws IOException,               ClassNotFoundException, InterruptedException {   
  54.         try {               int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   
  55.             System.exit(returnCode);           } catch (Exception e) {   
  56.             // TODO Auto-generated catch block               logger.error(e.getMessage());   
  57.         }       }   

这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。

另外还有一种比较变态的Map Join方式,就是结合Hbase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。

3、SemiJoin。

SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:

  1. package com.mr.SemiJoin;   import java.io.BufferedReader;   
  2. import java.io.FileReader;   import java.io.IOException;   
  3. import java.util.ArrayList;   import java.util.HashSet;   
  4. import org.apache.hadoop.conf.Configuration;   import org.apache.hadoop.conf.Configured;   
  5. import org.apache.hadoop.filecache.DistributedCache;   import org.apache.hadoop.fs.Path;   
  6. import org.apache.hadoop.io.Text;   import org.apache.hadoop.mapreduce.Job;   
  7. import org.apache.hadoop.mapreduce.Mapper;   import org.apache.hadoop.mapreduce.Reducer;   
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   import org.apache.hadoop.mapreduce.lib.input.FileSplit;   
  9. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  10. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   import org.apache.hadoop.util.Tool;   
  11. import org.apache.hadoop.util.ToolRunner;   import org.slf4j.Logger;   
  12. import org.slf4j.LoggerFactory;    
  13. public class SemiJoin extends Configured implements Tool{       private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   
  14.     public static class SemiJoinMapper extends Mapper {           private Combinevalues combinevalues = new Combinevalues();   
  15.         private HashSet joinKeySet = new HashSet();           private Text flag = new Text();   
  16.         private Text joinKey = new Text();           private Text secondPart = new Text();   
  17.                  @Override 
  18.         protected void setup(Context context)                   throws IOException, InterruptedException {   
  19.             BufferedReader br = null;               //获得当前作业的DistributedCache相关文件   
  20.             Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());               String joinKeyStr = null;   
  21.             for(Path p : distributePaths){                   if(p.toString().endsWith("joinKey.dat")){   
  22.                     //读缓存文件,并放到mem中                       br = new BufferedReader(new FileReader(p.toString()));   
  23.                     while(null!=(joinKeyStr=br.readLine())){                           joinKeySet.add(joinKeyStr);   
  24.                     }                   }   
  25.             }           }   
  26.         @Override         protected void map(Object key, Text value, Context context)   
  27.                 throws IOException, InterruptedException {               //获得文件输入路径   
  28.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();               //数据来自tb_dim_city.dat文件,标志即为"0"   
  29.             if(pathName.endsWith("tb_dim_city.dat")){                   String[] valueItems = value.toString().split("\|");   
  30.                 //过滤格式错误的记录                   if(valueItems.length != 5){   
  31.                     return;                   }   
  32.                 //过滤掉不需要参加join的记录                   if(joinKeySet.contains(valueItems[0])){   
  33.                     flag.set("0");                       joinKey.set(valueItems[0]);   
  34.                     secondPart.set(valueItems[1]+"t"+valueItems[2]+"t"+valueItems[3]+"t"+valueItems[4]);                       combinevalues.setFlag(flag);   
  35.                     combinevalues.setJoinKey(joinKey);                       combinevalues.setSecondPart(secondPart);   
  36.                     context.write(combinevalues.getJoinKey(), combinevalues);                   }else{   
  37.                     return ;                   }   
  38.             }//数据来自于tb_user_profiles.dat,标志即为"1"               else if(pathName.endsWith("tb_user_profiles.dat")){   
  39.                 String[] valueItems = value.toString().split("\|");                   //过滤格式错误的记录   
  40.                 if(valueItems.length != 4){                       return;   
  41.                 }                   //过滤掉不需要参加join的记录   
  42.                 if(joinKeySet.contains(valueItems[3])){                       flag.set("1");   
  43.                     joinKey.set(valueItems[3]);                       secondPart.set(valueItems[0]+"t"+valueItems[1]+"t"+valueItems[2]);   
  44.                     combinevalues.setFlag(flag);                       combinevalues.setJoinKey(joinKey);   
  45.                     combinevalues.setSecondPart(secondPart);                       context.write(combinevalues.getJoinKey(), combinevalues);   
  46.                 }else{                       return ;   
  47.                 }               }   
  48.         }       }   
  49.     public static class SemiJoinReducer extends Reducer {           //存储一个分组中的左表信息   
  50.         private ArrayList leftTable = new ArrayList();           //存储一个分组中的右表信息   
  51.         private ArrayList rightTable = new ArrayList();           private Text secondPar = null;   
  52.         private Text output = new Text();            
  53.         @Override         protected void reduce(Text key, Iterable value, Context context)   
  54.                 throws IOException, InterruptedException {               leftTable.clear();   
  55.             rightTable.clear();                
  56.             for(Combinevalues cv : value){                   secondPar = new Text(cv.getSecondPart().toString());   
  57.                 //左表tb_dim_city                   if("0".equals(cv.getFlag().toString().trim())){   
  58.                     leftTable.add(secondPar);                   }   
  59.                 //右表tb_user_profiles                   else if("1".equals(cv.getFlag().toString().trim())){   
  60.                     rightTable.add(secondPar);                   }   
  61.             }               logger.info("tb_dim_city:"+leftTable.toString());   
  62.             logger.info("tb_user_profiles:"+rightTable.toString());               for(Text leftPart : leftTable){   
  63.                 for(Text rightPart : rightTable){                       output.set(leftPart+ "t" + rightPart);   
  64.                     context.write(key, output);                   }   
  65.             }           }   
  66.     }       @Override 
  67.     public int run(String[] args) throws Exception {               Configuration conf=getConf(); //获得配置文件对象   
  68.             DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);
  69.             Job job=new Job(conf,"LeftOutJoinMR");               job.setJarByClass(SemiJoin.class);             FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   
  70.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
  71.  
  72.             job.setMapperClass(SemiJoinMapper.class);               job.setReducerClass(SemiJoinReducer.class);             job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   
  73.             job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式
  74.  
  75.             //设置map的输出key和value类型               job.setMapOutputKeyClass(Text.class);   
  76.             job.setMapOutputValueClass(Combinevalues.class);
  77.  
  78.             //设置reduce的输出key和value类型               job.setOutputKeyClass(Text.class);   
  79.             job.setOutputValueClass(Text.class);               job.waitForCompletion(true);   
  80.             return job.isSuccessful()?0:1;       }   
  81.     public static void main(String[] args) throws IOException,               ClassNotFoundException, InterruptedException {   
  82.         try {               int returnCode =  ToolRunner.run(new SemiJoin(),args);   
  83.             System.exit(returnCode);           } catch (Exception e) {   
  84.             logger.error(e.getMessage());           }   
  85.     }   } 

这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。

三、总结

blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率***,其次是SemiJoin,***的是reduce join。另外,写分布式大数据处理程序的时***要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到***,使我们的代码倾向性更好。

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/796371.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号