Could somebody provide a simple example (or references) showing how to set up a client and a server using WebSocket from the Boost.Beast library? I need an example of how to handle an incoming message on the server and either respond to it or not (without the client crashing), how to send data to specific “subscribed” connections, and how to handle that data on the client side.
I found this example, but it doesn’t work as intended:
Client side:
<code>#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
/// Mutex-protected FIFO for passing values between threads.
/// Every member function locks the internal mutex for its whole duration;
/// consumers can either poll (try_pop) or block (wait_and_pop).
template <typename T>
class ThreadSafeQueue {
public:
    /// Appends a copy of `value` and wakes one waiting consumer.
    void push(const T& value) {
        std::lock_guard<std::mutex> guard(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    /// Non-blocking pop.
    /// @return true and the oldest element copied into `value`, or false
    ///         (with `value` left untouched) when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard(mtx_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = queue_.front();
            queue_.pop();
        }
        return has_item;
    }

    /// Blocking pop: sleeps on the condition variable until the queue is
    /// non-empty, then copies the oldest element into `value` and removes it.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        // Loop guards against spurious wake-ups.
        while (queue_.empty()) {
            cond_var_.wait(guard);
        }
        value = queue_.front();
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};
class Singleton {
public:
ThreadSafeQueue<json>* data_queue;
websocket::stream<tcp::socket>* ws;
Singleton(
ThreadSafeQueue<json>* data_queue_,
websocket::stream<tcp::socket>* ws_
) :
data_queue(data_queue_),
ws(ws_)
{}
};
/// Reader-thread body: blocks on the WebSocket, parses each received message
/// as JSON, pushes it onto `data.data_queue` and prints the raw text.
/// The loop ends when read() or json::parse() throws; the error is printed
/// and the function returns.
///
/// NOTE(review): this function reads from `*data.ws` while another thread may
/// be writing to it; Boost.Beast documents websocket::stream as not
/// thread-safe - confirm this concurrent use is acceptable or serialize it.
void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            const auto received_message = beast::buffers_to_string(buffer.data());
            json received_json = json::parse(received_message);
            data.data_queue->push(received_json);
            std::cout << "Received from server: " << received_message << "\n";
            // Discard the processed message so the next read starts clean.
            buffer.consume(buffer.size());
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // object, so what() would lose the derived class's real message.
    catch (const std::exception& e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}
/// Sender-thread body: reads lines from stdin and sends each one to the
/// server as a one-key JSON object. Lines starting with 'h' are sent under
/// the key "echo", all others under "null". Typing "stop", or reaching
/// end-of-input on stdin, ends the loop.
///
/// NOTE(review): this function writes to `*data.ws` while another thread may
/// be reading from it; Boost.Beast documents websocket::stream as not
/// thread-safe - confirm this concurrent use is acceptable or serialize it.
/// Also note that returning from here does not close the stream.
void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";
            // Stop on EOF or a failed stream; otherwise getline would keep
            // failing and the previous line would be re-sent forever.
            if (!std::getline(std::cin, input)) break;
            if (input == "stop") break;
            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };
            // Keep the serialized text in a named local for the whole write.
            const std::string payload = message.dump();
            data.ws->write(asio::buffer(payload));
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // object, so what() would lose the derived class's real message.
    catch (const std::exception& e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}
/// Client entry point: connects to the WebSocket server on 127.0.0.1:9002,
/// performs the handshake, then runs one reader thread and one sender thread
/// until both finish.
/// @return 0 on normal shutdown, 1 if connecting or the handshake throws.
int main() {
    std::string const host = "127.0.0.1";
    std::string const port = "9002";
    try {
        asio::io_context ioc;
        tcp::resolver resolver(ioc);
        websocket::stream<tcp::socket> ws(ioc);
        auto const results = resolver.resolve(host, port);
        asio::connect(ws.next_layer(), results);
        // The Host header should carry the port when it is not the scheme's
        // default (RFC 7230 section 5.4).
        ws.handshake(host + ':' + port, "/");
        ThreadSafeQueue<json> data_queue;
        Singleton single(&data_queue, &ws);
        std::thread reader(readData, std::ref(single));
        std::thread sender(sendData, std::ref(single));
        reader.join();
        sender.join();
    }
    // resolve/connect/handshake throw on failure (e.g. server not running);
    // report it instead of letting the exception terminate the program.
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
        return 1;
    }
    return 0;
}
</code>
<code>#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
/// Mutex-protected FIFO for passing values between threads.
/// Every member function locks the internal mutex for its whole duration;
/// consumers can either poll (try_pop) or block (wait_and_pop).
template <typename T>
class ThreadSafeQueue {
public:
    /// Appends a copy of `value` and wakes one waiting consumer.
    void push(const T& value) {
        std::lock_guard<std::mutex> guard(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    /// Non-blocking pop.
    /// @return true and the oldest element copied into `value`, or false
    ///         (with `value` left untouched) when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard(mtx_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = queue_.front();
            queue_.pop();
        }
        return has_item;
    }

    /// Blocking pop: sleeps on the condition variable until the queue is
    /// non-empty, then copies the oldest element into `value` and removes it.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        // Loop guards against spurious wake-ups.
        while (queue_.empty()) {
            cond_var_.wait(guard);
        }
        value = queue_.front();
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};
class Singleton {
public:
ThreadSafeQueue<json>* data_queue;
websocket::stream<tcp::socket>* ws;
Singleton(
ThreadSafeQueue<json>* data_queue_,
websocket::stream<tcp::socket>* ws_
) :
data_queue(data_queue_),
ws(ws_)
{}
};
/// Reader-thread body: blocks on the WebSocket, parses each received message
/// as JSON, pushes it onto `data.data_queue` and prints the raw text.
/// The loop ends when read() or json::parse() throws; the error is printed
/// and the function returns.
///
/// NOTE(review): this function reads from `*data.ws` while another thread may
/// be writing to it; Boost.Beast documents websocket::stream as not
/// thread-safe - confirm this concurrent use is acceptable or serialize it.
void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            const auto received_message = beast::buffers_to_string(buffer.data());
            json received_json = json::parse(received_message);
            data.data_queue->push(received_json);
            std::cout << "Received from server: " << received_message << "\n";
            // Discard the processed message so the next read starts clean.
            buffer.consume(buffer.size());
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // object, so what() would lose the derived class's real message.
    catch (const std::exception& e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}
/// Sender-thread body: reads lines from stdin and sends each one to the
/// server as a one-key JSON object. Lines starting with 'h' are sent under
/// the key "echo", all others under "null". Typing "stop", or reaching
/// end-of-input on stdin, ends the loop.
///
/// NOTE(review): this function writes to `*data.ws` while another thread may
/// be reading from it; Boost.Beast documents websocket::stream as not
/// thread-safe - confirm this concurrent use is acceptable or serialize it.
/// Also note that returning from here does not close the stream.
void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";
            // Stop on EOF or a failed stream; otherwise getline would keep
            // failing and the previous line would be re-sent forever.
            if (!std::getline(std::cin, input)) break;
            if (input == "stop") break;
            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };
            // Keep the serialized text in a named local for the whole write.
            const std::string payload = message.dump();
            data.ws->write(asio::buffer(payload));
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // object, so what() would lose the derived class's real message.
    catch (const std::exception& e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}
/// Client entry point: connects to the WebSocket server on 127.0.0.1:9002,
/// performs the handshake, then runs one reader thread and one sender thread
/// until both finish.
/// @return 0 on normal shutdown, 1 if connecting or the handshake throws.
int main() {
    std::string const host = "127.0.0.1";
    std::string const port = "9002";
    try {
        asio::io_context ioc;
        tcp::resolver resolver(ioc);
        websocket::stream<tcp::socket> ws(ioc);
        auto const results = resolver.resolve(host, port);
        asio::connect(ws.next_layer(), results);
        // The Host header should carry the port when it is not the scheme's
        // default (RFC 7230 section 5.4).
        ws.handshake(host + ':' + port, "/");
        ThreadSafeQueue<json> data_queue;
        Singleton single(&data_queue, &ws);
        std::thread reader(readData, std::ref(single));
        std::thread sender(sendData, std::ref(single));
        reader.join();
        sender.join();
    }
    // resolve/connect/handshake throw on failure (e.g. server not running);
    // report it instead of letting the exception terminate the program.
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
        return 1;
    }
    return 0;
}
</code>
#include <boost/lockfree/queue.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
/// Mutex-protected FIFO for passing values between threads.
/// Every member function locks the internal mutex for its whole duration;
/// consumers can either poll (try_pop) or block (wait_and_pop).
template <typename T>
class ThreadSafeQueue {
public:
    /// Appends a copy of `value` and wakes one waiting consumer.
    void push(const T& value) {
        std::lock_guard<std::mutex> guard(mtx_);
        queue_.push(value);
        cond_var_.notify_one();
    }

    /// Non-blocking pop.
    /// @return true and the oldest element copied into `value`, or false
    ///         (with `value` left untouched) when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard(mtx_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = queue_.front();
            queue_.pop();
        }
        return has_item;
    }

    /// Blocking pop: sleeps on the condition variable until the queue is
    /// non-empty, then copies the oldest element into `value` and removes it.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mtx_);
        // Loop guards against spurious wake-ups.
        while (queue_.empty()) {
            cond_var_.wait(guard);
        }
        value = queue_.front();
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    mutable std::mutex mtx_;
    std::condition_variable cond_var_;
};
class Singleton {
public:
ThreadSafeQueue<json>* data_queue;
websocket::stream<tcp::socket>* ws;
Singleton(
ThreadSafeQueue<json>* data_queue_,
websocket::stream<tcp::socket>* ws_
) :
data_queue(data_queue_),
ws(ws_)
{}
};
/// Reader-thread body: blocks on the WebSocket, parses each received message
/// as JSON, pushes it onto `data.data_queue` and prints the raw text.
/// The loop ends when read() or json::parse() throws; the error is printed
/// and the function returns.
///
/// NOTE(review): this function reads from `*data.ws` while another thread may
/// be writing to it; Boost.Beast documents websocket::stream as not
/// thread-safe - confirm this concurrent use is acceptable or serialize it.
void readData(Singleton& data) {
    try {
        beast::flat_buffer buffer;
        while (true) {
            data.ws->read(buffer);
            const auto received_message = beast::buffers_to_string(buffer.data());
            json received_json = json::parse(received_message);
            data.data_queue->push(received_json);
            std::cout << "Received from server: " << received_message << "\n";
            // Discard the processed message so the next read starts clean.
            buffer.consume(buffer.size());
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // object, so what() would lose the derived class's real message.
    catch (const std::exception& e) {
        std::cout << "Error occured in reader: " << e.what() << "\n";
    }
}
/// Sender-thread body: reads lines from stdin and sends each one to the
/// server as a one-key JSON object. Lines starting with 'h' are sent under
/// the key "echo", all others under "null". Typing "stop", or reaching
/// end-of-input on stdin, ends the loop.
///
/// NOTE(review): this function writes to `*data.ws` while another thread may
/// be reading from it; Boost.Beast documents websocket::stream as not
/// thread-safe - confirm this concurrent use is acceptable or serialize it.
/// Also note that returning from here does not close the stream.
void sendData(Singleton& data) {
    try {
        std::string input;
        while (true) {
            std::cout << "To send: ";
            // Stop on EOF or a failed stream; otherwise getline would keep
            // failing and the previous line would be re-sent forever.
            if (!std::getline(std::cin, input)) break;
            if (input == "stop") break;
            json message = {
                {input[0] == 'h' ? "echo" : "null", input}
            };
            // Keep the serialized text in a named local for the whole write.
            const std::string payload = message.dump();
            data.ws->write(asio::buffer(payload));
        }
    }
    // Catch by const reference: catching std::exception by value slices the
    // object, so what() would lose the derived class's real message.
    catch (const std::exception& e) {
        std::cout << "Error occured in sender: " << e.what() << "\n";
    }
}
/// Client entry point: connects to the WebSocket server on 127.0.0.1:9002,
/// performs the handshake, then runs one reader thread and one sender thread
/// until both finish.
/// @return 0 on normal shutdown, 1 if connecting or the handshake throws.
int main() {
    std::string const host = "127.0.0.1";
    std::string const port = "9002";
    try {
        asio::io_context ioc;
        tcp::resolver resolver(ioc);
        websocket::stream<tcp::socket> ws(ioc);
        auto const results = resolver.resolve(host, port);
        asio::connect(ws.next_layer(), results);
        // The Host header should carry the port when it is not the scheme's
        // default (RFC 7230 section 5.4).
        ws.handshake(host + ':' + port, "/");
        ThreadSafeQueue<json> data_queue;
        Singleton single(&data_queue, &ws);
        std::thread reader(readData, std::ref(single));
        std::thread sender(sendData, std::ref(single));
        reader.join();
        sender.join();
    }
    // resolve/connect/handshake throw on failure (e.g. server not running);
    // report it instead of letting the exception terminate the program.
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
        return 1;
    }
    return 0;
}
Server side:
<code>#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket)
: ws_(std::move(socket)) {}
void run() {
ws_.async_accept(
beast::bind_front_handler(
&WebSocketSession::on_accept,
shared_from_this()
)
);
}
private:
void on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
return;
}
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()
)
);
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
if (ec == websocket::error::closed) {
return;
}
std::cerr << "Read error: " << ec.message() << std::endl;
return;
}
try {
auto received_message = beast::buffers_to_string(buffer_.data());
json received_json = json::parse(received_message);
std::string response_message;
if (received_json.contains("echo")) {
json response_json = {
{"type", "response"},
{"original", received_json}
};
response_message = response_json.dump();
}
else {
buffer_.consume(buffer_.size());
do_read();
return;
}
response_ptr_ = std::make_shared<std::string>(std::move(response_message));
ws_.text(ws_.got_text());
ws_.async_write(
asio::buffer(*response_ptr_),
beast::bind_front_handler(
&WebSocketSession::on_write,
shared_from_this()));
}
catch (const std::exception& e) {
std::cerr << "Processing error: " << e.what() << std::endl;
}
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
std::cerr << "Write error: " << ec.message() << std::endl;
return;
}
buffer_.consume(buffer_.size());
response_ptr_.reset();
do_read();
}
websocket::stream<tcp::socket> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<std::string> response_ptr_;
};
class WebSocketServer {
public:
WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
: acceptor_(ioc, endpoint) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
beast::bind_front_handler(
&WebSocketServer::on_accept,
this
)
);
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
else {
std::make_shared<WebSocketSession>(std::move(socket))->run();
}
do_accept();
}
tcp::acceptor acceptor_;
};
/// Server entry point: listens on IPv4 port 9002 and runs the io_context on
/// the calling thread until it runs out of work or an exception escapes.
/// @return EXIT_SUCCESS normally, EXIT_FAILURE if setup or run() throws.
int main() {
    std::cout << "Web server is running:\n";
    try {
        asio::io_context ioc;
        tcp::endpoint endpoint(tcp::v4(), 9002);
        WebSocketServer server(ioc, endpoint);
        ioc.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
</code>
<code>#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket)
: ws_(std::move(socket)) {}
void run() {
ws_.async_accept(
beast::bind_front_handler(
&WebSocketSession::on_accept,
shared_from_this()
)
);
}
private:
void on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
return;
}
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()
)
);
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
if (ec == websocket::error::closed) {
return;
}
std::cerr << "Read error: " << ec.message() << std::endl;
return;
}
try {
auto received_message = beast::buffers_to_string(buffer_.data());
json received_json = json::parse(received_message);
std::string response_message;
if (received_json.contains("echo")) {
json response_json = {
{"type", "response"},
{"original", received_json}
};
response_message = response_json.dump();
}
else {
buffer_.consume(buffer_.size());
do_read();
return;
}
response_ptr_ = std::make_shared<std::string>(std::move(response_message));
ws_.text(ws_.got_text());
ws_.async_write(
asio::buffer(*response_ptr_),
beast::bind_front_handler(
&WebSocketSession::on_write,
shared_from_this()));
}
catch (const std::exception& e) {
std::cerr << "Processing error: " << e.what() << std::endl;
}
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
std::cerr << "Write error: " << ec.message() << std::endl;
return;
}
buffer_.consume(buffer_.size());
response_ptr_.reset();
do_read();
}
websocket::stream<tcp::socket> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<std::string> response_ptr_;
};
class WebSocketServer {
public:
WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
: acceptor_(ioc, endpoint) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
beast::bind_front_handler(
&WebSocketServer::on_accept,
this
)
);
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
else {
std::make_shared<WebSocketSession>(std::move(socket))->run();
}
do_accept();
}
tcp::acceptor acceptor_;
};
/// Server entry point: listens on IPv4 port 9002 and runs the io_context on
/// the calling thread until it runs out of work or an exception escapes.
/// @return EXIT_SUCCESS normally, EXIT_FAILURE if setup or run() throws.
int main() {
    std::cout << "Web server is running:\n";
    try {
        asio::io_context ioc;
        tcp::endpoint endpoint(tcp::v4(), 9002);
        WebSocketServer server(ioc, endpoint);
        ioc.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
</code>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <memory>
#include <string>
namespace asio = boost::asio;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = asio::ip::tcp;
using json = nlohmann::json;
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
public:
WebSocketSession(tcp::socket socket)
: ws_(std::move(socket)) {}
void run() {
ws_.async_accept(
beast::bind_front_handler(
&WebSocketSession::on_accept,
shared_from_this()
)
);
}
private:
void on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
return;
}
do_read();
}
void do_read() {
ws_.async_read(
buffer_,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()
)
);
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
if (ec == websocket::error::closed) {
return;
}
std::cerr << "Read error: " << ec.message() << std::endl;
return;
}
try {
auto received_message = beast::buffers_to_string(buffer_.data());
json received_json = json::parse(received_message);
std::string response_message;
if (received_json.contains("echo")) {
json response_json = {
{"type", "response"},
{"original", received_json}
};
response_message = response_json.dump();
}
else {
buffer_.consume(buffer_.size());
do_read();
return;
}
response_ptr_ = std::make_shared<std::string>(std::move(response_message));
ws_.text(ws_.got_text());
ws_.async_write(
asio::buffer(*response_ptr_),
beast::bind_front_handler(
&WebSocketSession::on_write,
shared_from_this()));
}
catch (const std::exception& e) {
std::cerr << "Processing error: " << e.what() << std::endl;
}
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
boost::ignore_unused(bytes_transferred);
if (ec) {
std::cerr << "Write error: " << ec.message() << std::endl;
return;
}
buffer_.consume(buffer_.size());
response_ptr_.reset();
do_read();
}
websocket::stream<tcp::socket> ws_;
beast::flat_buffer buffer_;
std::shared_ptr<std::string> response_ptr_;
};
class WebSocketServer {
public:
WebSocketServer(asio::io_context& ioc, tcp::endpoint endpoint)
: acceptor_(ioc, endpoint) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
beast::bind_front_handler(
&WebSocketServer::on_accept,
this
)
);
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
std::cerr << "Accept error: " << ec.message() << std::endl;
}
else {
std::make_shared<WebSocketSession>(std::move(socket))->run();
}
do_accept();
}
tcp::acceptor acceptor_;
};
/// Server entry point: listens on IPv4 port 9002 and runs the io_context on
/// the calling thread until it runs out of work or an exception escapes.
/// @return EXIT_SUCCESS normally, EXIT_FAILURE if setup or run() throws.
int main() {
    std::cout << "Web server is running:\n";
    try {
        asio::io_context ioc;
        tcp::endpoint endpoint(tcp::v4(), 9002);
        WebSocketServer server(ioc, endpoint);
        ioc.run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << '\n';
        return EXIT_FAILURE;
    }
    return EXIT_SUCCESS;
}
1