I am using wampsharpclient to connect to the websocket. I am unable to reconnect to the socket if it is disconnected.Here below is the code which I have written for connection logic and reconnecting if connection is broken or error occurs.

public static async Task WampConnect(String uri, String kafkaTopic, String subscribeTopic)
            {
                log.Info("Entering to WampConnect method: " + Thread.CurrentThread.ManagedThreadId + " - " + kafkaTopic);
                await Task.Run(async () =>
                {
                    DefaultWampChannelFactory factory = new DefaultWampChannelFactory();
                    IWampChannel channel = factory.CreateJsonChannel(uri, "realm1");
                    IWampRealmProxy realmProxy = channel.RealmProxy;
                    await channel.Open();
                    var retry = true;
                    channel.RealmProxy.Monitor.ConnectionError +=  async(sender, args) =>
                    {
                        log.Info("Error Reason: " + args.Exception + " - Reconnecting");
                        retry = true;
                       while (retry)
                        {
                            factory = new DefaultWampChannelFactory();
                            channel = factory.CreateJsonChannel(uri, "realm1");
                            realmProxy = channel.RealmProxy;
                            await channel.Open();
                            //await Task.Delay(2000);
                        }
                    };
                    channel.RealmProxy.Monitor.ConnectionEstablished +=
                        (sender, args) =>
                        {
                            retry = false;
                            log.Info("connected session with ID " + args.SessionId);
                            IWampSubject topic = realmProxy.Services.GetSubject(subscribeTopic);
                            IDisposable disposable = topic.Subscribe(
                                async serializedEvent =>
                                {
                                    Object arguments = null;

                                    if (subscribeTopic.Equals("ticker"))
                                    {
                                        String[] tickerData = serializedEvent.Arguments.Select(argument => argument.Deserialize<String>()).ToArray();
                                        arguments = string.Join(",", tickerData);

                                    }
                                    else
                                    {
                                        arguments = serializedEvent.Arguments.Select(argument => argument.Deserialize<JObject>()).ToArray<JObject>();

                                    }
                                    await SendToKafka(arguments, kafkaTopic);
                                }
                                );
                        };

                    channel.RealmProxy.Monitor.ConnectionBroken +=  (sender, args) =>
                    {
                        log.Info("Disconnected Reason: " + args.Reason + " - Reconnecting");
                        retry = true;
                        /*while(retry)
                        {
                            factory = new DefaultWampChannelFactory();
                            channel = factory.CreateJsonChannel(uri, "realm1");
                            realmProxy = channel.RealmProxy;
                            await channel.Open();
                            await Task.Delay(2000);
                        }*/

                    };
                });
                // log.Info("Exiting  WampConnect method: " + Thread.CurrentThread.ManagedThreadId + " - " + kafkaTopic);

            }
0

There are 0 answers