导入依赖
2、从kafka读取数据org.scala-lang scala-library 2.11.0 org.apache.spark spark-streaming_2.11 2.3.2 org.apache.spark spark-streaming-flume_2.11 2.3.2 com.alibaba fastjson 1.2.76 org.apache.spark spark-core_2.11 2.3.2 org.apache.spark spark-streaming-kafka-0-10_2.11 2.3.2 org.apache.spark spark-sql_2.11 2.3.2 org.apache.spark spark-sql-kafka-0-10_2.11 2.3.2 redis.clients jedis 2.9.0 org.apache.commons commons-lang3 3.3.2 com.alibaba fastjson 1.2.76 org.apache.kafka kafka_2.11 2.3.0 org.apache.kafka kafka-clients 2.1.0
注意:
1、structured streaming 读取kafka数据是不需要设置group id的。
2、df.selectExpr()中还可以选择kafka的key等
代码如下
val conf = new SparkConf().setMaster("local[*]")
val spark = SparkSession
.builder()
.config(conf)
.appName(getClass.getName)
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
.option("subscribe", "cache_tmp")//
.option("startingOffsets", "earliest")
.load()
val re = df.selectExpr("CAST(value AS STRING) ")//
.as[(String)]
3、写入kafka
注意:
1、写入kafka需要做checkpoint(),checkpoint 中保存着偏移量
2、目的 kafka 即使不存在 也会自己创建。
代码如下
val query = re
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers",PropertiesUtils.loadProperties("kafka.broker.list"))
.option("topic","tmp_t")
.option("checkpointLocation","E:/t_check_2")//生产环境中要放在hdfs哟。
.start()
query.awaitTermination()
这里的checkpoint的目录保存这消费的偏移量等等信息。



