提交 b9105e13 编写于 作者: A Alexey Milovidov

Avoid deadlock in TraceCollector

上级 72d543f3
......@@ -437,6 +437,7 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_TIMER = 460;
extern const int CANNOT_SET_TIMER_PERIOD = 461;
extern const int CANNOT_DELETE_TIMER = 462;
extern const int CANNOT_FCNTL = 463;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -16,6 +16,33 @@ extern LazyPipe trace_pipe;
namespace
{
/** Write to file descriptor but drop the data if write would block or fail.
* To use within signal handler. Motivating example: a signal handler invoked during execution of malloc
* should not block because some mutex (or even worse - a spinlock) may be held.
*/
class WriteBufferDiscardOnFailure : public WriteBufferFromFileDescriptor
{
protected:
void nextImpl() override
{
size_t bytes_written = 0;
while (bytes_written != offset())
{
ssize_t res = ::write(fd, working_buffer.begin() + bytes_written, offset() - bytes_written);
if ((-1 == res || 0 == res) && errno != EINTR)
break; /// Discard
if (res > 0)
bytes_written += res;
}
}
public:
using WriteBufferFromFileDescriptor::WriteBufferFromFileDescriptor;
~WriteBufferDiscardOnFailure() override {}
};
/// Normally query_id is a UUID (string with a fixed length) but user can provide custom query_id.
/// Thus upper bound on query_id length should be introduced to avoid buffer overflow in signal handler.
constexpr size_t QUERY_ID_MAX_LEN = 1024;
......@@ -33,7 +60,7 @@ namespace
sizeof(StackTrace) + // collected stack trace
sizeof(TimerType); // timer type
char buffer[buf_size];
WriteBufferFromFileDescriptor out(trace_pipe.fds_rw[1], buf_size, buffer);
WriteBufferDiscardOnFailure out(trace_pipe.fds_rw[1], buf_size, buffer);
StringRef query_id = CurrentThread::getQueryId();
query_id.size = std::min(query_id.size, QUERY_ID_MAX_LEN);
......
......@@ -12,6 +12,10 @@
#include <Common/Exception.h>
#include <Interpreters/TraceLog.h>
#include <unistd.h>
#include <fcntl.h>
namespace DB
{
......@@ -21,6 +25,7 @@ namespace ErrorCodes
{
extern const int NULL_POINTER_DEREFERENCE;
extern const int THREAD_IS_NOT_JOINABLE;
extern const int CANNOT_FCNTL;
}
TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
......@@ -31,6 +36,21 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
throw Exception("Invalid trace log pointer passed", ErrorCodes::NULL_POINTER_DEREFERENCE);
trace_pipe.open();
/** Turn write end of pipe to non-blocking mode to avoid deadlocks
* when QueryProfiler is invoked under locks and TraceCollector cannot pull data from pipe.
*/
int flags = fcntl(trace_pipe.fds_rw[1], F_GETFL, 0);
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
int pipe_size = fcntl(trace_pipe.fds_rw[1], F_GETPIPE_SZ);
for (errno = 0; errno != EPERM && pipe_size <= 1048576; pipe_size *= 2)
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETPIPE_SZ, pipe_size) && errno != EPERM)
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size), ErrorCodes::CANNOT_FCNTL);
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册