Recommend类
package text;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import recommendMovie.Step1;
public class Recommend {
public static final Pattern DELIMITER=Pattern.compile("[t,]");
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
conf.set("fs.deFaultFS", "file:///");
HashMap
path.put("Step1Input", "E://data//recommend");
path.put("Step1Output", path.get("Step1Input")+"/Step1");
path.put("Step2Input", path.get("Step1Output"));
path.put("Step2Output", path.get("Step1Input")+"/Step2");
path.put("Step3Input", path.get("Step1Output"));
path.put("Step3Output", path.get("Step1Input")+"/Step3");
path.put("Step4_1Input1",path.get("Step2Output") );
path.put("Step4_1Input2", path.get("Step3Output"));
path.put("Step4_1Output", path.get("Step1Input")+"/Step4");
path.put("Step4_2Input",path.get("Step4_1Output") );
path.put("Step4_2Output", path.get("Step1Input")+"/Step4_2");
path.put("Step5Input1", path.get("Step4_2Output"));
path.put("Step5Input2", path.get("Step1Input")+"/StepData");
path.put("Step5Output", path.get("Step1Input")+"/Step5");
// Step1.run(conf, path);
// Step2.run(conf, path);
// Step3.run(conf, path);
// Step4_1.run(conf,path);
// Step4_2.run(conf, path);
Step5.run(conf,path);
}
}
Step1
package text;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Step1 {
public static class Step1_toItemPreMapper extends Mapper
@Override
protected void map(LongWritable key, Text value, Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens=Recommend.DELIMITER.split(value.toString());
int UserID=Integer.parseInt(tokens[0]);
String ItemID=tokens[1];
String pref = tokens[2];
IntWritable k= new IntWritable(UserID);
Text v= new Text(ItemID+":"+pref);
context.write(k,v);
}
}
public static class Step1_ToUserVectorReducer extends Reducer
@Override
protected void reduce(IntWritable key, Iterable
Reducer
// TODO Auto-generated method stub
StringBuilder sb = new StringBuilder();
for(Text value:values) {
sb.append(","+value.toString());
}
context.write(key, new Text(sb.toString().replaceFirst(",", "")));
}
}
public static void run(Configuration config ,Map
throws Exception
{
FileSystem fs = FileSystem.get(config);
Path output=new Path(path.get("Step1Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job=Job.getInstance(config);
job.setJobName("Step1");
job.setJarByClass(Step1.class);
job.setMapperClass(Step1_toItemPreMapper.class);
job.setReducerClass(Step1_ToUserVectorReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(path.get("Step1Input")));
FileOutputFormat.setOutputPath(job, output);
job.waitForCompletion(true);
}
}
Step2
package text;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import text.Step1.Step1_ToUserVectorReducer;
import text.Step1.Step1_toItemPreMapper;
public class Step2 {
public static class Step2_UserVectorToCoocurrenceMapper extends Mapper
@Override
protected void map(LongWritable key, Text value, Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens=Recommend.DELIMITER.split(value.toString());
for(int i=1;i
for(int j=1;j
context.write(new Text(itemID1+":"+itemID2), new IntWritable(1));
}
}
}
}
public static class Step2_UserVectorToConoccurrenceReducer extends Reducer
@Override
protected void reduce(Text key, Iterable
Reducer
throws IOException, InterruptedException {
int sum=0;
for(IntWritable value:values) {
sum+=value.get();
}
context.write(key, new IntWritable(sum));
}
}
public static void run(Configuration config ,Map
throws Exception
{
FileSystem fs = FileSystem.get(config);
Path output=new Path(path.get("Step2Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job=Job.getInstance(config);
job.setJobName("Step2");
job.setJarByClass(Step2.class);
job.setMapperClass(Step2_UserVectorToCoocurrenceMapper.class);
job.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(path.get("Step2Input")));
FileOutputFormat.setOutputPath(job, new Path(path.get("Step2Output")));
job.waitForCompletion(true);
}
}
Step3
package text;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Step3 {
public static class Step3_ItemPrefVectorMapper extends Mapper
@Override
protected void map(LongWritable key, Text value, Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens=Recommend.DELIMITER.split(value.toString());
String userID=tokens[0];
for(int i=1;i
String pref=tokens[i].split(":")[1];
context.write(new Text(itemID), new Text(userID+":"+pref));}
}
}
public static void run(Configuration config ,Map
throws Exception
{
FileSystem fs = FileSystem.get(config);
Path output=new Path(path.get("Step3Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job=Job.getInstance(config);
job.setJobName("Step3");
job.setJarByClass(Step3.class);
job.setMapperClass(Step3_ItemPrefVectorMapper.class);
// job.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(path.get("Step3Input")));
FileOutputFormat.setOutputPath(job, new Path(path.get("Step3Output")));
job.waitForCompletion(true);
}
}
Step4_1
package text;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import text.Step3.Step3_ItemPrefVectorMapper;
public class Step4_1 {
public static class Step4_1_filterMapper extends Mapper
//用来判断文件来自于Step2,还是Step3
private String flag;
@Override
protected void setup(Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split = (FileSplit)context.getInputSplit();
flag=split.getPath().getParent().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens=Recommend.DELIMITER.split(value.toString());
if(flag.equals("Step2")) {
String itemID1=tokens[0].split(":")[0];
String itemID2=tokens[0].split(":")[1];
int num=Integer.parseInt(tokens[1]);
context.write(new Text(itemID1), new Text("A:"+itemID2+","+num));
}else if(flag.equals("Step3")) {
String itemID=tokens[0];
String userID=tokens[1].split(":")[0];
String pref =tokens[1].split(":")[1];
context.write(new Text(itemID), new Text("B:"+userID+","+pref));
}
}
}
public static class Step4_1_MultplyReducer extends Reducer
@Override
protected void reduce(Text arg0, Iterable
throws IOException, InterruptedException {
// TODO Auto-generated method stub
HashMap
HashMap
for(Text line:arg1) {
String val=line.toString();
if(val.startsWith("A")) {
String[] temp=Recommend.DELIMITER.split(val.substring(2));
mapA.put(temp[0], temp[1]);
}else if (val.startsWith("B")) {
String[] temp=Recommend.DELIMITER.split(val.substring(2));
mapB.put(temp[0], temp[1]);
}
}
double result=0;
Iterator
while(iter.hasNext()) {
String mapk=iter.next();
int num = Integer.parseInt(mapA.get(mapk));
Iterator
while(iterb.hasNext()) {
String mapkb=iterb.next();//user
Double pref=Double.parseDouble(mapB.get(mapkb));
result=num*pref;
arg2.write(new Text(mapkb), new Text(mapk+","+result));
}
}
}
}
public static void run(Configuration config ,Map
throws Exception
{
FileSystem fs = FileSystem.get(config);
Path output=new Path(path.get("Step4_1Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job=Job.getInstance(config);
job.setJobName("Step4");
job.setJarByClass(Step4_1.class);
job.setMapperClass(Step4_1_filterMapper.class);
job.setReducerClass(Step4_1_MultplyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path[] {new Path(path.get("Step4_1Input1")),new Path(path.get("Step4_1Input2"))});
FileOutputFormat.setOutputPath(job, new Path(path.get("Step4_1Output")));
job.waitForCompletion(true);
}
}
Step4_2
package text;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import text.Step3.Step3_ItemPrefVectorMapper;
public class Step4_1 {
public static class Step4_1_filterMapper extends Mapper
//用来判断文件来自于Step2,还是Step3
private String flag;
@Override
protected void setup(Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split = (FileSplit)context.getInputSplit();
flag=split.getPath().getParent().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
String[] tokens=Recommend.DELIMITER.split(value.toString());
if(flag.equals("Step2")) {
String itemID1=tokens[0].split(":")[0];
String itemID2=tokens[0].split(":")[1];
int num=Integer.parseInt(tokens[1]);
context.write(new Text(itemID1), new Text("A:"+itemID2+","+num));
}else if(flag.equals("Step3")) {
String itemID=tokens[0];
String userID=tokens[1].split(":")[0];
String pref =tokens[1].split(":")[1];
context.write(new Text(itemID), new Text("B:"+userID+","+pref));
}
}
}
public static class Step4_1_MultplyReducer extends Reducer
@Override
protected void reduce(Text arg0, Iterable
throws IOException, InterruptedException {
// TODO Auto-generated method stub
HashMap
HashMap
for(Text line:arg1) {
String val=line.toString();
if(val.startsWith("A")) {
String[] temp=Recommend.DELIMITER.split(val.substring(2));
mapA.put(temp[0], temp[1]);
}else if (val.startsWith("B")) {
String[] temp=Recommend.DELIMITER.split(val.substring(2));
mapB.put(temp[0], temp[1]);
}
}
double result=0;
Iterator
while(iter.hasNext()) {
String mapk=iter.next();
int num = Integer.parseInt(mapA.get(mapk));
Iterator
while(iterb.hasNext()) {
String mapkb=iterb.next();//user
Double pref=Double.parseDouble(mapB.get(mapkb));
result=num*pref;
arg2.write(new Text(mapkb), new Text(mapk+","+result));
}
}
}
}
public static void run(Configuration config ,Map
throws Exception
{
FileSystem fs = FileSystem.get(config);
Path output=new Path(path.get("Step4_1Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job=Job.getInstance(config);
job.setJobName("Step4");
job.setJarByClass(Step4_1.class);
job.setMapperClass(Step4_1_filterMapper.class);
job.setReducerClass(Step4_1_MultplyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path[] {new Path(path.get("Step4_1Input1")),new Path(path.get("Step4_1Input2"))});
FileOutputFormat.setOutputPath(job, new Path(path.get("Step4_1Output")));
job.waitForCompletion(true);
}
}
Step5
package text;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import text.Step4_2.Step4_2functionMapper;
import text.Step4_2.Step4_2functionReducer;
public class Step5 {
public static class Step5_Mapper extends Mapper
private String flag;
@Override
protected void setup(Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
FileSplit split=(FileSplit)context.getInputSplit();
flag=split.getPath().getParent().getName();
}
@Override
protected void map(LongWritable key, Text value, Mapper
throws IOException, InterruptedException {
// TODO Auto-generated method stub
if(flag.equals("Step4_2")) {
String[] tokens=Recommend.DELIMITER.split(value.toString());
String userID=tokens[0];
String itemID=tokens[1];
String pref=tokens[2];
context.write(new Text(userID),new Text("W:"+itemID+","+pref));
}else if(flag.equals("StepData")) {
String[] tokens=Recommend.DELIMITER.split(value.toString());
String userID=tokens[0];
String itemID=tokens[1];
String pref=tokens[2];
context.write(new Text(userID),new Text("S:"+itemID+","+pref));
}
}
}
public static class Step5_reducer extends Reducer
@Override
protected void reduce(Text arg0, Iterable
throws IOException, InterruptedException {
// TODO Auto-generated method stub
//过滤
HashMap
HashMap
List
//注意这边arg1是多个字段
for(Text line:arg1) {
String[] tokens=Recommend.DELIMITER.split(line.toString().substring(2));
String itemID=tokens[0];
String pref=tokens[1];
if(line.toString().startsWith("W")) {
wMap.put(itemID,pref);
}else if(line.toString().startsWith("S")) {
sMap.put(itemID, pref);
}
}
HashMap
Iterator
while(iter.hasNext()) {
String k=iter.next();
if(sMap.containsKey(k)==false) {
flatMap.put(k, Float.parseFloat(wMap.get(k)));
}
}
for(Entry
list.add(entry);
}
list.sort(new Comparator
@Override
public int compare(Entry
// TODO Auto-generated method stub
return (int)(o2.getValue()-o1.getValue());
}
}
);
for(int i=0;i
}
}
}
public static void run(Configuration config ,Map
throws Exception
{
FileSystem fs = FileSystem.get(config);
Path output=new Path(path.get("Step5Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job=Job.getInstance(config);
job.setJobName("Step5");
job.setJarByClass(Step5.class);
job.setMapperClass(Step5_Mapper.class);
job.setReducerClass(Step5_reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path[] {new Path(path.get("Step5Input1")),
new Path(path.get("Step5Input2"))});
FileOutputFormat.setOutputPath(job, new Path(path.get("Step5Output")));
job.waitForCompletion(true);
}
}
数据
1,101,5
1,102,3
1,103,2.5
2,101,2
2,102,2.5
2,103,5
2,104,2
3,101,2
3,104,4
3,105,4.5
3,107,5
4,101,5
4,103,3
4,104,4.5
4,106,4
5,101,4
5,102,3
5,103,2
5,104,4
5,105,3.5
5,106,4
6,102,4
6,103,2
6,105,3.5
6,107,4
最终结果
1 104 33.5
1 105 21.0
1 106 18.0
1 107 10.5
2 105 23.0
2 106 20.5
2 107 11.5
3 103 34.0
3 102 28.0
3 106 16.5
4 102 40.0
4 105 29.0
4 107 12.5
5 107 20.0
6 101 31.0
6 104 25.0
6 106 11.5



