了解Hbase基本概念了解Hbase架构掌握Hbase集群环境搭建掌握Hbase中shell操作掌握Hbase中java API操作掌握rowKey的设计原则【延伸】MapReduce操作Hbase【延伸】spark操作Hbase 2.Hbase基础 2.1.基本概念 2.1.1.介绍
官网地址:http://hbase.apache.org/
Hbase在Hadoop之上提供了类似于Bigtable的能力,是BigTable的开源实现版本,通过java编程语言实现 。Hbase不同于关系数据库,它是一个适合于非结构化数据存储的数据库 。
Hbase是一个分布式的、面向列的开源数据库 ,是一个NoSQL的分布式数据存储系统,实现对大型数据的实时、随机读写访问。
Hbase利用Hadoop HDFS作为文件存储系统,利用Hadoop MapReduce来处理Hbase中的海量数据 。
Hbase依赖于hdfs做底层的数据存储Hbase依赖于mapReduce做数据计算Hbase依赖于zookeeper做服务协调
2.1.2.特点
提供高可靠、高性能、列存储、可伸缩、实时读写的nosql数据库系统
介于NoSQL和RDBMS之间,仅能通过主键(row key)和主键的范围(range)来检索数据主要用来存储非结构化和半结构化的松散数据Hbase中底层数据存储都是字节数组:byte[]
Hbase中表的特点
大:一个表可以有上十亿行,上百万列
面向列:面向**列(族)**的存储和权限控制,列(簇)独立检索
稀疏:空(null)列不占用存储空间,表可以设计的非常稀疏
无模式:每行都有一个可排序的主键和任意多的列,列可以动态增加,不同的行可以有不同的列
和行式数据库的区别
行式数据库在分析的时候,操作整行数据。比如将字段id,name,age,sex,score等完整的信息读入内存,造成大量的内存和IO资源浪费列式数据库是把行式数据库全部拆开,按照列的方式重新组合存储,一列所有的行的数据存放在一起。带来的好处就是,比如要分析男女信息,就直接访问所有的男女信息;要分析销售额,就直接访问消费额相关的数据。
2.2.Hbase表结构
表(table):用于存储管理数据,具有稀疏的、面向列的特点。Hbase中的每一张表,就是所谓的大表(Bigtable),可以有上十亿行,上百万列。对于值为空的列,并不占用存储空间,因此表可以设计的非常稀疏
行键(RowKey):类似于mysql中的主键,Hbase根据行键来快速检索数据,一个行键对应一条记录。
列簇(ColumnFamily):是列的集合。列簇在表定义时需要指定,列在插入数据时动态指定。列中的数据都是以二进制形式存在,没有数据类型。在物理存储结构上,每个表中的每个列簇单独以一个文件存储,一个表可以有多个列簇(官方推荐小于等于3个)
时间戳(TimeStamp):是列的一个属性,是一个64位整数。由行键和列确定的单元格(Cell),可以存储多个数据,每个数据含有时间戳属性,数据具有版本特性。可根据版本(VERSIONS)或时间戳来指定查询历史版本数据,如果都不指定,则默认返回最新版本的数据
参考案例:
mysql数据库表:
转换对应成Hbase表:
2.3.Hbase整体架构 2.3.1.Client通过 RPC机制与HMaster和HRegionServer进行通信Client与HMaster进行通信进行管理类操作Client与HRegionServer进行数据读写类操作 2.3.2.Zookeeper
保证任何时候,集群中只有一个运行状态的 Master,避免脑裂问题存储所有Region的寻址入口,包括**-ROOT-**表地址、HMaster地址实时监控Region Server的状态,将Region server的上线和下线信息,实时通知给Master 2.3.3.HMaster
可以启动多个HMaster,通过Zookeeper的Master Election机制保证有且仅有一个Master提供服务作用:
为Region server分配region负责region server的负载均衡发现失效的region serve并重新分配其上的regionHDFS上的垃圾文件回收 2.3.4.HRegionServer
Hbase中最核心的模块,主要负责响应用户I/O请求,向HDFS文件系统中读写数据作用:
维护Master分配给它的region,处理对这些region的IO请求负责切分在运行过程中变得过大的regionHRegionServer管理一些列HRegion对象,每个HRegion对应Table中一个Region,HRegion由多个HStore组成,每个HStore对应Table中一个Column Family的存储,Column Family就是一个集中的存储单元集 2.3.5.HStore
Hbase存储的核心,由MemStore和StoreFile组成。
用户写入数据的流程为:client写入 -> 存入MemStore,一直到MemStore满 -> Flush成一个StoreFile,直至增长到一定阈值 -> 触发Compact合并操作 -> 多个StoreFile合并成一个StoreFile,同时进行版本合并和数据删除 -> 当StoreFiles Compact后,逐步形成越来越大的StoreFile -> 单个StoreFile大小超过一定阈值后,触发Split操作,把当前Region Split成2个Region,原先的Region会下线,新Split出的2个孩子Region会被HMaster分配到相应的HRegionServer上,使得原先1个Region的压力得以分流到2个Region上。如图所示:
2.3.6.HRegion一个表最开始存储的时候,是一个Region。
一个Region中会有个多个store,每个store用来存储一个列簇。如果只有一个column family,就只有一个store。Region会随着插入的数据越来越多,会进行拆分。默认大小是10G一个。
2.3.7.HLog在分布式系统环境中,无法避免系统出错或者宕机,一旦HRegionServer意外退出,MemStore中的内存数据就会丢失,引入HLog就是防止这种情况。
工作机制:每个HRegionServer中都会有一个HLog对象,HLog是一个实现Write Ahead Log的类,每次用户操作写入Memstore的同时,也会写一份数据到HLog文件,HLog文件定期会滚动出新,并删除旧的文件(已持久化到StoreFile中的数据)。
当HRegionServer意外终止后,HMaster会通过Zookeeper感知,HMaster首先处理遗留的HLog文件,将不同Region的log数据拆分,分别放到相应Region目录下,然后再将失效的Region重新分配,领取到这些Region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。
2.4.查询路由Hbase中存有两张特殊的表,-ROOT-和.meta.:
.meta.:记录了用户表的Region信息,.meta.可以有多个Regoin-ROOT-:记录了.meta.表的Region信息,-ROOT-只有一个Region。Zookeeper中记录了-ROOT-表的location
客户端访问数据流程:
访问zookeeper->访问-ROOT-表->访问.meta.表->找到用户数据的位置,再去访问数据
3.Hbase集群部署 3.1.选择jdk版本和hadoop版本参考地址:http://hbase.apache.org/1.2/book.html
jdk版本:
hadoop版本:
3.2.下载Hbase参考地址:http://mirrors.hust.edu.cn/apache/hbase/
3.3.上传Hbase到linux主机,并解压cd /export/softwares tar -zxvf hbase-1.3.1-bin.tar.gz -C /export/servers #解压完成后删除压缩包 rm -rf hbase-1.3.1-bin.tar.gz #重新命名hbase cd /export/servers mv hbase-1.3.1 hbase1313.4.修改配置文件 3.4.1.regionservers
说明:配置HRegionservers节点列表。
cd /export/servers/hbase131/conf vi regionservers node02 node033.4.2.hbase-env.sh
说明:配置jdk环境变量,配置集群通过zookeeper管理,不由hbase自己管理。
#配置jdk环境变量 export JAVA_HOME=/export/servers/jdk1.8.0_141 #配置集群交给zookeeper管理 export Hbase_MANAGES_ZK=false3.4.3.hbase-site.xml
说明:hbase的核心配置文件。
vim hbase-site.xml3.4.4. backup-mastershbase.rootdir hdfs://node01:8020/hbase hbase.cluster.distributed true hbase.master.port 16000 hbase.zookeeper.quorum node01,node02,node03 hbase.zookeeper.property.clientPort 2181
说明:配置备用master节点。backup-masters需要手动创建。
vim backup-masters node023.4.5.拷贝hadoop中hdfs的核心配置文件到hbase
cp /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/hdfs-site.xml /export/servers/hbase131/conf/ cp /export/servers/hadoop-2.6.0-cdh5.14.0/etc/hadoop/core-site.xml /export/servers/hbase131/conf/3.5.把配置好的Hbase远程拷贝到其它主机节点
scp -r /export/servers/hbase131 root@node02:/export/servers scp -r /export/servers/hbase131 root@node03:/export/servers3.6.配置环境变量
vim /etc/profile
export Hbase_HOME=/export/servers/hbase131
export PATH=$PATH:${Hbase_HOME}/bin
#让环境变量生效
source /etc/profile
3.7.启动Hbase集群
3.7.1.系统时钟同步
说明:Hbase对于集群时间的同步要求非常严格,要求相差不要超过30秒。启动集群前一定要同步系统时间。
ntpdate -u 0.uk.pool.ntp.org3.7.2.启动zookeeper集群
/export/servers/zookeeper-3.4.5-cdh5.14.0/bin/zkServer.sh start3.7.3.启动hdfs 3.7.4.启动hbase集群
/export/servers/hbase131/bin/start-hbase.sh3.7.5.访问hbase的web界面
地址:http://192.168.53.100:16010/master-status
3.7.6.如何启动单节点进程说明:如果某些主机节点进程未启动正常,可以通过以下方式进行单独启动。
#单独启动master hbase-daemon.sh start master #单独启动regionserver hbase-daemon.sh start regionserver4.Hbase shell操作 4.1.shell交互界面整体操作 4.1.1.启动shell交互界面
cd /export/servers/hbase131/bin #启动shell交互界面 hbase shell #查看集群状态信息 status #查看版本信息 version4.1.2.退出shell交互界面
exit4.1.3.查看帮助信息
#查看全部帮助信息 help #查看指定的命令帮助信息 help "create"
图一:
图二:
4.2.操作名称空间namespace说明:在hbase中,namespace相当于关系数据库的database。用于从逻辑上对表进行分组。hbase系统默认有两个缺省的名称空间:
hbase:系统内建表,包括namespace和meta表default:创建表时,如果不指定namespace的表 ,则在default名称空间下
#列出所有名称空间 list_namespace #查看指定名称空间 describe_namespace 'hbase' #创建名称空间 create_namespace 'my_ns' #删除名称空间 drop_namespace 'my_ns'4.3.操作表table 4.3.1.案例
说明:将关系数据库中的表,转换成hbase中的表进行操作。
mysql数据库表:
hbase表:
4.3.2.命令操作#列出所有表
list
#创建表(create '名称空间:表名称','列簇')
create 'my_ns:user','base_info'
#插入数据(put '名称空间:表名称','行键','列簇:列名','数据')
put 'my_ns:user','rowkey_10','base_info:username','张三'
put 'my_ns:user','rowkey_10','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_10','base_info:sex','1'
put 'my_ns:user','rowkey_10','base_info:address','北京市'
put 'my_ns:user','rowkey_16','base_info:username','张小明'
put 'my_ns:user','rowkey_16','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_16','base_info:sex','1'
put 'my_ns:user','rowkey_16','base_info:address','北京'
put 'my_ns:user','rowkey_22','base_info:username','陈小明'
put 'my_ns:user','rowkey_22','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_22','base_info:sex','1'
put 'my_ns:user','rowkey_22','base_info:address','上海'
put 'my_ns:user','rowkey_24','base_info:username','张三丰'
put 'my_ns:user','rowkey_24','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_24','base_info:sex','1'
put 'my_ns:user','rowkey_24','base_info:address','河南'
put 'my_ns:user','rowkey_25','base_info:username','陈大明'
put 'my_ns:user','rowkey_25','base_info:birthday','2014-07-10'
put 'my_ns:user','rowkey_25','base_info:sex','1'
put 'my_ns:user','rowkey_25','base_info:address','西安'
#查询表中的数据
scan 'my_ns:user'
#统计表中行的数量
count 'my_ns:user'
#获取指定行键的数据
get 'my_ns:user','rowkey_10'
#获取指定列簇的数据
get 'my_ns:user','rowkey_10','base_info'
#获取指定列簇中某一列的数据
get 'my_ns:user','rowkey_10','base_info:username'
#获取指定列簇中多列数据
get 'my_ns:user', 'rowkey_10', {COLUMN => ['base_info:username','base_info:sex']}
#删除指定行键,指定列的数据
delete 'my_ns:user','rowkey_10','base_info:username'
#删除指定行键的所有数据
deleteall 'my_ns:user','rowkey_10'
#清空表的数据(慎用)
truncate 'my_ns:user'
#浏览表结构
describe 'my_ns:user'
#操作列簇
alter 'my_ns:user',NAME=>'ext_info'
alter 'my_ns:user','delete'=>'ext_info'
#测试表是否存在
exists 'my_ns:user'
#删除表(首先要禁用表)
disable 'my_ns:user'
drop 'my_ns:user'
5.Hbase Java API操作
5.1.搭建环境
5.1.1.创建项目
5.1.2.导入依赖
5.1.3.准备hbase-site.xml配置文件4.0.0 cn.liny spark-teach-day08-01hbase 1.0-SNAPSHOT jar 1.3.1 org.apache.hbase hbase-client ${hbase.version}
说明:在resources目录下创建hbase-site.xml,方便通过配置文件信息创建Hbase连接对象。
5.2封装HbaseUtil工具类hbase.zookeeper.quorum node01,node02,node03
package cn.liny.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
public class HbaseUtil {
// 集群配置信息
private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
private static final String ZK_CONNECT_VALUE = "node01:2181,node02:2181,node03:2181";
// 操作集群对象
private static Connection conn = null;
private static Admin admin = null;
//集群环境信息=============================================================================start
public static Connection getConnection(){
// 1.创建一个可以用来管理hbase配置信息的conf对象
Configuration conf = new Configuration();
conf.set(ZK_CONNECT_KEY,ZK_CONNECT_VALUE);
// 2.创建返回Connection对象
try {
if(conn == null){
conn = ConnectionFactory.createConnection(conf);
}
} catch (IOException e) {
System.out.println("创建Connection连接对象发生异常:"+e.getMessage());
}
return conn;
}
public static Admin getAdmin(){
try {
admin = conn.getAdmin();
} catch (IOException e) {
System.out.println("获取管理员Admin对象发生异常:"+e.getMessage());
}
return admin;
}
public static void closeConnection(){
if(conn !=null){
try {
conn.close();
} catch (IOException e) {
System.out.println("释放Connection对象发生异常:"+e.getMessage());
}
}
}
//集群环境信息=============================================================================end
//操作名称空间=============================================================================start
public static void createNameSpace(String ns_name){
// 标识名称空间是否存在,默认
boolean isExists = false;
try {
// 查询已经存在的名称空间
NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
for(NamespaceDescriptor ns:list){
if(ns.getName().equals(ns_name)){
System.out.println("目标名称空间:"+ns_name+"已经存在");
isExists = true;
break;
}
}
// 名称空间不存在,则创建新的名称空间
if(!isExists){
System.out.println("目标名称空间:"+ns_name+"不存在,正在准备创建");
NamespaceDescriptor newNameSpace = NamespaceDescriptor.create(ns_name).build();
admin.createNamespace(newNameSpace);
}
} catch (IOException e) {
System.out.println("创建名称空间发生异常:"+e.getMessage());
}
}
public static void deleteNameSpace(String ns_name){
System.out.println("正在准备删除名称空间:"+ns_name);
try {
admin.deleteNamespace(ns_name);
} catch (IOException e) {
System.out.println("删除名称空间发生异常:"+e.getMessage());
}
}
//操作名称空间=============================================================================end
//表DDL操作=============================================================================start
public static void createTable(String t_name,String ...family){
// 创建表,必须要指定列簇
if(family.length<=0){
System.out.println("创建表,必须要指定列簇");
return;
}
// 定义表名称
TableName tableName = TableName.valueOf(t_name);
try {
// 判断表是否存在
if(admin.tableExists(tableName)){
System.out.println("表"+t_name+"已经存在,不需要创建");
return;
}
// 如果表不存在,则执行创建
System.out.println("表"+t_name+"不存在,正在创建");
// 定义表的列簇集
HTableDescriptor htd = new HTableDescriptor(tableName);
// 添加列簇
HColumnDescriptor tmpColumn = null;
for(String hc:family){
tmpColumn= new HColumnDescriptor(hc);
htd.addFamily(tmpColumn);
}
// 创建表
admin.createTable(htd);
} catch (IOException e) {
System.out.println("创建表发生异常:"+e.getMessage());
}
}
public static void getTableInfo(String t_name) throws Exception{
// 定义表名称对象
TableName tableName = TableName.valueOf(t_name);
// 判断表是否存在
if(admin.tableExists(tableName)){
// 获取表的列簇集
HTableDescriptor htd = admin.getTableDescriptor(tableName);
// 获取表的列簇
HColumnDescriptor[] columnFamilies = htd.getColumnFamilies();
// 打印表的详情信息
for(HColumnDescriptor hcd:columnFamilies){
System.out.println(hcd.getNameAsString());
}
}else{
System.out.println("表"+tableName.getNameAsString()+"不存在。");
}
}
public static void updateTable(String t_name,String family,boolean addOrDelete){
// 定义表名称对象
TableName tableName = TableName.valueOf(t_name);
try{
// 判断表是否存在
if(admin.tableExists(tableName)){
// 细节:对表进行修改,需要先禁用表,修改完成后再启用表
admin.disableTable(tableName);
// 获取表描述对象
HTableDescriptor htd = admin.getTableDescriptor(tableName);
// 判断是添加还是删除
if(addOrDelete){
// 正在添加列簇
System.out.println("正在添加列簇:"+family);
// 定义列簇对象
HColumnDescriptor hcd = new HColumnDescriptor(family);
htd.addFamily(hcd);
}else{
// 正在删除列簇
System.out.println("正在删除列簇:"+family);
htd.removeFamily(Bytes.toBytes(family));
}
// 修改表
admin.modifyTable(tableName,htd);
}else{
System.out.println("表【"+tableName.getNameAsString()+"】不存在.");
}
}catch(Exception e){
System.out.println("修改表发生异常:"+e.getMessage());
}finally {
// 重新启用表
try {
admin.enableTable(tableName);
} catch (IOException e) {
System.out.println("重新启用表发生异常:"+e.getMessage());
}
}
}
public static void deleteTable(String t_name){
// 定义表名称对象
TableName tableName = TableName.valueOf(t_name);
try {
// 判断表是否存在
if(admin.tableExists(tableName)){
// 禁用表
admin.disableTable(tableName);
// 删除表
admin.deleteTable(tableName);
}else{
System.out.println("表"+tableName.getNameAsString()+"不存在.");
}
} catch (IOException e) {
System.out.println("删除表发生异常:"+e.getMessage());
}
}
//表DDL操作=============================================================================end
//表DML操作=============================================================================start
public static void put(String t_name,String rowKey,String family,String[] ck,String[] cv){
// 定义表的名称对象
TableName tableName = TableName.valueOf(t_name);
try {
// 获取表
Table table = conn.getTable(tableName);
// 根据行键创建Put对象
Put put = new Put(Bytes.toBytes(rowKey));
// 准备数据
for(int i=0;i cells = result.listCells();
for(Cell c:cells){
//打印rowKey,family,qualifier,value
System.out.println(Bytes.toString(CellUtil.cloneRow(c))// 行键
+ "==> " + Bytes.toString(CellUtil.cloneFamily(c))// 列簇
+ "{" + Bytes.toString(CellUtil.cloneQualifier(c))// 列
+ ":" + Bytes.toString(CellUtil.clonevalue(c)) + "}");// 值
}
// 释放资源
table.close();
} catch (IOException e) {
System.out.println("根据rowKey获取数据发生异常:"+e.getMessage());
}
}
public static void scan(String t_name,String ... start_end_rowKey){
// 定义表的名称
TableName tableName = TableName.valueOf(t_name);
try {
// 获取表
Table table = conn.getTable(tableName);
// 定义扫描器
Scan scan = new Scan();
// 设置扫描范围
if(start_end_rowKey.length>1){
scan.setStartRow(Bytes.toBytes(start_end_rowKey[0]));
scan.setStopRow(Bytes.toBytes(start_end_rowKey[1]));
}
// 获取扫描结果集
ResultScanner scanner = table.getScanner(scan);
Result result = null;
while ((result = scanner.next()) != null){
// 获取单元格
List cells = result.listCells();
// 打印 rowKey,family,qualifier,value
for(Cell c:cells){
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ "==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ "{" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toString(CellUtil.clonevalue(c)) + "}");
}
}
// 释放资源
table.close();
} catch (IOException e) {
System.out.println("范围扫描数据发生异常:"+e.getMessage());
}
}
public static void del(String t_name,String rowKey,String ... family_qualifier){
// 定义表名称
TableName tableName=TableName.valueOf(t_name);
try {
// 获取表
Table table = conn.getTable(tableName);
// 创建删除对象
Delete delete = new Delete(Bytes.toBytes(rowKey));
// 明确删除某一列
if(family_qualifier.length>1){
delete.addColumn(Bytes.toBytes(family_qualifier[0]),Bytes.toBytes(family_qualifier[1]));
}
// 执行删除
table.delete(delete);
// 释放资源
table.close();
} catch (IOException e) {
System.out.println("删除表数据发生异常:"+e.getMessage());
}
}
//表DML操作=============================================================================end
//Filter查询===========================================================================start
public static void scanByFilter(String t_name,Filter filter){
// 定义表的名称
TableName tableName = TableName.valueOf(t_name);
try {
// 获取表
Table table = conn.getTable(tableName);
// 定义扫描器
Scan scan = new Scan();
scan.setFilter(filter);
// 执行扫描
ResultScanner scanner = table.getScanner(scan);
Result result = null;
while ((result = scanner.next()) != null){
// 获取单元格
List| cells = result.listCells();
// 打印 rowKey,family,qualifier,value
for(Cell c:cells){
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ "==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ "{" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toString(CellUtil.clonevalue(c)) + "}");
}
}
// 释放资源
table.close();
} catch (IOException e) {
System.out.println("过滤查询数据发生异常:"+e.getMessage());
}
}
//Filter查询=============================================================================end
}
| |
5.3.操作名称空间
package cn.liny.hbase;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
public class HbaseNameSpace {
public static void main(String[] args) throws Exception{
// 1.获取连接对象
Connection conn = HbaseUtil.getConnection();
// 2.获取管理员对象Admin,执行管理任务操作
Admin admin = HbaseUtil.getAdmin();
// 3.操作名称空间namespace
// 3.1.列出所有的名称空间
System.out.println("-----------------3.1.列出所有名称空间--------------------");
NamespaceDescriptor[] list = admin.listNamespaceDescriptors();
for(NamespaceDescriptor ns:list){
System.out.println("名称空间:"+ns.getName());
}
// 3.2.创建名称空间
System.out.println("-----------------3.2.创建名称空间-------------------------");
HbaseUtil.createNameSpace("my_ns1");
// 3.3.删除名称空间
System.out.println("-----------------3.3.删除名称空间-------------------------");
HbaseUtil.deleteNameSpace("my_ns2");
// 2.关闭连接对象
HbaseUtil.closeConnection();
}
}
5.4.操作表
5.4.1.表DDL操作
package cn.liny.hbase;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
public class HbaseTableDDL {
public static void main(String[] args) throws Exception {
// 1.获取连接对象
Connection conn = HbaseUtil.getConnection();
// 2.获取管理员对象Admin,执行管理任务操作
Admin admin = HbaseUtil.getAdmin();
// 3.表DDL操作
// 3.1.查询所有表
System.out.println("----------------------3.1.查询所有表------------------------");
TableName[] tableNames = admin.listTableNames();
for(TableName t:tableNames){
System.out.println("表名称:"+t.getNameAsString());
}
// 3.2.创建表
System.out.println("----------------------3.2.创建表-----------------------------");
HbaseUtil.createTable("student","base_info");
// 3.3.获取表的详情信息
System.out.println("----------------------3.3.获取表的详情信息---------------------");
HbaseUtil.getTableInfo("student");
// 3.4.修改表
System.out.println("----------------------3.4.修改表-----------------------------");
HbaseUtil.updateTable("student","ext_info",true);
// 3.5.删除表
System.out.println("----------------------3.5.删除表-----------------------------");
HbaseUtil.deleteTable("student");
// 关闭连接对象
HbaseUtil.closeConnection();
}
}
5.4.2.表DML操作
package cn.liny.hbase;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
public class HbaseTableDML {
public static void main(String[] args) throws Exception{
// 1.获取连接对象
Connection conn = HbaseUtil.getConnection();
// 2.获取管理员对象Admin,执行管理任务操作
Admin admin = HbaseUtil.getAdmin();
// 3.表DML操作
// 3.1.创建表
System.out.println("----------------------3.1.创建表-----------------------------");
HbaseUtil.createTable("my_ns:user","base_info");
// 3.2.添加(更新)数据
System.out.println("----------------------3.2.添加(更新)数据---------------------");
// 准备数据
String row_key="rowkey_10";
String[] ck = {"username","sex","address","birthday"};
String[] cv = {"张三","1","北京市","2014-07-10"};
HbaseUtil.put("my_ns:user",row_key,"base_info",ck,cv);
// 3.3.根据rowKey获取数据
System.out.println("----------------------3.3.根据rowKey获取数据---------------------");
HbaseUtil.get("my_ns:user",row_key);
// 3.4.范围扫描数据
System.out.println("----------------------3.4.范围扫描数据----------------------------");
//HbaseUtil.scan("my_ns:user");// 全表扫描
HbaseUtil.scan("my_ns:user","rowkey_16","rowkey_26");// 指定rowKey范围扫描
// 3.5.删除数据
System.out.println("----------------------3.5.删除数据----------------------------");
// HbaseUtil.del("my_ns:user","rowkey_16");// 删除整行
HbaseUtil.del("my_ns:user","rowkey_22","base_info","address");// 明确删除列
// 关闭连接对象
HbaseUtil.closeConnection();
}
}
5.5.Hbase过滤查询
5.5.1.关于过滤查询
基础API中的查询操作,在面对大量数据的时候是非常苍白的,Hbase提供了高级的查询方法:Filter。
Filter根据簇、列、版本等更多的条件来对数据进行过滤,基于Hbase提供的三维有序(主键有序、列有序、版本有序),这些Filter可以高效的完成查询过滤的务。
要完成过滤的操作,至少需要两个参数:
抽象操作符
LESS :<
LESS_OR_EQUAL: <=
EQUAL: =
NOT_EQUAL: <>
GREATER :>
GREATER_OR_EQUAL :>=
NO_OP :排除所有
比较器
BinaryComparator :按字节索引顺序比较指定字节数组,采用 Bytes.compareTo(byte[])
BinaryPrefixComparator: 与BinaryComparator相同,仅比较左端的数据是否相同
NullComparator :判断给定的是否为空
BitComparator: 按位比较
SubstringComparator: 判断提供的子串是否出现在 value 中
5.5.2.过滤查询案例以下案例演示了常用过滤器的使用:
行键过滤器:RowFilter列簇过滤器:FamilyFilter列过滤器:QualifierFilter值过滤器:ValueFilter行键前缀过滤器:PreFixFilter列前缀过滤器:ColumnPrefixFilter
package cn.liny.hbase;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.filter.*;
public class HbaseFilter {
public static void main(String[] args) {
// 1.获取连接对象
Connection conn = HbaseUtil.getConnection();
// 2.行键过滤器:RowFilter
System.out.println("----------------------2.行键过滤器:RowFilter-----------------------------");
// 定义过滤器
Filter rowFilter = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator("rowkey_10".getBytes()));
HbaseUtil.scanByFilter("my_ns:user",rowFilter);
//3.列簇过滤器: FamilyFilter
System.out.println("----------------------3.列簇过滤器: FamilyFilter-----------------------------");
// 定义过滤器
Filter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator("ext_info".getBytes()));
HbaseUtil.scanByFilter("my_ns:user",familyFilter);
//4.列过滤器 :QualifierFilter
System.out.println("----------------------4.列过滤器 :QualifierFilter-----------------------------");
// 定义过滤器
Filter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator("username".getBytes()));
HbaseUtil.scanByFilter("my_ns:user",qualifierFilter);
//5.值过滤器 :ValueFilter
System.out.println("----------------------5.值过滤器 :ValueFilter-----------------------------");
// 定义过滤器
Filter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("张三"));
HbaseUtil.scanByFilter("my_ns:user",valueFilter);
// 6.前缀过滤器:PrefixFilter----针对行键
System.out.println("----------------------6.前缀过滤器:PrefixFilter----针对行键-----------------------------");
// 定义过滤器
Filter prefixFilter = new PrefixFilter("rowkey_1".getBytes());
HbaseUtil.scanByFilter("my_ns:user",prefixFilter);
// 7.列前缀过滤器:ColumnPrefixFilter
System.out.println("----------------------7.列前缀过滤器:ColumnPrefixFilter-----------------------------");
// 定义过滤器
Filter columnPrefixFilter = new ColumnPrefixFilter("a".getBytes());
HbaseUtil.scanByFilter("my_ns:user",columnPrefixFilter);
// 关闭连接对象
HbaseUtil.closeConnection();
}
}
6.Hbase的rowkey设计原则
Hbase是三维有序存储的,通过rowkey(行键),column key(column family和qualifier)和TimeStamp(时间戳)这个三个维度可以对Hbase中的数据进行快速定位。
Hbase中rowkey可以唯一标识一行记录,三种查询方式
通过get方式,指定rowkey获取唯一一条记录通过scan方式,设置startRow和stopRow参数进行范围匹配全表扫描,即直接扫描整张表中所有行记录 6.1.Rowkey长度原则
rowkey是一个二进制码流,可以是任意字符串,最大长度64kb,实际应用中一般为10-100bytes,以byte[]形式保存,一般设计成定长。
建议越短越好,不要超过16个字节
案例说明:
数据的持久化文件HFile中是按照Key/Value存储的,如果rowkey过长,比如超过100字节,1000w行数据,光rowkey就要占用100*1000w=10亿个字节,将近1G数据,这样会极大影响HFile的存储效率;MemStore将缓存部分数据到内存,如果rowkey字段过长,内存的有效利用率就会降低,系统不能缓存更多的数据,这样会降低检索效率。
目前操作系统都是64位系统,内存8字节对齐,控制在16个字节,8字节的整数倍利用了操作系统的最佳特性。
6.2.Rowkey 散列原则如果rowkey按照时间戳的方式递增,不要将时间放在二进制码的前面,建议将rowkey的高位作为散列字段,由程序随机生成,低位放时间字段,这样将提高数据均衡分布在每个RegionServer,以实现负载均衡的几率。
如果没有散列字段,首字段直接是时间信息,所有的数据都会集中在一个RegionServer上,这样在数据检索的时候负载会集中在个别的RegionServer上,造成热点问题,会降低查询效率。
6.3.Rowkey唯一原则必须在设计上保证其唯一性,rowkey是按照字典顺序排序存储的,因此,设计rowkey的时候,要充分利用这个排序的特点,将经常读取的数据存储到一块,将最近可能会被访问的数据放到一块。但是这里的量不能太大,如果太大需要拆分到多个节点上去 。
6.4.具体设计参考 6.4.1.加盐加盐指的是在rowkey的前面分配随机数,当给rowkey随机前缀后,它就能分布到不同的region中,这里的前缀应该和你想要数据分散的不同的region的数量有关。
案例需求
以电信公司为例 。当我们去电信公司打印通话记录。对于通话记录来说,每个人每月可能都有很多通话记录,同时电信的用户以亿计。这样的海量信息,可以考虑存入hbase当中 。那么在hbase中如何设计这样一张表呢?
确定需要保存的信息
对于通话记录 ,有主叫和被叫 信息,有通话时长,有通话时间 ,有主叫的位置信息,和被叫的位置信息 。那么可以确定要保存的信息:主叫、被叫、时长、时间、主叫位置、被叫位置
如何设计hbase表
hbase表是依靠rowkey来定位的,我们应该尽可能多的将查询的信息编入rowkey当中。设计通话记录的rowkey时,需要将能唯一确定该条记录的数据编入rowkey当中:主叫、被叫、时间
17766657999 18699886666 201806121502 #主叫,被叫,时间
该rowkey合理吗
该rowkey可能会造成热点问题。例如电话以177开头,hbase集群有100台,那么该数据可能只被存入一台或者两台机器的region中。当需要打印通话记录时,就只有一台机器提供服务。而如果数据均匀分散到100机器中,则是整个集群提供服务。两者之间效率速度差了不止一个数量级 。
因此我们可以考虑:hbase集群有100台,可以把在0-99之间中的随机数字,添加到rowkey首部 。
10 17766657999 18699886666 201806121502 #随机数,主叫,被叫,时间6.4.2.预分区
hbase会自动处理region拆分,当region的大小到达一定阈值后,region将被拆分成两个。这个过程当中,会出现两个问题:
第一点还是热点问题,数据会继续往一个region中写,出现写热点问题第二点是拆分合并风暴,当用户的region大小以恒定的速度增长,region的拆分会在同一时间发生,因为同时需要压缩region中的存储文件,这个过程会重写拆分后的region,这将会引起磁盘I/O上升 。
要解决以上问题,预分区(预先创建hbase表分区 )是一种很好的方案,可以和加盐结合起来使用。使用预分区需要明确rowkey的取值范围和构成逻辑 。
案例需求:
还是上面电信电话详单表 ,通过加盐我们得到的 rowkey构成是:随机数+主叫+被叫+时间 ,如果当前并没有100台机器,只有10台。但是预计未来将扩展到100台的规模。那么仍然设计0到99的随机数,将以主叫177开头的通话记录分配到十个region当中,将随机数均分成十个区域,范围如下:
-10,10-20,20-30,30-40,40-50,50-60,60-70,70-80,80-90,90-
将预分区存入数组当中,当插入数据时,先根据插入数据的首部随机数,判断分区位置,再进行插入数据。这样也能使得各台节点负载均衡。
6.4.3.哈希哈希会使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群。同时读是可以预测的。使用确定的哈希可以让客户端重构完整的rowkey,可以使用get操作准确获取某一个行数据
6.4.4.反转对于手机号码,比较固定开头的rowkey(177,159,188),而后几位都是随机的,没有规律的。可以将手机号反转之后作为rowkey,避免热点问题 。
这就是rowkey设计的另一种方法:反转。通过反转固定长度或者数字格式的rowkey ,把rowkey中经常改变的部分(最没有意义的部分)放在前面 。可以有效的随机rowkey ,不过这样一来就牺牲了rowkey的有序性 。
7.MapReduce操作Hbase 7.1.创建项目 7.2.导入依赖7.3.MapReduce从hdfs读取数据到Hbase 7.3.1.案例代码4.0.0 cn.liny spark-teach-day08-02hbase-hdfs 1.0-SNAPSHOT jar 1.3.1 org.apache.hbase hbase-client ${hbase.version} org.apache.hbase hbase-server ${hbase.version}
package cn.liny.hbase.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class HbaseFromHDFS {
public static void main(String[] args) throws Exception{
// 创建配置对象
Configuration conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
// 创建Job
Job job = Job.getInstance(conf,"HbaseFromHDFS");
job.setJarByClass(HbaseFromHDFS.class);
// 设置map
job.setMapperClass(HDFSToHbaseMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 设置输入目录
FileInputFormat.addInputPath(job, new Path(args[0]));
TableMapReduceUtil.initTableReducerJob(args[1],HDFSToHbaseReducer.class, job);
job.waitForCompletion(true);
}
// HDFSToHbaseMapper
public static class HDFSToHbaseMapper extends Mapper {
private Text outKey = new Text();
private Text outValue = new Text();
// map函数
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split(" ");
outKey.set(splits[0]);
outValue.set(splits[1]+" "+splits[2]+" "+splits[3]+" "+splits[4]);
context.write(outKey, outValue);
}
}
// HDFSToHbaseReducer
public static class HDFSToHbaseReducer extends TableReducer {
// reduce函数
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
Put put = new Put(key.getBytes());
String family="base_info";
for (Text v : values) {
String[] split = v.toString().split(" ");
if(split[0]!=null && !"NULL".equals(split[0])){
put.addColumn(family.getBytes(),"username".getBytes(),split[0].getBytes());
}
if(split[1]!=null && !"NULL".equals(split[1])){
put.addColumn(family.getBytes(),"sex".getBytes(),split[1].getBytes());
}
if(split[2]!=null && !"NULL".equals(split[2])){
put.addColumn(family.getBytes(),"birthday".getBytes(),split[2].getBytes());
}
if(split[3]!=null && !"NULL".equals(split[3])){
put.addColumn(family.getBytes(),"address".getBytes(),split[3].getBytes());
}
}
// 输出
context.write(NullWritable.get(),put);
}
}
}
7.3.2.配置执行环境
准备数据文件
user.txt:
rowkey_10 张三 1 2014-07-10 北京市 rowkey_16 张小明 1 2014-07-10 北京 rowkey_22 陈小明 1 2014-07-10 上海 rowkey_24 张三丰 1 2014-07-10 河南 rowkey_25 陈大明 1 2014-07-10 西安
上传到hdfs:
#创建hdfs目录 hdfs dfs -mkdir -p /anan/hbase/input #上传文件 hdfs dfs -put /usr/local/develop/testdata/user.txt /anan/hbase/input
创建hbase表
create 'my_ns:huser','base_info'
配置hadoop依赖hbase相关包
vi /usr/local/develop/hadoop274/etc/hadoop/hadoop-env.sh
#配置HADOOP_CLASSPATH环境变量,指定hbase依赖包目录
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/usr/local/develop/hbase131/lib
public class HbaseToHDFS {
public static void main(String[] args) throws Exception{
// 创建配置对象
Configuration conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181");
// 创建Job
Job job = Job.getInstance(conf,"HbaseToHDFS");
job.setJarByClass(HbaseToHDFS.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(args[0], scan, HbaseToHDFSMapper.class, Text.class, Text.class, job);
// 设置输入目录
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
// mapper
public static class HbaseToHDFSMapper extends TableMapper {
Text outKey = new Text();
Text outValue = new Text();
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
StringBuilder builder =new StringBuilder();
boolean isContainsColumn = value.containsColumn("base_info".getBytes(),"username".getBytes());
if(isContainsColumn) {
List listCells = value.getColumnCells("base_info".getBytes(), "username".getBytes());
Cell cell = listCells.get(0);
byte[] clonevalue = CellUtil.clonevalue(cell);
builder.append(Bytes.toString(clonevalue)).append(" ");
}
isContainsColumn = value.containsColumn("base_info".getBytes(),"sex".getBytes());
if(isContainsColumn) {
List listCells = value.getColumnCells("base_info".getBytes(), "sex".getBytes());
Cell cell = listCells.get(0);
byte[] clonevalue = CellUtil.clonevalue(cell);
builder.append(Bytes.toString(clonevalue)).append(" ");
}
isContainsColumn = value.containsColumn("base_info".getBytes(),"birthday".getBytes());
if(isContainsColumn) {
List listCells = value.getColumnCells("base_info".getBytes(), "birthday".getBytes());
Cell cell = listCells.get(0);
byte[] clonevalue = CellUtil.clonevalue(cell);
builder.append(Bytes.toString(clonevalue)).append(" ");
}
isContainsColumn = value.containsColumn("base_info".getBytes(),"address".getBytes());
if(isContainsColumn) {
List listCells = value.getColumnCells("base_info".getBytes(), "address".getBytes());
Cell cell = listCells.get(0);
byte[] clonevalue = CellUtil.clonevalue(cell);
builder.append(Bytes.toString(clonevalue)).append(" ");
}
// 输出
outKey.set(Bytes.toString(key.get()));
outValue.set(builder.toString());
context.write(outKey,outValue);
}
}
//reducer
public static class HbaseToHDFSReducer extends Reducer {
Text outValue = new Text();
protected void reduce(Text key, Iterable values,Context context)
throws IOException, InterruptedException {
outValue.set(values.toString());
context.write(key, outValue);
}
}
}
| | | |
7.4.2.打包,提交任务执行
hadoop提交任务:
hadoop jar /usr/local/develop/testjar/spark-teach-day08-02hbase-hdfs-1.0-SNAPSHOT.jar cn.itheima.hbase.hdfs.HbaseToHDFS my_ns:huser /anan/hbase/output
执行结果:
hdfs dfs -cat /anan/hbase/output/part-r-000008.spark操作Hbase 8.1.创建项目 8.2.导入依赖
8.3.spark写入数据到Hbase 8.3.1.编写案例代码4.0.0 cn.liny spark-teach-day08-03hbase-spark 1.0-SNAPSHOT jar 2.11.8 2.0.2 1.3.1 org.scala-lang scala-library ${scala.version} org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.hbase hbase-client ${hbase.version} org.apache.hbase hbase-server ${hbase.version} org.scala-tools maven-scala-plugin 2.15.2 compile maven-assembly-plugin cn.itheima.hbase.spark.SparkToHbase jar-with-dependencies
package cn.liny.hbase.spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkToHbase {
// 执行入口
def main(args: Array[String]): Unit = {
// 1.创建Hbase配置
val conf: Configuration = HbaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
// 2.指定输出格式和输出表名
val jobConf: JobConf = new JobConf(conf,this.getClass)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"my_ns:s_user")
// 3.创建SparkContext对象
// 本地测试使用
val sparkConf: SparkConf = new SparkConf().setAppName("SparkToHbase").setMaster("local[2]")
//打包到集群环境使用
//val sparkConf: SparkConf = new SparkConf().setAppName("SparkToHbase")
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 构建RDD数据
val dataRDD = List(("rowkey_1","lilei","16"), ("rowkey_2","hanmeimei","17"), ("rowkey_3","lucy","18"))
val resultRDD: RDD[(ImmutableBytesWritable, Put)] = sc.parallelize(dataRDD).map(convert)
// 写入Hbase表
resultRDD.saveAsHadoopDataset(jobConf)
}
// 定义元组转换函数
// (1,"lilei",18)
def convert(tuple: (String, String, String)) = {
// 定义put
val put = new Put(Bytes.toBytes(tuple._1))
// 添加列数据
put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("name"),Bytes.toBytes(tuple._2))
put.addColumn(Bytes.toBytes("base_info"),Bytes.toBytes("age"),Bytes.toBytes(tuple._3))
// 返回
(new ImmutableBytesWritable, put)
}
}
8.3.2.配置执行环境
创建hbase表
create 'my_ns:s_user','base_info' #查询数据 scan 'my_ns:s_user'
如果要在集群环境使用,则需要启动spark集群,提交命令如下:
spark-submit --class cn.itheima.hbase.spark.SparkToHbase --master spark://hadoop01:7077 --executor-memory 1g --total-executor-cores 1 /usr/local/develop/testjar/spark-teach-day08-03hbase-spark-1.0-SNAPSHOT-jar-with-dependencies.jar8.3.3.执行结果 8.4.spark从Hbase读取数据 8.4.1.编写案例代码
package cn.liny.hbase.spark
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HbaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object SparkFromHbase {
// 执行入口
def main(args: Array[String]): Unit = {
// 1.创建Hbase配置
val conf: Configuration = HbaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181")
// 2.设置查询的表名
conf.set(TableInputFormat.INPUT_TABLE, "my_ns:s_user")
// 3.创建SparkContext对象
val sparkConf: SparkConf = new SparkConf().setAppName("SparkFromHbase").setMaster("local[2]")
// 设置序列化
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable]))
val sc: SparkContext = new SparkContext(sparkConf)
sc.setLogLevel("WARN")
// 4.通过SparkContext 提供的newAPIHadoopRDD获取内容
val dataRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
//5.遍历输出
dataRDD.foreach{ case (_,result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("base_info".getBytes,"name".getBytes))
val age = Bytes.toString(result.getValue("base_info".getBytes,"age".getBytes))
println("Row key:"+key+",name="+name+",age="+age)
}
}
}
8.4.2.执行结果



