提交 e260fd09 编写于 作者: G Guodong Rong 提交者: Liu Jiaming

Use two buffers to handle message larger than 65536 bytes.

上级 8a2f1992
......@@ -12,6 +12,7 @@
#include <utility>
#include "boost/bind.hpp"
#include "cyber/common/log.h"
#include "cyber/message/protobuf_factory.h"
#include "modules/contrib/cyber_bridge/clients.h"
......@@ -93,12 +94,24 @@ void Client::handle_read(const boost::system::error_code& ec,
}
void Client::handle_write(const boost::system::error_code& ec) {
if (ec && ec != boost::asio::error::operation_aborted) {
AERROR << "Client write failed, disconnecting" << ec;
clients.stop(shared_from_this());
node.remove(shared_from_this());
if (ec) {
if (ec != boost::asio::error::operation_aborted) {
AERROR << "Client write failed, disconnecting" << ec;
clients.stop(shared_from_this());
node.remove(shared_from_this());
}
return;
}
std::lock_guard<std::mutex> lock(publish_mutex);
writing.clear();
if (!pending.empty()) {
writing.swap(pending);
boost::asio::async_write(
socket, boost::asio::buffer(writing.data(), writing.size()),
boost::bind(&Client::handle_write, shared_from_this(),
boost::asio::placeholders::error));
}
}
// [1] [count] [string] ... [string]
......@@ -255,32 +268,43 @@ void Client::handle_publish() {
buffer.erase(buffer.begin(), buffer.begin() + offset);
}
void Client::publish(const std::string& channel, const std::string& msg) {
std::vector<uint8_t> data;
data.reserve(sizeof(uint8_t) + sizeof(uint32_t) + channel.size() +
sizeof(uint32_t) + msg.size());
void fill_data(std::vector<uint8_t>* data, const std::string& channel,
const std::string& msg) {
data->reserve(data->size() + sizeof(uint8_t) + sizeof(uint32_t) +
channel.size() + sizeof(uint32_t) + msg.size());
data.push_back(OP_PUBLISH);
data->push_back(OP_PUBLISH);
data.push_back(uint8_t(channel.size() >> 0));
data.push_back(uint8_t(channel.size() >> 8));
data.push_back(uint8_t(channel.size() >> 16));
data.push_back(uint8_t(channel.size() >> 24));
data->push_back(uint8_t(channel.size() >> 0));
data->push_back(uint8_t(channel.size() >> 8));
data->push_back(uint8_t(channel.size() >> 16));
data->push_back(uint8_t(channel.size() >> 24));
const uint8_t* channel_data =
reinterpret_cast<const uint8_t*>(channel.data());
data.insert(data.end(), channel_data, channel_data + channel.size());
data->insert(data->end(), channel_data, channel_data + channel.size());
data.push_back(uint8_t(msg.size() >> 0));
data.push_back(uint8_t(msg.size() >> 8));
data.push_back(uint8_t(msg.size() >> 16));
data.push_back(uint8_t(msg.size() >> 24));
data->push_back(uint8_t(msg.size() >> 0));
data->push_back(uint8_t(msg.size() >> 8));
data->push_back(uint8_t(msg.size() >> 16));
data->push_back(uint8_t(msg.size() >> 24));
const uint8_t* msg_data = reinterpret_cast<const uint8_t*>(msg.data());
data.insert(data.end(), msg_data, msg_data + msg.size());
data->insert(data->end(), msg_data, msg_data + msg.size());
}
boost::asio::async_write(
socket, boost::asio::buffer(data.data(), data.size()),
boost::bind(&Client::handle_write, shared_from_this(),
boost::asio::placeholders::error));
void Client::publish(const std::string& channel, const std::string& msg) {
std::lock_guard<std::mutex> lock(publish_mutex);
if (writing.empty()) {
fill_data(&writing, channel, msg);
boost::asio::async_write(
socket, boost::asio::buffer(writing.data(), writing.size()),
boost::bind(&Client::handle_write, shared_from_this(),
boost::asio::placeholders::error));
} else if (pending.size() < MAX_PENDING_SIZE) {
fill_data(&pending, channel, msg);
} else {
// If pending size is larger than MAX_PENDING_SIZE, discard the message.
AERROR << "Pending size too large. Discard message.";
}
}
uint32_t Client::get32le(size_t offset) const {
......
......@@ -17,8 +17,7 @@ class Node;
class Client : public std::enable_shared_from_this<Client> {
public:
Client(Node* node, Clients* clients,
boost::asio::ip::tcp::socket socket);
Client(Node* node, Clients* clients, boost::asio::ip::tcp::socket socket);
~Client();
void start();
......@@ -34,6 +33,10 @@ class Client : public std::enable_shared_from_this<Client> {
uint8_t temp[1024 * 1024];
std::vector<uint8_t> buffer;
std::vector<uint8_t> writing;
std::vector<uint8_t> pending;
std::mutex publish_mutex;
const uint MAX_PENDING_SIZE = 1073741824; // 1GB
void handle_read(const boost::system::error_code& ec, std::size_t length);
void handle_write(const boost::system::error_code& ec);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册