栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

MapReduce的单词统计从本地运行到集群运行

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MapReduce的单词统计从本地运行到集群运行

基于windows已配置号hadoop客户端,本次使用的事hadoop3.1.3版本

数据:

 As soon as an abnormally elevated IOP was encountered, patient was excluded from the study
and the prior regimen was reestablished. RESULTS This study included 53 eyes of 53
 patients with open angle glaucoma. Twenty-seven patients suffered from primary open
 angle glaucoma and 26 patients had pseudoexfoliative glaucoma. After beginning the
 second phase of the study, a mild trend of increasing IOP was recordable. A corresponding
 trend was even detected in female and male patients separately. The P values at week 
In the first 2 weeks after initiation of the 2nd phase, 66% of cases have no change 
in IOPs, but thereafter, 69.8%, experienced increasing IOPs.The present
study shows the superiority of the conventional dosage of Latanoprost 0.005% in 
comparison with once every other day dose but at least in first few weeks, the 
IOPs are reasonably close to each other. Further studies with higher number of 
cases would widen the present findings.

#maper代码

package com.pw.study.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WCMapper extends Mapper {
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        
        //1.读取一行
        String line = value.toString();
        //2.切割
        String[] values = line.split(" ");
        for (String item : values) {
            //每一个单词记录一下
            outKey.set(item);
            outValue.set(1);
            context.write(outKey, outValue);
        }
    }
}

#reducer代码

package com.pw.study.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WCReducer extends Reducer {
    //输出内容
    private final IntWritable outValue = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        //累计求和
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        outValue.set(sum);
        context.write(key, outValue);
    }
}

# wcDriver 主类,本地直接启动(本地运行)得到结果

import java.io.IOException;


public class WCDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 启动mrJob
        Job job = Job.getInstance(new Configuration());
        //设置主启动类

        
        job.setMapperClass(WCMapper.class);
        
        job.setReducerClass(WCReducer.class);

        //设置map输出key类型
        job.setMapOutputKeyClass(Text.class);
        //设置map输出value类型
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出key
        job.setOutputKeyClass(Text.class);
        //设置最终输出value
        job.setOutputValueClass(IntWritable.class);


        //文件输入
        FileInputFormat.setInputPaths(job, new Path("E:\work\note\iodata\input"));
        //最终结果输出
        FileOutputFormat.setOutputPath(job, new Path("E:\work\note\iodata\out3"));

        //执行job
        job.waitForCompletion(true);
    }
}

#hdfs运行,在本地将Wordcount项目打包,将jar上传到hdfs的节点上,并将输入文件上传到/input中。然后运行如下代码,由于我打包时没有你配置主类所以包名一起输入,其他的和官网的一致,

hadoop jar./MR_WC.jar com.pw.study.wordcount.WCDriverForHDFSAndSingle /input  /output
package com.pw.study.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class WCDriverForHDFSAndSingle {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 启动mrJob
        Job job = Job.getInstance(new Configuration());
        //设置主启动类
        job.setJarByClass(WCDriverForHDFSAndSingle.class);

        
        job.setMapperClass(WCMapper.class);
        
        job.setReducerClass(WCReducer.class);

        //设置map输出key类型
        job.setMapOutputKeyClass(Text.class);
        //设置map输出value类型
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出key
        job.setOutputKeyClass(Text.class);
        //设置最终输出value
        job.setOutputValueClass(IntWritable.class);


        //文件输入
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //最终结果输出
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行job
        job.waitForCompletion(true);
    }
}

#本地连接hdfs集群,配置参数然后运行,设置环境变量

 注意的地方是5.6步先打包(打包是还没有设置jar的代码)

package com.pw.study.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class WCDriverForHDFSAndCluster {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        //设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
        conf.set("fs.defaultFS", "hdfs://hadoop102:8020");
        //指定MR运行在Yarn上
        conf.set("mapreduce.framework.name", "yarn");
        //指定MR可以在远程集群运行
        conf.set("mapreduce.app-submission.cross-platform", "true");
        //指定yarn resourcemanager的位置
        conf.set("yarn.resourcemanager.hostname", "hadoop103");

        // 启动mrJob
        Job job = Job.getInstance(conf);
        //设置主启动类
        // job.setJarByClass(WCDriverForHDFSAndCluster.class);

        //设置jar的绝对路径并注释掉主启动类
        job.setJar("E:\code\hadoop_code\MRDemo1_wordCount\target\MRDemo1_wordCount-1.0-SNAPSHOT.jar");
        
        job.setMapperClass(WCMapper.class);
        
        job.setReducerClass(WCReducer.class);

        //设置map输出key类型
        job.setMapOutputKeyClass(Text.class);
        //设置map输出value类型
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出key
        job.setOutputKeyClass(Text.class);
        //设置最终输出value
        job.setOutputValueClass(IntWritable.class);


        //文件输入
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //最终结果输出
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //执行job
        job.waitForCompletion(true);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/531747.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号