CapnProtoRowInputStream.cpp 11.6 KB
Newer Older
P
proller 已提交
1 2
#include "CapnProtoRowInputStream.h"

P
proller 已提交
3
#if USE_CAPNP
4
#include <IO/ReadBuffer.h>
5 6 7
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
8
#include <Formats/FormatSchemaInfo.h>
9 10 11
#include <capnp/serialize.h>
#include <capnp/dynamic.h>
#include <capnp/common.h>
12 13 14 15
#include <boost/algorithm/string.hpp>
#include <boost/range/join.hpp>
#include <common/logger_useful.h>

16

17 18 19
namespace DB
{

20 21 22 23 24
/// Error codes thrown from this file. They are only declared here (extern), not defined.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int BAD_TYPE_OF_FIELD;
    extern const int LOGICAL_ERROR;
    extern const int THERE_IS_NO_COLUMN;
}

28
/// Builds the NestedField describing column number `i` of `header`.
/// The column name (without an optional leading dot) is split on '.' and '_' into `tokens`;
/// `i` itself is stored alongside so the field can be traced back to its column.
CapnProtoRowInputStream::NestedField split(const Block & header, size_t i)
{
    String column_name(header.safeGetByPosition(i).name);

    /// Drop a leading dot from the field definition, e.g. ".msg" -> "msg".
    if (!column_name.empty() && column_name.front() == '.')
        column_name.erase(column_name.begin());

    CapnProtoRowInputStream::NestedField result = {{}, i};
    boost::split(result.tokens, column_name, boost::is_any_of("._"));
    return result;
}


/// Recursively converts a Cap'n Proto dynamic value into a Field.
///  - VOID becomes an empty Field, BOOL becomes unsigned 0/1, ENUM becomes its raw numeric value;
///  - INT / UINT / FLOAT are read as int64_t / uint64_t / double;
///  - TEXT and DATA become String;
///  - LIST becomes an Array and STRUCT a Tuple, each element converted recursively.
/// Throws BAD_TYPE_OF_FIELD for UNKNOWN, CAPABILITY and ANY_POINTER values.
Field convertNodeToField(capnp::DynamicValue::Reader value)
{
    switch (value.getType())
    {
        /// Kinds that cannot be represented as a Field.
        case capnp::DynamicValue::UNKNOWN:
            throw Exception("Unknown field type", ErrorCodes::BAD_TYPE_OF_FIELD);
        case capnp::DynamicValue::CAPABILITY:
            throw Exception("CAPABILITY type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);
        case capnp::DynamicValue::ANY_POINTER:
            throw Exception("ANY_POINTER type not supported", ErrorCodes::BAD_TYPE_OF_FIELD);

        /// Scalars.
        case capnp::DynamicValue::VOID:
            return Field();
        case capnp::DynamicValue::BOOL:
            return value.as<bool>() ? 1u : 0u;
        case capnp::DynamicValue::INT:
            return value.as<int64_t>();
        case capnp::DynamicValue::UINT:
            return value.as<uint64_t>();
        case capnp::DynamicValue::FLOAT:
            return value.as<double>();
        case capnp::DynamicValue::ENUM:
            return value.as<capnp::DynamicEnum>().getRaw();

        /// Byte sequences.
        case capnp::DynamicValue::TEXT:
        {
            auto text = value.as<capnp::Text>();
            return String(text.begin(), text.size());
        }
        case capnp::DynamicValue::DATA:
        {
            auto chars = value.as<capnp::Data>().asChars();
            return String(chars.begin(), chars.size());
        }

        /// Composite values: convert every element / member recursively.
        case capnp::DynamicValue::LIST:
        {
            auto list = value.as<capnp::DynamicList>();
            Array elements(list.size());
            for (unsigned idx = 0, count = list.size(); idx < count; ++idx)
                elements[idx] = convertNodeToField(list[idx]);

            return elements;
        }
        case capnp::DynamicValue::STRUCT:
        {
            auto record = value.as<capnp::DynamicStruct>();
            const auto & members = record.getSchema().getFields();

            Field result = Tuple(TupleBackend(members.size()));
            TupleBackend & slots = get<Tuple &>(result).toUnderType();
            for (unsigned idx = 0, count = members.size(); idx < count; ++idx)
                slots[idx] = convertNodeToField(record.get(members[idx]));

            return result;
        }
    }

    /// Reached only if getType() yields a value outside the enumerators handled above.
    return Field();
}

/// Returns the field called `field` from the struct schema `node`.
/// Throws THERE_IS_NO_COLUMN, naming both the field and the schema, when no such field exists.
capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::string & field)
{
    KJ_IF_MAYBE(found, node.findFieldByName(field))
    {
        return *found;
    }

    throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN);
}
106

A
Alexey Milovidov 已提交
107 108

void CapnProtoRowInputStream::createActions(const NestedFieldList & sorted_fields, capnp::StructSchema reader)
109
{
110 111 112
    /// Columns in a table can map to fields in Cap'n'Proto or to structs.

    /// Store common parents and their tokens in order to backtrack.
113
    std::vector<capnp::StructSchema::Field> parents;
114
    std::vector<std::string> parent_tokens;
115 116

    capnp::StructSchema cur_reader = reader;
117

A
Alexey Milovidov 已提交
118
    for (const auto & field : sorted_fields)
119
    {
A
Alexey Milovidov 已提交
120 121 122
        if (field.tokens.empty())
            throw Exception("Logical error in CapnProtoRowInputStream", ErrorCodes::LOGICAL_ERROR);

123
        // Backtrack to common parent
124 125
        while (field.tokens.size() < parent_tokens.size() + 1
            || !std::equal(parent_tokens.begin(), parent_tokens.end(), field.tokens.begin()))
126
        {
127 128
            actions.push_back({Action::POP});
            parents.pop_back();
129
            parent_tokens.pop_back();
A
Alexey Milovidov 已提交
130

131
            if (parents.empty())
132
            {
133 134 135
                cur_reader = reader;
                break;
            }
136 137
            else
                cur_reader = parents.back().getType().asStruct();
138
        }
A
Alexey Milovidov 已提交
139

140
        // Go forward
141
        while (parent_tokens.size() + 1 < field.tokens.size())
142
        {
143 144
            const auto & token = field.tokens[parents.size()];
            auto node = getFieldOrThrow(cur_reader, token);
145 146
            if (node.getType().isStruct())
            {
147
                // Descend to field structure
148 149
                parents.emplace_back(node);
                parent_tokens.emplace_back(token);
150
                cur_reader = node.getType().asStruct();
151
                actions.push_back({Action::PUSH, node});
152 153 154
            }
            else if (node.getType().isList())
            {
155
                break; // Collect list
156 157
            }
            else
158
                throw Exception("Field " + token + " is neither Struct nor List", ErrorCodes::BAD_TYPE_OF_FIELD);
159
        }
160

161
        // Read field from the structure
162
        auto node = getFieldOrThrow(cur_reader, field.tokens[parents.size()]);
163 164
        if (node.getType().isList() && actions.size() > 0 && actions.back().field == node)
        {
165
            // The field list here flattens Nested elements into multiple arrays
166
            // In order to map Nested types in Cap'nProto back, they need to be collected
167 168 169 170 171 172
            // Since the field names are sorted, the order of field positions must be preserved
            // For example, if the fields are { b @0 :Text, a @1 :Text }, the `a` would come first
            // even though it's position is second.
            auto & columns = actions.back().columns;
            auto it = std::upper_bound(columns.cbegin(), columns.cend(), field.pos);
            columns.insert(it, field.pos);
173 174 175
        }
        else
        {
176 177
            actions.push_back({Action::READ, node, {field.pos}});
        }
178 179 180
    }
}

181
/// Parses the schema file described by `info`, resolves the root message struct and prepares
/// the list of actions used to read every row for the columns of `header_`.
CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const FormatSchemaInfo& info)
    : istr(istr_), header(header_), parser(std::make_shared<SchemaParser>())
{
    /// Load the schema from disk; the deprecation warning is silenced around this call only.
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
    auto schema = parser->impl.parseDiskFile(info.schemaPath(), info.absoluteSchemaPath(), {});
#pragma GCC diagnostic pop

    /// The message type named in the format schema is the root object of every message.
    root = schema.getNested(info.messageName()).asStruct();

    /**
     * The schema typically consists of fields in various nested structures.
     * Collect one entry per header column and sort the entries so that fields of the same
     * structure end up next to each other, which makes the traversal in createActions simpler.
     */
    NestedFieldList fields;
    for (size_t pos = 0, count = header.columns(); pos < count; ++pos)
        fields.push_back(split(header, pos));

    /// Lexicographic comparison of the token vectors: by token values first, shorter path first on a common prefix.
    const auto by_tokens = [](const NestedField & lhs, const NestedField & rhs) { return lhs.tokens < rhs.tokens; };
    std::sort(fields.begin(), fields.end(), by_tokens);

    createActions(fields, root);
}

V
Vladislav Smirnov 已提交
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236
/// Reads exactly one serialized Cap'n Proto message from `istr` and returns it as a flat word array
/// (segment table followed by the segment data), ready to be given to a flat-array message reader.
/// Throws BAD_ARGUMENTS if the segment table announces an implausible number of segments;
/// readStrict is expected to throw if the stream ends prematurely.
kj::Array<capnp::word> CapnProtoRowInputStream::readMessage()
{
    uint32_t segment_count = 0;
    istr.readStrict(reinterpret_cast<char*>(&segment_count), sizeof(uint32_t));

    /// The value comes straight from the input and must not be trusted.
    /// Without a limit, 0xFFFFFFFF makes `segment_count + 1` wrap around to 0, so the prefix buffer below
    /// would hold a single word while the loop reading segment sizes writes far beyond it; large values
    /// would also force huge allocations. The capnp library itself rejects messages with 512 or more segments.
    /// BAD_ARGUMENTS is used because it is the closest error code declared in this file.
    if (segment_count >= 512)
        throw Exception("Message has too many segments", ErrorCodes::BAD_ARGUMENTS);

    // one for segmentCount and one because segmentCount starts from 0
    const auto prefix_size = (2 + segment_count) * sizeof(uint32_t);
    /// The segment table is padded to a whole number of 8-byte words.
    const auto words_prefix_size = (segment_count + 1) / 2 + 1;
    auto prefix = kj::heapArray<capnp::word>(words_prefix_size);
    auto prefix_chars = prefix.asChars();
    ::memcpy(prefix_chars.begin(), &segment_count, sizeof(uint32_t));

    // read size of each segment
    for (size_t i = 0; i <= segment_count; ++i)
        istr.readStrict(prefix_chars.begin() + ((i + 1) * sizeof(uint32_t)), sizeof(uint32_t));

    // calculate size of message
    const auto expected_words = capnp::expectedSizeInWordsFromPrefix(prefix);
    const auto expected_bytes = expected_words * sizeof(capnp::word);
    /// Everything after the bytes already consumed: table padding (if any) plus the segment contents.
    const auto data_size = expected_bytes - prefix_size;
    auto msg = kj::heapArray<capnp::word>(expected_words);
    auto msg_chars = msg.asChars();

    // read full message
    ::memcpy(msg_chars.begin(), prefix_chars.begin(), prefix_size);
    istr.readStrict(msg_chars.begin() + prefix_size, data_size);

    return msg;
}
237

C
chertus 已提交
238
/// Reads one message from the input and appends one value to every column in `columns`
/// by replaying the actions prepared by createActions.
/// Returns false without touching `columns` when the input is exhausted, true otherwise.
bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{
    if (istr.eof())
        return false;

    auto array = readMessage();

#if CAPNP_VERSION >= 8000
    capnp::UnalignedFlatArrayMessageReader msg(array);
#else
    capnp::FlatArrayMessageReader msg(array);
#endif
    /// Stack of struct readers: the root message at the bottom, the struct currently being read on top.
    std::vector<capnp::DynamicStruct::Reader> stack;
    stack.push_back(msg.getRoot<capnp::DynamicStruct>(root));

    /// Iterate by reference: an Action owns a vector of column positions, and copying it
    /// for every action of every row is pure overhead.
    for (const auto & action : actions)
    {
        switch (action.type)
        {
            case Action::READ:
            {
                Field value = convertNodeToField(stack.back().get(action.field));
                if (action.columns.size() > 1)
                {
                    // Nested columns must be flattened into several arrays
                    // e.g. Array(Tuple(x ..., y ...)) -> Array(x ...), Array(y ...)
                    const Array & collected = DB::get<const Array &>(value);
                    size_t size = collected.size();
                    // The flattened array contains an array of a part of the nested tuple
                    Array flattened(size);
                    for (size_t column_index = 0; column_index < action.columns.size(); ++column_index)
                    {
                        // Populate array with a single tuple elements
                        for (size_t off = 0; off < size; ++off)
                        {
                            const TupleBackend & tuple = DB::get<const Tuple &>(collected[off]).toUnderType();
                            flattened[off] = tuple[column_index];
                        }
                        auto & col = columns[action.columns[column_index]];
                        col->insert(flattened);
                    }
                }
                else
                {
                    auto & col = columns[action.columns[0]];
                    col->insert(value);
                }

                break;
            }
            case Action::POP:
                stack.pop_back();
                break;
            case Action::PUSH:
                stack.push_back(stack.back().get(action.field).as<capnp::DynamicStruct>());
                break;
        }
    }

    return true;
}

300 301
/// Registers the "CapnProto" input format in `factory`.
/// The registered creator wraps a CapnProtoRowInputStream (configured from the format schema
/// found in the context) into a BlockInputStreamFromRowInputStream.
void registerInputFormatCapnProto(FormatFactory & factory)
{
    auto create_stream = [](
        ReadBuffer & in,
        const Block & header,
        const Context & ctx,
        UInt64 block_size_limit,
        UInt64 portion_size,
        FormatFactory::ReadCallback on_read,
        const FormatSettings & format_settings)
    {
        return std::make_shared<BlockInputStreamFromRowInputStream>(
            std::make_shared<CapnProtoRowInputStream>(in, header, FormatSchemaInfo(ctx, "CapnProto")),
            header,
            block_size_limit,
            portion_size,
            on_read,
            format_settings);
    };

    factory.registerInputFormat("CapnProto", create_stream);
}

}

#else

namespace DB
{
    class FormatFactory;

    /// Built without Cap'n Proto support: keep the symbol available, but register nothing.
    void registerInputFormatCapnProto(FormatFactory &)
    {
    }
}

332
#endif // USE_CAPNP