未验证 提交 d296e1b6 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #10216 from Avogar/msgpack_format

Update MsgPack format
......@@ -24,34 +24,35 @@ namespace ErrorCodes
}
// Constructs the MsgPack row input format: forwards header, input and params to IRowInputFormat,
// initializes the peekable buffer `buf` from the input and stores the header's column types in `data_types`.
// NOTE(review): two member-initializer lists follow; this looks like the pre-change and post-change
// lines of a diff shown together (the second one additionally initializes `ctx` with
// `reference_func`) -- confirm which one is current.
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in_), data_types(header_.getDataTypes()) {}
: IRowInputFormat(header_, in_, std::move(params_)), buf(in), ctx(&reference_func, nullptr, msgpack::unpack_limit()), data_types(header_.getDataTypes()) {}
// Runs the msgpack parsing context `ctx` over the bytes currently available in `buf`
// (from the current position up to the end of the working buffer).
//   zone   - installed into the context's user data before parsing.
//   offset - reset to 0 here, then handed to the parser by reference.
// Returns the result of ctx.execute(); readObject() keeps pulling more input while it is zero.
int MsgPackRowInputFormat::unpack(msgpack::zone & zone, size_t & offset)
{
    // Start every attempt from a clean parser state.
    offset = 0;
    ctx.init();
    ctx.user().set_zone(zone);

    auto chunk_begin = buf.position();
    auto chunk_size = buf.buffer().end() - chunk_begin;
    return ctx.execute(chunk_begin, chunk_size, offset);
}
// Reads the next msgpack object from `buf` into `object_handle`.
// Returns false when the buffer is already at end of input, true once an object has been stored.
// Throws Exception with ErrorCodes::INCORRECT_DATA if the input ends in the middle of an object.
// NOTE(review): the body below appears to interleave the pre-change loop (msgpack::unpack inside
// try/catch) with the post-change loop (while (!unpack(...))), as in a diff rendered without
// +/- markers -- confirm against the real file before relying on it.
bool MsgPackRowInputFormat::readObject()
{
if (buf.eof())
return false;
// Checkpoint so the buffer can be rolled back to the start of the object when more data is needed.
PeekableReadBufferCheckpoint checkpoint{buf};
size_t offset = 0;
bool need_more_data = true;
while (need_more_data)
std::unique_ptr<msgpack::zone> zone(new msgpack::zone);
size_t offset;
// A zero result from unpack() is handled as "not enough bytes yet".
while (!unpack(*zone, offset))
{
offset = 0;
try
{
object_handle = msgpack::unpack(buf.position(), buf.buffer().end() - buf.position(), offset);
need_more_data = false;
}
catch (msgpack::insufficient_bytes &)
{
buf.position() = buf.buffer().end();
if (buf.eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
}
// Move to the end of the current chunk; eof() then reports whether any further input exists.
buf.position() = buf.buffer().end();
if (buf.eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
// Make the data since the checkpoint contiguous and retry from the start of the object.
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
}
// Presumably `offset` now holds the number of bytes the parser consumed -- TODO confirm.
buf.position() += offset;
object_handle = msgpack::object_handle(ctx.data(), std::move(zone));
return true;
}
......@@ -119,8 +120,8 @@ void MsgPackRowInputFormat::insertObject(IColumn & column, DataTypePtr data_type
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
String str = object.as<String>();
column.insertData(str.data(), str.size());
msgpack::object_str obj_str = object.via.str;
column.insertData(obj_str.ptr, obj_str.size);
return;
}
case TypeIndex::Array:
......
......@@ -20,10 +20,15 @@ public:
private:
bool readObject();
void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
int unpack(msgpack::zone & zone, size_t & offset);
// Reference callback for the msgpack unpacker. By default msgpack copies object data;
// returning true here tells the unpacker to reference the data instead of copying it.
// All three arguments are ignored.
// NOTE(review): presumably unpacked strings then point into the read buffer, so that memory
// must stay valid while the object is in use -- confirm.
static bool reference_func(msgpack::type::object_type, size_t, void *) { return true; }
PeekableReadBuffer buf;
DataTypes data_types;
msgpack::object_handle object_handle;
msgpack::v1::detail::context ctx;
DataTypes data_types;
};
}
......@@ -85,7 +85,6 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
packer.pack_uint64(assert_cast<const DataTypeDateTime64::ColumnType &>(column).getElement(row_num));
return;
}
case TypeIndex::FixedString: [[fallthrough]];
case TypeIndex::String:
{
const StringRef & string = assert_cast<const ColumnString &>(column).getDataAt(row_num);
......@@ -93,6 +92,13 @@ void MsgPackRowOutputFormat::serializeField(const IColumn & column, DataTypePtr
packer.pack_str_body(string.data, string.size);
return;
}
case TypeIndex::FixedString:
{
const StringRef & string = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num);
packer.pack_str(string.size);
packer.pack_str_body(string.data, string.size);
return;
}
case TypeIndex::Array:
{
auto nested_type = assert_cast<const DataTypeArray &>(*data_type).getNestedType();
......
......@@ -34,6 +34,7 @@
<value>RowBinary</value>
<value>Native</value>
<value>Avro</value>
<value>MsgPack</value>
</values>
</substitution>
</substitutions>
......@@ -42,7 +43,7 @@
<table_exists>test.hits</table_exists>
</preconditions>
<query>SELECT * FROM table_{format}</query>
<query>SELECT * FROM table_{format} FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS table_{format}</drop_query>
</test>
......@@ -44,6 +44,7 @@
<value>ODBCDriver2</value>
<value>MySQLWire</value>
<value>Avro</value>
<!-- <value>MsgPack</value> Does not work in performance test for unknown reason. -->
</values>
</substitution>
</substitutions>
......
......@@ -8,3 +8,6 @@
[[1,2,3],[1001,2002],[3167]] [[['one'],['two']],[['three']],[['four'],['five']]]
[0,1,2,3,42,253,254,255]
[255,254,253,42,3,2,1,0]
2020-01-01
2020-01-02
2020-01-02
......@@ -8,7 +8,7 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (uint8 UInt8, uint16 UInt16, uint32 UInt32, uint64 UInt64, int8 Int8, int16 Int16, int32 Int32, int64 Int64, float Float32, double Float64, string String, date Date, datetime DateTime, datetime64 DateTime64, array Array(UInt32)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', 18980, 1639872000, 1639872000000, [1,2,3,4,5]), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', 20000, 1839882000, 1639872891123, [5,4,3,2,1]),(42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', 42, 42, 42, [42])";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES (255, 65535, 4294967295, 100000000000, -128, -32768, -2147483648, -100000000000, 2.02, 10000.0000001, 'String', 18980, 1639872000, 1639872000000, [1,2,3,4,5]), (4, 1234, 3244467295, 500000000000, -1, -256, -14741221, -7000000000, 100.1, 14321.032141201, 'Another string', 20000, 1839882000, 1639872891123, [5,4,3,2,1]), (42, 42, 42, 42, 42, 42, 42, 42, 42.42, 42.42, '42', 42, 42, 42, [42])";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack FORMAT MsgPack" > $CURDIR/tmp_msgpac_test_all_types.msgpk;
......@@ -52,3 +52,11 @@ $CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
$CLICKHOUSE_CLIENT --query="CREATE TABLE msgpack (date FixedString(10)) ENGINE = Memory";
$CLICKHOUSE_CLIENT --query="INSERT INTO msgpack VALUES ('2020-01-01'), ('2020-01-02'), ('2020-01-02')";
$CLICKHOUSE_CLIENT --query="SELECT * FROM msgpack";
$CLICKHOUSE_CLIENT --query="DROP TABLE msgpack";
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册