Planning
192.168.56.11 k1
192.168.56.12 k2
192.168.56.13 k3
192.168.56.21 pg11n1
192.168.56.22 pg11n2
Install debezium-connector-postgres on k1, k2, and k3.
Logical decoding uses the built-in pgoutput plugin, so make sure the PostgreSQL parameter shared_preload_libraries does not include decoderbufs or wal2json.
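The shared_preload_libraries check described above can be scripted. The following is a minimal sketch, not part of the original setup: the function name check_preload_libs is hypothetical, and it only inspects a string that you pass in (for example the output of SHOW shared_preload_libraries).

```shell
# Hypothetical helper: fails if the given shared_preload_libraries value
# lists decoderbufs or wal2json.
# $1: comma-separated value, e.g. "pg_stat_statements,wal2json"
check_preload_libs() {
  cpl_status=0
  # Turn commas into spaces so the shell splits the value into library names.
  for cpl_lib in $(printf '%s' "$1" | tr ',' ' '); do
    case "$cpl_lib" in
      decoderbufs|wal2json)
        echo "unexpected library: $cpl_lib"
        cpl_status=1
        ;;
    esac
  done
  return "$cpl_status"
}

# Example call on a PostgreSQL node (assumes a local superuser login works):
#   check_preload_libs "$(psql -U postgres -Atc 'show shared_preload_libraries')"
```

A non-zero exit status means one of the two plugins is still preloaded and the parameter should be adjusted before the connector is started.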
Install debezium-connector-postgres
# su - kafka
$ mkdir -p /usr/local/kafka-2.8/plugins
$ cd /tmp
$ wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.7.1.Final/debezium-connector-postgres-1.7.1.Final-plugin.tar.gz
$ tar -zxvf ./debezium-connector-postgres-1.7.1.Final-plugin.tar.gz
$ cd debezium-connector-postgres/
$ cp ./*.jar /usr/local/kafka-2.8/libs/
$ cp -R /tmp/debezium-connector-postgres /usr/local/kafka-2.8/plugins

Configure Connect standalone
Configure postgres.properties
# su - kafka
$ cd /usr/local/kafka-2.8/config
$ vi postgres.properties
name=dbz-pg-connector-source
connector.class=io.debezium.connector.postgresql.PostgresConnector
plugin.name=pgoutput
database.hostname=192.168.56.21
database.port=5432
database.user=repl
database.password=replrepl
database.dbname=yewudb
database.history.kafka.bootstrap.servers=192.168.56.11:9092
database.server.name=yewudb
table.include.list=public.tmp_t0
Configure connect-standalone.properties
# su - kafka
$ cd /usr/local/kafka-2.8/config
$ vi connect-standalone.properties
bootstrap.servers=192.168.56.11:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/kafka-2.8/plugins
Start Kafka Connect
# su - kafka
$ /usr/local/kafka-2.8/bin/connect-standalone.sh /usr/local/kafka-2.8/config/connect-standalone.properties /usr/local/kafka-2.8/config/postgres.properties
Verification
http://192.168.56.11:8083/connectors
http://192.168.56.11:8083/connector-plugins
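Beyond listing connectors and plugins, the Kafka Connect REST API also exposes a per-connector status document at /connectors/{name}/status. The sketch below is an illustration rather than part of the original procedure: the helper name all_running is hypothetical, and it uses a simple grep over the JSON instead of a real JSON parser.

```shell
# Hypothetical helper: reads a Kafka Connect status document on stdin and
# succeeds only if every "state" field (connector and tasks) is RUNNING.
all_running() {
  ar_states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
  # No state found at all counts as a failure.
  [ -n "$ar_states" ] || return 1
  for ar_s in $ar_states; do
    [ "$ar_s" = "RUNNING" ] || return 1
  done
  return 0
}

# Example call against the worker used in this setup:
#   curl -s http://192.168.56.11:8083/connectors/dbz-pg-connector-source/status | all_running && echo OK
```

If the check fails, the same status document usually carries a trace field for the failed task, which is a good starting point for troubleshooting.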
List topics
$ /usr/local/kafka-2.8/bin/kafka-topics.sh --list --zookeeper 192.168.56.11:2181
__consumer_offsets
yewudb.public.tmp_t0
Start a consumer
$ /usr/local/kafka-2.8/bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.11:9092 --topic yewudb.public.tmp_t0
Running DML against the database revealed that the PG instance had restarted; this still needs further analysis.
$ psql -U yewu yewudb
psql (11.13)
Type "help" for help.

yewudb=> insert into tmp_t0(id) values(66);
INSERT 0 1
yewudb=> select * from pg_replication_slots
yewudb-> ;
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 debezium  | pgoutput | logical   |  16385 | yewudb   | f         | t      |       5080 |      |          618 | 0/16E7238   | 0/16E7238
(1 row)

yewudb=> insert into tmp_t0(id) values(67);
INSERT 0 1
yewudb=> select * from pg_replication_slots ;
 slot_name |  plugin  | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 debezium  | pgoutput | logical   |  16385 | yewudb   | f         | t      |       5080 |      |          618 | 0/16E7238   | 0/16E7238
(1 row)

Configure Connect distributed
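In distributed mode, connectors are not passed as properties files on the command line; they are registered by POSTing a JSON document of the form {"name": ..., "config": {...}} to the worker's /connectors endpoint. The following is a rough sketch of how the existing postgres.properties could be reused for that. The function name props_to_connector_json is hypothetical, and it assumes simple key=value lines whose values contain no double quotes or backslashes.

```shell
# Hypothetical helper: converts a connector properties file into the JSON
# body expected by POST /connectors.
# Assumes "key=value" lines without quotes or backslashes in the values.
props_to_connector_json() {
  awk -F= '
    /^[ \t]*(#|$)/ { next }                 # skip comments and blank lines
    {
      key = $1
      val = substr($0, length($1) + 2)      # everything after the first "="
      if (key == "name") { name = val; next }
      cfg = cfg (cfg == "" ? "" : ",") "\"" key "\":\"" val "\""
    }
    END { printf "{\"name\":\"%s\",\"config\":{%s}}\n", name, cfg }
  ' "$1"
}

# Example registration against a distributed worker on k1 (assumed to listen on 8083):
#   curl -s -X POST -H 'Content-Type: application/json' \
#     --data "$(props_to_connector_json /usr/local/kafka-2.8/config/postgres.properties)" \
#     http://192.168.56.11:8083/connectors
```

All values are emitted as JSON strings, which Kafka Connect accepts because connector configuration values are treated as strings.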
Reference:



