AWS MSK to Firehose: record transformation before landing in S3


I'm trying to use AWS MSK as the source of an AWS Firehose delivery stream, but I've hit a dead end trying to apply a data transformation in the Firehose stream before it writes the data to S3.
First, according to the AWS documentation, the valid values for a processor's Type parameter are: RecordDeAggregation | Lambda | MetadataExtraction | AppendDelimiterToRecord | Decompression. The one I'm interested in is AppendDelimiterToRecord, but deploying a Firehose stream with the AppendDelimiterToRecord processor type correctly set has no effect at all, and the option isn't even offered when creating or editing the Firehose stream in the AWS Console.
Have I misconfigured something, or is AWS just being weird?
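If the AppendDelimiterToRecord processor really has no effect with an MSK source, one possible workaround is to append the delimiter yourself inside a Lambda transformation. This is a minimal sketch, assuming a newline delimiter and base64-encoded record values; append_delimiter is a hypothetical helper, not part of any AWS SDK. As a design choice it skips records that already end with the delimiter, so the S3 objects don't pick up blank lines.

```python
import base64


def append_delimiter(encoded_value: str, delimiter: bytes = b"\n") -> str:
    """Decode a base64 record value, append a delimiter, and re-encode it.

    Hypothetical helper that emulates what AppendDelimiterToRecord is
    expected to do, for use inside a Firehose transformation Lambda.
    """
    raw = base64.b64decode(encoded_value)
    # Only append when the record isn't already delimited.
    if not raw.endswith(delimiter):
        raw += delimiter
    return base64.b64encode(raw).decode("ascii")


if __name__ == "__main__":
    original = base64.b64encode(b'{"id": 1}').decode("ascii")
    print(base64.b64decode(append_delimiter(original)))  # b'{"id": 1}\n'
```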
Second, using a Lambda processor produces the following error, which lands under the /processing-failed prefix in the S3 bucket: "errorCode":"Lambda.InvalidReturnFormat","errorMessage":"The data field cannot be null if status is Ok". The data field is certainly not null: I can see in the Lambda logs that it's populated, and the Lambda reports no errors during processing either.
Again, have I missed something, or is AWS being weird?

P.S. I can't share any code due to corporate policy, but I'm using the latest version of Terraform to create all of the resources mentioned above.
P.P.S. The Lambda was created from the AWS Lambda blueprint for the Kinesis integration.

Best answer, by Kojimba

Apparently, even though Firehose's error message states "errorCode":"Lambda.InvalidReturnFormat","errorMessage":"The data field cannot be null if status is Ok", the field you actually have to populate in the Lambda response payload (with MSK as the source) is kafkaRecordValue, as a base64-encoded string.
So the response payload should look like this:

{
  "records": [
    {"recordId": "123", "result": "Ok", "kafkaRecordValue": "YsAxazaS"},
    {"recordId": "124", "result": "Ok", "kafkaRecordValue": "YsAxazaS"},
    ...
  ]
}

and not like this:

{
  "records": [
    {"recordId": "123", "result": "Ok", "data": "YsAxazaS"},
    {"recordId": "124", "result": "Ok", "data": "YsAxazaS"},
    ...
  ]
}
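Putting this together, a minimal Python handler for an MSK-sourced stream might look like the following. It is a sketch, not the AWS blueprint: it assumes the incoming records also carry their payload in kafkaRecordValue (base64-encoded), and it assumes that returning "ProcessingFailed" with the original kafkaRecordValue is acceptable for records that can't be parsed. The JSON re-serialisation with a trailing newline is only a placeholder transformation, and the event in the __main__ block is a hand-made minimal event, not a real MSK event.

```python
import base64
import json


def lambda_handler(event, context):
    """Firehose transformation Lambda for an MSK source (sketch).

    Returns each transformed payload in "kafkaRecordValue" (base64),
    not in "data", as described in the answer.
    """
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["kafkaRecordValue"])
        try:
            doc = json.loads(payload)
        except ValueError:
            # Assumption: hand the record back unchanged and let Firehose
            # route it to the processing-failed prefix.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "kafkaRecordValue": record["kafkaRecordValue"],
            })
            continue

        # Placeholder transformation: one newline-terminated JSON line.
        transformed = (json.dumps(doc) + "\n").encode("utf-8")
        output.append({
            "recordId": record["recordId"],  # must echo the incoming recordId
            "result": "Ok",
            "kafkaRecordValue": base64.b64encode(transformed).decode("ascii"),
        })
    return {"records": output}


if __name__ == "__main__":
    # Hypothetical minimal test event; a real MSK event has more fields.
    event = {"records": [
        {"recordId": "1", "kafkaRecordValue": base64.b64encode(b'{"a": 1}').decode()},
        {"recordId": "2", "kafkaRecordValue": base64.b64encode(b"not json").decode()},
    ]}
    print(json.dumps(lambda_handler(event, None), indent=2))
```

Keeping the original recordId on every output record matters regardless of the payload field name, since Firehose matches responses to input records by that ID.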

P.S. I'm so disappointed in the AWS docs and their lack of clarity that I'm honestly holding back from cursing here.