转载自 KafkaUtils.createDirectStream()参数详解 - 海贼王一样的男人 - 博客园
通过KafkaUtils.createDirectStream该方法创建kafka的DStream数据源,传入有三个参数:ssc,LocationStrategies,ConsumerStrategies。
LocationStrategies有三种策略:PreferBrokers,PreferConsistent,PreferFixed详情查看上边源码解析
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | @Experimental object LocationStrategies {
@Experimental def PreferBrokers: LocationStrategy = org.apache.spark.streaming.kafka010.PreferBrokers
@Experimental def PreferConsistent: LocationStrategy = org.apache.spark.streaming.kafka010.PreferConsistent
@Experimental def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy = new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))
@Experimental def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy = new PreferFixed(hostMap) |
ConsumerStrategies消费者策略:Subscribe,SubscribePattern,Assign,订阅和分配
Subscribe为consumer自动分配partition,有内部算法保证topic-partitions以最优的方式均匀分配给同group下的不同consumer
Assign为consumer手动、显示的指定需要消费的topic-partitions,不受group.id限制,相当于指定的group无效
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | @Experimental def Subscribe[K, V]( topics: Iterable[jl.String], kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } @Experimental def SubscribePattern[K, V]( pattern: ju.regex.Pattern, kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { new SubscribePattern[K, V]( pattern, new ju.HashMap[String, Object](kafkaParams.asJava), new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } @Experimental def Assign[K, V]( topicPartitions: Iterable[TopicPartition], kafkaParams: collection.Map[String, Object], offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = { new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava)) } |
Cannot resolve overloaded method:
原因:方法中传入的参数不符合要求。检查参数类型



