栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink table api数据写入外部数据源的更新模式以及常用外部数据源支持的更新模式

Flink table api数据写入外部数据源的更新模式以及常用外部数据源支持的更新模式

更新模式(Update Mode)与常见外部系统支持模式
在流处理过程中,表的处理并不像传统定义的那样简单。
对于流式查询(Streaming Queries),需要声明如何在表和外部连接器之间执行转换。与外部系统交换的消息类型,由更新模式(update mode)指定。
Flink Table API 中的更新模式有以下三种:
①追加模式(Append Mode)
在追加模式下,表和外部连接器只交换插入(Insert)消息。
②撤回模式(Retract Mode)
在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
 插入(Insert)会被编码为添加消息;
 删除(Delete)则编码为撤回消息;
 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行)的添加消息。
在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。
③Upsert(更新插入)模式
在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。
这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息,
外部连接器需要知道这个唯一 key 的属性。
 插入(Insert)和更新(Update)都被编码为 Upsert 消息;
 删除(Delete)编码为 Delete 信息。
这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率会更高
在table api连接外部数据源中分别提供了上述三种方法
.inAppendMode()
.inRetractMode()
.inUpsertMode()
但是table api能否写入到外部系统完全取决于外部系统对于三种模式的支持情况:
① 写入文件系统
实现的是AppendStreamTableSink,因此只能实现追加模式

② 写入kafka(0.11版本)
实现的是AppendStreamTableSink,因此只能实现追加模式

③ 写入ES(版本6)
实现的是UpsertStreamTableSink,可实现upsert模式

④ 写入mysql
jdbc实现了AppendStreamTableSink与UpsertStreamTableSink,可以实现追加及upsert模式


⑤ 写入redis

为追加模式
需要引入bahir连接依赖

   org.apache.bahir
   flink-connector-redis_2.11
   1.0


class MyRedisMapper extends RedisMapper[SensorReading]{
  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"sensor")
  }
  override def getKeyFromData(data: SensorReading): String = data.id
  override def getValueFromData(data: SensorReading): String = data.temperature.toString
}

AppendStreamTableSink接口实现的子类有csv、kafka、jdbc

RetractStreamTableSink接口实现的子类


UpsertStreamTableSink接口实现的子类有ES、jdbc

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/342795.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号