
107. Invoking HBase's Endpoint Coprocessor

Client-side invocation flow of an Endpoint Coprocessor:

107.1 Demo Environment

CM version: 5.14.3
CDH version: 5.14.3
The sample code is written and run in Java

107.2 Walkthrough

HBase ships with an example Endpoint coprocessor, so first confirm that hbase-examples-1.2.0-cdh5.14.2.jar is present:

[root@ip-168-31-8-230 lib]# pwd
/opt/cloudera/parcels/CDH/lib/hbase/lib
[root@ip-168-31-8-230 lib]# ll hbase-examples-1.2.0-cdh5.14.2.jar 

Generate the TestTable test table and its data:

[root@ip-168-31-8-230 ~]# hbase pe --rows=10000000 randomWrite 1

Log in to CM, open the HBase service, and configure it:

Configure the custom Endpoint class. Because an Endpoint-type Coprocessor runs inside the HBase RegionServer, only the HBase Coprocessor Region Classes property needs to be set. The configuration made here is global. Coprocessors can be enabled in two ways: the approach shown above is one; the other is to alter an individual table.

org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint
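The alternative, per-table registration mentioned above can be sketched with the HBase 1.2-era Java Admin API. This is a hedged sketch, not the article's own code: it assumes a reachable cluster and that the example jar is already on the RegionServer classpath.

```java
package com.cloudera.hbase.coprocessor;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AttachRowCountEndpoint {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("TestTable");
            HTableDescriptor desc = admin.getTableDescriptor(name);
            String cls = "org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint";
            if (!desc.hasCoprocessor(cls)) {
                //register the coprocessor on this table only (not globally);
                //the jar must already be on the RegionServer classpath
                desc.addCoprocessor(cls);
                admin.disableTable(name);
                admin.modifyTable(name, desc);
                admin.enableTable(name);
            }
        }
    }
}
```

The same per-table change can also be made from the hbase shell; either way it affects only TestTable, unlike the global CM configuration.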

The contents of the pom.xml file are as follows:


    
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cdh-project</artifactId>
        <groupId>com.cloudera</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hbase-demo</artifactId>
    <packaging>jar</packaging>
    <name>hbase-demo</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-examples</artifactId>
            <version>1.2.0-cdh5.11.2</version>
        </dependency>
    </dependencies>
</project>

Write the CoprocessorExample.java class as follows:

package com.cloudera.hbase.coprocessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class CoprocessorExample {
    public static void main(String[] args) {
        //initialize the HBase configuration
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.setStrings("hbase.zookeeper.quorum", "ip-168-31-5-38.ap-southeast-1.compute.internal,ip-168-31-8-230.ap-southeast-1.compute.internal,ip-168-31-5-171.ap-southeast-1.compute.internal");
        try {
            //create an HBase Connection
            Connection connection = ConnectionFactory.createConnection(configuration);
            Table testTable = connection.getTable(TableName.valueOf("TestTable"));
            execBatchEndpointCoprocessor(testTable);
            execEndpointCoprocessor(testTable);
            execFastEndpointCoprocessor(testTable);
            //close the connection
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /**
     * Method 1: batchCoprocessorService - invokes the Endpoint on the regions
     * covered by [startKey, endKey) and returns one response per region.
     */
    public static long execBatchEndpointCoprocessor(Table table) {
        byte[] startKey = Bytes.toBytes("00000000000000000000000000");
        byte[] endKey = Bytes.toBytes("00000000000000000000000010");
        long start_t = System.currentTimeMillis();
        Map<byte[], ExampleProtos.CountResponse> batchMap = null;
        try {
            batchMap = table.batchCoprocessorService(
                    ExampleProtos.RowCountService.getDescriptor().findMethodByName("getKeyValueCount"),
                    ExampleProtos.CountRequest.getDefaultInstance(),
                    startKey,
                    endKey,
                    ExampleProtos.CountResponse.getDefaultInstance());
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        //sum the per-region counts to get the table total
        long batch_count = 0;
        System.out.println("Region Size:" + batchMap.size());
        for (ExampleProtos.CountResponse response : batchMap.values()) {
            batch_count += response.getCount();
        }
        System.out.println("Method 1 elapsed time: " + (System.currentTimeMillis() - start_t));
        System.out.println("Method 1 row count: " + batch_count);
        return batch_count;
    }
    
    /**
     * Method 2: coprocessorService with a Batch.Call - invokes the Endpoint on
     * every region of the table (null start/end keys) and collects all results.
     */
    public static long execEndpointCoprocessor(Table table) {
        long count = 0;
        try {
            long start_t = System.currentTimeMillis();
            Map<byte[], Long> map = table.coprocessorService(ExampleProtos.RowCountService.class, null, null,
                    new Batch.Call<ExampleProtos.RowCountService, Long>() {
                @Override
                public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
                    ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
                    BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                    rowCountService.getKeyValueCount(null, request, rpcCallback);
                    ExampleProtos.CountResponse response = rpcCallback.get();
                    return response.getCount();
                }
            });
            //sum the counts returned by each region's coprocessor to get the table total
            System.out.println("Region Size:" + map.size());
            for (Long count_r : map.values()) {
                count += count_r;
            }
            System.out.println("Method 2 elapsed time: " + (System.currentTimeMillis() - start_t));
            System.out.println("Method 2 row count: " + count);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return count;
    }
    
    /**
     * Method 3: coprocessorService with a Batch.Callback - results are merged
     * incrementally as each region responds, instead of after all regions return.
     */
    public static long execFastEndpointCoprocessor(Table table) {
        long start_t = System.currentTimeMillis();
        //accumulates the total row count across all regions
        final AtomicLong totalRowCount = new AtomicLong();
        try {
            Batch.Callback<Long> callback = new Batch.Callback<Long>() {
                @Override
                public void update(byte[] region, byte[] row, Long result) {
                    totalRowCount.getAndAdd(result);
                }
            };
            table.coprocessorService(ExampleProtos.RowCountService.class, null, null,
                    new Batch.Call<ExampleProtos.RowCountService, Long>() {
                @Override
                public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
                    ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
                    BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                    rowCountService.getKeyValueCount(null, request, rpcCallback);
                    ExampleProtos.CountResponse response = rpcCallback.get();
                    return response.getCount();
                }
            }, callback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        System.out.println("Method 3 elapsed time: " + (System.currentTimeMillis() - start_t));
        System.out.println("Method 3 row count: " + totalRowCount.longValue());
        return totalRowCount.longValue();
    }
}
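The BlockingRpcCallback used in methods 2 and 3 simply parks the calling thread until the RegionServer's response arrives. A simplified stand-in (my own sketch, not the real HBase class) shows the idea:

```java
import java.util.concurrent.CountDownLatch;

//Simplified stand-in for HBase's BlockingRpcCallback: the RPC layer delivers
//the response via run(), while the caller blocks in get() until it arrives.
public class SimpleBlockingCallback<R> {
    private R result;
    private final CountDownLatch done = new CountDownLatch(1);

    //invoked by the RPC layer when the response arrives
    public void run(R response) {
        this.result = response;
        done.countDown();
    }

    //invoked by the caller thread; blocks until run() has delivered the response
    public R get() throws InterruptedException {
        done.await();
        return result;
    }
}
```

This is why each Batch.Call above can return `rpcCallback.get()` synchronously even though the underlying protobuf service API is callback-based.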

Comparing HBase row-count approaches

Use HBase's count command to count the total rows in the test table:

[root@ip-168-31-8-230 ~]# hbase shell
hbase(main):001:0> count 'TestTable', INTERVAL => 1000000, CACHE => 10000

Use HBase's bundled MapReduce job to count the total rows in the test table:

[root@ip-168-31-8-230 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter TestTable

Check the elapsed time of the MapReduce job, then compare it with the elapsed time of the coprocessor-based count of the test table.

Conclusions:

When calling HBase's coprocessorService method, if a startKey and endKey are passed in, the regions matching that rowkey range are located and the row count on each of those regions is returned. HBase's Endpoint Coprocessor can be configured globally through CM, or enabled for a single table through the client API or the hbase shell, which makes it flexible to use.
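The client-side step shared by all three methods, reducing per-region partial counts to one table total, is just a sum over the returned map. A minimal illustration with plain Java types (the region names and counts are made up for the example):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RegionCountMerge {
    //sum the per-region partial counts (one entry per region) into the table total
    public static long total(Map<String, Long> perRegion) {
        long sum = 0L;
        for (long c : perRegion.values()) {
            sum += c;
        }
        return sum;
    }

    public static void main(String[] args) {
        Map<String, Long> perRegion = new LinkedHashMap<>();
        perRegion.put("region-1", 1200L);
        perRegion.put("region-2", 800L);
        perRegion.put("region-3", 500L);
        System.out.println(total(perRegion)); //2500
    }
}
```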


Reprinted from www.mshxw.com; original URL: https://www.mshxw.com/it/752647.html