library eventsource; export "src/event.dart"; import "dart:async"; import "dart:convert"; import "package:http/http.dart" as http; import "package:http/src/utils.dart" show encodingForCharset; import "package:http_parser/http_parser.dart" show MediaType; import "src/event.dart"; import "src/decoder.dart"; enum EventSourceReadyState { CONNECTING, OPEN, CLOSED, } class EventSourceSubscriptionException extends Event implements Exception { int statusCode; String message; @override String get data => "$statusCode: $message"; EventSourceSubscriptionException(this.statusCode, this.message) : super(event: "error"); } /// An EventSource client that exposes a [Stream] of [Event]s. class EventSource extends Stream { // interface attributes final Uri url; String authToken = ""; EventSourceReadyState get readyState => _readyState; Stream get onOpen => this.where((e) => e.event == "open"); Stream get onMessage => this.where((e) => e.event == "message"); Stream get onError => this.where((e) => e.event == "error"); // internal attributes StreamController _streamController = new StreamController.broadcast(); EventSourceReadyState _readyState = EventSourceReadyState.CLOSED; http.Client client; Duration _retryDelay = const Duration(milliseconds: 3000); String _lastEventId; EventSourceDecoder _decoder; /// Create a new EventSource by connecting to the specified url. static Future connect(url, {http.Client client, String lastEventId, String authToken}) async { // parameter initialization url = url is Uri ? url : Uri.parse(url); client = client ?? new http.Client(); lastEventId = lastEventId ?? ""; EventSource es = new EventSource._internal(url, client, lastEventId, authToken); await es._start(); return es; } EventSource._internal( this.url, this.client, this._lastEventId, this.authToken) { _decoder = new EventSourceDecoder(retryIndicator: _updateRetryDelay); } // proxy the listen call to the controller's listen call @override StreamSubscription listen(void onData(Event event), {Function onError, void onDone(), bool cancelOnError}) => _streamController.stream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); /// Attempt to start a new connection. Future _start() async { _readyState = EventSourceReadyState.CONNECTING; var request = new http.Request("GET", url); request.headers["Cache-Control"] = "no-cache"; request.headers["Accept"] = "text/event-stream"; request.headers["Authorization"] = "Bearer $authToken"; if (_lastEventId.isNotEmpty) { request.headers["Last-Event-ID"] = _lastEventId; } var response = await client.send(request); if (response.statusCode != 200) { // server returned an error var bodyBytes = await response.stream.toBytes(); String body = _encodingForHeaders(response.headers).decode(bodyBytes); throw new EventSourceSubscriptionException(response.statusCode, body); } _readyState = EventSourceReadyState.OPEN; // start streaming the data response.stream.transform(_decoder).listen((Event event) { _streamController.add(event); _lastEventId = event.id; }, cancelOnError: true, onError: _retry, onDone: () => _readyState = EventSourceReadyState.CLOSED); } /// Retries until a new connection is established. Uses exponential backoff. Future _retry(dynamic e) async { _readyState = EventSourceReadyState.CONNECTING; // try reopening with exponential backoff Duration backoff = _retryDelay; while (true) { await new Future.delayed(backoff); try { await _start(); break; } catch (error) { _streamController.addError(error); backoff *= 2; } } } void _updateRetryDelay(Duration retry) { _retryDelay = retry; } } /// Returns the encoding to use for a response with the given headers. This /// defaults to [LATIN1] if the headers don't specify a charset or /// if that charset is unknown. Encoding _encodingForHeaders(Map headers) => encodingForCharset(_contentTypeForHeaders(headers).parameters['charset']); /// Returns the [MediaType] object for the given headers's content-type. /// /// Defaults to `application/octet-stream`. MediaType _contentTypeForHeaders(Map headers) { var contentType = headers['content-type']; if (contentType != null) return new MediaType.parse(contentType); return new MediaType("application", "octet-stream"); }