4
2
Fork 0

fix: made heartbeat more robust

pull/43/head
Sudharshan S. 2019-05-25 09:46:21 +08:00
parent 227bf52dfe
commit fe2f10fecf
Signed by: sudharshan
GPG Key ID: C861C97AAF3D9559
13 changed files with 94 additions and 68 deletions

View File

@ -188,6 +188,7 @@ extension PeerConnectionWrapper: RTCPeerConnectionDelegate {
func peerConnection(_ peerConnection: RTCPeerConnection, didAdd stream: RTCMediaStream) {
print("adding new stream from remote")
self.remoteAudioTrack = stream.audioTracks[0]
}
func peerConnection(_ peerConnection: RTCPeerConnection, didRemove stream: RTCMediaStream) {

View File

@ -77,7 +77,7 @@ class PeerManager: NSObject {
private extension PeerManager {
func initialiseEventSource() {
eventSource?.addEventListener("offer") { (id, event, data) in
print(data!)
guard let id = id, let data = data else {
// Incorrect packet type error
return

View File

@ -34,7 +34,6 @@ class SignalingApiProvider: NSObject {
let dataVal = try NSURLConnection.sendSynchronousRequest(request, returning: &response)
do {
if let jsonResult = try JSONSerialization.jsonObject(with: dataVal, options: []) as? [String] {
print("Synchronous\(jsonResult)")
// convert this to an array of strings
for device in jsonResult {
deviceList.append(device)
@ -68,7 +67,6 @@ class SignalingApiProvider: NSObject {
let dataVal = try NSURLConnection.sendSynchronousRequest(request, returning: &response)
do {
if let jsonResult = try JSONSerialization.jsonObject(with: dataVal, options: []) as? [Any] {
print("Synchronous\(jsonResult)")
// Need code to convert this to an array of strings
for user in jsonResult {
if let userObject = user as? [String: String] {
@ -99,6 +97,7 @@ class SignalingApiProvider: NSObject {
// prepare json data
let json: [String: Any] = ["event": event, "data": data]
print(json)
let jsonData = try? JSONSerialization.data(withJSONObject: json)

View File

@ -1,5 +1,6 @@
import "package:flutter/services.dart";
import 'routes.dart';
import "src/blocs/heartbeat_bloc.dart";
void main() {
SystemChrome.setSystemUIOverlayStyle(SystemUiOverlayStyle.light);

View File

@ -12,9 +12,12 @@ class ContactBloc {
fetchContacts() async {
List<User> contactList = await _provider.fetchContacts();
_contactsFetcher.sink.add(contactList);
return contactList;
}
dispose() {
_contactsFetcher.close();
}
}
final contactBloc = ContactBloc();

View File

@ -21,6 +21,7 @@ class ConversationsBloc {
}
}
// Should be a scoped widget
class ConversationMembersBloc {
final String conversationId;
final _provider = ConversationApiProvider();
@ -40,3 +41,5 @@ class ConversationMembersBloc {
_membersFetcher.close();
}
}
final conversationsBloc = ConversationsBloc();

View File

@ -7,62 +7,84 @@ import "package:rxdart/rxdart.dart";
import "package:http/http.dart" as http;
import "../models/ping_model.dart";
import "../models/user_model.dart";
import "../services/login_manager.dart";
import "../resources/contact_api_provider.dart";
import "../../settings.dart";
class HeartbeatReceiverBloc {
LoginManager loginManager = new LoginManager();
ContactApiProvider contactApiProvider = new ContactApiProvider();
String userId;
DateTime lastSeen;
String status;
final Map<String, DateTime> lastSeen = {};
final Map<String, String> status = {};
final _coloursFetcher = PublishSubject<String>();
final http.Client client = http.Client();
final _statusFetcher = PublishSubject<Map<String, String>>();
final http.Client client;
Observable<String> get colours => _coloursFetcher.stream;
Observable<Map<String, String>> get stream => _statusFetcher.stream;
HeartbeatReceiverBloc(String userId) : client = http.Client() {
this.userId = userId;
lastSeen = DateTime.fromMillisecondsSinceEpoch(0);
status = "";
HeartbeatReceiverBloc() {
init();
}
loginManager.getToken().then((token) {
EventSource.connect("$baseUrlHeartbeat/subscribe/$userId?token=$token",
init() async {
final users = await contactApiProvider.fetchContacts();
final authToken = await loginManager.getToken();
// Setting up event
for (final user in users) {
this.lastSeen[user.id] = DateTime.fromMillisecondsSinceEpoch(0);
this.status[user.id] = "";
EventSource.connect(
"$baseUrlHeartbeat/subscribe/${user.id}?token=$authToken",
client: client)
.then((es) {
es.listen((Event event) {
// Guard against empty packets
if (event.data == null) {
return;
}
Ping ping = Ping.fromJson(jsonDecode(event.data));
lastSeen = DateTime.fromMillisecondsSinceEpoch(ping.time * 1000);
status = ping.status;
this.lastSeen[user.id] =
DateTime.fromMillisecondsSinceEpoch(ping.time * 1000);
});
final checkDuration = Duration(seconds: 20);
final timeoutDuration = Duration(minutes: 1);
new Timer.periodic(checkDuration, (Timer t) {
if (status == "on_call") {
_coloursFetcher.sink.add("busy");
} else {
final now = new DateTime.now();
final difference = now.difference(this.lastSeen);
if (difference < timeoutDuration) {
_coloursFetcher.sink.add("online");
} else {
_coloursFetcher.sink.add("offline");
}
}
});
}).catchError((e) =>
{}); // Add actual error handling logic for stopped connections
}).catchError((e) => {});
}
// Setting up timers
final checkDuration = Duration(seconds: 20);
final timeoutDuration = Duration(minutes: 1);
new Timer.periodic(checkDuration, (Timer t) {
for (final user in users) {
final now = new DateTime.now();
final difference = now.difference(this.lastSeen[user.id]);
if (difference < timeoutDuration) {
_statusFetcher.sink.add({"user": user.id, "status": "online"});
this.status[user.id] = "online";
} else {
_statusFetcher.sink.add({"user": user.id, "status": "offline"});
this.status[user.id] = "offline";
}
}
});
}
getLastStatus(String userId) {
return status[userId];
}
flush() {
_statusFetcher.sink.add({"user": "flushing", "status": ""});
}
dispose() {
_coloursFetcher.close();
_statusFetcher.close();
client.close();
}
}
final heartbeatReceiverBloc = HeartbeatReceiverBloc();

View File

@ -32,11 +32,12 @@ class CacheHttp {
Future<String> fetch(String url,
{bool update = false, Map<String, String> headers = const {}}) async {
if (!this.hasInit) {
this.hasInit = true;
await this.init();
this.hasInit = true;
}
try {
final response = await http.get(url, headers: headers);
if (response.statusCode < 200 || response.statusCode >= 300) {
// Unsuccessful response, use cache
final body = await this.getCache(url);
@ -54,6 +55,9 @@ class CacheHttp {
}
Future<String> getCache(String url) async {
if (!this.hasInit) {
print("UNINIT");
}
List<Map> cached = await this
.db
.rawQuery("SELECT resource FROM cache WHERE url = ?", [url]);

View File

@ -7,6 +7,7 @@ import "../bottom_bar/bottom_bar.dart";
import "../../services/heartbeat_manager.dart";
import "../../services/conversation_manager.dart";
import "../../blocs/message_bloc.dart";
import "../../blocs/heartbeat_bloc.dart";
class Home extends StatefulWidget {
@override

View File

@ -13,24 +13,16 @@ class ContactList extends StatefulWidget {
}
class _ContactListState extends State<ContactList> {
final bloc = ContactBloc();
@override
void initState() {
super.initState();
bloc.fetchContacts();
}
@override
void dispose() {
bloc.dispose();
super.dispose();
contactBloc.fetchContacts();
}
@override
Widget build(BuildContext context) {
return StreamBuilder(
stream: bloc.contacts,
stream: contactBloc.contacts,
builder: (context, AsyncSnapshot<List<User>> snapshot) {
if (snapshot.hasData) {
return buildList(snapshot);

View File

@ -13,18 +13,10 @@ class ConversationList extends StatefulWidget {
}
class _ConversationListState extends State<ConversationList> {
final bloc = ConversationsBloc();
@override
initState() {
super.initState();
bloc.fetchConversations();
}
@override
dispose() {
bloc.dispose();
super.dispose();
conversationsBloc.fetchConversations();
}
@override
@ -32,7 +24,7 @@ class _ConversationListState extends State<ConversationList> {
return Padding(
padding: EdgeInsets.only(top: 10.0),
child: StreamBuilder(
stream: bloc.conversations,
stream: conversationsBloc.conversations,
builder: (context, AsyncSnapshot<List<Conversation>> snapshot) {
if (snapshot.hasData) {
return buildList(snapshot.data);

View File

@ -60,6 +60,7 @@ class _LoginPageState extends State<LoginPage> {
// Waiting for initialization
await ConversationManager.init(authToken);
print(authToken);
Navigator.pushNamed(context, 'welcome/otp');
}),

View File

@ -1,4 +1,5 @@
import "package:flutter/material.dart";
import "dart:async";
import "../../models/user_model.dart";
import "../../blocs/heartbeat_bloc.dart";
@ -20,18 +21,18 @@ class UserAvatar extends StatefulWidget {
}
class _UserAvatarState extends State<UserAvatar> {
HeartbeatReceiverBloc bloc;
String lastStatus = "";
@override
void initState() {
initState() {
super.initState();
bloc = HeartbeatReceiverBloc(widget.user.id);
lastStatus = heartbeatReceiverBloc.getLastStatus(widget.user.id);
initializeStream();
}
@override
void dispose() {
bloc.dispose();
super.dispose();
initializeStream() async {
return Future.delayed(
const Duration(milliseconds: 1), () => heartbeatReceiverBloc.flush());
}
@override
@ -55,12 +56,17 @@ class _UserAvatarState extends State<UserAvatar> {
),
radius: widget.radius),
StreamBuilder(
stream: bloc.colours,
builder: (context, AsyncSnapshot<String> snapshot) {
String state;
stream: heartbeatReceiverBloc.stream,
builder: (context, AsyncSnapshot<Map<String, String>> snapshot) {
Map<String, String> state;
if (snapshot.hasData) {
state = snapshot.data;
if (state == "online") {
if (state["user"] == widget.user.id) {
this.lastStatus = state["status"];
}
if (lastStatus == "online") {
return Container(
width: 12.0,
height: 12.0,
@ -71,6 +77,7 @@ class _UserAvatarState extends State<UserAvatar> {
width: 1.5, color: const Color(0xFFFFFFFF))));
}
}
return Container();
}),
]));