提交 9e9ee9fd 编写于 作者: S Simon Fels 提交者: GitHub

Merge pull request #48 from morphis/feature/correct-termination

Cleanup program termination so that things are correctly released
......@@ -28,6 +28,7 @@ apt-get install -qq -y \
libglm-dev \
libgtest-dev \
liblxc1 \
libproperties-cpp-dev \
libprotobuf-dev \
libsdl2-dev \
lxc-dev \
......
......@@ -24,10 +24,13 @@
#include <string>
#include <core/property.h>
namespace anbox {
class ApplicationManager : public DoNotCopyOrMove {
public:
virtual void launch(const android::Intent &intent, const graphics::Rect &launch_bounds = graphics::Rect::Invalid) = 0;
virtual core::Property<bool>& ready() = 0;
};
} // namespace anbox
......
......@@ -92,6 +92,10 @@ void AndroidApiStub::launch(const android::Intent &intent,
if (c->response->has_error()) throw std::runtime_error(c->response->error());
}
core::Property<bool>& AndroidApiStub::ready() {
return ready_;
}
void AndroidApiStub::application_launched(
Request<protobuf::rpc::Void> *request) {
(void)request;
......
......@@ -43,13 +43,14 @@ class AndroidApiStub : public anbox::ApplicationManager {
void set_rpc_channel(const std::shared_ptr<rpc::Channel> &channel);
void reset_rpc_channel();
void launch(const android::Intent &intent, const graphics::Rect &launch_bounds = graphics::Rect::Invalid) override;
void set_focused_task(const std::int32_t &id);
void remove_task(const std::int32_t &id);
void resize_task(const std::int32_t &id, const anbox::graphics::Rect &rect,
const std::int32_t &resize_mode);
void launch(const android::Intent &intent, const graphics::Rect &launch_bounds = graphics::Rect::Invalid) override;
core::Property<bool>& ready() override;
private:
void ensure_rpc_channel();
......@@ -72,6 +73,7 @@ class AndroidApiStub : public anbox::ApplicationManager {
common::WaitHandle remove_task_handle_;
common::WaitHandle resize_task_handle_;
graphics::Rect launch_bounds_ = graphics::Rect::Invalid;
core::Property<bool> ready_;
};
} // namespace bridge
} // namespace anbox
......
......@@ -109,7 +109,7 @@ bool anbox::cmds::ContainerManager::setup_mounts() {
return false;
}
auto m = common::MountEntry::create(loop_device, android_rootfs_dir, "squashfs", MS_MGC_VAL | MS_RDONLY);
auto m = common::MountEntry::create(loop_device, android_rootfs_dir, "squashfs", MS_MGC_VAL | MS_RDONLY | MS_PRIVATE);
if (!m) {
ERROR("Failed to mount Android rootfs");
return false;
......@@ -133,7 +133,7 @@ bool anbox::cmds::ContainerManager::setup_mounts() {
}
}
auto m = common::MountEntry::create(src_dir_path, target_dir_path, "", MS_MGC_VAL | MS_BIND);
auto m = common::MountEntry::create(src_dir_path, target_dir_path, "", MS_MGC_VAL | MS_BIND | MS_PRIVATE);
if (!m) {
ERROR("Failed to mount Android %s directory", dir_name);
mounts_.clear();
......
......@@ -16,14 +16,24 @@
*/
#include "anbox/cmds/launch.h"
#include "anbox/common/wait_handle.h"
#include "anbox/dbus/stub/application_manager.h"
#include "anbox/common/dispatcher.h"
#include "anbox/runtime.h"
#include "anbox/logger.h"
#include <core/dbus/asio/executor.h>
#include <boost/filesystem.hpp>
#include "core/posix/signal.h"
namespace fs = boost::filesystem;
namespace {
const boost::posix_time::seconds max_wait_timeout{30};
}
anbox::cmds::Launch::Launch()
: CommandWithFlagsAndAction{
cli::Name{"launch"}, cli::Usage{"launch"},
......@@ -46,13 +56,69 @@ anbox::cmds::Launch::Launch()
intent_.component));
action([this](const cli::Command::Context&) {
auto bus =
std::make_shared<core::dbus::Bus>(core::dbus::WellKnownBus::session);
bus->install_executor(core::dbus::asio::make_executor(bus));
auto stub = dbus::stub::ApplicationManager::create_for_bus(bus);
auto trap = core::posix::trap_signals_for_process({core::posix::Signal::sig_term, core::posix::Signal::sig_int});
trap->signal_raised().connect([trap](const core::posix::Signal& signal) {
INFO("Signal %i received. Good night.", static_cast<int>(signal));
trap->stop();
});
auto rt = Runtime::create();
auto bus = std::make_shared<core::dbus::Bus>(core::dbus::WellKnownBus::session);
bus->install_executor(core::dbus::asio::make_executor(bus, rt->service()));
std::shared_ptr<dbus::stub::ApplicationManager> stub;
try {
stub = dbus::stub::ApplicationManager::create_for_bus(bus);
} catch (...) {
ERROR("Anbox session manager service isn't running!");
return EXIT_FAILURE;
}
auto dispatcher = anbox::common::create_dispatcher_for_runtime(rt);
bool success = false;
dispatcher->dispatch([&]() {
if (stub->ready()) {
try {
stub->launch(intent_);
success = true;
} catch (std::exception &err) {
ERROR("err %s", err.what());
}
trap->stop();
return;
}
DEBUG("Android hasn't fully booted yet. Waiting a bit..");
stub->ready().changed().connect([&](bool ready) {
if (!ready)
return;
try {
stub->launch(intent_);
success = true;
} catch (std::exception &err) {
ERROR("Failed to launch activity: %s", err.what());
success = false;
}
trap->stop();
});
});
boost::asio::deadline_timer timer(rt->service());
timer.expires_from_now(max_wait_timeout);
timer.async_wait([&](const boost::system::error_code&) {
WARNING("Stop waiting as we're already waiting for too long. Something is wrong");
WARNING("with your setup and the container may have failed to boot.");
trap->stop();
});
stub->launch(intent_);
rt->start();
trap->run();
rt->stop();
return EXIT_SUCCESS;
return success ? EXIT_SUCCESS : EXIT_FAILURE;
});
}
......@@ -176,8 +176,10 @@ anbox::cmds::SessionManager::SessionManager(const BusFactory &bus_factory)
auto server = std::make_shared<bridge::PlatformApiSkeleton>(
pending_calls, policy, window_manager, launcher_storage);
server->register_boot_finished_handler(
[&]() { DEBUG("Android successfully booted"); });
server->register_boot_finished_handler([&]() {
DEBUG("Android successfully booted");
android_api_stub->ready().set(true);
});
return std::make_shared<bridge::PlatformMessageProcessor>(
sender, server, pending_calls);
}));
......@@ -193,7 +195,7 @@ anbox::cmds::SessionManager::SessionManager(const BusFactory &bus_factory)
{"/dev/fuse", "/dev/fuse"},
};
dispatcher->dispatch([&]() { container.start_container(container_configuration); });
dispatcher->dispatch([&]() { container.start(container_configuration); });
auto bus = bus_factory_();
bus->install_executor(core::dbus::asio::make_executor(bus, rt->service()));
......@@ -202,6 +204,11 @@ anbox::cmds::SessionManager::SessionManager(const BusFactory &bus_factory)
rt->start();
trap->run();
// Stop the container which should close all open connections we have on
// our side and should terminate all services.
container.stop();
rt->stop();
return EXIT_SUCCESS;
......
......@@ -42,7 +42,7 @@ Client::Client(const std::shared_ptr<Runtime> &rt)
Client::~Client() {}
void Client::start_container(const Configuration &configuration) {
void Client::start(const Configuration &configuration) {
try {
management_api_->start_container(configuration);
} catch (const std::exception &e) {
......@@ -52,6 +52,10 @@ void Client::start_container(const Configuration &configuration) {
}
}
void Client::stop() {
management_api_->stop_container();
}
void Client::register_terminate_handler(const TerminateCallback &callback) {
terminate_callback_ = callback;
}
......
......@@ -39,7 +39,8 @@ class Client {
Client(const std::shared_ptr<Runtime> &rt);
~Client();
void start_container(const Configuration &configuration);
void start(const Configuration &configuration);
void stop();
void register_terminate_handler(const TerminateCallback &callback);
......
......@@ -32,11 +32,11 @@ ManagementApiMessageProcessor::ManagementApiMessageProcessor(
ManagementApiMessageProcessor::~ManagementApiMessageProcessor() {}
void ManagementApiMessageProcessor::dispatch(
rpc::Invocation const &invocation) {
void ManagementApiMessageProcessor::dispatch(rpc::Invocation const &invocation) {
if (invocation.method_name() == "start_container")
invoke(this, server_.get(), &ManagementApiSkeleton::start_container,
invocation);
invoke(this, server_.get(), &ManagementApiSkeleton::start_container, invocation);
else if (invocation.method_name() == "stop_container")
invoke(this, server_.get(), &ManagementApiSkeleton::stop_container, invocation);
}
void ManagementApiMessageProcessor::process_event_sequence(
......
......@@ -55,8 +55,28 @@ void ManagementApiSkeleton::start_container(
try {
container_->start(container_configuration);
} catch (std::exception &err) {
response->set_error(
utils::string_format("Failed to start container: %s", err.what()));
response->set_error(utils::string_format("Failed to start container: %s", err.what()));
}
done->Run();
}
void ManagementApiSkeleton::stop_container(
anbox::protobuf::container::StopContainer const *request,
anbox::protobuf::rpc::Void *response, google::protobuf::Closure *done) {
(void)request;
if (container_->state() != Container::State::running) {
response->set_error("Container is not running");
done->Run();
return;
}
try {
container_->stop();
} catch (std::exception &err) {
response->set_error(utils::string_format("Failed to stop container: %s", err.what()));
}
done->Run();
......
......@@ -33,6 +33,7 @@ class Void;
} // namespace rpc
namespace container {
class StartContainer;
class StopContainer;
} // namespace container
} // namespace protobuf
namespace rpc {
......@@ -51,6 +52,10 @@ class ManagementApiSkeleton {
anbox::protobuf::container::StartContainer const *request,
anbox::protobuf::rpc::Void *response, google::protobuf::Closure *done);
void stop_container(
anbox::protobuf::container::StopContainer const *request,
anbox::protobuf::rpc::Void *response, google::protobuf::Closure *done);
private:
std::shared_ptr<rpc::PendingCallCache> pending_calls_;
std::shared_ptr<Container> container_;
......
......@@ -36,7 +36,7 @@ void ManagementApiStub::start_container(const Configuration &configuration) {
protobuf::container::StartContainer message;
auto message_configuration = new protobuf::container::Configuration;
for (const auto item : configuration.bind_mounts) {
for (const auto &item : configuration.bind_mounts) {
auto bind_mount_message = message_configuration->add_bind_mounts();
bind_mount_message->set_source(item.first);
bind_mount_message->set_target(item.second);
......@@ -46,24 +46,43 @@ void ManagementApiStub::start_container(const Configuration &configuration) {
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
start_wait_handle_.expect_result();
c->wh.expect_result();
}
channel_->call_method(
"start_container", &message, c->response.get(),
google::protobuf::NewCallback(this, &ManagementApiStub::container_started,
c.get()));
channel_->call_method("start_container", &message, c->response.get(),
google::protobuf::NewCallback(this, &ManagementApiStub::container_started, c.get()));
start_wait_handle_.wait_for_all();
c->wh.wait_for_all();
if (c->response->has_error()) throw std::runtime_error(c->response->error());
}
void ManagementApiStub::container_started(
Request<protobuf::rpc::Void> *request) {
(void)request;
DEBUG("");
start_wait_handle_.result_received();
void ManagementApiStub::container_started(Request<protobuf::rpc::Void> *request) {
request->wh.result_received();
}
void ManagementApiStub::stop_container() {
auto c = std::make_shared<Request<protobuf::rpc::Void>>();
protobuf::container::StopContainer message;
message.set_force(false);
{
std::lock_guard<decltype(mutex_)> lock(mutex_);
c->wh.expect_result();
}
channel_->call_method("stop_container", &message, c->response.get(),
google::protobuf::NewCallback(this, &ManagementApiStub::container_stopped, c.get()));
c->wh.wait_for_all();
if (c->response->has_error()) throw std::runtime_error(c->response->error());
}
void ManagementApiStub::container_stopped(Request<protobuf::rpc::Void> *request) {
request->wh.result_received();
}
} // namespace container
} // namespace anbox
......@@ -40,6 +40,7 @@ class ManagementApiStub : public DoNotCopyOrMove {
~ManagementApiStub();
void start_container(const Configuration &configuration);
void stop_container();
private:
template <typename Response>
......@@ -47,13 +48,14 @@ class ManagementApiStub : public DoNotCopyOrMove {
Request() : response(std::make_shared<Response>()), success(true) {}
std::shared_ptr<Response> response;
bool success;
common::WaitHandle wh;
};
void container_started(Request<protobuf::rpc::Void> *request);
void container_stopped(Request<protobuf::rpc::Void> *request);
mutable std::mutex mutex_;
std::shared_ptr<rpc::Channel> channel_;
common::WaitHandle start_wait_handle_;
};
} // namespace container
} // namespace anbox
......
......@@ -33,10 +33,12 @@ namespace container {
std::shared_ptr<Service> Service::create(const std::shared_ptr<Runtime> &rt, bool privileged) {
auto sp = std::shared_ptr<Service>(new Service(rt, privileged));
auto delegate_connector = std::make_shared<
network::DelegateConnectionCreator<boost::asio::local::stream_protocol>>(
[sp](std::shared_ptr<boost::asio::local::stream_protocol::socket> const
&socket) { sp->new_client(socket); });
auto wp = std::weak_ptr<Service>(sp);
auto delegate_connector = std::make_shared<network::DelegateConnectionCreator<boost::asio::local::stream_protocol>>(
[wp](std::shared_ptr<boost::asio::local::stream_protocol::socket> const &socket) {
if (auto service = wp.lock())
service->new_client(socket);
});
const auto container_socket_path = SystemConfiguration::instance().container_socket_path();
sp->connector_ = std::make_shared<network::PublishedSocketConnector>(container_socket_path, rt, delegate_connector);
......@@ -56,7 +58,9 @@ Service::Service(const std::shared_ptr<Runtime> &rt, bool privileged)
privileged_(privileged) {
}
Service::~Service() {}
Service::~Service() {
connections_->clear();
}
int Service::next_id() { return next_connection_id_++; }
......
......@@ -19,6 +19,7 @@
#define ANBOX_DBUS_INTERFACE_H_
#include <core/dbus/macros.h>
#include <core/dbus/property.h>
#include <chrono>
#include <string>
......@@ -42,6 +43,9 @@ struct ApplicationManager {
}
};
};
struct Properties {
DBUS_CPP_READABLE_PROPERTY_DEF(Ready, ApplicationManager, bool)
};
};
} // namespace interface
} // namespace dbus
......
......@@ -20,15 +20,19 @@
#include "anbox/dbus/interface.h"
#include "anbox/logger.h"
#include <core/property.h>
namespace anbox {
namespace dbus {
namespace skeleton {
ApplicationManager::ApplicationManager(
const core::dbus::Bus::Ptr &bus, const core::dbus::Object::Ptr &object,
const std::shared_ptr<anbox::ApplicationManager> &impl)
: bus_(bus), object_(object), impl_(impl) {
object_->install_method_handler<
anbox::dbus::interface::ApplicationManager::Methods::Launch>(
: bus_(bus), object_(object), impl_(impl),
properties_{ object_->get_property<anbox::dbus::interface::ApplicationManager::Properties::Ready>() },
signals_{ object_->get_signal<core::dbus::interfaces::Properties::Signals::PropertiesChanged>() } {
object_->install_method_handler<anbox::dbus::interface::ApplicationManager::Methods::Launch>(
[this](const core::dbus::Message::Ptr &msg) {
auto reader = msg->reader();
......@@ -59,13 +63,38 @@ ApplicationManager::ApplicationManager(
bus_->send(reply);
});
// Forward AndroidApi status to our dbus property
properties_.ready->install([&]() { return impl_->ready().get(); });
impl_->ready().changed().connect([&](bool value) {
properties_.ready->set(value);
on_property_value_changed<anbox::dbus::interface::ApplicationManager::Properties::Ready>(value);
});
}
ApplicationManager::~ApplicationManager() {}
template<typename Property>
void ApplicationManager::on_property_value_changed(const typename Property::ValueType& value)
{
typedef std::map<std::string, core::dbus::types::Variant> Dictionary;
static const std::vector<std::string> the_empty_list_of_invalidated_properties;
Dictionary dict; dict[Property::name()] = core::dbus::types::Variant::encode(value);
signals_.properties_changed->emit(
std::make_tuple(core::dbus::traits::Service<anbox::dbus::interface::ApplicationManager>::interface_name(),
dict, the_empty_list_of_invalidated_properties));
}
void ApplicationManager::launch(const android::Intent &intent, const graphics::Rect &launch_bounds) {
impl_->launch(intent, launch_bounds);
}
core::Property<bool>& ApplicationManager::ready() {
return impl_->ready();
}
} // namespace skeleton
} // namespace dbus
} // namespace anbox
......@@ -23,6 +23,9 @@
#include <core/dbus/bus.h>
#include <core/dbus/object.h>
#include <core/dbus/service.h>
#include <core/dbus/property.h>
#include "anbox/dbus/interface.h"
namespace anbox {
namespace dbus {
......@@ -35,12 +38,23 @@ class ApplicationManager : public anbox::ApplicationManager {
~ApplicationManager();
void launch(const android::Intent &intent, const graphics::Rect &launch_bounds = graphics::Rect::Invalid) override;
core::Property<bool>& ready() override;
private:
template<typename Property>
void on_property_value_changed(const typename Property::ValueType& value);
core::dbus::Bus::Ptr bus_;
core::dbus::Service::Ptr service_;
core::dbus::Object::Ptr object_;
std::shared_ptr<anbox::ApplicationManager> impl_;
struct {
std::shared_ptr<core::dbus::Property<anbox::dbus::interface::ApplicationManager::Properties::Ready>> ready;
} properties_;
struct {
core::dbus::Signal<core::dbus::interfaces::Properties::Signals::PropertiesChanged,
core::dbus::interfaces::Properties::Signals::PropertiesChanged::ArgumentType>::Ptr properties_changed;
} signals_;
};
} // namespace skeleton
} // namespace dbus
......
......@@ -39,8 +39,7 @@ Service::Service(
: bus_(bus),
service_(service),
object_(object),
application_manager_(std::make_shared<ApplicationManager>(
bus_, object_, application_manager)) {}
application_manager_(std::make_shared<ApplicationManager>(bus_, object_, application_manager)) {}
Service::~Service() {}
} // namespace skeleton
......
......@@ -22,19 +22,22 @@
namespace anbox {
namespace dbus {
namespace stub {
std::shared_ptr<ApplicationManager> ApplicationManager::create_for_bus(
const core::dbus::Bus::Ptr &bus) {
auto service = core::dbus::Service::use_service(
bus, anbox::dbus::interface::Service::name());
auto object =
service->add_object_for_path(anbox::dbus::interface::Service::path());
std::shared_ptr<ApplicationManager> ApplicationManager::create_for_bus(const core::dbus::Bus::Ptr &bus) {
auto service = core::dbus::Service::use_service_or_throw_if_not_available(bus, anbox::dbus::interface::Service::name());
auto object = service->add_object_for_path(anbox::dbus::interface::Service::path());
return std::make_shared<ApplicationManager>(bus, service, object);
}
ApplicationManager::ApplicationManager(const core::dbus::Bus::Ptr &bus,
const core::dbus::Service::Ptr &service,
const core::dbus::Object::Ptr &object)
: bus_(bus), service_(service), object_(object) {}
: bus_(bus), service_(service), object_(object),
properties_{ object_->get_property<anbox::dbus::interface::ApplicationManager::Properties::Ready>() } {
// Forward changes on the dbus property to our users
ready_.install([&]() { return properties_.ready->get(); });
properties_.ready->changed().connect([&](bool value) { ready_.set(value); });
}
ApplicationManager::~ApplicationManager() {}
......@@ -48,6 +51,10 @@ void ApplicationManager::launch(const android::Intent &intent, const graphics::R
if (result.is_error()) throw std::runtime_error(result.error().print());
}
core::Property<bool>& ApplicationManager::ready() {
return ready_;
}
} // namespace skeleton
} // namespace dbus
} // namespace anbox
......@@ -24,6 +24,8 @@
#include <core/dbus/object.h>
#include <core/dbus/service.h>
#include "anbox/dbus/interface.h"
namespace anbox {
namespace dbus {
namespace stub {
......@@ -38,11 +40,16 @@ class ApplicationManager : public anbox::ApplicationManager {
~ApplicationManager();
void launch(const android::Intent &intent, const graphics::Rect &launch_bounds = graphics::Rect::Invalid) override;
core::Property<bool>& ready() override;
private:
core::dbus::Bus::Ptr bus_;
core::dbus::Service::Ptr service_;
core::dbus::Object::Ptr object_;
core::Property<bool> ready_;
struct {
std::shared_ptr<core::dbus::Property<anbox::dbus::interface::ApplicationManager::Properties::Ready>> ready;
} properties_;
};
} // namespace stub
} // namespace dbus
......
......@@ -33,7 +33,9 @@
RenderThread::RenderThread(const std::shared_ptr<Renderer> &renderer, IOStream *stream, emugl::Mutex *lock)
: emugl::Thread(), renderer_(renderer), m_lock(lock), m_stream(stream) {}
RenderThread::~RenderThread() {}
RenderThread::~RenderThread() {
forceStop();
}
RenderThread *RenderThread::create(const std::shared_ptr<Renderer> &renderer, IOStream *stream, emugl::Mutex *lock) {
return new RenderThread(renderer, stream, lock);
......
......@@ -80,7 +80,7 @@ ssize_t BaseSocketMessenger<stream_protocol>::send_raw(char const* data,
std::copy(data, data + length, whole_message.data());
std::unique_lock<std::mutex> lg(message_lock);
return ::send(socket_fd, data, length, 0);
return ::send(socket_fd, data, length, MSG_NOSIGNAL);
}
template <typename stream_protocol>
......@@ -95,8 +95,8 @@ void BaseSocketMessenger<stream_protocol>::send(char const* data,
ba::write(*socket, ba::buffer(whole_message.data(), whole_message.size()),
boost::asio::transfer_all());
} catch (const boost::system::system_error& err) {
DEBUG("Got error: %s", err.what());
if (err.code() == boost::asio::error::try_again) continue;
throw;
}
break;
}
......
......@@ -18,6 +18,7 @@
#include "anbox/network/published_socket_connector.h"
#include "anbox/network/connection_context.h"
#include "anbox/network/socket_helper.h"
#include "anbox/logger.h"
namespace anbox {
namespace network {
......@@ -35,8 +36,7 @@ PublishedSocketConnector::PublishedSocketConnector(
PublishedSocketConnector::~PublishedSocketConnector() {}
void PublishedSocketConnector::start_accept() {
auto socket = std::make_shared<boost::asio::local::stream_protocol::socket>(
runtime_->service());
auto socket = std::make_shared<boost::asio::local::stream_protocol::socket>(runtime_->service());
acceptor_.async_accept(*socket,
[this, socket](boost::system::error_code const& err) {
......@@ -44,10 +44,13 @@ void PublishedSocketConnector::start_accept() {
});
}
void PublishedSocketConnector::on_new_connection(
std::shared_ptr<boost::asio::local::stream_protocol::socket> const& socket,
boost::system::error_code const& err) {
if (!err) connection_creator_->create_connection_for(socket);
void PublishedSocketConnector::on_new_connection(std::shared_ptr<boost::asio::local::stream_protocol::socket> const& socket,
boost::system::error_code const& err) {
if (!err)
connection_creator_->create_connection_for(socket);
if (err.value() == boost::asio::error::operation_aborted)
return;
start_accept();
}
......
......@@ -53,16 +53,13 @@ void SocketConnection::send(char const* data, size_t length) {
}
void SocketConnection::read_next_message() {
auto callback = std::bind(&SocketConnection::on_read_size, this,
std::placeholders::_1, std::placeholders::_2);
auto callback = std::bind(&SocketConnection::on_read_size, this, std::placeholders::_1, std::placeholders::_2);
message_receiver_->async_receive_msg(callback, ba::buffer(buffer_));
}
void SocketConnection::on_read_size(const boost::system::error_code& error,
std::size_t bytes_read) {
void SocketConnection::on_read_size(const boost::system::error_code& error, std::size_t bytes_read) {
if (error) {
if (connections_) connections_->remove(id());
connections_->remove(id());
return;
}
......@@ -71,8 +68,8 @@ void SocketConnection::on_read_size(const boost::system::error_code& error,
if (processor_->process_data(data))
read_next_message();
else if (connections_)
connections_->remove(id());
else
connections_->remove(id());
}
} // namespace anbox
} // namespace network
......@@ -42,10 +42,6 @@ class SocketConnection {
void set_name(const std::string& name) { name_ = name; }
std::shared_ptr<MessageSender> message_sender() const {
return message_sender_;
}
int id() const { return id_; }
void send(char const* data, size_t length);
......@@ -55,10 +51,10 @@ class SocketConnection {
void on_read_size(const boost::system::error_code& ec,
std::size_t bytes_read);
std::shared_ptr<MessageReceiver> message_receiver_;
std::shared_ptr<MessageSender> message_sender_;
std::shared_ptr<MessageReceiver> const message_receiver_;
std::shared_ptr<MessageSender> const message_sender_;
int id_;
std::shared_ptr<Connections<SocketConnection>> connections_;
std::shared_ptr<Connections<SocketConnection>> const connections_;
std::shared_ptr<MessageProcessor> processor_;
std::array<std::uint8_t, 8192> buffer_;
std::string name_;
......
......@@ -53,12 +53,10 @@ void TcpSocketConnector::on_new_connection(
case boost::system::errc::success:
connection_creator_->create_connection_for(socket);
break;
case boost::system::errc::operation_canceled:
default:
// Socket was closed so don't listen for any further incoming
// connection attempts.
return;
default:
break;
}
start_accept();
......
......@@ -33,7 +33,7 @@ class TcpSocketMessenger : public BaseSocketMessenger<boost::asio::ip::tcp> {
std::shared_ptr<boost::asio::ip::tcp::socket> const &socket);
~TcpSocketMessenger();
unsigned short local_port() const;
unsigned short local_port() const override;
private:
unsigned short local_port_;
......
......@@ -13,3 +13,7 @@ message Configuration {
message StartContainer {
required Configuration configuration = 1;
}
message StopContainer {
optional bool force = 1;
}
......@@ -49,6 +49,7 @@ AdbMessageProcessor::AdbMessageProcessor(
host_notify_timer_(rt->service()) {}
AdbMessageProcessor::~AdbMessageProcessor() {
state_ = closed_by_host;
host_connector_.reset();
active_instance.unlock();
}
......@@ -62,6 +63,11 @@ void AdbMessageProcessor::advance_state() {
// one is established but will not use it until the active one is closed.
active_instance.lock();
if (state_ == closed_by_host) {
host_connector_.reset();
return;
}
wait_for_host_connection();
break;
case waiting_for_host_connection:
......@@ -122,9 +128,7 @@ void AdbMessageProcessor::wait_for_host_connection() {
}
}
void AdbMessageProcessor::on_host_connection(
std::shared_ptr<
boost::asio::basic_stream_socket<boost::asio::ip::tcp>> const &socket) {
void AdbMessageProcessor::on_host_connection(std::shared_ptr<boost::asio::basic_stream_socket<boost::asio::ip::tcp>> const &socket) {
host_messenger_ = std::make_shared<network::TcpSocketMessenger>(socket);
// set_no_delay() reduces the latency of sending data, at the cost
......@@ -136,16 +140,14 @@ void AdbMessageProcessor::on_host_connection(
// Let adb inside the container know that we have a connection to
// the adb host instance
messenger_->send(reinterpret_cast<const char *>(ok_command.data()),
ok_command.size());
messenger_->send(reinterpret_cast<const char *>(ok_command.data()), ok_command.size());
state_ = waiting_for_guest_start_command;
expected_command_ = start_command;
}
void AdbMessageProcessor::read_next_host_message() {
auto callback =
std::bind(&AdbMessageProcessor::on_host_read_size, this, _1, _2);
auto callback = std::bind(&AdbMessageProcessor::on_host_read_size, this, _1, _2);
host_messenger_->async_receive_msg(callback,
boost::asio::buffer(host_buffer_));
}
......@@ -153,15 +155,11 @@ void AdbMessageProcessor::read_next_host_message() {
void AdbMessageProcessor::on_host_read_size(
const boost::system::error_code &error, std::size_t bytes_read) {
if (error) {
// If messenger is still alive then close the connection which will
// trigger the terminate of our processor instance too.
if (messenger_) messenger_->close();
return;
state_ = closed_by_host;
BOOST_THROW_EXCEPTION(std::runtime_error(error.message()));
}
messenger_->send(reinterpret_cast<const char *>(host_buffer_.data()),
bytes_read);
messenger_->send(reinterpret_cast<const char *>(host_buffer_.data()), bytes_read);
read_next_host_message();
}
......
......@@ -60,7 +60,7 @@ class AdbMessageProcessor : public network::MessageProcessor {
std::shared_ptr<Runtime> runtime_;
State state_ = waiting_for_guest_accept_command;
std::string expected_command_;
std::shared_ptr<network::SocketMessenger> messenger_;
std::shared_ptr<network::SocketMessenger> const messenger_;
std::vector<std::uint8_t> buffer_;
std::shared_ptr<network::TcpSocketConnector> host_connector_;
std::shared_ptr<network::TcpSocketMessenger> host_messenger_;
......
......@@ -73,13 +73,14 @@ PipeConnectionCreator::PipeConnectionCreator(const std::shared_ptr<Renderer> &re
std::make_shared<network::Connections<network::SocketConnection>>()) {
}
PipeConnectionCreator::~PipeConnectionCreator() {}
PipeConnectionCreator::~PipeConnectionCreator() {
connections_->clear();
}
void PipeConnectionCreator::create_connection_for(
std::shared_ptr<boost::asio::local::stream_protocol::socket> const
&socket) {
auto const messenger =
std::make_shared<network::LocalSocketMessenger>(socket);
auto const messenger = std::make_shared<network::LocalSocketMessenger>(socket);
const auto type = identify_client(messenger);
auto const processor = create_processor(type, messenger);
if (!processor)
......
......@@ -68,8 +68,7 @@ class PipeConnectionCreator
std::shared_ptr<Renderer> renderer_;
std::shared_ptr<Runtime> runtime_;
std::atomic<int> next_connection_id_;
std::shared_ptr<network::Connections<network::SocketConnection>> const
connections_;
std::shared_ptr<network::Connections<network::SocketConnection>> const connections_;
};
} // namespace qemu
} // namespace anbox
......
......@@ -38,10 +38,9 @@ void exception_safe_run(boost::asio::io_service& service) {
// a service::work instance).
break;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
ERROR("%s", e.what());
} catch (...) {
std::cerr
<< "Unknown exception caught while executing boost::asio::io_service";
ERROR("Unknown exception caught while executing boost::asio::io_service");
}
}
}
......@@ -74,7 +73,9 @@ void Runtime::start() {
void Runtime::stop() {
service_.stop();
for (auto& worker : workers_) pthread_kill(worker.native_handle(), SIGTERM);
for (auto& worker : workers_)
if (worker.joinable())
worker.join();
}
std::function<void(std::function<void()>)> Runtime::to_dispatcher_functional() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册