StorageJoin.cpp 17.3 KB
Newer Older
1
#include <Storages/StorageJoin.h>
2
#include <Storages/StorageFactory.h>
3
#include <Interpreters/HashJoin.h>
4
#include <Interpreters/Context.h>
A
Amos Bird 已提交
5
#include <Parsers/ASTCreateQuery.h>
6
#include <Parsers/ASTSetQuery.h>
7
#include <Parsers/ASTIdentifier.h>
A
Amos Bird 已提交
8
#include <Core/ColumnNumbers.h>
9
#include <DataStreams/IBlockInputStream.h>
A
Amos Bird 已提交
10
#include <DataTypes/NestedUtils.h>
11
#include <Interpreters/joinDispatch.h>
12
#include <Interpreters/TableJoin.h>
13
#include <Interpreters/castColumn.h>
14
#include <Common/assert_cast.h>
A
alexey-milovidov 已提交
15
#include <Common/quoteString.h>
16

17
#include <Poco/String.h>    /// toLower
18
#include <Poco/File.h>
19 20
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
21 22 23 24 25


namespace DB
{

26 27
namespace ErrorCodes
{
A
Alexey Milovidov 已提交
28 29
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
C
chertus 已提交
30
    extern const int UNSUPPORTED_JOIN_KEYS;
31 32
    extern const int NO_SUCH_COLUMN_IN_TABLE;
    extern const int INCOMPATIBLE_TYPE_OF_JOIN;
33 34
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
35 36
}

37
StorageJoin::StorageJoin(
A
Alexander Tokmakov 已提交
38
    const String & relative_path_,
39
    const StorageID & table_id_,
40
    const Names & key_names_,
A
Amos Bird 已提交
41 42 43 44
    bool use_nulls_,
    SizeLimits limits_,
    ASTTableJoin::Kind kind_,
    ASTTableJoin::Strictness strictness_,
45
    const ColumnsDescription & columns_,
A
Alexey Milovidov 已提交
46
    const ConstraintsDescription & constraints_,
47
    bool overwrite_,
V
Vxider 已提交
48
    const Context & context_,
V
Vxider 已提交
49 50
    bool persistent_)
    : StorageSetOrJoinBase{relative_path_, table_id_, columns_, constraints_, context_, persistent_}
A
Amos Bird 已提交
51 52 53 54 55
    , key_names(key_names_)
    , use_nulls(use_nulls_)
    , limits(limits_)
    , kind(kind_)
    , strictness(strictness_)
56
    , overwrite(overwrite_)
57
{
58
    auto metadata_snapshot = getInMemoryMetadataPtr();
59
    for (const auto & key : key_names)
60
        if (!metadata_snapshot->getColumns().hasPhysical(key))
61
            throw Exception{"Key column (" + key + ") does not exist in table declaration.", ErrorCodes::NO_SUCH_COLUMN_IN_TABLE};
62

63
    table_join = std::make_shared<TableJoin>(limits, use_nulls, kind, strictness, key_names);
64
    join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
65
    restore();
66 67 68
}


A
alesapin 已提交
69
void StorageJoin::truncate(
A
alesapin 已提交
70
    const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder&)
71 72 73
{
    Poco::File(path).remove(true);
    Poco::File(path).createDirectories();
74
    Poco::File(path + "tmp/").createDirectories();
75 76

    increment = 0;
77
    join = std::make_shared<HashJoin>(table_join, metadata_snapshot->getSampleBlock().sortColumns(), overwrite);
78
}
79 80


81
/// Returns a HashJoin for the query-side join description `analyzed_join` that reuses the data
/// already stored in this table (via reuseJoinedData) instead of building it again.
///
/// Throws INCOMPATIBLE_TYPE_OF_JOIN if
///  - strictness/kind of the query JOIN differ from the ones the table was created with, or
///  - the nullability expectation of the query (forceNullableRight / LEFT or FULL kind) does not
///    agree with the table's `use_nulls` flag.
///
/// Note: the passed TableJoin is modified — its right keys are replaced with the table's key names.
HashJoinPtr StorageJoin::getJoin(std::shared_ptr<TableJoin> analyzed_join) const
{
    auto metadata_snapshot = getInMemoryMetadataPtr();

    if (!analyzed_join->sameStrictnessAndKind(strictness, kind))
        throw Exception("Table " + getStorageID().getNameForLogs() + " has incompatible type of JOIN.", ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);

    if ((analyzed_join->forceNullableRight() && !use_nulls) ||
        (!analyzed_join->forceNullableRight() && isLeftOrFull(analyzed_join->kind()) && use_nulls))
        throw Exception("Table " + getStorageID().getNameForLogs() + " needs the same join_use_nulls setting as present in LEFT or FULL JOIN.",
                        ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN);

    /// TODO: check key columns

    /// Some HACK to remove wrong names qualifiers: table.column -> column.
    analyzed_join->setRightKeys(key_names);

    HashJoinPtr join_clone = std::make_shared<HashJoin>(analyzed_join, metadata_snapshot->getSampleBlock().sortColumns());
    join_clone->reuseJoinedData(*join);
    return join_clone;
}


103
/// Feeds one block of inserted rows into the underlying HashJoin.
/// The second argument of addJoinedBlock is always `true` here (presumably "check limits" — confirm in HashJoin).
void StorageJoin::insertBlock(const Block & block)
{
    join->addJoinedBlock(block, true);
}
104

105
/// Size of the table, expressed as the total row count reported by the underlying HashJoin.
size_t StorageJoin::getSize() const
{
    return join->getTotalRowCount();
}
106 107
/// Total number of rows, as reported by the underlying HashJoin. Settings are ignored; a value is always returned.
std::optional<UInt64> StorageJoin::totalRows(const Settings &) const
{
    return join->getTotalRowCount();
}
/// Total number of bytes, as reported by the underlying HashJoin. Settings are ignored; a value is always returned.
std::optional<UInt64> StorageJoin::totalBytes(const Settings &) const
{
    return join->getTotalByteCount();
}
108

109 110 111

/// Registers the "Join" table engine in the storage factory.
///
/// Engine syntax: Join(strictness, kind, key1[, key2, ...]) where
///  - strictness is one of ANY | ALL | SEMI | ANTI,
///  - kind is one of LEFT | INNER | RIGHT | FULL,
///  - the remaining arguments are identifiers naming the key columns.
///
/// Join-related values are taken from the context settings and may be overridden per table in the
/// SETTINGS clause of the storage definition. Any other setting name is rejected with BAD_ARGUMENTS.
void registerStorageJoin(StorageFactory & factory)
{
    auto creator_fn = [](const StorageFactory::Arguments & args)
    {
        /// Join(ANY, LEFT, k1, k2, ...)

        ASTs & engine_args = args.engine_args;

        const auto & settings = args.context.getSettingsRef();

        /// Defaults come from the context; the table's own SETTINGS clause overrides them below.
        auto join_use_nulls = settings.join_use_nulls;
        auto max_rows_in_join = settings.max_rows_in_join;
        auto max_bytes_in_join = settings.max_bytes_in_join;
        auto join_overflow_mode = settings.join_overflow_mode;
        auto join_any_take_last_row = settings.join_any_take_last_row;
        auto old_any_join = settings.any_join_distinct_right_table_keys;
        bool persistent = true;

        if (args.storage_def && args.storage_def->settings)
        {
            for (const auto & setting : args.storage_def->settings->changes)
            {
                if (setting.name == "join_use_nulls")
                    join_use_nulls = setting.value;
                else if (setting.name == "max_rows_in_join")
                    max_rows_in_join = setting.value;
                else if (setting.name == "max_bytes_in_join")
                    max_bytes_in_join = setting.value;
                else if (setting.name == "join_overflow_mode")
                    join_overflow_mode = setting.value;
                else if (setting.name == "join_any_take_last_row")
                    join_any_take_last_row = setting.value;
                else if (setting.name == "any_join_distinct_right_table_keys")
                    old_any_join = setting.value;
                else if (setting.name == "persistent")
                {
                    /// `persistent` is read through JoinSettings rather than directly from the change.
                    auto join_settings = std::make_unique<JoinSettings>();
                    join_settings->loadFromQuery(*args.storage_def);
                    persistent = join_settings->persistent;
                }
                else
                    throw Exception(
                        "Unknown setting " + setting.name + " for storage " + args.engine_name,
                        ErrorCodes::BAD_ARGUMENTS);
            }
        }

        if (engine_args.size() < 3)
            throw Exception(
                "Storage Join requires at least 3 parameters: Join(ANY|ALL|SEMI|ANTI, LEFT|INNER|RIGHT|FULL, keys...).",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// Unspecified / Comma serve as "not recognized" markers that are checked after parsing.
        ASTTableJoin::Strictness strictness = ASTTableJoin::Strictness::Unspecified;
        ASTTableJoin::Kind kind = ASTTableJoin::Kind::Comma;

        if (auto opt_strictness_id = tryGetIdentifierName(engine_args[0]))
        {
            const String strictness_str = Poco::toLower(*opt_strictness_id);

            if (strictness_str == "any")
            {
                /// With any_join_distinct_right_table_keys enabled, ANY maps to RightAny.
                if (old_any_join)
                    strictness = ASTTableJoin::Strictness::RightAny;
                else
                    strictness = ASTTableJoin::Strictness::Any;
            }
            else if (strictness_str == "all")
                strictness = ASTTableJoin::Strictness::All;
            else if (strictness_str == "semi")
                strictness = ASTTableJoin::Strictness::Semi;
            else if (strictness_str == "anti")
                strictness = ASTTableJoin::Strictness::Anti;
        }

        if (strictness == ASTTableJoin::Strictness::Unspecified)
            throw Exception("First parameter of storage Join must be ANY or ALL or SEMI or ANTI (without quotes).",
                            ErrorCodes::BAD_ARGUMENTS);

        if (auto opt_kind_id = tryGetIdentifierName(engine_args[1]))
        {
            const String kind_str = Poco::toLower(*opt_kind_id);

            if (kind_str == "left")
                kind = ASTTableJoin::Kind::Left;
            else if (kind_str == "inner")
                kind = ASTTableJoin::Kind::Inner;
            else if (kind_str == "right")
                kind = ASTTableJoin::Kind::Right;
            else if (kind_str == "full")
            {
                /// ANY FULL is stored with RightAny strictness.
                if (strictness == ASTTableJoin::Strictness::Any)
                    strictness = ASTTableJoin::Strictness::RightAny;
                kind = ASTTableJoin::Kind::Full;
            }
        }

        if (kind == ASTTableJoin::Kind::Comma)
            throw Exception("Second parameter of storage Join must be LEFT or INNER or RIGHT or FULL (without quotes).",
                            ErrorCodes::BAD_ARGUMENTS);

        /// Everything after the first two arguments must be an identifier naming a key column.
        Names key_names;
        key_names.reserve(engine_args.size() - 2);
        for (size_t i = 2, size = engine_args.size(); i < size; ++i)
        {
            auto opt_key = tryGetIdentifierName(engine_args[i]);
            if (!opt_key)
                throw Exception("Parameter №" + toString(i + 1) + " of storage Join don't look like column name.", ErrorCodes::BAD_ARGUMENTS);

            key_names.push_back(*opt_key);
        }

        return StorageJoin::create(
            args.relative_data_path,
            args.table_id,
            key_names,
            join_use_nulls,
            SizeLimits{max_rows_in_join, max_bytes_in_join, join_overflow_mode},
            kind,
            strictness,
            args.columns,
            args.constraints,
            join_any_take_last_row,
            args.context,
            persistent);
    };

    factory.registerStorage("Join", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}

A
Amos Bird 已提交
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259
/// Address of the object `value`, viewed as a pointer to its raw bytes.
template <typename T>
static const char * rawData(T & value)
{
    const auto * object_ptr = &value;
    return reinterpret_cast<const char *>(object_ptr);
}

/// Number of bytes occupied by an object of type T; the argument's contents are not inspected.
template <typename T>
static size_t rawSize(T & value)
{
    return sizeof(value);
}
/// For StringRef keys the raw bytes are the referenced characters, not the StringRef object itself.
template <>
const char * rawData(const StringRef & ref)
{
    return ref.data;
}

/// For StringRef keys the size is the length of the referenced data.
template <>
size_t rawSize(const StringRef & ref)
{
    return ref.size;
}

260
class JoinSource : public SourceWithProgress
A
Amos Bird 已提交
261 262
{
public:
263
    JoinSource(const HashJoin & parent_, UInt64 max_block_size_, Block sample_block_)
264 265 266 267 268
        : SourceWithProgress(sample_block_)
        , parent(parent_)
        , lock(parent.data->rwlock)
        , max_block_size(max_block_size_)
        , sample_block(std::move(sample_block_))
A
Amos Bird 已提交
269 270
    {
        column_indices.resize(sample_block.columns());
A
Artem Zuikov 已提交
271 272 273

        auto & saved_block = parent.getJoinedData()->sample_block;

A
Amos Bird 已提交
274 275 276
        for (size_t i = 0; i < sample_block.columns(); ++i)
        {
            auto & [_, type, name] = sample_block.getByPosition(i);
C
chertus 已提交
277
            if (parent.right_table_keys.has(name))
A
Amos Bird 已提交
278 279
            {
                key_pos = i;
A
Artem Zuikov 已提交
280
                const auto & column = parent.right_table_keys.getByName(name);
A
Artem Zuikov 已提交
281
                restored_block.insert(column);
A
Amos Bird 已提交
282 283 284
            }
            else
            {
A
Artem Zuikov 已提交
285
                size_t pos = saved_block.getPositionByName(name);
A
Amos Bird 已提交
286
                column_indices[i] = pos;
A
Artem Zuikov 已提交
287

A
Artem Zuikov 已提交
288
                const auto & column = saved_block.getByPosition(pos);
A
Artem Zuikov 已提交
289
                restored_block.insert(column);
A
Amos Bird 已提交
290 291 292 293 294 295 296
            }
        }
    }

    String getName() const override { return "Join"; }

protected:
297
    Chunk generate() override
A
Amos Bird 已提交
298
    {
C
chertus 已提交
299
        if (parent.data->blocks.empty())
300
            return {};
A
Amos Bird 已提交
301

302
        Chunk chunk;
C
chertus 已提交
303
        if (!joinDispatch(parent.kind, parent.strictness, parent.data->maps,
304
                [&](auto kind, auto strictness, auto & map) { chunk = createChunk<kind, strictness>(map); }))
305
            throw Exception("Logical error: unknown JOIN strictness", ErrorCodes::LOGICAL_ERROR);
306
        return chunk;
A
Amos Bird 已提交
307 308 309
    }

private:
310
    const HashJoin & parent;
A
Amos Bird 已提交
311
    std::shared_lock<std::shared_mutex> lock;
A
Alexey Milovidov 已提交
312
    UInt64 max_block_size;
A
Amos Bird 已提交
313
    Block sample_block;
A
Artem Zuikov 已提交
314
    Block restored_block; /// sample_block with parent column types
A
Amos Bird 已提交
315 316 317 318 319 320 321

    ColumnNumbers column_indices;
    std::optional<size_t> key_pos;

    std::unique_ptr<void, std::function<void(void *)>> position; /// type erasure


322
    template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
323
    Chunk createChunk(const Maps & maps)
A
Amos Bird 已提交
324
    {
325
        MutableColumns mut_columns = restored_block.cloneEmpty().mutateColumns();
A
Amos Bird 已提交
326 327 328

        size_t rows_added = 0;

C
chertus 已提交
329
        switch (parent.data->type)
A
Amos Bird 已提交
330 331
        {
#define M(TYPE)                                           \
332
    case HashJoin::Type::TYPE:                                \
333
        rows_added = fillColumns<KIND, STRICTNESS>(*maps.TYPE, mut_columns); \
A
Amos Bird 已提交
334 335 336 337 338
        break;
            APPLY_FOR_JOIN_VARIANTS_LIMITED(M)
#undef M

            default:
C
chertus 已提交
339
                throw Exception("Unsupported JOIN keys in StorageJoin. Type: " + toString(static_cast<UInt32>(parent.data->type)),
C
chertus 已提交
340
                                ErrorCodes::UNSUPPORTED_JOIN_KEYS);
A
Amos Bird 已提交
341 342 343 344 345
        }

        if (!rows_added)
            return {};

346 347 348 349 350 351
        Columns columns;
        columns.reserve(mut_columns.size());
        for (auto & col : mut_columns)
            columns.emplace_back(std::move(col));

        /// Correct nullability and LowCardinality types
A
Amos Bird 已提交
352
        for (size_t i = 0; i < columns.size(); ++i)
A
Artem Zuikov 已提交
353
        {
354 355
            const auto & src = restored_block.getByPosition(i);
            const auto & dst = sample_block.getByPosition(i);
A
Artem Zuikov 已提交
356

357
            if (!src.type->equals(*dst.type))
A
Amos Bird 已提交
358
            {
359 360 361
                auto arg = src;
                arg.column = std::move(columns[i]);
                columns[i] = castColumn(arg, dst.type);
A
Amos Bird 已提交
362
            }
A
Artem Zuikov 已提交
363
        }
A
Amos Bird 已提交
364

A
Artem Zuikov 已提交
365 366
        UInt64 num_rows = columns.at(0)->size();
        return Chunk(std::move(columns), num_rows);
A
Amos Bird 已提交
367 368
    }

369
    template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Map>
A
Artem Zuikov 已提交
370
    size_t fillColumns(const Map & map, MutableColumns & columns)
A
Amos Bird 已提交
371 372 373 374 375 376 377 378 379 380 381 382 383
    {
        size_t rows_added = 0;

        if (!position)
            position = decltype(position)(
                static_cast<void *>(new typename Map::const_iterator(map.begin())),
                [](void * ptr) { delete reinterpret_cast<typename Map::const_iterator *>(ptr); });

        auto & it = *reinterpret_cast<typename Map::const_iterator *>(position.get());
        auto end = map.end();

        for (; it != end; ++it)
        {
384
            if constexpr (STRICTNESS == ASTTableJoin::Strictness::RightAny)
A
Amos Bird 已提交
385
            {
386 387 388 389 390
                fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::All)
            {
                fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
A
Amos Bird 已提交
391
            }
392 393
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any)
            {
394 395 396 397 398 399 400 401 402 403 404
                if constexpr (KIND == ASTTableJoin::Kind::Left || KIND == ASTTableJoin::Kind::Inner)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == ASTTableJoin::Kind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
            }
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Semi)
            {
                if constexpr (KIND == ASTTableJoin::Kind::Left)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == ASTTableJoin::Kind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
405
            }
406
            else if constexpr (STRICTNESS == ASTTableJoin::Strictness::Anti)
407
            {
408 409 410 411
                if constexpr (KIND == ASTTableJoin::Kind::Left)
                    fillOne<Map>(columns, column_indices, it, key_pos, rows_added);
                else if constexpr (KIND == ASTTableJoin::Kind::Right)
                    fillAll<Map>(columns, column_indices, it, key_pos, rows_added);
412
            }
A
Amos Bird 已提交
413
            else
414
                throw Exception("This JOIN is not implemented yet", ErrorCodes::NOT_IMPLEMENTED);
A
Amos Bird 已提交
415 416 417 418 419 420 421 422 423 424

            if (rows_added >= max_block_size)
            {
                ++it;
                break;
            }
        }

        return rows_added;
    }
425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451

    template <typename Map>
    static void fillOne(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
                        const std::optional<size_t> & key_pos, size_t & rows_added)
    {
        for (size_t j = 0; j < columns.size(); ++j)
            if (j == key_pos)
                columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
            else
                columns[j]->insertFrom(*it->getMapped().block->getByPosition(column_indices[j]).column.get(), it->getMapped().row_num);
        ++rows_added;
    }

    template <typename Map>
    static void fillAll(MutableColumns & columns, const ColumnNumbers & column_indices, typename Map::const_iterator & it,
                        const std::optional<size_t> & key_pos, size_t & rows_added)
    {
        for (auto ref_it = it->getMapped().begin(); ref_it.ok(); ++ref_it)
        {
            for (size_t j = 0; j < columns.size(); ++j)
                if (j == key_pos)
                    columns[j]->insertData(rawData(it->getKey()), rawSize(it->getKey()));
                else
                    columns[j]->insertFrom(*ref_it->block->getByPosition(column_indices[j]).column.get(), ref_it->row_num);
            ++rows_added;
        }
    }
A
Amos Bird 已提交
452 453 454 455
};


// TODO: multiple stream read and index read
N
Nikolai Kochetov 已提交
456
/// Reads the stored right-hand side of the join as a single-source pipe.
///
/// The requested column names are first validated against the metadata snapshot (including
/// virtual columns); the data is then produced by a JoinSource over the current HashJoin.
/// Query info, context, processing stage and the number of streams are not used.
Pipe StorageJoin::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & /*query_info*/,
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t max_block_size,
    unsigned /*num_streams*/)
{
    metadata_snapshot->check(column_names, getVirtuals(), getStorageID());

    auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
    return Pipe(std::make_shared<JoinSource>(*join, max_block_size, std::move(header)));
}

470
}