calls.txt
呼叫者手机号,接受者手机号,开始时间戳,结束时间戳,呼叫者地址省份编码,接受者地址省份编码
18620192711,15733218050,1506628174,1506628265,650000,810000
userPhone.txt
id,手机号,姓名
13,15733218050,杜泽文
location.txt
1,110000,北京市
具体代码

1. 将电话号码替换成人名
知识点:
- map side join
- subString字符串切分
我在之前写的博客里面有详细解析过 map side join
package Demo.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
public class subject1 {
    /**
     * Map-side join: replaces the caller/callee phone numbers of each call
     * record with the owners' names. The phone->name table (userPhone.txt)
     * is small, so it is loaded into memory once per mapper in setup().
     */
    public static class demoMapper extends Mapper<LongWritable, Text, Text, Text> {
        // phone number -> owner name, populated in setup()
        private final HashMap<String, String> hashmap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            } catch (URISyntaxException e) {
                // Fail fast instead of continuing with a null fs (would NPE below).
                throw new IOException("invalid HDFS URI", e);
            }
            Path path = new Path("/data/userPhone.txt");
            // try-with-resources guarantees the HDFS stream is closed.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(","); // split once per line
                    hashmap.put(fields[1], fields[2]); // [1]=phone, [2]=name
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            String phone1 = split[0];
            String phone2 = split[1];
            // join: look up the names for both phone numbers
            String name1 = hashmap.get(phone1);
            String name2 = hashmap.get(phone2);
            // Everything after the two phone numbers and the two separating commas.
            String info = line.substring(phone1.length() + phone2.length() + 2);
            context.write(new Text(name1 + "," + name2 + ","), new Text(info));
        }
    }

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        // configure the MapReduce job
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-side join: no reduce phase needed
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job, input1);
        Path output = new Path("hdfs://master:9000/output"); // must not pre-exist
        // Delete a stale output dir so the job can be rerun directly from the IDE.
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // launch and wait
        job.waitForCompletion(true);
    }
}
结果为
我在代码的Driver部分,也就是main方法里面做了一些改动,设置了一个判断,如果输出路径存在则删去,这样可以直接在idea里面运行,而不需要每次再去删除output
2. 将拨打、接听电话的时间戳转换成日期
知识点:
- 时间戳转为日期格式
这个知识点我也在之前的博客里面写过
解决时间戳转为日期格式总是为1970年的问题(currentTimeMillis()与数据集里面的数据戳)
package Demo.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
public class subject1 {
    /**
     * Map-side join plus timestamp formatting: replaces phone numbers with
     * owner names and converts the start/end epoch-second timestamps of each
     * call record into "yyyy-MM-dd HH:mm:ss" dates.
     */
    public static class demoMapper extends Mapper<LongWritable, Text, Text, Text> {
        // phone number -> owner name, populated in setup()
        private final HashMap<String, String> hashmap = new HashMap<>();
        // Created once per mapper instead of once per record. SimpleDateFormat
        // is not thread-safe, but each mapper instance is single-threaded.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            } catch (URISyntaxException e) {
                // Fail fast instead of continuing with a null fs (would NPE below).
                throw new IOException("invalid HDFS URI", e);
            }
            Path path = new Path("/data/userPhone.txt");
            // try-with-resources guarantees the HDFS stream is closed.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(","); // split once per line
                    hashmap.put(fields[1], fields[2]); // [1]=phone, [2]=name
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            // phone numbers
            String phone1 = split[0];
            String phone2 = split[1];
            // names joined from the in-memory table
            String name1 = hashmap.get(phone1);
            String name2 = hashmap.get(phone2);
            // Timestamps are epoch SECONDS; Date expects milliseconds, hence *1000L
            // (otherwise every date collapses to 1970).
            String sd1 = sdf.format(new Date(Long.parseLong(split[2]) * 1000L));
            String sd2 = sdf.format(new Date(Long.parseLong(split[3]) * 1000L));
            // remaining columns: caller/callee province codes
            String info = split[4] + "," + split[5];
            context.write(new Text(name1 + "," + name2 + "," + sd1 + "," + sd2 + ","), new Text(info));
        }
    }

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        // configure the MapReduce job
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-side join: no reduce phase needed
        // input path
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job, input1);
        Path output = new Path("hdfs://master:9000/output"); // must not pre-exist
        // Delete a stale output dir so the job can be rerun directly from the IDE.
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // launch and wait
        job.waitForCompletion(true);
    }
}
结果为
知识点:
- 日期相减
Date格式的数据相减,需要借助getTime(),得出的结果是毫秒数
package Demo.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
public class subject1 {
    /**
     * Map-side join plus timestamp handling: replaces phone numbers with owner
     * names, formats the start/end timestamps as dates, and computes the call
     * duration in seconds from the two epoch timestamps.
     */
    public static class demoMapper extends Mapper<LongWritable, Text, Text, Text> {
        // phone number -> owner name, populated in setup()
        private final HashMap<String, String> hashmap = new HashMap<>();
        // Created once per mapper instead of once per record. SimpleDateFormat
        // is not thread-safe, but each mapper instance is single-threaded.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            } catch (URISyntaxException e) {
                // Fail fast instead of continuing with a null fs (would NPE below).
                throw new IOException("invalid HDFS URI", e);
            }
            Path path = new Path("/data/userPhone.txt");
            // try-with-resources guarantees the HDFS stream is closed.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(","); // split once per line
                    hashmap.put(fields[1], fields[2]); // [1]=phone, [2]=name
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            // phone numbers
            String phone1 = split[0];
            String phone2 = split[1];
            // names joined from the in-memory table
            String name1 = hashmap.get(phone1);
            String name2 = hashmap.get(phone2);
            // Timestamps are epoch SECONDS; Date expects milliseconds, hence *1000L.
            Date time1 = new Date(Long.parseLong(split[2]) * 1000L);
            Date time2 = new Date(Long.parseLong(split[3]) * 1000L);
            String t1 = sdf.format(time1);
            String t2 = sdf.format(time2);
            // Call duration: getTime() yields milliseconds, so divide by 1000
            // to get seconds.
            long time3 = (time2.getTime() - time1.getTime()) / 1000;
            String t3 = time3 + "秒";
            // remaining columns: caller/callee province codes
            String info = split[4] + "," + split[5];
            context.write(new Text(name1 + "," + name2 + "," + t1 + "," + t2 + "," + t3 + ","), new Text(info));
        }
    }

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        // configure the MapReduce job
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-side join: no reduce phase needed
        // input path
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job, input1);
        Path output = new Path("hdfs://master:9000/output"); // must not pre-exist
        // Delete a stale output dir so the job can be rerun directly from the IDE.
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // launch and wait
        job.waitForCompletion(true);
    }
}
结果为
package Demo.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
public class subject1 {
    /**
     * Final version: map-side join against two lookup tables. Replaces phone
     * numbers with owner names, formats the timestamps, computes the call
     * duration, and translates both province codes into province names.
     * Emits the fully assembled record as the key with a NullWritable value.
     */
    public static class demoMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // phone number -> owner name
        private final HashMap<String, String> userPhone = new HashMap<>();
        // province code -> province name
        private final HashMap<String, String> location = new HashMap<>();
        // Created once per mapper instead of once per record. SimpleDateFormat
        // is not thread-safe, but each mapper instance is single-threaded.
        private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSystem fs;
            try {
                fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
            } catch (URISyntaxException e) {
                // Fail fast instead of continuing with a null fs (would NPE below).
                throw new IOException("invalid HDFS URI", e);
            }
            // Both lookup files share the layout "id,key,value" -> one loader.
            loadCsvMap(fs, "/data/userPhone.txt", userPhone);
            loadCsvMap(fs, "/data/location.txt", location);
        }

        /** Reads a "col0,key,value" CSV from HDFS into map (key=col1, value=col2). */
        private static void loadCsvMap(FileSystem fs, String file, HashMap<String, String> map) throws IOException {
            // try-with-resources guarantees the HDFS stream is closed.
            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(file))))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(","); // split once per line
                    map.put(fields[1], fields[2]);
                }
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(",");
            // phone numbers
            String phone1 = split[0];
            String phone2 = split[1];
            // names joined from the in-memory table
            String name1 = userPhone.get(phone1);
            String name2 = userPhone.get(phone2);
            // Timestamps are epoch SECONDS; Date expects milliseconds, hence *1000L.
            Date time1 = new Date(Long.parseLong(split[2]) * 1000L);
            Date time2 = new Date(Long.parseLong(split[3]) * 1000L);
            String t1 = sdf.format(time1);
            String t2 = sdf.format(time2);
            // Call duration: getTime() yields milliseconds, so divide by 1000
            // to get seconds.
            long time3 = (time2.getTime() - time1.getTime()) / 1000;
            String t3 = time3 + "秒";
            // province codes -> province names
            String code1 = split[4];
            String code2 = split[5];
            String location1 = location.get(code1);
            String location2 = location.get(code2);
            context.write(new Text(name1 + "," + name2 + "," + t1 + "," + t2 + "," + t3 + "," + location1 + "," + location2), NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        // configure the MapReduce job
        Job job = Job.getInstance();
        job.setJobName("zhang");
        job.setJarByClass(subject1.class);
        job.setMapperClass(demoMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0); // map-side join: no reduce phase needed
        // input path
        Path input1 = new Path("hdfs://master:9000/data/calls.txt");
        FileInputFormat.addInputPath(job, input1);
        Path output = new Path("hdfs://master:9000/output"); // must not pre-exist
        // Delete a stale output dir so the job can be rerun directly from the IDE.
        FileSystem fs = FileSystem.get(new URI("hdfs://master:9000"), new Configuration());
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        // launch and wait
        job.waitForCompletion(true);
    }
}
结果为



