IStorage.cpp 7.8 KB
Newer Older
1
#include <Storages/IStorage.h>
2

3
#include <Common/StringUtils/StringUtils.h>
4
#include <Common/quoteString.h>
A
cleanup  
Alexander Kuzmenkov 已提交
5 6 7
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Context.h>
A
alesapin 已提交
8
#include <Interpreters/ExpressionActions.h>
N
Nikolai Kochetov 已提交
9
#include <Interpreters/InterpreterSelectQuery.h>
A
cleanup  
Alexander Kuzmenkov 已提交
10 11 12 13 14
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Storages/AlterCommands.h>
A
alexey-milovidov 已提交
15 16 17 18


namespace DB
{
19 20
namespace ErrorCodes
{
A
Alexey Milovidov 已提交
21
    extern const int TABLE_IS_DROPPED;
A
alesapin 已提交
22
    extern const int NOT_IMPLEMENTED;
23
    extern const int DEADLOCK_AVOIDED;
24 25
}

A
alesapin 已提交
26
bool IStorage::isVirtualColumn(const String & column_name, const StorageMetadataPtr & metadata_snapshot) const
27
{
A
Alexey Milovidov 已提交
28
    /// Virtual column maybe overridden by real column
A
alesapin 已提交
29
    return !metadata_snapshot->getColumns().has(column_name) && getVirtuals().contains(column_name);
30 31
}

32
RWLockImpl::LockHolder IStorage::tryLockTimed(
F
fastio 已提交
33
    const RWLock & rwlock, RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & acquire_timeout) const
34
{
35
    auto lock_holder = rwlock->getLock(type, query_id, acquire_timeout);
36
    if (!lock_holder)
37 38
    {
        const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
39
        throw Exception(
F
fastio 已提交
40 41 42 43 44
            type_str + " locking attempt on \"" + getStorageID().getFullTableName() + "\" has timed out! ("
                + std::to_string(acquire_timeout.count())
                + "ms) "
                  "Possible deadlock avoided. Client should retry.",
            ErrorCodes::DEADLOCK_AVOIDED);
45
    }
46
    return lock_holder;
47 48
}

49
TableLockHolder IStorage::lockForShare(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
50
{
A
alesapin 已提交
51
    TableLockHolder result = tryLockTimed(drop_lock, RWLockImpl::Read, query_id, acquire_timeout);
52 53 54

    if (is_dropped)
        throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
A
alesapin 已提交
55

56 57 58
    return result;
}

59
TableLockHolder IStorage::lockForAlter(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
60
{
A
alesapin 已提交
61
    TableLockHolder result = tryLockTimed(alter_lock, RWLockImpl::Write, query_id, acquire_timeout);
62 63 64

    if (is_dropped)
        throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
A
alesapin 已提交
65

66 67 68 69
    return result;
}


70
TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
71
{
A
alesapin 已提交
72 73
    TableExclusiveLockHolder result;
    result.alter_lock = tryLockTimed(alter_lock, RWLockImpl::Write, query_id, acquire_timeout);
74 75 76 77

    if (is_dropped)
        throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);

A
alesapin 已提交
78
    result.drop_lock = tryLockTimed(drop_lock, RWLockImpl::Write, query_id, acquire_timeout);
79 80 81 82

    return result;
}

N
Nikolai Kochetov 已提交
83
Pipe IStorage::read(
F
fastio 已提交
84 85 86 87 88 89 90
    const Names & /*column_names*/,
    const StorageMetadataPtr & /*metadata_snapshot*/,
    SelectQueryInfo & /*query_info*/,
    const Context & /*context*/,
    QueryProcessingStage::Enum /*processed_stage*/,
    size_t /*max_block_size*/,
    unsigned /*num_streams*/)
N
Nikolai Kochetov 已提交
91 92 93 94
{
    throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

95
void IStorage::read(
F
fastio 已提交
96 97 98 99 100 101 102 103
    QueryPlan & query_plan,
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned num_streams)
104
{
105
    auto pipe = read(column_names, metadata_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
N
Nikolai Kochetov 已提交
106 107 108 109 110 111 112 113 114 115
    if (pipe.empty())
    {
        auto header = metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID());
        InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info);
    }
    else
    {
        auto read_step = std::make_unique<ReadFromStorageStep>(std::move(pipe), getName());
        query_plan.addStep(std::move(read_step));
    }
116 117
}

N
Nikolai Kochetov 已提交
118
Pipe IStorage::alterPartition(
F
fastio 已提交
119
    const StorageMetadataPtr & /* metadata_snapshot */, const PartitionCommands & /* commands */, const Context & /* context */)
N
Nikolai Kochetov 已提交
120 121 122 123
{
    throw Exception("Partition operations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}

F
fastio 已提交
124
void IStorage::alter(const AlterCommands & params, const Context & context, TableLockHolder &)
125
{
126
    auto table_id = getStorageID();
A
alesapin 已提交
127 128 129
    StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
    params.apply(new_metadata, context);
    DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id, new_metadata);
130
    setInMemoryMetadata(new_metadata);
A
alesapin 已提交
131
}
132

A
alesapin 已提交
133

134
void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Context & /* context */) const
A
alesapin 已提交
135 136
{
    for (const auto & command : commands)
137
    {
A
alesapin 已提交
138
        if (!command.isCommentAlter())
A
alesapin 已提交
139 140 141
            throw Exception(
                "Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
                ErrorCodes::NOT_IMPLEMENTED);
142 143
    }
}
144

145 146 147 148 149
void IStorage::checkMutationIsPossible(const MutationCommands & /*commands*/, const Settings & /*settings*/) const
{
    throw Exception("Table engine " + getName() + " doesn't support mutations", ErrorCodes::NOT_IMPLEMENTED);
}

F
fastio 已提交
150 151
void IStorage::checkAlterPartitionIsPossible(
    const PartitionCommands & /*commands*/, const StorageMetadataPtr & /*metadata_snapshot*/, const Settings & /*settings*/) const
A
alesapin 已提交
152
{
A
alesapin 已提交
153
    throw Exception("Table engine " + getName() + " doesn't support partitioning", ErrorCodes::NOT_IMPLEMENTED);
A
alesapin 已提交
154
}
155

A
Alexander Tokmakov 已提交
156
StorageID IStorage::getStorageID() const
A
Alexander Tokmakov 已提交
157
{
158
    std::lock_guard lock(id_mutex);
A
fixes  
Alexander Tokmakov 已提交
159
    return storage_id;
A
Alexander Tokmakov 已提交
160 161
}

A
Alexander Tokmakov 已提交
162
void IStorage::renameInMemory(const StorageID & new_table_id)
A
Alexander Tokmakov 已提交
163
{
164
    std::lock_guard lock(id_mutex);
A
Alexander Tokmakov 已提交
165
    storage_id = new_table_id;
A
Alexander Tokmakov 已提交
166 167
}

168
NamesAndTypesList IStorage::getVirtuals() const
A
alesapin 已提交
169
{
170
    return {};
A
alesapin 已提交
171 172
}

F
fastio 已提交
173 174 175 176 177 178 179 180 181
Names IStorage::getAllRegisteredNames() const
{
    Names result;
    auto getter = [](const auto & column) { return column.name; };
    const NamesAndTypesList & available_columns = getInMemoryMetadata().getColumns().getAllPhysical();
    std::transform(available_columns.begin(), available_columns.end(), std::back_inserter(result), getter);
    return result;
}

F
fix  
feng lv 已提交
182
NameDependencies IStorage::getDependentViewsByColumn(const Context & context) const
183 184 185 186 187 188
{
    NameDependencies name_deps;
    auto dependencies = DatabaseCatalog::instance().getDependencies(storage_id);
    for (const auto & depend_id : dependencies)
    {
        auto depend_table = DatabaseCatalog::instance().getTable(depend_id, context);
F
fix  
feng lv 已提交
189 190 191 192 193 194 195
        if (depend_table->getInMemoryMetadataPtr()->select.inner_query)
        {
            const auto & select_query = depend_table->getInMemoryMetadataPtr()->select.inner_query;
            auto required_columns = InterpreterSelectQuery(select_query, context, SelectQueryOptions{}.noModify()).getRequiredColumns();
            for (const auto & col_name : required_columns)
                name_deps[col_name].push_back(depend_id.table_name);
        }
196 197 198 199
    }
    return name_deps;
}

A
debug  
Alexander Kuzmenkov 已提交
200 201
std::string PrewhereDAGInfo::dump() const
{
A
cleanup  
Alexander Kuzmenkov 已提交
202
    WriteBufferFromOwnString ss;
A
debug  
Alexander Kuzmenkov 已提交
203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
    ss << "PrewhereDagInfo\n";

    if (alias_actions)
    {
        ss << "alias_actions " << alias_actions->dumpDAG() << "\n";
    }

    if (prewhere_actions)
    {
        ss << "prewhere_actions " << prewhere_actions->dumpDAG() << "\n";
    }

    if (remove_columns_actions)
    {
        ss << "remove_columns_actions " << remove_columns_actions->dumpDAG() << "\n";
    }

    ss << "remove_prewhere_column " << remove_prewhere_column
       << ", need_filter " << need_filter << "\n";

    return ss.str();
}

226
std::string FilterDAGInfo::dump() const
A
debug  
Alexander Kuzmenkov 已提交
227
{
A
cleanup  
Alexander Kuzmenkov 已提交
228
    WriteBufferFromOwnString ss;
229
    ss << "FilterDAGInfo for column '" << column_name <<"', do_remove_column "
A
debug  
Alexander Kuzmenkov 已提交
230
       << do_remove_column << "\n";
231
    if (actions)
A
debug  
Alexander Kuzmenkov 已提交
232
    {
233
        ss << "actions " << actions->dumpDAG() << "\n";
A
debug  
Alexander Kuzmenkov 已提交
234 235 236
    }

    return ss.str();
A
Alexander Kuzmenkov 已提交
237
}
A
debug  
Alexander Kuzmenkov 已提交
238

A
alexey-milovidov 已提交
239
}