CompoundOperations.cpp 17.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#include "db/snapshot/CompoundOperations.h"
#include <memory>
14
#include <sstream>
15 16
#include "db/snapshot/OperationExecutor.h"
#include "db/snapshot/Snapshots.h"
C
Cai Yudong 已提交
17
#include "utils/Status.h"
18 19 20 21 22

namespace milvus {
namespace engine {
namespace snapshot {

X
XuPeng-SH 已提交
23 24 25
BuildOperation::BuildOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}

26 27
Status
BuildOperation::DoExecute(Store& store) {
C
Cai Yudong 已提交
28 29
    STATUS_CHECK(CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1,
                                      context_.new_segment_files[0]->GetSegmentId())));
30

C
Cai Yudong 已提交
31 32 33
    SegmentCommitOperation sc_op(context_, GetAdjustedSS());
    STATUS_CHECK(sc_op(store));
    STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit));
X
XuPeng-SH 已提交
34
    AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
35

36
    PartitionCommitOperation pc_op(context_, GetAdjustedSS());
C
Cai Yudong 已提交
37
    STATUS_CHECK(pc_op(store));
38
    OperationContext cc_context;
C
Cai Yudong 已提交
39
    STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit));
X
XuPeng-SH 已提交
40
    AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
41

C
Cai Yudong 已提交
42 43
    context_.new_partition_commit = cc_context.new_partition_commit;
    STATUS_CHECK(pc_op.GetResource(context_.new_partition_commit));
X
XuPeng-SH 已提交
44
    AddStepWithLsn(*context_.new_partition_commit, context_.lsn);
45

46
    CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
C
Cai Yudong 已提交
47 48
    STATUS_CHECK(cc_op(store));
    STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
X
XuPeng-SH 已提交
49
    AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
50

C
Cai Yudong 已提交
51
    return Status::OK();
52 53
}

54 55 56 57
Status
BuildOperation::CheckSegmentStale(ScopedSnapshotT& latest_snapshot, ID_TYPE segment_id) const {
    auto segment = latest_snapshot->GetResource<Segment>(segment_id);
    if (!segment) {
58 59 60
        std::stringstream emsg;
        emsg << GetRepr() << ". Target segment " << segment_id << " is stale";
        return Status(SS_STALE_ERROR, emsg.str());
61
    }
62
    return Status::OK();
63 64
}

65 66
Status
BuildOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
C
Cai Yudong 已提交
67 68 69
    STATUS_CHECK(
        CheckStale(std::bind(&BuildOperation::CheckSegmentStale, this, std::placeholders::_1, context.segment_id)));

70 71
    auto segment = GetStartedSS()->GetResource<Segment>(context.segment_id);
    if (!segment) {
72 73 74
        std::stringstream emsg;
        emsg << GetRepr() << ". Invalid segment " << context.segment_id << " in context";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
75 76 77 78
    }
    auto ctx = context;
    ctx.partition_id = segment->GetPartitionId();
    auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
C
Cai Yudong 已提交
79 80
    STATUS_CHECK(new_sf_op->Push());
    STATUS_CHECK(new_sf_op->GetResource(created));
81
    context_.new_segment_files.push_back(created);
X
XuPeng-SH 已提交
82
    AddStepWithLsn(*created, context_.lsn);
C
Cai Yudong 已提交
83 84

    return Status::OK();
85 86 87 88 89 90
}

NewSegmentOperation::NewSegmentOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
    : BaseT(context, prev_ss) {
}

91
Status
92 93 94 95
NewSegmentOperation::DoExecute(Store& store) {
    // PXU TODO:
    // 1. Check all requried field elements have related segment files
    // 2. Check Stale and others
96 97 98
    /* auto status = PrevSnapshotRequried(); */
    /* if (!status.ok()) return status; */
    // TODO: Check Context
C
Cai Yudong 已提交
99 100 101
    SegmentCommitOperation sc_op(context_, GetAdjustedSS());
    STATUS_CHECK(sc_op(store));
    STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit));
X
XuPeng-SH 已提交
102
    AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
103 104 105 106 107
    /* std::cout << GetRepr() << " POST_SC_MAP=("; */
    /* for (auto id : context_.new_segment_commit->GetMappings()) { */
    /*     std::cout << id << ","; */
    /* } */
    /* std::cout << ")" << std::endl; */
108 109

    OperationContext cc_context;
110
    PartitionCommitOperation pc_op(context_, GetAdjustedSS());
C
Cai Yudong 已提交
111 112
    STATUS_CHECK(pc_op(store));
    STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit));
X
XuPeng-SH 已提交
113 114
    AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
    context_.new_partition_commit = cc_context.new_partition_commit;
115 116 117 118 119
    /* std::cout << GetRepr() << " POST_PC_MAP=("; */
    /* for (auto id : cc_context.new_partition_commit->GetMappings()) { */
    /*     std::cout << id << ","; */
    /* } */
    /* std::cout << ")" << std::endl; */
120

121
    CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
C
Cai Yudong 已提交
122 123
    STATUS_CHECK(cc_op(store));
    STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
X
XuPeng-SH 已提交
124
    AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
125

C
Cai Yudong 已提交
126
    return Status::OK();
127 128
}

129 130
Status
NewSegmentOperation::CommitNewSegment(SegmentPtr& created) {
131
    auto op = std::make_shared<SegmentOperation>(context_, GetStartedSS());
C
Cai Yudong 已提交
132 133
    STATUS_CHECK(op->Push());
    STATUS_CHECK(op->GetResource(context_.new_segment));
134
    created = context_.new_segment;
X
XuPeng-SH 已提交
135
    AddStepWithLsn(*created, context_.lsn);
C
Cai Yudong 已提交
136
    return Status::OK();
137 138
}

139 140
Status
NewSegmentOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
C
Cai Yudong 已提交
141 142 143 144 145 146
    auto ctx = context;
    ctx.segment_id = context_.new_segment->GetID();
    ctx.partition_id = context_.new_segment->GetPartitionId();
    auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
    STATUS_CHECK(new_sf_op->Push());
    STATUS_CHECK(new_sf_op->GetResource(created));
X
XuPeng-SH 已提交
147
    AddStepWithLsn(*created, context_.lsn);
148
    context_.new_segment_files.push_back(created);
C
Cai Yudong 已提交
149
    return Status::OK();
150 151 152 153 154
}

MergeOperation::MergeOperation(const OperationContext& context, ScopedSnapshotT prev_ss) : BaseT(context, prev_ss) {
}

155 156 157 158 159 160
Status
MergeOperation::OnSnapshotStale() {
    for (auto& stale_seg : context_.stale_segments) {
        auto expect_sc = GetStartedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID());
        auto latest_sc = GetAdjustedSS()->GetSegmentCommitBySegmentId(stale_seg->GetID());
        if (!latest_sc || (latest_sc->GetID() != expect_sc->GetID())) {
161 162 163
            std::stringstream emsg;
            emsg << GetRepr() << ". Stale segment " << stale_seg->GetID() << " in context";
            return Status(SS_STALE_ERROR, emsg.str());
164 165 166 167 168
        }
    }
    return Status::OK();
}

169 170 171 172
Status
MergeOperation::CommitNewSegment(SegmentPtr& created) {
    if (context_.new_segment) {
        created = context_.new_segment;
C
Cai Yudong 已提交
173
        return Status::OK();
174
    }
175
    auto op = std::make_shared<SegmentOperation>(context_, GetStartedSS());
C
Cai Yudong 已提交
176 177
    STATUS_CHECK(op->Push());
    STATUS_CHECK(op->GetResource(context_.new_segment));
178
    created = context_.new_segment;
X
XuPeng-SH 已提交
179
    AddStepWithLsn(*created, context_.lsn);
C
Cai Yudong 已提交
180
    return Status::OK();
181 182
}

183 184
Status
MergeOperation::CommitNewSegmentFile(const SegmentFileContext& context, SegmentFilePtr& created) {
185
    // PXU TODO: Check element type and segment file mapping rules
186
    SegmentPtr new_segment;
C
Cai Yudong 已提交
187 188 189 190 191 192 193
    STATUS_CHECK(CommitNewSegment(new_segment));
    auto ctx = context;
    ctx.segment_id = new_segment->GetID();
    ctx.partition_id = new_segment->GetPartitionId();
    auto new_sf_op = std::make_shared<SegmentFileOperation>(ctx, GetStartedSS());
    STATUS_CHECK(new_sf_op->Push());
    STATUS_CHECK(new_sf_op->GetResource(created));
194
    context_.new_segment_files.push_back(created);
X
XuPeng-SH 已提交
195
    AddStepWithLsn(*created, context_.lsn);
C
Cai Yudong 已提交
196
    return Status::OK();
197 198
}

199 200
Status
MergeOperation::DoExecute(Store& store) {
201
    // PXU TODO:
C
Cai Yudong 已提交
202
    // 1. Check all required field elements have related segment files
203
    // 2. Check Stale and others
C
Cai Yudong 已提交
204 205 206
    SegmentCommitOperation sc_op(context_, GetAdjustedSS());
    STATUS_CHECK(sc_op(store));
    STATUS_CHECK(sc_op.GetResource(context_.new_segment_commit));
X
XuPeng-SH 已提交
207
    AddStepWithLsn(*context_.new_segment_commit, context_.lsn);
208 209 210 211 212
    /* std::cout << GetRepr() << " POST_SC_MAP=("; */
    /* for (auto id : context_.new_segment_commit->GetMappings()) { */
    /*     std::cout << id << ","; */
    /* } */
    /* std::cout << ")" << std::endl; */
213

214
    PartitionCommitOperation pc_op(context_, GetAdjustedSS());
C
Cai Yudong 已提交
215
    STATUS_CHECK(pc_op(store));
216
    OperationContext cc_context;
C
Cai Yudong 已提交
217
    STATUS_CHECK(pc_op.GetResource(cc_context.new_partition_commit));
X
XuPeng-SH 已提交
218 219
    AddStepWithLsn(*cc_context.new_partition_commit, context_.lsn);
    context_.new_partition_commit = cc_context.new_partition_commit;
220

221 222 223 224 225 226
    /* std::cout << GetRepr() << " POST_PC_MAP=("; */
    /* for (auto id : cc_context.new_partition_commit->GetMappings()) { */
    /*     std::cout << id << ","; */
    /* } */
    /* std::cout << ")" << std::endl; */

227
    CollectionCommitOperation cc_op(cc_context, GetAdjustedSS());
C
Cai Yudong 已提交
228 229
    STATUS_CHECK(cc_op(store));
    STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
X
XuPeng-SH 已提交
230
    AddStepWithLsn(*context_.new_collection_commit, context_.lsn);
231

C
Cai Yudong 已提交
232
    return Status::OK();
233 234 235
}

GetSnapshotIDsOperation::GetSnapshotIDsOperation(ID_TYPE collection_id, bool reversed)
X
XuPeng-SH 已提交
236 237 238
    : BaseT(OperationContext(), ScopedSnapshotT(), OperationsType::O_Compound),
      collection_id_(collection_id),
      reversed_(reversed) {
239 240
}

241
Status
242 243
GetSnapshotIDsOperation::DoExecute(Store& store) {
    ids_ = store.AllActiveCollectionCommitIds(collection_id_, reversed_);
244
    return Status::OK();
245 246 247 248 249 250 251 252 253 254 255
}

const IDS_TYPE&
GetSnapshotIDsOperation::GetIDs() const {
    return ids_;
}

GetCollectionIDsOperation::GetCollectionIDsOperation(bool reversed)
    : BaseT(OperationContext(), ScopedSnapshotT()), reversed_(reversed) {
}

256
Status
257 258
GetCollectionIDsOperation::DoExecute(Store& store) {
    ids_ = store.AllActiveCollectionIds(reversed_);
259
    return Status::OK();
260 261 262 263 264 265 266
}

const IDS_TYPE&
GetCollectionIDsOperation::GetIDs() const {
    return ids_;
}

267
DropPartitionOperation::DropPartitionOperation(const PartitionContext& context, ScopedSnapshotT prev_ss)
X
XuPeng-SH 已提交
268 269 270 271 272 273
    : BaseT(OperationContext(), prev_ss), c_context_(context) {
}

std::string
DropPartitionOperation::GetRepr() const {
    std::stringstream ss;
274
    ss << "<" << GetName() << "(";
275 276
    if (GetAdjustedSS()) {
        ss << "SS=" << GetAdjustedSS()->GetID();
277
    }
X
XuPeng-SH 已提交
278 279 280 281 282
    ss << "," << c_context_.ToString();
    ss << "," << context_.ToString();
    ss << ",LSN=" << GetContextLsn();
    ss << ")>";
    return ss.str();
283 284 285 286 287
}

Status
DropPartitionOperation::DoExecute(Store& store) {
    PartitionPtr p;
X
XuPeng-SH 已提交
288
    auto id = c_context_.id;
289
    if (id == 0) {
C
Cai Yudong 已提交
290
        STATUS_CHECK(GetAdjustedSS()->GetPartitionId(c_context_.name, id));
X
XuPeng-SH 已提交
291
        c_context_.id = id;
292
    }
293
    auto p_c = GetAdjustedSS()->GetPartitionCommitByPartitionId(id);
294 295 296 297 298
    if (!p_c) {
        std::stringstream emsg;
        emsg << GetRepr() << ". PartitionCommit " << id << " not found";
        return Status(SS_NOT_FOUND_ERROR, emsg.str());
    }
X
XuPeng-SH 已提交
299
    context_.stale_partition_commit = p_c;
300 301 302

    OperationContext op_ctx;
    op_ctx.stale_partition_commit = p_c;
C
Cai Yudong 已提交
303 304 305
    auto cc_op = CollectionCommitOperation(op_ctx, GetAdjustedSS());
    STATUS_CHECK(cc_op(store));
    STATUS_CHECK(cc_op.GetResource(context_.new_collection_commit));
X
XuPeng-SH 已提交
306
    AddStepWithLsn(*context_.new_collection_commit, c_context_.lsn);
C
Cai Yudong 已提交
307
    return Status::OK();
308 309 310 311 312 313 314 315
}

CreatePartitionOperation::CreatePartitionOperation(const OperationContext& context, ScopedSnapshotT prev_ss)
    : BaseT(context, prev_ss) {
}

Status
CreatePartitionOperation::PreCheck() {
C
Cai Yudong 已提交
316
    STATUS_CHECK(BaseT::PreCheck());
317
    if (!context_.new_partition) {
318 319
        std::stringstream emsg;
        emsg << GetRepr() << ". Partition is missing";
C
Cai Yudong 已提交
320
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
321
    }
C
Cai Yudong 已提交
322
    return Status::OK();
323 324 325 326
}

Status
CreatePartitionOperation::CommitNewPartition(const PartitionContext& context, PartitionPtr& partition) {
327
    auto op = std::make_shared<PartitionOperation>(context, GetStartedSS());
C
Cai Yudong 已提交
328 329
    STATUS_CHECK(op->Push());
    STATUS_CHECK(op->GetResource(partition));
330
    context_.new_partition = partition;
X
XuPeng-SH 已提交
331
    AddStepWithLsn(*partition, context_.lsn);
C
Cai Yudong 已提交
332
    return Status::OK();
333 334 335 336
}

Status
CreatePartitionOperation::DoExecute(Store& store) {
C
Cai Yudong 已提交
337
    STATUS_CHECK(CheckStale());
338

339
    auto collection = GetAdjustedSS()->GetCollection();
340 341 342 343 344
    auto partition = context_.new_partition;

    PartitionCommitPtr pc;
    OperationContext pc_context;
    pc_context.new_partition = partition;
345
    auto pc_op = PartitionCommitOperation(pc_context, GetAdjustedSS());
C
Cai Yudong 已提交
346 347
    STATUS_CHECK(pc_op(store));
    STATUS_CHECK(pc_op.GetResource(pc));
X
XuPeng-SH 已提交
348
    AddStepWithLsn(*pc, context_.lsn);
C
Cai Yudong 已提交
349

350 351
    OperationContext cc_context;
    cc_context.new_partition_commit = pc;
X
XuPeng-SH 已提交
352
    context_.new_partition_commit = pc;
353
    auto cc_op = CollectionCommitOperation(cc_context, GetAdjustedSS());
C
Cai Yudong 已提交
354
    STATUS_CHECK(cc_op(store));
355
    CollectionCommitPtr cc;
C
Cai Yudong 已提交
356
    STATUS_CHECK(cc_op.GetResource(cc));
X
XuPeng-SH 已提交
357 358
    AddStepWithLsn(*cc, context_.lsn);
    context_.new_collection_commit = cc;
359

C
Cai Yudong 已提交
360
    return Status::OK();
361 362
}

363
CreateCollectionOperation::CreateCollectionOperation(const CreateCollectionContext& context)
X
XuPeng-SH 已提交
364 365 366 367 368 369 370 371 372 373 374 375
    : BaseT(OperationContext(), ScopedSnapshotT()), c_context_(context) {
}

Status
CreateCollectionOperation::PreCheck() {
    // TODO
    return Status::OK();
}

std::string
CreateCollectionOperation::GetRepr() const {
    std::stringstream ss;
376
    ss << "<" << GetName() << "(";
377 378
    if (GetAdjustedSS()) {
        ss << "SS=" << GetAdjustedSS()->GetID();
379
    }
X
XuPeng-SH 已提交
380 381 382 383 384
    ss << c_context_.ToString();
    ss << "," << context_.ToString();
    ss << ",LSN=" << GetContextLsn();
    ss << ")>";
    return ss.str();
385 386
}

387
Status
388 389
CreateCollectionOperation::DoExecute(Store& store) {
    // TODO: Do some checks
390
    CollectionPtr collection;
X
XuPeng-SH 已提交
391
    auto status = store.CreateCollection(Collection(c_context_.collection->GetName()), collection);
392 393 394 395
    if (!status.ok()) {
        std::cerr << status.ToString() << std::endl;
        return status;
    }
X
XuPeng-SH 已提交
396 397
    AddStepWithLsn(*collection, c_context_.lsn);
    context_.new_collection = collection;
398 399
    MappingT field_commit_ids = {};
    auto field_idx = 0;
X
XuPeng-SH 已提交
400
    for (auto& field_kv : c_context_.fields_schema) {
401 402 403
        field_idx++;
        auto& field_schema = field_kv.first;
        auto& field_elements = field_kv.second;
404 405
        FieldPtr field;
        status = store.CreateResource<Field>(Field(field_schema->GetName(), field_idx), field);
X
XuPeng-SH 已提交
406
        AddStepWithLsn(*field, c_context_.lsn);
407
        MappingT element_ids = {};
408 409 410
        FieldElementPtr raw_element;
        status = store.CreateResource<FieldElement>(
            FieldElement(collection->GetID(), field->GetID(), "RAW", FieldElementType::RAW), raw_element);
X
XuPeng-SH 已提交
411
        AddStepWithLsn(*raw_element, c_context_.lsn);
412 413
        element_ids.insert(raw_element->GetID());
        for (auto& element_schema : field_elements) {
414 415 416 417 418
            FieldElementPtr element;
            status =
                store.CreateResource<FieldElement>(FieldElement(collection->GetID(), field->GetID(),
                                                                element_schema->GetName(), element_schema->GetFtype()),
                                                   element);
X
XuPeng-SH 已提交
419
            AddStepWithLsn(*element, c_context_.lsn);
420 421
            element_ids.insert(element->GetID());
        }
422 423 424
        FieldCommitPtr field_commit;
        status = store.CreateResource<FieldCommit>(FieldCommit(collection->GetID(), field->GetID(), element_ids),
                                                   field_commit);
X
XuPeng-SH 已提交
425
        AddStepWithLsn(*field_commit, c_context_.lsn);
426 427
        field_commit_ids.insert(field_commit->GetID());
    }
428 429
    SchemaCommitPtr schema_commit;
    status = store.CreateResource<SchemaCommit>(SchemaCommit(collection->GetID(), field_commit_ids), schema_commit);
X
XuPeng-SH 已提交
430
    AddStepWithLsn(*schema_commit, c_context_.lsn);
431 432
    PartitionPtr partition;
    status = store.CreateResource<Partition>(Partition("_default", collection->GetID()), partition);
X
XuPeng-SH 已提交
433 434
    AddStepWithLsn(*partition, c_context_.lsn);
    context_.new_partition = partition;
435 436 437
    PartitionCommitPtr partition_commit;
    status = store.CreateResource<PartitionCommit>(PartitionCommit(collection->GetID(), partition->GetID()),
                                                   partition_commit);
X
XuPeng-SH 已提交
438 439
    AddStepWithLsn(*partition_commit, c_context_.lsn);
    context_.new_partition_commit = partition_commit;
440 441 442
    CollectionCommitPtr collection_commit;
    status = store.CreateResource<CollectionCommit>(
        CollectionCommit(collection->GetID(), schema_commit->GetID(), {partition_commit->GetID()}), collection_commit);
X
XuPeng-SH 已提交
443 444 445
    AddStepWithLsn(*collection_commit, c_context_.lsn);
    context_.new_collection_commit = collection_commit;
    c_context_.collection_commit = collection_commit;
446
    context_.new_collection_commit = collection_commit;
447
    return Status::OK();
448 449
}

450 451
Status
CreateCollectionOperation::GetSnapshot(ScopedSnapshotT& ss) const {
C
Cai Yudong 已提交
452 453
    STATUS_CHECK(CheckDone());
    STATUS_CHECK(CheckIDSNotEmpty());
454 455 456 457 458
    if (!c_context_.collection_commit) {
        std::stringstream emsg;
        emsg << GetRepr() << ". No snapshot is available";
        return Status(SS_CONSTRAINT_CHECK_ERROR, emsg.str());
    }
459 460
    /* status = Snapshots::GetInstance().GetSnapshot(ss, c_context_.collection_commit->GetCollectionId()); */
    ss = context_.latest_ss;
C
Cai Yudong 已提交
461
    return Status::OK();
462 463
}

464
Status
465
DropCollectionOperation::DoExecute(Store& store) {
466
    if (!context_.collection) {
467 468 469
        std::stringstream emsg;
        emsg << GetRepr() << ". Collection is missing in context";
        return Status(SS_INVALID_CONTEX_ERROR, emsg.str());
470 471
    }
    context_.collection->Deactivate();
X
XuPeng-SH 已提交
472
    AddStepWithLsn(*context_.collection, context_.lsn);
473
    return Status::OK();
474 475
}

476 477 478
}  // namespace snapshot
}  // namespace engine
}  // namespace milvus