From fe2f10fecfe9a7152185b3ea37cd020a44afe7b1 Mon Sep 17 00:00:00 2001 From: Sudharshan Date: Sat, 25 May 2019 09:46:21 +0800 Subject: [PATCH] fix: made heartbeat more robust --- ios/Runner/PeerConnectionWrapper.swift | 1 + ios/Runner/PeerManager.swift | 2 +- ios/Runner/SignalingApiProvider.swift | 3 +- lib/main.dart | 1 + lib/src/blocs/contact_bloc.dart | 3 + lib/src/blocs/conversation_bloc.dart | 3 + lib/src/blocs/heartbeat_bloc.dart | 88 ++++++++++++------- lib/src/services/cache_http.dart | 6 +- lib/src/ui/home/home.dart | 1 + lib/src/ui/home/widgets/contact_list.dart | 12 +-- .../ui/home/widgets/conversation_list.dart | 12 +-- lib/src/ui/login/widgets/login_page.dart | 1 + lib/src/ui/widgets/user_avatar.dart | 29 +++--- 13 files changed, 94 insertions(+), 68 deletions(-) diff --git a/ios/Runner/PeerConnectionWrapper.swift b/ios/Runner/PeerConnectionWrapper.swift index 44f77e5..489ef11 100644 --- a/ios/Runner/PeerConnectionWrapper.swift +++ b/ios/Runner/PeerConnectionWrapper.swift @@ -188,6 +188,7 @@ extension PeerConnectionWrapper: RTCPeerConnectionDelegate { func peerConnection(_ peerConnection: RTCPeerConnection, didAdd stream: RTCMediaStream) { print("adding new stream from remote") + self.remoteAudioTrack = stream.audioTracks[0] } func peerConnection(_ peerConnection: RTCPeerConnection, didRemove stream: RTCMediaStream) { diff --git a/ios/Runner/PeerManager.swift b/ios/Runner/PeerManager.swift index e0d65e6..ad586b6 100644 --- a/ios/Runner/PeerManager.swift +++ b/ios/Runner/PeerManager.swift @@ -77,7 +77,7 @@ class PeerManager: NSObject { private extension PeerManager { func initialiseEventSource() { eventSource?.addEventListener("offer") { (id, event, data) in - + print(data!) guard let id = id, let data = data else { // Incorrect packet type error return diff --git a/ios/Runner/SignalingApiProvider.swift b/ios/Runner/SignalingApiProvider.swift index 2e95f3f..cfa5f66 100644 --- a/ios/Runner/SignalingApiProvider.swift +++ b/ios/Runner/SignalingApiProvider.swift @@ -34,7 +34,6 @@ class SignalingApiProvider: NSObject { let dataVal = try NSURLConnection.sendSynchronousRequest(request, returning: &response) do { if let jsonResult = try JSONSerialization.jsonObject(with: dataVal, options: []) as? [String] { - print("Synchronous\(jsonResult)") // convert this to an array of strings for device in jsonResult { deviceList.append(device) @@ -68,7 +67,6 @@ class SignalingApiProvider: NSObject { let dataVal = try NSURLConnection.sendSynchronousRequest(request, returning: &response) do { if let jsonResult = try JSONSerialization.jsonObject(with: dataVal, options: []) as? [Any] { - print("Synchronous\(jsonResult)") // Need code to convert this to an array of strings for user in jsonResult { if let userObject = user as? [String: String] { @@ -99,6 +97,7 @@ class SignalingApiProvider: NSObject { // prepare json data let json: [String: Any] = ["event": event, "data": data] + print(json) let jsonData = try? JSONSerialization.data(withJSONObject: json) diff --git a/lib/main.dart b/lib/main.dart index ac6da35..a399de0 100644 --- a/lib/main.dart +++ b/lib/main.dart @@ -1,5 +1,6 @@ import "package:flutter/services.dart"; import 'routes.dart'; +import "src/blocs/heartbeat_bloc.dart"; void main() { SystemChrome.setSystemUIOverlayStyle(SystemUiOverlayStyle.light); diff --git a/lib/src/blocs/contact_bloc.dart b/lib/src/blocs/contact_bloc.dart index 68f353e..b6232c9 100644 --- a/lib/src/blocs/contact_bloc.dart +++ b/lib/src/blocs/contact_bloc.dart @@ -12,9 +12,12 @@ class ContactBloc { fetchContacts() async { List contactList = await _provider.fetchContacts(); _contactsFetcher.sink.add(contactList); + return contactList; } dispose() { _contactsFetcher.close(); } } + +final contactBloc = ContactBloc(); diff --git a/lib/src/blocs/conversation_bloc.dart b/lib/src/blocs/conversation_bloc.dart index e079494..befa150 100644 --- a/lib/src/blocs/conversation_bloc.dart +++ b/lib/src/blocs/conversation_bloc.dart @@ -21,6 +21,7 @@ class ConversationsBloc { } } +// Should be a scoped widget class ConversationMembersBloc { final String conversationId; final _provider = ConversationApiProvider(); @@ -40,3 +41,5 @@ class ConversationMembersBloc { _membersFetcher.close(); } } + +final conversationsBloc = ConversationsBloc(); diff --git a/lib/src/blocs/heartbeat_bloc.dart b/lib/src/blocs/heartbeat_bloc.dart index 1b05b85..ee63d90 100644 --- a/lib/src/blocs/heartbeat_bloc.dart +++ b/lib/src/blocs/heartbeat_bloc.dart @@ -7,62 +7,84 @@ import "package:rxdart/rxdart.dart"; import "package:http/http.dart" as http; import "../models/ping_model.dart"; +import "../models/user_model.dart"; import "../services/login_manager.dart"; +import "../resources/contact_api_provider.dart"; import "../../settings.dart"; class HeartbeatReceiverBloc { LoginManager loginManager = new LoginManager(); + ContactApiProvider contactApiProvider = new ContactApiProvider(); - String userId; - DateTime lastSeen; - String status; + final Map lastSeen = {}; + final Map status = {}; - final _coloursFetcher = PublishSubject(); + final http.Client client = http.Client(); + final _statusFetcher = PublishSubject>(); - final http.Client client; - Observable get colours => _coloursFetcher.stream; + Observable> get stream => _statusFetcher.stream; - HeartbeatReceiverBloc(String userId) : client = http.Client() { - this.userId = userId; - lastSeen = DateTime.fromMillisecondsSinceEpoch(0); - status = ""; + HeartbeatReceiverBloc() { + init(); + } - loginManager.getToken().then((token) { - EventSource.connect("$baseUrlHeartbeat/subscribe/$userId?token=$token", + init() async { + final users = await contactApiProvider.fetchContacts(); + final authToken = await loginManager.getToken(); + + // Setting up event + for (final user in users) { + this.lastSeen[user.id] = DateTime.fromMillisecondsSinceEpoch(0); + this.status[user.id] = ""; + + EventSource.connect( + "$baseUrlHeartbeat/subscribe/${user.id}?token=$authToken", client: client) .then((es) { es.listen((Event event) { - // Guard against empty packets if (event.data == null) { return; } Ping ping = Ping.fromJson(jsonDecode(event.data)); - lastSeen = DateTime.fromMillisecondsSinceEpoch(ping.time * 1000); - status = ping.status; + this.lastSeen[user.id] = + DateTime.fromMillisecondsSinceEpoch(ping.time * 1000); }); - final checkDuration = Duration(seconds: 20); - final timeoutDuration = Duration(minutes: 1); - new Timer.periodic(checkDuration, (Timer t) { - if (status == "on_call") { - _coloursFetcher.sink.add("busy"); - } else { - final now = new DateTime.now(); - final difference = now.difference(this.lastSeen); - if (difference < timeoutDuration) { - _coloursFetcher.sink.add("online"); - } else { - _coloursFetcher.sink.add("offline"); - } - } - }); - }).catchError((e) => - {}); // Add actual error handling logic for stopped connections + }).catchError((e) => {}); + } + + // Setting up timers + final checkDuration = Duration(seconds: 20); + final timeoutDuration = Duration(minutes: 1); + + new Timer.periodic(checkDuration, (Timer t) { + for (final user in users) { + final now = new DateTime.now(); + final difference = now.difference(this.lastSeen[user.id]); + + if (difference < timeoutDuration) { + _statusFetcher.sink.add({"user": user.id, "status": "online"}); + this.status[user.id] = "online"; + } else { + _statusFetcher.sink.add({"user": user.id, "status": "offline"}); + this.status[user.id] = "offline"; + } + } }); } + getLastStatus(String userId) { + return status[userId]; + } + + flush() { + _statusFetcher.sink.add({"user": "flushing", "status": ""}); + } + dispose() { - _coloursFetcher.close(); + _statusFetcher.close(); client.close(); } } + +final heartbeatReceiverBloc = HeartbeatReceiverBloc(); diff --git a/lib/src/services/cache_http.dart b/lib/src/services/cache_http.dart index 8f2b8b4..a1e15cb 100644 --- a/lib/src/services/cache_http.dart +++ b/lib/src/services/cache_http.dart @@ -32,11 +32,12 @@ class CacheHttp { Future fetch(String url, {bool update = false, Map headers = const {}}) async { if (!this.hasInit) { - this.hasInit = true; await this.init(); + this.hasInit = true; } try { final response = await http.get(url, headers: headers); + if (response.statusCode < 200 || response.statusCode >= 300) { // Unsuccessful response, use cache final body = await this.getCache(url); @@ -54,6 +55,9 @@ class CacheHttp { } Future getCache(String url) async { + if (!this.hasInit) { + print("UNINIT"); + } List cached = await this .db .rawQuery("SELECT resource FROM cache WHERE url = ?", [url]); diff --git a/lib/src/ui/home/home.dart b/lib/src/ui/home/home.dart index add7946..b7c5a33 100644 --- a/lib/src/ui/home/home.dart +++ b/lib/src/ui/home/home.dart @@ -7,6 +7,7 @@ import "../bottom_bar/bottom_bar.dart"; import "../../services/heartbeat_manager.dart"; import "../../services/conversation_manager.dart"; import "../../blocs/message_bloc.dart"; +import "../../blocs/heartbeat_bloc.dart"; class Home extends StatefulWidget { @override diff --git a/lib/src/ui/home/widgets/contact_list.dart b/lib/src/ui/home/widgets/contact_list.dart index 2ab250c..9161465 100644 --- a/lib/src/ui/home/widgets/contact_list.dart +++ b/lib/src/ui/home/widgets/contact_list.dart @@ -13,24 +13,16 @@ class ContactList extends StatefulWidget { } class _ContactListState extends State { - final bloc = ContactBloc(); - @override void initState() { super.initState(); - bloc.fetchContacts(); - } - - @override - void dispose() { - bloc.dispose(); - super.dispose(); + contactBloc.fetchContacts(); } @override Widget build(BuildContext context) { return StreamBuilder( - stream: bloc.contacts, + stream: contactBloc.contacts, builder: (context, AsyncSnapshot> snapshot) { if (snapshot.hasData) { return buildList(snapshot); diff --git a/lib/src/ui/home/widgets/conversation_list.dart b/lib/src/ui/home/widgets/conversation_list.dart index 41f9922..5e7e058 100644 --- a/lib/src/ui/home/widgets/conversation_list.dart +++ b/lib/src/ui/home/widgets/conversation_list.dart @@ -13,18 +13,10 @@ class ConversationList extends StatefulWidget { } class _ConversationListState extends State { - final bloc = ConversationsBloc(); - @override initState() { super.initState(); - bloc.fetchConversations(); - } - - @override - dispose() { - bloc.dispose(); - super.dispose(); + conversationsBloc.fetchConversations(); } @override @@ -32,7 +24,7 @@ class _ConversationListState extends State { return Padding( padding: EdgeInsets.only(top: 10.0), child: StreamBuilder( - stream: bloc.conversations, + stream: conversationsBloc.conversations, builder: (context, AsyncSnapshot> snapshot) { if (snapshot.hasData) { return buildList(snapshot.data); diff --git a/lib/src/ui/login/widgets/login_page.dart b/lib/src/ui/login/widgets/login_page.dart index 486742b..1159dc7 100644 --- a/lib/src/ui/login/widgets/login_page.dart +++ b/lib/src/ui/login/widgets/login_page.dart @@ -60,6 +60,7 @@ class _LoginPageState extends State { // Waiting for initialization await ConversationManager.init(authToken); + print(authToken); Navigator.pushNamed(context, 'welcome/otp'); }), diff --git a/lib/src/ui/widgets/user_avatar.dart b/lib/src/ui/widgets/user_avatar.dart index a8e2cb4..c93a25d 100644 --- a/lib/src/ui/widgets/user_avatar.dart +++ b/lib/src/ui/widgets/user_avatar.dart @@ -1,4 +1,5 @@ import "package:flutter/material.dart"; +import "dart:async"; import "../../models/user_model.dart"; import "../../blocs/heartbeat_bloc.dart"; @@ -20,18 +21,18 @@ class UserAvatar extends StatefulWidget { } class _UserAvatarState extends State { - HeartbeatReceiverBloc bloc; + String lastStatus = ""; @override - void initState() { + initState() { super.initState(); - bloc = HeartbeatReceiverBloc(widget.user.id); + lastStatus = heartbeatReceiverBloc.getLastStatus(widget.user.id); + initializeStream(); } - @override - void dispose() { - bloc.dispose(); - super.dispose(); + initializeStream() async { + return Future.delayed( + const Duration(milliseconds: 1), () => heartbeatReceiverBloc.flush()); } @override @@ -55,12 +56,17 @@ class _UserAvatarState extends State { ), radius: widget.radius), StreamBuilder( - stream: bloc.colours, - builder: (context, AsyncSnapshot snapshot) { - String state; + stream: heartbeatReceiverBloc.stream, + builder: (context, AsyncSnapshot> snapshot) { + Map state; if (snapshot.hasData) { state = snapshot.data; - if (state == "online") { + + if (state["user"] == widget.user.id) { + this.lastStatus = state["status"]; + } + + if (lastStatus == "online") { return Container( width: 12.0, height: 12.0, @@ -71,6 +77,7 @@ class _UserAvatarState extends State { width: 1.5, color: const Color(0xFFFFFFFF)))); } } + return Container(); }), ]));