This post uses a Java MapReduce job to segment movie-review text (with the jieba tokenizer) and count word frequencies, running on a Hadoop cluster; the resulting counts are then used to build a word cloud of the review content. The code is as follows:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import com.huaban.analysis.jieba.JiebaSegmenter;
public class emotion_analysis_wordcount {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // GenericOptionsParser strips generic Hadoop options (-D and friends);
        // the input and output paths are hard-coded below, so no positional
        // arguments are required.
        new GenericOptionsParser(conf, args);
        Job job = Job.getInstance(conf, "Emotion Analysis");
        job.setJarByClass(emotion_analysis_wordcount.class);
        job.setMapperClass(TokenizerMapper.class);
        //job.setCombinerClass(IntSumReducer.class); // no combiner: the reducer also writes to HBase
        job.setReducerClass(IntSumReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Input file and output directory on HDFS
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/movie/test2.csv"));
        Path outputPath = new Path("/user/hadoop/output2");
        FileOutputFormat.setOutputPath(job, outputPath);
        // Delete the output directory if it already exists, so reruns do not fail
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        // Run the job and exit with a meaningful status code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
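    // Assumed input layout (inferred from how map() slices the columns; the
    // actual CSV export may differ): each line holds 8 metadata columns, of
    // which column 0 is a numeric record id and column 1 the movie name,
    // followed by the comment text from column 8 onward. The comment itself
    // may contain commas, which is why map() re-joins the trailing columns.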
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Lexicons left over from the sentiment-analysis variant of this job;
        // they are not used by the word-count logic below.
        public static List<String> most = new ArrayList<>();
        public static List<String> very = new ArrayList<>();
        public static List<String> more = new ArrayList<>();
        public static List<String> ish = new ArrayList<>();
        public static List<String> insufficiently = new ArrayList<>();
        public static List<String> over = new ArrayList<>();
        public static List<String> negative_words = new ArrayList<>();
        public static List<String> postive_words = new ArrayList<>();
        public static List<String> stop_words = new ArrayList<>();

        // Load the stop-word list from the local file system; the file must
        // exist at this path on every node that runs map tasks.
        public static void read() throws IOException {
            String filepath = "/home/hadoop/Chinese_English_stopwords.txt";
            BufferedReader bufferedReader = new BufferedReader(new FileReader(filepath));
            String temp;
            while ((temp = bufferedReader.readLine()) != null) {
                stop_words.add(temp.replace(" ", ""));
            }
            bufferedReader.close();
        }

        // Segment a comment with jieba and remove stop words.
        public static List<String> withoutstopwords(String oldstring) throws IOException {
            JiebaSegmenter segmenter = new JiebaSegmenter();
            List<String> termlist = segmenter.sentenceProcess(oldstring);
            termlist.removeAll(stop_words);
            return termlist;
        }

        // True if the string consists only of digits. (The original version
        // returned the opposite of what its name promised.)
        public static boolean isNumeric(String str) {
            Pattern pattern = Pattern.compile("[0-9]+");
            Matcher isNum = pattern.matcher(str);
            return isNum.matches();
        }

        // True if every character is a CJK unified ideograph (U+4E00-U+9FA5).
        public static boolean checkname(String name) {
            for (int i = 0; i < name.length(); i++) {
                int n = name.charAt(i);
                if (n < 19968 || n > 40869) {
                    return false;
                }
            }
            return true;
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Load the stop-word list once per map task.
            read();
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            // Need the 8 metadata columns plus at least one comment column.
            if (words.length <= 9) {
                return;
            }
            String[] pre = Arrays.copyOfRange(words, 0, 8);
            // Commas inside the comment were split apart above, so glue the
            // comment columns back together (as in the original code, the
            // final column is dropped).
            String[] comment_lines = Arrays.copyOfRange(words, 8, words.length - 1);
            StringBuilder commentString = new StringBuilder();
            for (String comment : comment_lines) {
                commentString.append(comment);
            }
            // Skip rows whose first field is not a numeric record id (e.g. the CSV header).
            if (!isNumeric(pre[0])) {
                return;
            }
            List<String> comment = withoutstopwords(commentString.toString());
            for (String g : comment) {
                // Skip tokens that are empty or not purely Chinese, rather
                // than abandoning the whole record as the original `return` did.
                String token = g.replace(" ", "");
                if (token.isEmpty() || !checkname(token)) {
                    continue;
                }
                // Key: movie name (column 1) + "," + token; value: a count of 1.
                context.write(new Text(pre[1] + "," + token), new IntWritable(1));
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
        public static Configuration configuration;
        public static Connection connection;
        public static Admin admin;
        public static Table table;

        // Write one word count into HBase as information:number on the row "movieName,word".
        public static void insertData(String rowKey, String colFamily, String col, String val) throws IOException {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col), Bytes.toBytes(val));
            table.put(put);
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.quorum", "slave1");
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            TableName tableName = TableName.valueOf("movie2");
            String[] colFamily = {"information"};
            // Drop and recreate the table so each run starts clean. Note that
            // this assumes a single reduce task; with several, each task would
            // wipe the others' output.
            if (admin.tableExists(tableName)) {
                System.out.println("Table movie2 already exists; dropping it");
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            TableDescriptorBuilder tableDescriptor = TableDescriptorBuilder.newBuilder(tableName);
            for (String str : colFamily) {
                ColumnFamilyDescriptor family = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(str)).build();
                tableDescriptor.setColumnFamily(family);
            }
            admin.createTable(tableDescriptor.build());
            table = connection.getTable(tableName);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Release HBase resources when the reduce task finishes.
            if (table != null) table.close();
            if (admin != null) admin.close();
            if (connection != null) connection.close();
        }

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Words seen fewer than 5 times are too rare to show in the word cloud.
            if (sum < 5) {
                return;
            }
            insertData(key.toString(), "information", "number", String.valueOf(sum));
            context.write(key, new DoubleWritable(sum));
        }
    }
}
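After the job completes, the per-word counts live both in /user/hadoop/output2 on HDFS and in the HBase table movie2. The word cloud itself is produced outside of MapReduce. As a minimal sketch, assuming the same HBase 2.x client API used above, the following standalone class scans movie2 and prints one word/count pair per line; the class name WordCloudExport and the tab-separated output format are illustrative, not part of the original job:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
public class WordCloudExport {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "slave1"); // same quorum as the reducer
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf("movie2"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                // Row key is "movieName,word"; the count sits in information:number.
                String rowKey = Bytes.toString(result.getRow());
                String count = Bytes.toString(
                        result.getValue(Bytes.toBytes("information"), Bytes.toBytes("number")));
                // One "word<TAB>count" pair per line, ready for a word-cloud tool.
                System.out.println(rowKey.substring(rowKey.indexOf(',') + 1) + "\t" + count);
            }
        }
    }
}

Feeding these pairs to any word-cloud generator (for example the Python wordcloud package, the echarts-wordcloud plugin, or an online tool) yields the word cloud of the review content.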



