栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink采集本地文件到kafka (本地跑)

flink采集本地文件到kafka (本地跑)

kafka地址为:

创建测试topic。
首先找到创建脚本:kafka-topics.sh

命令:find /opt/ -name 'kafka-topics*’

查看所有topic:kafka-topics --zookeeper localhost:2181 —list

创建一个mgtest

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mgtest1

代码如下:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011


//采集本地数据到kafka
object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)


    val inputStream = env.readTextFile("/Users/edy/IdeaProjects/flinksql/src/main/resources/sensor.txt")
    val datastream = inputStream.map(x => {
      val arr = x.split(",")
      Test(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble).toString
    })


    //sink
    datastream.addSink(new FlinkKafkaProducer011[String]("172.16.104.2:9092,172.16.104.3:9092,172.16.104.4:9092","mgtest",new SimpleStringSchema()))
    datastream.print()


    env.execute(" kafka sink test")


  }


case class Test(id: String, timestamp: Long, temperature: Double)
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741807.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号