未验证 提交 5f261cec 编写于 作者: V Vitaly Baranov 提交者: GitHub

Merge pull request #15199 from filimonov/non_delimited_protobuf

ProtobufSingle format
......@@ -43,6 +43,7 @@ The supported formats are:
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [ProtobufSingle](#protobufsingle) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
......@@ -1076,6 +1077,10 @@ ClickHouse inputs and outputs protobuf messages in the `length-delimited` format
It means before every message should be written its length as a [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints).
See also [how to read/write length-delimited protobuf messages in popular languages](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages).
## ProtobufSingle {#protobufsingle}
Same as [Protobuf](#protobuf) but for storing/parsing single Protobuf message without length delimiters.
## Avro {#data-format-avro}
[Apache Avro](https://avro.apache.org/) is a row-oriented data serialization framework developed within Apache’s Hadoop project.
......
......@@ -27,6 +27,7 @@ ClickHouse может принимать (`INSERT`) и отдавать (`SELECT
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [ProtobufSingle](#protobufsingle) | ✔ | ✔ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
......@@ -948,6 +949,10 @@ message MessageType {
ClickHouse пишет и читает сообщения `Protocol Buffers` в формате `length-delimited`. Это означает, что перед каждым сообщением пишется его длина
в формате [varint](https://developers.google.com/protocol-buffers/docs/encoding#varints). См. также [как читать и записывать сообщения Protocol Buffers в формате length-delimited в различных языках программирования](https://cwiki.apache.org/confluence/display/GEODE/Delimiting+Protobuf+Messages).
## ProtobufSingle {#protobufsingle}
То же, что [Protobuf](#protobuf), но без разделителей. Позволяет записать / прочитать не более одного сообщения за раз.
## Avro {#data-format-avro}
[Apache Avro](https://avro.apache.org/) — это ориентированный на строки фреймворк для сериализации данных. Разработан в рамках проекта Apache Hadoop.
......@@ -958,7 +963,7 @@ ClickHouse пишет и читает сообщения `Protocol Buffers` в
## AvroConfluent {#data-format-avro-confluent}
Для формата `AvroConfluent` ClickHouse поддерживает декодирование сообщений `Avro` с одним объектом. Такие сообщения используются с [Kafka] (http://kafka.apache.org/) и реестром схем [Confluent](https://docs.confluent.io/current/schema-registry/index.html).
Для формата `AvroConfluent` ClickHouse поддерживает декодирование сообщений `Avro` с одним объектом. Такие сообщения используются с [Kafka] (http://kafka.apache.org/) и реестром схем [Confluent](https://docs.confluent.io/current/schema-registry/index.html).
Каждое сообщение `Avro` содержит идентификатор схемы, который может быть разрешен для фактической схемы с помощью реестра схем.
......@@ -972,7 +977,7 @@ URL-адрес реестра схем настраивается с помощ
### Использование {#ispolzovanie}
Чтобы быстро проверить разрешение схемы, используйте [kafkacat](https://github.com/edenhill/kafkacat) с языком запросов [clickhouse-local](../operations/utilities/clickhouse-local.md):
Чтобы быстро проверить разрешение схемы, используйте [kafkacat](https://github.com/edenhill/kafkacat) с языком запросов [clickhouse-local](../operations/utilities/clickhouse-local.md):
``` bash
$ kafkacat -b kafka-broker -C -t topic1 -o beginning -f '%s' -c 3 | clickhouse-local --input-format AvroConfluent --format_avro_schema_registry_url 'http://schema-registry' -S "field1 Int64, field2 String" -q 'select * from table'
......
......@@ -509,6 +509,7 @@ namespace ErrorCodes
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL = 543;
extern const int ROW_AND_ROWS_TOGETHER = 544;
extern const int FIRST_AND_NEXT_TOGETHER = 545;
extern const int NO_ROW_DELIMITER = 546;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;
......
......@@ -398,7 +398,12 @@ class IColumn;
M(Bool, force_optimize_skip_unused_shards_no_nested, false, "Obsolete setting, does nothing. Will be removed after 2020-12-01. Use force_optimize_skip_unused_shards_nesting instead.", 0) \
M(Bool, experimental_use_processors, true, "Obsolete setting, does nothing. Will be removed after 2020-11-29.", 0) \
M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \
M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0)
M(Bool, allow_experimental_database_atomic, true, "Obsolete setting, does nothing. Will be removed after 2021-02-12", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0)
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.
#define FORMAT_FACTORY_SETTINGS(M) \
M(Char, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.", 0) \
......@@ -463,9 +468,10 @@ class IColumn;
\
M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \
M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0)
// End of FORMAT_FACTORY_SETTINGS
// Please add settings non-related to formats into the COMMON_SETTINGS above.
#define LIST_OF_SETTINGS(M) \
COMMON_SETTINGS(M) \
......
......@@ -8,6 +8,7 @@
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <DataStreams/NativeBlockInputStream.h>
......@@ -203,7 +204,7 @@ BlockInputStreamPtr FormatFactory::getInput(
BlockOutputStreamPtr FormatFactory::getOutput(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
{
if (!getCreators(name).output_processor_creator)
{
......@@ -221,7 +222,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(
output_getter(buf, sample, std::move(callback), format_settings), sample);
}
auto format = getOutputFormat(name, buf, sample, context, std::move(callback));
auto format = getOutputFormat(name, buf, sample, context, std::move(callback), ignore_no_row_delimiter);
return std::make_shared<MaterializingBlockOutputStream>(std::make_shared<OutputStreamToOutputFormat>(format), sample);
}
......@@ -260,7 +261,7 @@ InputFormatPtr FormatFactory::getInputFormat(
OutputFormatPtr FormatFactory::getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback) const
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback, const bool ignore_no_row_delimiter) const
{
const auto & output_getter = getCreators(name).output_processor_creator;
if (!output_getter)
......@@ -269,10 +270,14 @@ OutputFormatPtr FormatFactory::getOutputFormat(
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
RowOutputFormatParams params;
params.ignore_no_row_delimiter = ignore_no_row_delimiter;
params.callback = std::move(callback);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto format = output_getter(buf, sample, std::move(callback), format_settings);
auto format = output_getter(buf, sample, params, format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
......
......@@ -27,6 +27,7 @@ class IInputFormat;
class IOutputFormat;
struct RowInputFormatParams;
struct RowOutputFormatParams;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
......@@ -80,7 +81,7 @@ private:
using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
struct Creators
......@@ -107,7 +108,7 @@ public:
ReadCallback callback = {}) const;
BlockOutputStreamPtr getOutput(const String & name, WriteBuffer & buf,
const Block & sample, const Context & context, WriteCallback callback = {}) const;
const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
InputFormatPtr getInputFormat(
const String & name,
......@@ -118,7 +119,7 @@ public:
ReadCallback callback = {}) const;
OutputFormatPtr getOutputFormat(
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}) const;
const String & name, WriteBuffer & buf, const Block & sample, const Context & context, WriteCallback callback = {}, const bool ignore_no_row_delimiter = false) const;
/// Register format by its name.
void registerInputFormat(const String & name, InputCreator input_creator);
......
......@@ -38,26 +38,29 @@ namespace
// Those inequations helps checking conditions in ProtobufReader::SimpleReader.
constexpr Int64 END_OF_VARINT = -1;
constexpr Int64 END_OF_GROUP = -2;
constexpr Int64 END_OF_FILE = -3;
Int64 decodeZigZag(UInt64 n) { return static_cast<Int64>((n >> 1) ^ (~(n & 1) + 1)); }
[[noreturn]] void throwUnknownFormat()
{
throw Exception("Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
}
// SimpleReader is an utility class to deserialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to deserialize data.
ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_)
ProtobufReader::SimpleReader::SimpleReader(ReadBuffer & in_, const bool use_length_delimiters_)
: in(in_)
, cursor(0)
, current_message_level(0)
, current_message_end(0)
, field_end(0)
, last_string_pos(-1)
, use_length_delimiters(use_length_delimiters_)
{
}
[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat() const
{
throw Exception(std::string("Protobuf messages are corrupted or don't match the provided schema.") + (use_length_delimiters ? " Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint." : ""), ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
}
bool ProtobufReader::SimpleReader::startMessage()
......@@ -66,8 +69,16 @@ bool ProtobufReader::SimpleReader::startMessage()
assert(!current_message_level);
if (unlikely(in.eof()))
return false;
size_t size_of_message = readVarint();
current_message_end = cursor + size_of_message;
if (use_length_delimiters)
{
size_t size_of_message = readVarint();
current_message_end = cursor + size_of_message;
}
else
{
current_message_end = END_OF_FILE;
}
++current_message_level;
field_end = cursor;
return true;
......@@ -150,8 +161,23 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
throwUnknownFormat();
}
if ((cursor >= current_message_end) && (current_message_end != END_OF_GROUP))
return false;
if (cursor >= current_message_end)
{
if (current_message_end == END_OF_FILE)
{
if (unlikely(in.eof()))
{
current_message_end = cursor;
return false;
}
}
else if (current_message_end == END_OF_GROUP)
{
/// We'll check for the `GROUP_END` marker later.
}
else
return false;
}
UInt64 varint = readVarint();
if (unlikely(varint & (static_cast<UInt64>(0xFFFFFFFF) << 32)))
......@@ -1077,8 +1103,8 @@ std::unique_ptr<ProtobufReader::IConverter> ProtobufReader::createConverter<goog
ProtobufReader::ProtobufReader(
ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names)
: simple_reader(in_)
ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_)
: simple_reader(in_, use_length_delimiters_)
{
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type);
setTraitsDataAfterMatchingColumns(root_message.get());
......
......@@ -37,7 +37,7 @@ using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
class ProtobufReader : private boost::noncopyable
{
public:
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
ProtobufReader(ReadBuffer & in_, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_);
~ProtobufReader();
/// Should be called when we start reading a new message.
......@@ -93,7 +93,7 @@ private:
class SimpleReader
{
public:
SimpleReader(ReadBuffer & in_);
SimpleReader(ReadBuffer & in_, const bool use_length_delimiters_);
bool startMessage();
void endMessage(bool ignore_errors);
void startNestedMessage();
......@@ -126,6 +126,7 @@ private:
UInt64 continueReadingVarint(UInt64 first_byte);
void ignoreVarint();
void ignoreGroup();
[[noreturn]] void throwUnknownFormat() const;
ReadBuffer & in;
Int64 cursor;
......@@ -134,6 +135,7 @@ private:
std::vector<Int64> parent_message_ends;
Int64 field_end;
Int64 last_string_pos;
const bool use_length_delimiters;
};
class IConverter
......
......@@ -123,7 +123,11 @@ namespace
// SimpleWriter is an utility class to serialize protobufs.
// Knows nothing about protobuf schemas, just provides useful functions to serialize data.
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_) : out(out_), current_piece_start(0), num_bytes_skipped(0)
ProtobufWriter::SimpleWriter::SimpleWriter(WriteBuffer & out_, const bool use_length_delimiters_)
: out(out_)
, current_piece_start(0)
, num_bytes_skipped(0)
, use_length_delimiters(use_length_delimiters_)
{
}
......@@ -136,8 +140,11 @@ void ProtobufWriter::SimpleWriter::startMessage()
void ProtobufWriter::SimpleWriter::endMessage()
{
pieces.emplace_back(current_piece_start, buffer.size());
size_t size_of_message = buffer.size() - num_bytes_skipped;
writeVarint(size_of_message, out);
if (use_length_delimiters)
{
size_t size_of_message = buffer.size() - num_bytes_skipped;
writeVarint(size_of_message, out);
}
for (const auto & piece : pieces)
if (piece.end > piece.start)
out.write(reinterpret_cast<char *>(&buffer[piece.start]), piece.end - piece.start);
......@@ -827,8 +834,8 @@ std::unique_ptr<ProtobufWriter::IConverter> ProtobufWriter::createConverter<goog
ProtobufWriter::ProtobufWriter(
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names)
: simple_writer(out)
WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_)
: simple_writer(out, use_length_delimiters_)
{
std::vector<const google::protobuf::FieldDescriptor *> field_descriptors_without_match;
root_message = ProtobufColumnMatcher::matchColumns<ColumnMatcherTraits>(column_names, message_type, field_descriptors_without_match);
......
......@@ -37,7 +37,7 @@ using ConstAggregateDataPtr = const char *;
class ProtobufWriter : private boost::noncopyable
{
public:
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names);
ProtobufWriter(WriteBuffer & out, const google::protobuf::Descriptor * message_type, const std::vector<String> & column_names, const bool use_length_delimiters_);
~ProtobufWriter();
/// Should be called at the beginning of writing a message.
......@@ -89,7 +89,7 @@ private:
class SimpleWriter
{
public:
SimpleWriter(WriteBuffer & out_);
SimpleWriter(WriteBuffer & out_, const bool use_length_delimiters_);
~SimpleWriter();
void startMessage();
......@@ -138,6 +138,7 @@ private:
size_t current_piece_start;
size_t num_bytes_skipped;
std::vector<NestedInfo> nested_infos;
const bool use_length_delimiters;
};
class IConverter
......
......@@ -38,13 +38,14 @@ try
FormatSettings format_settings;
RowInputFormatParams params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
RowInputFormatParams in_params{DEFAULT_INSERT_BLOCK_SIZE, 0, 0, []{}};
RowOutputFormatParams out_params{[](const Columns & /* columns */, size_t /* row */){},false};
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, params, false, false, format_settings);
InputFormatPtr input_format = std::make_shared<TabSeparatedRowInputFormat>(sample, in_buf, in_params, false, false, format_settings);
BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [](const Columns & /* columns */, size_t /* row */){}, format_settings));
std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, out_params, format_settings));
copyData(*block_input, *block_output);
return 0;
......
......@@ -60,7 +60,6 @@ public:
*/
virtual ~WriteBuffer() {}
inline void nextIfAtEnd()
{
if (!hasPendingData())
......
......@@ -21,12 +21,13 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
{
if (!first_row)
writeRowBetweenDelimiter();
first_row = false;
write(columns, row);
if (write_single_row_callback)
write_single_row_callback(columns, row);
if (params.callback)
params.callback(columns, row);
first_row = false;
}
}
......
......@@ -9,6 +9,22 @@
namespace DB
{
struct RowOutputFormatParams
{
using WriteCallback = std::function<void(const Columns & columns,size_t row)>;
// Callback used to indicate that another row is written.
WriteCallback callback;
/**
* some buffers (kafka / rabbit) split the rows internally using callback
* so we can push there formats without framing / delimiters
* (like ProtobufSingle). In other cases you can't write more than single row
* in unframed format.
*/
bool ignore_no_row_delimiter = false;
};
class WriteBuffer;
/** Output format that writes data row by row.
......@@ -17,6 +33,7 @@ class IRowOutputFormat : public IOutputFormat
{
protected:
DataTypes types;
bool first_row = true;
void consume(Chunk chunk) override;
void consumeTotals(Chunk chunk) override;
......@@ -24,8 +41,10 @@ protected:
void finalize() override;
public:
IRowOutputFormat(const Block & header, WriteBuffer & out_, FormatFactory::WriteCallback callback)
: IOutputFormat(header, out_), types(header.getDataTypes()), write_single_row_callback(callback)
using Params = RowOutputFormatParams;
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_)
: IOutputFormat(header, out_), types(header.getDataTypes()), params(params_)
{
}
......@@ -55,12 +74,10 @@ public:
virtual void writeLastSuffix() {} /// Write something after resultset, totals end extremes.
private:
bool first_row = true;
bool prefix_written = false;
bool suffix_written = false;
// Callback used to indicate that another row is written.
FormatFactory::WriteCallback write_single_row_callback;
Params params;
void writePrefixIfNot()
{
......
......@@ -79,7 +79,7 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
"Arrow",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
......@@ -89,7 +89,7 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
"ArrowStream",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
......
......@@ -347,8 +347,8 @@ static avro::Codec getCodec(const std::string & codec_name)
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback)
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params_)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName())
, file_writer(
......@@ -383,10 +383,10 @@ void registerOutputFormatProcessorAvro(FormatFactory & factory)
factory.registerOutputFormatProcessor("Avro", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -43,7 +43,7 @@ private:
class AvroRowOutputFormat : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
String getName() const override { return "AvroRowOutputFormat"; }
......
......@@ -9,8 +9,8 @@
namespace DB
{
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header, out_, callback), with_names(with_names_), with_types(with_types_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_)
: IRowOutputFormat(header, out_, params_), with_names(with_names_), with_types(with_types_)
{
}
......@@ -52,19 +52,19 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
factory.registerOutputFormatProcessor("RowBinary", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, callback);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, params);
});
factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, callback);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, params);
});
}
......
......@@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params_);
String getName() const override { return "BinaryRowOutputFormat"; }
......
......@@ -8,8 +8,8 @@ namespace DB
{
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
......@@ -77,10 +77,10 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory)
factory.registerOutputFormatProcessor(with_names ? "CSVWithNames" : "CSV", [=](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, callback, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, params, format_settings);
});
}
}
......
......@@ -20,7 +20,7 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
String getName() const override { return "CSVRowOutputFormat"; }
......
......@@ -10,11 +10,11 @@ namespace DB
JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_), yield_strings(yield_strings_)
: IRowOutputFormat(header_, out_, params_), settings(settings_), with_names(with_names_), yield_strings(yield_strings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -103,37 +103,37 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSONCompactEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false, false);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, false, false);
});
factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true, false);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, true, false);
});
factory.registerOutputFormatProcessor("JSONCompactStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false, true);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, false, true);
});
factory.registerOutputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true, true);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, true, true);
});
}
......
......@@ -18,7 +18,7 @@ public:
JSONCompactEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool with_names_,
bool yield_strings_);
......
......@@ -10,10 +10,10 @@ namespace DB
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: JSONRowOutputFormat(out_, header, callback, settings_, yield_strings_)
: JSONRowOutputFormat(out_, header, params_, settings_, yield_strings_)
{
}
......@@ -93,19 +93,19 @@ void registerOutputFormatProcessorJSONCompact(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSONCompact", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -19,7 +19,7 @@ public:
JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
......
......@@ -11,10 +11,10 @@ namespace DB
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), yield_strings(yield_strings_)
: IRowOutputFormat(header_, out_, params_), settings(settings_), yield_strings(yield_strings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
......@@ -71,19 +71,19 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSONEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -18,7 +18,7 @@ public:
JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
......
......@@ -33,19 +33,19 @@ void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factor
factory.registerOutputFormatProcessor("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -10,10 +10,10 @@ namespace DB
JSONRowOutputFormat::JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header, out_, callback), settings(settings_), yield_strings(yield_strings_)
: IRowOutputFormat(header, out_, params_), settings(settings_), yield_strings(yield_strings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -271,19 +271,19 @@ void registerOutputFormatProcessorJSON(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSON", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -19,7 +19,7 @@ public:
JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & settings_,
bool yield_strings_);
......
......@@ -5,8 +5,8 @@
namespace DB
{
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_) {}
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_) {}
void MarkdownRowOutputFormat::writePrefix()
{
......@@ -60,10 +60,10 @@ void registerOutputFormatProcessorMarkdown(FormatFactory & factory)
factory.registerOutputFormatProcessor("Markdown", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -12,7 +12,7 @@ class ReadBuffer;
class MarkdownRowOutputFormat : public IRowOutputFormat
{
public:
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
/// Write higher part of markdown table like this:
/// |columnName1|columnName2|...|columnNameN|
......
......@@ -24,8 +24,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback), packer(out_) {}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_)
: IRowOutputFormat(header_, out_, params_), packer(out_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
......@@ -154,10 +154,10 @@ void registerOutputFormatProcessorMsgPack(FormatFactory & factory)
factory.registerOutputFormatProcessor("MsgPack", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, callback);
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, params);
});
}
......
......@@ -20,7 +20,7 @@ namespace DB
class MsgPackRowOutputFormat : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback);
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_);
String getName() const override { return "MsgPackRowOutputFormat"; }
......
......@@ -95,7 +95,7 @@ void registerOutputFormatProcessorMySQLWire(FormatFactory & factory)
"MySQLWire",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
}
......
......@@ -168,7 +168,7 @@ void registerOutputFormatProcessorNative(FormatFactory & factory)
factory.registerOutputFormatProcessor("Native", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeOutputFormatFromNativeBlockOutputStream>(sample, buf);
......
......@@ -21,7 +21,7 @@ void registerOutputFormatProcessorNull(FormatFactory & factory)
factory.registerOutputFormatProcessor("Null", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NullOutputFormat>(sample, buf);
......
......@@ -107,7 +107,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, FormatFactory::WriteCallback, const FormatSettings & format_settings)
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
});
......
......@@ -433,7 +433,7 @@ void registerOutputFormatProcessorORC(FormatFactory & factory)
factory.registerOutputFormatProcessor("ORC", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
......
......@@ -84,7 +84,7 @@ void registerOutputFormatProcessorParquet(FormatFactory & factory)
"Parquet",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
auto impl = std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
......
......@@ -73,7 +73,7 @@ void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory)
"PostgreSQLWire",
[](WriteBuffer & buf,
const Block & sample,
const FormatFactory::WriteCallback &,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<PostgreSQLOutputFormat>(buf, sample, settings); });
}
}
......@@ -403,7 +403,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
factory.registerOutputFormatProcessor("Pretty", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);
......@@ -412,7 +412,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettyNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
......
......@@ -259,7 +259,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [mono_block = mono_block](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings, mono_block);
......@@ -269,7 +269,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettyCompactNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
......
......@@ -113,7 +113,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettySpace", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);
......@@ -122,7 +122,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettySpaceNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
......
......@@ -11,10 +11,10 @@
namespace DB
{
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_)
ProtobufRowInputFormat::ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool use_length_delimiters_)
: IRowInputFormat(header_, in_, params_)
, data_types(header_.getDataTypes())
, reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames())
, reader(in, ProtobufSchemas::instance().getMessageTypeForFormatSchema(info_), header_.getNames(), use_length_delimiters_)
{
}
......@@ -67,16 +67,20 @@ void ProtobufRowInputFormat::syncAfterError()
void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{
factory.registerInputFormatProcessor("Protobuf", [](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
for (bool use_length_delimiters : {false, true})
{
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path));
});
factory.registerInputFormatProcessor(use_length_delimiters ? "Protobuf" : "ProtobufSingle", [use_length_delimiters](
ReadBuffer & buf,
const Block & sample,
IRowInputFormat::Params params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowInputFormat>(buf, sample, std::move(params),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
use_length_delimiters);
});
}
}
}
......
......@@ -16,17 +16,20 @@ class FormatSchemaInfo;
/** Stream designed to deserialize data from the google protobuf format.
* Each row is read as a separated message.
* These messages are delimited according to documentation
* One Protobuf message is parsed as one row of data.
*
* Input buffer may contain single protobuf message (use_length_delimiters_ = false),
* or any number of messages (use_length_delimiters = true). In the second case
* parser assumes messages are length-delimited according to documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Serializing in the protobuf format requires the 'format_schema' setting to be set, e.g.
* Parsing of the protobuf format requires the 'format_schema' setting to be set, e.g.
* INSERT INTO table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of "schema.proto" file specifying protobuf schema.
*/
class ProtobufRowInputFormat : public IRowInputFormat
{
public:
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_);
ProtobufRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSchemaInfo & info_, const bool use_length_delimiters_);
~ProtobufRowInputFormat() override;
String getName() const override { return "ProtobufRowInputFormat"; }
......
......@@ -14,23 +14,31 @@ namespace DB
{
namespace ErrorCodes
{
extern const int NO_ROW_DELIMITER;
}
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSchemaInfo & format_schema)
: IRowOutputFormat(header, out_, callback)
const RowOutputFormatParams & params_,
const FormatSchemaInfo & format_schema,
const bool use_length_delimiters_)
: IRowOutputFormat(header, out_, params_)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), use_length_delimiters_)
, throw_on_multiple_rows_undelimited(!use_length_delimiters_ && !params_.ignore_no_row_delimiter)
{
value_indices.resize(header.columns());
}
void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
{
if (throw_on_multiple_rows_undelimited && !first_row)
{
throw Exception("The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.", ErrorCodes::NO_ROW_DELIMITER);
}
writer.startMessage();
std::fill(value_indices.begin(), value_indices.end(), 0);
size_t column_index;
......@@ -43,17 +51,21 @@ void ProtobufRowOutputFormat::write(const Columns & columns, size_t row_num)
void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"Protobuf",
[](WriteBuffer & buf,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback),
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path));
});
for (bool use_length_delimiters : {false, true})
{
factory.registerOutputFormatProcessor(
use_length_delimiters ? "Protobuf" : "ProtobufSingle",
[use_length_delimiters](WriteBuffer & buf,
const Block & header,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
use_length_delimiters);
});
}
}
}
......
......@@ -25,7 +25,9 @@ namespace DB
{
/** Stream designed to serialize data in the google protobuf format.
* Each row is written as a separated message.
* These messages are delimited according to documentation
*
* With use_length_delimiters=0 it can write only single row as plain protobuf message,
* otherwise Protobuf messages are delimited according to documentation
* https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/util/delimited_message_util.h
* Serializing in the protobuf format requires the 'format_schema' setting to be set, e.g.
* SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
......@@ -37,8 +39,9 @@ public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const FormatSchemaInfo & format_schema);
const RowOutputFormatParams & params_,
const FormatSchemaInfo & format_schema,
const bool use_length_delimiters_);
String getName() const override { return "ProtobufRowOutputFormat"; }
......@@ -50,6 +53,7 @@ private:
DataTypes data_types;
ProtobufWriter writer;
std::vector<size_t> value_indices;
const bool throw_on_multiple_rows_undelimited;
};
}
......
......@@ -9,8 +9,8 @@ namespace DB
RawBLOBRowOutputFormat::RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback)
const RowOutputFormatParams & params_)
: IRowOutputFormat(header_, out_, params_)
{
}
......@@ -27,10 +27,10 @@ void registerOutputFormatProcessorRawBLOB(FormatFactory & factory)
factory.registerOutputFormatProcessor("RawBLOB", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, callback);
return std::make_shared<RawBLOBRowOutputFormat>(buf, sample, params);
});
}
......
......@@ -30,7 +30,7 @@ public:
RawBLOBRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback);
const RowOutputFormatParams & params_);
String getName() const override { return "RawBLOBRowOutputFormat"; }
......
......@@ -7,8 +7,8 @@
namespace DB
{
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, callback, format_settings_)
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, params_, format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -44,10 +44,10 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory)
factory.registerOutputFormatProcessor("TSKV", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TSKVRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -14,7 +14,7 @@ namespace DB
class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat
{
public:
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings);
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params_, const FormatSettings & format_settings);
String getName() const override { return "TSKVRowOutputFormat"; }
......
......@@ -18,9 +18,9 @@ public:
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, callback, format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, params_, format_settings_)
{
}
......
......@@ -11,9 +11,9 @@ TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
: IRowOutputFormat(header_, out_, params_), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
}
......@@ -80,10 +80,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, callback, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, params, settings);
});
}
......@@ -92,10 +92,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, callback, settings);
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, params, settings);
});
}
......@@ -104,10 +104,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, callback, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, params, settings);
});
}
......@@ -116,10 +116,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, callback, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, params, settings);
});
}
}
......
......@@ -23,7 +23,7 @@ public:
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params_,
const FormatSettings & format_settings_);
String getName() const override { return "TabSeparatedRowOutputFormat"; }
......
......@@ -232,7 +232,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory & factory)
factory.registerOutputFormatProcessor("Template", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format;
......@@ -270,7 +270,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory & factory)
factory.registerOutputFormatProcessor("CustomSeparated", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom);
......
......@@ -10,8 +10,8 @@ namespace DB
{
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_)
{
}
......@@ -46,10 +46,10 @@ void registerOutputFormatProcessorValues(FormatFactory & factory)
factory.registerOutputFormatProcessor("Values", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<ValuesRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -15,7 +15,7 @@ class WriteBuffer;
class ValuesRowOutputFormat : public IRowOutputFormat
{
public:
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
String getName() const override { return "ValuesRowOutputFormat"; }
......
......@@ -11,8 +11,8 @@ namespace DB
{
VerticalRowOutputFormat::VerticalRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
......@@ -168,10 +168,10 @@ void registerOutputFormatProcessorVertical(FormatFactory & factory)
factory.registerOutputFormatProcessor("Vertical", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<VerticalRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<VerticalRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -18,7 +18,7 @@ class Context;
class VerticalRowOutputFormat : public IRowOutputFormat
{
public:
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
String getName() const override { return "VerticalRowOutputFormat"; }
......
......@@ -7,8 +7,8 @@
namespace DB
{
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -245,10 +245,10 @@ void registerOutputFormatProcessorXML(FormatFactory & factory)
factory.registerOutputFormatProcessor("XML", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<XMLRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<XMLRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -16,7 +16,7 @@ namespace DB
class XMLRowOutputFormat : public IRowOutputFormat
{
public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & format_settings_);
String getName() const override { return "XMLRowOutputFormat"; }
......
......@@ -32,7 +32,13 @@ void KafkaBlockOutputStream::writePrefix()
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row){ buffer->countRow(columns, row); });
child = FormatFactory::instance().getOutput(
storage.getFormatName(), *buffer, getHeader(), *context, [this](const Columns & columns, size_t row)
{
buffer->countRow(columns, row);
},
/* ignore_no_row_delimiter = */ true
);
}
void KafkaBlockOutputStream::write(const Block & block)
......
......@@ -46,7 +46,9 @@ void RabbitMQBlockOutputStream::writePrefix()
storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & /* columns */, size_t /* rows */)
{
buffer->countRow();
});
},
/* ignore_no_row_delimiter = */ true
);
}
......
......@@ -103,6 +103,17 @@ def kafka_produce_protobuf_messages(topic, start_index, num_messages):
producer.flush()
print(("Produced {} messages for topic {}".format(num_messages, topic)))
def kafka_produce_protobuf_messages_no_delimeters(topic, start_index, num_messages):
data = ''
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(start_index, start_index + num_messages):
msg = kafka_pb2.KeyValuePair()
msg.key = i
msg.value = str(i)
serialized_msg = msg.SerializeToString()
producer.send(topic=topic, value=serialized_msg)
producer.flush()
print("Produced {} messages for topic {}".format(num_messages, topic))
def avro_confluent_message(schema_registry_client, value):
# type: (CachedSchemaRegistryClient, dict) -> str
......@@ -971,6 +982,55 @@ def test_kafka_protobuf(kafka_cluster):
kafka_check_result(result, True)
@pytest.mark.timeout(30)
def test_kafka_protobuf_no_delimiter(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb_no_delimiter',
kafka_group_name = 'pb_no_delimiter',
kafka_format = 'ProtobufSingle',
kafka_schema = 'kafka.proto:KeyValuePair';
''')
kafka_produce_protobuf_messages_no_delimeters('pb_no_delimiter', 0, 20)
kafka_produce_protobuf_messages_no_delimeters('pb_no_delimiter', 20, 1)
kafka_produce_protobuf_messages_no_delimeters('pb_no_delimiter', 21, 29)
result = ''
while True:
result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
if kafka_check_result(result):
break
kafka_check_result(result, True)
instance.query('''
CREATE TABLE test.kafka_writer (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'pb_no_delimiter',
kafka_group_name = 'pb_no_delimiter',
kafka_format = 'ProtobufSingle',
kafka_schema = 'kafka.proto:KeyValuePair';
''')
instance.query("INSERT INTO test.kafka_writer VALUES (13,'Friday'),(42,'Answer to the Ultimate Question of Life, the Universe, and Everything'), (110, 'just a number')")
time.sleep(1)
result = instance.query("SELECT * FROM test.kafka ORDER BY key", ignore_error=True)
expected = '''\
13 Friday
42 Answer to the Ultimate Question of Life, the Universe, and Everything
110 just a number
'''
assert TSV(result) == TSV(expected)
@pytest.mark.timeout(180)
def test_kafka_materialized_view(kafka_cluster):
instance.query('''
......
......@@ -8,4 +8,11 @@ a7522158-3d41-4b77-ad69-6c598ee55c49 Ivan Petrov male 1980-12-29 png +7495123456
0 0
2 4
3 9
a7da1aa6-f425-4789-8947-b034786ed374 Vasily Sidorov male 1995-07-28 bmp +442012345678 1 2018-12-30 00:00:00 23 leo ['Sunny'] [250,244,10] Murmansk [68.970680,33.074982] 3.14159265358979 100000000000.00 800 -3.2 154400000 ['pound'] [16] 503 []
c694ad8a-f714-4ea3-907d-fd54fb25d9b5 Natalia Sokolova female 1992-03-08 jpg \N 0 \N 26 pisces [] [100,200,50] Plymouth [50.403724,-4.142123] 3.14159 \N 0.007 5.4 -20000000000000 [] [] \N []
a7522158-3d41-4b77-ad69-6c598ee55c49 Ivan Petrov male 1980-12-29 png +74951234567\0 1 2019-01-05 18:45:00 38 capricorn ['Yesterday','Flowers'] [255,0,0] Moscow [55.753216,37.622504] 3.14 214.10 0.1 5.8 17060000000 ['meter','centimeter','kilometer'] [1,0.01,1000] 500 [501,502]
3faee064-c4f7-4d34-b6f3-8d81c2b6a15d Nick Kolesnikov male 1998-12-26 bmp 412-687-5007\0 1 2018-11-19 05:59:59 20 capricorn ['Havana'] [128,0,128] Pittsburgh [40.517192,-79.949456] 3.1415926535898 50000000000.00 780 18.3 195500007 ['ounce','carat','gram'] [28.35,0.2,1] 9494 []
2 4
3 9
ok
ok
......@@ -48,6 +48,14 @@ source "$CURDIR"/00825_protobuf_format_input.insh
$CLICKHOUSE_CLIENT --query "SELECT * FROM in_persons_00825 ORDER BY uuid;"
$CLICKHOUSE_CLIENT --query "SELECT * FROM in_squares_00825 ORDER BY number;"
$CLICKHOUSE_CLIENT --query "TRUNCATE TABLE in_persons_00825;"
$CLICKHOUSE_CLIENT --query "TRUNCATE TABLE in_squares_00825;"
source "$CURDIR"/00825_protobuf_format_input_single.insh
$CLICKHOUSE_CLIENT --query "SELECT * FROM in_persons_00825 ORDER BY uuid;"
$CLICKHOUSE_CLIENT --query "SELECT * FROM in_squares_00825 ORDER BY number;"
# Try to input malformed data.
set +eo pipefail
echo -ne '\xe0\x80\x3f\x0b' \
......@@ -55,5 +63,12 @@ echo -ne '\xe0\x80\x3f\x0b' \
| grep -qF "Protobuf messages are corrupted" && echo "ok" || echo "fail"
set -eo pipefail
# Try to input malformed data for ProtobufSingle
set +eo pipefail
echo -ne '\xff\xff\x3f\x0b' \
| $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'" 2>&1 \
| grep -qF "Protobuf messages are corrupted" && echo "ok" || echo "fail"
set -eo pipefail
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS in_persons_00825;"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS in_squares_00825;"
echo -ne '\x0a\x24\x61\x37\x35\x32\x32\x31\x35\x38\x2d\x33\x64\x34\x31\x2d\x34\x62\x37\x37\x2d\x61\x64\x36\x39\x2d\x36\x63\x35\x39\x38\x65\x65\x35\x35\x63\x34\x39\x12\x04\x49\x76\x61\x6e\x1a\x06\x50\x65\x74\x72\x6f\x76\x20\x01\x28\xaf\x1f\x32\x03\x70\x6e\x67\x3a\x0c\x2b\x37\x34\x39\x35\x31\x32\x33\x34\x35\x36\x37\x40\x01\x4d\xfc\xd0\x30\x5c\x50\x26\x58\x09\x62\x09\x59\x65\x73\x74\x65\x72\x64\x61\x79\x62\x07\x46\x6c\x6f\x77\x65\x72\x73\x6a\x04\xff\x01\x00\x00\x72\x06\x4d\x6f\x73\x63\x6f\x77\x7a\x08\x4b\x03\x5f\x42\x72\x7d\x16\x42\x81\x01\x1f\x85\xeb\x51\xb8\x1e\x09\x40\x89\x01\x33\x33\x33\x33\x33\xc3\x6a\x40\x95\x01\xcd\xcc\xcc\x3d\x9d\x01\x9a\x99\xb9\x40\xa0\x01\x80\xc4\xd7\x8d\x7f\xaa\x01\x0c\x0a\x05\x6d\x65\x74\x65\x72\x15\x00\x00\x80\x3f\xaa\x01\x11\x0a\x0a\x63\x65\x6e\x74\x69\x6d\x65\x74\x65\x72\x15\x0a\xd7\x23\x3c\xaa\x01\x10\x0a\x09\x6b\x69\x6c\x6f\x6d\x65\x74\x65\x72\x15\x00\x00\x7a\x44\xb2\x01\x10\x0a\x0e\xa2\x06\x0b\x0a\x09\x08\xf4\x03\x12\x04\xf5\x03\xf6\x03' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'"
echo -ne '\x0a\x24\x63\x36\x39\x34\x61\x64\x38\x61\x2d\x66\x37\x31\x34\x2d\x34\x65\x61\x33\x2d\x39\x30\x37\x64\x2d\x66\x64\x35\x34\x66\x62\x32\x35\x64\x39\x62\x35\x12\x07\x4e\x61\x74\x61\x6c\x69\x61\x1a\x08\x53\x6f\x6b\x6f\x6c\x6f\x76\x61\x28\xa6\x3f\x32\x03\x6a\x70\x67\x50\x1a\x58\x0b\x6a\x04\x64\xc8\x01\x32\x72\x08\x50\x6c\x79\x6d\x6f\x75\x74\x68\x7a\x08\x6a\x9d\x49\x42\x46\x8c\x84\xc0\x81\x01\x6e\x86\x1b\xf0\xf9\x21\x09\x40\x95\x01\x42\x60\xe5\x3b\x9d\x01\xcd\xcc\xac\x40\xa0\x01\xff\xff\xa9\xce\x93\x8c\x09' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'"
echo -ne '\x0a\x24\x61\x37\x64\x61\x31\x61\x61\x36\x2d\x66\x34\x32\x35\x2d\x34\x37\x38\x39\x2d\x38\x39\x34\x37\x2d\x62\x30\x33\x34\x37\x38\x36\x65\x64\x33\x37\x34\x12\x06\x56\x61\x73\x69\x6c\x79\x1a\x07\x53\x69\x64\x6f\x72\x6f\x76\x20\x01\x28\xfb\x48\x32\x03\x62\x6d\x70\x3a\x0d\x2b\x34\x34\x32\x30\x31\x32\x33\x34\x35\x36\x37\x38\x40\x01\x4d\x50\xe0\x27\x5c\x50\x17\x58\x04\x62\x05\x53\x75\x6e\x6e\x79\x6a\x05\xfa\x01\xf4\x01\x0a\x72\x08\x4d\x75\x72\x6d\x61\x6e\x73\x6b\x7a\x08\xfd\xf0\x89\x42\xc8\x4c\x04\x42\x81\x01\x11\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x37\x42\x95\x01\x00\x00\x48\x44\x9d\x01\xcd\xcc\x4c\xc0\xa0\x01\x80\xd4\x9f\x93\x01\xaa\x01\x0c\x0a\x05\x70\x6f\x75\x6e\x64\x15\x00\x00\x80\x41\xb2\x01\x0a\x0a\x08\xa2\x06\x05\x0a\x03\x08\xf7\x03' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'"
echo -ne '\x0a\x24\x33\x66\x61\x65\x65\x30\x36\x34\x2d\x63\x34\x66\x37\x2d\x34\x64\x33\x34\x2d\x62\x36\x66\x33\x2d\x38\x64\x38\x31\x63\x32\x62\x36\x61\x31\x35\x64\x12\x04\x4e\x69\x63\x6b\x1a\x0a\x4b\x6f\x6c\x65\x73\x6e\x69\x6b\x6f\x76\x20\x01\x28\xda\x52\x32\x03\x62\x6d\x70\x3a\x0c\x34\x31\x32\x2d\x36\x38\x37\x2d\x35\x30\x30\x37\x40\x01\x4d\x2f\x27\xf2\x5b\x50\x14\x58\x09\x62\x06\x48\x61\x76\x61\x6e\x61\x68\x80\x01\x68\x00\x68\x80\x01\x72\x0a\x50\x69\x74\x74\x73\x62\x75\x72\x67\x68\x7a\x08\x9b\x11\x22\x42\x1f\xe6\x9f\xc2\x81\x01\x28\x2d\x44\x54\xfb\x21\x09\x40\x89\x01\x00\x00\x00\xe8\x76\x48\x27\x42\x95\x01\x00\x00\x43\x44\x9d\x01\x66\x66\x92\x41\xa0\x01\xce\xdf\xb8\xba\x01\xab\x01\x0d\xcd\xcc\xe2\x41\x0d\xcd\xcc\x4c\x3e\x0d\x00\x00\x80\x3f\x12\x05\x6f\x75\x6e\x63\x65\x12\x05\x63\x61\x72\x61\x74\x12\x04\x67\x72\x61\x6d\xac\x01\xb3\x01\x0b\xa2\x06\x05\x0b\x08\x96\x4a\x0c\x0c\xb4\x01' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_persons_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2:Syntax2Person'"
echo -ne '\x08\x02\x10\x04' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_squares_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'"
echo -ne '\x08\x03\x10\x09' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_squares_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'"
### Actually empty Protobuf message is a valid message (with all values default).
### It will work in Kafka but clickhouse-client forbids that:
### Code: 108. DB::Exception: No data to insert
## echo -ne '' | $CLICKHOUSE_CLIENT --query="INSERT INTO in_squares_00825 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare'"
\ No newline at end of file
......@@ -10,7 +10,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e -o pipefail
# Run the client.
$CLICKHOUSE_CLIENT --multiquery <<EOF
$CLICKHOUSE_CLIENT -mnT <<EOF
DROP TABLE IF EXISTS out_persons_00825;
DROP TABLE IF EXISTS out_squares_00825;
......@@ -58,6 +58,21 @@ SELECT * FROM out_persons_00825 ORDER BY name FORMAT Protobuf SETTINGS format_sc
SELECT 'SQUARES->';
SELECT * FROM out_squares_00825 ORDER BY number FORMAT Protobuf SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare';
SELECT '\n\n** ProtobufSingle **\n\n';
SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person';
SELECT 'ALTERNATIVE->';
SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:AltPerson';
SELECT 'STRINGS->';
SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:StrPerson';
SELECT 'SYNTAX2->';
SELECT * FROM out_persons_00825 ORDER BY name LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format_syntax2:Syntax2Person';
SELECT 'SQUARES->';
SELECT * FROM out_squares_00825 ORDER BY number LIMIT 1 FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:NumberAndSquare';
-- Code: 546, e.displayText() = DB::Exception: The ProtobufSingle format can't be used to write multiple rows because this format doesn't have any row delimiter.
SELECT * FROM out_persons_00825 ORDER BY name FORMAT ProtobufSingle SETTINGS format_schema = '$CURDIR/00825_protobuf_format:Person'; -- { clientError 546 }
DROP TABLE IF EXISTS out_persons_00825;
DROP TABLE IF EXISTS out_squares_00825;
EOF
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册