TL;DR
I have a Kafka-enabled Azure Event Hub that I'm trying to connect to from Google Cloud's Dataflow service to stream the data into Google BigQuery. I can successfully use the Kafka CLI to talk to the Azure Event Hub. However, in GCP, after 5 minutes I get timeout errors in the GCP Dataflow job window.
Azure EH w/ Kafka enabled -> GCP Dataflow -> GCP BigQuery table
Details
To set up the Kafka-enabled Event Hub, I followed the details on this GitHub page. It has the developer add a jaas.conf and a client_common.properties. The jaas.conf includes a reference to the login module along with a username/password. The username for Event Hubs with Kafka is $ConnectionString, and the password is the connection string copied from the Azure CLI. The client_common.properties contains two flags: security.protocol=SASL_SSL and sasl.mechanism=PLAIN. By configuring these files, I'm able to send and receive data using the Kafka CLI tools and the Azure Event Hub. I can see the data streaming from the producer to the consumer through the Azure Event Hub.
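For reference, the two files look roughly like this (a minimal sketch following the Azure quickstart; the connection-string value is redacted and the namespace name is illustrative):

# jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="$ConnectionString"
    password="Endpoint=sb://test-eh-namespace.servicebus.windows.net/;SharedAccessKeyName=<removed>;SharedAccessKey=<removed>";
};

# client_common.properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN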
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
(echo -n "1|"; cat message.json | jq . -c) | kafka-conle-producer.sh --topic test-event-hub --broker-list test-eh-namespace.servicebus.windows.net:9093 --producer.config client_common.properties --property "parse.key=true" --property "key.separator=|"
kafka-console-consumer.sh --topic test-event-hub --bootstrap-server test-eh-namespace.servicebus.windows.net:9093 --consumer.config client_common.properties --property "print.key=true"
# prints: 1 { "transaction_time": "2020-07-20 15:14:54", "first_name": "Joe", "last_name": "Smith" }
I modified Google's Dataflow template for Kafka -> BigQuery. There was already a configuration map specified for resetting the offset. I added additional configuration to match the Azure Event Hubs with Kafka tutorial. While not best practice, I added the connection string to the password field to test. When I upload it to the GCP Dataflow engine and run the job, I get timeout errors every 5 minutes in the log and nothing ends up in Google BigQuery.
Job Command
gcloud dataflow jobs run kafka-test --gcs-location=<removed> --region=us-east1 --worker-zone=us-east4-a --parameters bootstrapServers=test-eh-namespace.servicebus.windows.net:9093,inputTopic=test-event-hub,outputTableSpec=project:Kafka_Test.test --service-account-email my-service-account.iam.gserviceaccount.com
Errors in GCP Dataflow
# these errors show up in the worker logs
Operation ongoing in step ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds for at least 05m00s without outputting or completing in state process
  at java.lang.Thread.sleep(Native Method)
  at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:45)
  at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:366)
  at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1481)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.updatedSpecWithAssignedPartitions(KafkaUnboundedSource.java:85)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:125)
  at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45)
  at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReader.iterator(WorkerCustomSources.java:433)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:186)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163)
  at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:163)
  at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1105)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Execution of work for computation 'S4' on key '0000000000000001' failed with uncaught exception. Work will be retried locally.
# this error shows up in the Job log
Error message from worker: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Updated Configuration
Map<String, Object> props = new HashMap<>();
// azure event hub authentication
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_SSL")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<removed>\";");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
props.put("request.timeout.ms", 60000);
props.put("session.timeout.ms", 15000);
props.put("max.poll.interval.ms", 30000);
props.put("offset.metadata.max.bytes", 1024);
props.put("connections.max.idle.ms", 180000);
props.put("metadata.max.age.ms", 180000);
Pipeline
PCollectionTuple convertedTableRows =
pipeline
/*
* Step #1: Read messages in from Kafka
*/
.apply(
"ReadFromKafka",
KafkaIO.<String, String>read()
.withConsumerConfigUpdates(ImmutableMap.copyOf(props))
.withBootstrapServers(options.getBootstrapServers())
.withTopics(topicsList)
.withKeyDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withValueDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withoutMetadata())
/*
* Step #2: Transform the Kafka Messages into TableRows
*/
.apply("ConvertMessageToTableRow", new MessageToTableRow(options));
Overview
This application has a complex build process that was ported over from the GCP Dataflow templates repo. The build process brings in the GCP Dataflow Docker image construction and deployment scripts as dependencies. Simply clone the repo to get started.
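Assuming the templates come from the public DataflowTemplates repo (the com.google.cloud.teleport packages in the stack trace and the file path below suggest this), cloning looks like:

git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git
cd DataflowTemplates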
Prerequisites
Configure Environment Variables
The first step is to set up the environment variables that configure the build and deployment scripts for the given application.
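A hypothetical set of values (PROJECT and TARGET_GCR_IMAGE are illustrative names I'm assuming; BUCKET_NAME and TEMPLATE_MODULE must line up with the image-spec path used later):

export PROJECT=my-gcp-project
export BUCKET_NAME=gs://my-dataflow-bucket
export TEMPLATE_MODULE=kafka-to-bigquery
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${TEMPLATE_MODULE}
export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json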
Modify, Build, & Upload Project
Before building, you will need to update the ./kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java file with the additional authentication configuration shown above under Updated Configuration.
Once you have set up the project and changed the file, the next phase is building the Docker image to upload to Google's Container Registry; a sketch of the build command follows below. This command will also build the common files that interact with miscellaneous Google services. If the build is successful, the container will be pushed into Google Container Registry (GCR). From GCR, you can deploy into Google Dataflow.
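A hypothetical build invocation, assuming the module uses the Jib Maven plugin to build and push the container (the module name and TARGET_GCR_IMAGE come from the environment variables above):

mvn clean package jib:build -Dimage=${TARGET_GCR_IMAGE} -pl kafka-to-bigquery -am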
Create & Upload Image Spec (only done once)
Prior to launching the project in Dataflow, the Dataflow runner needs a Flex Template to know how to execute the project. The Flex Template is a JSON metadata file that contains parameters and instructions to construct the GCP Dataflow application. A Flex Template must be uploaded to Google Cloud Storage (GCS) to the corresponding bucket name set up by the environment variables. This step must match the TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json environment variable, as sketched below.
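A minimal, hypothetical image spec, uploaded with gsutil (the image field must point at the container pushed in the previous step; the parameter list is abbreviated to the three used in this walkthrough):

cat > kafka-to-bigquery-image-spec.json <<EOF
{
  "image": "gcr.io/my-gcp-project/kafka-to-bigquery",
  "metadata": {
    "name": "Kafka to BigQuery",
    "parameters": [
      {"name": "bootstrapServers", "label": "Kafka bootstrap servers", "helpText": "Comma-separated Kafka bootstrap servers."},
      {"name": "inputTopic", "label": "Input topic", "helpText": "Kafka topic to read from."},
      {"name": "outputTableSpec", "label": "Output table", "helpText": "BigQuery table in project:dataset.table form."}
    ]
  },
  "sdkInfo": {"language": "JAVA"}
}
EOF
gsutil cp kafka-to-bigquery-image-spec.json ${TEMPLATE_IMAGE_SPEC}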
Execute the Image with Dataflow
Once you have an image uploaded to GCP and have uploaded a Flex Template, you can launch the Dataflow application, as shown below. The parameters must match the parameters included in the Flex Template's metadata section.
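The Job Command shown earlier is the invocation actually used; an equivalent, hypothetical launch via the flex-template command looks like:

gcloud dataflow flex-template run kafka-test \
    --template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
    --region=us-east1 \
    --parameters bootstrapServers=test-eh-namespace.servicebus.windows.net:9093,inputTopic=test-event-hub,outputTableSpec=project:Kafka_Test.test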
Once you run this command, check the GCP Cloud Console to view the status. The Dataflow job should be working successfully at this point, pulling messages from the Azure Event Hub and inserting them into Google BigQuery.
The GCP repo assumes Google BigQuery/Dataflow will dynamically create the tables with the correct schema, but YMMV as I found this finicky. The workaround is to create the schema in Google BigQuery in advance of running the Dataflow job, as sketched below.
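A hypothetical schema pre-creation using the bq CLI, with fields matching the sample message shown earlier (adjust the column types to your data):

bq mk --table project:Kafka_Test.test \
    transaction_time:TIMESTAMP,first_name:STRING,last_name:STRING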