import 'dart:async';
import 'dart:convert';
import 'dart:developer';

import 'package:appwrite/appwrite.dart';
// Requires the web_socket_channel package:
// https://pub.dev/packages/web_socket_channel
import 'package:web_socket_channel/web_socket_channel.dart';

/// Keeps an Appwrite realtime subscription alive.
///
/// Subscribes to [channels] through the Appwrite [Realtime] service and, in
/// parallel, opens a raw web socket to `wss://<domain>/realtime` on which a
/// `"ping"` text frame is sent every [keepAlivePingDuration]. Whenever the
/// realtime stream reports that it is done, both connections are torn down
/// and re-created, until [close] is called.
class RealtimeKeepAliveConnection {
  RealtimeKeepAliveConnection({
    required this.channels,
    required this.domain,
    required this.client,
    this.keepAlivePingDuration = const Duration(seconds: 90),
    required this.onData,
    required this.onError,
  });

  /// Channel names to subscribe to.
  final List<String> channels;

  /// Host (and optional port) used to build the `wss://` keep-alive URL.
  final String domain;

  /// Delay between two keep-alive pings.
  final Duration keepAlivePingDuration;

  /// Appwrite client; its `project` config entry is sent in the socket URL.
  final Client client;

  /// Called for every message received on the realtime subscription.
  final void Function(RealtimeMessage) onData;

  /// Called with the error object when either connection reports an error.
  final void Function(dynamic) onError;

  /// Listener on the raw keep-alive web socket.
  StreamSubscription<dynamic>? _subscription;

  /// Listener on the Appwrite realtime stream.
  StreamSubscription<RealtimeMessage>? _realtimeListener;

  /// Handle used to close the Appwrite realtime subscription.
  RealtimeSubscription? _realtimeSubscription;

  WebSocketChannel? _webSocket;

  /// Measures how long the current connection has been alive.
  final Stopwatch _stopwatch = Stopwatch();

  /// Whether the connection should keep pinging and reconnecting.
  bool _keepAlive = true;

  /// Whether a ping was sent whose reply has not been seen yet.
  bool _sentKeepAlivePing = false;

  /// Number of times the realtime stream has reported done.
  int reconnectCount = 0;

  /// Opens both connections and starts the keep-alive ping loop.
  Future<void> initialize() async {
    await _initRealtime(
      onData: _realtimeOnData,
      onDone: _realtimeOnDone,
      onError: _realtimeOnError,
    );
    _heartbeat();
  }

  /// Stops pinging and reconnecting and releases both connections.
  ///
  /// Safe to call before [initialize] and more than once.
  void close() {
    _keepAlive = false;
    _disconnect();
  }

  /// Sends `"ping"` on the web socket every [keepAlivePingDuration].
  ///
  /// Deliberately fire-and-forget: the loop ends by itself once [close] has
  /// cleared [_keepAlive].
  void _heartbeat() async {
    while (_keepAlive) {
      await Future<void>.delayed(keepAlivePingDuration);
      // close() may have been called while this loop was sleeping.
      if (!_keepAlive) break;
      final socket = _webSocket;
      if (socket != null) {
        _sentKeepAlivePing = true;
        socket.sink.add("ping");
      }
    }
  }

  /// Logs the message and forwards it to [onData].
  void _realtimeOnData(RealtimeMessage data) {
    log("[$reconnectCount][${_stopwatch.elapsed}] onData");
    onData(data);
  }

  /// Counts the disconnect and reconnects unless [close] was called.
  void _realtimeOnDone() async {
    reconnectCount++;
    log("[$reconnectCount][${_stopwatch.elapsed}] onDone");
    if (!_keepAlive) return;
    // _initRealtime releases the previous connections and stores the new
    // ones itself; it has no meaningful return value.
    await _initRealtime(
      onData: _realtimeOnData,
      onDone: _realtimeOnDone,
      onError: _realtimeOnError,
    );
  }

  /// Logs the error and forwards it to [onError].
  void _realtimeOnError(dynamic e) {
    log("[$reconnectCount][${_stopwatch.elapsed}] onError:$e");
    onError(e);
  }

  /// Cancels both listeners and closes both connections, if any.
  void _disconnect() {
    // Detach the listener first so that closing the realtime subscription
    // cannot re-enter _realtimeOnDone.
    _realtimeListener?.cancel();
    _realtimeListener = null;
    _realtimeSubscription?.close();
    _realtimeSubscription = null;

    _subscription?.cancel();
    _subscription = null;
    _webSocket?.sink.close();
    _webSocket = null;
  }

  /// (Re)creates the keep-alive web socket and the realtime subscription.
  ///
  /// Any previous connection is released first. [onData], [onDone] and
  /// [onError] are attached to the realtime stream; [onError] also receives
  /// errors raised by the keep-alive web socket.
  Future<void> _initRealtime({
    required void Function(RealtimeMessage) onData,
    required void Function() onDone,
    required void Function(dynamic) onError,
  }) async {
    _disconnect();

    _stopwatch
      ..reset()
      ..start();

    final channelParams = channels.map((c) => "channels[]=$c").join('&');
    final projectId = client.config['project'];
    final wssUrl = Uri.parse(
      'wss://$domain/realtime?project=$projectId&$channelParams',
    );

    final socket = WebSocketChannel.connect(wssUrl);
    _webSocket = socket;
    _subscription = socket.stream.listen(_handlePingMsg, onError: onError);

    final realtimeSubscription = Realtime(client).subscribe(channels);
    _realtimeSubscription = realtimeSubscription;
    _realtimeListener = realtimeSubscription.stream.listen(
      onData,
      onDone: onDone,
      onError: onError,
    );
  }

  /// Detects the server's reply to a keep-alive ping.
  ///
  /// NOTE(review): a reply of type `"error"` is treated as proof that the
  /// socket is alive, presumably because the server rejects the bare `"ping"`
  /// frame as an invalid message - confirm against the server version in use.
  void _handlePingMsg(dynamic response) {
    if (response is! String) return;

    final dynamic decoded;
    try {
      decoded = jsonDecode(response);
    } on FormatException {
      // Not JSON, so it cannot be the reply this handler is looking for.
      return;
    }

    if (decoded is Map && decoded["type"] == "error" && _sentKeepAlivePing) {
      _sentKeepAlivePing = false;
      log("Web socket keep-alive heartbeat successful (Reconnect Count: $reconnectCount, Time alive: ${_stopwatch.elapsed})");
    }
  }
}