栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【大数据安全分析】---- Flink 实时流计算

【大数据安全分析】---- Flink 实时流计算

参考引用

flink 体系化总结 https://www.yuque.com/wangzhiwuimportbigdata/ihf2lk/ka2x0h#Aq72c

大数据成神之路 https://github.com/wangzhiwubigdata/God-Of-BigData

大数据方向学习指南 https://blog.csdn.net/u013411339/article/details/118643213

五分钟学大数据 https://www.cnblogs.com/itlz/p/14356708.html

背景知识 实时计算的三个特征:

无限数据:无限数据指的是一种不断增长的,基本上无限的数据集。这些通常被称为 流数据,而与之相对的是有限的数据集。
无界数据处理:一种持续的数据处理模式,能够通过处理引擎重复的去处理上面的无限数据,是能够突破有限数据处理引擎的瓶颈的。
低延迟:延迟是多少并没有明确的定义。但我们都知道数据的价值将随着时间的流逝降低,时效性将是需要持续解决的问题。

分布式事务-两阶段提交协议(2PC)

两阶段提交协议(Two-Phase Commit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现 ACID 中的 A (原子性)

在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值,要么都不改,以此来达到数据的强一致性。

两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。

顾名思义,两阶段提交将提交过程划分为连续的两个阶段:表决阶段(Voting)和提交阶段(Commit)。

两阶段提交协议过程如下图所示:

第一阶段:表决阶段

    协调者向所有参与者发送一个 VOTE_REQUEST 消息。当参与者接收到 VOTE_REQUEST 消息,向协调者发送VOTE_COMMIT 消息作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备好或遇到其他故障,就返回一个VOTE_ABORT 消息,告诉协调者目前无法提交事务。

第二阶段:提交阶段

    协调者收集来自各个参与者的表决消息。如果**所有参与者一致认为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与者发送一个 GLOBAL_COMMIT 消息,通知参与者进行本地提交;如果所有参与者中有任意一个返回消息是 VOTE_ABORT,协调者就会取消事务**,向所有参与者广播一条 GLOBAL_ABORT 消息通知所有的参与者取消事务。每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一个 GLOBAL_COMMIT 消息,那么参与者提交本地事务,否则如果接收到 GLOBAL_ABORT 消息,则参与者取消本地事务。
Flink 核心介绍

核心理念:Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理**

容错机制 Checkpoint

Checkpoint 机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自 Chandy-Lamport algorithm算法。

Flink Checkpoint 的两阶段实现

    beginTransaction,开启一个事务,在临时目录中创建一个临时文件,之后写入数据到该文件中。此过程为不同的事务创建隔离,避免数据混淆。preCommit。预提交阶段。将缓存数据块写出到创建的临时文件,然后关闭该文件,确保不再写入新数据到该文件,同时开启一个新事务,执行属于下一个检查点的写入操作。commit。在提交阶段,以原子操作的方式将上一阶段的文件写入真正的文件目录下。如果提交失败,Flink应用会重启,并调用TwoPhaseCommitSinkFunction#recoverAndCommit方法尝试恢复并重新提交事务。abort。一旦终止事务,删除临时文件。
状态管理 State

进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。

Keyed States:记录每个Key对应的状态值一个Task上可能包含多个Key不同Task上不会出现相同的Key ,常用的 MapState, ValueState

Operator States:记录每个Task对应的状态值数据类型

State-Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。

保存state的数据结构:

ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。

MapState:即状态值为一个map。用户通过put或putAll方法添加元素。

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄

State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state。

举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。

class MyBroadcastProcessImpl[KS, IN1, IN2, OUT] extends                                                                 BroadcastProcessFunction[IN1, IN2, OUT] {
  // 数据流处理部分
  override def processElement(in1: IN1,
                    ctx: BroadcastProcessFunction[IN1, IN2, OUT]#ReadOnlyContext,
                    collector: Collector[OUT]): Unit = ???
  // 广播流处理部分
  override def processBroadcastElement(in2: IN2,
                    ctx: BroadcastProcessFunction[IN1, IN2, OUT]#Context,
                    collector: Collector[OUT]): Unit = ???
}

过期时间 TTL 功能的用法

val ttlConfig: StateTtlConfig = StateTtlConfig
      .newBuilder(Time.seconds(60))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build
val stateDescriptor = new ValueStateDescriptor[String]("testState", 															    classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
时间概念 Time

Flink 支持了流处理程序在时间上的三个定义:事件时间 EventTime 、摄入时间 IngestionTime 、处理时间 ProcessingTime

EventTime :是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。IngestionTime :是数据进入Flink的时间。ProcessingTime:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

使用方式如下:

 val env = StreamExecutionEnvironment.getExecutionEnvironrnent()
 // 使用处理时间
 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) 
 // 使用摄入时间
 env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime)
 // 使用事件时间
 env.setStrearnTimeCharacteristic(TimeCharacteristic.EventTime)

延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据:

设置允许延迟的时间是通过 allowedLateness(lateness: Time)设置保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T])保存获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X])获取

Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。

数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。

WaterMark + EventTimeWindow + Allowed Lateness方案

窗口机制 Windows

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

划分窗口就两种方式:

    根据时间进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。根据数据进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。
时间窗口类型

滚动窗口(Tumbling Windows)
将数据依据固定的窗口长度对数据进行切片。


适用场景: 基于时间段、时间片的各种统计指标

滑动窗口(Sliding Windows)
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

适用场景: 对最近一个时间段内的统计(求某接口最近5min的失败率)来决定是否要报警

会话窗口(Session Windows)
由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

适用场景: 基于session 的概念,适合一些类似UEBA 类行为分析

编程案例 Flink 标准编程模型

Flink 应用程序主要由三部分组成,**源 Source ** 、转换 transformation 、**目的地 sink ** 。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个目的地(sink)结束。

Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等,当然你也可以定义自己的 source。

Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select 等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

统计分析
  val conf: Configuration = new Configuration()
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaStream: DataStream[JSONObject] = getKafkaStream(env, jobParams)

...

    flowDataStream.keyBy(_.upstreamAddr)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
      .process(new FlowBeanWindowsCalc())
      .flatMap(_.toList)
      .addSink(sensitiveEsSinkBuilder.build()).setParallelism(1)

...

class FlowBeanWindowsCalc 
    extends ProcessWindowFunction[FlowBean, Array[String],String, TimeWindow] {

  override def process(upStreamAddr: String,
                       context: Context, 
                       elements: Iterable[FlowBean],
                       out: Collector[Array[String]]) = {

    val flowRelationArray = ArrayBuffer[String]()
    val timeSlot = context.window.getEnd
    val keyByRemoteAddr = elements.toList.groupBy(_.remoteAddr)
    keyByRemoteAddr.foreach {
      case (remoteAddr, flowBeanLists) => {
        val upFlowJson = new mutable.HashMap[String,Any]()

        val timeList = flowBeanLists.map(_.timestamp)
        val startTime = timeList.min
        val endTime = timeList.max
        val visit = flowBeanLists.size

        val statusCount = flowBeanLists.map(f=>(f.status,1)).groupBy(_._1).mapValues(_.size)
        
        val uriStatistics =  ArrayBuffer[mutable.HashMap[String,Any]]()
        flowBeanLists.groupBy(_.uri).foreach {
          case (uri, byUriFlowInfo) => {

                  val hasSensitive = byUriFlowInfo.exists(_.hasSenstive)
                  val uriStatusCode = byUriFlowInfo.map(f=>(f.status,1)).groupBy(_._1).mapValues(_.size)
                  val requestTime = byUriFlowInfo.map(_.requestTime)

                  val slideSize = scala.math.ceil(byUriFlowInfo.size / 10.0).toInt

                  val lag =  Map("max"->requestTime.max,
                                 "min"->requestTime.min,
                                 "avg"->requestTime.sum / requestTime.size)

                  val lagTrend = requestTime.grouped(slideSize).map(_.sum).toArray

                  val visit = byUriFlowInfo.size
                  val visitTrend = byUriFlowInfo.map(_.visit).grouped(slideSize).map(_.sum).toArray

            val requestTraffic = byUriFlowInfo.map(_.requestTrafficSize)
            val requestTrafficTotal = requestTraffic.sum

            val requestTrafficTrend = requestTraffic.grouped(slideSize).map(r=>r.sum/r.size).toArray

            val responseTraffic = byUriFlowInfo.map(_.responseTrafficSize)
            val responseTrafficTotal = responseTraffic.sum
            val responseTrafficTrend = responseTraffic.grouped(slideSize).map(r=>r.sum/r.size).toArray

            val uriType = byUriFlowInfo.head.uriType
          }
        }
       ...
       
        upFlowJson.put("uri_statistics",uriStatistics.toArray)
        upFlowJson.put("src_node",remoteAddr)
        upFlowJson.put("dst_node",upStreamAddr)

        flowRelationArray.append(Json(DefaultFormats).write(upFlowJson))
      }
    }
    out.collect(flowRelationArray.toArray)
  }
}
标准 CEP
val cepPattern = Pattern
      //第一步: 外网连接 135 端口
      .begin[JSONObject]("inbound135", AfterMatchSkipStrategy.skipPastLastEvent)
      .where { (value, ctx) => {
          val networkCondition =                             
              JsonPathUtils.readValue(value,"network.direction") == "inbound"
          val portCondition = 
                  JsonPathUtils.readValue(value,"destination.port") == 135
        newworkCondition && portCondition
      }
      // 第二步: Administrator 账号登录成功,同时登录IP是上一步的ip 
      .followedBy("loginSuccess").where((value, ctx) => {
        val eventCondition = 
           JsonPathUtils.readValue(value,"winlog.eventId") == 4624
        val sourceIp = JsonPathUtils.readValue(value,"source.ip")
      
        val inbound135 = ctx.getEventsForPattern("inbound135")
        var sourceIpCondition = inbound135.exist(cahedData => {
          JsonPathUtils.readValue(cahedData,"source.ip") == sourceIp
        })
        val userCondition = 
          value.getJSONObject("user").getString("name") == "Administrator"
        eventCondition && sourceIpCondition && userCondition
      })
      //第三步 有进程创建行为,且logonId 为 第二步 操作所产生
      .followedBy("traceProcess").where((value, ctx) => {
        val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 1
        val logonId = JsonPathUtils.readValue(value,"winlog.event_data.logonId")
        val loginSuccess = ctx.getEventsForPattern("loginSuccess")
        var logonCondition = loginSuccess.exist(cachedData => {
          JsonPathUtils.readValue(cahedData,"winlog.event_data.TargeLogonId") == logonId
        })
        eventCondition && logonCondition
      })
      //第四步,上一步创建的进程 创建了 .dll  or .ps1 类型文件
      .followedBy("traceFile").where((value, ctx) => {
        val eventCondition = 
           JsonPathUtils.readValue(value,"winlog.eventId") == 11
        val pid = JsonPathUtils.readValue(value,"process.pid")
        val traceProcess = ctx.getEventsForPattern("traceProcess")
        val pidCondiditon = traceProcess.exist(cacheData => {
          JsonPathUtils.readValue(cacheData,"process.pid") == pid)
        })
        val filePathCondititon = 
          JsonPathUtils.readValue(value,"file.path").endsWith(".dll") ||
          JsonPathUtils.readValue(value,"file.path").endsWith("ps1")
          
        eventCondition && pidCondiditon && filePathCondititon
    })
      .followedBy("traceNetwork").where((value, ctx) => {
        val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 3
        val networkCondition = 
          value.getJSONObject("network").getString("direction") == "outbound"

        val pid = JsonPathUtils.readValue(value,"process.pid")
        val traceProcess = ctx.getEventsForPattern("traceProcess")
        var pidCondiditon = traceProcess.exist(cacheData => {
         JsonPathUtils.readValue(cacheData,"process.pid") == pid
        })
        eventCondition && networkCondition && pidCondiditon
    })
      .oneOrMore.optional
      .within(Time.minutes(30L))

    val patternStream = CEP.pattern(kafkaStream, cepPattern)

    val result = patternStream.select(patternMap => {
      val inbound135 = patternMap("inbound135")
      val loginSuccess = patternMap("loginSuccess")
      val traceProcess = patternMap("traceProcess")
      val traceFile = patternMap("traceFile")
      val traceNetwork = patternMap("traceNetwork")

      var ruleInfoTmp =
        s"""
           |主机 ${inbound135.head.getString("computer_name")}发现 $ruleName事件:
           |${inbound135.head.getString("@timestamp")} 遭遇 外部ip   
              ${inbound135.map(_.getString("ip")).mkString} 连接 本地  端口
           |${loginSuccess.head.getString("@timestamp")} ip:${loginSuccess.map(_.getJSonObject("source").getString("ip")).mkString} 账号: ${loginSuccess.map(_.getJSonObject("user").getString("name")).mkString} 登录成功
           |${traceProcess.head.getString("@timestamp")} 执行命令:${traceProcess.map(_.getJSonObject("process").getString("args")).mkString}, 创建进程:${traceProcess.map(_.getJSonObject("process").getString("pid")).mkString}
           |${traceFile.head.getString("@timestamp")} 创建疑似恶意文件 ${traceFile.map(_.getJSonObject("file").getString("path")).mkString}
           |${traceNetwork.head.getString("@timestamp")} 网络外连 ${traceNetwork.map(_.getJSonObject("destination").toJSONString).mkString}""".stripMargin
      ruleInfoTmp += s"n详细日志如下:n ${patternMap.values.map(_.map(_.toJSONString).mkString("n")).mkString("n")}"
      ruleInfoTmp
    })
Flink Sql

统计、pattern recognition、ETL

val conf: Configuration = new Configuration()
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val bsEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
val tbEnv = StreamTableEnvironment.create(bsEnv)

 val createTable:String =
    """
      |CREATE TABLE  hadoop.iceberg_db.iceberg_001 (
      |    id BIGINT COMMENT 'unique id',
      |    data STRING,
      |    `@timestamp` STRING
      |) WITH ('connector'='iceberg','write.format.default'='parquet')
      |""".stripMargin

  val sourceTable:String =
    """
      |CREATE TABLE sourceTable (
      | userid int,
      | f_random_str STRING
      |) WITH (
      | 'connector' = 'datagen',
      | 'rows-per-second'='1000',
      | 'fields.userid.kind'='random',
      | 'fields.userid.min'='1',
      | 'fields.userid.max'='100',
      | 'fields.f_random_str.length'='10'
      |)
      |""".stripMargin

tbEnv.toRetractStream[Row](tab)

tbEnv.executeSql("select * from sourceTable").print()
tbEnv.executeSql("insert into hadoop.iceberg_db.iceberg_002 select * from sourceTable")
状态计算
class MyMergeStateImpl extends KeyedProcessFunction[String, ProcessNode, String]{
    
val ttlConfig: StateTtlConfig = StateTtlConfig
      .newBuilder(Time.seconds(60))
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build

var procTreeNodesState: MapState[String, ArrayBuffer[ProcessNode]] = _
val procTreeNodesStateDesc = new MapStateDescriptor[String, 
    ArrayBuffer[ProcessNode]](
    "ProcessTreeNode",
    BasicTypeInfo.STRING_TYPE_INFO,
    TypeInformation.of(new TypeHint[ArrayBuffer[ProcessNode]]() {})
  ).enableTimeToLive(ttlConfig)

  var procTreeIndexState: MapState[String, String] = _
  val procTreeIndexStateDesc = new MapStateDescriptor[String, String](
    "ProcessTreeIndex",
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO
  ).enableTimeToLive(ttlConfig)

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    procTreeNodesState = getRuntimeContext.getMapState(procTreeNodesStateDesc)
    procTreeIndexState = getRuntimeContext.getMapState(procTreeIndexStateDesc)

  }
  override def processElement(processNode: ProcessNode,
               context: KeyedProcessFunction[String, ProcessNode,String]#Context,
               collector: Collector[String]): Unit = {
    mergeProcessTreeNode(processNode)
  }

  private def mergeProcessTreeNode(curTreeNode:ProcessNode):Unit = {
      ... 
    nodeFlag match {
      // 同一棵树 : 直接 把当前节点添加进去
      case "sameTree" =>
        val upTreeId = if(curTreeId != "") curTreeId else parentTreeId
        val updateNode = procTreeNodesState.get(upTreeId)
        ProcessTree.mergeNode(curTreeNode, updateNode)
        procTreeNodesState.put(curTreeId, updateNode)
        procTreeIndexState.put(curTreeNode.selfCode,upTreeId)
        procTreeIndexState.put(curTreeNode.parentCode,upTreeId)

      // 不同树: 分别把两个棵树得所有节点 合并在一起,产生一棵新树
      case "differentTree" =>
        val mergeTreeId = UUID.randomUUID().toString
        val mergeNodeBuffer = new ArrayBuffer[ProcessNode]()
        procTreeNodesState.get(curTreeId).foreach( ProcessTree.mergeNode(_, mergeNodeBuffer))
        procTreeNodesState.get(parentTreeId).foreach( ProcessTree.mergeNode(_, mergeNodeBuffer))

        ProcessTree.mergeNode(curTreeNode, mergeNodeBuffer)
        procTreeIndexState.entries().asScala.foreach(indexEntry => {
          if (indexEntry.getValue == curTreeId || indexEntry.getValue == parentTreeId){
            indexEntry.setValue(mergeTreeId)
          }
        })
        procTreeNodesState.remove(curTreeId)
        procTreeNodesState.remove(parentTreeId)
        procTreeNodesState.put(mergeTreeId, mergeNodeBuffer.sortBy(_.timestamp))

      case "notTree" =>
        val newTreeId = UUID.randomUUID().toString
        procTreeNodesState.put(newTreeId, ArrayBuffer.apply(curTreeNode))

        procTreeIndexState.put(curTreeNode.selfCode, newTreeId)
        procTreeIndexState.put(curTreeNode.parentCode, newTreeId)
    }
    procTreeIndexState.remove("")
      
    // 清理历史数据
      procTreeNodesState.entries().asScala.foreach(treeEntry => {
        val nodeList = treeEntry.getValue
        val treeMaxTime = nodeList.maxBy(_.timestamp).timestamp
        val timeFilterNode = nodeList.filter(n=>{
          System.currentTimeMillis() - 5 * 60 * 1000 < n.timestamp 
        })
        treeEntry.setValue(timeFilterNode)
        procTreeNodesState.put(treeEntry.getKey, treeEntry.getValue)
        if(timeFilterNode.isEmpty) noneTreeId.append(treeEntry.getKey)
      })
  }
}
connector 消费 kafka 数据 (source)
val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", jobParams.bootstrapServers)
kafkaProperties.setProperty("auto.offset.reset", jobParams.autoOffsetReset)
kafkaProperties.setProperty("group.id", jobParams.groupId)
kafkaProperties.setProperty("default.api.timeout.ms", "600000")

 val kafkaStream = env
      .addSource(new FlinkKafkaConsumer(jobParams.kafkaInputTopic,
                                        new SimpleStringSchema(),                                                     kafkaProperties)
        .setCommitOffsetsOnCheckpoints(true)
        .setStartFromGroupOffsets())
      .setParallelism(jobParams.kafkaParallelism).name("kafka 数据流")
      // 过滤掉非json 格式的脏数据
      .map(recode => {
        try {JSON.parseObject(recode)}
        catch {
          case e: Exception => log.error(recode + ">>>" + e.getMessage)
        }
      })
      .filter(_ != null)
      .name("format data from kafka")

广播消息源(广播规则)
val broadcastDescriptor = new MapStateDescriptor[String, Map[String, Any]]                               ("broadcastRules",
                             BasicTypeInfo.STRING_TYPE_INFO,
                             TypeInformation.of(classOf[Map[String, Any]]))

env.addSource(new RichSourceFunction[Map[String, Any]]() {
    private var isRunning = true
    private var lastModifyTime: Long = _
    private var ruleParser: RulParser = _

  override def open(parameters: Configuration) {
      lastModifyTime = 0
      ruleParser = new FalcoRuleParser()
      ruleParser.parsedRule(jobParams.ruleIndex)
    }

  @throws[Exception]
    override def run(ctx: SourceFunction.SourceContext[Map[String, Any]]) = {
      while (isRunning) {
      val currentMaxTime = ruleParser.getRuleUpdateTime(jobParams)
        val isChanged = if (lastModifyTime == currentMaxTime) false else true
        if (isChanged) {
          ctx.collect(ruleParser.getRuleCondition)
          lastModifyTime = currentMaxTime
        }
        TimeUnit.SECONDS.sleep(60)
      }
    }
    override def cancel(): Unit = {isRunning = false}
  }).setParallelism(1).name("规则监控流").broadcast(broadcastDescriptor)
自定义消息源(SocketServer)
class SyslogUDPServerSource(serverHost:String, serverPort:Int) extends RichSourceFunction[JSONObject] {

  private lazy val log = LoggerFactory.getLogger(this.getClass.getName)
  private val splitPackageBuffer = new ArrayBuffer[String]
  private var channel: DatagramChannel = _
  private var selector: Selector = _
  private var isRunning: Boolean = false
  private var grokCompiler: GrokCompiler = _
  private var grok: Grok = _

  override def open(parameters: Configuration): Unit = {
    channel = DatagramChannel.open
    channel.configureBlocking(false)
    channel.bind(new InetSocketAddress(serverHost, serverPort))
    selector = Selector.open
    channel.register(selector, SelectionKey.OP_READ)
    isRunning = true

    grokCompiler = GrokCompiler.newInstance()
    grokCompiler.registerDefaultPatterns()
    // 进行注册, registerDefaultPatterns()方法注册的是Grok内置的patterns

    grok = grokCompiler.compile(Constant.GROK_PATTERN)
    log.info(s"UDP服务器启动:${serverHost}:${serverPort}")
  }
  override def run(ctx: SourceFunction.SourceContext[JSONObject]): Unit = {
    //轮询服务
    while (isRunning) {
      // 通过选择器,查询IO事件
      while (selector.select > 0) {

        val iterator = selector.selectedKeys.iterator
        val buffer = ByteBuffer.allocate(1024 * 1000)
        while (iterator.hasNext) {
          val selectionKey = iterator.next

          if (selectionKey.isReadable) {
            channel.receive(buffer)
            buffer.flip
            val receiveStr = new String(buffer.array, 0, buffer.limit)
            val parserData = SyslogGrokExtract.getMessage(receiveStr)
		   ctx.collect(parserData)
          } 
          buffer.compact()
          iterator.remove()
        }
      }
      TimeUnit.MILLISECONDS.sleep(500)
    }
  }
  override def cancel(): Unit = {
    try {
      isRunning = false
      selector.close()
      channel.close()
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
  }
}
Sink 结果到 Kafka
val kafkaProducer = new FlinkKafkaProducer[String](
                                       jobParams.bootstrapServers,                                                  jobParams.kafkaOutputTopic,
                                       new SimpleStringSchema)
kafkaProducer.setWriteTimestampToKafka(true)
rtnDataStream.addSink(kafkaProducer)
Sink 结果到 ElasticSearch
def getElasticSearchSinkBuilder(esHosts: String, 
               esPort: Int,
               esUser: String, 
               esPassword: String) = {
  val httpHosts = new java.util.ArrayList[HttpHost]
  esHosts.split(",").foreach(esaddr => {
    httpHosts.add(new HttpHost(esaddr, esPort, "http"))
  })
  new ElasticsearchSink.Builder[ElasticSearchSinkBean](
    httpHosts,
    new ElasticsearchSinkFunction[ElasticSearchSinkBean] {
      def createIndexRequest(esBean: ElasticSearchSinkBean): IndexRequest = {
        Requests.indexRequest()
          .index(esBean.indexName)
          .`type`("doc")
          .id(esBean.docId)
          .source(esBean.message, XContentType.JSON)
      }

      override def process(esBean: ElasticSearchSinkBean,
                           runtimeContext: RuntimeContext,
                           requestIndexer: RequestIndexer): Unit = {
        requestIndexer.add(createIndexRequest(esBean))
      }
    }
  )
}
...
// 对 DataStream 使用 addSink 绑定输出
val esSinkBuilder = getElasticSearchSinkBuilder(xxxx)
esSinkBuilder.setBulkFlushMaxActions(1000)
formatESData.addSink(esSinkBuilder.build()).setParallelism(5)
Sink 结果到 Mysql
class JDBCSink() extends RichSinkFunction[SensorReading]{ 
  // 定义sql连接、预编译器
  var conn: Connection = _
  var insertStmt: PreparedStatement = _
  var updateStmt: PreparedStatement = _

  // 初始化,创建连接和预编译语句
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink?serverTimezone=UTC", "root", "root")
    insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")
    updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERe name = ?")
  }

  // 调用连接,执行sql
  override def invoke(value: SensorReading,
                      context: SinkFunction.Context[_]): Unit = {
    // 执行更新语句
    updateStmt.setString(2, value.id)
    updateStmt.setDouble(1, value.temperature)
    updateStmt.execute()

    // 如果update没有查到数据,那么执行插入语句
    if( updateStmt.getUpdateCount == 0 ){
      insertStmt.setString(1, value.id)
      insertStmt.setDouble(2, value.temperature)
      insertStmt.execute()
    }
  }

  // 关闭时做清理工作
  override def close(): Unit = {
    insertStmt.close()
    updateStmt.close()
    conn.close()
  }
}
...
// 在主程序里面通过addSink绑定
stream.addSink(new JDBCSink())
其他

大数据方向技能图谱

大数据安全分析体系

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/712719.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号