栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

ELK+Kafka+Filebeat实现系统日志收集与预警

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

ELK+Kafka+Filebeat实现系统日志收集与预警

ELK+Kafka+Filebeat实现系统日志收集与预警

总体的流程图如下

1.项目准备

添加相关依赖


        
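    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- 排除 spring-boot-starter-logging,改用 log4j2 -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- log4j2 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.3.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>

    </dependencies>
    <build>
        <finalName>collector</finalName>
        <!-- 打包时包含 properties、xml -->
        <resources>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
                <!-- 是否替换资源中的属性 -->
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.xp.logcollector.Application</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>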
                
            
        
    

配置log4j2.xml



    
        logs
        logcollector
        [%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n
    
    
        
            
          
        
          
          
              
              
          
                   
        
        
          
          
              
                        
          
              
              
          
                   
                    
    
    
        
        
          
        
        
          
               
        
            
            
            
                 
    

项目中打印日志使用

package com.xp.logcollector.web;

import com.xp.logcollector.utils.InputMDC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
public class IndexController {
    @RequestMapping("/index")
    public String index(){

        InputMDC.putMDC();
        log.info("index中的info级别日志");
        log.error("index中的error级别日志");

        log.warn("index中的warn级别日志");

        return "index";
    }
    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1/0;
        } catch (Exception e) {
            log.error("算术异常", e);
        }
        return "err";
    }
}

相关工具包
FastJsonConvertUtil

package com.xp.logcollector.utils;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;

import lombok.extern.slf4j.Slf4j;


/**
 * Thin convenience wrappers around fastjson's {@link JSON} / {@link JSONObject}
 * conversion calls.
 *
 * <p>Every method catches any exception thrown by the conversion, logs it at
 * error level and returns {@code null} instead of propagating it, so callers
 * must be prepared for a {@code null} result.
 */
@Slf4j
public class FastJsonConvertUtil {

	/** Serializer features passed to {@link #convertObjectToJSONWithNullValue(Object)}. */
	private static final SerializerFeature[] featuresWithNullValue = { SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
	        SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero, SerializerFeature.WriteNullStringAsEmpty };

	/**
	 * Parses a JSON string into an instance of the given class.
	 *
	 * @param data  JSON text
	 * @param clzss target type
	 * @param <T>   target type
	 * @return the parsed object, or {@code null} if parsing threw
	 */
	public static <T> T convertJSONToObject(String data, Class<T> clzss) {
		try {
			return JSON.parseObject(data, clzss);
		} catch (Exception e) {
			log.error("convertJSONToObject Exception", e);
			return null;
		}
	}

	/**
	 * Converts an already-parsed {@link JSONObject} into an instance of the given class.
	 *
	 * @param data  parsed JSON object
	 * @param clzss target type
	 * @param <T>   target type
	 * @return the converted object, or {@code null} if conversion threw
	 */
	public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
		try {
			return JSONObject.toJavaObject(data, clzss);
		} catch (Exception e) {
			log.error("convertJSONToObject Exception", e);
			return null;
		}
	}

	/**
	 * Parses a JSON array string into a list of the given element type.
	 *
	 * @param data  JSON array text
	 * @param clzss element type
	 * @param <T>   element type
	 * @return the parsed list, or {@code null} if parsing threw
	 */
	public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
		try {
			return JSON.parseArray(data, clzss);
		} catch (Exception e) {
			log.error("convertJSONToArray Exception", e);
			return null;
		}
	}

	/**
	 * Converts each {@link JSONObject} in the list into the given element type.
	 *
	 * <p>Elements are converted with {@link #convertJSONToObject(JSONObject, Class)},
	 * which returns {@code null} on failure, so the result may contain {@code null}
	 * entries.
	 *
	 * @param data  parsed JSON objects
	 * @param clzss element type
	 * @param <T>   element type
	 * @return the converted list, or {@code null} if iteration threw (e.g. {@code data} is null)
	 */
	public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
		try {
			List<T> t = new ArrayList<T>();
			for (JSONObject jsonObject : data) {
				t.add(convertJSONToObject(jsonObject, clzss));
			}
			return t;
		} catch (Exception e) {
			log.error("convertJSONToArray Exception", e);
			return null;
		}
	}

	/**
	 * Serializes an object to JSON text using fastjson defaults.
	 *
	 * @param obj object to serialize
	 * @return JSON text, or {@code null} if serialization threw
	 */
	public static String convertObjectToJSON(Object obj) {
		try {
			return JSON.toJSONString(obj);
		} catch (Exception e) {
			log.error("convertObjectToJSON Exception", e);
			return null;
		}
	}

	/**
	 * Converts an object to a {@link JSONObject}.
	 *
	 * @param obj object to convert
	 * @return the JSON object, or {@code null} if conversion threw; this includes
	 *         the cast failing because {@code JSONObject.toJSON} produced something
	 *         other than a {@link JSONObject}
	 */
	public static JSONObject convertObjectToJSONObject(Object obj){
		try {
			return (JSONObject) JSONObject.toJSON(obj);
		} catch (Exception e) {
			log.error("convertObjectToJSONObject Exception", e);
			return null;
		}
	}

	/**
	 * Serializes an object to JSON text with the {@code featuresWithNullValue}
	 * serializer features applied.
	 *
	 * @param obj object to serialize
	 * @return JSON text, or {@code null} if serialization threw
	 */
	public static String convertObjectToJSONWithNullValue(Object obj) {
		try {
			return JSON.toJSONString(obj, featuresWithNullValue);
		} catch (Exception e) {
			log.error("convertObjectToJSONWithNullValue Exception", e);
			return null;
		}
	}
}

InputMDC

package com.xp.logcollector.utils;

import org.jboss.logging.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;


/**
 * Spring component that captures the application {@link Environment} in a
 * static field so that {@link #putMDC()} can be called statically.
 */
@Component
public class InputMDC implements EnvironmentAware {

	/** Environment handed over by Spring through {@link #setEnvironment(Environment)}. */
	private static Environment environment;

	/**
	 * Stores the supplied environment in the static field.
	 *
	 * @param environment environment supplied by the container
	 */
	@Override
	public void setEnvironment(Environment environment) {
		InputMDC.environment = environment;
	}

	/**
	 * Puts {@code hostName}, {@code ip} and {@code applicationName} into the MDC.
	 * The application name is read from the {@code spring.application.name} property.
	 */
	public static void putMDC() {
		final String host = NetUtil.getLocalHostName();
		MDC.put("hostName", host);

		final String address = NetUtil.getLocalIp();
		MDC.put("ip", address);

		final String appName = environment.getProperty("spring.application.name");
		MDC.put("applicationName", appName);
	}

}

NetUtil

package com.xp.logcollector.utils;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.Enumeration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
 * Static helpers for {@code host:port} strings, local address discovery and
 * process/host identification.
 */
public class NetUtil {

	/** Dotted-quad IPv4 literal; compiled once because it is applied to every interface address. */
	private static final Pattern IPV4_PATTERN = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");

	/** Preference order used by {@link #getLocalIp()}; entries are separated by {@code >}. */
	private static final String DEFAULT_IP_PREFERENCE = "*>10>172>192>127";

	/** Value returned when no interface address matches or enumeration fails. */
	private static final String FALLBACK_IP = "127.0.0.1";

	/**
	 * Normalizes an address to {@code host:port}, using port 80 when none is given.
	 *
	 * @param address {@code host} or {@code host:port}
	 * @return {@code host:port}
	 * @throws IllegalArgumentException if the address has no host part or more than
	 *         one colon, or if the port is not an integer
	 */
	public static String normalizeAddress(String address){
		String[] blocks = address.split("[:]");
		// split() drops trailing empty strings, so ":" yields an empty array.
		if(blocks.length == 0 || blocks.length > 2){
			throw new IllegalArgumentException(address + " is invalid");
		}
		String host = blocks[0];
		int port = blocks.length > 1 ? Integer.parseInt(blocks[1]) : 80; // default 80
		return String.format("%s:%d", host, port);
	}

	/**
	 * Replaces the wildcard host {@code 0.0.0.0} with the local IP from
	 * {@link #getLocalIp()}; any other address is returned unchanged.
	 *
	 * @param address {@code host:port}
	 * @return the address with the wildcard host resolved
	 * @throws IllegalArgumentException if the address is not exactly {@code host:port}
	 *         or the port is not an integer
	 */
	public static String getLocalAddress(String address){
		String[] blocks = address.split("[:]");
		if(blocks.length != 2){
			throw new IllegalArgumentException(address + " is invalid address");
		}
		String host = blocks[0];
		int port = Integer.parseInt(blocks[1]);

		if("0.0.0.0".equals(host)){
			return String.format("%s:%d",NetUtil.getLocalIp(), port);
		}
		return address;
	}

	/**
	 * Returns the position of the first preference entry that {@code ip} satisfies,
	 * or -1 when none does. The entry {@code "*"} stands for any address that does
	 * not start with {@code 127.}, {@code 10.}, {@code 172.} or {@code 192.}; every
	 * other entry is a plain string prefix.
	 *
	 * NOTE(review): the body of this method and the header of
	 * {@link #getLocalIp(String)} were lost in the published listing and have been
	 * reconstructed from the surviving call sites - confirm against the original project.
	 */
	private static int matchedIndex(String ip, String[] prefix){
		for(int i=0; i<prefix.length; i++){
			String p = prefix[i];
			if("*".equals(p)){
				if(ip.startsWith("127.") ||
				   ip.startsWith("10.") ||
				   ip.startsWith("172.") ||
				   ip.startsWith("192.")){
					continue;
				}
				return i;
			}
			if(ip.startsWith(p)){
				return i;
			}
		}
		return -1;
	}

	/**
	 * Picks one IPv4 address of this machine according to a preference list.
	 *
	 * @param ipPreference prefixes separated by {@code >} or spaces, most preferred
	 *        first (for example {@code "*>10>172>192>127"}); {@code null} selects
	 *        the same default as {@link #getLocalIp()}
	 * @return the address whose matching entry comes earliest in the list, or
	 *         {@code "127.0.0.1"} when nothing matches or interfaces cannot be read
	 */
	public static String getLocalIp(String ipPreference) {
		if(ipPreference == null){
			ipPreference = DEFAULT_IP_PREFERENCE;
		}
		String[] prefix = ipPreference.split("[> ]+");
		try {
			Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
			if(interfaces == null){
				return FALLBACK_IP;
			}
			String matchedIp = null;
			int matchedIdx = -1;
			while (interfaces.hasMoreElements()) {
				NetworkInterface ni = interfaces.nextElement();
				Enumeration<InetAddress> en = ni.getInetAddresses();
				while (en.hasMoreElements()) {
					InetAddress addr = en.nextElement();
					String ip = addr.getHostAddress();
					Matcher matcher = IPV4_PATTERN.matcher(ip);
					if (!matcher.matches()) {
						continue; // skip IPv6 and anything else that is not dotted-quad
					}
					int idx = matchedIndex(ip, prefix);
					if(idx == -1) continue;
					// Keep the candidate with the smallest (most preferred) index.
					if(matchedIdx == -1 || matchedIdx > idx){
						matchedIdx = idx;
						matchedIp = ip;
					}
				}
			}
			if(matchedIp != null) return matchedIp;
			return FALLBACK_IP;
		} catch (Exception e) {
			// Best effort: address discovery must never fail the caller.
			return FALLBACK_IP;
		}
	}

	/**
	 * Picks a local IPv4 address using the default preference
	 * {@code "*>10>172>192>127"}.
	 *
	 * @return the selected address, or {@code "127.0.0.1"} as fallback
	 */
	public static String getLocalIp() {
		return getLocalIp(DEFAULT_IP_PREFERENCE);
	}

	/**
	 * Formats the remote socket address of a channel with {@code %s}.
	 *
	 * @param channel channel to inspect
	 * @return the formatted address ({@code "null"} when there is none)
	 */
	public static String remoteAddress(SocketChannel channel){
		SocketAddress addr = channel.socket().getRemoteSocketAddress();
		return String.format("%s", addr);
	}

	/**
	 * Formats the local socket address of a channel with {@code %s} and drops the
	 * first character of the result when an address is present.
	 *
	 * @param channel channel to inspect
	 * @return the formatted address without its first character, or {@code "null"}
	 *         when the socket has no local address
	 */
	public static String localAddress(SocketChannel channel){
		SocketAddress addr = channel.socket().getLocalSocketAddress();
		String res = String.format("%s", addr);
		return addr==null? res: res.substring(1);
	}

	/**
	 * Extracts the part of the runtime MXBean name before {@code '@'}.
	 *
	 * @return that prefix, or {@code null} when the name contains no {@code '@'}
	 */
	public static String getPid(){
		RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
		String name = runtime.getName();
		int index = name.indexOf("@");
		if (index != -1) {
			return name.substring(0, index);
		}
		return null;
	}

	/**
	 * Returns the local host name.
	 *
	 * <p>When the lookup fails, the text before the first {@code ':'} of the
	 * exception message is returned instead; if that is unavailable the result is
	 * {@code "UnknownHost"}.
	 *
	 * @return the host name or one of the fallbacks described above
	 */
	public static String getLocalHostName() {
		try {
			return (InetAddress.getLocalHost()).getHostName();
		} catch (UnknownHostException uhe) {
			String host = uhe.getMessage();
			if (host != null) {
				int colon = host.indexOf(':');
				if (colon > 0) {
					return host.substring(0, colon);
				}
			}
			return "UnknownHost";
		}
	}
}

警告的回调准备
entity

package com.xp.logcollector.entity;

import lombok.Data;

/**
 * Request payload for the alert callback endpoint.
 * Accessors, equals/hashCode and toString are generated by Lombok's {@code @Data}.
 */
@Data
public class AccurateWatcherMessage {
	
	/** Alert title. */
	private String title;
	
	/** Time value carried by the alert, kept as text. */
	private String executionTime;
	
	/** Name of the application the alert refers to. */
	private String applicationName;
	
	/** Alert level, kept as text. */
	private String level;
	
	/** Alert body text. */
	private String body;

}

controller

package com.xp.logcollector.web;

import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSON;
import com.xp.logcollector.entity.AccurateWatcherMessage;

/**
 * Receives alert callbacks posted to {@code /accurateWatch}.
 */
@RestController
public class WatcherController {
	
	/**
	 * Serializes the received alert to JSON, prints it to standard error and
	 * echoes it back to the caller.
	 *
	 * @param accurateWatcherMessage alert payload bound from the request body
	 * @return {@code "is watched"} followed by the JSON form of the payload
	 */
	@RequestMapping(value ="/accurateWatch")
	public String watch(@RequestBody AccurateWatcherMessage accurateWatcherMessage) {
		// fastjson's method is toJSONString; the published listing had it mis-cased.
		String ret = JSON.toJSONString(accurateWatcherMessage);
		System.err.println("----告警内容----:" + ret);
		return "is watched" + ret;
	}
}

2.配置filebeat

我们使用filebeat进行日志的收集
配置filebeat.yml

###################### Filebeat Configuration Example #########################
filebeat.prospectors:

  - input_type: log

    paths:
      ## app-<service name>.log; the name is fixed on purpose so that rotated (historical) files are not harvested
      - /usr/local/software/logs/app-logcollector.log       # log file location
    # _type value used when writing to ES
    document_type: "app-log"
    multiline:
      #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # alternative: match lines that start with a timestamp such as 2017-11-15 08:04:23:889
      pattern: '^\['                              # a new event starts with "[" (the bracket must be escaped); pick the pattern that fits the actual log layout
      negate: true                                # act on lines that do NOT match the pattern
      match: after                                # non-matching lines are appended to the end of the preceding "[" line
      max_lines: 2000                             # maximum lines merged into one event; extra lines are not merged
      timeout: 2s                                 # flush the pending event if no new line arrives within this time
    fields:
      logbiz: collector
      logtopic: app-log-collector   ## per-service value, used as the kafka topic
      evn: dev


  - input_type: log


    paths:
      - /usr/local/software/logs/error-logcollector.log
    document_type: "error-log"
    multiline:
      #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # alternative: match lines that start with a timestamp such as 2017-11-15 08:04:23:889
      pattern: '^\['                              # a new event starts with "[" (the bracket must be escaped)
      negate: true                                # act on lines that do NOT match the pattern
      match: after                                # append to the end of the preceding line
      max_lines: 2000                             # maximum lines merged into one event
      timeout: 2s                                 # flush the pending event if no new line arrives within this time
    fields:
      logbiz: collector
      logtopic: error-log-collector   ## per-service value, used as the kafka topic
      evn: dev

output.kafka:
  enabled: true
  hosts: ["ip:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
3.配置Kafka

使用Kafka存储日志消息
启动Kafka

/usr/local/software/kafka_2.12-2.1.1/bin/kafka-server-start.sh /usr/local/software/kafka_2.12-2.1.1/config/server.properties &

创建两个主题

/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1 

查看主题中的消息(用控制台消费者验证日志是否已写入Kafka)

bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic error-log-collector  --from-beginning
4.配置elasticsearch

详细查看
elasticsearch配置
添加ik分词器

## 下载 https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.6.0/elasticsearch-analysis-ik-6.6.0.zip
mkdir -p /usr/local/elasticsearch-6.6.0/plugins/ik/
## 上传到/usr/local/software下 elasticsearch-analysis-ik-6.6.0.zip
## 进行解压到刚创建的/usr/local/elasticsearch-6.6.0/plugins/ik/目录:
unzip -d /usr/local/elasticsearch-6.6.0/plugins/ik/ elasticsearch-analysis-ik-6.6.0.zip

## 查看是否ok
cd /usr/local/elasticsearch-6.6.0/plugins/ik/
## 重新复权
chown -R elk:elk /usr/local/elasticsearch-6.6.0/

## 重新启动ES节点,显示如下信息代表加载ik分词器成功
[es-node01] loaded plugin [analysis-ik]
5.配置logstash

logstash充当消费者,将kafka里面的日志topic消费到elasticsearch中去
在logstash目录创建script
添加脚本logstash-script.conf

input {
  kafka {
    ## app-log-<service name>
    topics_pattern => "app-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1    ## the topic has a single partition, so one consumer thread is enough
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "app-log-group"
   }
   kafka {
    ## error-log-<service name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "error-log-group"
   }

}

filter {

  ## Time zone conversion: derive the index date from the event timestamp in local time
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }

  if "app-log" in [fields][logtopic]{
    grok {
        ## Expression: mirrors the log4j2 pattern layout; literal brackets must be escaped
        ## and grok pattern names are upper-case (DATA, not data)
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }

  if "error-log" in [fields][logtopic]{
    grok {
        ## Expression: same layout as the app-log branch
        match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (''|%{QUOTEDSTRING:throwable})"]
    }
  }

}

## Test output to the console:
output {
  stdout { codec => rubydebug }
}

## elasticsearch (marked "not implemented" in the original notes):
output {

  if "app-log" in [fields][logtopic]{
    ## es plugin
    elasticsearch {
          # es service address
        hosts => ["ip:9200"]
        # user name and password
        user => "elastic"
        password => "123456"
        ## Index name; a part that starts with "+" is automatically treated as a time format:
        ## javalog-app-service-2019.01.23
        index => "app-log-%{[fields][logbiz]}-%{index_time}"
        # Whether to sniff the cluster ips: usually true; http://ip:9200/_nodes/http?pretty
        # Sniffing is used to load-balance log messages across the es cluster
        sniffing => true
        # logstash ships with a default mapping template; overwrite it
        template_overwrite => true
    }
  }

  if "error-log" in [fields][logtopic]{
    elasticsearch {
        hosts => ["ip:9200"]
        user => "elastic"
        password => "123456"
        index => "error-log-%{[fields][logbiz]}-%{index_time}"
        sniffing => true
        template_overwrite => true
    }
  }


}

注意你的logstash和elasticsearch的私网ip要互通,logstash会用公网ip连接你的elasticsearch,然后通过你elasticsearch里面配置的连接池私网ip去推你的数据,如果你的私网ip不通会一直连接失败

Elasticsearch pool URLs updated {:changes=>{:removed=>[http://elastic:xxxxxx
6.kibana配置
## 解压kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-linux-x86_64.tar.gz
tar -zxvf kibana-6.6.0-linux-x86_64.tar.gz -C /usr/local/
mv kibana-6.6.0-linux-x86_64/ kibana-6.6.0
## 进入kibana目录,修改配置文件
vim /usr/local/kibana-6.6.0/config/kibana.yml
## 修改配置如下:
server.host: "0.0.0.0"
server.name: "ip"
elasticsearch.hosts: ["http://ip:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "123456"
## 启动:
/usr/local/kibana-6.6.0/bin/kibana &
## 指定配置文件启动:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &
## 访问:
http://192.168.0.236:5601/app/kibana (5601为kibana默认端口)

##申请license:
https://license.elastic.co/registration

## 修改申请的license, 注意license.json文件名称不能变否则认证失败
1."type":"basic" 替换为 "type":"platinum" # 基础版变更为铂金版
2."expiry_date_in_millis":1561420799999 替换为 "expiry_date_in_millis":3107746200000 # 1年变为50年

## 启动elasticsearch服务 和 kibana服务
## 进入kibana后台,Management->License Management上传修改后的token

7.依次启动,配置watcher

zookeeper---->filebeat---->kafka---->项目jar---->elasticsearch---->logstash---->kibana

在kibana中创建index pattern。如果输入index关键词后没有出现“下一步”的提示,说明你的数据还没有推送到elasticsearch中,请检查端口是否已打开、其他应用是否都已正常启动。

创建error日志的索引模板

PUT _template/error-log-
{
  "template": "error-log-*",
  "order": 0,
  "settings": {
    "index": {
      "refresh_interval": "5s"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "message_field": {
            "match_mapping_type": "string",
            "path_match": "message",
            "mapping": {
              "norms": false,
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        },
        {
          "throwable_field": {
            "match_mapping_type": "string",
            "path_match": "throwable",
            "mapping": {
              "norms": false,
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        },
        {
          "string_fields": {
            "match_mapping_type": "string",
            "match": "*",
            "mapping": {
              "norms": false,
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word",
              "fields": {
                "keyword": {
                  "type": "keyword"
                }
              }
            }
          }
        }
      ],
      "_all": {
        "enabled": false
      },
      "properties": {         
        "hostName": {
          "type": "keyword"
        },
        "ip": {
          "type": "ip"
        },
        "level": {
          "type": "keyword"
        },
		"currentDateTime": {
		  "type": "date"
		}
      }
    }
  }
}

创建对应的watcher

PUT _xpack/watcher/watch/error_log_collector_watcher
{
  "trigger": {
    "schedule": {
      "interval": "5s"
    }
  },
  "input": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 0,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              }
            }
          }
        }
      }
    }
  },

  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gt": 0
      }
    }
  },

  "transform": {
    "search": {
      "request": {
        "indices": ["<error-log-collector-{now+8h/d}>"],
        "body": {
          "size": 1,
          "query": {
            "bool": {
              "must": [
                  {
                    "term": {"level": "ERROR"}
                  }
              ],
              "filter": {
                "range": {
                    "currentDateTime": {
                    "gt": "now-30s" , "lt": "now"
                  }
                }
              }
            }
          },
          "sort": [
            {
                "currentDateTime": {
                    "order": "desc"
                }
            }
          ]
        }
      }
    }
  },
  "actions": {
    "test_error": {
      "webhook" : {
        "method" : "POST",
        "url" : "http://ip:8001/accurateWatch",
        "body" : "{\"title\": \"异常错误告警\", \"applicationName\": \"{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}\", \"level\":\"告警级别P1\", \"body\": \"{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}\", \"executionTime\": \"{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}\"}"
      }
    }
 }
}

查看对应日志



查看相关watcher

项目中回调预警业务

配置成功,祝君好梦

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/697278.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号