Not able to capture multiline logs using fluentd


I am trying to create a centralized logging system using the EFK stack (Elasticsearch, Fluentd, Kibana). To gather logs from the Kubernetes nodes I am running Fluentd as a DaemonSet. With this setup I am able to capture single-line logs, but not multiline logs.

I am loading fluent.conf via a ConfigMap:

kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config-map
  namespace: kube-system
  uid: ee7f05ea-b6fc-4b11-a0d6-0e6a709bf4c1
  
data:
      fluent.conf: |
          <source>
            @type prometheus
            @id in_prometheus
            bind "0.0.0.0"
            port 24231
            metrics_path "/metrics"
          </source>
          
          <source>
            @type prometheus_output_monitor
            @id in_prometheus_output_monitor
          </source>
          
          <label @FLUENT_LOG>
            <match fluent.**>
              @type null
              @id ignore_fluent_logs
            </match>
          </label>
          
          <source>
            @type tail
            @id in_tail_cluster_autoscaler
            multiline_flush_interval 5s
            path "/var/log/cluster-autoscaler.log"
            pos_file "/var/log/fluentd-cluster-autoscaler.log.pos"
            tag "cluster-autoscaler"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_container_logs
            path "/var/log/containers/*.log"
            pos_file "/var/log/fluentd-containers.log.pos"
            tag "kubernetes.*"
            exclude_path /var/log/containers/fluent*
            read_from_head true
                <parse>
                    @type multi_format
                    <pattern>
                      format multiline
                      format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/
                      format1 /^(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z) (?<stream>stdout|stderr) ((?<logtag>.))? (?<logtime>\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}) (?<log>.*)/
                    </pattern>           
                </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_docker
            path "/var/log/docker.log"
            pos_file "/var/log/fluentd-docker.log.pos"
            tag "docker"
            <parse>
              @type "regexp"
expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_etcd
            path "/var/log/etcd.log"
            pos_file "/var/log/fluentd-etcd.log.pos"
            tag "etcd"
            <parse>
              @type "none"
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_glbc
            multiline_flush_interval 5s
            path "/var/log/glbc.log"
            pos_file "/var/log/fluentd-glbc.log.pos"
            tag "glbc"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_apiserver_audit
            multiline_flush_interval 5s
            path "/var/log/kubernetes/kube-apiserver-audit.log"
            pos_file "/var/log/kube-apiserver-audit.log.pos"
            tag "kube-apiserver-audit"
            <parse>
              @type "multiline"
              format_firstline "/^\\S+\\s+AUDIT:/"
              format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
              time_format "%Y-%m-%dT%T.%L%Z"
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_apiserver
            multiline_flush_interval 5s
            path "/var/log/kube-apiserver.log"
            pos_file "/var/log/fluentd-kube-apiserver.log.pos"
            tag "kube-apiserver"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_controller_manager
            multiline_flush_interval 5s
            path "/var/log/kube-controller-manager.log"
            pos_file "/var/log/fluentd-kube-controller-manager.log.pos"
            tag "kube-controller-manager"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_proxy
            multiline_flush_interval 5s
            path "/var/log/kube-proxy.log"
            pos_file "/var/log/fluentd-kube-proxy.log.pos"
            tag "kube-proxy"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kube_scheduler
            multiline_flush_interval 5s
            path "/var/log/kube-scheduler.log"
            pos_file "/var/log/fluentd-kube-scheduler.log.pos"
            tag "kube-scheduler"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_kubelet
            multiline_flush_interval 5s
            path "/var/log/kubelet.log"
            pos_file "/var/log/fluentd-kubelet.log.pos"
            tag "kubelet"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_rescheduler
            multiline_flush_interval 5s
            path "/var/log/rescheduler.log"
            pos_file "/var/log/fluentd-rescheduler.log.pos"
            tag "rescheduler"
            <parse>
              @type "kubernetes"
              unmatched_lines
              expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
              time_format "%m%d %H:%M:%S.%N"
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_minion
            path "/var/log/salt/minion"
            pos_file "/var/log/fluentd-salt.pos"
            tag "salt"
            <parse>
              @type "regexp"
              expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
              time_format "%Y-%m-%d %H:%M:%S"
              unmatched_lines
            </parse>
          </source>
          
          <source>
            @type tail
            @id in_tail_startupscript
            path "/var/log/startupscript.log"
            pos_file "/var/log/fluentd-startupscript.log.pos"
            tag "startupscript"
            <parse>
              @type "syslog"
              unmatched_lines
            </parse>
          </source>
          
          <filter kubernetes.**>
            @type kubernetes_metadata
            @id filter_kube_metadata
            kubernetes_url "https://x.x.x.x:443/api"
            verify_ssl true
            ca_file ""
            skip_labels false
            skip_container_metadata false
            skip_master_url false
            skip_namespace_metadata false
            watch true
          </filter>
          
          <match **>
            @type elasticsearch
            @id out_es
            @log_level "info"
            include_tag_key true
            host "x.x.x.x"
            port 9200
            path ""
            scheme https
            ssl_verify false
            ssl_version TLSv1_2
            user "kibana10"
            password poorab
            reload_connections false
            reconnect_on_error true
            reload_on_failure true
            log_es_400_reason false
            logstash_prefix "logstash"
            logstash_dateformat "%Y.%m.%d"
            logstash_format true
            index_name "logstash"
            target_index_key
            type_name "fluentd"
            include_timestamp false
            template_name
            template_file
            template_overwrite false
            sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
            request_timeout 5s
            application_name default
            <buffer>
              flush_thread_count 8
              flush_interval 5s
              chunk_limit_size 2M
              queue_limit_length 32
              retry_max_interval 30
              retry_forever true
            </buffer>
          </match>

DaemonSet manifest:


kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      volumes:
        - name: fluentd-config-map
          configMap:
            name: fluentd-config-map
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
      containers:
        - name: fluentd
          image: 'fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-2'
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: x.x.x.x
            - name: FLUENT_ELASTICSEARCH_PORT
              value: '9200'
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: https
            - name: FLUENT_ELASTICSEARCH_SSL_VERIFY
              value: 'false'
            - name: FLUENT_ELASTICSEARCH_SSL_VERSION
              value: TLSv1_2
            - name: FLUENT_ELASTICSEARCH_USER
              value: kibana
            - name: FLUENT_ELASTICSEARCH_PASSWORD
              value: password
            - name: FLUENT_UID
              value: "0"
            - name: FLUENTD_SYSTEMD_CONF
              value: disable
            - name: FLUENT_CONTAINER_TAIL_EXCLUDE_PATH
              value: /var/log/containers/fluent*
            - name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
              value: /^(?<time>.+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
          resources:
            limits:
              cpu: 100m
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: fluentd-config-map
              mountPath: /fluentd/etc/fluent.conf
              subPath: fluent.conf
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              readOnly: true
              mountPath: /var/lib/docker/containers

The log I am trying to capture as a single record in Elasticsearch:


2024-01-29 16:16:41.255 ERROR 1 --- [-StreamThread-1] c.o.b.d.p.app.transforms.ProcessorImpl   : Exception while parsing string {
  "sourceServer": "EC2AMAZ-TET8NM1",
  "timestamp": "2024-01-29T16:16:41.113Z",
  "messageType": "trackerLogRecord",
  "trackerLogRecord": {
    "genericAttributes": {
      "synergyDeviceId": "aaaa",
      "timestamp": "2024-01-29T16:15:37.390Z",
      "rawTimestamp": "303d81f2a7",
      "physicalOffset": gggg,
      "logicalOffset": 18,
      "totalRecordLen": 1,
      "type": 6,
      "extendedType": 2,
      "payloadLen": 3,
      "rPayload": "100"
    },
    "recordType": "Temperature",
    "BoardTemperature": {
      "boardTemperature": 2262.0
    }
  }
}, Cannot deserialize value of type `LogRecordType` from String "BoardTemperature": not one of the values accepted for Enum class: [trackerVersion ]
 at [Source: (String)"{
  "sourceServer": "xxxxxxx",
  "timestamp": "2024-01-29T16:16:41.113Z",
  "messageType": "LogRecord",
  "trackerLogRecord": {
    "genericAttributes": {
      "synergyDeviceId": "aaaa",
      "timestamp": "2024-01-29T16:15:37.390Z",
      "rawTimestamp": "303d81f2a7",
      "physicalOffset": gggg,
      "logicalOffset": 18,
      "totalRecordLen": 1,
      "type": 6,
      "extendedType": 2,
      "payloadLen": 3,
      "rPayload": "100"[truncated 148 chars]; line: 19, column: 19] (through reference chain: record["LogRecord"]->LogRecord["recordType"])
 [Ljava.lang.StackTraceElement;@315370cc
2024-01-29 16:16:41.255  INFO 1 --- [-StreamThread-1] c.o.b.d.p.app.kafka.producer.Producer    : Inserting record to: error

I have checked the regex and it is fine. While pulling records from the Kubernetes pods, Fluentd itself prepends three extra fields to every line: time, stream, and logtag (e.g. `2024-01-30T08:54:55.519393497Z stdout F 2024-01-30 08:54:55,519 INFO 1 --- [ scheduling-1] c.o.b.l.d.ExceptionLogger :`). These are accounted for in the regex, and that part works.
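To sanity-check how these prefix fields are split out, here is a minimal Python sketch of the container tail parser regex from the `FLUENT_CONTAINER_TAIL_PARSER_TYPE` env var below, translated to Python's `?P<name>` group syntax (Fluentd itself uses Ruby/Onigmo regexes, where `?<name>` is valid):

```python
import re

# The CRI-style prefix parser from FLUENT_CONTAINER_TAIL_PARSER_TYPE,
# rewritten with Python named-group syntax.
CRI_PREFIX = re.compile(
    r"^(?P<time>.+) (?P<stream>stdout|stderr)( (?P<logtag>.))? (?P<log>.*)$"
)

# A sample line as it appears in /var/log/containers/*.log.
line = ("2024-01-30T08:54:55.519393497Z stdout F "
        "2024-01-30 08:54:55,519 INFO 1 --- [ scheduling-1] "
        "c.o.b.l.d.ExceptionLogger :")

m = CRI_PREFIX.match(line)
print(m.group("time"))    # 2024-01-30T08:54:55.519393497Z
print(m.group("stream"))  # stdout
print(m.group("logtag"))  # F
print(m.group("log"))     # the application's own log line
```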

Below is an explanation of the multiline `format_firstline` regex:

format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/


Here the first date regex matches the timestamp added by Fluentd; it is present on every record.
The second group matches the stream (stdout or stderr), also present on every record.
The third matches the single-character log tag, again present on every record.
The fourth date is the timestamp actually received in the application log. It marks the start of the exception, since the continuation lines of the exception do not contain this date.
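To verify that this `format_firstline` regex distinguishes a record's first line from its continuation lines, here is a small Python check using the pattern verbatim (only the test strings are made up from the log samples above):

```python
import re

# The format_firstline regex from the question, verbatim.
FIRSTLINE = re.compile(
    r"^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w "
    r"\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}"
)

# First line of a multiline record: Fluentd prefix + application timestamp.
first = ("2024-01-30T08:54:55.519393497Z stdout F "
         "2024-01-30 08:54:55,519 INFO 1 --- [ scheduling-1] "
         "c.o.b.l.d.ExceptionLogger :")

# Continuation line: Fluentd prefix only, no application timestamp after it.
cont = '2024-01-30T08:54:55.519393497Z stdout F   "sourceServer": "EC2AMAZ-TET8NM1",'

print(bool(FIRSTLINE.match(first)))  # True  -> starts a new record
print(bool(FIRSTLINE.match(cont)))   # False -> should join the previous record
```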


Currently I am able to capture only the first line of the exception, or, if I change the regex, I receive every line of the exception as an individual record. I need to capture the complete exception as a single record in Elasticsearch.
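For completeness: one alternative I have seen recommended for exactly this situation, but have not applied in the config above, is the `fluent-plugin-concat` filter, which joins continuation lines in a filter stage after the tail source rather than inside the parser. A rough sketch (this assumes the plugin is installed in the daemonset image; the start regexp mirrors the application timestamp described above):

```
<filter kubernetes.**>
  @type concat
  key log
  # A line beginning with the application timestamp starts a new record;
  # every other line is appended to the previous record.
  multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}[.,]\d{3}/
  # Flush a buffered record if no continuation line arrives within 5 seconds.
  flush_interval 5
</filter>
```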

*** Thanks in advance ***