(1)导包
org.apache.flink flink-connector-kafka_2.11 1.11.2
(2)代码实现读取数据
—读取kafka的数据,就是自定义source
—setStartFromEarliest() // 读取所有的数据
—setStartFromLatest() // 读取最新的数据
—setStartFromGroupOffsets() // 如果消费者组之前存在,接着之前的数据读取
如果消费者组之前不存在,读取最新的数据
object Demo03KafkaSource {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "test1")
//创建kafka的消费者
val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("test_topic1", new SimpleStringSchema(), properties)
flinkKafkaCusumor.setStartFromGroupOffsets()
//其实就是一个自定义source
val kafkaDS = env.addSource(flinkKafkaCusumor)
kafkaDS.print()
env.execute()
}
}



