Commit d164024e authored by Alexey Milovidov

Better semantic of sharing columns: development [#CLICKHOUSE-2].

Parent 037d304a
Subproject commit 81d4fdfcb887f89b0f7b1e9b503cbe63e6d8366b
Subproject commit bcf9ebad48b2162d25f5fc432b176d74a09f498d
......@@ -300,6 +300,36 @@ Block Block::cloneEmpty() const
}
MutableColumns Block::cloneEmptyColumns() const
{
size_t num_columns = data.size();
MutableColumns columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
columns[i] = data[i].column->cloneEmpty();
return columns;
}
void Block::setColumns(MutableColumns && columns)
{
size_t num_columns = data.size();
for (size_t i = 0; i < num_columns; ++i)
data[i].column = std::move(columns[i]);
}
Block Block::cloneWithColumns(MutableColumns && columns) const
{
Block res;
size_t num_columns = data.size();
for (size_t i = 0; i < num_columns; ++i)
res.insert({ std::move(columns[i]), data[i].type ? data[i].type->clone() : nullptr, data[i].name });
return res;
}
Block Block::sortColumns() const
{
Block sorted_block;
......
......@@ -105,6 +105,13 @@ public:
/** Get the same block, but empty. */
Block cloneEmpty() const;
/** Get empty columns with the same types as in block. */
MutableColumns cloneEmptyColumns() const;
/** Replace columns in a block */
void setColumns(MutableColumns && columns);
Block cloneWithColumns(MutableColumns && columns) const;
/** Get a block with columns that have been rearranged in the order of their names. */
Block sortColumns() const;
......
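The new `Block` helpers above define the pattern the rest of this commit follows: clone empty mutable columns from a header, fill them row by row, then either pack them into a block that reuses the header's names and types (`cloneWithColumns`) or put them back in place (`setColumns`). A minimal sketch of that round trip, using only the methods declared above; the function name and the use of default values are illustrative, not part of the commit:

```cpp
#include <Core/Block.h>

/// Build a one-row block with the same structure as `header`.
DB::Block fillOneRow(const DB::Block & header)
{
    /// Empty mutable columns with the same types as the header.
    DB::MutableColumns columns = header.cloneEmptyColumns();

    /// Append one value per column (a type default stands in for real parsed data).
    for (size_t i = 0; i < header.columns(); ++i)
        header.getByPosition(i).type->insertDefaultInto(*columns[i]);

    /// Pack the filled columns into a block sharing the header's names and types.
    return header.cloneWithColumns(std::move(columns));
}
```

When the original block itself should receive the data, `header.setColumns(std::move(columns))` on a non-const block plays the same role in place.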
......@@ -6,24 +6,20 @@
namespace DB
{
BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_)
: istr(istr_)
BinaryRowInputStream::BinaryRowInputStream(ReadBuffer & istr_, const Block & header_)
: istr(istr_), header(header_)
{
}
bool BinaryRowInputStream::read(Block & block)
bool BinaryRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
{
MutableColumnPtr column = block.getByPosition(i).column->mutate();
block.getByPosition(i).type->deserializeBinary(*column, istr);
block.getByPosition(i).column = std::move(column);
}
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
header.getByPosition(i).type->deserializeBinary(*columns[i], istr);
return true;
}
......
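Under the new contract the row stream never touches a `Block` while reading; it deserializes straight into the caller's `MutableColumns`, using types from the header captured at construction. A sketch of driving the new interface by hand (normally `BlockInputStreamFromRowInputStream` owns this loop); `ReadBufferFromString` and the include paths are assumptions of this example:

```cpp
#include <string>
#include <Core/Block.h>
#include <IO/ReadBufferFromString.h>
#include <DataStreams/BinaryRowInputStream.h>   // path assumed for this era of the code base

/// Read every row from `data` into a block shaped like `header`.
DB::Block readAllRows(const std::string & data, const DB::Block & header)
{
    DB::ReadBufferFromString buf(data);
    DB::BinaryRowInputStream row_input(buf, header);

    DB::MutableColumns columns = header.cloneEmptyColumns();

    row_input.readPrefix();
    while (row_input.read(columns))   /// returns false at EOF
        ;
    row_input.readSuffix();

    return header.cloneWithColumns(std::move(columns));
}
```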
......@@ -15,12 +15,13 @@ class ReadBuffer;
class BinaryRowInputStream : public IRowInputStream
{
public:
BinaryRowInputStream(ReadBuffer & istr_);
BinaryRowInputStream(ReadBuffer & istr_, const Block & header_);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
private:
ReadBuffer & istr;
const Block & header;
};
}
......@@ -43,7 +43,8 @@ static bool isParseError(int code)
Block BlockInputStreamFromRowInputStream::readImpl()
{
Block res = sample.cloneEmpty();
size_t num_columns = sample.columns();
MutableColumns columns = sample.cloneEmptyColumns();
try
{
......@@ -52,7 +53,7 @@ Block BlockInputStreamFromRowInputStream::readImpl()
try
{
++total_rows;
if (!row_input->read(res))
if (!row_input->read(columns))
break;
}
catch (Exception & e)
......@@ -87,14 +88,13 @@ Block BlockInputStreamFromRowInputStream::readImpl()
/// Truncate all columns in block to minimal size (remove values, that was appended to only part of columns).
size_t columns = res.columns();
size_t min_size = std::numeric_limits<size_t>::max();
for (size_t column_idx = 0; column_idx < columns; ++column_idx)
min_size = std::min(min_size, res.getByPosition(column_idx).column->size());
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
min_size = std::min(min_size, columns[column_idx]->size());
for (size_t column_idx = 0; column_idx < columns; ++column_idx)
for (size_t column_idx = 0; column_idx < num_columns; ++column_idx)
{
auto & column = res.getByPosition(column_idx).column;
auto & column = columns[column_idx];
if (column->size() > min_size)
column->popBack(column->size() - min_size);
}
......@@ -120,10 +120,10 @@ Block BlockInputStreamFromRowInputStream::readImpl()
throw;
}
if (res.rows() == 0)
res.clear();
if (columns.empty() || columns[0]->empty())
return {};
return res;
return sample.cloneWithColumns(std::move(columns));
}
}
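The hunk above also shows the error-recovery step: after a tolerated parse error some columns may hold one value more than others, so every column is cut back to the shortest length before the block is assembled. The same step restated as a standalone helper, as a sketch:

```cpp
#include <algorithm>
#include <limits>
#include <Columns/IColumn.h>

/// Truncate all columns to the minimal size, removing values that were
/// appended to only part of the columns before a parse error was tolerated.
void truncateToMinSize(DB::MutableColumns & columns)
{
    size_t min_size = std::numeric_limits<size_t>::max();
    for (const auto & column : columns)
        min_size = std::min(min_size, column->size());

    for (auto & column : columns)
        if (column->size() > min_size)
            column->popBack(column->size() - min_size);
}
```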
......@@ -43,7 +43,7 @@ protected:
private:
RowInputStreamPtr row_input;
const Block sample;
const Block & sample;
size_t max_block_size;
UInt64 allow_errors_num;
......
......@@ -15,13 +15,13 @@ namespace ErrorCodes
}
CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & sample_, const char delimiter_, bool with_names_, bool with_types_)
: istr(istr_), sample(sample_), delimiter(delimiter_), with_names(with_names_), with_types(with_types_)
CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_, const char delimiter_, bool with_names_, bool with_types_)
: istr(istr_), header(header_), delimiter(delimiter_), with_names(with_names_), with_types(with_types_)
{
size_t columns = sample.columns();
data_types.resize(columns);
for (size_t i = 0; i < columns; ++i)
data_types[i] = sample.safeGetByPosition(i).type;
size_t num_columns = header.columns();
data_types.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
data_types[i] = header.safeGetByPosition(i).type;
}
......@@ -81,16 +81,16 @@ static inline void skipWhitespacesAndTabs(ReadBuffer & buf)
}
static void skipRow(ReadBuffer & istr, const char delimiter, size_t columns)
static void skipRow(ReadBuffer & istr, const char delimiter, size_t num_columns)
{
String tmp;
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
skipWhitespacesAndTabs(istr);
readCSVString(tmp, istr);
skipWhitespacesAndTabs(istr);
skipDelimiter(istr, delimiter, i + 1 == columns);
skipDelimiter(istr, delimiter, i + 1 == num_columns);
}
}
......@@ -101,18 +101,18 @@ void CSVRowInputStream::readPrefix()
/// so BOM at beginning of stream cannot be confused with BOM in first string value, and it is safe to skip it.
skipBOMIfExists(istr);
size_t columns = sample.columns();
size_t num_columns = data_types.size();
String tmp;
if (with_names)
skipRow(istr, delimiter, columns);
skipRow(istr, delimiter, num_columns);
if (with_types)
skipRow(istr, delimiter, columns);
skipRow(istr, delimiter, num_columns);
}
bool CSVRowInputStream::read(Block & block)
bool CSVRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
......@@ -124,7 +124,7 @@ bool CSVRowInputStream::read(Block & block)
for (size_t i = 0; i < size; ++i)
{
skipWhitespacesAndTabs(istr);
data_types[i]->deserializeTextCSV(*block.getByPosition(i).column.get(), istr, delimiter);
data_types[i]->deserializeTextCSV(*columns[i], istr, delimiter);
skipWhitespacesAndTabs(istr);
skipDelimiter(istr, delimiter, i + 1 == size);
......@@ -140,7 +140,8 @@ String CSVRowInputStream::getDiagnosticInfo()
return {};
WriteBufferFromOwnString out;
Block block = sample.cloneEmpty();
MutableColumns columns = header.cloneEmptyColumns();
/// It is possible to display detailed diagnostics only if the last and next to last rows are still in the read buffer.
size_t bytes_read_at_start_of_buffer = istr.count() - istr.offset();
......@@ -151,14 +152,14 @@ String CSVRowInputStream::getDiagnosticInfo()
}
size_t max_length_of_column_name = 0;
for (size_t i = 0; i < sample.columns(); ++i)
if (sample.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = sample.safeGetByPosition(i).name.size();
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = header.safeGetByPosition(i).name.size();
size_t max_length_of_data_type_name = 0;
for (size_t i = 0; i < sample.columns(); ++i)
if (sample.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = sample.safeGetByPosition(i).type->getName().size();
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size();
/// Roll back the cursor to the beginning of the previous or current row and parse all over again. But now we derive detailed information.
......@@ -167,7 +168,7 @@ String CSVRowInputStream::getDiagnosticInfo()
istr.position() = pos_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(block, out, max_length_of_column_name, max_length_of_data_type_name))
if (!parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name))
return out.str();
}
else
......@@ -182,14 +183,14 @@ String CSVRowInputStream::getDiagnosticInfo()
}
out << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(block, out, max_length_of_column_name, max_length_of_data_type_name);
parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name);
out << "\n";
return out.str();
}
bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
{
size_t size = data_types.size();
......@@ -202,7 +203,7 @@ bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
}
out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ')
<< "name: " << sample.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - sample.safeGetByPosition(i).name.size(), ' ')
<< "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ')
<< "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' ');
BufferBase::Position prev_position = istr.position();
......@@ -213,7 +214,7 @@ bool CSVRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
{
skipWhitespacesAndTabs(istr);
prev_position = istr.position();
data_types[i]->deserializeTextCSV(*block.safeGetByPosition(i).column, istr, delimiter);
data_types[i]->deserializeTextCSV(*columns[i], istr, delimiter);
curr_position = istr.position();
skipWhitespacesAndTabs(istr);
}
......
......@@ -18,9 +18,9 @@ public:
/** with_names - in the first line the header with column names
* with_types - on the next line header with type names
*/
CSVRowInputStream(ReadBuffer & istr_, const Block & sample_, const char delimiter_, bool with_names_ = false, bool with_types_ = false);
CSVRowInputStream(ReadBuffer & istr_, const Block & header_, const char delimiter_, bool with_names_ = false, bool with_types_ = false);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; };
void syncAfterError() override;
......@@ -29,7 +29,7 @@ public:
private:
ReadBuffer & istr;
const Block sample;
const Block & header;
const char delimiter;
bool with_names;
bool with_types;
......@@ -48,7 +48,7 @@ private:
void updateDiagnosticInfo();
bool parseRowAndPrintDiagnosticInfo(Block & block,
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
};
......
......@@ -20,12 +20,12 @@ static String getSchemaPath(const String & schema_dir, const String & schema_fil
return schema_dir + escapeForFileName(schema_file) + ".capnp";
}
CapnProtoRowInputStream::NestedField split(const Block & sample, size_t i)
CapnProtoRowInputStream::NestedField split(const Block & header, size_t i)
{
CapnProtoRowInputStream::NestedField field = {{}, i};
// Remove leading dot in field definition, e.g. ".msg" -> "msg"
String name(sample.safeGetByPosition(i).name);
String name(header.safeGetByPosition(i).name);
if (name.size() > 0 && name[0] == '.')
name.erase(0, 1);
......@@ -91,7 +91,7 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
String last;
size_t level = 0;
capnp::StructSchema::Field parent;
for (const auto & field : sortedFields)
{
// Move to a different field in the same structure, keep parent
......@@ -115,8 +115,8 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
}
}
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_dir, const String & schema_file, const String & root_object)
: istr(istr_), sample(sample_), parser(std::make_shared<SchemaParser>())
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const String & schema_dir, const String & schema_file, const String & root_object)
: istr(istr_), header(header_), parser(std::make_shared<SchemaParser>())
{
// Parse the schema and fetch the root object
......@@ -129,9 +129,9 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
* and the nesting level doesn't decrease to make traversal easier.
*/
NestedFieldList list;
size_t columns = sample.columns();
for (size_t i = 0; i < columns; ++i)
list.push_back(split(sample, i));
size_t num_columns = header.columns();
for (size_t i = 0; i < num_columns; ++i)
list.push_back(split(header, i));
// Reorder list to make sure we don't have to backtrack
std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b)
......@@ -145,7 +145,7 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
}
bool CapnProtoRowInputStream::read(Block & block)
bool CapnProtoRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
......@@ -153,7 +153,7 @@ bool CapnProtoRowInputStream::read(Block & block)
// Read from underlying buffer directly
auto buf = istr.buffer();
auto base = reinterpret_cast<const capnp::word *>(istr.position());
// Check if there's enough bytes in the buffer to read the full message
kj::Array<capnp::word> heap_array;
auto array = kj::arrayPtr(base, buf.size() - istr.offset());
......@@ -174,9 +174,9 @@ bool CapnProtoRowInputStream::read(Block & block)
{
switch (action.type) {
case Action::READ: {
auto & col = block.getByPosition(action.column);
auto & col = columns[action.column];
Field value = convertNodeToField(stack.back().get(action.field));
col.column->insert(value);
col->insert(value);
break;
}
case Action::POP:
......
......@@ -32,9 +32,9 @@ public:
* schema_file - location of the capnproto schema, e.g. "schema.canpn"
* root_object - name to the root object, e.g. "Message"
*/
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & sample_, const String & schema_dir, const String & schema_file, const String & root_object);
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const String & schema_dir, const String & schema_file, const String & root_object);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
private:
// Build a traversal plan from a sorted list of fields
......@@ -62,7 +62,7 @@ private:
using SchemaParser = DestructorCatcher<capnp::SchemaParser>;
ReadBuffer & istr;
const Block sample;
const Block & header;
std::shared_ptr<SchemaParser> parser;
capnp::StructSchema root;
std::vector<Action> actions;
......@@ -70,4 +70,4 @@ private:
}
#endif // USE_CAPNP
\ No newline at end of file
#endif // USE_CAPNP
......@@ -4,21 +4,22 @@
#include <memory>
#include <string>
#include <Columns/IColumn.h>
namespace DB
{
class Block;
/** Interface of stream, that allows to read data by rows.
*/
class IRowInputStream : private boost::noncopyable
{
public:
/** Read next row and append it to block.
/** Read next row and append it to the columns.
* If no more rows - return false.
*/
virtual bool read(Block & block) = 0;
virtual bool read(MutableColumns & columns) = 0;
virtual void readPrefix() {}; /// delimiter before begin of result
virtual void readSuffix() {}; /// delimiter after end of result
......
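With the interface now taking `MutableColumns`, a custom format only has to append one value per column and report EOF. A minimal hypothetical implementation of the new contract; the class name, the TSV-like framing, and the include paths are assumptions for illustration, not part of the commit:

```cpp
#include <Core/Block.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <DataStreams/IRowInputStream.h>   // path assumed

namespace DB
{

/// Reads rows with tab-separated, escaped values: one value per column, '\n' at end of row.
class TrivialRowInputStream : public IRowInputStream
{
public:
    TrivialRowInputStream(ReadBuffer & istr_, const Block & header_)
        : istr(istr_), header(header_) {}

    bool read(MutableColumns & columns) override
    {
        if (istr.eof())
            return false;

        size_t num_columns = columns.size();
        for (size_t i = 0; i < num_columns; ++i)
        {
            header.getByPosition(i).type->deserializeTextEscaped(*columns[i], istr);
            assertChar(i + 1 == num_columns ? '\n' : '\t', istr);
        }
        return true;
    }

private:
    ReadBuffer & istr;
    const Block & header;   /// Held by reference: the caller must keep the header alive.
};

}
```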
......@@ -12,15 +12,15 @@ namespace ErrorCodes
}
JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_, bool skip_unknown_)
: istr(istr_), sample(sample_), skip_unknown(skip_unknown_), name_map(sample.columns())
JSONEachRowRowInputStream::JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, bool skip_unknown_)
: istr(istr_), header(header_), skip_unknown(skip_unknown_), name_map(header.columns())
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
size_t columns = sample.columns();
for (size_t i = 0; i < columns; ++i)
name_map[sample.safeGetByPosition(i).name] = i; /// NOTE You could place names more cache-locally.
size_t num_columns = header.columns();
for (size_t i = 0; i < num_columns; ++i)
name_map[header.safeGetByPosition(i).name] = i; /// NOTE You could place names more cache-locally.
}
......@@ -58,15 +58,15 @@ static void skipColonDelimeter(ReadBuffer & istr)
}
bool JSONEachRowRowInputStream::read(Block & block)
bool JSONEachRowRowInputStream::read(MutableColumns & columns)
{
skipWhitespaceIfAny(istr);
/// We consume ;, or \n before scanning a new row, instead scanning to next row at the end.
/// The reason is that if we want an exact number of rows read with LIMIT x
/// from a streaming table engine with text data format, like File or Kafka
/// then seeking to next ;, or \n would trigger reading of an extra row at the end.
/// Semicolon is added for convenience as it could be used at end of INSERT query.
if (!istr.eof() && (*istr.position() == ',' || *istr.position() == ';'))
++istr.position();
......@@ -77,12 +77,12 @@ bool JSONEachRowRowInputStream::read(Block & block)
assertChar('{', istr);
size_t columns = block.columns();
size_t num_columns = columns.size();
/// Set of columns for which the values were read. The rest will be filled with default values.
/// TODO Ability to provide your DEFAULTs.
bool read_columns[columns];
memset(read_columns, 0, columns);
bool read_columns[num_columns];
memset(read_columns, 0, num_columns);
bool first = true;
while (true)
......@@ -130,19 +130,13 @@ bool JSONEachRowRowInputStream::read(Block & block)
read_columns[index] = true;
auto & col = block.getByPosition(index);
col.type->deserializeTextJSON(*col.column, istr);
header.getByPosition(index).type->deserializeTextJSON(*columns[index], istr);
}
/// Fill non-visited columns with the default values.
for (size_t i = 0; i < columns; ++i)
{
for (size_t i = 0; i < num_columns; ++i)
if (!read_columns[i])
{
ColumnWithTypeAndName & elem = block.getByPosition(i);
elem.type->insertDefaultInto(*elem.column);
}
}
header.getByPosition(i).type->insertDefaultInto(*columns[i]);
return true;
}
......
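The loop above tracks which columns actually appeared in the JSON object and fills the rest with type defaults, so every column ends the row at the same length. The same idea as a standalone sketch, with a `std::vector<bool>` standing in for the on-stack `read_columns` array:

```cpp
#include <vector>
#include <Core/Block.h>

/// Fill columns that were not present in the parsed row with default values.
void fillMissingWithDefaults(const DB::Block & header, DB::MutableColumns & columns,
                             const std::vector<bool> & read_columns)
{
    for (size_t i = 0; i < columns.size(); ++i)
        if (!read_columns[i])
            header.getByPosition(i).type->insertDefaultInto(*columns[i]);
}
```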
......@@ -18,15 +18,15 @@ class ReadBuffer;
class JSONEachRowRowInputStream : public IRowInputStream
{
public:
JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & sample_, bool skip_unknown_);
JSONEachRowRowInputStream(ReadBuffer & istr_, const Block & header_, bool skip_unknown_);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
bool allowSyncAfterError() const override { return true; };
void syncAfterError() override;
private:
ReadBuffer & istr;
const Block sample;
const Block & header;
bool skip_unknown;
/// Buffer for the read from the stream field name. Used when you have to copy it.
......
......@@ -14,16 +14,16 @@ namespace ErrorCodes
}
TSKVRowInputStream::TSKVRowInputStream(ReadBuffer & istr_, const Block & sample_, bool skip_unknown_)
: istr(istr_), sample(sample_), skip_unknown(skip_unknown_), name_map(sample.columns())
TSKVRowInputStream::TSKVRowInputStream(ReadBuffer & istr_, const Block & header_, bool skip_unknown_)
: istr(istr_), header(header_), skip_unknown(skip_unknown_), name_map(header.columns())
{
/// In this format, we assume that column name cannot contain BOM,
/// so BOM at beginning of stream cannot be confused with name of field, and it is safe to skip it.
skipBOMIfExists(istr);
size_t columns = sample.columns();
for (size_t i = 0; i < columns; ++i)
name_map[sample.safeGetByPosition(i).name] = i; /// NOTE You could place names more cache-locally.
size_t num_columns = header.columns();
for (size_t i = 0; i < num_columns; ++i)
name_map[header.safeGetByPosition(i).name] = i; /// NOTE You could place names more cache-locally.
}
......@@ -86,17 +86,17 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp)
}
bool TSKVRowInputStream::read(Block & block)
bool TSKVRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
size_t columns = block.columns();
size_t num_columns = columns.size();
/// Set of columns for which the values were read. The rest will be filled with default values.
/// TODO Ability to provide your DEFAULTs.
bool read_columns[columns];
memset(read_columns, 0, columns);
bool read_columns[num_columns];
memset(read_columns, 0, num_columns);
if (unlikely(*istr.position() == '\n'))
{
......@@ -135,8 +135,7 @@ bool TSKVRowInputStream::read(Block & block)
read_columns[index] = true;
auto & col = block.getByPosition(index);
col.type->deserializeTextEscaped(*col.column, istr);
header.getByPosition(index).type->deserializeTextEscaped(*columns[index], istr);
}
}
else
......@@ -165,7 +164,7 @@ bool TSKVRowInputStream::read(Block & block)
/// Possibly a garbage was written into column, remove it
if (index >= 0)
{
block.getByPosition(index).column->popBack(1);
columns[index]->popBack(1);
read_columns[index] = false;
}
......@@ -175,14 +174,9 @@ bool TSKVRowInputStream::read(Block & block)
}
/// Fill in the not met columns with default values.
for (size_t i = 0; i < columns; ++i)
{
for (size_t i = 0; i < num_columns; ++i)
if (!read_columns[i])
{
ColumnWithTypeAndName & elem = block.getByPosition(i);
elem.type->insertDefaultInto(*elem.column);
}
}
header.getByPosition(i).type->insertDefaultInto(*columns[i]);
return true;
}
......
......@@ -22,15 +22,15 @@ class ReadBuffer;
class TSKVRowInputStream : public IRowInputStream
{
public:
TSKVRowInputStream(ReadBuffer & istr_, const Block & sample_, bool skip_unknown_);
TSKVRowInputStream(ReadBuffer & istr_, const Block & header_, bool skip_unknown_);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
bool allowSyncAfterError() const override { return true; };
void syncAfterError() override;
private:
ReadBuffer & istr;
const Block sample;
const Block & header;
/// Skip unknown fields.
bool skip_unknown;
......
......@@ -16,19 +16,19 @@ namespace ErrorCodes
}
TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_, bool with_names_, bool with_types_)
: istr(istr_), sample(sample_), with_names(with_names_), with_types(with_types_)
TabSeparatedRowInputStream::TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_, bool with_types_)
: istr(istr_), header(header_), with_names(with_names_), with_types(with_types_)
{
size_t columns = sample.columns();
data_types.resize(columns);
for (size_t i = 0; i < columns; ++i)
data_types[i] = sample.safeGetByPosition(i).type;
size_t num_columns = header.columns();
data_types.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
data_types[i] = header.safeGetByPosition(i).type;
}
void TabSeparatedRowInputStream::readPrefix()
{
size_t columns = sample.columns();
size_t num_columns = header.columns();
String tmp;
if (with_names || with_types)
......@@ -41,19 +41,19 @@ void TabSeparatedRowInputStream::readPrefix()
if (with_names)
{
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
readEscapedString(tmp, istr);
assertChar(i == columns - 1 ? '\n' : '\t', istr);
assertChar(i == num_columns - 1 ? '\n' : '\t', istr);
}
}
if (with_types)
{
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
readEscapedString(tmp, istr);
assertChar(i == columns - 1 ? '\n' : '\t', istr);
assertChar(i == num_columns - 1 ? '\n' : '\t', istr);
}
}
}
......@@ -72,7 +72,7 @@ static void checkForCarriageReturn(ReadBuffer & istr)
}
bool TabSeparatedRowInputStream::read(Block & block)
bool TabSeparatedRowInputStream::read(MutableColumns & columns)
{
if (istr.eof())
return false;
......@@ -83,7 +83,7 @@ bool TabSeparatedRowInputStream::read(Block & block)
for (size_t i = 0; i < size; ++i)
{
data_types[i]->deserializeTextEscaped(*block.getByPosition(i).column.get(), istr);
data_types[i]->deserializeTextEscaped(*columns[i], istr);
/// skip separators
if (i + 1 == size)
......@@ -110,7 +110,7 @@ String TabSeparatedRowInputStream::getDiagnosticInfo()
return {};
WriteBufferFromOwnString out;
Block block = sample.cloneEmpty();
MutableColumns columns = header.cloneEmptyColumns();
/// It is possible to display detailed diagnostics only if the last and next to last lines are still in the read buffer.
size_t bytes_read_at_start_of_buffer = istr.count() - istr.offset();
......@@ -121,14 +121,14 @@ String TabSeparatedRowInputStream::getDiagnosticInfo()
}
size_t max_length_of_column_name = 0;
for (size_t i = 0; i < sample.columns(); ++i)
if (sample.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = sample.safeGetByPosition(i).name.size();
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).name.size() > max_length_of_column_name)
max_length_of_column_name = header.safeGetByPosition(i).name.size();
size_t max_length_of_data_type_name = 0;
for (size_t i = 0; i < sample.columns(); ++i)
if (sample.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = sample.safeGetByPosition(i).type->getName().size();
for (size_t i = 0; i < header.columns(); ++i)
if (header.safeGetByPosition(i).type->getName().size() > max_length_of_data_type_name)
max_length_of_data_type_name = header.safeGetByPosition(i).type->getName().size();
/// Roll back the cursor to the beginning of the previous or current line and pars all over again. But now we derive detailed information.
......@@ -137,7 +137,7 @@ String TabSeparatedRowInputStream::getDiagnosticInfo()
istr.position() = pos_of_prev_row;
out << "\nRow " << (row_num - 1) << ":\n";
if (!parseRowAndPrintDiagnosticInfo(block, out, max_length_of_column_name, max_length_of_data_type_name))
if (!parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name))
return out.str();
}
else
......@@ -152,14 +152,14 @@ String TabSeparatedRowInputStream::getDiagnosticInfo()
}
out << "\nRow " << row_num << ":\n";
parseRowAndPrintDiagnosticInfo(block, out, max_length_of_column_name, max_length_of_data_type_name);
parseRowAndPrintDiagnosticInfo(columns, out, max_length_of_column_name, max_length_of_data_type_name);
out << "\n";
return out.str();
}
bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name)
{
size_t size = data_types.size();
......@@ -172,7 +172,7 @@ bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
}
out << "Column " << i << ", " << std::string((i < 10 ? 2 : i < 100 ? 1 : 0), ' ')
<< "name: " << sample.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - sample.safeGetByPosition(i).name.size(), ' ')
<< "name: " << header.safeGetByPosition(i).name << ", " << std::string(max_length_of_column_name - header.safeGetByPosition(i).name.size(), ' ')
<< "type: " << data_types[i]->getName() << ", " << std::string(max_length_of_data_type_name - data_types[i]->getName().size(), ' ');
auto prev_position = istr.position();
......@@ -180,7 +180,7 @@ bool TabSeparatedRowInputStream::parseRowAndPrintDiagnosticInfo(Block & block,
try
{
data_types[i]->deserializeTextEscaped(*block.safeGetByPosition(i).column, istr);
data_types[i]->deserializeTextEscaped(*columns[i], istr);
}
catch (...)
{
......
......@@ -18,9 +18,9 @@ public:
/** with_names - the first line is the header with the names of the columns
* with_types - on the next line header with type names
*/
TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & sample_, bool with_names_ = false, bool with_types_ = false);
TabSeparatedRowInputStream(ReadBuffer & istr_, const Block & header_, bool with_names_ = false, bool with_types_ = false);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
void readPrefix() override;
bool allowSyncAfterError() const override { return true; };
void syncAfterError() override;
......@@ -29,7 +29,7 @@ public:
private:
ReadBuffer & istr;
const Block sample;
const Block & header;
bool with_names;
bool with_types;
DataTypes data_types;
......@@ -47,7 +47,7 @@ private:
void updateDiagnosticInfo();
bool parseRowAndPrintDiagnosticInfo(Block & block,
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
};
......
......@@ -25,17 +25,17 @@ namespace ErrorCodes
}
ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Context & context_, bool interpret_expressions_)
: istr(istr_), context(context_), interpret_expressions(interpret_expressions_)
ValuesRowInputStream::ValuesRowInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, bool interpret_expressions_)
: istr(istr_), header(header_), context(context_), interpret_expressions(interpret_expressions_)
{
/// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
skipBOMIfExists(istr);
}
bool ValuesRowInputStream::read(Block & block)
bool ValuesRowInputStream::read(MutableColumns & columns)
{
size_t size = block.columns();
size_t num_columns = columns.size();
skipWhitespaceIfAny(istr);
......@@ -50,23 +50,21 @@ bool ValuesRowInputStream::read(Block & block)
assertChar('(', istr);
for (size_t i = 0; i < size; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
skipWhitespaceIfAny(istr);
char * prev_istr_position = istr.position();
size_t prev_istr_bytes = istr.count() - istr.offset();
auto & col = block.getByPosition(i);
bool rollback_on_exception = false;
try
{
col.type->deserializeTextQuoted(*col.column, istr);
header.getByPosition(i).type->deserializeTextQuoted(*columns[i], istr);
rollback_on_exception = true;
skipWhitespaceIfAny(istr);
if (i != size - 1)
if (i != num_columns - 1)
assertChar(',', istr);
else
assertChar(')', istr);
......@@ -93,9 +91,9 @@ bool ValuesRowInputStream::read(Block & block)
throw;
if (rollback_on_exception)
col.column->popBack(1);
columns[i]->popBack(1);
IDataType & type = *block.safeGetByPosition(i).type;
IDataType & type = *header.getByPosition(i).type;
Expected expected;
......@@ -123,11 +121,11 @@ bool ValuesRowInputStream::read(Block & block)
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
}
col.column->insert(value);
columns[i]->insert(value);
skipWhitespaceIfAny(istr);
if (i != size - 1)
if (i != num_columns - 1)
assertChar(',', istr);
else
assertChar(')', istr);
......
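The Values parser above relies on a rollback flag: once `deserializeTextQuoted` has appended a value, any later failure (for example on the `,` / `)` delimiter) must pop that value back off before the slow expression-based path retries the field. A sketch of that idiom in isolation; the function name and parameters are illustrative:

```cpp
#include <Core/Block.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>

/// Returns true if the value was parsed by the fast path; false means the
/// caller may fall back to the slow SQL-expression path for this field.
bool tryReadQuotedValue(const DB::Block & header, size_t i, bool is_last,
                        DB::MutableColumns & columns, DB::ReadBuffer & istr)
{
    bool rollback_on_exception = false;
    try
    {
        header.getByPosition(i).type->deserializeTextQuoted(*columns[i], istr);
        rollback_on_exception = true;

        DB::skipWhitespaceIfAny(istr);
        DB::assertChar(is_last ? ')' : ',', istr);   /// May throw after the value was already inserted.
        return true;
    }
    catch (...)
    {
        /// Remove the partially inserted value so all columns keep the same length.
        if (rollback_on_exception)
            columns[i]->popBack(1);
        return false;
    }
}
```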
......@@ -20,12 +20,13 @@ public:
* If interpret_expressions is true, it will, in addition, try to use SQL parser and interpreter
* in case when streaming parser could not parse field (this is very slow).
*/
ValuesRowInputStream(ReadBuffer & istr_, const Context & context_, bool interpret_expressions_);
ValuesRowInputStream(ReadBuffer & istr_, const Block & header_, const Context & context_, bool interpret_expressions_);
bool read(Block & block) override;
bool read(MutableColumns & columns) override;
private:
ReadBuffer & istr;
const Block & header;
const Context & context;
bool interpret_expressions;
};
......