I am building a log ingestion service using AWS Kinesis Firehose and ElasticSearch. I sending logs to firehose using a nodejs api. This the structure of the log I am sending
{
"level": "error",
"message": "Failed to connect to DB",
"resourceId": "server-1234",
"timestamp": "2023-09-15T08:00:00Z",
"traceId": "abc-xyz-123",
"spanId": "span-456",
"commit": "5e5342f",
"metadata": {
"parentResourceId": "server-0987"
}
}
Before sending to firehose I am using JSON.stringify to convert this log object to string as firehose expects a string.However in my firehose delivery stream I am using a lambda function to transform it back into a json obejct. This is the lambda funciton I am using:
exports.handler = async (event) => {
const output = event.records.map((record) => {
// Decoding the base64 data
const payload = Buffer.from(record.data, 'base64').toString('utf8');
let parsedData;
try {
parsedData = JSON.parse(payload);
} catch (error) {
console.error('Error parsing JSON:', error);
return {
recordId: record.recordId,
result: 'ProcessingFailed',
data: record.data,
};
}
// Checking if 'message' exists and is a string that can be parsed as JSON
if (typeof parsedData.message === 'string') {
try {
const messageData = JSON.parse(parsedData.message);
parsedData = { ...parsedData, ...messageData };
delete parsedData.message;
} catch (error) {
console.error('Error parsing message JSON:', error);
}
}
console.log(parsedData)
// Re-encode the transformed data back to base64
const outputData = Buffer.from(JSON.stringify(parsedData)).toString('base64');
return {
recordId: record.recordId,
result: 'Ok',
data: outputData,
};
});
return { records: output };
};
However even after doing all this in my elastic search index the data is getting stored like this
Why is it storing it like a string? If I directly send the log to elasticsearch without having firehose in between it gets stored as a proper json object.Can I not send a json object using a firehose stream or am I missing some configuration???
this is my index mapping :
{
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
},
"aws": {
"properties": {
"firehose": {
"properties": {
"arn": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"parameters": {
"properties": {
"es_datastream_name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"request_id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"kinesis": {
"properties": {
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"type": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
},
"cloud": {
"properties": {
"account": {
"properties": {
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"provider": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"region": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"commit": {
"type": "keyword"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text",
"fielddata": true
},
"metadata": {
"type": "nested",
"properties": {
"parentResourceId": {
"type": "keyword"
}
}
},
"resourceId": {
"type": "keyword"
},
"spanId": {
"type": "keyword"
},
"timestamp": {
"type": "date"
},
"traceId": {
"type": "keyword"
}
}
}
}