I have a socket class that has a separate thread for recv/send data, and it is able to send/receive data correctly.
The part of the logic that is not working and giving me an exception is the part when there is a disconnect from the client and the server has to cleanup everything related to the socket and start listening again.
Please note that the cleanup works the first time, but it is only the second time when a disconnect occurs that I see the exception:
libc++abi: terminating
I cannot debug the application, but I was able to attach a debugger and get the callstack from the recv thread as below:
raise 0x00007f3873ebeca0
abort 0x00007f3873ec0148
abort_message 0x0000558c4d7dd9b4
demangling_terminate_handler() 0x0000558c4d7dd00d
std::__terminate(void (*)()) 0x0000558c4d7dcf52
std::terminate() 0x0000558c4d7dcea9
std::__1::thread::operator=[abi:v15006](std::__1::thread&&) 0x0000558c4d889eaa
Server::recvThread() 0x0000558c4d8855ec
decltype (((*((std::declval<Server*>)())).*((std::declval<void (Server::*)()>)()))()) std::__1::__invoke[abi:v15006]<void (Server::*)(), Server*, , void>(void (Server::*&&)(), Server*&&) 0x0000558c4d8924f9
void std::__1::__thread_execute[abi:v15006]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (Server::*)(), Server*, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (Server::*)(), Server*>&, std::__1::__tuple_indices<2ul>) 0x0000558c4d89247e
void* std::__1::__thread_proxy[abi:v15006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (Server::*)(), Server*> >(void*) 0x0000558c4d892252
start_thread 0x00007f3873c7444b
clone 0x00007f3873f7a52f
My Server class looks like below:
class Server{
public:
~Server();
bool listen();
void setPort(unsigned short port);
void shutdown();
protected:
void sendThread();
void recvThread();
void acceptThread();
void resetThread();
void joinThreads();
void cleanUp();
void queueMessage(Message responseMessage);
Message createMessageFromBuffer(std::vector<unsigned char>& byteBuffer);
std::thread m_sendThread;
std::thread m_recvThread;
std::thread m_acceptThread;
std::thread m_resetThread;
bool m_isShutdown;
bool m_needsReset;
std::mutex m_mutex;
std::condition_variable m_sendThreadWait;
int m_listenSocket;
unsigned short m_port;
unsigned int m_socketId;
std::deque<Message> m_messageQueue;
};
My Server.cpp
class looks like:
Server::~Server() {
shutdown();
}
bool Server::listen() {
std::cout<<"Server::listenn";
m_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (m_listenSocket < 0) {
std::cout<<"Failed to open listen socket.n";
return false;
}
int optval = 0;
if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
std::cout<<"Failed to set SO_REUSEADDR option.n";
close(m_listenSocket);
return false;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(m_port);
if (bind(m_listenSocket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
std::cout<<" Failed to bind to port."<<strerror(errno)<<"n";
close(m_listenSocket);
return false;
}
int backlog = 1; // We are only interested to pool one client.
if (::listen(m_listenSocket, backlog) < 0) {
std::cout<<" Failed to listen on the listen socket."<<strerror(errno)<<"n";
close(m_listenSocket);
return false;
}
std::cout<<"Server is listening."<<"port"<<m_port<<"n";
m_acceptThread = std::thread(&Server::acceptThread, this);
return true;
}
void Server::setPort(unsigned short port) {
std::cout<<"port"<<port<<"n";
m_port = port;
}
void Server::joinThreads() {
std::cout<<"Server::joinThreadsn";
if (m_acceptThread.joinable()) {
m_acceptThread.join();
}
if (m_recvThread.joinable()) {
m_recvThread.join();
}
if (m_sendThread.joinable()) {
m_sendThread.join();
}
}
void Server::shutdown() {
std::cout<<"Server::shutdownn";
{
std::lock_guard<std::mutex> lock(m_mutex);
if (m_isShutdown) {
// shutdown already invoked
return;
}
m_isShutdown = true;
}
if (m_resetThread.joinable()) {
m_resetThread.join();
}
cleanUp();
}
void Server::cleanUp(){
std::cout<<"Server::cleanUpn";
{
std::lock_guard<std::mutex> lock(m_mutex);
m_messageQueue.clear();
m_sendThreadWait.notify_one();
}
::shutdown(m_listenSocket, SHUT_RD);
::shutdown(m_socketId, SHUT_RDWR);
close(m_listenSocket);
close(m_socketId);
joinThreads();
}
void Server::resetThread() {
std::cout<<"Server::resetThread::started.n";
cleanUp();
{
std::lock_guard<std::mutex> lock(m_mutex);
m_needsReset = false;
}
listen();
std::cout<<"Server::resetThread::completed.n";
}
void Server::queuemessage(message message) {
std::lock_guard<std::mutex> lock(m_mutex);
m_messageQueue.emplace_back(message);
m_sendThreadWait.notify_one();
}
void Server::sendThread() {
std::cout<<"Server::sendThread::started.n";
try {
do {
message message = messageBuilder{}.build();
{
std::unique_lock <std::mutex> lock(m_mutex);
m_sendThreadWait.wait(lock, [this] { return m_messageQueue.size() || m_isShutdown || m_needsReset; });
if (m_isShutdown || m_needsReset) {
break;
}
message = m_messageQueue.front();
m_messageQueue.pop_front();
}
/// create send payload from message
std::string responsePayload = message.toString();
std::cout<<"Sending Payload"<<responsePayload<<"n";
std::vector<unsigned char> buffer(responsePayload.begin(), responsePayload.end());
int responseLength = responsePayload.size();
int responseLengthInNetworkByte = htonl(responseLength);
ssize_t bytesRemaining = responseLength;
ssize_t bytesAlreadySent = 0;
send(m_socketId, &responseLengthInNetworkByte, sizeof(int), 0);
do {
ssize_t bytes = send(m_socketId, &buffer[0] + bytesAlreadySent, bytesRemaining, 0);
{
std::unique_lock <std::mutex> lock(m_mutex);
if (m_isShutdown || m_needsReset) {
break;
}
}
bytesAlreadySent += bytes;
bytesRemaining -= bytes;
} while (bytesRemaining > 0);
} while (true);
{
std::unique_lock <std::mutex> lock(m_mutex);
if (!m_isShutdown && !m_needsReset) {
std::cout << "SendThread1n";
m_needsReset = true;
m_resetThread = std::thread(&Server::resetThread, this);
std::cout << "SendThread2n";
}
}
} catch (const std::exception &e) {
std::cout<<"send outside exception"<<e.what()<<"n";
}
std::cout<<"Server::sendThread::completed.n";
}
void Server::acceptThread() {
try {
std::cout<<"Server::acceptThread::started.n";
socklen_t sockaddrLength = sizeof(struct sockaddr_in);
int incomingSocketId = accept(m_listenSocket, NULL, &sockaddrLength);
if (incomingSocketId < 0) {
std::cout<<.m("Failed to accept the client connection").d("reason", strerror(errno)));
return;
}
m_socketId = incomingSocketId;
// Start send and recv thread
m_sendThread = std::thread(&Server::sendThread, this);
m_recvThread = std::thread(&Server::recvThread, this);
} catch (const std::exception& e) {
std::cout<<"socket closed."<<e.what()<<"n";
return;
}
std::cout<<"Server::acceptThread::completed.n";
}
void Server::recvThread() {
std::cout<<"Server::recvThread::started.n"));
try {
do {
{
std::unique_lock <std::mutex> lock(m_mutex);
if (m_isShutdown || m_needsReset) {
break;
}
}
// This loop reads the message size first before it reads the actual payload
int requestLenBuffer = 0; // Request size
ssize_t bytesRead = recv(m_socketId, &requestLenBuffer, sizeof(int), 0);
if (bytesRead > 0) {
int requestLenBufferInHostBytes = ntohl(requestLenBuffer);
std::cout<<.d("request size.", requestLenBufferInHostBytes));
ssize_t bytesRemaining = requestLenBufferInHostBytes;
ssize_t totalBytesRead = 0;
std::vector<unsigned char> byteBuffer(bytesRemaining + 1); // +1 for the NUL-terminator
bool joinThread = false;
// Read the request
while (bytesRemaining > 0) {
ssize_t bytesRead = recv(m_socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
if (bytesRead > 0) {
totalBytesRead += bytesRead;
bytesRemaining -= bytesRead;
continue;
} else if (!bytesRead) {
std::cout<<"Server:recvThread::connection closed.n";
joinThread = true;
break;
} else {
std::cout<<"Server:recvThread"<<strerror(errno)<<"n";
joinThread = true;
break;
}
}
if (joinThread) {
break;
}
byteBuffer[requestLenBufferInHostBytes] = ''; // NUL-terminate the string
std::cout<<"Recieved Request: "<<byteBuffer.data()<<"n";
Message message = createMessageFromBuffer(byteBuffer);
} else if (!bytesRead) {
std::cout<<"Server:recvThread::connection closed.n";
break;
} else {
std::cout<<"Server:recvThread"<<strerror(errno)<<"n";
break;
}
} while (true);
{
std::unique_lock <std::mutex> lock(m_mutex);
if (!m_isShutdown && !m_needsReset) {
m_needsReset = true;
std::cout << "RecvThread1n";
try {
m_resetThread = std::thread(&Server::resetThread, this);
} catch (const std::exception &e) {
std::cout<<"exception"<<e.what()<<"n";
}
std::cout << "RecvThread2n";
}
}
}catch (const std::exception &e) {
std::cout<<"exception"<<e.what()<<"n";
}
std::cout<<"Server::recvThread::completedn";
}
Message Server::createMessageFromBuffer(std::vector<unsigned char>& byteBuffer) {
/// process buffer to create message
return message;
}
From console output:
...
Server:sendThread:Sending Payload=...
Server:sendThread:Sending Payload=...
Server:recvThread::connection closed.
RecvThread1
RecvThread2
Server:resetThread::started.
Server:cleanUp
Server:recvThread::completed
Server:sendThread::completed.
Server:joinThreads
Server:listen
Server:listen::SocketServer is listening.:port=49152
Server:acceptThread::started.
Server:resetThread::completed.
Server:sendThread::started.
Server:recvThread::started.
Server:acceptThread::completed.
Server:recvThread:request size.=160
Server:recvThread:Recieved Request: = ...
Server:createMessageFromBuffer
Server:sendThread:Sending Payload=...
Server:sendThread:Sending Payload=...
Server:recvThread::connection closed.
RecvThread1
libc++abi: terminatingServer:resetThread::started.
Server:cleanUp
As can be seen from the console, if the socket is closed the first time, resetThread
cleans everything up and the client can reconnect and send messages again, but the second time the client disconnects I see the exception.
I also enclosed restarting the resetThread
in a try..catch
block to catch the exception, but it seems like no exception is being caught there, so I need pointers as to what I might be missing here, or debug tips to the root cause of the issue, and to understand why the reset codepath is working only the first time.
25