From 3dadfb5062f585593fb1f1d185434a1b137a7a6f Mon Sep 17 00:00:00 2001 From: Steven Roose Date: Sun, 27 Nov 2016 21:31:19 +0100 Subject: [PATCH] Initial version --- .gitignore | 23 ++++++ CHANGELOG.md | 5 ++ LICENSE | 21 ++++++ README.md | 34 +++++++++ example/client_browser.dart | 32 ++++++++ example/client_generic.dart | 27 +++++++ lib/eventsource.dart | 141 ++++++++++++++++++++++++++++++++++++ lib/io_server.dart | 62 ++++++++++++++++ lib/publisher.dart | 131 +++++++++++++++++++++++++++++++++ lib/src/decoder.dart | 69 ++++++++++++++++++ lib/src/encoder.dart | 56 ++++++++++++++ lib/src/event.dart | 21 ++++++ lib/src/event_cache.dart | 59 +++++++++++++++ lib/src/proxy_sink.dart | 13 ++++ pubspec.yaml | 18 +++++ test/codec_test.dart | 62 ++++++++++++++++ 16 files changed, 774 insertions(+) create mode 100644 .gitignore create mode 100644 CHANGELOG.md create mode 100644 LICENSE create mode 100644 README.md create mode 100644 example/client_browser.dart create mode 100644 example/client_generic.dart create mode 100644 lib/eventsource.dart create mode 100644 lib/io_server.dart create mode 100644 lib/publisher.dart create mode 100644 lib/src/decoder.dart create mode 100644 lib/src/encoder.dart create mode 100644 lib/src/event.dart create mode 100644 lib/src/event_cache.dart create mode 100644 lib/src/proxy_sink.dart create mode 100644 pubspec.yaml create mode 100644 test/codec_test.dart diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e05fe4d --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# Files and directories created by pub +.packages +.pub/ +build/ +packages +# Remove the following pattern if you wish to check in your lock file +pubspec.lock.old + +# Files created by dart2js +*.dart.js +*.part.js +*.js.deps +*.js.map +*.info.json + +# Directory created by dartdoc +doc/api/ + +# JetBrains IDEs +.idea/ +*.iml +*.ipr +*.iws diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..c7b14d2 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## 0.1.0 (2016-11-27) + +- Initial version diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..d917ba1 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Steven Roose + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..20f7396 --- /dev/null +++ b/README.md @@ -0,0 +1,34 @@ +# eventsource + +A library for using EventSource or Server-Side Events (SSE). +Both client and server functionality is provided. + +## Client usage + +For more advanced usage, see the `example/` directory. +Creating a new EventSource client is as easy as a single call. +The http package is used under the hood, so wherever this package works, this lbirary will also work. +Browser usage is slightly different. + +```dart +EventSource eventSource = await EventSource.connect("http://example.com/events"); +// in browsers, you need to pass a http.BrowserClient: +EventSource eventSource = await EventSource.connect("http://example.com/events", + client: new http.BrowserClient()); +``` + +## Server usage + +We recommend using [`shelf_eventsource`](https://pub.dartlang.org/packages/shelf_eventsource) for +serving Server-Side Events. +This library provides an `EventSourcePublisher` that manages subscriptions, channels, encoding. +We refer to documentation in the [`shelf_eventsource`](https://pub.dartlang.org/packages/shelf_eventsource) +package for more information. + +This library also includes a server provider for `dart:io`'s `HttpServer` in `io_server.dart`. +However, it has some issues with data flushing that are yet to be resolved, so we recommend using +shelf instead. + +## Licensing + +This project is available under the MIT license, as can be found in the LICENSE file. \ No newline at end of file diff --git a/example/client_browser.dart b/example/client_browser.dart new file mode 100644 index 0000000..d77aeca --- /dev/null +++ b/example/client_browser.dart @@ -0,0 +1,32 @@ +import "package:eventsource/eventsource.dart"; +import "package:http/browser_client.dart"; + +main() async { + // Because EventSource uses the http package, browser usage needs a special + // approach. This will change once https://github.com/dart-lang/http/issues/1 + // is fixed. + + EventSource eventSource = await EventSource + .connect("http://example.org/events", client: new BrowserClient()); + // listen for events + eventSource.listen((Event event) { + print("New event:"); + print(" event: ${event.event}"); + print(" data: ${event.data}"); + }); + + // If you know the last event.id from a previous connection, you can try this: + + String lastId = "iknowmylastid"; + eventSource = await EventSource.connect( + "http://example.org/events", + client: new BrowserClient(), + lastEventId: lastId, + ); + // listen for events + eventSource.listen((Event event) { + print("New event:"); + print(" event: ${event.event}"); + print(" data: ${event.data}"); + }); +} diff --git a/example/client_generic.dart b/example/client_generic.dart new file mode 100644 index 0000000..29528ef --- /dev/null +++ b/example/client_generic.dart @@ -0,0 +1,27 @@ +import "package:eventsource/eventsource.dart"; + +main() async { + // Because EventSource uses the http package, all platforms for which http + // works, will be able to use the generic method: + + EventSource eventSource = + await EventSource.connect("http://example.org/events"); + // listen for events + eventSource.listen((Event event) { + print("New event:"); + print(" event: ${event.event}"); + print(" data: ${event.data}"); + }); + + // If you know the last event.id from a previous connection, you can try this: + + String lastId = "iknowmylastid"; + eventSource = await EventSource.connect("http://example.org/events", + lastEventId: lastId); + // listen for events + eventSource.listen((Event event) { + print("New event:"); + print(" event: ${event.event}"); + print(" data: ${event.data}"); + }); +} diff --git a/lib/eventsource.dart b/lib/eventsource.dart new file mode 100644 index 0000000..4683f87 --- /dev/null +++ b/lib/eventsource.dart @@ -0,0 +1,141 @@ +library eventsource; + +export "src/event.dart"; + +import "dart:async"; +import "dart:convert"; + +import "package:http/http.dart" as http; +import "package:http/src/utils.dart" show encodingForCharset; +import "package:http_parser/http_parser.dart" show MediaType; + +import "src/event.dart"; +import "src/decoder.dart"; + +enum EventSourceReadyState { + CONNECTING, + OPEN, + CLOSED, +} + +class EventSourceSubscriptionException extends Event implements Exception { + int statusCode; + String message; + + @override + String get data => "$statusCode: $message"; + + EventSourceSubscriptionException(this.statusCode, this.message) + : super(event: "error"); +} + +/// An EventSource client that exposes a [Stream] of [Event]s. +class EventSource extends Stream { + // interface attributes + + final Uri url; + + EventSourceReadyState get readyState => _readyState; + + Stream get onOpen => this.where((e) => e.event == "open"); + Stream get onMessage => this.where((e) => e.event == "message"); + Stream get onError => this.where((e) => e.event == "error"); + + // internal attributes + + StreamController _streamController = + new StreamController.broadcast(); + + EventSourceReadyState _readyState = EventSourceReadyState.CLOSED; + + http.Client client; + Duration _retryDelay = const Duration(milliseconds: 3000); + String _lastEventId; + EventSourceDecoder _decoder; + + /// Create a new EventSource by connecting to the specified url. + static Future connect(url, + {http.Client client, String lastEventId}) async { + // parameter initialization + url = url is Uri ? url : Uri.parse(url); + client = client ?? new http.Client(); + lastEventId = lastEventId ?? ""; + EventSource es = new EventSource._internal(url, client, lastEventId); + await es._start(); + return es; + } + + EventSource._internal(this.url, this.client, this._lastEventId) { + _decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay); + } + + // proxy the listen call to the controller's listen call + @override + StreamSubscription listen(void onData(Event event), + {Function onError, void onDone(), bool cancelOnError}) => + _streamController.stream.listen(onData, + onError: onError, onDone: onDone, cancelOnError: cancelOnError); + + /// Attempt to start a new connection. + Future _start() async { + _readyState = EventSourceReadyState.CONNECTING; + var request = new http.Request("GET", url); + request.headers["Cache-Control"] = "no-cache"; + request.headers["Accept"] = "text/event-stream"; + if (_lastEventId.isNotEmpty) { + request.headers["Last-Event-ID"] = _lastEventId; + } + var response = await client.send(request); + if (response.statusCode != 200) { + // server returned an error + var bodyBytes = await response.stream.toBytes(); + String body = _encodingForHeaders(response.headers).decode(bodyBytes); + throw new EventSourceSubscriptionException(response.statusCode, body); + } + _readyState = EventSourceReadyState.OPEN; + // start streaming the data + response.stream.transform(_decoder).listen((Event event) { + _streamController.add(event); + _lastEventId = event.id; + }, + cancelOnError: true, + onError: _retry, + onDone: () => _readyState = EventSourceReadyState.CLOSED); + } + + /// Retries until a new connection is established. Uses exponential backoff. + Future _retry() async { + _readyState = EventSourceReadyState.CONNECTING; + // try reopening with exponential backoff + Duration backoff = _retryDelay; + while (true) { + await new Future.delayed(backoff); + try { + await _start(); + break; + } catch (error) { + _streamController.addError(error); + backoff *= 2; + } + } + } + + void _updateRetryDelay(Duration retry) { + _retryDelay = retry; + } +} + +/// Returns the encoding to use for a response with the given headers. This +/// defaults to [LATIN1] if the headers don't specify a charset or +/// if that charset is unknown. +Encoding _encodingForHeaders(Map headers) => + encodingForCharset(_contentTypeForHeaders(headers).parameters['charset']); + +/// Returns the [MediaType] object for the given headers's content-type. +/// +/// Defaults to `application/octet-stream`. +MediaType _contentTypeForHeaders(Map headers) { + var contentType = headers['content-type']; + if (contentType != null) return new MediaType.parse(contentType); + return new MediaType("application", "octet-stream"); +} diff --git a/lib/io_server.dart b/lib/io_server.dart new file mode 100644 index 0000000..74556ef --- /dev/null +++ b/lib/io_server.dart @@ -0,0 +1,62 @@ +library eventsource.io_server; + +import "dart:io" as io; + +import "package:sync/waitgroup.dart"; + +import "publisher.dart"; +import "src/event.dart"; +import "src/encoder.dart"; + +/// Create a handler to serve [io.HttpRequest] objects for the specified +/// channel. +/// This method can be passed to the [io.HttpServer.listen] method. +Function createIoHandler(EventSourcePublisher publisher, + {String channel: "", bool gzip: false}) { + void ioHandler(io.HttpRequest request) { + io.HttpResponse response = request.response; + + // set content encoding to gzip if we allow it and the request supports it + bool useGzip = gzip && + (request.headers.value(io.HttpHeaders.ACCEPT_ENCODING) ?? "") + .contains("gzip"); + + // set headers and status code + response.statusCode = 200; + response.headers.set("Content-Type", "text/event-stream; charset=utf-8"); + response.headers + .set("Cache-Control", "no-cache, no-store, must-revalidate"); + response.headers.set("Connection", "keep-alive"); + if (useGzip) response.headers.set("Content-Encoding", "gzip"); + // a wait group to keep track of flushes in order not to close while + // flushing + WaitGroup flushes = new WaitGroup(); + // flush the headers + flushes.add(1); + response.flush().then((_) => flushes.done()); + + // create encoder for this connection + var encodedSink = new EventSourceEncoder(compressed: useGzip) + .startChunkedConversion(response); + + // define the methods for pushing events and closing the connection + void onEvent(Event event) { + encodedSink.add(event); + flushes.add(1); + response.flush().then((_) => flushes.done()); + } + + void onClose() { + flushes.wait().then((_) => response.close()); + } + + // initialize the new subscription + publisher.newSubscription( + onEvent: onEvent, + onClose: onClose, + channel: channel, + lastEventId: request.headers.value("Last-Event-ID")); + } + + return ioHandler; +} diff --git a/lib/publisher.dart b/lib/publisher.dart new file mode 100644 index 0000000..bffbdd7 --- /dev/null +++ b/lib/publisher.dart @@ -0,0 +1,131 @@ +library eventsource.server; + +export "src/event.dart"; + +import "dart:async"; + +import "package:logging/logging.dart" as log; + +import "src/event.dart"; +import "src/event_cache.dart"; +import "src/proxy_sink.dart"; + +/// An EventSource publisher. It can manage different channels of events. +/// This class forms the backbone of an EventSource server. To actually serve +/// a web server, use this together with [shelf_eventsource] or another server +/// implementation. +class EventSourcePublisher extends Sink { + log.Logger logger; + EventCache _cache; + + /// Create a new EventSource server. + /// + /// When using a cache, for efficient replaying, it is advisable to use a + /// custom Event implementation that overrides the `Event.compareTo` method. + /// F.e. if integer events are used, sorting should be done on integers and + /// not on the string representations of them. + /// If your Event's id properties are not incremental using + /// [Comparable.compare], set [comparableIds] to false. + EventSourcePublisher( + {int cacheCapacity: 0, + bool comparableIds: false, + bool enableLogging: true}) { + if (cacheCapacity > 0) { + _cache = new EventCache(cacheCapacity: cacheCapacity); + } + if (enableLogging) { + logger = new log.Logger("EventSourceServer"); + } + } + + Map> _subsByChannel = {}; + + /// Creates a Sink for the specified channel. + /// The `add` and `remove` methods of this channel are equivalent to the + /// respective methods of this class with the specific channel passed along. + Sink channel(String channel) => new ProxySink( + onAdd: (e) => add(e, channels: [channel]), + onClose: () => close(channels: [channel])); + + /// Add a publication to the specified channels. + /// By default, only adds to the default channel. + @override + void add(Event event, {Iterable channels: const [""]}) { + for (String channel in channels) { + List subs = _subsByChannel[channel]; + if (subs == null) { + continue; + } + _logFiner( + "Sending event on channel $channel to ${subs.length} subscribers."); + for (var sub in subs) { + sub.add(event); + } + } + if (_cache != null) { + _cache.add(event, channels); + } + } + + /// Close the specified channels. + /// All the connections with the subscribers to this channels will be closed. + /// By default only closes the default channel. + @override + void close({Iterable channels: const [""]}) { + for (String channel in channels) { + List subs = _subsByChannel[channel]; + if (subs == null) { + continue; + } + _logInfo("Closing channel $channel with ${subs.length} subscribers."); + for (var sub in subs) { + sub.close(); + } + } + if (_cache != null) { + _cache.clear(channels); + } + } + + /// Close all the open channels. + void closeAllChannels() => close(channels: _subsByChannel.keys); + + /// Initialize a new subscription and replay when possible. + /// Should not be used by the user directly. + void newSubscription( + {Function onEvent, + Function onClose, + String channel, + String lastEventId}) { + _logFine("New subscriber on channel $channel."); + // create a sink for the subscription + var sub = new ProxySink(onAdd: onEvent, onClose: onClose); + // save the subscription + _subsByChannel.putIfAbsent(channel, () => []).add(sub); + // replay past events + if (_cache != null && lastEventId != null) { + scheduleMicrotask(() { + _logFine("Replaying events on channel $channel from id $lastEventId."); + _cache.replay(sub, lastEventId, channel); + }); + } + } + + void _logInfo(message) { + if (logger != null) { + logger.log(log.Level.INFO, message); + } + } + + void _logFine(message) { + if (logger != null) { + logger.log(log.Level.FINE, message); + } + } + + void _logFiner(message) { + if (logger != null) { + logger.log(log.Level.FINER, message); + } + } +} diff --git a/lib/src/decoder.dart b/lib/src/decoder.dart new file mode 100644 index 0000000..bab866a --- /dev/null +++ b/lib/src/decoder.dart @@ -0,0 +1,69 @@ +library eventsource.src.decoder; + +import "dart:async"; +import "dart:convert"; + +import "event.dart"; + +typedef void RetryIndicator(Duration); + +class EventSourceDecoder implements StreamTransformer, Event> { + RetryIndicator retryIndicator; + + EventSourceDecoder({this.retryIndicator}); + + Stream bind(Stream> stream) { + StreamController controller; + controller = new StreamController(onListen: () { + // the event we are currently building + Event currentEvent = new Event(); + // the regexes we will use later + RegExp lineRegex = new RegExp(r"^([^:]*)(?::)?(?: )?(.*)?$"); + RegExp removeEndingNewlineRegex = new RegExp(r"^((?:.|\n)*)\n$"); + // This stream will receive chunks of data that is not necessarily a + // single event. So we build events on the fly and broadcast the event as + // soon as we encounter a double newline, then we start a new one. + stream + .transform(new Utf8Decoder()) + .transform(new LineSplitter()) + .listen((String line) { + if (line.isEmpty) { + // event is done + // strip ending newline from data + if (currentEvent.data != null) { + var match = removeEndingNewlineRegex.firstMatch(currentEvent.data); + currentEvent.data = match.group(1); + } + controller.add(currentEvent); + currentEvent = new Event(); + return; + } + // match the line prefix and the value using the regex + Match match = lineRegex.firstMatch(line); + String field = match.group(1); + String value = match.group(2) ?? ""; + if (field.isEmpty) { + // lines starting with a colon are to be ignored + return; + } + switch (field) { + case "event": + currentEvent.event = value; + break; + case "data": + currentEvent.data = (currentEvent.data ?? "") + value + "\n"; + break; + case "id": + currentEvent.id = value; + break; + case "retry": + if (retryIndicator != null) { + retryIndicator(new Duration(milliseconds: int.parse(value))); + } + break; + } + }); + }); + return controller.stream; + } +} diff --git a/lib/src/encoder.dart b/lib/src/encoder.dart new file mode 100644 index 0000000..f69bcb5 --- /dev/null +++ b/lib/src/encoder.dart @@ -0,0 +1,56 @@ +library eventsource.src.encoder; + +import "dart:convert"; +import "dart:io"; + +import "event.dart"; +import "proxy_sink.dart"; + +class EventSourceEncoder extends Converter> { + final bool compressed; + + const EventSourceEncoder({bool this.compressed: false}); + + static Map _fields = { + "id: ": (e) => e.id, + "event: ": (e) => e.event, + "data: ": (e) => e.data, + }; + + @override + List convert(Event event) { + String payload = convertToString(event); + List bytes = UTF8.encode(payload); + if (compressed) { + bytes = GZIP.encode(bytes); + } + return bytes; + } + + String convertToString(Event event) { + String payload = ""; + for (String prefix in _fields.keys) { + String value = _fields[prefix](event); + if (value == null || value.isEmpty) { + continue; + } + // multi-line values need the field prefix on every line + value = value.replaceAll("\n", "\n$prefix"); + payload += prefix + value + "\n"; + } + payload += "\n"; + return payload; + } + + @override + Sink startChunkedConversion(Sink> sink) { + Sink inputSink = sink; + if (compressed) { + inputSink = GZIP.encoder.startChunkedConversion(inputSink); + } + inputSink = UTF8.encoder.startChunkedConversion(inputSink); + return new ProxySink( + onAdd: (Event event) => inputSink.add(convertToString(event)), + onClose: () => inputSink.close()); + } +} diff --git a/lib/src/event.dart b/lib/src/event.dart new file mode 100644 index 0000000..d209b9e --- /dev/null +++ b/lib/src/event.dart @@ -0,0 +1,21 @@ +library eventsource.src.event; + +class Event implements Comparable { + /// An identifier that can be used to allow a client to replay + /// missed Events by returning the Last-Event-Id header. + /// Return empty string if not required. + String id; + + /// The name of the event. Return empty string if not required. + String event; + + /// The payload of the event. + String data; + + Event({this.id, this.event, this.data}); + + Event.message({this.id, this.data}) : event = "message"; + + @override + int compareTo(Event other) => id.compareTo(other.id); +} diff --git a/lib/src/event_cache.dart b/lib/src/event_cache.dart new file mode 100644 index 0000000..abe6e17 --- /dev/null +++ b/lib/src/event_cache.dart @@ -0,0 +1,59 @@ +library eventsource.src.cache; + +import "package:collection/collection.dart"; + +import "event.dart"; + +//TODO use more efficient data structure than List +class EventCache { + final int cacheCapacity; + final bool comparableIds; + Map> _caches = new Map>(); + + EventCache({this.cacheCapacity, this.comparableIds: true}); + + void replay(Sink sink, String lastEventId, [String channel = ""]) { + List cache = _caches[channel]; + if (cache == null || cache.isEmpty) { + // nothing to replay + return; + } + // find the location of lastEventId in the queue + int index; + if (comparableIds) { + // if comparableIds, we can use binary search + index = binarySearch(cache, lastEventId); + } else { + // otherwise, we starts from the last one and look one by one + index = cache.length - 1; + while (index > 0 && cache[index].id != lastEventId) { + index--; + } + } + if (index >= 0) { + // add them all to the sink + cache.sublist(index).forEach(sink.add); + } + } + + /// Add a new [Event] to the cache(s) of the specified channel(s). + /// Please note that we assume events are added with increasing values of + /// [Event.id]. + void add(Event event, [Iterable channels = const [""]]) { + for (String channel in channels) { + List cache = _caches.putIfAbsent(channel, () => new List()); + if (cacheCapacity != null && cache.length >= cacheCapacity) { + cache.removeAt(0); + } + cache.add(event); + } + } + + void clear([Iterable channels = const [""]]) { + channels.forEach(_caches.remove); + } + + void clearAll() { + _caches.clear(); + } +} diff --git a/lib/src/proxy_sink.dart b/lib/src/proxy_sink.dart new file mode 100644 index 0000000..7a226f7 --- /dev/null +++ b/lib/src/proxy_sink.dart @@ -0,0 +1,13 @@ +library eventsource.src.proxy_sink; + +/// Just a simple [Sink] implementation that proxies the [add] and [close] +/// methods. +class ProxySink implements Sink { + Function onAdd; + Function onClose; + ProxySink({this.onAdd, this.onClose}); + @override + void add(t) => onAdd(t); + @override + void close() => onClose(); +} diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 0000000..d052085 --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,18 @@ +name: eventsource +description: A client and server implementation of Server-Side Events. +version: 0.1.0 +author: Steven Roose +homepage: https://github.com/stevenroose/dart-eventsource + +environment: + sdk: ">=1.0.0 <2.0.0" + +dependencies: + collection: ">=1.4.1 <2.0.0" + http: ">=0.11.0 <0.12.0" + http_parser: ">=2.2.0 <4.0.0" + logging: ">=0.11.0 <0.12.0" + sync: ">=0.1.0 <0.2.0" + +dev_dependencies: + test: ">=0.12.0 <0.13.0" diff --git a/test/codec_test.dart b/test/codec_test.dart new file mode 100644 index 0000000..d54cd66 --- /dev/null +++ b/test/codec_test.dart @@ -0,0 +1,62 @@ +library codec_test; + +import "dart:async"; +import "dart:convert"; + +import "package:test/test.dart"; + +import "package:eventsource/src/decoder.dart"; +import "package:eventsource/src/encoder.dart"; +import "package:eventsource/src/event.dart"; + +Map _VECTORS = { + new Event(id: "1", event: "Add", data: "This is a test"): + "id: 1\nevent: Add\ndata: This is a test\n\n", + new Event(data: "This message, it\nhas two lines."): + "data: This message, it\ndata: has two lines.\n\n", +}; + +void main() { + group("encoder", () { + test("vectors", () { + var encoder = new EventSourceEncoder(); + for (Event event in _VECTORS.keys) { + var encoded = _VECTORS[event]; + expect(encoder.convert(event), equals(UTF8.encode(encoded))); + } + }); + //TODO add gzip test + }); + + group("decoder", () { + test("vectors", () async { + for (Event event in _VECTORS.keys) { + var encoded = _VECTORS[event]; + var stream = new Stream.fromIterable([encoded]) + .transform(new Utf8Encoder()) + .transform(new EventSourceDecoder()); + stream.listen(expectAsync((decodedEvent) { + expect(decodedEvent.id, equals(event.id)); + expect(decodedEvent.event, equals(event.event)); + expect(decodedEvent.data, equals(event.data)); + }, count: 1)); + } + }); + test("pass retry value", () async { + Event event = new Event(id: "1", event: "Add", data: "This is a test"); + String encodedWithRetry = + "id: 1\nevent: Add\ndata: This is a test\nretry: 100\n\n"; + var changeRetryValue = expectAsync((Duration value) { + expect(value.inMilliseconds, equals(100)); + }, count: 1); + var stream = new Stream.fromIterable([encodedWithRetry]) + .transform(new Utf8Encoder()) + .transform(new EventSourceDecoder(retryIndicator: changeRetryValue)); + stream.listen(expectAsync((decodedEvent) { + expect(decodedEvent.id, equals(event.id)); + expect(decodedEvent.event, equals(event.event)); + expect(decodedEvent.data, equals(event.data)); + }, count: 1)); + }); + }); +}