NativeBlockInputStream.cpp 6.7 KB
Newer Older
1
#include <Core/Defines.h>
2

3 4 5
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
#include <IO/CompressedReadBufferFromFile.h>
A
Alexey Milovidov 已提交
6

7 8 9 10 11 12 13
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
14

15
#include <DataStreams/NativeBlockInputStream.h>
A
Alexey Milovidov 已提交
16 17 18 19 20


namespace DB
{

21 22
namespace ErrorCodes
{
23 24 25
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_READ_ALL_DATA;
26 27
}

28
NativeBlockInputStream::NativeBlockInputStream(
29 30 31 32 33 34
    ReadBuffer & istr_, UInt64 server_revision_,
    bool use_index_,
    IndexForNativeFormat::Blocks::const_iterator index_block_it_,
    IndexForNativeFormat::Blocks::const_iterator index_block_end_)
    : istr(istr_), server_revision(server_revision_),
    use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_)
35
{
36 37 38 39 40 41 42 43
    if (use_index)
    {
        istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
        if (!istr_concrete)
            throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);

        index_column_it = index_block_it->columns.begin();
    }
44 45 46
}


47
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows)
48
{
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
    if (type.isNullable())
    {
        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
        const IDataType & nested_type = *nullable_type.getNestedType();

        ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
        IColumn & nested_col = *nullable_col.getNestedColumn();

        IColumn & null_map = nullable_col.getNullMapConcreteColumn();
        DataTypeUInt8{}.deserializeBinaryBulk(null_map, istr, rows, 0);

        readData(nested_type, nested_col, istr, rows);

        return;
    }
    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
    {
        /** For arrays, you first need to deserialize the offsets, and then the values.
        */
        IColumn & offsets_column = *typeid_cast<ColumnArray &>(column).getOffsetsColumn();
        type_arr->getOffsetsType()->deserializeBinaryBulk(offsets_column, istr, rows, 0);

        if (offsets_column.size() != rows)
            throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);

        if (rows)
            readData(
                *type_arr->getNestedType(),
                typeid_cast<ColumnArray &>(column).getData(),
                istr,
                typeid_cast<const ColumnArray &>(column).getOffsets()[rows - 1]);
    }
    else
        type.deserializeBinaryBulk(column, istr, rows, 0);    /// TODO Use avg_value_size_hint.

    if (column.size() != rows)
        throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
86 87 88
}


89
Block NativeBlockInputStream::readImpl()
A
Alexey Milovidov 已提交
90
{
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
    Block res;

    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    if (use_index && index_block_it == index_block_end)
        return res;

    if (istr.eof())
    {
        if (use_index)
            throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);

        return res;
    }

    /// Additional information about the block.
    if (server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
        res.info.read(istr);

    /// Dimensions
    size_t columns = 0;
    size_t rows = 0;

    if (!use_index)
    {
        readVarUInt(columns, istr);
        readVarUInt(rows, istr);
    }
    else
    {
        columns = index_block_it->num_columns;
        rows = index_block_it->num_rows;
    }

    for (size_t i = 0; i < columns; ++i)
    {
        if (use_index)
        {
            /// If the current position is what is required, the real seek does not occur.
            istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block);
        }

        ColumnWithTypeAndName column;

        /// Name
        readBinary(column.name, istr);

        /// Type
        String type_name;
        readBinary(type_name, istr);
        column.type = data_type_factory.get(type_name);

        if (use_index)
        {
            /// Index allows to do more checks.
            if (index_column_it->name != column.name)
                throw Exception("Index points to column with wrong name: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
            if (index_column_it->type != type_name)
                throw Exception("Index points to column with wrong type: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
        }

        /// Data
        column.column = column.type->createColumn();

        if (rows)    /// If no rows, nothing to read.
            readData(*column.type, *column.column, istr, rows);

        res.insert(std::move(column));

        if (use_index)
            ++index_column_it;
    }

    if (use_index)
    {
        if (index_column_it != index_block_it->columns.end())
            throw Exception("Inconsistent index: not all columns were read", ErrorCodes::INCORRECT_INDEX);

        ++index_block_it;
        if (index_block_it != index_block_end)
            index_column_it = index_block_it->columns.begin();
    }

    return res;
A
Alexey Milovidov 已提交
175 176
}

177 178 179

void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns)
{
180 181 182 183
    while (!istr.eof())
    {
        blocks.emplace_back();
        IndexOfBlockForNativeFormat & block = blocks.back();
184

185 186
        readVarUInt(block.num_columns, istr);
        readVarUInt(block.num_rows, istr);
187

188 189
        if (block.num_columns < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);
190

191 192 193
        for (size_t i = 0; i < block.num_columns; ++i)
        {
            IndexOfOneColumnForNativeFormat column_index;
194

195 196 197 198
            readBinary(column_index.name, istr);
            readBinary(column_index.type, istr);
            readBinary(column_index.location.offset_in_compressed_file, istr);
            readBinary(column_index.location.offset_in_decompressed_block, istr);
199

200 201 202
            if (required_columns.count(column_index.name))
                block.columns.push_back(std::move(column_index));
        }
203

204 205 206 207
        if (block.columns.size() < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);
        if (block.columns.size() > required_columns.size())
            throw Exception("Index contain duplicate columns", ErrorCodes::INCORRECT_INDEX);
208

209 210
        block.num_columns = block.columns.size();
    }
211 212 213
}


A
Alexey Milovidov 已提交
214
}