
Bulk Loading into Hyperbase with SparkStreaming: Usage and Internals



I. Environment setup

See "Spark3.1.2 on TDH622".

II. Additional jar packages

III. Key code walkthrough
  • Ingest data from Kafka

    JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            ssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
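
The call above takes `topics` and `kafkaParams` as given. A minimal sketch of how they might be built, using standard Kafka consumer property names. The class name `KafkaInputConfig` and whatever broker address, group id and topic names are passed in are illustrative assumptions, not values from the original setup:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the two arguments passed to ConsumerStrategies.Subscribe.
final class KafkaInputConfig {

    /** Consumer settings; brokers and groupId are supplied by the caller. */
    static Map<String, Object> kafkaParams(String brokers, String groupId) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", brokers);
        // Deserializers are given by class name, so this sketch needs no Kafka import.
        params.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("group.id", groupId);
        params.put("auto.offset.reset", "latest");
        // Offsets are not committed automatically in this sketch.
        params.put("enable.auto.commit", false);
        return params;
    }

    /** Topic names to subscribe to. */
    static Collection<String> topics(String... names) {
        return Arrays.asList(names);
    }
}
```

Both deserializers are String-based here because the stream is declared as `ConsumerRecord<String, String>`.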
    
  • Unpack the Kafka messages into individual records

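    // Extract the value before repartitioning: ConsumerRecord does not implement
    // java.io.Serializable, so shuffling it can fail with the Java serializer.
    JavaDStream<String> messages = stream.map(new GetValueFunc()).repartition(shuffleNum);
    
    public class GetValueFunc implements Function<ConsumerRecord<String, String>, String>, Serializable {
        @Override
        public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
            return consumerRecord.value();
        }
    }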
    
  • Parse the data and produce HBase KeyValue objects

    JavaPairDStream<ImmutableBytesWritable, KeyValue> putStream = messages.flatMapToPair(new StringToKeyValueFunc());
    
    public class StringToKeyValueFunc implements PairFlatMapFunction<String, ImmutableBytesWritable, KeyValue>, Serializable {
    
        // Created lazily; used only to append a random suffix to each sort key.
        Random random = null;
    
        @Override
        public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(String s) throws Exception {
            if (random == null) {
                random = new Random();
            }
            // Input format: rowkey,name,age
            String[] line = s.split(",");
            String rowkey = line[0];
            String name = line[1];
            String age = line[2];
            
            List<Tuple2<ImmutableBytesWritable, KeyValue>> puts = new LinkedList<>();
    
            // Sort key layout: rowkey, column family, qualifier, random suffix, joined with "01".
            KeyValue kv1 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name));
            String key = Bytes.toString(Bytes.toBytes(rowkey)) + "01" + "cf" + "01" + "name" + "01" + random.nextInt(9997);
            puts.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key)), kv1));
            KeyValue kv2 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age));
            String key2 = Bytes.toString(Bytes.toBytes(rowkey)) + "01" + "cf" + "01" + "age" + "01" + random.nextInt(9997);
            puts.add(new Tuple2<>(new ImmutableBytesWritable(Bytes.toBytes(key2)), kv2));
    
            return puts.iterator();
        }
    }
    
  • Generate HFiles and bulk load them into the table

    putStream.foreachRDD(new BulkloadFunc(shuffleNum));
    
    public class BulkloadFunc implements VoidFunction<JavaPairRDD<ImmutableBytesWritable, KeyValue>>, Serializable {
    
        // Number of partitions used when sorting the RDD.
        private int sortNum;
    
        public BulkloadFunc(int sortNum) {
            this.sortNum = sortNum;
        }
    
        @Override
        public void call(JavaPairRDD<ImmutableBytesWritable, KeyValue> rdd) throws Exception {
            DateTime currentTime = new DateTime();
            String tableName = "default:testbulk2";
            // Load the cluster configuration files from the classpath.
            Configuration conf = new Configuration(false);
            conf.setClassLoader(BulkloadFunc.class.getClassLoader());
            conf.addResource("hbase-site.xml");
            conf.addResource("hdfs-site.xml");
            conf.addResource("core-site.xml");
    
            // Register the serializers needed for KeyValue and Writable types.
            conf.setStrings("io.serializations", conf.get("io.serializations"),
                    KeyValueSerialization.class.getName(), WritableSerialization.class.getName());
    
            Job job = Job.getInstance(conf);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(KeyValue.class);
    
            // Configure the HFile output for the target table, then release the table handle.
            HTable table = new HTable(conf, tableName);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            table.close();
            // Each batch writes to its own staging directory.
            String path = "hdfs://nameservice1/tmp/lsk2/" + currentTime.toString("yyyyMMddHHmmss");
            job.getConfiguration().set("mapred.output.dir", path);
            // HFiles require sorted input, so sort before writing.
            rdd.sortByKey(new MyComparator(), true, sortNum).saveAsNewAPIHadoopDataset(job.getConfiguration());
            load(conf, path, tableName);
        }
    
        private void load(Configuration conf, String path, String tableName) throws Exception {
            HTable table = new HTable(conf, tableName);
            try {
                LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
                bulkLoader.doBulkLoad(new Path(path), table);
            } finally {
                table.close();
            }
            // Remove the staging directory (recursively) once the load has finished.
            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(path), true);
        }
    }
    
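IV. How it works

1. HBase storage internals

As the HBase architecture diagram shows, HBase storage involves HMaster, HRegionServer, HRegion, HLog, Store, MemStore, StoreFile and HFile. HBase is an open-source implementation of Google's BigTable, and its underlying storage engine is designed around the LSM-Tree data structure. A write first goes to the WAL and then into the MemStore write cache. Only when the cache reaches a certain size, or another trigger condition is met, is it flushed to disk. This turns random disk writes into sequential writes and improves write performance. Every flush produces a new HFile.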
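
The write path described above can be sketched with a toy model. This is not HBase code: the class `ToyLsmStore`, its entry-count flush threshold, and the in-memory lists standing in for the WAL and HFiles are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of an LSM write path: log first, sorted memory buffer second, flush to a new sorted file.
final class ToyLsmStore {
    private final int flushThreshold;                        // hypothetical trigger: buffered entry count
    private final List<String> wal = new ArrayList<>();      // append-only log, written first
    private TreeMap<String, String> memStore = new TreeMap<>(); // keeps keys sorted in memory
    private final List<List<Map.Entry<String, String>>> files = new ArrayList<>(); // one list per "HFile"

    ToyLsmStore(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void put(String key, String value) {
        wal.add(key + "=" + value);   // 1. sequential append to the log
        memStore.put(key, value);     // 2. sorted insert into the write buffer
        if (memStore.size() >= flushThreshold) {
            flush();                  // 3. one sequential write of already-sorted data
        }
    }

    void flush() {
        if (memStore.isEmpty()) {
            return;
        }
        files.add(new ArrayList<>(memStore.entrySet())); // every flush creates a new file
        memStore = new TreeMap<>();
    }

    int fileCount() { return files.size(); }

    int walSize() { return wal.size(); }

    List<Map.Entry<String, String>> file(int i) { return files.get(i); }
}
```

Writes arrive in arbitrary key order, but each flushed file is sorted because the buffer is a sorted map.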

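2. Main roles of the MemStore
  1. Updates are held in the MemStore, which follows the LSM (Log-Structured Merge Tree) design and sorts and consolidates data in memory. This keeps the written data ordered (data in an HFile is sorted by RowKey) and greatly improves HBase write performance.
  2. It acts as an in-memory cache: reads check the MemStore first, since by the principle of locality newly written data is more likely to be accessed.
  3. It allows optimizations before data is persisted. For example, if the number of versions to keep is set to 1, a flush only needs to write the latest version.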
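
A simplified model of the second and third roles listed above, assuming a single retained version per row. The class `ToyMemStoreReads` is hypothetical and far simpler than what HBase does when it combines the MemStore with store files.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy read path: newest data in memory, older data in flushed files (newest file last).
final class ToyMemStoreReads {
    private final TreeMap<String, String> memStore = new TreeMap<>();
    private final List<TreeMap<String, String>> flushedFiles = new ArrayList<>();

    void put(String row, String value) {
        memStore.put(row, value); // a later put to the same row replaces the earlier one
    }

    void flush() {
        // Only the latest value per row reaches the file, because older values
        // were already overwritten in memory (single-version assumption).
        flushedFiles.add(new TreeMap<>(memStore));
        memStore.clear();
    }

    String get(String row) {
        String value = memStore.get(row);                      // check the write buffer first
        if (value != null) {
            return value;
        }
        for (int i = flushedFiles.size() - 1; i >= 0; i--) {   // then files, newest to oldest
            value = flushedFiles.get(i).get(row);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    int entriesInFile(int i) { return flushedFiles.get(i).size(); }
}
```

Two puts to the same row followed by a flush leave one entry in the file, which is the saving the third item refers to.
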
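3. How HFile works
  • An HFile has four main parts: the Scanned block section, the Non-scanned block section, the Load-on-open section and the Trailer.

    • Scanned block section: every block in this section is read when the HFile is scanned sequentially (it holds all the data that has to be read), including Data Blocks, Leaf Index Blocks and Bloom Blocks;
    • Non-scanned block section: this part is not read during a sequential scan of the HFile. It mainly consists of Meta Blocks and Intermediate Level Data Index Blocks;
    • Load-on-open section: this part is loaded into memory when the HFile is opened, for example when the HBase region server starts. It includes FileInfo, the Bloom filter block, the data block index and the meta block index;
    • Trailer: records the basic information of the HFile, plus the offsets and addressing information of the other parts.

  • How an HFile is generated

    • The data initially lives in the MemStore. When a flush happens, an HFile Writer is created and the KeyValues in the MemStore are appended one by one to a Data Block held in memory.
    • Cells are appended in sorted order; the writer expects each Cell to sort after the previous one. (Note: KeyValue is one implementation of Cell.)
    • Over the sorted Cells, the HFile builds a three-level index: Root Index -> Intermediate Level Data Index Block -> Leaf Index Block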
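
To illustrate why sorted input makes indexing cheap, here is a toy with a single index level (the real format uses up to three levels and sizes blocks in bytes). The class `ToyBlockIndex`, the fixed keys-per-block limit, and the use of `String.compareTo` in place of byte-wise comparison are simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy single-level block index: remembers the first key of each block.
final class ToyBlockIndex {
    private final List<List<String>> blocks = new ArrayList<>();
    private final TreeMap<String, Integer> index = new TreeMap<>(); // first key -> block number

    /** Keys must arrive in strictly ascending order, mirroring the writer's requirement. */
    ToyBlockIndex(List<String> sortedKeys, int keysPerBlock) {
        String previous = null;
        for (String key : sortedKeys) {
            if (previous != null && previous.compareTo(key) >= 0) {
                throw new IllegalArgumentException("keys out of order: " + previous + " then " + key);
            }
            if (blocks.isEmpty() || blocks.get(blocks.size() - 1).size() == keysPerBlock) {
                blocks.add(new ArrayList<String>());   // start a new block
                index.put(key, blocks.size() - 1);     // and index its first key
            }
            blocks.get(blocks.size() - 1).add(key);
            previous = key;
        }
    }

    /** Returns the only block that could hold the key, or -1 if it sorts before every block. */
    int findBlock(String key) {
        Map.Entry<String, Integer> entry = index.floorEntry(key);
        return entry == null ? -1 : entry.getValue();
    }

    boolean contains(String key) {
        int block = findBlock(key);
        return block >= 0 && blocks.get(block).contains(key);
    }
}
```

A lookup touches the index and one block only, which works solely because the keys were appended in order.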
  • Sort order of KeyValues in an HFile

    The compare() method of the org.apache.hadoop.hbase.CellComparator class is the core of KeyValue ordering:

    public static int compare(final Cell a, final Cell b, boolean ignoreSequenceid) {
        // row
        int c = compareRows(a, b);
        if (c != 0) return c;
    
        c = compareWithoutRow(a, b);
        if(c != 0) return c;
    
        if (!ignoreSequenceid) {
          // Negate following comparisons so later edits show up first
          // mvccVersion: later sorts first
          return Longs.compare(b.getMvccVersion(), a.getMvccVersion());
        } else {
          return c;
        }
      }
    

    compareRows() compares the rowkeys and keeps them in lexicographic order. If the rowkeys are equal, control passes to compareWithoutRow(), whose core logic is as follows:

        boolean sameFamilySize = (leftCell.getFamilyLength() == rightCell.getFamilyLength());
        if (!sameFamilySize) {
          // comparing column family is enough.
    
          return Bytes.compareTo(leftCell.getFamilyArray(), leftCell.getFamilyOffset(),
              leftCell.getFamilyLength(), rightCell.getFamilyArray(), rightCell.getFamilyOffset(),
              rightCell.getFamilyLength());
        }
        int diff = compareColumns(leftCell, rightCell);
        if (diff != 0) return diff;
    
        diff = compareTimestamps(leftCell, rightCell);
        if (diff != 0) return diff;
    
        // Compare types. Let the delete types sort ahead of puts; i.e. types
        // of higher numbers sort before those of lesser numbers. Maximum (255)
        // appears ahead of everything, and minimum (0) appears after
        // everything.
        return (0xff & rightCell.getTypeByte()) - (0xff & leftCell.getTypeByte());
    

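    If the column families differ in (byte) length, comparing the families lexicographically is enough and the result is returned. If the lengths match, the column family, the qualifier, the timestamp (numerically, with newer timestamps sorting first) and the cell type are compared in turn, until a result is non-zero or all of them are equal.

    sequenceId is the mechanism that ties together the contents of the WAL, HFile and MemStore. It is not covered here.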
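
The ordering rules above can be condensed into a small stand-alone comparator. This is a sketch, not the HBase implementation: `ToyCell` is a hypothetical class, it uses `String.compareTo` instead of byte-wise comparison, and it skips the family-length shortcut and the sequence id.

```java
// Toy cell with the fields that take part in the ordering.
final class ToyCell {
    final String row;
    final String family;
    final String qualifier;
    final long timestamp;
    final int type; // numeric type code, e.g. 4 for Put and 8 for Delete in HBase's KeyValue.Type

    ToyCell(String row, String family, String qualifier, long timestamp, int type) {
        this.row = row;
        this.family = family;
        this.qualifier = qualifier;
        this.timestamp = timestamp;
        this.type = type;
    }

    /** Row, family and qualifier ascending; timestamp and type descending. */
    static int compare(ToyCell a, ToyCell b) {
        int c = a.row.compareTo(b.row);
        if (c != 0) return c;
        c = a.family.compareTo(b.family);
        if (c != 0) return c;
        c = a.qualifier.compareTo(b.qualifier);
        if (c != 0) return c;
        c = Long.compare(b.timestamp, a.timestamp); // operands swapped: newer sorts first
        if (c != 0) return c;
        return Integer.compare(b.type, a.type);     // operands swapped: higher type code sorts first
    }
}
```

Within one row and family, the `age` column therefore sorts before `name`, and a delete marker sorts ahead of a put with the same timestamp.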

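4. Generating HFiles with Spark

Core idea: assemble the Kafka data into KeyValue objects and keep the KeyValues sorted, in an order that must match the sorting rules described in the previous section.

  • Parse the Kafka data and assemble KeyValue objects. For example: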

    String[] line = s.split(",");
    String rowkey = line[0];
    String name = line[1];
    String age = line[2];
    KeyValue kv1 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("name"), Bytes.toBytes(name));
    KeyValue kv2 = new KeyValue(Bytes.toBytes(rowkey), Bytes.toBytes("cf"), Bytes.toBytes("age"), Bytes.toBytes(age));
    
  • Following the sorting rules, construct the key used for sorting.

    String key = Bytes.toString(Bytes.toBytes(rowkey)) + "01" + "cf" + "01" + "name" + "01" + random.nextInt(9997);
    String key2 = Bytes.toString(Bytes.toBytes(rowkey)) + "01" + "cf" + "01" + "age" + "01" + random.nextInt(9997);
    
  • Following the sorting rules, implement a custom comparison

    String[] keyLeft = Bytes.toString(left.get()).split("01");
    String[] keyRight = Bytes.toString(right.get()).split("01");
    int compareResult = 0;
    for (int i = 0; i < size; i++) {
        compareResult = Bytes.compareTo(Bytes.toBytes(keyLeft[i]), Bytes.toBytes(keyRight[i]));
        if (compareResult != 0) {
            return compareResult;
        }
    }
    return compareResult;
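
The fragment above leaves `size` and the enclosing class undefined. One way it could be completed is sketched below, under stated assumptions: the comparator works on raw `byte[]` keys rather than `ImmutableBytesWritable`, `fieldsToCompare` stands in for `size`, and `compareUnsigned` is a local stand-in for `Bytes.compareTo`. The class name `SortKeyComparator` is hypothetical.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;

// Compares sort keys of the form rowkey + "01" + family + "01" + qualifier + "01" + suffix.
final class SortKeyComparator implements Comparator<byte[]>, Serializable {
    private static final long serialVersionUID = 1L;
    private final int fieldsToCompare; // plays the role of `size` in the fragment above

    SortKeyComparator(int fieldsToCompare) {
        this.fieldsToCompare = fieldsToCompare;
    }

    @Override
    public int compare(byte[] left, byte[] right) {
        String[] l = new String(left, StandardCharsets.UTF_8).split("01");
        String[] r = new String(right, StandardCharsets.UTF_8).split("01");
        // Never read past the shorter key.
        int n = Math.min(fieldsToCompare, Math.min(l.length, r.length));
        for (int i = 0; i < n; i++) {
            int c = compareUnsigned(l[i].getBytes(StandardCharsets.UTF_8),
                                    r[i].getBytes(StandardCharsets.UTF_8));
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }

    /** Lexicographic comparison treating bytes as unsigned values. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = (a[i] & 0xff) - (b[i] & 0xff);
            if (c != 0) {
                return c;
            }
        }
        return a.length - b.length;
    }
}
```

With `fieldsToCompare` set to 3, only rowkey, column family and qualifier decide the order and the random suffix is ignored. Note that a rowkey which itself contains "01" is split in the wrong place by this scheme, so the separator has to be one that cannot occur in the data.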
    
  • Sort the RDD and write the HFiles to HDFS

    rdd.sortByKey(new MyComparator(), true, sortNum).saveAsNewAPIHadoopDataset(job.getConfiguration());
    
5. Loading the HFiles into the HBase table

    private void load(Configuration conf, String path, String tableName) throws Exception {
        HTable table = new HTable(conf, tableName);
        try {
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(conf);
            bulkLoader.doBulkLoad(new Path(path), table);
        } finally {
            table.close();
        }
        // Remove the staging directory (recursively) once the load has finished.
        FileSystem fileSystem = FileSystem.get(conf);
        fileSystem.delete(new Path(path), true);
    }

Reprint notice: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/458423.html