Operations.h 7.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <assert.h>
#include <any>
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
20
#include <string>
21 22 23
#include <thread>
#include <vector>
#include "Context.h"
24 25
#include "db/snapshot/Snapshot.h"
#include "db/snapshot/Store.h"
26
#include "utils/Error.h"
27
#include "utils/Status.h"
28 29 30 31 32 33

namespace milvus {
namespace engine {
namespace snapshot {

using StepsT = std::vector<std::any>;
34
using CheckStaleFunc = std::function<Status(ScopedSnapshotT&)>;
35

X
XuPeng-SH 已提交
36 37
enum OperationsType { Invalid, W_Leaf, O_Leaf, W_Compound, O_Compound };

38 39
class Operations : public std::enable_shared_from_this<Operations> {
 public:
X
XuPeng-SH 已提交
40 41
    Operations(const OperationContext& context, ScopedSnapshotT prev_ss,
               const OperationsType& type = OperationsType::Invalid);
42 43 44 45 46 47

    const ScopedSnapshotT&
    GetPrevSnapshot() const {
        return prev_ss_;
    }

X
XuPeng-SH 已提交
48 49 50 51 52
    virtual const LSN_TYPE&
    GetContextLsn() const {
        return context_.lsn;
    }

53 54 55 56
    virtual Status
    CheckStale(const CheckStaleFunc& checker = nullptr) const;
    virtual Status
    DoCheckStale(ScopedSnapshotT& latest_snapshot) const;
57 58 59

    template <typename StepT>
    void
60
    AddStep(const StepT& step, bool activate = true);
X
XuPeng-SH 已提交
61 62 63
    template <typename StepT>
    void
    AddStepWithLsn(const StepT& step, const LSN_TYPE& lsn, bool activate = true);
64 65 66 67 68 69 70 71 72 73 74 75 76
    void
    SetStepResult(ID_TYPE id) {
        ids_.push_back(id);
    }

    StepsT&
    GetSteps() {
        return steps_;
    }

    ID_TYPE
    GetID() const;

X
XuPeng-SH 已提交
77 78 79 80 81
    virtual const OperationsType&
    GetType() const {
        return type_;
    }

82
    virtual Status
83
    OnExecute(Store&);
84
    virtual Status
85
    PreExecute(Store&);
86
    virtual Status
87
    DoExecute(Store&);
88
    virtual Status
89 90
    PostExecute(Store&);

91 92
    virtual Status
    GetSnapshot(ScopedSnapshotT& ss) const;
93

94
    virtual Status
95
    operator()(Store& store);
96
    virtual Status
97 98
    Push(bool sync = true);

99 100 101 102
    virtual Status
    PreCheck();

    virtual Status
103 104
    ApplyToStore(Store& store);

105
    Status
106 107 108
    WaitToFinish();

    void
109
    Done(Store& store);
110

111 112 113 114 115 116 117 118
    void
    SetStatus(const Status& status);

    Status
    GetStatus() const {
        return status_;
    }

X
XuPeng-SH 已提交
119 120
    virtual std::string
    GetName() const {
121 122 123
        return typeid(*this).name();
    }
    virtual std::string
X
XuPeng-SH 已提交
124
    GetRepr() const;
125 126 127 128

    virtual std::string
    ToString() const;

X
XuPeng-SH 已提交
129 130 131
    Status
    RollBack();

132 133 134 135 136
    virtual Status
    OnSnapshotStale();
    virtual Status
    OnSnapshotDropped();

X
XuPeng-SH 已提交
137 138 139 140
    virtual ~Operations();

    friend std::ostream&
    operator<<(std::ostream& out, const Operations& operation);
141 142

 protected:
143 144 145 146 147 148 149 150 151 152 153 154
    virtual std::string
    SuccessString() const;
    virtual std::string
    FailureString() const;

    Status
    DoneRequired() const;
    Status
    IDSNotEmptyRequried() const;
    Status
    PrevSnapshotRequried() const;

X
XuPeng-SH 已提交
155 156 157
    Status
    ApplyRollBack(Store&);

158 159 160 161
    OperationContext context_;
    ScopedSnapshotT prev_ss_;
    StepsT steps_;
    std::vector<ID_TYPE> ids_;
162 163
    bool done_ = false;
    Status status_;
164 165 166
    mutable std::mutex finish_mtx_;
    std::condition_variable finish_cond_;
    ID_TYPE uid_;
X
XuPeng-SH 已提交
167
    OperationsType type_;
168 169 170 171
};

template <typename StepT>
void
172 173 174 175 176
Operations::AddStep(const StepT& step, bool activate) {
    auto s = std::make_shared<StepT>(step);
    if (activate)
        s->Activate();
    steps_.push_back(s);
177 178
}

X
XuPeng-SH 已提交
179 180 181 182 183 184 185 186 187 188
template <typename StepT>
void
Operations::AddStepWithLsn(const StepT& step, const LSN_TYPE& lsn, bool activate) {
    auto s = std::make_shared<StepT>(step);
    if (activate)
        s->Activate();
    s->SetLsn(lsn);
    steps_.push_back(s);
}

189 190 191 192
template <typename ResourceT>
class CommitOperation : public Operations {
 public:
    using BaseT = Operations;
X
XuPeng-SH 已提交
193 194
    CommitOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
        : BaseT(context, prev_ss, OperationsType::W_Leaf) {
195 196 197 198 199 200 201
    }

    virtual typename ResourceT::Ptr
    GetPrevResource() const {
        return nullptr;
    }

202 203 204 205 206 207 208 209 210 211 212
    Status
    GetResource(typename ResourceT::Ptr& res, bool wait = false) {
        if (wait) {
            WaitToFinish();
        }
        auto status = DoneRequired();
        if (!status.ok())
            return status;
        status = IDSNotEmptyRequried();
        if (!status.ok())
            return status;
213
        resource_->SetID(ids_[0]);
214 215
        res = resource_;
        return status;
216 217 218
    }

 protected:
219 220 221 222 223 224 225 226
    Status
    ResourceNotNullRequired() const {
        Status status;
        if (!resource_)
            return Status(SS_CONSTRAINT_CHECK_ERROR, "No specified resource");
        return status;
    }

227 228 229 230 231 232 233
    typename ResourceT::Ptr resource_;
};

template <typename ResourceT>
class LoadOperation : public Operations {
 public:
    explicit LoadOperation(const LoadOperationContext& context)
X
XuPeng-SH 已提交
234
        : Operations(OperationContext(), ScopedSnapshotT(), OperationsType::O_Leaf), context_(context) {
235 236
    }

237
    Status
238
    ApplyToStore(Store& store) override {
239
        if (done_) {
240
            Done(store);
241
            return status_;
242
        }
243 244
        auto status = store.GetResource<ResourceT>(context_.id, resource_);
        SetStatus(status);
245
        Done(store);
246
        return status_;
247 248
    }

249 250 251 252 253 254 255 256 257 258 259 260 261
    Status
    GetResource(typename ResourceT::Ptr& res, bool wait = false) {
        if (wait) {
            WaitToFinish();
        }
        auto status = DoneRequired();
        if (!status.ok())
            return status;
        status = ResourceNotNullRequired();
        if (!status.ok())
            return status;
        res = resource_;
        return status;
262 263 264
    }

 protected:
265 266 267 268 269 270 271 272
    Status
    ResourceNotNullRequired() const {
        Status status;
        if (!resource_)
            return Status(SS_CONSTRAINT_CHECK_ERROR, "No specified resource");
        return status;
    }

273 274 275 276 277 278 279
    LoadOperationContext context_;
    typename ResourceT::Ptr resource_;
};

template <typename ResourceT>
class HardDeleteOperation : public Operations {
 public:
X
XuPeng-SH 已提交
280 281
    explicit HardDeleteOperation(ID_TYPE id)
        : Operations(OperationContext(), ScopedSnapshotT(), OperationsType::W_Leaf), id_(id) {
282 283
    }

284
    Status
285
    ApplyToStore(Store& store) override {
286 287 288 289
        if (done_)
            return status_;
        auto status = store.RemoveResource<ResourceT>(id_);
        SetStatus(status);
290
        Done(store);
291
        return status_;
292 293
    }

294 295 296 297 298 299 300
 protected:
    ID_TYPE id_;
};

template <>
class HardDeleteOperation<Collection> : public Operations {
 public:
X
XuPeng-SH 已提交
301 302
    explicit HardDeleteOperation(ID_TYPE id)
        : Operations(OperationContext(), ScopedSnapshotT(), OperationsType::W_Leaf), id_(id) {
303 304 305 306 307
    }

    Status
    ApplyToStore(Store& store) override {
        if (done_) {
308
            Done(store);
309 310 311 312
            return status_;
        }
        auto status = store.RemoveCollection(id_);
        SetStatus(status);
313
        Done(store);
314
        return status_;
315 316 317 318 319 320 321 322 323 324 325
    }

 protected:
    ID_TYPE id_;
};

using OperationsPtr = std::shared_ptr<Operations>;

}  // namespace snapshot
}  // namespace engine
}  // namespace milvus