WebSocket协议是现代Web开发中不可或缺的一部分,它允许客户端和服务器之间建立持久的连接,实现双向实时通信。与传统的HTTP请求不同,WebSocket提供了一种全双工的通信通道,使得数据可以在任意方向上传输,而无需等待对方请求或者应答。
Boost.Beast:这是一个基于Boost库的WebSocket库,提供了高性能和易用的API来实现WebSocket通信。Boost.Beast适用于需要高性能和易用性的场景。
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#include <atomic>
#include <mutex>
#include <queue>
#include <condition_variable>
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
namespace net = boost::asio;
using tcp = boost::asio::ip::tcp;
// 线程安全的消息队列
template<typename T>
class ConcurrentQueue {
std::queue<T> queue_;
mutable std::mutex mutex_;
std::condition_variable cond_;
public:
void push(T const& value) {
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(value);
lock.unlock();
cond_.notify_one();
}
bool try_pop(T& value) {
std::unique_lock<std::mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
value = queue_.front();
queue_.pop();
return true;
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this] { return !queue_.empty(); });
value = queue_.front();
queue_.pop();
}
bool empty() const {
std::unique_lock<std::mutex> lock(mutex_);
return queue_.empty();
}
};
// WebSocket 会话类
class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> {
websocket::stream<beast::tcp_stream> ws_;
beast::multi_buffer buffer_;
ConcurrentQueue<std::string>& output_queue_;
std::atomic<bool> is_closing_{ false };
std::atomic<bool> writing_{ false };
std::queue<std::string> write_queue_;
std::mutex write_mutex_;
std::vector<char> write_buffer_;
static constexpr size_t MAX_BUFFER_SIZE = 128 * 1024 * 1024; // 16MB
public:
WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue)
: ws_(std::move(socket))
, output_queue_(output_queue)
, write_buffer_(MAX_BUFFER_SIZE){
}
~WebSocketSession() {
close();
}
// 获取底层 WebSocket 流
websocket::stream<beast::tcp_stream>& ws() {
return ws_;
}
void run() {
// 设置更合理的超时选项
websocket::stream_base::timeout timeout{
std::chrono::seconds(30), // 握手超时
std::chrono::seconds(30), // 空闲超时
true // 启用 keep-alive pings
};
ws_.set_option(timeout);
// 设置消息大小限制
ws_.read_message_max(MAX_BUFFER_SIZE);
// 设置 decorator
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res) {
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server");
}));
// 异步接受 WebSocket 握手
ws_.async_accept(
beast::bind_front_handler(
&WebSocketSession::on_accept,
shared_from_this()));
}
void on_accept(beast::error_code ec) {
if (ec) {
std::cerr << "WebSocket accept error: " << ec.message() << "\n";
return;
}
// 开始读取消息
do_read();
}
void close(websocket::close_code code = websocket::close_code::normal) {
if (is_closing_.exchange(true)) {
return;
}
beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
ws_.async_close(code,
[self = shared_from_this()](beast::error_code ec) {
if (ec) {
std::cerr << "WebSocket close error: " << ec.message() << "\n";
}
});
}
void do_read() {
if (is_closing_) {
return;
}
// 检查缓冲区大小
if (buffer_.size() >= MAX_BUFFER_SIZE) {
std::cerr << "Buffer size exceeded, closing connection\n";
close(websocket::close_code::too_big);
return;
}
// 异步读取
ws_.async_read(
buffer_,
beast::bind_front_handler(
&WebSocketSession::on_read,
shared_from_this()));
}
void on_read(beast::error_code ec, std::size_t bytes_transferred) {
std::cout << "on_read bytes_transferred:" << bytes_transferred << "\n";
if (ec == websocket::error::closed) {
close();
return;
}
if (ec) {
std::cerr << "WebSocket read error: " << ec.message() << "\n";
close();
return;
}
// 检查消息是否完整
if (!ws_.is_message_done()) {
do_read();
return;
}
// 处理完整消息
std::string message = beast::buffers_to_string(buffer_.data());
buffer_.consume(buffer_.size());
try {
output_queue_.push(message);
}
catch (const std::exception& e) {
std::cerr << "Message queue error: " << e.what() << "\n";
close(websocket::close_code::internal_error);
return;
}
// 继续读取
do_read();
}
void do_write(const std::string& message) {
if (is_closing_) {
return;
}
// 添加到写队列
{
std::lock_guard<std::mutex> lock(write_mutex_);
write_queue_.push(message);
if (writing_) {
return;
}
writing_ = true;
}
write_loop();
}
void write_loop() {
{
std::lock_guard<std::mutex> lock(write_mutex_);
if (write_queue_.empty()) {
writing_ = false;
return;
}
std::string msg = std::move(write_queue_.front());
write_buffer_.assign(msg.begin(), msg.end());
write_queue_.pop();
}
ws_.async_write(
net::buffer(&write_buffer_[0], write_buffer_.size()),
beast::bind_front_handler(
&WebSocketSession::on_write,
shared_from_this()));
}
void on_write(beast::error_code ec, std::size_t bytes_transferred) {
if (ec) {
std::cerr << "WebSocket write error: " << ec.message() << "\n";
close();
return;
}
std::cout << "on_write bytes_transferred:" << bytes_transferred << "\n";
write_loop();
}
};
// WebSocket 服务器类
class WebSocketServer {
net::io_context ioc_;
tcp::acceptor acceptor_;
std::vector<std::thread> io_threads_;
std::atomic<bool> stopped_{ false };
// 输出消息队列
ConcurrentQueue<std::string> output_queue_;
// 活动会话列表
std::vector<std::shared_ptr<WebSocketSession>> sessions_;
mutable std::mutex sessions_mutex_;
public:
WebSocketServer(
unsigned short port,
int thread_count = std::thread::hardware_concurrency())
: ioc_(thread_count)
, acceptor_(ioc_, { net::ip::make_address("0.0.0.0"), port })
{
// 启动 I/O 线程池
for (int i = 0; i < thread_count; ++i) {
io_threads_.emplace_back([this] {
ioc_.run();
});
}
// 启动输出处理线程
io_threads_.emplace_back([this] {
process_output();
});
}
~WebSocketServer() {
stop();
}
// 启动服务器
void start() {
if (!stopped_) {
do_accept();
}
}
// 停止服务器
void stop() {
if (stopped_.exchange(true)) {
return;
}
// 关闭所有会话
{
std::lock_guard<std::mutex> lock(sessions_mutex_);
for (auto& session : sessions_) {
session->close();
}
sessions_.clear();
}
// 停止 I/O 上下文
ioc_.stop();
// 等待所有线程结束
for (auto& thread : io_threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
// 广播消息给所有客户端
void broadcast(const std::string& message) {
std::lock_guard<std::mutex> lock(sessions_mutex_);
for (auto& session : sessions_) {
session->do_write(message);
}
}
private:
void do_accept() {
acceptor_.async_accept(
net::make_strand(ioc_),
beast::bind_front_handler(
&WebSocketServer::on_accept,
this));
}
void on_accept(beast::error_code ec, tcp::socket socket) {
if (ec) {
if (ec != net::error::operation_aborted) {
std::cerr << "Accept error: " << ec.message() << "\n";
}
return;
}
// 创建新会话并添加到会话列表
auto session = std::make_shared<WebSocketSession>(
std::move(socket), output_queue_);
{
std::lock_guard<std::mutex> lock(sessions_mutex_);
sessions_.push_back(session);
}
// 启动会话
session->run();
// 继续接受新连接
if (!stopped_) {
do_accept();
}
}
// 处理输出消息
void process_output() {
while (!stopped_) {
std::string message;
output_queue_.wait_and_pop(message);
broadcast(message);
}
}
};
int main(int argc, char* argv[]) {
try {
auto const port = 8080;
auto const threads = std::max<int>(1, std::thread::hardware_concurrency());
// 创建并启动服务器
WebSocketServer server{ port, threads };
server.start();
std::cout << "WebSocket server is running on port " << port << std::endl;
std::cout << "Press Enter to exit..." << std::endl;
// 等待用户输入以停止服务器
std::cin.get();
server.stop();
}
catch (std::exception const& e) {
std::cerr << "Error: " << e.what() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
} 收藏的用户(0) X
正在加载信息~
推荐阅读
最新回复 (1)
-
#include <boost/beast/core.hpp> #include <boost/beast/websocket.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/strand.hpp> #include <boost/config.hpp> #include <algorithm> #include <cstdlib> #include <functional> #include <iostream> #include <memory> #include <string> #include <thread> #include <vector> #include <atomic> #include <mutex> #include <queue> #include <condition_variable> namespace beast = boost::beast; namespace http = beast::http; namespace websocket = beast::websocket; namespace net = boost::asio; using tcp = boost::asio::ip::tcp; // 线程安全的消息队列 template<typename T> class ConcurrentQueue { std::queue<T> queue_; mutable std::mutex mutex_; std::condition_variable cond_; public: void push(T const& value) { std::unique_lock<std::mutex> lock(mutex_); queue_.push(value); lock.unlock(); cond_.notify_one(); } bool try_pop(T& value) { std::unique_lock<std::mutex> lock(mutex_); if (queue_.empty()) { return false; } value = queue_.front(); queue_.pop(); return true; } void wait_and_pop(T& value) { std::unique_lock<std::mutex> lock(mutex_); cond_.wait(lock, [this] { return !queue_.empty(); }); value = queue_.front(); queue_.pop(); } bool empty() const { std::unique_lock<std::mutex> lock(mutex_); return queue_.empty(); } }; // WebSocket 会话类 class WebSocketSession : public std::enable_shared_from_this<WebSocketSession> { websocket::stream<beast::tcp_stream> ws_; // 读写入缓冲空 beast::flat_buffer read_buffer_; std::vector<char> write_buffer_; ConcurrentQueue<std::string>& output_queue_; // 关闭处理标志 std::atomic<bool> is_closing_{ false }; net::steady_timer timer_; bool ping_sent_ = false; std::chrono::steady_clock::time_point last_pong_time_; public: WebSocketSession(tcp::socket&& socket, ConcurrentQueue<std::string>& output_queue) : ws_(std::move(socket)) , output_queue_(output_queue) , write_buffer_(10 * 1024 * 1024) , timer_(ws_.get_executor()) { } ~WebSocketSession() { close(); } // 获取底层 WebSocket 流 websocket::stream<beast::tcp_stream>& ws() { return ws_; } // 启动 WebSocket 会话 void run() { // 设置建议的 timeout 选项 ws_.set_option( websocket::stream_base::timeout::suggested( beast::role_type::server)); // 设置 decorator 修改握手响应 ws_.set_option(websocket::stream_base::decorator( [](websocket::response_type& res) { res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server"); })); ws_.read_message_max(10 * 1024 * 1024); ws_.control_callback([&](websocket::frame_type kind, beast::string_view payload) { switch (kind) { case websocket::frame_type::ping: // 收到Ping,自动回复Pong std::cout << "Received ping, auto-replying with pong\n"; break; case websocket::frame_type::pong: // 收到Pong响应 ping_sent_ = false; last_pong_time_ = std::chrono::steady_clock::now(); std::cout << "Received pong\n"; break; case websocket::frame_type::close: std::cout << "Received close frame\n"; break; } }); // 异步接受 WebSocket 握手 ws_.async_accept( beast::bind_front_handler( &WebSocketSession::on_accept, shared_from_this())); } // 关闭连接 void close() { if (is_closing_.exchange(true)) { return; // 已经在关闭过程中 } // 安全地关闭 WebSocket 连接 beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); ws_.async_close(websocket::close_code::normal, [self = shared_from_this()](beast::error_code ec) { if (ec) { std::cerr << "WebSocket close error: " << ec.message() << "\n"; } }); } void on_close(beast::error_code ec) { if (ec) std::cerr << "onclose error: " << ec.message() << "\n"; // 取消定时器 timer_.cancel(); } void start_ping_timer() { timer_.expires_after(std::chrono::seconds(5)); timer_.async_wait( beast::bind_front_handler( &WebSocketSession::on_ping_timer, shared_from_this())); } void on_ping_timer(beast::error_code ec) { if (ec == net::error::operation_aborted) return; if (ec)std::cerr << "timer: " << ec.message() << "\n"; // 检查上次Pong响应时间 auto now = std::chrono::steady_clock::now(); if (now - last_pong_time_ > std::chrono::seconds(10)) { std::cerr << "No pong received for 10 seconds, closing connection\n"; return ws_.async_close(websocket::close_code::normal, beast::bind_front_handler( &WebSocketSession::on_close, shared_from_this())); } // 发送Ping if (!ping_sent_) { ping_sent_ = true; ws_.async_ping("", beast::bind_front_handler( &WebSocketSession::on_ping, shared_from_this())); } else { // 已经发送过Ping但未收到Pong,可能是连接问题 std::cerr << "Previous ping not answered, closing connection\n"; return ws_.async_close(websocket::close_code::normal, beast::bind_front_handler( &WebSocketSession::on_close, shared_from_this())); } } void on_ping(beast::error_code ec) { if (ec) std::cerr << "ping error: " << ec.message() << "\n"; // 重置定时器 start_ping_timer(); } void on_accept(beast::error_code ec) { if (ec) { std::cerr << "WebSocket accept error: " << ec.message() << "\n"; return; } last_pong_time_ = std::chrono::steady_clock::now(); // 开始心跳检测 start_ping_timer(); // 开始读取消息 do_read(); } void do_read() { if (is_closing_) { return; } // 读取 WebSocket 消息 ws_.async_read( read_buffer_, beast::bind_front_handler( &WebSocketSession::on_read, shared_from_this())); } void on_read(beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec == websocket::error::closed) { // 客户端正常关闭连接 close(); return; } if (ec) { std::cerr << "WebSocket read error: " << ec.message() << "\n"; close(); return; } // 处理接收到的消息 std::string message = beast::buffers_to_string(read_buffer_.data()); read_buffer_.consume(read_buffer_.size()); std::cout << "on_read bytes_transferred: " << bytes_transferred << std::endl; // 将消息放入输出队列 output_queue_.push(message); // 继续读取下一条消息 do_read(); } void do_write(const std::string& message) { if (is_closing_) { return; } write_buffer_.assign(message.begin(), message.end()); // 异步写入消息 ws_.async_write( net::buffer(write_buffer_), beast::bind_front_handler( &WebSocketSession::on_write, shared_from_this())); } void on_write(beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); if (ec) { std::cerr << "WebSocket write error: " << ec.message() << "\n"; close(); return; } std::cout << "on_write bytes_transferred: " << bytes_transferred << std::endl; } }; // WebSocket 服务器类 class WebSocketServer { net::io_context ioc_; tcp::acceptor acceptor_; std::vector<std::thread> io_threads_; std::atomic<bool> stopped_{ false }; // 输出消息队列 ConcurrentQueue<std::string> output_queue_; // 活动会话列表 std::vector<std::shared_ptr<WebSocketSession>> sessions_; mutable std::mutex sessions_mutex_; public: WebSocketServer( unsigned short port, int thread_count = std::thread::hardware_concurrency()) : ioc_(thread_count) , acceptor_(ioc_, { net::ip::make_address("0.0.0.0"), port }) { // 启动 I/O 线程池 for (int i = 0; i < thread_count; ++i) { io_threads_.emplace_back([this] { ioc_.run(); }); } // 启动输出处理线程 io_threads_.emplace_back([this] { process_output(); }); } ~WebSocketServer() { stop(); } // 启动服务器 void start() { if (!stopped_) { do_accept(); } } // 停止服务器 void stop() { if (stopped_.exchange(true)) { return; } // 关闭所有会话 { std::lock_guard<std::mutex> lock(sessions_mutex_); for (auto& session : sessions_) { session->close(); } sessions_.clear(); } // 停止 I/O 上下文 ioc_.stop(); // 等待所有线程结束 for (auto& thread : io_threads_) { if (thread.joinable()) { thread.join(); } } } // 广播消息给所有客户端 void broadcast(const std::string& message) { std::lock_guard<std::mutex> lock(sessions_mutex_); for (auto& session : sessions_) { session->do_write(message); } } private: void do_accept() { acceptor_.async_accept( net::make_strand(ioc_), beast::bind_front_handler( &WebSocketServer::on_accept, this)); } void on_accept(beast::error_code ec, tcp::socket socket) { if (ec) { if (ec != net::error::operation_aborted) { std::cerr << "Accept error: " << ec.message() << "\n"; } return; } // 创建新会话并添加到会话列表 auto session = std::make_shared<WebSocketSession>( std::move(socket), output_queue_); { std::lock_guard<std::mutex> lock(sessions_mutex_); sessions_.push_back(session); } // 启动会话 session->run(); // 继续接受新连接 if (!stopped_) { do_accept(); } } // 处理输出消息 void process_output() { while (!stopped_) { std::string message; output_queue_.wait_and_pop(message); // 示例:回显消息给所有客户端 std::string response = "Echo: " + message; broadcast(response); } } }; int main(int argc, char* argv[]) { try { auto const port = 8080; auto const threads = std::max<int>(1, std::thread::hardware_concurrency()); // 创建并启动服务器 WebSocketServer server{ port, threads }; server.start(); std::cout << "WebSocket server is running on port " << port << std::endl; std::cout << "Press Enter to exit..." << std::endl; // 等待停止信号 std::mutex mtx; std::condition_variable cv; std::unique_lock<std::mutex> lock(mtx); cv.wait(lock); server.stop(); } catch (std::exception const& e) { std::cerr << "Error: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS; }
站点信息
- 文章2313
- 用户1336
- 访客11759686
每日一句
Happiness depends on your mindset.
幸福取决于你的心态。
幸福取决于你的心态。
新会员