Spark Final Project: Flume + Kafka + Spark Streaming Real-Time Processing, with log4j Generating Logs in Real Time
This semester I studied big data with Spark, and the final project was to build a real-time processing pipeline with Flume + Kafka + Spark Streaming. All sorts of strange problems came up along the way, but the experiment finally got done. If you happen to be working on the same assignment, I hope this is useful to you. If it helps, a like, favorite, and follow would be appreciated; I will keep learning and keep updating.
- Preface
- Implementation method
- Processing flow analysis
- Implementation steps
- 1. Create a Maven project and create two Maven modules
- 2. Import dependencies
- 3. Configure log4j.properties
- 4. Configure the Flume file
- 5. Program
- 5.1 Log generation
- 5.2 Data analysis and processing code
- 6. Run the programs
- Execution results
- Summary
Preface
Development environment: IDEA
Goal: for each day and each city, count how many times each page is viewed, and write the top 20 (day, city, page) combinations to a MySQL database.
Implementation method
A Java program continuously generates log4j logs. Flume listens for the log events and sinks the collected data into a Kafka topic, and Spark Streaming consumes the topic and performs the analysis and aggregation.
If you want different functionality, you only need to tweak parts of the log-generation program and the processing program.
Processing flow analysis
Static analysis: (flow diagram)
Real-time analysis: (flow diagram)
Implementation steps
1. Create a Maven project and create two Maven modules
Create a Maven project with two modules: logProduct for the log generator, and a second module for the Spark Streaming processing job.
2. Import dependencies
- Add the Flume log4j-appender dependency to the pom.xml of the logProduct module:

<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.8.0</version>
</dependency>
- The Spark-related dependencies for the data processing go directly into the pom.xml of the Maven project skeleton:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.27</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
3. Configure log4j.properties
This applies mainly to the LogProduct module.
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
# change these to the hostname and port that your Flume agent listens on
log4j.appender.flume.Hostname = ethan002
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
4. Configure the Flume file
Create a new directory named jobs under the Flume installation directory:
mkdir jobs
Create a file in the jobs directory (any name ending in **.conf**; the startup command in step 6 below assumes **flume-kafka-sparkstreaming.conf**).
Put the following content into the file:
a1.sources=r1
a1.sinks=k1
a1.channels=c1
# Describe/configure the source
a1.sources.r1.type=avro
a1.sources.r1.bind=ethan002
a1.sources.r1.port=41414
# define sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
# a topic that exists in your Kafka cluster; create it if it does not
a1.sinks.k1.kafka.topic=test
# your broker hostname
a1.sinks.k1.kafka.bootstrap.servers=ethan002:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
## size limit on requests produced to Kafka
a1.sinks.k1.kafka.producer.max.request.size=51200000
# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
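The sink above assumes the topic test already exists. If it does not, you can create it first; a sketch assuming a single broker with ZooKeeper on ethan002:2181 (older Kafka versions use --zookeeper as shown, newer ones take --bootstrap-server instead):

bin/kafka-topics.sh --create --zookeeper ethan002:2181 --replication-factor 1 --partitions 1 --topic test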
5. Program
5.1 Log generation
The code is as follows:
package com.ethan.finalWork;
import org.apache.log4j.Logger;
public class LogProduct {
private static Logger logger = Logger.getLogger(LogProduct.class.getName());
public static void main(String[] args) throws InterruptedException {
while(true) {
Thread.sleep(5000);
for (int i = 0; i < 1000; i++) {
StringBuilder sb = new StringBuilder();
String date = timeGen();
sb.append(date)
        .append("_")
        .append(userIdGen())                  // user ID
        .append("_")
        .append(sessionIdGen())               // session ID
        .append("_")
        .append(date + " " + timeStampGen())  // action timestamp
        .append("_")
        .append(pageId())                     // page ID
        .append("_")
        .append(keyWord())                    // search keyword
        .append("_")
        .append(cIdGen())                     // clicked category ID
        .append("_")
        .append(productIdGen())               // product ID
        .append("_")
        .append(orderIdsGen())                // ordered category IDs
        .append("_")
        .append(cIdGen())                     // ordered product ID
        .append("_")
        .append(orderIdsGen())                // paid category IDs
        .append("_")
        .append(cIdGen())                     // product IDs
        .append("_")
        .append(cityIdsGen());                // city ID
System.out.println(sb.toString());
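// the flume appender configured in log4j.properties turns this logger.info
// call into one Flume event; this is how records enter the pipeline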
logger.info(sb);
}
}
}
public static String userIdGen() {
    return String.valueOf((int) (Math.random() * (1000000 - 1) + 1));
}
public static String sessionIdGen() {
    int n = (int) (Math.random() * (1000000 - 1) + 1);
    return n + "-" + (int) (Math.random() * 123 * Math.random() * 1000);
}
public static String pageId() {
    return String.valueOf((int) (Math.random() * (1200 - 1000) + 1000));
}
private static String timeGen() {
    int year = (int) (Math.random() * (2021 - 2010 + 1) + 2010);
    int month = (int) (Math.random() * (12 - 1 + 1) + 1);
    int day = (int) (Math.random() * (31 - 1 + 1) + 1);
    return year + "-" + month + "-" + day;
}
private static String timeStampGen() {
    int hour = (int) (Math.random() * 24);   // 0-23
    int minute = (int) (Math.random() * 60); // 0-59
    int second = (int) (Math.random() * 60); // 0-59
    return hour + ":" + minute + ":" + second;
}
public static String keyWord() {
    String[] kw = {"phone", "digital", "clothing", "basketball", "stationery",
            "food", "study materials", "tools", "tableware"};
    return kw[(int) (Math.random() * kw.length)];
}
public static String cIdGen() {
    return String.valueOf((int) (Math.random() * (10000 - 1) + 1));
}
public static String productIdGen() {
    return String.valueOf((int) (Math.random() * (100000 - 100) + 100));
}
public static String orderIdsGen() {
    int r = (int) (Math.random() * (3 - 1) + 2); // 2 or 3 IDs
    StringBuilder ret = new StringBuilder();
    for (int i = 1; i <= r; i++) {
        ret.append(cIdGen());
        if (i < r) {
            ret.append(",");
        }
    }
    return ret.toString();
}
public static String cityIdsGen() {
    // city ID in [1, 100); the range is arbitrary for the demo
    return String.valueOf((int) (Math.random() * (100 - 1) + 1));
}
}
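For reference, one generated record looks roughly like this (values are random; what matters downstream is that there are 13 fields joined by _, with the date first and the city ID last):

2021-12-12_48392_128343-4021_2021-12-12 13:24:6_1043_phone_2311_40231_1232,4521_893_3321,122_781_23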
5.2 Data analysis and processing code
The code is as follows:
package com.ethan.online
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
object PageTop {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PageTop")
val streamingContext = new StreamingContext(sparkConf, Seconds(10))
streamingContext.sparkContext.setLogLevel("ERROR")
streamingContext.checkpoint("hdfs://ethan001:9000/spark/checkpoint")
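// the checkpoint stores the running totals kept by updateStateByKey below;
// ethan001:9000 is assumed to be the HDFS NameNode address, adjust as needed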
val kafkaParams = Map[String,Object](
"bootstrap.servers" -> "ethan002:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test", "knf") // topics to consume; the Flume sink above only writes to "test"
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams) // subscribe to the topics
)
val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) // convert each ConsumerRecord into a (key, value) tuple
val resultDS: DStream[(String, Int)] = mapDStream.map(record => {
val fields = record._2.split("_")
// extract the date
val day = fields(0).trim
// extract the city
val city: String = fields(12).trim
// extract the page
val page: String = fields(4).trim
// wrap as a tuple keyed by day_city_page
(day + "_" + city + "_" + page, 1)
})
// aggregate page clicks per day, per city, per page: (day_city_page, sum)
val updateDS:DStream[(String,Int)]=resultDS.updateStateByKey(
(seq:Seq[Int],buffer:Option[Int])=>{
Option(seq.sum+buffer.getOrElse(0))
})
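// updateStateByKey folds this batch's counts (seq) into the previous running
// total (buffer), so updateDS always holds the cumulative count per key;
// keeping this state across batches is why the checkpoint directory is required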
updateDS.foreachRDD(rdd => {
  // take the current top 20 (day_city_page, count) pairs on the driver
  val temp = rdd.sortBy(_._2, false).take(20)
  temp.foreach(println)
  val url = "jdbc:mysql://localhost:3306/finalwork?useUnicode=true&characterEncoding=UTF-8"
  val user = "root"
  val password = "123456"
  Class.forName("com.mysql.jdbc.Driver")
  // one connection per batch instead of one per row
  val conn: Connection = DriverManager.getConnection(url, user, password)
  try {
    // truncate the table so it only ever holds the latest top 20
    val stmt1: PreparedStatement = conn.prepareStatement("truncate table online")
    stmt1.executeUpdate()
    stmt1.close()
    val stmt2: PreparedStatement = conn.prepareStatement(
      "insert into online(date,cityName,pageId,sum) values(?,?,?,?)")
    temp.foreach(data => {
      val fields = data._1.split("_")
      stmt2.setString(1, fields(0))
      stmt2.setString(2, fields(1))
      stmt2.setString(3, fields(2))
      stmt2.setInt(4, data._2)
      stmt2.executeUpdate()
    })
    stmt2.close()
  } finally {
    conn.close()
  }
})
streamingContext.start()
streamingContext.awaitTermination()
}
}
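The insert statement above assumes a table named online in the finalwork database. The post does not show its schema, so here is a minimal definition consistent with that statement (column names come from the SQL; the types are my assumption):

create table online(
    date     varchar(20),
    cityName varchar(50),
    pageId   varchar(20),
    sum      int
);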
6. Run the programs
1. Start Hadoop
The streaming job keeps its checkpoint on HDFS, so HDFS must be running first.
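On a typical Hadoop installation the HDFS daemons (and hence the checkpoint target hdfs://ethan001:9000) can be brought up with the bundled script; the path is assumed, adjust to your install:

sbin/start-dfs.sh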
2. Start ZooKeeper
bin/zkServer.sh start
Note: in a cluster, ZooKeeper must be started on every node.
3. Start Kafka
bin/kafka-server-start.sh -daemon config/server.properties
Note: in a cluster, Kafka must be started on every broker node as well.
4. Start Flume
bin/flume-ng agent -n a1 -c conf/ -f jobs/flume-kafka-sparkstreaming.conf -Dflume.root.logger=INFO,console
Note: -n a1 must match the agent name used in the configuration file, and the -f path must point at the file you created in step 4.
5. Start the log-generation program
6. Start the Spark Streaming program
Execution results
Log generation: (console output screenshot)
Data processing: (top-20 output screenshot)
Summary
The main things to check:
1. Whether Flume actually picks up the generated logs.
2. Whether Flume sinks the data into the Kafka topic; you can verify this by starting a console consumer (see the command after this list).
3. Whether Spark Streaming can connect to Kafka and consume the data in the topic.
4. Whether the data actually lands in MySQL.
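A quick way to verify point 2, using Kafka's bundled console consumer (host and topic as configured above):

bin/kafka-console-consumer.sh --bootstrap-server ethan002:9092 --topic test --from-beginning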



