提交 b8061356 编写于 作者: A Avogar

Update parsing method in MsgPackRowInputFormat

上级 e46322fc
......@@ -24,34 +24,35 @@ namespace ErrorCodes
}
MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, std::move(params_)), buf(in_), data_types(header_.getDataTypes()) {}
: IRowInputFormat(header_, in_, std::move(params_)), buf(in), ctx(&reference_func, nullptr, msgpack::unpack_limit()), data_types(header_.getDataTypes()) {}
int MsgPackRowInputFormat::unpack(msgpack::zone & zone, size_t & offset)
{
offset = 0;
ctx.init();
ctx.user().set_zone(zone);
return ctx.execute(buf.position(), buf.buffer().end() - buf.position(), offset);
}
bool MsgPackRowInputFormat::readObject()
{
if (buf.eof())
return false;
PeekableReadBufferCheckpoint checkpoint{buf};
size_t offset = 0;
bool need_more_data = true;
while (need_more_data)
std::unique_ptr<msgpack::zone> zone(new msgpack::zone);
size_t offset;
while(!unpack(*zone, offset))
{
offset = 0;
try
{
object_handle = msgpack::unpack(buf.position(), buf.buffer().end() - buf.position(), offset);
need_more_data = false;
}
catch (msgpack::insufficient_bytes &)
{
buf.position() = buf.buffer().end();
if (buf.eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
}
buf.position() = buf.buffer().end();
if (buf.eof())
throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
buf.position() = buf.buffer().end();
buf.makeContinuousMemoryFromCheckpointToPos();
buf.rollbackToCheckpoint();
}
buf.position() += offset;
object_handle = msgpack::object_handle(ctx.data(), std::move(zone));
return true;
}
......
......@@ -20,10 +20,15 @@ public:
private:
bool readObject();
void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
int unpack(msgpack::zone & zone, size_t & offset);
// msgpack makes a copy of object by default, this function tells unpacker not to copy.
static bool reference_func(msgpack::type::object_type, size_t, void *) { return true; }
PeekableReadBuffer buf;
DataTypes data_types;
msgpack::object_handle object_handle;
msgpack::v1::detail::context ctx;
DataTypes data_types;
};
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册