// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/snapshot/Snapshots.h"
#include "db/snapshot/CompoundOperations.h"

namespace milvus {
namespace engine {
namespace snapshot {

19
Status
X
XuPeng-SH 已提交
20
Snapshots::DropCollection(ID_TYPE collection_id, const LSN_TYPE& lsn) {
21 22 23 24
    ScopedSnapshotT ss;
    auto status = GetSnapshot(ss, collection_id);
    if (!status.ok())
        return status;
X
XuPeng-SH 已提交
25
    return DoDropCollection(ss, lsn);
26 27 28
}

// Drops the collection identified by `name`.
//
// The collection's snapshot is fetched through the name-based GetSnapshot(). If that lookup
// fails, its status is returned as is; otherwise the snapshot and `lsn` are handed to
// DoDropCollection() and the status of that call is returned.
Status
Snapshots::DropCollection(const std::string& name, const LSN_TYPE& lsn) {
    ScopedSnapshotT snapshot;
    auto lookup_status = GetSnapshot(snapshot, name);
    if (lookup_status.ok()) {
        return DoDropCollection(snapshot, lsn);
    }
    return lookup_status;
}

37
Status
X
XuPeng-SH 已提交
38
Snapshots::DoDropCollection(ScopedSnapshotT& ss, const LSN_TYPE& lsn) {
39
    OperationContext context;
X
XuPeng-SH 已提交
40
    context.lsn = lsn;
41
    context.collection = ss->GetCollection();
X
XuPeng-SH 已提交
42
    auto op = std::make_shared<SoftDeleteCollectionOperation>(context, ss);
43 44 45 46 47 48 49
    op->Push();
    auto status = op->GetStatus();

    std::unique_lock<std::shared_timed_mutex> lock(mutex_);
    name_id_map_.erase(context.collection->GetName());
    holders_.erase(context.collection->GetID());
    return status;
50 51
}

52 53 54 55 56 57 58 59
Status
Snapshots::GetSnapshotNoLoad(ScopedSnapshotT& ss, ID_TYPE collection_id, bool scoped) {
    SnapshotHolderPtr holder;
    auto status = GetHolder(collection_id, holder, false);
    if (!status.ok())
        return status;
    status = holder->GetSnapshot(ss, 0, scoped, false);
    return status;
60 61
}

62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
Status
Snapshots::GetSnapshot(ScopedSnapshotT& ss, ID_TYPE collection_id, ID_TYPE id, bool scoped) {
    SnapshotHolderPtr holder;
    auto status = GetHolder(collection_id, holder);
    if (!status.ok())
        return status;
    status = holder->GetSnapshot(ss, id, scoped);
    return status;
}

// Fetches snapshot `id` of the collection identified by `name` into `ss`.
//
// The holder is resolved through the name-based GetHolder(); a failed resolution is returned
// unchanged. Otherwise `id` and `scoped` are forwarded to the holder's GetSnapshot() and its
// status is returned.
Status
Snapshots::GetSnapshot(ScopedSnapshotT& ss, const std::string& name, ID_TYPE id, bool scoped) {
    SnapshotHolderPtr resolved_holder;
    auto lookup_status = GetHolder(name, resolved_holder);
    if (!lookup_status.ok()) {
        return lookup_status;
    }
    return resolved_holder->GetSnapshot(ss, id, scoped);
}

82 83
Status
Snapshots::GetCollectionIds(IDS_TYPE& ids) const {
84 85 86 87
    std::shared_lock<std::shared_timed_mutex> lock(mutex_);
    for (auto& kv : holders_) {
        ids.push_back(kv.first);
    }
88
    return Status::OK();
89 90
}

91 92
Status
Snapshots::LoadNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) {
93 94 95 96
    auto op = std::make_shared<GetSnapshotIDsOperation>(collection_id, false);
    op->Push();
    auto& collection_commit_ids = op->GetIDs();
    if (collection_commit_ids.size() == 0) {
97
        return Status(SS_NOT_FOUND_ERROR, "No collection commit found");
98
    }
99 100
    holder = std::make_shared<SnapshotHolder>(collection_id,
                                              std::bind(&Snapshots::SnapshotGCCallback, this, std::placeholders::_1));
101 102 103
    for (auto c_c_id : collection_commit_ids) {
        holder->Add(c_c_id);
    }
104
    return Status::OK();
105 106 107 108 109 110 111
}

void
Snapshots::Init() {
    auto op = std::make_shared<GetCollectionIDsOperation>();
    op->Push();
    auto& collection_ids = op->GetIDs();
112
    SnapshotHolderPtr holder;
113
    for (auto collection_id : collection_ids) {
114
        GetHolder(collection_id, holder);
115 116 117
    }
}

118 119
Status
Snapshots::GetHolder(const std::string& name, SnapshotHolderPtr& holder) {
120 121 122 123 124
    {
        std::unique_lock<std::shared_timed_mutex> lock(mutex_);
        auto kv = name_id_map_.find(name);
        if (kv != name_id_map_.end()) {
            lock.unlock();
125
            return GetHolder(kv->second, holder);
126 127 128 129 130 131
        }
    }
    LoadOperationContext context;
    context.name = name;
    auto op = std::make_shared<LoadOperation<Collection>>(context);
    op->Push();
132 133 134 135 136
    CollectionPtr c;
    auto status = op->GetResource(c);
    if (!status.ok())
        return status;
    return GetHolder(c->GetID(), holder);
137 138
}

139 140 141
Status
Snapshots::GetHolder(ID_TYPE collection_id, SnapshotHolderPtr& holder, bool load) {
    Status status;
142 143
    {
        std::unique_lock<std::shared_timed_mutex> lock(mutex_);
144 145 146 147 148
        status = GetHolderNoLock(collection_id, holder);
        if (status.ok() && holder)
            return status;
        if (!load)
            return status;
149
    }
150 151 152
    status = LoadNoLock(collection_id, holder);
    if (!status.ok())
        return status;
153 154 155

    std::unique_lock<std::shared_timed_mutex> lock(mutex_);
    holders_[collection_id] = holder;
156 157 158 159 160 161
    ScopedSnapshotT ss;
    status = holder->GetSnapshot(ss);
    if (!status.ok())
        return status;
    name_id_map_[ss->GetName()] = collection_id;
    return status;
162 163
}

164 165
Status
Snapshots::GetHolderNoLock(ID_TYPE collection_id, SnapshotHolderPtr& holder) {
166 167
    auto it = holders_.find(collection_id);
    if (it == holders_.end()) {
168
        return Status(SS_NOT_FOUND_ERROR, "Specified snapshot holder not found");
169
    }
170 171
    holder = it->second;
    return Status::OK();
172 173
}

174
Status
175 176 177 178 179
Snapshots::Reset() {
    std::unique_lock<std::shared_timed_mutex> lock(mutex_);
    holders_.clear();
    name_id_map_.clear();
    to_release_.clear();
180
    return Status::OK();
181 182 183 184 185 186 187 188 189 190 191 192 193
}

// GC callback bound into each SnapshotHolder by LoadNoLock().
//
// Calls UnRef() on the snapshot, then writes its address, id and RefCnt() to std::cout.
// The snapshot is not appended to to_release_; that call is commented out below.
void
Snapshots::SnapshotGCCallback(Snapshot::Ptr ss_ptr) {
    /* to_release_.push_back(ss_ptr); */
    auto& snapshot = *ss_ptr;
    snapshot.UnRef();
    std::cout << &snapshot << " Snapshot " << snapshot.GetID() << " RefCnt = " << snapshot.RefCnt()
              << " To be removed" << std::endl;
}

}  // namespace snapshot
}  // namespace engine
}  // namespace milvus