自动提交:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.codehaus.jackson.map.deser.std.StringDeserializer
object sparkStreaming_kafka_Demo {
def main(args: Array[String]): Unit = {
// TODO 0.准备环境
val conf:SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc:SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc:StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5秒划分一个批次
ssc.checkpoint("./ckp")
// TODO 1.加载数据-从Kafka加载
val kafkaParams = Map[String,Object](
"bootstrap.servers"->"localhost:9092", //Kafka集群地址
"key.deserializer" -> classOf[StringDeserializer], //key的反序列化规则
"value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
"group.id" -> "sparkdemo", // 消费者组名称
// earliest:表示如果由offset记录从offset记录开始消费,如果没有从最早的消息开始消费
// latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
// none: 表示如果有offset记录从offset记录开始消费,如果没有则报错
"auto.offset.reset"->"latest", //如果由offset记录从offset开始消费,没有则从最新的开始消费
"auto.commit.interval.ms" ->"10000", //自动提交的时间间隔
"enable.auto.commit" -> (true:java.lang.Boolean) //是否自动提交
)
val topics = Array("topicA") //要订阅的主题
// TODO 4.读取Kafka数据创建DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
ssc, //传入StreamingContext
LocationStrategies.PreferConsistent, //位置策略
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) //消费策略
) // 消费策略,使用源码中推荐的
// TODO 5.处理消息
val infoDS:DStream[String] = kafkaDStream.map(record => {
val topic: String = record.topic()
val partition: Int = record.partition()
val offset: Long = record.offset()
val key: String = record.key()
val value: String = record.value()
val info: String =
s"""
|topic:${topic},
|partition:${partition},
|offset:${offset},
|key:${key}
|value:${value}
|"""
info
})
// TODO 6.输出结果
infoDS.print()
// TODO 7.启动并等待结束
ssc.start()
ssc.awaitTermination() // 注意:流式应用程序启动之后需要已知运行等待手动停止/等待数据到来
// TODO 8.关闭资源
ssc.stop()
}
}
手动提交:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.codehaus.jackson.map.deser.std.StringDeserializer
object sparkStreaming_kafka_Demo2 {
def main(args: Array[String]): Unit = {
// TODO 0.准备环境
val conf:SparkConf = new SparkConf().setAppName("spark").setMaster("local[*]")
val sc:SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
val ssc:StreamingContext = new StreamingContext(sc, Seconds(5)) //每隔5秒划分一个批次
ssc.checkpoint("./ckp")
// TODO 1.加载数据-从Kafka加载
val kafkaParams = Map[String,Object](
"bootstrap.servers"->"localhost:9092", //Kafka集群地址
"key.deserializer" -> classOf[StringDeserializer], //key的反序列化规则
"value.deserializer" -> classOf[StringDeserializer], // value的反序列化规则
"group.id" -> "sparkdemo", // 消费者组名称
// earliest:表示如果由offset记录从offset记录开始消费,如果没有从最早的消息开始消费
// latest:表示如果有offset记录从offset记录开始消费,如果没有从最后/最新的消息开始消费
// none: 表示如果有offset记录从offset记录开始消费,如果没有则报错
"auto.offset.reset"->"latest", //如果由offset记录从offset开始消费,没有则从最新的开始消费
// "auto.commit.interval.ms" ->"10000", //自动提交的时间间隔
"enable.auto.commit" -> (true:java.lang.Boolean) //是否自动提交
)
val topics = Array("topicA") //要订阅的主题
// TODO 4.读取Kafka数据创建DStream
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)) // 消费策略,使用源码中推荐的
// TODO 5.处理消息
// 注意提交的时机: 应该是消费完一小批久提交一次offset
kafkaDStream.foreachRDD(rdd=>{
if(!rdd.isEmpty()){
rdd.foreach(record=>{
val topic: String = record.topic()
val partition: Int = record.partition()
val offset: Long = record.offset()
val key: String = record.key()
val value: String = record.value()
val info: String =
s"""
|topic:${topic},
|partition:${partition},
|offset:${offset},
|key:${key}
|value:${value}
|"""
println("消费到的消息的详细信息为:" +info)
})
// 获取rdd中offset相关的信息:offsetRanges里面包含了该批次各个分区的offset信息
val offsetRanges:Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 提交
kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
print("当前批次的数据已消费并手动提交")
}
})
// TODO 7.启动并等待结束
ssc.start()
ssc.awaitTermination() // 注意:流式应用程序启动之后需要已知运行等待手动停止/等待数据到来
// TODO 8.关闭资源
ssc.stop()
}
}