logstash crashes with "NoMethodError: undefined method `multi_filter' for nil:NilClass"


I'm using

  • logstash 6.4.3
  • with aggregate filter plugin 2.8.0
  • with multiple pipelines (though not sure if this is relevant)
  • with multiple aggregate filters in sequence (though not sure if this is relevant)

and I'm trying to aggregate "load" and "cpu" events from metricbeat.

~# cat /etc/metricbeat/modules.d/system.yml 
- module: system
  period: 1s
  metricsets:
    - cpu
    - load

When starting logstash, I often get the following exception. Note that it sometimes takes minutes and sometimes hours until it occurs...

[2020-11-29T06:22:07,772][FATAL][logstash.runner ] An unexpected error occurred! {:error=>#<NoMethodError: undefined method `multi_filter' for nil:NilClass>, :backtrace=>["(eval):241:in `block in initialize'", "org/jruby/RubyArray.java:1734:in `each'", "(eval):235:in `block in initialize'", "(eval):25:in `block in initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:499:in `block in flush_filters'", "org/jruby/RubyArray.java:1734:in `each'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:498:in `flush_filters'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:539:in `flush_filters_to_batch'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:322:in `worker_loop'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:286:in `block in start_workers'"]}
[2020-11-29T06:22:07,824][WARN ][logstash.inputs.syslog ] syslog listener died {:protocol=>:udp, :address=>"127.0.0.1:5514", :exception=>#<IOError: IO Error>, :backtrace=>["/opt/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-syslog-3.4.1/lib/logstash/inputs/syslog.rb:152:in `udp_listener'", "/opt/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-syslog-3.4.1/lib/logstash/inputs/syslog.rb:130:in `server'", "/opt/logstash/vendor/bundle/jruby/2.3.0/gems/logstash-input-syslog-3.4.1/lib/logstash/inputs/syslog.rb:110:in `block in run'"]}

I want to aggregate the metricbeat events in intervals of 10, 30 and 60 seconds (no worries, the times are going to be increased once everything is running)... and, as mentioned, I therefore put 3 aggregate filters in sequence. The filters compute the average of the metrics (e.g., for the cpu metricset, the average of irq) by summing up the corresponding fields (e.g. into irq_sum) and, in the aggregation event pushed on timeout, dividing the sum by the number of events that occurred.

For example, an aggregated event looks like the following:

{"system":{"cpu":{"system":{"pct":0.018459999999999997},"user":{"pct":0.01845},"irq":{"pct":0},"iowait":{"pct":0.00819},"steal":{"pct":0},"nice":{"pct":0.032314999999999997},"softirq":{"pct":0.002565}}},"count":20,"task_id":"cpu_agg10","tags":["aggregated","agg10","metricbeat"],"metricset":{"name":"cpu"},"@timestamp":"2020-11-29T06:01:33.374Z","@version":"1"}

The configuration looks like the following, where all three filters are almost identical.

filter {
  if [metricset][name] == "cpu" or [metricset][name] == "load" {
    aggregate {
      task_id => "%{[metricset][name]}_agg10"
      code => "
        map['timestamp'] ||= 0
        map['count'] ||= 0
        map['timestamp'] = event.get('@timestamp')

        if event.get('[metricset][name]') == 'cpu' then
          map['metricset_name'] = 'cpu'
          map['count'] += 1
          ['irq', 'nice', 'softirq', 'steal', 'system', 'user', 'iowait'].each do |metric|
            search_key = '[system][cpu][' + metric + '][pct]'
            map_key = metric + '_sum'
            map[map_key] ||= 0
            map[map_key] += event.get(search_key)
          end
        else
          map['metricset_name'] = 'load'
          map['count'] += 1
          ['1', '5', '15'].each do |metric|
            search_key = '[system][load][' + metric + ']'
            map_key = metric + '_sum'
            map[map_key] ||= 0
            map[map_key] += event.get(search_key)
          end
        end
      "
      timeout_task_id_field        => "task_id"
      push_map_as_event_on_timeout => true
      timeout                      => 10
      timeout_tags                 => [ "aggregated", "agg10" ]
      timeout_code                 => "
        event.set('@timestamp', event.get('timestamp'))

        if event.get('metricset_name') == 'cpu' then
          event.set('[metricset][name]', 'cpu')
          ['irq', 'nice', 'softirq', 'steal', 'system', 'user', 'iowait'].each do |metric|
            key = '[system][cpu][' + metric + '][pct]'
            map_key = metric + '_sum'
            event.set(key, (event.get(map_key)/event.get('count')))
            event.remove(map_key)
          end
        else
          event.set('[metricset][name]', 'load')
          ['1', '5', '15'].each do |metric|
            key = '[system][load][' + metric + ']'
            map_key = metric + '_sum'
            event.set(key, (event.get(map_key)/event.get('count')))
            event.remove(map_key)
          end
        end

        event.remove('metricset_name')
        event.remove('timestamp')
      "
    }
    if "agg10" in [tags] {
      aggregate {
        task_id => "%{[metricset][name]}_agg30"
        code => "
            ... same as for first filter ...
        "
        timeout_task_id_field        => "task_id"
        push_map_as_event_on_timeout => true
        timeout                      => 30
        timeout_tags                 => [ "aggregated", "agg30" ]
        timeout_code                 => "
         ... same as for first filter ...
        "
      }
      if "agg30" in [tags] {
        aggregate {
          task_id => "%{[metricset][name]}_agg60"
          code => "
        ... same as for first filter ...
          "
          timeout_task_id_field        => "task_id"
          push_map_as_event_on_timeout => true
          timeout                      => 60
          timeout_tags                 => [ "aggregated", "agg60" ]
          timeout_code                 => "
        ... same as for first filter ...
          "
        }
      }
    }
    mutate { add_tag => ["metricbeat"] }
  }

  if "metricbeat" in [tags] {
    if "aggregated" not in [tags] {
      mutate { add_field => { "[@metadata][es_retention]"  => "ultrashort" } }
    } else if "agg10" in [tags] {
      mutate { add_field => { "[@metadata][es_retention]"  => "short" } }
    } else if "agg30" in [tags] {
      mutate { add_field => { "[@metadata][es_retention]"  => "med" } }
    } else if "agg60" in [tags] {
      mutate { add_field => { "[@metadata][es_retention]"  => "long" } }
    }
  }
}
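
The idea behind [@metadata][es_retention] is to drive the index choice downstream. A hypothetical sketch of such an output (the index naming here is made up, just to illustrate the intent):

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # route aggregated and raw events to indices with different retention
    index => "metricbeat-%{[@metadata][es_retention]}-%{+YYYY.MM.dd}"
  }
}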

BTW, the raw metricbeat events look like the following:

{"system":{"load":{"norm":{"15":0.31,"5":0.45,"1":0.595},"15":0.62,"5":0.9,"cores":2,"1":1.19}},"host":{"name":"ses01"},"tags":["beats_input_raw_event","metricbeat"],"metricset":{"module":"system","rtt":413,"name":"load"},"@timestamp":"2020-11-29T06:00:07.375Z","@version":"1","beat":{"hostname":"ses01.local","name":"ses01","version":"6.8.0"},"event":{"duration":413415,"dataset":"system.load"}}
{"system":{"cpu":{"system":{"pct":0.0205},"user":{"pct":0.0205},"idle":{"pct":1.9385},"cores":2,"irq":{"pct":0},"iowait":{"pct":0.0103},"steal":{"pct":0},"nice":{"pct":0.0103},"total":{"pct":0.0512},"softirq":{"pct":0}}},"host":{"name":"ses01"},"tags":["beats_input_raw_event","metricbeat"],"metricset":{"module":"system","rtt":117,"name":"cpu"},"@timestamp":"2020-11-29T06:00:08.374Z","@version":"1","beat":{"hostname":"ses01.local","name":"ses01","version":"6.8.0"},"event":{"duration":117552,"dataset":"system.cpu"}}

The pipeline setup looks like the following:

~# cat /etc/logstash/pipelines.yml 
---
- pipeline.id: metricbeat-input-aggregation
  path.config: "/etc/logstash/pipelines/metricbeat/*.conf"
  pipeline.workers: '1'
- pipeline.id: metricbeat-main
  path.config: "/etc/logstash/pipelines/metricbeat/main/*.conf"
- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"
~# tree /etc/logstash/pipelines
/etc/logstash/pipelines
└── metricbeat
    ├── 10_input.conf
    ├── 20_aggregation.conf
    ├── 30_output.conf
    └── main
        ├── 10-input.conf
        ├── 20-filter-XXX.conf
        ├── 21-filter-YYY.conf
        └── 30-output.conf

The configuration shown above sits in 20_aggregation.conf. 10_input.conf reads from the beats input plugin, and 30_output.conf sends all events on to the main pipeline. Note that pipeline.workers is set to 1 for the aggregation pipeline, since the aggregate filter only works correctly with a single worker thread.
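
For reference, a minimal sketch of how such a hand-off can be wired with Logstash's pipeline-to-pipeline plugins (the address name metricbeat_main is a placeholder for illustration, not necessarily my actual config):

# 30_output.conf (sketch)
output {
  pipeline { send_to => ["metricbeat_main"] }
}

# pipelines/metricbeat/main/10-input.conf (sketch)
input {
  pipeline { address => "metricbeat_main" }
}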

1 Answer

Answered by Armin Abfalterer:

Since updating to logstash 6.8.13 there are no more crashes.