4
2
Fork 0

feat: streaming contact used to configure heartbeat

pull/74/head
Sudharshan S. 2019-08-27 09:51:10 +08:00
parent 8814c4f98d
commit 060f6ae750
Signed by: sudharshan
GPG Key ID: C861C97AAF3D9559
3 changed files with 51 additions and 38 deletions

View File

@ -42,6 +42,7 @@ class ContactBloc {
dispose() {
_contactsFetcher.close();
client.close();
}
}

View File

@ -92,6 +92,7 @@ class ConversationMembersBloc {
dispose() {
_membersFetcher.close();
client.close();
}
}

View File

@ -7,19 +7,23 @@ import "package:rxdart/rxdart.dart";
import "package:http/http.dart" as http;
import "../models/ping_model.dart";
import "../models/user_model.dart";
import "../services/login_manager.dart";
import "../resources/contact_api_provider.dart";
import "../blocs/contact_bloc.dart";
import "../../settings.dart";
class HeartbeatReceiverBloc {
LoginManager loginManager = new LoginManager();
final List<StreamSubscription> esHandles = [];
final Map<String, DateTime> lastSeen = {};
final Map<String, String> status = {};
final http.Client client = http.Client();
final _statusFetcher = PublishSubject<Map<String, String>>();
List<User> lastContacts;
Observable<Map<String, String>> get stream => _statusFetcher.stream;
HeartbeatReceiverBloc() {
@ -27,46 +31,53 @@ class HeartbeatReceiverBloc {
}
init() async {
final users = await contactApiProvider.fetchContacts();
final authToken = await loginManager.getToken();
// Setting up event
for (final user in users) {
this.lastSeen[user.id] = DateTime.fromMillisecondsSinceEpoch(0);
this.status[user.id] = "";
EventSource.connect(
"$baseUrlHeartbeat/subscribe/${user.id}?token=$authToken",
client: client)
.then((es) {
es.listen((Event event) {
if (event.data == null) {
return;
}
Ping ping = Ping.fromJson(jsonDecode(event.data));
this.lastSeen[user.id] =
DateTime.fromMillisecondsSinceEpoch(ping.time * 1000);
_statusFetcher.sink.add({"user": user.id, "status": "online"});
this.status[user.id] = "online";
});
}).catchError((e) => {});
}
// Setting up timers
final checkDuration = Duration(seconds: 20);
final timeoutDuration = Duration(minutes: 20);
new Timer.periodic(checkDuration, (Timer t) {
for (final user in users) {
final now = new DateTime.now();
final difference = now.difference(this.lastSeen[user.id]);
if (difference > timeoutDuration) {
_statusFetcher.sink.add({"user": user.id, "status": "offline"});
this.status[user.id] = "offline";
}
contactBloc.contacts.listen((List<User> contacts) {
if (contacts == this.lastContacts) {
return;
}
esHandles.forEach((handle) async => await handle.cancel());
for (final contact in contacts) {
this.lastSeen[contact.id] = DateTime.fromMillisecondsSinceEpoch(0);
this.status[contact.id] = "offline";
EventSource.connect(
"$baseUrlHeartbeat/subscribe/${contact.id}?token=$authToken",
client: this.client)
.then((es) {
StreamSubscription handle = es.listen((Event event) {
if (event.data == null) {
return;
}
Ping ping = Ping.fromJson(jsonDecode(event.data));
this.lastSeen[contact.id] =
DateTime.fromMillisecondsSinceEpoch(ping.time * 1000);
_statusFetcher.sink.add({"user": contact.id, "status": "online"});
this.status[contact.id] = "online";
});
esHandles.add(handle);
});
}
this.lastContacts = contacts;
});
// Setting up timers to send the heartbeat to the stream
final checkDuration = Duration(seconds: 20);
final timeoutDuration = Duration(minutes: 1);
Timer.periodic(checkDuration, (Timer t) {
this.lastSeen.forEach((id, lastSeen) {
this.status[id] =
(DateTime.now().difference(lastSeen) < timeoutDuration)
? "online"
: "offline";
_statusFetcher.sink.add({"user": id, "status": this.status[id]});
});
});
}