文章目录[版权申明] 非商业目的注明出处可自由转载
出自:shusheng007
概述方案环境准备
安装Mysql/MariaDb按装Elasticsearch与Kibana(可选)安装Logstash
写logstash配置文件运行 查看结果更新数据 总结
概述最近工作中有一个将MySql数据导入ES的需求,于是网上搜索了一下,发现不尽如人意。看了好几篇文章,还是不知道怎么做,于是我准备记录一下自己成功的经验,为自己也为同行…
方案采用Logstash从MySql中抽取数据,然后导入ES中,如下图所示
环境准备 安装Mysql/MariaDb已经安装则跳过,此处采用docker安装
创建本地卷
~/software/mariadb/data
cd ~ mkdir -p software/mariadb/data
写docker-compose文件
mariadb-dc.yaml
version: '3'
services:
mariadb:
image: mariadb:10.6.5
ports:
- 3306:3306
volumes:
- ~/software/mariadb/data:/var/lib/mysql
environment:
- MYSQL_ROOT_PASSWORD=root
- MYSQL_USER=ss007
- MYSQL_PASSWORD=ss007
- MYSQL_DATAbase=ss007_bd
其中有几点需要说明:
- MYSQL_USER=ss007 - MYSQL_PASSWORD=ss007
创建了一个叫ss007的用户
执行
在mariadb-dc.yaml目录下执行
docker-compose -f mariadb-dc.yaml up -d
准备数据
数据库已经安装好,使用数据库管理工具,例如Navicat连接数据库并建表导入数据。
当然如果你习惯命令行,进入docker使用mysql命令行客户端操作也是可以的
-- ---------------------------- -- Table structure for student -- ---------------------------- DROp TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) NOT NULL AUTO_INCREMENT, `stu_name` varchar(32) NOT NULL DEFAULT '' COMMENT '姓名', `age` int(11) NOT NULL DEFAULT 18 COMMENT '年龄', `create_time` datetime DEFAULT current_timestamp() ON UPDATE current_timestamp() COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4 COMMENT='学生表'; -- ---------------------------- -- Records of student -- ---------------------------- INSERT INTO `student` VALUES (1, '王二狗', 30, '2022-02-19 10:57:16'); INSERT INTO `student` VALUES (2, '牛翠华', 25, '2022-02-19 10:57:16'); INSERT INTO `student` VALUES (3, '上官无雪', 18, '2022-02-19 10:57:16'); INSERT INTO `student` VALUES (4, '王婆', 50, '2022-02-19 10:57:16');按装Elasticsearch与Kibana(可选)
已经安装则跳过,没安装请参考docker之如何使用docker来安装ELK(elasticsearch,logstash,kibana)
安装Logstash没有采用docker安装,直接从官网下载了7.17.0的压缩包,解压即可
写logstash配置文件我们在config文件里新建一个名为my-demo-logstash.conf的文件,内容如下
input {
jdbc {
jdbc_driver_library => "./mysql-connector-java-5.1.49.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/ss007_db?characterEncoding=UTF-8&useSSL=false"
jdbc_user => root
jdbc_password => root
# jdbc_paging_enabled => "true" #是否进行分页
# jdbc_page_size => "50000"
tracking_column => "create_time"
use_column_value => true
# statement_filepath => "sql文件路径,与下面的执行语句二选1"
statement => "SELECT id,stu_name AS name,age,create_time FROM ss007_db.student WHERe create_time > :sql_last_value;"
# 设置监听间隔 各字段含义(由左至右)秒、分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => " 10 * * * * *"
}
}
output {
elasticsearch {
document_id => "%{id}"
# document_type => ""
index => "ss007_index"
hosts => ["localhost:9200"]
}
stdout{
codec => rubydebug
}
}
文件分为input 和output两部分,其实还可以有个filter。input 部分从mysql读取数据,output部分向ES插入数据,关键字段解释如下
input:
jdbc_driver_library:mysql-connector-java.jar 连接文件路径,我放到了与bin同一级别,所以是./tracking_column: mysql中的某一列,logstash用它来跟踪数据的变化,例如此处我用的是create_time列,那么当新增加一条记录,或者修改一下这个时间的话,logstash就会知道,从而更新数据use_column_value :是否使用tracking_column的值作为:sql_last_value的值。设置为true时,:sql_last_value = tracking_column。设置为false,:sql_last_value= 上次查询时间。statement:sql查询语句,其中:sql_last_value 已经在上面介绍过了schedule:执行计划,可以通过它设置查询的执行计划,我这里设置每隔10秒执行一次
output:
如果ES中没有index,会自动创建
document_id => “%{id}” : ES中document的唯一id,这里使用student的主键index => “ss007_index” : ES 的索引
7.0以后不用设置document_type,会默认doc
运行导航到logstash路径下运行以下命令,注意7.x不能进入bin文件执行
sudo bin/logstash -f config/my-demo-logstash.conf
输出如下类似结果说明成功了
...
[2022-02-20T16:22:10,869][INFO ][logstash.inputs.jdbc ][main][4a56def37a98a3ccef919df855d4d915c4fa56a1212a5e6c99a82aa2a6e70df9] (0.009233s) SELECt id,stu_name AS name,age,create_time FROM ss007_db.student WHERe create_time > 0;
{
"name" => "王婆",
"@timestamp" => 2022-02-20T08:22:10.938Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 4,
"@version" => "1",
"age" => 50
}
{
"name" => "王二狗",
"@timestamp" => 2022-02-20T08:22:10.927Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 1,
"@version" => "1",
"age" => 30
}
...
查看结果
从Kibana上查看ss007_index是否创建并导入了数据
更新数据因为我们设置了每隔10秒执行一次,所以当我们插入新数据,或者更新数据导致create_time列的日期增大,logstash就会感受到,然后更新ES中的数据
插入一条新数据输出
{
"name" => "张妈",
"@timestamp" => 2022-02-20T08:27:10.160Z,
"create_time" => 2022-02-19T21:57:16.000Z,
"id" => 5,
"@version" => "1",
"age" => 50
}
修改已有数据输出
{
"name" => "牛翠华",
"@timestamp" => 2022-02-20T08:22:10.937Z,
"create_time" => 2022-02-19T02:57:16.000Z,
"id" => 2,
"@version" => "1",
"age" => 25
}
总结
为了搞这个我也是花了一天时间的,理论都知道,一动手就各种问题,如果它对你有帮助就点个赞再走
参考文章:https://dzone.com/articles/migrating-mysql-data-to-elasticsearch-using-logsta



