NativeBlockInputStream.cpp 7.8 KB
Newer Older
1
#include <Core/Defines.h>
2

3 4
#include <IO/ReadHelpers.h>
#include <IO/VarInt.h>
P
proller 已提交
5
#include <Compression/CompressedReadBufferFromFile.h>
A
Alexey Milovidov 已提交
6

7
#include <DataTypes/DataTypeFactory.h>
8
#include <Common/typeid_cast.h>
9
#include <ext/range.h>
10

11
#include <DataStreams/NativeBlockInputStream.h>
12
#include <DataTypes/DataTypeLowCardinality.h>
A
Alexey Milovidov 已提交
13 14 15 16 17


namespace DB
{

18 19
/// Error codes referenced in this translation unit; they are defined elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_READ_ALL_DATA;
    extern const int INCORRECT_INDEX;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}

26 27 28 29 30 31

/// Creates a stream over `istr_` without a predefined header and without an index.
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
    : istr(istr_)
    , server_revision(server_revision_)
{
}

32 33
/// Creates a stream over `istr_` that keeps a copy of `header_` as its header; no index is used.
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
    : istr(istr_)
    , header(header_)
    , server_revision(server_revision_)
{
}

/** Creates a stream that reads only the blocks described by the index range
  * [index_block_it_, index_block_end_).
  *
  * `istr_` must actually be a CompressedReadBufferFromFile, because reading by index requires seeking;
  * otherwise an exception with code LOGICAL_ERROR is thrown.
  * If the index range is not empty, the header is built from the columns of its first block.
  */
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
    IndexForNativeFormat::Blocks::const_iterator index_block_it_,
    IndexForNativeFormat::Blocks::const_iterator index_block_end_)
    : istr(istr_)
    , server_revision(server_revision_)
    , use_index(true)
    , index_block_it(index_block_it_)
    , index_block_end(index_block_end_)
{
    istr_concrete = typeid_cast<CompressedReadBufferFromFile *>(&istr);
    if (istr_concrete == nullptr)
        throw Exception("When need to use index for NativeBlockInputStream, istr must be CompressedReadBufferFromFile.", ErrorCodes::LOGICAL_ERROR);

    /// With an empty index there is nothing to position on and the header stays empty.
    if (index_block_it != index_block_end)
    {
        const auto & first_block_columns = index_block_it->columns;
        index_column_it = first_block_columns.begin();

        /// Initialize header from the index.
        for (const auto & indexed_column : first_block_columns)
        {
            auto column_type = DataTypeFactory::instance().get(indexed_column.type);
            header.insert(ColumnWithTypeAndName{ column_type, indexed_column.name });
        }
    }
}


61
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
62
{
63 64 65 66 67 68 69 70
    IDataType::DeserializeBinaryBulkSettings settings;
    settings.getter = [&](IDataType::SubstreamPath) -> ReadBuffer * { return &istr; };
    settings.avg_value_size_hint = avg_value_size_hint;
    settings.position_independent_encoding = false;

    IDataType::DeserializeBinaryBulkStatePtr state;
    type.deserializeBinaryBulkStatePrefix(settings, state);
    type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
71 72

    if (column.size() != rows)
73 74
        throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + ".",
            ErrorCodes::CANNOT_READ_ALL_DATA);
75 76 77
}


78
/// Returns a copy of the stored header block (it may be empty if no header was given or derived from an index).
Block NativeBlockInputStream::getHeader() const
{
    return header;
}


84
/** Reads the next block from the stream.
  *
  * Returns an empty block when the input is exhausted (or, in index mode, when all indexed blocks were read).
  *
  * Without an index, the number of columns and rows is read from the stream itself.
  * With an index, dimensions are taken from the current index block, the buffer is positioned
  * at every column using the index, and the name and type found in the data are checked against the index.
  *
  * If a header is set:
  *  - a column whose type differs from the type of the same-named header column is converted to the header type;
  *  - for a non-empty block the result is rebuilt in header order, and columns absent from the data
  *    are filled with default values.
  *
  * Throws:
  *  - CANNOT_READ_ALL_DATA if the input ends before all indexed blocks are read;
  *  - INCORRECT_INDEX if the index does not agree with the data.
  */
Block NativeBlockInputStream::readImpl()
{
    Block res;

    const DataTypeFactory & data_type_factory = DataTypeFactory::instance();

    if (use_index && index_block_it == index_block_end)
        return res;

    if (istr.eof())
    {
        if (use_index)
            throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);

        return res;
    }

    /// Additional information about the block.
    if (server_revision > 0)
        res.info.read(istr);

    /// Dimensions
    size_t columns = 0;
    size_t rows = 0;

    if (!use_index)
    {
        readVarUInt(columns, istr);
        readVarUInt(rows, istr);
    }
    else
    {
        columns = index_block_it->num_columns;
        rows = index_block_it->num_rows;
    }

    for (size_t i = 0; i < columns; ++i)
    {
        if (use_index)
        {
            /// If the current position is what is required, the real seek does not occur.
            istr_concrete->seek(index_column_it->location.offset_in_compressed_file, index_column_it->location.offset_in_decompressed_block);
        }

        ColumnWithTypeAndName column;

        /// Name
        readBinary(column.name, istr);

        /// Type
        String type_name;
        readBinary(type_name, istr);
        column.type = data_type_factory.get(type_name);

        if (use_index)
        {
            /// Index allows to do more checks.
            if (index_column_it->name != column.name)
                throw Exception("Index points to column with wrong name: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
            if (index_column_it->type != type_name)
                throw Exception("Index points to column with wrong type: corrupted index or data", ErrorCodes::INCORRECT_INDEX);
        }

        /// Data
        MutableColumnPtr read_column = column.type->createColumn();

        /// The hints may have been collected from a block with fewer columns than this one,
        /// so check the bound instead of only checking for emptiness.
        double avg_value_size_hint = i < avg_value_size_hints.size() ? avg_value_size_hints[i] : 0;
        if (rows)    /// If no rows, nothing to read.
            readData(*column.type, *read_column, istr, rows, avg_value_size_hint);

        column.column = std::move(read_column);

        if (header)
        {
            /// Support insert from old clients without low cardinality type.
            /// The target type is taken from the header column with the same name (not the same position):
            /// the order of columns in the data is not guaranteed to match the order in the header.
            auto & header_column = header.getByName(column.name);
            if (!header_column.type->equals(*column.type))
            {
                column.column = recursiveTypeConversion(column.column, column.type, header_column.type);
                column.type = header_column.type;
            }
        }

        res.insert(std::move(column));

        if (use_index)
            ++index_column_it;
    }

    if (use_index)
    {
        if (index_column_it != index_block_it->columns.end())
            throw Exception("Inconsistent index: not all columns were read", ErrorCodes::INCORRECT_INDEX);

        ++index_block_it;
        if (index_block_it != index_block_end)
            index_column_it = index_block_it->columns.begin();
    }

    if (rows && header)
    {
        /// Allow to skip columns. Fill them with default values.
        Block tmp_res;

        for (auto & col : header)
        {
            if (res.has(col.name))
                tmp_res.insert(res.getByName(col.name));
            else
                tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name});
        }

        res.swap(tmp_res);
    }

    return res;
}

202 203 204 205 206 207
void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block)
{
    auto rows = block.rows();
    if (rows < 10)
        return;

208
    avg_value_size_hints.resize_fill(block.columns(), 0);
209 210 211 212

    for (auto idx : ext::range(0, block.columns()))
    {
        auto & avg_value_size_hint = avg_value_size_hints[idx];
213
        IDataType::updateAvgValueSizeHint(*block.getByPosition(idx).column, avg_value_size_hint);
214 215
    }
}
216 217 218

/** Reads the whole index from `istr` until end of input and appends one entry to `blocks` per indexed block.
  *
  * For every block only the columns listed in `required_columns` are kept, and `num_columns`
  * is set to the number of kept columns.
  *
  * Throws an exception with code INCORRECT_INDEX if a block does not contain every required column,
  * or if a required column occurs more than once within a block.
  */
void IndexForNativeFormat::read(ReadBuffer & istr, const NameSet & required_columns)
{
    while (!istr.eof())
    {
        blocks.emplace_back();
        IndexOfBlockForNativeFormat & block = blocks.back();

        readVarUInt(block.num_columns, istr);
        readVarUInt(block.num_rows, istr);

        if (block.num_columns < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);

        /// Names of required columns already met in this block.
        /// Comparing only the counts is not enough: one duplicated and one missing column cancel each other out.
        NameSet found_columns;

        for (size_t i = 0; i < block.num_columns; ++i)
        {
            IndexOfOneColumnForNativeFormat column_index;

            readBinary(column_index.name, istr);
            readBinary(column_index.type, istr);
            readBinary(column_index.location.offset_in_compressed_file, istr);
            readBinary(column_index.location.offset_in_decompressed_block, istr);

            if (!required_columns.count(column_index.name))
                continue;

            if (!found_columns.insert(column_index.name).second)
                throw Exception("Index contain duplicate columns", ErrorCodes::INCORRECT_INDEX);

            block.columns.push_back(std::move(column_index));
        }

        /// Every kept column is required and unique, so a smaller count means some required column is missing.
        if (block.columns.size() < required_columns.size())
            throw Exception("Index contain less than required columns", ErrorCodes::INCORRECT_INDEX);

        block.num_columns = block.columns.size();
    }
}

A
Alexey Milovidov 已提交
252
}