提交 6ecfc03c 编写于 作者: A Avogar

Change parsing msgpack data.

上级 3d9466e8
......@@ -24,21 +24,34 @@ namespace ErrorCodes
}
/// Constructs the MsgPack row input format.
/// Forwards the header, the input buffer and the params to IRowInputFormat and caches the
/// header's column types in `data_types`.
/// NOTE(review): two initializer lists follow the signature because this listing is a diff
/// rendering without +/- markers; the second one, which additionally initializes the member
/// `buf` from `in_`, appears to be the post-change version — confirm against the repository.
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), data_types(header_.getDataTypes()) {}
: IRowInputFormat(header_, in_, std::move(params_)), buf(in_), data_types(header_.getDataTypes()) {}
/// Parses the next MsgPack object from the input and stores it in `object_handle`.
/// Returns false when the input is already at EOF on entry, true once an object has been parsed.
/// Throws Exception with ErrorCodes::INCORRECT_DATA if the input ends in the middle of an object.
/// NOTE(review): this listing is a diff rendering without +/- markers, so the previous and the
/// new implementation are interleaved. The lines that use `in` and `unpacker` appear to be the
/// removed version and the lines that use `buf` the added one — confirm against the repository.
bool MsgPackRowInputFormat::readObject()
{
if (in.eof() && unpacker.nonparsed_size() == 0)
if (buf.eof())
return false;
while (!unpacker.next(object_handle))
// Remember where the object starts so the read position can be rolled back to it
// if the currently available bytes turn out to hold only part of the object.
PeekableReadBufferCheckpoint checkpoint{buf};
size_t offset;
bool need_more_data = true;
while (need_more_data)
{
if (in.eof())
throw Exception("Unexpected end of file while parsing MsgPack object.", ErrorCodes::INCORRECT_DATA);
unpacker.reserve_buffer(in.available());
memcpy(unpacker.buffer(), in.position(), in.available());
unpacker.buffer_consumed(in.available());
in.position() += in.available();
// Every attempt parses from the current position, so the offset starts from zero again.
offset = 0;
try
{
// Try to parse one object from the bytes between the read position and the end of the buffer.
// `offset` is passed by reference; presumably msgpack::unpack sets it to the number of
// bytes the object occupied — confirm against the msgpack-c documentation.
object_handle = msgpack::unpack(buf.position(), buf.buffer().end() - buf.position(), offset);
need_more_data = false;
}
catch (msgpack::insufficient_bytes &)
{
// The available bytes ended before the object did.
// Mark everything in the buffer as read before asking for EOF;
// presumably eof() then tries to load the next portion of data — TODO confirm.
buf.position() = buf.buffer().end();
if (buf.eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
// NOTE(review): the position is moved to the buffer end a second time, presumably because
// eof() may have replaced the working buffer — confirm this is intentional.
buf.position() = buf.buffer().end();
// Make the bytes from the checkpoint up to the position contiguous, then return to the
// start of the object so the next unpack attempt sees it from its first byte.
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
}
}
// Skip past the object that was just parsed.
buf.position() += offset;
return true;
}
......@@ -168,9 +181,9 @@ bool MsgPackRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
void registerInputFormatProcessorMsgPack(FormatFactory & factory)
{
factory.registerInputFormatProcessor("MsgPack", [](
ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &params,
ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<MsgPackRowInputFormat>(sample, buf, params);
......
......@@ -2,6 +2,7 @@
#include <Processors/Formats/IRowInputFormat.h>
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>
#include <msgpack.hpp>
namespace DB
......@@ -20,8 +21,8 @@ private:
bool readObject();
void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
PeekableReadBuffer buf;
DataTypes data_types;
msgpack::unpacker unpacker;
msgpack::object_handle object_handle;
};
......
......@@ -6,3 +6,5 @@
42 42 42 42 42 42 42 42 42.42 42.42 42 1970-02-12 1970-01-01 03:00:42 1970-01-01 03:00:00.042 [42]
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]
[0,1,2,3,42,253,254,255]
[255,254,253,42,3,2,1,0]
......@@ -4,26 +4,49 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Functional test for the MsgPack format: data is written with "FORMAT MsgPack" and read back
# with "INSERT ... FORMAT MsgPack".
# NOTE(review): this listing is a diff rendering without +/- markers. The lines that use
# $CURDIR/data_msgpack/*.msgpk appear to be the removed versions of the neighbouring lines that
# use $CURDIR/tmp_* files — confirm against the repository.

# Source the shared test configuration (presumably defines $CLICKHOUSE_CLIENT — confirm).
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack";

# Case 1: a table with integer, float, string, date/time and array columns.
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, date Date, datetime DateTime, datetime64 DateTime64, array Array(UInt32)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', 18980, 1639872000, 1639872000000, [1,2,3,4,5]), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', 20000, 1839882000, 1639872891123, [5,4,3,2,1]),(42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', 42, 42, 42, [42])";
# Dump the table in MsgPack format to a file, then feed that file back into the same table,
# so the SELECT below also returns the re-inserted rows.
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/data_msgpack/all_types.msgpk;
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpac_test_all_types.msgpk;
cat $CURDIR/tmp_msgpac_test_all_types.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
cat $CURDIR/data_msgpack/all_types.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
# Remove the temporary dump once it has been re-inserted.
rm $CURDIR/tmp_msgpac_test_all_types.msgpk
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";

# Case 2: nested arrays (two and three levels deep), using the same dump / re-insert / select cycle.
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array1 Array(Array(UInt32)), array2 Array(Array(Array(String)))) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES ([[1,2,3], [1001, 2002], [3167]], [[['one'], ['two']], [['three']],[['four'], ['five']]])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/data_msgpack/nested_arrays.msgpk;
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpack_test_nested_arrays.msgpk;
cat $CURDIR/tmp_msgpack_test_nested_arrays.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
rm $CURDIR/tmp_msgpack_test_nested_arrays.msgpk;
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";

# Case 3: type conversion. The data is written from an Array(UInt8) column and read back
# into an Array(Int64) column of a re-created table.
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array Array(UInt8)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES ([0, 1, 2, 3, 42, 253, 254, 255]), ([255, 254, 253, 42, 3, 2, 1, 0])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpack_type_conversion.msgpk;
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (array Array(Int64)) ENGINE = Memory";
cat $CURDIR/data_msgpack/nested_arrays.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
cat $CURDIR/tmp_msgpack_type_conversion.msgpk | $CLICKHOUSE_CLIENT --query="INSERT INTO msgpack FORMAT MsgPack";
rm $CURDIR/tmp_msgpack_type_conversion.msgpk;
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
......
ґ _onetwothreefourfive
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册