Why can’t I read from the stdout/stderr of a process spawned with forkpty in the C++20 coroutines + Asio single-threaded multi-coroutine model?

I am new to corotine and asio.
I am working on a local process management tool where a client is responsible for launching a program. This client sends commands to a backend daemon that manages the process. I am using a single-threaded, multi-coroutine model. My daemon needs to interact with the child process to obtain logs for IPC transmission to the client.

My question is: why does my daemon get stuck at code 1 async_read_some and not trigger reading stdout, while code 2 works fine?

Here is a simple demo:

client code:

#include <iostream>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <thread>
#include <unistd.h>

void receive_output(int sockfd)
{
    char buffer[1024];
    while (true)
    {
        ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0);
        if (n <= 0)
            break;
        buffer[n] = '';
        std::cout << buffer << std::flush;
    }
}

void send_input(int sockfd)
{
    std::string input;
    while (std::getline(std::cin, input))
    {
        input += 'n';
        send(sockfd, input.c_str(), input.size(), 0);
    }
}

int main()
{
    int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    struct sockaddr_un addr;
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, "/tmp/socket");

    connect(sockfd, (struct sockaddr *)&addr, sizeof(addr));

    std::string command = "test";
    send(sockfd, command.c_str(), command.size(), 0);

    std::thread output_thread(receive_output, sockfd);
    std::thread input_thread(send_input, sockfd);

    output_thread.join();
    input_thread.join();

    close(sockfd);
    return 0;
}

server code:

#include <boost/asio.hpp>
#include <boost/asio/awaitable.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <cstdlib>
#include <fcntl.h>
#include <iostream>
#include <pty.h>
#include <pwd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/wait.h>

using boost::asio::awaitable;
using boost::asio::use_awaitable;
namespace asio = boost::asio;

boost::asio::io_context io_context;

void set_nonblocking(int fd)
{
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl(F_GETFL)");
        return;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1)
    {
        perror("fcntl(F_SETFL)");
    }
}

awaitable<std::string> receive_command(asio::posix::stream_descriptor &client_stream)
{
    char buffer[256];
    std::size_t len = co_await client_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);
    buffer[len] = '';
    co_return std::string(buffer);
}

awaitable<void> run_process_in_pty(const std::string &command, asio::posix::stream_descriptor &client_stream,
                                   int client_fd)
{
    int master_fd;
    pid_t pid = forkpty(&master_fd, NULL, NULL, NULL);
    //   set_nonblocking(master_fd);
    if (pid == 0)
    {
        // Child process: Set user and change directory
        setvbuf(stdout, nullptr, _IONBF, 0);

        // execute echo just for test
        // execl(command.c_str(), "", NULL);
        execl("/bin/echo", "echo", "Testing PTY output!", NULL);

        std::cerr << "Failed to execute command: " << strerror(errno) << std::endl;
        exit(EXIT_FAILURE);
    }
    else
    {
        // Parent process: Handle I/O between client and pty
        asio::posix::stream_descriptor pty_stream(io_context, master_fd);

        // handle output
        co_spawn(
            io_context,
            [&client_stream, &pty_stream]() -> awaitable<void> {
                char buffer[1024];
                try
                {
                    while (true)
                    {
                        // code 1
                        // std::size_t n = co_await pty_stream.async_read_some(boost::asio::buffer(buffer), use_awaitable);

                        // code 2
                        ssize_t n = read(pty_stream.native_handle(), buffer, sizeof(buffer));

                        std::cout << buffer << std::endl;
                        if (n == 0)
                            break; // End of stream

                        //   co_await asio::async_write(
                        //   client_stream, boost::asio::buffer(buffer, n),
                        //   use_awaitable);
                    }
                }
                catch (std::exception &e)
                {
                    std::cerr << strerror(errno) << std::endl;
                }
                co_return;
            },
            asio::detached);

        // handle input



        waitpid(pid, nullptr, 0);

        co_return;
    }
}

awaitable<void> accept_client(asio::posix::stream_descriptor client_stream, int client_fd)
{
    std::string command = co_await receive_command(client_stream);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client_stream, client_fd);
    client_stream.close();
}

awaitable<int> async_accept(int server_fd)
{
    int client_fd;
    co_await asio::post(io_context, use_awaitable);
    client_fd = accept(server_fd, nullptr, nullptr);
    if (client_fd < 0)
    {
        std::cerr << "Failed to accept client connection" << std::endl;
        co_return - 1;
    }
    std::cout << "clientfd, " << client_fd << std::endl;
    set_nonblocking(client_fd);
    co_return client_fd;
}

awaitable<void> daemon_loop()
{
    const char *socket_path = "/tmp/socket";

    if (access(socket_path, F_OK) == 0)
    {
        if (unlink(socket_path) != 0)
        {
            std::cerr << "Failed to remove existing socket file" << std::endl;
            exit(EXIT_FAILURE);
        }
    }

    int server_fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (server_fd < 0)
    {
        std::cerr << "Failed to create socket" << std::endl;
        exit(EXIT_FAILURE);
    }
    //   set_nonblocking(server_fd);

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strcpy(addr.sun_path, socket_path);
    if (bind(server_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
    {
        std::cerr << "Failed to bind socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, 5) < 0)
    {
        std::cerr << "Failed to listen on socket" << std::endl;
        exit(EXIT_FAILURE);
    }

    asio::posix::stream_descriptor server_stream(io_context, server_fd);

    while (true)
    {
        int client_fd = co_await async_accept(server_fd);
        asio::posix::stream_descriptor client_stream(io_context, client_fd);
        std::cout << "Accepted connection" << std::endl;
        co_spawn(io_context, accept_client(std::move(client_stream), client_fd), asio::detached);
    }
}

int main()
{
    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
    return 0;
}

Here is the different running result between code 1 and code 2

code 1:

code 2:

The most likely problem is your use of ::waitpid which blocks the only available IO service thread.

Your co_spawn in run_process_in_pty also plays fast and loose with object lifetimes. It captures client_stream and pty_stream by reference, but they are either locals or references from another coroutine frame.

Regardless, all the code just looks highly overcomplicated, somehow forcing C-style network code into Asio’s coroutine framework. Why not use Asio? So you can get:

awaitable<void> client_connection(Socket client) {
    std::cout << "Accepted connection" << std::endl;
    std::string command = co_await receive_command(client);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process_in_pty(command, client, client.native_handle());
    // client.close(); // implicit due to RAII lifetime
}

awaitable<void> daemon_loop() {
    path socket_path = "/tmp/socket";
    auto ex          = co_await asio::this_coro::executor;

    if (exists(socket_path)) {
        if (!is_socket(socket_path))
            throw std::runtime_error("File exists and is not a socket");
        if (!remove(socket_path))
            throw std::runtime_error("Failed to remove existing socket file");
    }

    UNIX::acceptor acc(ex, socket_path.native());
    acc.listen(5);

    while (true) {
        co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
    }
}

int main() {
    boost::asio::io_context io_context;

    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
}

BONUS DEMO

As a bonus, let me also demonstrate how to use Boost Process for the child process instead:

Live On Coliru

#include <boost/asio.hpp>
#include <boost/process/v2.hpp>
#include <filesystem>
#include <iostream>
#include <pty.h>
#include <sys/wait.h>

namespace asio = boost::asio;
namespace bp   = boost::process::v2;
using UNIX     = asio::local::stream_protocol;
using Socket   = UNIX::socket;
using std::filesystem::path;

asio::awaitable<std::string> receive_command(UNIX::socket& client_stream) {
#if 1
    std::string buf;
    co_await async_read_until(client_stream, asio::dynamic_buffer(buf), 'n', asio::deferred);
    while (buf.ends_with('n'))
        buf.pop_back();
    co_return buf;
#else
    char buffer[512];
    auto n = co_await client_stream.async_read_some(asio::buffer(buffer), asio::deferred);
    co_return std::string(buffer, n);
#endif
}

asio::awaitable<void> run_process( //
    [[maybe_unused]] std::string const& command, Socket& client_stream) try {
    asio::any_io_executor ex = co_await asio::this_coro::executor;

    //bp::popen child(ex, "/bin/echo", {"Testing PTY output for command [", command, "]"});
    bp::popen child(ex, "/bin/sh", {"-c", command});

    // int client_fd = client_stream.native_handle();

    for (char buffer[1024]; ;) {
        // auto [ec, n] = co_await child.async_read_some(boost::asio::buffer(buffer),
        // asio::as_tuple(asio::deferred));
        auto n = co_await child.async_read_some(boost::asio::buffer(buffer));

        std::cout << quoted(std::string_view(buffer, n)) << std::endl;

        if (n)
            co_await async_write(client_stream, boost::asio::buffer(buffer, n), asio::deferred);

        // if (ec) {
        // std::cout << "End of stream (" << ec.message() << ")" << std::endl;
        // break;
        //}
    }
} catch (boost::system::system_error const& se) {
    std::cerr << "run_process: " << se.code().message() << std::endl;
} catch (std::exception const& e) {
    std::cerr << "run_process: " << e.what() << std::endl;
}

asio::awaitable<void> client_connection(Socket client) {
    std::cout << "Accepted connection" << std::endl;
    std::string command = co_await receive_command(client);
    std::cout << "Received command: " << command << std::endl;

    co_await run_process(command, client);
    client.close(); // implicit due to RAII lifetime
}

asio::awaitable<void> daemon_loop() {
    path socket_path = "/tmp/socket";
    auto ex          = co_await asio::this_coro::executor;

    if (exists(socket_path)) {
        if (!is_socket(socket_path))
            throw std::runtime_error("File exists and is not a socket");
        if (!remove(socket_path))
            throw std::runtime_error("Failed to remove existing socket file");
    }

    UNIX::acceptor acc(ex, socket_path.native());
    acc.listen(5);

    while (true) {
        co_spawn(ex, client_connection(co_await acc.async_accept()), asio::detached);
    }
}

int main() {
    boost::asio::io_context io_context;

    std::cout << "start" << std::endl;
    co_spawn(io_context, daemon_loop(), asio::detached);
    io_context.run();
}

With a local, more functional demonstration:

1

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật