I’m working on a peer-to-peer network project that uses Raft for consensus and libp2p for peer-to-peer communication. The RequestVote RPC in Raft is failing with a msgpack decode error. Here is the error log:
```text
2024-06-12T23:16:22.856Z [ERROR] raft: failed to make requestVote RPC: target="{Voter QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K 127.0.0.1:8080}" error="msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x13" term=17
...
```
Here are some verbose logs (they include output from parts of my code that I don’t think are related to this issue, but I’ve left them in):
```text
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | #1 ---------------------------------------------------- here
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | Local ID: QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | RAFT ADDR: 127.0.0.1:9000
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | #3 ---------------------------------------------------- here
kurlox-client-1 |
kurlox-client-1 | 2024-06-12T23:29:11.648Z [INFO] raft: initial configuration: index=0 servers=[]
2024-06-12T23:29:11.649Z [INFO] raft: entering follower state: follower="Node at 127.0.0.1:9000 [Follower]" leader-address= leader-id=
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | #1 ---------------------------------------------------- here
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | Local ID: QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | RAFT ADDR: 127.0.0.1:9001
kurlox-client-1 | 2024-06-12 23:29:11 | ***DEBUG*** | #3 ---------------------------------------------------- here
kurlox-client-1 | 2024-06-12T23:29:11.649Z [INFO] raft: initial configuration: index=0 servers=[]
kurlox-client-1 | 2024-06-12T23:29:11.649Z [INFO] raft: entering follower state: follower="Node at 127.0.0.1:9001 [Follower]" leader-address= leader-id=
kurlox-client-1 | 2024-06-12T23:29:12.932Z [WARN] raft: heartbeat timeout reached, starting election: last-leader-addr= last-leader-id=
kurlox-client-1 | 2024-06-12T23:29:12.932Z [INFO] raft: entering candidate state: node="Node at 127.0.0.1:9000 [Candidate]" term=2
kurlox-client-1 | 2024-06-12T23:29:12.932Z [DEBUG] raft: voting for self: term=2 id=QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K
kurlox-client-1 | 2024-06-12T23:29:12.933Z [DEBUG] raft: asking for vote: term=2 from=QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q address=127.0.0.1:8081
kurlox-client-1 | 2024-06-12T23:29:12.933Z [DEBUG] raft: calculated votes needed: needed=2 term=2
kurlox-client-1 | 2024-06-12T23:29:12.933Z [DEBUG] raft: vote granted: from=QmWaqhWAmGpg4GRTJTQYxrfHHTnz2h2C144VsiRpcxp67K term=2 tally=1
kurlox-client-1 | 2024-06-12T23:29:12.934Z [ERROR] raft: failed to make requestVote RPC: target="{Voter QmYcBxutfuTTTWXJsLucyXFS4Pm4eBnTov5Ud1Bm993q6q 127.0.0.1:8081}" error="msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x13" term=2
```
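The "invalid byte descriptor for decoding bytes" message says the first byte the decoder read cannot begin a MessagePack string or byte slice. The following stdlib-only Go sketch classifies a leading byte according to the MessagePack format table (the helper names are hypothetical, and only the format families relevant here are distinguished). Under that table, 0x13 is a positive fixint (the integer 19), so a decoder expecting a str/bin value has to reject it.

```go
package main

import "fmt"

// msgpackKind classifies the first byte of a MessagePack value using
// the format table from the MessagePack specification. Only a subset
// of format families is distinguished; everything else is "other".
func msgpackKind(b byte) string {
	switch {
	case b <= 0x7f:
		return "positive fixint"
	case b >= 0x80 && b <= 0x8f:
		return "fixmap"
	case b >= 0x90 && b <= 0x9f:
		return "fixarray"
	case b >= 0xa0 && b <= 0xbf:
		return "fixstr"
	case b == 0xc0:
		return "nil"
	case b >= 0xc4 && b <= 0xc6:
		return "bin 8/16/32"
	case b >= 0xd9 && b <= 0xdb:
		return "str 8/16/32"
	case b >= 0xe0:
		return "negative fixint"
	default:
		return "other"
	}
}

// canStartStringOrBytes reports whether b begins a MessagePack str or
// bin value, i.e. something a string/[]byte decoder could accept.
func canStartStringOrBytes(b byte) bool {
	switch msgpackKind(b) {
	case "fixstr", "str 8/16/32", "bin 8/16/32":
		return true
	}
	return false
}

func main() {
	// 0x13 is the byte reported in the error; the others are for contrast.
	for _, b := range []byte{0x13, 0xa5, 0xc4, 0xc0} {
		fmt.Printf("0x%02x: %-16s str/bin: %v\n", b, msgpackKind(b), canStartStringOrBytes(b))
	}
}
```

This only explains why the byte is rejected; it does not say where the 0x13 came from.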
Here are the relevant parts of my code that handle the FSM (finite state machine) and msgpack encoding/decoding:
```go
package p2p

import (
	"context"
	"crypto/ed25519"
	"fmt"
	"io"
	"net"
	"regexp"
	"sync"
	"time"

	"github.com/hashicorp/go-msgpack/codec"
	"github.com/hashicorp/raft"
	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/discovery"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/peerstore"
	mdns "github.com/libp2p/go-libp2p/p2p/discovery/mdns"
	routing "github.com/libp2p/go-libp2p/p2p/discovery/routing"
	tls "github.com/libp2p/go-libp2p/p2p/security/tls"
	"github.com/multiformats/go-multiaddr"
)

type FSM struct {
	mu    sync.Mutex
	state map[string]string
}

func NewFSM() *FSM {
	return &FSM{
		state: make(map[string]string),
	}
}

// Apply applies a Raft log entry to the FSM.
func (fsm *FSM) Apply(log *raft.Log) interface{} {
	fsm.mu.Lock()
	defer fsm.mu.Unlock()

	var handle codec.MsgpackHandle
	dec := codec.NewDecoderBytes(log.Data, &handle)
	var cmd map[string]string
	if err := dec.Decode(&cmd); err != nil {
		panic(fmt.Sprintf("failed to unmarshal command: %s", err))
	}
	for k, v := range cmd {
		fsm.state[k] = v
	}
	return nil
}

// Restore restores the FSM to a previous state based on a snapshot.
func (fsm *FSM) Restore(rc io.ReadCloser) error {
	fsm.mu.Lock()
	defer fsm.mu.Unlock()

	var handle codec.MsgpackHandle
	dec := codec.NewDecoder(rc, &handle)
	var state map[string]string
	if err := dec.Decode(&state); err != nil {
		return fmt.Errorf("failed to decode snapshot: %s", err)
	}
	fsm.state = state
	return nil
}

// Snapshot returns a snapshot of the FSM's state.
func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) {
	fsm.mu.Lock()
	defer fsm.mu.Unlock()

	// Create a copy of the state to include in the snapshot
	state := make(map[string]string)
	for k, v := range fsm.state {
		state[k] = v
	}
	return &FSMSnapshot{state: state}, nil
}

// FSMSnapshot represents a snapshot of the FSM's state.
type FSMSnapshot struct {
	state map[string]string
}

// Persist saves the FSM's state to the given sink.
func (s *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
	var handle codec.MsgpackHandle
	var buf []byte
	enc := codec.NewEncoderBytes(&buf, &handle)
	if err := enc.Encode(s.state); err != nil {
		sink.Cancel()
		return fmt.Errorf("failed to marshal snapshot data: %s", err)
	}
	if _, err := sink.Write(buf); err != nil {
		sink.Cancel()
		return fmt.Errorf("failed to write snapshot data: %s", err)
	}
	if err := sink.Close(); err != nil {
		sink.Cancel()
		return fmt.Errorf("failed to close snapshot sink: %s", err)
	}
	return nil
}

// Release is called when the snapshot is no longer needed.
func (s *FSMSnapshot) Release() {}
```
Additionally, here is the part of my code where I initialize and start Raft:
```go
func initializeRaftStoresAndTransport(localID raft.ServerID, raftAddr string) (raft.LogStore, raft.StableStore, raft.SnapshotStore, raft.Transport, error) {
	// Initialize Raft stores
	logStore := raft.NewInmemStore()    // Only for testing; not for production
	stableStore := raft.NewInmemStore() // Only for testing; not for production
	snapshotStore := raft.NewInmemSnapshotStore()

	// Initialize Raft transport with the provided address
	tcpAddr, err := net.ResolveTCPAddr("tcp", raftAddr)
	if err != nil {
		return nil, nil, nil, nil, err
	}
	transport, err := raft.NewTCPTransportWithConfig(
		tcpAddr.String(),
		tcpAddr,
		&raft.NetworkTransportConfig{
			Logger:                  nil,
			MsgpackUseNewTimeFormat: true,
		},
	)
	if err != nil {
		return nil, nil, nil, nil, err
	}
	return logStore, stableStore, snapshotStore, transport, nil
}

func (p *P2PNetwork) InitAndStartRaft(ctx context.Context, raftPeers []raft.Server) error {
	if p.Raft != nil {
		return nil // Raft already started
	}

	// Initialize Raft stores and transport with the actual address
	address := p.raftAddr
	localID := raft.ServerID(p.Host.ID().String())
	logStore, stableStore, snapshotStore, transport, err := initializeRaftStoresAndTransport(localID, address)
	if err != nil {
		return err
	}

	// Initialize FSM and Raft node
	fsm := NewFSM()
	config := raft.DefaultConfig()
	config.LocalID = localID
	raftNode, err := raft.NewRaft(config, fsm, logStore, stableStore, snapshotStore, transport)
	if err != nil {
		return err
	}
	p.Raft = &RaftHandler{
		RaftNode:    raftNode,
		TustedNodes: make(map[string]ed25519.PublicKey),
	}

	// Convert multiaddrs to IP:Port format before bootstrapping Raft cluster
	var convertedRaftPeers []raft.Server
	for _, peer := range raftPeers {
		address, err := multiaddr.NewMultiaddr(string(peer.Address))
		if err != nil {
			return err
		}
		ipPort, err := ExtractIPAndPortFromMultiaddr(address)
		if err != nil {
			return err
		}
		convertedRaftPeers = append(convertedRaftPeers, raft.Server{
			ID:      peer.ID,
			Address: raft.ServerAddress(ipPort),
		})
	}
	if len(convertedRaftPeers) == 0 {
		return fmt.Errorf("no peers provided for Raft bootstrap")
	}

	err = raftNode.BootstrapCluster(raft.Configuration{
		Servers: convertedRaftPeers,
	}).Error()
	if err != nil {
		return err
	}
	return nil
}
```
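`ExtractIPAndPortFromMultiaddr` isn’t included above, so the following is only a hypothetical, stdlib-only sketch of what such a helper typically does (it parses the multiaddr text instead of using go-multiaddr, and `extractIPAndPort` is an illustrative name). It highlights something visible in the logs: the Raft transports listen on 127.0.0.1:9000 and 127.0.0.1:9001, but the vote requests go to 127.0.0.1:8080 and 127.0.0.1:8081. Assuming `peer.Address` holds each node’s libp2p listen multiaddr, a conversion like this copies the libp2p port straight into `raft.Server.Address`.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
	"strings"
)

// extractIPAndPort is a hypothetical, stdlib-only stand-in for
// ExtractIPAndPortFromMultiaddr. It understands only the textual forms
// /ip4/<addr>/tcp/<port> and /ip6/<addr>/tcp/<port>; any trailing
// components (such as /p2p/<peer-id>) are ignored.
func extractIPAndPort(ma string) (string, error) {
	parts := strings.Split(strings.TrimPrefix(ma, "/"), "/")
	if len(parts) < 4 || (parts[0] != "ip4" && parts[0] != "ip6") || parts[2] != "tcp" {
		return "", fmt.Errorf("unsupported multiaddr %q", ma)
	}
	if net.ParseIP(parts[1]) == nil {
		return "", fmt.Errorf("invalid IP in multiaddr %q", ma)
	}
	if _, err := strconv.ParseUint(parts[3], 10, 16); err != nil {
		return "", fmt.Errorf("invalid TCP port in multiaddr %q: %w", ma, err)
	}
	// Whatever port the multiaddr carries becomes the port Raft dials.
	return net.JoinHostPort(parts[1], parts[3]), nil
}

func main() {
	// A libp2p listen address: the converted result keeps the libp2p port.
	addr, err := extractIPAndPort("/ip4/127.0.0.1/tcp/8081")
	if err != nil {
		panic(err)
	}
	fmt.Println(addr) // 127.0.0.1:8081
}
```

If the bootstrap list is meant to contain Raft addresses, the value passed as `raft.ServerAddress` would presumably need to be the address each node’s Raft TCP transport listens on (its `raftAddr`), not the libp2p port.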
Attempted Solutions:
- Set `MsgpackUseNewTimeFormat: true`. I saw a similar post that suggested this, but it didn’t work.
- Made sure all my nodes are running the same versions of everything (I’m just testing with 2 nodes locally).
Environment:
Go version: 1.21
Libraries:
- github.com/hashicorp/raft v1.6.1
- github.com/hashicorp/go-msgpack v0.5.5
- github.com/libp2p/go-libp2p v0.33.2
Steps to Reproduce:
- Start the Raft cluster with multiple nodes.
- Nodes attempt to elect a leader.
- The error occurs during the RequestVote RPC.
Suspected Reasons for Issue:
I think it could be a msgpack package issue, differing protocols between libp2p and Raft, TCP vs. HTTP encoding issues, an invalid character being passed, or something completely different.
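One way to test the "differing protocols" idea: a raw libp2p TCP connection begins with the multistream-select handshake, whose first message is the protocol ID `/multistream/1.0.0` followed by a newline, prefixed with its length as an unsigned varint. That payload is 19 bytes, and 19 is 0x13, the same byte reported in the error. The stdlib-only Go sketch below (helper names are hypothetical) reproduces that framing. A match like this suggests, but does not prove, that Raft’s TCP transport is dialing a libp2p listener instead of another Raft transport.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// multistreamProtocolID is the protocol ID that opens a
// multistream-select negotiation in libp2p.
const multistreamProtocolID = "/multistream/1.0.0"

// frameMultistreamMessage builds one multistream-select message:
// an unsigned varint length prefix, then the payload plus a trailing
// newline (the newline is counted in the length).
func frameMultistreamMessage(protocolID string) []byte {
	payload := protocolID + "\n"
	frame := binary.AppendUvarint(nil, uint64(len(payload)))
	return append(frame, payload...)
}

func main() {
	frame := frameMultistreamMessage(multistreamProtocolID)
	fmt.Printf("first byte: 0x%02x\n", frame[0]) // first byte: 0x13
	fmt.Printf("%q\n", frame)
}
```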
Question:
What could be causing `msgpack decode error [pos 1]: invalid byte descriptor for decoding bytes, got: 0x13`? How can I fix this issue?
Any help or pointers would be greatly appreciated! Let me know if more context is needed.