提交 ba2efbae 编写于 作者: V Vitaly Baranov

Make the interface IRowOutputStream more generic and use it for writing protobufs.

上级 d448d3e9
......@@ -12,26 +12,12 @@ BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOu
/** Write every row of `block` through the wrapped row output.
  *
  * A between-rows delimiter is written before each row except the first one;
  * `first_row` is state kept outside this function, so the rule also holds
  * across consecutive calls.
  *
  * The row itself (start delimiter, fields, end delimiter) is produced by
  * row_output->write(block, i). Emitting the delimiters and fields here as
  * well would serialize each row twice, so this function only delegates.
  */
void BlockOutputStreamFromRowOutputStream::write(const Block & block)
{
    size_t rows = block.rows();
    for (size_t i = 0; i < rows; ++i)
    {
        if (!first_row)
            row_output->writeRowBetweenDelimiter();
        first_row = false;

        /// The row output decides how a single row is laid out.
        row_output->write(block, i);
    }
}
......
#include <Common/Exception.h>
#include <Core/Block.h>
#include <Formats/IRowOutputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
void IRowOutputStream::write(const Block & block, size_t row_num)
{
......@@ -23,4 +29,9 @@ void IRowOutputStream::write(const Block & block, size_t row_num)
writeRowEndDelimiter();
}
/** Default implementation of writeField.
  * All three arguments are ignored; the call always throws an Exception
  * carrying ErrorCodes::NOT_IMPLEMENTED.
  */
void IRowOutputStream::writeField(const IColumn & /*column*/, const IDataType & /*type*/, size_t /*row_num*/)
{
    const int error_code = ErrorCodes::NOT_IMPLEMENTED;
    throw Exception("Method writeField is not implemented for output format", error_code);
}
}
......@@ -28,7 +28,7 @@ public:
/** Write one row; `row_num` selects the row inside `block`. */
virtual void write(const Block & block, size_t row_num);
/** Write single value.
  * Not pure virtual: the out-of-class default throws NOT_IMPLEMENTED.
  * Declared exactly once - redeclaring the same member inside the class
  * (once with `= 0`, once without) is ill-formed.
  */
virtual void writeField(const IColumn & column, const IDataType & type, size_t row_num);
/** Write delimiter. The default implementation is empty and writes nothing. */
virtual void writeFieldDelimiter() {} /// delimiter between values
......
......@@ -3,9 +3,10 @@
#include <Common/config.h>
#if USE_PROTOBUF
#include "ProtobufBlockOutputStream.h"
#include "ProtobufRowOutputStream.h"
#include <Core/Block.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/ProtobufSchemas.h>
#include <Interpreters/Context.h>
......@@ -21,27 +22,25 @@ namespace ErrorCodes
}
/// NOTE(review): this span looks like a rendered diff in which removed and added lines are
/// interleaved (ProtobufBlockOutputStream -> ProtobufRowOutputStream). It is not compilable as
/// shown, and the hunk marker further down hides at least one source line. The code is left
/// untouched on purpose - confirm against the real file before editing.
///
/// Visible intent of the constructor: walk writer.fieldsInWriteOrder() and, for every protobuf
/// field, look up a column with the field's name in `header`, remembering its position and its
/// data type. A field with no matching column keeps the placeholder index static_cast<size_t>(-1)
/// and a null data type; when such a field is required, the branch carrying the
/// NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD message is taken instead.
ProtobufBlockOutputStream::ProtobufBlockOutputStream(
ProtobufRowOutputStream::ProtobufRowOutputStream(
WriteBuffer & buffer_,
const Block & header_,
const google::protobuf::Descriptor * message_type,
const FormatSettings & format_settings_)
: writer(buffer_, message_type), header(header_), format_settings(format_settings_)
{
}
void ProtobufBlockOutputStream::write(const Block & block)
const Block & header,
const google::protobuf::Descriptor * message_type)
: writer(buffer_, message_type)
{
std::vector<const ColumnWithTypeAndName *> columns_in_write_order;
const auto & fields_in_write_order = writer.fieldsInWriteOrder();
columns_in_write_order.reserve(fields_in_write_order.size());
column_indices.reserve(fields_in_write_order.size());
data_types.reserve(fields_in_write_order.size());
for (size_t i = 0; i != fields_in_write_order.size(); ++i)
{
const auto * field = fields_in_write_order[i];
const ColumnWithTypeAndName * column = nullptr;
if (block.has(field->name()))
size_t column_index = static_cast<size_t>(-1);
DataTypePtr data_type = nullptr;
if (header.has(field->name()))
{
column = &block.getByName(field->name());
column_index = header.getPositionByName(field->name());
data_type = header.getByPosition(column_index).type;
}
else if (field->is_required())
{
......@@ -49,21 +48,19 @@ void ProtobufBlockOutputStream::write(const Block & block)
"Output doesn't have a column named '" + field->name() + "' which is required to write the output in the protobuf format.",
ErrorCodes::NO_DATA_FOR_REQUIRED_PROTOBUF_FIELD);
}
columns_in_write_order.emplace_back(column);
/// One entry per protobuf field, in the writer's field order.
column_indices.emplace_back(column_index);
data_types.emplace_back(data_type);
}
}
for (size_t row_num = 0; row_num != block.rows(); ++row_num)
/** Serialize row `row_num` of `block` as a single protobuf message.
  *
  * `data_types` and `column_indices` hold one entry per protobuf field, in the
  * writer's field order (they are filled in the constructor). A null data type
  * marks a field that has no source column: nothing is serialized for it, but
  * the writer is still advanced to the next field.
  *
  * The loop must rely only on these members: `columns_in_write_order` is a
  * local of the constructor and is not available here, and the message has to
  * be started exactly once per row, not once per field.
  */
void ProtobufRowOutputStream::write(const Block & block, size_t row_num)
{
    writer.newMessage();
    for (size_t i = 0; i != data_types.size(); ++i)
    {
        if (data_types[i])
            data_types[i]->serializeProtobuf(*block.getByPosition(column_indices[i]).column, row_num, writer);
        writer.nextField();
    }
}
......@@ -71,10 +68,11 @@ void ProtobufBlockOutputStream::write(const Block & block)
/** Register the "Protobuf" output format in `factory`.
  *
  * The creator resolves the protobuf message type from the format schema
  * (FormatSchemaInfo built from `context` with the "proto" argument) and
  * returns a BlockOutputStreamFromRowOutputStream wrapping a
  * ProtobufRowOutputStream. The FormatSettings argument is not used, hence
  * left unnamed.
  *
  * Exactly one lambda header and one return statement are kept; with both the
  * old and the new variants present the call expression is ill-formed.
  */
void registerOutputFormatProtobuf(FormatFactory & factory)
{
    factory.registerOutputFormat(
        "Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
        {
            const auto * message_type = ProtobufSchemas::instance().getMessageTypeForFormatSchema(FormatSchemaInfo(context, "proto"));
            return std::make_shared<BlockOutputStreamFromRowOutputStream>(
                std::make_shared<ProtobufRowOutputStream>(buf, header, message_type), header);
        });
}
......
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Formats/IRowOutputStream.h>
#include <Formats/FormatSettings.h>
#include <Formats/ProtobufWriter.h>
......@@ -25,23 +25,21 @@ namespace DB
* SELECT * from table FORMAT Protobuf SETTINGS format_schema = 'schema:Message'
* where schema is the name of "schema.proto" file specifying protobuf schema.
*/
class ProtobufBlockOutputStream : public IBlockOutputStream
class ProtobufRowOutputStream : public IRowOutputStream
{
public:
ProtobufBlockOutputStream(
ProtobufRowOutputStream(
WriteBuffer & buffer_,
const Block & header_,
const google::protobuf::Descriptor * message_prototype_,
const FormatSettings & format_settings_);
const google::protobuf::Descriptor * message_prototype_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void write(const Block & block, size_t row_num) override;
std::string getContentType() const override { return "application/octet-stream"; }
private:
ProtobufWriter writer;
const Block header;
const FormatSettings format_settings;
DataTypes data_types;
std::vector<size_t> column_indices;
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册