mapreduce代码部分
计算推荐结果
recommend部分代码
Test.java
package my;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
/**
 * Holds the HDFS working paths for the four-step recommendation pipeline.
 * NOTE(review): generic type parameters were stripped from the extracted
 * source; Map&lt;String, String&gt; is reconstructed here -- confirm against VCS.
 */
public class Test {

    // Logical step name (e.g. "Step1Input") -> HDFS path.
    public Map<String, String> path;

    // Field delimiter: comma or tab. The extracted text read "[,t]" (a literal
    // 't', which would split inside words); tab is the intended character.
    public static final Pattern DELIMITER = Pattern.compile("[,\t]");

    public Test() {
        path = new HashMap<String, String>();
        path.put("Step1Input", "hdfs://localhost:9000/user/hadoop/recommend");
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        path.put("Step3Input1", path.get("Step1Output"));
        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
        path.put("Step3Input2", path.get("Step2Output"));
        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
        path.put("Step4Input1", path.get("Step3Output1"));
        path.put("Step4Input2", path.get("Step3Output2"));
        path.put("Step4Output", path.get("Step1Input") + "/step4");
    }

    /** @return the step-name -> HDFS-path map (live reference, not a copy). */
    public Map<String, String> getPath() {
        return path;
    }

    public void setPath(Map<String, String> path) {
        this.path = path;
    }
}
Cooccurrence.java
package my;
/**
 * Value object for one cell of the item co-occurrence matrix: how many users
 * rated both {@code itemID1} and {@code itemID2}.
 */
public class Cooccurrence {

    private int itemID1;
    private int itemID2;
    private int num;

    /**
     * @param item1 first item id
     * @param item2 second item id
     * @param count number of users who rated both items
     */
    public Cooccurrence(int item1, int item2, int count) {
        this.itemID1 = item1;
        this.itemID2 = item2;
        this.num = count;
    }

    public int getItemId1() {
        return itemID1;
    }

    public void setItemId1(int itemID1) {
        this.itemID1 = itemID1;
    }

    public int getItemId2() {
        return itemID2;
    }

    public void setItemId2(int itemID2) {
        this.itemID2 = itemID2;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}
MultiTask2.java
package my;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MultiTask2 {
public static final Pattern DELIMITER = Pattern.compile("[t,]");
public static class Step1Mapper extends Mapper
private final static Text v = new Text();
public void reduce(IntWritable key, Iterable
StringBuilder sb = new StringBuilder();
for (Text val : values) {
sb.append("," + val.toString());
}
v.set(sb.toString().replaceFirst(",", ""));//133 24:5,57:3,41:4
context.write(key, v);
}
}
public static class Step2Mapper extends Mapper
private final static Text k = new Text();
private final static IntWritable v = new IntWritable(1);
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = Test.DELIMITER.split(values.toString());
for (int i = 1; i < tokens.length; i++) {
String itemID = tokens[i].split(":")[0];
for (int j = 1; j < tokens.length; j++) {
String itemID2 = tokens[j].split(":")[0];
k.set(itemID + ":" + itemID2);
context.write(k, v);
}
}
}
}
public static class Step2Reducer extends Reducer
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static class Step3Mapper1 extends Mapper
private final static IntWritable k = new IntWritable();
private final static Text v = new Text();
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = DELIMITER.split(values.toString());
for (int i = 1; i < tokens.length; i++) {
String[] vector = tokens[i].split(":");
int itemID = Integer.parseInt(vector[0]);
String pref = vector[1];
k.set(itemID);
v.set(tokens[0] + ":" + pref);
context.write(k,v);
}
}
}
public static class Step3Mapper2 extends Mapper
private final static Text k = new Text();
private final static IntWritable v = new IntWritable();
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
String[] tokens = DELIMITER.split(values.toString());
k.set(tokens[0]);
v.set(Integer.parseInt(tokens[1]));
context.write(k,v);
}
}
public static class Step4Mapper extends Mapper
private final static IntWritable k = new IntWritable();
private final static Text v = new Text();
private final static Map
public void map(LongWritable key, Text values,Context context) throws IOException, InterruptedException {
String[] tokens = DELIMITER.split(values.toString());
String[] v1 = tokens[0].split(":");
String[] v2 = tokens[1].split(":");
if (v1.length > 1) {// cooccurrence
int itemID1 = Integer.parseInt(v1[0]);
int itemID2 = Integer.parseInt(v1[1]);
int num = Integer.parseInt(tokens[1]);
List list = null;
if (!cooccurrenceMatrix.containsKey(itemID1)) {
list = new ArrayList();
} else {
list = cooccurrenceMatrix.get(itemID1);
}
list.add(new Cooccurrence(itemID1, itemID2, num));
cooccurrenceMatrix.put(itemID1, list);
}
if (v2.length > 1) {// userVector
int itemID = Integer.parseInt(tokens[0]);
int userID = Integer.parseInt(v2[0]);
double pref = Double.parseDouble(v2[1]);
k.set(userID);
for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
v.set(co.getItemId2() + "," + pref * co.getNum());
//System.out.println("score:"+co.getItemId2() + "," + pref * co.getNum());
context.write(k, v);
}
}
}
}
public static class Step4Reducer extends Reducer
private Text v = new Text();
public void reduce(IntWritable key, Iterable
Map
for(Text vale : values){
String[] str = vale.toString().split(",");
if (result.containsKey(str[0]))
result.put( str[0], result.get(str[0]) + Double.parseDouble(str[1]) );
else {
result.put(str[0], Double.parseDouble(str[1]));
}
}
for (String val : result.keySet()) {
String itemID = (String) val;
double score = result.get(itemID);
v.set(itemID + "," + score);
context.write(key, v);
}
}
}
public static void main(String[] args)throws Exception {
Test test=new Test();
String input1 = test.getPath().get("Step1Input");
String output1 = test.getPath().get("Step1Output");
Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1");
job1.setJarByClass(MultiTask2.class);
job1.setMapperClass(Step1Mapper.class);
job1.setCombinerClass(Step1Reducer.class);
job1.setReducerClass(Step1Reducer.class);
job1.setOutputKeyClass(IntWritable.class);
job1.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job1, new Path(input1));
FileOutputFormat.setOutputPath(job1, new Path(output1));
job1.waitForCompletion(true);
ControlledJob ctrljob1=new ControlledJob(conf);
ctrljob1.setJob(job1);
String input2 = test.getPath().get("Step2Input");
String output2 = test.getPath().get("Step2Output");
Job job2 = new Job(conf, "job2");
job2.setJarByClass(MultiTask2.class);
job2.setMapperClass(Step2Mapper.class);
job2.setCombinerClass(Step2Reducer.class);
job2.setReducerClass(Step2Reducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job2, new Path(input2));
FileOutputFormat.setOutputPath(job2, new Path(output2));
job2.waitForCompletion(true);
ControlledJob ctrljob2=new ControlledJob(conf);
ctrljob2.setJob(job2);
ctrljob2.addDependingJob(ctrljob1);
String input31 = test.getPath().get("Step3Input1");
String output31 = test.getPath().get("Step3Output1");
Job job31 = new Job(conf, "job31");
job31.setMapperClass(Step3Mapper1.class);
job31.setOutputKeyClass(IntWritable.class);
job31.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job31, new Path(input31));
FileOutputFormat.setOutputPath(job31, new Path(output31));
job31.waitForCompletion(true);
ControlledJob ctrljob31=new ControlledJob(conf);
ctrljob31.setJob(job31);
ctrljob31.addDependingJob(ctrljob2);
String input32 = test.getPath().get("Step3Input2");
String output32 = test.getPath().get("Step3Output2");
Job job32 = new Job(conf, "job32");
job32.setMapperClass(Step3Mapper2.class);
job32.setOutputKeyClass(Text.class);
job32.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job32, new Path(input32));
FileOutputFormat.setOutputPath(job32, new Path(output32));
job32.waitForCompletion(true);
ControlledJob ctrljob32=new ControlledJob(conf);
ctrljob32.setJob(job32);
ctrljob32.addDependingJob(ctrljob31);
String input41 = test.getPath().get("Step4Input1");
String input42 = test.getPath().get("Step4Input2");
String output4 = test.getPath().get("Step4Output");
Job job4 = new Job(conf, "job4");
job4.setJarByClass(MultiTask2.class);
job4.setMapperClass(Step4Mapper.class);
job4.setReducerClass(Step4Reducer.class);
job4.setOutputKeyClass(IntWritable.class);
job4.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job4, new Path(input41), new Path(input42));
FileOutputFormat.setOutputPath(job4, new Path(output4));
job4.waitForCompletion(true);
ControlledJob ctrljob4=new ControlledJob(conf);
ctrljob4.setJob(job4);
ctrljob4.addDependingJob(ctrljob32);
JobControl jobCtrl=new JobControl("myctrl");
jobCtrl.addJob(ctrljob1);
jobCtrl.addJob(ctrljob2);
jobCtrl.addJob(ctrljob31);
jobCtrl.addJob(ctrljob32);
jobCtrl.addJob(ctrljob4);
}
}
level部分代码
levelCount.java
package my;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
// Rating-"level" count job.
// NOTE(review): only this header survived extraction -- the mapper/reducer
// generic parameters and bodies, and main(), are missing; recover from VCS.
public class count {
// NOTE(review): "[,t]" matches a literal 't'; "[,\t]" (comma or tab) was
// almost certainly intended -- confirm.
public static final Pattern Delimiter = Pattern.compile("[,t]");
// Generic type parameters (e.g. <LongWritable, Text, ...>) were stripped here.
public static class countMapper extends Mapper
year部分代码
yearCount.java
package my1;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// Per-year ("year") movie count job.
// NOTE(review): only this header survived extraction -- the mapper/reducer
// bodies and main() are missing; recover from VCS.
public class movieCount {
// Splits on '|' -- the field separator of the MovieLens u.item file.
public static final Pattern Delimiter = Pattern.compile("[|]");
// Generic type parameters were stripped from this declaration.
public static class countMapper extends Mapper
job部分代码
jobCount.java
package my2;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// User-occupation ("job") count job.
// NOTE(review): only this header survived extraction -- the mapper/reducer
// bodies and main() are missing; recover from VCS.
public class jobCount {
// Splits on '|' -- the field separator of the MovieLens u.user file.
public static final Pattern Delimiter = Pattern.compile("[|]");
// Generic type parameters were stripped from this declaration.
public static class countMapper extends Mapper
average部分代码
averageCount.java
package my4;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// Per-movie average-score job.
// NOTE(review): only this header survived extraction -- the mapper/reducer
// bodies and main() are missing; recover from VCS.
public class averageCount {
// NOTE(review): "[t,]" matches a literal 't'; "[\t,]" (tab or comma) was
// almost certainly intended -- confirm.
public static final Pattern Delimiter = Pattern.compile("[t,]");
// Generic type parameters were stripped from this declaration.
public static class countMapper extends Mapper
}
hot部分代码
hotCount.java
package my4;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
// Hot-movies (rating-count) job -- file is hotCount.java, but the class is
// named averageCount in package my4, which duplicates my4.averageCount above
// and cannot compile. NOTE(review): this should presumably be "hotCount";
// the mapper/reducer bodies and main() are also missing -- recover from VCS.
public class averageCount {
// NOTE(review): "[t,]" matches a literal 't'; "[\t,]" (tab or comma) was
// almost certainly intended -- confirm.
public static final Pattern Delimiter = Pattern.compile("[t,]");
// Generic type parameters were stripped from this declaration.
public static class countMapper extends Mapper
}
similarity部分代码
similarity.java
package my6;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import my4.averageCount;
import my4.averageCount.countMapper;
import my4.averageCount.countReducer;
// Movie-similarity MapReduce driver.
// NOTE(review): this class was damaged during extraction -- countMapper's
// generic parameters and body are gone, and the statements below clearly
// belong to a "public static void main(String[] args)" whose signature was
// lost. Recover the full class from version control before editing logic.
public class similarity {
// NOTE(review): "[t,]" matches a literal 't'; "[\t,]" (tab or comma) intended.
public static final Pattern Delimiter = Pattern.compile("[t,]");
// Generic type parameters were stripped from this declaration.
public static class countMapper extends Mapper
// ---- lost main() body starts here ----
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// Hard-coded input/output paths; the length check below can never fail.
String[] otherArgs=new String[]{"similarity","similarity/result"};
if(otherArgs.length!=2){
// NOTE(review): this string literal lost its closing quote during extraction.
System.err.println("Usage:Merge and duplicate removal
System.exit(2);
}
Job job=Job.getInstance(conf,"Merge and duplicate removal");
job.setJarByClass(similarity.class);
// Mapper/Reducer imported from my4.averageCount (see imports above).
job.setMapperClass(countMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(countReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
// Run synchronously with progress output; result is the success flag.
boolean result=job.waitForCompletion(true);
System.out.println(result?"成功":"失败");
}
}
weight.java
package my6;
/**
 * Shared feature-weight vector for the similarity computation.
 *
 * Fixes: the original initialized the static {@code weig} array inside the
 * instance constructor, so every {@code new weight()} re-parsed the string and
 * rewrote shared state, and the array length was hard-coded to 20. The weights
 * are now parsed exactly once in a static initializer and sized from the data.
 */
public class weight {

    // Parsed weights; public static for backward compatibility with callers
    // that read the field directly.
    public static double[] weig;

    // Source of truth for the weight values (space-separated).
    private static final String WEIGHTS =
            "0.12765441 0.00169363 0.07760147 0.05145752 0.02151025 0.04790932 0.11253775 0.0441973 0.02463966 0.12589693 0.0128394 0.01377639 0.03906519 0.02689036 0.02870992 0.07683498 0.04182388 0.07760147 0.0322126 0.01514757";

    static {
        String[] temp = WEIGHTS.split(" ");
        weig = new double[temp.length];
        for (int i = 0; i < temp.length; i++) {
            weig[i] = Double.parseDouble(temp[i]);
        }
    }

    public weight() {
        // Intentionally empty: weights are initialized once in the static block.
    }

    /**
     * @param num zero-based weight index
     * @return the weight at {@code num}
     */
    public double getW(int num) {
        return weig[num];
    }
}
至此mapreduce处理部分完毕
接下来是可视化和将数据存储进mysql的代码
存储部分
import pandas as pd
import pymysql
import numpy as np
# Input files: outputs of the MapReduce jobs plus the raw MovieLens u.user file.
jobF = 'job'
hotF = 'hot'
averageF = 'average'
scoreF = 'level'
recommendF = 'recommend'
userF = 'u.user'
movieSF = 'movieS'
# Shared MySQL connection/cursor used by all the insert* functions below.
# NOTE(review): root with an empty password and autocommit=True -- confirm this
# is only ever pointed at a local test database.
con = pymysql.connect(host='localhost',port=3306,user='root',passwd='',db='test1',charset = 'utf8',cursorclass=pymysql.cursors.DictCursor,autocommit =True)
cur1 = con.cursor()
def insertJ():
    """Load occupation counts ('<occupation>\t<count>' per line) into table `job`.

    Fixes: the extracted code split on a literal 't' (a lost '\t' escape) and
    %-formatted values straight into the SQL text; both are corrected here.
    """
    job = []  # rows of [occupation, count]
    for line in open(jobF):
        work, num = line.split('\t')[0:2]
        job.append([work, int(num)])
    cur1.execute("drop table if exists job")
    cur1.execute("CREATE TABLE job (work VARCHAr(255),num INT(10))")
    # Parameterized bulk insert: driver handles quoting/escaping.
    cur1.executemany("INSERT INTO `job`(work,num) VALUES (%s,%s)", job)
    con.commit()
def insertH():
    """Load per-movie rating counts ('<movieID>\t<count>' per line) into table `hot`.

    Fixes: tab delimiter restored (extracted as a literal 't') and values are
    passed as query parameters instead of %-formatted into the SQL text.
    """
    hot = []  # rows of [movieID, rating count]
    for line in open(hotF):
        movie, num = line.split('\t')[0:2]
        hot.append([int(movie), int(num)])
    cur1.execute("drop table if exists hot")
    cur1.execute("CREATE TABLE hot (movieID INT(10),assess INT(10))")
    cur1.executemany("INSERT INTO `hot`(movieID,assess) VALUES (%s,%s)", hot)
    con.commit()
def insertA():
    """Load per-movie average scores ('<movieID>\t<score>' per line) into table `average`.

    Fixes: tab delimiter restored (extracted as a literal 't'); float() already
    tolerates the trailing newline the original stripped with score[:-1], which
    also makes the last line safe if it lacks a newline.
    """
    average = []  # rows of [movieID, mean score]
    for line in open(averageF):
        movie, score = line.split('\t')[0:2]
        average.append([int(movie), float(score)])
    cur1.execute("drop table if exists average")
    cur1.execute("CREATE TABLE average (movieID INT(10),score FLOAT)")
    cur1.executemany("INSERT INTO `average`(movieID,score) VALUES (%s,%s)", average)
    con.commit()
def insertS():
    """Load per-movie star-level counts into table `scoreM`.

    Input lines look like '<movieID>\t<level>:<count>,<level>:<count>,...'
    where level is 1..5. Fixes: tab delimiter restored (extracted as a literal
    't'); inserts are parameterized.
    """
    scoreM = []  # rows of [movieID, n1, n2, n3, n4, n5]
    for line in open(scoreF):
        movie, mess = line.split('\t')[0:2]
        temp = [int(movie), 0, 0, 0, 0, 0]
        for part in mess.split(','):
            level, cnt = part.split(':')
            # int() tolerates the trailing newline on the last pair.
            temp[int(level)] = int(cnt)
        scoreM.append(temp)
    cur1.execute("drop table if exists scoreM")
    cur1.execute("CREATE TABLE scoreM (movieID INT(10),num1 INT(10),num2 INT(10),num3 INT(10),num4 INT(10),num5 INT(10))")
    cur1.executemany(
        "INSERT INTO `scoreM`(movieID,num1,num2,num3,num4,num5) VALUES (%s,%s,%s,%s,%s,%s)",
        scoreM)
    con.commit()
def insertR():
    """Load step-4 recommendations into table `recommend`, keeping at most the
    10 highest-scored movies per user.

    Input lines: '<userID>\t<movieID>,<score>'. Fixes: tab delimiter restored
    (extracted as a literal 't'); the first field is named userID (the original
    called it movieID, which it is not); the insert loop iterates the dict
    instead of assuming keys are exactly 1..len (which raised KeyError for any
    missing user); inserts are parameterized.
    """
    recommend = {}  # userID -> list of [movieID, score]
    for line in open(recommendF):
        userID, mess = line.split('\t')[0:2]
        userID = int(userID)
        messa = mess.split(',')
        recommend.setdefault(userID, []).append([int(messa[0]), int(float(messa[1]))])
    for k, v in recommend.items():
        temp = np.array(v)
        temp = temp[np.argsort(-temp[:, 1])]  # sort by score, descending
        # numpy slicing already caps arrays shorter than 10 rows.
        recommend[k] = temp[:10, :]
    cur1.execute("drop table if exists recommend")
    cur1.execute("CREATE TABLE recommend (userID INT(10),movieID INT(10),score INT(10))")
    print(len(recommend))
    for uid, rows in recommend.items():
        for row in rows:
            cur1.execute("INSERT INTO `recommend`(userID,movieID,score) VALUES (%s,%s,%s)",
                         (uid, int(row[0]), int(row[1])))
    con.commit()
def insertUA():
    """Count users per 10-year age bracket (MovieLens u.user) into table `age`.

    Fixes: removed the dead `age = []` list that was immediately shadowed;
    replaced the eight-way if/elif chain with integer division (same 0-79
    range the original covered; older ages are ignored, as before); inserts
    are parameterized.
    """
    UA = {
        '0-9': 0, '10-19': 0, '20-29': 0, '30-39': 0,
        '40-49': 0, '50-59': 0, '60-69': 0, '70-79': 0
    }
    cur1.execute("drop table if exists age")
    cur1.execute("CREATE TABLE age (age VARCHAr(255),num INT(10))")
    for line in open(userF, encoding='ISO-8859-1'):
        userID, age = line.split('|')[0:2]
        age = int(age)
        if 0 <= age <= 79:  # same range the original chain handled
            decade = age // 10
            UA['%d-%d' % (decade * 10, decade * 10 + 9)] += 1
    for k, v in UA.items():
        cur1.execute("INSERT INTO `age`(age,num) VALUES (%s,%s)", (k, v))
    con.commit()
def insertMovie():
    """Load pairwise movie similarities into table `movieSim`, keeping the 10
    most similar movies per movie.

    Input lines: '<id1>,<id2>\t<similarity>'. Fixes: tab delimiter restored
    (extracted as a literal 't'); removed the unused `data` list; inserts are
    parameterized.
    """
    movieS = {}  # id1 -> list of [id2, similarity]
    num = 0      # total pair count, printed for a sanity check
    for line in open(movieSF):
        movie, sim = line.split('\t')[0:2]
        sim = float(sim)
        id1, id2 = movie.split(',')[0:2]
        movieS.setdefault(int(id1), []).append([int(id2), sim])
        num += 1
    print(num)
    for k, v in movieS.items():
        temp = np.array(v)
        temp = temp[np.argsort(-temp[:, 1])]  # most similar first
        # numpy slicing already caps arrays shorter than 10 rows.
        movieS[k] = temp[:10, :]
    cur1.execute("drop table if exists movieSim")
    cur1.execute("CREATE TABLE movieSim (movieID1 INT(10),movieID2 INT(10),sim FLOAT)")
    for k, v in movieS.items():
        for row in v:
            cur1.execute("INSERT INTO `movieSim`(movieID1,movieID2,sim) VALUES (%s,%s,%s)",
                         (k, int(row[0]), float(row[1])))
    con.commit()
# One-off loaders -- uncomment the ones to (re)run. Each drops and recreates
# its table, so re-running is destructive but idempotent.
#insertS()
#insertA()
#insertJ()
#insertH()
#insertR()
#insertUA()
insertMovie()
cur1.close()
con.close()
可视化部分
import time
import matplotlib.pyplot as plt
from tkinter import END
from tkinter.scrolledtext import ScrolledText
from tkinter import *
from tkinter import ttk
import tkinter as tk
import win32api, win32con
import os
import pymysql
import numpy as np
import pandas as pd
# MySQL database name and root password used by every window below.
mDB = 'test1'
pWD = ''
# Use a Chinese-capable font and keep minus signs rendering correctly.
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
movies = {}  # movie basic info: movieID -> title (from MovieLens u.item)
for line in open('u.item',encoding='ISO-8859-1'):
    movieID, movieName = line.split('|')[0:2]
    movieID = int(movieID)
    movies[movieID] = movieName
def similarity():
    """Open a search window: enter a movie ID, list its most-similar movies.

    Fixes: tab/newline literals restored (extracted as 't'/'n'); the SELECT is
    parameterized instead of %-formatted into the SQL string.
    """
    def getMovieS():
        # Look up the similar-movie IDs for the entered movie.
        con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                              cursorclass=pymysql.cursors.DictCursor, autocommit=True)
        cur = con.cursor()
        ID = int(user_text.get())
        user_text.set("")
        cur.execute('SELECT movieID2 FROM movieSim WHERE movieID1=%s', (ID,))
        mess = cur.fetchall()
        cur.close()
        con.close()
        win = Toplevel()  # results window (was shadowing `top` in the original)
        win.geometry("500x300")
        win.title('电影' + movies[ID] + '相似影片')
        scr = ScrolledText(win, width=60, height=20, font=("隶书", 10))
        scr.pack()
        for i in range(len(mess)):
            scr.insert(END, str(i + 1) + '\t' + movies[mess[i]['movieID2']] + '\n')

    top = tk.Toplevel()  # search window
    top.title("相似影片")
    top.geometry("300x100")
    Label(top, text="请输影片ID").pack()
    user_text = StringVar()
    bucket = Entry(top, textvariable=user_text)
    user_text.set("")
    bucket.pack()
    Button(top, text="搜索", command=getMovieS).pack()
    top.mainloop()
def recommend():
    """Open a search window: enter a user ID, list that user's recommendations.

    Fixes: newline literal restored (extracted as 'n'); the SELECT is
    parameterized instead of %-formatted into the SQL string.
    """
    def getUserR():
        # Fetch the recommended movie IDs for the entered user.
        con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                              cursorclass=pymysql.cursors.DictCursor, autocommit=True)
        cur = con.cursor()
        ID = int(user_text.get())
        user_text.set("")
        cur.execute('SELECT movieID FROM recommend WHERE userID=%s', (ID,))
        mess = cur.fetchall()
        cur.close()
        con.close()
        win = Toplevel()  # results window (was shadowing `top` in the original)
        win.geometry("500x300")
        win.title('推荐结果')
        scr = ScrolledText(win, width=60, height=20, font=("隶书", 10))
        scr.pack()
        for row in mess:
            scr.insert(END, movies[row['movieID']] + '\n')

    top = tk.Toplevel()  # search window
    top.title("个性化定制")
    top.geometry("300x100")
    Label(top, text="请输入用户ID").pack()
    user_text = StringVar()
    bucket = Entry(top, textvariable=user_text)
    user_text.set("")
    bucket.pack()
    Button(top, text="搜索", command=getUserR).pack()
    top.mainloop()
def hot():
    """Show the popularity list (table `hot`) in a scrolled-text window.

    Fixes: tab/newline literals restored (extracted as 't'/'n').
    NOTE(review): the query has no ORDER BY, so row order depends on the
    table -- add 'ORDER BY assess DESC' for a true ranking.
    """
    con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                          cursorclass=pymysql.cursors.DictCursor, autocommit=True)
    cur = con.cursor()
    cur.execute('SELECT * FROM hot')
    mess = cur.fetchall()
    cur.close()
    con.close()
    top = Toplevel()
    top.geometry("500x300")
    top.title('热门榜')
    scr = ScrolledText(top, width=60, height=20, font=("隶书", 10))
    scr.pack()
    for i, row in enumerate(mess, start=1):
        scr.insert(END, str(i) + '\t' + movies[row['movieID']] + '\n')
    top.mainloop()
def average():
    """Show the average-score list (table `average`) with each movie's score.

    Fixes: tab/newline literals restored (extracted as 't'/'n').
    NOTE(review): the query has no ORDER BY, so row order depends on the
    table -- add 'ORDER BY score DESC' for a true ranking.
    """
    con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                          cursorclass=pymysql.cursors.DictCursor, autocommit=True)
    cur = con.cursor()
    cur.execute('SELECT * FROM average')
    mess = cur.fetchall()
    cur.close()
    con.close()
    top = Toplevel()
    top.geometry("500x300")
    top.title('好评榜')
    scr = ScrolledText(top, width=60, height=20, font=("隶书", 10))
    scr.pack()
    for i, row in enumerate(mess, start=1):
        scr.insert(END, str(i) + '\t' + movies[row['movieID']] + '\t' + str(row['score']) + '\n')
    top.mainloop()
def analysis():
    """Open the statistics window: per-movie score histogram, user age and
    occupation distributions, and a male/female rating-difference chart.

    Fixes: scoreAnalysis no longer plots the movieID column as a bar and now
    places value labels at the bars' x positions (the original used 0-based
    indices, offsetting every label); tab/newline/'\\t' separators restored
    (extracted as literal 't'/'n'); SELECTs parameterized; dead statements and
    the commented-out job bar chart removed.
    """
    def scoreAnalysis():
        # Bar chart: how many users gave each star rating (1..5) to one movie.
        ID = int(movie_text.get())
        con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                              cursorclass=pymysql.cursors.DictCursor, autocommit=True)
        cur = con.cursor()
        cur.execute('SELECT * FROM scoreM WHERE movieID=%s', (ID,))
        mess = cur.fetchall()
        cur.close()
        con.close()
        row = mess[0]
        x = [1, 2, 3, 4, 5]
        y = [row['num%d' % level] for level in x]  # only the five count columns
        plt.bar(x, y)
        for a, b in zip(x, y):  # value labels above each bar
            plt.text(a, b, '%d' % b, ha='center', va='bottom', fontsize=10)
        plt.title('电影' + movies[ID] + '评分结果')
        plt.xlabel('评分')
        plt.ylabel('人数')
        plt.show()

    def ageAnalysis():
        # Bar chart of users per 10-year age bracket (table `age`).
        con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                              cursorclass=pymysql.cursors.DictCursor, autocommit=True)
        cur = con.cursor()
        cur.execute('SELECT * FROM age')
        mess = cur.fetchall()
        cur.close()
        con.close()
        print(mess)
        x = [row['age'] for row in mess]
        y = [row['num'] for row in mess]
        plt.bar(x, y)
        # Categorical x axis: bars sit at positions 0..n-1, so index labels align.
        for a, b in zip(np.arange(len(y)), y):
            plt.text(a, b, '%d' % b, ha='center', va='bottom', fontsize=10)
        plt.title('用户年龄分布')
        plt.xlabel('年龄段')
        plt.ylabel('人数')
        plt.show()

    def jobAnalysis():
        # Scrolled-text listing of users per occupation (table `job`).
        con = pymysql.connect(host='localhost', port=3306, user='root', passwd=pWD, db=mDB, charset='utf8',
                              cursorclass=pymysql.cursors.DictCursor, autocommit=True)
        cur = con.cursor()
        cur.execute('SELECT * FROM job')
        mess = cur.fetchall()
        cur.close()
        con.close()
        win = Toplevel()
        win.geometry("500x300")
        win.title('用户职业分布')
        scr = ScrolledText(win, width=60, height=20, font=("隶书", 10))
        scr.pack()
        for row in mess:
            scr.insert(END, row['work'] + '\t' + str(row['num']) + '\n')
        win.mainloop()

    def genderAnalysis():
        # Horizontal bars: per-movie mean-rating difference between male and
        # female viewers for the 50 most-rated movies (raw MovieLens files).
        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.rcParams['axes.unicode_minus'] = False
        u_cols = ['user_id', 'age', 'sex', 'occupation', 'zip_code']
        users = pd.read_csv('u.user', sep='|', names=u_cols, encoding='latin-1')
        r_cols = ['user_id', 'movie_id', 'rating', 'unix_timestamp']
        ratings = pd.read_csv('u.data', sep='\t', names=r_cols, encoding='latin-1')
        m_cols = ['movie_id', 'title', 'release_date', 'video_release_date', 'imdb_url']
        # Local name `items` avoids shadowing the module-level `movies` dict.
        items = pd.read_csv('u.item', sep='|', names=m_cols, usecols=range(5), encoding='latin-1')
        # Join movies + ratings + users into one frame.
        movie_ratings = pd.merge(items, ratings)
        lens = pd.merge(movie_ratings, users)
        most_50 = lens.groupby('movie_id').size().sort_values(ascending=False)[:50]
        pivoted = lens.pivot_table(index=['movie_id', 'title'],
                                   columns=['sex'],
                                   values='rating',
                                   fill_value=0)
        pivoted['diff'] = pivoted.M - pivoted.F  # > 0 means favored by men
        pivoted.reset_index('movie_id', inplace=True)
        disagreements = pivoted[pivoted.movie_id.isin(most_50.index)]['diff']
        disagreements.sort_values().plot(kind='barh', color='dodgerblue')
        plt.title('男/女性平均评分\n(差异>0=受男性青睐)', fontsize=14)
        plt.ylabel('电影', fontsize=14)
        plt.xlabel('平均评级差', fontsize=14)
        plt.show()

    top = Toplevel()
    top.geometry("500x300")
    Label(top, text="请输入电影ID").pack()
    movie_text = StringVar()
    bucket = Entry(top, textvariable=movie_text)
    movie_text.set("")
    bucket.pack()
    Button(top, text="电影评分分析", command=scoreAnalysis).pack()
    Button(top, text="用户年龄分析", command=ageAnalysis).pack()
    Button(top, text="用户职业分析", command=jobAnalysis).pack()
    Button(top, text="用户偏好分析", command=genderAnalysis).pack()
    top.title('电影数据')
    top.mainloop()
if __name__ == "__main__":
    root = tk.Tk()  # main window
    root.title("电影推荐系统")  # title: movie recommendation system
    root.geometry("700x250")  # window size
    # One button per feature; each opens its own Toplevel window.
    Button(root, text="电影推荐", command=recommend).pack()
    Button(root, text="电影热度榜", command=hot).pack()
    Button(root, text="电影好评榜", command=average).pack()
    Button(root, text="电影数据大全", command=analysis).pack()
    Button(root, text="相似电影", command=similarity).pack()
    root.mainloop()
# Leftover scratch code from development: a direct DB connection smoke test.
#con = pymysql.connect(host='localhost', port=3306, user='root', passwd='', db='test1', charset='utf8',cursorclass=pymysql.cursors.DictCursor, autocommit=True)
#cur1 = con.cursor()
#cur1.close()
#con.close()


