I am trying to create a centralized logging system using the EFK stack (Elasticsearch, Fluentd, Kibana). To gather logs from the Kubernetes nodes I am running Fluentd as a DaemonSet. With this setup I am able to collect single-line logs, but multiline logs are not joined into a single record.
The fluent.conf is loaded via a ConfigMap:
kind: ConfigMap
apiVersion: v1
metadata:
  name: fluentd-config-map
  namespace: kube-system
  uid: ee7f05ea-b6fc-4b11-a0d6-0e6a709bf4c1
data:
  fluent.conf: |
    <source>
      @type prometheus
      @id in_prometheus
      bind "0.0.0.0"
      port 24231
      metrics_path "/metrics"
    </source>
    <source>
      @type prometheus_output_monitor
      @id in_prometheus_output_monitor
    </source>
    <label @FLUENT_LOG>
      <match fluent.**>
        @type null
        @id ignore_fluent_logs
      </match>
    </label>
    <source>
      @type tail
      @id in_tail_cluster_autoscaler
      multiline_flush_interval 5s
      path "/var/log/cluster-autoscaler.log"
      pos_file "/var/log/fluentd-cluster-autoscaler.log.pos"
      tag "cluster-autoscaler"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_container_logs
      path "/var/log/containers/*.log"
      pos_file "/var/log/fluentd-containers.log.pos"
      tag "kubernetes.*"
      exclude_path /var/log/containers/fluent*
      read_from_head true
      <parse>
        @type multi_format
        <pattern>
          format multiline
          format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/
          format1 /^(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z) (?<stream>stdout|stderr) ((?<logtag>.))? (?<logtime>\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}) (?<log>.*)/
        </pattern>
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_docker
      path "/var/log/docker.log"
      pos_file "/var/log/fluentd-docker.log.pos"
      tag "docker"
      <parse>
        @type "regexp"
        expression /^time="(?<time>[^)]*)" level=(?<severity>[^ ]*) msg="(?<message>[^"]*)"( err="(?<error>[^"]*)")?( statusCode=(?<status_code>\d+))?/
        unmatched_lines
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_etcd
      path "/var/log/etcd.log"
      pos_file "/var/log/fluentd-etcd.log.pos"
      tag "etcd"
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_glbc
      multiline_flush_interval 5s
      path "/var/log/glbc.log"
      pos_file "/var/log/fluentd-glbc.log.pos"
      tag "glbc"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_kube_apiserver_audit
      multiline_flush_interval 5s
      path "/var/log/kubernetes/kube-apiserver-audit.log"
      pos_file "/var/log/kube-apiserver-audit.log.pos"
      tag "kube-apiserver-audit"
      <parse>
        @type "multiline"
        format_firstline "/^\\S+\\s+AUDIT:/"
        format1 /^(?<time>\S+) AUDIT:(?: (?:id="(?<id>(?:[^"\\]|\\.)*)"|ip="(?<ip>(?:[^"\\]|\\.)*)"|method="(?<method>(?:[^"\\]|\\.)*)"|user="(?<user>(?:[^"\\]|\\.)*)"|groups="(?<groups>(?:[^"\\]|\\.)*)"|as="(?<as>(?:[^"\\]|\\.)*)"|asgroups="(?<asgroups>(?:[^"\\]|\\.)*)"|namespace="(?<namespace>(?:[^"\\]|\\.)*)"|uri="(?<uri>(?:[^"\\]|\\.)*)"|response="(?<response>(?:[^"\\]|\\.)*)"|\w+="(?:[^"\\]|\\.)*"))*/
        time_format "%Y-%m-%dT%T.%L%Z"
        unmatched_lines
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_kube_apiserver
      multiline_flush_interval 5s
      path "/var/log/kube-apiserver.log"
      pos_file "/var/log/fluentd-kube-apiserver.log.pos"
      tag "kube-apiserver"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_kube_controller_manager
      multiline_flush_interval 5s
      path "/var/log/kube-controller-manager.log"
      pos_file "/var/log/fluentd-kube-controller-manager.log.pos"
      tag "kube-controller-manager"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_kube_proxy
      multiline_flush_interval 5s
      path "/var/log/kube-proxy.log"
      pos_file "/var/log/fluentd-kube-proxy.log.pos"
      tag "kube-proxy"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_kube_scheduler
      multiline_flush_interval 5s
      path "/var/log/kube-scheduler.log"
      pos_file "/var/log/fluentd-kube-scheduler.log.pos"
      tag "kube-scheduler"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_kubelet
      multiline_flush_interval 5s
      path "/var/log/kubelet.log"
      pos_file "/var/log/fluentd-kubelet.log.pos"
      tag "kubelet"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_rescheduler
      multiline_flush_interval 5s
      path "/var/log/rescheduler.log"
      pos_file "/var/log/fluentd-rescheduler.log.pos"
      tag "rescheduler"
      <parse>
        @type "kubernetes"
        unmatched_lines
        expression /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<message>.*)/m
        time_format "%m%d %H:%M:%S.%N"
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_minion
      path "/var/log/salt/minion"
      pos_file "/var/log/fluentd-salt.pos"
      tag "salt"
      <parse>
        @type "regexp"
        expression /^(?<time>[^ ]* [^ ,]*)[^\[]*\[[^\]]*\]\[(?<severity>[^ \]]*) *\] (?<message>.*)$/
        time_format "%Y-%m-%d %H:%M:%S"
        unmatched_lines
      </parse>
    </source>
    <source>
      @type tail
      @id in_tail_startupscript
      path "/var/log/startupscript.log"
      pos_file "/var/log/fluentd-startupscript.log.pos"
      tag "startupscript"
      <parse>
        @type "syslog"
        unmatched_lines
      </parse>
    </source>
    <filter kubernetes.**>
      @type kubernetes_metadata
      @id filter_kube_metadata
      kubernetes_url "https://x.x.x.x:443/api"
      verify_ssl true
      ca_file ""
      skip_labels false
      skip_container_metadata false
      skip_master_url false
      skip_namespace_metadata false
      watch true
    </filter>
    <match **>
      @type elasticsearch
      @id out_es
      @log_level "info"
      include_tag_key true
      host "x.x.x.x"
      port 9200
      path ""
      scheme https
      ssl_verify false
      ssl_version TLSv1_2
      user "kibana10"
      password poorab
      reload_connections false
      reconnect_on_error true
      reload_on_failure true
      log_es_400_reason false
      logstash_prefix "logstash"
      logstash_dateformat "%Y.%m.%d"
      logstash_format true
      index_name "logstash"
      target_index_key
      type_name "fluentd"
      include_timestamp false
      template_name
      template_file
      template_overwrite false
      sniffer_class_name "Fluent::Plugin::ElasticsearchSimpleSniffer"
      request_timeout 5s
      application_name default
      <buffer>
        flush_thread_count 8
        flush_interval 5s
        chunk_limit_size 2M
        queue_limit_length 32
        retry_max_interval 30
        retry_forever true
      </buffer>
    </match>
The DaemonSet manifest:
kind: DaemonSet
apiVersion: apps/v1
metadata:
  name: fluentd
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluentd-logging
      version: v1
  template:
    metadata:
      labels:
        k8s-app: fluentd-logging
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      serviceAccount: fluentd
      serviceAccountName: fluentd
      tolerations:
        - key: node-role.kubernetes.io/master
          effect: NoSchedule
      volumes:
        - name: fluentd-config-map
          configMap:
            name: fluentd-config-map
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
      containers:
        - name: fluentd
          image: 'fluent/fluentd-kubernetes-daemonset:v1.16-debian-elasticsearch8-2'
          env:
            - name: FLUENT_ELASTICSEARCH_HOST
              value: x.x.x.x
            - name: FLUENT_ELASTICSEARCH_PORT
              value: '9200'
            - name: FLUENT_ELASTICSEARCH_SCHEME
              value: https
            - name: FLUENT_ELASTICSEARCH_SSL_VERIFY
              value: 'false'
            - name: FLUENT_ELASTICSEARCH_SSL_VERSION
              value: TLSv1_2
            - name: FLUENT_ELASTICSEARCH_USER
              value: kibana
            - name: FLUENT_ELASTICSEARCH_PASSWORD
              value: password
            - name: FLUENT_UID
              value: "0"
            - name: FLUENTD_SYSTEMD_CONF
              value: disable
            - name: FLUENT_CONTAINER_TAIL_EXCLUDE_PATH
              value: /var/log/containers/fluent*
            - name: FLUENT_CONTAINER_TAIL_PARSER_TYPE
              value: /^(?<time>.+) (?<stream>stdout|stderr)( (?<logtag>.))? (?<log>.*)$/
          resources:
            limits:
              cpu: 100m
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 200Mi
          volumeMounts:
            - name: fluentd-config-map
              mountPath: /fluentd/etc/fluent.conf
              subPath: fluent.conf
            - name: varlog
              mountPath: /var/log
            - name: varlibdockercontainers
              readOnly: true
              mountPath: /var/lib/docker/containers
Here is a log entry that I am trying to pull into Elasticsearch as a single record:
2024-01-29 16:16:41.255 ERROR 1 --- [-StreamThread-1] c.o.b.d.p.app.transforms.ProcessorImpl : Exception while parsing string {
"sourceServer": "EC2AMAZ-TET8NM1",
"timestamp": "2024-01-29T16:16:41.113Z",
"messageType": "trackerLogRecord",
"trackerLogRecord": {
"genericAttributes": {
"synergyDeviceId": "aaaa",
"timestamp": "2024-01-29T16:15:37.390Z",
"rawTimestamp": "303d81f2a7",
"physicalOffset": gggg,
"logicalOffset": 18,
"totalRecordLen": 1,
"type": 6,
"extendedType": 2,
"payloadLen": 3,
"rPayload": "100"
},
"recordType": "Temperature",
"BoardTemperature": {
"boardTemperature": 2262.0
}
}
}, Cannot deserialize value of type `LogRecordType` from String "BoardTemperature": not one of the values accepted for Enum class: [trackerVersion ]
at [Source: (String)"{
"sourceServer": "xxxxxxx",
"timestamp": "2024-01-29T16:16:41.113Z",
"messageType": "LogRecord",
"trackerLogRecord": {
"genericAttributes": {
"synergyDeviceId": "aaaa",
"timestamp": "2024-01-29T16:15:37.390Z",
"rawTimestamp": "303d81f2a7",
"physicalOffset": gggg,
"logicalOffset": 18,
"totalRecordLen": 1,
"type": 6,
"extendedType": 2,
"payloadLen": 3,
"rPayload": "100"[truncated 148 chars]; line: 19, column: 19] (through reference chain: record["LogRecord"]->LogRecord["recordType"])
[Ljava.lang.StackTraceElement;@315370cc
2024-01-29 16:16:41.255 INFO 1 --- [-StreamThread-1] c.o.b.d.p.app.kafka.producer.Producer : Inserting record to: error
I have checked the regex and it is fine. When pulling records from the Kubernetes pods, the container runtime prefixes every line with three extra fields (time, stream, and logtag), e.g. 2024-01-30T08:54:55.519393497Z stdout F 2024-01-30 08:54:55,519 INFO 1 --- [ scheduling-1] c.o.b.l.d.ExceptionLogger :, and the regex accounts for these fields; that part is working.
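To make the prefix concrete, here is how one sample line from my logs decomposes under the format1 expression in the container-logs source above (the annotations are mine, not part of the config):

# Sample line as it appears in /var/log/containers/*.log:
#   2024-01-30T08:54:55.519393497Z stdout F 2024-01-30 08:54:55,519 INFO 1 --- [ scheduling-1] c.o.b.l.d.ExceptionLogger : ...
# Capture groups produced by format1:
#   time    = 2024-01-30T08:54:55.519393497Z   (added by the container runtime)
#   stream  = stdout                           (added by the container runtime)
#   logtag  = F                                (added by the runtime; F = full line, P = partial line)
#   logtime = 2024-01-30 08:54:55,519          (timestamp written by the application)
#   log     = INFO 1 --- [ scheduling-1] ...   (the actual message)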
Below is an explanation of the format_firstline regex of the multiline pattern (a reduced standalone sketch follows this breakdown):
format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/
The first date matches the timestamp added by the container runtime; it is present on every line.
The second part matches the stream (stdout or stderr); it is also present on every line.
The third part matches the single-character logtag; it is also present on every line.
The fourth date is the timestamp actually received in the application logs. It marks the start of an exception, because the remaining lines of the exception do not contain this date.
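Reduced to a standalone sketch (same expressions as in the multi_format pattern above, just written as a plain multiline parser), the parse block under test is:

<parse>
  @type multiline
  # A new record starts only where both the runtime prefix and the
  # application timestamp appear at the beginning of the line.
  format_firstline /^\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z ?(stdout|stderr) \w \d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}/
  # Lines not matching format_firstline should be appended to <log>.
  format1 /^(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{1,2}:\d{1,2}:\d{1,2}.\d{1,30}Z) (?<stream>stdout|stderr) ((?<logtag>.))? (?<logtime>\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}.\d{3}) (?<log>.*)/
</parse>

Note that, as described above, the runtime prefix (time, stream, logtag) is written on every line on disk, including the continuation lines of the stack trace, so those lines carry the prefix even though they lack the application timestamp.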
Currently I capture only the first line of the exception, or, if I change the pattern, every line of the exception arrives as an individual record. I need the complete exception captured as a single record in Elasticsearch.
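For completeness, here is a sketch of the concat-filter route; this is not part of the config above, and it assumes that fluent-plugin-concat is bundled in the fluentd-kubernetes-daemonset image and that the per-line parser (for example the regex set in FLUENT_CONTAINER_TAIL_PARSER_TYPE in the DaemonSet above) stores the message in the log field:

<filter kubernetes.**>
  @type concat
  key log
  # A new record starts when the application timestamp is present;
  # any other line is appended to the previous record.
  multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}[.,]\d{3}/
  # Flush a pending record if no continuation line arrives within 5 seconds.
  flush_interval 5
</filter>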
Thanks in advance.