5
0
Fork 0

fix: Hardcode ICE servers

pull/6/head
Ambrose Chua 2019-07-27 16:37:23 +08:00
parent 3833e6cc26
commit 62c3a00c56
Signed by: ambrose
GPG Key ID: B34FBE029276BA5D
3 changed files with 242 additions and 250 deletions

@ -1 +1 @@
Subproject commit a36ddf9a8106e69a2ad1c4a42fc78ed8765fd23e Subproject commit 10e363e61b36d754351fc530f05b583152d62b04

View File

@ -1 +0,0 @@
stun:stun.l.google.com:19302

489
main.go
View File

@ -1,24 +1,23 @@
package main package main
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"io" "io"
"io/ioutil" "log"
"log" "net/http"
"net/http" "os"
"os" "strings"
"strings" "time"
"time"
. "webrtc/backend-protobuf/go" . "webrtc/backend-protobuf/go"
"github.com/joho/godotenv" "github.com/golang/protobuf/proto"
"github.com/julienschmidt/httprouter" "github.com/gorilla/websocket"
"github.com/pion/webrtc/v2" "github.com/joho/godotenv"
"github.com/gorilla/websocket" "github.com/julienschmidt/httprouter"
"github.com/golang/protobuf/proto" "github.com/nats-io/go-nats"
"github.com/nats-io/go-nats" "github.com/pion/webrtc/v2"
) )
// Peer config // Peer config
@ -32,286 +31,280 @@ var upgrader websocket.Upgrader
var mediaEngine webrtc.MediaEngine var mediaEngine webrtc.MediaEngine
var webrtcApi *webrtc.API var webrtcApi *webrtc.API
var userTracks map[string] map[string] *webrtc.Track // userid + clientid var userTracks map[string]map[string]*webrtc.Track // userid + clientid
var conversationUsers map[string] []string var conversationUsers map[string][]string
var userConversation map[string] string var userConversation map[string]string
var natsConn *nats.Conn var natsConn *nats.Conn
func main() { func main() {
// Load .env // Load .env
err := godotenv.Load() err := godotenv.Load()
if err != nil { if err != nil {
log.Fatal("Error loading .env file") log.Fatal("Error loading .env file")
} }
listen = os.Getenv("LISTEN") listen = os.Getenv("LISTEN")
natsHost = os.Getenv("NATS") natsHost = os.Getenv("NATS")
permissionsHost = os.Getenv("PERMISSIONS_HOST") permissionsHost = os.Getenv("PERMISSIONS_HOST")
upgrader = websocket.Upgrader{} upgrader = websocket.Upgrader{}
mediaEngine = webrtc.MediaEngine{} mediaEngine = webrtc.MediaEngine{}
mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000)) mediaEngine.RegisterCodec(webrtc.NewRTPOpusCodec(webrtc.DefaultPayloadTypeOpus, 48000))
webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine)) webrtcApi = webrtc.NewAPI(webrtc.WithMediaEngine(mediaEngine))
// Read ICE servers // Read ICE servers
fileBytes, err := ioutil.ReadFile("iceservers.txt") peerConnectionConfig = webrtc.Configuration{
if err != nil { ICEServers: []webrtc.ICEServer{
log.Fatal("error opening ice servers file") {
} URLs: []string{"stun:stun.l.google.com:19302"},
fileString := string(fileBytes) },
servers := strings.Split(fileString, `\n`) },
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
peerConnectionConfig = webrtc.Configuration{ userTracks = make(map[string]map[string]*webrtc.Track)
ICEServers: []webrtc.ICEServer{ conversationUsers = make(map[string][]string)
{ userConversation = make(map[string]string)
URLs: servers,
},
},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlanWithFallback,
}
userTracks = make(map[string] map[string] *webrtc.Track) // NATs client
conversationUsers = make(map[string] []string) natsConn, err := nats.Connect(natsHost)
userConversation = make(map[string] string) if err != nil {
log.Println(err)
return
}
defer natsConn.Close()
// NATs client // Routes
natsConn, err := nats.Connect(natsHost)
if err != nil {
log.Println(err)
return
}
defer natsConn.Close()
// Routes
router := httprouter.New() router := httprouter.New()
router.GET("/connect", GetAuth(NewConnection)) router.GET("/connect", GetAuth(NewConnection))
router.POST("/join/:conversationid", GetAuth(JoinConversation)) router.POST("/join/:conversationid", GetAuth(JoinConversation))
// Start server // Start server
log.Printf("starting server on %s", listen) log.Printf("starting server on %s", listen)
log.Fatal(http.ListenAndServe(listen, router)) log.Fatal(http.ListenAndServe(listen, router))
} }
type RawClient struct { type RawClient struct {
UserId string `json:"userid"` UserId string `json:"userid"`
ClientId string `json:"clientid"` ClientId string `json:"clientid"`
} }
func GetAuth(next httprouter.Handle) httprouter.Handle { func GetAuth(next httprouter.Handle) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
ua := r.Header.Get("X-User-Claim") ua := r.Header.Get("X-User-Claim")
if ua == "" { if ua == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return return
} }
var client RawClient var client RawClient
err := json.Unmarshal([]byte(ua), &client) err := json.Unmarshal([]byte(ua), &client)
if err != nil { if err != nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return return
} }
if client.UserId == "" || client.ClientId == "" { if client.UserId == "" || client.ClientId == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return return
} }
context := context.WithValue(r.Context(), "user", client) context := context.WithValue(r.Context(), "user", client)
next(w, r.WithContext(context), p) next(w, r.WithContext(context), p)
} }
} }
func NewConnection(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func NewConnection(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
// Get user id // Get user id
user := r.Context().Value("user").(RawClient) user := r.Context().Value("user").(RawClient)
// Websocket client // Websocket client
c, err := upgrader.Upgrade(w, r, nil) c, err := upgrader.Upgrade(w, r, nil)
if err != nil { if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
// Read SDP from websocket // Read SDP from websocket
mt, msg, err := c.ReadMessage() mt, msg, err := c.ReadMessage()
if err != nil { if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
offerSDP := strings.Join(strings.Split(string(msg), "::")[1:], "::") offerSDP := strings.Join(strings.Split(string(msg), "::")[1:], "::")
// Establish connection (it's a publisher) // Establish connection (it's a publisher)
clientReceiver, err := webrtcApi.NewPeerConnection(peerConnectionConfig) clientReceiver, err := webrtcApi.NewPeerConnection(peerConnectionConfig)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
// Spawn thread to wait for ice candidates // Spawn thread to wait for ice candidates
go func() { go func() {
for { for {
_, msg, err := c.ReadMessage() _, msg, err := c.ReadMessage()
iceCandidate := strings.Join(strings.Split(string(msg), "::")[1:], "::") iceCandidate := strings.Join(strings.Split(string(msg), "::")[1:], "::")
if err == nil { if err == nil {
clientReceiver.AddICECandidate(webrtc.ICECandidateInit { clientReceiver.AddICECandidate(webrtc.ICECandidateInit{
Candidate: iceCandidate, Candidate: iceCandidate,
}) })
} }
} }
}() }()
// Handle ICE candidates // Handle ICE candidates
clientReceiver.OnICECandidate(func(candidate *webrtc.ICECandidate) { clientReceiver.OnICECandidate(func(candidate *webrtc.ICECandidate) {
_ = c.WriteMessage(websocket.TextMessage, []byte("ice::" + candidate.String())) _ = c.WriteMessage(websocket.TextMessage, []byte("ice::"+candidate.String()))
}) })
_, err = clientReceiver.AddTransceiver(webrtc.RTPCodecTypeAudio) _, err = clientReceiver.AddTransceiver(webrtc.RTPCodecTypeAudio)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
// Handle OnTrack // Handle OnTrack
clientReceiver.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) { clientReceiver.OnTrack(func(remoteTrack *webrtc.Track, receiver *webrtc.RTPReceiver) {
rtpBuf := make([]byte, 1400) rtpBuf := make([]byte, 1400)
for { for {
i, err := remoteTrack.Read(rtpBuf) i, err := remoteTrack.Read(rtpBuf)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
break break
} }
if conversationId, ok := userConversation[user.UserId]; ok { if conversationId, ok := userConversation[user.UserId]; ok {
if users, ok2 := conversationUsers[conversationId]; ok2 { if users, ok2 := conversationUsers[conversationId]; ok2 {
for _, u := range users { for _, u := range users {
if clients, ok3 := userTracks[u]; ok3 { if clients, ok3 := userTracks[u]; ok3 {
for client, track := range clients { for client, track := range clients {
if !(u == user.UserId && client == user.ClientId) { if !(u == user.UserId && client == user.ClientId) {
_, err = track.Write(rtpBuf[:i]) _, err = track.Write(rtpBuf[:i])
if !(err == io.ErrClosedPipe || err == nil) { if !(err == io.ErrClosedPipe || err == nil) {
log.Printf("%s", err) log.Printf("%s", err)
break break
} }
} }
} }
} }
} }
} }
start := time.Now().Unix() start := time.Now().Unix()
bite := Bite { bite := Bite{
Start: uint64(start), Start: uint64(start),
Key: conversationId, Key: conversationId,
Data: rtpBuf[:i], Data: rtpBuf[:i],
} }
biteOut, err := proto.Marshal(&bite) biteOut, err := proto.Marshal(&bite)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
} else { } else {
natsConn.Publish("bite", biteOut) natsConn.Publish("bite", biteOut)
} }
store := Store { store := Store{
Type: "bite", Type: "bite",
Bite: &bite, Bite: &bite,
} }
storeOut, err := proto.Marshal(&store) storeOut, err := proto.Marshal(&store)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
} else { } else {
natsConn.Publish("store", storeOut) natsConn.Publish("store", storeOut)
} }
} }
} }
}) })
// Add sending track // Add sending track
var track *webrtc.Track = &webrtc.Track{} var track *webrtc.Track = &webrtc.Track{}
_, err = clientReceiver.AddTrack(track) _, err = clientReceiver.AddTrack(track)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
userTracks[user.UserId][user.ClientId] = track userTracks[user.UserId][user.ClientId] = track
// Do signalling things // Do signalling things
err = clientReceiver.SetRemoteDescription( err = clientReceiver.SetRemoteDescription(
webrtc.SessionDescription { webrtc.SessionDescription{
SDP: offerSDP, SDP: offerSDP,
Type: webrtc.SDPTypeOffer, Type: webrtc.SDPTypeOffer,
}) })
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
answer, err := clientReceiver.CreateAnswer(nil) answer, err := clientReceiver.CreateAnswer(nil)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
err = clientReceiver.SetLocalDescription(answer) err = clientReceiver.SetLocalDescription(answer)
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
err = c.WriteMessage(mt, []byte("answer::" + answer.SDP)) err = c.WriteMessage(mt, []byte("answer::"+answer.SDP))
if err != nil { if err != nil {
log.Printf("%s", err) log.Printf("%s", err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return return
} }
} }
func JoinConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) { func JoinConversation(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
// Get user id // Get user id
user := r.Context().Value("user").(RawClient) user := r.Context().Value("user").(RawClient)
// Get conversation id // Get conversation id
conversationId := p.ByName("conversationid") conversationId := p.ByName("conversationid")
if conversationId == "" { if conversationId == "" {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return return
} }
// Check permissions from backend-permissions // Check permissions from backend-permissions
response, err := http.Get(permissionsHost + "/user/" + user.UserId + "/conversation/" + conversationId) response, err := http.Get(permissionsHost + "/user/" + user.UserId + "/conversation/" + conversationId)
if err != nil { if err != nil {
http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
return return
} }
response.Body.Close() response.Body.Close()
// Remove user from existing conversation // Remove user from existing conversation
if oldConversation, ok := userConversation[user.UserId]; ok { if oldConversation, ok := userConversation[user.UserId]; ok {
if users, ok2 := conversationUsers[oldConversation]; ok2 { if users, ok2 := conversationUsers[oldConversation]; ok2 {
var lastIndex int var lastIndex int
for i, u := range users { for i, u := range users {
if u == user.UserId { if u == user.UserId {
lastIndex = i lastIndex = i
break; break
} }
} }
users[lastIndex] = users[len(users) - 1] users[lastIndex] = users[len(users)-1]
conversationUsers[oldConversation] = users[:len(users)-1] conversationUsers[oldConversation] = users[:len(users)-1]
} }
} }
// Populate new values // Populate new values
userConversation[user.UserId] = conversationId userConversation[user.UserId] = conversationId
if _, ok := conversationUsers[conversationId]; !ok { if _, ok := conversationUsers[conversationId]; !ok {
conversationUsers[conversationId] = make([]string, 0) conversationUsers[conversationId] = make([]string, 0)
} }
conversationUsers[conversationId] = append(conversationUsers[conversationId], user.UserId) conversationUsers[conversationId] = append(conversationUsers[conversationId], user.UserId)
w.WriteHeader(200) w.WriteHeader(200)
} }