I’m trying to set up a single-server, single-client connection in Go.
The goal is to achieve the fastest possible connection between the server and the client.
The server-to-client rate needs to be high enough for Full HD video transfer at 60 fps (around 30 Mbps once frames are compressed).
My current transfer rate with the following snippets is around 11 Mbps over Ethernet and 6 Mbps over WiFi.
I’ve set up multi-channel video sending for both the client and the server. The server channels effectively double the rate over WiFi, but the client channels don’t help at all, which is logical given the nature of the code used by the client-side channels.
The goal of the following code is to achieve the fastest possible transfer over WebSocket from the server to the client.
main.go (server) =>
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// allowAnyOrigin is the origin check installed on upgrader. It approves every
// request without inspecting it.
func allowAnyOrigin(_ *http.Request) bool { return true }

// upgrader converts incoming HTTP requests into WebSocket connections. Only
// CheckOrigin is set; every other Upgrader field keeps its zero value.
var upgrader = websocket.Upgrader{CheckOrigin: allowAnyOrigin}
// Channel is one named queue of outgoing payloads together with a running
// count of the payload bytes that have been forwarded from it.
type Channel struct {
	// name identifies the channel.
	name string
	// buffer queues the payloads waiting to be forwarded.
	buffer chan []byte

	// mu guards bytesSent.
	mu sync.Mutex
	// bytesSent accumulates payload sizes until a reader resets it.
	bytesSent int
}
// Server holds the state shared by the WebSocket handlers and the named
// channel pipelines.
type Server struct {
	// mu is locked when a client is registered and around lookups and
	// inserts on channels.
	mu sync.Mutex

	// clients is the set of upgraded connections that receive broadcasts.
	clients map[*websocket.Conn]bool
	// channels maps a channel name to its Channel.
	channels map[string]*Channel

	// broadcast carries every message that is to be written to all clients.
	broadcast chan []byte
}
// newServer returns a Server with empty client and channel maps and an
// unbuffered broadcast channel.
func newServer() *Server {
	srv := new(Server)
	srv.clients = make(map[*websocket.Conn]bool)
	srv.broadcast = make(chan []byte)
	srv.channels = make(map[string]*Channel)
	return srv
}
// handleConnections upgrades an HTTP request to a WebSocket connection,
// registers the connection as a broadcast target, and then forwards every
// message read from it to s.broadcast until a read fails.
//
// A failed upgrade is logged and only ends this request: one bad handshake
// must not terminate the whole server process. The connection is removed from
// s.clients under s.mu, because other goroutines use the same map.
func (s *Server) handleConnections(w http.ResponseWriter, r *http.Request) {
	ws, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error.
		log.Printf("Failed to upgrade to WebSocket: %v", err)
		return
	}
	defer ws.Close()

	s.mu.Lock()
	s.clients[ws] = true
	s.mu.Unlock()

	// Deferred calls run last-in first-out, so the connection is unregistered
	// before it is closed.
	defer func() {
		s.mu.Lock()
		delete(s.clients, ws)
		s.mu.Unlock()
	}()

	for {
		_, msg, err := ws.ReadMessage()
		if err != nil {
			log.Printf("Error reading message: %v", err)
			return
		}
		s.broadcast <- msg
	}
}
// handleMessages receives messages from s.broadcast and writes each one, as a
// binary WebSocket message, to every registered client. A client whose write
// fails is closed and removed from s.clients.
//
// The client set is copied under s.mu and the writes happen outside the lock:
// the map is never touched without synchronization, and a slow network write
// does not block new connections from registering.
func (s *Server) handleMessages() {
	var targets []*websocket.Conn
	for msg := range s.broadcast {
		// Snapshot the current clients; reuse the slice across messages.
		targets = targets[:0]
		s.mu.Lock()
		for client := range s.clients {
			targets = append(targets, client)
		}
		s.mu.Unlock()

		for _, client := range targets {
			if err := client.WriteMessage(websocket.BinaryMessage, msg); err != nil {
				log.Printf("Error writing message: %v", err)
				client.Close()
				s.mu.Lock()
				delete(s.clients, client)
				s.mu.Unlock()
			}
		}
	}
}
// createChannel registers a new Channel under name with a 100-slot payload
// buffer. If a channel with that name already exists it is left untouched.
func (s *Server) createChannel(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, found := s.channels[name]; found {
		return
	}
	s.channels[name] = &Channel{
		name:   name,
		buffer: make(chan []byte, 100),
	}
}
// startChannel launches a goroutine that drains the named channel's buffer.
// Each payload is prefixed with "<name>: " and sent to s.broadcast, after
// which the payload length (without the prefix) is added to the channel's
// bytesSent counter. An unknown name is logged and nothing is started.
func (s *Server) startChannel(name string) {
	s.mu.Lock()
	ch, ok := s.channels[name]
	s.mu.Unlock()

	if !ok {
		log.Printf("Channel %s does not exist", name)
		return
	}

	forward := func() {
		for payload := range ch.buffer {
			tagged := append([]byte(name+": "), payload...)
			s.broadcast <- tagged

			ch.mu.Lock()
			ch.bytesSent += len(payload)
			ch.mu.Unlock()
		}
	}
	go forward()
}
// sendToChannel queues msg on the named channel's buffer, blocking while the
// buffer is full. An unknown name is logged and msg is dropped.
func (s *Server) sendToChannel(name string, msg []byte) {
	s.mu.Lock()
	target, ok := s.channels[name]
	s.mu.Unlock()

	if ok {
		target.buffer <- msg
		return
	}
	log.Printf("Channel %s does not exist", name)
}
// main wires up the benchmark server: it registers the /ws handler, starts
// the broadcaster and ten named channels, generates dummy frames at 60 Hz,
// spreads them round-robin over the channels, logs the per-second volume, and
// finally serves HTTP on :8080.
func main() {
	server := newServer()
	http.HandleFunc("/ws", server.handleConnections)
	go server.handleMessages()

	const numChannels = 10 // Number of channels

	// Build the channel names once so the per-frame distribution loop below
	// does not have to format a string for every frame.
	channelNames := make([]string, numChannels)
	for i := range channelNames {
		name := fmt.Sprintf("channel_%d", i+1)
		channelNames[i] = name
		server.createChannel(name)
		server.startChannel(name)
	}

	// Queue for distributing data
	dataQueue := make(chan []byte, 1000)

	// Generate dummy data. Each frame is 512 KiB; at 60 frames per second
	// that is 30 MiB/s, i.e. roughly 250 Mbit/s of payload.
	go func() {
		dummyData := make([]byte, 512*1024)
		// time.Second/60 gives a true 60 Hz tick; a 17 ms period would only
		// produce about 58.8 frames per second.
		ticker := time.NewTicker(time.Second / 60)
		defer ticker.Stop()
		for range ticker.C {
			dataQueue <- dummyData
		}
	}()

	// Distribute data among channels, round-robin.
	go func() {
		next := 0
		for data := range dataQueue {
			server.sendToChannel(channelNames[next], data)
			next = (next + 1) % numChannels
		}
	}()

	// Log the total sent data every second
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			totalSent := 0
			for _, channel := range server.channels {
				channel.mu.Lock()
				totalSent += channel.bytesSent
				channel.bytesSent = 0 // reset for the next second
				channel.mu.Unlock()
			}
			log.Printf("Total Sent: %.2f MB/s", float64(totalSent)/1024/1024)
		}
	}()

	log.Println("Server started on :8080")
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("Server failed to start: %v", err)
	}
}
main.go (client)
package main
import (
"log"
"net/url"
"sync"
"time"
"github.com/gorilla/websocket"
)
// ChannelStats is a byte counter protected by its embedded mutex. Callers
// Lock the value before touching bytesReceived and Unlock it afterwards.
type ChannelStats struct {
	sync.Mutex

	// bytesReceived accumulates message sizes until a reader resets it.
	bytesReceived int
}
// readFromWebSocket dials the "/ws" path of u and pushes every message it
// reads onto messageChan. A dial failure terminates the program. On the first
// read error it logs the error, sends a single value on done, and returns,
// closing the connection on the way out.
func readFromWebSocket(u url.URL, messageChan chan []byte, done chan bool) {
	u.Path = "/ws"

	conn, _, dialErr := websocket.DefaultDialer.Dial(u.String(), nil)
	if dialErr != nil {
		log.Fatalf("Dial error: %v", dialErr)
	}
	defer conn.Close()

	for {
		_, payload, readErr := conn.ReadMessage()
		if readErr == nil {
			messageChan <- payload
			continue
		}
		log.Printf("Read error: %v", readErr)
		done <- true
		return
	}
}
// processMessages is a worker loop: it adds the length of each message taken
// from messageChan to stats.bytesReceived (under the stats lock) and returns
// once a receive on done succeeds. wg.Done is called when it returns.
func processMessages(stats *ChannelStats, messageChan chan []byte, wg *sync.WaitGroup, done chan bool) {
	defer wg.Done()

	for {
		select {
		case <-done:
			return
		case payload := <-messageChan:
			size := len(payload)
			stats.Lock()
			stats.bytesReceived += size
			stats.Unlock()
		}
	}
}
// main runs the benchmark client: one goroutine reads from the WebSocket,
// ten workers account for the received bytes, and a logger prints the
// per-second volume. It returns once the reader has reported a read error and
// every worker has exited.
func main() {
	u := url.URL{Scheme: "ws", Host: "192.168.0.86:8080"}
	var wg sync.WaitGroup
	messageChan := make(chan []byte, 100) // Buffered channel to hold messages
	stats := &ChannelStats{}
	numWorkers := 10 // Number of worker goroutines

	// The reader sends exactly one value when it stops. Handing that value
	// straight to the workers would wake only one of them and leave wg.Wait
	// blocked forever, so the signal is relayed into close(done), which every
	// worker observes.
	readerDone := make(chan bool, 1) // buffered: the reader never blocks on it
	done := make(chan bool)

	// Start a single reader goroutine
	go readFromWebSocket(u, messageChan, readerDone)

	go func() {
		<-readerDone
		close(done)
	}()

	// Start multiple worker goroutines
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go processMessages(stats, messageChan, &wg, done)
	}

	// Log stats every second
	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			stats.Lock()
			totalReceived := stats.bytesReceived
			stats.bytesReceived = 0 // reset for the next second
			stats.Unlock()
			log.Printf("Total Received: %.2f MB/s", float64(totalReceived)/1024/1024)
		}
	}()

	// Wait for all workers to finish. done is already closed by the relay
	// goroutine at this point, so it must not be closed again here.
	wg.Wait()
}
I’m open to suggestions about the technology or library used for this data transfer, keeping in mind that the goal is simply to achieve the fastest possible data transfer in Go.