1. Add dependencies
2. Consuming data produced by Kafka
3. Sending messages to Kafka
Data file: sensor.txt
1. Add dependencies

This post documents how to consume data produced by Kafka with Flink, and how to send data to Kafka.

First create a Maven project, add the dependencies below, and add Scala framework support to the project.
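As an aside: if the project were built with sbt rather than Maven, roughly equivalent dependency declarations would look like the following sketch (`%%` appends the Scala binary version, here 2.11, to the artifact name):

```scala
// build.sbt sketch -- same artifacts and versions as the Maven dependencies in this post
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-streaming-scala"      % "1.10.2",
  "org.apache.flink" %% "flink-scala"                % "1.10.2",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % "1.10.2"
)
```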
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.2</version>
</dependency>

2. Consuming data produced by Kafka
Create a stream execution environment and a Properties object holding the Kafka settings, then add a Kafka source. Since we are consuming data, pass a Kafka consumer to addSource(); finally print the stream.
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaSourceTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Consume data from Kafka
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    val kafkaStream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer011[String](
        "sensor",                 // topic
        new SimpleStringSchema(), // deserialize each record as a plain string
        properties
      ))
    kafkaStream.print()
    env.execute("kafka source test")
  }
}
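The consumer settings are plain `java.util.Properties`, so they can be built and checked independently of Flink. A minimal sketch (the `kafkaConsumerProps` helper is illustrative, not part of Flink's API):

```scala
import java.util.Properties

// Illustrative helper: bundles the two settings the consumer above needs.
// FlinkKafkaConsumer011 hands these through to the underlying Kafka client.
def kafkaConsumerProps(bootstrapServers: String, groupId: String): Properties = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", bootstrapServers)
  props.setProperty("group.id", groupId)
  props
}

val props = kafkaConsumerProps("localhost:9092", "consumer-group")
```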
3. Sending messages to Kafka

Add a Kafka sink directly: create a producer, specify the broker list and topic, and send the data to Kafka.
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

// Case class for one sensor record
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Read data from a file
    val inputStream: DataStream[String] = env.readTextFile("data/sensor.txt")
    // Simple transformation: parse each CSV line, then format it back to a string
    val dataStream: DataStream[String] = inputStream
      .map(data => {
        val fields: Array[String] = data.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble).toString
      })
    // Write to Kafka
    dataStream.addSink(
      new FlinkKafkaProducer011[String](
        "localhost:9092", // broker list
        "skintest",       // topic
        new SimpleStringSchema())
    )
    dataStream.print()
    env.execute("kafka sink test")
  }
}
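The `map` step in the sink job is just CSV parsing, which can be exercised on its own. A minimal sketch (the `parseLine` helper is illustrative; it mirrors the split/convert logic above):

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

// Illustrative helper mirroring the map() transformation above:
// split one comma-separated line from sensor.txt into a SensorReading.
def parseLine(line: String): SensorReading = {
  val fields = line.split(",")
  SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
}

val reading = parseLine("sensor_1,1507718199,4.0")
```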
Data file sensor.txt
sensor_1,1507718199,4.0
sensor_2,1507718200,25.0
sensor_3,1507718300,37.0
sensor_4,1507718500,-6.0
sensor_1,1507718200,42.0
sensor_2,1507718201,215.0
sensor_3,1507718301,317.0
sensor_4,1507718501,-36.0
sensor_1,1507718300,2.0
sensor_2,1507718202,5.0
sensor_3,1507718302,1.0
sensor_4,1507718502,-5.0



