An entry-level hands-on project: analyze a server's run logs, find the SQL statements with the highest call counts, and use that to decide what to cache.
Add the dependencies to pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test.hadoop</groupId>
  <artifactId>WordCount</artifactId>
  <version>0.0.1-acute</version>
  <packaging>jar</packaging>
  <name>WordCount</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>0.23.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>0.23.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>0.23.11</version>
    </dependency>
  </dependencies>
</project>
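With the dependencies in place, a standard Maven build turns the project into a runnable jar (assuming Maven is installed; the exact jar name follows the artifactId and version declared above):

mvn clean package
# the jar ends up under target/, e.g. target/WordCount-0.0.1-acute.jar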
Three classes are involved in total: a starter class that launches the program, and two functional classes that implement the two different statistics jobs. Example launch commands are sketched after the starter class below.
package test.hadoop.line;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class LineMatcherStarter {

    public static void main(String[] args) throws IOException {
        // Pick the job to run based on the first argument
        int key;
        try {
            key = Integer.parseInt(args[0]);
        } catch (ArrayIndexOutOfBoundsException | NumberFormatException e) {
            printUsage();
            return;
        }
        switch (key) {
            case 1:
                countJob(key, args); // counting job
                break;
            case 2:
                collectJob(key, args); // collecting job
                break;
            default:
                printUsage();
        }
    }

    private static void printUsage() {
        System.out.println("Usage: java [-options] -jar jarfile class [args...]");
        System.out.println("  class   a.b.c.Starter");
        System.out.println("  args[0] 1=count 2=line");
        System.out.println("  args[1] source");
        System.out.println("  args[2] destination");
    }

    private static void collectJob(int key, String[] args) throws IOException {
        if (args.length < 5) {
            printUsage();
            System.out.println("  args[3] expression");
            System.out.println("  args[4] rule=[starts|contains|ends]");
            System.out.println("  args[5] max line default=9999");
            return;
        }
        // Hadoop job initialization; the exact code differs between Hadoop versions
        JobConf conf = new JobConf(LineMatcherStarter.class);
        conf.setJobName("LineCollect");
        conf.setMapperClass(TextWithLineMapperReducer.class);
        conf.setCombinerClass(TextWithLineReducer.class);
        conf.setReducerClass(TextWithLineReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LineArrayWritable.class);
        conf.setOutputFormat(TextWithLineOutputFormat.class);
        // Input path and output path
        FileInputFormat.setInputPaths(conf, args[1]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        // Custom properties: the search string and the matching rule (starts / contains / ends)
        conf.set("TEXTWITHLINE.search", args[3]);
        conf.set("TEXTWITHLINE.rule", args[4]);
        if (args.length == 6) {
            // Maximum number of lines each distributed task will collect.
            // Estimate it from the available memory, otherwise the job may hit an OOM
            // exception -- don't ask how I know.
            conf.set("TEXTWITHLINE.maxLine", args[5]);
        }
        // Run the job
        JobClient.runJob(conf);
    }

    private static void countJob(int key, String[] args) throws IOException {
        if (args.length < 5) {
            printUsage();
            System.out.println("  args[3] expression");
            System.out.println("  args[4] rule=[starts|contains|ends]");
            return;
        }
        JobConf conf = new JobConf(LineMatcherStarter.class);
        conf.setJobName("LineCount");
        conf.setMapperClass(LineCountMapperReducer.class);
        conf.setCombinerClass(LineCountReducer.class);
        conf.setReducerClass(LineCountReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(conf, args[1]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));
        conf.set("TEXTWITHLINE.search", args[3]);
        conf.set("TEXTWITHLINE.rule", args[4]);
        JobClient.runJob(conf);
    }
}
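As a rough sketch of how the two jobs can be launched with the hadoop command, following the usage message above (the jar name, HDFS paths, and the sample search string are placeholders, not values from the original setup):

# count job (args[0]=1): how many log lines contain the SQL fragment
hadoop jar WordCount-0.0.1-acute.jar test.hadoop.line.LineMatcherStarter 1 /logs/app.log /out/count "select * from user" contains

# collect job (args[0]=2): gather the matching lines themselves, at most 500 per task
hadoop jar WordCount-0.0.1-acute.jar test.hadoop.line.LineMatcherStarter 2 /logs/app.log /out/lines "select * from user" contains 500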
package test.hadoop.line;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class TextWithLineMapperReducer extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LineArrayWritable> {

    private Text keyText;
    private String search;

    public void configure(JobConf job) {
        search = job.get("TEXTWITHLINE.search");
    }

    public void map(LongWritable k, Text v, OutputCollector<Text, LineArrayWritable> o, Reporter r)
            throws IOException {
        // Lazily build the output key and validate the search string once
        if (keyText == null) {
            if (search == null || search.isEmpty()) {
                throw new RuntimeException("Search is empty!");
            }
            keyText = new Text(search);
        }
        String line = v.toString();
        if (line.indexOf(search) >= 0) {
            // Emit the matching line itself, keyed by the search string
            o.collect(keyText, new LineArrayWritable(new Text[] { v }));
        }
    }
}

class TextWithLineReducer extends MapReduceBase
        implements Reducer<Text, LineArrayWritable, Text, LineArrayWritable> {

    private int max = Integer.MAX_VALUE;

    public void configure(JobConf job) {
        max = Integer.valueOf(job.get("TEXTWITHLINE.maxLine", "9999"));
    }

    public void reduce(Text k, Iterator<LineArrayWritable> v,
            OutputCollector<Text, LineArrayWritable> o, Reporter r) throws IOException {
        // Merge the collected lines, capped at the configured maximum
        List<Text> list = new ArrayList<>();
        int i = 0;
        while (v.hasNext()) {
            String[] ss = v.next().toStrings();
            for (String s : ss) {
                if (i++ < max) {
                    list.add(new Text(s));
                }
            }
        }
        o.collect(k, new LineArrayWritable(list.toArray(new Text[0])));
    }
}

class TextWithLineWriter implements RecordWriter<Text, LineArrayWritable> {

    private static final byte[] newline = getBytes("\r\n");

    private static byte[] getBytes(String s) {
        try {
            return s.getBytes("UTF-8");
        } catch (UnsupportedEncodingException uee) {
            throw new IllegalArgumentException("can't find UTF-8 encoding");
        }
    }

    protected DataOutputStream out;

    public TextWithLineWriter(DataOutputStream s) {
        out = s;
    }

    public synchronized void write(Text key, LineArrayWritable value) throws IOException {
        out.write(getBytes("----->" + key.toString()));
        out.write(newline);
        writeArray(value);
        out.write(newline);
    }

    private void writeArray(LineArrayWritable aw) throws IOException {
        int i = 0;
        for (String s : aw.toStrings()) {
            out.write(getBytes("-->" + (i++) + "->" + s));
            out.write(newline);
        }
    }

    public void close(Reporter reporter) throws IOException {
        out.close();
    }
}

class TextWithLineOutputFormat extends TextOutputFormat<Text, LineArrayWritable> {

    public RecordWriter<Text, LineArrayWritable> getRecordWriter(FileSystem ignored, JobConf job,
            String name, Progressable p) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        if (!isCompressed) {
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, p);
            return new TextWithLineWriter(fileOut);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            // create the named codec
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
            // build the filename including the extension
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, p);
            // wrap the raw stream with the codec so the output actually gets compressed
            return new TextWithLineWriter(new DataOutputStream(codec.createOutputStream(fileOut)));
        }
    }
}

class LineArrayWritable extends ArrayWritable {

    public LineArrayWritable() {
        super(Text.class);
    }

    public LineArrayWritable(Text[] array) {
        super(Text.class);
        Text[] texts = new Text[array.length];
        for (int i = 0; i < array.length; ++i) {
            texts[i] = new Text(array[i]);
        }
        set(texts);
    }
}
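For reference, TextWithLineWriter above writes each key on a "----->" line, followed by every collected line prefixed with "-->index->". With made-up log lines, the collect job's output would look roughly like:

----->select * from user
-->0->2016-03-01 10:00:01 select * from user where id=1
-->1->2016-03-01 10:00:07 select * from user where id=9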
package test.hadoop.line;

import java.io.IOException;
import java.util.Iterator;
import java.util.function.Predicate;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class LineCountMapperReducer extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    private String search;
    private Text key;
    private Predicate<String> rule;
    private Predicate<String> starts = s -> s.startsWith(search);
    private Predicate<String> contains = s -> s.contains(search);
    private Predicate<String> ends = s -> s.endsWith(search);

    public void configure(JobConf job) {
        search = job.get("TEXTWITHLINE.search");
        key = new Text(search);
        // Choose the matching rule; "contains" is the default
        switch (job.get("TEXTWITHLINE.rule")) {
            case "starts":
                rule = starts;
                break;
            case "ends":
                rule = ends;
                break;
            case "contains":
            default:
                rule = contains;
        }
    }

    public void map(LongWritable k, Text v, OutputCollector<Text, IntWritable> o, Reporter r)
            throws IOException {
        String line = v.toString();
        if (rule.test(line)) {
            o.collect(key, ONE);
        }
    }
}

class LineCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text k, Iterator<IntWritable> v, OutputCollector<Text, IntWritable> o, Reporter r)
            throws IOException {
        int sum = 0;
        while (v.hasNext()) {
            sum += v.next().get();
        }
        o.collect(k, new IntWritable(sum));
    }
}
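The count job keeps the default TextOutputFormat, so its result is simply the search string and the total number of matching lines, tab-separated. For example (the count is illustrative only):

select * from user	8236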
I ran everything on virtual machines: installing, cloning, and changing the hostname and user. For the preparation steps, see my earlier articles on changing the username, user group, domain name, and hostname on Ubuntu 14.04, and on setting up passwordless SSH access on Ubuntu 14.04.
To be continued.



