栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

ElasticSearch搜索引擎结合Mysql数据库,查询mysql数据

ElasticSearch搜索引擎结合Mysql数据库,查询mysql数据

需要下载的东西
  • ElasticSearch——https://www.elastic.co/cn/downloads/elasticsearch
  • Logstash(版本需要和ES对应)——https://www.elastic.co/cn/downloads/logstash
  • mysql驱动jar包——https://dev.mysql.com/downloads/connector/j/
ElasticSearch

下载完ElasticSearch后解压,然后点击bin目录下的elasticsearch.bat启动(ES默认端口号为9200,若不能启动请检查端口号是否占用)

Logstash

将下载好的mysql驱动压缩包和Logstash压缩包解压,将mysql的的jar包放入logstash文件夹内

同步mysql数据库中数据到ES中

在logstash中新建logstash-mysql.conf文件(名字任意,可以不同)

其中写入

input {
    stdin {
    }
    jdbc {
      # 数据库  数据库名称为ESDB,表名为user
      jdbc_connection_string => "jdbc:mysql://localhost:3306/ESDB"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "root"
      # jar包的位置
      jdbc_driver_library => "D:/ElasticSearch/logstash-7.13.3/mysql-connector-java-8.0.25.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      # sql文件位置 没有可以不写
      #statement_filepath => "config-mysql/user.sql"
      statement => "select * from user"
      # 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务
      schedule => "* * * * *"
	  #索引的类型
      type => "user"
    }
}

output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
		index => "user"
		# 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

然后在bin目录下执行logstash -f logstash-mysql.conf就能自动同步mysql数据了

可能出现的问题
  • 数据库中的时间字段与同步到ES中的对应的数据不符(相差8小时)
    修改logstash-mysql.conf配置文件
# 在jdbc字段中添加
plugin_timezone => "local"
# 添加filter字段
filter {
    # 因为时区问题需要修正时间
    ruby { 
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)" 
    }    
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('update_date', event.get('update_date').time.localtime + 8*60*60)" 
    } 
}


# 完整配置信息
input {
    stdin {
    }
    jdbc {
      # 数据库  数据库名称为elk,表名为book_table
      jdbc_connection_string => "jdbc:mysql://localhost:3306/ESDB"
      # 用户名密码
      jdbc_user => "root"
      jdbc_password => "root"
      # jar包的位置
      jdbc_driver_library => "D:/ElasticSearch/logstash-7.13.3/mysql-connector-java-8.0.25.jar"
      # mysql的Driver
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"
      #statement_filepath => "config-mysql/user.sql"
      statement => "select * from user"
      schedule => "* * * * *"
	  #索引的类型
      type => "user"
	  plugin_timezone => "local"
    }
}
 
filter {
    # 因为时区问题需要修正时间
    ruby { 
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)" 
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('create_date', event.get('create_date').time.localtime + 8*60*60)" 
    }    
    # 因为时区问题需要修正时间
    ruby {
        code => "event.set('update_date', event.get('update_date').time.localtime + 8*60*60)" 
    } 
}
 
output {
    elasticsearch {
        hosts => "127.0.0.1:9200"
        # index名
		index => "user"
		# 需要关联的数据库中有有一个id字段,对应索引的id号
        document_id => "%{id}"
    }
    stdout {
        codec => json_lines
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/286023.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号