CapnProto should now load jagged structures

Changed how actions are created for CapnProto so it supports jagged
structures.
上级 171f5179
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/range/join.hpp> #include <boost/range/join.hpp>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <iostream>
namespace DB namespace DB
{ {
...@@ -108,34 +108,43 @@ capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std:: ...@@ -108,34 +108,43 @@ capnp::StructSchema::Field getFieldOrThrow(capnp::StructSchema node, const std::
else else
throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN); throw Exception("Field " + field + " doesn't exist in schema " + node.getShortDisplayName().cStr(), ErrorCodes::THERE_IS_NO_COLUMN);
} }
void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader) void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields, capnp::StructSchema reader)
{ {
String last; std::vector<capnp::StructSchema::Field> parents;
size_t level = 0; std::vector<std::string> tokens;
capnp::StructSchema::Field parent; std::vector<std::string> acc;
for (const auto & field : sortedFields) capnp::StructSchema cur_reader = reader;
size_t level = 0;
for (const auto & field : sortedFields)
{ {
// Move to a different field in the same structure, keep parent while(level > (field.tokens.size()-1) || (level > 0 && tokens[level-1] != field.tokens[level-1])) {
if (level > 0 && field.tokens[level - 1] != last) level--;
{ acc.push_back("POP");
auto child = getFieldOrThrow(parent.getContainingStruct(), field.tokens[level - 1]); actions.push_back({Action::POP});
reader = child.getType().asStruct(); tokens.pop_back();
actions.push_back({Action::POP}); parents.pop_back();
actions.push_back({Action::PUSH, child}); if(level > 0) {
} cur_reader = parents[level-1].getType().asStruct();
// Descend to a nested structure } else {
cur_reader = reader;
break;
}
}
for (; level < field.tokens.size() - 1; ++level) for (; level < field.tokens.size() - 1; ++level)
{ {
auto node = getFieldOrThrow(reader, field.tokens[level]);
auto node = getFieldOrThrow(cur_reader, field.tokens[level]);
if (node.getType().isStruct()) if (node.getType().isStruct())
{ {
// Descend to field structure // Descend to field structure
last = field.tokens[level]; parents.push_back(node);
parent = node; tokens.push_back(field.tokens[level]);
reader = parent.getType().asStruct();
actions.push_back({Action::PUSH, parent}); cur_reader = node.getType().asStruct();
acc.push_back("PUSH:"+field.tokens[level]);
actions.push_back({Action::PUSH, node});
} }
else if (node.getType().isList()) else if (node.getType().isList())
{ {
...@@ -146,7 +155,7 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields ...@@ -146,7 +155,7 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
} }
// Read field from the structure // Read field from the structure
auto node = getFieldOrThrow(reader, field.tokens[level]); auto node = getFieldOrThrow(cur_reader, field.tokens[level]);
if (node.getType().isList() && actions.size() > 0 && actions.back().field == node) if (node.getType().isList() && actions.size() > 0 && actions.back().field == node)
{ {
// The field list here flattens Nested elements into multiple arrays // The field list here flattens Nested elements into multiple arrays
...@@ -160,8 +169,13 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields ...@@ -160,8 +169,13 @@ void CapnProtoRowInputStream::createActions(const NestedFieldList & sortedFields
} }
else else
{ {
acc.push_back("READ:"+field.tokens[level]);
actions.push_back({Action::READ, node, {field.pos}}); actions.push_back({Action::READ, node, {field.pos}});
} }
for(size_t i = 0; i < acc.size(); i++) {
//std::cout << acc[i] << ",";
}
//std::cout << std::endl;
} }
} }
...@@ -191,9 +205,14 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block ...@@ -191,9 +205,14 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
// Reorder list to make sure we don't have to backtrack // Reorder list to make sure we don't have to backtrack
std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b) std::sort(list.begin(), list.end(), [](const NestedField & a, const NestedField & b)
{ {
if (a.tokens.size() == b.tokens.size())
return a.tokens < b.tokens; size_t min = std::min(a.tokens.size(),b.tokens.size());
return a.tokens.size() < b.tokens.size(); for(size_t i = 0; i < min; i++) {
if(a.tokens[i] != b.tokens[i]){
return a.tokens[i] > b.tokens[i];
}
}
return a.tokens.size() < b.tokens.size();
}); });
createActions(list, root); createActions(list, root);
...@@ -202,6 +221,7 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block ...@@ -202,6 +221,7 @@ CapnProtoRowInputStream::CapnProtoRowInputStream(ReadBuffer & istr_, const Block
bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &) bool CapnProtoRowInputStream::read(MutableColumns & columns, RowReadExtension &)
{ {
std::cout << "############################################" << std::endl;
if (istr.eof()) if (istr.eof())
return false; return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册