提交 23e2d17d 编写于 作者: N Nikolai Kochetov

Added JSONEachRowWithProgressRowOutputFormat.

上级 2ae3db79
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Formats/JSONEachRowWithProgressRowOutputStream.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockOutputStreamFromRowOutputStream.h>
namespace DB
{
void JSONEachRowWithProgressRowOutputStream::writeRowStartDelimiter()
{
writeCString("{\"row\":{", ostr);
}
void JSONEachRowWithProgressRowOutputStream::writeRowEndDelimiter()
{
writeCString("}}\n", ostr);
field_number = 0;
}
void JSONEachRowWithProgressRowOutputStream::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
writeCString("{\"progress\":", ostr);
progress.writeJSON(ostr);
writeCString("}\n", ostr);
}
void registerOutputFormatJSONEachRowWithProgress(FormatFactory & factory)
{
factory.registerOutputFormat("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<JSONEachRowWithProgressRowOutputStream>(buf, sample, format_settings), sample);
});
}
}
...@@ -29,8 +29,9 @@ protected: ...@@ -29,8 +29,9 @@ protected:
void consumeTotals(Chunk) override {} void consumeTotals(Chunk) override {}
void consumeExtremes(Chunk) override {} void consumeExtremes(Chunk) override {}
private:
size_t field_number = 0; size_t field_number = 0;
private:
Names fields; Names fields;
FormatSettings settings; FormatSettings settings;
......
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferValidUTF8.h>
#include <Processors/Formats/Impl/JSONEachRowWithProgressRowOutputFormat.h>
#include <Formats/FormatFactory.h>
namespace DB
{
void JSONEachRowWithProgressRowOutputFormat::writeRowStartDelimiter()
{
writeCString("{\"row\":{", out);
}
void JSONEachRowWithProgressRowOutputFormat::writeRowEndDelimiter()
{
writeCString("}}\n", out);
field_number = 0;
}
void JSONEachRowWithProgressRowOutputFormat::onProgress(const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
writeCString("{\"progress\":", out);
progress.writeJSON(out);
writeCString("}\n", out);
}
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("JSONEachRowWithProgress", [](
WriteBuffer & buf,
const Block & sample,
const Context &,
const FormatSettings & format_settings)
{
return std::make_shared<JSONEachRowWithProgressRowOutputFormat>(buf, sample, format_settings);
});
}
}
#pragma once #pragma once
#include <Processors/Formats/Impl/JSONEachRowRowOutputFormat.h>
#include <IO/Progress.h>
#include <Formats/JSONEachRowRowOutputStream.h>
namespace DB namespace DB
{ {
/** The stream for outputting data in JSON format, by object per line class JSONEachRowWithProgressRowOutputFormat : public JSONEachRowRowOutputFormat
* that includes progress rows. Does not validate UTF-8.
*/
class JSONEachRowWithProgressRowOutputStream : public JSONEachRowRowOutputStream
{ {
public: public:
using JSONEachRowRowOutputStream::JSONEachRowRowOutputStream; using JSONEachRowRowOutputFormat::JSONEachRowRowOutputFormat;
void writeRowStartDelimiter() override; void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override; void writeRowEndDelimiter() override;
...@@ -24,4 +18,3 @@ private: ...@@ -24,4 +18,3 @@ private:
}; };
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册