Can't set document_id for deduplicating docs in Filebeat


What are you trying to do?

I have location data for some sensors, and I want to run geo-spatial queries to find which sensors are in a specific area (query by polygon, bounding box, etc.). The location data (lat-lon) for these sensors may change in the future. I should be able to drop JSON files in ndjson format into the watched folder and overwrite the existing data with the new location data for each sensor. I also have another filestream input for indexing the logs of these sensors.

I went through the docs for deduplication and the ndjson filestream input and followed them exactly.


Show me your configs.

# ============================== Filebeat inputs ===============================

filebeat.inputs:

- type: filestream
  id: "log"
  enabled: true
  paths:
    - D:\EFK\Data\Log\*.json
  parsers:
    - ndjson:
        keys_under_root: true
        add_error_key: true
  fields.doctype: "log"

- type: filestream
  id: "loc"
  enabled: true
  paths:
    - D:\EFK\Data\Location\*.json
  
  parsers:
    - ndjson:
        keys_under_root: true
        add_error_key: true
        document_id: "Id" # Not working as expected.
  
  fields.doctype: "location"
  
  processors:
    - copy_fields:
        fields:
          - from: "Lat"
            to: "fields.location.lat"
        fail_on_error: false
        ignore_missing: true
  
    - copy_fields:
        fields:
          - from: "Long"
            to: "fields.location.lon"
        fail_on_error: false
        ignore_missing: true


# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "sensor-%{[fields.doctype]}"
  
setup.ilm.enabled: false
setup.template:
  name: "sensor_template"
  pattern: "sensor-*"


# ------------------------------ Global Processors --------------------------
processors:
- drop_fields:
    fields: ["agent", "ecs", "input", "log", "host"]

What does your input file look like?

{"Id":1,"Lat":19.000000,"Long":20.00000,"key1":"value1"}
{"Id":2,"Lat":19.000000,"Long":20.00000,"key1":"value1"}
{"Id":3,"Lat":19.000000,"Long":20.00000,"key1":"value1"}

It's the 'Id' field here that I want to use for deduplicating documents (overwriting old location data with new).


Update 10/05/22: I have also tried working with:

  1. json.document_id: "Id"
filebeat.inputs:
      - type: filestream
        id: "loc"
        enabled: true
        paths:
          - D:\EFK\Data\Location\*.json
        json.document_id: "Id"
    
  2. ndjson.document_id: "Id"
filebeat.inputs:
      - type: filestream
        id: "loc"
        enabled: true
        paths:
          - D:\EFK\Data\Location\*.json
        ndjson.document_id: "Id"
    
  3. Straight up document_id: "Id"
filebeat.inputs:
      - type: filestream
        id: "loc"
        enabled: true
        paths:
          - D:\EFK\Data\Location\*.json
        document_id: "Id"
    
  4. Trying to overwrite _id using copy_fields
    processors:
      - copy_fields:
          fields:
            - from: "Id"
              to: "@metadata_id"
          fail_on_error: false
          ignore_missing: true
    
    

The Elasticsearch config has nothing special other than security being disabled, and it's all running on localhost.

Version used for Elasticsearch, Kibana and Filebeat: 8.1.3

Please do comment if you need more info :)

References:

  1. Deduplication in Filebeat: https://www.elastic.co/guide/en/beats/filebeat/8.2/filebeat-deduplication.html#_how_can_i_avoid_duplicates
  2. Filebeat ndjson input: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-filestream.html#_ndjson
  3. Copy_fields in Filebeat: https://www.elastic.co/guide/en/beats/filebeat/current/copy-fields.html#copy-fields

1 Answer

Answer by Paulo

TL;DR

I believe you are close to the solution.

There are indeed a few ways to set @metadata._id, and they are documented.
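
For example, the deduplication guide you linked shows a processor-based way to fill it. A minimal sketch, assuming you want the id derived from your Id field (fingerprint stores a hash of the value rather than copying it verbatim, but events with the same Id still collapse onto the same _id):

processors:
  - fingerprint:
      # Hash the Id field and store the result in @metadata._id,
      # which the Elasticsearch output uses as the document _id.
      fields: ["Id"]
      target_field: "@metadata._id"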

Solution

Setup:

I have this document:

data.ndjson

{"content":"IM: Orchestration","level":"info", "@metadata":{"_id": "1"}}
{"content":"Investment Management","level":"info", "@metadata":{"_id": "2"}}
{"content":"IM: Orchestration","level":"info"}

And these settings:

filebeat.yml

filebeat.inputs:
- type: filestream
  paths: /usr/share/filebeat/data.ndjson

processors:
  - decode_json_fields:
      document_id: "@metadata._id"
      fields: ["message"]
      max_depth: 1
      target: ""

output.console:
  pretty: true

Results

When I run this, I get:

{
  "@timestamp": "2023-03-02T10:14:11.133Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.6.2",
    "_id": "1"
  },
  "input": {
    "type": "filestream"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "ef6c45b470cd"
  },
  "agent": {
    "id": "8260db74-3283-4841-9ff6-47e9fb3109bb",
    "name": "ef6c45b470cd",
    "type": "filebeat",
    "version": "8.6.2",
    "ephemeral_id": "ef8a508d-924c-45cc-9cdb-404c0ae34974"
  },
  "content": "IM: Orchestration",
  "level": "info",
  "message": "{\"content\":\"IM: Orchestration\",\"level\":\"info\", \"@metadata\":{\"_id\": \"1\"}}",
  "log": {
    "offset": 0,
    "file": {
      "path": "/usr/share/filebeat/data.ndjson"
    }
  }
}
{
  "@timestamp": "2023-03-02T10:14:11.133Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.6.2",
    "_id": "2"
  },
  "agent": {
    "id": "8260db74-3283-4841-9ff6-47e9fb3109bb",
    "name": "ef6c45b470cd",
    "type": "filebeat",
    "version": "8.6.2",
    "ephemeral_id": "ef8a508d-924c-45cc-9cdb-404c0ae34974"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "ef6c45b470cd"
  },
  "content": "Investment Management",
  "level": "info",
  "log": {
    "offset": 73,
    "file": {
      "path": "/usr/share/filebeat/data.ndjson"
    }
  },
  "message": "{\"content\":\"Investment Management\",\"level\":\"info\", \"@metadata\":{\"_id\": \"2\"}}",
  "input": {
    "type": "filestream"
  }
}
{
  "@timestamp": "2023-03-02T10:14:11.133Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.6.2"
  },
  "host": {
    "name": "ef6c45b470cd"
  },
  "agent": {
    "type": "filebeat",
    "version": "8.6.2",
    "ephemeral_id": "ef8a508d-924c-45cc-9cdb-404c0ae34974",
    "id": "8260db74-3283-4841-9ff6-47e9fb3109bb",
    "name": "ef6c45b470cd"
  },
  "content": "IM: Orchestration",
  "level": "info",
  "log": {
    "file": {
      "path": "/usr/share/filebeat/data.ndjson"
    },
    "offset": 150
  },
  "message": "{\"content\":\"IM: Orchestration\",\"level\":\"info\"}",
  "input": {
    "type": "filestream"
  },
  "ecs": {
    "version": "8.0.0"
  }
}

As you can see, the first two documents have an _id; the last one does not.

Conclusion

I expect you need to modify your config so that it looks like this:

filebeat.yml

filebeat.inputs:
- type: filestream
  paths: /usr/share/filebeat/data.ndjson

processors:
  - decode_json_fields:
      document_id: "Id"
      fields: ["message"]
      max_depth: 1
      target: ""

output.console:
  pretty: true
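
Note that output.console above is only there to show the resulting events. Once @metadata._id is populated, your existing output.elasticsearch uses it as the document _id, so re-shipping a location file with the same Id values overwrites the earlier documents instead of duplicating them. A minimal sketch combining the two (template/ILM settings omitted; the field names and paths are taken from your post):

filebeat.inputs:
- type: filestream
  id: "loc"
  paths:
    - D:\EFK\Data\Location\*.json
  # No ndjson parser here: decode_json_fields below does the decoding.
  fields.doctype: "location"

processors:
  - decode_json_fields:
      # "Id" is removed from the decoded JSON and stored in @metadata._id.
      document_id: "Id"
      fields: ["message"]
      max_depth: 1
      target: ""

output.elasticsearch:
  hosts: ["localhost:9200"]
  index: "sensor-%{[fields.doctype]}"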