docker安装zookeeper,kafka,posgresql,connect
1.1、zookeeper安装
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
1.2、kafka安装
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
1.3、postgresql安装
docker run -it --rm --name database -p 5432:5432 -e POSTGRES_PASSWORD=xxx -d debezium/example-postgres:latest
1.4、connect安装
docker volume create connect docker run -itd --rm --name connect --volume connect:/kafka/connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link postgres:postgres debezium/connect:latest
解析:docker volume create connect 创建了一个connect目录用来挂载connect容器中的/kafka/connect目录(连接器插件目录,为了以后方便添加es connect plugin)
这里connect官方文档提供了一些api接口用来管理connectors,plugins,tasks
安装完成用postman测试 get: ip:8083,安装成功会显示如下信息
{
"version": "2.3.1",
"commit": "18a913733fb71c01",
"kafka_cluster_id": "sgc2CHb1TcKugWd5R6zcXA"
}
创建postgresql连接器
post ip:8083/connectors
{
"name": "test-connector1",
"config": {
"name": "test-connector1",
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "192.168.110.192",
"database.port": "5432",
"database.dbname": "xdeasdb",
"database.user": "postgres",
"database.password": "postgres",
"database.server.name": "know", //自定义服务名
"table.whitelist": "knowledge.formal_new", //数据库表白名单,要同步的表单,模式名+表名;和上面配置的"know"生成一个topic "know.knowledge.formal_new"
"plugin.name": "pgoutput" //pg9后自带的输出插件
}
}
验证:获取所有的connectors:get ip:8093/connectors/
[
"test-connector1"
]
订阅topic验证数据同步
进入kafka容器 docker exec -it kafka容器id /bin/bash
进入bin目录 cd bin
订阅队列在控制台输出:
sh ./kafka-console-consumer.sh --bootstrap-server ip:9092 --topic know.knowledge.formal_new --from-beginning;
修改数据库表数据或插入数据;
输出如下信息:
"payload":{"before":null,
"after":{"id":"1","collect_id":"1","title":"test","content":"1","publish_date":1591025759000000,"collect_date":1591025761000000,"status":1,"create_date":1591025764000000,"creater":"1","update_date":1591025769000000,"updater":"1","link":"1","label":["1"],"origin":"4"},
"source":{"version":"1.1.1.Final","connector":"postgresql","name":"know","ts_ms":1591006642405,"snapshot":"false","db":"xdeasdb","schema":"knowledge","table":"knowledge_formal_new","txId":1604,"lsn":29368760,"xmin":null},
"op":"u","ts_ms":1591006642869,"transaction":null}}
"after"字段即同步过来的最新数据,后面同步至es时我们也只需要这个字段的数据;



