
KafkaUtils.createDirectStream() parameters explained

Reposted from "KafkaUtils.createDirectStream()参数详解" by 海贼王一样的男人 on 博客园 (cnblogs)

The KafkaUtils.createDirectStream() method creates a Kafka-backed DStream. It takes three parameters: a StreamingContext (ssc), a LocationStrategy, and a ConsumerStrategy.
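A minimal sketch of a typical call, assuming an existing StreamingContext `ssc`, a Kafka broker at `localhost:9092`, and a topic `example-topic` (broker address, topic name, and group id are placeholders, not from the original article):

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Standard Kafka consumer configuration for the direct stream.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",          // assumption: local broker
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",                    // assumption: placeholder group
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,                                               // 1st parameter: StreamingContext
  PreferConsistent,                                  // 2nd parameter: LocationStrategy
  Subscribe[String, String](Array("example-topic"), kafkaParams) // 3rd: ConsumerStrategy
)
```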

LocationStrategies provides three strategies: PreferBrokers, PreferConsistent, and PreferFixed. See the source excerpt below for details.


@Experimental
object LocationStrategies {

  @Experimental
  def PreferBrokers: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferBrokers

  @Experimental
  def PreferConsistent: LocationStrategy =
    org.apache.spark.streaming.kafka010.PreferConsistent

  @Experimental
  def PreferFixed(hostMap: collection.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(new ju.HashMap[TopicPartition, String](hostMap.asJava))

  @Experimental
  def PreferFixed(hostMap: ju.Map[TopicPartition, String]): LocationStrategy =
    new PreferFixed(hostMap)
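To make the three strategies concrete, here is a hedged sketch of choosing each one. The topic name and executor host names (`host-a`, `host-b`) are hypothetical placeholders:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

// PreferBrokers: only useful when executors run on the same hosts as the Kafka brokers.
val onBrokers = LocationStrategies.PreferBrokers

// PreferConsistent: the common default; distributes partitions evenly across executors.
val consistent = LocationStrategies.PreferConsistent

// PreferFixed: pin specific topic-partitions to specific executor hosts;
// partitions not in the map fall back to consistent placement.
val hostMap = Map(
  new TopicPartition("example-topic", 0) -> "host-a",  // assumption: placeholder hosts
  new TopicPartition("example-topic", 1) -> "host-b"
)
val fixed = LocationStrategies.PreferFixed(hostMap)
```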

ConsumerStrategies (consumer strategies) come in three forms: Subscribe, SubscribePattern, and Assign, i.e. subscription versus explicit assignment.

  Subscribe lets Kafka assign partitions to consumers automatically; an internal algorithm distributes the topic-partitions as evenly as possible among consumers in the same group.

  Assign lets the consumer manually and explicitly specify the topic-partitions to consume; it is not constrained by group.id, so the group assignment is effectively bypassed.


  @Experimental
  def Subscribe[K, V](
      topics: Iterable[jl.String],
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new Subscribe[K, V](
      new ju.ArrayList(topics.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  @Experimental
  def SubscribePattern[K, V](
      pattern: ju.regex.Pattern,
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new SubscribePattern[K, V](
      pattern,
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }

  @Experimental
  def Assign[K, V](
      topicPartitions: Iterable[TopicPartition],
      kafkaParams: collection.Map[String, Object],
      offsets: collection.Map[TopicPartition, Long]): ConsumerStrategy[K, V] = {
    new Assign[K, V](
      new ju.ArrayList(topicPartitions.asJavaCollection),
      new ju.HashMap[String, Object](kafkaParams.asJava),
      new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(l => new jl.Long(l)).asJava))
  }
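The Subscribe/Assign distinction can be sketched as follows, assuming the `kafkaParams` consumer configuration shown earlier; the topic name and starting offset are placeholders:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe: Kafka balances the topic's partitions among all consumers
// sharing the same group.id.
val subscribed = ConsumerStrategies.Subscribe[String, String](
  Array("example-topic"), kafkaParams)

// Assign: consume exactly these partitions, here starting partition 0
// from an explicit offset; group-based rebalancing does not apply.
val startOffsets = Map(new TopicPartition("example-topic", 0) -> 42L) // assumption: placeholder offset
val assigned = ConsumerStrategies.Assign[String, String](
  Seq(new TopicPartition("example-topic", 0)), kafkaParams, startOffsets)
```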

If you see "Cannot resolve overloaded method":

  Cause: the arguments passed to the method do not match any overload's signature. Check the parameter types.
