Refactoring WriteCallback->RowOutputFormatParams

上级 e17d4e1e
......@@ -8,6 +8,7 @@
#include <DataStreams/ParallelParsingBlockInputStream.h>
#include <Formats/FormatSettings.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/OutputStreamToOutputFormat.h>
#include <DataStreams/NativeBlockInputStream.h>
......@@ -268,10 +269,13 @@ OutputFormatPtr FormatFactory::getOutputFormat(
const Settings & settings = context.getSettingsRef();
FormatSettings format_settings = getOutputFormatSetting(settings, context);
RowOutputFormatParams params;
params.callback = std::move(callback);
/** TODO: Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
auto format = output_getter(buf, sample, std::move(callback), format_settings);
auto format = output_getter(buf, sample, params, format_settings);
/// Enable auto-flush for streaming mode. Currently it is needed by INSERT WATCH query.
if (format_settings.enable_streaming)
......
......@@ -27,6 +27,7 @@ class IInputFormat;
class IOutputFormat;
struct RowInputFormatParams;
struct RowOutputFormatParams;
using InputFormatPtr = std::shared_ptr<IInputFormat>;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
......@@ -80,7 +81,7 @@ private:
using OutputProcessorCreator = std::function<OutputFormatPtr(
WriteBuffer & buf,
const Block & sample,
WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)>;
struct Creators
......
......@@ -25,8 +25,8 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
write(columns, row);
if (write_single_row_callback)
write_single_row_callback(columns, row);
if (params.callback)
params.callback(columns, row);
}
}
......
......@@ -9,6 +9,15 @@
namespace DB
{
/// Common parameters for generating blocks.
struct RowOutputFormatParams
{
using WriteCallback = std::function<void(const Columns & columns,size_t row)>;
// Callback used to indicate that another row is written.
WriteCallback callback;
};
class WriteBuffer;
/** Output format that writes data row by row.
......@@ -24,8 +33,10 @@ protected:
void finalize() override;
public:
IRowOutputFormat(const Block & header, WriteBuffer & out_, FormatFactory::WriteCallback callback)
: IOutputFormat(header, out_), types(header.getDataTypes()), write_single_row_callback(callback)
using Params = RowOutputFormatParams;
IRowOutputFormat(const Block & header, WriteBuffer & out_, const Params & params_)
: IOutputFormat(header, out_), types(header.getDataTypes()), params(params_)
{
}
......@@ -59,8 +70,7 @@ private:
bool prefix_written = false;
bool suffix_written = false;
// Callback used to indicate that another row is written.
FormatFactory::WriteCallback write_single_row_callback;
Params params;
void writePrefixIfNot()
{
......
......@@ -79,7 +79,7 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
"Arrow",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
......@@ -89,7 +89,7 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
"ArrowStream",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
......
......@@ -347,8 +347,8 @@ static avro::Codec getCodec(const std::string & codec_name)
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback)
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, params)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName())
, file_writer(
......@@ -383,10 +383,10 @@ void registerOutputFormatProcessorAvro(FormatFactory & factory)
factory.registerOutputFormatProcessor("Avro", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<AvroRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -43,7 +43,7 @@ private:
class AvroRowOutputFormat : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
String getName() const override { return "AvroRowOutputFormat"; }
......
......@@ -9,8 +9,8 @@
namespace DB
{
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header, out_, callback), with_names(with_names_), with_types(with_types_)
BinaryRowOutputFormat::BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params)
: IRowOutputFormat(header, out_, params), with_names(with_names_), with_types(with_types_)
{
}
......@@ -52,19 +52,19 @@ void registerOutputFormatProcessorRowBinary(FormatFactory & factory)
factory.registerOutputFormatProcessor("RowBinary", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, callback);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, false, false, params);
});
factory.registerOutputFormatProcessor("RowBinaryWithNamesAndTypes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, callback);
return std::make_shared<BinaryRowOutputFormat>(buf, sample, true, true, params);
});
}
......
......@@ -17,7 +17,7 @@ class WriteBuffer;
class BinaryRowOutputFormat: public IRowOutputFormat
{
public:
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, FormatFactory::WriteCallback callback);
BinaryRowOutputFormat(WriteBuffer & out_, const Block & header, bool with_names_, bool with_types_, const RowOutputFormatParams & params);
String getName() const override { return "BinaryRowOutputFormat"; }
......
......@@ -8,8 +8,8 @@ namespace DB
{
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), format_settings(format_settings_)
CSVRowOutputFormat::CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params), with_names(with_names_), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
......@@ -77,10 +77,10 @@ void registerOutputFormatProcessorCSV(FormatFactory & factory)
factory.registerOutputFormatProcessor(with_names ? "CSVWithNames" : "CSV", [=](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, callback, format_settings);
return std::make_shared<CSVRowOutputFormat>(buf, sample, with_names, params, format_settings);
});
}
}
......
......@@ -20,7 +20,7 @@ public:
/** with_names - output in the first line a header with column names
* with_types - output in the next line header with the names of the types
*/
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
CSVRowOutputFormat(WriteBuffer & out_, const Block & header_, bool with_names_, const RowOutputFormatParams & params, const FormatSettings & format_settings_);
String getName() const override { return "CSVRowOutputFormat"; }
......
......@@ -10,11 +10,11 @@ namespace DB
JSONCompactEachRowRowOutputFormat::JSONCompactEachRowRowOutputFormat(WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool with_names_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_), yield_strings(yield_strings_)
: IRowOutputFormat(header_, out_, params), settings(settings_), with_names(with_names_), yield_strings(yield_strings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -103,37 +103,37 @@ void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSONCompactEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false, false);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, false, false);
});
factory.registerOutputFormatProcessor("JSONCompactEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true, false);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, true, false);
});
factory.registerOutputFormatProcessor("JSONCompactStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, false, true);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, false, true);
});
factory.registerOutputFormatProcessor("JSONCompactStringsEachRowWithNamesAndTypes", [](
WriteBuffer &buf,
const Block &sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &format_settings)
{
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, callback, format_settings, true, true);
return std::make_shared<JSONCompactEachRowRowOutputFormat>(buf, sample, params, format_settings, true, true);
});
}
......
......@@ -18,7 +18,7 @@ public:
JSONCompactEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool with_names_,
bool yield_strings_);
......
......@@ -10,10 +10,10 @@ namespace DB
JSONCompactRowOutputFormat::JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool yield_strings_)
: JSONRowOutputFormat(out_, header, callback, settings_, yield_strings_)
: JSONRowOutputFormat(out_, header, params, settings_, yield_strings_)
{
}
......@@ -93,19 +93,19 @@ void registerOutputFormatProcessorJSONCompact(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSONCompact", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONCompactStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONCompactRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -19,7 +19,7 @@ public:
JSONCompactRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool yield_strings_);
......
......@@ -11,10 +11,10 @@ namespace DB
JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header_, out_, callback), settings(settings_), yield_strings(yield_strings_)
: IRowOutputFormat(header_, out_, params), settings(settings_), yield_strings(yield_strings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
......@@ -71,19 +71,19 @@ void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSONEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRow", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONEachRowRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -18,7 +18,7 @@ public:
JSONEachRowRowOutputFormat(
WriteBuffer & out_,
const Block & header_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool yield_strings_);
......
......@@ -33,19 +33,19 @@ void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factor
factory.registerOutputFormatProcessor("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStringsEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -10,10 +10,10 @@ namespace DB
JSONRowOutputFormat::JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool yield_strings_)
: IRowOutputFormat(header, out_, callback), settings(settings_), yield_strings(yield_strings_)
: IRowOutputFormat(header, out_, params), settings(settings_), yield_strings(yield_strings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -271,19 +271,19 @@ void registerOutputFormatProcessorJSON(FormatFactory & factory)
factory.registerOutputFormatProcessor("JSON", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings, false);
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, false);
});
factory.registerOutputFormatProcessor("JSONStrings", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings)
{
return std::make_shared<JSONRowOutputFormat>(buf, sample, callback, format_settings, true);
return std::make_shared<JSONRowOutputFormat>(buf, sample, params, format_settings, true);
});
}
......
......@@ -19,7 +19,7 @@ public:
JSONRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings_,
bool yield_strings_);
......
......@@ -5,8 +5,8 @@
namespace DB
{
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_) {}
MarkdownRowOutputFormat::MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params), format_settings(format_settings_) {}
void MarkdownRowOutputFormat::writePrefix()
{
......@@ -60,10 +60,10 @@ void registerOutputFormatProcessorMarkdown(FormatFactory & factory)
factory.registerOutputFormatProcessor("Markdown", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<MarkdownRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -12,7 +12,7 @@ class ReadBuffer;
class MarkdownRowOutputFormat : public IRowOutputFormat
{
public:
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
MarkdownRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_);
/// Write higher part of markdown table like this:
/// |columnName1|columnName2|...|columnNameN|
......
......@@ -24,8 +24,8 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback)
: IRowOutputFormat(header_, out_, callback), packer(out_) {}
MsgPackRowOutputFormat::MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params)
: IRowOutputFormat(header_, out_, params), packer(out_) {}
void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr data_type, size_t row_num)
{
......@@ -154,10 +154,10 @@ void registerOutputFormatProcessorMsgPack(FormatFactory & factory)
factory.registerOutputFormatProcessor("MsgPack", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, callback);
return std::make_shared<MsgPackRowOutputFormat>(buf, sample, params);
});
}
......
......@@ -20,7 +20,7 @@ namespace DB
class MsgPackRowOutputFormat : public IRowOutputFormat
{
public:
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback);
MsgPackRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params);
String getName() const override { return "MsgPackRowOutputFormat"; }
......
......@@ -95,7 +95,7 @@ void registerOutputFormatProcessorMySQLWire(FormatFactory & factory)
"MySQLWire",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<MySQLOutputFormat>(buf, sample, settings); });
}
......
......@@ -168,7 +168,7 @@ void registerOutputFormatProcessorNative(FormatFactory & factory)
factory.registerOutputFormatProcessor("Native", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NativeOutputFormatFromNativeBlockOutputStream>(sample, buf);
......
......@@ -21,7 +21,7 @@ void registerOutputFormatProcessorNull(FormatFactory & factory)
factory.registerOutputFormatProcessor("Null", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings &)
{
return std::make_shared<NullOutputFormat>(sample, buf);
......
......@@ -107,7 +107,7 @@ void ODBCDriver2BlockOutputFormat::writePrefix()
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, FormatFactory::WriteCallback, const FormatSettings & format_settings)
"ODBCDriver2", [](WriteBuffer & buf, const Block & sample, const RowOutputFormatParams &, const FormatSettings & format_settings)
{
return std::make_shared<ODBCDriver2BlockOutputFormat>(buf, sample, format_settings);
});
......
......@@ -433,7 +433,7 @@ void registerOutputFormatProcessorORC(FormatFactory & factory)
factory.registerOutputFormatProcessor("ORC", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<ORCBlockOutputFormat>(buf, sample, format_settings);
......
......@@ -84,7 +84,7 @@ void registerOutputFormatProcessorParquet(FormatFactory & factory)
"Parquet",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
auto impl = std::make_shared<ParquetBlockOutputFormat>(buf, sample, format_settings);
......
......@@ -73,7 +73,7 @@ void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory)
"PostgreSQLWire",
[](WriteBuffer & buf,
const Block & sample,
const FormatFactory::WriteCallback &,
const RowOutputFormatParams &,
const FormatSettings & settings) { return std::make_shared<PostgreSQLOutputFormat>(buf, sample, settings); });
}
}
......@@ -403,7 +403,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
factory.registerOutputFormatProcessor("Pretty", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyBlockOutputFormat>(buf, sample, format_settings);
......@@ -412,7 +412,7 @@ void registerOutputFormatProcessorPretty(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettyNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
......
......@@ -259,7 +259,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [mono_block = mono_block](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettyCompactBlockOutputFormat>(buf, sample, format_settings, mono_block);
......@@ -269,7 +269,7 @@ void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettyCompactNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
......
......@@ -113,7 +113,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettySpace", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
return std::make_shared<PrettySpaceBlockOutputFormat>(buf, sample, format_settings);
......@@ -122,7 +122,7 @@ void registerOutputFormatProcessorPrettySpace(FormatFactory & factory)
factory.registerOutputFormatProcessor("PrettySpaceNoEscapes", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & format_settings)
{
FormatSettings changed_settings = format_settings;
......
......@@ -20,10 +20,10 @@ namespace ErrorCodes
ProtobufRowOutputFormat::ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSchemaInfo & format_schema,
const bool single_message_mode_)
: IRowOutputFormat(header, out_, callback)
: IRowOutputFormat(header, out_, params)
, data_types(header.getDataTypes())
, writer(out, ProtobufSchemas::instance().getMessageTypeForFormatSchema(format_schema), header.getNames(), single_message_mode_)
{
......@@ -46,15 +46,14 @@ void registerOutputFormatProcessorProtobuf(FormatFactory & factory)
{
for (bool single_message_mode : {false, true})
{
factory.registerOutputFormatProcessor(
single_message_mode ? "ProtobufSingle" : "Protobuf",
[single_message_mode](WriteBuffer & buf,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ProtobufRowOutputFormat>(buf, header, std::move(callback),
return std::make_shared<ProtobufRowOutputFormat>(buf, header, params,
FormatSchemaInfo(settings.schema.format_schema, "Protobuf", true,
settings.schema.is_server, settings.schema.format_schema_path),
single_message_mode);
......
......@@ -39,7 +39,7 @@ public:
ProtobufRowOutputFormat(
WriteBuffer & out_,
const Block & header,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSchemaInfo & format_schema,
const bool single_message_mode_);
......
......@@ -7,8 +7,8 @@
namespace DB
{
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, callback, format_settings_)
TSKVRowOutputFormat::TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params, const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header, false, false, params, format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -44,10 +44,10 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory)
factory.registerOutputFormatProcessor("TSKV", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TSKVRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<TSKVRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -14,7 +14,7 @@ namespace DB
class TSKVRowOutputFormat: public TabSeparatedRowOutputFormat
{
public:
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & format_settings);
TSKVRowOutputFormat(WriteBuffer & out_, const Block & header, const RowOutputFormatParams & params, const FormatSettings & format_settings);
String getName() const override { return "TSKVRowOutputFormat"; }
......
......@@ -18,9 +18,9 @@ public:
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, callback, format_settings_)
: TabSeparatedRowOutputFormat(out_, header_, with_names_, with_types_, params, format_settings_)
{
}
......
......@@ -11,9 +11,9 @@ TabSeparatedRowOutputFormat::TabSeparatedRowOutputFormat(
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
: IRowOutputFormat(header_, out_, params), with_names(with_names_), with_types(with_types_), format_settings(format_settings_)
{
}
......@@ -80,10 +80,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, callback, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, false, false, params, settings);
});
}
......@@ -92,10 +92,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, callback, settings);
return std::make_shared<TabSeparatedRawRowOutputFormat>(buf, sample, false, false, params, settings);
});
}
......@@ -104,10 +104,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, callback, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, false, params, settings);
});
}
......@@ -116,10 +116,10 @@ void registerOutputFormatProcessorTabSeparated(FormatFactory & factory)
factory.registerOutputFormatProcessor(name, [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, callback, settings);
return std::make_shared<TabSeparatedRowOutputFormat>(buf, sample, true, true, params, settings);
});
}
}
......
......@@ -23,7 +23,7 @@ public:
const Block & header_,
bool with_names_,
bool with_types_,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & format_settings_);
String getName() const override { return "TabSeparatedRowOutputFormat"; }
......
......@@ -232,7 +232,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory & factory)
factory.registerOutputFormatProcessor("Template", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format;
......@@ -270,7 +270,7 @@ void registerOutputFormatProcessorTemplate(FormatFactory & factory)
factory.registerOutputFormatProcessor("CustomSeparated", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const RowOutputFormatParams &,
const FormatSettings & settings)
{
ParsedTemplateFormatString resultset_format = ParsedTemplateFormatString::setupCustomSeparatedResultsetFormat(settings.custom);
......
......@@ -10,8 +10,8 @@ namespace DB
{
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
ValuesRowOutputFormat::ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params), format_settings(format_settings_)
{
}
......@@ -46,10 +46,10 @@ void registerOutputFormatProcessorValues(FormatFactory & factory)
factory.registerOutputFormatProcessor("Values", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<ValuesRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<ValuesRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -15,7 +15,7 @@ class WriteBuffer;
class ValuesRowOutputFormat : public IRowOutputFormat
{
public:
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
ValuesRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_);
String getName() const override { return "ValuesRowOutputFormat"; }
......
......@@ -11,8 +11,8 @@ namespace DB
{
VerticalRowOutputFormat::VerticalRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
size_t columns = sample.columns();
......@@ -168,10 +168,10 @@ void registerOutputFormatProcessorVertical(FormatFactory & factory)
factory.registerOutputFormatProcessor("Vertical", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<VerticalRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<VerticalRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -18,7 +18,7 @@ class Context;
class VerticalRowOutputFormat : public IRowOutputFormat
{
public:
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
VerticalRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_);
String getName() const override { return "VerticalRowOutputFormat"; }
......
......@@ -7,8 +7,8 @@
namespace DB
{
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, callback), format_settings(format_settings_)
XMLRowOutputFormat::XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_)
: IRowOutputFormat(header_, out_, params), format_settings(format_settings_)
{
const auto & sample = getPort(PortKind::Main).getHeader();
NamesAndTypesList columns(sample.getNamesAndTypesList());
......@@ -245,10 +245,10 @@ void registerOutputFormatProcessorXML(FormatFactory & factory)
factory.registerOutputFormatProcessor("XML", [](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const RowOutputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<XMLRowOutputFormat>(buf, sample, callback, settings);
return std::make_shared<XMLRowOutputFormat>(buf, sample, params, settings);
});
}
......
......@@ -16,7 +16,7 @@ namespace DB
class XMLRowOutputFormat : public IRowOutputFormat
{
public:
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & format_settings_);
XMLRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params, const FormatSettings & format_settings_);
String getName() const override { return "XMLRowOutputFormat"; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册