kafka地址为:172.16.104.2:9092,172.16.104.3:9092,172.16.104.4:9092
创建测试topic。
首先找到创建脚本:kafka-topics.sh
命令:find /opt/ -name 'kafka-topics*'
查看所有topic:kafka-topics --zookeeper localhost:2181 --list
创建一个名为mgtest的topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mgtest
代码如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
/**
 * Reads records from a local text file and publishes them to a Kafka topic.
 *
 * Each input line is split on commas; the first three fields are parsed into a
 * `Test` value, and the `toString` rendering of that value is what is sent to
 * the sink and printed.
 */
object KafkaSinkTest {

  /**
   * One parsed input record.
   *
   * @param id          first comma-separated field, trimmed
   * @param timestamp   second field, trimmed and parsed as a `Long`
   * @param temperature third field, trimmed and parsed as a `Double`
   */
  case class Test(id: String, timestamp: Long, temperature: Double)

  /**
   * Builds and runs the file-to-Kafka pipeline.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    // Fixed job configuration: input file, Kafka broker list and target topic.
    val sourcePath = "/Users/edy/IdeaProjects/flinksql/src/main/resources/sensor.txt"
    val brokerList = "172.16.104.2:9092,172.16.104.3:9092,172.16.104.4:9092"
    val topic = "mgtest"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Parse every line into a Test record and keep its string form.
    // A line with fewer than three fields, or with non-numeric second/third
    // fields, makes the corresponding conversion throw.
    val records = env
      .readTextFile(sourcePath)
      .map { line =>
        val fields = line.split(",")
        val id = fields(0).trim
        val timestamp = fields(1).trim.toLong
        val temperature = fields(2).trim.toDouble
        Test(id, timestamp, temperature).toString
      }

    // Sink: write each rendered record to Kafka as a plain string.
    val kafkaSink = new FlinkKafkaProducer011[String](brokerList, topic, new SimpleStringSchema())
    records.addSink(kafkaSink)

    // Also print the same records.
    records.print()

    env.execute(" kafka sink test")
  }
}



