Snapshot.h 10.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

#pragma once

#include <assert.h>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
23
#include <set>
24 25 26
#include <shared_mutex>
#include <string>
#include <thread>
X
XuPeng-SH 已提交
27
#include <tuple>
28 29
#include <utility>
#include <vector>
30
#include "db/snapshot/Store.h"
X
XuPeng-SH 已提交
31
#include "db/snapshot/Utils.h"
32
#include "db/snapshot/WrappedTypes.h"
33
#include "utils/Status.h"
34 35 36 37 38

namespace milvus {
namespace engine {
namespace snapshot {

X
XuPeng-SH 已提交
39 40 41 42 43
using ScopedResourcesT =
    std::tuple<CollectionCommit::ScopedMapT, Collection::ScopedMapT, SchemaCommit::ScopedMapT, FieldCommit::ScopedMapT,
               Field::ScopedMapT, FieldElement::ScopedMapT, PartitionCommit::ScopedMapT, Partition::ScopedMapT,
               SegmentCommit::ScopedMapT, Segment::ScopedMapT, SegmentFile::ScopedMapT>;

44 45 46 47 48 49
class Snapshot : public ReferenceProxy {
 public:
    using Ptr = std::shared_ptr<Snapshot>;
    explicit Snapshot(ID_TYPE id);

    ID_TYPE
50
    GetID() const {
X
XuPeng-SH 已提交
51
        return GetCollectionCommit()->GetID();
52
    }
X
XuPeng-SH 已提交
53

54
    ID_TYPE
55
    GetCollectionId() const {
56
        auto it = GetResources<Collection>().cbegin();
X
XuPeng-SH 已提交
57
        return it->first;
58
    }
X
XuPeng-SH 已提交
59

60
    CollectionPtr
61 62
    GetCollection() const {
        return GetResources<Collection>().cbegin()->second.Get();
63 64
    }

65
    SchemaCommitPtr
66
    GetSchemaCommit() const {
67 68 69 70
        auto id = GetLatestSchemaCommitId();
        return GetResource<SchemaCommit>(id);
    }

71
    const std::string&
72
    GetName() const {
73
        return GetResources<Collection>().cbegin()->second->GetName();
74
    }
X
XuPeng-SH 已提交
75

76
    size_t
77 78 79 80
    NumberOfPartitions() const {
        return GetResources<Partition>().size();
    }

81
    const LSN_TYPE&
X
XuPeng-SH 已提交
82 83 84 85
    GetMaxLsn() const {
        return max_lsn_;
    }

86
    PartitionPtr
87
    GetPartition(const std::string& name) const {
88 89 90 91 92 93 94 95
        ID_TYPE id;
        auto status = GetPartitionId(name, id);
        if (!status.ok()) {
            return nullptr;
        }
        return GetResource<Partition>(id);
    }

96 97 98 99 100 101 102 103 104 105
    Status
    GetPartitionId(const std::string& name, ID_TYPE& id) const {
        auto it = partition_names_map_.find(name);
        if (it == partition_names_map_.end()) {
            return Status(SS_NOT_FOUND_ERROR, "Specified partition name not found");
        }
        id = it->second;
        return Status::OK();
    }

106
    CollectionCommitPtr
107 108
    GetCollectionCommit() const {
        return GetResources<CollectionCommit>().cbegin()->second.Get();
109 110
    }

111 112 113 114 115 116 117 118 119
    const std::set<ID_TYPE>&
    GetSegmentFileIds(ID_TYPE segment_id) const {
        auto it = seg_segfiles_map_.find(segment_id);
        if (it == seg_segfiles_map_.end()) {
            return empty_set_;
        }
        return it->second;
    }

120 121 122
    SegmentFilePtr
    GetSegmentFile(ID_TYPE segment_id, ID_TYPE field_element_id) const;

123
    ID_TYPE
124 125 126 127
    GetLatestSchemaCommitId() const {
        return latest_schema_commit_id_;
    }

128 129 130 131
    Status
    GetFieldElement(const std::string& field_name, const std::string& field_element_name,
                    FieldElementPtr& field_element) const;

132 133
    // PXU TODO: add const. Need to change Scopedxxxx::Get
    SegmentCommitPtr
134
    GetSegmentCommitBySegmentId(ID_TYPE segment_id) const {
135 136 137
        auto it = seg_segc_map_.find(segment_id);
        if (it == seg_segc_map_.end())
            return nullptr;
X
XuPeng-SH 已提交
138
        return GetResource<SegmentCommit>(it->second);
139 140
    }

141 142 143 144 145 146 147 148 149 150
    std::vector<std::string>
    GetPartitionNames() const {
        std::vector<std::string> names;
        for (auto& kv : partition_names_map_) {
            names.emplace_back(kv.first);
        }

        return std::move(names);
    }

151
    PartitionCommitPtr
152
    GetPartitionCommitByPartitionId(ID_TYPE partition_id) const {
153 154 155
        auto it = p_pc_map_.find(partition_id);
        if (it == p_pc_map_.end())
            return nullptr;
X
XuPeng-SH 已提交
156
        return GetResource<PartitionCommit>(it->second);
157 158
    }

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
    template <typename HandlerT>
    void
    IterateResources(const typename HandlerT::Ptr& handler) {
        auto& resources = GetResources<typename HandlerT::ResourceT>();
        auto status = handler->PreIterate();
        if (!status.ok()) {
            handler->SetStatus(status);
            return;
        }
        for (auto& kv : resources) {
            status = handler->Handle(kv.second.Get());
            if (!status.ok()) {
                break;
            }
        }
        if (!status.ok()) {
            handler->SetStatus(status);
            return;
        }

        status = handler->PostIterate();
        handler->SetStatus(status);
    }

183
    std::vector<std::string>
184 185 186 187 188 189 190 191
    GetFieldNames() const {
        std::vector<std::string> names;
        for (auto& kv : field_names_map_) {
            names.emplace_back(kv.first);
        }
        return std::move(names);
    }

192
    bool
193 194 195 196 197
    HasField(const std::string& name) const {
        auto it = field_names_map_.find(name);
        return it != field_names_map_.end();
    }

198 199 200
    FieldPtr
    GetField(const std::string& name) const;

201
    bool
202 203 204 205 206
    HasFieldElement(const std::string& field_name, const std::string& field_element_name) const {
        auto id = GetFieldElementId(field_name, field_element_name);
        return id > 0;
    }

207
    ID_TYPE
208 209 210 211 212 213 214 215 216 217 218 219 220
    GetSegmentFileId(const std::string& field_name, const std::string& field_element_name, ID_TYPE segment_id) const {
        auto field_element_id = GetFieldElementId(field_name, field_element_name);
        auto it = element_segfiles_map_.find(field_element_id);
        if (it == element_segfiles_map_.end()) {
            return 0;
        }
        auto its = it->second.find(segment_id);
        if (its == it->second.end()) {
            return 0;
        }
        return its->second;
    }

221
    bool
222 223 224 225 226
    HasSegmentFile(const std::string& field_name, const std::string& field_element_name, ID_TYPE segment_id) const {
        auto id = GetSegmentFileId(field_name, field_element_name, segment_id);
        return id > 0;
    }

227
    ID_TYPE
228 229 230
    GetFieldElementId(const std::string& field_name, const std::string& field_element_name) const {
        auto itf = field_element_names_map_.find(field_name);
        if (itf == field_element_names_map_.end())
231
            return 0;
232 233 234 235 236 237 238 239
        auto itfe = itf->second.find(field_element_name);
        if (itfe == itf->second.end()) {
            return 0;
        }

        return itfe->second;
    }

240 241 242 243 244 245 246 247 248 249 250 251 252 253
    std::vector<FieldElementPtr>
    GetFieldElementsByField(const std::string& field_name) const {
        auto it = field_element_names_map_.find(field_name);
        if (it == field_element_names_map_.end()) {
            return {};
        }
        std::vector<FieldElementPtr> elements;
        for (auto& kv : it->second) {
            elements.push_back(GetResource<FieldElement>(kv.second));
        }

        return std::move(elements);
    }

254
    NUM_TYPE
255
    GetMaxSegmentNumByPartition(ID_TYPE partition_id) const {
256 257 258 259 260 261 262 263 264 265 266
        auto it = p_max_seg_num_.find(partition_id);
        if (it == p_max_seg_num_.end())
            return 0;
        return it->second;
    }

    void
    RefAll();
    void
    UnRefAll();

X
XuPeng-SH 已提交
267
    template <typename ResourceT>
268
    void
269
    DumpResource(const std::string& tag = "") const {
X
XuPeng-SH 已提交
270
        auto& resources = GetResources<ResourceT>();
271 272
        std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << " Start [" << tag
                  << "]:" << resources.size() << std::endl;
X
XuPeng-SH 已提交
273 274 275
        for (auto& kv : resources) {
            std::cout << "\t" << kv.second->ToString() << std::endl;
        }
276 277
        std::cout << typeid(*this).name() << " Dump " << GetID() << " " << ResourceT::Name << "  End [" << tag
                  << "]:" << resources.size() << std::endl;
X
XuPeng-SH 已提交
278 279 280
    }

    template <typename T>
281
    void
X
XuPeng-SH 已提交
282 283 284 285 286 287 288
    DoUnRef(T& resource_map) {
        for (auto& kv : resource_map) {
            kv.second->UnRef();
        }
    }

    template <typename T>
289
    void
X
XuPeng-SH 已提交
290 291 292 293 294 295 296 297 298 299 300 301 302
    DoRef(T& resource_map) {
        for (auto& kv : resource_map) {
            kv.second->Ref();
        }
    }

    template <typename ResourceT>
    typename ResourceT::ScopedMapT&
    GetResources() {
        return std::get<Index<typename ResourceT::ScopedMapT, ScopedResourcesT>::value>(resources_);
    }

    template <typename ResourceT>
303
    const typename ResourceT::ScopedMapT&
X
XuPeng-SH 已提交
304 305 306 307 308 309
    GetResources() const {
        return std::get<Index<typename ResourceT::ScopedMapT, ScopedResourcesT>::value>(resources_);
    }

    template <typename ResourceT>
    typename ResourceT::Ptr
310
    GetResource(ID_TYPE id) const {
X
XuPeng-SH 已提交
311 312 313 314 315 316 317 318 319 320 321 322 323 324
        auto& resources = GetResources<ResourceT>();
        auto it = resources.find(id);
        if (it == resources.end()) {
            return nullptr;
        }
        return it->second.Get();
    }

    template <typename ResourceT>
    void
    AddResource(ScopedResource<ResourceT>& resource) {
        auto& resources = GetResources<ResourceT>();
        resources[resource->GetID()] = resource;
    }
325

326
    const std::string
327
    ToString() const;
328

329
 private:
330 331 332 333
    Snapshot(const Snapshot&) = delete;
    Snapshot&
    operator=(const Snapshot&) = delete;

334
    // PXU TODO: Re-org below data structures to reduce memory usage
X
XuPeng-SH 已提交
335
    ScopedResourcesT resources_;
336 337
    ID_TYPE current_schema_id_;
    std::map<std::string, ID_TYPE> field_names_map_;
338
    std::map<std::string, ID_TYPE> partition_names_map_;
339 340
    std::map<std::string, std::map<std::string, ID_TYPE>> field_element_names_map_;
    std::map<ID_TYPE, std::map<ID_TYPE, ID_TYPE>> element_segfiles_map_;
341
    std::map<ID_TYPE, std::set<ID_TYPE>> seg_segfiles_map_;
342 343 344 345
    std::map<ID_TYPE, ID_TYPE> seg_segc_map_;
    std::map<ID_TYPE, ID_TYPE> p_pc_map_;
    ID_TYPE latest_schema_commit_id_ = 0;
    std::map<ID_TYPE, NUM_TYPE> p_max_seg_num_;
X
XuPeng-SH 已提交
346
    LSN_TYPE max_lsn_;
347
    std::set<ID_TYPE> empty_set_;
348 349 350
};

using GCHandler = std::function<void(Snapshot::Ptr)>;
351 352
using ScopedSnapshotT = ScopedResource<Snapshot>;

353 354 355
}  // namespace snapshot
}  // namespace engine
}  // namespace milvus