- Build the ZooKeeper cluster
  - IP1
  - IP2
- Build the Kafka cluster
  - IP1
  - IP2
- Build kafka-connect
  - IP1
  - IP2
- Start the cluster
- Test the connection to SQL Server
  - Set up the SQL Server environment
  - Create a test database
  - Enable CDC
  - Create a connector
  - Consume the data
Assume the following two hosts participate in building this cluster:

| IP1 | IP2 |
|---|---|
| 192.168.10.2 | 192.168.10.3 |

docker and docker-compose are already installed.

## Build the ZooKeeper cluster

### IP1

Create the Docker subnetwork, then create docker-compose.yml:
```yaml
version: "3"
services:
  zookeeper:
    image: zookeeper:latest
    restart: always
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: quorumListenOnAllIPs:true server.1=192.168.10.2:2888:3888;2181 server.2=192.168.10.3:2888:3888;2181
```
Note:

1. `ZOO_MY_ID` must be different on each host, and must match the `server.N` entry for that host's IP (here IP1 is `server.1`, so it uses `ZOO_MY_ID: 1`).
2. To add more servers, append further space-separated entries to `ZOO_SERVERS`, e.g. ` server.3=ip:2888:3888;2181`.
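The Docker subnetwork mentioned above is not shown in the original; it can be created with a command like the following, where the network name `zk_net` and the subnet range are illustrative choices, not values from this setup:

```shell
# Create a user-defined bridge network for the compose services to join.
# The name "zk_net" and the subnet are placeholder examples.
docker network create --driver bridge --subnet 172.20.0.0/16 zk_net
```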
### IP2

Create the Docker subnetwork, then create docker-compose.yml:

```yaml
version: "3"
services:
  zookeeper:
    image: zookeeper:latest
    restart: always
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: quorumListenOnAllIPs:true server.1=192.168.10.2:2888:3888;2181 server.2=192.168.10.3:2888:3888;2181
```
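Once both nodes are up, one way to confirm the quorum formed is ZooKeeper's four-letter `srvr` command (assuming `nc` is available on the host): one node should report `Mode: leader` and the other `Mode: follower`.

```shell
# Query each ZooKeeper node for its role in the ensemble
echo srvr | nc 192.168.10.2 2181 | grep Mode
echo srvr | nc 192.168.10.3 2181 | grep Mode
```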
## Build the Kafka cluster

### IP1

Add the following service to docker-compose.yml:
```yaml
  kafka:
    image: quay.io/debezium/kafka:1.9
    ports:
      - 9092:9092
    links:
      - zookeeper
    restart: always
    environment:
      - ADVERTISED_PORT=9092
      - ADVERTISED_HOST_NAME=192.168.10.2
      - ZOOKEEPER_CONNECT=192.168.10.2:2181,192.168.10.3:2181
      - BROKER_ID=1
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_MIN_INSYNC_REPLICAS=2
      - KAFKA_DEFAULT_REPLICATION_FACTOR=2
    entrypoint: ["sh", "-c", "sed -i 's/-Xmx1G -Xms1G/-Xmx256M -Xms128M/g' bin/kafka-server-start.sh && exec /docker-entrypoint.sh start"]
```
Note:

1. `BROKER_ID` must be different on each host.
2. `ZOOKEEPER_CONNECT` must list every node in the ZooKeeper cluster.
3. `ADVERTISED_HOST_NAME` is the local host's IP.
4. `ADVERTISED_PORT` defaults to 9092; change it if the mapped port above changes.
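Once the brokers start, a quick sanity check is to list topics through the scripts bundled in the Debezium Kafka image (the `/kafka/bin` path is the one this article uses later for the console consumer):

```shell
# List topics via either broker; an empty or internal-only list is expected at this point
docker-compose exec kafka /kafka/bin/kafka-topics.sh --bootstrap-server 192.168.10.2:9092 --list
```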
### IP2

Add the following service to docker-compose.yml:
```yaml
  kafka:
    image: quay.io/debezium/kafka:1.9
    ports:
      - 9092:9092
    links:
      - zookeeper
    restart: always
    environment:
      - ADVERTISED_PORT=9092
      - ADVERTISED_HOST_NAME=192.168.10.3
      - ZOOKEEPER_CONNECT=192.168.10.2:2181,192.168.10.3:2181
      - BROKER_ID=2
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
      - KAFKA_MIN_INSYNC_REPLICAS=2
      - KAFKA_DEFAULT_REPLICATION_FACTOR=2
    entrypoint: ["sh", "-c", "sed -i 's/-Xmx1G -Xms1G/-Xmx256M -Xms128M/g' bin/kafka-server-start.sh && exec /docker-entrypoint.sh start"]
```
## Build kafka-connect

### IP1

Add the following service to docker-compose.yml:
```yaml
  connect:
    image: quay.io/debezium/connect:1.9
    ports:
      - 8083:8083
    restart: always
    environment:
      - GROUP_ID=connect-cluster
      - BOOTSTRAP_SERVERS=192.168.10.2:9092,192.168.10.3:9092
      - CONFIG_STORAGE_TOPIC=my-connect-config
      - OFFSET_STORAGE_TOPIC=my-connect-offsets
      - STATUS_STORAGE_TOPIC=my-connect-status
      - OFFSET_FLUSH_TIMEOUT_MS=60000
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=2
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=2
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=2
      - CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All
```
Note:

1. `BOOTSTRAP_SERVERS` must list every broker in the Kafka cluster.
### IP2

Add the following service to docker-compose.yml:
```yaml
  connect:
    image: quay.io/debezium/connect:1.9
    ports:
      - 8083:8083
    restart: always
    environment:
      - GROUP_ID=connect-cluster
      - BOOTSTRAP_SERVERS=192.168.10.2:9092,192.168.10.3:9092
      - CONFIG_STORAGE_TOPIC=my-connect-config
      - OFFSET_STORAGE_TOPIC=my-connect-offsets
      - STATUS_STORAGE_TOPIC=my-connect-status
      - OFFSET_FLUSH_TIMEOUT_MS=60000
      - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=2
      - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=2
      - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=2
      - CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY=All
```
## Start the cluster

Run the following on both IP1 and IP2:

```shell
docker-compose up -d
```

Check the startup status:

```shell
docker-compose ps
```

## Test the connection to SQL Server

### Set up the SQL Server environment
This test uses SQL Server 2017.

```yaml
version: '3'
services:
  # service name
  mssql:
    # container name
    container_name: mssql
    # image name
    image: microsoft/mssql-server-linux:2017-latest
    # always restart
    restart: always
    # port mapping
    ports:
      - 1433:1433
    # volume mount (SQL Server stores its data under /var/opt/mssql)
    volumes:
      - ./data:/var/opt/mssql
    # environment variables
    environment:
      - ACCEPT_EULA=Y
      # SA user password
      - SA_PASSWORD=123456!@
```

```shell
docker-compose up -d
```
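Before creating the database, it may help to verify that the container accepts connections, using the SA password from the compose file above:

```shell
# Non-interactive connectivity check; prints the SQL Server version string
docker-compose exec mssql /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P '123456!@' -Q 'SELECT @@VERSION'
```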
### Create the test database

```shell
docker-compose exec mssql bash
/opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P '123456!@'
```

```sql
CREATE DATABASE yl_test COLLATE Chinese_PRC_CI_AS;
GO
```
Create a table of your choice and insert a few rows.

### Enable CDC

Enable CDC on the current database:

```sql
USE yl_test
GO
EXECUTE sys.sp_cdc_enable_db;
GO
```

Enable CDC on the table:

```sql
IF EXISTS (SELECT 1 FROM sys.tables WHERE name = 'data_test' AND is_tracked_by_cdc = 0)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema = 'dbo',
        @source_name = 'data_test',
        @capture_instance = NULL,
        @supports_net_changes = 1,
        @role_name = NULL,
        @index_name = NULL,
        @captured_column_list = NULL
END
GO
```
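CDC capture relies on SQL Server Agent jobs; if change rows never show up, one thing worth checking is that the capture and cleanup jobs were created. A sketch, run through sqlcmd against the CDC-enabled database:

```shell
# List the CDC capture/cleanup jobs registered for yl_test
docker-compose exec mssql /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P '123456!@' -d yl_test -Q 'EXEC sys.sp_cdc_help_jobs'
```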
Check whether CDC is enabled on the database:

```sql
SELECT name, is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1
```

Check which tables in the current database have CDC enabled:

```sql
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1
```

### Create the connector
Send a POST request (JSON body) to http://192.168.10.3:8083/connectors. The `database.hostname`, `database.port`, `database.user`, `database.password`, and `database.dbname` fields are the SQL Server host, port, user name, password, and database name:

```json
{
    "name": "connector_name",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.hostname": "192.168.10.3",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "123456!@",
        "database.dbname": "yl_test",
        "database.server.name": "yl_test",
        "database.history.kafka.bootstrap.servers": "192.168.10.2:9092,192.168.10.3:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}
```
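After the POST succeeds, the connector's state can be inspected through the same REST API; substitute the `name` you used in the request:

```shell
# Should report the connector and its task as RUNNING once the snapshot starts
curl http://192.168.10.3:8083/connectors/connector_name/status
```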
## Consume the data

```shell
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.2:9092,192.168.10.3:9092 --from-beginning --property print.key=true --topic yl_test.dbo.data_test
```



