栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink cdc 的 问题

flink cdc 的 问题

 问题:一 Can't call rollback when autocommit=true
2022-03-21 11:44:29,859 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
2022-03-21 11:44:29,859 INFO  io.debezium.connector.mysql.MySqlConnectorTask               [] - Connector task finished all work and is now shutdown
2022-03-21 11:44:29,870 ERROR io.debezium.connector.mysql.SnapshotReader                   [] - Failed due to error: Aborting snapshot due to error when last running 'SELECt * FROM `test`.`test`': Can't call rollback when autocommit=true
org.apache.kafka.connect.errors.ConnectException: Can't call rollback when autocommit=true Error code: 0; SQLSTATE: 08003.
	at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:241) ~[stream-1.0-test.jar:?]
	at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:218) ~[stream-1.0-test.jar:?]
	at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:857) ~[stream-1.0-test.jar:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_272]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_272]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_272]
Caused by: java.sql.SQLNonTransientConnectionException: Can't call rollback when autocommit=true
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110) ~[stream-1.0-test.jar:?]
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[stream-1.0-test.jar:?]
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89) ~[stream-1.0-test.jar:?]
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63) ~[stream-1.0-test.jar:?]
	at com.mysql.cj.jdbc.ConnectionImpl.rollback(ConnectionImpl.java:1833) ~[stream-1.0-test.jar:?]
	at io.debezium.connector.mysql.SnapshotReader.execute(SnapshotReader.java:766) ~[stream-1.0-test.jar:?]
	... 3 more
2022-03-21 11:44:29,872 INFO  io.debezium.jdbc.JdbcConnection                              [] - Connection gracefully closed
2022-03-21 11:44:30,219 INFO  io.debezium.embedded.EmbeddedEngine                          [] - Stopping the embedded engine

解决:仔细查看代码,发现一个空指针异常的问题

问题:二 配置重启

程序执行一段时间挂了,希望数据不丢失

MySQLSource.Builder builder = MySQLSource.builder()
                .hostname(properties.getProperty("hostname"))
                .username(properties.getProperty("username"))
                .password(properties.getProperty("password"))
                .port(3306)
                .serverId(6401)
                .deserializer(new MyDeserializationSchema()) //去参数里面找找实现类
                .databaseList(database) //可以指定多个库
                .tableList(tableName) //因为是多个库 所以要指定库名+表名
                .debeziumProperties(debeProp); //配置不要锁表 但是数据一致性不是精准一次 会变成最少一次
                //.startupOptions(StartupOptions.initial())// 读取binlog策略 这个启动选项有五种

        if (args.length > 1) {
            // 读取binlog策略,从file 和 pos启动
            String offsetFile = args[0];
            int offsetPos = Integer.valueOf(args[1]);
            builder.startupOptions(StartupOptions.specificOffset(offsetFile, offsetPos));
        } else if (args.length == 1) {
            // 读取binlog策略,从指定时间戳启动(毫秒)
            long timeStamp = Long.valueOf(args[0]);
            builder.startupOptions(StartupOptions.timestamp(timeStamp));
        } else {
            // 读取binlog策略 第一次启动(全量读取,再增量读取binlog)
            builder.startupOptions(StartupOptions.initial());
        }
        DebeziumSourceFunction mysqlSource = builder.build();

        DataStreamSource source = env.addSource(mysqlSource);

解决:

# 正在写的binlog文件
SHOW MASTER STATUS

# 所有的binlog文件
SHOW MASTER LOGS
SHOW BINARY LOGS

# 写入binlog文件的事件
SHOW BINLOG EVENTS 

# 指定文件查看事件
SHOW BINLOG EVENTS IN 'mysql-bin.096356'

        方法1、在mysql的客户端 : show master status  将文件和位点传入上面的程序里面;

        方法2、传入一个时间戳(毫秒级)  

问题: 三  The connector is trying to read binlog starting at binlog

配置的文件和位点启动失败

Caused by: org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at binlog file 'mysql-bin.096422', pos=49294060, skipping 0 events plus 0 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
		at io.debezium.connector.mysql.MySqlConnectorTask.start(MySqlConnectorTask.java:94)
		at io.debezium.connector.common.baseSourceTask.start(baseSourceTask.java:106)
		at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:758)
		at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
		at java.lang.Thread.run(Thread.java:748)

解决:查看配置的binlog是否还存在(show master logs),查看mysql的日志文件的保存时间(show variables like "%expire_logs_days%")。如果这个binlog已经被删除了(binlog有保留策略),你可能需要重新再消费一遍,或者你不在乎数据丢失,配置新的文件和位点就行了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775049.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号