提交 5520dfac 编写于 作者: S Simon Fels

Terminate container on shutdown to get all our threads/connections stopped

上级 03acb32e
......@@ -195,7 +195,7 @@ anbox::cmds::SessionManager::SessionManager(const BusFactory &bus_factory)
{"/dev/fuse", "/dev/fuse"},
};
dispatcher->dispatch([&]() { container.start_container(container_configuration); });
dispatcher->dispatch([&]() { container.start(container_configuration); });
auto bus = bus_factory_();
bus->install_executor(core::dbus::asio::make_executor(bus, rt->service()));
......@@ -204,6 +204,11 @@ anbox::cmds::SessionManager::SessionManager(const BusFactory &bus_factory)
rt->start();
trap->run();
// Stop the container which should close all open connections we have on
// our side and should terminate all services.
container.stop();
rt->stop();
return EXIT_SUCCESS;
......
......@@ -42,7 +42,7 @@ Client::Client(const std::shared_ptr<Runtime> &rt)
Client::~Client() {}
void Client::start_container(const Configuration &configuration) {
void Client::start(const Configuration &configuration) {
try {
management_api_->start_container(configuration);
} catch (const std::exception &e) {
......@@ -52,6 +52,10 @@ void Client::start_container(const Configuration &configuration) {
}
}
void Client::stop() {
management_api_->stop_container();
}
void Client::register_terminate_handler(const TerminateCallback &callback) {
terminate_callback_ = callback;
}
......
......@@ -39,7 +39,8 @@ class Client {
Client(const std::shared_ptr<Runtime> &rt);
~Client();
void start_container(const Configuration &configuration);
void start(const Configuration &configuration);
void stop();
void register_terminate_handler(const TerminateCallback &callback);
......
......@@ -32,11 +32,11 @@ ManagementApiMessageProcessor::ManagementApiMessageProcessor(
ManagementApiMessageProcessor::~ManagementApiMessageProcessor() {}
void ManagementApiMessageProcessor::dispatch(
rpc::Invocation const &invocation) {
void ManagementApiMessageProcessor::dispatch(rpc::Invocation const &invocation) {
if (invocation.method_name() == "start_container")
invoke(this, server_.get(), &ManagementApiSkeleton::start_container,
invocation);
invoke(this, server_.get(), &ManagementApiSkeleton::start_container, invocation);
else if (invocation.method_name() == "stop_container")
invoke(this, server_.get(), &ManagementApiSkeleton::stop_container, invocation);
}
void ManagementApiMessageProcessor::process_event_sequence(
......
......@@ -55,8 +55,28 @@ void ManagementApiSkeleton::start_container(
try {
container_->start(container_configuration);
} catch (std::exception &err) {
response->set_error(
utils::string_format("Failed to start container: %s", err.what()));
response->set_error(utils::string_format("Failed to start container: %s", err.what()));
}
done->Run();
}
void ManagementApiSkeleton::stop_container(
anbox::protobuf::container::StopContainer const *request,
anbox::protobuf::rpc::Void *response, google::protobuf::Closure *done) {
(void)request;
if (container_->state() != Container::State::running) {
response->set_error("Container is not running");
done->Run();
return;
}
try {
container_->stop();
} catch (std::exception &err) {
response->set_error(utils::string_format("Failed to stop container: %s", err.what()));
}
done->Run();
......
......@@ -33,6 +33,7 @@ class Void;
} // namespace rpc
namespace container {
class StartContainer;
class StopContainer;
} // namespace container
} // namespace protobuf
namespace rpc {
......@@ -51,6 +52,10 @@ class ManagementApiSkeleton {
anbox::protobuf::container::StartContainer const *request,
anbox::protobuf::rpc::Void *response, google::protobuf::Closure *done);
void stop_container(
anbox::protobuf::container::StopContainer const *request,
anbox::protobuf::rpc::Void *response, google::protobuf::Closure *done);
private:
std::shared_ptr<rpc::PendingCallCache> pending_calls_;
std::shared_ptr<Container> container_;
......
......@@ -36,7 +36,7 @@ void ManagementApiStub::start_container(const Configuration &configuration) {
protobuf::container::StartContainer message;
auto message_configuration = new protobuf::container::Configuration;
for (const auto item : configuration.bind_mounts) {
for (const auto &item : configuration.bind_mounts) {
auto bind_mount_message = message_configuration->add_bind_mounts();
bind_mount_message->set_source(item.first);
bind_mount_message->set_target(item.second);
......@@ -46,24 +46,43 @@ void ManagementApiStub::start_container(const Configuration &configuration) {
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
start_wait_handle_.expect_result();
c->wh.expect_result();
}
channel_->call_method(
"start_container", &message, c->response.get(),
google::protobuf::NewCallback(this, &ManagementApiStub::container_started,
c.get()));
channel_->call_method("start_container", &message, c->response.get(),
google::protobuf::NewCallback(this, &ManagementApiStub::container_started, c.get()));
start_wait_handle_.wait_for_all();
c->wh.wait_for_all();
if (c->response->has_error()) throw std::runtime_error(c->response->error());
}
void ManagementApiStub::container_started(
Request<protobuf::rpc::Void> *request) {
(void)request;
DEBUG("");
start_wait_handle_.result_received();
void ManagementApiStub::container_started(Request<protobuf::rpc::Void> *request) {
request->wh.result_received();
}
void ManagementApiStub::stop_container() {
auto c = std::make_shared<Request<protobuf::rpc::Void>>();
protobuf::container::StopContainer message;
message.set_force(false);
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
c->wh.expect_result();
}
channel_->call_method("stop_container", &message, c->response.get(),
google::protobuf::NewCallback(this, &ManagementApiStub::container_stopped, c.get()));
c->wh.wait_for_all();
if (c->response->has_error()) throw std::runtime_error(c->response->error());
}
void ManagementApiStub::container_stopped(Request<protobuf::rpc::Void> *request) {
request->wh.result_received();
}
} // namespace container
} // namespace anbox
......@@ -40,6 +40,7 @@ class ManagementApiStub : public DoNotCopyOrMove {
~ManagementApiStub();
void start_container(const Configuration &configuration);
void stop_container();
private:
template <typename Response>
......@@ -47,13 +48,14 @@ class ManagementApiStub : public DoNotCopyOrMove {
Request() : response(std::make_shared<Response>()), success(true) {}
std::shared_ptr<Response> response;
bool success;
common::WaitHandle wh;
};
void container_started(Request<protobuf::rpc::Void> *request);
void container_stopped(Request<protobuf::rpc::Void> *request);
mutable std::mutex mutex_;
std::shared_ptr<rpc::Channel> channel_;
common::WaitHandle start_wait_handle_;
};
} // namespace container
} // namespace anbox
......
......@@ -33,10 +33,12 @@ namespace container {
std::shared_ptr<Service> Service::create(const std::shared_ptr<Runtime> &rt, bool privileged) {
auto sp = std::shared_ptr<Service>(new Service(rt, privileged));
auto delegate_connector = std::make_shared<
network::DelegateConnectionCreator<boost::asio::local::stream_protocol>>(
[sp](std::shared_ptr<boost::asio::local::stream_protocol::socket> const
&socket) { sp->new_client(socket); });
auto wp = std::weak_ptr<Service>(sp);
auto delegate_connector = std::make_shared<network::DelegateConnectionCreator<boost::asio::local::stream_protocol>>(
[wp](std::shared_ptr<boost::asio::local::stream_protocol::socket> const &socket) {
if (auto service = wp.lock())
service->new_client(socket);
});
const auto container_socket_path = SystemConfiguration::instance().container_socket_path();
sp->connector_ = std::make_shared<network::PublishedSocketConnector>(container_socket_path, rt, delegate_connector);
......@@ -56,7 +58,9 @@ Service::Service(const std::shared_ptr<Runtime> &rt, bool privileged)
privileged_(privileged) {
}
Service::~Service() {}
Service::~Service() {
connections_->clear();
}
int Service::next_id() { return next_connection_id_++; }
......
......@@ -33,7 +33,9 @@
RenderThread::RenderThread(const std::shared_ptr<Renderer> &renderer, IOStream *stream, emugl::Mutex *lock)
: emugl::Thread(), renderer_(renderer), m_lock(lock), m_stream(stream) {}
RenderThread::~RenderThread() {}
RenderThread::~RenderThread() {
forceStop();
}
RenderThread *RenderThread::create(const std::shared_ptr<Renderer> &renderer, IOStream *stream, emugl::Mutex *lock) {
return new RenderThread(renderer, stream, lock);
......
......@@ -80,7 +80,7 @@ ssize_t BaseSocketMessenger<stream_protocol>::send_raw(char const* data,
std::copy(data, data + length, whole_message.data());
std::unique_lock<std::mutex> lg(message_lock);
return ::send(socket_fd, data, length, 0);
return ::send(socket_fd, data, length, MSG_NOSIGNAL);
}
template <typename stream_protocol>
......@@ -95,8 +95,8 @@ void BaseSocketMessenger<stream_protocol>::send(char const* data,
ba::write(*socket, ba::buffer(whole_message.data(), whole_message.size()),
boost::asio::transfer_all());
} catch (const boost::system::system_error& err) {
DEBUG("Got error: %s", err.what());
if (err.code() == boost::asio::error::try_again) continue;
throw;
}
break;
}
......
......@@ -18,6 +18,7 @@
#include "anbox/network/published_socket_connector.h"
#include "anbox/network/connection_context.h"
#include "anbox/network/socket_helper.h"
#include "anbox/logger.h"
namespace anbox {
namespace network {
......@@ -35,8 +36,7 @@ PublishedSocketConnector::PublishedSocketConnector(
PublishedSocketConnector::~PublishedSocketConnector() {}
void PublishedSocketConnector::start_accept() {
auto socket = std::make_shared<boost::asio::local::stream_protocol::socket>(
runtime_->service());
auto socket = std::make_shared<boost::asio::local::stream_protocol::socket>(runtime_->service());
acceptor_.async_accept(*socket,
[this, socket](boost::system::error_code const& err) {
......@@ -44,10 +44,13 @@ void PublishedSocketConnector::start_accept() {
});
}
void PublishedSocketConnector::on_new_connection(
std::shared_ptr<boost::asio::local::stream_protocol::socket> const& socket,
boost::system::error_code const& err) {
if (!err) connection_creator_->create_connection_for(socket);
void PublishedSocketConnector::on_new_connection(std::shared_ptr<boost::asio::local::stream_protocol::socket> const& socket,
boost::system::error_code const& err) {
if (!err)
connection_creator_->create_connection_for(socket);
if (err.value() == boost::asio::error::operation_aborted)
return;
start_accept();
}
......
......@@ -53,16 +53,13 @@ void SocketConnection::send(char const* data, size_t length) {
}
void SocketConnection::read_next_message() {
auto callback = std::bind(&SocketConnection::on_read_size, this,
std::placeholders::_1, std::placeholders::_2);
auto callback = std::bind(&SocketConnection::on_read_size, this, std::placeholders::_1, std::placeholders::_2);
message_receiver_->async_receive_msg(callback, ba::buffer(buffer_));
}
void SocketConnection::on_read_size(const boost::system::error_code& error,
std::size_t bytes_read) {
void SocketConnection::on_read_size(const boost::system::error_code& error, std::size_t bytes_read) {
if (error) {
if (connections_) connections_->remove(id());
connections_->remove(id());
return;
}
......@@ -71,8 +68,8 @@ void SocketConnection::on_read_size(const boost::system::error_code& error,
if (processor_->process_data(data))
read_next_message();
else if (connections_)
connections_->remove(id());
else
connections_->remove(id());
}
} // namespace anbox
} // namespace network
......@@ -42,10 +42,6 @@ class SocketConnection {
void set_name(const std::string& name) { name_ = name; }
std::shared_ptr<MessageSender> message_sender() const {
return message_sender_;
}
int id() const { return id_; }
void send(char const* data, size_t length);
......@@ -55,10 +51,10 @@ class SocketConnection {
void on_read_size(const boost::system::error_code& ec,
std::size_t bytes_read);
std::shared_ptr<MessageReceiver> message_receiver_;
std::shared_ptr<MessageSender> message_sender_;
std::shared_ptr<MessageReceiver> const message_receiver_;
std::shared_ptr<MessageSender> const message_sender_;
int id_;
std::shared_ptr<Connections<SocketConnection>> connections_;
std::shared_ptr<Connections<SocketConnection>> const connections_;
std::shared_ptr<MessageProcessor> processor_;
std::array<std::uint8_t, 8192> buffer_;
std::string name_;
......
......@@ -53,12 +53,10 @@ void TcpSocketConnector::on_new_connection(
case boost::system::errc::success:
connection_creator_->create_connection_for(socket);
break;
case boost::system::errc::operation_canceled:
default:
// Socket was closed so don't listen for any further incoming
// connection attempts.
return;
default:
break;
}
start_accept();
......
......@@ -33,7 +33,7 @@ class TcpSocketMessenger : public BaseSocketMessenger<boost::asio::ip::tcp> {
std::shared_ptr<boost::asio::ip::tcp::socket> const &socket);
~TcpSocketMessenger();
unsigned short local_port() const;
unsigned short local_port() const override;
private:
unsigned short local_port_;
......
......@@ -13,3 +13,7 @@ message Configuration {
message StartContainer {
required Configuration configuration = 1;
}
message StopContainer {
optional bool force = 1;
}
......@@ -49,6 +49,7 @@ AdbMessageProcessor::AdbMessageProcessor(
host_notify_timer_(rt->service()) {}
AdbMessageProcessor::~AdbMessageProcessor() {
state_ = closed_by_host;
host_connector_.reset();
active_instance.unlock();
}
......@@ -62,6 +63,11 @@ void AdbMessageProcessor::advance_state() {
// one is established but will not use it until the active one is closed.
active_instance.lock();
if (state_ == closed_by_host) {
host_connector_.reset();
return;
}
wait_for_host_connection();
break;
case waiting_for_host_connection:
......@@ -122,9 +128,7 @@ void AdbMessageProcessor::wait_for_host_connection() {
}
}
void AdbMessageProcessor::on_host_connection(
std::shared_ptr<
boost::asio::basic_stream_socket<boost::asio::ip::tcp>> const &socket) {
void AdbMessageProcessor::on_host_connection(std::shared_ptr<boost::asio::basic_stream_socket<boost::asio::ip::tcp>> const &socket) {
host_messenger_ = std::make_shared<network::TcpSocketMessenger>(socket);
// set_no_delay() reduces the latency of sending data, at the cost
......@@ -136,16 +140,14 @@ void AdbMessageProcessor::on_host_connection(
// Let adb inside the container know that we have a connection to
// the adb host instance
messenger_->send(reinterpret_cast<const char *>(ok_command.data()),
ok_command.size());
messenger_->send(reinterpret_cast<const char *>(ok_command.data()), ok_command.size());
state_ = waiting_for_guest_start_command;
expected_command_ = start_command;
}
void AdbMessageProcessor::read_next_host_message() {
auto callback =
std::bind(&AdbMessageProcessor::on_host_read_size, this, _1, _2);
auto callback = std::bind(&AdbMessageProcessor::on_host_read_size, this, _1, _2);
host_messenger_->async_receive_msg(callback,
boost::asio::buffer(host_buffer_));
}
......@@ -153,15 +155,11 @@ void AdbMessageProcessor::read_next_host_message() {
void AdbMessageProcessor::on_host_read_size(
const boost::system::error_code &error, std::size_t bytes_read) {
if (error) {
// If messenger is still alive then close the connection which will
// trigger the terminate of our processor instance too.
if (messenger_) messenger_->close();
return;
state_ = closed_by_host;
BOOST_THROW_EXCEPTION(std::runtime_error(error.message()));
}
messenger_->send(reinterpret_cast<const char *>(host_buffer_.data()),
bytes_read);
messenger_->send(reinterpret_cast<const char *>(host_buffer_.data()), bytes_read);
read_next_host_message();
}
......
......@@ -60,7 +60,7 @@ class AdbMessageProcessor : public network::MessageProcessor {
std::shared_ptr<Runtime> runtime_;
State state_ = waiting_for_guest_accept_command;
std::string expected_command_;
std::shared_ptr<network::SocketMessenger> messenger_;
std::shared_ptr<network::SocketMessenger> const messenger_;
std::vector<std::uint8_t> buffer_;
std::shared_ptr<network::TcpSocketConnector> host_connector_;
std::shared_ptr<network::TcpSocketMessenger> host_messenger_;
......
......@@ -73,13 +73,14 @@ PipeConnectionCreator::PipeConnectionCreator(const std::shared_ptr<Renderer> &re
std::make_shared<network::Connections<network::SocketConnection>>()) {
}
PipeConnectionCreator::~PipeConnectionCreator() {}
PipeConnectionCreator::~PipeConnectionCreator() {
connections_->clear();
}
void PipeConnectionCreator::create_connection_for(
std::shared_ptr<boost::asio::local::stream_protocol::socket> const
&socket) {
auto const messenger =
std::make_shared<network::LocalSocketMessenger>(socket);
auto const messenger = std::make_shared<network::LocalSocketMessenger>(socket);
const auto type = identify_client(messenger);
auto const processor = create_processor(type, messenger);
if (!processor)
......
......@@ -68,8 +68,7 @@ class PipeConnectionCreator
std::shared_ptr<Renderer> renderer_;
std::shared_ptr<Runtime> runtime_;
std::atomic<int> next_connection_id_;
std::shared_ptr<network::Connections<network::SocketConnection>> const
connections_;
std::shared_ptr<network::Connections<network::SocketConnection>> const connections_;
};
} // namespace qemu
} // namespace anbox
......
......@@ -38,10 +38,9 @@ void exception_safe_run(boost::asio::io_service& service) {
// a service::work instance).
break;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
ERROR("%s", e.what());
} catch (...) {
std::cerr
<< "Unknown exception caught while executing boost::asio::io_service";
ERROR("Unknown exception caught while executing boost::asio::io_service");
}
}
}
......@@ -74,7 +73,9 @@ void Runtime::start() {
void Runtime::stop() {
service_.stop();
for (auto& worker : workers_) pthread_kill(worker.native_handle(), SIGTERM);
for (auto& worker : workers_)
if (worker.joinable())
worker.join();
}
std::function<void(std::function<void()>)> Runtime::to_dispatcher_functional() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册