diff --git a/.env b/.env index b388ac2..5b21f4e 100644 --- a/.env +++ b/.env @@ -1,2 +1,3 @@ LISTEN=:8080 POSTGRES=postgresql://root@localhost:5432/core?sslmode=disable +NATS=nats://nats:4222 diff --git a/README.md b/README.md index b8e7ed7..534e8c3 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,10 @@ Unless otherwise noted, bodies and responses are with `Content-Type: application | [Get Conversation Members](#Get-Conversation-Members) | | [Create Contact](#Create-Contact) | | [Get Contacts](#Get-Contacts) | +| [Subscribe Contact](#Subscribe-Contact) | +| [Subscribe Conversation](#Subscribe-Conversation) | +| [Subscribe User](#Subscribe-User) | +| [Subscribe Member](#Subscribe-Member) | --- @@ -573,3 +577,123 @@ List of user objects in user's contacts. | ---- | ----------- | | 400 | Invalid `X-User-Claim` header. | | 500 | Error occurred retrieving entries from the database. | + +--- + +### Subscribe Contact + +``` +GET /user/subscribe/contact +``` + +Subscribe to an Eventsource stream giving update in changes in state of the contacts database. + +#### Success (200 OK) + +An Eventsource stream. Each event (stringified json) will be of the following format: + +```json +{ + "type": "", + "data": { + "usera": "", + "userb": "" + } +} +``` + +The json in the data field is also stringified. + +--- + +### Subscribe Conversation + +``` +GET /user/subscribe/conversation +``` + +Subscribe to an Eventsource stream giving update in changes in state of the conversations database. + +#### Success (200 OK) + +An Eventsource stream. Each event (stringified json) will be of the following format: + +```json +{ + "type": "", + "data": { + "id": "", + "title": "", + "dm": "", + "picture": "", + "pinned": "" + } +} +``` + +The json in the data field is also stringified. + +--- + +### Subscribe User + +``` +GET /user/subscribe/user +``` + +Subscribe to an Eventsource stream giving update in changes in state of the users database. + +#### Success (200 OK) + +An Eventsource stream. Each event (stringified json) will be of the following format: + +```json +{ + "type": "", + "data": { + "id": "", + "username": "", + "bio": "", + "profile_pic": "", + "last_name": "", + "phone_number": "" + } +} +``` + +The json in the data field is also stringified. + +--- + +### Subscribe Member + +``` +GET /user/subscribe/conversation/:conversation/member +``` + +Subscribe to an Eventsource stream giving update in changes in state of the conversation members database. + +#### URL Params + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| conversation | String | Conversation's ID. | ✓ | + +#### Success (200 OK) + +An Eventsource stream. Each event (stringified json) will be of the following format: + +```json +{ + "type": "", + "data": { + "user": "", + "conversation": "", + "pinned": "" + } +} +``` + +The json in the data field is also stringified. + +--- diff --git a/contact.go b/contact.go index b9f4b2b..8bab544 100644 --- a/contact.go +++ b/contact.go @@ -54,6 +54,25 @@ func (h *Handler) CreateContact(w http.ResponseWriter, r *http.Request, p httpro return } + // Publish NATs + if h.nc != nil { + contact := Contact { + UserA: userID, + UserB: contact.ID, + } + contactString, err := json.Marshal(&contact) + if err == nil { + updateMsg := UpdateMsg { + Type: "add", + Data: string(contactString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("contact", updateMsgString) + } + } + } + // Respond w.WriteHeader(200) w.Header().Set("Content-Type", "application/json") diff --git a/conversation.go b/conversation.go index d7093d5..9cbc79b 100644 --- a/conversation.go +++ b/conversation.go @@ -57,6 +57,21 @@ func (h *Handler) CreateConversation(w http.ResponseWriter, r *http.Request, p h return } + // Publish NATs + if h.nc != nil { + conversationString, err := json.Marshal(&conversation) + if err == nil { + updateMsg := UpdateMsg { + Type: "add", + Data: string(conversationString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("conversation", updateMsgString) + } + } + } + // Respond w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(conversation) @@ -181,6 +196,21 @@ func (h *Handler) UpdateConversation(w http.ResponseWriter, r *http.Request, p h } } + // Publish NATs + if h.nc != nil { + conversationString, err := json.Marshal(&conversation) + if err == nil { + updateMsg := UpdateMsg { + Type: "update", + Data: string(conversationString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("conversation", updateMsgString) + } + } + } + w.WriteHeader(200) } @@ -236,6 +266,24 @@ func (h *Handler) DeleteConversation(w http.ResponseWriter, r *http.Request, p h return } + // Publish NATs + if h.nc != nil { + conversation := Conversation { + ID: conversationID, + } + conversationString, err := json.Marshal(&conversation) + if err == nil { + updateMsg := UpdateMsg { + Type: "delete", + Data: string(conversationString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("conversation", updateMsgString) + } + } + } + w.WriteHeader(200) } @@ -330,6 +378,26 @@ func (h *Handler) CreateConversationMember(w http.ResponseWriter, r *http.Reques return } + // Publish NATs + if h.nc != nil { + member := Member { + User: member.ID, + Conversation: conversationID, + Pinned: false, // default + } + memberString, err := json.Marshal(&member) + if err == nil { + updateMsg := UpdateMsg { + Type: "add", + Data: string(memberString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("member", updateMsgString) + } + } + } + // Respond //w.Header().Set("Content-Type", "application/json") //json.NewEncoder(w).Encode(member) @@ -397,5 +465,25 @@ func (h *Handler) PinConversation(w http.ResponseWriter, r *http.Request, p http return } + // Publish NATs + if h.nc != nil { + member := Member { + User: userID, + Conversation: conversationID, + Pinned: true, + } + memberString, err := json.Marshal(&member) + if err == nil { + updateMsg := UpdateMsg { + Type: "update", + Data: string(memberString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("member", updateMsgString) + } + } + } + w.WriteHeader(200) } diff --git a/go.mod b/go.mod index ba5dd0c..c7975ee 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,9 @@ require ( github.com/joho/godotenv v1.3.0 github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 + github.com/nats-io/go-nats v1.7.2 + github.com/nats-io/nkeys v0.1.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 // indirect github.com/ttacon/libphonenumber v1.0.0 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect diff --git a/go.sum b/go.sum index 02e4a6d..23777db 100644 --- a/go.sum +++ b/go.sum @@ -8,9 +8,22 @@ github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d h1:of6+Tp github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5WuCYnc6RtbfLVAB7nmC5M= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/nats-io/go-nats v1.7.2 h1:cJujlwCYR8iMz5ofZSD/p2WLW8FabhkQ2lIEVbSvNSA= +github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= +github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= +github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 h1:5u+EJUQiosu3JFX0XS0qTf5FznsMOzTjGqavBGuCbo0= github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2/go.mod h1:4kyMkleCiLkgY6z8gK5BkI01ChBtxR0ro3I1ZDcGM3w= github.com/ttacon/libphonenumber v1.0.0 h1:5DJsnAoMCC+LjJ6lQqjjf2EHiDD6KH4rVhDsMn5oXII= github.com/ttacon/libphonenumber v1.0.0/go.mod h1:E0TpmdVMq5dyVlQ7oenAkhsLu86OkUl+yR4OAxyEg/M= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= +golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/handlers.go b/handlers.go index 4ad5dec..6db6edd 100644 --- a/handlers.go +++ b/handlers.go @@ -2,12 +2,41 @@ package main import ( "database/sql" + + "github.com/nats-io/go-nats" ) type Handler struct { db *sql.DB + nc *nats.Conn + + contactConnections map[string]chan []byte + conversationConnections map[string]chan []byte + userConnections map[string]chan []byte + memberConnections map[string]map[string]chan []byte } -func NewHandler(db *sql.DB) *Handler { - return &Handler{db} +func NewHandler(db *sql.DB, nc *nats.Conn) *Handler { + contactConnections := make(map[string]chan []byte) + conversationConnections := make(map[string]chan []byte) + userConnections := make(map[string]chan []byte) + memberConnections := make(map[string]map[string]chan []byte) + + h := &Handler{ + db, + nc, + contactConnections, + conversationConnections, + userConnections, + memberConnections, + } + + if nc != nil { + nc.Subscribe("contacts", h.ContactHandler) + nc.Subscribe("conversations", h.ConversationHandler) + nc.Subscribe("users", h.UserHandler) + nc.Subscribe("members", h.MemberHandler) + } + + return h } diff --git a/main.go b/main.go index 6414575..d402785 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "os" "github.com/joho/godotenv" + "github.com/nats-io/go-nats" _ "github.com/lib/pq" ) @@ -26,8 +27,10 @@ func main() { // Database db := connect() + // NATs + nc := connectNats() // Handler - h := NewHandler(db) + h := NewHandler(db, nc) // Routes router := NewRouter(h) @@ -51,3 +54,17 @@ func connect() *sql.DB { return db } + +func connectNats() *nats.Conn { + natsHost := os.Getenv("NATS") + var nc *nats.Conn + var err error + if natsHost != "" { + log.Printf("connecting to nats %s", natsHost) + nc, err = nats.Connect(natsHost) + if err != nil { + log.Fatal(err) + } + } + return nc +} diff --git a/nats_handlers.go b/nats_handlers.go new file mode 100644 index 0000000..5e20be8 --- /dev/null +++ b/nats_handlers.go @@ -0,0 +1,101 @@ +package main + +import ( + "encoding/json" + "log" + + "github.com/nats-io/go-nats" +) + +func (h *Handler) ContactHandler(msg *nats.Msg) { + // Validate JSON + updateMsg := UpdateMsg{} + err := json.Unmarshal(msg.Data, &updateMsg) + if err != nil { + log.Println(err) + return + } + + contact := Contact{} + err = json.Unmarshal([]byte(updateMsg.Data), &contact) + if err != nil { + log.Println(err) + return + } + + // Transmit + for _, conn := range h.contactConnections { + conn <- msg.Data + } +} + +func (h *Handler) ConversationHandler(msg *nats.Msg) { + // Validate JSON + updateMsg := UpdateMsg{} + err := json.Unmarshal(msg.Data, &updateMsg) + if err != nil { + log.Println(err) + return + } + + conversation := Conversation{} + err = json.Unmarshal([]byte(updateMsg.Data), &conversation) + if err != nil { + log.Println(err) + return + } + + // Transmit + for _, conn := range h.conversationConnections { + conn <- msg.Data + } +} + +func (h *Handler) UserHandler(msg *nats.Msg) { + // Validate JSON + updateMsg := UpdateMsg{} + err := json.Unmarshal(msg.Data, &updateMsg) + if err != nil { + log.Println(err) + return + } + + user := User{} + err = json.Unmarshal([]byte(updateMsg.Data), &user) + if err != nil { + log.Println(err) + return + } + + // Transmit + for _, conn := range h.userConnections { + conn <- msg.Data + } +} + +func (h *Handler) MemberHandler(msg *nats.Msg) { + // Validate JSON + updateMsg := UpdateMsg{} + err := json.Unmarshal(msg.Data, &updateMsg) + if err != nil { + log.Println(err) + return + } + + member := Member{} + err = json.Unmarshal([]byte(updateMsg.Data), &member) + if err != nil { + log.Println(err) + return + } + + // Get transmit channel + if channels, ok := h.memberConnections[member.Conversation]; ok { + // Transmit + for _, conn := range channels { + conn <- msg.Data + } + } else { + log.Printf("member conversation %s not found\n", member.Conversation) + } +} diff --git a/router.go b/router.go index 0ad1c67..308741c 100644 --- a/router.go +++ b/router.go @@ -38,5 +38,11 @@ func NewRouter(h *Handler) *httprouter.Router { //router.DELETE("/user/:user/contact/:contact", h.DeleteContact) //router.GET("/user/:user/contact/:contact/conversation/", h.GetContactConversations) + // Subscribe + router.GET("/user/subscribe/contact", AuthMiddleware(h.SubscribeContact)) + router.GET("/user/subscribe/conversation", AuthMiddleware(h.SubscribeConversation)) + router.GET("/user/subscribe", AuthMiddleware(h.SubscribeUser)) + router.GET("/user/subscribe/conversation/:conversation/member", AuthMiddleware(h.SubscribeMember)) + return router } diff --git a/subscribe.go b/subscribe.go new file mode 100644 index 0000000..20dbf05 --- /dev/null +++ b/subscribe.go @@ -0,0 +1,64 @@ +package main + +import ( + "fmt" + "net/http" + "time" + + "github.com/julienschmidt/httprouter" +) + +func (h *Handler) SubscribeContact(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + Subscribe(h.contactConnections, w, r, p) +} + +func (h *Handler) SubscribeConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + Subscribe(h.conversationConnections, w, r, p) +} + +func (h *Handler) SubscribeUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + Subscribe(h.userConnections, w, r, p) +} + +func (h *Handler) SubscribeMember(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + conversation := p.ByName("conversation") + if _, ok := h.memberConnections[conversation]; !ok { + h.memberConnections[conversation] = make(map[string]chan []byte) + } + + Subscribe(h.memberConnections[conversation], w, r, p) +} + +func Subscribe(channels map[string]chan []byte, w http.ResponseWriter, r *http.Request, p httprouter.Params) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + id := RandomHex() + recv := make(chan []byte) + channels[id] = recv + + // Refresh connection periodically + resClosed := w.(http.CloseNotifier).CloseNotify() + ticker := time.NewTicker(25 * time.Second) + + for { + select { + case msg := <-recv: + fmt.Fprintf(w, "data: %s\n\n", msg) + flusher.Flush() + case <- ticker.C: + w.Write([]byte(":\n\n")) + case <- resClosed: + ticker.Stop() + delete(channels, id) + return + } + } +} diff --git a/types.go b/types.go index 59773f3..bc59099 100644 --- a/types.go +++ b/types.go @@ -2,6 +2,22 @@ package main // String pointer means nullable +type UpdateMsg struct { + Type string `json:"type"` + Data string `json:"data"` +} + +type Contact struct { + UserA string `json:"usera"` // First user ID + UserB string `json:"userb"` // Second user ID +} + +type Member struct { + User string `json:"user"` + Conversation string `json:"conversation"` + Pinned bool `json:"pinned"` +} + type Conversation struct { ID string `json:"id"` // id Title string `json:"title"` // title diff --git a/user.go b/user.go index 647c460..09f3b1f 100644 --- a/user.go +++ b/user.go @@ -50,6 +50,21 @@ func (h *Handler) CreateUser(w http.ResponseWriter, r *http.Request, _ httproute } user.ID = finalId + // Publish NATs + if h.nc != nil { + userString, err := json.Marshal(&user) + if err == nil { + updateMsg := UpdateMsg { + Type: "add", + Data: string(userString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("user", updateMsgString) + } + } + } + // Respond w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(user) @@ -166,5 +181,20 @@ func (h *Handler) UpdateUser(w http.ResponseWriter, r *http.Request, p httproute return } + // Publish NATs + if h.nc != nil { + userString, err := json.Marshal(&user) + if err == nil { + updateMsg := UpdateMsg { + Type: "update", + Data: string(userString), + } + updateMsgString, err := json.Marshal(&updateMsg) + if err == nil { + h.nc.Publish("user", updateMsgString) + } + } + } + w.WriteHeader(200) }