提交 82102c68 编写于 作者: A Alexey Milovidov

Fixed conflicting headers on Fedora Rawhide while using Linux native AIO #2520

上级 3ea8ce1c
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <ext/singleton.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <future>
#include <mutex>
#include <map>
#include <linux/aio_abi.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <errno.h>
/** Small wrappers for asynchronous I/O.
*/
inline int io_setup(unsigned nr, aio_context_t * ctxp)
{
return syscall(__NR_io_setup, nr, ctxp);
}
inline int io_destroy(aio_context_t ctx)
{
return syscall(__NR_io_destroy, ctx);
}
/// last argument is an array of pointers technically speaking
inline int io_submit(aio_context_t ctx, long nr, struct iocb * iocbpp[])
{
return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event *events, struct timespec * timeout)
{
return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}
struct AIOContext : private boost::noncopyable
{
aio_context_t ctx;
AIOContext(unsigned int nr_events = 128)
{
ctx = 0;
if (io_setup(nr_events, &ctx) < 0)
DB::throwFromErrno("io_setup failed");
}
~AIOContext()
{
io_destroy(ctx);
}
};
namespace DB
{
namespace ErrorCodes
{
extern const int AIO_COMPLETION_ERROR;
extern const int AIO_SUBMIT_ERROR;
}
class AIOContextPool : public ext::singleton<AIOContextPool>
{
friend class ext::singleton<AIOContextPool>;
static const auto max_concurrent_events = 128;
static const auto timeout_sec = 1;
AIOContext aio_context{max_concurrent_events};
using ID = size_t;
using BytesRead = ssize_t;
/// Autoincremental id used to identify completed requests
ID id{};
mutable std::mutex mutex;
mutable std::condition_variable have_resources;
std::map<ID, std::promise<BytesRead>> promises;
std::atomic<bool> cancelled{false};
std::thread io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
io_completion_monitor.join();
}
void doMonitor()
{
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void waitForCompletion()
{
/// array to hold completion events
io_event events[max_concurrent_events];
try
{
const auto num_events = getCompletionEvents(events, max_concurrent_events);
fulfillPromises(events, num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::waitForCompletion()");
}
}
int getCompletionEvents(io_event events[], const int max_events)
{
timespec timeout{timeout_sec, 0};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion",
ErrorCodes::AIO_COMPLETION_ERROR, errno);
return num_events;
}
void fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard<std::mutex> lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
const auto id = event.data;
/// set value via promise and release it
const auto it = promises.find(id);
if (it == std::end(promises))
{
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id);
continue;
}
it->second.set_value(event.res);
promises.erase(it);
}
}
void notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void reportExceptionToAnyProducer()
{
const std::lock_guard<std::mutex> lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
public:
/// Request AIO read operation for iocb, returns a future with number of bytes read
std::future<BytesRead> post(struct iocb & iocb)
{
std::unique_lock<std::mutex> lock{mutex};
/// get current id and increment it by one
const auto request_id = id++;
/// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<BytesRead>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
auto num_requests = 0;
struct iocb * requests[] { &iocb };
/// submit a request
while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO",
ErrorCodes::AIO_SUBMIT_ERROR, errno);
}
return promises[request_id].get_future();
}
};
}
#endif
......@@ -273,8 +273,8 @@ namespace ErrorCodes
extern const int INFINITE_LOOP = 269;
extern const int CANNOT_COMPRESS = 270;
extern const int CANNOT_DECOMPRESS = 271;
extern const int AIO_SUBMIT_ERROR = 272;
extern const int AIO_COMPLETION_ERROR = 273;
extern const int CANNOT_IO_SUBMIT = 272;
extern const int CANNOT_IO_GETEVENTS = 273;
extern const int AIO_READ_ERROR = 274;
extern const int AIO_WRITE_ERROR = 275;
extern const int INDEX_NOT_USED = 277;
......@@ -376,6 +376,7 @@ namespace ErrorCodes
extern const int FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT = 399;
extern const int CANNOT_STAT = 400;
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME = 401;
extern const int CANNOT_IOSETUP = 402;
extern const int KEEPER_EXCEPTION = 999;
......
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#include <boost/noncopyable.hpp>
#include <Common/Exception.h>
#include <sys/syscall.h>
#include <unistd.h>
#include <IO/AIO.h>
/** Small wrappers for asynchronous I/O.
*/
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IOSETUP;
}
}
int io_setup(unsigned nr, aio_context_t * ctxp)
{
return syscall(__NR_io_setup, nr, ctxp);
}
int io_destroy(aio_context_t ctx)
{
return syscall(__NR_io_destroy, ctx);
}
int io_submit(aio_context_t ctx, long nr, struct iocb * iocbpp[])
{
return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event * events, struct timespec * timeout)
{
return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}
AIOContext::AIOContext(unsigned int nr_events)
{
ctx = 0;
if (io_setup(nr_events, &ctx) < 0)
DB::throwFromErrno("io_setup failed", DB::ErrorCodes::CANNOT_IOSETUP);
}
AIOContext::~AIOContext()
{
io_destroy(ctx);
}
#endif
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
/// https://stackoverflow.com/questions/20759750/resolving-redefinition-of-timespec-in-time-h
#define timespec linux_timespec
#include <linux/aio_abi.h>
#undef timespec
/** Small wrappers for asynchronous I/O.
*/
int io_setup(unsigned nr, aio_context_t * ctxp);
int io_destroy(aio_context_t ctx);
/// last argument is an array of pointers technically speaking
int io_submit(aio_context_t ctx, long nr, struct iocb * iocbpp[]);
int io_getevents(aio_context_t ctx, long min_nr, long max_nr, io_event * events, struct timespec * timeout);
struct AIOContext : private boost::noncopyable
{
aio_context_t ctx;
AIOContext(unsigned int nr_events = 128);
~AIOContext();
};
#endif
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <Poco/Logger.h>
#include <boost/range/iterator_range.hpp>
#include <linux/aio_abi.h>
#include <errno.h>
#include <IO/AIOContextPool.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
}
AIOContextPool::~AIOContextPool()
{
cancelled.store(true, std::memory_order_relaxed);
io_completion_monitor.join();
}
void AIOContextPool::doMonitor()
{
/// continue checking for events unless cancelled
while (!cancelled.load(std::memory_order_relaxed))
waitForCompletion();
/// wait until all requests have been completed
while (!promises.empty())
waitForCompletion();
}
void AIOContextPool::waitForCompletion()
{
/// array to hold completion events
io_event events[max_concurrent_events];
try
{
const auto num_events = getCompletionEvents(events, max_concurrent_events);
fulfillPromises(events, num_events);
notifyProducers(num_events);
}
catch (...)
{
/// there was an error, log it, return to any producer and continue
reportExceptionToAnyProducer();
tryLogCurrentException("AIOContextPool::waitForCompletion()");
}
}
int AIOContextPool::getCompletionEvents(io_event events[], const int max_events)
{
timespec timeout{timeout_sec, 0};
auto num_events = 0;
/// request 1 to `max_events` events
while ((num_events = io_getevents(aio_context.ctx, 1, max_events, events, &timeout)) < 0)
if (errno != EINTR)
throwFromErrno("io_getevents: Failed to wait for asynchronous IO completion", ErrorCodes::CANNOT_IO_GETEVENTS, errno);
return num_events;
}
void AIOContextPool::fulfillPromises(const io_event events[], const int num_events)
{
if (num_events == 0)
return;
const std::lock_guard<std::mutex> lock{mutex};
/// look at returned events and find corresponding promise, set result and erase promise from map
for (const auto & event : boost::make_iterator_range(events, events + num_events))
{
/// get id from event
const auto id = event.data;
/// set value via promise and release it
const auto it = promises.find(id);
if (it == std::end(promises))
{
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id);
continue;
}
it->second.set_value(event.res);
promises.erase(it);
}
}
void AIOContextPool::notifyProducers(const int num_producers) const
{
if (num_producers == 0)
return;
if (num_producers > 1)
have_resources.notify_all();
else
have_resources.notify_one();
}
void AIOContextPool::reportExceptionToAnyProducer()
{
const std::lock_guard<std::mutex> lock{mutex};
const auto any_promise_it = std::begin(promises);
any_promise_it->second.set_exception(std::current_exception());
}
std::future<AIOContextPool::BytesRead> AIOContextPool::post(struct iocb & iocb)
{
std::unique_lock<std::mutex> lock{mutex};
/// get current id and increment it by one
const auto request_id = id++;
/// create a promise and put request in "queue"
promises.emplace(request_id, std::promise<BytesRead>{});
/// store id in AIO request for further identification
iocb.aio_data = request_id;
auto num_requests = 0;
struct iocb * requests[] { &iocb };
/// submit a request
while ((num_requests = io_submit(aio_context.ctx, 1, requests)) < 0)
{
if (errno == EAGAIN)
/// wait until at least one event has been completed (or a spurious wakeup) and try again
have_resources.wait(lock);
else if (errno != EINTR)
throwFromErrno("io_submit: Failed to submit a request for asynchronous IO", ErrorCodes::CANNOT_IO_SUBMIT);
}
return promises[request_id].get_future();
}
}
#endif
#pragma once
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#include <ext/singleton.h>
#include <condition_variable>
#include <future>
#include <mutex>
#include <map>
#include <IO/AIO.h>
namespace DB
{
class AIOContextPool : public ext::singleton<AIOContextPool>
{
friend class ext::singleton<AIOContextPool>;
static const auto max_concurrent_events = 128;
static const auto timeout_sec = 1;
AIOContext aio_context{max_concurrent_events};
using ID = size_t;
using BytesRead = ssize_t;
/// Autoincremental id used to identify completed requests
ID id{};
mutable std::mutex mutex;
mutable std::condition_variable have_resources;
std::map<ID, std::promise<BytesRead>> promises;
std::atomic<bool> cancelled{false};
std::thread io_completion_monitor{&AIOContextPool::doMonitor, this};
~AIOContextPool();
void doMonitor();
void waitForCompletion();
int getCompletionEvents(io_event events[], const int max_events);
void fulfillPromises(const io_event events[], const int num_events);
void notifyProducers(const int num_producers) const;
void reportExceptionToAnyProducer();
public:
/// Request AIO read operation for iocb, returns a future with number of bytes read
std::future<BytesRead> post(struct iocb & iocb);
};
}
#endif
#if !(defined(__FreeBSD__) || defined(__APPLE__) || defined(_MSC_VER))
#include <IO/ReadBufferAIO.h>
#include <IO/AIOContextPool.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Core/Defines.h>
......
......@@ -5,11 +5,12 @@
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/AIO.h>
#include <Core/Defines.h>
#include <Common/AIO.h>
#include <Common/CurrentMetrics.h>
#include <string>
#include <limits>
#include <future>
#include <unistd.h>
#include <fcntl.h>
......
......@@ -32,9 +32,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int AIO_READ_ERROR;
extern const int AIO_SUBMIT_ERROR;
extern const int AIO_WRITE_ERROR;
extern const int AIO_COMPLETION_ERROR;
extern const int CANNOT_IO_SUBMIT;
extern const int CANNOT_IO_GETEVENTS;
extern const int CANNOT_TRUNCATE_FILE;
extern const int CANNOT_FSYNC;
}
......@@ -119,12 +119,12 @@ void WriteBufferAIO::nextImpl()
request.aio_offset = region_aligned_begin;
/// Send the request.
while (io_submit(aio_context.ctx, request_ptrs.size(), request_ptrs.data()) < 0)
while (io_submit(aio_context.ctx, 1, &request_ptr) < 0)
{
if (errno != EINTR)
{
aio_failed = true;
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR);
throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::CANNOT_IO_SUBMIT);
}
}
......@@ -184,17 +184,18 @@ bool WriteBufferAIO::waitForAIOCompletion()
CurrentMetrics::Increment metric_increment{CurrentMetrics::Write};
while (io_getevents(aio_context.ctx, events.size(), events.size(), events.data(), nullptr) < 0)
io_event event;
while (io_getevents(aio_context.ctx, 1, 1, &event, nullptr) < 0)
{
if (errno != EINTR)
{
aio_failed = true;
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR);
throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::CANNOT_IO_GETEVENTS);
}
}
is_pending_write = false;
bytes_written = events[0].res;
bytes_written = event.res;
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWrite);
ProfileEvents::increment(ProfileEvents::WriteBufferAIOWriteBytes, bytes_written);
......
......@@ -6,7 +6,7 @@
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Core/Defines.h>
#include <Common/AIO.h>
#include <IO/AIO.h>
#include <Common/CurrentMetrics.h>
#include <string>
......@@ -58,9 +58,8 @@ private:
BufferWithOwnMemory<WriteBuffer> flush_buffer;
/// Description of the asynchronous write request.
iocb request = {};
std::vector<iocb *> request_ptrs{&request};
std::vector<io_event> events{1};
iocb request{};
iocb * request_ptr{&request};
AIOContext aio_context{1};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册