
Flink (2): MaxCompute DataStream Connector


1. Background

We needed to read data out of MaxCompute with the Flink DataStream API and sink it into a target table. The MaxCompute documentation never covered this, but Alibaba's Flink (VVR) documentation does, so I am writing it up here for future reference.

  

2. Full synchronization code

The MaxCompute source and result tables depend on VVR 4.0.7 (corresponding to Flink 1.13):

 
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-odps</artifactId>
        <version>1.13-vvr-4.0.7</version>
    </dependency>
    1. Read all data from a single partition:
      

    try {
        Configuration conf = new Configuration();
        conf.setString("endpoint", "");       // Endpoint of the MaxCompute project.
        conf.setString("tunnelEndpoint", ""); // Tunnel endpoint of the MaxCompute project.
        conf.setString("project", "");        // Name of the MaxCompute project.
        conf.setString("tablename", "");      // Name of the table in the MaxCompute project.
        conf.setString("accessid", "");       // Alibaba Cloud AccessKey ID.
        conf.setString("accesskey", "");      // Alibaba Cloud AccessKey Secret.
        conf.setString("partition", "dt=20211025"); // Partition of the table to read.

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Schema of the columns to read.
        TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
                .field("id", DataTypes.STRING())
                .build();
        // Build the MaxCompute source function.
        ODPSStreamSource odpsSource =
                new OdpsSourceBuilder(schema, conf).buildSourceFunction();
        DataStreamSource<RowData> source = env.addSource(odpsSource);
        source.addSink(new PrintSinkFunction<>());
        env.execute("test_odps");
    } catch (Exception e) {
        e.printStackTrace();
    }

The OdpsSourceBuilder used above is a small helper class that wraps the connector's configuration parsing and source construction:

import com.alibaba.ververica.connectors.continuous.odps.source.ContinuousODPSStreamSource;
import com.alibaba.ververica.connectors.odps.ODPSStreamSource;
import com.alibaba.ververica.connectors.odps.OdpsConf;
import com.alibaba.ververica.connectors.odps.OdpsOptions;
import com.alibaba.ververica.connectors.odps.schema.ODPSColumn;
import com.alibaba.ververica.connectors.odps.schema.ODPSTableSchema;
import com.alibaba.ververica.connectors.odps.util.OdpsUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Preconditions;

import java.util.ArrayList;
import java.util.List;

public class OdpsSourceBuilder {

        private final OdpsConf odpsConf;
        private final String startPartition;
        private final String odpsTable;

        private TypeInformation<RowData> producedTypeInfo;
        private ODPSColumn[] selectedColumns;

        private long retryTimes;
        private long sleepTimesMs;
        private long maxPartitionCount;
        private int discoveryIntervalInMs;
        private List<String> prunedPartitions;

        public OdpsSourceBuilder(
                TableSchema tableSchema,
                Configuration conf) {
            this.odpsConf = OdpsUtils.createOdpsConf(conf);
            this.startPartition = conf.getString(OdpsOptions.START_PARTITION);
            this.odpsTable = conf.getString(OdpsOptions.TABLE_NAME);
            String specificPartition = conf.getString(OdpsOptions.PARTITION);
            ODPSTableSchema physicalTableSchema = OdpsUtils.getOdpsTableSchema(odpsConf, odpsTable);

            boolean isPartitionedTable = physicalTableSchema.isPartition();
            this.maxPartitionCount = conf.getInteger(OdpsOptions.MAX_PARTITION_COUNT);
            this.prunedPartitions = getSpecificPartitions(
                    odpsConf, odpsTable, isPartitionedTable, specificPartition);

            Preconditions.checkArgument(
                    isPartitionedTable || StringUtils.isEmpty(startPartition),
                    "Non-partitioned table can not be an unbounded source.");

            this.producedTypeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
            this.selectedColumns = OdpsUtils.validateAndGetProjectCols(physicalTableSchema, tableSchema);

            this.discoveryIntervalInMs = conf.getInteger(OdpsOptions.SUBSCRIBE_INTERVAL_IN_SEC);
            this.retryTimes = conf.getInteger(OdpsOptions.RETRY_TIME);
            this.sleepTimesMs = conf.getInteger(OdpsOptions.SLEEP_MILLIS);
        }

        public ODPSStreamSource buildSourceFunction() {
            return new ODPSStreamSource(
                    odpsConf,
                    odpsTable,
                    selectedColumns,
                    prunedPartitions,
                    producedTypeInfo,
                    sleepTimesMs,
                    retryTimes);
        }

        public ContinuousODPSStreamSource buildContinuousOdpsSource() {
            return new ContinuousODPSStreamSource(
                    odpsConf,
                    odpsTable,
                    selectedColumns,
                    producedTypeInfo,
                    sleepTimesMs,
                    retryTimes,
                    startPartition,
                    discoveryIntervalInMs);
        }

        private List<String> getSpecificPartitions(
                OdpsConf odpsConf,
                String odpsTable,
                boolean isPartitionedTable,
                String specificPartition) {
            if (!isPartitionedTable) {
                return new ArrayList<>();
            }
            List<String> conditions = new ArrayList<>();
            if (StringUtils.isNotEmpty(specificPartition)) {
                conditions.add(specificPartition);
            }
            List<String> partitions = OdpsUtils.getMatchedPartitions(
                    odpsConf, odpsTable, conditions, true, true);

            if (partitions.size() > maxPartitionCount) {
                throw new TableException(
                        String.format(
                                "The number of matched partitions [%d] exceeds"
                                        + " the default limit of [%d]!\nPlease confirm whether you need to read all these "
                                        + "partitions, if you really need it, you can increase the `maxPartitionCount` "
                                        + "in DDL's with options.",
                                partitions.size(), maxPartitionCount));
            }
            return partitions;
        }
}
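The getSpecificPartitions guard above refuses to read more partitions than maxPartitionCount allows. Its logic can be illustrated in isolation with a simplified, self-contained sketch; note that the startsWith match below is a hypothetical stand-in for the connector's real partition-spec matching in OdpsUtils.getMatchedPartitions:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, standalone illustration of the partition guard in
// OdpsSourceBuilder#getSpecificPartitions: match partitions against an
// optional condition, then fail if the match count exceeds maxPartitionCount.
public class PartitionGuardSketch {

    static List<String> prunePartitions(List<String> allPartitions,
                                        String condition,
                                        long maxPartitionCount) {
        List<String> matched = new ArrayList<>();
        for (String p : allPartitions) {
            // An empty condition means "all partitions", mirroring the connector.
            if (condition == null || condition.isEmpty() || p.startsWith(condition)) {
                matched.add(p);
            }
        }
        if (matched.size() > maxPartitionCount) {
            throw new IllegalStateException(String.format(
                    "The number of matched partitions [%d] exceeds the limit of [%d]",
                    matched.size(), maxPartitionCount));
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> all = List.of("dt=20211024", "dt=20211025", "dt=20211026");
        System.out.println(prunePartitions(all, "dt=20211025", 100)); // prints [dt=20211025]
    }
}
```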

3. Incremental synchronization code

     1. The MaxCompute incremental source table depends on VVR 4.0.7 (corresponding to Flink 1.13):

        
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>ververica-connector-continuous-odps</artifactId>
            <version>1.13-vvr-4.0.7</version>
        </dependency>

     

// For incremental reads, build the unbounded source instead:
ContinuousODPSStreamSource odpsSource =
    new OdpsSourceBuilder(schema, conf).buildContinuousOdpsSource();

  I have not tried the incremental path myself; it is based on the official documentation.
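Pieced together from the builder class above, a complete incremental job would look roughly like the following untested sketch. The option keys startPartition and subscribeIntervalInSec are assumptions read off the OdpsOptions constants; check the connector documentation for the exact names before use:

```java
Configuration conf = new Configuration();
conf.setString("endpoint", "");        // Endpoint of the MaxCompute project.
conf.setString("tunnelEndpoint", "");
conf.setString("project", "");
conf.setString("tablename", "");
conf.setString("accessid", "");
conf.setString("accesskey", "");
// Assumed option keys: read this partition and every later one,
// polling for newly created partitions at the given interval.
conf.setString("startPartition", "dt=20211025");
conf.setString("subscribeIntervalInSec", "30");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableSchema schema = org.apache.flink.table.api.TableSchema.builder()
        .field("id", DataTypes.STRING())
        .build();
// buildContinuousOdpsSource() returns the unbounded source that keeps
// discovering partitions at or after startPartition.
ContinuousODPSStreamSource odpsSource =
        new OdpsSourceBuilder(schema, conf).buildContinuousOdpsSource();
env.addSource(odpsSource).addSink(new PrintSinkFunction<>());
env.execute("test_odps_incremental");
```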

Reposted from www.mshxw.com
Original article: https://www.mshxw.com/it/701702.html