栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

11.4.3、kafka

11.4.3、kafka

1、kafka的集群

2、使用flink消费kafka的数据

(1)导包


	org.apache.flink
	flink-connector-kafka_2.11
	1.11.2

(2)代码实现读取数据

—读取kafka的数据,就是自定义source
—setStartFromEarliest() // 读取所有的数据
—setStartFromLatest() // 读取最新的数据
—setStartFromGroupOffsets() // 如果消费者组之前存在,接着之前的数据读取
如果消费者组之前不存在,读取最新的数据

object Demo03KafkaSource {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "test1")

    //创建kafka的消费者
    val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)

    
    flinkKafkaCusumor.setStartFromGroupOffsets()

    //其实就是一个自定义source
    val kafkaDS = env.addSource(flinkKafkaCusumor)
    kafkaDS.print()

    env.execute()
  }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/773976.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号