flinksql 在group by添加字段后,从checkpoint重启会报错,
2022-02-07 18:28:33 java.lang.Exception: Exception while creating StreamOperatorStateContext. at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUponFail(StreamTask.java:647) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_563f38a7d955da9b9b5048f28249c3bd_(3/40) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ... 10 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 12 more Caused by: org.apache.flink.util.StateMigrationException: The new key serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@4ceb3b3d) must be compatible with the previous key serializer (org.apache.flink.table.runtime.typeutils.RowDataSerializer@4ceb3b3d). at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.readmetaData(FullSnapshotRestoreOperation.java:210) at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.restoreKeyGroupsInStateHandle(FullSnapshotRestoreOperation.java:171) at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation.access$100(FullSnapshotRestoreOperation.java:113) at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:158) at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$1.next(FullSnapshotRestoreOperation.java:140) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:115) at org.apache.flink.runtime.state.heap.HeapSavepointRestoreOperation.restore(HeapSavepointRestoreOperation.java:57) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:174) ... 19 more
1、可以通过在一个字段作为json打包字段做冗余,来避免从checkpoint恢复失败的问题.



