需要用 Flink DataStream 读取 MaxCompute 的结果数据,并 sink 到目标表。在 MaxCompute 侧一直没有找到相关文档,后来查阅了阿里云的 Flink 文档,这里整理一下,方便后续操作。
二、全量同步代码
MaxCompute源表和结果表依赖 VVR 4.0.7(对应Flink 1.13)
groupId: com.alibaba.ververica, artifactId: ververica-connector-odps, version: 1.13-vvr-4.0.7
1、对某一个分区全量读取数据
// Example: read a single MaxCompute partition as a bounded DataStream and print each row.
try {
Configuration conf = new Configuration();
conf.setString("endpoint", "");// Endpoint of the MaxCompute project; see the Endpoint documentation for details.
conf.setString("tunnelEndpoint","");// Tunnel endpoint used for bulk data transfer.
conf.setString("project", "");// Name of the MaxCompute project.
conf.setString("tablename", ""); // Name of the table inside the MaxCompute project.
conf.setString("accessid", "");// Alibaba Cloud account AccessKey ID.
conf.setString("accesskey", ""); // Alibaba Cloud account AccessKey Secret.
conf.setString("partition", "dt=20211025"); // Partition spec of the table to read.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Schema of the columns to read (must match the physical table schema).
TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
.field("id", DataTypes.STRING())
.build();
// Build the bounded MaxCompute source function from the schema and options.
ODPSStreamSource odpsSource =
new OdpsSourceBuilder(schema, conf).buildSourceFunction();
DataStreamSource source = env.addSource(odpsSource);
// Demo sink only; replace with the real target-table sink in production.
source.addSink(new PrintSinkFunction<>());
env.execute("test_odps");
}catch (Exception e){
// NOTE(review): demo-only error handling; swallowing the exception hides job failures.
e.printStackTrace();
}
import com.alibaba.ververica.connectors.continuous.odps.source.ContinuousODPSStreamSource;
import com.alibaba.ververica.connectors.odps.ODPSStreamSource;
import com.alibaba.ververica.connectors.odps.OdpsConf;
import com.alibaba.ververica.connectors.odps.OdpsOptions;
import com.alibaba.ververica.connectors.odps.schema.ODPSColumn;
import com.alibaba.ververica.connectors.odps.schema.ODPSTableSchema;
import com.alibaba.ververica.connectors.odps.util.OdpsUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.List;
/**
 * Builds MaxCompute (ODPS) sources for the Flink DataStream API from a
 * {@link TableSchema} plus connector options: a bounded source over fixed
 * partitions ({@link #buildSourceFunction()}) or an unbounded source that
 * discovers new partitions over time ({@link #buildContinuousOdpsSource()}).
 */
public class OdpsSourceBuilder {
    /** Connection settings (endpoint, project, credentials) derived from {@code conf}. */
    private final OdpsConf odpsConf;
    /** First partition to subscribe from in continuous (incremental) mode; may be null/empty. */
    private final String startPartition;
    private final String odpsTable;
    /** Internal row type ({@code RowData}) produced by the source. */
    private final TypeInformation<RowData> producedTypeInfo;
    /** Physical columns projected out of the table, validated against the DDL schema. */
    private final ODPSColumn[] selectedColumns;
    private final long retryTimes;
    private final long sleepTimesMs;
    /** Upper bound on the number of partitions a bounded read may match. */
    private final long maxPartitionCount;
    // NOTE(review): the option key says *_IN_SEC while this field says *InMs —
    // confirm the actual unit against the connector before relying on it.
    private final int discoveryIntervalInMs;
    /** Partition specs matched by the `partition` option (empty for non-partitioned tables). */
    private final List<String> prunedPartitions;

    /**
     * @param tableSchema logical schema of the columns to read
     * @param conf connector options (endpoint, project, table, partition, ...)
     * @throws TableException if the matched partitions exceed {@code maxPartitionCount}
     * @throws IllegalArgumentException if a start partition is set on a non-partitioned table
     */
    public OdpsSourceBuilder(
            TableSchema tableSchema,
            Configuration conf) {
        this.odpsConf = OdpsUtils.createOdpsConf(conf);
        this.startPartition = conf.getString(OdpsOptions.START_PARTITION);
        this.odpsTable = conf.getString(OdpsOptions.TABLE_NAME);
        String specificPartition = conf.getString(OdpsOptions.PARTITION);
        ODPSTableSchema physicalTableSchema = OdpsUtils.getOdpsTableSchema(odpsConf, odpsTable);
        boolean isPartitionedTable = physicalTableSchema.isPartition();
        // Must be assigned before getSpecificPartitions(), which enforces the limit.
        this.maxPartitionCount = conf.getInteger(OdpsOptions.MAX_PARTITION_COUNT);
        this.prunedPartitions = getSpecificPartitions(
                odpsConf, odpsTable, isPartitionedTable, specificPartition);
        // A start partition only makes sense for an unbounded, partitioned read.
        Preconditions.checkArgument(
                isPartitionedTable || StringUtils.isEmpty(startPartition),
                "Non-partitioned table can not be an unbounded source.");
        this.producedTypeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
        this.selectedColumns = OdpsUtils.validateAndGetProjectCols(physicalTableSchema, tableSchema);
        this.discoveryIntervalInMs = conf.getInteger(OdpsOptions.SUBSCRIBE_INTERVAL_IN_SEC);
        this.retryTimes = conf.getInteger(OdpsOptions.RETRY_TIME);
        this.sleepTimesMs = conf.getInteger(OdpsOptions.SLEEP_MILLIS);
    }

    /** Builds a bounded source reading the partitions matched at construction time. */
    public ODPSStreamSource buildSourceFunction() {
        return new ODPSStreamSource(
                odpsConf,
                odpsTable,
                selectedColumns,
                prunedPartitions,
                producedTypeInfo,
                sleepTimesMs,
                retryTimes);
    }

    /** Builds an unbounded source that starts at {@code startPartition} and polls for new partitions. */
    public ContinuousODPSStreamSource buildContinuousOdpsSource() {
        return new ContinuousODPSStreamSource(
                odpsConf,
                odpsTable,
                selectedColumns,
                producedTypeInfo,
                sleepTimesMs,
                retryTimes,
                startPartition,
                discoveryIntervalInMs);
    }

    /**
     * Resolves the partition specs to read for a bounded source.
     *
     * @return matched partition specs; empty list for a non-partitioned table
     * @throws TableException if more than {@code maxPartitionCount} partitions match
     */
    private List<String> getSpecificPartitions(
            OdpsConf odpsConf,
            String odpsTable,
            boolean isPartitionedTable,
            String specificPartition) {
        if (!isPartitionedTable) {
            return new ArrayList<>();
        }
        List<String> conditions = new ArrayList<>();
        if (StringUtils.isNotEmpty(specificPartition)) {
            conditions.add(specificPartition);
        }
        List<String> partitions = OdpsUtils.getMatchedPartitions(
                odpsConf, odpsTable, conditions, true, true);
        if (partitions.size() > maxPartitionCount) {
            throw new TableException(
                    String.format(
                            "The number of matched partitions [%d] exceeds"
                                    // Fixed: original message had a dropped backslash ("! nPlease").
                                    + " the default limit of [%d]!\nPlease confirm whether you need to read all these "
                                    + "partitions, if you really need it, you can increase the `maxPartitionCount` "
                                    + "in DDL's with options.",
                            partitions.size(), maxPartitionCount));
        }
        return partitions;
    }
}
三、增量同步代码
1、MaxCompute增量源表依赖 VVR 4.0.7(对应Flink 1.13)
groupId: com.alibaba.ververica, artifactId: ververica-connector-continuous-odps, version: 1.13-vvr-4.0.7
// Incremental (unbounded) read: use the continuous source, which subscribes to
// new partitions from `startPartition` onward. The original snippet called
// buildSourceFunction(), which performs a bounded full read of fixed partitions
// and does not match this incremental-sync section.
ContinuousODPSStreamSource odpsSource =
new OdpsSourceBuilder(schema, conf).buildContinuousOdpsSource();
增量这块没试过,参考了官方文档



