1. Java API operations on HBase
2. Filter queries
3. Integrating HBase with Hive
3.1 Preparing the HBase table and data
4. HbaseToHDFS: writing HBase table data to HDFS
1. Java API operations on HBase

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;

public class HbaseDemoTest {
// Static configuration shared by all the methods below
static Configuration conf = null;
private static final String ZK_CONNECT_STR =
"bigdata02:2181,bigdata03:2181,bigdata04:2181,bigdata05:2181";
static {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", ZK_CONNECT_STR);
}
public static void createTable(String tableName, String[] family) throws Exception {
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor(tableName);
for (int i = 0; i < family.length; i++) {
desc.addFamily(new HColumnDescriptor(family[i]));
}
if (admin.tableExists(tableName)) {
System.out.println("table Exists!")
System.exit(0);
} else {
admin.createTable(desc);
System.out.println("create table Success!");
}
}
public static void addData(
String rowKey, String tableName, String[] column1, String[] value1, String[] column2, String[] value2) throws IOException {
// Build the Put with the row key
Put put = new Put(Bytes.toBytes(rowKey));
// HTable handles record-level operations such as put/get/delete/scan
HTable table = new HTable(conf, Bytes.toBytes(tableName));
// Get all of the table's column families
HColumnDescriptor[] columnFamilies = table.getTableDescriptor().getColumnFamilies();
for (int i = 0; i < columnFamilies.length; i++) {
// Column family name
String familyName = columnFamilies[i].getNameAsString();
// Put data into the article column family
if (familyName.equals("article")) {
for (int j = 0; j < column1.length; j++) {
put.add(Bytes.toBytes(familyName),
Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
}
}
// Put data into the author column family
if (familyName.equals("author")) {
for (int j = 0; j < column2.length; j++) {
put.add(Bytes.toBytes(familyName),
Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));
}
}
}
table.put(put);
System.out.println("add data Success!");
}
public static void getResultScann(String tableName) throws IOException {
Scan scan = new Scan();
ResultScanner rs = null;
HTable table = new HTable(conf, Bytes.toBytes(tableName));
try {
rs = table.getScanner(scan);
for (Result r : rs) {
for (KeyValue kv : r.list()) {
printKeyValue(kv);
}
}
} finally {
if (rs != null) {
rs.close();
}
}
}
public static void getResultScann(String tableName, String start_rowkey,
String stop_rowkey) throws IOException {
Scan scan = new Scan();
scan.setStartRow(Bytes.toBytes(start_rowkey));
scan.setStopRow(Bytes.toBytes(stop_rowkey));
ResultScanner rs = null;
HTable table = new HTable(conf, Bytes.toBytes(tableName));
try {
rs = table.getScanner(scan);
printResultScanner(rs);
} finally {
if (rs != null) {
rs.close();
}
}
}
public static void getResultByColumn(String tableName, String rowKey, String
familyName, String columnName) throws IOException {
HTable table = new HTable(conf, Bytes.toBytes(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
// Fetch only the cell for the given column family and qualifier
get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
Result result = table.get(get);
for (KeyValue kv : result.list()) {
printKeyValue(kv);
}
}
public static void updateTable(
String tableName, String rowKey, String familyName, String
columnName, String value) throws IOException {
HTable table = new HTable(conf, Bytes.toBytes(tableName));
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
Bytes.toBytes(value));
table.put(put);
System.out.println("update table Success!");
}
public static void getResultByVersion(String tableName, String rowKey,
String familyName, String columnName) throws IOException {
HTable table = new HTable(conf, Bytes.toBytes(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
get.setMaxVersions(5);
Result result = table.get(get);
for (KeyValue kv : result.list()) {
printKeyValue(kv);
}
}
public static void deleteAllColumn(String tableName, String rowKey) throws
IOException {
HTable table = new HTable(conf, Bytes.toBytes(tableName));
Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
table.delete(deleteAll);
System.out.println("all columns are deleted!");
}
public static void deleteTable(String tableName) throws IOException {
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println(tableName + " is deleted!");
}
// Helper used by the scan/get methods above to print one cell
private static void printKeyValue(KeyValue kv) {
System.out.println(Bytes.toString(kv.getRow()) + " "
+ Bytes.toString(kv.getFamily()) + ":" + Bytes.toString(kv.getQualifier())
+ " " + Bytes.toString(kv.getValue()) + " " + kv.getTimestamp());
}
// Helper used to print every cell returned by a scanner
private static void printResultScanner(ResultScanner rs) {
for (Result r : rs) {
for (KeyValue kv : r.list()) {
printKeyValue(kv);
}
}
}
}
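As a usage sketch, a main method along these lines could be added to HbaseDemoTest to exercise the helpers in order; the table name blog_demo and the sample values are made up for illustration:

public static void main(String[] args) throws Exception {
    // Hypothetical table with the two column families the demo methods expect
    String table = "blog_demo";
    createTable(table, new String[]{"article", "author"});
    addData("rowkey_1", table,
            new String[]{"title", "content"}, new String[]{"hbase notes", "hbase is a column store"},
            new String[]{"name", "nickname"}, new String[]{"huangbo", "bo"});
    getResultByColumn(table, "rowkey_1", "author", "name");
    updateTable(table, "rowkey_1", "author", "name", "xuzheng");
    getResultScann(table);
    deleteAllColumn(table, "rowkey_1");
    deleteTable(table);
}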
2. Filter queries
HBase filters fall into two groups: comparison filters and dedicated (special-purpose) filters.
Comparison operators (CompareOp)
LESS (<), LESS_OR_EQUAL (<=), EQUAL (=), NOT_EQUAL (<>), GREATER_OR_EQUAL (>=), GREATER (>), NO_OP (no operation)
Comparators
BinaryComparator: compares the cell against the given byte array in byte order, using Bytes.compareTo(byte[], byte[])
BinaryPrefixComparator: same as above, but only compares the leftmost bytes (the prefix)
NullComparator: checks whether the given value is null
BitComparator: bitwise comparison
RegexStringComparator: compares the value against a regular expression; only EQUAL and NOT_EQUAL are supported
SubstringComparator: checks whether the given substring appears in the value; only EQUAL and NOT_EQUAL are supported
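A comparator is always plugged into a filter; as a small sketch, a RegexStringComparator can be used inside a ValueFilter like the filters shown next (the regular expression here is made up for illustration, and the classes live in org.apache.hadoop.hbase.filter):

// Keep only cells whose value matches the (illustrative) regular expression
Scan scan = new Scan();
Filter regexFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL,
        new RegexStringComparator("^zhang.*"));
scan.setFilter(regexFilter);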
Comparison filters
RowFilter (row key filter)
Filter filter1 = new RowFilter(CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("user0000")));
scan.setFilter(filter1);
FamilyFilter (column family filter)
Filter filter1 = new FamilyFilter(CompareOp.LESS,
new BinaryComparator(Bytes.toBytes("base_info")));
scan.setFilter(filter1);
QualifierFilter (qualifier filter)
Filter filter = new QualifierFilter(CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("name")));
scan.setFilter(filter);
ValueFilter (value filter)
Filter filter = new ValueFilter(CompareOp.EQUAL,
new SubstringComparator("zhangsan"));
scan.setFilter(filter);
TimestampsFilter (timestamp filter)
List<Long> tss = new ArrayList<Long>();
tss.add(1495398833002L);
Filter filter1 = new TimestampsFilter(tss);
scan.setFilter(filter1);
Dedicated filters
SingleColumnValueFilter (single column value filter): returns the whole row when the condition matches
SingleColumnValueFilter filter = new SingleColumnValueFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); // if not set to true, rows that do not contain the specified column are also returned
scan.setFilter(filter);
SingleColumnValueExcludeFilter (single column value exclude filter): returns matching rows with the tested column excluded from the result
SingleColumnValueExcludeFilter filter = new SingleColumnValueExcludeFilter(
Bytes.toBytes("colfam1"),
Bytes.toBytes("col-5"),
CompareFilter.CompareOp.NOT_EQUAL,
new SubstringComparator("val-5"));
filter.setFilterIfMissing(true); // if not set to true, rows that do not contain the specified column are also returned
scan.setFilter(filter);
PrefixFilter (prefix filter): matches on the row key prefix
Filter filter = new PrefixFilter(Bytes.toBytes("row1"));
scan.setFilter(filter);
ColumnPrefixFilter (column prefix filter)
Filter filter = new ColumnPrefixFilter(Bytes.toBytes("qual2"));
scan.setFilter(filter);
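Put together, a filtered scan might look like the sketch below; it reuses the conf object and the old HTable API from section 1, and the table name user_info and row-key prefix rk00 are only examples:

// Scan a table with a row-key prefix filter and print every matching cell
HTable table = new HTable(conf, "user_info");   // example table name
Scan scan = new Scan();
scan.setFilter(new PrefixFilter(Bytes.toBytes("rk00")));
ResultScanner rs = table.getScanner(scan);
try {
    for (Result r : rs) {
        for (KeyValue kv : r.list()) {
            System.out.println(Bytes.toString(kv.getRow()) + " "
                    + Bytes.toString(kv.getFamily()) + ":"
                    + Bytes.toString(kv.getQualifier()) + "="
                    + Bytes.toString(kv.getValue()));
        }
    }
} finally {
    rs.close();
    table.close();
}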
3. Integrating HBase with Hive
Hive and HBase are integrated through their own public APIs, with HBaseStorageHandler handling the communication between them. HiveHBaseTableInputFormat splits the HBase table and obtains a RecordReader to read the data; the table is split by region, so the number of regions determines how many MapTasks the MapReduce job gets.
3.1 Preparing the HBase table and data
create 'mingxing', {NAME => 'base_info', VERSIONS => 1}, {NAME => 'extra_info', VERSIONS => 1}
Insert the sample data:
put 'mingxing','rk001','base_info:name','huangbo'
put 'mingxing','rk001','base_info:age','33'
put 'mingxing','rk001','extra_info:math','44'
put 'mingxing','rk001','extra_info:province','beijing'
put 'mingxing','rk002','base_info:name','xuzheng'
put 'mingxing','rk002','base_info:age','44'
put 'mingxing','rk003','base_info:name','wangbaoqiang'
put 'mingxing','rk003','base_info:age','55'
put 'mingxing','rk003','base_info:gender','male'
put 'mingxing','rk004','extra_info:math','33'
put 'mingxing','rk004','extra_info:province','tianjin'
put 'mingxing','rk004','extra_info:children','3'
put 'mingxing','rk005','base_info:name','liutao'
put 'mingxing','rk006','extra_info:name','liujialing'
Hive-side operations
Point Hive at the ZooKeeper quorum used by HBase (the default port 2181 can be omitted):
set hbase.zookeeper.quorum=bigdata02:2181,bigdata03:2181,bigdata04:2181;
Specify the root znode HBase uses in ZooKeeper:
set zookeeper.znode.parent=/hbase;
Add the HBase handler jar:
add jar /home/bigdata/apps/apache-hive-2.3.6-bin/lib/hive-hbase-handler-2.3.6.jar;
Mapping all column families:
create external table mingxing(rowkey string, base_info map<string,string>, extra_info map<string,string>)
row format delimited fields terminated by '\t'
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,base_info:,extra_info:")
tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");
In the mapping ":key,base_info:,extra_info:", ":key" maps to the HBase row key, and "base_info:" and "extra_info:" each map a whole column family into the corresponding Hive map column.
Mapping only some columns of some column families:
create external table mingxing1(rowkey string, name string, province string)
row format delimited fields terminated by '\t'
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = ":key,base_info:name,extra_info:province")
tblproperties("hbase.table.name"="mingxing","hbase.mapred.output.outputtable"="mingxing");
Parameter notes:
org.apache.hadoop.hive.hbase.HBaseStorageHandler: the storage handler that translates between Hive and HBase
hbase.columns.mapping: defines how HBase column families and columns map to Hive columns
hbase.table.name: the HBase table name

4. HbaseToHDFS: writing HBase table data to HDFS
Read data from HBase, process it, and write the result to HDFS. Implementation:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.List;
public class HbaseDataToHDFSMR {
public static final String ZK_CONNECT =
"bigdata02:2181,bigdata03:2181,bigdata04:2181";
public static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
public static final String HDFS_CONNECT = "hdfs://hadoop277ha/";
public static final String HDFS_CONNECT_KEY = "fs.defaultFS";
public static void main(String[] args) throws Exception {
// Put the Hadoop cluster config files core-site.xml and hdfs-site.xml on the classpath (resources directory).
Configuration conf = HBaseConfiguration.create();
conf.set(ZK_CONNECT_KEY, ZK_CONNECT);
conf.set(HDFS_CONNECT_KEY, HDFS_CONNECT);
System.setProperty("HADOOP_USER_NAME", "bigdata");
Job job = Job.getInstance(conf);
job.setJarByClass(HbaseDataToHDFSMR.class);
// The input comes from the HBase table user_info
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob("user_info", scan,
HbaseDataToHDFSMRMapper.class, Text.class,
NullWritable.class, job);
// RecordReader: TableRecordReader
// InputFormat: TableInputFormat
// The output goes to HDFS
FileOutputFormat.setOutputPath(job, new Path("/hbase2hdfs/output2"));
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion ? 0 : 1);
}
static class HbaseDataToHDFSMRMapper extends TableMapper<Text, NullWritable> {
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException,
InterruptedException {
String rowkey = Bytes.toString(key.copyBytes());
List<Cell> listCells = value.listCells();
Text text = new Text();
// Output one line per cell: rowkey \t family \t qualifier \t value \t timestamp
for (Cell cell : listCells) {
String family = new String(CellUtil.cloneFamily(cell));
String qualifier = new String(CellUtil.cloneQualifier(cell));
String v = new String(CellUtil.cloneValue(cell));
long ts = cell.getTimestamp();
text.set(rowkey + "\t" + family + "\t" + qualifier + "\t" + v +
"\t" + ts);
context.write(text, NullWritable.get());
}
}
}
}
HDFSToHbase: writing HDFS data into HBase
Read data from HDFS, process it, and write it into HBase. Implementation:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
public class HDFSDataToHbaseMR extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new HDFSDataToHbaseMR(), args);
System.exit(run);
}
@Override
public int run(String[] arg0) throws Exception {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum",
"bigdata02:2181,bigdata03:2181,bigdata04:2181");
System.setProperty("HADOOP_USER_NAME", "bigdata");
Job job = Job.getInstance(config, "HDFSDataToHbaseMR");
job.setJarByClass(HDFSDataToHbaseMR.class);
job.setMapperClass(HDFSDataToHbase_Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// Set the output: the reducer writes into the HBase table student
TableMapReduceUtil.initTableReducerJob("student",
HDFSDataToHbase_Reducer.class, job);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);
FileInputFormat.addInputPath(job, new Path("/bigdata/student/input"));
boolean isDone = job.waitForCompletion(true);
return isDone ? 0 : 1;
}
static class HDFSDataToHbase_Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws
IOException,
InterruptedException {
context.write(value, NullWritable.get());
}
}
static class HDFSDataToHbase_Reducer extends TableReducer<Text, NullWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<NullWritable> values,
Context context) throws IOException,
InterruptedException {
// Each input line is expected to look like: rowkey,name,sex,age,department
String[] split = key.toString().split(",");
Put put = new Put(split[0].getBytes());
put.addColumn("info".getBytes(), "name".getBytes(),
split[1].getBytes());
put.addColumn("info".getBytes(), "sex".getBytes(),
split[2].getBytes());
put.addColumn("info".getBytes(), "age".getBytes(),
split[3].getBytes());
put.addColumn("info".getBytes(), "department".getBytes(),
split[4].getBytes());
context.write(NullWritable.get(), put);
}
}}