栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021年安徽省大数据与人工智能应用竞赛——MapReduce(数据预处理)题目解答

2021年安徽省大数据与人工智能应用竞赛——MapReduce(数据预处理)题目解答

数据格式

calls.txt
呼叫者手机号,接受者手机号,开始时间戳,结束时间戳,呼叫者地址省份编码,接受者地址省份编码

18620192711,15733218050,1506628174,1506628265,650000,810000

userPhone.txt
id,省份编码,省份名称

13,15733218050,杜泽文

location.txt

1,110000,北京市
具体代码 1. 将电话号码替换成人名

知识点:

  1. map side join
  2. subString字符串切分

我在之前写的博客里面有详细解析过 map side join

package Demo.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;

public class subject1 {
    public static class demoMapper extends Mapper{
        HashMap hashmap = new HashMap();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs = null;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            Path path = new Path("/data/userPhone.txt");
            FSDataInputStream open = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line;
            while((line=br.readLine())!=null){
                hashmap.put(line.split(",")[1],line.split(",")[2]);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            String phone1 = split[0];
            String phone2 = split[1];
            String name1 = hashmap.get(phone1);
            String name2 = hashmap.get(phone2);
            String info = line.substring(phone1.length()+phone2.length()+2);
            context.write(new Text(name1+","+name2+","),new Text(info));
        }
    }

    public static void main(String[] args) throws Exception{
        BasicConfigurator.configure();
        // 配置mapreduce
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job,input1);
        Path output = new Path("hdfs://master:9000/output");//输出路径不能已存在

        //获取文件系统对象fs,利用fs来对hdfs中的文件进行操作
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }

        FileOutputFormat.setOutputPath(job,output);
        //启动
        job.waitForCompletion(true);
    }
}

结果为

我在代码的Driver部分,也就是main方法里面做了一些改动,设置了一个判断,如果输出路径存在则删去,这样可以直接在idea里面运行,而不需要每次再去删除output

2. 将拨打、接听电话的时间戳转换成日期

知识点:

  1. 时间戳转为日期格式
    这个知识点我也在之前的博客里面写过
    解决时间戳转为日期格式总是为1970年的问题(currentTimeMillis()与数据集里面的数据戳)
package Demo.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class subject1 {
    public static class demoMapper extends Mapper{
        HashMap hashmap = new HashMap();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs = null;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            Path path = new Path("/data/userPhone.txt");
            FSDataInputStream open = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line;
            while((line=br.readLine())!=null){
                hashmap.put(line.split(",")[1],line.split(",")[2]);
            }
        }

        
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            //获取电话号码
            String phone1 = split[0];
            String phone2 = split[1];
            //获取电话号码对应的姓名
            String name1 = hashmap.get(phone1);
            String name2 = hashmap.get(phone2);
            //获取时间
            String time1 = split[2];
            String time2 = split[3];
            SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String sd1 = sdf.format(new Date(Long.parseLong(time1)*1000L));
            String sd2 = sdf.format(new Date(Long.parseLong(time2)*1000L));

            String info = split[4]+","+split[5];
            //String info = line.substring(phone1.length()+phone2.length()+2);
            context.write(new Text(name1+","+name2+","+sd1+","+sd2+","),new Text(info));
            
        }
    }

    public static void main(String[] args) throws Exception{
        BasicConfigurator.configure();
        // 配置mapreduce
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //指定路径
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job,input1);

        Path output = new Path("hdfs://master:9000/output");//输出路径不能已存在

        //获取文件系统对象fs,利用fs来对hdfs中的文件进行操作
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }

        FileOutputFormat.setOutputPath(job,output);
        //启动
        job.waitForCompletion(true);
    }
}

结果为

3. 求出电话的通话时间,以秒做单位

知识点:

  1. 日期相减
    Date格式的数据相减,需要借助getTime(),得出的结果是毫秒数
package Demo.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class subject1 {
    public static class demoMapper extends Mapper{
        HashMap hashmap = new HashMap();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs = null;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            Path path = new Path("/data/userPhone.txt");
            FSDataInputStream open = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line;
            while((line=br.readLine())!=null){
                hashmap.put(line.split(",")[1],line.split(",")[2]);
            }
        }

        
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            //获取电话号码
            String phone1 = split[0];
            String phone2 = split[1];
            //获取电话号码对应的姓名
            String name1 = hashmap.get(phone1);
            String name2 = hashmap.get(phone2);
            //获取时间
            Date time1 = new Date(Long.parseLong(split[2]) * 1000L);
            Date time2 = new Date(Long.parseLong(split[3]) * 1000L);
            SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String t1 = sdf.format(time1);
            String t2 = sdf.format(time2);

            //获取通话时间
            //因为这样相减得到的是毫秒数,所以需要除以1000才是秒数
            long time3 = (time2.getTime() - time1.getTime())/1000;
            String t3 = time3+"秒";

            String info = split[4]+","+split[5];
            //String info = line.substring(phone1.length()+phone2.length()+2);
            context.write(new Text(name1+","+name2+","+t1+","+t2+","+t3+","),new Text(info));

        }
    }

    public static void main(String[] args) throws Exception{
        BasicConfigurator.configure();
        // 配置mapreduce
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //指定路径
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job,input1);

        Path output = new Path("hdfs://master:9000/output");//输出路径不能已存在

        //获取文件系统对象fs,利用fs来对hdfs中的文件进行操作
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }

        FileOutputFormat.setOutputPath(job,output);
        //启动
        job.waitForCompletion(true);
    }
}

结果为

4. 将省份编码替换成省份名称
package Demo.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

public class subject1 {
    public static class demoMapper extends Mapper{
        HashMap userPhone = new HashMap();
        HashMap location = new HashMap();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs = null;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            Path path1 = new Path("/data/userPhone.txt");
            FSDataInputStream open = fs.open(path1);
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line;
            while((line=br.readLine())!=null){
                userPhone.put(line.split(",")[1],line.split(",")[2]);
            }

            Path path2 = new Path("/data/location.txt");
            FSDataInputStream open2 = fs.open(path2);
            BufferedReader br2 = new BufferedReader(new InputStreamReader(open2));
            String line2;
            while((line2=br2.readLine())!=null){
                location.put(line2.split(",")[1],line2.split(",")[2]);
            }
        }

        
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            //获取电话号码
            String phone1 = split[0];
            String phone2 = split[1];
            //获取电话号码对应的姓名
            String name1 = userPhone.get(phone1);
            String name2 = userPhone.get(phone2);
            //获取时间
            Date time1 = new Date(Long.parseLong(split[2]) * 1000L);
            Date time2 = new Date(Long.parseLong(split[3]) * 1000L);
            SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String t1 = sdf.format(time1);
            String t2 = sdf.format(time2);

            //获取通话时间
            //因为这样相减得到的是毫秒数,所以需要除以1000才是秒数
            long time3 = (time2.getTime() - time1.getTime())/1000;
            String t3 = time3+"秒";

            //获取地址编码
            String code1 = split[4];
            String code2 = split[5];
            String location1 = location.get(code1);
            String location2 = location.get(code2);

            //String info = line.substring(phone1.length()+phone2.length()+2);
            context.write(new Text(name1+","+name2+","+t1+","+t2+","+t3+","+location1+","+location2),NullWritable.get());

        }
    }

    public static void main(String[] args) throws Exception{
        BasicConfigurator.configure();
        // 配置mapreduce
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        //指定路径
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job,input1);

        Path output = new Path("hdfs://master:9000/output");//输出路径不能已存在

        //获取文件系统对象fs,利用fs来对hdfs中的文件进行操作
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"),new Configuration());
        if(fs.exists(output)){
            fs.delete(output,true);
        }

        FileOutputFormat.setOutputPath(job,output);
        //启动
        job.waitForCompletion(true);
    }
}

结果为

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/604795.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号