Note: a Baidu Netdisk download link for apache-flume-1.6.0-cdh5.10.1-bin is provided at the end of this article.
1. Flume configuration file flume-conf-spark-netcat-pull.properties (saved in Flume's conf/ directory)
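# Agent a1: one source, one channel, one sink
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Netcat source: every line received on port 5140 becomes one event
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140

# Memory channel holding up to 10000 events
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# SparkSink: keeps events available until Spark Streaming pulls them on port 5141
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 0.0.0.0
a1.sinks.k1.port = 5141

# Wiring: source -> channel -> sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1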
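The agent connects one netcat source to a memory channel and the SparkSink. A typo in that wiring only shows up when the agent starts, so it can help to check the file beforehand. The sketch below is a hypothetical helper. FlumeConfigCheck is not a Flume class, and Flume performs its own validation at startup. It assumes the property layout shown above. It also assumes Flume's documented memory-channel defaults of 100 for both capacity and transactionCapacity, and treats transactionCapacity greater than capacity as a mistake.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check for a Flume agent config (not a Flume API). */
public class FlumeConfigCheck {

    // The configuration from step 1, one property per line.
    public static final String SAMPLE =
            "a1.sources = r1\n"
            + "a1.sinks = k1\n"
            + "a1.channels = c1\n"
            + "a1.sources.r1.type = netcat\n"
            + "a1.sources.r1.bind = 0.0.0.0\n"
            + "a1.sources.r1.port = 5140\n"
            + "a1.channels.c1.type = memory\n"
            + "a1.channels.c1.capacity = 10000\n"
            + "a1.channels.c1.transactionCapacity = 1000\n"
            + "a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink\n"
            + "a1.sinks.k1.hostname = 0.0.0.0\n"
            + "a1.sinks.k1.port = 5141\n"
            + "a1.sources.r1.channels = c1\n"
            + "a1.sinks.k1.channel = c1\n";

    /** Parses properties text, wrapping the (unlikely) IOException. */
    public static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    /** Returns the problems found; an empty list means nothing was detected. */
    public static List<String> check(String agent, Properties p) {
        List<String> problems = new ArrayList<>();
        List<String> channels = names(p, agent + ".channels");
        // Every channel a source writes to must be declared.
        for (String src : names(p, agent + ".sources")) {
            for (String ch : names(p, agent + ".sources." + src + ".channels")) {
                if (!channels.contains(ch)) {
                    problems.add("source " + src + " uses undeclared channel " + ch);
                }
            }
        }
        // Every sink must read from a declared channel.
        for (String sink : names(p, agent + ".sinks")) {
            String ch = p.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (!channels.contains(ch)) {
                problems.add("sink " + sink + " uses undeclared channel '" + ch + "'");
            }
        }
        // A memory channel's per-transaction size should not exceed its capacity.
        for (String ch : channels) {
            String prefix = agent + ".channels." + ch + ".";
            if ("memory".equals(p.getProperty(prefix + "type", "").trim())) {
                // Assumed defaults: Flume documents 100 for both settings.
                long capacity = Long.parseLong(p.getProperty(prefix + "capacity", "100").trim());
                long tx = Long.parseLong(p.getProperty(prefix + "transactionCapacity", "100").trim());
                if (tx > capacity) {
                    problems.add("channel " + ch + ": transactionCapacity " + tx
                            + " exceeds capacity " + capacity);
                }
            }
        }
        return problems;
    }

    // Splits a space-separated component list such as "r1 r2".
    private static List<String> names(Properties p, String key) {
        String value = p.getProperty(key, "").trim();
        return value.isEmpty() ? Collections.<String>emptyList() : Arrays.asList(value.split("\\s+"));
    }

    public static void main(String[] args) {
        System.out.println(check("a1", parse(SAMPLE))); // prints []
    }
}
```

For the configuration above the check reports nothing. Changing a1.sinks.k1.channel to an undeclared name such as c2 produces one problem entry.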
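2. Start Flume from the Flume installation directory, not from conf/. The sink type is org.apache.spark.streaming.flume.sink.SparkSink, so the spark-streaming-flume-sink_2.11, scala-library and commons-lang3 JARs (matching the versions in the pom below) must first be copied into Flume's lib/ directory.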
cd /usr/local/src/app/apache-flume-1.6.0-cdh5.10.1-bin
bin/flume-ng agent -c conf/ -f conf/flume-conf-spark-netcat-pull.properties -n a1 -Dflume.root.logger=INFO,console
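Once the agent is running, its netcat source on port 5140 accepts newline-terminated text and turns each line into one event. With the default ack-every-event = true, it replies "OK" for every event. Any line-oriented client works for sending test data. The sketch below is a hypothetical Java client (NetcatSender is not part of Flume or Spark). It assumes that default acknowledgement behavior and waits for each "OK" before sending the next line.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

/** Hypothetical test client for a Flume netcat source. */
public class NetcatSender {

    /** Sends each line and returns how many "OK" acknowledgements came back. */
    public static int send(String host, int port, List<String> lines) throws IOException {
        // Closing the socket also closes the reader and writer built on its streams.
        try (Socket socket = new Socket(host, port)) {
            Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            int acks = 0;
            for (String line : lines) {
                out.write(line + "\n"); // one line = one Flume event
                out.flush();
                String reply = in.readLine(); // wait for the per-event acknowledgement
                if (reply == null) {
                    break; // the source closed the connection
                }
                if ("OK".equals(reply)) {
                    acks++;
                }
            }
            return acks;
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the agent from step 1 listens on port 5140 of the given host.
        String host = args.length > 0 ? args[0] : "localhost";
        int acks = send(host, 5140, Arrays.asList("hello", "spark", "hello"));
        System.out.println("acknowledged events: " + acks);
    }
}
```

Sending one line at a time and waiting for its acknowledgement is slower than streaming, but it makes each event's arrival at the channel easy to confirm during testing.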
3. Java project: relevant parts of pom.xml
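<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- Java 8 is required: the code below uses lambdas, and Spark 2.2.0 dropped Java 7 support -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
</properties>

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
</repositories>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume-sink_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-scala_2.11</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>net.jpountz.lz4</groupId>
        <artifactId>lz4</artifactId>
        <version>1.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flume.flume-ng-clients</groupId>
        <artifactId>flume-ng-log4jappender</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.4</version>
    </dependency>
</dependencies>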
4. Java code
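import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;
public class SparkFlumeUpdateStateTest {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[3]").setAppName("app");
        // 5-second micro-batches; mapWithState is stateful and needs a checkpoint directory
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(5));
        javaStreamingContext.checkpoint(".");
        // Initial state: the key "start" begins with a count of 1
        List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<>("start", 1));
        JavaPairRDD<String, Integer> initialRDD = javaStreamingContext.sparkContext().parallelizePairs(tuples);
        // Poll the SparkSink on port 5141; replace FLUME_HOST with the IP address of the Flume machine
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createPollingStream(javaStreamingContext, "FLUME_HOST", 5141);
        // Each event body is one line from the netcat source; pair it with a count of 1
        JavaPairDStream<String, Integer> pairDStream = flumeStream
                .map(item -> new String(item.event().getBody().array(), StandardCharsets.UTF_8).trim())
                .mapToPair(s -> new Tuple2<>(s, 1));
        // Add the new value to the stored total, save the total, and emit (word, total)
        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
                (word, one, state) -> {
                    int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
                    Tuple2<String, Integer> output = new Tuple2<>(word, sum);
                    state.update(sum);
                    return output;
                };
        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> stateDstream =
                pairDStream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD));
        stateDstream.print();                  // records emitted in each batch
        stateDstream.stateSnapshots().print(); // complete state after each batch
        javaStreamingContext.start();
        try {
            javaStreamingContext.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}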
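To make the behavior of mappingFunc concrete, the sketch below models it in plain Java without Spark. RunningCountModel is a hypothetical, simplified illustration. A map stands in for Spark's state store, seeded like initialRDD with ("start", 1). Records are processed one at a time, and each call adds the incoming 1 to the stored count and emits the new total, as mappingFunc does. In a real run the output order across keys depends on partitioning.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of the running count kept by mapWithState. */
public class RunningCountModel {

    // Stands in for Spark's per-key state store.
    private final Map<String, Integer> state = new LinkedHashMap<>();

    public RunningCountModel() {
        state.put("start", 1); // mirrors initialState(initialRDD)
    }

    /** Processes one batch of words; returns the records stateDstream.print() would show. */
    public List<String> processBatch(List<String> words) {
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            // one.orElse(0) is always 1 here because every pair is (word, 1)
            int sum = 1 + state.getOrDefault(word, 0);
            state.put(word, sum); // state.update(sum)
            emitted.add("(" + word + "," + sum + ")");
        }
        return emitted;
    }

    /** A copy of the full state, i.e. what stateSnapshots() holds after the batch. */
    public Map<String, Integer> snapshot() {
        return new LinkedHashMap<>(state);
    }

    public static void main(String[] args) {
        RunningCountModel model = new RunningCountModel();
        System.out.println(model.processBatch(Arrays.asList("hello", "spark", "hello")));
        System.out.println(model.snapshot());
        System.out.println(model.processBatch(Arrays.asList("hello")));
    }
}
```

Suppose the first batch is hello, spark, hello. The model emits (hello,1), (spark,1), (hello,2), and the snapshot still contains the seeded start=1 entry. A later batch containing hello then emits (hello,3), because the state carries over between batches.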
5. IntelliJ IDEA must have a Scala environment (Scala SDK) configured.
6. Baidu Netdisk download link for flume-1.6.0-cdh5.10.1
Link: https://pan.baidu.com/s/1td4z5dIWfkaDnT28loj8HA  Extraction code: 1ou2



