StorageSystemTables.cpp 20.2 KB
Newer Older
1 2 3 4
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDateTime.h>
5
#include <DataTypes/DataTypeNullable.h>
6 7
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/System/StorageSystemTables.h>
A
Alexey Milovidov 已提交
8
#include <Storages/VirtualColumnUtils.h>
9
#include <Databases/IDatabase.h>
10
#include <Access/ContextAccess.h>
11
#include <Interpreters/Context.h>
12 13 14
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/queryToString.h>
#include <Common/typeid_cast.h>
15
#include <Common/StringUtils/StringUtils.h>
16
#include <DataTypes/DataTypesNumber.h>
17
#include <DataTypes/DataTypeArray.h>
G
Gleb Novikov 已提交
18
#include <Disks/StoragePolicy.h>
19 20
#include <Processors/Sources/SourceWithProgress.h>
#include <Processors/Pipe.h>
A
Alexander Tokmakov 已提交
21
#include <DataTypes/DataTypeUUID.h>
22 23 24 25 26


namespace DB
{

27 28
/// Error codes referenced in this translation unit (defined elsewhere).
namespace ErrorCodes
{
    extern const int TABLE_IS_DROPPED;
}

32

33
/// Registers the storage as `system.<name_>` and declares the fixed set of columns it exposes.
///
/// NOTE: the order of the columns below is significant. TablesBlockSource::generate() walks
/// `columns_mask` positionally (database, name, uuid, engine, ...), so any column added,
/// removed or reordered here must be mirrored there.
StorageSystemTables::StorageSystemTables(const std::string & name_)
    : IStorage({"system", name_})
{
    StorageInMemoryMetadata storage_metadata;
    storage_metadata.setColumns(ColumnsDescription(
    {
        {"database", std::make_shared<DataTypeString>()},
        {"name", std::make_shared<DataTypeString>()},
        {"uuid", std::make_shared<DataTypeUUID>()},
        {"engine", std::make_shared<DataTypeString>()},
        {"is_temporary", std::make_shared<DataTypeUInt8>()},
        {"data_paths", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
        {"metadata_path", std::make_shared<DataTypeString>()},
        {"metadata_modification_time", std::make_shared<DataTypeDateTime>()},
        {"dependencies_database", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
        {"dependencies_table", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
        {"create_table_query", std::make_shared<DataTypeString>()},
        {"engine_full", std::make_shared<DataTypeString>()},
        {"partition_key", std::make_shared<DataTypeString>()},
        {"sorting_key", std::make_shared<DataTypeString>()},
        {"primary_key", std::make_shared<DataTypeString>()},
        {"sampling_key", std::make_shared<DataTypeString>()},
        {"storage_policy", std::make_shared<DataTypeString>()},
        /// The counters are Nullable: generate() inserts a default (NULL) when the table reports no value.
        {"total_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
        {"total_bytes", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
        {"lifetime_rows", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
        {"lifetime_bytes", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeUInt64>())},
    }));
    setInMemoryMetadata(storage_metadata);
}

64

65
/// Returns a String column with the names of the databases to enumerate.
///
/// A single-column block named "database" is filled with the name of every database known to
/// DatabaseCatalog and then passed through VirtualColumnUtils::filterBlockWithQuery, which
/// presumably drops the names that cannot satisfy the conditions of `query` on the `database`
/// column (NOTE(review): confirm against VirtualColumnUtils).
static ColumnPtr getFilteredDatabases(const ASTPtr & query, const Context & context)
{
    MutableColumnPtr column = ColumnString::create();
    for (const auto & db : DatabaseCatalog::instance().getDatabases())
        column->insert(db.first);

    Block block { ColumnWithTypeAndName(std::move(column), std::make_shared<DataTypeString>(), "database") };
    VirtualColumnUtils::filterBlockWithQuery(query, block, context);
    return block.getByPosition(0).column;
}

A
alexey-milovidov 已提交
76 77
/// Avoid heavy operation on tables if we only queried columns that we can get without table object.
/// Otherwise it will require table initialization for Lazy database.
A
alexey-milovidov 已提交
78
static bool needLockStructure(const DatabasePtr & database, const Block & header)
V
fix  
Vasilyev Nikita 已提交
79
{
V
fix  
Vasilyev Nikita 已提交
80 81 82
    if (database->getEngineName() != "Lazy")
        return true;

A
Alexander Tokmakov 已提交
83
    static const std::set<std::string> columns_without_lock = { "database", "name", "uuid", "metadata_modification_time" };
A
alexey-milovidov 已提交
84
    for (const auto & column : header.getColumnsWithTypeAndName())
V
fix  
Vasilyev Nikita 已提交
85 86
    {
        if (columns_without_lock.find(column.name) == columns_without_lock.end())
V
fix  
Vasilyev Nikita 已提交
87 88 89 90
            return true;
    }
    return false;
}
91

92
class TablesBlockSource : public SourceWithProgress
93
{
94
public:
95
    TablesBlockSource(
K
kreuzerkrieg 已提交
96
        std::vector<UInt8> columns_mask_,
97
        Block header,
K
kreuzerkrieg 已提交
98 99 100
        UInt64 max_block_size_,
        ColumnPtr databases_,
        const Context & context_)
101 102
        : SourceWithProgress(std::move(header))
        , columns_mask(std::move(columns_mask_))
103 104 105
        , max_block_size(max_block_size_)
        , databases(std::move(databases_))
        , context(context_) {}
106 107 108 109

    String getName() const override { return "Tables"; }

protected:
110
    Chunk generate() override
111 112 113
    {
        if (done)
            return {};
114

115
        MutableColumns res_columns = getPort().getHeader().cloneEmptyColumns();
116

117 118
        const auto access = context.getAccess();
        const bool check_access_for_databases = !access->isGranted(AccessType::SHOW_TABLES);
119

120 121 122 123 124
        size_t rows_count = 0;
        while (rows_count < max_block_size)
        {
            if (tables_it && !tables_it->isValid())
                ++database_idx;
125

126 127 128
            while (database_idx < databases->size() && (!tables_it || !tables_it->isValid()))
            {
                database_name = databases->getDataAt(database_idx).toString();
129
                database = DatabaseCatalog::instance().tryGetDatabase(database_name);
130

131
                if (!database)
132 133 134 135 136
                {
                    /// Database was deleted just now or the user has no access.
                    ++database_idx;
                    continue;
                }
137

138 139
                break;
            }
140

A
Alexey Milovidov 已提交
141
            /// This is for temporary tables. They are output in single block regardless to max_block_size.
142 143 144 145 146
            if (database_idx >= databases->size())
            {
                if (context.hasSessionContext())
                {
                    Tables external_tables = context.getSessionContext().getExternalTables();
147

A
Alexey Milovidov 已提交
148
                    for (auto & table : external_tables)
149 150 151
                    {
                        size_t src_index = 0;
                        size_t res_index = 0;
152

153
                        // database
154 155
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
156

157
                        // name
158 159
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insert(table.first);
160

A
Alexander Tokmakov 已提交
161 162 163 164
                        // uuid
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insert(table.second->getStorageID().uuid);

165
                        // engine
166 167
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insert(table.second->getName());
168

169
                        // is_temporary
170
                        if (columns_mask[src_index++])
A
Amos Bird 已提交
171
                            res_columns[res_index++]->insert(1u);
172

173
                        // data_paths
174 175
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
176

177
                        // metadata_path
178 179
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
180

181
                        // metadata_modification_time
182 183
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
184

185
                        // dependencies_database
186 187
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
188

189
                        // dependencies_table
190 191
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
192

193
                        // create_table_query
194 195
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
196

197
                        // engine_full
198 199
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insert(table.second->getName());
200

201
                        // partition_key
202 203 204
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();

205
                        // sorting_key
206 207 208
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();

209
                        // primary_key
210 211
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
212

213
                        // sampling_key
214 215
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
216

217
                        // storage_policy
218 219
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
220 221 222 223

                        // total_rows
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
224 225 226 227

                        // total_bytes
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
228 229 230 231 232 233 234 235

                        // lifetime_rows
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();

                        // lifetime_bytes
                        if (columns_mask[src_index++])
                            res_columns[res_index++]->insertDefault();
236 237 238
                    }
                }

239
                UInt64 num_rows = res_columns.at(0)->size();
N
Nikolai Kochetov 已提交
240
                done = true;
241
                return Chunk(std::move(res_columns), num_rows);
242
            }
243

244
            const bool check_access_for_tables = check_access_for_databases && !access->isGranted(AccessType::SHOW_TABLES, database_name);
245

246
            if (!tables_it || !tables_it->isValid())
247
                tables_it = database->getTablesIterator(context);
V
fix  
Vasilyev Nikita 已提交
248

249
            const bool need_lock_structure = needLockStructure(database, getPort().getHeader());
V
fix  
Vasilyev Nikita 已提交
250

251
            for (; rows_count < max_block_size && tables_it->isValid(); tables_it->next())
252
            {
253
                auto table_name = tables_it->name();
254
                if (check_access_for_tables && !access->isGranted(AccessType::SHOW_TABLES, database_name, table_name))
255
                    continue;
A
Alexey Milovidov 已提交
256

257
                StoragePtr table = nullptr;
A
alesapin 已提交
258
                TableLockHolder lock;
A
Alexey Milovidov 已提交
259

A
Alexander Kazakov 已提交
260
                if (need_lock_structure)
A
Alexey Milovidov 已提交
261
                {
A
Alexander Kazakov 已提交
262 263 264 265 266 267 268
                    table = tables_it->table();
                    if (table == nullptr)
                    {
                        // Table might have just been removed or detached for Lazy engine (see DatabaseLazy::tryGetTable())
                        continue;
                    }
                    try
V
Vasilyev Nikita 已提交
269
                    {
A
alesapin 已提交
270
                        lock = table->lockForShare(context.getCurrentQueryId(), context.getSettingsRef().lock_acquire_timeout);
V
Vasilyev Nikita 已提交
271
                    }
A
Alexander Kazakov 已提交
272 273 274 275 276 277
                    catch (const Exception & e)
                    {
                        if (e.code() == ErrorCodes::TABLE_IS_DROPPED)
                            continue;
                        throw;
                    }
A
Alexey Milovidov 已提交
278
                }
279 280

                ++rows_count;
281

282 283
                size_t src_index = 0;
                size_t res_index = 0;
284 285

                if (columns_mask[src_index++])
286
                    res_columns[res_index++]->insert(database_name);
287

288
                if (columns_mask[src_index++])
289
                    res_columns[res_index++]->insert(table_name);
290

A
Alexander Tokmakov 已提交
291 292 293
                if (columns_mask[src_index++])
                    res_columns[res_index++]->insert(tables_it->uuid());

294
                if (columns_mask[src_index++])
V
Vasilyev Nikita 已提交
295
                {
296
                    assert(table != nullptr);
297
                    res_columns[res_index++]->insert(table->getName());
V
Vasilyev Nikita 已提交
298
                }
299

300
                if (columns_mask[src_index++])
301
                    res_columns[res_index++]->insert(0u);  // is_temporary
302

303
                if (columns_mask[src_index++])
I
Fix.  
Igor Mineev 已提交
304
                {
305
                    assert(table != nullptr);
I
Igor Mineev 已提交
306
                    Array table_paths_array;
I
Fix.  
Igor Mineev 已提交
307
                    auto paths = table->getDataPaths();
I
Igor Mineev 已提交
308
                    table_paths_array.reserve(paths.size());
I
Igor Mineev 已提交
309
                    for (const String & path : paths)
I
Igor Mineev 已提交
310 311
                        table_paths_array.push_back(path);
                    res_columns[res_index++]->insert(table_paths_array);
I
Fix.  
Igor Mineev 已提交
312
                }
313 314

                if (columns_mask[src_index++])
A
alesapin 已提交
315
                    res_columns[res_index++]->insert(database->getObjectMetadataPath(table_name));
316

317
                if (columns_mask[src_index++])
318
                    res_columns[res_index++]->insert(static_cast<UInt64>(database->getObjectMetadataModificationTime(table_name)));
319 320 321 322 323

                {
                    Array dependencies_table_name_array;
                    Array dependencies_database_name_array;
                    if (columns_mask[src_index] || columns_mask[src_index + 1])
324
                    {
325
                        const auto dependencies = DatabaseCatalog::instance().getDependencies(StorageID(database_name, table_name));
326

327 328 329 330
                        dependencies_table_name_array.reserve(dependencies.size());
                        dependencies_database_name_array.reserve(dependencies.size());
                        for (const auto & dependency : dependencies)
                        {
A
Alexander Tokmakov 已提交
331 332
                            dependencies_table_name_array.push_back(dependency.table_name);
                            dependencies_database_name_array.push_back(dependency.database_name);
333 334 335
                        }
                    }

336 337
                    if (columns_mask[src_index++])
                        res_columns[res_index++]->insert(dependencies_database_name_array);
338

339 340 341
                    if (columns_mask[src_index++])
                        res_columns[res_index++]->insert(dependencies_table_name_array);
                }
Z
zhang2014 已提交
342

343 344
                if (columns_mask[src_index] || columns_mask[src_index + 1])
                {
345
                    ASTPtr ast = database->tryGetCreateTableQuery(table_name, context);
346

347 348
                    if (columns_mask[src_index++])
                        res_columns[res_index++]->insert(ast ? queryToString(ast) : "");
349

350 351 352
                    if (columns_mask[src_index++])
                    {
                        String engine_full;
353

354 355
                        if (ast)
                        {
I
Ivan Lezhankin 已提交
356 357
                            const auto & ast_create = ast->as<ASTCreateQuery &>();
                            if (ast_create.storage)
358
                            {
I
Ivan Lezhankin 已提交
359
                                engine_full = queryToString(*ast_create.storage);
360 361 362 363 364 365

                                static const char * const extra_head = " ENGINE = ";
                                if (startsWith(engine_full, extra_head))
                                    engine_full = engine_full.substr(strlen(extra_head));
                            }
                        }
366

367 368 369
                        res_columns[res_index++]->insert(engine_full);
                    }
                }
370
                else
371
                    src_index += 2;
372

A
alesapin 已提交
373 374 375 376
                StorageMetadataPtr metadata_snapshot;
                if (table != nullptr)
                    metadata_snapshot = table->getInMemoryMetadataPtr();

377 378 379
                ASTPtr expression_ptr;
                if (columns_mask[src_index++])
                {
A
alesapin 已提交
380
                    assert(metadata_snapshot != nullptr);
381
                    if ((expression_ptr = metadata_snapshot->getPartitionKeyAST()))
382 383 384 385 386
                        res_columns[res_index++]->insert(queryToString(expression_ptr));
                    else
                        res_columns[res_index++]->insertDefault();
                }

387 388
                if (columns_mask[src_index++])
                {
A
alesapin 已提交
389
                    assert(metadata_snapshot != nullptr);
A
alesapin 已提交
390
                    if ((expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
391 392 393 394 395
                        res_columns[res_index++]->insert(queryToString(expression_ptr));
                    else
                        res_columns[res_index++]->insertDefault();
                }

396 397
                if (columns_mask[src_index++])
                {
A
alesapin 已提交
398
                    assert(metadata_snapshot != nullptr);
A
alesapin 已提交
399
                    if ((expression_ptr = metadata_snapshot->getPrimaryKey().expression_list_ast))
400 401 402 403 404 405 406
                        res_columns[res_index++]->insert(queryToString(expression_ptr));
                    else
                        res_columns[res_index++]->insertDefault();
                }

                if (columns_mask[src_index++])
                {
A
alesapin 已提交
407
                    assert(metadata_snapshot != nullptr);
408
                    if ((expression_ptr = metadata_snapshot->getSamplingKeyAST()))
409 410 411 412
                        res_columns[res_index++]->insert(queryToString(expression_ptr));
                    else
                        res_columns[res_index++]->insertDefault();
                }
413 414 415

                if (columns_mask[src_index++])
                {
416
                    assert(table != nullptr);
417 418 419 420 421 422
                    auto policy = table->getStoragePolicy();
                    if (policy)
                        res_columns[res_index++]->insert(policy->getName());
                    else
                        res_columns[res_index++]->insertDefault();
                }
423 424 425 426 427 428 429 430 431 432

                if (columns_mask[src_index++])
                {
                    assert(table != nullptr);
                    auto total_rows = table->totalRows();
                    if (total_rows)
                        res_columns[res_index++]->insert(*total_rows);
                    else
                        res_columns[res_index++]->insertDefault();
                }
433 434 435 436 437 438 439 440 441 442

                if (columns_mask[src_index++])
                {
                    assert(table != nullptr);
                    auto total_bytes = table->totalBytes();
                    if (total_bytes)
                        res_columns[res_index++]->insert(*total_bytes);
                    else
                        res_columns[res_index++]->insertDefault();
                }
443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462

                if (columns_mask[src_index++])
                {
                    assert(table != nullptr);
                    auto lifetime_rows = table->lifetimeRows();
                    if (lifetime_rows)
                        res_columns[res_index++]->insert(*lifetime_rows);
                    else
                        res_columns[res_index++]->insertDefault();
                }

                if (columns_mask[src_index++])
                {
                    assert(table != nullptr);
                    auto lifetime_bytes = table->lifetimeBytes();
                    if (lifetime_bytes)
                        res_columns[res_index++]->insert(*lifetime_bytes);
                    else
                        res_columns[res_index++]->insertDefault();
                }
463 464
            }
        }
465

466 467
        UInt64 num_rows = res_columns.at(0)->size();
        return Chunk(std::move(res_columns), num_rows);
468 469 470
    }
private:
    std::vector<UInt8> columns_mask;
A
Alexey Milovidov 已提交
471
    UInt64 max_block_size;
472 473
    ColumnPtr databases;
    size_t database_idx = 0;
A
alesapin 已提交
474
    DatabaseTablesIteratorPtr tables_it;
475 476 477 478 479
    const Context context;
    bool done = false;
    DatabasePtr database;
    std::string database_name;
};
480 481


482
/// Builds the pipeline that reads `system.tables`.
///
/// Validates the requested column names against the storage metadata, computes which of the
/// storage columns are requested (`columns_mask`, positional, plus the matching output header)
/// and returns a single pipe backed by TablesBlockSource. The list of databases to scan is
/// computed up front by getFilteredDatabases().
Pipes StorageSystemTables::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum /*processed_stage*/,
    const size_t max_block_size,
    const unsigned /*num_streams*/)
{
    metadata_snapshot->check(column_names, getVirtuals(), getStorageID());

    /// Create a mask of what columns are needed in the result.

    NameSet names_set(column_names.begin(), column_names.end());

    Block sample_block = metadata_snapshot->getSampleBlock();
    Block res_block;

    /// The mask is indexed by position in the sample block; the header keeps the sample block's
    /// column order regardless of the order of `column_names`.
    std::vector<UInt8> columns_mask(sample_block.columns());
    for (size_t i = 0, size = columns_mask.size(); i < size; ++i)
    {
        if (names_set.count(sample_block.getByPosition(i).name))
        {
            columns_mask[i] = 1;
            res_block.insert(sample_block.getByPosition(i));
        }
    }

    ColumnPtr filtered_databases_column = getFilteredDatabases(query_info.query, context);

    Pipes pipes;
    pipes.emplace_back(std::make_shared<TablesBlockSource>(
        std::move(columns_mask), std::move(res_block), max_block_size, std::move(filtered_databases_column), context));

    return pipes;
}

519
}