栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

【玩转Kafka】海量日志收集实战之架构介绍与日志输出

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

【玩转Kafka】海量日志收集实战之架构介绍与日志输出

海量日志收集实战

话不多说,看架构图:

最左侧,是Beats,它主要是用于收集日志的,比如这个Filebeat它的底层是用erlang语言写的,性能非常好,其实我们的系统中打印出来的日志,都会用这个Filebeat给我们抓取出来。
在我们这个架构中,Filebeat的主要作用就是把我们的日志搜集出来并转储到Kafka。

然后看Kafka的右边,是Logstash,它主要是用日志做一个过滤,然后它会把你过滤后的数据发送到Elasticsearch里。

最后利用Kibana进行展示。

接下来,我们就来说一下我们要做的事情,先看下这张图:

我们这了采用的是Log4j2,为什么不采用SpringBoot默认集成的logback?
这是因为Log4j2性能会更好一些,因为它的底层是基于无锁并行框架Disruptor做的,关于Disruptor大家可以看看这篇文章《java高阶编程之无锁并行计算框架——Disruptor初识》,当然,它想要发挥出威力还是有一定前提的,就是要求你配置一定要高,因为Disruptor比较耗内存和CPU,所以你想要日志收集的更实时,那对应的你应用服务的性能和配置也要高一些,并且也要经过一些压测。

这里我们看一下图中两个粉色的app.log与error.log,这里我们把它分成两个对应的log日志文件,app.log在这里我们可以存储一个全量的log日志,也就是说无论我们的业务打什么样级别的日志,都会存储到app.log里。然后error.log,只要出现异常就会存储在error.log里。这样的的目的主要是用于我们后续做一些告警和分析,所以我们后续的一些告警会通过对这个error.log做一些手段,比如定时任务做定时抓取。

最后,我们把这些日志通过filebeat抓取出来,转储到Kafka中,然后Kafka收到消息后,他对应的消费者也就是Logstash,它得到消息后在发送给Elasticsearch。

以上就是这个架构的整体设计。

日志输出

我们直接开拔,打开Idea,每天coding十小时,健康工作一百年。

首先引入依赖:


        org.springframework.boot
        spring-boot-starter-parent
        2.1.5.RELEASE
        
    

        
            org.springframework.boot
            spring-boot-starter-web
            
                
                    org.springframework.boot
                    spring-boot-starter-logging
                
            
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
            org.projectlombok
            lombok
        
        
            org.springframework.boot
            spring-boot-starter-log4j2
        
        
            com.alibaba
            fastjson
            1.2.58
        
        
            com.lmax
            disruptor
            3.3.4
        
    

工具类:
这是Dubbo底层用到的一个工具类,可以拿来直接使用。

public class NetUtil {

    public static String normalizeAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length > 2) {
            throw new IllegalArgumentException(address + " is invalid");
        }
        String host = blocks[0];
        int port = 80;
        if (blocks.length > 1) {
            port = Integer.valueOf(blocks[1]);
        } else {
            address += ":" + port; //use default 80
        }
        String serverAddr = String.format("%s:%d", host, port);
        return serverAddr;
    }

    public static String getLocalAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length != 2) {
            throw new IllegalArgumentException(address + " is invalid address");
        }
        String host = blocks[0];
        int port = Integer.valueOf(blocks[1]);

        if ("0.0.0.0".equals(host)) {
            return String.format("%s:%d", NetUtil.getLocalIp(), port);
        }
        return address;
    }

    private static int matchedIndex(String ip, String[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            String p = prefix[i];
            if ("*".equals(p)) { /
    public static  T convertJSONToObject(String data, Class clzss) {
        try {
            T t = JSON.parseObject(data, clzss);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    
    public static  T convertJSONToObject(JSONObject data, Class clzss) {
        try {
            T t = JSONObject.toJavaObject(data, clzss);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    
    public static  List convertJSONToArray(String data, Class clzss) {
        try {
            List t = JSON.parseArray(data, clzss);
            return t;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    
    public static  List convertJSONToArray(List data, Class clzss) {
        try {
            List t = new ArrayList();
            for (JSONObject jsonObject : data) {
                t.add(convertJSONToObject(jsonObject, clzss));
            }
            return t;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    
    public static String convertObjectToJSON(Object obj) {
        try {
            String text = JSON.toJSONString(obj);
            return text;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    
    public static JSONObject convertObjectToJSONObject(Object obj){
        try {
            JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
            return jsonObject;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }


    
    public static String convertObjectToJSONWithNullValue(Object obj) {
        try {
            String text = JSON.toJSONString(obj, featuresWithNullValue);
            return text;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

然后,我们再写一个工具类:

@Componet
public class InputMDC implements EnvironmentAware {

    private static Environment environment;
    @Override
    public void setEnvironment(Environment environment) {
        InputMDC.environment = environment;
    }

    public static void putMDC() {
        MDC.put("hostName",NetUtil.getLocalHostName());
        MDC.put("ip",NetUtil.getLocalIp());
        MDC.put("applicationName",environment.getProperty("spring.application.name"));
    }
}

创建Controller:

@Slf4j
@RestController
public class IndexController {

    @RequestMapping(value = "/index")
    public String index(){
        InputMDC.putMDC();
        log.info("我是一个info级别日志");
        log.warn("我是一个warn级别日志");
        log.error("我是一个error级别日志");
        return "idx";
    }
}

创建application.yml:

server:
  servlet:
    context-path: /
  port: 8001

spring:
  application:
    name: log-collector
  http:
    encoding:
      charset: UTF-8
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: NON_NULL

创建log4j2.xml



    
        logs
        collector
        [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
    
    
        
            
          
        
          
          
              
              
          
                   
        
        
          
          
              
                        
          
              
              
          
                   
                    
    
    
        
        
          
        
        
          
               
        
            
            
            
                 
    

我们这里重点关注一下这个AsyncLogger,它这里就是底层使用了Disruptor,我们可以进到源码里看看:

点进去:

不难发现,它的log4j2的底层确实是采用了Disruptor,大家可以打个断点看看,这里就不详细探讨了。

项目结构:

我们启动一下项目,查看控制台:

可以看到,我们的控制台没有任何错误信息,说明已经正常启动起来了。

并且项目下多出来一个logs目录:

我们访问localhost:8001/index,在查看控制台


可以看到,我们想要的一些信息也通过InputMDC拿到了。

然后,我们人为制造一个异常:

访问后查看控制台:

并且我们也能在错误日志中看到这个错误信息:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/605484.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号