栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

logstash多任务多管道pipeline配置启动,方案分析

logstash多任务多管道pipeline配置启动,方案分析

文章目录
  • 6.0以下方案
  • 6.0以下方案分析
  • 启动方式
  • 6.0以上方案
  • 启动方式

6.0以下方案

logstash处理多个输入输出源方案

没有使用多管道,两个任务,jdbc demo
手写自定义mysql.conf

input {
    stdin {}
    jdbc {
       # 第一个任务对应一个type;
        type => "type_1"
         # ...省略
         # 同步频率(分 时 天 月 年),默认每分钟同步一次;
        schedule => "* * * * *"
    }
    jdbc {
         # 第二个任务对应一个type;
         type => "type_2"
          # ...省略
    }
}

filter {
 if [type] == "type_1" {
   #过滤
    json {
        source => "message"
        remove_field => ["message"]
    }
    # nested聚合
    aggregate{
	 task_id => "%{id}"
	 code => "
		map['id'] ||= event.get('id')
		map['name'] ||= event.get('name')
		map['hobby'] ||= event.get('hobby')
		map['genshin'] ||=[]
		map['genshin'] << {'student_id' =>event.get('student_id'),'person' => event.get('person')}
		event.cancel()
	 "
	push_previous_map_as_event =>true
	timeout => 5
	}
  }
  if [type] == "type_2" {}
}

output {
    # output模块的type需和jdbc模块的type一致
    if [type] == "type_1" {
        elasticsearch {
             # host => "x.x.x.x"
             # port => "9200"
             # 配置ES集群地址
            hosts => ["x.x.x.x:9200", "x.x.x.x:9200"]
             # elastic索引名字,必须小写
            index => "logstash_test"
             # 数据唯一索引(建议使用数据库id)
            document_id => "%{id}"
        }
    }
    if [type] == "type_2" {
        elasticsearch {
            # host => "x.x.x.x"
            # port => "9300"
            # ...省略
        }
    }
}
6.0以下方案分析

如果logstash需要配置多源输入输出,在一个管道pipeline里配置多个独立的输入流输出流,需要用 if 判断,这样确实能够解决问题,但是也会有隐患,如果某个任务流出现阻塞无法使用,那么其他的流也会收到影响不能正常工作。
6.0过后出现了多pipeline结构,可以配置多个pipeline,他们之间互不影响,一个pipeline出现问题不影响其他pipeline工作

启动方式

在解压后的目录./bin/logstash -f 某某.conf,可以用-f指定启动的配置,就是上面的demo配置

6.0以上方案

使用多pipeline互不影响方案
在conf下有个配置文件pipeline.yml

# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#放开#注释字典的方式定义了logstash_test这个pipeline
 - pipeline.id: logstash_test
   pipeline.workers: 4
   pipeline.batch.size: 1
   # 指向类似上面那个demo的配置,只是没有if判断多个流而已,一个pipeline最好只做一个任务
   path.config: "/Users/xx/apps/logstash_run/logstash-8.0.0-beta1/script/mysql2.conf"
   queue.type: persisted
#   config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#放开#注释字典的方式定义了another_test这个pipeline,这里没有放开,所以只启动了上面一个管道
# - pipeline.id: another_test
#   queue.type: persisted
#   path.config: "/tmp/logstash/*.config"
启动方式

如果启动命令不加 -f 指定自定义的conf配置,它会自己找到pipeline.yml配置启动,
命令:./bin/logstash
这时候只会启动pipeline放开#注释的pipeline,如果有多个pipeline就会启动多个

如果看爽了,就点个赞吧(doge)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600257.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号