Error while connecting to Kafka from Apache Beam on top of Flink

I am trying to run a Beam pipeline in a local docker-compose environment on top of Flink. I need to connect to a secure Kafka cluster from my application through Kerberos. I wrote my own Dockerfile for the Flink jobmanager and taskmanager.

Dockerfile for my-image-apache-beam/flink:1.16-java11:

FROM flink:1.16-java11



# python SDK
COPY --from=apache/beam_python3.10_sdk /opt/apache/beam/ /opt/apache/beam/

# java SDK
COPY --from=apache/beam_java11_sdk:2.51.0 /opt/apache/beam/ /opt/apache/beam_java/

COPY krb5.conf /etc/

My docker-compose.yml:

version: "2.2"
services:
  jobmanager:
    image: my-image-apache-beam/flink:1.16-java11
    ports:
      - "8081:8081"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager        

  taskmanager:
    image: my-image-apache-beam/flink:1.16-java11
    depends_on:
      - jobmanager
    command: taskmanager
    ports:
      - "8100-8200:8100-8200"
    volumes:
      - artifacts:/tmp/beam-artifact-staging
    scale: 1
    extra_hosts:
      - "host.docker.internal:host-gateway"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 2Gb

  beam_job_server:
    image: apache/beam_flink1.16_job_server
    command: --flink-master=jobmanager --job-host=0.0.0.0
    ports:
      - "8097:8097"
      - "8098:8098"
      - "8099:8099"
    volumes:
      - artifacts:/tmp/beam-artifact-staging

  python-worker-harness:
    image: "apache/beam_python3.10_sdk"
    command: "-worker_pool"
    ports:
      - "50000:50000"
    volumes:
      - artifacts:/tmp/beam-artifact-staging


volumes:
    artifacts:

And finally, my pipeline:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka, default_io_expansion_service

import os
import logging


job_server = "localhost"


pipeline_external_environment = [
    "--runner=PortableRunner",
    f"--job_endpoint={job_server}:8099",
    f"--artifact_endpoint={job_server}:8098",
    "--environment_type=EXTERNAL",
    "--environment_config=python-worker-harness:50000"
]

kafka_process_expansion_service = default_io_expansion_service(
    append_args=[
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam_java/boot\"}"
    ]
)


def run():
    pipeline_options = PipelineOptions(pipeline_external_environment)

    sasl_kerberos_principal = os.getenv('SASL_KERBEROS_PRINCIPAL')
    sasl_kerberos_password = os.getenv('SASL_KERBEROS_PASSWORD')

    source_config = {
        'bootstrap.servers':
            'kafka-host1:9093,kafka-host2:9093,kafka-host3:9093',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'GSSAPI',
        'sasl.kerberos.service.name': 'kafka',
        'sasl.kerberos.principal': f'{sasl_kerberos_principal}',
        'sasl.kerberos.kinit.cmd': f'kinit -R || echo {sasl_kerberos_password} | kinit {sasl_kerberos_principal}',
        'sasl.jaas.config':
            f'com.sun.security.auth.module.Krb5LoginModule required debug=true principal={sasl_kerberos_principal} useTicketCache=true;',
        'group.id': 'test_group_1',
        'auto.offset.reset': 'earliest'}

    source_topic = 'Test_Source2-0_0_0_0.id-0'

    sink_topic = 'Beam.Test'

    with beam.Pipeline(options=pipeline_options) as pipeline:
        outputs = (pipeline
                   | 'Read topic from Kafka' >> ReadFromKafka(consumer_config=source_config,
                                                              topics=[source_topic],
                                                              expansion_service=kafka_process_expansion_service
                                                              )
                   | 'Write topic to Kafka' >> WriteToKafka(producer_config=source_config,
                                                            topic=sink_topic,
                                                            expansion_service=kafka_process_expansion_service
                                                            )
                   )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

But I got stuck on the error below (taskmanager log):

INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot request 2f0a7a3cd89226651c2f84bd11e23321 for job 1dc3e31750be59cab4f2fcd0710b255e from resource manager with leader id 00000000000000000000000000000000.
2023-11-22 12:52:29,065 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated slot for 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,065 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 1dc3e31750be59cab4f2fcd0710b255e for job leader monitoring.
2023-11-22 12:52:29,066 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to register at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_20 with leader id 00000000-0000-0000-0000-000000000000.
2023-11-22 12:52:29,073 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved JobManager address, beginning registration
2023-11-22 12:52:29,083 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful registration at job manager akka.tcp://flink@jobmanager:6123/user/rpc/jobmanager_20 for job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,084 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish JobManager connection for job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,084 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer reserved slots to the leader of job 1dc3e31750be59cab4f2fcd0710b255e.
2023-11-22 12:52:29,119 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,122 INFO  org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader [] - Creating a changelog storage with name 'memory'.
2023-11-22 12:52:29,123 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0), deploy into slot with allocation id 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,124 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from CREATED to DEPLOYING.
2023-11-22 12:52:29,125 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) [DEPLOYING].
2023-11-22 12:52:29,127 INFO  org.apache.flink.runtime.blob.BlobClient                     [] - Downloading 1dc3e31750be59cab4f2fcd0710b255e/p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a from jobmanager/172.19.0.2:6124
2023-11-22 12:52:29,145 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,149 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0), deploy into slot with allocation id 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,151 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 2f0a7a3cd89226651c2f84bd11e23321.
2023-11-22 12:52:29,150 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from CREATED to DEPLOYING.
2023-11-22 12:52:29,151 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) [DEPLOYING].
2023-11-22 12:52:31,693 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@1cce4869
2023-11-22 12:52:31,693 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2023-11-22 12:52:31,696 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2023-11-22 12:52:31,727 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from DEPLOYING to INITIALIZING.
2023-11-22 12:52:33,035 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@7252a43f
2023-11-22 12:52:33,036 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as HashMapStateBackend
2023-11-22 12:52:33,036 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Checkpoint storage is set to 'jobmanager'
2023-11-22 12:52:33,038 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.
2023-11-22 12:52:33,384 WARN  org.apache.flink.metrics.MetricGroup                         [] - The operator name [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} exceeded the 80 characters length limit and was truncated.
2023-11-22 12:52:33,461 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder [] - Finished to build heap keyed state-backend.
2023-11-22 12:52:33,473 INFO  org.apache.flink.runtime.state.heap.HeapKeyedStateBackend    [] - Initializing heap keyed state backend with stream factory.
2023-11-22 12:52:34,193 INFO  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Beam Fn Logging client connected.
2023-11-22 12:52:35,529 WARN  org.apache.beam.runners.fnexecution.logging.GrpcLoggingService [] - Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p54p0.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status.asRuntimeException(Status.java:530) ~[blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at org.apache.beam.vendor.grpc.v1p54p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [blob_p-039dcd4a6f7c7c98e043dd08ae889a32d54677d0-e15023028c24bfb8cee4aa954d7d578a:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.lang.Thread.run(Unknown Source) [?:?]
2023-11-22 12:52:38,914 INFO  org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory [] - Still waiting for startup of environment '/opt/apache/beam_java/boot' for worker id 1-1
2023-11-22 12:52:38,925 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Process died with exit code 1
        at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
        at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:253)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
        ... 20 more

2023-11-22 12:52:38,925 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0) switched from INITIALIZING to FAILED with failure cause: org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:706)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:672)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: Process died with exit code 1
        at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
        at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:253)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
        at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:452)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:437)
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:304)
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:258)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:731)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
        ... 7 more

2023-11-22 12:52:38,927 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 (23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0).
2023-11-22 12:52:38,927 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 (23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0).
2023-11-22 12:52:38,934 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task [5]{Read topic from Kafka, Write topic to Kafka} (1/1)#0 23453970cf391f954eec648c133f6b45_508275ad2a106fd681f6d94bbcc7822d_0_0.
2023-11-22 12:52:38,941 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: Impulse -> [3]Read topic from Kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1)#0 23453970cf391f954eec648c133f6b45_cbc357ccb763df2852fee8c4fc7d55f2_0_0.
2023-11-22 12:52:39,055 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 2f0a7a3cd89226651c2f84bd11e23321, jobId: 1dc3e31750be59cab4f2fcd0710b255e).

I can't figure out what could cause this. I'd be glad to get any help!
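
One detail that is easy to get wrong in the pipeline above is the hand-escaped JSON passed as --defaultEnvironmentConfig. A minimal sketch, assuming only the standard library, of building the same two flags with json.dumps instead of backslash-escaped quotes; the helper name process_environment_args is hypothetical, and the boot path is the one from the pipeline. This only rules out a quoting mistake and is not a confirmed cause of the "Process died with exit code 1" failure.

```python
import json

# Path taken from the pipeline above; the helper name below is hypothetical.
BOOT_COMMAND = "/opt/apache/beam_java/boot"


def process_environment_args(command):
    """Build the two expansion-service flags for a PROCESS environment."""
    # Compact separators reproduce the hand-written JSON exactly,
    # without a space after the colon.
    config = json.dumps({"command": command}, separators=(",", ":"))
    return [
        "--defaultEnvironmentType=PROCESS",
        f"--defaultEnvironmentConfig={config}",
    ]


append_args = process_environment_args(BOOT_COMMAND)
print(append_args)
```

The resulting list can be passed as append_args to default_io_expansion_service, in place of the literal list used in the pipeline.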
