NativeBlockInputStream.cpp 7.4 KB
Newer Older
1
#include <Core/Defines.h>
2

3 4 5
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
#include <IO/CompressedReadBufferFromFile.h>
A
Alexey Milovidov 已提交
6

7 8 9 10 11 12 13
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
14
#include <Common/typeid_cast.h>
15
#include <ext/range.h>
16

17
#include <DataStreams/NativeBlockInputStream.h>
A
Alexey Milovidov 已提交
18 19 20 21 22


namespace DB
{

23 24
/// Error codes thrown by this file; the definitions live elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
}

30
/** Reads blocks in Native format from istr_.
  * With use_index_, istr_ must be a CompressedReadBufferFromFile (readImpl seeks it to column positions
  * taken from the index), and [index_block_it_, index_block_end_) is the range of index blocks to read.
  * Throws LOGICAL_ERROR if use_index_ is set but istr_ is not a CompressedReadBufferFromFile.
  */
NativeBlockInputStream::NativeBlockInputStream(
    ReadBuffer & istr_, UInt64 server_revision_,
    bool use_index_,
    IndexForNativeFormat::Blocks::const_iterator index_block_it_,
    IndexForNativeFormat::Blocks::const_iterator index_block_end_)
    : istr(istr_), server_revision(server_revision_),
    use_index(use_index_), index_block_it(index_block_it_), index_block_end(index_block_end_)
{
    if (use_index)
    {
        istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
        if (!istr_concrete)
            throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);

        /// The index range may be empty; dereferencing index_block_end would be undefined behaviour.
        /// readImpl() returns an empty block right away in that case, so index_column_it is never used then.
        if (index_block_it != index_block_end)
            index_column_it = index_block_it->columns.begin();
    }
}


49
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
50
{
51 52 53 54 55 56 57 58 59
    if (type.isNullable())
    {
        const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
        const IDataType & nested_type = *nullable_type.getNestedType();

        ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
        IColumn & nested_col = *nullable_col.getNestedColumn();

        IColumn & null_map = nullable_col.getNullMapConcreteColumn();
60
        DataTypeUInt8{}.deserializeBinaryBulk(null_map, istr, rows, avg_value_size_hint);
61

62
        readData(nested_type, nested_col, istr, rows, avg_value_size_hint);
63 64 65 66 67

        return;
    }
    else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
    {
68 69
        /** For arrays, we deserialize the offsets first, and then the values.
          */
70 71 72 73 74 75 76 77 78 79 80
        IColumn & offsets_column = *typeid_cast<ColumnArray &>(column).getOffsetsColumn();
        type_arr->getOffsetsType()->deserializeBinaryBulk(offsets_column, istr, rows, 0);

        if (offsets_column.size() != rows)
            throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);

        if (rows)
            readData(
                *type_arr->getNestedType(),
                typeid_cast<ColumnArray &>(column).getData(),
                istr,
81
                typeid_cast<const ColumnArray &>(column).getOffsets()[rows - 1], 0);
82 83
    }
    else
84
        type.deserializeBinaryBulk(column, istr, rows, avg_value_size_hint);
85 86 87

    if (column.size() != rows)
        throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
88 89 90
}


91
/** Reads the next block from istr.
  * Returns an empty Block when the input is exhausted or, with an index, when all index blocks were read.
  * With an index, each column is read from the position the index gives, and its name and type are
  * cross-checked against the index.
  * Throws CANNOT_READ_ALL_DATA if the input ends before the index does, and INCORRECT_INDEX on any
  * mismatch between index and data.
  */
Block NativeBlockInputStream::readImpl()
{
    Block res;

    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    if (use_index && index_block_it == index_block_end)
        return res;

    if (istr.eof())
    {
        if (use_index)
            throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);

        return res;
    }

    /// Additional information about the block.
    if (server_revision >= DBMS_MIN_REVISION_WITH_BLOCK_INFO)
        res.info.read(istr);

    /// Dimensions
    size_t columns = 0;
    size_t rows = 0;

    if (!use_index)
    {
        readVarUInt(columns, istr);
        readVarUInt(rows, istr);
    }
    else
    {
        columns = index_block_it->num_columns;
        rows = index_block_it->num_rows;
    }

    for (size_t i = 0; i < columns; ++i)
    {
        if (use_index)
        {
            /// If the current position is what is required, the real seek does not occur.
            istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block);
        }

        ColumnWithTypeAndName column;

        /// Name
        readBinary(column.name, istr);

        /// Type
        String type_name;
        readBinary(type_name, istr);
        column.type = data_type_factory.get(type_name);

        if (use_index)
        {
            /// Index allows to do more checks.
            if (index_column_it->name != column.name)
                throw Exception("Index points to column with wrong name: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
            if (index_column_it->type != type_name)
                throw Exception("Index points to column with wrong type: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
        }

        /// Data
        column.column = column.type->createColumn();

        /// updateAvgValueSizeHints() sizes the hints to the column count of whatever block it was last given,
        /// which need not match this block; use "no hint" (0) rather than reading past the end.
        double avg_value_size_hint = i < avg_value_size_hints.size() ? avg_value_size_hints[i] : 0;
        if (rows)    /// If no rows, nothing to read.
            readData(*column.type, *column.column, istr, rows, avg_value_size_hint);

        res.insert(std::move(column));

        if (use_index)
            ++index_column_it;
    }

    if (use_index)
    {
        if (index_column_it != index_block_it->columns.end())
            throw Exception("Inconsistent index: not all columns were read", ErrorCodes::INCORRECT_INDEX);

        ++index_block_it;
        if (index_block_it != index_block_end)
            index_column_it = index_block_it->columns.begin();
    }

    return res;
}

180 181 182 183 184 185
void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block)
{
    auto rows = block.rows();
    if (rows < 10)
        return;

186
    avg_value_size_hints.resize_fill(block.columns(), 0);
187 188 189 190

    for (auto idx : ext::range(0, block.columns()))
    {
        auto & avg_value_size_hint = avg_value_size_hints[idx];
191
        IDataType::updateAvgValueSizeHint(*block.getByPosition(idx).column, avg_value_size_hint);
192 193
    }
}
194 195 196

/** Reads index entries from istr until EOF, appending one entry to `blocks` per index block.
  * Only column entries whose names are in required_columns are kept. Afterwards num_columns is
  * overwritten with the number of kept entries.
  * Throws INCORRECT_INDEX if a block is missing a required column or lists one more than once.
  */
void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns)
{
    while (!istr.eof())
    {
        blocks.emplace_back();
        IndexOfBlockForNativeFormat & block = blocks.back();

        readVarUInt(block.num_columns, istr);
        readVarUInt(block.num_rows, istr);

        if (block.num_columns < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);

        /// Distinct required names seen in this block. Comparing only block.columns.size() with
        /// required_columns.size() would accept an index that lists one required column twice and omits another.
        NameSet found_columns;

        for (size_t i = 0; i < block.num_columns; ++i)
        {
            IndexOfOneColumnForNativeFormat column_index;

            readBinary(column_index.name, istr);
            readBinary(column_index.type, istr);
            readBinary(column_index.location.offset_in_compressed_file, istr);
            readBinary(column_index.location.offset_in_decompressed_block, istr);

            if (required_columns.count(column_index.name))
            {
                found_columns.insert(column_index.name);    /// Before the move below empties the name.
                block.columns.push_back(std::move(column_index));
            }
        }

        if (found_columns.size() < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);
        if (block.columns.size() > found_columns.size())
            throw Exception("Index contain duplicate columns", ErrorCodes::INCORRECT_INDEX);

        block.num_columns = block.columns.size();
    }
}

A
Alexey Milovidov 已提交
230
}