I'm using
- logstash 6.4.3
- with aggregate filter plugin 2.8.0
- with multiple pipelines (though not sure if this is relevant)
- with multiple aggregate filters in sequence (though not sure if this is relevant)
and I'm trying to aggregate "load" and "cpu" events from metricbeat.
~# cat /etc/metricbeat/modules.d/system.yml
- module: system
  period: 1s
  metricsets:
    - cpu
    - load
When starting Logstash, I often get the following exception. Please note that it sometimes takes minutes, sometimes hours, before it shows up...
[2020-11-29T06:22:07,772][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `multi_filter' for nil:NilClass>, :backtrace=>["(eval):241:in `block in initialize'", "org/jruby/RubyArray.java:1734:in `each'", "(eval):235:in `block in initialize'", "(eval):25:in `block in initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:499:in `block in flush_filters'", "org/jruby/RubyArray.java:1734:in `each'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:498:in `flush_filters'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:539:in `flush_filters_to_batch'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:322:in `worker_loop'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:286:in `block in start_workers'"]}
[2020-11-29T06:22:07,824][WARN ][logstash.inputs.syslog ] syslog listener died {:protocol=>:udp, :address=>"127.0.0.1:5514", :exception=>#<IOError: IO Error>, :backtrace=>["/opt/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-syslog-3.4.1/lib/logstash/inputs/syslog.rb:152:in `udp_listener'", "/opt/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-syslog-3.4.1/lib/logstash/inputs/syslog.rb:130:in `server'", "/opt/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-syslog-3.4.1/lib/logstash/inputs/syslog.rb:110:in `block in run'"]}
I want to aggregate the metricbeat events over intervals of 10, 30, and 60 seconds (no worries, the intervals will be increased once everything is running)... and, as mentioned, I therefore put three aggregate filters in sequence. Each filter computes the average of the metrics (e.g., for the cpu metricset, the average of irq) by summing the corresponding fields (e.g., into irq_sum) and, in the aggregation event pushed upon timeout, dividing each sum by the number of events that occurred.
For example, an aggregated event looks like the following:
{"system":{"cpu":{"system":{"pct":0.018459999999999997},"user":{"pct":0.01845},"irq":{"pct":0},"iowait":{"pct":0.00819},"steal":{"pct":0},"nice":{"pct":0.032314999999999997},"softirq":{"pct":0.002565}}},"count":20,"task_id":"cpu_agg10","tags":["aggregated","agg10","metricbeat"],"metricset":{"name":"cpu"},"@timestamp":"2020-11-29T06:01:33.374Z","@version":"1"}
The configuration looks like the following, where all three filters are nearly identical.
filter {
  if [metricset][name] == "cpu" or [metricset][name] == "load" {
    aggregate {
      task_id => "%{[metricset][name]}_agg10"
      code => "
        map['count'] ||= 0
        map['timestamp'] = event.get('@timestamp')
        if event.get('[metricset][name]') == 'cpu' then
          map['metricset_name'] = 'cpu'
          map['count'] += 1
          # sum up each cpu metric; timeout_code later divides by count
          ['irq', 'nice', 'softirq', 'steal', 'system', 'user', 'iowait'].each do |metric|
            search_key = '[system][cpu][' + metric + '][pct]'
            map_key = metric + '_sum'
            map[map_key] ||= 0
            map[map_key] += event.get(search_key)
          end
        else
          map['metricset_name'] = 'load'
          map['count'] += 1
          ['1', '5', '15'].each do |metric|
            search_key = '[system][load][' + metric + ']'
            map_key = metric + '_sum'
            map[map_key] ||= 0
            map[map_key] += event.get(search_key)
          end
        end
      "
      timeout_task_id_field => "task_id"
      push_map_as_event_on_timeout => true
      timeout => 10
      timeout_tags => [ "aggregated", "agg10" ]
      timeout_code => "
        event.set('@timestamp', event.get('timestamp'))
        if event.get('metricset_name') == 'cpu' then
          event.set('[metricset][name]', 'cpu')
          # divide each sum by the number of aggregated events to get the average
          ['irq', 'nice', 'softirq', 'steal', 'system', 'user', 'iowait'].each do |metric|
            key = '[system][cpu][' + metric + '][pct]'
            map_key = metric + '_sum'
            event.set(key, (event.get(map_key) / event.get('count')))
            event.remove(map_key)
          end
        else
          event.set('[metricset][name]', 'load')
          ['1', '5', '15'].each do |metric|
            key = '[system][load][' + metric + ']'
            map_key = metric + '_sum'
            event.set(key, (event.get(map_key) / event.get('count')))
            event.remove(map_key)
          end
        end
        event.remove('metricset_name')
        event.remove('timestamp')
      "
    }
    if "agg10" in [tags] {
      aggregate {
        task_id => "%{[metricset][name]}_agg30"
        code => "
          ... same as for first filter ...
        "
        timeout_task_id_field => "task_id"
        push_map_as_event_on_timeout => true
        timeout => 30
        timeout_tags => [ "aggregated", "agg30" ]
        timeout_code => "
          ... same as for first filter ...
        "
      }
      if "agg30" in [tags] {
        aggregate {
          task_id => "%{[metricset][name]}_agg60"
          code => "
            ... same as for first filter ...
          "
          timeout_task_id_field => "task_id"
          push_map_as_event_on_timeout => true
          timeout => 60
          timeout_tags => [ "aggregated", "agg60" ]
          timeout_code => "
            ... same as for first filter ...
          "
        }
      }
    }
    mutate { add_tag => ["metricbeat"] }
  }
  if "metricbeat" in [tags] {
    if "aggregated" not in [tags] {
      mutate { add_field => { "[@metadata][es_retention]" => "ultrashort" } }
    } else if "agg10" in [tags] {
      mutate { add_field => { "[@metadata][es_retention]" => "short" } }
    } else if "agg30" in [tags] {
      mutate { add_field => { "[@metadata][es_retention]" => "med" } }
    } else if "agg60" in [tags] {
      mutate { add_field => { "[@metadata][es_retention]" => "long" } }
    }
  }
}
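The [@metadata][es_retention] field is consumed further downstream (the actual output lives in the main pipeline and isn't shown here); hypothetically, it would drive index routing along these lines, where the host and index scheme are assumptions for illustration only:

output {
  elasticsearch {
    # hypothetical host; only illustrates how the retention class
    # in [@metadata][es_retention] could select a per-retention index
    hosts => ["localhost:9200"]
    index => "metricbeat-%{[@metadata][es_retention]}-%{+YYYY.MM.dd}"
  }
}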
BTW, the raw metricbeat events look like the following:
{"system":{"load":{"norm":{"15":0.31,"5":0.45,"1":0.595},"15":0.62,"5":0.9,"cores":2,"1":1.19}},"host":{"name":"ses01"},"tags":["beats_input_raw_event","metricbeat"],"metricset":{"module":"system","rtt":413,"name":"load"},"@timestamp":"2020-11-29T06:00:07.375Z","@version":"1","beat":{"hostname":"ses01.local","name":"ses01","version":"6.8.0"},"event":{"duration":413415,"dataset":"system.load"}}
{"system":{"cpu":{"system":{"pct":0.0205},"user":{"pct":0.0205},"idle":{"pct":1.9385},"cores":2,"irq":{"pct":0},"iowait":{"pct":0.0103},"steal":{"pct":0},"nice":{"pct":0.0103},"total":{"pct":0.0512},"softirq":{"pct":0}}},"host":{"name":"ses01"},"tags":["beats_input_raw_event","metricbeat"],"metricset":{"module":"system","rtt":117,"name":"cpu"},"@timestamp":"2020-11-29T06:00:08.374Z","@version":"1","beat":{"hostname":"ses01.local","name":"ses01","version":"6.8.0"},"event":{"duration":117552,"dataset":"system.cpu"}}
The pipeline setup looks like the following:
~# cat /etc/logstash/pipelines.yml
---
- pipeline.id: metricbeat-input-aggregation
  path.config: "/etc/logstash/pipelines/metricbeat/*.conf"
  # single worker, as required by the aggregate filter so that all
  # events of a task pass through the same worker thread
  pipeline.workers: '1'
- pipeline.id: metricbeat-main
  path.config: "/etc/logstash/pipelines/metricbeat/main/*.conf"
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
# tree /etc/logstash/pipelines
/etc/logstash/pipelines
└── metricbeat
├── 10_input.conf
├── 20_aggregation.conf
├── 30_output.conf
└── main
├── 10-input.conf
├── 20-filter-XXX.conf
├── 21-filter-YYY.conf
└── 30-output.conf
The configuration shown above sits in 20_aggregation.conf. 10_input.conf reads events via the beats input plugin, whereas 30_output.conf sends all events on to the main pipeline. A minimal sketch of the two surrounding files follows (not the exact files; the beats port and the pipeline-to-pipeline wiring are assumptions):
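# 10_input.conf (simplified; the port is an assumption)
input {
  beats {
    port => 5044
  }
}

# 30_output.conf (simplified; assumes pipeline-to-pipeline communication,
# i.e. a matching pipeline input { address => ... } on the receiving side)
output {
  pipeline {
    send_to => ["metricbeat-main"]
  }
}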
Since updating to Logstash 6.8.13, there are no more crashes.