提交 71f99a27 编写于 作者: A alesapin

Compileable getSampleBlockWithColumns in StorageInMemoryMetadata

上级 08b9aa6b
......@@ -377,14 +377,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (storage)
{
source_header = storage->getSampleBlockForColumns(required_columns);
source_header = metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals());
/// Fix source_header for filter actions.
if (row_policy_filter)
{
filter_info = std::make_shared<FilterInfo>();
filter_info->column_name = generateFilterActions(filter_info->actions, row_policy_filter, required_columns);
source_header = storage->getSampleBlockForColumns(filter_info->actions->getRequiredColumns());
source_header = metadata_snapshot->getSampleBlockForColumns(filter_info->actions->getRequiredColumns(), storage->getVirtuals());
}
}
......@@ -1336,7 +1336,7 @@ void InterpreterSelectQuery::executeFetchColumns(
if (pipes.empty())
{
Pipe pipe(std::make_shared<NullSource>(storage->getSampleBlockForColumns(required_columns)));
Pipe pipe(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(required_columns, storage->getVirtuals())));
if (query_info.prewhere_info)
{
......
......@@ -62,37 +62,6 @@ Block IStorage::getSampleBlock() const
return res;
}
Block IStorage::getSampleBlockForColumns(const Names & column_names) const
{
Block res;
std::unordered_map<String, DataTypePtr> columns_map;
NamesAndTypesList all_columns = getColumns().getAll();
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
/// Virtual columns must be appended after ordinary, because user can
/// override them.
for (const auto & column : getVirtuals())
columns_map.emplace(column.name, column.type);
for (const auto & name : column_names)
{
auto it = columns_map.find(name);
if (it != columns_map.end())
{
res.insert({it->second->createColumn(), it->second, it->first});
}
else
{
throw Exception(
"Column " + backQuote(name) + " not found in table " + getStorageID().getNameForLogs(), ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
return res;
}
namespace
{
......
......@@ -159,7 +159,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
void setInMemoryMetadata(const StorageInMemoryMetadata & metadata_) { metadata = std::make_shared<StorageInMemoryMetadata>(metadata_); }
Block getSampleBlock() const; /// ordinary + materialized.
Block getSampleBlockForColumns(const Names & column_names) const; /// ordinary + materialized + aliases + virtuals.
/// Verify that all the requested names are in the table and are set correctly:
/// list of names is not empty and the names do not repeat.
......
......@@ -26,8 +26,8 @@ KafkaBlockInputStream::KafkaBlockInputStream(
, max_block_size(max_block_size_)
, commit_in_suffix(commit_in_suffix_)
, non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized())
, virtual_header(storage.getSampleBlockForColumns(
{"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}))
, virtual_header(metadata_snapshot->getSampleBlockForColumns(
{"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}, storage.getVirtuals()))
{
}
......@@ -44,7 +44,7 @@ KafkaBlockInputStream::~KafkaBlockInputStream()
Block KafkaBlockInputStream::getHeader() const
{
return storage.getSampleBlockForColumns(column_names);
return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals());
}
void KafkaBlockInputStream::readPrefixImpl()
......
......@@ -20,6 +20,7 @@ namespace ErrorCodes
MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
......@@ -27,16 +28,16 @@ MergeTreeBaseSelectProcessor::MergeTreeBaseSelectProcessor(
const MergeTreeReaderSettings & reader_settings_,
bool use_uncompressed_cache_,
const Names & virt_column_names_)
:
SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_)),
storage(storage_),
prewhere_info(prewhere_info_),
max_block_size_rows(max_block_size_rows_),
preferred_block_size_bytes(preferred_block_size_bytes_),
preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_),
reader_settings(reader_settings_),
use_uncompressed_cache(use_uncompressed_cache_),
virt_column_names(virt_column_names_)
: SourceWithProgress(getHeader(std::move(header), prewhere_info_, virt_column_names_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, prewhere_info(prewhere_info_)
, max_block_size_rows(max_block_size_rows_)
, preferred_block_size_bytes(preferred_block_size_bytes_)
, preferred_max_column_in_block_size_bytes(preferred_max_column_in_block_size_bytes_)
, reader_settings(reader_settings_)
, use_uncompressed_cache(use_uncompressed_cache_)
, virt_column_names(virt_column_names_)
{
header_without_virtual_columns = getPort().getHeader();
......
......@@ -22,6 +22,7 @@ public:
MergeTreeBaseSelectProcessor(
Block header,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
UInt64 max_block_size_rows_,
UInt64 preferred_block_size_bytes_,
......@@ -54,6 +55,7 @@ protected:
protected:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
PrewhereInfoPtr prewhere_info;
......
......@@ -579,7 +579,7 @@ public:
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
MergeList::Entry & merge_entry,
TableStructureReadLockHolder &,
time_t time_of_merge,
......@@ -712,7 +712,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (const auto & part : parts)
{
auto input = std::make_unique<MergeTreeSequentialSource>(
data, part, merging_column_names, read_with_direct_io, true);
data, metadata_snapshot, part, merging_column_names, read_with_direct_io, true);
input->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));
......@@ -898,7 +898,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
for (size_t part_num = 0; part_num < parts.size(); ++part_num)
{
auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
data, parts[part_num], column_names, read_with_direct_io, true);
data, metadata_snapshot, parts[part_num], column_names, read_with_direct_io, true);
column_part_source->setProgressCallback(
MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));
......
......@@ -147,6 +147,7 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
Pipes MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
const UInt64 max_block_size,
......@@ -154,13 +155,15 @@ Pipes MergeTreeDataSelectExecutor::read(
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
return readFromParts(
data.getDataPartsVector(), column_names_to_return, query_info, context,
max_block_size, num_streams, max_block_numbers_to_read);
data.getDataPartsVector(), column_names_to_return, metadata_snapshot,
query_info, context, max_block_size, num_streams,
max_block_numbers_to_read);
}
Pipes MergeTreeDataSelectExecutor::readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names_to_return,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
const UInt64 max_block_size,
......@@ -205,7 +208,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
}
}
NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical();
NamesAndTypesList available_real_columns = metadata_snapshot->getColumns().getAllPhysical();
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if (real_column_names.empty())
......@@ -629,6 +632,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
metadata_snapshot,
max_block_size,
settings.use_uncompressed_cache,
query_info,
......@@ -650,6 +654,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
metadata_snapshot,
max_block_size,
settings.use_uncompressed_cache,
query_info,
......@@ -665,6 +670,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
std::move(parts_with_ranges),
num_streams,
column_names_to_read,
metadata_snapshot,
max_block_size,
settings.use_uncompressed_cache,
query_info,
......@@ -727,6 +733,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
......@@ -783,8 +790,18 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
num_streams, sum_marks, min_marks_for_concurrent_read, parts, data, query_info.prewhere_info, true,
column_names, MergeTreeReadPool::BackoffSettings(settings), settings.preferred_block_size_bytes, false);
num_streams,
sum_marks,
min_marks_for_concurrent_read,
parts,
data,
metadata_snapshot,
query_info.prewhere_info,
true,
column_names,
MergeTreeReadPool::BackoffSettings(settings),
settings.preferred_block_size_bytes,
false);
/// Let's estimate total number of rows for progress bar.
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
......@@ -792,8 +809,9 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
for (size_t i = 0; i < num_streams; ++i)
{
auto source = std::make_shared<MergeTreeThreadSelectBlockInputProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, data, use_uncompressed_cache,
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
query_info.prewhere_info, reader_settings, virt_columns);
if (i == 0)
......@@ -812,7 +830,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
for (const auto & part : parts)
{
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings, virt_columns, part.part_index_in_query);
......@@ -845,6 +863,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
......@@ -1004,18 +1023,38 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
if (input_order_info->direction == 1)
{
pipes.emplace_back(std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query));
data,
metadata_snapshot,
part.data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part.part_index_in_query));
}
else
{
pipes.emplace_back(std::make_shared<MergeTreeReverseSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query));
data,
metadata_snapshot,
part.data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
ranges_to_get_from_part,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part.part_index_in_query));
pipes.back().addSimpleTransform(std::make_shared<ReverseTransform>(pipes.back().getHeader()));
}
......@@ -1050,6 +1089,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
......@@ -1088,7 +1128,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
for (const auto & part : parts)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
......
......@@ -26,6 +26,7 @@ public:
Pipes read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
UInt64 max_block_size,
......@@ -35,6 +36,7 @@ public:
Pipes readFromParts(
MergeTreeData::DataPartsVector parts,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
UInt64 max_block_size,
......@@ -50,6 +52,7 @@ private:
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
......@@ -62,6 +65,7 @@ private:
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
......@@ -75,6 +79,7 @@ private:
RangesInDataParts && parts,
size_t num_streams,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
UInt64 max_block_size,
bool use_uncompressed_cache,
const SelectQueryInfo & query_info,
......
......@@ -17,17 +17,28 @@ namespace ErrorCodes
namespace DB
{
MergeTreeReadPool::MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_, const MergeTreeData & data_, const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const size_t threads_,
const size_t sum_marks_,
const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_,
const MergeTreeData & data_,
const StorageMetadataPtr & metadata_snapshot_,
const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_,
const Names & column_names_,
const BackoffSettings & backoff_settings_,
size_t preferred_block_size_bytes_,
const bool do_not_steal_tasks_)
: backoff_settings{backoff_settings_}, backoff_state{threads_}, data{data_},
column_names{column_names_}, do_not_steal_tasks{do_not_steal_tasks_},
predict_block_size_bytes{preferred_block_size_bytes_ > 0}, prewhere_info{prewhere_info_}, parts_ranges{parts_}
: backoff_settings{backoff_settings_}
, backoff_state{threads_}
, data{data_}
, metadata_snapshot{metadata_snapshot_}
, column_names{column_names_}
, do_not_steal_tasks{do_not_steal_tasks_}
, predict_block_size_bytes{preferred_block_size_bytes_ > 0}
, prewhere_info{prewhere_info_}
, parts_ranges{parts_}
{
/// parts don't contain duplicate MergeTreeDataPart's.
const auto per_part_sum_marks = fillPerPartInfo(parts_, check_columns_);
......@@ -139,7 +150,7 @@ MarkRanges MergeTreeReadPool::getRestMarks(const IMergeTreeDataPart & part, cons
Block MergeTreeReadPool::getHeader() const
{
return data.getSampleBlockForColumns(column_names);
return metadata_snapshot->getSampleBlockForColumns(column_names, data.getVirtuals());
}
void MergeTreeReadPool::profileFeedback(const ReadBufferFromFileBase::ProfileInfo info)
......
......@@ -68,7 +68,7 @@ private:
public:
MergeTreeReadPool(
const size_t threads_, const size_t sum_marks_, const size_t min_marks_for_concurrent_read_,
RangesInDataParts parts_, const MergeTreeData & data_, const PrewhereInfoPtr & prewhere_info_,
RangesInDataParts parts_, const MergeTreeData & data_, const StorageMetadataPtr & metadata_snapshot_, const PrewhereInfoPtr & prewhere_info_,
const bool check_columns_, const Names & column_names_,
const BackoffSettings & backoff_settings_, size_t preferred_block_size_bytes_,
const bool do_not_steal_tasks_ = false);
......@@ -95,6 +95,7 @@ private:
RangesInDataParts & parts, const size_t min_marks_for_concurrent_read);
const MergeTreeData & data;
StorageMetadataPtr metadata_snapshot;
Names column_names;
bool do_not_steal_tasks;
bool predict_block_size_bytes;
......
......@@ -34,6 +34,7 @@ static Block replaceTypes(Block && header, const MergeTreeData::DataPartPtr & da
MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
......@@ -49,8 +50,8 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
replaceTypes(storage_.getSampleBlockForColumns(required_columns_), owned_data_part_),
storage_, prewhere_info_, max_block_size_rows_,
replaceTypes(metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals()), owned_data_part_),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
......
......@@ -18,6 +18,7 @@ class MergeTreeReverseSelectProcessor : public MergeTreeBaseSelectProcessor
public:
MergeTreeReverseSelectProcessor(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,
......
......@@ -14,6 +14,7 @@ namespace ErrorCodes
MergeTreeSelectProcessor::MergeTreeSelectProcessor(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::DataPartPtr & owned_data_part_,
UInt64 max_block_size_rows_,
size_t preferred_block_size_bytes_,
......@@ -29,8 +30,8 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
bool quiet)
:
MergeTreeBaseSelectProcessor{
storage_.getSampleBlockForColumns(required_columns_),
storage_, prewhere_info_, max_block_size_rows_,
metadata_snapshot_->getSampleBlockForColumns(required_columns_, storage_.getVirtuals()),
storage_, metadata_snapshot_, prewhere_info_, max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
required_columns{std::move(required_columns_)},
......
......@@ -18,6 +18,7 @@ class MergeTreeSelectProcessor : public MergeTreeBaseSelectProcessor
public:
MergeTreeSelectProcessor(
const MergeTreeData & storage,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeData::DataPartPtr & owned_data_part,
UInt64 max_block_size_rows,
size_t preferred_block_size_bytes,
......
......@@ -11,13 +11,15 @@ namespace ErrorCodes
MergeTreeSequentialSource::MergeTreeSequentialSource(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet)
: SourceWithProgress(storage_.getSampleBlockForColumns(columns_to_read_))
: SourceWithProgress(metadata_snapshot_->getSampleBlockForColumns(columns_to_read_, storage_.getVirtuals()))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, data_part(std::move(data_part_))
, columns_to_read(std::move(columns_to_read_))
, read_with_direct_io(read_with_direct_io_)
......@@ -41,7 +43,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
NamesAndTypesList columns_for_reader;
if (take_column_types_from_storage)
{
const NamesAndTypesList & physical_columns = storage.getColumns().getAllPhysical();
const NamesAndTypesList & physical_columns = metadata_snapshot->getColumns().getAllPhysical();
columns_for_reader = physical_columns.addTypes(columns_to_read);
}
else
......
......@@ -14,12 +14,12 @@ class MergeTreeSequentialSource : public SourceWithProgress
public:
MergeTreeSequentialSource(
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
MergeTreeData::DataPartPtr data_part_,
Names columns_to_read_,
bool read_with_direct_io_,
bool take_column_types_from_storage,
bool quiet = false
);
bool quiet = false);
~MergeTreeSequentialSource() override;
......@@ -35,6 +35,7 @@ protected:
private:
const MergeTreeData & storage;
StorageMetadataPtr metadata_snapshot;
/// Data part will not be removed if the pointer owns it
MergeTreeData::DataPartPtr data_part;
......
......@@ -16,12 +16,15 @@ MergeTreeThreadSelectBlockInputProcessor::MergeTreeThreadSelectBlockInputProcess
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const MergeTreeReaderSettings & reader_settings_,
const Names & virt_column_names_)
:
MergeTreeBaseSelectProcessor{pool_->getHeader(), storage_, prewhere_info_, max_block_size_rows_,
MergeTreeBaseSelectProcessor{
pool_->getHeader(), storage_, metadata_snapshot_, prewhere_info_,
max_block_size_rows_,
preferred_block_size_bytes_, preferred_max_column_in_block_size_bytes_,
reader_settings_, use_uncompressed_cache_, virt_column_names_},
thread{thread_},
......
......@@ -22,6 +22,7 @@ public:
size_t preferred_block_size_bytes_,
size_t preferred_max_column_in_block_size_bytes_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const bool use_uncompressed_cache_,
const PrewhereInfoPtr & prewhere_info_,
const MergeTreeReaderSettings & reader_settings_,
......
......@@ -21,15 +21,15 @@ public:
Pipes read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
unsigned num_streams) override
{
return MergeTreeDataSelectExecutor(part->storage).readFromParts(
{part}, column_names, query_info, context, max_block_size, num_streams);
return MergeTreeDataSelectExecutor(part->storage)
.readFromParts({part}, column_names, metadata_snapshot, query_info, context, max_block_size, num_streams);
}
......
......@@ -88,9 +88,11 @@ StorageBuffer::StorageBuffer(
class BufferSource : public SourceWithProgress
{
public:
BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage)
: SourceWithProgress(storage.getSampleBlockForColumns(column_names_))
, column_names(column_names_.begin(), column_names_.end()), buffer(buffer_) {}
BufferSource(const Names & column_names_, StorageBuffer::Buffer & buffer_, const StorageBuffer & storage, const StorageMetadataPtr & metadata_snapshot)
: SourceWithProgress(
metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals()))
, column_names(column_names_.begin(), column_names_.end())
, buffer(buffer_) {}
String getName() const override { return "Buffer"; }
......@@ -145,7 +147,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context
Pipes StorageBuffer::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
......@@ -236,7 +238,7 @@ Pipes StorageBuffer::read(
Pipes pipes_from_buffers;
pipes_from_buffers.reserve(num_shards);
for (auto & buf : buffers)
pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, *this));
pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, *this, metadata_snapshot));
/** If the sources from the table were processed before some non-initial stage of query execution,
* then sources from the buffers must also be wrapped in the processing pipeline before the same stage.
......
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/quoteString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int COLUMN_QUERIED_MORE_THAN_ONCE;
extern const int DUPLICATE_COLUMN;
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
extern const int EMPTY_LIST_OF_COLUMNS_QUERIED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int TYPE_MISMATCH;
extern const int TABLE_IS_DROPPED;
extern const int NOT_IMPLEMENTED;
extern const int DEADLOCK_AVOIDED;
}
StorageInMemoryMetadata::StorageInMemoryMetadata(
const ColumnsDescription & columns_,
......@@ -240,4 +257,37 @@ Block StorageInMemoryMetadata::getSampleBlock() const
return res;
}
Block StorageInMemoryMetadata::getSampleBlockForColumns(const Names & column_names, const NamesAndTypesList & virtuals) const
{
Block res;
std::unordered_map<String, DataTypePtr> columns_map;
NamesAndTypesList all_columns = getColumns().getAll();
for (const auto & elem : all_columns)
columns_map.emplace(elem.name, elem.type);
/// Virtual columns must be appended after ordinary, because user can
/// override them.
for (const auto & column : virtuals)
columns_map.emplace(column.name, column.type);
for (const auto & name : column_names)
{
auto it = columns_map.find(name);
if (it != columns_map.end())
{
res.insert({it->second->createColumn(), it->second, it->first});
}
else
{
throw Exception(
"Column " + backQuote(name) + " not found in table " /*+ getStorageID().getNameForLogs() TODO(alesap)*/,
ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
}
}
return res;
}
}
......@@ -110,6 +110,8 @@ struct StorageInMemoryMetadata
Block getSampleBlock() const; /// ordinary + materialized.
Block getSampleBlockNonMaterialized() const; /// ordinary.
Block getSampleBlockWithVirtuals(const NamesAndTypesList & virtuals) const; /// ordinary + materialized + virtuals.
Block getSampleBlockForColumns(
const Names & column_names, const NamesAndTypesList & virtuals) const; /// ordinary + materialized + aliases + virtuals.
};
using StorageMetadataPtr = std::shared_ptr<StorageInMemoryMetadata>;
......
......@@ -435,7 +435,7 @@ private:
// TODO: multiple stream read and index read
Pipes StorageJoin::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -445,7 +445,7 @@ Pipes StorageJoin::read(
check(column_names);
Pipes pipes;
pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, getSampleBlockForColumns(column_names)));
pipes.emplace_back(std::make_shared<JoinSource>(*join, max_block_size, metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals())));
return pipes;
}
......
......@@ -22,9 +22,19 @@ namespace ErrorCodes
class MemorySource : public SourceWithProgress
{
public:
MemorySource(Names column_names_, BlocksList::iterator begin_, BlocksList::iterator end_, const StorageMemory & storage)
: SourceWithProgress(storage.getSampleBlockForColumns(column_names_))
, column_names(std::move(column_names_)), begin(begin_), end(end_), it(begin) {}
MemorySource(
Names column_names_,
BlocksList::iterator begin_,
BlocksList::iterator end_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot)
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals()))
, column_names(std::move(column_names_))
, begin(begin_)
, end(end_)
, it(begin)
{
}
String getName() const override { return "Memory"; }
......@@ -60,9 +70,14 @@ private:
class MemoryBlockOutputStream : public IBlockOutputStream
{
public:
explicit MemoryBlockOutputStream(StorageMemory & storage_) : storage(storage_) {}
explicit MemoryBlockOutputStream(
StorageMemory & storage_,
const StorageMetadataPtr & metadata_snapshot_)
: storage(storage_)
, metadata_snapshot(metadata_snapshot_)
{}
Block getHeader() const override { return storage.getSampleBlock(); }
Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
void write(const Block & block) override
{
......@@ -72,6 +87,7 @@ public:
}
private:
StorageMemory & storage;
StorageMetadataPtr metadata_snapshot;
};
......@@ -87,7 +103,7 @@ StorageMemory::StorageMemory(const StorageID & table_id_, ColumnsDescription col
Pipes StorageMemory::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & /*context*/,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -113,16 +129,16 @@ Pipes StorageMemory::read(
std::advance(begin, stream * size / num_streams);
std::advance(end, (stream + 1) * size / num_streams);
pipes.emplace_back(std::make_shared<MemorySource>(column_names, begin, end, *this));
pipes.emplace_back(std::make_shared<MemorySource>(column_names, begin, end, *this, metadata_snapshot));
}
return pipes;
}
BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<MemoryBlockOutputStream>(*this);
return std::make_shared<MemoryBlockOutputStream>(*this, metadata_snapshot);
}
......
......@@ -129,7 +129,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage(const Context &
Pipes StorageMerge::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
......@@ -157,7 +157,7 @@ Pipes StorageMerge::read(
modified_context->setSetting("optimize_move_to_prewhere", false);
/// What will be result structure depending on query processed stage in source tables?
Block header = getQueryHeader(column_names, query_info, context, processed_stage);
Block header = getQueryHeader(column_names, metadata_snapshot, query_info, context, processed_stage);
/** First we make list of selected tables to find out its size.
* This is necessary to correctly pass the recommended number of threads to each table.
......@@ -401,13 +401,17 @@ void StorageMerge::alter(
}
Block StorageMerge::getQueryHeader(
const Names & column_names, const SelectQueryInfo & query_info, const Context & context, QueryProcessingStage::Enum processed_stage)
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage)
{
switch (processed_stage)
{
case QueryProcessingStage::FetchColumns:
{
Block header = getSampleBlockForColumns(column_names);
Block header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals());
if (query_info.prewhere_info)
{
query_info.prewhere_info->prewhere_actions->execute(header);
......@@ -420,7 +424,7 @@ Block StorageMerge::getQueryHeader(
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
return materializeBlock(InterpreterSelectQuery(
query_info.query, context, std::make_shared<OneBlockInputStream>(getSampleBlockForColumns(column_names)),
query_info.query, context, std::make_shared<OneBlockInputStream>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals())),
SelectQueryOptions(processed_stage).analyze()).getSampleBlock());
}
throw Exception("Logical Error: unknown processed stage.", ErrorCodes::LOGICAL_ERROR);
......
......@@ -74,8 +74,12 @@ protected:
const String & table_name_regexp_,
const Context & context_);
Block getQueryHeader(const Names & column_names, const SelectQueryInfo & query_info,
const Context & context, QueryProcessingStage::Enum processed_stage);
Block getQueryHeader(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage);
Pipes createSources(
const SelectQueryInfo & query_info, const QueryProcessingStage::Enum & processed_stage,
......
......@@ -177,14 +177,15 @@ StorageMergeTree::~StorageMergeTree()
Pipes StorageMergeTree::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
const size_t max_block_size,
const unsigned num_streams)
{
return reader.read(column_names, query_info, context, max_block_size, num_streams);
return reader.read(column_names, metadata_snapshot, query_info,
context, max_block_size, num_streams);
}
std::optional<UInt64> StorageMergeTree::totalRows() const
......
......@@ -24,7 +24,7 @@ public:
Pipes read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo &,
const Context & /*context*/,
QueryProcessingStage::Enum /*processing_stage*/,
......@@ -32,13 +32,14 @@ public:
unsigned) override
{
Pipes pipes;
pipes.emplace_back(std::make_shared<NullSource>(getSampleBlockForColumns(column_names)));
pipes.emplace_back(
std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals())));
return pipes;
}
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr & /*metadata_snapshot*/, const Context &) override
BlockOutputStreamPtr write(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &) override
{
return std::make_shared<NullBlockOutputStream>(getSampleBlock());
return std::make_shared<NullBlockOutputStream>(metadata_snapshot->getSampleBlock());
}
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) const override;
......
......@@ -3387,7 +3387,7 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg
Pipes StorageReplicatedMergeTree::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -3402,10 +3402,10 @@ Pipes StorageReplicatedMergeTree::read(
if (context.getSettingsRef().select_sequential_consistency)
{
auto max_added_blocks = getMaxAddedBlocks();
return reader.read(column_names, query_info, context, max_block_size, num_streams, &max_added_blocks);
return reader.read(column_names, metadata_snapshot, query_info, context, max_block_size, num_streams, &max_added_blocks);
}
return reader.read(column_names, query_info, context, max_block_size, num_streams);
return reader.read(column_names, metadata_snapshot, query_info, context, max_block_size, num_streams);
}
......
......@@ -285,7 +285,7 @@ Strings listFilesWithRegexpMatching(Aws::S3::S3Client & client, const S3::URI &
Pipes StorageS3::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -309,9 +309,9 @@ Pipes StorageS3::read(
need_file_column,
format_name,
getName(),
getHeaderBlock(column_names),
metadata_snapshot->getSampleBlock(),
context,
getColumns().getDefaults(),
metadata_snapshot->getColumns().getDefaults(),
max_block_size,
chooseCompressionMethod(uri.endpoint, compression_method),
client,
......@@ -321,11 +321,11 @@ Pipes StorageS3::read(
return narrowPipes(std::move(pipes), num_streams);
}
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & /*context*/)
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<StorageS3BlockOutputStream>(
format_name, min_upload_part_size, getSampleBlock(), context_global,
chooseCompressionMethod(uri.endpoint, compression_method),
format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(),
context_global, chooseCompressionMethod(uri.endpoint, compression_method),
client, uri.bucket, uri.key);
}
......
......@@ -41,11 +41,6 @@ public:
return "S3";
}
Block getHeaderBlock(const Names & /*column_names*/) const
{
return getSampleBlock();
}
Pipes read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
......
......@@ -54,12 +54,13 @@ public:
static Block getHeader(
StorageStripeLog & storage,
const StorageMetadataPtr & metadata_snapshot,
const Names & column_names,
IndexForNativeFormat::Blocks::const_iterator index_begin,
IndexForNativeFormat::Blocks::const_iterator index_end)
{
if (index_begin == index_end)
return storage.getSampleBlockForColumns(column_names);
return metadata_snapshot->getSampleBlockForColumns(column_names, storage.getVirtuals());
/// TODO: check if possible to always return storage.getSampleBlock()
......@@ -74,13 +75,22 @@ public:
return header;
}
StripeLogSource(StorageStripeLog & storage_, const Names & column_names, size_t max_read_buffer_size_,
StripeLogSource(
StorageStripeLog & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const Names & column_names,
size_t max_read_buffer_size_,
std::shared_ptr<const IndexForNativeFormat> & index_,
IndexForNativeFormat::Blocks::const_iterator index_begin_,
IndexForNativeFormat::Blocks::const_iterator index_end_)
: SourceWithProgress(getHeader(storage_, column_names, index_begin_, index_end_))
, storage(storage_), max_read_buffer_size(max_read_buffer_size_)
, index(index_), index_begin(index_begin_), index_end(index_end_)
: SourceWithProgress(
getHeader(storage_, metadata_snapshot_, column_names, index_begin_, index_end_))
, storage(storage_)
, metadata_snapshot(metadata_snapshot_)
, max_read_buffer_size(max_read_buffer_size_)
, index(index_)
, index_begin(index_begin_)
, index_end(index_end_)
{
}
......@@ -110,6 +120,7 @@ protected:
private:
StorageStripeLog & storage;
StorageMetadataPtr metadata_snapshot;
size_t max_read_buffer_size;
std::shared_ptr<const IndexForNativeFormat> index;
......@@ -253,7 +264,7 @@ void StorageStripeLog::rename(const String & new_path_to_table_data, const Stora
Pipes StorageStripeLog::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & /*query_info*/,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -271,7 +282,7 @@ Pipes StorageStripeLog::read(
String index_file = table_path + "index.mrk";
if (!disk->exists(index_file))
{
pipes.emplace_back(std::make_shared<NullSource>(getSampleBlockForColumns(column_names)));
pipes.emplace_back(std::make_shared<NullSource>(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals())));
return pipes;
}
......@@ -291,7 +302,7 @@ Pipes StorageStripeLog::read(
std::advance(end, (stream + 1) * size / num_streams);
pipes.emplace_back(std::make_shared<StripeLogSource>(
*this, column_names, context.getSettingsRef().max_read_buffer_size, index, begin, end));
*this, metadata_snapshot, column_names, context.getSettingsRef().max_read_buffer_size, index, begin, end));
}
/// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change.
......
......@@ -157,7 +157,7 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(con
Pipes IStorageURLBase::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum processed_stage,
......@@ -170,14 +170,15 @@ Pipes IStorageURLBase::read(
request_uri.addQueryParameter(param, value);
Pipes pipes;
pipes.emplace_back(std::make_shared<StorageURLSource>(request_uri,
pipes.emplace_back(std::make_shared<StorageURLSource>(
request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
format_name,
getName(),
getHeaderBlock(column_names),
getHeaderBlock(column_names, metadata_snapshot),
context,
getColumns().getDefaults(),
metadata_snapshot->getColumns().getDefaults(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(request_uri.getPath(), compression_method)));
......
......@@ -62,7 +62,7 @@ private:
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const;
virtual Block getHeaderBlock(const Names & column_names) const = 0;
virtual Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const = 0;
};
class StorageURLBlockOutputStream : public IBlockOutputStream
......@@ -124,9 +124,9 @@ public:
return "URL";
}
Block getHeaderBlock(const Names & /*column_names*/) const override
Block getHeaderBlock(const Names & /*column_names*/, const StorageMetadataPtr & metadata_snapshot) const override
{
return getSampleBlock();
return metadata_snapshot->getSampleBlock();
}
};
}
......@@ -54,7 +54,7 @@ StorageView::StorageView(
Pipes StorageView::read(
const Names & column_names,
const StorageMetadataPtr & /*metadata_snapshot*/,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum /*processed_stage*/,
......@@ -86,8 +86,9 @@ Pipes StorageView::read(
/// And also convert to expected structure.
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(header, getSampleBlockForColumns(column_names),
ConvertingTransform::MatchColumnsMode::Name);
return std::make_shared<ConvertingTransform>(
header, metadata_snapshot->getSampleBlockForColumns(
column_names, getVirtuals()), ConvertingTransform::MatchColumnsMode::Name);
});
pipes = std::move(pipeline).getPipes();
......
......@@ -97,16 +97,16 @@ Pipes StorageXDBC::read(
return IStorageURLBase::read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
}
BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context)
BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & context)
{
bridge_helper->startBridgeSync();
NamesAndTypesList cols;
Poco::URI request_uri = uri;
request_uri.setPath("/write");
for (const String & name : getSampleBlock().getNames())
for (const String & name : metadata_snapshot->getSampleBlock().getNames())
{
auto column_data = getColumns().getPhysical(name);
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
auto url_params = bridge_helper->getURLParams(cols.toString(), 65536);
......@@ -117,14 +117,17 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
request_uri.addQueryParameter("format_name", format_name);
return std::make_shared<StorageURLBlockOutputStream>(
request_uri, format_name, getSampleBlock(), context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri.toString(), compression_method));
request_uri,
format_name,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri.toString(), compression_method));
}
Block StorageXDBC::getHeaderBlock(const Names & column_names) const
Block StorageXDBC::getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const
{
return getSampleBlockForColumns(column_names);
return metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals());
}
std::string StorageXDBC::getName() const
......
......@@ -24,11 +24,12 @@ public:
size_t max_block_size,
unsigned num_streams) override;
StorageXDBC(const StorageID & table_id_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
StorageXDBC(
const StorageID & table_id_,
const std::string & remote_database_name,
const std::string & remote_table_name,
const ColumnsDescription & columns_,
const Context & context_, BridgeHelperPtr bridge_helper_);
BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;
......@@ -42,19 +43,21 @@ private:
std::string getReadMethod() const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::vector<std::pair<std::string, std::string>> getReadURIParams(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
std::function<void(std::ostream &)> getReadPOSTDataCallback(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size) const override;
Block getHeaderBlock(const Names & column_names) const override;
Block getHeaderBlock(const Names & column_names, const StorageMetadataPtr & metadata_snapshot) const override;
std::string getName() const override;
};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册