PHP Ratchet websocket listening + sending

I need to establish communication in PHP with a websocket server that is hosted on a node. The first part is to establish a connection to listen for responses from the ws server. The second is to be able to send my own commands and track responses in the listener from the first part. I am using the symfony project and the ratchet/pawl package.

I have created such a client to establish a connection and send messages:

<?php

namespace AppWebSocket;

use RatchetClientWebSocket;
use RatchetClientConnector;
use ReactEventLoopLoop;
use PsrLogLoggerInterface;

class WebsocketClient
{
    private $connection;
    private $loop;
    private $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->loop = Loop::get();
        $this->logger = $logger;
    }

    public function connect(string $url)
    {
        $connector = new Connector($this->loop);

        $connector($url)->then(function (WebSocket $conn) {
            $this->connection = $conn;
            dump('Connected to WebSocket');

            $conn->on('message', function ($msg) {
                dump("Received message: {$msg}");
                $this->onMessage($msg);
            });

            $conn->on('close', function ($code = null, $reason = null) {
                dump('Connection closed ({$code} - {$reason})');
            });
        }, function (Exception $e) {
            dump("Could not connect: {$e->getMessage()}");
        });
    }

    public function send(string $message)
    {
        if ($this->connection) {
            $this->connection->send($message);
        } else {
            echo "No connection established.n";
        }
    }

    private function onMessage($message)
    {
        $this->logger->info("Received message: {$message}");
    }

    public function run()
    {
        $this->loop->run();
    }

    public function close()
    {
        if ($this->connection) {
            $this->connection->close();
        }
    }
}

I also created a symfony command that sets up the ws server listening:

<?php

namespace AppCommand;

use AppWebSocketWebsocketClient;
use SymfonyComponentConsoleCommandCommand;
use SymfonyComponentConsoleInputInputInterface;
use SymfonyComponentConsoleOutputOutputInterface;
use SymfonyComponentConsoleStyleSymfonyStyle;

class WebsocketListenCommand extends Command
{
    protected static $defaultName = 'app:listen-websocket';
    private $webSocketClient;

    public function __construct(WebsocketClient $webSocketClient)
    {
        parent::__construct();
        $this->webSocketClient = $webSocketClient;
    }

    protected function configure(): void
    {
        $this
            ->setDescription('Listens for and processes messages from WebSocket');
    }

    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $io = new SymfonyStyle($input, $output);

        $io->success('Listening for WebSocket messages...');

        $this->webSocketClient->connect('ws://x.x.x.x:y');

        $this->webSocketClient->run();

        return Command::SUCCESS;
    }

}

After running the above command, everything works fine. It receives responses in real time.

I would then like to send messages to the websocket server and track the response (while keeping the listening I set up earlier). It injects the service anywhere in the code and tries to send a message:

$this->webSocketClient->send('{"some json, doesnt matter"}');

When trying to use the send method, it doesn’t see any connection and can’t send the message. I have tried setting up the connection myself during send, but then my listener stops responding. Where does the problem lie? Perhaps ZeroMQ or some other solution needs to be used here? I would appreciate your help.

It injects the service anywhere in the code

You’re getting a different PHP object for each instantiation. So, unless you’re explicitly calling connect() with the new object, you’re not getting a connection at all, and send() won’t possibly work.

If you are explicitly calling connect() in the new object, then you’re getting a different connection. This may or may not be ok for your requirements, it depends what you’re trying to do.

So, if you need to both receive and send, your best bet would probably be to have one PHP client script act as a long-lived listener. It would just sit and wait to receive messages. Then when you need to send a message, you’d create a new client object, make a new connection, send the message, and then immediately disconnect. If the receiving side needs to know which client sent the message, that information would have to be embedded in the message, because it would come from a different connection.

Alternatively, you might be able to simply use an HTTP POST to send your messages.

Also Alternatively, you might be able to use SSE via Mercure.

If none of those is possible, then you’ve got a non-trivial problem to work around. Whatever data you need to build the message that you want to send will need to be somehow passed into the long-lived listener script, so that it can perform the send using the same connection that it already has open. This could be done with a lightweight message queue, so your infinite listener loop would listen for websocket messages and then check the queue for new pending sends, and send anything it finds there. This is rather roundabout though, and you should be able to make use of one of the simpler methods.

Ok it looks like using ZeroMQ has solved the problem. It saves the collated WS connection, which I can then use anywhere in the code as I wanted.

I created a pusher class with a method to distribute messages to all collated connections (onOpen):

class Pusher implements MessageComponentInterface
{
protected SplObjectStorage $clients;
...
public function onOpen($conn): void
{
    // Store the new connection to send messages to later
    $this->clients->attach($conn);

    dump("New connection!");
}
...
public function onPushingOutMessage($entry): void
{
    if (is_array($entry)) {
        $entry = json_encode($entry);
    }

    /** @var WebSocket $client */
    foreach ($this->clients as $client) {
        $client->send($entry);
    }

    dump("Pushed message $entry to all connected clients");
}
...

I have added a ZeroMQ class that connects to WS:

class ZeroMQTransport
{
    // Name of method that MessageComponent interfaces should implement to handle pushed updates to websocket clients
    public const string PUSH_HANDLER = 'onPushingOutMessage';
    public const string DSN = 'tcp://127.0.0.1:XXXX';// XXXX - TCP port

    private ZMQSocket $socket;

    public function __construct(private readonly PayloadBuilder $messageBuilder)
    {
        $context = new ZMQContext();
        $socket = $context->getSocket(ZMQ::SOCKET_PUSH, static::PUSH_HANDLER);
        $socket->connect(static::DSN);

        $this->socket = $socket;
    }

    public function send(MessageInterface $message): void
    {
        $payload = $this->messageBuilder->build($message);
        try {
            $this->socket->send($payload);
        } catch (ZMQSocketException $e) {
            echo $e->getMessage();
        }
    }

    public function __destruct()
    {
        $this->socket->disconnect(self::DSN);
    }

    private function __clone() {}

    public function __wakeup()
    {
        throw new Exception("Cannot unserialize singleton");
    }
}

An updated connect method in the WS client:

public function connect(): void
{
    // Listen for the web server to make a ZeroMQ push
    $context = new Context($this->loop);
    $pull = $context->getSocket(ZMQ::SOCKET_PULL);
    try {
        $pull->bind(ZeroMQTransport::DSN);
    } catch (ZMQSocketException $e) {
        $this->logger->error('Error in initializing the socket listener: ' . $e->getMessage() . "nStack: " . $e->getTraceAsString());
    }

    $pull->on('message', function ($payload) {
        $this->pusher->onPushingOutMessage($payload);
    });

    $connector = new Connector($this->loop);

    $connector($this->url)->then(function (WebSocket $conn) {
        $this->logger->info("Connected to WebSocket");
        $conn->on('message', function ($msg) {
            dump("Received message: $msg");
            $this->logger->info("Received message: $msg");
            $this->onMessage($msg);
        });

        $conn->on('error', function (Exception $exception) use ($conn) {
            $this->logger->error("Connection error");
            $this->pusher->onError($conn, $exception);
        });

        $conn->on('close', function ($code = null, $reason = null) use ($conn) {
            $this->logger->error("Connection closed ($code - $reason)");
            $this->pusher->onClose($conn);
        });

        $this->connection = $conn;
        $this->pusher->onOpen($conn);
    }, function (Exception $e) {
        $this->logger->critical("Could not connect: {$e->getMessage()}");
    });
}

Now I can send a WS message and catch the response in my listener:

$this->zeroMQTransport->send($message);

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật