I'm trying to publish messages to GCP pubsub topic using Spring Boot framework. Occasionally running into errors where it says managed channel was not shutdown but I'm not sure where exactly I need to shut it down.

GCPPublisherConfig.java

@Configuration
@RequiredArgsConstructor
public class GCPPublisherConfig {

    private final AppProperties appProperties;

    @Bean
    public PublisherFactory publisherFactory(CredentialsProvider defaultCredentialsProvider) {
        DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> appProperties.getPubsubProjectId());
        factory.setEnableMessageOrdering(true);
        factory.setCredentialsProvider(defaultCredentialsProvider);
        return factory;
    }

    @Bean
    public PubSubTemplate pubSubTemplate(
            PublisherFactory publisherFactory, SubscriberFactory subscriberFactory) {
        return new PubSubTemplate(publisherFactory, subscriberFactory);
    }

}

MessagePublisher.java

@RequiredArgsConstructor
@Component
public class MessagePublisher {

    private final PubSubTemplate pubSubTemplate;
    private final AppProperties appProperties;
    private final Gson gson;

    public Mono<String> publishMessageToGcpTopic(Message message, String orderingKey){
        log.info("publishing message to pubsub topic");

        PubsubMessage msgToPublish = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(gson.toJson(message)))
                .setOrderingKey(orderingKey)
                .build();

        return Mono.fromFuture(
                this.pubSubTemplate
                .publish(appProperties.getPubsubTopicId(), msgToPublish)
                .exceptionally(ex -> {
                    log.error("error when publishing messaging to gcp", ex);
                    return "0";
                }));
    }
}

AppService.java

@Override
    public Flux<Integer> getData(){
      // this is not the actual service logic. I have created this sample logic to reproduce the error.
           return Flux.range(1, 1000)
                    .map(it -> {
                        log.info("number - {}", it);
                        return it;
                    });
    };

AppController.java

@GetMapping("/publish/messages")
    public Mono<Boolean> testMessages(){

        return appService.getData()
                .flatMap(it -> {
                    EdslMessage edslMessage = EdslMessage.builder().build();
                    messagePublisher.publishMessageToGcpTopic(edslMessage, it);
                    return Mono.just(true);
                })
                .switchIfEmpty(Mono.defer(() -> {
                    log.warn("No entries found for date less than current date");
                    return Mono.just(false);
               }))
                .reduce((a, b) -> a && b);
}

Error Stacktrace

[ctor-http-nio-3] i.g.i.ManagedChannelOrphanWrapper        : *~*~*~ Previous channel ManagedChannelImpl{logId=571, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.

java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
    at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:631)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:391)
    at com.google.api.gax.grpc.ChannelPool.<init>(ChannelPool.java:107)
    at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:85)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:237)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:231)
    at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:236)
    at com.google.cloud.pubsub.v1.stub.GrpcPublisherStub.create(GrpcPublisherStub.java:203)
    at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:201)
    at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:91)
    at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:881)
    at com.google.cloud.spring.pubsub.support.DefaultPublisherFactory.createPublisher(DefaultPublisherFactory.java:186)
    at com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate.publish(PubSubPublisherTemplate.java:94)
    at com.google.cloud.spring.pubsub.core.PubSubTemplate.publish(PubSubTemplate.java:120)
    at org.ascension.swe.myaccount.careteambuilder.replay.MessagePublisher.publishMessageToGcpTopic(MessagePublisher.java:33)
    at org.ascension.swe.myaccount.careteambuilder.replay.TestController.lambda$testMessages$1(TestController.java:54)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
    at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83)
    at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:293)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:474)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:651)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
1

There are 1 answers

2
Md Alif Al Amin On

You have to explicitly shutdown ManagedChannel instances when they are no longer needed. To close the channels to release resources.

While using Spring Cloud GCP to work with Pub/Sub.you can explicitly shut down the PubSubTemplate and the associated channel when they are no longer needed. You can define a @PreDestroy method in your MessagePublisher bean to handle the shutdown, here is the code:

@PreDestroy
public void shutdown() {
    log.info("Shutting down PubSubTemplate and its channels...");
    pubSubTemplate.getPublisherFactory().shutdown();
}

The @PreDestroy method is called when the Spring container is shutting down and can be used to perform cleanup tasks like closing resources.