Endpoint Coprocessor客户端调用过程:
CM版本:5.14.3,CDH版本:5.14.3,编写示例代码及运行环境为Java 1.8。
操作演示
HBase自带Endpoint类型协处理器的示例实现,所以首先确认hbase-examples-1.2.0-cdh5.14.2.jar是否存在于如下目录中:
[root@ip-168-31-8-230 lib]# pwd /opt/cloudera/parcels/CDH/lib/hbase/lib [root@ip-168-31-8-230 lib]# ll hbase-examples-1.2.0-cdh5.14.2.jar
生成TestTable测试表及数据
[root@ip-168-31-8-230 ~]# hbase pe --rows=10000000 randomWrite 1
登录CM进入Hbase服务进行配置
配置自定义的Endpoint类,因为Endpoint类型的Coprocessor运行在Hbase 的RegionServer中,所以这里只需要配置Hbase Coprocessor Region类在这里的配置为全局配置,协处理器有两种使用方式上图的方式是其中的一种,另外一种则是对单个表进行修改
org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint
pom.xml文件内容如下
cdh-project com.cloudera 1.0-SNAPSHOT 4.0.0 hbase-demojar hbase-demo http://maven.apache.org UTF-8 org.apache.hadoop hadoop-client2.6.0-cdh5.11.2 org.apache.hadoop hadoop-common2.6.0-cdh5.11.2 org.apache.hbase hbase-client1.2.0-cdh5.11.2 org.apache.hbase hbase-examples1.2.0-cdh5.11.2
CoprocessorExample.java类,编写内容如下
package com.cloudera.hbase.coprocessor;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Demonstrates three client-side ways of invoking the built-in
 * {@code RowCountEndpoint} coprocessor shipped with hbase-examples:
 * a batched call over a key range, a per-region blocking call, and a
 * per-region call with an incremental callback.
 */
public class CoprocessorExample {

    public static void main(String[] args) {
        // Initialize the HBase client configuration.
        // NOTE: the class is HBaseConfiguration (capital B) — the original
        // HbaseConfiguration does not exist and would not compile.
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.setStrings("hbase.zookeeper.quorum", "ip-168-31-5-38.ap-southeast-1.compute.internal,ip-168-31-8-230.ap-southeast-1.compute.internal,ip-168-31-5-171.ap-southeast-1.compute.internal");
        // try-with-resources guarantees the connection and table are closed
        // even if one of the coprocessor calls throws.
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table testTable = connection.getTable(TableName.valueOf("TestTable"))) {
            execBatchEndpointCoprocessor(testTable);
            execEndpointCoprocessor(testTable);
            execFastEndpointCoprocessor(testTable);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Invokes the row-count endpoint via {@link Table#batchCoprocessorService},
     * restricted to the regions covering the given start/stop row-key range.
     *
     * @param table the HBase table to run the endpoint against
     * @return the summed key-value count over the matched regions, or 0 if the call failed
     */
    public static long execBatchEndpointCoprocessor(Table table) {
        byte[] startKey = Bytes.toBytes("00000000000000000000000000");
        byte[] stopKey = Bytes.toBytes("00000000000000000000000010");
        long startTime = System.currentTimeMillis();
        Map<byte[], ExampleProtos.CountResponse> batchMap = null;
        try {
            batchMap = table.batchCoprocessorService(
                    ExampleProtos.RowCountService.getDescriptor().findMethodByName("getKeyValueCount"),
                    ExampleProtos.CountRequest.getDefaultInstance(),
                    startKey,
                    stopKey,
                    ExampleProtos.CountResponse.getDefaultInstance());
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        // Guard: if the RPC threw above, batchMap is still null — the original
        // code dereferenced it unconditionally and would NPE here.
        if (batchMap == null) {
            return 0L;
        }
        long batchCount = 0;
        System.out.println("Region Size:" + batchMap.size());
        for (ExampleProtos.CountResponse response : batchMap.values()) {
            batchCount += response.getCount();
        }
        System.out.println("方式一耗时:" + (System.currentTimeMillis() - startTime));
        System.out.println("方式一统计数量:" + batchCount);
        return batchCount;
    }

    /**
     * Invokes the row-count endpoint on every region via
     * {@link Table#coprocessorService} with a blocking RPC callback,
     * then sums the per-region counts into the table total.
     *
     * @param table the HBase table to run the endpoint against
     * @return the total key-value count across all regions, or 0 if the call failed
     */
    public static long execEndpointCoprocessor(Table table) {
        try {
            long startTime = System.currentTimeMillis();
            Map<byte[], Long> regionCounts = table.coprocessorService(
                    ExampleProtos.RowCountService.class, null, null,
                    new Batch.Call<ExampleProtos.RowCountService, Long>() {
                        @Override
                        public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
                            ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
                            BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                            rowCountService.getKeyValueCount(null, request, rpcCallback);
                            ExampleProtos.CountResponse response = rpcCallback.get();
                            return response.getCount();
                        }
                    });
            // Sum the per-region counts returned by the coprocessor to get the table total.
            long count = 0;
            System.out.println("Region Size:" + regionCounts.size());
            for (Long regionCount : regionCounts.values()) {
                count += regionCount;
            }
            System.out.println("方式二耗时:" + (System.currentTimeMillis() - startTime));
            System.out.println("方式二统计数量:" + count);
            // BUG FIX: the original always returned 0l, discarding the computed count.
            return count;
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        return 0L;
    }

    /**
     * Invokes the row-count endpoint with the callback variant of
     * {@link Table#coprocessorService}: partial results are folded into an
     * accumulator as each region responds, instead of collecting a result map.
     *
     * @param table the HBase table to run the endpoint against
     * @return the total key-value count accumulated from all regions (0 if the call failed)
     */
    public static long execFastEndpointCoprocessor(Table table) {
        long startTime = System.currentTimeMillis();
        // Thread-safe accumulator: update() may be called concurrently per region.
        AtomicLong totalRowCount = new AtomicLong();
        try {
            Batch.Callback<Long> callback = new Batch.Callback<Long>() {
                @Override
                public void update(byte[] region, byte[] row, Long result) {
                    totalRowCount.getAndAdd(result);
                }
            };
            table.coprocessorService(ExampleProtos.RowCountService.class, null, null,
                    new Batch.Call<ExampleProtos.RowCountService, Long>() {
                        @Override
                        public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
                            ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
                            BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                            rowCountService.getKeyValueCount(null, request, rpcCallback);
                            ExampleProtos.CountResponse response = rpcCallback.get();
                            return response.getCount();
                        }
                    }, callback);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
        }
        System.out.println("方式三耗时:" + (System.currentTimeMillis() - startTime));
        System.out.println("方式三统计数量:" + totalRowCount.longValue());
        return totalRowCount.longValue();
    }
}
Hbase表统计效率对比
使用Hbase的count来统计测试表的总条数
[root@ip-168-31-8-230 ~]# hbase shell hbase(main):001:0> count 'TestTable', INTERVAL => 1000000, CACHE => 10000
使用Hbase提供的MapReduce方式统计测试表的总条数
[root@ip-168-31-8-230 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter TestTable
查看MapReduce方式的耗时。使用HBase协处理器执行测试表统计的耗时统计:
结论:
在使用HBase的coprocessor方法时,如果传入startkey和endkey,会根据rowkey的范围检索出符合条件的Region,并统计每个Region上的数据量。HBase的Endpoint Coprocessor协处理器既可以通过CM的方式配置为全局生效,也可以通过客户端或hbase shell的方式对某一个表单独启用,使用比较灵活。
大数据视频推荐:
CSDN
大数据语音推荐:
企业级大数据技术应用
大数据机器学习案例之推荐系统
自然语言处理
大数据基础
人工智能:深度学习入门到精通



