4
1
Fork 0

Fix formatting
continuous-integration/drone/push Build is failing Details

pull/24/head
Ambrose Chua 2019-10-23 13:49:43 +08:00
parent 3ff4759790
commit b273ee9390
Signed by: ambrose
GPG Key ID: BC367D33F140B5C2
11 changed files with 314 additions and 295 deletions

View File

@ -1,4 +1,19 @@
kind: pipeline kind: pipeline
name: tests-fmt
steps:
- name: go
image: golang:1.13
commands:
- make test_fmt
trigger:
branch:
- master
- develop
event:
- push
- pull_request
---
kind: pipeline
name: tests-integration name: tests-integration
steps: steps:
- name: wait - name: wait
@ -36,7 +51,7 @@ kind: pipeline
name: tests-unit name: tests-unit
steps: steps:
- name: go - name: go
image: golang:1.12 image: golang:1.13
commands: commands:
- make test_unit - make test_unit
trigger: trigger:

View File

@ -3,6 +3,7 @@ GORUN=$(GOCMD) run
GOBUILD=$(GOCMD) build GOBUILD=$(GOCMD) build
GOCLEAN=$(GOCMD) clean GOCLEAN=$(GOCMD) clean
GOTEST=$(GOCMD) test GOTEST=$(GOCMD) test
GOFMT_PROG=gofmt
DOCKERCOMPOSE=docker-compose DOCKERCOMPOSE=docker-compose
@ -18,6 +19,9 @@ build:
test: test_unit test_integration test: test_unit test_integration
test_fmt:
$(GOFMT_PROG) -l .
test_unit: test_unit:
$(GOTEST) -tags=unit -v -cover $(GOTEST) -tags=unit -v -cover

View File

@ -54,24 +54,24 @@ func (h *Handler) CreateContact(w http.ResponseWriter, r *http.Request, p httpro
return return
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
contact := Contact { contact := Contact{
UserA: userID, UserA: userID,
UserB: contact.ID, UserB: contact.ID,
} }
contactString, err := json.Marshal(&contact) contactString, err := json.Marshal(&contact)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "add", Type: "add",
Data: string(contactString), Data: string(contactString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("contact", updateMsgString) h.nc.Publish("contact", updateMsgString)
} }
} }
} }
// Respond // Respond
w.WriteHeader(200) w.WriteHeader(200)

View File

@ -57,20 +57,20 @@ func (h *Handler) CreateConversation(w http.ResponseWriter, r *http.Request, p h
return return
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
conversationString, err := json.Marshal(&conversation) conversationString, err := json.Marshal(&conversation)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "add", Type: "add",
Data: string(conversationString), Data: string(conversationString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("conversation", updateMsgString) h.nc.Publish("conversation", updateMsgString)
} }
} }
} }
// Respond // Respond
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -196,20 +196,20 @@ func (h *Handler) UpdateConversation(w http.ResponseWriter, r *http.Request, p h
} }
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
conversationString, err := json.Marshal(&conversation) conversationString, err := json.Marshal(&conversation)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "update", Type: "update",
Data: string(conversationString), Data: string(conversationString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("conversation", updateMsgString) h.nc.Publish("conversation", updateMsgString)
} }
} }
} }
w.WriteHeader(200) w.WriteHeader(200)
} }
@ -266,23 +266,23 @@ func (h *Handler) DeleteConversation(w http.ResponseWriter, r *http.Request, p h
return return
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
conversation := Conversation { conversation := Conversation{
ID: conversationID, ID: conversationID,
} }
conversationString, err := json.Marshal(&conversation) conversationString, err := json.Marshal(&conversation)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "delete", Type: "delete",
Data: string(conversationString), Data: string(conversationString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("conversation", updateMsgString) h.nc.Publish("conversation", updateMsgString)
} }
} }
} }
w.WriteHeader(200) w.WriteHeader(200)
} }
@ -378,25 +378,25 @@ func (h *Handler) CreateConversationMember(w http.ResponseWriter, r *http.Reques
return return
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
member := Member { member := Member{
User: member.ID, User: member.ID,
Conversation: conversationID, Conversation: conversationID,
Pinned: false, // default Pinned: false, // default
} }
memberString, err := json.Marshal(&member) memberString, err := json.Marshal(&member)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "add", Type: "add",
Data: string(memberString), Data: string(memberString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("member", updateMsgString) h.nc.Publish("member", updateMsgString)
} }
} }
} }
// Respond // Respond
//w.Header().Set("Content-Type", "application/json") //w.Header().Set("Content-Type", "application/json")
@ -465,25 +465,25 @@ func (h *Handler) PinConversation(w http.ResponseWriter, r *http.Request, p http
return return
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
member := Member { member := Member{
User: userID, User: userID,
Conversation: conversationID, Conversation: conversationID,
Pinned: true, Pinned: true,
} }
memberString, err := json.Marshal(&member) memberString, err := json.Marshal(&member)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "update", Type: "update",
Data: string(memberString), Data: string(memberString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("member", updateMsgString) h.nc.Publish("member", updateMsgString)
} }
} }
} }
w.WriteHeader(200) w.WriteHeader(200)
} }

View File

@ -3,40 +3,40 @@ package main
import ( import (
"database/sql" "database/sql"
"github.com/nats-io/go-nats" "github.com/nats-io/go-nats"
) )
type Handler struct { type Handler struct {
db *sql.DB db *sql.DB
nc *nats.Conn nc *nats.Conn
contactConnections map[string]chan []byte contactConnections map[string]chan []byte
conversationConnections map[string]chan []byte conversationConnections map[string]chan []byte
userConnections map[string]chan []byte userConnections map[string]chan []byte
memberConnections map[string]map[string]chan []byte memberConnections map[string]map[string]chan []byte
} }
func NewHandler(db *sql.DB, nc *nats.Conn) *Handler { func NewHandler(db *sql.DB, nc *nats.Conn) *Handler {
contactConnections := make(map[string]chan []byte) contactConnections := make(map[string]chan []byte)
conversationConnections := make(map[string]chan []byte) conversationConnections := make(map[string]chan []byte)
userConnections := make(map[string]chan []byte) userConnections := make(map[string]chan []byte)
memberConnections := make(map[string]map[string]chan []byte) memberConnections := make(map[string]map[string]chan []byte)
h := &Handler{ h := &Handler{
db, db,
nc, nc,
contactConnections, contactConnections,
conversationConnections, conversationConnections,
userConnections, userConnections,
memberConnections, memberConnections,
} }
if nc != nil { if nc != nil {
nc.Subscribe("contacts", h.ContactHandler) nc.Subscribe("contacts", h.ContactHandler)
nc.Subscribe("conversations", h.ConversationHandler) nc.Subscribe("conversations", h.ConversationHandler)
nc.Subscribe("users", h.UserHandler) nc.Subscribe("users", h.UserHandler)
nc.Subscribe("members", h.MemberHandler) nc.Subscribe("members", h.MemberHandler)
} }
return h return h
} }

28
main.go
View File

@ -7,8 +7,8 @@ import (
"os" "os"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"github.com/nats-io/go-nats"
_ "github.com/lib/pq" _ "github.com/lib/pq"
"github.com/nats-io/go-nats"
) )
var listen string var listen string
@ -27,8 +27,8 @@ func main() {
// Database // Database
db := connect() db := connect()
// NATs // NATs
nc := connectNats() nc := connectNats()
// Handler // Handler
h := NewHandler(db, nc) h := NewHandler(db, nc)
// Routes // Routes
@ -56,15 +56,15 @@ func connect() *sql.DB {
} }
func connectNats() *nats.Conn { func connectNats() *nats.Conn {
natsHost := os.Getenv("NATS") natsHost := os.Getenv("NATS")
var nc *nats.Conn var nc *nats.Conn
var err error var err error
if natsHost != "" { if natsHost != "" {
log.Printf("connecting to nats %s", natsHost) log.Printf("connecting to nats %s", natsHost)
nc, err = nats.Connect(natsHost) nc, err = nats.Connect(natsHost)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
} }
return nc return nc
} }

View File

@ -1,101 +1,101 @@
package main package main
import ( import (
"encoding/json" "encoding/json"
"log" "log"
"github.com/nats-io/go-nats" "github.com/nats-io/go-nats"
) )
func (h *Handler) ContactHandler(msg *nats.Msg) { func (h *Handler) ContactHandler(msg *nats.Msg) {
// Validate JSON // Validate JSON
updateMsg := UpdateMsg{} updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg) err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
contact := Contact{} contact := Contact{}
err = json.Unmarshal([]byte(updateMsg.Data), &contact) err = json.Unmarshal([]byte(updateMsg.Data), &contact)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
// Transmit // Transmit
for _, conn := range h.contactConnections { for _, conn := range h.contactConnections {
conn <- msg.Data conn <- msg.Data
} }
} }
func (h *Handler) ConversationHandler(msg *nats.Msg) { func (h *Handler) ConversationHandler(msg *nats.Msg) {
// Validate JSON // Validate JSON
updateMsg := UpdateMsg{} updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg) err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
conversation := Conversation{} conversation := Conversation{}
err = json.Unmarshal([]byte(updateMsg.Data), &conversation) err = json.Unmarshal([]byte(updateMsg.Data), &conversation)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
// Transmit // Transmit
for _, conn := range h.conversationConnections { for _, conn := range h.conversationConnections {
conn <- msg.Data conn <- msg.Data
} }
} }
func (h *Handler) UserHandler(msg *nats.Msg) { func (h *Handler) UserHandler(msg *nats.Msg) {
// Validate JSON // Validate JSON
updateMsg := UpdateMsg{} updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg) err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
user := User{} user := User{}
err = json.Unmarshal([]byte(updateMsg.Data), &user) err = json.Unmarshal([]byte(updateMsg.Data), &user)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
// Transmit // Transmit
for _, conn := range h.userConnections { for _, conn := range h.userConnections {
conn <- msg.Data conn <- msg.Data
} }
} }
func (h *Handler) MemberHandler(msg *nats.Msg) { func (h *Handler) MemberHandler(msg *nats.Msg) {
// Validate JSON // Validate JSON
updateMsg := UpdateMsg{} updateMsg := UpdateMsg{}
err := json.Unmarshal(msg.Data, &updateMsg) err := json.Unmarshal(msg.Data, &updateMsg)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
member := Member{} member := Member{}
err = json.Unmarshal([]byte(updateMsg.Data), &member) err = json.Unmarshal([]byte(updateMsg.Data), &member)
if err != nil { if err != nil {
log.Println(err) log.Println(err)
return return
} }
// Get transmit channel // Get transmit channel
if channels, ok := h.memberConnections[member.Conversation]; ok { if channels, ok := h.memberConnections[member.Conversation]; ok {
// Transmit // Transmit
for _, conn := range channels { for _, conn := range channels {
conn <- msg.Data conn <- msg.Data
} }
} else { } else {
log.Printf("member conversation %s not found\n", member.Conversation) log.Printf("member conversation %s not found\n", member.Conversation)
} }
} }

View File

@ -38,11 +38,11 @@ func NewRouter(h *Handler) *httprouter.Router {
//router.DELETE("/user/:user/contact/:contact", h.DeleteContact) //router.DELETE("/user/:user/contact/:contact", h.DeleteContact)
//router.GET("/user/:user/contact/:contact/conversation/", h.GetContactConversations) //router.GET("/user/:user/contact/:contact/conversation/", h.GetContactConversations)
// Subscribe // Subscribe
router.GET("/user/subscribe/contact", AuthMiddleware(h.SubscribeContact)) router.GET("/user/subscribe/contact", AuthMiddleware(h.SubscribeContact))
router.GET("/user/subscribe/conversation", AuthMiddleware(h.SubscribeConversation)) router.GET("/user/subscribe/conversation", AuthMiddleware(h.SubscribeConversation))
router.GET("/user/subscribe", AuthMiddleware(h.SubscribeUser)) router.GET("/user/subscribe", AuthMiddleware(h.SubscribeUser))
router.GET("/user/subscribe/conversation/:conversation/member", AuthMiddleware(h.SubscribeMember)) router.GET("/user/subscribe/conversation/:conversation/member", AuthMiddleware(h.SubscribeMember))
return router return router
} }

View File

@ -1,64 +1,64 @@
package main package main
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"time" "time"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
) )
func (h *Handler) SubscribeContact(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func (h *Handler) SubscribeContact(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
Subscribe(h.contactConnections, w, r, p) Subscribe(h.contactConnections, w, r, p)
} }
func (h *Handler) SubscribeConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func (h *Handler) SubscribeConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
Subscribe(h.conversationConnections, w, r, p) Subscribe(h.conversationConnections, w, r, p)
} }
func (h *Handler) SubscribeUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func (h *Handler) SubscribeUser(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
Subscribe(h.userConnections, w, r, p) Subscribe(h.userConnections, w, r, p)
} }
func (h *Handler) SubscribeMember(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func (h *Handler) SubscribeMember(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
conversation := p.ByName("conversation") conversation := p.ByName("conversation")
if _, ok := h.memberConnections[conversation]; !ok { if _, ok := h.memberConnections[conversation]; !ok {
h.memberConnections[conversation] = make(map[string]chan []byte) h.memberConnections[conversation] = make(map[string]chan []byte)
} }
Subscribe(h.memberConnections[conversation], w, r, p) Subscribe(h.memberConnections[conversation], w, r, p)
} }
func Subscribe(channels map[string]chan []byte, w http.ResponseWriter, r *http.Request, p httprouter.Params) { func Subscribe(channels map[string]chan []byte, w http.ResponseWriter, r *http.Request, p httprouter.Params) {
flusher, ok := w.(http.Flusher) flusher, ok := w.(http.Flusher)
if !ok { if !ok {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
id := RandomHex() id := RandomHex()
recv := make(chan []byte) recv := make(chan []byte)
channels[id] = recv channels[id] = recv
// Refresh connection periodically // Refresh connection periodically
resClosed := w.(http.CloseNotifier).CloseNotify() resClosed := w.(http.CloseNotifier).CloseNotify()
ticker := time.NewTicker(25 * time.Second) ticker := time.NewTicker(25 * time.Second)
for { for {
select { select {
case msg := <-recv: case msg := <-recv:
fmt.Fprintf(w, "data: %s\n\n", msg) fmt.Fprintf(w, "data: %s\n\n", msg)
flusher.Flush() flusher.Flush()
case <- ticker.C: case <-ticker.C:
w.Write([]byte(":\n\n")) w.Write([]byte(":\n\n"))
case <- resClosed: case <-resClosed:
ticker.Stop() ticker.Stop()
delete(channels, id) delete(channels, id)
return return
} }
} }
} }

View File

@ -3,19 +3,19 @@ package main
// String pointer means nullable // String pointer means nullable
type UpdateMsg struct { type UpdateMsg struct {
Type string `json:"type"` Type string `json:"type"`
Data string `json:"data"` Data string `json:"data"`
} }
type Contact struct { type Contact struct {
UserA string `json:"usera"` // First user ID UserA string `json:"usera"` // First user ID
UserB string `json:"userb"` // Second user ID UserB string `json:"userb"` // Second user ID
} }
type Member struct { type Member struct {
User string `json:"user"` User string `json:"user"`
Conversation string `json:"conversation"` Conversation string `json:"conversation"`
Pinned bool `json:"pinned"` Pinned bool `json:"pinned"`
} }
type Conversation struct { type Conversation struct {

56
user.go
View File

@ -50,20 +50,20 @@ func (h *Handler) CreateUser(w http.ResponseWriter, r *http.Request, _ httproute
} }
user.ID = finalId user.ID = finalId
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
userString, err := json.Marshal(&user) userString, err := json.Marshal(&user)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "add", Type: "add",
Data: string(userString), Data: string(userString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("user", updateMsgString) h.nc.Publish("user", updateMsgString)
} }
} }
} }
// Respond // Respond
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -181,20 +181,20 @@ func (h *Handler) UpdateUser(w http.ResponseWriter, r *http.Request, p httproute
return return
} }
// Publish NATs // Publish NATs
if h.nc != nil { if h.nc != nil {
userString, err := json.Marshal(&user) userString, err := json.Marshal(&user)
if err == nil { if err == nil {
updateMsg := UpdateMsg { updateMsg := UpdateMsg{
Type: "update", Type: "update",
Data: string(userString), Data: string(userString),
} }
updateMsgString, err := json.Marshal(&updateMsg) updateMsgString, err := json.Marshal(&updateMsg)
if err == nil { if err == nil {
h.nc.Publish("user", updateMsgString) h.nc.Publish("user", updateMsgString)
} }
} }
} }
w.WriteHeader(200) w.WriteHeader(200)
} }