4
1
Fork 0

Resolves #19 added SSE subscribe streams
continuous-integration/drone/push Build is failing Details

pull/24/head
Daniel Lim 2019-10-06 23:20:06 +08:00
parent 2842e1867b
commit 3ff4759790
13 changed files with 514 additions and 3 deletions

1
.env
View File

@ -1,2 +1,3 @@
LISTEN=:8080
POSTGRES=postgresql://root@localhost:5432/core?sslmode=disable
NATS=nats://nats:4222

124
README.md
View File

@ -41,6 +41,10 @@ Unless otherwise noted, bodies and responses are with `Content-Type: application
| [Get Conversation Members](#Get-Conversation-Members) |
| [Create Contact](#Create-Contact) |
| [Get Contacts](#Get-Contacts) |
| [Subscribe Contact](#Subscribe-Contact) |
| [Subscribe Conversation](#Subscribe-Conversation) |
| [Subscribe User](#Subscribe-User) |
| [Subscribe Member](#Subscribe-Member) |
---
@ -573,3 +577,123 @@ List of user objects in user's contacts.
| ---- | ----------- |
| 400 | Invalid `X-User-Claim` header. |
| 500 | Error occurred retrieving entries from the database. |
---
### Subscribe Contact
```
GET /user/subscribe/contact
```
Subscribe to an Eventsource stream giving update in changes in state of the contacts database.
#### Success (200 OK)
An Eventsource stream. Each event (stringified json) will be of the following format:
```json
{
"type": "<add|update>",
"data": {
"usera": "<user id>",
"userb": "<user id>"
}
}
```
The json in the data field is also stringified.
---
### Subscribe Conversation
```
GET /user/subscribe/conversation
```
Subscribe to an Eventsource stream giving update in changes in state of the conversations database.
#### Success (200 OK)
An Eventsource stream. Each event (stringified json) will be of the following format:
```json
{
"type": "<add|update|delete>",
"data": {
"id": "<conversation id>",
"title": "<string>",
"dm": "<bool>",
"picture": "<string>",
"pinned": "<bool>"
}
}
```
The json in the data field is also stringified.
---
### Subscribe User
```
GET /user/subscribe/user
```
Subscribe to an Eventsource stream giving update in changes in state of the users database.
#### Success (200 OK)
An Eventsource stream. Each event (stringified json) will be of the following format:
```json
{
"type": "<add|update>",
"data": {
"id": "<user id>",
"username": "<string>",
"bio": "<string>",
"profile_pic": "<string>",
"last_name": "<string>",
"phone_number": "<string>"
}
}
```
The json in the data field is also stringified.
---
### Subscribe Member
```
GET /user/subscribe/conversation/:conversation/member
```
Subscribe to an Eventsource stream giving update in changes in state of the conversation members database.
#### URL Params
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| conversation | String | Conversation's ID. | ✓ |
#### Success (200 OK)
An Eventsource stream. Each event (stringified json) will be of the following format:
```json
{
"type": "<add|update>",
"data": {
"user": "<user id>",
"conversation": "<conversation id>",
"pinned": "<bool>"
}
}
```
The json in the data field is also stringified.
---

View File

@ -54,6 +54,25 @@ func (h *Handler) CreateContact(w http.ResponseWriter, r *http.Request, p httpro
return
}
// Publish NATs
if h.nc != nil {
contact := Contact {
UserA: userID,
UserB: contact.ID,
}
contactString, err := json.Marshal(&contact)
if err == nil {
updateMsg := UpdateMsg {
Type: "add",
Data: string(contactString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("contact", updateMsgString)
}
}
}
// Respond
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json")

View File

@ -57,6 +57,21 @@ func (h *Handler) CreateConversation(w http.ResponseWriter, r *http.Request, p h
return
}
// Publish NATs
if h.nc != nil {
conversationString, err := json.Marshal(&conversation)
if err == nil {
updateMsg := UpdateMsg {
Type: "add",
Data: string(conversationString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("conversation", updateMsgString)
}
}
}
// Respond
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(conversation)
@ -181,6 +196,21 @@ func (h *Handler) UpdateConversation(w http.ResponseWriter, r *http.Request, p h
}
}
// Publish NATs
if h.nc != nil {
conversationString, err := json.Marshal(&conversation)
if err == nil {
updateMsg := UpdateMsg {
Type: "update",
Data: string(conversationString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("conversation", updateMsgString)
}
}
}
w.WriteHeader(200)
}
@ -236,6 +266,24 @@ func (h *Handler) DeleteConversation(w http.ResponseWriter, r *http.Request, p h
return
}
// Publish NATs
if h.nc != nil {
conversation := Conversation {
ID: conversationID,
}
conversationString, err := json.Marshal(&conversation)
if err == nil {
updateMsg := UpdateMsg {
Type: "delete",
Data: string(conversationString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("conversation", updateMsgString)
}
}
}
w.WriteHeader(200)
}
@ -330,6 +378,26 @@ func (h *Handler) CreateConversationMember(w http.ResponseWriter, r *http.Reques
return
}
// Publish NATs
if h.nc != nil {
member := Member {
User: member.ID,
Conversation: conversationID,
Pinned: false, // default
}
memberString, err := json.Marshal(&member)
if err == nil {
updateMsg := UpdateMsg {
Type: "add",
Data: string(memberString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("member", updateMsgString)
}
}
}
// Respond
//w.Header().Set("Content-Type", "application/json")
//json.NewEncoder(w).Encode(member)
@ -397,5 +465,25 @@ func (h *Handler) PinConversation(w http.ResponseWriter, r *http.Request, p http
return
}
// Publish NATs
if h.nc != nil {
member := Member {
User: userID,
Conversation: conversationID,
Pinned: true,
}
memberString, err := json.Marshal(&member)
if err == nil {
updateMsg := UpdateMsg {
Type: "update",
Data: string(memberString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("member", updateMsgString)
}
}
}
w.WriteHeader(200)
}

3
go.mod
View File

@ -6,6 +6,9 @@ require (
github.com/joho/godotenv v1.3.0
github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d
github.com/lib/pq v0.0.0-20180523175426-90697d60dd84
github.com/nats-io/go-nats v1.7.2
github.com/nats-io/nkeys v0.1.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 // indirect
github.com/ttacon/libphonenumber v1.0.0
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect

13
go.sum
View File

@ -8,9 +8,22 @@ github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d h1:of6+Tp
github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5WuCYnc6RtbfLVAB7nmC5M=
github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/nats-io/go-nats v1.7.2 h1:cJujlwCYR8iMz5ofZSD/p2WLW8FabhkQ2lIEVbSvNSA=
github.com/nats-io/go-nats v1.7.2/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 h1:5u+EJUQiosu3JFX0XS0qTf5FznsMOzTjGqavBGuCbo0=
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2/go.mod h1:4kyMkleCiLkgY6z8gK5BkI01ChBtxR0ro3I1ZDcGM3w=
github.com/ttacon/libphonenumber v1.0.0 h1:5DJsnAoMCC+LjJ6lQqjjf2EHiDD6KH4rVhDsMn5oXII=
github.com/ttacon/libphonenumber v1.0.0/go.mod h1:E0TpmdVMq5dyVlQ7oenAkhsLu86OkUl+yR4OAxyEg/M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -2,12 +2,41 @@ package main
import (
"database/sql"
"github.com/nats-io/go-nats"
)
type Handler struct {
db *sql.DB
nc *nats.Conn
contactConnections map[string]chan []byte
conversationConnections map[string]chan []byte
userConnections map[string]chan []byte
memberConnections map[string]map[string]chan []byte
}
func NewHandler(db *sql.DB) *Handler {
return &Handler{db}
func NewHandler(db *sql.DB, nc *nats.Conn) *Handler {
contactConnections := make(map[string]chan []byte)
conversationConnections := make(map[string]chan []byte)
userConnections := make(map[string]chan []byte)
memberConnections := make(map[string]map[string]chan []byte)
h := &Handler{
db,
nc,
contactConnections,
conversationConnections,
userConnections,
memberConnections,
}
if nc != nil {
nc.Subscribe("contacts", h.ContactHandler)
nc.Subscribe("conversations", h.ConversationHandler)
nc.Subscribe("users", h.UserHandler)
nc.Subscribe("members", h.MemberHandler)
}
return h
}

19
main.go
View File

@ -7,6 +7,7 @@ import (
"os"
"github.com/joho/godotenv"
"github.com/nats-io/go-nats"
_ "github.com/lib/pq"
)
@ -26,8 +27,10 @@ func main() {
// Database
db := connect()
// NATs
nc := connectNats()
// Handler
h := NewHandler(db)
h := NewHandler(db, nc)
// Routes
router := NewRouter(h)
@ -51,3 +54,17 @@ func connect() *sql.DB {
return db
}
func connectNats() *nats.Conn {
natsHost := os.Getenv("NATS")
var nc *nats.Conn
var err error
if natsHost != "" {
log.Printf("connecting to nats %s", natsHost)
nc, err = nats.Connect(natsHost)
if err != nil {
log.Fatal(err)
}
}
return nc
}

101
nats_handlers.go Normal file
View File

@ -0,0 +1,101 @@
package main
import (
"encoding/json"
"log"
"github.com/nats-io/go-nats"
)
func (h *Handler) ContactHandler(msg *nats.Msg) {
// Validate JSON
updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil {
log.Println(err)
return
}
contact := Contact{}
err = json.Unmarshal([]byte(updateMsg.Data), &contact)
if err != nil {
log.Println(err)
return
}
// Transmit
for _, conn := range h.contactConnections {
conn <- msg.Data
}
}
func (h *Handler) ConversationHandler(msg *nats.Msg) {
// Validate JSON
updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil {
log.Println(err)
return
}
conversation := Conversation{}
err = json.Unmarshal([]byte(updateMsg.Data), &conversation)
if err != nil {
log.Println(err)
return
}
// Transmit
for _, conn := range h.conversationConnections {
conn <- msg.Data
}
}
func (h *Handler) UserHandler(msg *nats.Msg) {
// Validate JSON
updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil {
log.Println(err)
return
}
user := User{}
err = json.Unmarshal([]byte(updateMsg.Data), &user)
if err != nil {
log.Println(err)
return
}
// Transmit
for _, conn := range h.userConnections {
conn <- msg.Data
}
}
func (h *Handler) MemberHandler(msg *nats.Msg) {
// Validate JSON
updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil {
log.Println(err)
return
}
member := Member{}
err = json.Unmarshal([]byte(updateMsg.Data), &member)
if err != nil {
log.Println(err)
return
}
// Get transmit channel
if channels, ok := h.memberConnections[member.Conversation]; ok {
// Transmit
for _, conn := range channels {
conn <- msg.Data
}
} else {
log.Printf("member conversation %s not found\n", member.Conversation)
}
}

View File

@ -38,5 +38,11 @@ func NewRouter(h *Handler) *httprouter.Router {
//router.DELETE("/user/:user/contact/:contact", h.DeleteContact)
//router.GET("/user/:user/contact/:contact/conversation/", h.GetContactConversations)
// Subscribe
router.GET("/user/subscribe/contact", AuthMiddleware(h.SubscribeContact))
router.GET("/user/subscribe/conversation", AuthMiddleware(h.SubscribeConversation))
router.GET("/user/subscribe", AuthMiddleware(h.SubscribeUser))
router.GET("/user/subscribe/conversation/:conversation/member", AuthMiddleware(h.SubscribeMember))
return router
}

64
subscribe.go Normal file
View File

@ -0,0 +1,64 @@
package main
import (
"fmt"
"net/http"
"time"
"github.com/julienschmidt/httprouter"
)
func (h *Handler) SubscribeContact(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
Subscribe(h.contactConnections, w, r, p)
}
func (h *Handler) SubscribeConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
Subscribe(h.conversationConnections, w, r, p)
}
func (h *Handler) SubscribeUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
Subscribe(h.userConnections, w, r, p)
}
func (h *Handler) SubscribeMember(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
conversation := p.ByName("conversation")
if _, ok := h.memberConnections[conversation]; !ok {
h.memberConnections[conversation] = make(map[string]chan []byte)
}
Subscribe(h.memberConnections[conversation], w, r, p)
}
func Subscribe(channels map[string]chan []byte, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
id := RandomHex()
recv := make(chan []byte)
channels[id] = recv
// Refresh connection periodically
resClosed := w.(http.CloseNotifier).CloseNotify()
ticker := time.NewTicker(25 * time.Second)
for {
select {
case msg := <-recv:
fmt.Fprintf(w, "data: %s\n\n", msg)
flusher.Flush()
case <- ticker.C:
w.Write([]byte(":\n\n"))
case <- resClosed:
ticker.Stop()
delete(channels, id)
return
}
}
}

View File

@ -2,6 +2,22 @@ package main
// String pointer means nullable
type UpdateMsg struct {
Type string `json:"type"`
Data string `json:"data"`
}
type Contact struct {
UserA string `json:"usera"` // First user ID
UserB string `json:"userb"` // Second user ID
}
type Member struct {
User string `json:"user"`
Conversation string `json:"conversation"`
Pinned bool `json:"pinned"`
}
type Conversation struct {
ID string `json:"id"` // id
Title string `json:"title"` // title

30
user.go
View File

@ -50,6 +50,21 @@ func (h *Handler) CreateUser(w http.ResponseWriter, r *http.Request, _ httproute
}
user.ID = finalId
// Publish NATs
if h.nc != nil {
userString, err := json.Marshal(&user)
if err == nil {
updateMsg := UpdateMsg {
Type: "add",
Data: string(userString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("user", updateMsgString)
}
}
}
// Respond
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(user)
@ -166,5 +181,20 @@ func (h *Handler) UpdateUser(w http.ResponseWriter, r *http.Request, p httproute
return
}
// Publish NATs
if h.nc != nil {
userString, err := json.Marshal(&user)
if err == nil {
updateMsg := UpdateMsg {
Type: "update",
Data: string(userString),
}
updateMsgString, err := json.Marshal(&updateMsg)
if err == nil {
h.nc.Publish("user", updateMsgString)
}
}
}
w.WriteHeader(200)
}