我将Logstash与
jdbc输入插件和
elasticsearch输出插件结合使用。有一篇博客文章显示了此解决方案的完整示例。
后安装Logstash,您可以创建我上面这样提到的插件配置文件:
input { jdbc { jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb" jdbc_user => "user" jdbc_password => "1234" jdbc_validate_connection => true jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" schedule => "5m" statement => "SELECt * FROM search WHERe timestamp > :sql_last_value" }}output { elasticsearch { protocol => http index => "searches" document_type => "search" document_id => "%{uid}" host => "ES_NODE_HOST" }}您需要确保更改一些值以匹配您的环境,但这应该可以解决您需要做的事情。
查询将每5分钟运行一次,并将提取所有
search记录
timestamp(更改名称以匹配您的数据)的时间比上次运行查询的时间最近。所选记录将沉入
searches到您的Elasticsearch服务器上的索引中
ES_NODE_HOST。确保更改索引并相应地键入名称,以及主键字段的名称(例如
uid)以匹配您的数据。



