栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Flink从入门到精通 05】Source&Sink

【Flink从入门到精通 05】Source&Sink

Flink用于处理有状态的流式计算,需要对Source端的数据进行加工处理,然后写入到Sink端,下图展示了在Flink中数据所经历的过程,今天就根据这张图分别给大家分享下。

01 Environment

Flink所有的程序都从这一步开始,只有创建了执行环境,才能开始下一步的编写。可以使用如下方式获取运行环境:

(1)getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文

如果程序是独立调用的,则此方法返回本地执行环境如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境会根据查询运行的方式决定返回什么样的运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

如果没有设置并行度,会以flink-conf.yaml中的配置为准,默认是1

parallelism.default:1

(2)createRemoteEnvironment

返回集群的执行环境,将Jar提交到远程服务器,需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname",6123,"YOURPATH//wordcount.jar");

常用的创建执行环境方式为getExecutionEnvironment。

02 Source&Sink

Source即Flink中的数据源,Sink则为数据输出端,Flink通过Flink Streaming Connector来与外部存储系统连接,Flink主要通过四种方式完成数据交换:

Flink预定义的Source与SinkFlink内部提供的Boundled Connectors第三方Apache Bahir项目中的连接器异步IO方式

下面主要对预定义内容及Boundled Connectors作为介绍,更多内容可以参考

(1) 预定义的Source&Sink

先来看一下Flink给我们提供的内置Source,这些方法都位于StreamExecutionEnvironment类中。

Flink中内置的Sink如下图,均位于DataStream类中。

基于文件的 source 和 sink
    从文本文件中读取数据
env.readTextFile(path)
    根据指定的 fileInputFormat 格式读取文件中的内容
env.readFile(fileInputFormat, path)
    将结果从文本或 csv 格式写出到文件中
dataStream.writeAsText(path) ;
dataStream.writeAsCsv(path);
基于Socket的Source和Sink

需要提供 Socket 的 hostname 及 port,可以直接用 StreamExecutionEnvironment 预定的接口

socketTextStream 创建基于 Socket 的 source,从该 socket 中 以文本的形式读取数据

writeToSocket将结果写出到Socket

env.socketTextStream("localhost",9999);
kafkaDStream.writeToSocket("localhost",9999,new SimpleStringSchema());
基于内存 Collections、Iterators 的 Source

用于连接基于内存中的集合或者迭代器,通常用于数据测试。

调用 StreamExecutionEnvironment fromCollection、fromElements 构建相应的 source

结果数据直接 print、 printToError 的方式写出到标准输出或标准错误

env.fromCollection(Collection);
env.fromCollection(Iterator, Class);
env.fromElements(T ...);
(2) Boundled Connectors

在官网中,给出了如下的Connectors:

Apache Kafka (source/sink)Apache Cassandra (sink)Amazon Kinesis Streams (source/sink)Elasticsearch (sink)FileSystem (sink)RabbitMQ (source/sink)Google PubSub (source/sink)Hybrid Source (source)Apache NiFi (source/sink)Apache Pulsar (source)Twitter Streaming API (source)JDBC (sink)

在使用过程中,提交 Job 的时候需要注意, job 代码 jar 包中一定要将相应的 connetor 相关类打包进去,否则在提交作业时就会失败,提示找不到相应的类,或初始化某些类异常

(3) 自定义Source&Sink

除了上述的Source与Sink外,Flink还支持自定义Source与Sink。

自定义Source

实现SourceFunction类重写run方法和cancel方法在主函数中通过addSource调用

public class MySource implements SourceFunction {
    // 定义一个运行标志位,表示数据源是否运行
    Boolean flag = true;
    @Override
    public void run(SourceContext sourceContext) throws Exception {
        while (flag){
            sourceContext.collect("当前时间为:" + System.currentTimeMillis());
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}
env.addSource(new MySource());
自定义Sink

继承SinkFunction重写invoke方法

下面给出了自定义JDBC Sink的案例,可以参考:

public class MyJdbcSink extends RichSinkFunction {
    
    // 定义连接
    Connection conn;
    
    // 创建连接
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root");
    }

    // 关闭连接
    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
    }

    // 调用连接执行SQL
    @Override
    public void invoke(String value, Context context) throws Exception {
        PreparedStatement preparedStatement = conn.prepareStatement(value);
        preparedStatement.execute();
        preparedStatement.close();
    }
}
env.addSink(new MyJdbcSink());
03 Transform

关于转换算子,在之前的DataStreamAPI中有过详细介绍,这里不做过多讲解,有不清楚的可以跳转《Flink DataStream API》一文。

04 Kafka Connector

在生产实践中,Kafka Connector是最常用的Connector,下面针对Kafka Connector做详细介绍。

(1)Kafka Consumer

反序列化数据设置消费起始位置Topic和Partition动态发现Commit OffsetTimestamp Extraction/Watermark生成

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream stream = env.addSource(myConsumer);
反序列化数据

kafka 中数据是以二进制 byte 形式存储的,读到 Flink 系统中之后,需要将二进制数据转化为具体的 java、scala 对象

实现一个 schema 类, 定义如何序列化和反序列数据反序列化时需要实现 DeserializationSchema 接口,并重写 deserialize(byte[] message) 函数反序列化 kafka 中 kv 的数据时,需要实现 KeyedDeserializationSchema 接口,并重写 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函数常用的序列化反序列化的 schema 类

SimpleStringSchema,按字符串方式进行序列化、反序列化TypeInformationSerializationSchema,根据 Flink 的 TypeInformation 信息来推断出需要选择的 schemaJsonDeserializationSchema 使用 jackson 反序列化 json 格式消息, 并返回 ObjectNode,可以使用 .get(“property”) 方法来访问相应字段 消费起始位置设置

对于数据源来说,能否重设消费位置关系到端到端一致性的保证。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>(...);
myConsumer.setStartFromEarliest();     // 尽可能从最早的记录开始
myConsumer.setStartFromLatest();       // 从最新的记录开始
myConsumer.setStartFromTimestamp(...); // 从指定的时间开始(毫秒)
myConsumer.setStartFromGroupOffsets(); // 默认的方法


// 为每个分区指定 consumer 应该开始消费的具体 offset
Map specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
DataStream stream = env.addSource(myConsumer);
...

FlinkKafkaConsumer 类提供了相应函数,设置合适的起始位置

setStartFromGroupOffsets(默认),从 group offset 位置读取数据,group offset 指的是 kafka broker 端记录的某个 group 的最后一次的消费位置,但是 kafka broker 端没有该 group 信息,会根据 kafka 的参数 “auto.offset.reset” 的设置来决定从哪个位置开始消费setStartFromEarliest,从 kafka 最早的位置开始读取setStartFromLatest,从 kafka 最新的位置开始读取setStartFromTimestamp(long),从时间戳大于或等于指定时间戳的位置开始读取,Kafka 时戳,是指 kafka 为每条消息增加另一个时间戳,可以表示消息在 proudcer 端生成时的时间、或进入到 kafka broker 时的时间setStartFromSpecificOffsets,从指定分区的 offset 位置开始读取,如指定的 offsets 中不存某个分区,该分区从 group offset 位置开始读取。此时需要用户给定一个具体的分区、offset 的集合。

作业从 Checkpoint或Savepoint 恢复时,作业消费起始位置是从之前保存的状态中恢复,与上面提到跟 kafka 这些单独的配置无关

Topic 和分区发现 分区发现

Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消费它们。在初始检索分区元数据之后(即当 Job 开始运行时)发现的所有分区将从最早可能的 offset 中消费。

默认情况下,是禁用了分区发现的。若要启用它,需要在提供的属性配置中为 flink.partition-discovery.interval-millis 设置大于 0 的值,表示发现分区的间隔是以毫秒为单位的。

FlinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。

Topic 发现

在更高的级别上,Flink Kafka Consumer 还能够使用正则表达式基于 Topic 名称的模式匹配来发现 Topic。

当 Job 开始运行时,Consumer 将订阅名称与指定正则表达式匹配的所有主题(以 test-topic 开头并以单个数字结尾)。

要允许 consumer 在作业开始运行后发现动态创建的主题,则要将 flink.partition-discovery.interval-millis 设置非负值,允许 consumer 发现名称与指定模式匹配的新主题的分区。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer myConsumer = new FlinkKafkaConsumer<>(
    java.util.regex.Pattern.compile("test-topic-[0-9]"),
    new SimpleStringSchema(),
    properties);

DataStream stream = env.addSource(myConsumer);
Commit Offset

Flink Kafka Consumer 不依赖于提交的 offset 来实现容错保证

禁用 Checkpointing: 如果禁用了 checkpointing,则 Flink Kafka Consumer 依赖于内部使用的 Kafka client 自动定期 offset 提交功能。 因此,要禁用或启用 offset 的提交,只需将 enable.auto.commit 或者 auto.commit.interval.ms 的Key 值设置为提供的 Properties 配置中的适当值。启用 Checkpointing: 如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致。 用户可以通过调用 consumer 上的 setCommitOffsetsonCheckpoints(boolean) 方法来禁用或启用 offset 的提交(默认情况下,这个值是 true )。 注意,该场景中,Properties 中的自动定期 offset 提交设置会被完全忽略 Timestamp Extraction/Watermark生成

Flink Kafka Consumer 允许指定 AssignerWithPeriodicWatermarks 或 AssignerWithPunctuatedWatermarks,此时每个 partition 一个 watermark assigner,source 生成的时戳为多个 partition 时戳对齐后的最小时间戳。

此时在一个 source 读取多个 partition,并且 partition 之间数据时戳有一定差距的情况下,因为在 source 端 watermark 在 partition 级别有对齐,不 会导致数据读取较慢 partition 数据丢失。

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");

FlinkKafkaConsumer myConsumer =
    new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(
    WatermarkStrategy
        .forBoundedOutOfOrderness(Duration.ofSeconds(20)));

DataStream stream = env.addSource(myConsumer);
(2)Kafka Producer

Flink Kafka Producer 即 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。

构造器接收下列参数:

    kafka主题 topic序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchemaKafka client 的 Properties,其中“bootstrap.servers” (逗号分隔 Kafka broker 列表)配置是必须的容错语义setWriteTimestampToKafka(boolean writeTimestampToKafka),给每条记录设置时间戳setLogFailuresonly(boolean logFailuresOnly) ,设置是否在 Producer 发生异常时仅仅记录日志setTransactionalIdPrefix(String transactionalIdPrefix) ,设置自定义的 transactional.id 前缀ignoreFailuresAfterTransactionTimeout(),在恢复时忽略事务超时异常
DataStream stream = ...;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer myProducer = new FlinkKafkaProducer(
        "my-topic",                  // 目标 topic
        new SimpleStringSchema(),    // 序列化 schema
        properties,                  // producer 配置
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // 容错

stream.addSink(myProducer);
Producer分区

使用 FlinkKafkaProducer 往 kafka 中写数据时,如果不单独设置 partition 策略,默认使用 FlinkFixedPartitioner,该 partitioner 分区的方式是 task 所在的并发 id 对 topic 总 partition 数取余:parallelInstanceId % partitions.length

如果 sink 为 4,paritition 为 1,则 4 个 task 往同一个 partition 中写数据

sink task < partition 个数时会有部分 partition 没有数据写入,如 sink task 为 2,partition 总数为 4,则后面两个 partition 将没有数据写入

如果构建 FlinkKafkaProducer 时,partition 设置为 null,此时会使用 kafka producer 默认分区方式,非 key 写入的情况下,使用 round-robin 的方式进行分区,每个 task 都会轮循的写下游的所有 partition。该方式下游的 partition 数据会比较均衡,但是缺点是 partition 个数过多的情况下需要维持过多的网络连接,即每个 task 都会维持跟所有 partition 所在 broker 的连接

容错

启用 Flink 的 checkpointing 后,FlinkKafkaProducer 可以提供精确一次的语义保证。

除了启用 Flink 的 checkpointing,也可以通过将适当的 semantic 参数传递给 FlinkKafkaProducer 来选择三种不同的操作模式:

Semantic.NONE:Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。Semantic.AT_LEAST_ONCE(默认设置):可以保证不会丢失任何记录(但是记录可能会重复)Semantic.EXACTLY_ONCE:使用 Kafka 事务提供精确一次语义。无论何时,在使用事务写入 Kafka 时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 isolation.level(read_committed 或 read_uncommitted - 后者是默认值)

Semantic.EXACTLY_ONCE 模式依赖于事务提交的能力。事务提交发生于触发 checkpoint 之前,以及从 checkpoint 恢复之后。如果从 Flink 应用程序崩溃到完全重启的时间超过了 Kafka 的事务超时时间,那么将会有数据丢失(Kafka 会自动丢弃超出超时时间的事务)

默认情况下,Kafka broker 将 transaction.max.timeout.ms 设置为 15 分钟。此属性不允许为大于其值的 producer 设置事务超时时间。 默认情况下,FlinkKafkaProducer 将 producer config 中的 transaction.timeout.ms 属性设置为 1 小时,因此在使用 Semantic.EXACTLY_ONCE 模式之前应该增加 transaction.max.timeout.ms 的值。

在 KafkaConsumer 的 read_committed 模式中,任何未结束(既未中止也未完成)的事务将阻塞来自给定 Kafka topic 的未结束事务之后的所有读取数据。 换句话说,在遵循如下一系列事件之后:

    用户启动了 transaction1 并使用它写了一些记录用户启动了 transaction2 并使用它编写了一些其他记录用户提交了 transaction2

即使 transaction2 中的记录已提交,在提交或中止 transaction1 之前,消费者也不会看到这些记录。这有 2 层含义:

首先,在 Flink 应用程序的正常工作期间,用户可以预料 Kafka 主题中生成的记录的可见性会延迟,相当于已完成 checkpoint 之间的平均时间。其次,在 Flink 应用程序失败的情况下,此应用程序正在写入的供消费者读取的主题将被阻塞,直到应用程序重新启动或配置的事务超时时间过去后,才恢复正常。此标注仅适用于有多个 agent 或者应用程序写入同一 Kafka 主题的情况。

注意:Semantic.EXACTLY_ONCE 模式为每个 FlinkKafkaProducer 实例使用固定大小的 KafkaProducer 池。每个 checkpoint 使用其中一个 producer。如果并发 checkpoint 的数量超过池的大小,FlinkKafkaProducer 将抛出异常,并导致整个应用程序失败。请合理地配置最大池大小和最大并发 checkpoint 数量。

注意:Semantic.EXACTLY_ONCE 会尽一切可能不留下任何逗留的事务,否则会阻塞其他消费者从这个 Kafka topic 中读取数据。但是,如果 Flink 应用程序在第一次 checkpoint 之前就失败了,那么在重新启动此类应用程序后,系统中不会有先前池大小(pool size)相关的信息。因此,在第一次 checkpoint 完成前对 Flink 应用程序进行缩容,且并发数缩容倍数大于安全系数 FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 的值的话,是不安全的。同样,在这种情况使用 setTransactionalIdPrefix() 改变 transactional.id 也是不安全的,因为系统也不知道先前使用的 transactional.id 前缀

05 Function (1) 函数类

Flink暴露了所有udf函数的接口(实现方式为接口或抽象类),如:MapFunction、FilterFunction、ProcessFunction等。

// 将函数实现为匿名类
kafkaDStream.filter(new FilterFunction() {
            @Override
            public boolean filter(String s) throws Exception {
                if (s.contains("actions")) {
                    return true;
                }
                return false;
            }
        });
(2) 富函数

富函数是DataStreamAPI提供的一个函数类的接口,所有Flink函数类都有其Rich版本

可以获取运行环境的上下文拥有生命周期方法

open()方法:初始化方法,在算子调用之前会调用open方法close()方法:生命周期中最后一个调用的方法,用于清理工作getRuntimeContext()方法,提供函数的RuntimeContext的一些信息,如函数执行的并行度、任务的名字、state状态等

public class MyJdbcSink extends RichSinkFunction {

    // 定义连接
    Connection conn;

    // 创建连接,open常用于初始化
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","root");
    }

    // 关闭连接,常用于资源清理
    @Override
    public void close() throws Exception {
        super.close();
        conn.close();
    }

    // 调用连接执行SQL
    @Override
    public void invoke(String value, Context context) throws Exception {
        PreparedStatement preparedStatement = conn.prepareStatement(value);
        preparedStatement.execute();
        preparedStatement.close();
    }
}

关于Flink的Source与Sink就分享到这了,如果对你有帮助,动动小手点个关注吧~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/746420.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号