FluentBit S3 upload with container name as key in s3

1.2k views Asked by At

My log file name in s3 looks like kube.var.log.containers.development-api-connect-green-58db8964cb-wrzg5_default_api-connect-fa7cafd99a1bbb8bca002c8ab5e3b2aefc774566bb7e9eb054054112f43f1e87.log/ here I want to extract only container name from tag so that s3 is well structured like this :

s3://<bucket-name>/eks/<container-name>/YYYY/MM/DD/<object-name>

I tried extracting container-name with s3_key_format_tag_delimiters as _. but $TAG[2] has container id with it which I do not want. Can't split on - as well since container name can be like a or a-b or a-b-c... also.

Pod name in s3 path is not feasible as pod name keeps changing for any service when new pods comes up.

Is there any way to achieve this in fluentBit?

My current configuration looks like this:

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: logging
  labels:
    k8s-app: fluent-bit
data:
  # Configuration files: server, input, filters and output
  # ======================================================
  fluent-bit.conf: |
    [SERVICE]
        Flush         1
        Log_Level     info
        Daemon        off
        Parsers_File  parsers.conf
        HTTP_Server   On
        HTTP_Listen   0.0.0.0
        HTTP_Port     2020

    @INCLUDE input-kubernetes.conf
    @INCLUDE filter-kubernetes.conf
    @INCLUDE output-elasticsearch.conf
    @INCLUDE output-s3.conf

  input-kubernetes.conf: |
    [INPUT]
        Name              tail
        Tag               kube.*
        Path              /var/log/containers/*_default_*.log
        Parser            docker
        DB                /var/log/flb_kube.db
        Mem_Buf_Limit     5MB
        Skip_Long_Lines   On
        Refresh_Interval  10

  filter-kubernetes.conf: |
    [FILTER]
        Name                kubernetes
        Match               kube.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
        Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
        Kube_Tag_Prefix     kube.var.log.containers.
        Merge_Log           Off
        Merge_Log_Key       log_processed
        K8S-Logging.Parser  On
        K8S-Logging.Exclude Off

  output-elasticsearch.conf: |
    [OUTPUT]
        Name            es
        Match           *
        Host            ${FLUENT_ELASTICSEARCH_HOST}
        Port            ${FLUENT_ELASTICSEARCH_PORT}
        Logstash_Format On
        Replace_Dots    On
        Retry_Limit     False
        tls On
        tls.verify Off
  output-s3.conf: |
    [OUTPUT]
        Name            s3
        Match           *
        bucket                       dev-eks-logs
        region                       us-east-1
        total_file_size              250M
        s3_key_format                /eks/%Y/%m/%d/
        s3_key_format_tag_delimiters .
  parsers.conf: |
    [PARSER]
        Name   apache
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache2
        Format regex
        Regex  ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^ ]*) +\S*)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   apache_error
        Format regex
        Regex  ^\[[^ ]* (?<time>[^\]]*)\] \[(?<level>[^\]]*)\](?: \[pid (?<pid>[^\]]*)\])?( \[client (?<client>[^\]]*)\])? (?<message>.*)$

    [PARSER]
        Name   nginx
        Format regex
        Regex ^(?<remote>[^ ]*) (?<host>[^ ]*) (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name   json
        Format json
        Time_Key time
        Time_Format %d/%b/%Y:%H:%M:%S %z

    [PARSER]
        Name        docker
        Format      json
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L
        Time_Keep   On

    [PARSER]
        # http://rubular.com/r/tjUt3Awgg4
        Name cri
        Format regex
        Regex ^(?<time>[^ ]+) (?<stream>stdout|stderr) (?<logtag>[^ ]*) (?<message>.*)$
        Time_Key    time
        Time_Format %Y-%m-%dT%H:%M:%S.%L%z

    [PARSER]
        Name        syslog
        Format      regex
        Regex       ^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key    time
        Time_Format %b %d %H:%M:%S
1

There are 1 answers

0
Ashish Kumar On

Use below code, its been tested & working for me. As its mentioned, you are also using ES, sharing code for dynamic indices as well.

Note: Set ENVIRONMENT, CLUSTER_NAME using configMap.

    [FILTER]
        Name    record_modifier
        Match *
        Record cluster_name ${CLUSTER_NAME}
    [FILTER]
        Name    record_modifier
        Match *
        Record app_name "labels_not_specified"
    [FILTER]
        Name    lua
        Match   kube.*
        script  es_index.lua
        call    append_es_index
    [FILTER]
        name    lua
        alias   set_std_keys
        match   kube.*
        script  s3_path.lua
        call   set_std_keys
    [FILTER]
        name rewrite_tag
        match kube.*
        rule $log ^.*$ s3.${ENVIRONMENT}.$cluster_name.$namespace_name.$app_name.$container_name true

Make sure INPUT, FILTER and OUTPUT block are placed correctly.

    [OUTPUT]
        Name            es
        Match           kube.*
        Host            xxxx
        Port            yyyy
        HTTP_User       dummy_username
        HTTP_Passwd     dummy_password
        Retry_Limit     False
        Logstash_Format On
        Logstash_Prefix_Key $es_index
        Logstash_DateFormat %Y-%m-%d
        Suppress_Type_Name On
        Tls   On
        Type  _doc
        Retry_Limit 5
        Buffer_Size 50M
        #Trace_Error On

    [OUTPUT]
        name s3
        match s3.*
        region ap-south-1
        bucket dummy-centralized-logging
        upload_timeout 2m
        use_put_object On
        content_type application/json
        compression gzip
        preserve_data_ordering On
        total_file_size              250M
        s3_key_format /$TAG[1]/$TAG[2]/$TAG[3]/$TAG[4]/$TAG[5]/%Y-%m-%d-%H/$UUID-%M-%S.log
        s3_key_format_tag_delimiters .

Mount these 2 files which is responsible for dynamic parsing for s3 path & es indices -


  s3_path.lua: |
    function set_std_keys(tag, timestamp, record)

        -- Pull up cluster
        if (record["cluster_name"] ~= nil) then
            record["cluster_name"] = record["cluster_name"]
        else
            record["cluster_name"] = "k8s"
        end

        if (record["kubernetes"] ~= nil) then
            kube = record["kubernetes"]

            -- Pull up namespace
            if (kube["namespace_name"] ~= nil and string.len(kube["namespace_name"]) > 0) then
                record["namespace_name"] = kube["namespace_name"]
            else
                record["namespace_name"] = "default"
            end

            -- Pull up container name
            if (kube["container_name"] ~= nil and string.len(kube["container_name"]) > 0) then
                record["container_name"] = kube["container_name"]
            end

            -- Pull up app name (Deployment, StateFuleSets, DaemonSet, Job, CronJob etc)
            if (kube["labels"] ~= nil) then
                labels = kube["labels"]

                if (labels["app"] ~= nil and string.len(labels["app"]) > 0) then
                    record["app_name"] = labels["app"]
                elseif (labels["app.kubernetes.io/instance"] ~= nil and string.len(labels["app.kubernetes.io/instance"]) > 0) then
                    record["app_name"] = labels["app.kubernetes.io/instance"]
                elseif (labels["k8s-app"] ~= nil and string.len(labels["k8s-app"]) > 0) then
                    record["app_name"] = labels["k8s-app"]
                elseif (labels["name"] ~= nil and string.len(labels["name"]) > 0) then
                    record["app_name"] = labels["name"]
                end
            else
                record["app_name"] = record["app_name"]
            end
        end

      return 2, timestamp, record
    end

  es_index.lua: |
    function append_es_index(tag, timestamp, record)
      new_record = record

      if (record["cluster_name"] ~= nil) then
          es_index = record["cluster_name"]
      else
          es_index = "k8s"
      end

      if (record["kubernetes"] ~= nil) then
          kube = record["kubernetes"]
          if (kube["namespace_name"] ~= nil and string.len(kube["namespace_name"]) > 0) then
              es_index = es_index .. "." .. kube["namespace_name"]
          else
              es_index = es_index .. "." .. "default"
          end

          if (kube["labels"] ~= nil) then
              labels = kube["labels"]

              if (labels["app"] ~= nil and string.len(labels["app"]) > 0) then
                  es_index = es_index .. "." ..  labels["app"]
              elseif (labels["app.kubernetes.io/instance"] ~= nil and string.len(labels["app.kubernetes.io/instance"]) > 0) then
                   es_index = es_index .. "." .. labels["app.kubernetes.io/instance"]
              elseif (labels["k8s-app"] ~= nil and string.len(labels["k8s-app"]) > 0) then
                   es_index = es_index .. "." .. labels["k8s-app"]
              elseif (labels["name"] ~= nil and string.len(labels["name"]) > 0) then
                   es_index = es_index .. "." .. labels["name"]
              end
          else
              es_index = es_index .. "." .. record["app_name"]
          end
      end

      new_record["es_index"] =  es_index

      return 1, timestamp, new_record
    end

Hope this will work. Enjoy!!