Recommend class
package UserRecommend;
import java.io.IOException;
import java.util.HashMap;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
public class Recommend {
public static Pattern REGEX = Pattern.compile("[\t,]"); // split on tab or comma
public static void main(String[] args) throws Exception {
HashMap<String, String> path = new HashMap<String, String>();
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "file:///");
String start = "E:\\data\\recommend2";
path.put("Step1Input", start+"/StepData");
path.put("Step1Output", start+"/Step1");
path.put("Step2Input", path.get("Step1Output"));
path.put("Step2Output", start+"/Step2");
path.put("Step3Input", path.get("Step2Output"));
path.put("Step3Output", start+"/Step3");
path.put("Step4Input1", path.get("Step1Input"));
path.put("Step4Input2", path.get("Step3Output"));
path.put("Step4Output", start+"/Step4");
// path.put("Step4_2Input", path.get("Step4_1Output"));
// path.put("Step4_2Output", start+"/Step4_2");
// path.put("Step5Input2", start+"/StepData");
path.put("Step5Input", path.get("Step4Output"));
path.put("Step5Output", start+"/Step5");
Step1.run(conf, path);
Step2.run(conf, path);
Step3.run(conf, path);
Step4.run(conf, path);
Step5.run(conf, path);
}
}
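Why the delimiter pattern is "[\t,]": the raw rating records are comma-separated, but every intermediate MapReduce file separates key and value with a tab (TextOutputFormat's default), so a single pattern has to split both. A minimal standalone check, assuming nothing beyond the pattern itself (the class name and sample lines are illustrative):

import java.util.Arrays;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        Pattern regex = Pattern.compile("[\t,]"); // same pattern as Recommend.REGEX
        // a raw rating record: userid,itemid,score
        System.out.println(Arrays.toString(regex.split("1,101,5.0")));  // [1, 101, 5.0]
        // an intermediate record: key<TAB>value, as TextOutputFormat writes it
        System.out.println(Arrays.toString(regex.split("101\t1,5.0"))); // [101, 1, 5.0]
    }
}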
Step1
package UserRecommend;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Step1 {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.REGEX.split(values.toString());
if (tokens.length >= 3)
{
Text k = new Text(tokens[1]);//itemid
Text v = new Text(tokens[0] + "," + tokens[2]);//userid + score
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String, String> map = new HashMap<String, String>(); // userid -> score for this item
for (Text line : values) {
String val = line.toString();
String[] vlist = Recommend.REGEX.split(val);
if (vlist.length >= 2)
{
map.put(vlist[0], vlist[1]);
}
}
Iterator<String> iterA = map.keySet().iterator();
while (iterA.hasNext())
{
String k1 = iterA.next();
String v1 = map.get(k1);
Iterator<String> iterB = map.keySet().iterator();
while (iterB.hasNext())
{
String k2 = iterB.next();
String v2 = map.get(k2);
context.write(new Text(k1 + "," + k2), new Text(v1 + "," + v2));
}
}
}
}
public static void run(Configuration config, Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException {
FileSystem fs=FileSystem.get(config);
Path output = new Path(path.get("Step1Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job = Job.getInstance(config);
job.setJarByClass(Step1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(path.get("Step1Input"))); // load the input dataset
FileOutputFormat.setOutputPath(job, output);
job.waitForCompletion(true);
}
}
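Step1 inverts the ratings by item: the mapper keys each record by itemid, and the reducer emits every ordered pair of users who rated that item together with both scores. Note that self-pairs (k1 equal to k2) are emitted as well; they receive similarity 0.0 in Step2 and drop out during the top-N selection in Step3. A minimal local sketch of the pair enumeration, using three of the sample ratings of item 101 (the class name is illustrative):

import java.util.HashMap;
import java.util.Map;

public class PairSketch {
    public static void main(String[] args) {
        // mirrors what Step1's reducer collects for one item: userid -> score
        Map<String, String> userScore = new HashMap<String, String>();
        userScore.put("1", "5.0");
        userScore.put("2", "2.0");
        userScore.put("4", "5.0");
        for (String u1 : userScore.keySet()) {
            for (String u2 : userScore.keySet()) {
                // emits key "u1,u2" and value "score1,score2", self-pairs included
                System.out.println(u1 + "," + u2 + "\t"
                        + userScore.get(u1) + "," + userScore.get(u2));
            }
        }
    }
}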
Step2
package UserRecommend;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Step2 {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.REGEX.split(values.toString());
if (tokens.length >= 4)
{
Text k = new Text(tokens[0] + "," + tokens[1]); // user pair
Text v = new Text(tokens[2] + "," + tokens[3]); // their two scores for the co-rated item
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
double sum = 0.0; // sum of squared rating differences over co-rated items
double similarity = 0.0;
int num = 0; // number of co-rated items
for (Text line : values) {
String val = line.toString();
String[] vlist = Recommend.REGEX.split(val);
if (vlist.length >= 2)
{
sum += Math.pow((Double.parseDouble(vlist[0]) - Double.parseDouble(vlist[1])), 2);
num += 1;
}
}
if (sum > 0.00000001) // identical co-ratings (including self-pairs) keep similarity 0
{
similarity = (double)num / (1 + Math.sqrt(sum));
}
context.write(key, new Text(String.format("%.7f", similarity)));
}
}
public static void run(Configuration config, Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException {
FileSystem fs =FileSystem.get(config);
Path output = new Path(path.get("Step2Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job = Job.getInstance(config, "UserCF_Step2 job");
job.setJarByClass(Step2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(path.get("Step2Input"))); // load the input dataset
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true))
{
System.out.println("main run stop!");
return;
}
System.out.println("main run successfully!");
}
}
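Step2 consumes Step1's user pairs, sums the squared rating differences over the items both users rated, and scores the pair as n / (1 + sqrt(sum)), where n is the number of co-rated items. Because n sits in the numerator, the similarity is not bounded by 1 (an earlier revision of the code clamped it to 1.0). A standalone sketch of the formula, assuming two aligned score arrays over the co-rated items (class and method names are illustrative):

public class SimilaritySketch {
    static double similarity(double[] a, double[] b) {
        double sum = 0.0; // sum of squared rating differences
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        if (sum <= 0.00000001) {
            return 0.0; // identical co-ratings (and self-pairs) score 0, as in Step2
        }
        return a.length / (1 + Math.sqrt(sum));
    }

    public static void main(String[] args) {
        // users 1 and 2 co-rate items 101, 102, 103 in the sample data
        System.out.println(similarity(new double[] {5.0, 3.0, 2.5},
                                      new double[] {2.0, 2.5, 5.0}));
    }
}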
Step3
package UserRecommend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Step3 {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.REGEX.split(values.toString());
if (tokens.length >= 3)
{
Text k = new Text(tokens[0]); // userid
Text v = new Text(tokens[1] + "," + tokens[2]); // neighbor userid + similarity
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
private final int NEIGHBORHOOD_NUM = 2; // number of nearest neighbors to keep
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<Double, String> map = new HashMap<Double, String>(); // similarity -> userid (equal similarities overwrite)
for (Text line : values) {
String val = line.toString();
String[] vlist = Recommend.REGEX.split(val);
if (vlist.length >= 2)
{
map.put(Double.parseDouble(vlist[1]), vlist[0]);
}
}
List<Double> list = new ArrayList<Double>();
Iterator<Double> iter = map.keySet().iterator();
while (iter.hasNext()) {
Double similarity = iter.next();
list.add(similarity);
}
// sort the similarities with a comparator
Collections.sort(list, new Comparator<Double>() {
// descending order
public int compare(Double o1, Double o2) {
return o2.compareTo(o1);
}
});
String v = "";
for (int i = 0; i < NEIGHBORHOOD_NUM && i < list.size(); i++)
{
v += "," + map.get(list.get(i)) + "," + String.format("%.7f", list.get(i));
}
context.write(key, new Text(v.substring(1)));
}
}
public static void run(Configuration config, Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException {
FileSystem fs = FileSystem.get(config);
Path output= new Path(path.get("Step3Output"));
if(fs.exists(output)) {
fs.delete(output,true);
}
Job job = Job.getInstance(config, "UserCF_Step3 job");
job.setJarByClass(Step3.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(path.get("Step3Input"))); // load the input dataset
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true))
{
System.out.println("main run stop!");
return;
}
System.out.println("main run successfully!");
}
}
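Step3 collects every similarity record for a user and keeps the NEIGHBORHOOD_NUM most similar neighbors. One caveat of the reducer above: the map is keyed by the similarity value, so two neighbors with exactly the same similarity collide and only one survives. A tie-safe variant for illustration, sorting (neighbor, similarity) entries directly instead of their keys (all names hypothetical):

import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class TopNSketch {
    static List<Map.Entry<String, Double>> topN(List<Map.Entry<String, Double>> entries, int n) {
        Collections.sort(entries, new Comparator<Map.Entry<String, Double>>() {
            public int compare(Map.Entry<String, Double> a, Map.Entry<String, Double> b) {
                return b.getValue().compareTo(a.getValue()); // descending similarity
            }
        });
        return entries.subList(0, Math.min(n, entries.size()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Double>> entries = new ArrayList<Map.Entry<String, Double>>();
        entries.add(new SimpleEntry<String, Double>("2", 0.5));
        entries.add(new SimpleEntry<String, Double>("4", 0.5)); // a tie survives here
        entries.add(new SimpleEntry<String, Double>("3", 0.2));
        System.out.println(topN(entries, 2)); // [2=0.5, 4=0.5]
    }
}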
Step4
package UserRecommend;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Step4 {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
private String flag; // which dataset: "Step3" (neighbor lists) or "StepData" (raw ratings)
private int itemNum = 7; // number of distinct items in the sample data (101..107)
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getParent().getName(); // the parent directory name identifies the dataset
System.out.println(flag);
}
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.REGEX.split(values.toString());
int itemIndex = 100; // item ids start at 101
if (flag.equals("Step3")) {
for (int i = 1; i <= itemNum; i++) // broadcast this user's neighbor list to every item
{
Text k = new Text(Integer.toString(itemIndex + i));//itemid
Text v = new Text("A:" + tokens[0] + "," + tokens[1] + "," + tokens[3]);
context.write(k, v);
}
} else if (flag.equals("StepData")) {
Text k = new Text(tokens[1]);//itemid
Text v = new Text("B:" + tokens[0] + "," + tokens[2]);//userid + score
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String, String> mapA = new HashMap<String, String>(); // userid -> its two nearest neighbors
Map<String, String> mapB = new HashMap<String, String>(); // userid -> score for this item
for (Text line : values) {
String val = line.toString();
if (val.startsWith("A:")) {
String[] kv = Recommend.REGEX.split(val.substring(2));
mapA.put(kv[0], kv[1] + "," + kv[2]);
} else if (val.startsWith("B:")) {
String[] kv = Recommend.REGEX.split(val.substring(2));
mapB.put(kv[0], kv[1]);
}
}
Iterator<String> iterA = mapA.keySet().iterator();
while (iterA.hasNext())
{
String userId = iterA.next();
if (!mapB.containsKey(userId)) // only recommend items the user has not already rated
{
String simiStr = mapA.get(userId);
String[] simi = Recommend.REGEX.split(simiStr);
if (simi.length >= 2)
{
double simiVal1 = mapB.containsKey(simi[0]) ? Double.parseDouble(mapB.get(simi[0])) : 0;
double simiVal2 = mapB.containsKey(simi[1]) ? Double.parseDouble(mapB.get(simi[1])) : 0;
double score = (simiVal1 + simiVal2) / 2; // mean of the two neighbors' scores; unrated neighbors count as 0
context.write(new Text(userId), new Text(key.toString() + "," + String.format("%.2f", score)));
}
}
}
}
}
public static void run(Configuration config, Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException {
FileSystem fs = FileSystem.get(config);
Path output = new Path(path.get("Step4Output"));
if (fs.exists(output)) {
fs.delete(output, true);
}
Job job = Job.getInstance(config, "UserCF_Step4 job");
job.setJarByClass(Step4.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(path.get("Step4Input1")), new Path(path.get("Step4Input2"))); // load the two input datasets
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true))
{
System.out.println("main run stop!");
return;
}
System.out.println("main run successfully!");
}
}
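Step4 joins two datasets on itemid: the "A:" records carry each user's two nearest neighbors from Step3, and the "B:" records carry the raw ratings. For every item a user has not rated, the predicted score is the mean of the two neighbors' scores for that item, with 0 substituted when a neighbor did not rate it. A standalone sketch of that rule (class and method names are illustrative):

import java.util.HashMap;
import java.util.Map;

public class PredictSketch {
    // itemRatings: every user's score for one item; n1, n2: the target user's neighbors
    static double predict(Map<String, Double> itemRatings, String n1, String n2) {
        double s1 = itemRatings.containsKey(n1) ? itemRatings.get(n1) : 0;
        double s2 = itemRatings.containsKey(n2) ? itemRatings.get(n2) : 0;
        return (s1 + s2) / 2; // unrated neighbors contribute 0, as in Step4
    }

    public static void main(String[] args) {
        // sample ratings of item 104: users 2, 3, 4 and 5
        Map<String, Double> ratingsFor104 = new HashMap<String, Double>();
        ratingsFor104.put("2", 2.0);
        ratingsFor104.put("3", 4.0);
        ratingsFor104.put("4", 4.5);
        ratingsFor104.put("5", 4.0);
        // prints 4.25, matching 104[4.25] in the sample output if user 1's
        // nearest neighbors are assumed to be users 4 and 5
        System.out.println(predict(ratingsFor104, "4", "5"));
    }
}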
Step5
package UserRecommend;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Step5 {
public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Recommend.REGEX.split(values.toString());
if (tokens.length >= 3)
{
Text k = new Text(tokens[0]); // userid
Text v = new Text(tokens[1] + "," + tokens[2]); // itemid + predicted score
context.write(k, v);
}
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
private final int RECOMMENDER_NUM = 3; // number of items to recommend per user
@Override
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<Double, String> map = new HashMap<Double, String>(); // predicted score -> itemid (equal scores overwrite)
for (Text line : values) {
String val = line.toString();
String[] vlist = Recommend.REGEX.split(val);
if (vlist.length >= 2)
{
map.put(Double.parseDouble(vlist[1]), vlist[0]);
}
}
List<Double> list = new ArrayList<Double>();
Iterator<Double> iter = map.keySet().iterator();
while (iter.hasNext()) {
Double similarity = iter.next();
list.add(similarity);
}
// sort the predicted scores with a comparator
Collections.sort(list, new Comparator<Double>() {
// descending order
public int compare(Double o1, Double o2) {
return o2.compareTo(o1);
}
});
String v = "";
for (int i = 0; i < RECOMMENDER_NUM && i < list.size(); i++)
{
if (list.get(i) > 0.001) // skip near-zero predicted scores
{
v += "," + map.get(list.get(i)) + "[" + String.format("%.2f", list.get(i)) + "]";
}
}
if (!v.isEmpty())
{
context.write(key, new Text(v.substring(1)));
}
else
{
context.write(key, new Text("none"));
}
}
}
public static void run(Configuration config, Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException {
FileSystem fs = FileSystem.get(config);
Path output = new Path(path.get("Step5Output"));
if (fs.exists(output)) {
fs.delete(output, true);
}
Job job = Job.getInstance(config, "UserCF_Step5 job");
job.setJarByClass(Step5.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(path.get("Step5Input"))); // load the input dataset
FileOutputFormat.setOutputPath(job, output);
if (!job.waitForCompletion(true))
{
System.out.println("main run stop!");
return;
}
System.out.println("main run successfully!");
}
}
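Step5 regroups the Step4 predictions by user, sorts them by predicted score, keeps the top RECOMMENDER_NUM that clear the 0.001 threshold, and emits "none" when no candidate qualifies. An illustrative variant of the output formatting that uses a StringBuilder instead of repeated string concatenation (names hypothetical; entries assumed pre-sorted by descending score):

import java.util.LinkedHashMap;
import java.util.Map;

public class FormatSketch {
    static String format(Map<String, Double> sorted, int n, double threshold) {
        StringBuilder sb = new StringBuilder();
        int taken = 0;
        for (Map.Entry<String, Double> e : sorted.entrySet()) {
            if (taken++ >= n) break;
            if (e.getValue() > threshold) {
                if (sb.length() > 0) sb.append(",");
                sb.append(e.getKey()).append("[")
                  .append(String.format("%.2f", e.getValue())).append("]");
            }
        }
        return sb.length() > 0 ? sb.toString() : "none";
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves insertion (i.e. sorted) order
        Map<String, Double> sorted = new LinkedHashMap<String, Double>();
        sorted.put("104", 4.25);
        sorted.put("106", 4.00);
        sorted.put("105", 1.75);
        System.out.println(format(sorted, 3, 0.001)); // 104[4.25],106[4.00],105[1.75]
    }
}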
Input data (userid,itemid,score)
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0
Output data (userid, then the recommended itemid[predicted score] list)
1 104[4.25],106[4.00],105[1.75]
2 105[4.00],107[2.50],106[2.00]
3 103[3.50],102[2.75],106[2.00]
4 102[3.00],105[1.75]
5 none



