I have a TCP server that tries to mimic a Redis server. It responds correctly to
~ ➤ echo -ne '*1\r\n$4\r\nPING\r\n' | nc localhost 6379
PONG%
but when I run
redis-cli PING
there is no response.
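For reference, the bytes piped into nc above are a RESP array holding one bulk string, which is the same framing redis-cli uses when it sends a command. A minimal sketch of that encoding follows; the helper name `encode_resp_command` is made up for illustration and is not part of the code in this question.

```rust
/// Encode a command as a RESP array of bulk strings.
/// `encode_resp_command` is a hypothetical helper, used only for illustration.
fn encode_resp_command(args: &[&str]) -> String {
    // Array header: "*<number of elements>\r\n"
    let mut out = format!("*{}\r\n", args.len());
    for arg in args {
        // Bulk string: "$<length in bytes>\r\n<payload>\r\n"
        out.push_str(&format!("${}\r\n{}\r\n", arg.len(), arg));
    }
    out
}
```

Calling it with `&["PING"]` yields the same string that the echo command sends.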
Step by step:
- I execute the command.
- Line 16: Tokio spawns a task for handling the stream.
- Line 38: recv.lines() can be iterated over with no problem.
- We start iterating; line is equal to (after each loop):
  - *1
  - $4, indicating that the next command's length is 4 characters
  - PING, indicating the command
- The loop is over, and it hangs, with no errors and no response at all. The breakpoint at line 54 (match command) never gets hit, and the task hangs after iterating over all lines in the loop.
- redis-cli hangs because there is literally no response.
- Tokio console shows the task as never yielded (I didn't include the tracer code).
- I CTRL+C the redis-cli PING command.
- The breakpoint at line 54 gets hit! So it hangs until I interrupt (SIGINT) redis-cli?

??
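One thing that seems relevant to the steps above: the iterator returned by `lines()` only ends at EOF or on an error, that is, once the peer closes its side of the connection. redis-cli keeps the connection open while it waits for a reply, which would fit the observation that the code after the loop runs only once redis-cli is interrupted. As a rough sketch, assuming the goal is to stop reading after one complete command rather than at EOF, a frame could be read like this. The names `read_resp_command` and `invalid` are hypothetical, and only the standard library is used.

```rust
use std::io::{self, BufRead, Read};

/// Read exactly one RESP array of bulk strings (one command) from `reader`.
/// Returns Ok(None) if the peer closed the connection before sending anything.
/// Hypothetical helper: a sketch, not the code from the question.
fn read_resp_command<R: BufRead>(reader: &mut R) -> io::Result<Option<Vec<String>>> {
    let mut header = String::new();
    if reader.read_line(&mut header)? == 0 {
        return Ok(None); // clean EOF, nothing to parse
    }
    // Array header, e.g. "*1": the number of elements that follow.
    let count = header
        .trim_end()
        .strip_prefix('*')
        .and_then(|n| n.parse::<usize>().ok())
        .ok_or_else(|| invalid("expected an array header such as *1"))?;

    let mut args = Vec::with_capacity(count);
    for _ in 0..count {
        // Bulk string header, e.g. "$4": the payload length in bytes.
        let mut len_line = String::new();
        reader.read_line(&mut len_line)?;
        let len = len_line
            .trim_end()
            .strip_prefix('$')
            .and_then(|n| n.parse::<usize>().ok())
            .ok_or_else(|| invalid("expected a bulk length such as $4"))?;

        // Read the payload plus its trailing "\r\n", then drop the terminator.
        let mut payload = vec![0u8; len + 2];
        reader.read_exact(&mut payload)?;
        payload.truncate(len);
        args.push(String::from_utf8(payload).map_err(|_| invalid("payload is not UTF-8"))?);
    }
    // Return as soon as the announced number of elements has been read.
    Ok(Some(args))
}

/// Build an InvalidData error with the given message.
fn invalid(msg: &str) -> io::Error {
    io::Error::new(io::ErrorKind::InvalidData, msg)
}
```

The function returns after the announced number of elements, so the caller can reply while the connection is still open.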
main file:
mod commands;

use std::{
    io::{BufRead, BufReader, BufWriter, Read, Write},
    net::SocketAddr,
    net::{TcpListener, TcpStream},
};
use commands::{RedisCommand, RedisCommandResponse};

#[tokio::main]
pub async fn main() {
    let listener = TcpListener::bind("127.0.0.1:6379").unwrap();
    for stream in listener.incoming() {
        match stream {
            Ok(mut _stream) => {
                tokio::task::spawn(handle_stream(_stream.try_clone().unwrap()));
            }
            Err(e) => {
                println!("error: {}", e);
            }
        }
    }
}

#[tracing::instrument]
async fn handle_stream(_stream: TcpStream) {
    // 0 => type of the input coming in
    // 1 => length of the first parameter
    // 2 => first parameter, command.
    // 3 => length of the second parameter
    // 4 => second parameter, if exists.
    let mut n: u8 = 0;
    let mut command: RedisCommand = RedisCommand::UNKNOWN;
    let mut out: String = String::new();
    let recv = BufReader::new(&_stream);
    let mut deb_buf: String = String::new();
    println!("{}", deb_buf);
    for l in recv.lines() {
        let line: String = match l {
            Ok(line) => line,
            Err(error) => {
                write_string_to_stream(format!("Invalid line: {}", error), _stream);
                return;
            }
        };
        if n == 4 && command == RedisCommand::ECHO {
            out = line.clone();
        }
        if let Ok(_cmd) = RedisCommand::from_str(&line) {
            command = _cmd;
        }
        n += 1;
    }
    match command {
        RedisCommand::UNKNOWN => write_string_to_stream(String::from("Invalid command"), _stream),
        RedisCommand::ECHO => write_string_to_stream(out, _stream),
        RedisCommand::PING => write_string_to_stream(String::from("PONG"), _stream),
    }
}

fn write_string_to_stream(out: String, _stream: TcpStream) {
    let mut writer = BufWriter::new(&_stream);
    match writer.write_all(out.as_bytes()) {
        Ok(_) => {}
        Err(_) => {
            _stream.shutdown(std::net::Shutdown::Both);
            return;
        }
    };
}
and the commands:
use std::cmp::PartialEq;
use thiserror::Error;

#[derive(Error, Debug)]
pub enum RedisCommandError {
    #[error("invalid command")]
    Invalid(),
    #[error("unknown command")]
    Unknown,
}

#[derive(Error, Debug)]
pub enum RedisResponseCommandError {
    #[error("invalid response command")]
    Invalid(),
    #[error("unknown command")]
    Unknown,
}

#[derive(PartialEq, Debug)]
pub enum RedisCommand {
    UNKNOWN,
    PING,
    ECHO,
}

impl RedisCommand {
    pub fn from_str(s: &str) -> Result<RedisCommand, RedisCommandError> {
        match s {
            "PING" => Ok(RedisCommand::PING),
            "ECHO" => Ok(RedisCommand::ECHO),
            _ => Err(RedisCommandError::Invalid()),
        }
    }
}

#[derive(PartialEq)]
pub enum RedisCommandResponse {
    PONG,
}

impl RedisCommandResponse {
    pub fn to_string(self) -> String {
        match self {
            RedisCommandResponse::PONG => "+PONG\r\n".to_owned(),
        }
    }
}
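A side note on the reply format: `RedisCommandResponse::to_string` produces the RESP simple string `+PONG\r\n`, while `handle_stream` writes the bare text `PONG`. nc prints whatever bytes arrive, but redis-cli parses replies as RESP, so it would likely need the framed form. A minimal sketch of writing such a reply and flushing it right away, using only the standard library; `write_simple_string` is a hypothetical helper, not something from the code above.

```rust
use std::io::{self, Write};

/// Write a RESP simple string such as "+PONG\r\n" and flush it immediately.
/// Hypothetical helper for illustration; works with any `Write`, e.g. a TcpStream.
fn write_simple_string<W: Write>(writer: &mut W, text: &str) -> io::Result<()> {
    writer.write_all(b"+")?; // type prefix for a simple string
    writer.write_all(text.as_bytes())?;
    writer.write_all(b"\r\n")?; // RESP line terminator
    // Flush explicitly so a buffered writer does not hold the reply back.
    writer.flush()
}
```

Because the function is generic over `Write`, it can be exercised against a `Vec<u8>` before pointing it at a socket.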
The Cargo dependencies are tokio, plus thiserror and tracing as used in the code above.
<3