Sure there is.
What you need to do is make use of the multiline codec on your input.
As per the example:
```
input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => "previous"
    }
  }
}
```

This basically states that any line that does not start with `YYYYMMDD HH:mi:ss.000` will be merged with the previous line.
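As a sanity check outside Logstash, the merge behaviour of the multiline codec with `negate => true` and `what => previous` can be sketched in Python. The sample log lines and the simplified timestamp regex below are invented for illustration, not taken from your app:

```python
import re

# Simplified stand-in for the grok pattern "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
timestamp_re = re.compile(r"^\[\d{8}\s+\d{2}:\d{2}:\d{2}")

def merge_multiline(lines):
    """Mimic the multiline codec (negate => true, what => previous):
    any line NOT starting with the timestamp is glued onto the previous event."""
    events = []
    for line in lines:
        if timestamp_re.match(line) or not events:
            events.append(line)
        else:
            events[-1] += "\n" + line
    return events

log = [
    "[20170508 11:54:23.000 UTC] stats follow",
    "fieldA: str | field2: 0",
    "fieldA: str2 | field2: 5",
    "[20170508 11:55:23.000 UTC] next event",
]
print(merge_multiline(log))  # two events: the stat lines fold into the first
```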
From there, you can now apply a grok pattern to the first line (to get the high-level data).
Once you are happy that you have all the data you need from the first line, you can then split on `\r` or `\n` and grab the individual stats with a single grok pattern (based on the example given above).
Hope this helps,
D
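The split-then-extract step can also be sanity-checked outside Logstash with a short Python sketch. The event content and field names below are placeholders modelled on the `[fieldA: str | field2: 0 ...]` example, not your real data:

```python
import re

# One recombined multiline event: a header line plus stat lines, separated by \r
event = ("[20170508 11:54:23.000 UTC] header\r"
         "[fieldA: str | field2: 0 | field3: 7]\r"
         "[fieldA: str2 | field2: 5 | field3: 9]")

# Equivalent of the split filter: one line per event
lines = event.split("\r")

# Equivalent of the per-line grok: capture each "name: value" pair
pair_re = re.compile(r"(\w+):\s*([^|\]]+?)\s*(?:\||\]|$)")

stats = [dict(pair_re.findall(line)) for line in lines[1:]]
print(stats)
```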
Update 2017-05-08 11:54:
A complete logstash conf could look something like this; you will want to consider changing the grok patterns to better suit your requirements (only you know your data).
Note this has not been tested, so that is down to you.
```
input {
  file {
    path => "/var/log/someapp.log"

    ### For testing and continual processing of the same file, remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"

    ### Read the logfile and recombine the multiline details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[%{YEAR}%{MONTHNUM}%{MONTHDAY}\s*%{TIME}"
      negate => true
      what => "previous"
    }
  }
}
filter {
  ### Grab some high-level data before we split the line (note: anything you grab before the split gets copied)
  grok {
    match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*\(%{DATA:thread_name}\)\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]" }
  }

  ### Split the lines back out into single lines (this may be \r or \n; test which one)
  split {
    field => "message"
    terminator => "\r"
  }

  ### The lines should now be independent; add another grok here to get the patterns dictated by your example [fieldA: str | field2: 0 ...] etc.
  ### Note: you should look to change the grok pattern to better suit your requirements; DATA is used here to quickly capture your content
  grok {
    break_on_match => false
    match => { "message" => "^\[%{DATA}:\s*%{DATA:fieldA}\|%{DATA}:\s*%{DATA:field2}\|%{DATA}:\s*%{DATA:field3}\|%{DATA}:\s*%{DATA:field4}\|%{DATA}:\s*%{DATA:field5}\|%{DATA}:\s*%{DATA:field6}\|%{DATA}:\s*%{DATA:field7}\]$" }
  }

  mutate {
    convert => { "message" => "string" }
    add_field => {
      "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
      "load_timestamp" => "%{@timestamp}"
    }
    remove_field => ["yr", "mnt", "daynum", "time", "timezone"]
  }
}
output {
  stdout { codec => rubydebug }
}
```

Edit 2017-05-15
Logstash is a complex parser; it expects to stay running as a process and continually monitor the log files (hence you will have to kill it yourself).
`break_on_match` means you can have multiple match requirements for the same field; if a match isn't found, it tries the next one in the list (always go easiest to hardest).
In the input filter, change the path to end in `.log*`; also, as per your original example, the pattern doesn't have to match the required date format (so that everything related ends up on one line).
Your split filter should specify the terminator character (otherwise the default is a newline, `\n`).
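To make the `break_on_match` point above concrete, here is a small Python sketch of the difference between stopping at the first matching pattern and trying them all. The patterns and field names are invented for illustration and are not part of the original config:

```python
import re

# Two candidate patterns for the same message, easiest first
patterns = [
    re.compile(r"err(?P<code>\d+)"),
    re.compile(r"user=(?P<user>\w+)"),
]

def grok(message, break_on_match=True):
    """Mimic grok's break_on_match: stop at the first pattern that
    matches, or (when False) apply every pattern in the list."""
    fields = {}
    for pat in patterns:
        m = pat.search(message)
        if m:
            fields.update(m.groupdict())
            if break_on_match:
                break
    return fields

print(grok("err42 user=bob"))         # stops after the first match
print(grok("err42 user=bob", False))  # tries every pattern in the list
```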
```
input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log*"

    ### For testing and continual processing of the same file, remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"

    ### Read the logfile and recombine the multiline details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^\[destName:"
      negate => false
      what => "previous"
    }
  }
}
filter {
  if "logPerDestinationStats" in [message] {
    grok {
      match => { "message" => "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}\s*%{TIME:time}\s*%{TZ:timezone}\s*\(%{DATA:thread_name}\)\s*%{JAVACLASS:javaclass}#%{WORD:method}\s*%{LOGLEVEL}\]\s*" }
    }
    split {
      field => "message"
      terminator => "\r"
    }
    grok {
      match => { "message" => "^\[%{DATA}:\s*%{DATA:destName}\s*\|\s*%{DATA}:\s*%{NUMBER:enqueueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dequeueCount}\s*\|\s*%{DATA}:\s*%{NUMBER:dispatchCount}\s*\|\s*%{DATA}:\s*%{NUMBER:expiredCount}\s*\|\s*%{DATA}:\s*%{NUMBER:inflightCount}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsHeld}\s*\|\s*%{DATA}:\s*%{NUMBER:msgsCached}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryPercentUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryUsage}\s*\|\s*%{DATA}:\s*%{NUMBER:memoryLimit}\s*\|\s*%{DATA}:\s*%{NUMBER:avgEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:maxEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minEnqueueTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:currentConsumers}\s*\|\s*%{DATA}:\s*%{NUMBER:currentProducers}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsCount}\s*\|\s*%{DATA}:\s*%{NUMBER:blockedSendsTimeMs}\s*\|\s*%{DATA}:\s*%{NUMBER:minMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:maxMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:avgMsgSize}\s*\|\s*%{DATA}:\s*%{NUMBER:totalMsgSize}\]$" }
    }
    mutate {
      convert => { "message" => "string" }
      add_field => {
        "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
        "load_timestamp" => "%{@timestamp}"
      }
      remove_field => ["yr", "mnt", "daynum", "time", "timezone"]
    }
  } else {
    drop {}
  }
}
```

Please forgive the formatting, as I'm currently updating from my phone; I'm more than happy for someone to update the formatting in my place.



