栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

代码002

代码002

Recommmend 类

package UserRecommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;

public class Recommend {
    public static Pattern REGEX=Pattern.compile("[t,]");
    public static void main(String[] args) throws Exception, IOException, InterruptedException {
        // TODO Auto-generated method stub
        HashMap path = new HashMap();
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        String start="E:\data\recommend2";
        path.put("Step1Input", start+"/StepData");
        path.put("Step1Output", start+"/Step1");
        
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", start+"/Step2");
        
        path.put("Step3Input", path.get("Step2Output"));
        path.put("Step3Output", start+"/Step3");
        
        path.put("Step4Input1", path.get("Step1Input"));
        path.put("Step4Input2", path.get("Step3Output"));
        path.put("Step4Output", start+"/Step4");
        
//        path.put("Step4_2Input", path.get("Step4_1Output"));
//        path.put("Step4_2Output", start+"/Step4_2");
        
        
//        path.put("Step5Input2", start+"/StepData");
        path.put("Step5Input", path.get("Step4Output"));
        path.put("Step5Output", start+"/Step5");
        
        
        Step1.run(conf, path);
        Step2.run(conf, path);
        Step3.run(conf, path);
        Step4.run(conf, path);
//        Step4_1.run(conf, path);
//        Step4_2.run(conf, path);
        Step5.run(conf, path);
    }

}
 

Step1

package UserRecommend;

//import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Step1 {
  public static class MyMapper extends Mapper {

      @Override
      public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
          String[] tokens = Recommend.REGEX.split(values.toString());
          
          if (tokens.length >= 3)
          {
                Text k = new Text(tokens[1]);//itemid
                Text v = new Text(tokens[0] + "," + tokens[2]);//userid + score
                context.write(k, v);                
          }
      }
  }

  public static class MyReducer extends Reducer {

      @Override
      public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
          Map map = new HashMap();
          
          for (Text line : values) {
              String val = line.toString();
              String[] vlist = Recommend.REGEX.split(val);
              
              if (vlist.length >= 2)
              {
                  map.put(vlist[0], vlist[1]);
              }
          }

          Iterator iterA = map.keySet().iterator();
          while (iterA.hasNext())
          {
              String k1 = iterA.next();
              String v1 = map.get(k1);
              Iterator iterB = map.keySet().iterator();
              while (iterB.hasNext())
              {
                  String k2 = iterB.next();
                  String v2 = map.get(k2);
                  context.write(new Text(k1 + "," + k2), new Text(v1 + "," + v2));
              }
          }
      }
  }

  public static void run(Configuration config,Map path) throws IOException, InterruptedException, ClassNotFoundException {
       FileSystem fs=FileSystem.get(config);

      Path output = new Path(path.get("Step1Output"));
      if(fs.exists(output)) {
          fs.delete(output,true);
      }
      Job job = Job.getInstance(config);
      job.setJarByClass(Step1.class);
//
//      job.setMapOutputKeyClass(Text.class);
//      job.setMapOutputValueClass(Text.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path(path.get("Step1Input")));// 加载2个输入数据集
      FileOutputFormat.setOutputPath(job, output);

      
      job.waitForCompletion(true);

  }
}
 

Step2

package UserRecommend;


//import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.lang.Math;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Step2 {
  public static class MyMapper extends Mapper {

      @Override
      public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
          String[] tokens = Recommend.REGEX.split(values.toString());
          
          if (tokens.length >= 4)
          {
                Text k = new Text(tokens[0] + "," + tokens[1]);
                Text v = new Text(tokens[2] + "," + tokens[3]);
                context.write(k, v);                
          }
      }
  }

  public static class MyReducer extends Reducer {

      @Override
      public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
          double sum = 0.0;
          double similarity = 0.0;
          int num = 0;
          
          for (Text line : values) {
              String val = line.toString();
              String[] vlist = Recommend.REGEX.split(val);
              
              if (vlist.length >= 2)
              {
                  sum += Math.pow((Double.parseDouble(vlist[0]) - Double.parseDouble(vlist[1])), 2);
                  num += 1;
              }
          }
          
          if (sum > 0.00000001)
          {
              similarity = (double)num / (1 + Math.sqrt(sum));
          }
          
//          if (similarity > 1.0)
//          {
//              similarity = 1.0;
//          }
          
          context.write(key, new Text(String.format("%.7f", similarity)));
      }
  }

  public static void run(Configuration config,Map path) throws IOException, InterruptedException, ClassNotFoundException {

      FileSystem fs =FileSystem.get(config);
      Path output = new Path(path.get("Step2Output"));
      if(fs.exists(output)) {
          fs.delete(output,true);
      }
      Job job = Job.getInstance(config, "UserCF_Step2 job");
      job.setJarByClass(Step2.class);

//      job.setMapOutputKeyClass(Text.class);
//      job.setMapOutputValueClass(Text.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path(path.get("Step2Input")));// 加载2个输入数据集
      FileOutputFormat.setOutputPath(job, new Path(path.get("Step2Output")));


      
      if (!job.waitForCompletion(true))
      {
          System.out.println("main run stop!");
            return;    
      }
      
      System.out.println("main run successfully!");
  }
}
 

Step3

package UserRecommend;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.lang.Math;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 
public class Step3 {
    public static class MyMapper extends Mapper {
 
        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.REGEX.split(values.toString());
            
            if (tokens.length >= 3)
            {
                Text k = new Text(tokens[0]);
                Text v = new Text(tokens[1] + "," + tokens[2]);
                context.write(k, v);                
            }
        }
    }
 
    public static class MyReducer extends Reducer {
        private final int NEIGHBORHOOD_NUM = 2;
        
        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            Map map = new HashMap();
            
            for (Text line : values) {
                String val = line.toString();
                String[] vlist = Recommend.REGEX.split(val);
                
                if (vlist.length >= 2)
                {
                    map.put(Double.parseDouble(vlist[1]), vlist[0]);
                }
            }
            
            List list = new ArrayList();
            Iterator iter = map.keySet().iterator();
            while (iter.hasNext()) {
                Double similarity = iter.next();
                list.add(similarity);
            }
            
            //然后通过比较器来实现排序
            Collections.sort(list,new Comparator() {
                //降序排序
                public int compare(Double o1, Double o2) {
                    return o2.compareTo(o1);
                }
            });
            
//            for (int i = 0; i < NEIGHBORHOOD_NUM && i < list.size(); i++)
//            {
//                context.write(key, new Text(map.get(list.get(i)) + "," + String.format("%.7f", list.get(i))));
//            }
            
            String v = "";
            for (int i = 0; i < NEIGHBORHOOD_NUM && i < list.size(); i++)
            {
                v += "," + map.get(list.get(i)) + "," + String.format("%.7f", list.get(i));
            }
            context.write(key, new Text(v.substring(1)));
        }
    }
 
    public static void run(Configuration config,Map path) throws IOException, InterruptedException, ClassNotFoundException {

 
       FileSystem fs = FileSystem.get(config);
       Path output= new Path(path.get("Step3Output"));
       if(fs.exists(output)) {
           fs.delete(output,true);
       }
        Job job = Job.getInstance(config, "UserCF_Step3 job");
        job.setJarByClass(Step3.class);
 
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
 
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
 
        FileInputFormat.setInputPaths(job, new Path(path.get("Step3Input")));// 加载2个输入数据集
        FileOutputFormat.setOutputPath(job, output);
 

        if (!job.waitForCompletion(true))
        {
            System.out.println("main run stop!");
            return;    
        }
        
        System.out.println("main run successfully!");
    }
}

Step4

package UserRecommend;

//import hadoop.myMapreduce.martrix.MainRun;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.lang.Math;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Step4 {
  public static class MyMapper extends Mapper {
      private String flag;// A:step3 or B:data
      private int itemNum = 7;
      
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
          FileSplit split = (FileSplit) context.getInputSplit();
          flag = split.getPath().getParent().getName();// 判断读的数据集
          
          System.out.println(flag);
      }
      
      @Override
      public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
          String[] tokens = Recommend.REGEX.split(values.toString());
          int itemIndex = 100;
          
          if (flag.equals("Step3")) {
              for (int i = 1; i <= itemNum; i++)
              {
                  Text k = new Text(Integer.toString(itemIndex + i));//itemid
                  Text v = new Text("A:" + tokens[0] + "," + tokens[1] + "," + tokens[3]);
                  context.write(k, v);
//                  System.out.println(k.toString() + "  " + v.toString());
              }
          } else if (flag.equals("StepData")) {
              Text k = new Text(tokens[1]);//itemid
                Text v = new Text("B:" + tokens[0] + "," + tokens[2]);//userid + score
                context.write(k, v);  
//                System.out.println(k.toString() + "  " + v.toString());
          }
      }
  }

  public static class MyReducer extends Reducer {
      
      @Override
      public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
          Map mapA = new HashMap();
          Map mapB = new HashMap();

          for (Text line : values) {
              String val = line.toString();

              if (val.startsWith("A:")) {
                  String[] kv = Recommend.REGEX.split(val.substring(2));
                  mapA.put(kv[0], kv[1] + "," + kv[2]);
              } else if (val.startsWith("B:")) {
                  String[] kv = Recommend.REGEX.split(val.substring(2));
                  mapB.put(kv[0], kv[1]);
              }
          }
          
          Iterator iterA = mapA.keySet().iterator();
          while (iterA.hasNext())
          {
              String userId = iterA.next();
              if (!mapB.containsKey(userId))//不存在可以推荐 有买过这个物品的不推荐
              {
                  String simiStr = mapA.get(userId);
                  String[] simi = Recommend.REGEX.split(simiStr);
                  if (simi.length >= 2)
                  {
                      double simiVal1 = mapB.containsKey(simi[0]) ? Double.parseDouble(mapB.get(simi[0])) : 0;
                      double simiVal2 = mapB.containsKey(simi[1]) ? Double.parseDouble(mapB.get(simi[1])) : 0;
                      double score = (simiVal1 + simiVal2) / 2;
                      
                      context.write(new Text(userId), new Text(key.toString() + "," + String.format("%.2f", score)));
                  }
              }
          }
      }
  }

  public static void run(Configuration config,Map path) throws IOException, InterruptedException, ClassNotFoundException {


      String input1 = path.get("input1_step4");
      String input2 = path.get("input2_step4");
      String output = path.get("output_step4");

      Job job = Job.getInstance(config, "UserCF_Step4 job");
      job.setJarByClass(Step4.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(Text.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);

      job.setInputFormatClass(TextInputFormat.class);
      job.setOutputFormatClass(TextOutputFormat.class);

      FileInputFormat.setInputPaths(job, new Path(path.get("Step4Input1")), new Path(path.get("Step4Input2")));// 加载2个输入数据集
      FileOutputFormat.setOutputPath(job, new Path(path.get("Step4Output")));

 
      
      if (!job.waitForCompletion(true))
      {
          System.out.println("main run stop!");
            return;    
      }
      
      System.out.println("main run successfully!");
  }
}
 

Step5

package UserRecommend;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.lang.Math;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 

 
 
public class Step5 {
    public static class MyMapper extends Mapper {
 
        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.REGEX.split(values.toString());
            
            if (tokens.length >= 3)
            {
                Text k = new Text(tokens[0]);
                Text v = new Text(tokens[1] + "," + tokens[2]);
                context.write(k, v);                
            }
        }
    }
 
    public static class MyReducer extends Reducer {
        private final int RECOMMENDER_NUM = 3;
        
        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            Map map = new HashMap();
            
            for (Text line : values) {
                String val = line.toString();
                String[] vlist =  Recommend.REGEX.split(val);
                
                if (vlist.length >= 2)
                {
                    map.put(Double.parseDouble(vlist[1]), vlist[0]);
                }
            }
            
            List list = new ArrayList();
            Iterator iter = map.keySet().iterator();
            while (iter.hasNext()) {
                Double similarity = iter.next();
                list.add(similarity);
            }
            
            //然后通过比较器来实现排序
            Collections.sort(list,new Comparator() {
                //降序排序
                public int compare(Double o1, Double o2) {
                    return o2.compareTo(o1);
                }
            });
            
            String v = "";
            for (int i = 0; i < RECOMMENDER_NUM && i < list.size(); i++)
            {
                if (list.get(i).compareTo(new Double(0.001)) > 0)
                {
                    v += "," + map.get(list.get(i)) + "[" + String.format("%.2f", list.get(i)) + "]";
                }
            }
            
            if (!v.isEmpty())
            {
                context.write(key, new Text(v.substring(1)));
            }
            else
            {
                context.write(key, new Text("none"));
            }
        }
    }
 
    public static void run(Configuration config,Map path) throws IOException, InterruptedException, ClassNotFoundException {
        
 
        Path output =  new Path(path.get("Step5Output"));
        
        Job job = Job.getInstance(config, "UserCF_Step5 job");
        job.setJarByClass(Step5.class);
 
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
 
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
 
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
 
        FileInputFormat.setInputPaths(job, new Path(path.get("Step5Input")));// 加载2个输入数据集
        FileOutputFormat.setOutputPath(job, output);
 
        
        if (!job.waitForCompletion(true))
        {
            System.out.println("main run stop!");
            return;    
        }
        
        System.out.println("main run successfully!");
    }
    
}

输入数据

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0 

输出数据

1    104[4.25],106[4.00],105[1.75]
2    105[4.00],107[2.50],106[2.00]
3    103[3.50],102[2.75],106[2.00]
4    102[3.00],105[1.75]
5    none
 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663034.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号