I am working on a TCP file streaming server written in Go that communicates with an ESP32 microcontroller. The ESP32 sends TCP packets, including ACK packets with zero-length bodies but containing crucial information in their headers.
My problem is that I cannot correctly capture and handle these zero-length TCP ACK packets. The ACK packets are important for managing the streaming process and ensuring data integrity. Here’s a breakdown of my setup:
Go TCP Server:
- Listens for incoming connections.
- Streams files to the client (ESP32).
- Needs to correctly handle ACK packets to manage flow control.
ESP32 Client:
- Connects to the Go server.
- Receives file data.
- Sends ACK packets with zero-length bodies but necessary information in the TCP headers.
Issue:
- The server reads data from the connection but fails to capture or correctly interpret the zero-length TCP ACK packets.
- doesn’t matter whether I use a regular connection reader or a Bufio reader. The problem always remains the same: packets with a length greater than 0 are processed without any issues.
Here is the source code:
package handlers
import (
"bufio"
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"source/config"
"source/models"
"source/utils"
)
const (
// (Старый метод) Параметры скейла окна
// initialWindowSize = 80 // начальный размер окна в байтах
// maxWindowSize = 500 // максимальный размер окна в байтах
ackTimeout = 3 * time.Second
chunkSize = 2000
maxBufferSize = 1024 * 30
)
type StreamHandler struct {
config config.Config
}
func NewStreamHandler(config config.Config) *StreamHandler {
return &StreamHandler{config: config}
}
func (h *StreamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println(utils.LogFileStreaming)
request := new(models.Request)
if err := request.Deserialize(r.Body); err != nil {
utils.SendErrorMsgAndLog(w, http.StatusBadRequest, utils.MergeMessages(utils.ErrBadRequest, err.Error()))
return
}
fileType := request.Data.FileType
filesDir, err := h.config.GetFilesDir(fileType)
if err != nil {
utils.SendError(w, http.StatusBadRequest, utils.MergeMessages(utils.ErrInvalidFileType, fileType))
return
}
var streamParams models.StreamHandlerParams
if err := streamParams.Deserialize(request.Data.Params); err != nil {
utils.SendErrorMsgAndLog(w, http.StatusBadRequest, utils.MergeMessages(utils.ErrBadRequest, err.Error()))
return
}
files, err := os.ReadDir(filesDir)
if err != nil {
utils.SendError(w, http.StatusInternalServerError, utils.ErrReadDir)
return
}
var filePath string
for _, file := range files {
if !file.IsDir() && strings.TrimSuffix(file.Name(), filepath.Ext(file.Name())) == streamParams.FileName {
filePath = filepath.Join(filesDir, file.Name())
break
}
}
if filePath == "" {
utils.SendError(w, http.StatusNotFound, utils.MergeMessages(utils.ErrFilesNotFound, streamParams.FileName))
return
}
file, err := os.Open(filePath)
if err != nil {
utils.SendError(w, http.StatusUnprocessableEntity, utils.MergeMessages(utils.ErrOpenEffect, streamParams.FileName))
return
}
defer file.Close()
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s.effect", streamParams.FileName))
w.Header().Set("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)
flusher, ok := w.(http.Flusher)
if !ok {
utils.SendError(w, http.StatusInternalServerError, "Streaming not supported")
return
}
log.Println(utils.MergeMessages(utils.LogFileStreamed, streamParams.FileName))
buffer := make([]byte, chunkSize)
reader := bufio.NewReader(r.Body)
for {
n, err := file.Read(buffer)
if err != nil && err != io.EOF {
utils.SendError(w, http.StatusInternalServerError, utils.ErrStreaming)
return
}
if n == 0 {
break
}
if _, err := w.Write(buffer[:n]); err != nil {
utils.SendError(w, http.StatusInternalServerError, utils.ErrStreaming)
return
}
flusher.Flush()
if !waitForAck(reader) {
log.Println("ACK не был получен, поток остановлен")
return
}
}
log.Println(utils.MergeMessages(utils.LogFileStreamed, streamParams.FileName))
}
func waitForAck(reader *bufio.Reader) bool {
ackChan := make(chan bool, 1)
go func() {
response, err := reader.ReadString('n')
if err != nil {
ackChan <- false
return
}
if strings.Contains(response, "ACK") {
ackChan <- true
} else {
ackChan <- false
}
}()
select {
case ack := <-ackChan:
return ack
case <-time.After(ackTimeout):
return false
}
}
Никита Танцура is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.
3