From b8061356b0cb443d77d404828b5c93e51cf16a58 Mon Sep 17 00:00:00 2001
From: Avogar
Date: Wed, 15 Apr 2020 02:08:55 +0300
Subject: [PATCH] Update parsing method in MsgPackRowInputFormat

---
Reviewer notes (outside the commit message):
  - readObject() no longer catches msgpack::insufficient_bytes; it loops
    while the new unpack() helper returns 0 (presumably "need more data" --
    confirm against the msgpack-c context::execute contract), extending the
    peekable buffer from the checkpoint before each retry.
  - A fresh msgpack::zone is allocated per readObject() call and its
    ownership is moved into object_handle together with ctx.data().
  - reference_func always returns true; per its in-code comment this tells
    the unpacker not to copy object data.

 .../Formats/Impl/MsgPackRowInputFormat.cpp    | 39 ++++++++++---------
 .../Formats/Impl/MsgPackRowInputFormat.h      |  7 +++-
 2 files changed, 26 insertions(+), 20 deletions(-)

diff --git a/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp b/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp
index 53c5a623a3..9b24978cf5 100644
--- a/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/MsgPackRowInputFormat.cpp
@@ -24,34 +24,35 @@ namespace ErrorCodes
 }
 
 MsgPackRowInputFormat::MsgPackRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
-    : IRowInputFormat(header_, in_, std::move(params_)), buf(in_), data_types(header_.getDataTypes()) {}
+    : IRowInputFormat(header_, in_, std::move(params_)), buf(in), ctx(&reference_func, nullptr, msgpack::unpack_limit()), data_types(header_.getDataTypes()) {}
+
+int MsgPackRowInputFormat::unpack(msgpack::zone & zone, size_t & offset)
+{
+    offset = 0;
+    ctx.init();
+    ctx.user().set_zone(zone);
+    return ctx.execute(buf.position(), buf.buffer().end() - buf.position(), offset);
+}
 
 bool MsgPackRowInputFormat::readObject()
 {
     if (buf.eof())
         return false;
+
     PeekableReadBufferCheckpoint checkpoint{buf};
-    size_t offset = 0;
-    bool need_more_data = true;
-    while (need_more_data)
+    std::unique_ptr<msgpack::zone> zone(new msgpack::zone);
+    size_t offset;
+    while(!unpack(*zone, offset))
     {
-        offset = 0;
-        try
-        {
-            object_handle = msgpack::unpack(buf.position(), buf.buffer().end() - buf.position(), offset);
-            need_more_data = false;
-        }
-        catch (msgpack::insufficient_bytes &)
-        {
-            buf.position() = buf.buffer().end();
-            if (buf.eof())
-                throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
-            buf.position() = buf.buffer().end();
-            buf.makeContinuousMemoryFromCheckpointToPos();
-            buf.rollbackToCheckpoint();
-        }
+        buf.position() = buf.buffer().end();
+        if (buf.eof())
+            throw Exception("Unexpected end of file while parsing msgpack object.", ErrorCodes::INCORRECT_DATA);
+        buf.position() = buf.buffer().end();
+        buf.makeContinuousMemoryFromCheckpointToPos();
+        buf.rollbackToCheckpoint();
     }
     buf.position() += offset;
+    object_handle = msgpack::object_handle(ctx.data(), std::move(zone));
     return true;
 }
 
diff --git a/src/Processors/Formats/Impl/MsgPackRowInputFormat.h b/src/Processors/Formats/Impl/MsgPackRowInputFormat.h
index 8ed23a1e0f..a426dc4950 100644
--- a/src/Processors/Formats/Impl/MsgPackRowInputFormat.h
+++ b/src/Processors/Formats/Impl/MsgPackRowInputFormat.h
@@ -20,10 +20,15 @@ public:
 private:
     bool readObject();
     void insertObject(IColumn & column, DataTypePtr type, const msgpack::object & object);
+    int unpack(msgpack::zone & zone, size_t & offset);
+
+    // msgpack makes a copy of object by default, this function tells unpacker not to copy.
+    static bool reference_func(msgpack::type::object_type, size_t, void *) { return true; }
 
     PeekableReadBuffer buf;
-    DataTypes data_types;
     msgpack::object_handle object_handle;
+    msgpack::v1::detail::context ctx;
+    DataTypes data_types;
 };
 
 }
-- 
GitLab