From 62c3a00c567b77cc92c0a9f325868caf57e80c04 Mon Sep 17 00:00:00 2001 From: Ambrose Chua Date: Sat, 27 Jul 2019 16:37:23 +0800 Subject: [PATCH] fix: Hardcode ICE servers --- backend-protobuf | 2 +- iceservers.txt | 1 - main.go | 489 +++++++++++++++++++++++------------------------ 3 files changed, 242 insertions(+), 250 deletions(-) delete mode 100644 iceservers.txt diff --git a/backend-protobuf b/backend-protobuf index a36ddf9..10e363e 160000 --- a/backend-protobuf +++ b/backend-protobuf @@ -1 +1 @@ -Subproject commit a36ddf9a8106e69a2ad1c4a42fc78ed8765fd23e +Subproject commit 10e363e61b36d754351fc530f05b583152d62b04 diff --git a/iceservers.txt b/iceservers.txt deleted file mode 100644 index ba42212..0000000 --- a/iceservers.txt +++ /dev/null @@ -1 +0,0 @@ -stun:stun.l.google.com:19302 diff --git a/main.go b/main.go index 70f5d57..95e954a 100644 --- a/main.go +++ b/main.go @@ -1,24 +1,23 @@ package main import ( - "context" - "encoding/json" - "io" - "io/ioutil" - "log" - "net/http" - "os" - "strings" - "time" + "context" + "encoding/json" + "io" + "log" + "net/http" + "os" + "strings" + "time" - . "webrtc/backend-protobuf/go" + . "webrtc/backend-protobuf/go" - "github.com/joho/godotenv" - "github.com/julienschmidt/httprouter" - "github.com/pion/webrtc/v2" - "github.com/gorilla/websocket" - "github.com/golang/protobuf/proto" - "github.com/nats-io/go-nats" + "github.com/golang/protobuf/proto" + "github.com/gorilla/websocket" + "github.com/joho/godotenv" + "github.com/julienschmidt/httprouter" + "github.com/nats-io/go-nats" + "github.com/pion/webrtc/v2" ) // Peer config @@ -32,286 +31,280 @@ var upgrader websocket.Upgrader var mediaEngine webrtc.MediaEngine var webrtcApi *webrtc.API -var userTracks map[string] map[string] *webrtc.Track // userid + clientid -var conversationUsers map[string] []string -var userConversation map[string] string +var userTracks map[string]map[string]*webrtc.Track // userid + clientid +var conversationUsers map[string][]string +var userConversation map[string]string var natsConn *nats.Conn func main() { - // Load .env - err := godotenv.Load() - if err != nil { - log.Fatal("Error loading .env file") - } - listen = os.Getenv("LISTEN") - natsHost = os.Getenv("NATS") - permissionsHost = os.Getenv("PERMISSIONS_HOST") + // Load .env + err := godotenv.Load() + if err != nil { + log.Fatal("Error loading .env file") + } + listen = os.Getenv("LISTEN") + natsHost = os.Getenv("NATS") + permissionsHost = os.Getenv("PERMISSIONS_HOST") - upgrader = websocket.Upgrader{} + upgrader = websocket.Upgrader{} - mediaEngine = webrtc.MediaEngine{} - mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) - webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) + mediaEngine = webrtc.MediaEngine{} + mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) + webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) - // Read ICE servers - fileBytes, err := ioutil.ReadFile("iceservers.txt") - if err != nil { - log.Fatal("error opening ice servers file") - } - fileString := string(fileBytes) - servers := strings.Split(fileString, `\n`) + // Read ICE servers + peerConnectionConfig = webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + { + URLs: []string{"stun:stun.l.google.com:19302"}, + }, + }, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, + } - peerConnectionConfig = webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - { - URLs: servers, - }, - }, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback, - } + userTracks = make(map[string]map[string]*webrtc.Track) + conversationUsers = make(map[string][]string) + userConversation = make(map[string]string) - userTracks = make(map[string] map[string] *webrtc.Track) - conversationUsers = make(map[string] []string) - userConversation = make(map[string] string) + // NATs client + natsConn, err := nats.Connect(natsHost) + if err != nil { + log.Println(err) + return + } + defer natsConn.Close() - // NATs client - natsConn, err := nats.Connect(natsHost) - if err != nil { - log.Println(err) - return - } - defer natsConn.Close() - - // Routes + // Routes router := httprouter.New() - router.GET("/connect", GetAuth(NewConnection)) - router.POST("/join/:conversationid", GetAuth(JoinConversation)) + router.GET("/connect", GetAuth(NewConnection)) + router.POST("/join/:conversationid", GetAuth(JoinConversation)) - // Start server - log.Printf("starting server on %s", listen) + // Start server + log.Printf("starting server on %s", listen) log.Fatal(http.ListenAndServe(listen, router)) } type RawClient struct { - UserId string `json:"userid"` - ClientId string `json:"clientid"` + UserId string `json:"userid"` + ClientId string `json:"clientid"` } + func GetAuth(next httprouter.Handle) httprouter.Handle { - return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - ua := r.Header.Get("X-User-Claim") - if ua == "" { - http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) - return - } + return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + ua := r.Header.Get("X-User-Claim") + if ua == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } - var client RawClient - err := json.Unmarshal([]byte(ua), &client) - if err != nil { - http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) - return - } + var client RawClient + err := json.Unmarshal([]byte(ua), &client) + if err != nil { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } - if client.UserId == "" || client.ClientId == "" { - http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) - return - } + if client.UserId == "" || client.ClientId == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } - context := context.WithValue(r.Context(), "user", client) - next(w, r.WithContext(context), p) - } + context := context.WithValue(r.Context(), "user", client) + next(w, r.WithContext(context), p) + } } func NewConnection(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - // Get user id - user := r.Context().Value("user").(RawClient) + // Get user id + user := r.Context().Value("user").(RawClient) - // Websocket client + // Websocket client c, err := upgrader.Upgrade(w, r, nil) if err != nil { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } - // Read SDP from websocket + // Read SDP from websocket mt, msg, err := c.ReadMessage() - if err != nil { - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - offerSDP := strings.Join(strings.Split(string(msg), "::")[1:], "::") + if err != nil { + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + offerSDP := strings.Join(strings.Split(string(msg), "::")[1:], "::") - // Establish connection (it's a publisher) - clientReceiver, err := webrtcApi.NewPeerConnection(peerConnectionConfig) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + // Establish connection (it's a publisher) + clientReceiver, err := webrtcApi.NewPeerConnection(peerConnectionConfig) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } - // Spawn thread to wait for ice candidates - go func() { - for { - _, msg, err := c.ReadMessage() - iceCandidate := strings.Join(strings.Split(string(msg), "::")[1:], "::") - if err == nil { - clientReceiver.AddICECandidate(webrtc.ICECandidateInit { - Candidate: iceCandidate, - }) - } - } - }() + // Spawn thread to wait for ice candidates + go func() { + for { + _, msg, err := c.ReadMessage() + iceCandidate := strings.Join(strings.Split(string(msg), "::")[1:], "::") + if err == nil { + clientReceiver.AddICECandidate(webrtc.ICECandidateInit{ + Candidate: iceCandidate, + }) + } + } + }() - // Handle ICE candidates - clientReceiver.OnICECandidate(func(candidate *webrtc.ICECandidate) { - _ = c.WriteMessage(websocket.TextMessage, []byte("ice::" + candidate.String())) - }) + // Handle ICE candidates + clientReceiver.OnICECandidate(func(candidate *webrtc.ICECandidate) { + _ = c.WriteMessage(websocket.TextMessage, []byte("ice::"+candidate.String())) + }) - _, err = clientReceiver.AddTransceiver(webrtc.RTPCodecTypeAudio) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + _, err = clientReceiver.AddTransceiver(webrtc.RTPCodecTypeAudio) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } - // Handle OnTrack - clientReceiver.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { - rtpBuf := make([]byte, 1400) - for { - i, err := remoteTrack.Read(rtpBuf) - if err != nil { - log.Printf("%s", err) - break - } + // Handle OnTrack + clientReceiver.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { + rtpBuf := make([]byte, 1400) + for { + i, err := remoteTrack.Read(rtpBuf) + if err != nil { + log.Printf("%s", err) + break + } - if conversationId, ok := userConversation[user.UserId]; ok { - if users, ok2 := conversationUsers[conversationId]; ok2 { - for _, u := range users { - if clients, ok3 := userTracks[u]; ok3 { - for client, track := range clients { - if !(u == user.UserId && client == user.ClientId) { - _, err = track.Write(rtpBuf[:i]) - if !(err == io.ErrClosedPipe || err == nil) { - log.Printf("%s", err) - break - } - } - } - } - } - } + if conversationId, ok := userConversation[user.UserId]; ok { + if users, ok2 := conversationUsers[conversationId]; ok2 { + for _, u := range users { + if clients, ok3 := userTracks[u]; ok3 { + for client, track := range clients { + if !(u == user.UserId && client == user.ClientId) { + _, err = track.Write(rtpBuf[:i]) + if !(err == io.ErrClosedPipe || err == nil) { + log.Printf("%s", err) + break + } + } + } + } + } + } - start := time.Now().Unix() - bite := Bite { - Start: uint64(start), - Key: conversationId, - Data: rtpBuf[:i], - } - biteOut, err := proto.Marshal(&bite) - if err != nil { - log.Printf("%s", err) - } else { - natsConn.Publish("bite", biteOut) - } + start := time.Now().Unix() + bite := Bite{ + Start: uint64(start), + Key: conversationId, + Data: rtpBuf[:i], + } + biteOut, err := proto.Marshal(&bite) + if err != nil { + log.Printf("%s", err) + } else { + natsConn.Publish("bite", biteOut) + } - store := Store { - Type: "bite", - Bite: &bite, - } + store := Store{ + Type: "bite", + Bite: &bite, + } - storeOut, err := proto.Marshal(&store) - if err != nil { - log.Printf("%s", err) - } else { - natsConn.Publish("store", storeOut) - } - } - } - }) + storeOut, err := proto.Marshal(&store) + if err != nil { + log.Printf("%s", err) + } else { + natsConn.Publish("store", storeOut) + } + } + } + }) - // Add sending track - var track *webrtc.Track = &webrtc.Track{} - _, err = clientReceiver.AddTrack(track) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - userTracks[user.UserId][user.ClientId] = track + // Add sending track + var track *webrtc.Track = &webrtc.Track{} + _, err = clientReceiver.AddTrack(track) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + userTracks[user.UserId][user.ClientId] = track - // Do signalling things - err = clientReceiver.SetRemoteDescription( - webrtc.SessionDescription { - SDP: offerSDP, - Type: webrtc.SDPTypeOffer, - }) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + // Do signalling things + err = clientReceiver.SetRemoteDescription( + webrtc.SessionDescription{ + SDP: offerSDP, + Type: webrtc.SDPTypeOffer, + }) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } - answer, err := clientReceiver.CreateAnswer(nil) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + answer, err := clientReceiver.CreateAnswer(nil) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } - err = clientReceiver.SetLocalDescription(answer) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + err = clientReceiver.SetLocalDescription(answer) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } - err = c.WriteMessage(mt, []byte("answer::" + answer.SDP)) - if err != nil { - log.Printf("%s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } + err = c.WriteMessage(mt, []byte("answer::"+answer.SDP)) + if err != nil { + log.Printf("%s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } } func JoinConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) { - // Get user id - user := r.Context().Value("user").(RawClient) - // Get conversation id - conversationId := p.ByName("conversationid") - if conversationId == "" { - http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) - return - } + // Get user id + user := r.Context().Value("user").(RawClient) + // Get conversation id + conversationId := p.ByName("conversationid") + if conversationId == "" { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } - // Check permissions from backend-permissions - response, err := http.Get(permissionsHost + "/user/" + user.UserId + "/conversation/" + conversationId) - if err != nil { - http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - return - } - response.Body.Close() + // Check permissions from backend-permissions + response, err := http.Get(permissionsHost + "/user/" + user.UserId + "/conversation/" + conversationId) + if err != nil { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + response.Body.Close() - // Remove user from existing conversation - if oldConversation, ok := userConversation[user.UserId]; ok { - if users, ok2 := conversationUsers[oldConversation]; ok2 { - var lastIndex int - for i, u := range users { - if u == user.UserId { - lastIndex = i - break; - } - } - users[lastIndex] = users[len(users) - 1] - conversationUsers[oldConversation] = users[:len(users)-1] - } - } + // Remove user from existing conversation + if oldConversation, ok := userConversation[user.UserId]; ok { + if users, ok2 := conversationUsers[oldConversation]; ok2 { + var lastIndex int + for i, u := range users { + if u == user.UserId { + lastIndex = i + break + } + } + users[lastIndex] = users[len(users)-1] + conversationUsers[oldConversation] = users[:len(users)-1] + } + } - // Populate new values - userConversation[user.UserId] = conversationId - if _, ok := conversationUsers[conversationId]; !ok { - conversationUsers[conversationId] = make([]string, 0) - } - conversationUsers[conversationId] = append(conversationUsers[conversationId], user.UserId) + // Populate new values + userConversation[user.UserId] = conversationId + if _, ok := conversationUsers[conversationId]; !ok { + conversationUsers[conversationId] = make([]string, 0) + } + conversationUsers[conversationId] = append(conversationUsers[conversationId], user.UserId) - w.WriteHeader(200) + w.WriteHeader(200) }