如果要用Logstash加载ES,则可以使用
aggregate过滤器来组装相关的离散日志行。这个想法是要注意一个持续时间较长的事件何时开始(即用户已连接),然后
disconnected在同一用户的事件通过以下时间结束时结束该事件:(请注意,您的grok模式可能有所不同,但是原理是相同的)
filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} %{TIMESTAMP_ISO8601:timestamp} %{WORD:entity} %{INT:userid} %{WORD:status}" ] } if [status] == "connected" { aggregate { task_id => "%{userid}" pre => "map['started'] = event['timestamp']" map_action => "create" } } if [status] == "disconnected" { aggregate { task_id => "%{userid}" pre => "event['duration'] = event['timestamp'] - map['started']" map_action => "update" end_of_task => true timeout => 86400000 } }}最后,您将获得一个名为
duration(以毫秒为单位)的附加字段,然后可以使用该字段在Kibana上绘制以显示平均连接时间。
另请注意,我给了一天的任意超时时间,这可能会也可能不适合您的情况。随意玩。



