数据存储空间是有限的,数据本身和增量是动态变化的,数据格式描述了数据保存在文件或者记录中的规则。HDFS中分为文件格式和压缩格式。
1,文件格式
文件格式按面向的存储形式不同,分为面向行和面向列的两大类文件格式。
| 面向行/列 | 类型名称 | 是否可切分 | 优点 | 缺点 | 适用场景 |
| 面向行 | 文本文件格式(.txt) | 是 | 查看便编辑简单 | 无压缩占空间大、传输压力大、数据解析开销大 | 学习练习使用 |
| 面向行 | sequenceFile序列文件格式(.seq) | 是 | 原生支持、二进制kv存储、支持行和块压缩 | 本地查看不方便:小文件合并成kv结构后不易查看内部数据 | 生产环境使用、map输出的默认文件格式 |
| 面向列 | rcfile文件格式(.rc) | 是 | 数据加载快、查询快、空间利用率高、高负载能力 | 每一项都不是最高 | 学习生产均可 |
| 面向列 | orcfile文件格式(.orc) | 是 | 兼具了rcfile优点,进一步提高了读取、存储效率、新数据类型的支持 | 每一项都不是最高 | 学习生产均可 |
压缩格式按其可切分计算性,分为可切分计算和不可切分计算两种
| 可切分性 | 类型名称 | 是否原生 | 优点 | 缺点 | 适用场景 |
| 可切分 | lzo(.lzo) | 否 | 压缩/解压速度快,合理的压缩率 | 压缩率比gzip低,非原生、需要native安装 | 单个文件越大,lzo优点越明显,压缩完成后>=200M为宜 |
| 可切分 | bzip2(.bz2) | 是 | 高压缩率超过gzip,原生支持、不需要native安装,用linux bzip可解压操作 | 压缩/解压速率慢 | 处理速度要求不高、压缩率要求高的情况 |
| 不可切分 | gzip(.gz) | 是 | 压缩/解压速度快,原生/native都支持,使用方便 | 不可切分,对cpu要求较高 | 压缩完成后<=128M的文件适宜 |
| 不可切分 | snappy(.snappy) | 否 | 高压缩/解压速度,合理的压缩率 | 压缩率比gzip低,非原生、需要native安装 | 适合作为map->reduce或是job数据流中间的数据传输格式 |
3,设置输出格式为lzo格式通过shell命令改动,添加参数设置模板:
yarn jar jar_path main_class_path -Dk1=v1参数列表
先安装lzo,命令:
yum -y install lzo lzo-devel hadooplzo hadooplzo-native
再安装lzop,命令:
yum install lzop
lzo应用
二,MR应用之自定义Partition 1,Partition默认实现-HashPartition 2,MapReduce个数的确定时机yarn jar TlHadoopCore-jar-with-dependencies.jar
com.tianliangedu.core.WordCountV2
-Dmapred.output.compress=true
-Dmapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec
/tmp/tianliangedu/input /tmp/tianliangedu/output37
在Job提交后,任务正式开始计算之前即已经确定
Map数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定。
Reduce数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,待冲刺环节详解。另外,用户可以自定义设置,通过参数配置,由用户决定。
3,自定义reduce数量
yarn jar TlHadoopCore-jar-with-dependencies.jar
com.tianliangedu.examples.WordCountV2
-Dmapred.output.compress=true
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-Dmapred.reduce.tasks=2
/tmp/tianliangedu/input /tmp/tianliangedu/output38
4,自定义Partition实现
1,通过继承Partitioner类,自定义实现Partition
public static class MyHashPartitioner extends Partitioner {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
// return key.toString().charAt(0);
}
}
yarn jar TlHadoopCore-jar-with-dependencies.jar
com.tianliangedu.examples.WordCountV2
-Dmapred.output.compress=true
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-Dmapred.reduce.tasks=2
/tmp/tianliangedu/input /tmp/tianliangedu/output38
public static class MyHashPartitioner extends Partitioner {
public int getPartition(K key, V value, int numReduceTasks) {
return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
// return key.toString().charAt(0);
}
}
完整代码:
java:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
//启动mr的driver类
public class SelfDefinePartitioner {
// map类,实现map函数
public static class TokenizerMapper extends
Mapper
脚本调用:
2,通过配置指定参数来实现yarn jar TlHadoopCore-jar-with-dependencies.jar
com.tianliangedu.examples.SelfDefinePartitioner
-Dmapred.output.compress=true
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-Dmapred.reduce.tasks=2
/tmp/tianliangedu/input /tmp/tianliangedu/output40
使用yarn shell命令配置自定义的partition实现。
自定义分区类的独立类代码如下:
package com.tianliangedu.examples; import org.apache.hadoop.mapreduce.Partitioner; public class MyHashPartitionerextends Partitioner { public int getPartition(K key, V value, int numReduceTasks) { return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks; } }
不改动代码,将自定义Partition通过系统参数指定。
yarn jar TlHadoopCore-jar-with-dependencies.jar
com.tianliangedu.examples.SelfDefinePartitioner4ShellConfigure
-Dmapred.output.compress=true
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-Dmapred.reduce.tasks=2
-Dmapreduce.job.partitioner.class=com.tianliangedu.examples.MyHashPartitioner
/tmp/tianliangedu/input /tmp/tianliangedu/output44
注:以上两种方法,只是设置分区类的方式不同,对计算结果没有任何影响



