5
0
Fork 0
dart-eventsource/lib/publisher.dart

132 lines
3.9 KiB
Dart

library eventsource.server;
export "src/event.dart";
import "dart:async";
import "package:logging/logging.dart" as log;
import "src/event.dart";
import "src/event_cache.dart";
import "src/proxy_sink.dart";
/// An EventSource publisher. It can manage different channels of events.
///
/// This class forms the backbone of an EventSource server. To actually serve
/// a web server, use this together with [shelf_eventsource] or another server
/// implementation.
///
/// Subscribers are registered per channel through [newSubscription]. Events
/// passed to [add] are forwarded to every subscriber of the targeted channels
/// and, when a cache was configured, also handed to the cache.
class EventSourcePublisher extends Sink<Event> {
  /// Logger used for diagnostic messages; `null` when logging is disabled.
  log.Logger logger;

  /// Cache of published events used for replaying; `null` when no cache
  /// capacity was requested.
  EventCache _cache;

  /// Currently open subscriber sinks, keyed by channel name.
  final Map<String, List<ProxySink>> _subsByChannel = {};

  /// Create a new EventSource server.
  ///
  /// A cache is only created when [cacheCapacity] is greater than zero, and a
  /// logger is only created when [enableLogging] is true.
  ///
  /// When using a cache, for efficient replaying, it is advisable to use a
  /// custom Event implementation that overrides the `Event.compareTo` method.
  /// F.e. if integer events are used, sorting should be done on integers and
  /// not on the string representations of them.
  /// If your Event's id properties are not incremental using
  /// [Comparable.compare], set [comparableIds] to false.
  ///
  /// NOTE(review): [comparableIds] is accepted but not read by this
  /// constructor, so it currently has no effect here. Confirm whether it is
  /// meant to be forwarded to the cache.
  EventSourcePublisher(
      {int cacheCapacity: 0,
      bool comparableIds: false,
      bool enableLogging: true}) {
    if (cacheCapacity > 0) {
      _cache = new EventCache(cacheCapacity: cacheCapacity);
    }
    if (enableLogging) {
      logger = new log.Logger("EventSourceServer");
    }
  }

  /// Creates a Sink for the specified channel.
  ///
  /// The `add` and `close` methods of the returned sink are equivalent to the
  /// respective methods of this class with the specific channel passed along.
  Sink<Event> channel(String channel) => new ProxySink(
      onAdd: (e) => add(e, channels: [channel]),
      onClose: () => close(channels: [channel]));

  /// Add a publication to the specified channels.
  ///
  /// By default, only adds to the default channel (`""`). Channels without
  /// subscribers are skipped for delivery, but the event is still passed to
  /// the cache (when one exists) together with the full channel list.
  @override
  void add(Event event, {Iterable<String> channels: const [""]}) {
    for (String channel in channels) {
      final subs = _subsByChannel[channel];
      if (subs == null) {
        continue;
      }
      _logFiner(
          "Sending event on channel $channel to ${subs.length} subscribers.");
      for (var sub in subs) {
        sub.add(event);
      }
    }
    if (_cache != null) {
      _cache.add(event, channels);
    }
  }

  /// Close the specified channels.
  ///
  /// All the connections with the subscribers to these channels will be
  /// closed and the subscribers are forgotten, so that later calls to [add]
  /// do not write to closed sinks and a repeated [close] does not close them
  /// a second time. By default only closes the default channel (`""`).
  /// When a cache exists, it is cleared for the same channels.
  @override
  void close({Iterable<String> channels: const [""]}) {
    // Snapshot first: the caller may pass a live view of the subscription
    // map's keys (see closeAllChannels), which must not be iterated while
    // entries are being removed below.
    final targets = channels.toList();
    for (String channel in targets) {
      // Detach the subscriber list before closing it so the closed sinks are
      // no longer reachable through the map.
      final subs = _subsByChannel.remove(channel);
      if (subs == null) {
        continue;
      }
      _logInfo("Closing channel $channel with ${subs.length} subscribers.");
      for (var sub in subs) {
        sub.close();
      }
    }
    if (_cache != null) {
      _cache.clear(targets);
    }
  }

  /// Close all the open channels.
  void closeAllChannels() => close(channels: _subsByChannel.keys.toList());

  /// Initialize a new subscription and replay when possible.
  ///
  /// Should not be used by the user directly.
  ///
  /// [onEvent] and [onClose] are wired into a new sink that is registered
  /// under [channel]. If a cache exists and [lastEventId] is given, a replay
  /// of cached events is scheduled as a microtask, i.e. it runs after this
  /// method has returned.
  void newSubscription(
      {Function onEvent,
      Function onClose,
      String channel,
      String lastEventId}) {
    _logFine("New subscriber on channel $channel.");
    // create a sink for the subscription
    var sub = new ProxySink(onAdd: onEvent, onClose: onClose);
    // save the subscription
    _subsByChannel.putIfAbsent(channel, () => []).add(sub);
    // replay past events
    if (_cache != null && lastEventId != null) {
      scheduleMicrotask(() {
        _logFine("Replaying events on channel $channel from id $lastEventId.");
        _cache.replay(sub, lastEventId, channel);
      });
    }
  }

  // The helpers below are no-ops when logging was disabled (logger is null).

  /// Logs [message] at INFO level.
  void _logInfo(message) {
    logger?.log(log.Level.INFO, message);
  }

  /// Logs [message] at FINE level.
  void _logFine(message) {
    logger?.log(log.Level.FINE, message);
  }

  /// Logs [message] at FINER level.
  void _logFiner(message) {
    logger?.log(log.Level.FINER, message);
  }
}