一般分为列式和行式存储,行式包含text、sequence两种类型;列式包含orc和Parquet两种;
压缩方式比较多,gzip、bzip2、DEFLATE、LZO、LZ4、Snappy。
开启map压缩:
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec= org.apache.hadoop.io.compress.SnappyCodec;
开启Reduce输出阶段压缩:
开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;
开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;
设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
存储方式和压缩总结:
在实际的项目开发当中,hive表的数据存储格式一般选择:orc或parquet。压缩方式一般选择snappy。
orc + snappy
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。
在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式
在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。用户可以通过设置hive.exec.mode.local.auto的值为true,
来让Hive在适当的时候自动启动这个优化。
set hive.exec.mode.local.auto=true;? --开启本地mr
–设置local mr的最大输入数据量,当输入数据量小于这个值时采用local? mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=51234560;
–设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘t’;
create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by ‘t’;
INSERT OVERWRITE TABLE jointable
SELECt a.* FROM (SELECt * FROM nullidtable WHERe id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
reduce
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;
INSERT OVERWRITE TABLE jointable
SELECt a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat(‘hive’, rand()) ELSE a.id END = b.id;
a.是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
b.在Map端进行聚合操作的条目数目(阈值)
set hive.groupby.mapaggr.checkinterval = 100000;
c.有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;
当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。
数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换。
方式1:
SELECt count(DISTINCT id) FROM bigtable;
方式2
SELECt count(id) FROM (SELECt id FROM bigtable GROUP BY id) a;
Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。
或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,
而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。
不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。
set hive.exec.parallel=true;?? --打开任务并行执行
set hive.exec.parallel.thread.number=16; --同一个sql允许最大并行度,默认为8。
set hive.mapred.mode = strict; --开启严格模式
set hive.mapred.mode = nostrict; --开启非严格模式
开启严格模式可以禁止3种类型的查询。
a.对于分区表,在where语句中必须含有分区字段作为过滤条件来限制范围,否则不允许执行;
b.对于使用了order by语句的查询,要求必须使用limit语句;
c.限制笛卡尔积的查询。
set hive.fetch.task.conversion=more,设置成minimal或者none不i走mr任务
hive 0.10.0为了执行效率考虑,简单的查询,就是只是select,不带count,sum,group by这样的,都不走map/reduce,直接读取hdfs文件进行filter过滤。
这样做的好处就是不新开mr任务,执行效率要提高不少,但是不好的地方就是用户界面不友好,有时候数据量大还是要等很长时间,但是又没有任何返回。
Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如:SELECT
FROM employees;在这种情况下,Hive 可以简单地读取 employee 对应的存储目录下的文件,
然后输出查询结果到控制台。
在 hive-default.xml.template 文件中 hive.fetch.task.conversion 默认是 more,老版本 hive
默认是 minimal,该属性修改为 more 以后,在全局查找、字段查找、limit 查找等都不走
mapreduce。
10.创建分区表
–动态分区配置
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
–hive压缩
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
–写入时压缩生效
set hive.exec.orc.compression.strategy=COMPRESSION;
set hive.auto.convert.join= true;该参数默认true,表示开启map join,改为false表示开启reduce join,内存不够可以开启reduce join。
mapred.min.split.size: 指的是数据的最小分割单元大小;min的默认值是1B
mapred.max.split.size: 指的是数据的最大分割单元大小;max的默认值是256MB
调整块的大小,设置切片的大小,从而调整map的个数。
备注:当切片最后剩下的大小不超过切片的110%时,会当作一个切片。例如:切片大小=128,文件大小为394
切成三个块,前两个大小为128,第三个为138,因为138/128=1.08<1.1,所以还是当作一个切片。
set mapred.max.split.size=100000000;
set mapred.min.split.size.per.node=100000000;
set mapred.min.split.size.per.rack=100000000;
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
#设置Hive中底层MapReduce读取数据的输入类:将所有文件合并为一个大文件作为输入
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
#如果hive的程序,只有maptask,将MapTask产生的所有小文件进行合并
set hive.merge.mapfiles=true;
#如果hive的程序,有Map和ReduceTask,将ReduceTask产生的所有小文件进行合并
set hive.merge.mapredfiles=true;
#每一个合并的文件的大小
set hive.merge.size.per.task=256000000;
#平均每个文件的大小,如果小于这个值就会进行合并
set hive.merge.smallfiles.avgsize=16000000;
set mapred.reduce.tasks=10;
根据分桶表处理,把大文件根据某个字段分桶
语法:create table table_name as select * from table_name distribute by [rand(123)field] sorted by field into number bucks;
clustered by col [sorted by col]
–开启分桶,如果不开启,是不会启动多个reduce分桶的
set hive.enforce.bucketing=true;
–开启分桶SMB join
set hive.optimize.bucketmapjoin = true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.auto.convert.sortmerge.join.noconditionaltask=true;
利用smb join提高效率。
reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)
即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;
a:采样
开发测试过程中,原始数据过大,测试验证代码逻辑时非常的不方便,取部分数据来做测试代码的逻辑
常规采样:
select * from tableName tablesample(N PERCENT);–按照文件大小的比例来进行采样
select * from temp_buck tablesample(50 PERCENT);
分桶采样
select …… from tableName tablesample(bucket x out of y)
b:分桶Join
Bucket Map Join:分桶Join
第一种普通的分桶join:Bucket map Join
语法:clusterd by col into N buckets两张表必须为桶表,并且桶的个数要相等或者成倍数分桶字段 = Join字段
第二种基于排序的分桶Join:Sort Merge Bucket Join => SMB Join语法:clusterd by col sorted by col into N buckets两张表必须为桶表,并且桶的个数要相等分桶字段 = Join字段 = 排序字段
要控制每个分桶的文件大小符合小文件的定义。
通过explain查看执行计划,可知smb join和bm join实际上是map join,不走reduce join,效率高。
16.hive索引机制
语法:create index indexName on Tbname(col)
场景:适合于做大数据量中查询少量数据
不适合场景:小数据量查询或者大数据量中查询大量数据
功能:将Hive中的数据对应的文件路径、文件中的偏移量构建索引信息,做过滤查询加快MR读取数据的性能
本质:通过一个MapReduce对所有数据构建索引表,将索引信息存储在索引表中
问题:如果查询新增的数据就无法走索引,索引不会自动更新,必须强制手动更新;
通过一个命令走一个MapReduce程序来手动更新索引数据。
一般不要,3.0以后弃用。
ORC文件中保存了三个层级的统计信息,分别为文件级别、stripe级别和row group级别的
文件级别索引:每个文件都会记录这个文件中所有信息
Strip级别索引:组成的逻辑单元,记录每个strip中的信息
Row索引:每一行的所有列的信息
ORC文件的内部存储结构:ORC每部存储是按照Strip划分存储的
Row Group Index
功能:将每一列在这个Strip中对应最大值和最小值进行记录,当用户进行比较查询时,可以通过记录的最大与最小值判断查询的数据是否在这个Strip中,如果在,读取Strip,如果不在就直接跳过,不读取Strip
应用:做范围比较,一般用唯一标识的那一列,例如id等来进行排序,作为查询条件
使用
创建表时,指定开启row group index
create table2()
stored as orc (’orc.create.index’=’true’)
查询数据时,开启row group index 过滤查询
set hive.optimize.index.filter=true
注意:如果要想实现基于范围的查询索引过滤,必须由用户自己保证写入orc的数据是基于查询条件有序的
insert into table2
select id,name from table1 sort by id
以后按照这个排序的字段做过滤,就可以走行组索引实现范围过滤
Bloom Filter Index布隆过滤索引
row group :范围过滤,根据某列排序的结果做范围比较
bloom filter:等值过滤判断你要的数据在不在这个Strip中,如果索引显示在,就读,但是不一定在,但是索引如果显示不在,就一定不在,就不读了一般的查询条件时间:直接是分区字段,直接使用分区过滤了唯一标识符:基于row group index其他的列作为查询条件?指定为某些列在Strip中构建索引功能:根据指定的列,在生成ORC文件时,在每个Strip中生成这一列所有值的编码值,当进行等值判断时,直接读取对应的索引进行判断,如果在这个Strip中,就读取,如果不在,就跳到下一个Strip,降低数据读取的IO注意:布隆过滤特点,如果索引中显示有对应的编码,实际不一定有这个值,但是没有这个编码就一定没有这个值
使用,创建表时候,指定为某一列创建索引
create table () stored as orc (”orc.bloom.filter.columns”=”列名称”)
create table () stored as orc (”orc.bloom.filter.columns”=”name1,name2”)
18.属性优化
矢量化查询
功能:按照批次对数据进行读取查询
不开启:Hive读一条处理一条开启了:Hive每个批次读取1024条,处理1024条
属性
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
零拷贝
功能:数据在操作系统中从内存中不用经过多次拷贝直接读取
不开启:必须在内存中经过多次交换才能读取到数据开启了:数据可以直接从内存中读取
属性
set hive.exec.orc.zerocopy=true;
关联优化器
功能:Hive在解析SQL语句,转换为MapReduce时候,可以将相关联的部分合并在一起执行
不开启
先走第一个MapReduce进行分组,结果保存在磁盘再走第二个MapReduce进行排序,得到最终的结果 开启了
自动判断所有执行过程语法数是否有重合的过程,放在一起执行只启动一个MapReduce,就可以得到最终的结果 属性
set hive.optimize.correlation=true;



