diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f2dd955 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out diff --git a/README.md b/README.md index e19ead9..a3f43cf 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,33 @@ # backend-subscribe -Client subscribe counterpart to backend-publish. \ No newline at end of file +Client subscribe counterpart to backend-publish. Subscribe to receive the results of your requests to backend-publish in some weird extended streaming async HTTP-ish thing. + +## API + +``` +GET /subscribe/:userid/client/:clientid +``` + +Subscribe to your SSE stream. + +### URL Params + +In the future, this will be supplied via token. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| userid | String | User's ID. | ✓ | +| clientid | String | Device's ID. Must be unique to the device. I suggest something based on MAC address. | ✓ | + +### Success Response (200 OK) + +An [EventSource](https://developer.mozilla.org/en-US/docs/Web/API/EventSource) stream. + +### Event + +``` +{ + "code": + "message": +} +``` diff --git a/client.pb.go b/client.pb.go new file mode 100644 index 0000000..b968e65 --- /dev/null +++ b/client.pb.go @@ -0,0 +1,84 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: client.proto + +package main + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Client struct { + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + Client string `protobuf:"bytes,2,opt,name=client,proto3" json:"client,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Client) Reset() { *m = Client{} } +func (m *Client) String() string { return proto.CompactTextString(m) } +func (*Client) ProtoMessage() {} +func (*Client) Descriptor() ([]byte, []int) { + return fileDescriptor_014de31d7ac8c57c, []int{0} +} + +func (m *Client) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Client.Unmarshal(m, b) +} +func (m *Client) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Client.Marshal(b, m, deterministic) +} +func (m *Client) XXX_Merge(src proto.Message) { + xxx_messageInfo_Client.Merge(m, src) +} +func (m *Client) XXX_Size() int { + return xxx_messageInfo_Client.Size(m) +} +func (m *Client) XXX_DiscardUnknown() { + xxx_messageInfo_Client.DiscardUnknown(m) +} + +var xxx_messageInfo_Client proto.InternalMessageInfo + +func (m *Client) GetKey() string { + if m != nil { + return m.Key + } + return "" +} + +func (m *Client) GetClient() string { + if m != nil { + return m.Client + } + return "" +} + +func init() { + proto.RegisterType((*Client)(nil), "main.Client") +} + +func init() { proto.RegisterFile("client.proto", fileDescriptor_014de31d7ac8c57c) } + +var fileDescriptor_014de31d7ac8c57c = []byte{ + // 83 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0xce, 0xc9, 0x4c, + 0xcd, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, 0x53, 0x32, + 0xe2, 0x62, 0x73, 0x06, 0x8b, 0x0a, 0x09, 0x70, 0x31, 0x67, 0xa7, 0x56, 0x4a, 0x30, 0x2a, 0x30, + 0x6a, 0x70, 0x06, 0x81, 0x98, 0x42, 0x62, 0x5c, 0x6c, 0x10, 0x1d, 0x12, 0x4c, 0x60, 0x41, 0x28, + 0x2f, 0x89, 0x0d, 0x6c, 0x80, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x95, 0x71, 0x3f, 0xbd, 0x50, + 0x00, 0x00, 0x00, +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..5d37906 --- /dev/null +++ b/main.go @@ -0,0 +1,118 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "time" + + "github.com/julienschmidt/httprouter" + "github.com/nats-io/go-nats" + "github.com/golang/protobuf/proto" +) + +type RawClient struct { + UserId string + ClientId string +} + +type JsonResponse struct { + Code uint32 `json:"code"` + Message string `json:"message"` +} + +var listen string +var natsHost string + +var nc *nats.Conn +var connections map[RawClient]chan []byte + +func main() { + // Parse flags + flag.StringVar(&listen, "listen", ":8080", "host and port to listen on") + flag.StringVar(&natsHost, "nats", "nats://localhost:4222", "host and port of NATS") + flag.Parse() + + connections = make(map[RawClient]chan []byte) + + // Routes + router := httprouter.New() + router.GET("/subscribe/:userid/client/:clientid", Subscribe) + + // NATS + var err error + nc, err = nats.Connect(natsHost) + if err != nil { + log.Fatal(err) + } + nc.Subscribe("res", ResponseHandler) + + // Start server + log.Printf("starting server on %s", listen) + log.Fatal(http.ListenAndServe(listen, router)) +} + +func Subscribe(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + client := RawClient { + UserId: p.ByName("userid"), + ClientId: p.ByName("clintid"), + } + recv := make(chan []byte) + connections[client] = recv; + + // Refresh connection periodically + resClosed := w.(http.CloseNotifier).CloseNotify() + ticker := time.NewTicker(25 * time.Second) + + for { + select { + case msg := <- recv: + fmt.Fprintf(w, "data: %s\n\n", msg) + flusher.Flush() + case <- ticker.C: + w.Write([]byte(":\n\n")) + case <- resClosed: + ticker.Stop() + delete(connections, client) + return + } + } +} + +func ResponseHandler(msg *nats.Msg) { + response := Response {} + if err := proto.Unmarshal(msg.Data, &response); err != nil { // Fail quietly + log.Println(err) + return + } + + client := RawClient { + UserId: response.Client.Key, + ClientId: response.Client.Client, + } + connection, present := connections[client] + if present { + res := JsonResponse { + Code: response.Code, + Message: string(response.Message), + } + json_string, err := json.Marshal(res) + if err != nil { + log.Println(err) + return + } + connection <- []byte(json_string) + } +} diff --git a/response.pb.go b/response.pb.go new file mode 100644 index 0000000..3797a39 --- /dev/null +++ b/response.pb.go @@ -0,0 +1,95 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: response.proto + +package main + +import ( + fmt "fmt" + proto "github.com/golang/protobuf/proto" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Response struct { + Code uint32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"` + Message []byte `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + Client *Client `protobuf:"bytes,3,opt,name=client,proto3" json:"client,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Response) Reset() { *m = Response{} } +func (m *Response) String() string { return proto.CompactTextString(m) } +func (*Response) ProtoMessage() {} +func (*Response) Descriptor() ([]byte, []int) { + return fileDescriptor_0fbc901015fa5021, []int{0} +} + +func (m *Response) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Response.Unmarshal(m, b) +} +func (m *Response) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Response.Marshal(b, m, deterministic) +} +func (m *Response) XXX_Merge(src proto.Message) { + xxx_messageInfo_Response.Merge(m, src) +} +func (m *Response) XXX_Size() int { + return xxx_messageInfo_Response.Size(m) +} +func (m *Response) XXX_DiscardUnknown() { + xxx_messageInfo_Response.DiscardUnknown(m) +} + +var xxx_messageInfo_Response proto.InternalMessageInfo + +func (m *Response) GetCode() uint32 { + if m != nil { + return m.Code + } + return 0 +} + +func (m *Response) GetMessage() []byte { + if m != nil { + return m.Message + } + return nil +} + +func (m *Response) GetClient() *Client { + if m != nil { + return m.Client + } + return nil +} + +func init() { + proto.RegisterType((*Response)(nil), "main.Response") +} + +func init() { proto.RegisterFile("response.proto", fileDescriptor_0fbc901015fa5021) } + +var fileDescriptor_0fbc901015fa5021 = []byte{ + // 129 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x4a, 0x2d, 0x2e, + 0xc8, 0xcf, 0x2b, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0xc9, 0x4d, 0xcc, 0xcc, + 0x93, 0xe2, 0x49, 0xce, 0xc9, 0x4c, 0xcd, 0x2b, 0x81, 0x88, 0x29, 0xc5, 0x71, 0x71, 0x04, 0x41, + 0x55, 0x09, 0x09, 0x71, 0xb1, 0x24, 0xe7, 0xa7, 0xa4, 0x4a, 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x06, + 0x81, 0xd9, 0x42, 0x12, 0x5c, 0xec, 0xb9, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x12, 0x4c, 0x0a, + 0x8c, 0x1a, 0x3c, 0x41, 0x30, 0xae, 0x90, 0x0a, 0x17, 0x1b, 0xc4, 0x24, 0x09, 0x66, 0x05, 0x46, + 0x0d, 0x6e, 0x23, 0x1e, 0x3d, 0x90, 0xf1, 0x7a, 0xce, 0x60, 0xb1, 0x20, 0xa8, 0x5c, 0x12, 0x1b, + 0xd8, 0x1a, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x14, 0x8c, 0xe3, 0xa3, 0x8c, 0x00, 0x00, + 0x00, +}