提交 364f519e 编写于 作者: V Vitaliy Lyudvichenko

Threads and ProfileEvents in system.processes. [#CLICKHOUSE-2910]

上级 efdda9cc
......@@ -42,9 +42,6 @@ public:
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
if (!place || !rhs)
throw Exception("nullptr", ErrorCodes::LOGICAL_ERROR);
data(place).count += data(rhs).count;
}
......
......@@ -370,6 +370,7 @@ namespace ErrorCodes
extern const int QUERY_IS_PROHIBITED = 392;
extern const int THERE_IS_NO_QUERY = 393;
extern const int QUERY_WAS_CANCELLED = 394;
extern const int PTHREAD_ERROR = 395;
extern const int KEEPER_EXCEPTION = 999;
......
......@@ -175,7 +175,7 @@ namespace CurrentMemoryTracker
}
}
DB::ActionBlockerSingleThread::BlockHolder getCurrentMemoryTrackerBlocker()
DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker()
{
return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionBlockerSingleThread::BlockHolder(nullptr);
return (DB::current_thread) ? DB::current_thread->memory_tracker.blocker.cancel() : DB::ActionBlockerSingleThread::LockHolder(nullptr);
}
......@@ -123,4 +123,4 @@ namespace CurrentMemoryTracker
}
DB::ActionBlockerSingleThread::BlockHolder getCurrentMemoryTrackerBlocker();
DB::ActionBlockerSingleThread::LockHolder getCurrentMemoryTrackerBlocker();
......@@ -180,6 +180,13 @@ void Counters::reset()
resetCounters();
}
void Counters::getPartiallyAtomicSnapshot(Counters & res) const
{
for (Event i = 0; i < num_counters; ++i)
res.counters[i].store(counters[i], std::memory_order_relaxed);
}
const char * getDescription(Event event)
{
static const char * descriptions[] =
......
......@@ -29,7 +29,7 @@ namespace ProfileEvents
{
Counter * counters = nullptr;
Counters * parent = nullptr;
const Level level = Level::Thread;
Level level = Level::Thread;
std::unique_ptr<Counter[]> counters_holder;
Counters() = default;
......@@ -55,6 +55,8 @@ namespace ProfileEvents
} while (current != nullptr);
}
void getPartiallyAtomicSnapshot(Counters & res) const;
/// Reset metrics and parent
void reset();
......
......@@ -5,6 +5,7 @@
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <Interpreters/ProcessList.h>
......@@ -26,6 +27,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int PTHREAD_ERROR;
}
......@@ -105,7 +107,7 @@ struct RusageCounters
};
struct ThreadStatus::Payload
struct ThreadStatus::Impl
{
RusageCounters last_rusage;
};
......@@ -118,13 +120,40 @@ struct ThreadStatus::Payload
// LOG_DEBUG(thread_status->log, "Thread " << thread_status->poco_thread_number << " is finished");
//}
static pthread_once_t once_query_at_exit_callback = PTHREAD_ONCE_INIT;
static pthread_key_t tid_key_at_exit;
static void thread_destructor(void * data)
{
auto thread_status = static_cast<ThreadStatus *>(data);
thread_status->onExit();
LOG_DEBUG(thread_status->log, "Destruct thread " << thread_status->poco_thread_number);
thread_status->thread_exited = true;
}
static void thread_create_at_exit_key() {
if (0 != pthread_key_create(&tid_key_at_exit, thread_destructor))
throw Exception("Failed pthread_key_create", ErrorCodes::PTHREAD_ERROR);
}
ThreadStatus::ThreadStatus()
: poco_thread_number(Poco::ThreadNumber::get()),
performance_counters(ProfileEvents::Level::Thread),
log(&Poco::Logger::get("ThreadStatus"))
{
impl = std::make_shared<Impl>();
LOG_DEBUG(log, "Thread " << poco_thread_number << " created");
if (0 != pthread_once(&once_query_at_exit_callback, thread_create_at_exit_key))
throw Exception("Failed pthread_once", ErrorCodes::PTHREAD_ERROR);
if (nullptr != pthread_getspecific(tid_key_at_exit))
throw Exception("pthread_getspecific is already set", ErrorCodes::LOGICAL_ERROR);
if (0 != pthread_setspecific(tid_key_at_exit, static_cast<void *>(this)))
throw Exception("Failed pthread_setspecific", ErrorCodes::PTHREAD_ERROR);
}
ThreadStatus::~ThreadStatus()
......@@ -134,52 +163,59 @@ ThreadStatus::~ThreadStatus()
void ThreadStatus::init(QueryStatus * parent_query_, ProfileEvents::Counters * parent_counters, MemoryTracker * parent_memory_tracker)
{
if (!initialized)
if (initialized)
{
if (auto counters_parent = performance_counters.parent)
if (counters_parent != parent_counters)
LOG_WARNING(current_thread->log, "Parent performance counters are already set, overwrite");
LOG_WARNING(log, "Parent performance counters are already set, overwrite");
if (auto tracker_parent = memory_tracker.getParent())
if (tracker_parent != parent_memory_tracker)
LOG_WARNING(current_thread->log, "Parent memory tracker is already set, overwrite");
LOG_WARNING(log, "Parent memory tracker is already set, overwrite");
return;
}
initialized = true;
parent_query = parent_query_;
performance_counters.parent = parent_counters;
memory_tracker.setParent(parent_memory_tracker);
memory_tracker.setDescription("(for thread)");
initialized = true;
/// Attach current thread to list of query threads
if (parent_query)
{
std::lock_guard lock(parent_query->threads_mutex);
auto res = parent_query->thread_statuses.emplace(current_thread->poco_thread_number, current_thread);
if (!res.second && res.first->second.get() != current_thread.get())
throw Exception("Thread " + std::to_string(current_thread->poco_thread_number) + " is set twice", ErrorCodes::LOGICAL_ERROR);
}
onStart();
}
void ThreadStatus::onStart()
{
payload = std::make_shared<Payload>();
/// First init of thread rusage counters, set real time to zero, other metrics remain as is
payload->last_rusage.setFromCurrent();
RusageCounters::incrementProfileEvents(payload->last_rusage, RusageCounters::zeros(payload->last_rusage.real_time));
impl->last_rusage.setFromCurrent();
RusageCounters::incrementProfileEvents(impl->last_rusage, RusageCounters::zeros(impl->last_rusage.real_time));
}
void ThreadStatus::onExit()
{
if (!initialized || !payload)
return;
RusageCounters::updateProfileEvents(payload->last_rusage);
RusageCounters::updateProfileEvents(impl->last_rusage);
}
void ThreadStatus::reset()
{
std::lock_guard lock(mutex);
initialized = false;
parent_query = nullptr;
performance_counters.reset();
memory_tracker.reset();
memory_tracker.setParent(nullptr);
initialized = false;
}
......@@ -190,19 +226,11 @@ void ThreadStatus::setCurrentThreadParentQuery(QueryStatus * parent_process)
if (!parent_process)
{
current_thread->init(parent_process, nullptr, nullptr);
current_thread->init(nullptr, nullptr, nullptr);
return;
}
current_thread->init(parent_process, &parent_process->performance_counters, &parent_process->memory_tracker);
{
std::lock_guard lock(parent_process->threads_mutex);
auto res = parent_process->thread_statuses.emplace(current_thread->poco_thread_number, current_thread);
if (!res.second && res.first->second.get() != current_thread.get())
throw Exception("Thread " + std::to_string(current_thread->poco_thread_number) + " is set twice", ErrorCodes::LOGICAL_ERROR);
}
}
void ThreadStatus::setCurrentThreadFromSibling(const ThreadStatusPtr & sibling_thread)
......@@ -213,11 +241,39 @@ void ThreadStatus::setCurrentThreadFromSibling(const ThreadStatusPtr & sibling_t
if (sibling_thread == nullptr)
throw Exception("Sibling thread was not initialized", ErrorCodes::LOGICAL_ERROR);
std::lock_guard lock(sibling_thread->mutex);
current_thread->init(sibling_thread->parent_query, sibling_thread->performance_counters.parent, sibling_thread->memory_tracker.getParent());
}
struct ScopeCurrentThread
{
ScopeCurrentThread()
{
if (!current_thread)
std::terminate(); // current_thread must be initialized
}
~ScopeCurrentThread()
{
if (!current_thread)
std::terminate(); // current_thread must be initialized
if (Poco::ThreadNumber::get() != current_thread->poco_thread_number)
std::terminate(); // unexpected thread number
current_thread->onExit();
LOG_DEBUG(current_thread->log, "Thread " << current_thread->poco_thread_number << " is exiting");
current_thread->thread_exited = true;
}
};
thread_local ThreadStatusPtr current_thread = ThreadStatus::create();
/// Order of current_thread and current_thread_scope matters
static thread_local ScopeCurrentThread current_thread_scope;
}
......@@ -3,6 +3,8 @@
#include <Common/MemoryTracker.h>
#include <memory>
#include <ext/shared_ptr_helper.h>
#include <mutex>
namespace Poco
{
......@@ -15,16 +17,18 @@ namespace DB
struct QueryStatus;
struct ThreadStatus;
struct ScopeCurrentThread;
using ThreadStatusPtr = std::shared_ptr<ThreadStatus>;
struct ThreadStatus : public ext::shared_ptr_helper<ThreadStatus>, public std::enable_shared_from_this<ThreadStatus>
struct ThreadStatus : public ext::shared_ptr_helper<ThreadStatus>
{
UInt32 poco_thread_number = 0;
QueryStatus * parent_query = nullptr;
ProfileEvents::Counters performance_counters;
MemoryTracker memory_tracker;
bool thread_exited = false;
std::mutex mutex;
void init(QueryStatus * parent_query_, ProfileEvents::Counters * parent_counters, MemoryTracker * parent_memory_tracker);
void onStart();
......@@ -38,14 +42,16 @@ struct ThreadStatus : public ext::shared_ptr_helper<ThreadStatus>, public std::e
~ThreadStatus();
protected:
friend struct ScopeCurrentThread;
//protected:
ThreadStatus();
bool initialized = false;
Poco::Logger * log;
struct Payload;
std::shared_ptr<Payload> payload;
struct Impl;
std::shared_ptr<Impl> impl;
};
......
......@@ -1275,10 +1275,9 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
bool final,
ThreadPool * thread_pool) const
{
auto converter = [&](size_t bucket, const ThreadStatusPtr & main_thread)
auto converter = [&](size_t bucket, ThreadStatusPtr main_thread)
{
if (main_thread)
ThreadStatus::setCurrentThreadFromSibling(main_thread);
ThreadStatus::setCurrentThreadFromSibling(main_thread);
return convertOneBucketToBlock(data_variants, method, final, bucket);
};
......@@ -1555,7 +1554,7 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
template <typename Method>
void NO_INLINE Aggregator:: mergeBucketImpl(
void NO_INLINE Aggregator::mergeBucketImpl(
ManyAggregatedDataVariants & data, Int32 bucket, Arena * arena) const
{
/// We connect all aggregation results to the first.
......@@ -1722,14 +1721,13 @@ private:
if (max_scheduled_bucket_num >= NUM_BUCKETS)
return;
parallel_merge_data->pool.schedule([this, main_thread=current_thread] () { thread(max_scheduled_bucket_num, main_thread); });
parallel_merge_data->pool.schedule(std::bind(&MergingAndConvertingBlockInputStream::thread, this,
max_scheduled_bucket_num, current_thread));
}
void thread(Int32 bucket_num, const ThreadStatusPtr & main_thread)
void thread(Int32 bucket_num, ThreadStatusPtr main_thread)
{
if (main_thread)
ThreadStatus::setCurrentThreadFromSibling(main_thread);
ThreadStatus::setCurrentThreadFromSibling(main_thread);
setThreadName("MergingAggregtd");
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};
......@@ -2034,10 +2032,9 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
LOG_TRACE(log, "Merging partially aggregated two-level data.");
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, const ThreadStatusPtr & main_thread)
auto merge_bucket = [&bucket_to_blocks, &result, this](Int32 bucket, Arena * aggregates_pool, ThreadStatusPtr main_thread)
{
if (main_thread)
ThreadStatus::setCurrentThreadFromSibling(main_thread);
ThreadStatus::setCurrentThreadFromSibling(main_thread);
for (Block & block : bucket_to_blocks[bucket])
{
......
......@@ -133,24 +133,17 @@ ProcessListEntry::~ProcessListEntry()
/// Finalize all threads statuses
{
current_thread->onExit();
std::lock_guard lock(it->threads_mutex);
for (auto & elem : it->thread_statuses)
{
auto & thread_status = elem.second;
thread_status->onExit();
thread_status->reset();
thread_status.reset();
}
it->thread_statuses.clear();
};
}
/// Also reset query master thread status
/// NOTE: we can't destroy it, since master threads are selected from fixed thread pool
if (current_thread)
current_thread->reset();
std::lock_guard<std::mutex> lock(parent.mutex);
/// The order of removing memory_trackers is important.
......
......@@ -59,6 +59,10 @@ struct QueryStatusInfo
size_t written_bytes;
Int64 memory_usage;
ClientInfo client_info;
/// Optional fields, filled by request
std::vector<UInt32> thread_numbers;
std::unique_ptr<ProfileEvents::Counters> profile_counters;
};
......@@ -81,7 +85,8 @@ struct QueryStatus
MemoryTracker memory_tracker;
mutable std::shared_mutex threads_mutex;
using QueryThreadStatuses = std::map<int, ThreadStatusPtr>; /// Key is Poco's thread_id
/// Key is Poco's thread_id
using QueryThreadStatuses = std::map<int, ThreadStatusPtr>;
QueryThreadStatuses thread_statuses;
CurrentMetrics::Increment num_queries_increment{CurrentMetrics::Query};
......@@ -264,16 +269,35 @@ public:
size_t size() const { return processes.size(); }
/// Get current state of process list.
Info getInfo() const
Info getInfo(bool get_thread_list = false, bool get_profile_events = false) const
{
std::lock_guard<std::mutex> lock(mutex);
Info per_query_infos;
Info res;
res.reserve(processes.size());
for (const auto & elem : processes)
res.emplace_back(elem.getInfo());
std::lock_guard<std::mutex> lock(mutex);
return res;
per_query_infos.reserve(processes.size());
for (const auto & process : processes)
{
per_query_infos.emplace_back(process.getInfo());
QueryStatusInfo & current_info = per_query_infos.back();
if (get_thread_list)
{
std::lock_guard lock(process.threads_mutex);
current_info.thread_numbers.reserve(process.thread_statuses.size());
for (auto & thread_status_elem : process.thread_statuses)
current_info.thread_numbers.emplace_back(thread_status_elem.second->poco_thread_number);
}
if (get_profile_events)
{
current_info.profile_counters = std::make_unique<ProfileEvents::Counters>(ProfileEvents::Level::Process);
process.performance_counters.getPartiallyAtomicSnapshot(*current_info.profile_counters);
}
}
return per_query_infos;
}
void setMaxSize(size_t max_size_)
......
......@@ -56,7 +56,7 @@ class QueryLog;
class PartLog;
/// System logs should be destroyed in destructor of last Context and before tables,
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
struct SystemLogs
{
......
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Interpreters/ProcessList.h>
#include <Storages/System/StorageSystemProcesses.h>
#include <Interpreters/Context.h>
#include <Storages/System/VirtualColumnsProcessor.h>
namespace DB
......@@ -50,6 +53,18 @@ StorageSystemProcesses::StorageSystemProcesses(const std::string & name_)
{ "memory_usage", std::make_shared<DataTypeInt64>() },
{ "query", std::make_shared<DataTypeString>() }
};
virtual_columns = ColumnsWithTypeAndName{
{
std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt32>()),
"thread_numbers"
},
{
std::make_shared<DataTypeArray>(std::make_shared<DataTypeTuple>(
DataTypes{std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64 >()})),
"profile_counters"
}
};
}
......@@ -61,12 +76,18 @@ BlockInputStreams StorageSystemProcesses::read(
const size_t /*max_block_size*/,
const unsigned /*num_streams*/)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
ProcessList::Info info = context.getProcessList().getInfo();
auto virtual_columns_processor = getVirtualColumnsProcessor();
bool has_thread_numbers, has_profile_counters;
Names real_columns = virtual_columns_processor.process(column_names, {&has_thread_numbers, &has_profile_counters});
check(real_columns);
Block res_block = getSampleBlock().cloneEmpty();
virtual_columns_processor.appendVirtualColumns(res_block);
MutableColumns res_columns = res_block.cloneEmptyColumns();
MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();
ProcessList::Info info = context.getProcessList().getInfo(has_thread_numbers, has_profile_counters);
for (const auto & process : info)
{
......@@ -98,10 +119,38 @@ BlockInputStreams StorageSystemProcesses::read(
res_columns[i++]->insert(UInt64(process.written_bytes));
res_columns[i++]->insert(process.memory_usage);
res_columns[i++]->insert(process.query);
if (has_thread_numbers)
{
Array thread_numbers;
thread_numbers.reserve(process.thread_numbers.size());
for (const UInt32 thread_number : process.thread_numbers)
thread_numbers.emplace_back(UInt64(thread_number));
res_columns[i++]->insert(std::move(thread_numbers));
}
if (has_profile_counters)
{
Array profile_counters;
profile_counters.reserve(ProfileEvents::Counters::num_counters);
for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event)
{
Array name_and_counter{
String(ProfileEvents::getDescription(event)),
UInt64((*process.profile_counters)[event].load(std::memory_order_relaxed))
};
profile_counters.emplace_back(Tuple(std::move(name_and_counter)));
}
res_columns[i++]->insert(std::move(profile_counters));
}
}
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res_block.cloneWithColumns(std::move(res_columns))));
}
}
......@@ -2,6 +2,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/VirtualColumnsProcessor.h>
namespace DB
......@@ -12,7 +13,7 @@ class Context;
/** Implements `processes` system table, which allows you to get information about the queries that are currently executing.
*/
class StorageSystemProcesses : public ext::shared_ptr_helper<StorageSystemProcesses>, public IStorage
class StorageSystemProcesses : public ext::shared_ptr_helper<StorageSystemProcesses>, public StorageWithVirtualColumns
{
public:
std::string getName() const override { return "SystemProcesses"; }
......
......@@ -11,6 +11,7 @@
#include <Parsers/queryToString.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/System/VirtualColumnsProcessor.h>
namespace DB
......@@ -21,95 +22,6 @@ namespace ErrorCodes
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
}
/// Some virtual columns routines
namespace
{
bool hasColumn(const ColumnsWithTypeAndName & columns, const String & column_name)
{
for (const auto & column : columns)
{
if (column.name == column_name)
return true;
}
return false;
}
NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const String & column_name)
{
for (const auto & column : columns)
{
if (column.name == column_name)
return {column.name, column.type};
}
return {};
}
struct VirtualColumnsProcessor
{
explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_)
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {}
/// Separates real and virtual column names, returns real ones
Names process(const Names & column_names, const std::vector<bool *> & virtual_columns_exists_flag = {})
{
Names real_column_names;
if (!virtual_columns_exists_flag.empty())
{
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
*virtual_columns_exists_flag.at(i) = false;
}
for (const String & column_name : column_names)
{
ssize_t virtual_column_index = -1;
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
{
if (column_name == all_virtual_columns[i].name)
{
virtual_column_index = i;
break;
}
}
if (virtual_column_index >= 0)
{
auto index = static_cast<size_t>(virtual_column_index);
virtual_columns_mask[index] = 1;
if (!virtual_columns_exists_flag.empty())
*virtual_columns_exists_flag.at(index) = true;
}
else
{
real_column_names.emplace_back(column_name);
}
}
return real_column_names;
}
void appendVirtualColumns(Block & block)
{
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
{
if (virtual_columns_mask[i])
block.insert(all_virtual_columns[i].cloneEmpty());
}
}
protected:
const ColumnsWithTypeAndName & all_virtual_columns;
std::vector<UInt8> virtual_columns_mask;
};
}
StorageSystemTables::StorageSystemTables(const std::string & name_)
: name(name_)
......@@ -160,7 +72,7 @@ BlockInputStreams StorageSystemTables::read(
bool has_create_table_query = false;
bool has_engine_full = false;
VirtualColumnsProcessor virtual_columns_processor(virtual_columns);
auto virtual_columns_processor = getVirtualColumnsProcessor();
real_column_names = virtual_columns_processor.process(column_names, {&has_metadata_modification_time, &has_create_table_query, &has_engine_full});
check(real_column_names);
......@@ -265,15 +177,4 @@ BlockInputStreams StorageSystemTables::read(
return {std::make_shared<OneBlockInputStream>(res_block)};
}
bool StorageSystemTables::hasColumn(const String & column_name) const
{
return DB::hasColumn(virtual_columns, column_name) || ITableDeclaration::hasColumn(column_name);
}
NameAndTypePair StorageSystemTables::getColumn(const String & column_name) const
{
auto virtual_column = DB::tryGetColumn(virtual_columns, column_name);
return !virtual_column.name.empty() ? virtual_column : ITableDeclaration::getColumn(column_name);
}
}
......@@ -2,7 +2,7 @@
#include <ext/shared_ptr_helper.h>
#include <Storages/IStorage.h>
#include <Storages/System/VirtualColumnsProcessor.h>
namespace DB
{
......@@ -12,7 +12,7 @@ class Context;
/** Implements the system table `tables`, which allows you to get information about all tables.
*/
class StorageSystemTables : public ext::shared_ptr_helper<StorageSystemTables>, public IStorage
class StorageSystemTables : public ext::shared_ptr_helper<StorageSystemTables>, public StorageWithVirtualColumns
{
public:
std::string getName() const override { return "SystemTables"; }
......@@ -26,15 +26,9 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool hasColumn(const String & column_name) const override;
NameAndTypePair getColumn(const String & column_name) const override;
private:
const std::string name;
ColumnsWithTypeAndName virtual_columns;
protected:
StorageSystemTables(const std::string & name_);
};
......
#include "VirtualColumnsProcessor.h"
namespace DB
{
bool hasColumn(const ColumnsWithTypeAndName & columns, const String & column_name)
{
for (const auto & column : columns)
{
if (column.name == column_name)
return true;
}
return false;
}
NameAndTypePair tryGetColumn(const ColumnsWithTypeAndName & columns, const String & column_name)
{
for (const auto & column : columns)
{
if (column.name == column_name)
return {column.name, column.type};
}
return {};
}
bool StorageWithVirtualColumns::hasColumnImpl(const ITableDeclaration * table, const String & column_name) const
{
return DB::hasColumn(virtual_columns, column_name) || table->ITableDeclaration::hasColumn(column_name);
}
NameAndTypePair StorageWithVirtualColumns::getColumnImpl(const ITableDeclaration * table, const String & column_name) const
{
auto virtual_column = DB::tryGetColumn(virtual_columns, column_name);
return !virtual_column.name.empty() ? virtual_column : table->ITableDeclaration::getColumn(column_name);
}
VirtualColumnsProcessor StorageWithVirtualColumns::getVirtualColumnsProcessor()
{
return VirtualColumnsProcessor(virtual_columns);
}
Names VirtualColumnsProcessor::process(const Names & column_names, const std::vector<bool *> & virtual_columns_exists_flag)
{
Names real_column_names;
if (!virtual_columns_exists_flag.empty())
{
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
*virtual_columns_exists_flag.at(i) = false;
}
for (const String & column_name : column_names)
{
ssize_t virtual_column_index = -1;
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
{
if (column_name == all_virtual_columns[i].name)
{
virtual_column_index = i;
break;
}
}
if (virtual_column_index >= 0)
{
auto index = static_cast<size_t>(virtual_column_index);
virtual_columns_mask[index] = 1;
if (!virtual_columns_exists_flag.empty())
*virtual_columns_exists_flag.at(index) = true;
}
else
{
real_column_names.emplace_back(column_name);
}
}
return real_column_names;
}
void VirtualColumnsProcessor::appendVirtualColumns(Block & block)
{
for (size_t i = 0; i < all_virtual_columns.size(); ++i)
{
if (virtual_columns_mask[i])
block.insert(all_virtual_columns[i].cloneEmpty());
}
}
}
#pragma once
#include <Common/TypeList.h>
#include <Core/NamesAndTypes.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Storages/IStorage.h>
/// Some virtual columns routines
namespace DB
{
struct VirtualColumnsProcessor;
/// Adaptor for storages having virtual columns
class StorageWithVirtualColumns : public IStorage
{
public:
bool hasColumn(const String & column_name) const override
{
return hasColumnImpl(this, column_name);
}
NameAndTypePair getColumn(const String & column_name) const override
{
return getColumnImpl(this, column_name);
}
protected:
VirtualColumnsProcessor getVirtualColumnsProcessor();
ColumnsWithTypeAndName virtual_columns;
protected:
/// Use these methods instead of regular hasColumn / getColumn
bool hasColumnImpl(const ITableDeclaration * table, const String & column_name) const;
NameAndTypePair getColumnImpl(const ITableDeclaration * table, const String & column_name) const;
};
struct VirtualColumnsProcessor
{
explicit VirtualColumnsProcessor(const ColumnsWithTypeAndName & all_virtual_columns_)
: all_virtual_columns(all_virtual_columns_), virtual_columns_mask(all_virtual_columns_.size(), 0) {}
/// Separates real and virtual column names, returns real ones
Names process(const Names & column_names, const std::vector<bool *> & virtual_columns_exists_flag = {});
/// Append spevified virtual columns (with empty data) to the result block
void appendVirtualColumns(Block & block);
protected:
const ColumnsWithTypeAndName & all_virtual_columns;
std::vector<UInt8> virtual_columns_mask;
};
}
......@@ -25,7 +25,7 @@ function pack_unpack_compare()
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.buf"
${CLICKHOUSE_CLIENT} --query "DROP TABLE IF EXISTS test.buf_file"
rm -f "${CLICKHOUSE_TMP}/$buf_file" stderr
rm -f "$buf_file" stderr
echo $((res_orig - res_db_file)) $((res_orig - res_ch_local1)) $((res_orig - res_ch_local2))
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册