提交 9c868c91 编写于 作者: A Alexey Milovidov

Simplification

上级 e0000bef
#include <signal.h>
#include <poll.h>
#include <mutex>
#include <condition_variable>
#include <filesystem>
#include <ext/scope_guard.h>
......@@ -10,8 +10,9 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataStreams/OneBlockInputStream.h>
#include <IO/ReadHelpers.h>
#include <Common/PipeFDs.h>
#include <common/getThreadNumber.h>
namespace DB
......@@ -20,125 +21,171 @@ namespace DB
/// Error codes used by this file. They are only declared here (`extern`);
/// their definitions live outside of the visible source.
namespace ErrorCodes
{
extern const int CANNOT_SIGQUEUE; /// Thrown when sigqueue() fails for a reason other than ESRCH.
extern const int CANNOT_MANIPULATE_SIGSET; /// Thrown when sigemptyset() / sigaddset() fail while preparing the signal mask.
extern const int CANNOT_SET_SIGNAL_HANDLER; /// Thrown when sigaction() fails to install the handler.
extern const int CANNOT_READ_FROM_FILE_DESCRIPTOR; /// Thrown when poll() or read() on the notification pipe fails.
extern const int LOGICAL_ERROR; /// Thrown when read() returns something other than 1 byte or an error.
}
/// Returns the column schema of the table:
///   thread_number - UInt32,
///   query_id      - String,
///   trace         - Array(UInt64).
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
{
    const auto thread_number_type = std::make_shared<DataTypeUInt32>();
    const auto query_id_type = std::make_shared<DataTypeString>();
    const auto trace_element_type = std::make_shared<DataTypeUInt64>();
    const auto trace_type = std::make_shared<DataTypeArray>(trace_element_type);

    NamesAndTypesList columns
    {
        { "thread_number", thread_number_type },
        { "query_id", query_id_type },
        { "trace", trace_type },
    };
    return columns;
}
namespace
{
struct State
{
std::mutex mutex;
std::condition_variable condvar;
const pid_t expected_pid = getpid();
const int sig = SIGRTMIN;
UInt32 thread_number{0};
std::optional<StackTrace> stack_trace;
LazyPipeFDs notification_pipe;
size_t total_threads;
size_t threads_processed;
std::exception_ptr exception;
MutableColumns * columns_to_fill;
void signalHandler(int, siginfo_t * info, void * context)
{
/// In case a malicious user is sending signals manually (for an unknown reason).
/// If we don't check, it may break our synchronization.
if (info->si_pid != expected_pid)
return;
State() { reset(); }
/// All these methods are signal-safe.
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
stack_trace.emplace(signal_context);
thread_number = getThreadNumber();
void reset(MutableColumns * columns_to_fill_ = nullptr)
{
total_threads = 0;
threads_processed = 0;
exception = std::exception_ptr();
columns_to_fill = columns_to_fill_;
}
char buf = 0;
/// We cannot do anything if write failed.
(void)::write(notification_pipe.fds_rw[1], &buf, 1);
}
operator bool()
/// Wait for data in pipe.
bool wait(int timeout_ms)
{
while (true)
{
return columns_to_fill != nullptr;
int fd = notification_pipe.fds_rw[0];
pollfd poll_fd{fd, POLLIN, 0};
int poll_res = poll(&poll_fd, 1, timeout_ms);
if (poll_res < 0)
{
if (errno == EINTR)
{
--timeout_ms; /// Quite a hacky way to update timeout. Just to make sure we avoid infinite waiting.
if (timeout_ms == 0)
return false;
continue;
}
throwFromErrno("Cannot poll pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
if (poll_res == 0)
return false;
char buf = 0;
ssize_t read_res = ::read(fd, &buf, 1);
if (read_res == 1)
return true;
if (read_res < 0)
{
if (errno == EINTR)
continue;
throwFromErrno("Cannot read from pipe", ErrorCodes::CANNOT_READ_FROM_FILE_DESCRIPTOR);
}
throw Exception("Logical error: read for one byte returned more than one byte", ErrorCodes::LOGICAL_ERROR);
}
};
State state;
void callback(const siginfo_t &, const StackTrace & stack_trace, UInt32 thread_number)
{
std::lock_guard lock(state.mutex);
}
}
std::cerr << thread_number << " !\n";
if (!state)
return;
StorageSystemStackTrace::StorageSystemStackTrace(const String & name)
: IStorageSystemOneBlock<StorageSystemStackTrace>(name)
{
notification_pipe.open();
try
{
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
/// Setup signal handler.
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace.getFrames()[i]));
struct sigaction sa{};
sa.sa_sigaction = signalHandler;
sa.sa_flags = SA_SIGINFO;
std::cerr << thread_number << " !!\n";
if (sigemptyset(&sa.sa_mask))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
state.columns_to_fill->at(0)->insert(thread_number);
state.columns_to_fill->at(1)->insertDefault();
state.columns_to_fill->at(2)->insert(arr);
if (sigaddset(&sa.sa_mask, sig))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_MANIPULATE_SIGSET);
std::cerr << thread_number << " !!!\n";
if (sigaction(sig, &sa, nullptr))
throwFromErrno("Cannot set signal handler.", ErrorCodes::CANNOT_SET_SIGNAL_HANDLER);
}
++state.threads_processed;
std::cerr << state.threads_processed << ", " << state.total_threads << " !!!!\n";
if (state.threads_processed >= state.total_threads)
state.condvar.notify_one();
}
catch (...)
{
state.reset();
state.exception = std::current_exception();
state.condvar.notify_one();
}
}
/// Returns the column schema of the table:
///   thread_number - UInt32,
///   query_id      - String,
///   trace         - Array(UInt64).
NamesAndTypesList StorageSystemStackTrace::getNamesAndTypes()
{
    const auto thread_number_type = std::make_shared<DataTypeUInt32>();
    const auto query_id_type = std::make_shared<DataTypeString>();
    const auto trace_element_type = std::make_shared<DataTypeUInt64>();
    const auto trace_type = std::make_shared<DataTypeArray>(trace_element_type);

    NamesAndTypesList columns
    {
        { "thread_number", thread_number_type },
        { "query_id", query_id_type },
        { "trace", trace_type },
    };
    return columns;
}
void StorageSystemStackTrace::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const
{
std::unique_lock lock(state.mutex);
/// It shouldn't be possible to do concurrent reads from this table.
std::lock_guard lock(mutex);
/// Send a signal to every thread and wait for result.
/// We must wait for every thread one by one sequentially,
/// because there is a limit on number of queued signals in OS and otherwise signals may get lost.
/// Also, non-RT signals are not delivered if previous signal is handled right now (by default; but we use RT signals).
state.reset(&res_columns);
SCOPE_EXIT({ state.reset(); });
/// Obviously, results for different threads may be out of sync.
std::cerr << state.columns_to_fill->size() << "\n";
/// There is no better way to enumerate threads in a process other than looking into procfs.
/// Send a signal to every thread
std::filesystem::directory_iterator end;
for (std::filesystem::directory_iterator it("/proc/self/task"); it != end; ++it)
{
sigval sig_value;
sig_value.sival_ptr = reinterpret_cast<void *>(&callback);
sigval sig_value{};
pid_t tid = parse<pid_t>(it->path().filename());
if (0 == ::sigqueue(tid, SIGTSTP, sig_value))
std::cerr << "Requested: " << tid << "\n";
if (0 != ::sigqueue(tid, sig, sig_value))
{
++state.total_threads;
/// The thread may have already finished.
if (ESRCH == errno)
continue;
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
}
/// Just in case we will wait for pipe with timeout. In case signal didn't get processed.
if (wait(100))
{
size_t stack_trace_size = stack_trace->getSize();
size_t stack_trace_offset = stack_trace->getOffset();
Array arr;
arr.reserve(stack_trace_size - stack_trace_offset);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
arr.emplace_back(reinterpret_cast<intptr_t>(stack_trace->getFrames()[i]));
std::cerr << tid << ", " << thread_number << " !!\n";
res_columns[0]->insert(thread_number);
res_columns[1]->insertDefault();
res_columns[2]->insert(arr);
}
else
{
/// The thread may have been already finished.
if (ESRCH != errno)
throwFromErrno("Cannot send signal with sigqueue", ErrorCodes::CANNOT_SIGQUEUE);
/// Cannot obtain a stack trace. But create a record in result nevertheless.
res_columns[0]->insert(tid);
res_columns[1]->insertDefault();
res_columns[2]->insertDefault();
}
}
std::cerr << state.threads_processed << ", " << state.total_threads << " sent\n";
/// Timeout one second for the case the signal pipe will be full and messages will be dropped.
state.condvar.wait_for(lock, std::chrono::seconds(1), []{ return state.threads_processed >= state.total_threads || state.exception; });
if (state.exception)
std::rethrow_exception(state.exception);
}
}
......
#pragma once
#include <mutex>
#include <ext/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
......@@ -17,13 +18,15 @@ class StorageSystemStackTrace : public ext::shared_ptr_helper<StorageSystemStack
friend struct ext::shared_ptr_helper<StorageSystemStackTrace>;
public:
String getName() const override { return "SystemStackTrace"; }
static NamesAndTypesList getNamesAndTypes();
StorageSystemStackTrace(const String & name);
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & query_info) const override;
mutable std::mutex mutex;
};
}
......
......@@ -105,7 +105,6 @@ static void terminateRequestedSignalHandler(int sig, siginfo_t * info, void * co
static void signalHandler(int sig, siginfo_t * info, void * context)
{
char buf[buf_size];
std::cerr << "Size of buffer: " << buf_size << "\n";
DB::WriteBufferFromFileDescriptorDiscardOnFailure out(signal_pipe.fds_rw[1], buf_size, buf);
const ucontext_t signal_context = *reinterpret_cast<ucontext_t *>(context);
......@@ -197,20 +196,6 @@ public:
DB::readPODBinary(stack_trace, in);
DB::readBinary(thread_num, in);
if (sig == SIGTSTP && info.si_value.sival_ptr)
{
/// TSTP signal with value is used to make a custom callback from this thread.
try
{
reinterpret_cast<SignalCallback *>(info.si_value.sival_ptr)(info, stack_trace, thread_num);
continue;
}
catch (...)
{
/// Failed to process, will use 'onFault' function.
}
}
/// This allows receiving more signals if a failure happens inside the onFault function.
/// Example: segfault while symbolizing stack trace.
std::thread([=] { onFault(sig, info, context, stack_trace, thread_num); }).detach();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册