提交 91d37082 编写于 作者: S Simon Fels

First attempt on getting rid of the socket server

上级 e0ce4baa
......@@ -3,6 +3,7 @@
set(CMAKE_C_FLAGS "-Wall")
include_directories(
${CMAKE_SOURCE_DIR}/src
${CMAKE_SOURCE_DIR}/external/android-emugl/shared
${CMAKE_SOURCE_DIR}/external/android-emugl/host/include
${CMAKE_SOURCE_DIR}/external/android-emugl/shared/OpenglCodecCommon
......
......@@ -21,6 +21,8 @@
#include "ErrorLog.h"
#include "anbox/logger.h"
class IOStream {
public:
......@@ -70,7 +72,7 @@ public:
}
int flush() {
DEBUG("buf %p free %d buf size %d", m_buf, m_free, m_bufsize);
if (!m_buf || m_free == m_bufsize) return 0;
int stat = commitBuffer(m_bufsize - m_free);
......
......@@ -14,7 +14,7 @@ add_custom_command(
set(SOURCES
GLESv1Decoder.cpp)
if ("${cmake_build_type_lower}" STREQUAL "trace")
if ("${cmake_build_type_lower}" STREQUAL "debug")
set(OPENGL_DEBUG "-DOPENGL_DEBUG_PRINTOUT -DCHECK_GL_ERROR")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OPENGL_DEBUG}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OPENGL_DEBUG}")
......
......@@ -14,7 +14,7 @@ add_custom_command(
set(SOURCES
GLESv2Decoder.cpp)
if ("${cmake_build_type_lower}" STREQUAL "trace")
if ("${cmake_build_type_lower}" STREQUAL "debug")
set(OPENGL_DEBUG "-DOPENGL_DEBUG_PRINTOUT -DCHECK_GL_ERROR")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OPENGL_DEBUG}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OPENGL_DEBUG}")
......
......@@ -10,7 +10,7 @@ add_custom_command(
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS emugen)
if ("${cmake_build_type_lower}" STREQUAL "trace")
if ("${cmake_build_type_lower}" STREQUAL "debug")
set(OPENGL_DEBUG "-DOPENGL_DEBUG_PRINTOUT -DCHECK_GL_ERROR")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${OPENGL_DEBUG}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${OPENGL_DEBUG}")
......
......@@ -255,12 +255,17 @@ void ColorBuffer::readPixels(int x,
GLenum p_format,
GLenum p_type,
void* pixels) {
DEBUG("");
ScopedHelperContext context(m_helper);
if (!context.isOk()) {
return;
}
DEBUG("Before bind");
if (bindFbo(&m_fbo, m_tex)) {
DEBUG("Off to GL to read pixels");
s_gles2.glReadPixels(x, y, width, height, p_format, p_type, pixels);
unbindFbo();
}
......
......@@ -545,7 +545,9 @@ HandleType FrameBuffer::genHandle()
HandleType FrameBuffer::createColorBuffer(int p_width, int p_height,
GLenum p_internalFormat)
{
DEBUG("width %d height %d", p_width, p_height);
emugl::Mutex::AutoLock mutex(m_lock);
DEBUG("Got lock");
HandleType ret = 0;
ColorBufferPtr cb(ColorBuffer::create(
......@@ -748,10 +750,16 @@ void FrameBuffer::readColorBuffer(HandleType p_colorbuffer,
int x, int y, int width, int height,
GLenum format, GLenum type, void *pixels)
{
DEBUG("handle %d x %d y %d width %d height %d",
p_colorbuffer, x, y, width, height);
emugl::Mutex::AutoLock mutex(m_lock);
DEBUG("Got lock");
ColorBufferMap::iterator c( m_colorbuffers.find(p_colorbuffer) );
if (c == m_colorbuffers.end()) {
DEBUG("Didn't found color buffer");
// bad colorbuffer handle
return;
}
......
......@@ -73,8 +73,8 @@ LayerManager::~LayerManager() {
}
void LayerManager::post_layer(const LayerInfo &layer) {
if (is_layer_blacklisted(layer.name))
return;
// if (is_layer_blacklisted(layer.name))
// return;
FrameBufferWindow *window = nullptr;
for (auto &l : layers_) {
......
......@@ -79,61 +79,6 @@ RENDER_APICALL int RENDER_APIENTRY initOpenGLRenderer(
set_emugl_crash_reporter(crashfunc);
set_emugl_logger(logfuncs.coarse);
set_emugl_cxt_logger(logfuncs.fine);
//
// Fail if renderer is already initialized
//
if (s_renderThread) {
return false;
}
// kUseThread is used to determine whether the RenderWindow should use
// a separate thread to manage its subwindow GL/GLES context.
// For now, this feature is disabled entirely for the following
// reasons:
//
// - It must be disabled on Windows at all times, otherwise the main window becomes
// unresponsive after a few seconds of user interaction (e.g. trying to
// move it over the desktop). Probably due to the subtle issues around
// input on this platform (input-queue is global, message-queue is
// per-thread). Also, this messes considerably the display of the
// main window when running the executable under Wine.
//
// - On Linux/XGL and OSX/Cocoa, this used to be necessary to avoid corruption
// issues with the GL state of the main window when using the SDL UI.
// After the switch to Qt, this is no longer necessary and may actually cause
// undesired interactions between the UI thread and the RenderWindow thread:
// for example, in a multi-monitor setup the context might be recreated when
// dragging the window between monitors, triggering a Qt-specific callback
// in the context of RenderWindow thread, which will become blocked on the UI
// thread, which may in turn be blocked on something else.
bool kUseThread = false;
//
// initialize the renderer and listen to connections
// on a thread in the current process.
//
s_renderWindow = new RenderWindow(native_display, kUseThread);
if (!s_renderWindow) {
ERR("Could not create rendering window class");
GL_LOG("Could not create rendering window class");
return false;
}
if (!s_renderWindow->isValid()) {
ERR("Could not initialize emulated framebuffer\n");
delete s_renderWindow;
s_renderWindow = NULL;
return false;
}
s_renderThread = RenderServer::create(addr, addrLen);
if (!s_renderThread) {
return false;
}
strncpy(s_renderAddr, addr, sizeof(s_renderAddr));
s_renderThread->start();
GL_LOG("OpenGL renderer initialized successfully");
return true;
}
......
......@@ -25,6 +25,8 @@
#include "OpenGLESDispatch/EGLDispatch.h"
#include "anbox/logger.h"
#include <map>
#include <string>
......@@ -236,6 +238,7 @@ static void rcDestroyWindowSurface(uint32_t windowSurface)
static uint32_t rcCreateColorBuffer(uint32_t width,
uint32_t height, GLenum internalFormat)
{
DEBUG("");
FrameBuffer *fb = FrameBuffer::getFB();
if (!fb) {
return 0;
......@@ -351,6 +354,8 @@ static void rcReadColorBuffer(uint32_t colorBuffer,
GLint width, GLint height,
GLenum format, GLenum type, void* pixels)
{
DEBUG("");
FrameBuffer *fb = FrameBuffer::getFB();
if (!fb) {
return;
......
......@@ -26,15 +26,19 @@
#include "OpenGLESDispatch/GLESv1Dispatch.h"
#include "../../../shared/OpenglCodecCommon/ChecksumCalculatorThreadInfo.h"
#include "anbox/logger.h"
#define STREAM_BUFFER_SIZE 4*1024*1024
RenderThread::RenderThread(IOStream *stream, emugl::Mutex *lock) :
emugl::Thread(),
m_lock(lock),
m_stream(stream) {}
m_stream(stream) {
DEBUG("");
}
RenderThread::~RenderThread() {
delete m_stream;
DEBUG("");
}
// static
......@@ -43,78 +47,75 @@ RenderThread* RenderThread::create(IOStream *stream, emugl::Mutex *lock) {
}
void RenderThread::forceStop() {
DEBUG("");
m_stream->forceStop();
}
intptr_t RenderThread::main() {
RenderThreadInfo tInfo;
// Not used below but will store a reference of itself in TLS so that
// it can be accessed down the stack in the same thread when decoding
// any of the commands.
ChecksumCalculatorThreadInfo tChecksumInfo;
//
// initialize decoders
//
tInfo.m_glDec.initGL(gles1_dispatch_get_proc_func, NULL);
tInfo.m_gl2Dec.initGL(gles2_dispatch_get_proc_func, NULL);
initRenderControlContext(&tInfo.m_rcDec);
ReadBuffer readBuf(STREAM_BUFFER_SIZE);
while (1) {
DEBUG("Started");
while (1) {
int stat = readBuf.getData(m_stream);
if (stat <= 0) {
DEBUG("Connection closed");
break;
}
DEBUG("Got %d bytes for decoding", readBuf.validData());
bool progress;
do {
progress = false;
m_lock->lock();
//
// try to process some of the command buffer using the GLESv1 decoder
//
DEBUG("Locked");
size_t last = tInfo.m_glDec.decode(readBuf.buf(), readBuf.validData(), m_stream);
if (last > 0) {
DEBUG("Ran GL commands");
progress = true;
readBuf.consume(last);
}
//
// try to process some of the command buffer using the GLESv2 decoder
//
last = tInfo.m_gl2Dec.decode(readBuf.buf(), readBuf.validData(), m_stream);
if (last > 0) {
DEBUG("Ran GL2 commands");
progress = true;
readBuf.consume(last);
}
//
// try to process some of the command buffer using the
// renderControl decoder
//
last = tInfo.m_rcDec.decode(readBuf.buf(), readBuf.validData(), m_stream);
if (last > 0) {
DEBUG("Ran RC commands");
readBuf.consume(last);
progress = true;
}
m_lock->unlock();
} while( progress );
DEBUG("Unlocked");
} while (progress);
}
//
DEBUG("Shutting down");
// Release references to the current thread's context/surfaces if any
//
FrameBuffer::getFB()->bindContext(0, 0, 0);
if (tInfo.currContext || tInfo.currDrawSurf || tInfo.currReadSurf) {
fprintf(stderr, "ERROR: RenderThread exiting with current context/surfaces\n");
}
if (tInfo.currContext || tInfo.currDrawSurf || tInfo.currReadSurf)
ERROR("Exiting with current context/surfaces");
FrameBuffer::getFB()->drainWindowSurface();
FrameBuffer::getFB()->drainRenderContext();
return 0;
......
......@@ -71,6 +71,9 @@ void GLRendererServer::start() {
log_funcs.coarse = logger_write;
log_funcs.fine = logger_write;
// HACK: This will do nothing but set our log functions
initOpenGLRenderer(nullptr, nullptr, 0, log_funcs, logger_write);
FrameBuffer::initialize(window_creator_->native_display());
}
} // namespace graphics
......
......@@ -26,122 +26,227 @@
#include <condition_variable>
#include <queue>
#include <functional>
namespace {
class DirectIOStream : public IOStream {
constexpr const size_t default_buffer_size{384};
constexpr const size_t max_send_buffer_size{1024};
class DelayedIOStream : public IOStream {
public:
explicit DirectIOStream(const std::shared_ptr<anbox::network::SocketMessenger> &messenger,
const size_t &buffer_size = 10000) :
typedef std::vector<char> Buffer;
explicit DelayedIOStream(const std::shared_ptr<anbox::network::SocketMessenger> &messenger,
size_t buffer_size = default_buffer_size) :
IOStream(buffer_size),
messenger_(messenger) {
// writer_thread_(std::bind(&DelayedIOStream::worker_thread, this)) {
}
virtual ~DirectIOStream() {
if (send_buffer_ != nullptr) {
free(send_buffer_);
send_buffer_ = nullptr;
}
virtual ~DelayedIOStream() {
DEBUG("");
forceStop();
DEBUG("Shutting down");
}
void* allocBuffer(size_t min_size) override {
size_t size = (send_buffer_size_ < min_size ? min_size : send_buffer_size_);
if (!send_buffer_)
send_buffer_ = (unsigned char *) malloc(size);
else if (send_buffer_size_ < size) {
unsigned char *p = (unsigned char *)realloc(send_buffer_, size);
if (p != NULL) {
send_buffer_ = p;
send_buffer_size_ = size;
} else {
free(send_buffer_);
send_buffer_ = NULL;
send_buffer_size_ = 0;
}
}
return send_buffer_;
DEBUG("min size %d", min_size);
if (buffer_.size() < min_size)
buffer_.resize(min_size);
return buffer_.data();
}
int commitBuffer(size_t size) override {
messenger_->send(reinterpret_cast<const char*>(send_buffer_), size);
return size;
DEBUG("size %d", size);
std::unique_lock<std::mutex> l(read_mutex_);
#if 0
if (buffer_.capacity() <= 2 * size) {
buffer_.resize(size);
out_queue_.push(std::move(buffer_));
} else {
out_queue_.push(Buffer(buffer_.data(), buffer_.data() + size));
}
DEBUG("Submitted data into output queue (%d bytes)", size);
can_write_.notify_all();
#else
ssize_t bytes_left = size;
while (bytes_left > 0) {
const ssize_t written = messenger_->send_raw(buffer_.data() + (size - bytes_left), bytes_left);
if (written < 0 ) {
if (errno != EINTR) {
ERROR("Failed to write data: %s", std::strerror(errno));
break;
}
WARNING("Socket busy, trying again");
} else
bytes_left -= written;
}
DEBUG("Sent data to remote (%d bytes)", buffer_.size());
#endif
return static_cast<int>(size);
}
const unsigned char* readFully(void*, size_t) override {
ERROR("Not implemented");
return nullptr;
const unsigned char* readFully(void *buffer, size_t length) override {
size_t size = length;
auto data = read(buffer, &size);
if (size < length)
return nullptr;
return data;
}
const unsigned char* read(void *data, size_t *size) override {
if (!wait_for_data() || buffer_.size() == 0) {
*size = 0;
const unsigned char* read(void *buffer, size_t *length) override {
std::unique_lock<std::mutex> l(read_mutex_);
if (stopped_) {
DEBUG("Aborting");
return nullptr;
}
auto bytes_to_read = *size;
if (bytes_to_read > buffer_.size())
bytes_to_read = buffer_.size();
if (current_read_buffer_left_ == 0 && in_queue_.empty()) {
DEBUG("Waiting for data to be available");
can_read_.wait(l);
if (stopped_) {
DEBUG("Aborting");
return nullptr;
}
}
DEBUG("Trying to read %d bytes", *length);
size_t read = 0;
auto buf = static_cast<unsigned char*>(buffer);
const auto buffer_end = buf + *length;
while (buf != buffer_end) {
if (current_read_buffer_left_ == 0) {
// If we don't have anymore buffers we need to stop reading here
if (in_queue_.empty())
break;
::memcpy(data, buffer_.data(), bytes_to_read);
buffer_.erase(buffer_.begin(), buffer_.begin() + bytes_to_read);
current_read_buffer_ = in_queue_.front();
in_queue_.pop();
current_read_buffer_left_ = current_read_buffer_.size();
}
const size_t current_size = std::min<size_t>(buffer_end - buf,
current_read_buffer_left_);
::memcpy(buffer, current_read_buffer_.data() +
(current_read_buffer_.size() - current_read_buffer_left_),
current_size);
read += current_size;
buf += current_size;
current_read_buffer_left_ -= current_size;
DEBUG("Size %d, left to read %d", current_size, current_read_buffer_left_);
}
if (read == 0)
return nullptr;
*length = read;
*size = bytes_to_read;
DEBUG("Read %d bytes (buffers left %d)", read, in_queue_.size());
return static_cast<const unsigned char*>(data);
return buf;
}
int writeFully(const void*, size_t) override {
int writeFully(const void *buffer, size_t length) override {
(void) buffer;
(void) length;
ERROR("Not implemented");
return 0;
return -1;
}
void forceStop() override {
std::unique_lock<std::mutex> l(mutex_);
buffer_.clear();
DEBUG("");
stopped_ = true;
can_read_.notify_all();
can_write_.notify_all();
}
void submitData(const std::vector<std::uint8_t> &data) {
std::unique_lock<std::mutex> l(mutex_);
for (const auto &byte : data)
buffer_.push_back(byte);
// buffer_.insert(buffer_.end(), data.begin(), data.end());
lock_.notify_one();
void post_data(const Buffer &buffer) {
DEBUG("Got data and waiting for lock");
std::unique_lock<std::mutex> l(read_mutex_);
DEBUG("Received %d bytes", buffer.size());
in_queue_.push(std::move(buffer));
can_read_.notify_all();
}
private:
bool wait_for_data() {
std::unique_lock<std::mutex> l(mutex_);
void worker_thread() {
DEBUG("Running send thread");
while (true) {
std::unique_lock<std::mutex> l(write_mutex_);
while (out_queue_.empty() && !stopped_) {
can_write_.wait(l, [&]() { return !out_queue_.empty() || stopped_; });
DEBUG("Woke up (queue size %d)", out_queue_.size());
}
if (stopped_)
break;
DEBUG("Going to send out %d bytes", out_queue_.front().size());
auto buffer = out_queue_.front();
out_queue_.pop();
if (!l.owns_lock())
return false;
ssize_t bytes_left = buffer.size();
while (bytes_left > 0) {
const ssize_t written = messenger_->send_raw(buffer.data() + (buffer.size() - bytes_left), bytes_left);
if (written < 0 ) {
if (errno != EINTR) {
ERROR("Failed to write data: %s", std::strerror(errno));
break;
}
WARNING("Socket busy, trying again");
} else
bytes_left -= written;
}
DEBUG("Sent %d bytes to client (queue size %d)", buffer.size(), out_queue_.size());
}
lock_.wait(l, [&]() { return !buffer_.empty(); });
return true;
DEBUG("Shutting down");
}
std::shared_ptr<anbox::network::SocketMessenger> messenger_;
std::mutex mutex_;
std::condition_variable lock_;
std::vector<std::uint8_t> buffer_;
unsigned char *send_buffer_ = nullptr;
size_t send_buffer_size_ = 0;
Buffer buffer_;
std::thread writer_thread_;
std::queue<Buffer> out_queue_;
Buffer current_write_buffer_;
size_t current_write_buffer_left_ = 0;
std::queue<Buffer> in_queue_;
Buffer current_read_buffer_;
size_t current_read_buffer_left_ = 0;
std::mutex write_mutex_;
std::mutex read_mutex_;
std::condition_variable can_write_;
std::condition_variable can_read_;
bool stopped_ = false;
};
}
namespace anbox {
namespace graphics {
emugl::Mutex OpenGlesMessageProcessor::global_lock{};
static int next_id = 0;
OpenGlesMessageProcessor::OpenGlesMessageProcessor(const std::shared_ptr<network::SocketMessenger> &messenger) :
messenger_(messenger),
stream_(std::make_shared<DirectIOStream>(messenger_)),
renderer_(RenderThread::create(stream_.get(), &global_lock)) {
id_(next_id++),
stream_(std::make_shared<DelayedIOStream>(messenger_)) {
// We have to read the client flags first before we can continue
// processing the actual commands
std::array<std::uint8_t, sizeof(unsigned int)> buffer;
messenger_->receive_msg(boost::asio::buffer(buffer));
unsigned int client_flags = 0;
auto err = messenger_->receive_msg(boost::asio::buffer(&client_flags, sizeof(unsigned int)));
if (err)
ERROR("%s", err.message());
renderer_.reset(RenderThread::create(stream_.get(), &global_lock));
renderer_->start();
DEBUG("Started new OpenGL ES message processor");
}
OpenGlesMessageProcessor::~OpenGlesMessageProcessor() {
......@@ -151,8 +256,9 @@ OpenGlesMessageProcessor::~OpenGlesMessageProcessor() {
}
bool OpenGlesMessageProcessor::process_data(const std::vector<std::uint8_t> &data) {
auto stream = std::static_pointer_cast<DirectIOStream>(stream_);
stream->submitData(data);
DEBUG("[%d] Got %d bytes", id_, data.size());
auto stream = std::static_pointer_cast<DelayedIOStream>(stream_);
stream->post_data(DelayedIOStream::Buffer(data.data(), data.data() + data.size()));
return true;
}
} // namespace graphics
......
......@@ -45,6 +45,7 @@ private:
static emugl::Mutex global_lock;
std::shared_ptr<network::SocketMessenger> messenger_;
int id_;
std::shared_ptr<IOStream> stream_;
std::shared_ptr<RenderThread> renderer_;
};
......
......@@ -74,6 +74,16 @@ Credentials BaseSocketMessenger<stream_protocol>::creds() const {
return {cr.pid, cr.uid, cr.gid};
}
template<typename stream_protocol>
ssize_t BaseSocketMessenger<stream_protocol>::send_raw(char const* data, size_t length)
{
VariableLengthArray<serialization_buffer_size> whole_message{length};
std::copy(data, data + length, whole_message.data());
std::unique_lock<std::mutex> lg(message_lock);
return ::send(socket_fd, data, length, 0);
}
template<typename stream_protocol>
void BaseSocketMessenger<stream_protocol>::send(char const* data, size_t length)
{
......@@ -88,6 +98,7 @@ void BaseSocketMessenger<stream_protocol>::send(char const* data, size_t length)
boost::asio::transfer_all());
}
catch (const boost::system::system_error &err) {
DEBUG("Got error: %s", err.what());
if (err.code() == boost::asio::error::try_again)
continue;
}
......
......@@ -37,6 +37,7 @@ public:
unsigned short local_port() const override;
void send(char const* data, size_t length) override;
ssize_t send_raw(char const* data, size_t length) override;
void async_receive_msg(AnboxReadHandler const& handle, boost::asio::mutable_buffers_1 const &buffer) override;
boost::system::error_code receive_msg(boost::asio::mutable_buffers_1 const& buffer) override;
size_t available_bytes() override;
......
......@@ -20,6 +20,7 @@
#define ANBOX_NETWORK_MESSAGE_SENDER_H_
#include <sys/types.h>
#include <cstddef>
namespace anbox {
namespace network {
......@@ -27,6 +28,7 @@ class MessageSender
{
public:
virtual void send(char const* data, size_t length) = 0;
virtual ssize_t send_raw(char const* data, size_t length) = 0;
protected:
MessageSender() = default;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册