WebSocket automatic reconnect with Flutter & Riverpod?

1. OBJECTIVE

I would like the connection between my custom WebSocket server (API) and my Flutter app to be re-established automatically after a network issue or when the WebSocket server itself runs into problems.

  • Use case 1: the Wi-Fi drops and then comes back.
  • Use case 2: the API is not running and then starts.
  • Constraint: I use Riverpod as my state management library (and I want to keep it :)). I mention the state management library because I create the WS connection inside a StreamProvider (cf. Riverpod).

2. INITIAL SETUP WITHOUT AUTOMATIC RECONNECT

  • I created a StreamProvider as shown below:
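import 'dart:convert';

import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:web_socket_channel/io.dart';

// `port` and `Host` are defined elsewhere in the app.
final hostProvider =
    StreamProvider.autoDispose.family<Host, String>((ref, ip) async* {
  // SOCKET OPEN
  final channel = IOWebSocketChannel.connect('ws://$ip:$port/v1/path');

  ref.onDispose(() {
    // SOCKET CLOSE
    return channel.sink.close();
  });

  // Decode each incoming message and emit it as a Host.
  await for (final json in channel.stream) {
    final decoded = jsonDecode(json as String);
    yield Host.fromJson(decoded as Map<String, dynamic>);
  }
});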
  • And I created a widget to consume the data:
useProvider(hostProvider(ip)).when(
  data: (data) => Text('$data'), // show the result
  loading: () => const CircularProgressIndicator(), // show progress bar
  error: (error, _) => Text('$error'), // show error
);

This piece of code works great. However, there is no automatic reconnect mechanism.

3. AUTOMATIC RECONNECT ATTEMPTS

  1. I wrote a connectWs function that wraps the connection in a try/catch and calls itself again whenever an exception is caught:
final hostProvider =
    StreamProvider.autoDispose.family<Host, String>((ref, ip) async* {
  // Open the connection
  connectWs('ws://$ip:$port/v1/path').then((value) async* {
    final channel = IOWebSocketChannel(value);

    ref.onDispose(() {
      return channel.sink.close();
    });

    await for (final json in channel.stream) {
      final jsonStr = jsonDecode(json as String);
      yield Host.fromJson(jsonStr as Map<String, dynamic>);
    }
  });
});

Future<WebSocket> connectWs(String path) async {
  try {
    return await WebSocket.connect(path);
  } catch (e) {
    print("Error! " + e.toString());
    await Future.delayed(Duration(milliseconds: 2000));
    return await connectWs(path);
  }
}
  2. I created a connectProvider provider, shown below, which I 'watched' in hostProvider in order to create the channel. Whenever there is an exception, I use the refresh function from the Riverpod library to recreate the channel:
// used in hostProvider
ref.container.refresh(connectProvider(ip));

final connectProvider =
    Provider.family<IOWebSocketChannel, String>((ref, ip) {
  // SOCKET OPEN
  return IOWebSocketChannel.connect('ws://$ip:$port/v1/path');
});

Thanks in advance for your help.

There are 2 answers

0
josharchibal (BEST ANSWER)

Thanks, @Dewey.

In the end, I found a workaround that works for my use case:

My providers: channelProvider & streamProvider

static final channelProvider = Provider.autoDispose
    .family<IOWebSocketChannel, HttpParam>((ref, httpParam) {
  log.i('channelProvider | Metrics - $httpParam');
  // A new socket is opened each time this provider is (re)built.
  return IOWebSocketChannel.connect(
      'ws://${httpParam.ip}:$port/v1/${httpParam.path}');
});

static final streamProvider =
    StreamProvider.autoDispose.family<dynamic, HttpParam>((ref, httpParam) {
  log.i('streamProvider | Metrics - $httpParam');
  log.i('streamProvider | Metrics - socket ${httpParam.path} opened');

  // Watching channelProvider rebuilds this provider when the channel is refreshed.
  var bStream = ref
      .watch(channelProvider(httpParam))
      .stream
      .asBroadcastStream(onCancel: (sub) => sub.cancel());

  // Set once the socket has closed on its own, so onDispose does not close it again.
  var isSubControlError = false;

  final sub = bStream.listen(
    (data) {
      ref
          .watch(channelProvider(httpParam))
          .sink
          ?.add('> sink add ${httpParam.path}');
    },
    onError: (_, stack) => null,
    onDone: () async {
      // The socket closed: wait 10 seconds, then recreate the channel.
      isSubControlError = true;
      await Future.delayed(Duration(seconds: 10));
      ref.container.refresh(channelProvider(httpParam));
    },
  );

  ref.onDispose(() {
    log.i('streamProvider | Metrics - socket ${httpParam.path} closed');
    sub.cancel();
    if (isSubControlError == false)
      ref.watch(channelProvider(httpParam)).sink?.close(1001);
    bStream = null;
  });

  return bStream;
});

I consume streamProvider in my widget like this:

return useProvider(MetricsWsRepository.streamProvider(HttpParam(
  ip: ip,
  path: 'dummy-path',
))).when(
  data: (data) => deserialize & doSomething1,
  loading: () => doSomething2,
  error: (_, stack) => doSomething3,
);
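
The same wait-then-reconnect loop can also be kept outside Riverpod, in plain Dart. The sketch below is not the code from this answer: `reconnecting`, `retryDelay` and `maxAttempts` are hypothetical names, and it assumes a null-safe Dart SDK. It subscribes to a fresh stream whenever the previous one ends or emits an error:

```dart
import 'dart:async';

/// Hypothetical helper (not part of Riverpod or web_socket_channel).
/// Calls [connect] to obtain a stream, forwards its events, and when that
/// stream ends or fails, waits [retryDelay] and calls [connect] again.
Stream<T> reconnecting<T>(
  Stream<T> Function() connect, {
  Duration retryDelay = const Duration(seconds: 10),
  int? maxAttempts, // null means: keep retrying forever
}) async* {
  var attempts = 0;
  while (maxAttempts == null || attempts < maxAttempts) {
    attempts++;
    try {
      // `await for` rethrows stream errors here, so the catch below sees
      // both a failed connection and a connection that dropped with an error.
      await for (final event in connect()) {
        yield event;
      }
    } catch (_) {
      // Swallow the error and fall through to the retry delay.
    }
    await Future<void>.delayed(retryDelay);
  }
}
```

Inside a StreamProvider this could be used as `yield* reconnecting(() => IOWebSocketChannel.connect(url).stream);`, where `url` stands for the address built from the provider's parameters. Closing the current socket when the provider is disposed is left out of the sketch.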

0
Dewey

I'm a bit of a beginner with Riverpod, but it seems to me you want a higher-level Redux/BLoC-style flow that recreates the provider each time it fails.

This higher-level bloc creates the provider when the connection succeeds; when the connection fails, you dispatch an event to the bloc that tells it to reconnect and recreate the provider.

That's my thought, but again, I'm a beginner with this package.
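
To make that flow a bit more concrete, here is a rough sketch of the state transitions as a Redux-style reducer. Everything in it (`SocketState`, `SocketEvent`, `reduce`) is made up for illustration and is not part of Riverpod or any bloc package; the actual socket handling and the retry timer would live in whatever dispatches these events:

```dart
/// Hypothetical connection states for the higher-level flow.
enum SocketState { connecting, connected, waitingToRetry }

/// Hypothetical events dispatched by the code that owns the socket.
enum SocketEvent { opened, failed, retryTimerElapsed }

/// Pure transition function: returns the next state for a given event.
SocketState reduce(SocketState state, SocketEvent event) {
  // The socket opened successfully.
  if (event == SocketEvent.opened) return SocketState.connected;
  // The connection attempt failed or an open connection dropped.
  if (event == SocketEvent.failed) return SocketState.waitingToRetry;
  // retryTimerElapsed: start a new attempt only if we were waiting for one.
  return state == SocketState.waitingToRetry
      ? SocketState.connecting
      : state;
}
```

Whenever the state becomes `connecting`, the owner would recreate the provider (or channel), which is the "reconnect and recreate" step described above.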