原始数据是这样的
ip地址
日期
请求方式
状态码
代理
访问网址
http协议
请求页面地址
操作系统名称
浏览器名称
导入依赖
3,LogMapper类4.0.0 org.example code 1.0-SNAPSHOT 8 8 org.springframework.boot spring-boot-maven-plugin lib BOOT-INF/lib/ ** public class LogBean implements Writable { private String ip;//ip地址 private String date;//日期 private String method;//请求方式 private String code;//状态码 private String userAgent;//代理 private String staticUrl;//访问网址 private String http;//http协议 private String url;//请求页面地址 private String os;//操作系统名称 private String browser; //浏览器名称 public LogBean() {super();} @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(this.ip); dataOutput.writeUTF(this.date); dataOutput.writeUTF(this.method); dataOutput.writeUTF(this.code); dataOutput.writeUTF(this.userAgent); dataOutput.writeUTF(this.url); dataOutput.writeUTF(this.http); dataOutput.writeUTF(this.staticUrl); dataOutput.writeUTF(this.os); dataOutput.writeUTF(this.browser); } @Override public void readFields(DataInput dataInput) throws IOException { this.ip = dataInput.readUTF(); this.date = dataInput.readUTF(); this.method = dataInput.readUTF(); this.code = dataInput.readUTF(); this.userAgent = dataInput.readUTF(); this.url = dataInput.readUTF(); this.http = dataInput.readUTF(); this.staticUrl = dataInput.readUTF(); this.os = dataInput.readUTF(); this.browser = dataInput.readUTF(); } public String getIp() { return ip; } public void setIp(String ip) { this.ip = ip; } public String getDate() { return date; } public void setDate(String date) { this.date = date; } public String getMethod() { return method; } public void setMethod(String method) { this.method = method; } public String getCode() { return code; } public void setCode(String code) { this.code = code; } public String getUserAgent() { return userAgent; } public void setUserAgent(String userAgent) { this.userAgent = userAgent; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getHttp() { return http; } public void setHttp(String http) { this.http = http; } 
public String getStaticUrl() { return staticUrl; } public void setStaticUrl(String staticUrl) { this.staticUrl = staticUrl; } public String getOs() { return os; } public void setOs(String os) { this.os = os; } public String getBrowser() { return browser; } public void setBrowser(String browser) { this.browser = browser; } @Override public String toString() { return ip +'t'+ date + 't'+ method + 't' + code + 't' + userAgent + 't' + staticUrl + 't' + http + 't' + url + 't' + os + 't'+ browser; } }
package cn.awz.log; import nl.bitwalker.useragentutils.UserAgent; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class LogMapper extends Mapper4,LogReduce类{ private static Text logMapperKey = new Text(); private static LogBean logMapperValue = new LogBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String lines = value.toString(); if (!parse(lines)) { return; } logMapperKey.set(logMapperValue.getIp()); context.write(logMapperKey,logMapperValue); } public static boolean parse(String line) { if (line == null) { return false; } String[] lines = line.split("""); //判断lines长度小于8的为不合法请求直接拦截 if (lines.length < 8) return false; if (lines.length < 6) { logMapperValue.setUserAgent(""); } else { logMapperValue.setUserAgent(lines[5]); logMapperValue.setBrowser(getBrowser(lines[5])); logMapperValue.setOs(getOS(lines[5])); } String[] logBeanValue = lines[0].split(" "); String[] logBeanValueUrl = lines[1].split(" "); String[] logBeanStatus = lines[2].split(" "); logMapperValue.setIp(logBeanValue[0]); logMapperValue.setDate(logBeanValue[3].replace("[","")); if (logBeanValueUrl.length == 1) { return false; } if (logBeanValueUrl.length < 3) { logMapperValue.setHttp(" "); } else { logMapperValue.setHttp(logBeanValueUrl[2]); } logMapperValue.setMethod(logBeanValueUrl[0]); logMapperValue.setUrl(logBeanValueUrl[1]); logMapperValue.setStaticUrl(lines[3]); logMapperValue.setCode(logBeanStatus[1]); return true; } //获取系统信息 private static UserAgent getOperatingSystem(String userAgents) { return UserAgent.parseUserAgentString(userAgents); } //获取系统名称 public static String getOS(String userAgents) { return getOperatingSystem(userAgents).getOperatingSystem().getName(); } //获取浏览器名称 public static String getBrowser(String userAgents) { return getOperatingSystem(userAgents).getBrowser().getName(); } }
package cn.awz.log; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class LogReduce extends Reducer5,LogDriver类{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (LogBean value : values) { context.write(value,null); } } }
package cn.awz.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Entry point that configures and submits the log-cleaning MapReduce job
 * ({@code LogMapper} followed by {@code LogReduce}).
 */
public class LogDriver {

    // NOTE(review): this looks like a Windows path whose backslash separators were lost
    // when the article was extracted; the literal is kept as found. Pass the real input
    // directory as the first argument instead of relying on it.
    private static final String DEFAULT_INPUT = "D:HadoopDevelopmentclassDemodata";

    private static final String DEFAULT_OUTPUT = "out";

    /**
     * Builds the job, runs it, and waits for completion.
     *
     * @param args optional: {@code args[0]} is the input path and {@code args[1]} the output
     *             path; each falls back to the built-in default when absent
     * @throws IOException            if the job cannot be created or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT;
        String outputPath = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT;

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogDriver.class);

        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogBean.class);
        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Custom partitioning is disabled in the article; left here for reference.
        // job.setPartitionerClass(LogPartitioner.class);
        // job.setNumReduceTasks(7);

        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // Report a failed job through the process exit status instead of ignoring it.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
查看结果



