Files
Bus-Infotainment--IBus-/lib/workaround/keepalive_realtime.dart
2024-05-03 14:03:51 +01:00

113 lines
3.1 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:developer';
import 'package:appwrite/appwrite.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
// import this package https://pub.dev/packages/web_socket_channel
class RealtimeKeepAliveConnection {
RealtimeKeepAliveConnection({
required this.channels,
required this.domain,
required this.client,
this.keepAlivePingDuration = const Duration(seconds: 90),
required this.onData,
required this.onError,
});
final List<String> channels;
final String domain;
final Duration keepAlivePingDuration;
final Client client;
final Function(RealtimeMessage) onData;
final Function(dynamic) onError;
// ignore: unused_field
StreamSubscription<dynamic>? _subscription;
WebSocketChannel? _webSocket;
final Stopwatch _stopwatch = Stopwatch();
bool _keepAlive = true;
bool _sentKeepAlivePing = false;
int reconnectCount = 0;
Future initialize() async {
await _initRealtime(
onData: _realtimeOnData,
onDone: _realtimeOnDone,
onError: _realtimeOnError,
);
_heartbeat();
}
void close() {
_keepAlive = false;
_subscription!.cancel();
}
void _heartbeat() async {
while (_keepAlive) {
await Future.delayed(keepAlivePingDuration);
if (_webSocket != null) {
_sentKeepAlivePing = true;
_webSocket!.sink.add("ping");
}
}
}
void _realtimeOnData(RealtimeMessage data) {
log("[$reconnectCount][${_stopwatch.elapsed}] onData");
onData(data);
}
void _realtimeOnDone() async {
reconnectCount++;
log("[$reconnectCount][${_stopwatch.elapsed}] onDone");
if (_keepAlive) {
if (_subscription != null) _subscription!.cancel();
_subscription = _subscription = await _initRealtime(
onData: _realtimeOnData,
onDone: _realtimeOnDone,
onError: _realtimeOnError,
);
}
}
void _realtimeOnError(dynamic e) {
log("[$reconnectCount][${_stopwatch.elapsed}] onError:$e");
onError(onError);
}
Future _initRealtime({
required Function(RealtimeMessage) onData,
required Function() onDone,
required Function(dynamic) onError,
}) async {
_stopwatch.reset();
_stopwatch.start();
String channelParams = channels.map((c) => "channels[]=$c").join('&');
String? projectId = client.config['project'];
final wssUrl = Uri.parse('wss://$domain/realtime?project=$projectId&$channelParams');
_webSocket = WebSocketChannel.connect(wssUrl);
Realtime realtime = Realtime(client);
RealtimeSubscription subscriptionRealTime = realtime.subscribe(channels);
subscriptionRealTime.stream.listen(onData, onDone: onDone, onError: onError);
_subscription = _webSocket!.stream.listen(_handlePingMsg);
}
void _handlePingMsg(dynamic response) {
var json = jsonDecode(response);
if (json["type"] == "error" && _sentKeepAlivePing) {
_sentKeepAlivePing = false;
log("Web socket keep-alive heartbeat successful (Reconnect Count: $reconnectCount, Time alive: ${_stopwatch.elapsed})");
return;
}
}
}