- 配置HBase环境
- 编辑类文件
- Java操作表
- Java操作表数据
- 比较运算符、比较器
- 过滤器的AND/OR
然后在工程的 src 目录上右键,选择 New → Package,新建一个 Java 包;
再在该包内新建一个类文件(New → Class)。
导入相关的类:
- 使用Hadoop配置
import org.apache.hadoop.conf.Configuration;
- HBase的客户端
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.client.Connection;//连接类 import org.apache.hadoop.hbase.client.ConnectionFactory;//使用这个建立连接对象 import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException;
- 声明三个全局变量
/**
 * Minimal HBase client example.
 *
 * <p>Connects through the ZooKeeper quorum on localhost:2181, drops the table
 * "lauf" when it already exists, and then creates it again with the single
 * column family "StuInfo".
 */
public class hbaseJavaProgramme {
    // Shared handles, assigned by main().
    public static Configuration conf;
    public static Connection conn;
    public static Admin admin;

    /**
     * Runs the drop-and-recreate sequence for table "lauf".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Build the client configuration and point it at ZooKeeper.
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        TableName tableName = TableName.valueOf("lauf");
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin(); // Admin performs table-level operations

            if (admin.tableExists(tableName)) {
                // The table is disabled first and deleted afterwards.
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }

            // Create the table with one column family.
            HTableDescriptor tdesc = new HTableDescriptor(tableName);
            tdesc.addFamily(new HColumnDescriptor("StuInfo"));
            admin.createTable(tdesc);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the handles whether or not the calls above succeeded.
            closeQuietly(admin);
            closeQuietly(conn);
        }
    }

    /** Closes a resource if it was opened, reporting (not rethrowing) any failure. */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Java操作表
创建一个表
//注意:创建前应先判断该表是否已存在,若已存在需先删除(见下方"删除一个表"),否则createTable会抛出异常
// Describe table "lauf" with one column family, "StuInfo", and ask the admin to create it.
TableName laufName = TableName.valueOf("lauf");
HColumnDescriptor stuInfoFamily = new HColumnDescriptor("StuInfo");
HTableDescriptor tdesc = new HTableDescriptor(laufName);
tdesc.addFamily(stuInfoFamily);
admin.createTable(tdesc);
删除一个表
if(admin.tableExists(TableName.valueOf("lauf"))){
admin.disableTable(TableName.valueOf("lauf"));
admin.deleteTable(TableName.valueOf("lauf"));
}
表的描述符类:
import org.apache.hadoop.hbase.HTableDescriptor
HTableDescriptor tdesc = new HTableDescriptor(TableNme.valueOf("lauf"));
//描述表结构admin.getTableDescriptor(TableName tableName)
//说明表的类型
//是否只读tdesc.isReadOnly()
//设置列族缓存大小tdesc.setMemStoreFlushSize(64*1024*1024);默认单位bytes
//设置最大StoreFile,tdesc.setMaxFileSize(256*1024*1024);
//增加列族tdesc.addFamily(HColumnDescriptor obj)
//删除列族tdesc.removeFamily("StuInfo".getBytes())
通过表描述符获取信息:
tdesc.getNameAsString();获取表名,返回字符串
tdesc.getFamilies();获取所有列族,HColumnDescriptor对象集合,如下,
// getFamilies() returns the column families as a collection of HColumnDescriptor.
Collection<HColumnDescriptor> collection = tdesc.getFamilies();
// Walk the collection with an explicit iterator; the type parameter makes the cast unnecessary.
Iterator<HColumnDescriptor> iterator = collection.iterator();
while (iterator.hasNext()) {
    HColumnDescriptor colDesc = iterator.next();
    String colName = colDesc.getNameAsString();
    System.out.println(colName);
}
tdesc.getColumnFamilies();返回所有列族的数组
// Print the name of every column family in the array returned by getColumnFamilies().
HColumnDescriptor[] familyArray = tdesc.getColumnFamilies();
for (int i = 0; i < familyArray.length; i++) {
    HColumnDescriptor columnDescriptor = familyArray[i];
    System.out.println(columnDescriptor.getNameAsString());
}
通过列族描述符获取、设置列族的属性值(描述符中的元数据,并非表中单元格的数据):
String s = columnDescriptor.getNameAsString();
byte[] key = "Name".getBytes();
byte[] s1 = columnDescriptor.getValue(key);
System.out.println(s1);
Java操作表数据
Put/Delete/Get/Scan
全部在org.apache.hadoop.hbase.client.xxx
studentTable.put(Put obj);插入一行的一个或者多个cell
studentTable.delete(Delete obj);删除一行/列族/列
studentTable.get(Get obj);获取一行/列族/列
studentTable.getScanner(scan);扫描多行,返回ResultScanner
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
// Obtain a handle to the "Student" table from the open connection.
Table studentTable = conn.getTable(TableName.valueOf("Student"));
//插入数据
//insert data
/**
 * Writes sample rows into the "Student" table: row "003" with StuInfo:Name and
 * StuInfo:Age, followed by rows "004" through "008" (StuInfo:Name only) sent in
 * a single batched put.
 *
 * @throws IOException if the table cannot be opened or a write fails
 */
public static void insertData() throws IOException {
    byte[] family = Bytes.toBytes("StuInfo");
    byte[] nameQualifier = Bytes.toBytes("Name");

    // try-with-resources releases the table handle even when a put throws.
    try (Table studentTable = conn.getTable(TableName.valueOf("Student"))) {
        // One Put per row key; each addColumn adds a (family, qualifier, value) cell.
        Put put = new Put(Bytes.toBytes("003"));
        put.addColumn(family, nameQualifier, Bytes.toBytes("lauf"));
        put.addColumn(family, Bytes.toBytes("Age"), Bytes.toBytes("23"));
        studentTable.put(put);

        // put() also accepts a list of Put objects, written in one call.
        ArrayList<Put> puts = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Put rowPut = new Put(Bytes.toBytes("00" + (i + 4)));
            rowPut.addColumn(family, nameQualifier, Bytes.toBytes(String.valueOf(i)));
            puts.add(rowPut);
        }
        studentTable.put(puts);
    }
}
//delete data
//delete data,只是标记为删除
/**
 * Deletes the StuInfo:Age column of row "003" in the "Student" table.
 *
 * <p>Per the original author's note, the data is only marked as deleted by
 * this call.
 *
 * @throws IOException if the table cannot be opened or the delete fails
 */
public static void deleteData() throws IOException {
    // try-with-resources releases the table handle even when the delete throws.
    try (Table stuTable = conn.getTable(TableName.valueOf("Student"))) {
        // A Delete built from a row key alone targets the whole row ...
        Delete delete = new Delete(Bytes.toBytes("003"));
        // ... addColumn narrows it to one column (addFamily would narrow it to a family).
        delete.addColumn(Bytes.toBytes("StuInfo"), Bytes.toBytes("Age"));
        stuTable.delete(delete);
    }
}
//get查询数据
import org.apache.hadoop.hbase.client.Result;//单行
import org.apache.hadoop.hbase.client.ResultScanner;//多行
import org.apache.hadoop.hbase.Cell;//一个单元格
import org.apache.hadoop.hbase.CellUtil;//操作cell
//get data
/**
 * Fetches one row with a Get and prints every cell of it on its own line:
 * row key, column family, qualifier, value and timestamp, tab-separated.
 *
 * @param tableName name of the table to read
 * @param rowKey    key of the row to fetch
 * @throws IOException if the table cannot be opened or the read fails
 */
public static void getRow(String tableName, String rowKey) throws IOException {
    // try-with-resources releases the table handle even when the read throws.
    try (Table table = conn.getTable(TableName.valueOf(tableName))) {
        Result result = table.get(new Get(Bytes.toBytes(rowKey)));
        // An empty Result (row not found) may carry no cell array; skip it.
        if (result.isEmpty()) {
            return;
        }
        for (Cell cell : result.rawCells()) {
            System.out.println(
                "行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                + "列族:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                + "列名:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                + "值:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                + "时间戳:" + cell.getTimestamp());
        }
    }
}
//扫描数据
//scan data
/**
 * Scans a whole table without filters and prints every cell on its own line:
 * row key, column family, qualifier, value and timestamp, tab-separated.
 *
 * @param tableName name of the table to scan
 * @throws IOException if the table cannot be opened or the scan fails
 */
public static void scanTable(String tableName) throws IOException {
    // Both the table and the scanner are closed when the block exits, even on failure.
    try (Table table = conn.getTable(TableName.valueOf(tableName));
         ResultScanner results = table.getScanner(new Scan())) {
        for (Result result : results) {
            for (Cell cell : result.rawCells()) {
                System.out.println(
                    "行键:" + Bytes.toString(CellUtil.cloneRow(cell)) + "\t"
                    + "列族:" + Bytes.toString(CellUtil.cloneFamily(cell)) + "\t"
                    + "列名:" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "\t"
                    + "值:" + Bytes.toString(CellUtil.cloneValue(cell)) + "\t"
                    + "时间戳:" + cell.getTimestamp());
            }
        }
    }
}
//带有过滤器的scan
import org.apache.hadoop.hbase.filter.*;
// Scan only the rows whose key compares greater than "002".
Scan scan = new Scan();
Filter f1 = new RowFilter(CompareOp.GREATER, new BinaryComparator("002".getBytes()));
scan.setFilter(f1); // attach the single filter to the scan
ResultScanner results = table.getScanner(scan);
for (Result result : results) {
    for (Cell cell : result.rawCells()) {
        // handle each matching cell here
    }
}
//多个过滤器
// Row keys greater than "005" ...
Filter f1 = new RowFilter(CompareOp.GREATER, new BinaryComparator("005".getBytes()));
// ... restricted to columns whose qualifier starts with "Name".
Filter f2 = new ColumnPrefixFilter("Name".getBytes());
// The constructor is varargs, so further filters can simply be appended to the argument list.
FilterList filterList = new FilterList(f1, f2);
scan.setFilter(filterList); // a FilterList is itself a Filter
比较运算符、比较器
- 设置一个Filter
// Restrict the scan to rows whose key compares greater than "001".
BinaryComparator rowKeyFloor = new BinaryComparator("001".getBytes());
Filter f1 = new RowFilter(CompareOp.GREATER, rowKeyFloor);
Scan scan = new Scan();
scan.setFilter(f1);
// The scanner yields multiple results.
ResultScanner results = table.getScanner(scan);
- 设置多个过滤器时,需要指定是AND操作,还是OR操作
// Row keys that compare below "003". An ordering operator such as LESS needs a byte-wise
// comparator; SubstringComparator takes a String and only supports EQUAL / NOT_EQUAL.
Filter f1 = new RowFilter(CompareOp.LESS, new BinaryComparator("003".getBytes()));
// Rows whose StuInfo:Age value compares below "24".
Filter f2 = new SingleColumnValueFilter("StuInfo".getBytes(), "Age".getBytes(), CompareOp.LESS, new BinaryComparator("24".getBytes()));
// Filter lists
FilterList filterList = new FilterList(Operator.MUST_PASS_ALL); // AND: every filter must pass
FilterList filterList1 = new FilterList(Operator.MUST_PASS_ONE); // OR: shown for comparison, not used below
filterList.addFilter(f1);
filterList.addFilter(f2);
scan.setFilter(filterList);
ResultScanner results = table.getScanner(scan);
- 具体使用案例
package hbasetest2;
import java.io.IOException;
import java.util.ArrayList;

import com.sun.xml.bind.v2.schemagen.xmlschema.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.filter.CompareFilter.*;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
/**
 * End-to-end HBase example: (re)creates table "stu2", loads rows read from a
 * CSV file, prints the whole table, then prints the rows selected by a pair of
 * AND-combined filters.
 */
public class HbaseJavaProgramme {
    // Shared handles; each method below assigns the one it works with.
    public static Configuration configuration;
    public static Connection connection;
    public static Admin admin;
    public static Table table;

    /**
     * Runs the example against the ZooKeeper quorum on localhost:2181.
     * Any failure is reported on standard error.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            // create configuration
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "localhost");
            configuration.set("hbase.zookeeper.property.clientPort", "2181");

            // create connection
            connection = ConnectionFactory.createConnection(configuration);
            try {
                // create table
                String[] families = {"StuInfo", "Grades"};
                createTable("stu2", families);

                // Read the CSV and build the Put objects. FileOperation is defined
                // outside this class; the element type is presumably Put, since the
                // list is handed to table.put below -- confirm against FileOperation.
                ArrayList puts = FileOperation.readData("/home/tarena/stuData.csv");

                // insert data
                table = connection.getTable(TableName.valueOf("stu2"));
                try {
                    table.put(puts);
                } finally {
                    table.close();
                }
                System.out.println("insert data completed!");

                // scan data
                scanData("stu2");

                // scan data with filter: row key >= "6" and Age < "24"
                scanDataWithFilter("stu2", "6", "24");
            } finally {
                // Close the connection even when one of the steps above fails.
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a table with the given column families, first dropping any
     * existing table of the same name.
     *
     * @param tableString name of the table to create
     * @param families    names of the column families to add
     * @throws Exception if any admin operation fails
     */
    public static void createTable(String tableString, String[] families) throws Exception {
        admin = connection.getAdmin();
        try {
            TableName tableName = TableName.valueOf(tableString);
            if (admin.tableExists(tableName)) {
                // The table is disabled first and deleted afterwards.
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            HTableDescriptor tdesc = new HTableDescriptor(tableName);
            // add family
            for (String family : families) {
                tdesc.addFamily(new HColumnDescriptor(family.getBytes()));
            }
            admin.createTable(tdesc);
        } finally {
            // Release the admin handle even when an operation above throws.
            admin.close();
        }
    }

    /**
     * Scans the whole table without filters and prints every cell.
     *
     * @param tableNameString name of the table to scan
     * @throws Exception if the table cannot be opened or the scan fails
     */
    public static void scanData(String tableNameString) throws Exception {
        table = connection.getTable(TableName.valueOf(tableNameString));
        // The scanner is closed by try-with-resources, the table in the finally block.
        try (ResultScanner resultScanner = table.getScanner(new Scan())) {
            for (Result result : resultScanner) {
                for (Cell cell : result.rawCells()) {
                    parseCell(cell);
                }
            }
        } finally {
            table.close();
        }
    }

    /**
     * Scans the table for rows whose key is greater than or equal to
     * {@code rowString} AND whose StuInfo:Age value is less than
     * {@code valueString}, printing every cell of each matching row.
     *
     * <p>Both comparisons use BinaryComparator on the raw bytes of the strings.
     *
     * @param tableNameString name of the table to scan
     * @param rowString       lower bound (inclusive) for the row key
     * @param valueString     upper bound (exclusive) for StuInfo:Age
     * @throws Exception if the table cannot be opened or the scan fails
     */
    public static void scanDataWithFilter(String tableNameString, String rowString, String valueString) throws Exception {
        table = connection.getTable(TableName.valueOf(tableNameString));
        try {
            Filter f1 = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(rowString.getBytes()));
            Filter f2 = new SingleColumnValueFilter("StuInfo".getBytes(), "Age".getBytes(), CompareOp.LESS, new BinaryComparator(valueString.getBytes()));

            // AND: a row is returned only when both filters accept it.
            FilterList filterList = new FilterList(Operator.MUST_PASS_ALL);
            filterList.addFilter(f1);
            filterList.addFilter(f2);

            Scan scan = new Scan();
            scan.setFilter(filterList);
            try (ResultScanner resultScanner = table.getScanner(scan)) {
                for (Result result : resultScanner) {
                    for (Cell cell : result.rawCells()) {
                        System.out.println("");
                        parseCell(cell);
                    }
                }
            }
        } finally {
            table.close();
        }
    }

    /**
     * Prints the row key, column family, qualifier and value of one cell,
     * one per line, followed by a blank line.
     *
     * @param cell the cell to print
     * @throws Exception declared for callers; nothing in the body throws a checked exception
     */
    public static void parseCell(Cell cell) throws Exception {
        System.out.println("行键:" + new String(CellUtil.cloneRow(cell)));
        System.out.println("列族:" + new String(CellUtil.cloneFamily(cell)));
        System.out.println("列标识:" + new String(CellUtil.cloneQualifier(cell)));
        System.out.println("值:" + new String(CellUtil.cloneValue(cell)) + "\n");
    }
}



