The overall flow diagram is as follows
Add the required dependencies
The pom.xml pulls in the following dependencies:
- org.springframework.boot:spring-boot-starter-web
- org.springframework.boot:spring-boot-starter-logging (normally listed as an exclusion when switching to log4j2)
- org.springframework.boot:spring-boot-starter-test (scope test)
- org.projectlombok:lombok
- org.springframework.boot:spring-boot-starter-log4j2
- com.lmax:disruptor:3.3.4 (required for log4j2 async loggers)
- com.alibaba:fastjson:1.2.58

The build section sets the final name to collector, includes **/*.xml resources from src/main/java (filtering enabled) together with src/main/resources, and configures spring-boot-maven-plugin with the main class com.xp.logcollector.Application.
Configure log4j2.xml
The important parts: the log directory property is logs, the application name property is logcollector (so the rolling files are app-logcollector.log and error-logcollector.log, which Filebeat tails below), and the appenders use the following pattern layout:

[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
Logging usage in the project
package com.xp.logcollector.web;
import com.xp.logcollector.utils.InputMDC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class IndexController {
@RequestMapping("/index")
public String index(){
InputMDC.putMDC();
log.info("index中的info级别日志");
log.error("index中的error级别日志");
log.warn("index中的warn级别日志");
return "index";
}
@RequestMapping(value = "/err")
public String err() {
InputMDC.putMDC();
try {
int a = 1/0;
} catch (Exception e) {
log.error("算术异常", e);
}
return "err";
}
}
Related utility classes
FastJsonConvertUtil
package com.xp.logcollector.utils;
import java.util.ArrayList;
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class FastJsonConvertUtil {
private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };
// Convert a JSON string to an object of the given class
public static <T> T convertJSONToObject(String data, Class<T> clazz) {
try {
return JSON.parseObject(data, clazz);
} catch (Exception e) {
log.error("convertJSONToObject Exception", e);
return null;
}
}
// Convert a JSONObject to an object of the given class
public static <T> T convertJSONToObject(JSONObject data, Class<T> clazz) {
try {
return JSONObject.toJavaObject(data, clazz);
} catch (Exception e) {
log.error("convertJSONToObject Exception", e);
return null;
}
}
// Convert a JSON array string to a list of objects
public static <T> List<T> convertJSONToArray(String data, Class<T> clazz) {
try {
return JSON.parseArray(data, clazz);
} catch (Exception e) {
log.error("convertJSONToArray Exception", e);
return null;
}
}
// Convert a list of JSONObjects to a list of objects
public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clazz) {
try {
List<T> t = new ArrayList<>();
for (JSONObject jsonObject : data) {
t.add(convertJSONToObject(jsonObject, clazz));
}
return t;
} catch (Exception e) {
log.error("convertJSONToArray Exception", e);
return null;
}
}
// Serialize an object to a JSON string
public static String convertObjectToJSON(Object obj) {
try {
return JSON.toJSONString(obj);
} catch (Exception e) {
log.error("convertObjectToJSON Exception", e);
return null;
}
}
// Convert an object to a JSONObject
public static JSONObject convertObjectToJSONObject(Object obj) {
try {
return (JSONObject) JSONObject.toJSON(obj);
} catch (Exception e) {
log.error("convertObjectToJSONObject Exception", e);
return null;
}
}
// Serialize an object to JSON, writing null fields with default values
public static String convertObjectToJSONWithNullValue(Object obj) {
try {
return JSON.toJSONString(obj, featuresWithNullValue);
} catch (Exception e) {
log.error("convertObjectToJSONWithNullValue Exception", e);
return null;
}
}
}
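A minimal usage sketch of FastJsonConvertUtil (the Demo DTO and the main method below are purely illustrative and not part of the project):

package com.xp.logcollector.utils;

import java.util.List;

public class FastJsonConvertUtilDemo {

    // Hypothetical DTO used only for this illustration
    public static class Demo {
        private String name;
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    public static void main(String[] args) {
        Demo demo = new Demo();
        demo.setName("logcollector");

        // Object -> JSON string
        String json = FastJsonConvertUtil.convertObjectToJSON(demo);   // {"name":"logcollector"}

        // JSON string -> object
        Demo parsed = FastJsonConvertUtil.convertJSONToObject(json, Demo.class);

        // JSON array string -> List
        List<Demo> list = FastJsonConvertUtil.convertJSONToArray("[" + json + "]", Demo.class);

        System.out.println(json + " / " + parsed.getName() + " / " + list.size());
    }
}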
InputMDC
package com.xp.logcollector.utils;
import org.jboss.logging.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
@Component
public class InputMDC implements EnvironmentAware {
private static Environment environment;
@Override
public void setEnvironment(Environment environment) {
InputMDC.environment = environment;
}
public static void putMDC() {
MDC.put("hostName", NetUtil.getLocalHostName());
MDC.put("ip", NetUtil.getLocalIp());
MDC.put("applicationName", environment.getProperty("spring.application.name"));
}
}
NetUtil
package com.xp.logcollector.utils;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.Enumeration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class NetUtil {
public static String normalizeAddress(String address){
String[] blocks = address.split("[:]");
if(blocks.length > 2){
throw new IllegalArgumentException(address + " is invalid");
}
String host = blocks[0];
int port = 80;
if(blocks.length > 1){
port = Integer.valueOf(blocks[1]);
} else {
address += ":"+port; //use default 80
}
String serverAddr = String.format("%s:%d", host, port);
return serverAddr;
}
public static String getLocalAddress(String address){
String[] blocks = address.split("[:]");
if(blocks.length != 2){
throw new IllegalArgumentException(address + " is invalid address");
}
String host = blocks[0];
int port = Integer.valueOf(blocks[1]);
if("0.0.0.0".equals(host)){
return String.format("%s:%d",NetUtil.getLocalIp(), port);
}
return address;
}
// Return the index of the first preferred prefix that the given ip matches ("*" stands for a public, non-intranet address)
private static int matchedIndex(String ip, String[] prefix){
for(int i=0; i<prefix.length; i++){
String p = prefix[i];
if("*".equals(p)){ // "*" means any address that is not a loopback/intranet address
if(ip.startsWith("127.") || ip.startsWith("10.") || ip.startsWith("172.") || ip.startsWith("192.")){
continue;
}
return i;
} else {
if(ip.startsWith(p)){
return i;
}
}
}
return -1;
}
// Pick the local IP according to a preference string such as "*>10>172>192>127"
public static String getLocalIp(String ipPreference) {
String[] prefix = ipPreference.split("[> ]+");
try {
Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
String matchedIp = null;
int matchedIdx = -1;
while (interfaces.hasMoreElements()) {
NetworkInterface ni = interfaces.nextElement();
Enumeration<InetAddress> en = ni.getInetAddresses();
while (en.hasMoreElements()) {
InetAddress addr = en.nextElement();
String ip = addr.getHostAddress();
Matcher matcher = pattern.matcher(ip);
if (matcher.matches()) {
int idx = matchedIndex(ip, prefix);
if(idx == -1) continue;
if(matchedIdx == -1){
matchedIdx = idx;
matchedIp = ip;
} else {
if(matchedIdx>idx){
matchedIdx = idx;
matchedIp = ip;
}
}
}
}
}
if(matchedIp != null) return matchedIp;
return "127.0.0.1";
} catch (Exception e) {
return "127.0.0.1";
}
}
public static String getLocalIp() {
return getLocalIp("*>10>172>192>127");
}
public static String remoteAddress(SocketChannel channel){
SocketAddress addr = channel.socket().getRemoteSocketAddress();
String res = String.format("%s", addr);
return res;
}
public static String localAddress(SocketChannel channel){
SocketAddress addr = channel.socket().getLocalSocketAddress();
String res = String.format("%s", addr);
return addr==null? res: res.substring(1);
}
public static String getPid(){
RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
String name = runtime.getName();
int index = name.indexOf("@");
if (index != -1) {
return name.substring(0, index);
}
return null;
}
public static String getLocalHostName() {
try {
return (InetAddress.getLocalHost()).getHostName();
} catch (UnknownHostException uhe) {
String host = uhe.getMessage();
if (host != null) {
int colon = host.indexOf(':');
if (colon > 0) {
return host.substring(0, colon);
}
}
return "UnknownHost";
}
}
}
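A quick sanity check for NetUtil (this small main class is illustrative only); the first two values are exactly what InputMDC later puts into the MDC:

package com.xp.logcollector.utils;

public class NetUtilDemo {
    public static void main(String[] args) {
        // hostName and ip are the values InputMDC writes into the MDC for every log line
        System.out.println("hostName = " + NetUtil.getLocalHostName());
        System.out.println("ip       = " + NetUtil.getLocalIp());
        System.out.println("pid      = " + NetUtil.getPid());
    }
}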
Preparing the alert callback
entity
package com.xp.logcollector.entity;
import lombok.Data;
@Data
public class AccurateWatcherMessage {
private String title;
private String executionTime;
private String applicationName;
private String level;
private String body;
}
controller
package com.xp.logcollector.web;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.xp.logcollector.entity.AccurateWatcherMessage;
@RestController
public class WatcherController {
@RequestMapping(value ="/accurateWatch")
public String watch(@RequestBody AccurateWatcherMessage accurateWatcherMessage) {
String ret = JSON.toJSONString(accurateWatcherMessage);
System.err.println("---- Alert content ----:" + ret);
return "is watched" + ret;
}
}
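The Watcher configured later in this article POSTs a JSON body to /accurateWatch, and that payload maps field by field onto AccurateWatcherMessage. A minimal sketch of what the controller ends up deserializing (all literal values below are made up for illustration):

package com.xp.logcollector.web;

import com.alibaba.fastjson.JSON;
import com.xp.logcollector.entity.AccurateWatcherMessage;

public class WatcherPayloadDemo {
    public static void main(String[] args) {
        // Field names match the keys sent by the watcher webhook body; the values are illustrative
        String payload = "{\"title\":\"Error alert\","
                + "\"applicationName\":\"collector\","
                + "\"level\":\"P1\","
                + "\"body\":\"ArithmeticException: / by zero\","
                + "\"executionTime\":\"2019-01-23T10:00:00.000+08:00\"}";

        AccurateWatcherMessage msg = JSON.parseObject(payload, AccurateWatcherMessage.class);
        System.out.println(msg.getApplicationName() + " -> " + msg.getBody());
    }
}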
2. Configure Filebeat
We use Filebeat to collect the logs.
Configure filebeat.yml:
###################### Filebeat Configuration Example #########################
filebeat.prospectors:
- input_type: log
paths:
## app-<service-name>.log; the file name is fixed on purpose so that log rotation does not cause old data to be collected again
- /usr/local/software/logs/app-logcollector.log # log file path
# _type value used when the event is written to ES
document_type: "app-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})-(\d{2}|[a-zA-Z]{3})-(\d{2}|\d{4})' # pattern for lines that start with a timestamp such as 2017-11-15 08:04:23:889
pattern: '^\[' # pattern for lines that start with "["; choose the pattern that fits your actual log format
negate: true # lines NOT matching the pattern are treated as continuation lines
match: after # continuation lines are appended to the end of the previous line
max_lines: 2000 # maximum number of lines merged into one event; the rest are dropped
timeout: 2s # if no new line arrives within this time, stop waiting and ship the event
fields:
logbiz: collector
logtopic: app-log-collector ## used as the Kafka topic, one per service
env: dev
- input_type: log
paths:
- /usr/local/software/logs/error-logcollector.log
document_type: "error-log"
multiline:
#pattern: '^\s*(\d{4}|\d{2})-(\d{2}|[a-zA-Z]{3})-(\d{2}|\d{4})' # pattern for lines that start with a timestamp such as 2017-11-15 08:04:23:889
pattern: '^\[' # pattern for lines that start with "["
negate: true # lines NOT matching the pattern are treated as continuation lines
match: after # continuation lines are appended to the end of the previous line
max_lines: 2000 # maximum number of lines merged into one event
timeout: 2s # if no new line arrives within this time, stop waiting and ship the event
fields:
logbiz: collector
logtopic: error-log-collector ## used as the Kafka topic, one per service
env: dev
output.kafka:
enabled: true
hosts: ["ip:9092"]
topic: '%{[fields.logtopic]}'
partition.hash:
reachable_only: true
compression: gzip
max_message_bytes: 1000000
required_acks: 1
logging.to_files: true
3. Configure Kafka
Kafka is used to buffer the log messages.
Start Kafka:
/usr/local/software/kafka_2.12-2.1.1/bin/kafka-server-start.sh /usr/local/software/kafka_2.12-2.1.1/config/server.properties &
Create the two topics:
/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --create --topic app-log-collector --partitions 1 --replication-factor 1
/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --create --topic error-log-collector --partitions 1 --replication-factor 1
Check the topic contents:
bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic error-log-collector --from-beginning

4. Configure Elasticsearch
For the detailed setup, see the Elasticsearch configuration.
Add the IK analyzer plugin
## Download https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.6.0/elasticsearch-analysis-ik-6.6.0.zip
mkdir -p /usr/local/elasticsearch-6.6.0/plugins/ik/
## Upload elasticsearch-analysis-ik-6.6.0.zip to /usr/local/software
## Unzip it into the newly created /usr/local/elasticsearch-6.6.0/plugins/ik/ directory:
unzip -d /usr/local/elasticsearch-6.6.0/plugins/ik/ elasticsearch-analysis-ik-6.6.0.zip
## Check the result
cd /usr/local/elasticsearch-6.6.0/plugins/ik/
## Fix ownership
chown -R elk:elk /usr/local/elasticsearch-6.6.0/
## Restart the ES node; the following line means the IK analyzer loaded successfully
[es-node01] loaded plugin [analysis-ik]

5. Configure Logstash
Logstash acts as the consumer: it reads the log topics from Kafka and writes them into Elasticsearch.
Create a script directory under the Logstash installation directory.
Add the script logstash-script.conf:
input {
kafka {
## app-log-<service name>
topics_pattern => "app-log-.*"
bootstrap_servers => "ip:9092"
codec => json
consumer_threads => 1 ## only one partition was created, so one consumer thread is enough
decorate_events => true
#auto_offset_reset => "latest"
group_id => "app-log-group"
}
kafka {
## error-log-<service name>
topics_pattern => "error-log-.*"
bootstrap_servers => "ip:9092"
codec => json
consumer_threads => 1
decorate_events => true
#auto_offset_reset => "latest"
group_id => "error-log-group"
}
}
filter {
## timezone conversion: build the index date (yyyy.MM.dd) in local time
ruby {
code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
}
if "app-log" in [fields][logtopic]{
grok {
## grok expression
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"]
}
}
if "error-log" in [fields][logtopic]{
grok {
## grok expression
match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"]
}
}
}
## For testing, output to the console:
output {
stdout { codec => rubydebug }
}
## Output to Elasticsearch:
output {
if "app-log" in [fields][logtopic]{
## Elasticsearch output plugin
elasticsearch {
# ES server address
hosts => ["ip:9200"]
# username and password
user => "elastic"
password => "123456"
## index name; anything after a leading + is automatically treated as a date format, e.g.:
## javalog-app-service-2019.01.23
index => "app-log-%{[fields][logbiz]}-%{index_time}"
# sniff the cluster node IPs (usually true); see http://ip:9200/_nodes/http?pretty
# sniffing lets Logstash load-balance log messages across the ES cluster nodes
sniffing => true
# Logstash ships with a default mapping template; allow it to be overwritten
template_overwrite => true
}
}
}
if "error-log" in [fields][logtopic]{
elasticsearch {
hosts => ["ip:9200"]
user => "elastic"
password => "123456"
index => "error-log-%{[fields][logbiz]}-%{index_time}"
sniffing => true
template_overwrite => true
}
}
}
Note that the private IPs of Logstash and Elasticsearch must be able to reach each other: Logstash first connects to Elasticsearch over the public IP, but because of sniffing it then pushes data through the private node IPs in the Elasticsearch connection pool. If those private IPs are unreachable, the connection keeps failing with errors like:
Elasticsearch pool URLs updated {:changes=>{:removed=>[http://elastic:xxxxxx
6. Configure Kibana
## Download and unpack Kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-linux-x86_64.tar.gz
tar -zxvf kibana-6.6.0-linux-x86_64.tar.gz -C /usr/local/
mv kibana-6.6.0-linux-x86_64/ kibana-6.6.0
## Enter the Kibana directory and edit the config file
vim /usr/local/kibana-6.6.0/config/kibana.yml
## Change the following settings:
server.host: "0.0.0.0"
server.name: "ip"
elasticsearch.hosts: ["http://ip:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "123456"
## Start:
/usr/local/kibana-6.6.0/bin/kibana &
## Or start with an explicit config file:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &
## Access:
http://192.168.0.236:5601/app/kibana (5601 is the default Kibana port)
## Request a license: https://license.elastic.co/registration
## Edit the requested license; the license.json file name must not be changed, otherwise validation fails
1. replace "type":"basic" with "type":"platinum" # upgrade Basic to Platinum
2. replace "expiry_date_in_millis":1561420799999 with "expiry_date_in_millis":3107746200000 # extend 1 year to 50 years
## Start the elasticsearch and kibana services
## In the Kibana UI, go to Management -> License Management and upload the modified license

7. Start everything in order and configure the Watcher
zookeeper---->filebeat---->kafka---->application jar---->elasticsearch---->logstash---->kibana
Create the index pattern in Kibana. If typing the index keyword does not bring up the Next step option, your data has not reached Elasticsearch; check that the ports are open and that the other components are actually running.
Create the error-log index template
PUT _template/error-log-
{
"template": "error-log-*",
"order": 0,
"settings": {
"index": {
"refresh_interval": "5s"
}
},
"mappings": {
"_default_": {
"dynamic_templates": [
{
"message_field": {
"match_mapping_type": "string",
"path_match": "message",
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
},
{
"throwable_field": {
"match_mapping_type": "string",
"path_match": "throwable",
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word"
}
}
},
{
"string_fields": {
"match_mapping_type": "string",
"match": "*",
"mapping": {
"norms": false,
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_max_word",
"fields": {
"keyword": {
"type": "keyword"
}
}
}
}
}
],
"_all": {
"enabled": false
},
"properties": {
"hostName": {
"type": "keyword"
},
"ip": {
"type": "ip"
},
"level": {
"type": "keyword"
},
"currentDateTime": {
"type": "date"
}
}
}
}
}
Create the corresponding watcher
PUT _xpack/watcher/watch/error_log_collector_watcher
{
"trigger": {
"schedule": {
"interval": "5s"
}
},
"input": {
"search": {
"request": {
"indices": [""],
"body": {
"size": 0,
"query": {
"bool": {
"must": [
{
"term": {"level": "ERROR"}
}
],
"filter": {
"range": {
"currentDateTime": {
"gt": "now-30s" , "lt": "now"
}
}
}
}
}
}
}
}
},
"condition": {
"compare": {
"ctx.payload.hits.total": {
"gt": 0
}
}
},
"transform": {
"search": {
"request": {
"indices": [""],
"body": {
"size": 1,
"query": {
"bool": {
"must": [
{
"term": {"level": "ERROR"}
}
],
"filter": {
"range": {
"currentDateTime": {
"gt": "now-30s" , "lt": "now"
}
}
}
}
},
"sort": [
{
"currentDateTime": {
"order": "desc"
}
}
]
}
}
}
},
"actions": {
"test_error": {
"webhook" : {
"method" : "POST",
"url" : "http://ip:8001/accurateWatch",
"body" : "{"title": "异常错误告警", "applicationName": "{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}", "level":"告警级别P1", "body": "{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}", "executionTime": "{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}"}"
}
}
}
}
View the corresponding logs.
View the watcher in Kibana.
The alert callback is received in the project.
Setup complete. Sweet dreams.



