AWS Kinesis .NET Consumer

Asked by At

I am experimenting with producer and consumer using AWS Kinesis and the issue is the consumer keeps receiving the first message (or record) that we produced though we have changed the data object sent multiple times . Additionally we have tried multiple ShardIteratorType and none have worked. Latest produces no results, all others produce the same original record.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Amazon;
using Amazon.Internal;
using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using BenchmarkRuleSetModel.Models;
using MongoDB.Driver;
using Newtonsoft.Json;

namespace ConsoleApp7
{
    internal class Program
    {
        private static AmazonKinesisClient _client;
        private static string _streamName;

        static async Task ReadFromStream()
        {
            var kinesisStreamName = _streamName;

            var describeRequest = new DescribeStreamRequest
            {
                StreamName = kinesisStreamName,
            };

            var describeResponse = await _client.DescribeStreamAsync(describeRequest);
            var shards = describeResponse.StreamDescription.Shards;

            foreach (var shard in shards)
            {
                var iteratorRequest = new GetShardIteratorRequest
                {
                    StreamName = kinesisStreamName,
                    ShardId = shard.ShardId,
                    ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
                    Timestamp = DateTime.MinValue
                };

                var iteratorResponse = await _client.GetShardIteratorAsync(iteratorRequest);
                var iteratorId = iteratorResponse.ShardIterator;

                while (!string.IsNullOrEmpty(iteratorId))
                {
                    var getRequest = new GetRecordsRequest
                    {
                        ShardIterator = iteratorId, Limit = 10000
                    };

                    var getResponse = await _client.GetRecordsAsync(getRequest);
                    var nextIterator = getResponse.NextShardIterator;
                    var records = getResponse.Records;

                    if (records.Count > 0)
                    {
                        Console.WriteLine("Received {0} records. ", records.Count);
                        foreach (var record in records)
                        {
                            var json = Encoding.UTF8.GetString(record.Data.ToArray());
                            Console.WriteLine("Json string: " + json);
                        }
                    }

                    iteratorId = nextIterator;
                }
            }
        }

        private static async Task<string> Produce()
        {
            var data = new
            {
                Message = "Hello world!",
                Author = "Amir"
            };

            //convert to byte array in prep for adding to stream
            var oByte = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data));

            using (var ms = new MemoryStream(oByte))
            {
                //create put request
                var requestRecord = new PutRecordRequest
                {
                    StreamName = _streamName,
                    PartitionKey = Guid.NewGuid().ToString(),
                    Data = ms
                };
                //list name of Kinesis stream
                //give partition key that is used to place record in particular shard
                //add record as memorystream

                //PUT the record to Kinesis
                var response = await _client.PutRecordAsync(requestRecord);

                return response.SequenceNumber;
            }
        }

        static void Main(string[] args)
        {
            _client = new AmazonKinesisClient("ExampleKey", "ExampleSecret", RegionEndpoint.EUWest2);

            _streamName = "SomeStream";

            Produce().Wait();

            ReadFromStream().Wait();
        }
    }
}

1 Answers

2
Misza On

First of all, as I have debugged your code, I noticed that it loops infinitely in the inner loop (while (!string.IsNullOrEmpty(iteratorId))) and never loops over all the shards in your stream (assuming you have >1). The reason is explained in https://docs.aws.amazon.com/streams/latest/dev/troubleshooting-consumers.html#getrecords-returns-empty - because the producer never called MergeShards or SplitShards, they remain open, thus NextShardIterator will never be NULL.

This is why you only ever see records put on the first shard (or at least I did when running your code) - you must read from shards in parallel.

As far as your usage pattern goes, you're using:

ShardIteratorType = ShardIteratorType.AT_TIMESTAMP,
Timestamp = DateTime.MinValue

By this, you're essentially telling Kinesis "give me all the records in the stream from the beginning of time" (or at least as far as the retention period reaches). That's why you keep seeing the same old records in addition to new ones (again, that's what I saw when I ran your code).

A GetRecords[Async] call does not actually remove records from the stream (see https://stackoverflow.com/a/25741304/4940707). The correct way of using Kinesis is to move checkpoint-to-checkpoint. If the consumer was to persist the SequenceNumber from the last record read and then restart as such:

ShardIteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER,
StartingSequenceNumber = lastSeenSequenceNumber

Then you'd see only newer records.