How to debug an exception in a multithread server class

I have a socket class that has a separate thread for recv/send data, and it is able to send/receive data correctly.

The part of the logic that is not working and giving me an exception is the part when there is a disconnect from the client and the server has to cleanup everything related to the socket and start listening again.

Please note that the cleanup works the first time, but it is only the second time when a disconnect occurs that I see the exception:

libc++abi: terminating

I cannot debug the application, but I was able to attach a debugger and get the callstack from the recv thread as below:

raise 0x00007f3873ebeca0
abort 0x00007f3873ec0148
abort_message 0x0000558c4d7dd9b4
demangling_terminate_handler() 0x0000558c4d7dd00d
std::__terminate(void (*)()) 0x0000558c4d7dcf52
std::terminate() 0x0000558c4d7dcea9
std::__1::thread::operator=[abi:v15006](std::__1::thread&&) 0x0000558c4d889eaa
Server::recvThread() 0x0000558c4d8855ec
decltype (((*((std::declval<Server*>)())).*((std::declval<void (Server::*)()>)()))()) std::__1::__invoke[abi:v15006]<void (Server::*)(), Server*, , void>(void (Server::*&&)(), Server*&&) 0x0000558c4d8924f9
void std::__1::__thread_execute[abi:v15006]<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (Server::*)(), Server*, 2ul>(std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (Server::*)(), Server*>&, std::__1::__tuple_indices<2ul>) 0x0000558c4d89247e
void* std::__1::__thread_proxy[abi:v15006]<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, void (Server::*)(), Server*> >(void*) 0x0000558c4d892252
start_thread 0x00007f3873c7444b
clone 0x00007f3873f7a52f

My Server class looks like below:

class Server{
public:

    ~Server();

    bool listen();
    void setPort(unsigned short port);
    void shutdown();

protected:

    void sendThread();

    void recvThread();

    void acceptThread();

    void resetThread();
 
    void joinThreads(); 

    void cleanUp();
   
    void queueMessage(Message responseMessage);
  
    Message createMessageFromBuffer(std::vector<unsigned char>& byteBuffer);


    std::thread m_sendThread;

    std::thread m_recvThread;
    std::thread m_acceptThread;
    std::thread m_resetThread;

    bool m_isShutdown;

    bool m_needsReset;

    std::mutex m_mutex;

    std::condition_variable m_sendThreadWait;

    int m_listenSocket;


    unsigned short m_port;

    unsigned int m_socketId;

    std::deque<Message> m_messageQueue;
};

My Server.cpp class looks like:

Server::~Server() {
    shutdown();
}

bool Server::listen() {
    std::cout<<"Server::listenn";
    m_listenSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (m_listenSocket < 0) {
        std::cout<<"Failed to open listen socket.n";
        return false;
    }

    int optval = 0;
    if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        std::cout<<"Failed to set SO_REUSEADDR option.n";
        close(m_listenSocket);
        return false;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(m_port);

    if (bind(m_listenSocket, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
        std::cout<<" Failed to bind to port."<<strerror(errno)<<"n";
        close(m_listenSocket);
        return false;
    }

    int backlog = 1;  // We are only interested to pool one client.
    if (::listen(m_listenSocket, backlog) < 0) {
        std::cout<<" Failed to listen on the listen socket."<<strerror(errno)<<"n";
        close(m_listenSocket);
        return false;
    }

    std::cout<<"Server is listening."<<"port"<<m_port<<"n";

    m_acceptThread = std::thread(&Server::acceptThread, this);

    return true;
}

void Server::setPort(unsigned short port) {
    std::cout<<"port"<<port<<"n";
    m_port = port;
}

void Server::joinThreads() {
    std::cout<<"Server::joinThreadsn";
    if (m_acceptThread.joinable()) {
        m_acceptThread.join();
    }

    if (m_recvThread.joinable()) {
        m_recvThread.join();
    }

    if (m_sendThread.joinable()) {
        m_sendThread.join();
    }
}

void Server::shutdown() {
    std::cout<<"Server::shutdownn";
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_isShutdown) {
            // shutdown already invoked
            return;
        }
        m_isShutdown = true;
    }

    if (m_resetThread.joinable()) {
        m_resetThread.join();
    }

    cleanUp();
}

void Server::cleanUp(){
    std::cout<<"Server::cleanUpn";
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_messageQueue.clear();
        m_sendThreadWait.notify_one();
    }

    ::shutdown(m_listenSocket, SHUT_RD);
    ::shutdown(m_socketId, SHUT_RDWR);
    close(m_listenSocket);
    close(m_socketId);

    joinThreads();
}

void Server::resetThread() {
    std::cout<<"Server::resetThread::started.n";
    cleanUp();
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_needsReset = false;
    }
    listen();
    std::cout<<"Server::resetThread::completed.n";
}

void Server::queuemessage(message message) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_messageQueue.emplace_back(message);
    m_sendThreadWait.notify_one();
}

void Server::sendThread() {
    std::cout<<"Server::sendThread::started.n";
    try {
        do {
            message message = messageBuilder{}.build();
            {
                std::unique_lock <std::mutex> lock(m_mutex);
                m_sendThreadWait.wait(lock, [this] { return m_messageQueue.size() || m_isShutdown || m_needsReset; });
                if (m_isShutdown || m_needsReset) {
                    break;
                }
                message = m_messageQueue.front();
                m_messageQueue.pop_front();
            }

            /// create send payload from message 

            std::string responsePayload = message.toString();

            std::cout<<"Sending Payload"<<responsePayload<<"n";

            std::vector<unsigned char> buffer(responsePayload.begin(), responsePayload.end());
            int responseLength = responsePayload.size();
            int responseLengthInNetworkByte = htonl(responseLength);
            ssize_t bytesRemaining = responseLength;
            ssize_t bytesAlreadySent = 0;
            send(m_socketId, &responseLengthInNetworkByte, sizeof(int), 0);

            do {
                ssize_t bytes = send(m_socketId, &buffer[0] + bytesAlreadySent, bytesRemaining, 0);
                {
                    std::unique_lock <std::mutex> lock(m_mutex);
                    if (m_isShutdown || m_needsReset) {
                        break;
                    }
                }
                bytesAlreadySent += bytes;
                bytesRemaining -= bytes;
            } while (bytesRemaining > 0);
        } while (true);
        {
            std::unique_lock <std::mutex> lock(m_mutex);
            if (!m_isShutdown && !m_needsReset) {
                std::cout << "SendThread1n";
                m_needsReset = true;
                m_resetThread = std::thread(&Server::resetThread, this);
                std::cout << "SendThread2n";
            }
        }
    } catch (const std::exception &e) {
        std::cout<<"send outside exception"<<e.what()<<"n";
    }
    std::cout<<"Server::sendThread::completed.n";
}

void Server::acceptThread() {
    try {
        std::cout<<"Server::acceptThread::started.n";
        socklen_t sockaddrLength = sizeof(struct sockaddr_in);
        int incomingSocketId = accept(m_listenSocket, NULL, &sockaddrLength);
        if (incomingSocketId < 0) {
            std::cout<<.m("Failed to accept the client connection").d("reason", strerror(errno)));
            return;
        }

        m_socketId = incomingSocketId;

        // Start send and recv thread
        m_sendThread = std::thread(&Server::sendThread, this);
        m_recvThread = std::thread(&Server::recvThread, this);

    } catch (const std::exception& e) {
        std::cout<<"socket closed."<<e.what()<<"n";
        return;
    }
    std::cout<<"Server::acceptThread::completed.n";
}

void Server::recvThread() {
    std::cout<<"Server::recvThread::started.n"));
    try {
        do {
            {
                std::unique_lock <std::mutex> lock(m_mutex);
                if (m_isShutdown || m_needsReset) {
                    break;
                }
            }
            // This loop reads the message size first before it reads the actual payload
            int requestLenBuffer = 0;  // Request size
            ssize_t bytesRead = recv(m_socketId, &requestLenBuffer, sizeof(int), 0);
            if (bytesRead > 0) {
                int requestLenBufferInHostBytes = ntohl(requestLenBuffer);
                std::cout<<.d("request size.", requestLenBufferInHostBytes));
                ssize_t bytesRemaining = requestLenBufferInHostBytes;
                ssize_t totalBytesRead = 0;
                std::vector<unsigned char> byteBuffer(bytesRemaining + 1);  // +1 for the NUL-terminator
                bool joinThread = false;
                // Read the request
                while (bytesRemaining > 0) {
                    ssize_t bytesRead = recv(m_socketId, byteBuffer.data() + totalBytesRead, bytesRemaining, 0);
                    if (bytesRead > 0) {
                        totalBytesRead += bytesRead;
                        bytesRemaining -= bytesRead;
                        continue;

                    } else if (!bytesRead) {
                        std::cout<<"Server:recvThread::connection closed.n";
                        joinThread = true;
                        break;
                    } else {
                        std::cout<<"Server:recvThread"<<strerror(errno)<<"n";
                        joinThread = true;
                        break;
                    }
                }

                if (joinThread) {
                    break;
                }

                byteBuffer[requestLenBufferInHostBytes] = '';  // NUL-terminate the string
                std::cout<<"Recieved Request: "<<byteBuffer.data()<<"n";

                Message message = createMessageFromBuffer(byteBuffer);
                
            } else if (!bytesRead) {
                std::cout<<"Server:recvThread::connection closed.n";
                break;
            } else {
                std::cout<<"Server:recvThread"<<strerror(errno)<<"n";
                break;
            }
        } while (true);
        {
            std::unique_lock <std::mutex> lock(m_mutex);
            if (!m_isShutdown && !m_needsReset) {
                m_needsReset = true;
                std::cout << "RecvThread1n";
                try {
                  m_resetThread = std::thread(&Server::resetThread, this);

                } catch (const std::exception &e) {
                    std::cout<<"exception"<<e.what()<<"n";
                }
                std::cout << "RecvThread2n";

            }
        }
    }catch (const std::exception &e) {
        std::cout<<"exception"<<e.what()<<"n";
    }
    std::cout<<"Server::recvThread::completedn";
}

Message Server::createMessageFromBuffer(std::vector<unsigned char>& byteBuffer) {
    /// process buffer to create message
    return message;
}

From console output:

...
Server:sendThread:Sending Payload=...
Server:sendThread:Sending Payload=...
Server:recvThread::connection closed.
RecvThread1
RecvThread2
Server:resetThread::started.
Server:cleanUp
Server:recvThread::completed
Server:sendThread::completed.
Server:joinThreads
Server:listen
Server:listen::SocketServer is listening.:port=49152
Server:acceptThread::started.
Server:resetThread::completed.
Server:sendThread::started.
Server:recvThread::started.
Server:acceptThread::completed.
Server:recvThread:request size.=160
Server:recvThread:Recieved Request: = ...
Server:createMessageFromBuffer

Server:sendThread:Sending Payload=...
Server:sendThread:Sending Payload=...
Server:recvThread::connection closed.
RecvThread1
libc++abi: terminatingServer:resetThread::started.

Server:cleanUp

As can be seen from the console, if the socket is closed the first time, resetThread cleans everything up and the client can reconnect and send messages again, but the second time the client disconnects I see the exception.

I also enclosed restarting the resetThread in a try..catch block to catch the exception, but it seems like no exception is being caught there, so I need pointers as to what I might be missing here, or debug tips to the root cause of the issue, and to understand why the reset codepath is working only the first time.

25

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật