目录
一、问题描述
二、intellij idea中编写代码+打包项目
三、xftp中上传jar包到Linux
四、hadoop中准备输入数据+运行jar包+查看输出结果
一、问题描述
使用MapReduce,对于每个用户A建议10个与A不是朋友的用户,但与A有最多共同朋友。
输入:
输入文件包含邻接列表,在列表中有多行,每行格式如下
0 1,2,3,…
这里,“0”是唯一的整数ID,对应于唯一的用户,“1,2,3…”是一个逗号分隔的唯一ID列表,对应于具有唯一ID的用户的好友。请注意,这些好友是相互的(即边是无向的):如果A是B的朋友,那么B也是A的朋友。
输出:
将结果输出到txt文件,应包含每个用户一行,每行格式如下
1 2,4,5,10,…
其中“1”是对应于用户的唯一ID,“2,4,5,10,…”是一个以逗号分隔的唯一ID列表,对应于算法的推荐可能知道的人,按递减顺序排列共同的朋友数量。
二、intellij idea中编写代码+打包项目
1、创建项目
2、导入jar包
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>org.example</groupId>
  <artifactId>hadoop</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>hadoop</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
      <version>2.6.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.2</version>
    </dependency>
  </dependencies>
</project>
3、编写MR代码
(项目结构)
(FOF)
类型定义:由于 A:B 与 B:A 表示同一个潜在好友对,为了便于统计,统一按字典序排序后输出为 A:B 格式(即较小的 ID 在前)。
package org.example.recommend_friends;
import org.apache.hadoop.io.Text;

/**
 * Key type for an unordered pair of user IDs (a "friend-of-friend" pair).
 * A:B and B:A denote the same candidate pair, so the two IDs are stored in
 * lexicographic order joined by a tab, giving every pair one canonical form.
 */
public class FOF extends Text {
    public FOF() {
        super();
    }

    /** Builds the canonical tab-separated form of the pair. */
    public FOF(String friend01, String friend02) {
        set(getof(friend01, friend02));
    }

    // Order the two IDs lexicographically. The separator must be a real tab
    // ("\t", not the literal letter 't' — the blog extraction dropped the
    // backslash); Reduce01 splits the key back apart on '\t'.
    private String getof(String friend01, String friend02) {
        int c = friend01.compareTo(friend02);
        if (c > 0) {
            return friend02 + "\t" + friend01;
        }
        return friend01 + "\t" + friend02;
    }
}
(Map01)
package org.example.recommend_friends; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; import java.io.IOException; //map01,统计好友之间的FOF关系(潜在好友关系) public class Map01 extends Mapper{ @Override protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String lines = value.toString(); //用户所有好友列表 String userAndFriends[] = StringUtils.split(lines,'t'); String user = userAndFriends[0]; String[] friends; if(userAndFriends.length == 1){ return; }else if (userAndFriends[1].length() == 1){ friends = new String[]{userAndFriends[1]}; }else{ friends = userAndFriends[1].split(","); } //好友之间的FOF关系矩阵 for(int i=0;i (Reduce01)
package org.example.recommend_friends; //import com.sun.deploy.util.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.StringUtils; import java.io.IOException; //reduce函数,统计全部的FOF关系列表的系数 public class Reduce01 extends Reducer{ @Override protected void reduce(FOF key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; boolean f = true; for(IntWritable i : values){ if(0==i.get()){ //已经是好友关系 f = false; break; } sum+=i.get(); //累计,统计FOF的系数 } if (f) { String msg = StringUtils.split(key.toString(), 't')[0]+" "+StringUtils.split(key.toString(), 't')[1]+" "+sum; System.out.println(msg); context.write(new Text(msg), NullWritable.get()); //输出key为潜在好友对,值为出现的次数 } } } (FriendSort)
package org.example.recommend_friends; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; public class FriendSort implements WritableComparable{ private String friend; private int hot; public String getFriend(){ return friend; } public void setFriend(String friend){ this.friend = friend; } public int getHot(){ return hot; } public void setHot(int hot){ this.hot = hot; } public FriendSort(){ super(); } public FriendSort(String friend,int hot){ this.hot = hot; this.friend = friend; } //反序列化 @Override public void readFields(DataInput in) throws IOException{ this.friend = in.readUTF(); this.hot = in.readInt(); } //序列化 @Override public void write(DataOutput out) throws IOException{ out.writeUTF(friend); out.writeInt(hot); } //判断是否为同一用户,并通过hot值排序 @Override public int compareTo(FriendSort newFriend){ int c = friend.compareTo(newFriend.getFriend()); int e = -Integer.compare(hot,newFriend.getHot()); if (c==0){ return e; } return c; } } (Map02)
package org.example.recommend_friends; //import com.sun.deploy.util.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; import java.io.IOException; //map函数,每个用户的推荐好友列表,并按推荐指数从大到小排序 public class Map02 extends Mapper{ @Override protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String lines = value.toString(); String friend01 = StringUtils.split(lines,' ')[0]; String friend02 = StringUtils.split(lines,' ')[1]; //推荐的好友 int hot = Integer.parseInt(StringUtils.split(lines,' ')[2]); // 该推荐好友的推荐系数 System.out.println(friend01+" "+friend02+" "+hot); System.out.println(friend02+" "+friend01+" "+hot); context.write(new FriendSort(friend01,hot),new Text(friend02+":"+hot)); //mapkey输出用户和好友推荐系数 context.write(new FriendSort(friend02,hot),new Text(friend01+":"+hot)); //好友关系是相互的 } } (NumSort)
package org.example.recommend_friends; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; //将key根据用户名和次序排序 public class NumSort extends WritableComparator { public NumSort(){ super(FriendSort.class,true); } public int compare(WritableComparable a,WritableComparable b){ FriendSort o1 = (FriendSort) a; FriendSort o2 = (FriendSort) b; int r =o1.getFriend().compareTo(o2.getFriend()); if(r==0){ return -Integer.compare(o1.getHot(), o2.getHot()); } return r; } }(UserGroup)
package org.example.recommend_friends; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class UserGroup extends WritableComparator { public UserGroup(){ super(FriendSort.class,true); } public int compare(WritableComparable a,WritableComparable b){ FriendSort o1 =(FriendSort) a; FriendSort o2 =(FriendSort) b; return o1.getFriend().compareTo(o2.getFriend()); } }(Reduce02)
package org.example.recommend_friends; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class Reduce02 extends Reducer{ @Override protected void reduce(FriendSort user,Iterable friends,Context context) throws IOException,InterruptedException{ String msg=""; // for(Text friend : friends){ msg += friend.toString() +","; } context.write(new Text(user.getFriend()),new Text(msg)); } } (JobFriends)
package org.example.recommend_friends; import com.sun.org.apache.xpath.internal.operations.Bool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JobFriends { public static void main(String[] args){ Boolean flag = jobOne(); if(flag){ jobTwo(); } } //MapReduce01 private static Boolean jobOne(){ Configuration config = new Configuration(); boolean flag = false; try { Job job = Job.getInstance(config); job.setJarByClass(JobFriends.class); job.setJobName("fof one job"); job.setMapperClass(Map01.class); job.setReducerClass(Reduce01.class); job.setOutputKeyClass(FOF.class); job.setOutputValueClass(IntWritable.class); // 此处路径都为 hdfs文件系统下输入输出数据的路径 /txt/input.txt(输入) /txt/output1.txt(中间结果) /txt/output2.txt(输出) Path input = new Path("/txt/input.txt"); FileInputFormat.addInputPath(job, input); Path output = new Path("/txt/output1.txt"); //如果文件存在,删除文件,方便后续调试 if(output.getFileSystem(config).exists(output)){ output.getFileSystem(config).delete(output,true); } FileOutputFormat.setOutputPath(job,output); flag = job.waitForCompletion(true); if(flag){ System.out.println("job1 success..."); } }catch (Exception e){ e.printStackTrace(); } return flag; } private static Boolean jobTwo(){ Configuration config = new Configuration(); Boolean flag = false; try { Job job = Job.getInstance(config); job.setJarByClass(JobFriends.class); job.setJobName("fof two job"); job.setMapperClass(Map02.class); job.setReducerClass(Reduce02.class); job.setSortComparatorClass(NumSort.class); job.setGroupingComparatorClass(UserGroup.class); job.setMapOutputKeyClass(FriendSort.class); job.setMapOutputValueClass(Text.class); Path input = new Path("/txt/output1.txt"); FileInputFormat.addInputPath(job, input); Path output = new 
Path("/txt/output2.txt"); //如果文件存在,删除文件,方便后续调试 if(output.getFileSystem(config).exists(output)){ output.getFileSystem(config).delete(output,true); } FileOutputFormat.setOutputPath(job,output); flag = job.waitForCompletion(true); if(flag){ System.out.println("job2 success..."); } }catch (Exception e){ e.printStackTrace(); }; return flag; } }(App)
注释掉main方法,或直接把App类删掉
4、打包项目
项目右键 → Open Module Settings → Artifacts
点击OK,接下来
ps:
三、xftp中上传jar包到Linux
把打包好的jar通过xftp上传到Linux中
四、hadoop中准备输入数据+运行jar包+查看输出结果
1、准备输入数据
下载数据源
修改txt文件名称
把input.txt从本地上传到hdfs文件系统下
2、运行jar包
3、查看输出结果
在/txt文件下新增了output1.txt (初始数据经一次MR所得的中间结果)和 output2.txt(中间结果经一次MR所得的最终结果)
分别查看output1.txt、output2.txt内容
到此就完成了在Hadoop2.6.0+Linux Centos7+idea环境下,使用MapReduce算法实现二度好友推荐。
参考博客:
hadoop2.7.3+win10+idea环境下:MapReduce实现二度好友推荐算法-渣渣的夏天
CentOS 7.6 下利用 Hadoop2.6.0 运行 MapReduce 案例 WordCount-渣渣的夏天
idea中运行hadoop的案例使用打jar包的方式操作(HDFS java API)-快乐的小银龙-博客园



