栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

代码001

代码001

Recommend类

package text;

import java.util.HashMap;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;

import recommendMovie.Step1;

public class Recommend {
    public static final Pattern DELIMITER=Pattern.compile("[t,]");
    public static void main(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = new Configuration();
        conf.set("fs.deFaultFS", "file:///");
        HashMap path =new HashMap();
        path.put("Step1Input", "E://data//recommend");
        path.put("Step1Output", path.get("Step1Input")+"/Step1");
        
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input")+"/Step2");
        
        path.put("Step3Input", path.get("Step1Output"));
        path.put("Step3Output", path.get("Step1Input")+"/Step3");
        
        path.put("Step4_1Input1",path.get("Step2Output") );
        path.put("Step4_1Input2", path.get("Step3Output"));
        path.put("Step4_1Output", path.get("Step1Input")+"/Step4");
        
        path.put("Step4_2Input",path.get("Step4_1Output") );
        path.put("Step4_2Output", path.get("Step1Input")+"/Step4_2");
        
        path.put("Step5Input1", path.get("Step4_2Output"));
        path.put("Step5Input2", path.get("Step1Input")+"/StepData");
        path.put("Step5Output", path.get("Step1Input")+"/Step5");
        
//        Step1.run(conf, path);
//        Step2.run(conf, path);
//        Step3.run(conf, path);
//        Step4_1.run(conf,path);
//        Step4_2.run(conf, path);
        Step5.run(conf,path);
        
    }

}
 

Step1

package text;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Step1  {

    public static class Step1_toItemPreMapper extends Mapper     IntWritable,Text>{

        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String[] tokens=Recommend.DELIMITER.split(value.toString());
            int  UserID=Integer.parseInt(tokens[0]);
            String ItemID=tokens[1];
            String pref = tokens[2];
            IntWritable k= new IntWritable(UserID);
            Text v= new Text(ItemID+":"+pref);
            context.write(k,v);
            
        }
    }
    
    public static class Step1_ToUserVectorReducer extends Reducer     Text,IntWritable,Text>{

        @Override
        protected void reduce(IntWritable key, Iterable values,
                Reducer.Context context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            StringBuilder sb = new StringBuilder();
            for(Text value:values) {
                sb.append(","+value.toString());
            }
            context.write(key, new Text(sb.toString().replaceFirst(",", "")));
        }
        
    }
    
    public static void run(Configuration config ,Map path)
    throws Exception
    {
        FileSystem fs =  FileSystem.get(config);
        Path output=new Path(path.get("Step1Output"));
        if(fs.exists(output)) {
            fs.delete(output,true);
        }
        Job job=Job.getInstance(config);
        job.setJobName("Step1");
         job.setJarByClass(Step1.class);
        job.setMapperClass(Step1_toItemPreMapper.class);
        job.setReducerClass(Step1_ToUserVectorReducer.class);
        
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.addInputPath(job, new Path(path.get("Step1Input")));
        FileOutputFormat.setOutputPath(job, output);
        job.waitForCompletion(true);
        
        
    }

}
 

Step2

package text;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import text.Step1.Step1_ToUserVectorReducer;
import text.Step1.Step1_toItemPreMapper;

public class Step2 {
    public static class Step2_UserVectorToCoocurrenceMapper extends Mapper     Text,IntWritable>{

        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String[] tokens=Recommend.DELIMITER.split(value.toString());
            for(int i=1;i                 String itemID1=tokens[i].split(":")[0];
                for(int j=1;j                     String itemID2=tokens[j].split(":")[0];
                    context.write(new Text(itemID1+":"+itemID2), new IntWritable(1));
                }
            }
        }
        
    }
    public static class Step2_UserVectorToConoccurrenceReducer extends Reducer{

        @Override
        protected void reduce(Text key, Iterable values,
                Reducer.Context context) 
                        throws IOException, InterruptedException {
            int sum=0;
            for(IntWritable value:values) {
                sum+=value.get();    
            }
            context.write(key, new IntWritable(sum));
        }    
    }
    public static void run(Configuration config ,Map path)
            throws Exception
            {
                FileSystem fs =  FileSystem.get(config);
                Path output=new Path(path.get("Step2Output"));
                if(fs.exists(output)) {
                    fs.delete(output,true);
                }
                Job job=Job.getInstance(config);
                job.setJobName("Step2");
                 job.setJarByClass(Step2.class);
                job.setMapperClass(Step2_UserVectorToCoocurrenceMapper.class);
                job.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                
                FileInputFormat.addInputPath(job, new Path(path.get("Step2Input")));
                FileOutputFormat.setOutputPath(job, new Path(path.get("Step2Output")));
                job.waitForCompletion(true);
            }
}
 

Step3

package text;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Step3 {
    
    public static class Step3_ItemPrefVectorMapper extends Mapper     Text,Text,Text>{

        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String[] tokens=Recommend.DELIMITER.split(value.toString());
            String userID=tokens[0];
            for(int i=1;i             String itemID=tokens[i].split(":")[0];
            String pref=tokens[i].split(":")[1];
            context.write(new Text(itemID), new Text(userID+":"+pref));}
        }
        
    }
    
    public static void run(Configuration config ,Map path)
            throws Exception
            {
                FileSystem fs =  FileSystem.get(config);
                Path output=new Path(path.get("Step3Output"));
                if(fs.exists(output)) {
                    fs.delete(output,true);
                }
                Job job=Job.getInstance(config);
                job.setJobName("Step3");
                 job.setJarByClass(Step3.class);
                job.setMapperClass(Step3_ItemPrefVectorMapper.class);
//                job.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                FileInputFormat.addInputPath(job, new Path(path.get("Step3Input")));
                FileOutputFormat.setOutputPath(job, new Path(path.get("Step3Output")));
                job.waitForCompletion(true);
            }
}
 

Step4_1

package text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import text.Step3.Step3_ItemPrefVectorMapper;

public class Step4_1 {
    public static class Step4_1_filterMapper extends Mapper     Text,Text,Text>{
        
        //用来判断文件来自于Step2,还是Step3
        private String flag;
        @Override
        protected void setup(Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            FileSplit split = (FileSplit)context.getInputSplit();
            flag=split.getPath().getParent().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String[] tokens=Recommend.DELIMITER.split(value.toString());
            if(flag.equals("Step2")) {
                String itemID1=tokens[0].split(":")[0];
                String itemID2=tokens[0].split(":")[1];
                int num=Integer.parseInt(tokens[1]);
                context.write(new Text(itemID1), new Text("A:"+itemID2+","+num));
            }else if(flag.equals("Step3")) {
                String itemID=tokens[0];
                String userID=tokens[1].split(":")[0];
                String  pref =tokens[1].split(":")[1];
                context.write(new Text(itemID), new Text("B:"+userID+","+pref));
            }
            
                    
        }
        
    }
    
    
    public static class Step4_1_MultplyReducer extends Reducer     Text,Text>{

        @Override
        protected void reduce(Text arg0, Iterable arg1, Reducer.Context arg2)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            HashMap mapA=new HashMap();
            HashMap mapB=new HashMap();
            for(Text line:arg1) {
                String val=line.toString();
                if(val.startsWith("A")) {
                    String[] temp=Recommend.DELIMITER.split(val.substring(2));
                    mapA.put(temp[0], temp[1]);
                }else if (val.startsWith("B")) {
                    String[] temp=Recommend.DELIMITER.split(val.substring(2));
                    mapB.put(temp[0], temp[1]);
                }
            }
            
            double result=0;
            Iterator iter =mapA.keySet().iterator();
            while(iter.hasNext()) {
                String mapk=iter.next();
                int num = Integer.parseInt(mapA.get(mapk));
                Iterator iterb =mapB.keySet().iterator();
                while(iterb.hasNext()) {
                    String mapkb=iterb.next();//user
                    Double pref=Double.parseDouble(mapB.get(mapkb));
                    result=num*pref;
                    arg2.write(new Text(mapkb), new Text(mapk+","+result));
                }
            }
        }
        
        
    }
    
    public static void run(Configuration config ,Map path)
            throws Exception
            {
                FileSystem fs =  FileSystem.get(config);
                Path output=new Path(path.get("Step4_1Output"));
                if(fs.exists(output)) {
                    fs.delete(output,true);
                }
                Job job=Job.getInstance(config);
                job.setJobName("Step4");
                 job.setJarByClass(Step4_1.class);
                job.setMapperClass(Step4_1_filterMapper.class);
                job.setReducerClass(Step4_1_MultplyReducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                FileInputFormat.setInputPaths(job, new Path[] {new Path(path.get("Step4_1Input1")),new Path(path.get("Step4_1Input2"))});
                FileOutputFormat.setOutputPath(job, new Path(path.get("Step4_1Output")));
                job.waitForCompletion(true);
            }

}
 

Step4_2

package text;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import text.Step3.Step3_ItemPrefVectorMapper;

public class Step4_1 {
    public static class Step4_1_filterMapper extends Mapper     Text,Text,Text>{
        
        //用来判断文件来自于Step2,还是Step3
        private String flag;
        @Override
        protected void setup(Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            FileSplit split = (FileSplit)context.getInputSplit();
            flag=split.getPath().getParent().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String[] tokens=Recommend.DELIMITER.split(value.toString());
            if(flag.equals("Step2")) {
                String itemID1=tokens[0].split(":")[0];
                String itemID2=tokens[0].split(":")[1];
                int num=Integer.parseInt(tokens[1]);
                context.write(new Text(itemID1), new Text("A:"+itemID2+","+num));
            }else if(flag.equals("Step3")) {
                String itemID=tokens[0];
                String userID=tokens[1].split(":")[0];
                String  pref =tokens[1].split(":")[1];
                context.write(new Text(itemID), new Text("B:"+userID+","+pref));
            }
            
                    
        }
        
    }
    
    
    public static class Step4_1_MultplyReducer extends Reducer     Text,Text>{

        @Override
        protected void reduce(Text arg0, Iterable arg1, Reducer.Context arg2)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            HashMap mapA=new HashMap();
            HashMap mapB=new HashMap();
            for(Text line:arg1) {
                String val=line.toString();
                if(val.startsWith("A")) {
                    String[] temp=Recommend.DELIMITER.split(val.substring(2));
                    mapA.put(temp[0], temp[1]);
                }else if (val.startsWith("B")) {
                    String[] temp=Recommend.DELIMITER.split(val.substring(2));
                    mapB.put(temp[0], temp[1]);
                }
            }
            
            double result=0;
            Iterator iter =mapA.keySet().iterator();
            while(iter.hasNext()) {
                String mapk=iter.next();
                int num = Integer.parseInt(mapA.get(mapk));
                Iterator iterb =mapB.keySet().iterator();
                while(iterb.hasNext()) {
                    String mapkb=iterb.next();//user
                    Double pref=Double.parseDouble(mapB.get(mapkb));
                    result=num*pref;
                    arg2.write(new Text(mapkb), new Text(mapk+","+result));
                }
            }
        }
        
        
    }
    
    public static void run(Configuration config ,Map path)
            throws Exception
            {
                FileSystem fs =  FileSystem.get(config);
                Path output=new Path(path.get("Step4_1Output"));
                if(fs.exists(output)) {
                    fs.delete(output,true);
                }
                Job job=Job.getInstance(config);
                job.setJobName("Step4");
                 job.setJarByClass(Step4_1.class);
                job.setMapperClass(Step4_1_filterMapper.class);
                job.setReducerClass(Step4_1_MultplyReducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                FileInputFormat.setInputPaths(job, new Path[] {new Path(path.get("Step4_1Input1")),new Path(path.get("Step4_1Input2"))});
                FileOutputFormat.setOutputPath(job, new Path(path.get("Step4_1Output")));
                job.waitForCompletion(true);
            }

}
 

Step5

package text;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.linkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import text.Step4_2.Step4_2functionMapper;
import text.Step4_2.Step4_2functionReducer;

public class Step5 {
    public static class Step5_Mapper extends Mapper{
        private String flag;
        

        @Override
        protected void setup(Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
                        FileSplit split=(FileSplit)context.getInputSplit();
            flag=split.getPath().getParent().getName();
        }
        @Override
        protected void map(LongWritable key, Text value, Mapper.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            if(flag.equals("Step4_2")) {
                String[] tokens=Recommend.DELIMITER.split(value.toString());
                String userID=tokens[0];
                String itemID=tokens[1];
                String pref=tokens[2];
                context.write(new Text(userID),new Text("W:"+itemID+","+pref));
            }else if(flag.equals("StepData")) {
                String[] tokens=Recommend.DELIMITER.split(value.toString());
                String userID=tokens[0];
                String itemID=tokens[1];
                String pref=tokens[2];
                context.write(new Text(userID),new Text("S:"+itemID+","+pref));
            }
        }
    }
    
    public static class Step5_reducer extends Reducer{

        @Override
        protected void reduce(Text arg0, Iterable arg1, Reducer.Context arg2)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            //过滤
            HashMap wMap=new HashMap();
            HashMap sMap=new HashMap();
            
            List> list=new linkedList>();
            //注意这边arg1是多个字段
            for(Text line:arg1) {
            String[] tokens=Recommend.DELIMITER.split(line.toString().substring(2));
            String itemID=tokens[0];
            String pref=tokens[1];
            if(line.toString().startsWith("W")) {
                wMap.put(itemID,pref);
            }else if(line.toString().startsWith("S")) {
                sMap.put(itemID, pref);
            }
            }
            HashMap flatMap=new HashMap();
            Iterator iter=wMap.keySet().iterator();
            while(iter.hasNext()) {
                String k=iter.next();
                if(sMap.containsKey(k)==false) {
                    flatMap.put(k, Float.parseFloat(wMap.get(k)));
                }
            }
            
            for(Entry entry:flatMap.entrySet() ) {
                list.add(entry);
            }
            
            list.sort(new Comparator>(){

                @Override
                public int compare(Entry o1, Entry o2) {
                    // TODO Auto-generated method stub
                    return (int)(o2.getValue()-o1.getValue());
                }
            }
            );
            for(int i=0;i                 arg2.write(arg0, new Text(list.get(i).getKey()+"t"+list.get(i).getValue()));
            }
                    
        }
        
    }
    
    
    public static void run(Configuration config ,Map path)
            throws Exception
            {
                FileSystem fs =  FileSystem.get(config);
                Path output=new Path(path.get("Step5Output"));
                if(fs.exists(output)) {
                    fs.delete(output,true);
                }
                Job job=Job.getInstance(config);
                job.setJobName("Step5");
                 job.setJarByClass(Step5.class);
                job.setMapperClass(Step5_Mapper.class);
                job.setReducerClass(Step5_reducer.class);
                
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);
                
                FileInputFormat.setInputPaths(job, new Path[] {new Path(path.get("Step5Input1")),
                        new Path(path.get("Step5Input2"))});
                FileOutputFormat.setOutputPath(job, new Path(path.get("Step5Output")));
                job.waitForCompletion(true);
            }
}
 

数据

1,101,5
1,102,3
1,103,2.5
2,101,2
2,102,2.5
2,103,5
2,104,2
3,101,2
3,104,4
3,105,4.5
3,107,5
4,101,5
4,103,3
4,104,4.5
4,106,4
5,101,4
5,102,3
5,103,2
5,104,4
5,105,3.5
5,106,4
6,102,4
6,103,2
6,105,3.5
6,107,4
 最终结果

1    104    33.5
1    105    21.0
1    106    18.0
1    107    10.5
2    105    23.0
2    106    20.5
2    107    11.5
3    103    34.0
3    102    28.0
3    106    16.5
4    102    40.0
4    105    29.0
4    107    12.5
5    107    20.0
6    101    31.0
6    104    25.0
6    106    11.5
 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/672828.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号