栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop2.6.0+Linux Centos7+idea环境下:MapReduce二度好友推荐案例

Hadoop2.6.0+Linux Centos7+idea环境下:MapReduce二度好友推荐案例

目录

一、问题描述

二、intellij idea中编写代码+打包项目

三、xftp中上传jar包到Linux

四、hadoop中准备输入数据+运行jar包+查看输出结果


一、问题描述

使用MapReduce,对于每个用户A建议10个与A不是朋友的用户,但与A有最多共同朋友。

输入:

输入文件包含邻接列表,在列表中有多行,每行格式如下

0        1,2,3,…

这里,“0”是唯一的整数ID,对应于唯一的用户,“1,2,3…”是一个逗号分隔的唯一ID列表,对应于具有唯一ID的用户的好友。请注意,这些好友是相互的(即边是无向的):如果A是B的朋友,那么B也是A的朋友。

输出:

将结果输出到txt文件,应包含每个用户一行,每行格式如下

1        2,4,5,10,…

其中“1”是对应于用户的唯一ID,“2,4,5,10,…”是一个以逗号分隔的唯一ID列表,对应于算法的推荐可能知道的人,按递减顺序排列共同的朋友数量。

二、intellij idea中编写代码+打包项目

1、创建项目

2、导入jar包




  4.0.0

  org.example
  hadoop
  1.0-SNAPSHOT

  hadoop
  
  http://maven.apache.org

  
    UTF-8
    1.7
    1.7
  

  
    
      junit
      junit
      4.11
      test
    

    
      org.apache.hadoop
      hadoop-common
      2.6.0
      provided
    

    
      org.apache.hadoop
      hadoop-mapreduce-client-jobclient
      2.6.0
      provided
    

    
      org.apache.hadoop
      hadoop-hdfs
      2.6.0
    
    
      commons-cli
      commons-cli
      1.2
    

  

3、编写MR代码

(项目结构)

(FOF)

类型定义由于A:B与B:A是同一个潜在好友列表,为了能够方便的统计,故统一按照字典排序,输出A:B格式

package org.example.recommend_friends;

import org.apache.hadoop.io.Text;

public class FOF  extends Text {

    public FOF(){
        super();
    }

    public FOF(String friend01,String friend02){
        set(getof(friend01,friend02));
    }

    private String getof(String friend01,String friend02){
        int c = friend01.compareTo(friend02);
        if(c>0){
            return friend02+"t"+friend01;
        }
        return friend01+"t"+friend02;
    }
}

(Map01)

package org.example.recommend_friends;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

//map01,统计好友之间的FOF关系(潜在好友关系)

public class Map01 extends  Mapper {

    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
        String lines = value.toString(); //用户所有好友列表
        String userAndFriends[] = StringUtils.split(lines,'t');
        String user = userAndFriends[0];
        String[] friends;
        if(userAndFriends.length == 1){
            return;
        }else if (userAndFriends[1].length() == 1){
            friends = new String[]{userAndFriends[1]};
        }else{
            friends = userAndFriends[1].split(",");
        }

        //好友之间的FOF关系矩阵
        for(int i=0;i 
 

(Reduce01)

package org.example.recommend_friends;

//import com.sun.deploy.util.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;


//reduce函数,统计全部的FOF关系列表的系数
public class Reduce01 extends Reducer {

    @Override
    protected void reduce(FOF key, Iterable values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        boolean f = true;
        for(IntWritable i : values){
            if(0==i.get()){  //已经是好友关系
                f = false;
                break;
            }
            sum+=i.get();  //累计,统计FOF的系数
        }
        if (f) {
            String msg = StringUtils.split(key.toString(), 't')[0]+" "+StringUtils.split(key.toString(), 't')[1]+" "+sum;
            System.out.println(msg);
            context.write(new Text(msg), NullWritable.get()); //输出key为潜在好友对,值为出现的次数
        }
    }
}

(FriendSort)

package org.example.recommend_friends;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FriendSort implements WritableComparable {

    private String friend;
    private int hot;

    public String getFriend(){
        return friend;
    }

    public void setFriend(String friend){
        this.friend = friend;
    }

    public int getHot(){
        return hot;
    }

    public void setHot(int hot){
        this.hot = hot;
    }

    public FriendSort(){
        super();
    }

    public FriendSort(String friend,int hot){
        this.hot = hot;
        this.friend = friend;
    }

    //反序列化
    @Override
    public void readFields(DataInput in) throws IOException{
        this.friend = in.readUTF();
        this.hot = in.readInt();
    }

    //序列化
    @Override
    public void write(DataOutput out) throws IOException{
        out.writeUTF(friend);
        out.writeInt(hot);
    }

    //判断是否为同一用户,并通过hot值排序
    @Override
    public int compareTo(FriendSort newFriend){
        int c = friend.compareTo(newFriend.getFriend());
        int e = -Integer.compare(hot,newFriend.getHot());
        if (c==0){
            return e;
        }
        return c;
    }

}

(Map02)

package org.example.recommend_friends;

//import com.sun.deploy.util.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;

//map函数,每个用户的推荐好友列表,并按推荐指数从大到小排序
public class Map02 extends Mapper {

    @Override
    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{

        String lines = value.toString();
        String friend01 = StringUtils.split(lines,' ')[0];
        String friend02 = StringUtils.split(lines,' ')[1]; //推荐的好友
        int hot = Integer.parseInt(StringUtils.split(lines,' ')[2]);	// 该推荐好友的推荐系数

        System.out.println(friend01+" "+friend02+" "+hot);
        System.out.println(friend02+" "+friend01+" "+hot);
        context.write(new FriendSort(friend01,hot),new Text(friend02+":"+hot)); //mapkey输出用户和好友推荐系数
        context.write(new FriendSort(friend02,hot),new Text(friend01+":"+hot)); //好友关系是相互的
    }
}

(NumSort)

package org.example.recommend_friends;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

//将key根据用户名和次序排序
public class NumSort extends WritableComparator {
    public NumSort(){
        super(FriendSort.class,true);
    }

    public int compare(WritableComparable a,WritableComparable b){
        FriendSort o1 = (FriendSort) a;
        FriendSort o2 = (FriendSort) b;

        int r =o1.getFriend().compareTo(o2.getFriend());
        if(r==0){
            return -Integer.compare(o1.getHot(), o2.getHot());
        }
        return r;
    }
}

(UserGroup)

package org.example.recommend_friends;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class UserGroup extends WritableComparator {
    public UserGroup(){
        super(FriendSort.class,true);
    }

    public int compare(WritableComparable a,WritableComparable b){
        FriendSort o1 =(FriendSort) a;
        FriendSort o2 =(FriendSort) b;
        return o1.getFriend().compareTo(o2.getFriend());
    }
}

(Reduce02)

package org.example.recommend_friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce02 extends Reducer {
    @Override
    protected void reduce(FriendSort user,Iterable friends,Context context) throws IOException,InterruptedException{
        String msg="";
        //
        for(Text friend : friends){
            msg += friend.toString() +",";
        }
        context.write(new Text(user.getFriend()),new Text(msg));
    }
}

(JobFriends)

package org.example.recommend_friends;

import com.sun.org.apache.xpath.internal.operations.Bool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobFriends {

    public static void main(String[] args){
        Boolean flag = jobOne();
        if(flag){
            jobTwo();
        }
    }

    //MapReduce01
    private static Boolean jobOne(){
        Configuration config = new Configuration();

        boolean flag = false;
        try {
            Job job = Job.getInstance(config);

            job.setJarByClass(JobFriends.class);
            job.setJobName("fof one job");

            job.setMapperClass(Map01.class);
            job.setReducerClass(Reduce01.class);

            job.setOutputKeyClass(FOF.class);
            job.setOutputValueClass(IntWritable.class);

//          此处路径都为 hdfs文件系统下输入输出数据的路径  /txt/input.txt(输入) /txt/output1.txt(中间结果)  /txt/output2.txt(输出)
            Path input = new Path("/txt/input.txt");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("/txt/output1.txt");
            //如果文件存在,删除文件,方便后续调试
            if(output.getFileSystem(config).exists(output)){
                output.getFileSystem(config).delete(output,true);
            }

            FileOutputFormat.setOutputPath(job,output);

            flag = job.waitForCompletion(true);
            if(flag){
                System.out.println("job1 success...");
            }

        }catch (Exception e){
            e.printStackTrace();
        }
        return  flag;
    }

    private static Boolean jobTwo(){
        Configuration config = new Configuration();

        Boolean flag = false;
        try {
            Job job = Job.getInstance(config);

            job.setJarByClass(JobFriends.class);
            job.setJobName("fof two job");

            job.setMapperClass(Map02.class);
            job.setReducerClass(Reduce02.class);
            job.setSortComparatorClass(NumSort.class);
            job.setGroupingComparatorClass(UserGroup.class);
            job.setMapOutputKeyClass(FriendSort.class);
            job.setMapOutputValueClass(Text.class);

            Path input = new Path("/txt/output1.txt");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("/txt/output2.txt");
            //如果文件存在,删除文件,方便后续调试
            if(output.getFileSystem(config).exists(output)){
                output.getFileSystem(config).delete(output,true);
            }

            FileOutputFormat.setOutputPath(job,output);

            flag = job.waitForCompletion(true);
            if(flag){
                System.out.println("job2 success...");
            }
        }catch (Exception e){
            e.printStackTrace();
        };
        return flag;
    }
}

(App)

注释掉main方法,或直接把App类删掉

4、打包项目

项目右键 → Open Module Settings → Artifacts 

点击OK,接下来

 ps:​

三、xftp中上传jar包到Linux

 把打包好的jar通过xftp上传到Linux中

四、hadoop中准备输入数据+运行jar包+查看输出结果

1、准备输入数据

下载数据源

 修改txt文件名称

 把input.txt从本地上传到hdfs文件系统下

2、运行jar包

3、查看输出结果

在/txt文件下新增了output1.txt (初始数据经一次MR所得的中间结果)和 output2.txt(中间结果经一次MR所得的最终结果)

 分别查看output1.txt、output2.txt内容

到此就完成了在Hadoop2.6.0+Linux Centos7+idea环境下,使用MapReduce算法实现二度好友推荐。

参考博客:

hadoop2.7.3+win10+idea环境下:MapReduce实现二度好友推荐算法-渣渣的夏天

CentOS 7.6 下利用 Hadoop2.6.0 运行 MapReduce 案例 WordCount-渣渣的夏天

idea中运行hadoop的案例使用打jar包的方式操作(HDFS java API)-快乐的小银龙-博客园

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/652787.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号