栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce进阶与经典案例

MapReduce进阶与经典案例

一,HDFS数据格式详解

数据存储空间是有限的,数据本身和增量是动态变化的,数据格式描述了数据保存在文件或者记录中的规则。HDFS中分为文件格式和压缩格式。

1,文件格式

文件格式按面向的存储形式不同,分为面向行和面向列的两大类文件格式。

面向行/列

类型名称

是否可切分

优点

缺点

适用场景

面向行

文本文件格式(.txt)

查看便编辑简单

无压缩占空间大、传输压力大、数据解析开销大

学习练习使用

面向行

sequenceFile序列文件格式(.seq)

原生支持、二进制kv存储、支持行和块压缩

本地查看不方便:小文件合并成kv结构后不易查看内部数据

生产环境使用、map输出的默认文件格式

面向列

rcfile文件格式(.rc)

数据加载快、查询快、空间利用率高、高负载能力

每一项都不是最高

学习生产均可

面向列

orcfile文件格式(.orc)

兼具了rcfile优点,进一步提高了读取、存储效率、新数据类型的支持

每一项都不是最高

学习生产均可

2,压缩格式 

压缩格式按其可切分计算性,分为可切分计算和不可切分计算两种

可切分性

类型名称

是否原生

优点

缺点

适用场景

可切分

lzo(.lzo)

压缩/解压速度快,合理的压缩率

压缩率比gzip低,非原生、需要native安装

单个文件越大,lzo优点越明显,压缩完成后>=200M为宜

可切分

bzip2(.bz2)

高压缩率超过gzip,原生支持、不需要native安装,用linux bzip可解压操作

压缩/解压速率慢

处理速度要求不高、压缩率要求高的情况

不可切分

gzip(.gz)

压缩/解压速度快,原生/native都支持,使用方便

不可切分,对cpu要求较高

压缩完成后<=128M的文件适宜

不可切分

snappy(.snappy)

高压缩/解压速度,合理的压缩率

压缩率比gzip低,非原生、需要native安装

适合作为map->reduce或是job数据流中间的数据传输格式

 3,文件格式的使用 1,默认输出为txt文本文件格式 2,设置输出格式为gzip

通过shell命令改动,添加参数设置模板:

yarn jar jar_path main_class_path -Dk1=v1参数列表

3,设置输出格式为lzo格式 

先安装lzo,命令:

yum -y install lzo lzo-devel hadooplzo hadooplzo-native

再安装lzop,命令:

yum install lzop

lzo应用

yarn jar TlHadoopCore-jar-with-dependencies.jar

com.tianliangedu.core.WordCountV2

-Dmapred.output.compress=true

-Dmapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

/tmp/tianliangedu/input /tmp/tianliangedu/output37

二,MR应用之自定义Partition  1,Partition默认实现-HashPartition 2,MapReduce个数的确定时机

在Job提交后,任务正式开始计算之前即已经确定

Map数量的确定:由输入数据文件的总大小、数据格式、块大小综合确定。

Reduce数量确定:系统根据输入数据量的大小自动确定,有固定的计算公式,待冲刺环节详解。另外,用户可以自定义设置,通过参数配置,由用户决定。

3,自定义reduce数量

yarn jar TlHadoopCore-jar-with-dependencies.jar

com.tianliangedu.examples.WordCountV2

-Dmapred.output.compress=true

-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

-Dmapred.reduce.tasks=2

/tmp/tianliangedu/input /tmp/tianliangedu/output38

4,自定义Partition实现 

1,通过继承Partitioner类,自定义实现Partition
    public static class MyHashPartitioner extends Partitioner {

        
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
            // return key.toString().charAt(0);
        }

    }

完整代码: 

java:

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

//启动mr的driver类
public class SelfDefinePartitioner {

    // map类,实现map函数
    public static class TokenizerMapper extends
            Mapper {
        // 暂存每个传过来的词频计数,均为1,省掉重复申请空间
        private final static IntWritable one = new IntWritable(1);
        // 暂存每个传过来的词的值,省掉重复申请空间
        private Text word = new Text();

        // 核心map方法的具体实现,逐个对去处理
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // 用每行的字符串值初始化StringTokenizer
            StringTokenizer itr = new StringTokenizer(value.toString());
            // 循环取得每个空白符分隔出来的每个元素
            while (itr.hasMoreTokens()) {
                // 将取得出的每个元素放到word Text对象中
                word.set(itr.nextToken());
                // 通过context对象,将map的输出逐个输出
                context.write(word, one);
            }
        }
    }

     
    public static class MyHashPartitioner extends Partitioner {

        
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
            // return key.toString().charAt(0);
        }

    }

    // reduce类,实现reduce函数
    public static class IntSumReducer extends
            Reducer {
        private IntWritable result = new IntWritable();

        // 核心reduce方法的具体实现,逐个去处理
        public void reduce(Text key, Iterable values,
                Context context) throws IOException, InterruptedException {
            // 暂存每个key组中计算总和
            int sum = 0;
            // 加强型for,依次获取迭代器中的每个元素值,即为一个一个的词频数值
            for (IntWritable val : values) {
                // 将key组中的每个词频数值sum到一起
                sum += val.get();
            }
            // 将该key组sum完成的值放到result IntWritable中,使可以序列化输出
            result.set(sum);
            // 将计算结果逐条输出
            context.write(key, result);
        }
    }

    // 启动mr的driver方法
    public static void main(String[] args) throws Exception {
        // 得到集群配置参数
        Configuration conf = new Configuration();

        // 参数解析器
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        if ((remainingArgs.length != 2)) {
            System.err
                    .println("Usage: yarn jar jar_path main_class_path -D参数列表  ");
            System.exit(2);
        }
        // 设置到本次的job实例中
        Job job = Job.getInstance(conf, "Partition");
        // 指定本次执行的主类是WordCount
        job.setJarByClass(SelfDefinePartitioner.class);
        // 指定map类
        job.setMapperClass(TokenizerMapper.class);
        // 指定partition类--------------------------------------------start
        job.setPartitionerClass(MyHashPartitioner.class);
        // 指定partition类--------------------------------------------end
        // 指定combiner类,要么不指定,如果指定,一般与reducer类相同
        job.setCombinerClass(IntSumReducer.class);
        // 指定reducer类
        job.setReducerClass(IntSumReducer.class);
        // 指定job输出的key和value的类型,如果map和reduce输出类型不完全相同,需要重新设置map的output的key和value的class类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 指定输入数据的路径
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        // 指定输出路径,并要求该输出路径一定是不存在的
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        // 指定job执行模式,等待任务执行完成后,提交任务的客户端才会退出!
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

脚本调用:

yarn jar TlHadoopCore-jar-with-dependencies.jar

com.tianliangedu.examples.SelfDefinePartitioner

-Dmapred.output.compress=true

-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

-Dmapred.reduce.tasks=2

/tmp/tianliangedu/input /tmp/tianliangedu/output40

2,通过配置指定参数来实现 

使用yarn shell命令配置自定义的partition实现。

自定义分区类的独立类代码如下:

package com.tianliangedu.examples;
import org.apache.hadoop.mapreduce.Partitioner;
public class MyHashPartitioner extends Partitioner {
   public int getPartition(K key, V value, int numReduceTasks) {
      return (key.toString().charAt(0) < 'q' ? 0 : 1) % numReduceTasks;
   }
}

不改动代码,将自定义Partition通过系统参数指定。

yarn jar TlHadoopCore-jar-with-dependencies.jar

com.tianliangedu.examples.SelfDefinePartitioner4ShellConfigure

-Dmapred.output.compress=true

-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

-Dmapred.reduce.tasks=2

-Dmapreduce.job.partitioner.class=com.tianliangedu.examples.MyHashPartitioner

/tmp/tianliangedu/input /tmp/tianliangedu/output44

注:以上两种方法,只是设置分区类的方式不同,对计算结果没有任何影响

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735637.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号