How to put 25k records per second into a Kinesis stream, and a test tool to measure it


I have developed a piece of software that writes records to an Amazon Kinesis stream. I am trying to find out whether there is a software tool that will let me measure the maximum throughput my code can generate to a Kinesis stream with 1 shard in one second. I agree that it also depends on the hardware configuration, but to start with I want to know the figure for a general-purpose machine; after that I may be able to check horizontal scalability.

With this I am trying to write 25k records per second to the Kinesis stream.

Reference: Kinesis http://aws.amazon.com/kinesis/
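One thing worth checking before benchmarking: Kinesis Data Streams caps writes per shard (the documented limits are 1,000 records per second and 1 MiB per second per shard), so a 1-shard stream cannot absorb 25k records/s no matter how fast the client is. The sketch below estimates the minimum shard count for a target rate. `ShardEstimate` and its methods are hypothetical names, and the two limits are hard-coded assumptions that should be verified against the current AWS quota documentation.

```java
// Hypothetical helper: minimum shard count for a target write rate.
// ASSUMPTION: per-shard write limits of 1,000 records/s and 1 MiB/s;
// verify against the current Kinesis Data Streams quotas page.
class ShardEstimate {
    static final long MAX_RECORDS_PER_SHARD_PER_SEC = 1_000;
    static final long MAX_BYTES_PER_SHARD_PER_SEC = 1024L * 1024L; // 1 MiB

    // Ceiling division for non-negative a and positive b.
    static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    // recordsPerSec: target write rate; avgRecordBytes: average record size in bytes.
    // Returns the smallest shard count that satisfies both per-shard limits.
    static long minShards(long recordsPerSec, long avgRecordBytes) {
        long byRecordCount = ceilDiv(recordsPerSec, MAX_RECORDS_PER_SHARD_PER_SEC);
        long byBytes = ceilDiv(recordsPerSec * avgRecordBytes, MAX_BYTES_PER_SHARD_PER_SEC);
        return Math.max(1, Math.max(byRecordCount, byBytes));
    }

    public static void main(String[] args) {
        System.out.println(minShards(25_000, 100));  // 25 (record-count limit dominates)
        System.out.println(minShards(25_000, 2048)); // 49 (byte limit dominates)
    }
}
```

For small records the record-count limit decides (25 shards for 25k records/s); once records grow to around 2 KiB, the byte limit takes over and pushes the estimate to 49 shards.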


There are 2 answers

Dmitri T (best answer)

I believe you can use Apache JMeter for this:

  1. Download and install JMeter
  2. Download the Amazon Kinesis Java Client Library and drop its jars (together with their dependencies) into the JMeter classpath; the /lib folder of your JMeter installation works
  3. Using a JSR223 Sampler with "groovy" as the language, implement the code that writes records to the stream, taking AmazonKinesisRecordProducerSample as a reference

See Beanshell vs JSR223 vs Java JMeter Scripting: The Performance-Off You've Been Waiting For! guide for instructions on installing "groovy" engine support and scripting best practices.
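Whatever drives the load (JMeter or a plain program), the measurement itself comes down to counting successful calls over a fixed time window while several threads send concurrently. The following is a minimal Java sketch of that idea, not JMeter's implementation: `RecordSink`, `ThroughputResult` and `ThroughputTest` are hypothetical names, and the fake sink only simulates latency instead of calling Kinesis.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for "send one record"; in a real run this would wrap a Kinesis putRecord call.
interface RecordSink {
    void put(byte[] data, String partitionKey) throws Exception;
}

final class ThroughputResult {
    final double recordsPerSec;
    final long failures;

    ThroughputResult(double recordsPerSec, long failures) {
        this.recordsPerSec = recordsPerSec;
        this.failures = failures;
    }
}

class ThroughputTest {
    // Runs `threads` workers that call sink.put() in a loop for `durationMillis`
    // and reports successful records per second plus the number of failed calls.
    static ThroughputResult measure(RecordSink sink, int threads, long durationMillis) {
        LongAdder sent = new LongAdder();
        LongAdder failed = new LongAdder();
        long start = System.nanoTime();
        long deadline = start + TimeUnit.MILLISECONDS.toNanos(durationMillis);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int worker = t;
            pool.execute(() -> {
                long i = 0;
                byte[] payload = new byte[100]; // 100-byte dummy record
                while (System.nanoTime() < deadline) {
                    try {
                        // Distinct partition keys let a multi-shard stream spread the load.
                        sink.put(payload, "worker-" + worker + "-" + i++);
                        sent.increment();
                    } catch (Exception e) {
                        // Against a real stream, throttling errors would be counted here.
                        failed.increment();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(durationMillis + 10_000, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        double elapsedSec = (System.nanoTime() - start) / 1e9;
        return new ThroughputResult(sent.sum() / elapsedSec, failed.sum());
    }

    public static void main(String[] args) {
        // Fake sink that only simulates about 5 ms of network latency per call.
        RecordSink fake = (data, key) -> Thread.sleep(5);
        for (int threads : new int[] {1, 8, 32}) {
            ThroughputResult r = measure(fake, threads, 1000);
            System.out.printf("%2d threads: %.0f records/s, %d failures%n",
                    threads, r.recordsPerSec, r.failures);
        }
    }
}
```

The thread count matters because a synchronous caller has only one request in flight: at an assumed 5 ms round trip, one thread manages at most about 200 calls/s, so 25,000 single-record calls per second would need on the order of 25,000 × 0.005 = 125 concurrent requests. In JMeter the corresponding knob is the number of threads in the Thread Group.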

Nishit NShah

Thanks for the hints. I have worked out Groovy code that uses the AWS SDK for Java to send records to a Kinesis stream. Here is the sample code:

```groovy
/*
 * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import com.amazonaws.auth.AWSCredentials
import com.amazonaws.auth.profile.ProfileCredentialsProvider
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.model.PutRecordRequest
import com.amazonaws.services.kinesis.model.PutRecordResult

class AmazonKinesisRecordProducerSample {

    /*
     * Before running the code:
     *      Fill in your AWS access credentials in the provided credentials
     *      file template, and be sure to move the file to the default location
     *      (~/.aws/credentials) where the sample code will load the
     *      credentials from.
     *      https://console.aws.amazon.com/iam/home?#security_credential
     *
     * WARNING:
     *      To avoid accidental leakage of your credentials, DO NOT keep
     *      the credentials file in your source directory.
     */

    AmazonKinesisClient kinesis

    def init() {
        /*
         * The ProfileCredentialsProvider will return your [default]
         * credential profile by reading from the credentials file located at
         * (~/.aws/credentials).
         */
        AWSCredentials credentials = new ProfileCredentialsProvider().getCredentials()
        kinesis = new AmazonKinesisClient(credentials)
    }
}

def amazonKinesisRecordProducerSample = new AmazonKinesisRecordProducerSample()
amazonKinesisRecordProducerSample.init()

def myStreamName = "<KINESIS STREAM NAME>"

println("Press CTRL-C to stop.")
// Write records to the stream until this program is aborted.
// Each iteration makes one synchronous request and prints a line, so a single
// thread's rate is bounded by the round-trip latency to Kinesis.
while (true) {
    def data = '<Data IN STRING FORMAT>'
    def partitionKey = "<PARTITION KEY>"

    def putRecordRequest = new PutRecordRequest()
    putRecordRequest.setStreamName(myStreamName)
    // Encode explicitly as UTF-8 so the payload does not depend on the platform default charset.
    putRecordRequest.setData(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))
    putRecordRequest.setPartitionKey(partitionKey)

    // putRecord() sends the request and returns the shard ID and sequence number assigned to the record.
    PutRecordResult putRecordResult = amazonKinesisRecordProducerSample.kinesis.putRecord(putRecordRequest)
    printf("Successfully put record, partition key : %s, ShardID : %s, SequenceNumber : %s.%n",
            putRecordRequest.getPartitionKey(),
            putRecordResult.getShardId(),
            putRecordResult.getSequenceNumber())
}
```
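The loop above sends every record with the same partition key. Kinesis maps a partition key to a shard by taking the MD5 hash of the key as a 128-bit integer and picking the shard whose hash key range contains it, so a single fixed key routes all traffic to one shard however many shards the stream has. The sketch below illustrates that mapping under two assumptions: the shards split the hash space into equal ranges, and the key is hashed as its UTF-8 bytes. `ShardMapping` is a hypothetical helper, not part of the AWS SDK.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

class ShardMapping {
    private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128); // 2^128

    // MD5 of the partition key (UTF-8 bytes), read as an unsigned 128-bit integer.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest); // signum 1 => treat bytes as unsigned
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Shard index, ASSUMING `shardCount` shards that split [0, 2^128) into equal ranges.
    static int shardFor(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(HASH_SPACE)
                .intValue();
    }

    public static void main(String[] args) {
        int shards = 25;
        Set<Integer> fixedKeyShards = new HashSet<>();
        Set<Integer> variedKeyShards = new HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            fixedKeyShards.add(shardFor("<PARTITION KEY>", shards));
            variedKeyShards.add(shardFor("key-" + i, shards));
        }
        System.out.println("fixed key hits " + fixedKeyShards.size() + " shard(s)"); // always 1
        System.out.println("varied keys hit " + variedKeyShards.size() + " shard(s)");
    }
}
```

In practice this means a producer aiming for 25k records/s across many shards needs many distinct partition keys (for example a per-record ID), not one constant.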

Note: this code works only if the Kinesis stream has already been created and is ACTIVE. If you need to create the stream first and then use it, refer to the code example in the aws-java-sdk src folder.
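For higher rates, the AWS SDK for Java also offers `putRecords` (with `PutRecordsRequest` and `PutRecordsRequestEntry`), which sends up to 500 records in a single request instead of one record per call. The sketch below shows only the client-side batching step; `PendingRecord` and `Batcher` are hypothetical names, and the per-request byte cap is left as a parameter because its value should be taken from the current Kinesis documentation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical record holder: payload bytes plus partition key.
final class PendingRecord {
    final byte[] data;
    final String partitionKey;

    PendingRecord(byte[] data, String partitionKey) {
        this.data = data;
        this.partitionKey = partitionKey;
    }

    // Bytes counted against the request size cap: payload + UTF-8 partition key.
    long sizeBytes() {
        return data.length + partitionKey.getBytes(StandardCharsets.UTF_8).length;
    }
}

class Batcher {
    static final int MAX_RECORDS_PER_REQUEST = 500; // PutRecords record-count limit

    // Greedily packs records, in order, into batches of at most MAX_RECORDS_PER_REQUEST
    // records and at most maxBytesPerRequest bytes. A single oversized record still gets
    // its own batch; this sketch does not validate per-record limits.
    static List<List<PendingRecord>> batch(List<PendingRecord> records, long maxBytesPerRequest) {
        List<List<PendingRecord>> batches = new ArrayList<>();
        List<PendingRecord> current = new ArrayList<>();
        long currentBytes = 0;
        for (PendingRecord r : records) {
            long size = r.sizeBytes();
            boolean full = current.size() == MAX_RECORDS_PER_REQUEST
                    || (!current.isEmpty() && currentBytes + size > maxBytesPerRequest);
            if (full) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(r);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<PendingRecord> records = new ArrayList<>();
        for (int i = 0; i < 25_000; i++) {
            records.add(new PendingRecord(new byte[100], "key-" + i));
        }
        // 5 MiB is used here only as an example cap.
        List<List<PendingRecord>> batches = batch(records, 5L * 1024 * 1024);
        System.out.println(batches.size() + " requests instead of " + records.size()); // 50 requests instead of 25000
    }
}
```

Each batch would then become one `PutRecordsRequest`. Because `putRecords` can partially fail, a real producer should check `getFailedRecordCount()` on the result and resend the entries whose result entry carries an error code.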