提交 d8f61553 编写于 作者: M massakam 提交者: Matteo Merli

Introduce strand to C++ client for exclusive control (#4750)

上级 849b6c57
......@@ -138,6 +138,13 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
#if BOOST_VERSION >= 107000
strand_(boost::asio::make_strand(executor_->io_service_.get_executor())),
#elif BOOST_VERSION >= 106600
strand_(executor_->io_service_.get_executor()),
#else
strand_(executor_->io_service_),
#endif
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
cnxString_("[<none> -> " + physicalAddress + "] "),
......@@ -344,9 +351,16 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
return;
}
}
#if BOOST_VERSION >= 106600
tlsSocket_->async_handshake(
boost::asio::ssl::stream<tcp::socket>::client,
std::bind(&ClientConnection::handleHandshake, shared_from_this(), std::placeholders::_1));
boost::asio::bind_executor(strand_, std::bind(&ClientConnection::handleHandshake,
shared_from_this(), std::placeholders::_1)));
#else
tlsSocket_->async_handshake(boost::asio::ssl::stream<tcp::socket>::client,
strand_.wrap(std::bind(&ClientConnection::handleHandshake,
shared_from_this(), std::placeholders::_1)));
#endif
} else {
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
......@@ -1146,31 +1160,57 @@ void ClientConnection::sendCommand(const SharedBuffer& cmd) {
if (pendingWriteOperations_++ == 0) {
// Write immediately to socket
asyncWrite(cmd.const_asio_buffer(),
customAllocWriteHandler(std::bind(&ClientConnection::handleSend, shared_from_this(),
std::placeholders::_1, cmd)));
if (tlsSocket_) {
#if BOOST_VERSION >= 106600
boost::asio::post(strand_,
std::bind(&ClientConnection::sendCommandInternal, shared_from_this(), cmd));
#else
strand_.post(std::bind(&ClientConnection::sendCommandInternal, shared_from_this(), cmd));
#endif
} else {
sendCommandInternal(cmd);
}
} else {
// Queue to send later
pendingWriteBuffers_.push_back(cmd);
}
}
void ClientConnection::sendCommandInternal(const SharedBuffer& cmd) {
asyncWrite(cmd.const_asio_buffer(),
customAllocWriteHandler(
std::bind(&ClientConnection::handleSend, shared_from_this(), std::placeholders::_1, cmd)));
}
void ClientConnection::sendMessage(const OpSendMsg& opSend) {
Lock lock(mutex_);
if (pendingWriteOperations_++ == 0) {
PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_,
opSend.sequenceId_, getChecksumType(), opSend.msg_);
// Write immediately to socket
asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
shared_from_this(), std::placeholders::_1)));
if (tlsSocket_) {
#if BOOST_VERSION >= 106600
boost::asio::post(strand_,
std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
#else
strand_.post(std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
#endif
} else {
sendMessageInternal(opSend);
}
} else {
// Queue to send later
pendingWriteBuffers_.push_back(opSend);
}
}
void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
PairSharedBuffer buffer = Commands::newSend(outgoingBuffer_, outgoingCmd_, opSend.producerId_,
opSend.sequenceId_, getChecksumType(), opSend.msg_);
asyncWrite(buffer, customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
shared_from_this(), std::placeholders::_1)));
}
void ClientConnection::handleSend(const boost::system::error_code& err, const SharedBuffer&) {
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err << " " << err.message());
......
......@@ -24,6 +24,7 @@
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/strand.hpp>
#include <boost/any.hpp>
#include <mutex>
#include <functional>
......@@ -125,7 +126,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
LookupDataResultPromisePtr promise);
void sendCommand(const SharedBuffer& cmd);
void sendCommandInternal(const SharedBuffer& cmd);
void sendMessage(const OpSendMsg& opSend);
void sendMessageInternal(const OpSendMsg& opSend);
void registerProducer(int producerId, ProducerImplPtr producer);
void registerConsumer(int consumerId, ConsumerImplPtr consumer);
......@@ -220,7 +223,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
template <typename ConstBufferSequence, typename WriteHandler>
inline void asyncWrite(const ConstBufferSequence& buffers, WriteHandler handler) {
if (tlsSocket_) {
boost::asio::async_write(*tlsSocket_, buffers, handler);
#if BOOST_VERSION >= 106600
boost::asio::async_write(*tlsSocket_, buffers, boost::asio::bind_executor(strand_, handler));
#else
boost::asio::async_write(*tlsSocket_, buffers, strand_.wrap(handler));
#endif
} else {
boost::asio::async_write(*socket_, buffers, handler);
}
......@@ -229,7 +236,11 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
template <typename MutableBufferSequence, typename ReadHandler>
inline void asyncReceive(const MutableBufferSequence& buffers, ReadHandler handler) {
if (tlsSocket_) {
tlsSocket_->async_read_some(buffers, handler);
#if BOOST_VERSION >= 106600
tlsSocket_->async_read_some(buffers, boost::asio::bind_executor(strand_, handler));
#else
tlsSocket_->async_read_some(buffers, strand_.wrap(handler));
#endif
} else {
socket_->async_receive(buffers, handler);
}
......@@ -319,6 +330,12 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
friend class PulsarFriend;
bool isTlsAllowInsecureConnection_;
#if BOOST_VERSION >= 106600
boost::asio::strand<boost::asio::io_service::executor_type> strand_;
#else
boost::asio::io_service::strand strand_;
#endif
};
} // namespace pulsar
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册