paradigm shift
This commit is contained in:
112
lib/workaround/keepalive_realtime.dart
Normal file
112
lib/workaround/keepalive_realtime.dart
Normal file
@@ -0,0 +1,112 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
import 'dart:developer';
|
||||
|
||||
import 'package:appwrite/appwrite.dart';
|
||||
import 'package:web_socket_channel/web_socket_channel.dart';
|
||||
// import this package https://pub.dev/packages/web_socket_channel
|
||||
|
||||
class RealtimeKeepAliveConnection {
|
||||
RealtimeKeepAliveConnection({
|
||||
required this.channels,
|
||||
required this.domain,
|
||||
required this.client,
|
||||
this.keepAlivePingDuration = const Duration(seconds: 90),
|
||||
required this.onData,
|
||||
required this.onError,
|
||||
});
|
||||
|
||||
final List<String> channels;
|
||||
final String domain;
|
||||
final Duration keepAlivePingDuration;
|
||||
final Client client;
|
||||
final Function(RealtimeMessage) onData;
|
||||
final Function(dynamic) onError;
|
||||
|
||||
// ignore: unused_field
|
||||
StreamSubscription<dynamic>? _subscription;
|
||||
WebSocketChannel? _webSocket;
|
||||
final Stopwatch _stopwatch = Stopwatch();
|
||||
bool _keepAlive = true;
|
||||
bool _sentKeepAlivePing = false;
|
||||
int reconnectCount = 0;
|
||||
|
||||
Future initialize() async {
|
||||
await _initRealtime(
|
||||
onData: _realtimeOnData,
|
||||
onDone: _realtimeOnDone,
|
||||
onError: _realtimeOnError,
|
||||
);
|
||||
_heartbeat();
|
||||
}
|
||||
|
||||
void close() {
|
||||
_keepAlive = false;
|
||||
_subscription!.cancel();
|
||||
}
|
||||
|
||||
void _heartbeat() async {
|
||||
while (_keepAlive) {
|
||||
await Future.delayed(keepAlivePingDuration);
|
||||
if (_webSocket != null) {
|
||||
_sentKeepAlivePing = true;
|
||||
_webSocket!.sink.add("ping");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void _realtimeOnData(RealtimeMessage data) {
|
||||
log("[$reconnectCount][${_stopwatch.elapsed}] onData");
|
||||
onData(data);
|
||||
}
|
||||
|
||||
void _realtimeOnDone() async {
|
||||
reconnectCount++;
|
||||
log("[$reconnectCount][${_stopwatch.elapsed}] onDone");
|
||||
if (_keepAlive) {
|
||||
if (_subscription != null) _subscription!.cancel();
|
||||
|
||||
_subscription = _subscription = await _initRealtime(
|
||||
onData: _realtimeOnData,
|
||||
onDone: _realtimeOnDone,
|
||||
onError: _realtimeOnError,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
void _realtimeOnError(dynamic e) {
|
||||
log("[$reconnectCount][${_stopwatch.elapsed}] onError:$e");
|
||||
onError(onError);
|
||||
}
|
||||
|
||||
Future _initRealtime({
|
||||
required Function(RealtimeMessage) onData,
|
||||
required Function() onDone,
|
||||
required Function(dynamic) onError,
|
||||
}) async {
|
||||
_stopwatch.reset();
|
||||
_stopwatch.start();
|
||||
String channelParams = channels.map((c) => "channels[]=$c").join('&');
|
||||
|
||||
String? projectId = client.config['project'];
|
||||
|
||||
final wssUrl = Uri.parse('wss://$domain/realtime?project=$projectId&$channelParams');
|
||||
_webSocket = WebSocketChannel.connect(wssUrl);
|
||||
|
||||
Realtime realtime = Realtime(client);
|
||||
RealtimeSubscription subscriptionRealTime = realtime.subscribe(channels);
|
||||
|
||||
subscriptionRealTime.stream.listen(onData, onDone: onDone, onError: onError);
|
||||
_subscription = _webSocket!.stream.listen(_handlePingMsg);
|
||||
}
|
||||
|
||||
void _handlePingMsg(dynamic response) {
|
||||
var json = jsonDecode(response);
|
||||
|
||||
if (json["type"] == "error" && _sentKeepAlivePing) {
|
||||
_sentKeepAlivePing = false;
|
||||
log("Web socket keep-alive heartbeat successful (Reconnect Count: $reconnectCount, Time alive: ${_stopwatch.elapsed})");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user