提交 86fa185b 编写于 作者: H hcz

Add JSONStrings formats

上级 60253539
......@@ -352,6 +352,8 @@ void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONStringsEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONStringsEachRow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
......@@ -378,6 +380,7 @@ void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorJSONStrings(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
......@@ -418,6 +421,8 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorJSONEachRow(*this);
registerInputFormatProcessorJSONCompactEachRow(*this);
registerOutputFormatProcessorJSONCompactEachRow(*this);
registerInputFormatProcessorJSONStringsEachRow(*this);
registerOutputFormatProcessorJSONStringsEachRow(*this);
registerInputFormatProcessorProtobuf(*this);
registerOutputFormatProcessorProtobuf(*this);
registerInputFormatProcessorTemplate(*this);
......@@ -444,6 +449,7 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorJSON(*this);
registerOutputFormatProcessorJSONCompact(*this);
registerOutputFormatProcessorJSONEachRowWithProgress(*this);
registerOutputFormatProcessorJSONStrings(*this);
registerOutputFormatProcessorXML(*this);
registerOutputFormatProcessorODBCDriver2(*this);
registerOutputFormatProcessorNull(*this);
......
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <Processors/Formats/Impl/JSONStringsEachRowRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
}
/// Remembers, for every column of the header block, its data type and
/// the mapping from the column name to the column position.
JSONStringsEachRowRowInputFormat::JSONStringsEachRowRowInputFormat(ReadBuffer & in_,
    const Block & header_,
    Params params_,
    const FormatSettings & format_settings_,
    bool with_names_)
    : IRowInputFormat(header_, in_, std::move(params_)), format_settings(format_settings_), with_names(with_names_)
{
    const auto & header = getPort().getHeader();
    const size_t columns_count = header.columns();

    data_types.resize(columns_count);
    column_indexes_by_names.reserve(columns_count);

    for (size_t position = 0; position != columns_count; ++position)
    {
        const auto & column = header.getByPosition(position);
        data_types[position] = column.type;
        column_indexes_by_names.emplace(column.name, position);
    }
}
void JSONStringsEachRowRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
column_indexes_for_input_fields.clear();
not_seen_columns.clear();
}
/// Prepares the mapping from input fields to columns of the header block.
///
/// With names (with_names == true) the stream is expected to start with two JSON arrays
/// of strings: the column names and then the type names. Every name is passed to
/// addInputColumn(); every type name is compared with the type of the matched column and
/// an INCORRECT_DATA exception is thrown on mismatch (types of skipped fields are not checked).
///
/// Without names the input fields are mapped one-to-one onto the header columns.
///
/// In both cases the header columns that are not going to be read are collected
/// into not_seen_columns.
void JSONStringsEachRowRowInputFormat::readPrefix()
{
    /// In this format, BOM at beginning of stream cannot be confused with value, so it is safe to skip it.
    skipBOMIfExists(in);

    const size_t num_columns = getPort().getHeader().columns();

    if (with_names)
    {
        read_columns.assign(num_columns, false);

        /// Row of column names.
        assertChar('[', in);
        do
        {
            skipWhitespaceIfAny(in);
            String column_name;
            readJSONString(column_name, in);
            addInputColumn(column_name);
            skipWhitespaceIfAny(in);
        }
        while (checkChar(',', in));
        assertChar(']', in);
        skipEndOfLine();

        /// Type checking
        assertChar('[', in);
        const size_t num_fields = column_indexes_for_input_fields.size();
        for (size_t i = 0; i < num_fields; ++i)
        {
            skipWhitespaceIfAny(in);
            String data_type;
            readJSONString(data_type, in);

            const auto & column_index = column_indexes_for_input_fields[i];
            if (column_index && data_types[*column_index]->getName() != data_type)
            {
                throw Exception(
                    "Type of '" + getPort().getHeader().getByPosition(*column_index).name
                    + "' must be " + data_types[*column_index]->getName() +
                    ", not " + data_type,
                    ErrorCodes::INCORRECT_DATA
                );
            }

            /// Whitespace is allowed before the separator (and before the closing bracket),
            /// the same way as in the row of names and in the data rows.
            skipWhitespaceIfAny(in);
            if (i + 1 != num_fields)
                assertChar(',', in);
        }
        assertChar(']', in);
    }
    else
    {
        read_columns.assign(num_columns, true);
        column_indexes_for_input_fields.resize(num_columns);
        for (size_t i = 0; i < num_columns; ++i)
        {
            column_indexes_for_input_fields[i] = i;
        }
    }

    for (size_t i = 0; i < read_columns.size(); ++i)
    {
        if (!read_columns[i])
        {
            not_seen_columns.emplace_back(i);
        }
    }
}
/// Registers the next field of the input header.
/// A known column is marked as read and its position is remembered; seeing it twice is an error.
/// An unknown column is either remembered as "to be skipped" (when skip_unknown_fields is set)
/// or reported with an INCORRECT_DATA exception.
void JSONStringsEachRowRowInputFormat::addInputColumn(const String & column_name)
{
    names_of_columns.emplace_back(column_name);

    const auto found = column_indexes_by_names.find(column_name);
    if (found != column_indexes_by_names.end())
    {
        const auto position = found->second;
        if (read_columns[position])
            throw Exception("Duplicate field found while parsing JSONStringsEachRow header: " + column_name, ErrorCodes::INCORRECT_DATA);

        read_columns[position] = true;
        column_indexes_for_input_fields.emplace_back(position);
        return;
    }

    if (!format_settings.skip_unknown_fields)
    {
        throw Exception(
            "Unknown field found in JSONStringsEachRow header: '" + column_name + "' " +
            "at position " + std::to_string(column_indexes_for_input_fields.size()) +
            "\nSet the 'input_format_skip_unknown_fields' parameter explicitly to ignore and proceed",
            ErrorCodes::INCORRECT_DATA
        );
    }

    column_indexes_for_input_fields.push_back(std::nullopt);
}
/// Reads one row: a JSON array with one element per input field.
/// Fields mapped to a column are parsed by readField(), the others are skipped.
/// Columns that are absent in the input get a default value.
/// Returns false when there is no more data.
bool JSONStringsEachRowRowInputFormat::readRow(DB::MutableColumns & columns, DB::RowReadExtension & ext)
{
    skipEndOfLine();

    if (in.eof())
        return false;

    read_columns.assign(columns.size(), false);

    assertChar('[', in);

    const size_t fields_count = column_indexes_for_input_fields.size();
    for (size_t field = 0; field < fields_count; ++field)
    {
        if (const auto & target_column = column_indexes_for_input_fields[field])
            readField(*target_column, columns);
        else
            skipJSONField(in, StringRef(names_of_columns[field]));

        skipWhitespaceIfAny(in);
        if (in.eof())
            throw Exception("Unexpected end of stream while parsing JSONStringsEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA);

        const bool is_last_field = (field + 1 == fields_count);
        if (!is_last_field)
        {
            assertChar(',', in);
            skipWhitespaceIfAny(in);
        }
    }

    assertChar(']', in);

    for (const auto & position : not_seen_columns)
        columns[position]->insertDefault();

    ext.read_columns = read_columns;
    return true;
}
/// Skips whitespace between rows, together with at most one ',' or ';' separator.
void JSONStringsEachRowRowInputFormat::skipEndOfLine()
{
    skipWhitespaceIfAny(in);

    if (!in.eof())
    {
        const char next = *in.position();
        if (next == ',' || next == ';')
            ++in.position();
    }

    skipWhitespaceIfAny(in);
}
void JSONStringsEachRowRowInputFormat::readField(size_t index, MutableColumns & columns)
{
try
{
read_columns[index] = true;
const auto & type = data_types[index];
String str;
readJSONString(str, in);
ReadBufferFromString buf(str);
type->deserializeAsWholeText(*columns[index], buf, format_settings);
}
catch (Exception & e)
{
e.addMessage("(while read the value of key " + getPort().getHeader().getByPosition(index).name + ")");
throw;
}
}
/// Recovery after a parse error: delegates to skipToUnescapedNextLineOrEOF()
/// (judging by its name, it moves the input to the next unescaped line break or to the end).
void JSONStringsEachRowRowInputFormat::syncAfterError()
{
    skipToUnescapedNextLineOrEOF(in);
}
/// Registers two input formats backed by JSONStringsEachRowRowInputFormat:
/// "JSONStringsEachRow" (no header) and "JSONStringsEachRowWithNamesAndTypes" (with header).
void registerInputFormatProcessorJSONStringsEachRow(FormatFactory & factory)
{
    const auto register_variant = [&factory](const char * format_name, bool with_names)
    {
        factory.registerInputFormatProcessor(format_name, [with_names](
            ReadBuffer & buf,
            const Block & sample,
            IRowInputFormat::Params params,
            const FormatSettings & settings)
        {
            return std::make_shared<JSONStringsEachRowRowInputFormat>(buf, sample, std::move(params), settings, with_names);
        });
    };

    register_variant("JSONStringsEachRow", false);
    register_variant("JSONStringsEachRowWithNamesAndTypes", true);
}
}
#pragma once
#pragma once
#include <Core/Block.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatSettings.h>
#include <Common/HashTable/HashMap.h>
namespace DB
{
class ReadBuffer;
/** A stream for reading data in JSONStringsEachRow and JSONStringsEachRowWithNamesAndTypes formats.
* Every row is a JSON array; every value in it is a JSON string holding the text representation of the value.
* In the "WithNamesAndTypes" variant (with_names is true) the data is preceded by two such arrays:
* the names of the columns and the names of their types.
*/
class JSONStringsEachRowRowInputFormat : public IRowInputFormat
{
public:
JSONStringsEachRowRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_, bool with_names_);
String getName() const override { return "JSONStringsEachRowRowInputFormat"; }
/// Skips BOM, reads the header rows (if any) and builds the mapping of input fields to columns.
void readPrefix() override;
/// Reads one row; returns false at the end of the input.
bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; }
void syncAfterError() override;
void resetParser() override;
private:
/// Registers one field name found in the header row of names.
void addInputColumn(const String & column_name);
/// Skips whitespace and an optional ',' or ';' between rows.
void skipEndOfLine();
/// Reads one JSON string and parses it into the column at position `index`.
void readField(size_t index, MutableColumns & columns);
/// Own copy of the settings passed to the constructor.
const FormatSettings format_settings;
using IndexesMap = std::unordered_map<String, size_t>;
/// Column name -> position of the column in the header block.
IndexesMap column_indexes_by_names;
using OptionalIndexes = std::vector<std::optional<size_t>>;
/// For every field of the input: position of the target column, or nullopt if the field is skipped.
OptionalIndexes column_indexes_for_input_fields;
/// Data types of the header block columns, by position.
DataTypes data_types;
/// Flags by column position: whether the column is read from the input. Passed to RowReadExtension in readRow().
std::vector<UInt8> read_columns;
/// Positions of the columns that are absent in the input; readRow() inserts default values into them.
std::vector<size_t> not_seen_columns;
/// Names of the input fields in the order of the input header.
/// Passed to skipJSONField() when an unknown field is skipped, so that exceptions mention the right name.
std::vector<String> names_of_columns;
/// Whether the data is preceded by the rows of names and types.
bool with_names;
};
}
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONStringsEachRowRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
/// Stores the names and types of the header columns; writePrefix() needs them for the header rows.
JSONStringsEachRowRowOutputFormat::JSONStringsEachRowRowOutputFormat(WriteBuffer & out_,
    const Block & header_,
    FormatFactory::WriteCallback callback,
    const FormatSettings & settings_,
    bool with_names_)
    : IRowOutputFormat(header_, out_, callback), settings(settings_), with_names(with_names_)
{
    const NamesAndTypesList names_and_types = getPort(PortKind::Main).getHeader().getNamesAndTypesList();
    fields.assign(names_and_types.begin(), names_and_types.end());
}
void JSONStringsEachRowRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), out, settings);
}
/// Values inside a row are separated by a comma followed by a space.
void JSONStringsEachRowRowOutputFormat::writeFieldDelimiter()
{
    static constexpr auto delimiter = ", ";
    writeCString(delimiter, out);
}
/// Every row is a JSON array: open it.
void JSONStringsEachRowRowOutputFormat::writeRowStartDelimiter()
{
    static constexpr char opening_bracket = '[';
    writeChar(opening_bracket, out);
}
/// Close the array of the row and finish the line.
void JSONStringsEachRowRowOutputFormat::writeRowEndDelimiter()
{
    static constexpr auto row_end = "]\n";
    writeCString(row_end, out);
}
/// Writes the totals as one more row, separated from the data by an empty line.
void JSONStringsEachRowRowOutputFormat::writeTotals(const Columns & columns, size_t row_num)
{
    writeChar('\n', out);
    writeChar('[', out);

    const size_t columns_count = columns.size();
    for (size_t position = 0; position < columns_count; ++position)
    {
        /// The delimiter goes before every value except the first one.
        if (position > 0)
            JSONStringsEachRowRowOutputFormat::writeFieldDelimiter();

        JSONStringsEachRowRowOutputFormat::writeField(*columns[position], *types[position], row_num);
    }

    writeCString("]\n", out);
}
void JSONStringsEachRowRowOutputFormat::writePrefix()
{
if (with_names)
{
writeChar('[', out);
for (size_t i = 0; i < fields.size(); ++i)
{
writeChar('\"', out);
writeString(fields[i].name, out);
writeChar('\"', out);
if (i != fields.size() - 1)
writeCString(", ", out);
}
writeCString("]\n[", out);
for (size_t i = 0; i < fields.size(); ++i)
{
writeJSONString(fields[i].type->getName(), out, settings);
if (i != fields.size() - 1)
writeCString(", ", out);
}
writeCString("]\n", out);
}
}
/// Totals are passed to the base class only in the variant with names and types;
/// otherwise the chunk is dropped.
void JSONStringsEachRowRowOutputFormat::consumeTotals(DB::Chunk chunk)
{
    if (!with_names)
        return;

    IRowOutputFormat::consumeTotals(std::move(chunk));
}
/// Registers two output formats backed by JSONStringsEachRowRowOutputFormat:
/// "JSONStringsEachRow" (no header) and "JSONStringsEachRowWithNamesAndTypes" (with header).
void registerOutputFormatProcessorJSONStringsEachRow(FormatFactory & factory)
{
    const auto register_variant = [&factory](const char * format_name, bool with_names)
    {
        factory.registerOutputFormatProcessor(format_name, [with_names](
            WriteBuffer & buf,
            const Block & sample,
            FormatFactory::WriteCallback callback,
            const FormatSettings & format_settings)
        {
            return std::make_shared<JSONStringsEachRowRowOutputFormat>(buf, sample, callback, format_settings, with_names);
        });
    };

    register_variant("JSONStringsEachRow", false);
    register_variant("JSONStringsEachRowWithNamesAndTypes", true);
}
}
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Formats/FormatSettings.h>
namespace DB
{
/** The stream for outputting data in JSONStringsEachRow and JSONStringsEachRowWithNamesAndTypes formats:
* one JSON array per line, every value is written as a JSON string with the text representation of the value.
* In the "WithNamesAndTypes" variant two header lines (column names and type names) are written first.
* Does not validate UTF-8.
*/
class JSONStringsEachRowRowOutputFormat : public IRowOutputFormat
{
public:
JSONStringsEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_, bool with_names);
String getName() const override { return "JSONStringsEachRowRowOutputFormat"; }
/// Writes the rows of names and types if with_names is set.
void writePrefix() override;
void writeBeforeTotals() override {}
/// Writes the totals as a separate row after an empty line.
void writeTotals(const Columns & columns, size_t row_num) override;
void writeAfterTotals() override {}
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
protected:
/// Totals are forwarded to the base class only when with_names is set.
void consumeTotals(Chunk) override;
/// No extremes.
void consumeExtremes(Chunk) override {}
private:
/// Own copy of the settings passed to the constructor.
FormatSettings settings;
/// Names and types of the header columns, used by writePrefix().
NamesAndTypes fields;
/// Whether the rows of names and types are written before the data.
bool with_names;
};
}
#include <Processors/Formats/Impl/JSONStringsRowOutputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteHelpers.h>
namespace DB
{
/// All the state lives in JSONRowOutputFormat; the arguments are forwarded to it as is.
JSONStringsRowOutputFormat::JSONStringsRowOutputFormat(
    WriteBuffer & out_,
    const Block & header,
    FormatFactory::WriteCallback callback,
    const FormatSettings & settings_)
    : JSONRowOutputFormat(out_, header, callback, settings_)
{
}
void JSONStringsRowOutputFormat::writeField(const IColumn & column, const IDataType & type, size_t row_num)
{
WriteBufferFromOwnString buf;
type.serializeAsText(column, row_num, buf, settings);
writeJSONString(buf.str(), *ostr, settings);
++field_number;
}
/// Values inside a data row are separated by a comma followed by a space.
void JSONStringsRowOutputFormat::writeFieldDelimiter()
{
    static constexpr auto delimiter = ", ";
    writeCString(delimiter, *ostr);
}
/// Values of totals (and of extremes, see writeExtremesElement) are separated by a bare comma.
void JSONStringsRowOutputFormat::writeTotalsFieldDelimiter()
{
    static constexpr auto delimiter = ",";
    writeCString(delimiter, *ostr);
}
/// Every data row is an array indented with two tabs.
void JSONStringsRowOutputFormat::writeRowStartDelimiter()
{
    static constexpr auto row_start = "\t\t[";
    writeCString(row_start, *ostr);
}
/// Closes the array of the row, then updates the counters:
/// one more row is written and the next field is the first field of a row again.
void JSONStringsRowOutputFormat::writeRowEndDelimiter()
{
    writeChar(']', *ostr);

    ++row_count;
    field_number = 0;
}
/// Finishes the previous element with a comma, leaves an empty line
/// and opens the array of the "totals" key.
void JSONStringsRowOutputFormat::writeBeforeTotals()
{
    auto & dest = *ostr;

    writeCString(",\n", dest);
    writeChar('\n', dest);
    writeCString("\t\"totals\": [", dest);
}
/// Closes the array opened by writeBeforeTotals().
void JSONStringsRowOutputFormat::writeAfterTotals()
{
    static constexpr char closing_bracket = ']';
    writeChar(closing_bracket, *ostr);
}
void JSONStringsRowOutputFormat::writeExtremesElement(const char * title, const Columns & columns, size_t row_num)
{
writeCString("\t\t\"", *ostr);
writeCString(title, *ostr);
writeCString("\": [", *ostr);
size_t extremes_columns = columns.size();
for (size_t i = 0; i < extremes_columns; ++i)
{
if (i != 0)
writeTotalsFieldDelimiter();
writeField(*columns[i], *types[i], row_num);
}
writeChar(']', *ostr);
}
/// Registers the "JSONStrings" output format, backed by JSONStringsRowOutputFormat.
void registerOutputFormatProcessorJSONStrings(FormatFactory & factory)
{
    auto creator = [](
        WriteBuffer & buf,
        const Block & sample,
        FormatFactory::WriteCallback callback,
        const FormatSettings & format_settings)
    {
        return std::make_shared<JSONStringsRowOutputFormat>(buf, sample, callback, format_settings);
    };

    factory.registerOutputFormatProcessor("JSONStrings", creator);
}
}
#pragma once
#include <Core/Block.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONRowOutputFormat.h>
namespace DB
{
struct FormatSettings;
/** The stream for outputting data in the JSONStrings format.
* Built on top of JSONRowOutputFormat; differs from it in how rows and values are written:
* every row (as well as totals and every element of extremes) is an array,
* and every value is a JSON string with the text representation of the value.
*/
class JSONStringsRowOutputFormat : public JSONRowOutputFormat
{
public:
JSONStringsRowOutputFormat(WriteBuffer & out_, const Block & header, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
String getName() const override { return "JSONStringsRowOutputFormat"; }
/// Writes the value as a JSON string and increments the field counter.
void writeField(const IColumn & column, const IDataType & type, size_t row_num) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
/// Closes the row array, resets the field counter and increments the row counter.
void writeRowEndDelimiter() override;
void writeBeforeTotals() override;
void writeAfterTotals() override;
protected:
void writeExtremesElement(const char * title, const Columns & columns, size_t row_num) override;
/// Totals are written exactly like ordinary values.
void writeTotalsField(const IColumn & column, const IDataType & type, size_t row_num) override
{
return writeField(column, type, row_num);
}
void writeTotalsFieldDelimiter() override;
};
}
......@@ -31,6 +31,9 @@ SRCS(
Formats/Impl/JSONEachRowRowOutputFormat.cpp
Formats/Impl/JSONEachRowWithProgressRowOutputFormat.cpp
Formats/Impl/JSONRowOutputFormat.cpp
Formats/Impl/JSONStringsEachRowRowInputFormat.cpp
Formats/Impl/JSONStringsEachRowRowOutputFormat.cpp
Formats/Impl/JSONStringsRowOutputFormat.cpp
Formats/Impl/MarkdownRowOutputFormat.cpp
Formats/Impl/MsgPackRowInputFormat.cpp
Formats/Impl/MsgPackRowOutputFormat.cpp
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册