flink 体系化总结 https://www.yuque.com/wangzhiwuimportbigdata/ihf2lk/ka2x0h#Aq72c
大数据成神之路 https://github.com/wangzhiwubigdata/God-Of-BigData
大数据方向学习指南 https://blog.csdn.net/u013411339/article/details/118643213
五分钟学大数据 https://www.cnblogs.com/itlz/p/14356708.html
背景知识 实时计算的三个特征:无限数据:无限数据指的是一种不断增长的,基本上无限的数据集。这些通常被称为 流数据,而与之相对的是有限的数据集。
无界数据处理:一种持续的数据处理模式,能够通过处理引擎重复的去处理上面的无限数据,是能够突破有限数据处理引擎的瓶颈的。
低延迟:延迟是多少并没有明确的定义。但我们都知道数据的价值将随着时间的流逝降低,时效性将是需要持续解决的问题。
两阶段提交协议(Two-Phase Commit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现 ACID 中的 A (原子性)。
在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值,要么都不改,以此来达到数据的强一致性。
两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个。
顾名思义,两阶段提交将提交过程划分为连续的两个阶段:表决阶段(Voting)和提交阶段(Commit)。
两阶段提交协议过程如下图所示:
第一阶段:表决阶段
- 协调者向所有参与者发送一个 VOTE_REQUEST 消息。当参与者接收到 VOTE_REQUEST 消息,向协调者发送VOTE_COMMIT 消息作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备好或遇到其他故障,就返回一个VOTE_ABORT 消息,告诉协调者目前无法提交事务。
第二阶段:提交阶段
- 协调者收集来自各个参与者的表决消息。如果**所有参与者一致认为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与者发送一个 GLOBAL_COMMIT 消息,通知参与者进行本地提交;如果所有参与者中有任意一个返回消息是 VOTE_ABORT,协调者就会取消事务**,向所有参与者广播一条 GLOBAL_ABORT 消息通知所有的参与者取消事务。每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一个 GLOBAL_COMMIT 消息,那么参与者提交本地事务,否则如果接收到 GLOBAL_ABORT 消息,则参与者取消本地事务。
核心理念:Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理**
容错机制 CheckpointCheckpoint 机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自 Chandy-Lamport algorithm算法。
Flink Checkpoint 的两阶段实现
- beginTransaction,开启一个事务,在临时目录中创建一个临时文件,之后写入数据到该文件中。此过程为不同的事务创建隔离,避免数据混淆。preCommit。预提交阶段。将缓存数据块写出到创建的临时文件,然后关闭该文件,确保不再写入新数据到该文件,同时开启一个新事务,执行属于下一个检查点的写入操作。commit。在提交阶段,以原子操作的方式将上一阶段的文件写入真正的文件目录下。如果提交失败,Flink应用会重启,并调用TwoPhaseCommitSinkFunction#recoverAndCommit方法尝试恢复并重新提交事务。abort。一旦终止事务,删除临时文件。
进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。
Keyed States:记录每个Key对应的状态值一个Task上可能包含多个Key不同Task上不会出现相同的Key ,常用的 MapState, ValueState
Operator States:记录每个Task对应的状态值数据类型
State-Keyed State基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。
保存state的数据结构:
ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。
ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
MapState
需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄
State-Operator State与Key无关的State,与Operator绑定的state,整个operator只对应一个state。
举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
Broadcast StateBroadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。
class MyBroadcastProcessImpl[KS, IN1, IN2, OUT] extends BroadcastProcessFunction[IN1, IN2, OUT] {
// 数据流处理部分
override def processElement(in1: IN1,
ctx: BroadcastProcessFunction[IN1, IN2, OUT]#ReadOnlyContext,
collector: Collector[OUT]): Unit = ???
// 广播流处理部分
override def processBroadcastElement(in2: IN2,
ctx: BroadcastProcessFunction[IN1, IN2, OUT]#Context,
collector: Collector[OUT]): Unit = ???
}
过期时间 TTL 功能的用法
val ttlConfig: StateTtlConfig = StateTtlConfig
.newBuilder(Time.seconds(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("testState", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
时间概念 Time
Flink 支持了流处理程序在时间上的三个定义:事件时间 EventTime 、摄入时间 IngestionTime 、处理时间 ProcessingTime 。
EventTime :是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。IngestionTime :是数据进入Flink的时间。ProcessingTime:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。
使用方式如下:
val env = StreamExecutionEnvironment.getExecutionEnvironrnent() // 使用处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 使用摄入时间 env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime) // 使用事件时间 env.setStrearnTimeCharacteristic(TimeCharacteristic.EventTime)
延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据:
设置允许延迟的时间是通过 allowedLateness(lateness: Time)设置保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T])保存获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X])获取
Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。
数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。
WaterMark + EventTimeWindow + Allowed Lateness方案
窗口机制 Windows流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。
划分窗口就两种方式:
- 根据时间进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。根据数据进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。
滚动窗口(Tumbling Windows)
将数据依据固定的窗口长度对数据进行切片。
适用场景: 基于时间段、时间片的各种统计指标
滑动窗口(Sliding Windows)
滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
适用场景: 对最近一个时间段内的统计(求某接口最近5min的失败率)来决定是否要报警
会话窗口(Session Windows)
由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
适用场景: 基于session 的概念,适合一些类似UEBA 类行为分析
编程案例 Flink 标准编程模型Flink 应用程序主要由三部分组成,**源 Source ** 、转换 transformation 、**目的地 sink ** 。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个目的地(sink)结束。
Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等,当然你也可以定义自己的 source。
Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select 等,操作很多,可以将数据转换计算成你想要的数据。
Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。
统计分析 val conf: Configuration = new Configuration()
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
// val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaStream: DataStream[JSONObject] = getKafkaStream(env, jobParams)
...
flowDataStream.keyBy(_.upstreamAddr)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.process(new FlowBeanWindowsCalc())
.flatMap(_.toList)
.addSink(sensitiveEsSinkBuilder.build()).setParallelism(1)
...
class FlowBeanWindowsCalc
extends ProcessWindowFunction[FlowBean, Array[String],String, TimeWindow] {
override def process(upStreamAddr: String,
context: Context,
elements: Iterable[FlowBean],
out: Collector[Array[String]]) = {
val flowRelationArray = ArrayBuffer[String]()
val timeSlot = context.window.getEnd
val keyByRemoteAddr = elements.toList.groupBy(_.remoteAddr)
keyByRemoteAddr.foreach {
case (remoteAddr, flowBeanLists) => {
val upFlowJson = new mutable.HashMap[String,Any]()
val timeList = flowBeanLists.map(_.timestamp)
val startTime = timeList.min
val endTime = timeList.max
val visit = flowBeanLists.size
val statusCount = flowBeanLists.map(f=>(f.status,1)).groupBy(_._1).mapValues(_.size)
val uriStatistics = ArrayBuffer[mutable.HashMap[String,Any]]()
flowBeanLists.groupBy(_.uri).foreach {
case (uri, byUriFlowInfo) => {
val hasSensitive = byUriFlowInfo.exists(_.hasSenstive)
val uriStatusCode = byUriFlowInfo.map(f=>(f.status,1)).groupBy(_._1).mapValues(_.size)
val requestTime = byUriFlowInfo.map(_.requestTime)
val slideSize = scala.math.ceil(byUriFlowInfo.size / 10.0).toInt
val lag = Map("max"->requestTime.max,
"min"->requestTime.min,
"avg"->requestTime.sum / requestTime.size)
val lagTrend = requestTime.grouped(slideSize).map(_.sum).toArray
val visit = byUriFlowInfo.size
val visitTrend = byUriFlowInfo.map(_.visit).grouped(slideSize).map(_.sum).toArray
val requestTraffic = byUriFlowInfo.map(_.requestTrafficSize)
val requestTrafficTotal = requestTraffic.sum
val requestTrafficTrend = requestTraffic.grouped(slideSize).map(r=>r.sum/r.size).toArray
val responseTraffic = byUriFlowInfo.map(_.responseTrafficSize)
val responseTrafficTotal = responseTraffic.sum
val responseTrafficTrend = responseTraffic.grouped(slideSize).map(r=>r.sum/r.size).toArray
val uriType = byUriFlowInfo.head.uriType
}
}
...
upFlowJson.put("uri_statistics",uriStatistics.toArray)
upFlowJson.put("src_node",remoteAddr)
upFlowJson.put("dst_node",upStreamAddr)
flowRelationArray.append(Json(DefaultFormats).write(upFlowJson))
}
}
out.collect(flowRelationArray.toArray)
}
}
标准 CEP
val cepPattern = Pattern
//第一步: 外网连接 135 端口
.begin[JSONObject]("inbound135", AfterMatchSkipStrategy.skipPastLastEvent)
.where { (value, ctx) => {
val networkCondition =
JsonPathUtils.readValue(value,"network.direction") == "inbound"
val portCondition =
JsonPathUtils.readValue(value,"destination.port") == 135
newworkCondition && portCondition
}
// 第二步: Administrator 账号登录成功,同时登录IP是上一步的ip
.followedBy("loginSuccess").where((value, ctx) => {
val eventCondition =
JsonPathUtils.readValue(value,"winlog.eventId") == 4624
val sourceIp = JsonPathUtils.readValue(value,"source.ip")
val inbound135 = ctx.getEventsForPattern("inbound135")
var sourceIpCondition = inbound135.exist(cahedData => {
JsonPathUtils.readValue(cahedData,"source.ip") == sourceIp
})
val userCondition =
value.getJSONObject("user").getString("name") == "Administrator"
eventCondition && sourceIpCondition && userCondition
})
//第三步 有进程创建行为,且logonId 为 第二步 操作所产生
.followedBy("traceProcess").where((value, ctx) => {
val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 1
val logonId = JsonPathUtils.readValue(value,"winlog.event_data.logonId")
val loginSuccess = ctx.getEventsForPattern("loginSuccess")
var logonCondition = loginSuccess.exist(cachedData => {
JsonPathUtils.readValue(cahedData,"winlog.event_data.TargeLogonId") == logonId
})
eventCondition && logonCondition
})
//第四步,上一步创建的进程 创建了 .dll or .ps1 类型文件
.followedBy("traceFile").where((value, ctx) => {
val eventCondition =
JsonPathUtils.readValue(value,"winlog.eventId") == 11
val pid = JsonPathUtils.readValue(value,"process.pid")
val traceProcess = ctx.getEventsForPattern("traceProcess")
val pidCondiditon = traceProcess.exist(cacheData => {
JsonPathUtils.readValue(cacheData,"process.pid") == pid)
})
val filePathCondititon =
JsonPathUtils.readValue(value,"file.path").endsWith(".dll") ||
JsonPathUtils.readValue(value,"file.path").endsWith("ps1")
eventCondition && pidCondiditon && filePathCondititon
})
.followedBy("traceNetwork").where((value, ctx) => {
val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 3
val networkCondition =
value.getJSONObject("network").getString("direction") == "outbound"
val pid = JsonPathUtils.readValue(value,"process.pid")
val traceProcess = ctx.getEventsForPattern("traceProcess")
var pidCondiditon = traceProcess.exist(cacheData => {
JsonPathUtils.readValue(cacheData,"process.pid") == pid
})
eventCondition && networkCondition && pidCondiditon
})
.oneOrMore.optional
.within(Time.minutes(30L))
val patternStream = CEP.pattern(kafkaStream, cepPattern)
val result = patternStream.select(patternMap => {
val inbound135 = patternMap("inbound135")
val loginSuccess = patternMap("loginSuccess")
val traceProcess = patternMap("traceProcess")
val traceFile = patternMap("traceFile")
val traceNetwork = patternMap("traceNetwork")
var ruleInfoTmp =
s"""
|主机 ${inbound135.head.getString("computer_name")}发现 $ruleName事件:
|${inbound135.head.getString("@timestamp")} 遭遇 外部ip
${inbound135.map(_.getString("ip")).mkString} 连接 本地 端口
|${loginSuccess.head.getString("@timestamp")} ip:${loginSuccess.map(_.getJSonObject("source").getString("ip")).mkString} 账号: ${loginSuccess.map(_.getJSonObject("user").getString("name")).mkString} 登录成功
|${traceProcess.head.getString("@timestamp")} 执行命令:${traceProcess.map(_.getJSonObject("process").getString("args")).mkString}, 创建进程:${traceProcess.map(_.getJSonObject("process").getString("pid")).mkString}
|${traceFile.head.getString("@timestamp")} 创建疑似恶意文件 ${traceFile.map(_.getJSonObject("file").getString("path")).mkString}
|${traceNetwork.head.getString("@timestamp")} 网络外连 ${traceNetwork.map(_.getJSonObject("destination").toJSONString).mkString}""".stripMargin
ruleInfoTmp += s"n详细日志如下:n ${patternMap.values.map(_.map(_.toJSONString).mkString("n")).mkString("n")}"
ruleInfoTmp
})
Flink Sql
统计、pattern recognition、ETL
val conf: Configuration = new Configuration()
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
val bsEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
val tbEnv = StreamTableEnvironment.create(bsEnv)
val createTable:String =
"""
|CREATE TABLE hadoop.iceberg_db.iceberg_001 (
| id BIGINT COMMENT 'unique id',
| data STRING,
| `@timestamp` STRING
|) WITH ('connector'='iceberg','write.format.default'='parquet')
|""".stripMargin
val sourceTable:String =
"""
|CREATE TABLE sourceTable (
| userid int,
| f_random_str STRING
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second'='1000',
| 'fields.userid.kind'='random',
| 'fields.userid.min'='1',
| 'fields.userid.max'='100',
| 'fields.f_random_str.length'='10'
|)
|""".stripMargin
tbEnv.toRetractStream[Row](tab)
tbEnv.executeSql("select * from sourceTable").print()
tbEnv.executeSql("insert into hadoop.iceberg_db.iceberg_002 select * from sourceTable")
状态计算
class MyMergeStateImpl extends KeyedProcessFunction[String, ProcessNode, String]{
val ttlConfig: StateTtlConfig = StateTtlConfig
.newBuilder(Time.seconds(60))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
var procTreeNodesState: MapState[String, ArrayBuffer[ProcessNode]] = _
val procTreeNodesStateDesc = new MapStateDescriptor[String,
ArrayBuffer[ProcessNode]](
"ProcessTreeNode",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint[ArrayBuffer[ProcessNode]]() {})
).enableTimeToLive(ttlConfig)
var procTreeIndexState: MapState[String, String] = _
val procTreeIndexStateDesc = new MapStateDescriptor[String, String](
"ProcessTreeIndex",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO
).enableTimeToLive(ttlConfig)
override def open(parameters: Configuration): Unit = {
super.open(parameters)
procTreeNodesState = getRuntimeContext.getMapState(procTreeNodesStateDesc)
procTreeIndexState = getRuntimeContext.getMapState(procTreeIndexStateDesc)
}
override def processElement(processNode: ProcessNode,
context: KeyedProcessFunction[String, ProcessNode,String]#Context,
collector: Collector[String]): Unit = {
mergeProcessTreeNode(processNode)
}
private def mergeProcessTreeNode(curTreeNode:ProcessNode):Unit = {
...
nodeFlag match {
// 同一棵树 : 直接 把当前节点添加进去
case "sameTree" =>
val upTreeId = if(curTreeId != "") curTreeId else parentTreeId
val updateNode = procTreeNodesState.get(upTreeId)
ProcessTree.mergeNode(curTreeNode, updateNode)
procTreeNodesState.put(curTreeId, updateNode)
procTreeIndexState.put(curTreeNode.selfCode,upTreeId)
procTreeIndexState.put(curTreeNode.parentCode,upTreeId)
// 不同树: 分别把两个棵树得所有节点 合并在一起,产生一棵新树
case "differentTree" =>
val mergeTreeId = UUID.randomUUID().toString
val mergeNodeBuffer = new ArrayBuffer[ProcessNode]()
procTreeNodesState.get(curTreeId).foreach( ProcessTree.mergeNode(_, mergeNodeBuffer))
procTreeNodesState.get(parentTreeId).foreach( ProcessTree.mergeNode(_, mergeNodeBuffer))
ProcessTree.mergeNode(curTreeNode, mergeNodeBuffer)
procTreeIndexState.entries().asScala.foreach(indexEntry => {
if (indexEntry.getValue == curTreeId || indexEntry.getValue == parentTreeId){
indexEntry.setValue(mergeTreeId)
}
})
procTreeNodesState.remove(curTreeId)
procTreeNodesState.remove(parentTreeId)
procTreeNodesState.put(mergeTreeId, mergeNodeBuffer.sortBy(_.timestamp))
case "notTree" =>
val newTreeId = UUID.randomUUID().toString
procTreeNodesState.put(newTreeId, ArrayBuffer.apply(curTreeNode))
procTreeIndexState.put(curTreeNode.selfCode, newTreeId)
procTreeIndexState.put(curTreeNode.parentCode, newTreeId)
}
procTreeIndexState.remove("")
// 清理历史数据
procTreeNodesState.entries().asScala.foreach(treeEntry => {
val nodeList = treeEntry.getValue
val treeMaxTime = nodeList.maxBy(_.timestamp).timestamp
val timeFilterNode = nodeList.filter(n=>{
System.currentTimeMillis() - 5 * 60 * 1000 < n.timestamp
})
treeEntry.setValue(timeFilterNode)
procTreeNodesState.put(treeEntry.getKey, treeEntry.getValue)
if(timeFilterNode.isEmpty) noneTreeId.append(treeEntry.getKey)
})
}
}
connector
消费 kafka 数据 (source)
val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", jobParams.bootstrapServers)
kafkaProperties.setProperty("auto.offset.reset", jobParams.autoOffsetReset)
kafkaProperties.setProperty("group.id", jobParams.groupId)
kafkaProperties.setProperty("default.api.timeout.ms", "600000")
val kafkaStream = env
.addSource(new FlinkKafkaConsumer(jobParams.kafkaInputTopic,
new SimpleStringSchema(), kafkaProperties)
.setCommitOffsetsOnCheckpoints(true)
.setStartFromGroupOffsets())
.setParallelism(jobParams.kafkaParallelism).name("kafka 数据流")
// 过滤掉非json 格式的脏数据
.map(recode => {
try {JSON.parseObject(recode)}
catch {
case e: Exception => log.error(recode + ">>>" + e.getMessage)
}
})
.filter(_ != null)
.name("format data from kafka")
广播消息源(广播规则)
val broadcastDescriptor = new MapStateDescriptor[String, Map[String, Any]] ("broadcastRules",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(classOf[Map[String, Any]]))
env.addSource(new RichSourceFunction[Map[String, Any]]() {
private var isRunning = true
private var lastModifyTime: Long = _
private var ruleParser: RulParser = _
override def open(parameters: Configuration) {
lastModifyTime = 0
ruleParser = new FalcoRuleParser()
ruleParser.parsedRule(jobParams.ruleIndex)
}
@throws[Exception]
override def run(ctx: SourceFunction.SourceContext[Map[String, Any]]) = {
while (isRunning) {
val currentMaxTime = ruleParser.getRuleUpdateTime(jobParams)
val isChanged = if (lastModifyTime == currentMaxTime) false else true
if (isChanged) {
ctx.collect(ruleParser.getRuleCondition)
lastModifyTime = currentMaxTime
}
TimeUnit.SECONDS.sleep(60)
}
}
override def cancel(): Unit = {isRunning = false}
}).setParallelism(1).name("规则监控流").broadcast(broadcastDescriptor)
自定义消息源(SocketServer)
class SyslogUDPServerSource(serverHost:String, serverPort:Int) extends RichSourceFunction[JSONObject] {
private lazy val log = LoggerFactory.getLogger(this.getClass.getName)
private val splitPackageBuffer = new ArrayBuffer[String]
private var channel: DatagramChannel = _
private var selector: Selector = _
private var isRunning: Boolean = false
private var grokCompiler: GrokCompiler = _
private var grok: Grok = _
override def open(parameters: Configuration): Unit = {
channel = DatagramChannel.open
channel.configureBlocking(false)
channel.bind(new InetSocketAddress(serverHost, serverPort))
selector = Selector.open
channel.register(selector, SelectionKey.OP_READ)
isRunning = true
grokCompiler = GrokCompiler.newInstance()
grokCompiler.registerDefaultPatterns()
// 进行注册, registerDefaultPatterns()方法注册的是Grok内置的patterns
grok = grokCompiler.compile(Constant.GROK_PATTERN)
log.info(s"UDP服务器启动:${serverHost}:${serverPort}")
}
override def run(ctx: SourceFunction.SourceContext[JSONObject]): Unit = {
//轮询服务
while (isRunning) {
// 通过选择器,查询IO事件
while (selector.select > 0) {
val iterator = selector.selectedKeys.iterator
val buffer = ByteBuffer.allocate(1024 * 1000)
while (iterator.hasNext) {
val selectionKey = iterator.next
if (selectionKey.isReadable) {
channel.receive(buffer)
buffer.flip
val receiveStr = new String(buffer.array, 0, buffer.limit)
val parserData = SyslogGrokExtract.getMessage(receiveStr)
ctx.collect(parserData)
}
buffer.compact()
iterator.remove()
}
}
TimeUnit.MILLISECONDS.sleep(500)
}
}
override def cancel(): Unit = {
try {
isRunning = false
selector.close()
channel.close()
} catch {
case e: IOException =>
e.printStackTrace()
}
}
}
Sink 结果到 Kafka
val kafkaProducer = new FlinkKafkaProducer[String](
jobParams.bootstrapServers, jobParams.kafkaOutputTopic,
new SimpleStringSchema)
kafkaProducer.setWriteTimestampToKafka(true)
rtnDataStream.addSink(kafkaProducer)
Sink 结果到 ElasticSearch
def getElasticSearchSinkBuilder(esHosts: String,
esPort: Int,
esUser: String,
esPassword: String) = {
val httpHosts = new java.util.ArrayList[HttpHost]
esHosts.split(",").foreach(esaddr => {
httpHosts.add(new HttpHost(esaddr, esPort, "http"))
})
new ElasticsearchSink.Builder[ElasticSearchSinkBean](
httpHosts,
new ElasticsearchSinkFunction[ElasticSearchSinkBean] {
def createIndexRequest(esBean: ElasticSearchSinkBean): IndexRequest = {
Requests.indexRequest()
.index(esBean.indexName)
.`type`("doc")
.id(esBean.docId)
.source(esBean.message, XContentType.JSON)
}
override def process(esBean: ElasticSearchSinkBean,
runtimeContext: RuntimeContext,
requestIndexer: RequestIndexer): Unit = {
requestIndexer.add(createIndexRequest(esBean))
}
}
)
}
...
// 对 DataStream 使用 addSink 绑定输出
val esSinkBuilder = getElasticSearchSinkBuilder(xxxx)
esSinkBuilder.setBulkFlushMaxActions(1000)
formatESData.addSink(esSinkBuilder.build()).setParallelism(5)
Sink 结果到 Mysql
class JDBCSink() extends RichSinkFunction[SensorReading]{
// 定义sql连接、预编译器
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// 初始化,创建连接和预编译语句
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink?serverTimezone=UTC", "root", "root")
insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")
updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERe name = ?")
}
// 调用连接,执行sql
override def invoke(value: SensorReading,
context: SinkFunction.Context[_]): Unit = {
// 执行更新语句
updateStmt.setString(2, value.id)
updateStmt.setDouble(1, value.temperature)
updateStmt.execute()
// 如果update没有查到数据,那么执行插入语句
if( updateStmt.getUpdateCount == 0 ){
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
// 关闭时做清理工作
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
...
// 在主程序里面通过addSink绑定
stream.addSink(new JDBCSink())
其他
大数据方向技能图谱
大数据安全分析体系



