I have my Flink app deployed to AWS Managed Flink, while trying to implement async I/O with Redisson (Redis client) to connect to AWS MemoryDB (a Redis Cluster), I got an error (see below). It isn't clear to me how Redisson is creating new sockets and how AWS Managed Flink is configuring the maximum number of sockets. Can someone shed some light on how to resolve this issue? Thanks.

java.lang.IllegalStateException: failed to create a new resolver
at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:72)
at org.redisson.client.RedisClient.resolveAddr(RedisClient.java:203)
at org.redisson.client.RedisClient.connectAsync(RedisClient.java:220)
at org.redisson.connection.MasterSlaveConnectionManager.connectToNode(MasterSlaveConnectionManager.java:125)
at org.redisson.connection.MasterSlaveConnectionManager.connectToNode(MasterSlaveConnectionManager.java:111)
at org.redisson.cluster.ClusterConnectionManager.doConnect(ClusterConnectionManager.java:91)
at org.redisson.connection.MasterSlaveConnectionManager.connect(MasterSlaveConnectionManager.java:192)
at org.redisson.config.ConfigSupport.createConnectionManager(ConfigSupport.java:220)
at org.redisson.Redisson.<init>(Redisson.java:69)
at org.redisson.Redisson.create(Redisson.java:114)
at com.amazon.otf.etl.MyAsyncMap.open(MyAsyncMap.java:40)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioDatagramChannel
at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:310)
at io.netty.bootstrap.AbstractBootstrap.register(AbstractBootstrap.java:227)
at io.netty.resolver.dns.DnsNameResolver.<init>(DnsNameResolver.java:517)
at io.netty.resolver.dns.DnsNameResolverBuilder.build(DnsNameResolverBuilder.java:527)
at io.netty.resolver.dns.DnsAddressResolverGroup.newNameResolver(DnsAddressResolverGroup.java:114)
at io.netty.resolver.dns.DnsAddressResolverGroup.newResolver(DnsAddressResolverGroup.java:92)
at io.netty.resolver.dns.DnsAddressResolverGroup.newResolver(DnsAddressResolverGroup.java:77)
at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:70)
... 23 more
Caused by: java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.GeneratedConstructorAccessor247.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
... 31 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
at io.netty.channel.socket.nio.NioDatagramChannel.newSocket(NioDatagramChannel.java:91)
at io.netty.channel.socket.nio.NioDatagramChannel.<init>(NioDatagramChannel.java:120)
... 35 more
Caused by: java.net.SocketException: maximum number of DatagramSockets reached
at java.base/sun.net.ResourceManager.beforeUdpCreate(ResourceManager.java:72)
at java.base/sun.nio.ch.DatagramChannelImpl.<init>(DatagramChannelImpl.java:132)
at java.base/sun.nio.ch.SelectorProviderImpl.openDatagramChannel(SelectorProviderImpl.java:42)
at io.netty.channel.socket.nio.NioDatagramChannel.newSocket(NioDatagramChannel.java:89)
... 36 more
0

There are 0 answers