
Pulling data from flume-1.6.0-cdh5.10.1 with spark-streaming (pull mode)



Note: a network-drive link to apache-flume-1.6.0-cdh5.10.1-bin is provided at the end of this article.
1. Flume configuration file flume-conf-spark-netcat-pull.properties

a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 5141
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2. Start Flume

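The SparkSink class must be on Flume's classpath: before starting the agent, copy the spark-streaming-flume-sink_2.11, scala-library, and commons-lang3 jars into Flume's lib directory. Run the command from the Flume home directory, since the bin/ and conf/ paths are relative to it.

cd /usr/local/src/app/apache-flume-1.6.0-cdh5.10.1-bin
bin/flume-ng agent -c conf/ -f conf/flume-conf-spark-netcat-pull.properties -n a1 -Dflume.root.logger=INFO,console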
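
Once the agent is running, the netcat source on port 5140 accepts newline-terminated text, one event per line. The following is a minimal sketch of a test client, not part of the original project: the class name NetcatLineSender and the sample words are hypothetical. It assumes the source keeps its default behavior of answering each line with "OK"; if acknowledgements are disabled in the source configuration, the readLine call will block.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for manual testing; it is not part of Flume or Spark.
public class NetcatLineSender {

    /** Sends each entry as one newline-terminated line and collects the reply to each. */
    static List<String> send(String host, int port, List<String> lines) throws IOException {
        List<String> replies = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n"); // explicit "\n" so the line ending is the same on every OS
                out.flush();
                replies.add(in.readLine()); // assumes one acknowledgement line per event
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        // Defaults match a1.sources.r1.port in the configuration above; the host is an assumption.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5140;
        List<String> replies = send(host, port, Arrays.asList("hello", "spark", "hello"));
        System.out.println("Replies from the source: " + replies);
    }
}
```

Each word sent this way becomes one Flume event, which the Spark job in step 4 turns into a (word, 1) pair.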

3. Partial pom file of the Java project

  
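  <properties>
    <!-- Property names other than scala.version and spark.version are inferred from their values -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Java 8: the lambdas in the Java code require it, and Spark 2.2.0 no longer supports Java 7 -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
  </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.4</version>
    </dependency>
  </dependencies>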

4. Java code

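import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;

public class SparkFlumeUpdateStateTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("app");
        // One micro-batch every 5 seconds
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
        // mapWithState requires a checkpoint directory
        javaStreamingContext.checkpoint(".");

        // Initial state: the word "start" begins with a count of 1
        List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("start", 1));
        JavaPairRDD<String, Integer> initialRDD = javaStreamingContext.sparkContext().parallelizePairs(tuples);

        // Pull mode: Spark polls the SparkSink of the Flume agent.
        // Replace "IP address" with the IP of the host running the agent; 5141 is a1.sinks.k1.port.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(javaStreamingContext, "IP address", 5141);
        // Decode each event body to a trimmed string and pair it with a count of 1
        JavaPairDStream<String, Integer> pairDStream = flumeStream
                .map(item -> new String(item.event().getBody().array(), StandardCharsets.UTF_8).trim())
                .mapToPair(s -> new Tuple2<>(s, 1));
        // For every record: add the new count to the stored sum, save the sum, emit (word, sum)
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (word, one, state) -> {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                    state.update(sum);
                    return output;
                };

        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                pairDStream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));

        // Print the records emitted in this batch, then the full state of all keys
        stateDstream.print();
        stateDstream.stateSnapshots().print();
        javaStreamingContext.start();
        try {
            javaStreamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}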
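
To make the behavior of mappingFunc easier to follow, here is a plain-Java sketch of the same running-count logic without Spark. It is only an illustration: the class RunningCountSketch and its in-memory map are hypothetical stand-ins for the per-key state that Spark manages (and checkpoints) internally, and the sample words are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the state store; Spark keeps this per key across batches.
public class RunningCountSketch {
    private final Map<String, Integer> state = new HashMap<>();

    RunningCountSketch(Map<String, Integer> initialState) {
        state.putAll(initialState); // plays the role of initialState(initialRDD)
    }

    /** Same steps as mappingFunc: add the new value to the stored sum, store it, emit it. */
    int update(String word, int one) {
        int sum = one + state.getOrDefault(word, 0);
        state.put(word, sum);
        return sum;
    }

    /** Plays the role of stateSnapshots(): all keys with their current sums, sorted by key. */
    Map<String, Integer> snapshot() {
        return new TreeMap<>(state);
    }

    public static void main(String[] args) {
        Map<String, Integer> initial = new HashMap<>();
        initial.put("start", 1);
        RunningCountSketch counts = new RunningCountSketch(initial);

        // First batch: every record is emitted with its updated sum
        for (String word : Arrays.asList("start", "a", "a")) {
            System.out.println(word + " -> " + counts.update(word, 1));
        }
        // Second batch: the sum for "a" carries over from the first batch
        System.out.println("a -> " + counts.update("a", 1));
        System.out.println(counts.snapshot()); // prints {a=3, start=2}
    }
}
```

The two print calls in the Spark job correspond to the two kinds of output here: stateDstream.print() shows the per-record results of a batch, while stateSnapshots().print() shows the accumulated sum for every key seen so far.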

5. The IDEA development tool needs a Scala environment configured


6. Network-drive address for flume-1.6.0-cdh5.10.1
Link: https://pan.baidu.com/s/1td4z5dIWfkaDnT28loj8HA Extraction code: 1ou2
