您可以使用
aggregate过滤器来执行此操作。聚合筛选器支持基于公共字段值将多个日志行聚合到一个事件中。在您的情况下,公共字段将是该
job_id字段。
然后,我们需要另一个字段来检测应该聚合的第一个事件与第二个事件。就您而言,这就是
state字段。
因此,您只需要向现有的Logstash配置中添加另一个过滤器,如下所示:
filter { ...your other filters if [state] == "processing" { aggregate { task_id => "%{job_id}" } } else if [state] == "failed" { aggregate { task_id => "%{job_id}" end_of_task => true timeout => 120 } }}您可以
timeout根据作业的运行时间自由调整(以秒为单位)。



