栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink blink(flink官方文档)

flink blink(flink官方文档)

Flink连接Kakfa

1. 引入依赖2. 消费kafka生产的数据3. 发送消息到kafka

数据文件 sensor.txt

本文主要记录使用Flink消费kafka生产的数据,以及发送数据到kafka

1. 引入依赖

首先需要创建maven项目,添加依赖,添加scala框架支持


   
         org.apache.flink
         flink-streaming-scala_2.11
         1.10.2
     
     
         org.apache.flink
         flink-scala_2.11
         1.10.2
     

     
         org.apache.flink
         flink-connector-kafka-0.11_2.11
         1.10.2
     
 
2. 消费kafka生产的数据

创建流处理环境,创建properties配置对象 配置kafka相关的,然后添加Kafka数据源,由于是消费数据所以在方法中创建消费者,最后打印输出

object KafkaSourceTest {
  def main(args: Array[String]): Unit = {
	val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
	
	// 消费kafka数据
	val properties = new Properties()
	properties.setProperty("bootstrap.servers", "localhost:9092")
	properties.setProperty("group.id", "consumer-group")
	
	val kafkaStream: DataStream[String] = env.addSource(
	  new FlinkKafkaConsumer011[String](
	    "sensor", // topic
	    new SimpleStringSchema(),
	    properties
	  ))
	 kafkaStream.print()
	 env.execute("kafka source test")
   }
 }
3. 发送消息到kafka

直接添加kafkaSink,创建生产者,指定broker-list和topic向kafka发送数据

// 样例类
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // 读取文件数据
    val inputStream: DataStream[String] = env.readTextFile("data/sensor.txt")
    // 简单转换
    val dataStream: DataStream[String] = inputStream
      .map(data => {
        val fields: Array[String] = data.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble).toString
      })

    // 输出到kafka
    dataStream.addSink(
      new FlinkKafkaProducer011[String](
        "localhost:9092", // broker-list
        "skintest", // topic
        new SimpleStringSchema())
    )
    
	dataStream.print()
	
    env.execute("kafka sink test")
  }
}
数据文件 sensor.txt
sensor_1,1507718199,4.0
sensor_2,1507718200,25.0
sensor_3,1507718300,37.0
sensor_4,1507718500,-6.0
sensor_1,1507718200,42.0
sensor_2,1507718201,215.0
sensor_3,1507718301,317.0
sensor_4,1507718501,-36.0
sensor_1,1507718300,2.0
sensor_2,1507718202,5.0
sensor_3,1507718302,1.0
sensor_4,1507718502,-5.0
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771825.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号