版本
logstash-7.6.1
kibana-7.6.1
logstash.yml
node.name: "logstast-node1" pipeline.id: "secondbatch" #pipeline.id: "pipeline" pipeline.workers: 12 pipeline.batch.size: 5000 pipeline.batch.delay: 10 http.host: "127.0.0.1" http.port: 9600 xpack.monitoring.enabled: true xpack.monitoring.elasticsearch.username: elastic xpack.monitoring.elasticsearch.password: your elastic password xpack.monitoring.elasticsearch.hosts: ["https://xx.xx.xx.xx:9200","https://xx.xx.xx.xx:9200"] xpack.monitoring.elasticsearch.ssl.certificate_authority: "/home/epguser/logstash-7.6.1/config/ca.crt" xpack.monitoring.elasticsearch.ssl.verification_mode: certificate xpack.monitoring.elasticsearch.sniffing: false xpack.monitoring.collection.interval: 60s xpack.monitoring.collection.pipeline.details.enabled: true
根据kibana提示在 xxx.conf中
input{
kafka {
id => "kafka"
bootstrap_servers => "xx.xx.xx.xx:9092,xx.xx.xx.xx:9092"
client_id => "logstash-25"
auto_offset_reset => "latest"
topics => "yourTopic"
group_id => "logstash-new"
codec => "json"
consumer_threads => "12"
heartbeat_interval_ms => "5000"
max_poll_records => "1000"
max_poll_interval_ms => "1200000"
sasl_mechanism => "PLAIN"
security_protocol => "SASL_PLAINTEXT"
jaas_path => "/home/epguser/kafka_2.11-2.2.0/config/kafka_server_jaas.conf"
decorate_events => true
}
}
filter{
grok{
id => "grok"
match => {"message" => '"%{IP:remoteIp}" "(?[^"]*)" "[%{HTTPDATE:timestamp}]" "%{WORD:method} %{URIPATHPARAM:url} (?[a-zA-zd/.]*)" "%{NUMBER:code:int}" "%{NUMBER:body_size:int}" "%{NUMBER:size:int}" "%{data:gzip_ratio}" "%{data:http_referer}" "%{data:http_user_agent}" "%{data:http_x_forwarded_for}" "%{data:pstream_addr}" "%{data:upstream_response_time}" "%{NUMBER:totalTime:float}"'}
}
date {
id => "date"
match => ["timestamp", "dd/MMM/YYYY:HH:mm:ss Z"]
target => "@timestamp"
}
mutate { remove_field => ["timestamp"]}
}
output{
if "_grokparsefailure" in [tags] {
elasticsearch{
id => "index_epg_error"
hosts => ["https://xx.xx.xx.xx:9200","https://xx.xx.xx.xx:9200"]
index => "index_epg_error_%{+YYYY.MM.dd}"
user => "elastic"
password => "your password"
ssl => true
ssl_certificate_verification => true
cacert => "/home/epguser/logstash-7.6.1/config/ca.crt"
}
}else{
elasticsearch{
id => "index_sx_live"
hosts => ["https://xx.xx.xx.xx:9200","https://xx.xx.xx.xx:9200"]
index => "index_sx_live_%{+YYYY.MM.dd}"
user => "elastic"
password => "your password"
ssl => true
ssl_certificate_verification => true
cacert => "/home/epguser/logstash-7.6.1/config/ca.crt"
}
}
}
在xxx.conf保持group_id 一致,在同一个分组中消费数据
在logstash.yml 中根据实际需求 是否用多管道pipeline.id



