5
0
Fork 0

Fixed messed up SSE implementation

master
UnicodingUnicorn 2019-04-02 12:11:12 +08:00
parent 23306b37e9
commit c864ab0d0a
1 changed files with 16 additions and 4 deletions

20
main.go
View File

@ -1,6 +1,8 @@
package main package main
import ( import (
"crypto/rand"
"encoding/hex"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
@ -27,7 +29,7 @@ type Ping struct {
Status string `json:"status"` Status string `json:"status"`
} }
var connections map[string][]chan []byte var connections map[string]map[string] chan []byte
var redisClient *redis.Client var redisClient *redis.Client
func main() { func main() {
@ -39,7 +41,7 @@ func main() {
listen = os.Getenv("LISTEN") listen = os.Getenv("LISTEN")
redisHost = os.Getenv("REDIS") redisHost = os.Getenv("REDIS")
connections = make(map[string][]chan []byte) connections = make(map[string]map[string] chan []byte)
// Redis // Redis
redisClient = redis.NewClient(&redis.Options{ redisClient = redis.NewClient(&redis.Options{
@ -58,6 +60,15 @@ func main() {
log.Fatal(http.ListenAndServe(listen, router)) log.Fatal(http.ListenAndServe(listen, router))
} }
func RandomHex() string {
b := make([]byte, 16)
_, err := rand.Read(b)
if err != nil {
panic("unable to generate 16 bytes of randomness")
}
return hex.EncodeToString(b)
}
func Subscribe(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func Subscribe(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
flusher, ok := w.(http.Flusher) flusher, ok := w.(http.Flusher)
if !ok { if !ok {
@ -72,7 +83,8 @@ func Subscribe(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
userId := p.ByName("userid") userId := p.ByName("userid")
recv := make(chan []byte) recv := make(chan []byte)
connections[userId] = append(connections[userId], recv); chanId := RandomHex()
connections[userId][chanId] = recv
// Refresh connection periodically // Refresh connection periodically
resClosed := w.(http.CloseNotifier).CloseNotify() resClosed := w.(http.CloseNotifier).CloseNotify()
@ -102,7 +114,7 @@ func Subscribe(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
w.Write([]byte(":\n\n")) w.Write([]byte(":\n\n"))
case <- resClosed: case <- resClosed:
ticker.Stop() ticker.Stop()
delete(connections, userId) delete(connections[userId], chanId)
return return
} }
} }