Connecting KafkaSource to SASL enabled kafka broker of AWS MSK cluster forknative eventing

275 views Asked by At

We are trying to implement event-driven architecture with our applications using knative eventing.

We wish to connect to an Apache Kafka cluster(AWS MSK) and have those messages flow through Knative Eventing.

Using the following blog we have deployed the kind : KafkaSource but it failing to connect MSK brokers when SASL authentication is enable to MSK cluster side.

https://knative.dev/docs/eventing/sources/kafka-source/#enabling-sasl-for-kafkasources

Note: And we are able to connect over plaintext communication with no authentication.

Please suggest a way to connect KafkaSource to MKS brokers having SASL enabled.

Please find the KafkaSource here

kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: kntive-groups
  bootstrapServers:
  - my-cluster-kafka-bootstrap.kafka:9096 #MSK broker
  topics:
  - knative-input-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-app-service
    uri: /myAppUrl
  net:
    sasl:
      enable: true
      user:
        secretKeyRef:
          name: msk-secret
          key: user
      password:
        secretKeyRef:
          name: msk-secret
          key: password
      type:
        secretKeyRef:
          name: msk-secret
          key: saslType
    tls:
      enable: false
      caCert:
        secretKeyRef:
          name: msk-secret-tf
          key: ca.crt

Please find the logs here(Noticed that it is always throwing one common error irrespective of any mistake in code- (client has run out of available brokers to talk to) But the brokers are actually rachable.

kubectl get kafkasource kafka-source -n my-ns -o yaml

status:   conditions:
  - lastTransitionTime: "2023-02-03T06:10:15Z"
    message: 'kafka: client has run out of available brokers to talk to (Is your cluster
      reachable?)'
    reason: ClientCreationFailed
    status: "False"
    type: ConnectionEstablished
  - lastTransitionTime: "2023-02-03T06:10:15Z"
    status: Unknown
    type: Deployed
  - lastTransitionTime: "2023-02-03T06:10:15Z"
    status: Unknown
    type: InitialOffsetsCommitted
  - lastTransitionTime: "2023-02-03T06:10:15Z"
    message: 'kafka: client has run out of available brokers to talk to (Is your cluster
      reachable?)'
    reason: ClientCreationFailed
    status: "False"
    type: Ready
  - lastTransitionTime: "2023-02-03T06:10:15Z"
    status: "True"
    type: SinkProvided

Logs:

kubectl logs deployment.apps/kafka-controller-manager -n knative-eventing
{
  "level": "info",
  "ts": "2023-02-03T06:21:18.865Z",
  "logger": "kafka-controller",
  "caller": "client/config.go:288",
  "msg": "Built Sarama config: &{Admin:{Retry:{Max:5 Backoff:100ms} Timeout:3s} Net:{MaxOpenRequests:5 DialTimeout:30s ReadTimeout:30s WriteTimeout:30s TLS:{Enable:false Config:<nil>} SASL:{Enable:true Mechanism:SCRAM-SHA-512 Version:0 Handshake:true AuthIdentity: User:maas-ml-testuser2 Password: SCRAMAuthzID: SCRAMClientGeneratorFunc:0x1861980 TokenProvider:<nil> GSSAPI:{AuthType:0 KeyTabPath: KerberosConfigPath: ServiceName: Username: Password: Realm: DisablePAFXFAST:false}} KeepAlive:0s LocalAddr:<nil> Proxy:{Enable:false Dialer:<nil>}} Metadata:{Retry:{Max:3 Backoff:250ms BackoffFunc:<nil>} RefreshFrequency:10m0s Full:true Timeout:0s AllowAutoTopicCreation:true} Producer:{MaxMessageBytes:1000000 RequiredAcks:1 Timeout:10s Compression:none CompressionLevel:-1000 Partitioner:0x17cf660 Idempotent:false Return:{Successes:true Errors:true} Flush:{Bytes:0 Messages:0 Frequency:0s MaxMessages:0} Retry:{Max:3 Backoff:100ms BackoffFunc:<nil>} Interceptors:[]} Consumer:{Group:{Session:{Timeout:10s} Heartbeat:{Interval:3s} Rebalance:{Strategy:0x2f67290 Timeout:1m0s Retry:{Max:4 Backoff:2s}} Member:{UserData:[]}} Retry:{Backoff:2s BackoffFunc:<nil>} Fetch:{Min:1 Default:1048576 Max:0} MaxWaitTime:250ms MaxProcessingTime:100ms Return:{Errors:true} Offsets:{CommitInterval:0s AutoCommit:{Enable:true Interval:1s} Initial:-2 Retention:0s Retry:{Max:3}} IsolationLevel:0 Interceptors:[]} ClientID:sarama RackID: ChannelBufferSize:256 ApiVersionsRequest:true Version:1.0.0 MetricRegistry:0xc002ca4080}",
  "commit": "394f005-dirty",
  "knative.dev/controller": "knative.dev.eventing-kafka.pkg.source.reconciler.source.Reconciler",
  "knative.dev/kind": "sources.knative.dev.KafkaSource",
  "knative.dev/traceid": "4d6b80c4-2116-4acb-b5bc-e1d074c2a380",
  "knative.dev/key": "coal-dev/uat-kafka-source"
}
{
  "level": "error",
  "ts": "2023-02-03T06:21:19.654Z",
  "logger": "kafka-controller",
  "caller": "source/kafkasource.go:184",
  "msg": "unable to create a kafka client",
  "commit": "394f005-dirty",
  "knative.dev/controller": "knative.dev.eventing-kafka.pkg.source.reconciler.source.Reconciler",
  "knative.dev/kind": "sources.knative.dev.KafkaSource",
  "knative.dev/traceid": "4d6b80c4-2116-4acb-b5bc-e1d074c2a380",
  "knative.dev/key": "coal-dev/uat-kafka-source",
  "error": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)",
  "stacktrace": "knative.dev/eventing-kafka/pkg/source/reconciler/source.(*Reconciler).ReconcileKind\n\tknative.dev/eventing-kafka/pkg/source/reconciler/source/kafkasource.go:184\nknative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource.(*reconcilerImpl).Reconcile\n\tknative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource/reconciler.go:239\nknative.dev/pkg/controller.(*Impl).processNextWorkItem\n\tknative.dev/[email protected]/controller/controller.go:542\nknative.dev/pkg/controller.(*Impl).RunContext.func3\n\tknative.dev/[email protected]/controller/controller.go:491"
}
{
  "level": "error",
  "ts": "2023-02-03T06:21:19.655Z",
  "logger": "kafka-controller",
  "caller": "kafkasource/reconciler.go:302",
  "msg": "Returned an error",
  "commit": "394f005-dirty",
  "knative.dev/controller": "knative.dev.eventing-kafka.pkg.source.reconciler.source.Reconciler",
  "knative.dev/kind": "sources.knative.dev.KafkaSource",
  "knative.dev/traceid": "4d6b80c4-2116-4acb-b5bc-e1d074c2a380",
  "knative.dev/key": "coal-dev/uat-kafka-source",
  "targetMethod": "ReconcileKind",
  "error": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)",
  "stacktrace": "knative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource.(*reconcilerImpl).Reconcile\n\tknative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource/reconciler.go:302\nknative.dev/pkg/controller.(*Impl).processNextWorkItem\n\tknative.dev/[email protected]/controller/controller.go:542\nknative.dev/pkg/controller.(*Impl).RunContext.func3\n\tknative.dev/[email protected]/controller/controller.go:491"
}
{
  "level": "error",
  "ts": "2023-02-03T06:21:19.655Z",
  "logger": "kafka-controller",
  "caller": "controller/controller.go:566",
  "msg": "Reconcile error",
  "commit": "394f005-dirty",
  "knative.dev/controller": "knative.dev.eventing-kafka.pkg.source.reconciler.source.Reconciler",
  "knative.dev/kind": "sources.knative.dev.KafkaSource",
  "knative.dev/traceid": "4d6b80c4-2116-4acb-b5bc-e1d074c2a380",
  "knative.dev/key": "coal-dev/uat-kafka-source",
  "duration": 0.813508291,
  "error": "kafka: client has run out of available brokers to talk to (Is your cluster reachable?)",
  "stacktrace": "knative.dev/pkg/controller.(*Impl).handleErr\n\tknative.dev/[email protected]/controller/controller.go:566\nknative.dev/pkg/controller.(*Impl).processNextWorkItem\n\tknative.dev/[email protected]/controller/controller.go:543\nknative.dev/pkg/controller.(*Impl).RunContext.func3\n\tknative.dev/[email protected]/controller/controller.go:491"
}
{
  "level": "info",
  "ts": "2023-02-03T06:21:19.655Z",
  "logger": "kafka-controller.event-broadcaster",
  "caller": "record/event.go:285",
  "msg": "Event(v1.ObjectReference{Kind:\"KafkaSource\", Namespace:\"coal-dev\", Name:\"uat-kafka-source\", UID:\"1b6dd5c4-539a-424a-811c-fd16a5d2468d\", APIVersion:\"sources.knative.dev/v1beta1\", ResourceVersion:\"56774522\", FieldPath:\"\"}): type: 'Warning' reason: 'InternalError' kafka: client has run out of available brokers to talk to (Is your cluster reachable?)",
  "commit": "394f005-dirty"
}
0

There are 0 answers