栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark大作业之FLume+Kafka+SparkStreaming实时处理+logj实时生成日志

Spark大作业之FLume+Kafka+SparkStreaming实时处理+logj实时生成日志

本学期学习了大数据技术之spark,期末大作业就是使用Flume+kafka+SparkStreaming实现实时处理,在这之中有很多奇奇怪怪的问题出现,最终还是艰难的将此实验完成。如果你也刚好在做这个实验,希望能对你有用。有帮助的好希望一键三连哦,持续学习,持续更新

Spark大作业之FLume+Kafka+SparkStreaming实时处理+log4j实时生成日志
    • 前言
    • 实现方法
    • 处理流程分析
  • 实现步骤
    • 1.创建一个Maven项目并创建两个maven模块
    • 2、导入依赖
    • 3、配置log4j.properties
    • 3、配置flume文件
    • 4、程序
      • 4.1、日志生成
      • 4.1、数据分析处理代码
    • 5、执行程序
      • 执行结果
  • 总结


前言

代码开发环境“IDEA”
动态:对每天每个城市的用户浏览的网页IP的数量的统计,将前20输出到MySQL数据库


实现方法

使用一个java程序持续生成log4j日志,采用flume监听获取数据,将获取的数据sink到kafka的topic,sparkStreaming去消费数据,并对数据进行分析统计。

如果你需要实现你自己想要的功能只需要稍稍修改“日志生成程序”和
“处理程序”中的部分代码即可实现更改!!!!

处理流程分析

静态分析:

实时分析:

实现步骤 1.创建一个Maven项目并创建两个maven模块

2、导入依赖
  1. 提交 log4j日志的jar包到logProduct模块的pom.xml文件中

         org.apache.flume.flume-ng-clients
         flume-ng-log4jappender
         1.8.0
 
  1. 做数据处理的spark相关依赖,直接引入到Maven项目框架 的pom.xml中即可

    	
    	    org.apache.spark
    	    spark-core_2.11
    	    2.1.1
    	
    	
    	    mysql
    	    mysql-connector-java
    	    5.1.27
    	
    	
    	    org.apache.spark
    	    spark-sql_2.11
    	    2.1.1
    	
    	
    	
    	    org.apache.spark
    	    spark-hive_2.11
    	    2.1.1
    	
    	
    	
    	
    	    org.apache.spark
    	    spark-streaming-kafka-0-10_2.11
    	    2.1.1
    	
    	
    	
    	    org.apache.spark
    	    spark-streaming_2.11
    	    2.1.1
    	
    	
    	
    	    org.scala-lang
    	    scala-library
    	    2.11.8
    	
    	
    	
    	    org.apache.spark
    	    spark-streaming-flume_2.11
    	    2.1.1
    	
    
3、配置log4j.properties

此处主要针对LogProduct模块

log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
#注意修改为你自己flume监听的服务器主机名和端口号
log4j.appender.flume.Hostname = ethan002
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
3、配置flume文件

在flume的安装目录下新建一个目录jobs
mkdir jobs
在jobs目录下新建一个文件a3.conf(名字自定,以**.conf**结尾)
在文件中输入一下内容:

a1.sources=r1
a1.sinks=k1
a1.channels=c1

#Describe/configure the source
a1.sources.r1.type=avro
a1.sources.r1.bind=ethan002
a1.sources.r1.port=41414

#define sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
#你的kafka中有的topic,若无则创建
a1.sinks.k1.kafka.topic=test
#你的主机名
a1.sinks.k1.kafka.bootstrap.servers=ethan002:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 100
##kafka消息生产的大小限制
a1.sinks.k1.kafka.producer.max.request.size=51200000

# Use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
4、程序 4.1、日志生成

代码如下:

package com.ethan.finalWork;
import org.apache.log4j.Logger;
public class LogProduct {
    private static Logger logger = Logger.getLogger(LogProduct.class.getName());
    public static void main(String[] args) throws InterruptedException {
        while(true) {
            Thread.sleep(5000);
            for (int i = 0; i < 1000; i++) {
            StringBuilder sb = new StringBuilder();
            String date = timeGen();
            sb.append(date)
                    .append("_")
                    .append(userIdGen()) // 用户ID
                    .append("_")
                    .append(sessionIdGen()) // sessionId
                    .append("_")
                    .append(date + " " + timeStampGen())
                    .append("_")
                    .append(pageId()) // 页面ID
                    .append("_")
                    .append(keyWorld()) // 搜索关键字
                    .append("_")
                    .append(cIdGen())  // 点击品类ID
                    .append("_")
                    .append(productIdGen())  // 产品ID
                    .append("_")
                    .append(orderIdsGen()) // 下单品类ID
                    .append("_")
                    .append(cIdGen()) // 下单产品ID
                    .append("_")
                    .append(orderIdsGen()) // 支付品类Ids
                    .append("_")
                    .append(cIdGen()) // 产品ids
                    .append("_")
                    .append(cityIdsGen());  // 城市Id

            System.out.println(sb.toString());
            logger.info(sb);
            }
        }
    }
    public static String userIdGen() {
        return  (String)((int)(Math.random() * (1000000 - 1) + 1)+"");
    }
    public static String sessionIdGen(){
        int n = (int)(Math.random() * (1000000 - 1) + 1);
        String str = n+"-"+(int)(Math.random()*123*Math.random()*1000);
        return str;
    }
    public static String pageId(){
        return (String) ((int)(Math.random() * (1200 - 1000) + 1000)+"");
    }
    private static String timeGen() {
        int year = (int)(Math.random() * (2021 - 2010 + 1) + 2010);
        int month = (int)(Math.random() * (12 - 1 + 1) + 1);
        int day = (int)(Math.random() * (31 - 1 + 1) + 1);
        return (2021 + "-" + 12 + "-" + 12);
    }
    private static String timeStampGen(){
        int hour = (int)(Math.random() * (24 - 1 + 1) + 1);
        int munite  = (int)(Math.random() * (60 - 1 + 1) + 1);
        int second = (int)(Math.random() * (60 - 1 + 1) + 1);
        return (hour + "-" + munite + "-" + second);
    }
    public static String keyWorld(){
        String[] kw ={"手机","数码","服装","篮球","文具","食品","学习资料","工具","餐饮用具"};
        int value = (int)(Math.random() * 9);
        String str = kw[value];
        return str;
    }
    public static String cIdGen() {
        return (String)((int)(Math.random() * (10000 - 1) + 1)+"");
    }
    public static String  productIdGen(){
        return (String)((int)(Math.random() * (100000 - 100) + 100)+"");
    }
    public static String orderIdsGen(){
        int r = (int)(Math.random() * (3 - 1) + 2);
        String ret = "";
        for (int i =1;i 
4.1、数据分析处理代码 

代码如下:

package com.ethan.online

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PageTop {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PageTop")
    val streamingContext = new StreamingContext(sparkConf, Seconds(10))
    streamingContext.sparkContext.setLogLevel("ERROR")
    streamingContext.checkpoint("hdfs://ethan001:9000/spark/checkpoint")

    val kafkaParams = Map[String,Object](
      "bootstrap.servers" -> "ethan002:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("test", "knf")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams) //订阅主题
    )

    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value)) //转换格式

    val resultDS:DStream[(String,Int)] = mapDStream.map(rdd => {
      val fields = rdd._2.split("_")
      //获取日期
      val str = fields(0).trim
      //获取城市
      val city: String = fields(12).trim
      //获取页面
      val page: String = fields(4).trim
      //封装为元组
      (str + "_" + city + "_" + page, 1)
    })


    //每天没城市页面点击数进行聚合(天_城市_页面ID,sum)
    val updateDS:DStream[(String,Int)]=resultDS.updateStateByKey(
      (seq:Seq[Int],buffer:Option[Int])=>{
        Option(seq.sum+buffer.getOrElse(0))
      })

    updateDS.foreachRDD(rdd=>{
      val temp = rdd.sortBy(_._2,false).take(20)
        val url = "jdbc:mysql://localhost:3306/finalwork?useUnicode=true&characterEncoding=UTF-8"
        val user = "root"
        val password = "123456"
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        //截断数据表,将数据表原有的数据进行删除
        var conn1: Connection = DriverManager.getConnection(url,user,password)
        val sql1 = "truncate table online"
        var stmt1 : PreparedStatement = conn1.prepareStatement(sql1)
        stmt1.executeUpdate()
        conn1.close()
        temp.foreach(
          data=>{
            val fields = data._1.split("_")
            val conn2: Connection = DriverManager.getConnection(url,user,password)
            val sql2 = "insert into online(date,cityName,pageId,sum) values(?,?,?,?)"
            val stmt2 : PreparedStatement = conn2.prepareStatement(sql2)
            stmt2.setString(1,fields(0).toString)
            stmt2.setString(2,fields(1).toString)
            stmt2.setString(3,fields(2).toString)
            stmt2.setInt(4,data._2.toInt)
            stmt2.executeUpdate()
            conn2.close()
          })

      })


  updateDS.foreachRDD(rdd=>{
     rdd.sortBy(_._2,false).take(20).foreach(println)
   })

    streamingContext.start()
    streamingContext.awaitTermination()

  }
}

5、执行程序

1、启动hadoop
因为我们实时处理需要检查点,我们通常设置在hdfs上

2、启动zookeeper

bin/zkServer.sh start
注意:如果是集群的话所有的主机都需要启动

3、启动kafka

bin/kafka-server-start.sh -daemon config/server.properties
注意:如果是集群的话所有的主机都需要启动

4、启动flume

bin/flume-ng agent -n a1 -c conf/ -f jobs/flume-kafka-sparkstreaming.conf -Dflume.root.logger=INFO,console

5、启动日志生成程序
6、启动sparkStreaming程序


执行结果

日志生成

数据处理

总结

主要注意:
1、日志是否能被flume监听到
2、flume是否能将数据sink到kafka的topic,启动消费者程序查看是否有数据可验证
3、sparkStreaming能否连接到kafka并且消费topic中的数据
4、数据是否能存储到MySQL中

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677209.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号