话不多说,看架构图:
最左侧,是Beats,它主要是用于收集日志的,比如这个Filebeat它的底层是用erlang语言写的,性能非常好,其实我们的系统中打印出来的日志,都会用这个Filebeat给我们抓取出来。
在我们这个架构中,Filebeat的主要作用就是把我们的日志搜集出来并转储到Kafka。
然后看Kafka的右边,是Logstash,它主要是用日志做一个过滤,然后它会把你过滤后的数据发送到Elasticsearch里。
最后利用Kibana进行展示。
接下来,我们就来说一下我们要做的事情,先看下这张图:
我们这了采用的是Log4j2,为什么不采用SpringBoot默认集成的logback?
这是因为Log4j2性能会更好一些,因为它的底层是基于无锁并行框架Disruptor做的,关于Disruptor大家可以看看这篇文章《java高阶编程之无锁并行计算框架——Disruptor初识》,当然,它想要发挥出威力还是有一定前提的,就是要求你配置一定要高,因为Disruptor比较耗内存和CPU,所以你想要日志收集的更实时,那对应的你应用服务的性能和配置也要高一些,并且也要经过一些压测。
这里我们看一下图中两个粉色的app.log与error.log,这里我们把它分成两个对应的log日志文件,app.log在这里我们可以存储一个全量的log日志,也就是说无论我们的业务打什么样级别的日志,都会存储到app.log里。然后error.log,只要出现异常就会存储在error.log里。这样的的目的主要是用于我们后续做一些告警和分析,所以我们后续的一些告警会通过对这个error.log做一些手段,比如定时任务做定时抓取。
最后,我们把这些日志通过filebeat抓取出来,转储到Kafka中,然后Kafka收到消息后,他对应的消费者也就是Logstash,它得到消息后在发送给Elasticsearch。
以上就是这个架构的整体设计。
日志输出我们直接开拔,打开Idea,每天coding十小时,健康工作一百年。
首先引入依赖:
org.springframework.boot spring-boot-starter-parent 2.1.5.RELEASE org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-logging org.springframework.boot spring-boot-starter-test test org.projectlombok lombok org.springframework.boot spring-boot-starter-log4j2 com.alibaba fastjson 1.2.58 com.lmax disruptor 3.3.4
工具类:
这是Dubbo底层用到的一个工具类,可以拿来直接使用。
public class NetUtil {
public static String normalizeAddress(String address) {
String[] blocks = address.split("[:]");
if (blocks.length > 2) {
throw new IllegalArgumentException(address + " is invalid");
}
String host = blocks[0];
int port = 80;
if (blocks.length > 1) {
port = Integer.valueOf(blocks[1]);
} else {
address += ":" + port; //use default 80
}
String serverAddr = String.format("%s:%d", host, port);
return serverAddr;
}
public static String getLocalAddress(String address) {
String[] blocks = address.split("[:]");
if (blocks.length != 2) {
throw new IllegalArgumentException(address + " is invalid address");
}
String host = blocks[0];
int port = Integer.valueOf(blocks[1]);
if ("0.0.0.0".equals(host)) {
return String.format("%s:%d", NetUtil.getLocalIp(), port);
}
return address;
}
private static int matchedIndex(String ip, String[] prefix) {
for (int i = 0; i < prefix.length; i++) {
String p = prefix[i];
if ("*".equals(p)) { /
public static T convertJSONToObject(String data, Class clzss) {
try {
T t = JSON.parseObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static T convertJSONToObject(JSONObject data, Class clzss) {
try {
T t = JSONObject.toJavaObject(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static List convertJSONToArray(String data, Class clzss) {
try {
List t = JSON.parseArray(data, clzss);
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static List convertJSONToArray(List data, Class clzss) {
try {
List t = new ArrayList();
for (JSONObject jsonObject : data) {
t.add(convertJSONToObject(jsonObject, clzss));
}
return t;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static String convertObjectToJSON(Object obj) {
try {
String text = JSON.toJSONString(obj);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static JSONObject convertObjectToJSONObject(Object obj){
try {
JSONObject jsonObject = (JSONObject) JSONObject.toJSON(obj);
return jsonObject;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
public static String convertObjectToJSONWithNullValue(Object obj) {
try {
String text = JSON.toJSONString(obj, featuresWithNullValue);
return text;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}
然后,我们再写一个工具类:
@Componet
public class InputMDC implements EnvironmentAware {
private static Environment environment;
@Override
public void setEnvironment(Environment environment) {
InputMDC.environment = environment;
}
public static void putMDC() {
MDC.put("hostName",NetUtil.getLocalHostName());
MDC.put("ip",NetUtil.getLocalIp());
MDC.put("applicationName",environment.getProperty("spring.application.name"));
}
}
创建Controller:
@Slf4j
@RestController
public class IndexController {
@RequestMapping(value = "/index")
public String index(){
InputMDC.putMDC();
log.info("我是一个info级别日志");
log.warn("我是一个warn级别日志");
log.error("我是一个error级别日志");
return "idx";
}
}
创建application.yml:
server:
servlet:
context-path: /
port: 8001
spring:
application:
name: log-collector
http:
encoding:
charset: UTF-8
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: NON_NULL
创建log4j2.xml
logs collector [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
我们这里重点关注一下这个AsyncLogger,它这里就是底层使用了Disruptor,我们可以进到源码里看看:
点进去:
不难发现,它的log4j2的底层确实是采用了Disruptor,大家可以打个断点看看,这里就不详细探讨了。
项目结构:
我们启动一下项目,查看控制台:
可以看到,我们的控制台没有任何错误信息,说明已经正常启动起来了。
并且项目下多出来一个logs目录:
我们访问localhost:8001/index,在查看控制台
可以看到,我们想要的一些信息也通过InputMDC拿到了。
然后,我们人为制造一个异常:
访问后查看控制台:
并且我们也能在错误日志中看到这个错误信息:



