
Cleaning Nginx Log Data with Hadoop

This article cleans the nginx log data collected in the previous post, "Flume cross-server log monitoring".

The raw data looks like this:
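The exact layout matters for the parser below: splitting a line on double quotes must yield at least eight segments, which matches nginx's combined format plus one extra quoted field (for example $http_x_forwarded_for). A representative line with made-up values:

192.168.10.1 - - [08/Nov/2021:10:35:22 +0800] "GET /index.html HTTP/1.1" 200 612 "-" "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36" "-"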

We split each line to extract the following fields:

IP address
date
request method
status code
user agent
visited URL
HTTP protocol
requested page URL
operating system name
browser name

1. Create a Maven project

Import the dependencies (pom.xml):



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>code</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>lib</directory>
                <targetPath>BOOT-INF/lib/</targetPath>
                <includes>
                    <include>**</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>
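The dependency list in the pom did not survive extraction. The classes below import the Hadoop MapReduce API and nl.bitwalker.useragentutils, so a minimal dependencies block (to go inside <project>) would need at least the following two artifacts; these coordinates are what the imported packages are normally published under, and the versions are assumptions:

    <dependencies>
        <!-- Hadoop client libraries for the MapReduce job; version is an assumption -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <!-- provides nl.bitwalker.useragentutils.UserAgent -->
        <dependency>
            <groupId>nl.bitwalker</groupId>
            <artifactId>UserAgentUtils</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>

2. LogBean class

A Writable bean that carries the ten parsed fields: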
package cn.awz.log;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class LogBean implements Writable {

    private String ip;        // IP address
    private String date;      // date
    private String method;    // request method
    private String code;      // status code
    private String userAgent; // user agent
    private String staticUrl; // visited URL (the quoted referer segment)
    private String http;      // HTTP protocol version
    private String url;       // requested page URL
    private String os;        // operating system name
    private String browser;   // browser name

    public LogBean() {super();}

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.ip);
        dataOutput.writeUTF(this.date);
        dataOutput.writeUTF(this.method);
        dataOutput.writeUTF(this.code);
        dataOutput.writeUTF(this.userAgent);
        dataOutput.writeUTF(this.url);
        dataOutput.writeUTF(this.http);
        dataOutput.writeUTF(this.staticUrl);
        dataOutput.writeUTF(this.os);
        dataOutput.writeUTF(this.browser);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {

        this.ip = dataInput.readUTF();
        this.date = dataInput.readUTF();
        this.method = dataInput.readUTF();
        this.code = dataInput.readUTF();
        this.userAgent = dataInput.readUTF();
        this.url = dataInput.readUTF();
        this.http = dataInput.readUTF();
        this.staticUrl = dataInput.readUTF();
        this.os = dataInput.readUTF();
        this.browser = dataInput.readUTF();

    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getUserAgent() {
        return userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getHttp() {
        return http;
    }

    public void setHttp(String http) {
        this.http = http;
    }

    public String getStaticUrl() {
        return staticUrl;
    }

    public void setStaticUrl(String staticUrl) {
        this.staticUrl = staticUrl;
    }

    public String getOs() {
        return os;
    }

    public void setOs(String os) {
        this.os = os;
    }

    public String getBrowser() {
        return browser;
    }

    public void setBrowser(String browser) {
        this.browser = browser;
    }

    @Override
    public String toString() {
        return ip + '\t' + date + '\t' + method + '\t' +
                code + '\t' + userAgent + '\t' + staticUrl + '\t' +
                http + '\t' + url + '\t' + os + '\t' + browser;
    }
}
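Hadoop serializes the bean with write() and rebuilds it with readFields(), so the two methods must handle the fields in exactly the same order (both use ip, date, method, code, userAgent, url, http, staticUrl, os, browser here). A minimal round-trip check outside of Hadoop, with made-up values, might look like this:

package cn.awz.log;

import java.io.*;

public class LogBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        LogBean in = new LogBean();
        in.setIp("192.168.10.1");
        in.setDate("08/Nov/2021:10:35:22 +0800");
        in.setMethod("GET");
        in.setCode("200");
        in.setUserAgent("Mozilla/5.0");
        in.setUrl("/index.html");
        in.setHttp("HTTP/1.1");
        in.setStaticUrl("-");
        in.setOs("Windows");
        in.setBrowser("Chrome");

        // serialize with write(), then rebuild a fresh bean with readFields()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        LogBean out = new LogBean();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(out); // same tab-separated line as in.toString()
    }
}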

3. LogMapper class
package cn.awz.log;


import nl.bitwalker.useragentutils.UserAgent;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class LogMapper extends Mapper<LongWritable, Text, Text, LogBean> {

    // key/value objects are reused across map() calls to avoid per-record allocation
    private static Text logMapperKey = new Text();
    private static LogBean logMapperValue = new LogBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String lines = value.toString();

        // skip lines that cannot be parsed
        if (!parse(lines)) {
            return;
        }

        logMapperKey.set(logMapperValue.getIp());

        context.write(logMapperKey,logMapperValue);
    }

    public static boolean parse(String line) {

        if (line == null) {
            return false;
        }

        // nginx quotes the request line, referer, and user agent, so split on double quotes
        String[] lines = line.split("\"");

        // lines with fewer than 8 quote-delimited segments are malformed; drop them
        if (lines.length < 8) return false;

        // after the length check above, the user-agent segment (index 5) is always present
        logMapperValue.setUserAgent(lines[5]);
        logMapperValue.setBrowser(getBrowser(lines[5]));
        logMapperValue.setOs(getOS(lines[5]));

        // lines[0] = "ip - - [date ...", lines[1] = request line, lines[2] = " status size "
        String[] logBeanValue = lines[0].split(" ");
        String[] logBeanValueUrl = lines[1].split(" ");
        String[] logBeanStatus = lines[2].split(" ");

        logMapperValue.setIp(logBeanValue[0]);
        logMapperValue.setDate(logBeanValue[3].replace("[", ""));

        // a request line with a single token has no URL; treat it as malformed
        if (logBeanValueUrl.length == 1) {
            return false;
        }
        if (logBeanValueUrl.length < 3) {
            logMapperValue.setHttp(" ");
        } else {
            logMapperValue.setHttp(logBeanValueUrl[2]);
        }

        logMapperValue.setMethod(logBeanValueUrl[0]);
        logMapperValue.setUrl(logBeanValueUrl[1]);

        logMapperValue.setStaticUrl(lines[3]);
        logMapperValue.setCode(logBeanStatus[1]);

        return true;
    }

    // parse the user-agent string with UserAgentUtils
    private static UserAgent getOperatingSystem(String userAgents) {
        return UserAgent.parseUserAgentString(userAgents);
    }

    // operating system name
    public static String getOS(String userAgents) {
        return getOperatingSystem(userAgents).getOperatingSystem().getName();
    }

    // browser name
    public static String getBrowser(String userAgents) {
        return getOperatingSystem(userAgents).getBrowser().getName();
    }

}
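To sanity-check the UserAgentUtils calls outside the job, they can be fed a user-agent string directly; the string below is made up, and the exact names returned depend on the library version:

package cn.awz.log;

public class UserAgentDemo {
    public static void main(String[] args) {
        // a made-up desktop browser user-agent string
        String ua = "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 "
                + "(KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36";
        System.out.println(LogMapper.getOS(ua));      // e.g. an OS name such as "Windows 7"
        System.out.println(LogMapper.getBrowser(ua)); // e.g. a browser name such as "Chrome"
    }
}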

4. LogReduce class
package cn.awz.log;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class LogReduce extends Reducer<Text, LogBean, LogBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<LogBean> values, Context context) throws IOException, InterruptedException {

        // records arrive grouped by IP; write each cleaned record through unchanged
        for (LogBean value : values) {
            context.write(value, NullWritable.get());
        }
    }
}

5. LogDriver class
package cn.awz.log;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;



public class LogDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {


        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReduce.class);

        // the mapper emits <ip, LogBean>; the reducer emits <LogBean, NullWritable>
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LogBean.class);

        job.setOutputKeyClass(LogBean.class);
        job.setOutputValueClass(NullWritable.class);

//        job.setPartitionerClass(LogPartitioner.class);
//        job.setNumReduceTasks(7);

        // input directory (adjust to the local data path) and output directory;
        // the output directory must not already exist
        FileInputFormat.setInputPaths(job, new Path("D:HadoopDevelopmentclassDemodata"));
        FileOutputFormat.setOutputPath(job, new Path("out"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}
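Since the input and output paths are hardcoded local paths, the driver can be run directly from the IDE. To run the packaged jar through Hadoop instead, the paths would need to point at HDFS, and the job would be launched with something like the following (the jar name here just follows from the pom coordinates above):

hadoop jar code-1.0-SNAPSHOT.jar cn.awz.log.LogDriver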

View the results:
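Each record in the output is one tab-separated line in the field order of LogBean.toString(): ip, date, method, code, userAgent, staticUrl, http, url, os, browser. A representative made-up record:

192.168.10.1  08/Nov/2021:10:35:22 +0800  GET  200  Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36  -  HTTP/1.1  /index.html  Windows 7  Chrome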
