- Hbase参数调优
- hbase.regionserver.handler.count
- hbase.hregion.max.filesize
- hbase.hregion.majorcompaction
- hbase.hstore.compaction.min
- hbase.hstore.compaction.max
- hbase.hstore.blockingStoreFiles
- hbase.regionserver.global.memstore.size(重)
- hbase.regionserver.global.memstore.size.lower.limit(重)
- hfile.block.cache.size(重)
- hbase.hregion.memstore.flush.size(重)
- hbase.hregion.memstore.block.multiplier
- hbase.regionserver.regionSplitLimit
- hbase.regionserver.maxlogs
- hbase.regionserver.hlog.blocksize
- 什么时候触发 MemStore Flush?(重)
- 什么操作会触发 MemStore 刷写?
- MemStore 刷写策略(FlushPolicy)
- hbase高可用
- Hbase调优 BulkLoading
- BulkLoading优点:
- BulkLoading限制:
- BulkLoading代码部分
- 代码所需要的依赖
- 说明
hbase.hregion.max.filesize该设置决定了处理RPC的线程数量,默认值是10,通常可以调大,比如:150,当请求内容很大(上MB,比如大的put、使用缓存的scans)的时候,如果该值设置过大则会占用过多的内存,导致频繁的GC,或者出现OutOfMemory,因此该值不是越大越好。
hbase.hregion.majorcompaction配置region大小,默认是10G,region大小一般控制在几个G比较合适,可以在建表时规划好region数量,进行预分区,做到一定时间内,每个region的数据大小在一定的数据量之下,当发现有大的region,或者需要对整个表进行region扩充时再进行split操作,一般提供在线服务的hbase集群均会弃用hbase的自动split,转而自己管理split。
hbase.hstore.compaction.min配置major合并的间隔时间,默认值604800000,单位ms。表示major compaction默认7天调度一次,Hbase 0.96.x及之前默认为1天调度一次,设置为 0 时表示禁用自动触发major compaction。一般major compaction持续时间较长、系统资源消耗较大,对上层业务也有比较大的影响,一般生产环境下为了避免影响读写请求,会禁用自动触发major compaction,可手动或者通过脚本定期进行major合并。
hbase.hstore.compaction.max默认值 3,一个列族下的HFile数量超过该值就会触发Minor Compaction,这个参数默认值小了,一般情况下建议调大到5~10之间,注意相应调整下一个参数
hbase.hstore.blockingStoreFiles默认值 10,一次Minor Compaction最多合并的HFile文件数量,这个参数基本控制着一次压缩即Compaction的耗时。这个参数要比上一个参数hbase.hstore.compaction.min值大,通常是其2~3倍。
hbase.regionserver.global.memstore.size(重)默认值 10,一个列族下HFile数量达到该值,flush操作将会受到阻塞,阻塞时间为hbase.hstore.blockingWaitTime,默认90000,即1.5分钟,在这段时间内,如果compaction操作使得HFile下降到blockingStoreFiles配置值,则停止阻塞。另外阻塞超过时间后,也会恢复执行flush操作。这样做可以有效地控制大量写请求的速度,但同时这也是影响写请求速度的主要原因之一。生产环境中默认值太小了,一般建议设置大点比如100,避免出现阻塞更新的情况
hbase.regionserver.global.memstore.size.lower.limit(重)默认值0.4,RS所有memstore占用内存在总内存中的比例,当达到该值,则会从整个RS中找出最需要flush的region进行flush,直到总内存比例降至该数限制以下,并且在降至限制比例前,将阻塞所有的写memstore的操作,在以写为主的集群中,可以调大该配置项,不建议太大,因为block cache和memstore cache的总大小不会超过0.8,而且不建议这两个cache的大小总和达到或者接近0.8,避免OOM,在偏向写的业务时,可配置为0.45
默认值0.95,相当于上一个参数的0.95
如果有 16G 堆内存,默认情况下:
-
# 达到该值会触发刷写 16 * 0.4 * 0.95 = 6.08 # 达到该值会触发阻塞 16 * 0.4 = 6.4
| 新参数 | 老参数 |
|---|---|
| hbase.regionserver.global.memstore.size | hbase.regionserver.global.memstore.upperLimit |
| hbase.regionserver.global.memstore.size.lower.limit | hbase.regionserver.global.memstore.lowerLimit |
hbase.hregion.memstore.flush.size(重)RS的block cache的内存大小限制,默认值0.4,在偏向读的业务中,可以适当调大该值,具体配置时需试hbase集群服务的业务特征,结合memstore的内存占比进行综合考虑。
hbase.hregion.memstore.block.multiplier默认值128M,单位字节,超过将被flush到hdfs,该值比较适中,一般不需要调整。
hbase.regionserver.regionSplitLimit默认值4,如果memstore的内存大小已经超过了hbase.hregion.memstore.flush.size的4倍,则会阻塞memstore的写操作,直到降至该值以下,为避免发生阻塞,最好调大该值,比如:6,不可太大,如果太大,则会增大导致整个RS的memstore内存超过global.memstore.size限制的可能性,进而增大阻塞整个RS的写的几率,如果region发生了阻塞会导致大量的线程被阻塞在到该region上,从而其它region的线程数会下降,影响整体的RS服务能力。
hbase.regionserver.maxlogs控制最大的region数量,超过则不可以进行split操作,默认是Integer.MAX(2147483647),可设置为1,禁止自动的split,通过人工,或者写脚本在集群空闲时执行。如果不禁止自动的split,则当region大小超过hbase.hregion.max.filesize时会触发split操作(具体的split有一定的策略,不仅仅通过该参数控制,前期的split会考虑region数据量和memstore大小),每次flush或者compact之后,regionserver都会检查是否需要Split,split会先下线老region再上线split后的region,该过程会很快,但是会存在两个问题:1、老region下线后,新region上线前client访问会失败,在重试过程中会成功但是如果是提供实时服务的系统则响应时长会增加;2、split后的compact是一个比较耗资源的动作。
hbase.regionserver.hlog.blocksize默认值32,HLOG最大的数量
默认为 2 倍的HDFS block size(128MB),即256MB
JVM调整:
内存大小:master默认为1G,可增加到2G,regionserver默认1G,可调大到10G,或者更大,zk并不耗资源,可以不用调整,需要注意的是,调整了rs的内存大小后,需调整hbase.regionserver.maxlogs和hbase.regionserver.hlog.blocksize这两个参数,WAL的最大值由hbase.regionserver.maxlogs * hbase.regionserver.hlog.blocksize决定(默认32*2*128M=8G),一旦达到这个值,就会被触发flush memstore,如果memstore的内存增大了,但是没有调整这两个参数,实际上对大量小文件没有任何改进,调整策略:hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs 设置为略大于hbase.regionserver.global.memstore.size* Hbase_HEAPSIZE。
什么时候触发 MemStore Flush?(重)
有很多情况会触发 MemStore 的 Flush 操作,主要有以下几种情况:
-
Region 中任意一个 MemStore 占用的内存超过相关阈值 128MB会刷
当一个 Region 中所有 MemStore 占用的内存大小超过刷写阈值的时候会触发一次刷写,这个阈值由 hbase.hregion.memstore.flush.size 参数控制,默认为128MB。我们每次调用 put、delete 等操作都会检查的这个条件的。
但是如果我们的数据增加得很快,达到了 hbase.hregion.memstore.flush.size * hbase.hregion.memstore.block.multiplier 的大小,hbase.hregion.memstore.block.multiplier 默认值为4,也就是128*4=512MB的时候,那么除了触发 MemStore 刷写之外,Hbase 还会在刷写的时候同时阻塞所有写入该 Store 的写请求!这时候如果你往对应的 Store 写数据,会出现 RegionTooBusyException 异常。
-
整个 RegionServer 的 MemStore 占用内存总和大于相关阈值 达到40%会刷新
如果达到了 RegionServer 级别的 Flush,那么当前 RegionServer 的所有写操作将会被阻塞,而且这个阻塞可能会持续到分钟级别。
-
WAL数量大于相关阈值或WAL的大小超过一定阈值 数量或者大小超过一定阈值会刷
如果设置了 hbase.regionserver.maxlogs,那就是这个参数的值;否则是 max(32, hbase_heapsize * hbase.regionserver.global.memstore.size * 2 / logRollSize)
(logRollSize 默认大小为:0.95 * HDFS block size)
如果某个 RegionServer 的 WAL 数量大于 maxLogs 就会触发 MemStore 的刷写。
WAL的最大值由hbase.regionserver.maxlogs * hbase.regionserver.hlog.blocksize决定(默认32*2*128M=8G),一旦达到这个值,就会被触发flush memstore,如果memstore的内存增大了,但是没有调整这两个参数,实际上对大量小文件没有任何改进,调整策略:hbase.regionserver.hlog.blocksize * hbase.regionserver.maxlogs 设置为略大于hbase.regionserver.global.memstore.size* Hbase_HEAPSIZE。
-
定期自动刷写
如果我们很久没有对 Hbase 的数据进行更新,这时候就可以依赖定期刷写策略了。RegionServer 在启动的时候会启动一个线程 PeriodicMemStoreFlusher 每隔 hbase.server.thread.wakefrequency 时间(服务线程的sleep时间,默认10000毫秒)去检查属于这个 RegionServer 的 Region 有没有超过一定时间都没有刷写,这个时间是由 hbase.regionserver.optionalcacheflushinterval 参数控制的,默认是 3600000,也就是1小时会进行一次刷写。如果设定为0,则意味着关闭定时自动刷写。
为了防止一次性有过多的 MemStore 刷写,定期自动刷写会有 0 ~ 5 分钟的延迟
-
数据更新超过一定阈值
如果 Hbase 的某个 Region 更新的很频繁,而且既没有达到自动刷写阀值,也没有达到内存的使用限制,但是内存中的更新数量已经足够多,比如超过 hbase.regionserver.flush.per.changes 参数配置,默认为30000000,那么也是会触发刷写的。
-
手动触发刷写
分别对某张表、某个 Region 进行刷写操作。
可以在 Shell 中执行 flush 命令
什么操作会触发 MemStore 刷写?
常见的 put、delete、append、incr、调用 flush 命令、Region 分裂、Region Merge、bulkLoad HFiles 以及给表做快照操作都会对上面的相关条件做检查,以便判断要不要做刷写操作。
MemStore 刷写策略(FlushPolicy)在 Hbase 1.1 之前,MemStore 刷写是 Region 级别的。就是说,如果要刷写某个 MemStore ,MemStore 所在的 Region 中其他 MemStore 也是会被一起刷写的!(Flush风暴)这会造成一定的问题,比如小文件问题。可以通过 hbase.regionserver.flush.policy 参数选择不同的刷写策略。
目前 Hbase 2.x 的刷写策略全部都是实现 FlushPolicy 抽象类的。并且自带三种刷写策略:FlushAllLargeStoresPolicy、FlushNonSloppyStoresFirstPolicy 以及 FlushAllStoresPolicy。
-
FlushAllStoresPolicy
这种刷写策略实现最简单,直接返回当前 Region 对应的所有 MemStore。也就是每次刷写都是对 Region 里面所有的 MemStore 进行的,这个行为和 Hbase 1.1 之前是一样的。
-
FlushAllLargeStoresPolicy
在 Hbase 2.0 之前版本是 FlushLargeStoresPolicy,后面被拆分成分 FlushAllLargeStoresPolicy 和FlushNonSloppyStoresFirstPolicy
这种策略会先判断 Region 中每个 MemStore 的使用内存是否大于某个阀值,大于这个阀值的 MemStore 将会被刷写
hbase.hregion.percolumnfamilyflush.size.lower.bound.min 默认值为 16MB
hbase.hregion.percolumnfamilyflush.size.lower.bound 没有默认值,计算规则如下:
比如当前表有3个列族,那么 flushSizeLowerBound = max((long)128 / 3, 16) = 42。
如果 Region 中没有 MemStore 的使用内存大于上面的阀值,FlushAllLargeStoresPolicy 策略就退化成 FlushAllStoresPolicy 策略了,也就是会对 Region 里面所有的 MemStore 进行 Flush。
-
FlushNonSloppyStoresFirstPolicy
Hbase 2.0 引入了 in-memory compaction,如果我们对相关列族hbase.hregion.compacting.memstore.type 参数的值不是 NONE,那么这个 MemStore 的 isSloppyMemStore 值就是 true,否则就是 false。
FlushNonSloppyStoresFirstPolicy 策略将 Region 中的 MemStore 按照 isSloppyMemStore 分到两个 HashSet 里面(sloppyStores 和 regularStores)。然后
- 判断 regularStores 里面是否有 MemStore 内存占用大于相关阀值的 MemStore ,有的话就会对这些 MemStore 进行刷写,其他的不做处理,这个阀值计算和 FlushAllLargeStoresPolicy 的阀值计算逻辑一致。
- 如果 regularStores 里面没有 MemStore 内存占用大于相关阀值的 MemStore,这时候就开始在 sloppyStores 里面寻找是否有 MemStore 内存占用大于相关阀值的 MemStore,有的话就会对这些 MemStore 进行刷写,其他的不做处理。
- 如果上面 sloppyStores 和 regularStores 都没有满足条件的 MemStore 需要刷写,这时候就 FlushNonSloppyStoresFirstPolicy 策略久退化成 FlushAllStoresPolicy 策略了。
-
如果我们一次性入库hbase巨量数据,处理速度慢不说,还特别占用Region资源, 一个比较高效便捷的方法就是使用 “Bulk Loading”方法,即Hbase提供的HFileOutputFormat类。
-
它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接生成这种hdfs内存储的数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载。
-
仅适合初次数据导入,即表内数据为空,或者每次入库表内都无数据的情况。
-
Hbase集群与Hadoop集群为同一集群,即Hbase所基于的HDFS为生成HFile的MR的集群。
//hbase shell建表
create 'dianxin_bulk','info',{SPLITS=>['1|','3|','5|','7|','9|','B|','D|']}
BulkLoading代码部分
package com.gym.Hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HbaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class Demo8BulkLoad {
//继承TableMapper 是从hbase中读数据
//map端
public static class BulkLoadMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] splits = value.toString().split(",");
String mdn = splits[0];
String start_time = splits[1];
// 经度
String longitude = splits[4];
// 纬度
String latitude = splits[5];
String rowkey = mdn + "_" + start_time;
//keyvalue相当于cell
KeyValue lgkv = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lg".getBytes(), longitude.getBytes());
KeyValue latkv = new KeyValue(rowkey.getBytes(), "info".getBytes(), "lat".getBytes(), latitude.getBytes());
context.write(new ImmutableBytesWritable(rowkey.getBytes()),lgkv);
context.write(new ImmutableBytesWritable(rowkey.getBytes()),latkv);
}
}
//因为不需要计算,reduce任务可以省略
//driver端
public static void main(String[] args) throws Exception {
Configuration conf = HbaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","master:2181,node1:2181,node2:2181");
Job job = Job.getInstance();
job.setJobName("Demo8BulkLoad");
job.setJarByClass(Demo8BulkLoad.class);
//该配置不会生效,因为reduce任务的数量由region决定
job.setNumReduceTasks(4);
//配置map
job.setMapperClass(BulkLoadMapper.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(KeyValue.class);
//保证每个reduce之间的数据不会有重叠,并且是有序的
job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
//配置reduce
//保证在reduce内部是有序的
job.setReducerClass(KeyValueSortReducer.class);
//配置输入输出路径
Path outputPath = new Path("/bulk_load/output1");
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath,true);
}
FileInputFormat.addInputPath(job,new Path("/bulk_load/input"));
FileOutputFormat.setOutputPath(job,outputPath);
Connection conn = ConnectionFactory.createConnection(conf);
Admin admin = conn.getAdmin();
Table dianxin_bulk = conn.getTable(TableName.valueOf("dianxin_bulk"));
RegionLocator regionLocator = conn.getRegionLocator(TableName.valueOf("dianxin_bulk"));
//第一步:生成Hfile
// 使用HFileOutputFormat2将输出的数据按照HFile的形式格式化
HFileOutputFormat2.configureIncrementalLoad(job,
dianxin_bulk,
regionLocator
);
boolean flag = job.waitForCompletion(true);
if (flag){
//第二步:加载Hfile到Hbase中
LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
load.doBulkLoad(
outputPath,
admin,
dianxin_bulk,
regionLocator
);
}else {
System.out.println("运行失败");
}
}
}
代码所需要的依赖
4.0.0 com.gym HbaseTest 1.0-SNAPSHOT org.apache.phoenix phoenix-core 4.15.0-Hbase-1.4 org.apache.hadoop hadoop-common 2.7.6 compile org.apache.maven.plugins maven-compiler-plugin 3.1 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single
说明打成jar包,上传至hdfs上,运行
hadoop jar HbaseTest-1.0-SNAPSHOT.jar com.gym.Hbase.Demo8BulkLoad
-
最终输出结果,无论是map还是reduce,输出部分key和value的类型必须是: < ImmutableBytesWritable, KeyValue>或者< ImmutableBytesWritable, Put>。
-
最终输出部分,Value类型是KeyValue 或Put,对应的Sorter分别是KeyValueSortReducer或PutSortReducer。
-
MR例子中HFileOutputFormat2.configureIncrementalLoad(job, dianxin_bulk, regionLocator);自动对job进行配置。SimpleTotalOrderPartitioner是需要先对key进行整体排序,然后划分到每个reduce中,保证每一个reducer中的的key最小最大值区间范围,是不会有交集的。因为入库到Hbase的时候,作为一个整体的Region,key是绝对有序的。
-
MR例子中最后生成HFile存储在HDFS上,输出路径下的子目录是各个列族。如果对HFile进行入库Hbase,相当于move HFile到Hbase的Region中,HFile子目录的列族内容没有了,但不能直接使用mv命令移动,因为直接移动不能更新Hbase的元数据。
-
HFile入库到Hbase通过Hbase中 LoadIncrementalHFiles的doBulkLoad方法,对生成的HFile文件入库



