- 一. Spark streaming整合Kafka概述
- 1.1 Maven配置
- 1.2 创建Direct Stream
- 1.3 定位策略
- 1.4 消费者的策略
- 1.5 创建RDD
- 1.6 获得Offsets
- 1.7 存储 Offsets
- 1.8 检查点
- 1.9 Kafka自身
- 1.10 自身数据存储
- 二.Spark Streaming整合Kafka实战
- 2.1 Maven配置
- 2.2 代码
- 2.3 测试
- 参考:
对于使用SBT/Maven项目定义的Scala/Java应用程序,将您的流应用程序与以下工件链接(参见主编程指南中的链接部分获取更多信息)。
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-10_2.11 version = 2.4.0
不要手动添加依赖于org.apache.kafka的工件(例如kafka-clients)。spark-streaming-kafka-0-10工件已经有了适当的传递依赖,并且不同的版本可能难以诊断的方式不兼容。
1.2 创建Direct Stream注意,导入的命名空间包括版本,org.apache.spark.streaming.kafka010
import java.util.*; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.api.java.*; import org.apache.spark.streaming.kafka010.*; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import scala.Tuple2; MapkafkaParams = new HashMap<>(); kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put("auto.offset.reset", "latest"); kafkaParams.put("enable.auto.commit", false); Collection topics = Arrays.asList("topicA", "topicB"); JavaInputDStream > stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Subscribe(topics, kafkaParams) ); stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
对于可能的kafkaParams,请参阅Kafka消费者配置文档。如果你的Spark批处理时间大于Kafka默认的心跳会话超时时间(30秒),适当增加heartbeat.interval.ms和session.timeout.ms。对于大于5分钟的批,这将需要在代理上更改group.max.session.timeout.ms。注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅下面的存储偏移量。
1.3 定位策略新的Kafka消费者API会预取消息到缓冲区。因此,出于性能考虑,Spark集成将缓存的消费者保留在执行器上(而不是为每批重新创建它们),并倾向于在拥有适当消费者的主机位置上调度分区,这一点很重要。
在大多数情况下,你应该使用LocationStrategies。preferred consistent如上所示。这将在可用的执行器之间均匀地分布分区。如果你的执行器和你的Kafka代理在同一个主机上,使用PreferBrokers,它会更喜欢在Kafka的leader上为那个分区调度分区。最后,如果分区之间的负载有显著的倾斜,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
消费者的缓存默认最大大小为64。如果你希望处理超过(64 *个执行器)的Kafka分区,你可以通过spark.streaming.kafka.consumer.cache.maxCapacity来改变这个设置。
如果你想要关闭Kafka消费者的缓存,你可以设置spark.streaming.kafka.consumer.cache.enabled为false。为了解决SPARK-19185中描述的问题,可能需要禁用缓存。一旦Spark -19185被解析,这个属性可能会在Spark的后续版本中被删除。
缓存是由topicpartition和group.id决定的,所以使用一个单独的组。每个调用createDirectStream的id。
1.4 消费者的策略新的Kafka消费者API有许多不同的方式来指定主题,其中一些需要大量的后对象实例化设置。ConsumerStrategies提供了一个抽象,允许Spark在从检查点重新启动后获得正确配置的消费者。
ConsumerStrategies。如上所示,订阅允许您订阅固定的主题集合。SubscribePattern允许您使用正则表达式来指定感兴趣的主题。注意,与0.8集成不同,使用Subscribe或SubscribePattern应该在流运行期间响应添加分区。最后,Assign允许指定一个固定的分区集合。这三种策略都有重载构造函数,允许您指定特定分区的起始偏移量。
如果您有特定的消费者设置需求,而上述选项无法满足这些需求,那么ConsumerStrategy是一个可以扩展的公共类。
1.5 创建RDD如果您有一个更适合批处理的用例,您可以为定义的偏移量范围创建一个RDD。
// import dependencies and create kafka params as in Create Direct Stream above
OffsetRange[] offsetRanges = {
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange.create("test", 0, 0, 100),
OffsetRange.create("test", 1, 0, 100)
};
JavaRDD> rdd = KafkaUtils.createRDD(
sparkContext,
kafkaParams,
offsetRanges,
LocationStrategies.PreferConsistent()
);
注意,您不能使用PreferBrokers,因为如果没有流,就没有驱动端消费者自动为您查找代理元数据。如果有必要,可以将PreferFixed与自己的元数据查找一起使用。
1.6 获得Offsetsstream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
rdd.foreachPartition(consumerRecords -> {
OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
System.out.println(
o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
});
});
注意,对hasoffsetrange的类型转换只有在对createDirectStream的结果调用的第一个方法中才会成功,而不是在之后的方法链中。需要注意的是,RDD分区和Kafka分区之间的一对一映射在任何shuffle或重分区方法之后都不会保留,例如reduceByKey()或window()。
1.7 存储 OffsetsKafka的传递语义在失败的情况下取决于偏移量如何和何时存储。Spark输出操作至少一次。因此,如果您想要等价于精确一次语义,则必须要么在幂等输出之后存储偏移量,要么在原子事务中与输出一起存储偏移量。通过这个集成,您有3个选项来存储偏移量,以提高可靠性(和代码复杂度)。
1.8 检查点如果启用Spark检查点,偏移量将存储在检查点中。这很容易启用,但也有缺点。你的输出操作必须是幂等的,因为你会得到重复的输出;事务不是一个选项。此外,如果应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过在旧代码的同时运行新代码来缓解这个问题(因为输出无论如何都需要是幂等的,它们不应该冲突)。但是对于需要更改代码的计划外故障,除非有其他方法来确定已知的良好起始偏移量,否则您将丢失数据。
1.9 Kafka自身Kafka有一个偏移量提交API,它将偏移量存储在一个特殊的Kafka主题中。默认情况下,新使用者将定期自动提交偏移量。这几乎肯定不是您想要的,因为由使用者成功轮询的消息可能还没有导致Spark输出操作,从而导致未定义的语义。这就是为什么上面的流例子将" enable.auto.commit "设置为false。然而,你可以在知道你的输出已经被存储后,使用commitAsync API提交偏移量给Kafka。与检查点相比,Kafka的好处是不管你的应用代码发生了什么变化,它都是一个持久的存储。然而,Kafka不是事务性的,所以你的输出必须仍然是幂等的。
stream.foreachRDD(rdd -> {
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
1.10 自身数据存储
对于支持事务的数据存储,将偏移量作为结果保存在同一个事务中可以使两者保持同步,即使在失败的情况下也是如此。如果您非常小心地检测重复或跳过的偏移范围,则回滚事务可以防止重复或丢失的消息影响结果。这提供了等价的“一次精确”语义。甚至对于聚集产生的输出也可以使用这种策略,聚集通常很难使其幂等。
// The details depend on your data store, but the general idea looks like this // begin from the the offsets committed to the database Map二.Spark Streaming整合Kafka实战 2.1 Maven配置fromOffsets = new HashMap<>(); for (resultSet : selectOffsetsFromYourDatabase) fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset")); } JavaInputDStream > stream = KafkaUtils.createDirectStream( streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies. Assign(fromOffsets.keySet(), kafkaParams, fromOffsets) ); stream.foreachRDD(rdd -> { OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); Object results = yourCalculation(rdd); // begin your transaction // update results // update offsets where the end of existing offsets matches the beginning of this batch of offsets // assert that offsets were updated correctly // end your transaction });
下面是我整个项目的Maven配置
2.2 代码4.0.0 org.example SparkStudy1.0-SNAPSHOT http://www.example.com UTF-8 1.8 1.8 2.11 junit junit4.11 test org.apache.spark spark-core_2.122.4.0 provided org.apache.spark spark-sql_2.122.4.0 provided org.apache.spark spark-streaming_2.112.4.0 org.apache.spark spark-streaming-kafka-0-10_2.112.4.0 maven-clean-plugin 3.1.0 maven-resources-plugin 3.0.2 maven-compiler-plugin 3.8.0 maven-surefire-plugin 2.22.1 maven-jar-plugin 3.0.2 maven-install-plugin 2.5.2 maven-deploy-plugin 2.8.2 maven-site-plugin 3.7.1 maven-project-info-reports-plugin 3.0.0 org.apache.maven.plugins maven-jar-plugin3.0.2 true org.zqs.kafka.Producer org.springframework.boot spring-boot-maven-plugin
生产者代码:
package org.zqs.kafka;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class Producer {
public static String topic = "test_20210816_2";//定义主题
public static void main(String[] args) throws Exception {
Properties p = new Properties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
KafkaProducer kafkaProducer = new KafkaProducer<>(p);
try {
while (true) {
String msg = "Hello," + new Random().nextInt(100);
ProducerRecord record = new ProducerRecord(topic, msg);
kafkaProducer.send(record);
System.out.println("消息发送成功:" + msg);
Thread.sleep(500);
}
} finally {
kafkaProducer.close();
}
}
}
SparkStreaming消费代码
package org.zqs.kafka;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.rdd.RDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import scala.Tuple2;
public class SparkStreaming2 {
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingFromkafka");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf , Durations.seconds(1));
Map kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.31.1.124:9092,10.31.1.125:9092,10.31.1.126:9092");//多个可用ip可用","隔开
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "sparkStreaming");
Collection topics = Arrays.asList("test_20210816_2");//配置topic,可以是数组
JavaInputDStream> javaInputDStream =KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
JavaPairDStream javaPairDStream = javaInputDStream.mapToPair(new PairFunction, String, String>(){
private static final long serialVersionUID = 1L;
@Override
public Tuple2 call(ConsumerRecord consumerRecord) throws Exception {
return new Tuple2<>(consumerRecord.key(), consumerRecord.value());
}
});
javaPairDStream.foreachRDD(new VoidFunction>() {
@Override
public void call(JavaPairRDD javaPairRDD) throws Exception {
// TODO Auto-generated method stub
javaPairRDD.foreach(new VoidFunction>() {
@Override
public void call(Tuple2 tuple2)
throws Exception {
// TODO Auto-generated method stub
System.out.println(tuple2._2);
}
});
}
});
streamingContext.start();
streamingContext.awaitTermination();
streamingContext.close();
}
}
2.3 测试
-- 打包 mvn package -- 上传文件到服务器 -- 提交到spark集群 spark-submit --class org.zqs.kafka.SparkStreaming2 --master local[2] /home/javaspark/SparkStudy-1.0-SNAPSHOT.jar
生产者要提前运行
参考:- https://spark.apache.org/docs/2.4.0/streaming-kafka-0-10-integration.html
- https://blog.csdn.net/a1361585/article/details/101954816



