I am trying to run a Beam pipeline on top of Flink in a local docker-compose environment. I need to connect to a secure Kafka cluster from my application through Kerberos. I wrote my own Dockerfile for the Flink jobmanager and taskmanager.
Dockerfile for my-image-apache-beam/flink:1.16-java11:
FROM flink:1.16-java11
# Python SDK harness
COPY --from=apache/beam_python3.10_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam/
# Java SDK harness
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/
# Kerberos client configuration
COPY krb5.conf /etc/
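Since the failure below is the PROCESS environment dying with exit code 1, one thing worth verifying is that the Java harness boot binary actually landed in the final image and is executable. A build-time check could be appended to the Dockerfile (a sketch; the `RUN` line is only a sanity check, not required for the setup):

```dockerfile
# fail the build early if the Java SDK harness boot binary is missing or not executable
RUN test -x /opt/apache/beam_java/boot
```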
My docker-compose.yml:
version: "2.2"
services:
  jobmanager:
    image: my-image-apache-beam/flink:1.16-java11
    ports:
      - "8081:8081"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: my-image-apache-beam/flink:1.16-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8100-8200:8100-8200"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    scale: 1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 2g
  beam_job_server:
    image: apache/beam_flink1.16_job_server:2.51.0
    command: --flink-master=jobmanager --job-host=0.0.0.0
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
  python-worker-harness:
    image: "apache/beam_python3.10_sdk:2.51.0"
    command: "-worker_pool"
    ports:
      - "50000:50000"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
volumes:
  artifacts:
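Before submitting, I verify that the job server endpoints published by the compose stack are actually reachable from the host. A small sketch (assumes the stack above is up; prints `False` for any port nothing is listening on):

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # the job server's artifact, expansion, and job endpoints from docker-compose.yml
    for p in (8097, 8098, 8099):
        print(p, port_open("localhost", p))
```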
And eventually my pipeline:
import logging
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka, default_io_expansion_service

job_server = "localhost"
pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:50000",
]

# Run the Kafka expansion service's Java SDK harness as a local process on the taskmanager
kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        '--defaultEnvironmentConfig={"command":"/opt/apache/beam_java/boot"}',
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)
    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')
    source_config = {
        'bootstrap.servers': 'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        'sasl.kerberos.kinit.cmd': f'kinit -R || echo {sasl_kerberos_password} | kinit {sasl_kerberos_principal}',
        'sasl.jaas.config': (
            f'com.sun.security.auth.module.Krb5LoginModule required '
            f'debug=true principal={sasl_kerberos_principal} useTicketCache=true;'
        ),
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest',
    }
    source_topic = 'Test_Source2-0_0_0_0.id-0'
    sink_topic = 'Beam.Test'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Read topic from Kafka' >> ReadFromKafka(
             consumer_config=source_config,
             topics=[source_topic],
             expansion_service=kafka_process_expansion_service)
         | 'Write topic to Kafka' >> WriteToKafka(
             producer_config=source_config,
             topic=sink_topic,
             expansion_service=kafka_process_expansion_service)
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
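One detail I ruled out while debugging: the JAAS and kinit strings above are built from environment variables, and if either is unset the f-strings silently embed the literal text `None`. A small guard I used to fail fast instead (a sketch; `require_env` is my helper, not a Beam API):

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, failing fast if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"required environment variable {name} is not set")
    return value


# hypothetical usage; mirrors the variables the pipeline reads
os.environ.setdefault("SASL_KERBEROS_PRINCIPAL", "user@EXAMPLE.COM")
principal = require_env("SASL_KERBEROS_PRINCIPAL")
jaas = (
    "com.sun.security.auth.module.Krb5LoginModule required "
    f"debug=true principal={principal} useTicketCache=true;"
)
```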
But I got stuck on the error below:
INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Receive slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id 00000000000000000000000000000000.
2023-11-22 12:52:29,065 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Allocated slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_20 with leader id 00000000-0000-0000-0000-000000000000.
2023-11-22 12:52:29,073 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2023-11-22 12:52:29,083 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_20 for job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,084 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Establish JobManager connection for job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,084 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Offer reserved slots to the leader of job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,119 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,122 INFO org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - Creating a changelog storage with name 'memory'.
2023-11-22 12:52:29,123 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0), deploy into slot with allocation id 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,124 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from CREATED to DEPLOYING.
2023-11-22 12:52:29,125 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) [DEPLOYING].
2023-11-22 12:52:29,127 INFO org.apache.flink.runtime.blob.BlobClient [] - Downloading 1dc3e31750be59cab4f2fcd0710b255e/p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a from jobmanager/172.19.0.2:6124
2023-11-22 12:52:29,145 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,149 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0), deploy into slot with allocation id 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,151 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,150 INFO org.apache.flink.runtime.taskmanager.Task [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from CREATED to DEPLOYING.
2023-11-22 12:52:29,151 INFO org.apache.flink.runtime.taskmanager.Task [] - Loading JAR files for task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) [DEPLOYING].
2023-11-22 12:52:31,693 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1cce4869
2023-11-22 12:52:31,693 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2023-11-22 12:52:31,696 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2023-11-22 12:52:31,727 INFO org.apache.flink.runtime.taskmanager.Task [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from DEPLOYING to INITIALIZING.
2023-11-22 12:52:33,035 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@7252a43f
2023-11-22 12:52:33,036 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend
2023-11-22 12:52:33,036 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - Checkpoint storage is set to 'jobmanager'
2023-11-22 12:52:33,038 INFO org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-11-22 12:52:33,384 WARN org.apache.flink.metrics.MetricGroup [] - The operator name [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} exceeded the 80 characters length limit and was truncated.
2023-11-22 12:52:33,461 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-11-22 12:52:33,473 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend [] - Initializing heap keyed state backend with stream factory.
2023-11-22 12:52:34,193 INFO org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-11-22 12:52:35,529 WARN org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status.asRuntimeException(Status.java:530) ~[blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-22 12:52:38,914 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/opt/apache/beam_java/boot' for worker id 1-1
2023-11-22 12:52:38,925 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Process died with exit code 1
at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:253)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
... 20 more
2023-11-22 12:52:38,925 WARN org.apache.flink.runtime.taskmanager.Task [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Process died with exit code 1
at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:253)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
... 7 more
2023-11-22 12:52:38,927 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2023-11-22 12:52:38,927 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0).
2023-11-22 12:52:38,934 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0.
2023-11-22 12:52:38,941 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-11-22 12:52:39,055 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 2f0a7a3cd89226651c2f84bd11e23321, jobId: 1dc3e31750be59cab4f2fcd0710b255e).
I can't figure out what could be causing this. I'd be glad to get any help!