新建项目后在pom.xml中添加依赖:
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.4.13</version></dependency>
    <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.4.13</version></dependency>
    <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version></dependency>
1.2 HbaseAPI代码编写
package com.jackyan.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TestHbaseApi {

    // Shared across tests; opened in getConnect() (@Before), closed in close() (@After).
    Connection connection = null;
    Configuration configuration = null;
    // Fully-qualified table name: namespace "jackyan", table "emp".
    String tableName = "jackyan:emp";

    /**
     * Opens the HBase connection before every test.
     * FIX: the configuration factory is spelled HBaseConfiguration (capital B);
     * the original "HbaseConfiguration" does not compile.
     */
    @Before
    public void getConnect() throws IOException {
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        System.out.println(connection);
    }

    /**
     * Creates the table with column families "basic" and "info", creating the
     * "jackyan" namespace first if it does not exist yet.
     * FIX: use the Admin interface returned by getAdmin(); the original cast to
     * the misspelled "HbaseAdmin" does not compile, and the concrete class is
     * not needed for any call made here.
     */
    @Test
    public void createTable() throws IOException {
        Admin admin = connection.getAdmin();
        TableName table = TableName.valueOf(tableName);
        boolean exists = admin.tableExists(table);
        if (exists) {
            System.out.println("表已存在!!!");
        } else {
            // 表不存在则创建表
            HTableDescriptor hTableDescriptor = new HTableDescriptor(table);
            hTableDescriptor.addFamily(new HColumnDescriptor("basic"));
            hTableDescriptor.addFamily(new HColumnDescriptor("info"));
            try {
                admin.createTable(hTableDescriptor);
            } catch (NamespaceNotFoundException e) {
                // 命名空间不存在则先创建命名空间
                System.out.println("创建命名空间。。。");
                admin.createNamespace(NamespaceDescriptor.create("jackyan").build());
                // 再创建表
                admin.createTable(hTableDescriptor);
            }
            System.out.println("表创建成功!!!");
        }
        admin.close();
    }

    /**
     * Deletes the table if it exists. HBase requires a table to be disabled
     * before it can be deleted, hence the disableTable() call first.
     */
    @Test
    public void deleteTable() throws IOException {
        Admin admin = connection.getAdmin();
        TableName table = TableName.valueOf(tableName);
        if (admin.tableExists(table)) {
            // 先disable表
            admin.disableTable(table);
            // 删除表
            admin.deleteTable(table);
            System.out.println("表 " + tableName + " 删除成功!");
        } else {
            System.out.println("表 " + tableName + " 不存在!");
        }
        admin.close();
    }

    // Adds one cell (family:column = value) to the Put, encoding all pieces as UTF-8 bytes.
    private static void addCell(Put put, String family, String column, String value) {
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
    }

    /**
     * Inserts two sample rows (100001, 100002).
     * FIX: the original called table.close() twice; the duplicate call is removed.
     * The repeated family/column/value assignments are folded into addCell().
     */
    @Test
    public void addRowData() throws IOException {
        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));

        Put put = new Put(Bytes.toBytes("100001"));
        addCell(put, "basic", "name", "tom");
        addCell(put, "basic", "age", "28");
        addCell(put, "basic", "sex", "male");
        table.put(put);

        put = new Put(Bytes.toBytes("100002"));
        addCell(put, "basic", "name", "jack");
        addCell(put, "basic", "age", "26");
        addCell(put, "basic", "sex", "male");
        addCell(put, "info", "email", "12345678@qq.com");
        addCell(put, "info", "phone", "12345678");
        table.put(put);

        table.close();
        System.out.println("插入数据成功!");
    }

    // Prints rowkey/family/column/value for every cell of a query result.
    // FIX: the method is CellUtil.cloneValue (capital V); "clonevalue" does not compile.
    private static void printCells(Cell[] cells) {
        for (Cell cell : cells) {
            System.out.println("rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)));
            System.out.println("family:" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("column:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("value:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }

    /** Full-table scan: prints every cell of every row. */
    @Test
    public void getAllRowData() throws IOException {
        // 获取用于获取region数据的scan对象
        Scan scan = new Scan();
        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        // 获取查询结果集
        ResultScanner resultScanner = table.getScanner(scan);
        for (Result result : resultScanner) {
            printCells(result.rawCells());
        }
        table.close();
    }

    /** Point lookup: fetches row "100002" and prints all of its cells. */
    @Test
    public void getoneRowData() throws IOException {
        Get get = new Get(Bytes.toBytes("100002"));
        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        // 获取查询结果集
        Result result = table.get(get);
        printCells(result.rawCells());
        table.close();
    }

    /** Fetches a single column (basic:name) of row "100001" and prints it. */
    @Test
    public void getRowQualifier() throws IOException {
        Get get = new Get(Bytes.toBytes("100001"));
        get.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"));
        // 获取表对象
        Table table = connection.getTable(TableName.valueOf(tableName));
        Result result = table.get(get);
        printCells(result.rawCells());
        table.close();
    }

    /**
     * Batch-deletes the two sample rows.
     * FIX: use a typed List&lt;Delete&gt; instead of the raw List the original used.
     */
    @Test
    public void deleteRowData() throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        List<Delete> deleteList = new ArrayList<>();
        deleteList.add(new Delete(Bytes.toBytes("100001")));
        deleteList.add(new Delete(Bytes.toBytes("100002")));
        table.delete(deleteList);
        table.close();
        System.out.println("删除数据成功!");
    }

    /** Closes the shared connection after each test. */
    @After
    public void close() throws IOException {
        if (connection != null) {
            connection.close();
        }
        System.out.println("关闭资源。。。");
    }
}
1.3 HbaseUtil
package com.jackyan.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
public class HbaseUtil {

    // One Connection per thread: HBase Connection objects are heavyweight and
    // not intended to be created per request, so each thread caches its own.
    // FIX: parameterize the ThreadLocal (the original used the raw type).
    private static ThreadLocal<Connection> connThreadLocal = new ThreadLocal<>();

    /**
     * Ensures the current thread has a Connection cached in the ThreadLocal,
     * creating one on first use.
     * FIX: the configuration factory is HBaseConfiguration (capital B); the
     * original "HbaseConfiguration" does not compile.
     */
    public static void getConnection() {
        // 从ThreadLocal中获取连接对象
        Connection conn = connThreadLocal.get();
        // 如果获取不到连接,则创建连接并放入ThreadLocal中
        if (conn == null) {
            Configuration conf = HBaseConfiguration.create();
            try {
                conn = ConnectionFactory.createConnection(conf);
                connThreadLocal.set(conn);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    // Returns this thread's Connection, creating it on demand.
    // FIX: the public helpers previously dereferenced connThreadLocal.get()
    // directly and threw NullPointerException unless the caller remembered to
    // invoke getConnection() first.
    private static Connection conn() {
        getConnection();
        return connThreadLocal.get();
    }

    /**
     * Writes a single cell (famliyname:column = value) into row {@code rowkey}
     * of {@code tablename}.
     */
    public static void put(String tablename, String rowkey, String famliyname, String column, String value) throws Exception {
        // 获取表
        Table table = conn().getTable(TableName.valueOf(tablename));
        try {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes(famliyname), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
        } finally {
            table.close();
        }
        System.out.println("ok");
    }

    /**
     * Writes every (column, value) entry of {@code datamap} into one row under
     * the given column family.
     * FIX: restore the generics stripped from the original listing
     * (Map&lt;String, String&gt; / Set&lt;Map.Entry&lt;String, String&gt;&gt;), which did not compile.
     */
    public static void put(String tablename, String rowkey, String famliyname, Map<String, String> datamap) throws Exception {
        // 获取表
        Table table = conn().getTable(TableName.valueOf(tablename));
        try {
            Put put = new Put(Bytes.toBytes(rowkey));
            if (datamap != null) {
                for (Map.Entry<String, String> entry : datamap.entrySet()) {
                    put.addColumn(Bytes.toBytes(famliyname), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
                }
            }
            table.put(put);
        } finally {
            table.close();
        }
        System.out.println("ok");
    }

    /**
     * Reads one cell and returns its value as a String, or null when the cell
     * does not exist.
     * FIX: use Bytes.toBytes/Bytes.toString (always UTF-8) instead of
     * String.getBytes()/new String(byte[]), which depend on the platform
     * default charset; also close the Table, which the original leaked.
     */
    public static String getdata(String tablename, String rowkey, String famliyname, String colum) throws Exception {
        // 获取表
        Table table = conn().getTable(TableName.valueOf(tablename));
        try {
            Get get = new Get(Bytes.toBytes(rowkey));
            Result result = table.get(get);
            byte[] resultbytes = result.getValue(Bytes.toBytes(famliyname), Bytes.toBytes(colum));
            return resultbytes == null ? null : Bytes.toString(resultbytes);
        } finally {
            table.close();
        }
    }

    /**
     * Writes one cell into the given table.
     * FIX: close the Table (the original leaked it) and use UTF-8 safe
     * Bytes.toBytes conversions.
     */
    public static void putdata(String tablename, String rowkey, String famliyname, String colum, String data) throws Exception {
        // 获取表
        Table table = conn().getTable(TableName.valueOf(tablename));
        try {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes(famliyname), Bytes.toBytes(colum), Bytes.toBytes(data));
            table.put(put);
        } finally {
            table.close();
        }
    }

    /**
     * Closes and discards the current thread's Connection.
     * FIX: remove the ThreadLocal entry even if close() throws, so a broken
     * connection is not reused by this thread.
     */
    public static void close() {
        Connection conn = connThreadLocal.get();
        if (conn != null) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                connThreadLocal.remove();
            }
        }
    }
}
1.4 MapReduce
通过Hbase的相关JavaAPI,我们可以实现伴随Hbase操作的MapReduce过程,比如使用MapReduce将数据从本地文件系统导入到Hbase的表中,比如我们从Hbase中读取一些原始数据后使用MapReduce做数据分析。
1.4.1 官方HBase-MapReduce
1. 查看HBase的MapReduce任务的执行
$ bin/hbase mapredcp
2.环境变量的导入
(1)执行环境变量的导入(临时生效,在命令行执行下述操作)
$ export HBASE_HOME=/opt/module/hbase
$ export HADOOP_HOME=/opt/module/hadoop
$ export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
(2)永久生效:在/etc/profile配置
export HBASE_HOME=/opt/module/hbase
export HADOOP_HOME=/opt/module/hadoop
并在hadoop-env.sh中配置:(注意:在for循环之后配)
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/opt/module/hbase/lib
//将读取到的每行数据写入到context中作为map的输出
context.write(key, put);
}
}
WriteDataToTableMRReducer
public class WriteDataToTableMRReducer extends TableReducer{ @Override protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException { for (Put put : values) { context.write(NullWritable.get(), put); } } }
2、打包
此处需要打成可运行jar包
选中需要打包的项目或模块-->Project Structure-->Artifacts-->'+'-->JAR-->From modules with dependencies...
选择需要打包的模块,选择main函数所在的主类,选择copy to the output...,最后选择打包配置信息存放的目录,一般选择resources目录
然后选择Build-->Build Artifacts...
最后build,build成功之后的jar包在主项目根目录下的out目录下
3、将打包后的jar目录上传到linux环境运行
运行命令
yarn jar jar/hbase_jar/hbase.jar
1.4.3 从hdfs上的文件将数据插入到hbase表中
代码编写
File2TableApplication
public class File2TableApplication {

    /**
     * Entry point: runs the file-to-HBase-table MapReduce job.
     * ToolRunner parses Hadoop generic options (-D, -conf, -fs, ...) before
     * delegating to File2TableMapReduceTool.run().
     * FIX: propagate the job's exit code to the shell via System.exit instead
     * of discarding it, so calling scripts can detect job failure; also fixes
     * the "TooRunner" typo in the original comment.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new File2TableMapReduceTool(), args);
        System.exit(exitCode);
    }
}
File2TableMapReduceTool
public class File2TableMapReduceTool implements Tool {
public int run(String[] args) throws Exception {
// 获取作业
Job job = Job.getInstance();
job.setJarByClass(File2TableMapReduceTool.class);
Path path = new Path("hdfs://hadoop101:9000/user.csv");
// 设置FileInputFormat
FileInputFormat.addInputPath(job, path);
// FileInputFormat.setInputPaths(job, path);
// mapper
job.setMapperClass(ReadDataFromFileMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
//reducer
TableMapReduceUtil.initTableReducerJob(
"user",
WriteDataToTableMRReducer.class,
job
);
boolean b = job.waitForCompletion(true);
return b ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
}
public void setConf(Configuration conf) {
}
public Configuration getConf() {
return null;
}
}
ReadDataFromFileMapper
public class ReadDataFromFileMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); String rowkey = fields[0]; String name = fields[1]; String age = fields[2]; // 初始化rowkey ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey)); // 创建put对象 Put put = new Put(Bytes.toBytes(rowkey)); // 增加列族:列:值 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age)); context.write(rowkeyWritable, put); } }
WriteDataToTableMRReducer
public class WriteDataToTableMRReducer extends TableReducer{ @Override protected void reduce(ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException { for (Put put : values) { context.write(NullWritable.get(), put); } } }
接下来是打可运行jar包及运行,打包步骤及运行方法详见上一个示例。



