// Singleton

import 'dart:async';
import 'dart:convert';
import 'package:appwrite/appwrite.dart' as appwrite;
import 'package:appwrite/models.dart' as models;
import 'package:bus_infotainment/audio_cache.dart';
import 'package:bus_infotainment/auth/api_constants.dart';
import 'package:bus_infotainment/auth/auth_api.dart';
import 'package:bus_infotainment/tfl_datasets.dart';
import 'package:bus_infotainment/utils/audio%20wrapper.dart';
import 'package:bus_infotainment/utils/delegates.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
import 'package:http/http.dart' as http;
import 'package:ntp/ntp.dart';

/// Singleton that owns the announcement queue, the clock offset used to
/// schedule announcements, the loaded bus sequences and the Appwrite queue
/// synchronisation.
///
/// `LiveInformation()` always returns the same instance.
class LiveInformation {
  static final LiveInformation _singleton = LiveInformation._internal();

  factory LiveInformation() {
    return _singleton;
  }

  LiveInformation._internal();

  /// The periodic timer started by [Initialize], kept so that a repeated
  /// call replaces the timer instead of stacking a second one.
  Timer? _refreshTimer;

  /// Loads the bus sequences, hooks up the realtime subscriptions (now, or
  /// on login if not yet authenticated) and starts the refresh timer.
  ///
  /// The bus sequences are downloaded from TfL; if the download throws or
  /// does not answer with HTTP 200 the bundled asset copy is used instead.
  Future Initialize() async {
    // Load the bus sequences
    try {
      http.Response response =
          await http.get(Uri.parse('https://tfl.gov.uk/bus-sequences.csv'));
      if (response.statusCode != 200) {
        // An error page is not a CSV; fall through to the bundled dataset.
        throw Exception(
            "Unexpected status ${response.statusCode} for bus sequences");
      }
      busSequences = BusSequences.fromCSV(response.body);
      print("Loaded bus sequences from TFL");
    } catch (e) {
      String csv =
          await rootBundle.loadString("assets/datasets/bus-sequences.csv");
      busSequences = BusSequences.fromCSV(csv);
      print("Loaded bus sequences from assets");
    }

    if (auth.isAuthenticated()) {
      print("Auth is authenticated");
      // Not awaited: setupRealtime keeps itself alive in the background.
      setupRealtime();
    } else {
      print("Auth is not authenticated");
      auth.onLogin.addListener((value) {
        setupRealtime();
      });
    }

    _refreshTimer?.cancel();
    _refreshTimer = refreshTimer();
  }

  /// Starts and returns a 100 ms periodic timer that refreshes the clock
  /// offset and then services the announcement queue.
  ///
  /// A failed clock refresh is logged and does not prevent the queue from
  /// being serviced on that tick.
  Timer refreshTimer() =>
      Timer.periodic(const Duration(milliseconds: 100), (timer) async {
        try {
          await updateNtpOffset();
        } catch (e) {
          print("Failed to update NTP offset: $e");
        }
        _handleAnnouncementQueue();
      });

  /// Milliseconds to add to `DateTime.now()` to obtain the server time.
  ///
  /// Starts at `-1`, which [getNow] treats as "not set" until the first
  /// successful [updateNtpOffset].
  int ntpOffset = -1;

  /// Local time of the last successful offset update.
  DateTime lastNtpUpdate = DateTime.now().add(const Duration(seconds: -15));

  /// Whether [updateNtpOffset] has succeeded at least once. Lets a genuine
  /// offset of exactly -1 ms be told apart from the initial sentinel.
  bool _ntpSynced = false;

  /// Whether a time request is currently awaiting its response.
  bool _ntpRequestInFlight = false;

  /// Local time at which the last time request was started.
  DateTime? _lastNtpAttempt;

  /// Whether a usable clock offset is available.
  bool get _hasClockOffset => _ntpSynced || ntpOffset != -1;

  /// Updates [ntpOffset] from worldtimeapi.org.
  ///
  /// Does nothing if a request is already in flight, or if the last
  /// successful update or the last attempt is less than 10 seconds old, so
  /// the 100 ms timer cannot flood the time server while a request is slow
  /// or failing. Errors from the request or from parsing the response
  /// propagate to the caller.
  Future updateNtpOffset() async {
    final startedAt = DateTime.now();

    if (_ntpRequestInFlight) {
      return;
    }
    // Only update the NTP offset every 10 seconds
    if (startedAt.difference(lastNtpUpdate).inSeconds < 10) {
      return;
    }
    final lastAttempt = _lastNtpAttempt;
    if (lastAttempt != null &&
        startedAt.difference(lastAttempt).inSeconds < 10) {
      return;
    }

    _ntpRequestInFlight = true;
    _lastNtpAttempt = startedAt;
    try {
      var res = await http.get(
          Uri.parse('http://worldtimeapi.org/api/timezone/Europe/London'));
      if (res.statusCode == 200) {
        var json = jsonDecode(res.body);
        DateTime time = DateTime.parse(json['datetime']);
        ntpOffset = time.millisecondsSinceEpoch -
            DateTime.now().millisecondsSinceEpoch;
        lastNtpUpdate = DateTime.now();
        _ntpSynced = true;
      }
    } finally {
      _ntpRequestInFlight = false;
    }
  }

  /// Returns the current time corrected by [ntpOffset].
  ///
  /// Throws an [Exception] if no offset has been set yet.
  DateTime getNow() {
    if (!_hasClockOffset) {
      throw Exception("NTP offset not set");
    }
    return DateTime.now().add(Duration(milliseconds: ntpOffset));
  }

  AudioWrapper audioPlayer = AudioWrapper();
  AnnouncementCache announcementCache = AnnouncementCache();

  /// Announcements waiting to be played; the first element is played next.
  List announcementQueue = [];

  /// The announcement most recently taken off the queue, if any.
  AnnouncementQueueEntry? lastAnnouncement;

  /// When the last announcement was displayed; announcements whose
  /// timestamp is older than this are rejected by [queueAnnouncement].
  DateTime lastAnnouncementTimeStamp = DateTime.now().toUtc();

  /// Triggered with the announcement each time one is displayed.
  EventDelegate announcementDelegate = EventDelegate();

  String _currentAnnouncement = "*** NO MESSAGE ***";

  /// The text of the announcement currently on display.
  String get currentAnnouncement => _currentAnnouncement;

  set currentAnnouncement(String value) {
    _currentAnnouncement = value;
  }

  /// Whether an announcement is currently being dequeued or played.
  bool isPlayingAnnouncement = false;

  /// Plays the announcement at the head of the queue once it is due.
  ///
  /// Called on every timer tick. An entry without a scheduled time is
  /// played immediately. An entry scheduled more than one tick in the
  /// future is left for a later tick; one due within this tick is awaited
  /// precisely; one that is already overdue (for example because the
  /// previous announcement was still playing) is played straight away so it
  /// cannot block the queue.
  void _handleAnnouncementQueue() async {
    const int timerInterval = 100;

    if (isPlayingAnnouncement || announcementQueue.isEmpty) {
      return;
    }
    // Without a clock offset nothing can be scheduled or timestamped yet;
    // try again on a later tick.
    if (!_hasClockOffset) {
      return;
    }

    print("Handling announcement queue");
    AnnouncementQueueEntry announcement = announcementQueue.first;
    print("Queue length: ${announcementQueue.length}");

    Duration waitBeforePlaying = Duration.zero;
    final scheduledTime = announcement.scheduledTime;
    if (scheduledTime != null) {
      final msUntilDue = scheduledTime.difference(getNow()).inMilliseconds;
      if (msUntilDue > timerInterval) {
        print("Due in: ${msUntilDue}ms");
        return;
      }
      if (msUntilDue > 0) {
        // Account for the time lost by the periodic timer
        waitBeforePlaying = Duration(milliseconds: msUntilDue);
      }
    }

    // Claim the queue before the first await so an overlapping tick cannot
    // dequeue at the same time.
    isPlayingAnnouncement = true;
    try {
      if (waitBeforePlaying > Duration.zero) {
        await Future.delayed(waitBeforePlaying);
      }

      // Remove by identity: the queue may have changed while waiting.
      announcementQueue.remove(announcement);
      lastAnnouncement = announcement;

      if (kIsWeb) {
        await Future.delayed(Duration(milliseconds: 100));
      }

      print("Displaying announcement: ${announcement.displayText}");
      announcementDelegate.trigger(announcement);
      _currentAnnouncement = announcement.displayText;
      lastAnnouncementTimeStamp = getNow();

      if (announcement.audioSources.isNotEmpty) {
        try {
          for (AudioWrapperSource source in announcement.audioSources) {
            Duration? duration = await audioPlayer.play(source);
            if (duration != null) {
              await Future.delayed(duration);
            }
            await Future.delayed(Duration(milliseconds: 150));
          }
        } finally {
          audioPlayer.stop();
        }
      } else if (announcementQueue.isNotEmpty) {
        // Text-only announcement with more waiting: keep it on display
        // for a while before moving on.
        await Future.delayed(Duration(seconds: 5));
      }
    } finally {
      // Always release the queue, even if playback threw.
      isPlayingAnnouncement = false;
    }
    print("Popped announcement queue");
  }

  /// Builds the playable "<route> to <destination>" announcement for
  /// [routeVariant], loading the route and destination audio first.
  ///
  /// [scheduledTime] and [timestamp] are copied onto the returned entry so
  /// it is scheduled and de-duplicated like the entry it was built from.
  Future<AnnouncementQueueEntry> _getDestinationAnnouncement(
    BusRouteVariant routeVariant, {
    bool sendToServer = false,
    DateTime? scheduledTime,
    DateTime? timestamp,
  }) async {
    String display =
        "${routeVariant.busRoute.routeNumber} to ${routeVariant.busStops.last.formattedStopName}";
    String audioRoute = "R_${routeVariant.busRoute.routeNumber}_001.mp3";
    String audioDestination = routeVariant.busStops.last.getAudioFileName();

    // Cache the audio files
    await announcementCache.loadAnnouncements([audioRoute, audioDestination]);

    AudioWrapperSource sourceRoute =
        AudioWrapperByteSource(announcementCache[audioRoute]);
    AudioWrapperSource sourceDestination =
        AudioWrapperByteSource(announcementCache[audioDestination]);

    return AnnouncementQueueEntry(
      sendToServer: sendToServer,
      displayText: display,
      audioSources: [
        sourceRoute,
        AudioWrapperAssetSource("audio/to_destination.wav"),
        sourceDestination,
      ],
      scheduledTime: scheduledTime,
      timestamp: timestamp,
    );
  }

  /// Returns a [DestinationAnnouncementEntry] for [routeVariant] with no
  /// audio attached; the audio is resolved when the entry is queued.
  Future getDestinationAnnouncement(BusRouteVariant routeVariant,
      {bool sendToServer = true}) async {
    return DestinationAnnouncementEntry(
      routeVariant: routeVariant,
      audioSources: [],
      sendToServer: sendToServer,
    );
  }

  late BusSequences busSequences;

  BusRouteVariant? _currentRouteVariant;

  /// Triggered with the new variant each time [setRouteVariant] is called.
  EventDelegate routeVariantDelegate = EventDelegate();

  /// Makes [routeVariant] the current route and loads the audio of every
  /// stop on it into the announcement cache.
  Future setRouteVariant(BusRouteVariant routeVariant) async {
    _currentRouteVariant = routeVariant;
    routeVariantDelegate.trigger(routeVariant);

    // cache all of the stop announcements
    List audioFiles = [];
    for (BusRouteStops stop in routeVariant.busStops) {
      audioFiles.add(stop.getAudioFileName());
      print("Cached stop audio: ${stop.getAudioFileName()}");
    }
    await announcementCache.loadAnnouncements(audioFiles);
  }

  /// The route variant last passed to [setRouteVariant], if any.
  BusRouteVariant? getRouteVariant() {
    return _currentRouteVariant;
  }

  /// Whether an entry carrying the same non-null [timestamp] is queued.
  bool _isAlreadyQueued(DateTime? timestamp) {
    if (timestamp == null) {
      return false;
    }
    return announcementQueue.any((element) => element.timestamp == timestamp);
  }

  /// Queues [announcement] locally, or publishes it to the server queue
  /// when `announcement.sendToServer` is true.
  ///
  /// An announcement with a timestamp is dropped when that timestamp is
  /// older than the last displayed announcement or is already in the queue.
  /// Announcements without a timestamp skip both checks. Local destination
  /// announcements are replaced by a playable entry with audio attached.
  void queueAnnouncement(AnnouncementQueueEntry announcement) async {
    final timestamp = announcement.timestamp;
    if (timestamp == null) {
      print(
          "Announcement `${announcement.displayText}` does not have timestamp");
    } else {
      // Make sure the timestamp of the announcement is after the last
      // announcement. If not, don't queue it.
      if (timestamp.toUtc().isBefore(lastAnnouncementTimeStamp)) {
        print("Announcement is too old");
        print("LastAnnouncement: $lastAnnouncementTimeStamp");
        print("Announcement: $timestamp");
        int difference =
            timestamp.difference(lastAnnouncementTimeStamp).inMilliseconds;
        print("Difference: $difference");
        return;
      }
      // If there is an announcement in the queue with the same timestamp,
      // don't queue it
      if (_isAlreadyQueued(timestamp)) {
        print("Announcement with same timestamp already in queue");
        return;
      }
    }

    if (!announcement.sendToServer) {
      if (announcement is DestinationAnnouncementEntry) {
        BusRouteVariant routeVariant = announcement.routeVariant;
        if (getRouteVariant() != routeVariant) {
          // Not awaited on purpose: caching every stop must not delay the
          // destination announcement itself.
          setRouteVariant(routeVariant);
        }

        final playable = await _getDestinationAnnouncement(
          routeVariant,
          sendToServer: false,
          scheduledTime: announcement.scheduledTime,
          timestamp: timestamp,
        );

        // A concurrent call may have queued the same announcement while
        // the audio was loading.
        if (_isAlreadyQueued(timestamp)) {
          print("Announcement with same timestamp already in queue");
          return;
        }

        announcementQueue.add(playable);
        print("Queued destination announcement: ${announcement.displayText}");
        print("Audios: ${playable.audioSources.length}");
        return;
      }

      announcementQueue.add(announcement);
      print("Queued announcement: ${announcement.displayText} (no server)");
      return;
    }

    final databases = appwrite.Databases(auth.client);

    print("Queuing announcement: ${announcement.displayText} (server)");
    print("Announcement type: ${announcement.runtimeType}");

    try {
      if (announcement is InformationAnnouncementEntry) {
        print("Queing to InformationAnnouncementEntry");

        final index = manualAnnouncements.indexOf(announcement);
        if (index < 0) {
          // Receivers resolve the announcement by index, so an entry that
          // is not in the list cannot be published.
          print(
              "Announcement `${announcement.shortName}` is not a known manual announcement");
          return;
        }

        // 1 second in the future
        DateTime scheduledTime = getNow().add(Duration(seconds: 1));

        await databases.createDocument(
          documentId: appwrite.ID.unique(),
          databaseId: ApiConstants.INFO_Q_DATABASE_ID,
          collectionId: ApiConstants.INFORMATION_Q_COLLECTION_ID,
          data: {
            "ManualAnnouncementIndex": index,
            "ScheduledTime": scheduledTime.toUtc().toIso8601String(),
            "SessionID": sessionID,
          },
        );

        print(
            "Queued information announcement: ${announcement.shortName} (server)");
      } else if (announcement is ManualAnnouncementEntry) {
        print("Queing to ManualAnnouncementEntry");

        // 1 second in the future
        DateTime scheduledTime = getNow().add(Duration(seconds: 1));

        // Map each byte source back to the cache key it was loaded from.
        List audioFileNames = [];
        for (AudioWrapperSource source in announcement.audioSources) {
          if (source is AudioWrapperByteSource) {
            Uint8List? bytes = await source.bytes;
            for (String key in announcementCache.keys) {
              if (announcementCache[key] == bytes) {
                audioFileNames.add(key);
                break;
              }
            }
          }
        }

        await databases.createDocument(
          documentId: appwrite.ID.unique(),
          databaseId: ApiConstants.INFO_Q_DATABASE_ID,
          collectionId: ApiConstants.MANUAL_Q_COLLECTION_ID,
          data: {
            "DisplayText": announcement.displayText,
            "AudioFileNames": audioFileNames,
            "ScheduledTime": scheduledTime.toUtc().toIso8601String(),
            "SessionID": sessionID,
          },
        );

        print("Queued manual announcement: ${announcement.shortName}");
      } else if (announcement is DestinationAnnouncementEntry) {
        print("Queing to DestinationAnnouncementEntry");

        // 2 seconds in the future
        DateTime scheduledTime = getNow().add(Duration(seconds: 2));

        await databases.createDocument(
          documentId: appwrite.ID.unique(),
          databaseId: ApiConstants.INFO_Q_DATABASE_ID,
          collectionId: ApiConstants.DEST_Q_COLLECTION_ID,
          data: {
            "RouteNumber": announcement.routeVariant.busRoute.routeNumber,
            "RouteVariantIndex": announcement.routeVariant.routeVariant,
            "ScheduledTime": scheduledTime.toUtc().toIso8601String(),
            "SessionID": sessionID,
          },
        );

        print(
            "Queued destination announcement: ${announcement.shortName} (server)");
      }
    } on appwrite.AppwriteException catch (e) {
      print("Failed to queue announcement on server: $e");
    }
  }

  /// The predefined information announcements.
  ///
  /// The position of an entry in this list is what is sent to the server
  /// as "ManualAnnouncementIndex", so the order must not change.
  List manualAnnouncements = [
    InformationAnnouncementEntry(
      shortName: "Driver Change",
      informationText: "Driver Change",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/driverchange.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "No Standing Upr Deck",
      informationText: "No standing on the upper deck",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/nostanding.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Face Covering",
      informationText: "Please wear a face covering!",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/facecovering.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Seats Upstairs",
      informationText: "Seats are available upstairs",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/seatsupstairs.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Bus Terminates Here",
      informationText:
          "Bus terminates here. Please take your belongings with you",
      audioSources: [
        AudioWrapperAssetSource(
            "audio/manual_announcements/busterminateshere.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Bus On Diversion",
      informationText:
          "Bus on diversion. Please listen for further announcements",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/busondiversion.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Destination Change",
      informationText:
          "Destination Changed - please listen for further instructions",
      audioSources: [
        AudioWrapperAssetSource(
            "audio/manual_announcements/destinationchange.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Wheelchair Space",
      informationText: "Wheelchair space requested",
      audioSources: [
        AudioWrapperAssetSource(
            "audio/manual_announcements/wheelchairspace1.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Move Down The Bus",
      informationText: "Please move down the bus",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/movedownthebus.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Next Stop Closed",
      informationText: "The next bus stop is closed",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/nextstopclosed.wav")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "CCTV In Operation",
      informationText: "CCTV is in operation on this bus",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/cctvoperation.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Safe Door Opening",
      informationText: "Driver will open the doors when it is safe to do so",
      audioSources: [
        AudioWrapperAssetSource(
            "audio/manual_announcements/safedooropening.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Buggy Safety",
      informationText: "For your child's safety, please remain with your buggy",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/buggysafety.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Wheelchair Space 2",
      informationText: "Wheelchair priority space required",
      audioSources: [
        AudioWrapperAssetSource(
            "audio/manual_announcements/wheelchairspace2.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Service Regulation",
      informationText:
          "Regulating service - please listen for further information",
      audioSources: [
        AudioWrapperAssetSource(
            "audio/manual_announcements/serviceregulation.mp3")
      ],
    ),
    InformationAnnouncementEntry(
      shortName: "Bus Ready To Depart",
      informationText: "This bus is ready to depart",
      audioSources: [
        AudioWrapperAssetSource("audio/manual_announcements/readytodepart.mp3")
      ],
    ),
  ];

  AuthAPI auth = AuthAPI();

  /// Value written to, and searched for in, the "SessionID" field of the
  /// server queue documents.
  String sessionID = "65de648aa7f44684ecce";

  /// Currently only logs; the server update itself is commented out.
  void updateServer() async {
    // final databases = appwrite.Databases(auth.client);
    // final document = databases.updateDocument(
    //   databaseId: ApiConstants.INFO_DATABASE_ID,
    //   collectionId: ApiConstants.INFO_COLLECTION_ID,
    //   documentId: documentID,
    //   data: {
    //     "Display": _currentAnnouncement,
    //   }
    // );

    print("Updated server with announcement: $_currentAnnouncement");
  }

  /// Currently a no-op; the server pull itself is commented out.
  void pullServer() async {
    if (auth.status == AuthStatus.UNAUTHENTICATED) {
      return;
    }

    // final databases = appwrite.Databases(auth.client);
    // final document = await databases.getDocument(
    //   databaseId: ApiConstants.INFO_DATABASE_ID,
    //   collectionId: ApiConstants.INFO_COLLECTION_ID,
    //   documentId: documentID,
    // );

    // queueAnnouncement(AnnouncementQueueEntry(
    //   displayText: document.data['Display'],
    //   audioSources: [],
    //   sendToServer: false, // Don't send this back to the server, else we'll get an infinite loop
    // ));

    // print("Pulled announcement from server: ${document.data['Display']}");
  }

  /// Whether [deleteAllManualQueueEntries] is currently running.
  bool purgeRunning = false;

  /// Deletes every document of this session from the manual queue
  /// collection. Does nothing if a purge is already running.
  Future deleteAllManualQueueEntries() async {
    if (purgeRunning) {
      return;
    }
    purgeRunning = true;

    try {
      final databases = appwrite.Databases(auth.client);
      const int limit = 25; // Number of documents fetched per page

      while (true) {
        // Always fetch the first page: the documents deleted in the
        // previous pass are gone, so the remaining ones have moved up.
        print("Deleting manual queue entries");
        final manualQ = await databases.listDocuments(
          databaseId: ApiConstants.INFO_Q_DATABASE_ID,
          collectionId: ApiConstants.MANUAL_Q_COLLECTION_ID,
          queries: [
            appwrite.Query.search("SessionID", sessionID),
            appwrite.Query.limit(limit),
            appwrite.Query.offset(0),
            appwrite.Query.orderDesc('\$createdAt')
          ],
        );

        // If there are no documents left, stop
        if (manualQ.documents.isEmpty) {
          break;
        }

        // Delete each document in the fetched page
        for (models.Document doc in manualQ.documents) {
          await databases.deleteDocument(
            databaseId: ApiConstants.INFO_Q_DATABASE_ID,
            collectionId: ApiConstants.MANUAL_Q_COLLECTION_ID,
            documentId: doc.$id,
          );
        }
      }

      print("Deleted all manual queue entries");
    } finally {
      // Allow a later purge, whether this one finished or threw.
      purgeRunning = false;
    }
  }

  /// Parses the optional "ScheduledTime" field of a queue document.
  DateTime? _scheduledTimeOf(models.Document doc) {
    final raw = doc.data["ScheduledTime"];
    return raw != null ? DateTime.parse(raw) : null;
  }

  /// Fetches the newest documents of the three server queues and queues
  /// every announcement whose scheduled time has not yet passed.
  ///
  /// Documents that cannot be resolved locally (unknown announcement index,
  /// unknown route or route variant) are skipped.
  void pullQueue() async {
    if (auth.status == AuthStatus.UNAUTHENTICATED) {
      return;
    }

    final queue = <AnnouncementQueueEntry>[];
    final databases = appwrite.Databases(auth.client);

    // Pull the information queue
    {
      final infoQ = await databases.listDocuments(
        databaseId: ApiConstants.INFO_Q_DATABASE_ID,
        collectionId: ApiConstants.INFORMATION_Q_COLLECTION_ID,
        queries: [
          appwrite.Query.search("SessionID", sessionID),
          appwrite.Query.limit(25),
          appwrite.Query.offset(0),
          appwrite.Query.orderDesc('\$createdAt')
        ],
      );

      for (models.Document doc in infoQ.documents) {
        final index = doc.data['ManualAnnouncementIndex'];
        if (index is! int ||
            index < 0 ||
            index >= manualAnnouncements.length) {
          print("Skipping information entry with invalid index: $index");
          continue;
        }

        final template = manualAnnouncements[index];
        queue.add(InformationAnnouncementEntry(
          shortName: template.shortName,
          informationText: template.displayText,
          audioSources: template.audioSources,
          scheduledTime: _scheduledTimeOf(doc),
          timestamp: DateTime.parse(doc.$createdAt),
          sendToServer: false,
        ));
      }
    }

    // Pull the manual queue
    {
      final manualQ = await databases.listDocuments(
        databaseId: ApiConstants.INFO_Q_DATABASE_ID,
        collectionId: ApiConstants.MANUAL_Q_COLLECTION_ID,
        queries: [
          appwrite.Query.search("SessionID", sessionID),
          appwrite.Query.limit(25),
          appwrite.Query.offset(0),
          appwrite.Query.orderDesc('\$createdAt')
        ],
      );

      for (models.Document doc in manualQ.documents) {
        List audioSources = [];
        for (String filename in doc.data["AudioFileNames"] ?? []) {
          audioSources.add(AudioWrapperByteSource(announcementCache[filename]));
        }

        queue.add(ManualAnnouncementEntry(
          sendToServer: false,
          shortName: "",
          informationText: doc.data["DisplayText"],
          audioSources: [...audioSources],
          scheduledTime: _scheduledTimeOf(doc),
          timestamp: DateTime.parse(doc.$createdAt),
        ));
      }
    }

    // pull the destination queue
    {
      final destQ = await databases.listDocuments(
        databaseId: ApiConstants.INFO_Q_DATABASE_ID,
        collectionId: ApiConstants.DEST_Q_COLLECTION_ID,
        queries: [
          appwrite.Query.search("SessionID", sessionID),
          appwrite.Query.limit(25),
          appwrite.Query.offset(0),
          appwrite.Query.orderDesc('\$createdAt')
        ],
      );

      for (models.Document doc in destQ.documents) {
        BusRoute? route = busSequences.routes[doc.data["RouteNumber"]];
        if (route == null) {
          print("Skipping destination entry with unknown route");
          continue;
        }
        BusRouteVariant? routeVariant =
            route.routeVariants[doc.data["RouteVariantIndex"]];
        if (routeVariant == null) {
          print("Skipping destination entry with unknown route variant");
          continue;
        }

        queue.add(DestinationAnnouncementEntry(
          routeVariant: routeVariant,
          scheduledTime: _scheduledTimeOf(doc),
          timestamp: DateTime.parse(doc.$createdAt),
          sendToServer: false,
          audioSources: [],
        ));
      }
    }

    // sort the queue by timestamp, so the oldest announcements are at the
    // front (entries without a timestamp go last)
    queue.sort((a, b) {
      final first = a.timestamp;
      final second = b.timestamp;
      if (first == null || second == null) {
        if (first == null && second == null) {
          return 0;
        }
        return first == null ? 1 : -1;
      }
      return first.compareTo(second);
    });

    final now = getNow();
    for (AnnouncementQueueEntry entry in queue) {
      // Dont queue announcements that are older than now
      final scheduledTime = entry.scheduledTime;
      if (scheduledTime != null && scheduledTime.isBefore(now)) {
        continue;
      }
      queueAnnouncement(entry);
    }
  }

  appwrite.RealtimeSubscription? information_q_subscription;
  appwrite.RealtimeSubscription? manual_q_subscription;
  appwrite.RealtimeSubscription? destination_q_subscription;

  /// Subscribes to the three server queue collections and calls
  /// [pullQueue] on every event.
  ///
  /// Returns immediately if a subscription already exists. Otherwise the
  /// subscriptions are closed after 90 seconds and set up again, so the
  /// returned future completes only after that first 90 second period.
  Future setupRealtime() async {
    if (information_q_subscription != null) {
      return;
    }

    // await deleteAllManualQueueEntries(); //todo

    print("Setting up realtime");

    // Websocket
    appwrite.Realtime realtime = appwrite.Realtime(auth.client);

    information_q_subscription = realtime.subscribe(
      [
        'databases.${ApiConstants.INFO_Q_DATABASE_ID}.collections.${ApiConstants.INFORMATION_Q_COLLECTION_ID}.documents'
      ],
    );
    information_q_subscription?.stream.listen((event) {
      print("Manual queue entry added");
      pullQueue();
    });

    manual_q_subscription = realtime.subscribe(
      [
        'databases.${ApiConstants.INFO_Q_DATABASE_ID}.collections.${ApiConstants.MANUAL_Q_COLLECTION_ID}.documents'
      ],
    );
    manual_q_subscription?.stream.listen((event) {
      print("Manual queue entry added");
      pullQueue();
    });

    destination_q_subscription = realtime.subscribe(
      [
        'databases.${ApiConstants.INFO_Q_DATABASE_ID}.collections.${ApiConstants.DEST_Q_COLLECTION_ID}.documents'
      ],
    );
    destination_q_subscription?.stream.listen((event) {
      print("Destination queue entry added");
      pullQueue();
    });

    print("Subscribed to servers");

    await Future.delayed(Duration(seconds: 90));

    information_q_subscription?.close();
    information_q_subscription = null;
    manual_q_subscription?.close();
    manual_q_subscription = null;
    destination_q_subscription?.close();
    destination_q_subscription = null;

    // Not awaited: the next cycle runs in the background.
    setupRealtime();
  }
}

/// A single announcement: the text to display and the audio to play.
class AnnouncementQueueEntry {
  /// Text shown while the announcement is current.
  final String displayText;

  /// Audio sources played in order; may be empty for text-only entries.
  final List audioSources;

  /// Whether queueing this entry publishes it to the server queue instead
  /// of adding it to the local queue.
  bool sendToServer = true;

  /// When the announcement should be played; null means as soon as it
  /// reaches the head of the queue.
  DateTime? scheduledTime;

  /// Creation time used to reject stale and duplicate entries; null for
  /// entries that skip those checks.
  DateTime? timestamp;

  AnnouncementQueueEntry({
    required this.displayText,
    required this.audioSources,
    this.sendToServer = true,
    this.scheduledTime,
    this.timestamp,
  });
}

/// An announcement that additionally carries a short name.
class NamedAnnouncementQueueEntry extends AnnouncementQueueEntry {
  final String shortName;

  NamedAnnouncementQueueEntry({
    required this.shortName,
    required String displayText,
    required List audioSources,
    DateTime? scheduledTime,
    DateTime? timestamp,
    bool sendToServer = true,
  }) : super(
          displayText: displayText,
          audioSources: audioSources,
          sendToServer: sendToServer,
          scheduledTime: scheduledTime,
          timestamp: timestamp,
        );
}

/// A named announcement published to the manual queue collection as its
/// display text plus audio file names.
class ManualAnnouncementEntry extends NamedAnnouncementQueueEntry {
  ManualAnnouncementEntry({
    required String shortName,
    required String informationText,
    required List audioSources,
    DateTime? scheduledTime,
    DateTime? timestamp,
    bool sendToServer = true,
  }) : super(
          shortName: shortName,
          displayText: informationText,
          audioSources: audioSources,
          sendToServer: sendToServer,
          scheduledTime: scheduledTime,
          timestamp: timestamp,
        );
}

/// A named announcement published to the information queue collection as
/// its index in `LiveInformation.manualAnnouncements`.
class InformationAnnouncementEntry extends NamedAnnouncementQueueEntry {
  InformationAnnouncementEntry({
    required String shortName,
    required String informationText,
    required List audioSources,
    DateTime? scheduledTime,
    DateTime? timestamp,
    bool sendToServer = true,
  }) : super(
          shortName: shortName,
          displayText: informationText,
          audioSources: audioSources,
          sendToServer: sendToServer,
          scheduledTime: scheduledTime,
          timestamp: timestamp,
        );
}

/// The "<route> to <last stop>" announcement for a route variant.
class DestinationAnnouncementEntry extends NamedAnnouncementQueueEntry {
  final BusRouteVariant routeVariant;

  DestinationAnnouncementEntry({
    required this.routeVariant,
    required List audioSources,
    DateTime? scheduledTime,
    DateTime? timestamp,
    bool sendToServer = true,
  }) : super(
          shortName: "Destination",
          displayText:
              "${routeVariant.busRoute.routeNumber} to ${routeVariant.busStops.last.formattedStopName}",
          audioSources: audioSources,
          sendToServer: sendToServer,
          scheduledTime: scheduledTime,
          timestamp: timestamp,
        );
}

/// Returns the absolute value of [value].
int abs(int value) => value < 0 ? -value : value;