未验证 提交 5152e2b8 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #2658 from yandex/issue-3578

CLICKHOUSE-3578 apply column defaults for semistructured input formats
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54409 CACHE STRING "")
set(VERSION_REVISION 54410 CACHE STRING "") # changed manually for tests
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 14 CACHE STRING "")
set(VERSION_PATCH 17 CACHE STRING "")
......
......@@ -43,6 +43,7 @@
#include <IO/WriteHelpers.h>
#include <IO/UseSSL.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/InternalTextLogsRowOutputStream.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTSetQuery.h>
......@@ -60,6 +61,7 @@
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Common/Config/configReadClient.h>
#include <Storages/ColumnsDescription.h>
#if USE_READLINE
#include "Suggest.h" // Y_IGNORE
......@@ -69,7 +71,6 @@
#pragma GCC optimize("-fno-var-tracking-assignments")
#endif
/// http://en.wikipedia.org/wiki/ANSI_escape_code
/// Similar codes \e[s, \e[u don't work in VT100 and Mosh.
......@@ -875,11 +876,12 @@ private:
/// Receive description of table structure.
Block sample;
if (receiveSampleBlock(sample))
ColumnsDescription columns_description;
if (receiveSampleBlock(sample, columns_description))
{
/// If structure was received (thus, server has not thrown an exception),
/// send our data with that structure.
sendData(sample);
sendData(sample, columns_description);
receiveEndOfQuery();
}
}
......@@ -917,7 +919,7 @@ private:
}
void sendData(Block & sample)
void sendData(Block & sample, const ColumnsDescription & columns_description)
{
/// If INSERT data must be sent.
const ASTInsertQuery * parsed_insert_query = typeid_cast<const ASTInsertQuery *>(&*parsed_query);
......@@ -928,19 +930,19 @@ private:
{
/// Send data contained in the query.
ReadBufferFromMemory data_in(parsed_insert_query->data, parsed_insert_query->end - parsed_insert_query->data);
sendDataFrom(data_in, sample);
sendDataFrom(data_in, sample, columns_description);
}
else if (!is_interactive)
{
/// Send data read from stdin.
sendDataFrom(std_in, sample);
sendDataFrom(std_in, sample, columns_description);
}
else
throw Exception("No data to insert", ErrorCodes::NO_DATA_TO_INSERT);
}
void sendDataFrom(ReadBuffer & buf, Block & sample)
void sendDataFrom(ReadBuffer & buf, Block & sample, const ColumnsDescription & columns_description)
{
String current_format = insert_format;
......@@ -952,6 +954,10 @@ private:
BlockInputStreamPtr block_input = context.getInputFormat(
current_format, buf, sample, insert_format_max_block_size);
const auto & column_defaults = columns_description.defaults;
if (!column_defaults.empty())
block_input = std::make_shared<AddingDefaultsBlockInputStream>(block_input, column_defaults, context);
BlockInputStreamPtr async_block_input = std::make_shared<AsynchronousBlockInputStream>(block_input);
async_block_input->readPrefix();
......@@ -1089,7 +1095,7 @@ private:
/// Receive the block that serves as an example of the structure of table where data will be inserted.
bool receiveSampleBlock(Block & out)
bool receiveSampleBlock(Block & out, ColumnsDescription & columns_description)
{
while (true)
{
......@@ -1110,6 +1116,10 @@ private:
onLogData(packet.block);
break;
case Protocol::Server::TableColumns:
columns_description = ColumnsDescription::parse(packet.multistring_message[1]);
return receiveSampleBlock(out, columns_description);
default:
throw NetException("Unexpected packet from server (expected Data, Exception or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
......
......@@ -30,6 +30,7 @@
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Core/ExternalTable.h>
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include "TCPHandler.h"
......@@ -360,6 +361,14 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
*/
state.io.out->writePrefix();
/// Send ColumnsDescription for insertion table
if (client_revision >= DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA)
{
const auto & db_and_table = query_context.getInsertionTable();
if (auto * columns = ColumnsDescription::loadFromContext(query_context, db_and_table.first, db_and_table.second))
sendTableColumns(*columns);
}
/// Send block to the client - table structure.
Block block = state.io.out->getHeader();
......@@ -860,6 +869,16 @@ void TCPHandler::sendLogData(const Block & block)
out->next();
}
void TCPHandler::sendTableColumns(const ColumnsDescription & columns)
{
writeVarUInt(Protocol::Server::TableColumns, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
writeStringBinary(columns.toString(), *out);
out->next();
}
void TCPHandler::sendException(const Exception & e, bool with_stack_trace)
{
......
......@@ -144,6 +144,7 @@ private:
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendTableColumns(const ColumnsDescription & columns);
void sendException(const Exception & e, bool with_stack_trace);
void sendProgress();
void sendLogs();
......
......@@ -603,6 +603,10 @@ Connection::Packet Connection::receivePacket()
res.block = receiveLogData();
return res;
case Protocol::Server::TableColumns:
res.multistring_message = receiveMultistringMessage(res.type);
return res;
case Protocol::Server::EndOfStream:
return res;
......@@ -712,6 +716,16 @@ std::unique_ptr<Exception> Connection::receiveException()
}
std::vector<String> Connection::receiveMultistringMessage(UInt64 msg_type)
{
size_t num = Protocol::Server::stringsInMessage(msg_type);
std::vector<String> out(num);
for (size_t i = 0; i < num; ++i)
readStringBinary(out[i], *in);
return out;
}
Progress Connection::receiveProgress()
{
//LOG_TRACE(log_wrapper.get(), "Receiving progress");
......
#pragma once
#include <optional>
#include <common/logger_useful.h>
#include <Poco/Net/StreamSocket.h>
......@@ -96,6 +98,7 @@ public:
Block block;
std::unique_ptr<Exception> exception;
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
......@@ -254,6 +257,7 @@ private:
Block receiveLogData();
Block receiveDataImpl(BlockInputStreamPtr & stream);
std::vector<String> receiveMultistringMessage(UInt64 msg_type);
std::unique_ptr<Exception> receiveException();
Progress receiveProgress();
BlockStreamProfileInfo receiveProfileInfo();
......
......@@ -99,6 +99,13 @@ void Block::insertUnique(ColumnWithTypeAndName && elem)
}
void Block::erase(const std::set<size_t> & positions)
{
for (auto it = positions.rbegin(); it != positions.rend(); ++it)
erase(*it);
}
void Block::erase(size_t position)
{
if (data.empty())
......
......@@ -2,6 +2,7 @@
#include <vector>
#include <list>
#include <set>
#include <map>
#include <initializer_list>
......@@ -51,6 +52,8 @@ public:
void insertUnique(ColumnWithTypeAndName && elem);
/// remove the column at the specified position
void erase(size_t position);
/// remove the columns at the specified positions
void erase(const std::set<size_t> & positions);
/// remove the column with the specified name
void erase(const String & name);
......
......@@ -58,4 +58,20 @@ void BlockInfo::read(ReadBuffer & in)
}
}
void BlockMissingValues::setBit(size_t column_idx, size_t row_idx)
{
RowsBitMask & mask = rows_mask_by_column_id[column_idx];
mask.resize(row_idx + 1);
mask[row_idx] = true;
}
const BlockMissingValues::RowsBitMask & BlockMissingValues::getDefaultsBitmask(size_t column_idx) const
{
static RowsBitMask none;
auto it = rows_mask_by_column_id.find(column_idx);
if (it != rows_mask_by_column_id.end())
return it->second;
return none;
}
}
#pragma once
#include <unordered_map>
#include <Core/Types.h>
......@@ -43,4 +45,24 @@ struct BlockInfo
void read(ReadBuffer & in);
};
/// Block extention to support delayed defaults. AddingDefaultsBlockInputStream uses it to replace missing values with column defaults.
class BlockMissingValues
{
public:
using RowsBitMask = std::vector<bool>; /// a bit per row for a column
const RowsBitMask & getDefaultsBitmask(size_t column_idx) const;
void setBit(size_t column_idx, size_t row_idx);
bool empty() const { return rows_mask_by_column_id.empty(); }
size_t size() const { return rows_mask_by_column_id.size(); }
void clear() { rows_mask_by_column_id.clear(); }
private:
using RowsMaskByColumnId = std::unordered_map<size_t, RowsBitMask>;
/// If rows_mask_by_column_id[column_id][row_id] is true related value in Block should be replaced with column default.
/// It could contain less columns and rows then related block.
RowsMaskByColumnId rows_mask_by_column_id;
};
}
......@@ -51,6 +51,7 @@
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).
#define DBMS_MIN_REVISION_WITH_CURRENT_AGGREGATION_VARIANT_SELECTION_METHOD 54408
#define DBMS_MIN_REVISION_WITH_COLUMN_DEFAULTS_METADATA 54410
#define DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE 54405
......
......@@ -69,7 +69,8 @@ namespace Protocol
Totals = 7, /// A block with totals (compressed or not).
Extremes = 8, /// A block with minimums and maximums (compressed or not).
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10 /// System logs of the query execution
Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
};
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
......@@ -78,11 +79,24 @@ namespace Protocol
/// See https://www.securecoding.cert.org/confluence/display/cplusplus/INT36-CPP.+Do+not+use+out-of-range+enumeration+values
inline const char * toString(UInt64 packet)
{
static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals", "Extremes", "TablesStatusResponse", "Log" };
return packet < 11
static const char * data[] = { "Hello", "Data", "Exception", "Progress", "Pong", "EndOfStream", "ProfileInfo", "Totals",
"Extremes", "TablesStatusResponse", "Log", "TableColumns" };
return packet < 12
? data[packet]
: "Unknown packet";
}
inline size_t stringsInMessage(UInt64 msg_type)
{
switch (msg_type)
{
case TableColumns:
return 2;
default:
break;
}
return 0;
}
}
/// Packet types that client transmits.
......@@ -103,8 +117,8 @@ namespace Protocol
inline const char * toString(UInt64 packet)
{
static const char * data[] = { "Hello", "Query", "Data", "Cancel", "Ping", "TablesStatusRequest" };
return packet < 6
static const char * data[] = { "Hello", "Query", "Data", "Cancel", "Ping", "TablesStatusRequest", "KeepAlive" };
return packet < 7
? data[packet]
: "Unknown packet";
}
......
#include <Common/typeid_cast.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnConst.h>
#include <Columns/FilterDescription.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeFixedString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int TYPE_MISMATCH;
}
AddingDefaultsBlockInputStream::AddingDefaultsBlockInputStream(const BlockInputStreamPtr & input,
const ColumnDefaults & column_defaults_,
const Context & context_)
: column_defaults(column_defaults_),
context(context_)
{
children.push_back(input);
header = input->getHeader();
}
Block AddingDefaultsBlockInputStream::readImpl()
{
Block res = children.back()->read();
if (!res)
return res;
if (column_defaults.empty())
return res;
const BlockMissingValues & block_missing_values = children.back()->getMissingValues();
if (block_missing_values.empty())
return res;
Block evaluate_block{res};
/// remove columns for recalculation
for (const auto & column : column_defaults)
if (evaluate_block.has(column.first))
evaluate_block.erase(column.first);
evaluateMissingDefaults(evaluate_block, header.getNamesAndTypesList(), column_defaults, context, false);
std::unordered_map<size_t, MutableColumnPtr> mixed_columns;
for (const ColumnWithTypeAndName & column_def : evaluate_block)
{
const String & column_name = column_def.name;
if (column_defaults.count(column_name) == 0)
continue;
size_t block_column_position = res.getPositionByName(column_name);
ColumnWithTypeAndName & column_read = res.getByPosition(block_column_position);
const auto & defaults_mask = block_missing_values.getDefaultsBitmask(block_column_position);
checkCalculated(column_read, column_def, defaults_mask.size());
if (!defaults_mask.empty())
{
/// TODO: FixedString
if (isColumnedAsNumber(column_read.type) || isDecimal(column_read.type))
{
MutableColumnPtr column_mixed = (*std::move(column_read.column)).mutate();
mixNumberColumns(column_read.type->getTypeId(), column_mixed, column_def.column, defaults_mask);
column_read.column = std::move(column_mixed);
}
else
{
MutableColumnPtr column_mixed = mixColumns(column_read, column_def, defaults_mask);
mixed_columns.emplace(block_column_position, std::move(column_mixed));
}
}
}
if (!mixed_columns.empty())
{
/// replace columns saving block structure
MutableColumns mutation = res.mutateColumns();
for (size_t position = 0; position < mutation.size(); ++position)
{
auto it = mixed_columns.find(position);
if (it != mixed_columns.end())
mutation[position] = std::move(it->second);
}
res.setColumns(std::move(mutation));
}
return res;
}
void AddingDefaultsBlockInputStream::checkCalculated(const ColumnWithTypeAndName & col_read,
const ColumnWithTypeAndName & col_defaults,
size_t defaults_needed) const
{
size_t column_size = col_read.column->size();
if (column_size != col_defaults.column->size())
throw Exception("Mismatch column sizes while adding defaults", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (column_size < defaults_needed)
throw Exception("Unexpected defaults count", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (!col_read.type->equals(*col_defaults.type))
throw Exception("Mismach column types while adding defaults", ErrorCodes::TYPE_MISMATCH);
}
void AddingDefaultsBlockInputStream::mixNumberColumns(TypeIndex type_idx, MutableColumnPtr & column_mixed, const ColumnPtr & column_defs,
const BlockMissingValues::RowsBitMask & defaults_mask) const
{
auto call = [&](const auto & types) -> bool
{
using Types = std::decay_t<decltype(types)>;
using DataType = typename Types::LeftType;
if constexpr (!std::is_same_v<DataType, DataTypeString> && !std::is_same_v<DataType, DataTypeFixedString>)
{
using FieldType = typename DataType::FieldType;
using ColVecType = std::conditional_t<IsDecimalNumber<FieldType>, ColumnDecimal<FieldType>, ColumnVector<FieldType>>;
auto col_read = typeid_cast<ColVecType *>(column_mixed.get());
if (!col_read)
return false;
typename ColVecType::Container & dst = col_read->getData();
if (auto const_col_defs = checkAndGetColumnConst<ColVecType>(column_defs.get()))
{
FieldType value = checkAndGetColumn<ColVecType>(const_col_defs->getDataColumnPtr().get())->getData()[0];
for (size_t i = 0; i < defaults_mask.size(); ++i)
if (defaults_mask[i])
dst[i] = value;
return true;
}
else if (auto col_defs = checkAndGetColumn<ColVecType>(column_defs.get()))
{
auto & src = col_defs->getData();
for (size_t i = 0; i < defaults_mask.size(); ++i)
if (defaults_mask[i])
dst[i] = src[i];
return true;
}
}
return false;
};
if (!callOnIndexAndDataType<void>(type_idx, call))
throw Exception("Unexpected type on mixNumberColumns", ErrorCodes::LOGICAL_ERROR);
}
MutableColumnPtr AddingDefaultsBlockInputStream::mixColumns(const ColumnWithTypeAndName & col_read,
const ColumnWithTypeAndName & col_defaults,
const BlockMissingValues::RowsBitMask & defaults_mask) const
{
size_t column_size = col_read.column->size();
size_t defaults_needed = defaults_mask.size();
MutableColumnPtr column_mixed = col_read.column->cloneEmpty();
for (size_t i = 0; i < defaults_needed; ++i)
{
if (defaults_mask[i])
{
if (col_defaults.column->isColumnConst())
column_mixed->insert((*col_defaults.column)[i]);
else
column_mixed->insertFrom(*col_defaults.column, i);
}
else
column_mixed->insertFrom(*col_read.column, i);
}
for (size_t i = defaults_needed; i < column_size; ++i)
column_mixed->insertFrom(*col_read.column, i);
return column_mixed;
}
}
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Storages/ColumnDefault.h>
#include <Interpreters/Context.h>
namespace DB
{
/// Adds defaults to columns using BlockDelayedDefaults bitmask attached to Block by child InputStream.
class AddingDefaultsBlockInputStream : public IProfilingBlockInputStream
{
public:
AddingDefaultsBlockInputStream(
const BlockInputStreamPtr & input,
const ColumnDefaults & column_defaults_,
const Context & context_);
String getName() const override { return "AddingDefaults"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block header;
const ColumnDefaults column_defaults;
const Context & context;
void checkCalculated(const ColumnWithTypeAndName & col_read, const ColumnWithTypeAndName & col_defaults, size_t needed) const;
MutableColumnPtr mixColumns(const ColumnWithTypeAndName & col_read, const ColumnWithTypeAndName & col_defaults,
const BlockMissingValues::RowsBitMask & defaults_mask) const;
void mixNumberColumns(TypeIndex type_idx, MutableColumnPtr & col_mixed, const ColumnPtr & col_defaults,
const BlockMissingValues::RowsBitMask & defaults_mask) const;
};
}
#include <DataStreams/AddingDefaultBlockInputStream.h>
#include <DataStreams/AddingMissedBlockInputStream.h>
#include <Interpreters/addMissingDefaults.h>
namespace DB
{
AddingDefaultBlockInputStream::AddingDefaultBlockInputStream(
AddingMissedBlockInputStream::AddingMissedBlockInputStream(
const BlockInputStreamPtr & input_,
const Block & header_,
const ColumnDefaults & column_defaults_,
......@@ -16,7 +16,7 @@ AddingDefaultBlockInputStream::AddingDefaultBlockInputStream(
children.emplace_back(input);
}
Block AddingDefaultBlockInputStream::readImpl()
Block AddingMissedBlockInputStream::readImpl()
{
Block src = children.back()->read();
if (!src)
......
......@@ -14,16 +14,16 @@ namespace DB
* 3. Columns that materialized from other columns (materialized columns)
* All three types of columns are materialized (not constants).
*/
class AddingDefaultBlockInputStream : public IProfilingBlockInputStream
class AddingMissedBlockInputStream : public IProfilingBlockInputStream
{
public:
AddingDefaultBlockInputStream(
AddingMissedBlockInputStream(
const BlockInputStreamPtr & input_,
const Block & header_,
const ColumnDefaults & column_defaults_,
const Context & context_);
String getName() const override { return "AddingDefault"; }
String getName() const override { return "AddingMissed"; }
Block getHeader() const override { return header; }
private:
......
......@@ -63,6 +63,12 @@ public:
*/
virtual Block read() = 0;
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
/** Read something before starting all data or after the end of all data.
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
* readPrefix() must be called before the first call to read().
......
......@@ -4,7 +4,8 @@
#include <IO/ReadBufferFromMemory.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/InputStreamFromASTInsertQuery.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Storages/ColumnsDescription.h>
namespace DB
{
......@@ -44,6 +45,10 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
auto columns_description = ColumnsDescription::loadFromContext(context, ast_insert_query->database, ast_insert_query->table);
if (columns_description && !columns_description->defaults.empty())
res_stream = std::make_shared<AddingDefaultsBlockInputStream>(res_stream, columns_description->defaults, context);
}
}
......@@ -514,6 +514,13 @@ inline bool isNumber(const T & data_type)
return which.isInt() || which.isUInt() || which.isFloat();
}
template <typename T>
inline bool isColumnedAsNumber(const T & data_type)
{
WhichDataType which(data_type);
return which.isInt() || which.isUInt() || which.isFloat() || which.isDateOrDateTime() || which.isUUID();
}
template <typename T>
inline bool isString(const T & data_type)
{
......
......@@ -14,7 +14,7 @@ BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & hea
}
bool BinaryRowInputStream::read(MutableColumns & columns)
bool BinaryRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -17,7 +17,7 @@ class BinaryRowInputStream : public IRowInputStream
public:
BinaryRowInputStream(ReadBuffer & istr_, const Block & header_);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
ReadBuffer & istr;
......
......@@ -19,6 +19,7 @@ namespace ErrorCodes
extern const int TOO_LARGE_STRING_SIZE;
extern const int CANNOT_READ_ALL_DATA;
extern const int INCORRECT_DATA;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
}
......@@ -52,6 +53,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
{
size_t num_columns = sample.columns();
MutableColumns columns = sample.cloneEmptyColumns();
block_missing_values.clear();
try
{
......@@ -60,8 +62,20 @@ Block BlockInputStreamFromRowInputStream::readImpl()
try
{
++total_rows;
if (!row_input->read(columns))
RowReadExtension info;
if (!row_input->read(columns, info))
break;
for (size_t column_idx = 0; column_idx < info.read_columns.size(); ++column_idx)
{
if (!info.read_columns[column_idx])
{
size_t column_size = columns[column_idx]->size();
if (column_size == 0)
throw Exception("Unexpected empty column", ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
block_missing_values.setBit(column_idx, column_size - 1);
}
}
}
catch (Exception & e)
{
......
......@@ -33,6 +33,8 @@ public:
Block getHeader() const override { return sample; }
const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
protected:
Block readImpl() override;
......@@ -40,6 +42,7 @@ private:
RowInputStreamPtr row_input;
Block sample;
size_t max_block_size;
BlockMissingValues block_missing_values;
UInt64 allow_errors_num;
Float64 allow_errors_ratio;
......
......@@ -111,7 +111,7 @@ void CSVRowInputStream::readPrefix()
}
bool CSVRowInputStream::read(MutableColumns & columns)
bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -21,7 +21,7 @@ public:
*/
CSVRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -200,7 +200,7 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
}
bool CapnProtoRowInputStream::read(MutableColumns & columns)
bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -34,7 +34,7 @@ public:
*/
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const String & schema_dir, const String & schema_file, const String & root_object);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
// Build a traversal plan from a sorted list of fields
......
......@@ -10,6 +10,13 @@
namespace DB
{
/// Contains extra information about read data.
struct RowReadExtension
{
/// IRowInputStream.read() output. It contains non zero for columns that actually read from the source and zero otherwise.
/// It's used to attach defaults for partially filled rows.
std::vector<UInt8> read_columns;
};
/** Interface of stream, that allows to read data by rows.
*/
......@@ -19,7 +26,7 @@ public:
/** Read next row and append it to the columns.
* If no more rows - return false.
*/
virtual bool read(MutableColumns & columns) = 0;
virtual bool read(MutableColumns & columns, RowReadExtension & extra) = 0;
virtual void readPrefix() {} /// delimiter before begin of result
virtual void readSuffix() {} /// delimiter after end of result
......
......@@ -209,7 +209,8 @@ void JSONEachRowRowInputStream::readNestedData(const String & name, MutableColum
nested_prefix_length = 0;
}
bool JSONEachRowRowInputStream::read(MutableColumns & columns)
bool JSONEachRowRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
{
skipWhitespaceIfAny(istr);
......@@ -229,7 +230,6 @@ bool JSONEachRowRowInputStream::read(MutableColumns & columns)
size_t num_columns = columns.size();
/// Set of columns for which the values were read. The rest will be filled with default values.
/// TODO Ability to provide your DEFAULTs.
read_columns.assign(num_columns, false);
nested_prefix_length = 0;
......@@ -240,6 +240,8 @@ bool JSONEachRowRowInputStream::read(MutableColumns & columns)
if (!read_columns[i])
header.getByPosition(i).type->insertDefaultInto(*columns[i]);
/// return info about defaults set
ext.read_columns = read_columns;
return true;
}
......
......@@ -22,7 +22,7 @@ class JSONEachRowRowInputStream : public IRowInputStream
public:
JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -88,7 +88,7 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
}
bool TSKVRowInputStream::read(MutableColumns & columns)
bool TSKVRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -25,7 +25,7 @@ class TSKVRowInputStream : public IRowInputStream
public:
TSKVRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -75,7 +75,7 @@ static void checkForCarriageReturn(ReadBuffer & istr)
}
bool TabSeparatedRowInputStream::read(MutableColumns & columns)
bool TabSeparatedRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
if (istr.eof())
return false;
......
......@@ -22,7 +22,7 @@ public:
TabSeparatedRowInputStream(
ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
......
......@@ -37,7 +37,7 @@ ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Block & hea
}
bool ValuesRowInputStream::read(MutableColumns & columns)
bool ValuesRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
size_t num_columns = columns.size();
......
......@@ -23,7 +23,7 @@ public:
*/
ValuesRowInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, const FormatSettings & format_settings);
bool read(MutableColumns & columns) override;
bool read(MutableColumns & columns, RowReadExtension &) override;
private:
ReadBuffer & istr;
......
......@@ -123,6 +123,7 @@ private:
using ProgressCallback = std::function<void(const Progress & progress)>;
ProgressCallback progress_callback; /// Callback for tracking progress of query execution.
QueryStatus * process_list_elem = nullptr; /// For tracking total resource usage for query.
std::pair<String, String> insertion_table; /// Saved insertion table in query context
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
/// Thus, used in HTTP interface. If not specified - then some globally default format is used.
......@@ -233,6 +234,9 @@ public:
void setCurrentDatabase(const String & name);
void setCurrentQueryId(const String & query_id);
void setInsertionTable(std::pair<String, String> && db_and_table) { insertion_table = db_and_table; }
const std::pair<String, String> & getInsertionTable() const { return insertion_table; }
String getDefaultFormat() const; /// If default_format is not specified, some global default format is returned.
void setDefaultFormat(const String & name);
......
......@@ -159,4 +159,10 @@ void InterpreterInsertQuery::checkAccess(const ASTInsertQuery & query)
throw Exception("Cannot insert into table in readonly mode", ErrorCodes::READONLY);
}
std::pair<String, String> InterpreterInsertQuery::getDatabaseTable() const
{
ASTInsertQuery & query = typeid_cast<ASTInsertQuery &>(*query_ptr);
return {query.database, query.table};
}
}
......@@ -24,6 +24,8 @@ public:
*/
BlockIO execute() override;
std::pair<String, String> getDatabaseTable() const;
private:
StoragePtr getTable(const ASTInsertQuery & query);
Block getSampleBlock(const ASTInsertQuery & query, const StoragePtr & table);
......
......@@ -125,6 +125,7 @@ struct Settings
M(SettingUInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.") \
\
M(SettingBool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be preformed") \
M(SettingBool, insert_sample_with_metadata, false, "For INSERT queries, specifies that the server need to send metadata about column defaults to the client. This will be used to calculate default expressions.") \
\
M(SettingUInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.") \
M(SettingMilliseconds, insert_quorum_timeout, 600000, "") \
......
......@@ -12,14 +12,8 @@
namespace DB
{
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const Context & context)
static ASTPtr requiredExpressions(Block & block, const NamesAndTypesList & required_columns, const ColumnDefaults & column_defaults)
{
if (column_defaults.empty())
return;
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & column : required_columns)
......@@ -35,20 +29,35 @@ void evaluateMissingDefaults(Block & block,
setAlias(it->second.expression->clone(), it->first));
}
/// nothing to evaluate
if (default_expr_list->children.empty())
return nullptr;
return default_expr_list;
}
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const ColumnDefaults & column_defaults,
const Context & context, bool save_unneeded_columns)
{
if (column_defaults.empty())
return;
ASTPtr default_expr_list = requiredExpressions(block, required_columns, column_defaults);
if (!default_expr_list)
return;
if (!save_unneeded_columns)
{
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(block);
return;
}
/** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their safety
* we are going to operate on a copy instead of the original block */
Block copy_block{block};
/// evaluate default values for defaulted columns
NamesAndTypesList available_columns;
for (size_t i = 0, size = block.columns(); i < size; ++i)
available_columns.emplace_back(block.getByPosition(i).name, block.getByPosition(i).type);
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, available_columns);
auto syntax_result = SyntaxAnalyzer(context, {}).analyze(default_expr_list, block.getNamesAndTypesList());
ExpressionAnalyzer{default_expr_list, syntax_result, context}.getActions(true)->execute(copy_block);
/// move evaluated columns to the original block, materializing them at the same time
......
......@@ -15,6 +15,6 @@ struct ColumnDefault;
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
const Context & context);
const Context & context, bool save_unneeded_columns = true);
}
......@@ -207,6 +207,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
auto interpreter = InterpreterFactory::get(ast, context, stage);
res = interpreter->execute();
if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
context.setInsertionTable(insert_interpreter->getDatabaseTable());
if (process_list_entry)
{
......
#include <Parsers/queryToString.h>
#include <Storages/ColumnDefault.h>
#include <Parsers/queryToString.h>
namespace
{
struct AliasNames
{
static constexpr const char * DEFAULT = "DEFAULT";
static constexpr const char * MATERIALIZED = "MATERIALIZED";
static constexpr const char * ALIAS = "ALIAS";
};
}
namespace DB
{
......@@ -14,9 +25,9 @@ namespace ErrorCodes
ColumnDefaultKind columnDefaultKindFromString(const std::string & str)
{
static const std::unordered_map<std::string, ColumnDefaultKind> map{
{ "DEFAULT", ColumnDefaultKind::Default },
{ "MATERIALIZED", ColumnDefaultKind::Materialized },
{ "ALIAS", ColumnDefaultKind::Alias }
{ AliasNames::DEFAULT, ColumnDefaultKind::Default },
{ AliasNames::MATERIALIZED, ColumnDefaultKind::Materialized },
{ AliasNames::ALIAS, ColumnDefaultKind::Alias }
};
const auto it = map.find(str);
......@@ -27,9 +38,9 @@ ColumnDefaultKind columnDefaultKindFromString(const std::string & str)
std::string toString(const ColumnDefaultKind kind)
{
static const std::unordered_map<ColumnDefaultKind, std::string> map{
{ ColumnDefaultKind::Default, "DEFAULT" },
{ ColumnDefaultKind::Materialized, "MATERIALIZED" },
{ ColumnDefaultKind::Alias, "ALIAS" }
{ ColumnDefaultKind::Default, AliasNames::DEFAULT },
{ ColumnDefaultKind::Materialized, AliasNames::MATERIALIZED },
{ ColumnDefaultKind::Alias, AliasNames::ALIAS }
};
const auto it = map.find(kind);
......
......@@ -33,5 +33,4 @@ bool operator==(const ColumnDefault & lhs, const ColumnDefault & rhs);
using ColumnDefaults = std::unordered_map<std::string, ColumnDefault>;
}
......@@ -12,6 +12,8 @@
#include <IO/ReadBufferFromString.h>
#include <DataTypes/DataTypeFactory.h>
#include <Common/Exception.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
#include <Common/typeid_cast.h>
#include <ext/collection_cast.h>
......@@ -215,4 +217,18 @@ ColumnsDescription ColumnsDescription::parse(const String & str)
return result;
}
const ColumnsDescription * ColumnsDescription::loadFromContext(const Context & context, const String & db, const String & table)
{
if (context.getSettingsRef().insert_sample_with_metadata)
{
if (context.isTableExist(db, table))
{
StoragePtr storage = context.getTable(db, table);
return &storage->getColumns();
}
}
return nullptr;
}
}
......@@ -64,6 +64,7 @@ struct ColumnsDescription
String toString() const;
static ColumnsDescription parse(const String & str);
static const ColumnsDescription * loadFromContext(const Context & context, const String & db, const String & table);
};
}
......@@ -2274,7 +2274,8 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context
ValuesRowInputStream input_stream(buf, partition_key_sample, context, format_settings);
MutableColumns columns = partition_key_sample.cloneEmptyColumns();
if (!input_stream.read(columns))
RowReadExtension unused;
if (!input_stream.read(columns, unused))
throw Exception(
"Could not parse partition value: `" + partition_ast.fields_str.toString() + "`",
ErrorCodes::INVALID_PARTITION_VALUE);
......
......@@ -4,7 +4,7 @@
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/castColumn.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <DataStreams/AddingDefaultBlockInputStream.h>
#include <DataStreams/AddingMissedBlockInputStream.h>
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <Databases/IDatabase.h>
......@@ -198,7 +198,7 @@ BlockInputStreams StorageBuffer::read(
streams_from_dst = destination->read(columns_intersection, query_info, context, processed_stage, max_block_size, num_streams);
for (auto & stream : streams_from_dst)
{
stream = std::make_shared<AddingDefaultBlockInputStream>(
stream = std::make_shared<AddingMissedBlockInputStream>(
stream, header_after_adding_defaults, getColumns().defaults, context);
stream = std::make_shared<ConvertingBlockInputStream>(
context, stream, header, ConvertingBlockInputStream::MatchColumnsMode::Name);
......
......@@ -14,6 +14,7 @@
#include <Formats/FormatFactory.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h>
......@@ -194,7 +195,11 @@ BlockInputStreams StorageFile::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
return BlockInputStreams(1, std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size));
BlockInputStreamPtr block_input = std::make_shared<StorageFileBlockInputStream>(*this, context, max_block_size);
const ColumnsDescription & columns = getColumns();
if (columns.defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns.defaults, context)};
}
......
......@@ -12,6 +12,7 @@
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Poco/Net/HTTPRequest.h>
......@@ -164,7 +165,7 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
return {std::make_shared<StorageURLBlockInputStream>(request_uri,
BlockInputStreamPtr block_input = std::make_shared<StorageURLBlockInputStream>(request_uri,
getReadMethod(),
getReadPOSTDataCallback(column_names, query_info, context, processed_stage, max_block_size),
format_name,
......@@ -172,7 +173,13 @@ BlockInputStreams IStorageURLBase::read(const Names & column_names,
getHeaderBlock(column_names),
context,
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()))};
ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()));
const ColumnsDescription & columns = getColumns();
if (columns.defaults.empty())
return {block_input};
return {std::make_shared<AddingDefaultsBlockInputStream>(block_input, columns.defaults, context)};
}
void IStorageURLBase::rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) {}
......
0 0 6 6 6
0 5 5 1.7917595 5
1 1 2 1.0986123 42
1 1 2 1.0986123 42
2 2 4 1.609438 2
3 3 3 3 3
4 0 4 1.609438 42
SET insert_sample_with_metadata=1;
CREATE DATABASE IF NOT EXISTS test;
DROP TABLE IF EXISTS test.defaults;
CREATE TABLE IF NOT EXISTS test.defaults
(
x UInt32,
y UInt32,
a DEFAULT x + y,
b Float32 DEFAULT log(1 + x + y),
c UInt32 DEFAULT 42,
e MATERIALIZED x + y,
f ALIAS x + y
) ENGINE = Memory;
INSERT INTO test.defaults FORMAT JSONEachRow {"x":1, "y":1};
INSERT INTO test.defaults (x, y) SELECT x, y FROM test.defaults LIMIT 1;
INSERT INTO test.defaults FORMAT JSONEachRow {"x":2, "y":2, "c":2};
INSERT INTO test.defaults FORMAT JSONEachRow {"x":3, "y":3, "a":3, "b":3, "c":3};
INSERT INTO test.defaults FORMAT JSONEachRow {"x":4} {"y":5, "c":5} {"a":6, "b":6, "c":6};
SELECT * FROM test.defaults ORDER BY (x, y);
DROP TABLE IF EXISTS test.defaults;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册