栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Logstash同步Kafka数据到es,实现删除

Logstash同步Kafka数据到es,实现删除

说下背景,使用debezium同步数据到kafka,然后使用logstash同步kafka数据到es,想要实现pgsql删除数据后,把es中的数据删除掉
通过观察发现,数据库新增的数据,kafka中的after是有数据的,对于删除数据,kafka中的after是没有数据的,因此可以通过判断这个值来做处理

input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    group_id => "voucher_voucher1"
    topics => ["citus8.user1.voucher"]
    client_id => "voucher1"              
    auto_offset_reset => "earlist"   
    consumer_threads => 1		
    decorate_events => "true"		
    codec => "json"
 }
}
filter {

  ruby {
    code => "if event.get('after').nil?; event.set('tags','null-value');end"
  }
#主要是获取主键
  if "null-value" in [tags] {
    mutate {
      add_field => {
                "@after" => "%{before}"
            }
    }
  }
  else{
    mutate {
      add_field => {
                "@after" => "%{after}"
            }
    }
  }


  json {
    source=>"@after"
  }


  ruby {

    code => "

    require 'json'

    some_json_field_value = JSON.parse(event.get('voucher_source').to_s)

    event.set('voucher_source',some_json_field_value)

    "
  }


  mutate {
        remove_field => ["kafka","source","before","after","@after","@timestamp","@version","ts_ms"]
  }


}

output {
    stdout { codec => json_lines }
    if "null-value" in [tags] {
    elasticsearch {
        hosts => ["localhost:9200"]
        action=>"delete" #删除
        index => "e_document"
        document_id => "%{voucher_id}"
    }
  }
  else{
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "e_document"
      document_id => "%{voucher_id}"
  }
  }
}

https://elasticsearch.cn/question/3681

https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html#plugins-outputs-elasticsearch-action

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775055.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号