Message Hub & Confluent Kafka Connect S3


I have a requirement to consume messages from an IBM Message Hub (MHub) topic into IBM Cloud Object Storage.

I got it working with a local Kafka server, using the Confluent Kafka Connect S3 plugin as a standalone worker with two sinks: an Amazon S3 bucket and a file. Both worked.

If I configure Confluent Kafka Connect S3 as a distributed worker against the IBM MHub cluster, I get no errors, but no messages end up in the Amazon S3 bucket. I tried the file sink too, with no luck.

Is it possible at all?


There are 2 answers

Jarkko Turpeinen (accepted answer)

From the Kafka documentation (https://kafka.apache.org/documentation/#connect_running):

The parameters that are configured here are intended for producers and consumers used by Kafka Connect to access the configuration, offset and status topics. For configuration of Kafka source and Kafka sink tasks, the same parameters can be used but need to be prefixed with consumer. and producer. respectively. The only parameter that is inherited from the worker configuration is bootstrap.servers, which in most cases will be sufficient, since the same cluster is often used for all purposes. A notable exception is a secured cluster, which requires extra parameters to allow connections. These parameters will need to be set up to three times in the worker configuration, once for management access, once for Kafka sinks and once for Kafka sources.

So the solution was to duplicate the security settings in the worker configuration with the consumer. prefix, so that the sink's consumer used the required SASL_SSL settings instead of the defaults.
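A minimal Python sketch of that fix: it writes the security settings once unprefixed (the worker's own clients), once with consumer. (sink tasks) and once with producer. (source tasks), which is the "up to three times" from the quoted docs. The helper name `worker_properties` and every value below are hypothetical placeholders; substitute whatever security settings your cluster actually requires. Only the security part of the worker file is generated; the other distributed-mode settings (group.id, converters, storage topics) are left out.

```python
# Sketch: emit the security section of a Kafka Connect worker.properties,
# repeating each setting unprefixed, with "consumer." (used by sink tasks)
# and with "producer." (used by source tasks).

def worker_properties(bootstrap_servers, security):
    """Return worker.properties text with `security` duplicated per prefix."""
    lines = [f"bootstrap.servers={bootstrap_servers}"]
    for prefix in ("", "consumer.", "producer."):
        for key, value in security.items():
            lines.append(f"{prefix}{key}={value}")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Hypothetical placeholder values: use the broker list and credentials
    # from your own Message Hub service credentials.
    security = {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            'username="USER" password="PASSWORD";'
        ),
    }
    print(worker_properties("broker-0.example:9093", security), end="")
```

Without the consumer. lines, the sink task's consumer falls back to the defaults and cannot authenticate, which matches the "no errors, no messages" symptom in the question.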

IBM Cloud Object Storage also works. It requires credentials, e.g. as environment variables: AWS_ACCESS_KEY_ID="see cos credentials" and AWS_SECRET_ACCESS_KEY="see cos credentials" (both values come from your COS service credentials).
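As far as I know, the S3 connector's default credentials provider (the AWS SDK default chain, set via s3.credentials.provider.class) reads these two variables, so they have to be in the environment of the Connect worker process itself. A small Python sketch of building that environment; the helper name `worker_env` is hypothetical, and the assumption that the COS values are HMAC-style access/secret keys should be checked against your own credentials.

```python
import os


def worker_env(access_key_id, secret_access_key, base=None):
    """Return a copy of `base` (default: os.environ) with the COS keys
    exported under the AWS variable names the connector reads."""
    env = dict(os.environ if base is None else base)  # copy, never mutate
    env["AWS_ACCESS_KEY_ID"] = access_key_id
    env["AWS_SECRET_ACCESS_KEY"] = secret_access_key
    return env

# Hypothetical usage: start the distributed worker with these variables set.
# import subprocess
# subprocess.run(["connect-distributed.sh", "worker.properties"],
#                env=worker_env("KEY", "SECRET"), check=True)
```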

Connector config:

{
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "5",
        "topics": "your-topic",
        "s3.region": "eu-central-1",
        "store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
        "s3.bucket.name": "your-bucket",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility": "NONE",
        "name": "s3-sink"
    }
}
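In distributed mode, a connector config like this is submitted to the Kafka Connect REST API (POST /connectors) rather than passed on the command line. A sketch using only Python's standard library, assuming a worker reachable on the default REST port 8083; the helper name `create_connector_request` is hypothetical. It only builds the request, so nothing is sent until you call `urlopen`.

```python
import json
import urllib.request

# The connector definition from this answer, as a Python dict.
CONNECTOR = {
    "name": "s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "5",
        "topics": "your-topic",
        "s3.region": "eu-central-1",
        "store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
        "s3.bucket.name": "your-bucket",
        "s3.part.size": "5242880",
        "flush.size": "1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "schema.compatibility": "NONE",
        "name": "s3-sink",
    },
}


def create_connector_request(connect_url, connector):
    """Build (but do not send) a POST /connectors request."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Hypothetical usage against a running worker:
# req = create_connector_request("http://localhost:8083", CONNECTOR)
# with urllib.request.urlopen(req) as resp:
#     print(resp.status, resp.read().decode("utf-8"))
```

A successful create should return 201 Created; GET /connectors/s3-sink/status on the same worker then shows whether the tasks are actually running.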

SimonGormley

You could try using the Message Hub (now known as Event Streams) Cloud Object Storage bridge: https://console.bluemix.net/docs/services/MessageHub/messagehub115.html#cloud_object_storage_bridge

It seems to match your requirement.