5
0
Fork 0

Initial version

pull/1/head
Steven Roose 2016-11-27 21:31:19 +01:00
commit 3dadfb5062
16 changed files with 774 additions and 0 deletions

23
.gitignore vendored Normal file
View File

@ -0,0 +1,23 @@
# Files and directories created by pub
.packages
.pub/
build/
packages
# Remove the following pattern if you wish to check in your lock file
pubspec.lock.old
# Files created by dart2js
*.dart.js
*.part.js
*.js.deps
*.js.map
*.info.json
# Directory created by dartdoc
doc/api/
# JetBrains IDEs
.idea/
*.iml
*.ipr
*.iws

5
CHANGELOG.md Normal file
View File

@ -0,0 +1,5 @@
# Changelog
## 0.1.0 (2016-11-27)
- Initial version

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2016 Steven Roose
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

34
README.md Normal file
View File

@ -0,0 +1,34 @@
# eventsource
A library for using EventSource or Server-Side Events (SSE).
Both client and server functionality is provided.
## Client usage
For more advanced usage, see the `example/` directory.
Creating a new EventSource client is as easy as a single call.
The http package is used under the hood, so wherever this package works, this lbirary will also work.
Browser usage is slightly different.
```dart
EventSource eventSource = await EventSource.connect("http://example.com/events");
// in browsers, you need to pass a http.BrowserClient:
EventSource eventSource = await EventSource.connect("http://example.com/events",
client: new http.BrowserClient());
```
## Server usage
We recommend using [`shelf_eventsource`](https://pub.dartlang.org/packages/shelf_eventsource) for
serving Server-Side Events.
This library provides an `EventSourcePublisher` that manages subscriptions, channels, encoding.
We refer to documentation in the [`shelf_eventsource`](https://pub.dartlang.org/packages/shelf_eventsource)
package for more information.
This library also includes a server provider for `dart:io`'s `HttpServer` in `io_server.dart`.
However, it has some issues with data flushing that are yet to be resolved, so we recommend using
shelf instead.
## Licensing
This project is available under the MIT license, as can be found in the LICENSE file.

View File

@ -0,0 +1,32 @@
import "package:eventsource/eventsource.dart";
import "package:http/browser_client.dart";
main() async {
// Because EventSource uses the http package, browser usage needs a special
// approach. This will change once https://github.com/dart-lang/http/issues/1
// is fixed.
EventSource eventSource = await EventSource
.connect("http://example.org/events", client: new BrowserClient());
// listen for events
eventSource.listen((Event event) {
print("New event:");
print(" event: ${event.event}");
print(" data: ${event.data}");
});
// If you know the last event.id from a previous connection, you can try this:
String lastId = "iknowmylastid";
eventSource = await EventSource.connect(
"http://example.org/events",
client: new BrowserClient(),
lastEventId: lastId,
);
// listen for events
eventSource.listen((Event event) {
print("New event:");
print(" event: ${event.event}");
print(" data: ${event.data}");
});
}

View File

@ -0,0 +1,27 @@
import "package:eventsource/eventsource.dart";
main() async {
// Because EventSource uses the http package, all platforms for which http
// works, will be able to use the generic method:
EventSource eventSource =
await EventSource.connect("http://example.org/events");
// listen for events
eventSource.listen((Event event) {
print("New event:");
print(" event: ${event.event}");
print(" data: ${event.data}");
});
// If you know the last event.id from a previous connection, you can try this:
String lastId = "iknowmylastid";
eventSource = await EventSource.connect("http://example.org/events",
lastEventId: lastId);
// listen for events
eventSource.listen((Event event) {
print("New event:");
print(" event: ${event.event}");
print(" data: ${event.data}");
});
}

141
lib/eventsource.dart Normal file
View File

@ -0,0 +1,141 @@
library eventsource;
export "src/event.dart";
import "dart:async";
import "dart:convert";
import "package:http/http.dart" as http;
import "package:http/src/utils.dart" show encodingForCharset;
import "package:http_parser/http_parser.dart" show MediaType;
import "src/event.dart";
import "src/decoder.dart";
enum EventSourceReadyState {
CONNECTING,
OPEN,
CLOSED,
}
class EventSourceSubscriptionException extends Event implements Exception {
int statusCode;
String message;
@override
String get data => "$statusCode: $message";
EventSourceSubscriptionException(this.statusCode, this.message)
: super(event: "error");
}
/// An EventSource client that exposes a [Stream] of [Event]s.
class EventSource extends Stream<Event> {
// interface attributes
final Uri url;
EventSourceReadyState get readyState => _readyState;
Stream<Event> get onOpen => this.where((e) => e.event == "open");
Stream<Event> get onMessage => this.where((e) => e.event == "message");
Stream<Event> get onError => this.where((e) => e.event == "error");
// internal attributes
StreamController<Event> _streamController =
new StreamController<Event>.broadcast();
EventSourceReadyState _readyState = EventSourceReadyState.CLOSED;
http.Client client;
Duration _retryDelay = const Duration(milliseconds: 3000);
String _lastEventId;
EventSourceDecoder _decoder;
/// Create a new EventSource by connecting to the specified url.
static Future<EventSource> connect(url,
{http.Client client, String lastEventId}) async {
// parameter initialization
url = url is Uri ? url : Uri.parse(url);
client = client ?? new http.Client();
lastEventId = lastEventId ?? "";
EventSource es = new EventSource._internal(url, client, lastEventId);
await es._start();
return es;
}
EventSource._internal(this.url, this.client, this._lastEventId) {
_decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay);
}
// proxy the listen call to the controller's listen call
@override
StreamSubscription<Event> listen(void onData(Event event),
{Function onError, void onDone(), bool cancelOnError}) =>
_streamController.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
/// Attempt to start a new connection.
Future _start() async {
_readyState = EventSourceReadyState.CONNECTING;
var request = new http.Request("GET", url);
request.headers["Cache-Control"] = "no-cache";
request.headers["Accept"] = "text/event-stream";
if (_lastEventId.isNotEmpty) {
request.headers["Last-Event-ID"] = _lastEventId;
}
var response = await client.send(request);
if (response.statusCode != 200) {
// server returned an error
var bodyBytes = await response.stream.toBytes();
String body = _encodingForHeaders(response.headers).decode(bodyBytes);
throw new EventSourceSubscriptionException(response.statusCode, body);
}
_readyState = EventSourceReadyState.OPEN;
// start streaming the data
response.stream.transform(_decoder).listen((Event event) {
_streamController.add(event);
_lastEventId = event.id;
},
cancelOnError: true,
onError: _retry,
onDone: () => _readyState = EventSourceReadyState.CLOSED);
}
/// Retries until a new connection is established. Uses exponential backoff.
Future _retry() async {
_readyState = EventSourceReadyState.CONNECTING;
// try reopening with exponential backoff
Duration backoff = _retryDelay;
while (true) {
await new Future.delayed(backoff);
try {
await _start();
break;
} catch (error) {
_streamController.addError(error);
backoff *= 2;
}
}
}
void _updateRetryDelay(Duration retry) {
_retryDelay = retry;
}
}
/// Returns the encoding to use for a response with the given headers. This
/// defaults to [LATIN1] if the headers don't specify a charset or
/// if that charset is unknown.
Encoding _encodingForHeaders(Map<String, String> headers) =>
encodingForCharset(_contentTypeForHeaders(headers).parameters['charset']);
/// Returns the [MediaType] object for the given headers's content-type.
///
/// Defaults to `application/octet-stream`.
MediaType _contentTypeForHeaders(Map<String, String> headers) {
var contentType = headers['content-type'];
if (contentType != null) return new MediaType.parse(contentType);
return new MediaType("application", "octet-stream");
}

62
lib/io_server.dart Normal file
View File

@ -0,0 +1,62 @@
library eventsource.io_server;
import "dart:io" as io;
import "package:sync/waitgroup.dart";
import "publisher.dart";
import "src/event.dart";
import "src/encoder.dart";
/// Create a handler to serve [io.HttpRequest] objects for the specified
/// channel.
/// This method can be passed to the [io.HttpServer.listen] method.
Function createIoHandler(EventSourcePublisher publisher,
{String channel: "", bool gzip: false}) {
void ioHandler(io.HttpRequest request) {
io.HttpResponse response = request.response;
// set content encoding to gzip if we allow it and the request supports it
bool useGzip = gzip &&
(request.headers.value(io.HttpHeaders.ACCEPT_ENCODING) ?? "")
.contains("gzip");
// set headers and status code
response.statusCode = 200;
response.headers.set("Content-Type", "text/event-stream; charset=utf-8");
response.headers
.set("Cache-Control", "no-cache, no-store, must-revalidate");
response.headers.set("Connection", "keep-alive");
if (useGzip) response.headers.set("Content-Encoding", "gzip");
// a wait group to keep track of flushes in order not to close while
// flushing
WaitGroup flushes = new WaitGroup();
// flush the headers
flushes.add(1);
response.flush().then((_) => flushes.done());
// create encoder for this connection
var encodedSink = new EventSourceEncoder(compressed: useGzip)
.startChunkedConversion(response);
// define the methods for pushing events and closing the connection
void onEvent(Event event) {
encodedSink.add(event);
flushes.add(1);
response.flush().then((_) => flushes.done());
}
void onClose() {
flushes.wait().then((_) => response.close());
}
// initialize the new subscription
publisher.newSubscription(
onEvent: onEvent,
onClose: onClose,
channel: channel,
lastEventId: request.headers.value("Last-Event-ID"));
}
return ioHandler;
}

131
lib/publisher.dart Normal file
View File

@ -0,0 +1,131 @@
library eventsource.server;
export "src/event.dart";
import "dart:async";
import "package:logging/logging.dart" as log;
import "src/event.dart";
import "src/event_cache.dart";
import "src/proxy_sink.dart";
/// An EventSource publisher. It can manage different channels of events.
/// This class forms the backbone of an EventSource server. To actually serve
/// a web server, use this together with [shelf_eventsource] or another server
/// implementation.
class EventSourcePublisher extends Sink<Event> {
log.Logger logger;
EventCache _cache;
/// Create a new EventSource server.
///
/// When using a cache, for efficient replaying, it is advisable to use a
/// custom Event implementation that overrides the `Event.compareTo` method.
/// F.e. if integer events are used, sorting should be done on integers and
/// not on the string representations of them.
/// If your Event's id properties are not incremental using
/// [Comparable.compare], set [comparableIds] to false.
EventSourcePublisher(
{int cacheCapacity: 0,
bool comparableIds: false,
bool enableLogging: true}) {
if (cacheCapacity > 0) {
_cache = new EventCache(cacheCapacity: cacheCapacity);
}
if (enableLogging) {
logger = new log.Logger("EventSourceServer");
}
}
Map<String, List<ProxySink>> _subsByChannel = {};
/// Creates a Sink for the specified channel.
/// The `add` and `remove` methods of this channel are equivalent to the
/// respective methods of this class with the specific channel passed along.
Sink<Event> channel(String channel) => new ProxySink(
onAdd: (e) => add(e, channels: [channel]),
onClose: () => close(channels: [channel]));
/// Add a publication to the specified channels.
/// By default, only adds to the default channel.
@override
void add(Event event, {Iterable<String> channels: const [""]}) {
for (String channel in channels) {
List subs = _subsByChannel[channel];
if (subs == null) {
continue;
}
_logFiner(
"Sending event on channel $channel to ${subs.length} subscribers.");
for (var sub in subs) {
sub.add(event);
}
}
if (_cache != null) {
_cache.add(event, channels);
}
}
/// Close the specified channels.
/// All the connections with the subscribers to this channels will be closed.
/// By default only closes the default channel.
@override
void close({Iterable<String> channels: const [""]}) {
for (String channel in channels) {
List subs = _subsByChannel[channel];
if (subs == null) {
continue;
}
_logInfo("Closing channel $channel with ${subs.length} subscribers.");
for (var sub in subs) {
sub.close();
}
}
if (_cache != null) {
_cache.clear(channels);
}
}
/// Close all the open channels.
void closeAllChannels() => close(channels: _subsByChannel.keys);
/// Initialize a new subscription and replay when possible.
/// Should not be used by the user directly.
void newSubscription(
{Function onEvent,
Function onClose,
String channel,
String lastEventId}) {
_logFine("New subscriber on channel $channel.");
// create a sink for the subscription
var sub = new ProxySink(onAdd: onEvent, onClose: onClose);
// save the subscription
_subsByChannel.putIfAbsent(channel, () => []).add(sub);
// replay past events
if (_cache != null && lastEventId != null) {
scheduleMicrotask(() {
_logFine("Replaying events on channel $channel from id $lastEventId.");
_cache.replay(sub, lastEventId, channel);
});
}
}
void _logInfo(message) {
if (logger != null) {
logger.log(log.Level.INFO, message);
}
}
void _logFine(message) {
if (logger != null) {
logger.log(log.Level.FINE, message);
}
}
void _logFiner(message) {
if (logger != null) {
logger.log(log.Level.FINER, message);
}
}
}

69
lib/src/decoder.dart Normal file
View File

@ -0,0 +1,69 @@
library eventsource.src.decoder;
import "dart:async";
import "dart:convert";
import "event.dart";
typedef void RetryIndicator(Duration);
class EventSourceDecoder implements StreamTransformer<List<int>, Event> {
RetryIndicator retryIndicator;
EventSourceDecoder({this.retryIndicator});
Stream<Event> bind(Stream<List<int>> stream) {
StreamController<Event> controller;
controller = new StreamController(onListen: () {
// the event we are currently building
Event currentEvent = new Event();
// the regexes we will use later
RegExp lineRegex = new RegExp(r"^([^:]*)(?::)?(?: )?(.*)?$");
RegExp removeEndingNewlineRegex = new RegExp(r"^((?:.|\n)*)\n$");
// This stream will receive chunks of data that is not necessarily a
// single event. So we build events on the fly and broadcast the event as
// soon as we encounter a double newline, then we start a new one.
stream
.transform(new Utf8Decoder())
.transform(new LineSplitter())
.listen((String line) {
if (line.isEmpty) {
// event is done
// strip ending newline from data
if (currentEvent.data != null) {
var match = removeEndingNewlineRegex.firstMatch(currentEvent.data);
currentEvent.data = match.group(1);
}
controller.add(currentEvent);
currentEvent = new Event();
return;
}
// match the line prefix and the value using the regex
Match match = lineRegex.firstMatch(line);
String field = match.group(1);
String value = match.group(2) ?? "";
if (field.isEmpty) {
// lines starting with a colon are to be ignored
return;
}
switch (field) {
case "event":
currentEvent.event = value;
break;
case "data":
currentEvent.data = (currentEvent.data ?? "") + value + "\n";
break;
case "id":
currentEvent.id = value;
break;
case "retry":
if (retryIndicator != null) {
retryIndicator(new Duration(milliseconds: int.parse(value)));
}
break;
}
});
});
return controller.stream;
}
}

56
lib/src/encoder.dart Normal file
View File

@ -0,0 +1,56 @@
library eventsource.src.encoder;
import "dart:convert";
import "dart:io";
import "event.dart";
import "proxy_sink.dart";
class EventSourceEncoder extends Converter<Event, List<int>> {
final bool compressed;
const EventSourceEncoder({bool this.compressed: false});
static Map<String, Function> _fields = {
"id: ": (e) => e.id,
"event: ": (e) => e.event,
"data: ": (e) => e.data,
};
@override
List<int> convert(Event event) {
String payload = convertToString(event);
List<int> bytes = UTF8.encode(payload);
if (compressed) {
bytes = GZIP.encode(bytes);
}
return bytes;
}
String convertToString(Event event) {
String payload = "";
for (String prefix in _fields.keys) {
String value = _fields[prefix](event);
if (value == null || value.isEmpty) {
continue;
}
// multi-line values need the field prefix on every line
value = value.replaceAll("\n", "\n$prefix");
payload += prefix + value + "\n";
}
payload += "\n";
return payload;
}
@override
Sink<Event> startChunkedConversion(Sink<List<int>> sink) {
Sink inputSink = sink;
if (compressed) {
inputSink = GZIP.encoder.startChunkedConversion(inputSink);
}
inputSink = UTF8.encoder.startChunkedConversion(inputSink);
return new ProxySink(
onAdd: (Event event) => inputSink.add(convertToString(event)),
onClose: () => inputSink.close());
}
}

21
lib/src/event.dart Normal file
View File

@ -0,0 +1,21 @@
library eventsource.src.event;
class Event implements Comparable<Event> {
/// An identifier that can be used to allow a client to replay
/// missed Events by returning the Last-Event-Id header.
/// Return empty string if not required.
String id;
/// The name of the event. Return empty string if not required.
String event;
/// The payload of the event.
String data;
Event({this.id, this.event, this.data});
Event.message({this.id, this.data}) : event = "message";
@override
int compareTo(Event other) => id.compareTo(other.id);
}

59
lib/src/event_cache.dart Normal file
View File

@ -0,0 +1,59 @@
library eventsource.src.cache;
import "package:collection/collection.dart";
import "event.dart";
//TODO use more efficient data structure than List
class EventCache {
final int cacheCapacity;
final bool comparableIds;
Map<String, List<Event>> _caches = new Map<String, List<Event>>();
EventCache({this.cacheCapacity, this.comparableIds: true});
void replay(Sink<Event> sink, String lastEventId, [String channel = ""]) {
List<Event> cache = _caches[channel];
if (cache == null || cache.isEmpty) {
// nothing to replay
return;
}
// find the location of lastEventId in the queue
int index;
if (comparableIds) {
// if comparableIds, we can use binary search
index = binarySearch(cache, lastEventId);
} else {
// otherwise, we starts from the last one and look one by one
index = cache.length - 1;
while (index > 0 && cache[index].id != lastEventId) {
index--;
}
}
if (index >= 0) {
// add them all to the sink
cache.sublist(index).forEach(sink.add);
}
}
/// Add a new [Event] to the cache(s) of the specified channel(s).
/// Please note that we assume events are added with increasing values of
/// [Event.id].
void add(Event event, [Iterable<String> channels = const [""]]) {
for (String channel in channels) {
List<Event> cache = _caches.putIfAbsent(channel, () => new List());
if (cacheCapacity != null && cache.length >= cacheCapacity) {
cache.removeAt(0);
}
cache.add(event);
}
}
void clear([Iterable<String> channels = const [""]]) {
channels.forEach(_caches.remove);
}
void clearAll() {
_caches.clear();
}
}

13
lib/src/proxy_sink.dart Normal file
View File

@ -0,0 +1,13 @@
library eventsource.src.proxy_sink;
/// Just a simple [Sink] implementation that proxies the [add] and [close]
/// methods.
class ProxySink<T> implements Sink<T> {
Function onAdd;
Function onClose;
ProxySink({this.onAdd, this.onClose});
@override
void add(t) => onAdd(t);
@override
void close() => onClose();
}

18
pubspec.yaml Normal file
View File

@ -0,0 +1,18 @@
name: eventsource
description: A client and server implementation of Server-Side Events.
version: 0.1.0
author: Steven Roose <stevenroose@gmail.com>
homepage: https://github.com/stevenroose/dart-eventsource
environment:
sdk: ">=1.0.0 <2.0.0"
dependencies:
collection: ">=1.4.1 <2.0.0"
http: ">=0.11.0 <0.12.0"
http_parser: ">=2.2.0 <4.0.0"
logging: ">=0.11.0 <0.12.0"
sync: ">=0.1.0 <0.2.0"
dev_dependencies:
test: ">=0.12.0 <0.13.0"

62
test/codec_test.dart Normal file
View File

@ -0,0 +1,62 @@
library codec_test;
import "dart:async";
import "dart:convert";
import "package:test/test.dart";
import "package:eventsource/src/decoder.dart";
import "package:eventsource/src/encoder.dart";
import "package:eventsource/src/event.dart";
Map<Event, String> _VECTORS = {
new Event(id: "1", event: "Add", data: "This is a test"):
"id: 1\nevent: Add\ndata: This is a test\n\n",
new Event(data: "This message, it\nhas two lines."):
"data: This message, it\ndata: has two lines.\n\n",
};
void main() {
group("encoder", () {
test("vectors", () {
var encoder = new EventSourceEncoder();
for (Event event in _VECTORS.keys) {
var encoded = _VECTORS[event];
expect(encoder.convert(event), equals(UTF8.encode(encoded)));
}
});
//TODO add gzip test
});
group("decoder", () {
test("vectors", () async {
for (Event event in _VECTORS.keys) {
var encoded = _VECTORS[event];
var stream = new Stream.fromIterable([encoded])
.transform(new Utf8Encoder())
.transform(new EventSourceDecoder());
stream.listen(expectAsync((decodedEvent) {
expect(decodedEvent.id, equals(event.id));
expect(decodedEvent.event, equals(event.event));
expect(decodedEvent.data, equals(event.data));
}, count: 1));
}
});
test("pass retry value", () async {
Event event = new Event(id: "1", event: "Add", data: "This is a test");
String encodedWithRetry =
"id: 1\nevent: Add\ndata: This is a test\nretry: 100\n\n";
var changeRetryValue = expectAsync((Duration value) {
expect(value.inMilliseconds, equals(100));
}, count: 1);
var stream = new Stream.fromIterable([encodedWithRetry])
.transform(new Utf8Encoder())
.transform(new EventSourceDecoder(retryIndicator: changeRetryValue));
stream.listen(expectAsync((decodedEvent) {
expect(decodedEvent.id, equals(event.id));
expect(decodedEvent.event, equals(event.event));
expect(decodedEvent.data, equals(event.data));
}, count: 1));
});
});
}