ExecutionEngineImpl.cpp 14.9 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/engine/ExecutionEngineImpl.h"
S
starlord 已提交
19
#include "cache/CpuCacheMgr.h"
S
starlord 已提交
20
#include "cache/GpuCacheMgr.h"
S
starlord 已提交
21
#include "metrics/Metrics.h"
J
jinhai 已提交
22
#include "utils/CommonUtil.h"
S
starlord 已提交
23
#include "utils/Exception.h"
S
starlord 已提交
24
#include "utils/Log.h"
S
starlord 已提交
25

X
xiaojun.lin 已提交
26
#include "knowhere/common/Config.h"
S
starlord 已提交
27
#include "knowhere/common/Exception.h"
28
#include "server/Config.h"
S
starlord 已提交
29 30 31 32
#include "src/wrapper/VecImpl.h"
#include "src/wrapper/VecIndex.h"
#include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h"
X
xj.lin 已提交
33

W
wxyu 已提交
34
#include <src/core/knowhere/knowhere/index/vector_index/IndexIVFSQHybrid.h>
W
wxyu 已提交
35
#include <src/scheduler/Utils.h>
S
starlord 已提交
36
#include <stdexcept>
S
starlord 已提交
37
#include <utility>
W
wxyu 已提交
38
#include <vector>
S
starlord 已提交
39 40 41 42

namespace milvus {
namespace engine {

W
wxyu 已提交
43 44
class CachedQuantizer : public cache::DataObj {
 public:
W
wxyu 已提交
45 46
    explicit CachedQuantizer(knowhere::QuantizerPtr data) : data_(std::move(data)) {
    }
W
wxyu 已提交
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61

    knowhere::QuantizerPtr
    Data() {
        return data_;
    }

    int64_t
    Size() override {
        return data_->size;
    }

 private:
    knowhere::QuantizerPtr data_;
};

S
starlord 已提交
62 63 64
// Creates an engine whose in-memory index is a fresh, empty FAISS_IDMAP index of
// `dimension` (regardless of `index_type`, which is only stored), built with the
// configuration produced by the IDMAP conf adapter.
// Throws Exception(DB_ERROR) if the index cannot be created or built.
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : location_(location), dim_(dimension), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
    if (index_ == nullptr) {
        throw Exception(DB_ERROR, "Could not create VecIndex");
    }

    const auto knowhere_metric =
        (metric_type_ == MetricType::IP) ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;

    TempMetaConf meta;
    meta.gpu_id = gpu_num_;
    meta.dim = dimension;
    meta.metric_type = knowhere_metric;

    auto build_conf = AdapterMgr::GetInstance().GetAdapter(index_->GetType())->Match(meta);
    auto bf_index = std::static_pointer_cast<BFIndex>(index_);
    if (bf_index->Build(build_conf) != KNOWHERE_SUCCESS) {
        throw Exception(DB_ERROR, "Build index error");
    }
}

S
starlord 已提交
83 84 85
// Wraps an already-constructed index (BuildIndex() returns engines made this way).
// dim_ is derived from the wrapped index so that code reading dim_ directly
// (Dimension()'s null-index fallback, Clone()) never sees an unassigned value;
// it is 0 when no index is supplied.
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : index_(std::move(index)), location_(location), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
    // Assigned in the body (not the init list) so index_ is guaranteed to be initialized
    // first, independent of member declaration order.
    dim_ = (index_ != nullptr) ? index_->Dimension() : 0;
}
S
starlord 已提交
87

S
starlord 已提交
88 89
// Maps an engine type onto the wrapper's IndexType and asks the factory for an
// index of that kind. Unsupported engine types are logged and yield nullptr.
VecIndexPtr
ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
    IndexType target;
    switch (type) {
        case EngineType::FAISS_IDMAP:
            target = IndexType::FAISS_IDMAP;
            break;
        case EngineType::FAISS_IVFFLAT:
            target = IndexType::FAISS_IVFFLAT_MIX;
            break;
        case EngineType::FAISS_IVFSQ8:
            target = IndexType::FAISS_IVFSQ8_MIX;
            break;
        case EngineType::NSG_MIX:
            target = IndexType::NSG_MIX;
            break;
        case EngineType::FAISS_IVFSQ8H:
            target = IndexType::FAISS_IVFSQ8_HYBRID;
            break;
        default:
            ENGINE_LOG_ERROR << "Invalid engine type";
            return nullptr;
    }
    return GetVecIndexFactory(target);
}

120
void
W
wxyu 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
ExecutionEngineImpl::HybridLoad() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }

    const std::string key = location_ + ".quantizer";
    std::vector<uint64_t> gpus = scheduler::get_gpu_pool();

    // cache hit
    {
        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;
        knowhere::QuantizerPtr quantizer = nullptr;

        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            if (auto cached_quantizer = cache->GetIndex(key)) {
                device_id = gpu;
                quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
            }
        }

        if (device_id != NOT_FOUND) {
            index_->SetQuantizer(quantizer);
            return;
        }
    }

    // cache miss
    {
        std::vector<int64_t> all_free_mem;
        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
            all_free_mem.push_back(free_mem);
        }

        auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
        auto best_index = std::distance(all_free_mem.begin(), max_e);
        auto best_device_id = gpus[best_index];

        auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
        quantizer_conf->mode = 1;
        quantizer_conf->gpu_id = best_device_id;
        auto quantizer = index_->LoadQuantizer(quantizer_conf);
        index_->SetQuantizer(quantizer);
        auto cache_quantizer = std::make_shared<CachedQuantizer>(quantizer);
        cache::GpuCacheMgr::GetInstance(best_device_id)->InsertItem(key, cache_quantizer);
    }
}

// Detaches the quantizer from index_ via UnsetQuantizer(); does nothing unless the
// engine type is FAISS_IVFSQ8H.
void
ExecutionEngineImpl::HybridUnset() const {
    if (index_type_ == EngineType::FAISS_IVFSQ8H) {
        index_->UnsetQuantizer();
    }
}

S
starlord 已提交
180
// Adds n vectors with the given ids to the index and returns the index's status.
// xdata presumably holds n * Dimension() floats and xids n ids — TODO confirm with VecIndex::Add.
// Returns DB_ERROR instead of dereferencing a null index_, matching the other accessors.
Status
ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) {
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to add vectors";
        return Status(DB_ERROR, "index is null");
    }
    return index_->Add(n, xdata, xids);
}

S
starlord 已提交
186 187 188
size_t
ExecutionEngineImpl::Count() const {
    if (index_ == nullptr) {
S
starlord 已提交
189
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
S
starlord 已提交
190 191
        return 0;
    }
X
xj.lin 已提交
192
    return index_->Count();
S
starlord 已提交
193 194
}

S
starlord 已提交
195 196
size_t
ExecutionEngineImpl::Size() const {
S
starlord 已提交
197
    return (size_t)(Count() * Dimension()) * sizeof(float);
S
starlord 已提交
198 199
}

S
starlord 已提交
200 201 202
size_t
ExecutionEngineImpl::Dimension() const {
    if (index_ == nullptr) {
S
starlord 已提交
203
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
S
starlord 已提交
204 205
        return dim_;
    }
X
xj.lin 已提交
206
    return index_->Dimension();
S
starlord 已提交
207 208
}

S
starlord 已提交
209 210
size_t
ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
211
    return server::CommonUtil::GetFileSize(location_);
S
starlord 已提交
212 213
}

S
starlord 已提交
214 215
// Writes the current index to location_ via write_index and returns its status.
// Guards against a null index_ like the other engine operations do, rather than
// handing nullptr to write_index.
Status
ExecutionEngineImpl::Serialize() {
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to serialize";
        return Status(DB_ERROR, "index is null");
    }
    return write_index(index_, location_);
}

S
starlord 已提交
220 221
// Loads the index for location_, preferring the CPU cache. On a cache miss the file
// is read from disk (while execution-engine metrics are collected). If to_cache is
// set, the freshly read index is then inserted into the CPU cache. Returns DB_ERROR
// when reading fails or yields no index.
Status
ExecutionEngineImpl::Load(bool to_cache) {
    index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
    if (index_ != nullptr) {
        // Served from the CPU cache: no disk I/O and nothing to re-cache.
        return Status::OK();
    }

    try {
        double physical_size = PhysicalSize();
        server::CollectExecutionEngineMetrics metrics(physical_size);
        index_ = read_index(location_);
        if (index_ == nullptr) {
            std::string msg = "Failed to load index from " + location_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
        ENGINE_LOG_DEBUG << "Disk io from: " << location_;
    } catch (std::exception& e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    if (to_cache) {
        Cache();
    }
    return Status::OK();
}

S
starlord 已提交
248
// Moves the index onto GPU device_id.
//
// hybrid == true: reuses a quantizer cached on that GPU under "<location>.quantizer"
// (LoadData with mode 2). Otherwise it copies the index with CopyToGpuWithQuantizer
// and caches the returned quantizer on that GPU.
// hybrid == false: reuses an index cached on that GPU under location_. Otherwise it
// copies index_ with CopyToGpu and inserts the result into the GPU cache.
//
// Returns DB_ERROR if index_ is null or the copy throws. The hybrid path now has the
// same null check and exception handling as the non-hybrid path.
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
    if (hybrid) {
        if (index_ == nullptr) {
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
            return Status(DB_ERROR, "index is null");
        }

        auto key = location_ + ".quantizer";
        auto quantizer =
            std::static_pointer_cast<CachedQuantizer>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(key));

        try {
            if (quantizer) {
                // cache hit
                auto conf = std::make_shared<knowhere::QuantizerCfg>();
                conf->gpu_id = device_id;
                conf->mode = 2;
                auto new_index = index_->LoadData(quantizer->Data(), conf);
                index_ = new_index;
            } else {
                auto pair = index_->CopyToGpuWithQuantizer(device_id);
                index_ = pair.first;

                // cache
                auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
                cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
            }
        } catch (std::exception& e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }
        return Status::OK();
    }

    auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
        if (index_ == nullptr) {
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
            return Status(DB_ERROR, "index is null");
        }

        try {
            index_ = index_->CopyToGpu(device_id);
            ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
        } catch (std::exception& e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }
    }

    if (!already_in_cache) {
        GpuCache(device_id);
    }

    return Status::OK();
}

Y
Yu Kun 已提交
300 301
// Inserts a ToIndexData entry sized by PhysicalSize() into GPU device_id's cache under
// location_. No index data is copied here; presumably this reserves cache space for an
// index about to be built on that GPU — confirm with callers. Always returns OK.
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
    cache::DataObjPtr placeholder =
        std::static_pointer_cast<cache::DataObj>(std::make_shared<ToIndexData>(PhysicalSize()));
    auto gpu_cache = milvus::cache::GpuCacheMgr::GetInstance(device_id);
    gpu_cache->InsertItem(location_, placeholder);
    return Status::OK();
}

S
starlord 已提交
308 309
// Brings the index back to CPU memory. An index already in the CPU cache under
// location_ is reused as-is. Otherwise index_->CopyToCpu() is called and the result
// is inserted into the CPU cache. Returns DB_ERROR if there is no index or the copy
// throws.
Status
ExecutionEngineImpl::CopyToCpu() {
    auto cached = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
    if (cached != nullptr) {
        index_ = cached;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
        return Status(DB_ERROR, "index is null");
    }

    try {
        index_ = index_->CopyToCpu();
        ENGINE_LOG_DEBUG << "GPU to CPU";
    } catch (std::exception& e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    Cache();
    return Status::OK();
}

S
starlord 已提交
335 336 337
// Returns a new engine with the same location, engine/metric type and nlist whose
// index is index_->Clone(). Logs and returns nullptr when no index is loaded.
ExecutionEnginePtr
ExecutionEngineImpl::Clone() {
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
        return nullptr;
    }

    // Use the live index's dimension rather than dim_: engines created through the
    // VecIndexPtr constructor (e.g. by BuildIndex) do not assign dim_ there.
    auto ret = std::make_shared<ExecutionEngineImpl>(static_cast<uint16_t>(Dimension()), location_, index_type_,
                                                     metric_type_, nlist_);
    auto init_status = ret->Init();
    if (!init_status.ok()) {
        // Not fatal for cloning; the clone keeps its default gpu_num_.
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: failed to init cloned engine: " << init_status.message();
    }
    ret->index_ = index_->Clone();
    return ret;
}

S
starlord 已提交
348
// Appends the raw vectors/ids of the brute-force (BFIndex) index stored at `location`
// to this engine's index. The source comes from the CPU cache when present, otherwise
// it is read from disk. Returns DB_ERROR when merging with itself, when index_ is null,
// when the source cannot be loaded, or when the source is not a BFIndex.
Status
ExecutionEngineImpl::Merge(const std::string& location) {
    if (location == location_) {
        return Status(DB_ERROR, "Cannot Merge Self");
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;

    // Check the destination first so no disk I/O is wasted when the merge cannot happen.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
        return Status(DB_ERROR, "index is null");
    }

    auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
    if (!to_merge) {
        try {
            double physical_size = server::CommonUtil::GetFileSize(location);
            server::CollectExecutionEngineMetrics metrics(physical_size);
            to_merge = read_index(location);
        } catch (std::exception& e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }

        // read_index can return null. Report that as a load failure instead of the
        // misleading "not idmap" error the type check below would produce.
        if (!to_merge) {
            std::string msg = "Failed to load index from " + location;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
    }

    if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) {
        auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Merge: Add Error";
        }
        return status;
    } else {
        return Status(DB_ERROR, "file index type is not idmap");
    }
}

// Builds an index of `engine_type` from this engine's raw BFIndex data. Returns a new
// engine for `location` that wraps the built index.
// Returns nullptr (with a log) if the current index is not a BFIndex. Throws
// Exception(DB_ERROR) if the target index cannot be created or BuildAll fails.
ExecutionEnginePtr
ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_type) {
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    auto raw_index = std::dynamic_pointer_cast<BFIndex>(index_);
    if (!raw_index) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
        return nullptr;
    }

    auto built_index = CreatetVecIndex(engine_type);
    if (built_index == nullptr) {
        throw Exception(DB_ERROR, "Could not create VecIndex");
    }

    TempMetaConf meta;
    meta.gpu_id = gpu_num_;
    meta.dim = Dimension();
    meta.nlist = nlist_;
    meta.metric_type = (metric_type_ == MetricType::IP) ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;
    meta.size = Count();

    auto build_conf = AdapterMgr::GetInstance().GetAdapter(built_index->GetType())->Match(meta);

    auto status = built_index->BuildAll(Count(), raw_index->GetRawVectors(), raw_index->GetRawIds(), build_conf);
    if (!status.ok()) {
        throw Exception(DB_ERROR, status.message());
    }

    return std::make_shared<ExecutionEngineImpl>(built_index, location, engine_type, metric_type_, nlist_);
}

S
starlord 已提交
416
// Runs a top-k search of n query vectors with the given nprobe, writing results into
// `distances`/`labels`. When `hybrid` is set, a quantizer is attached via HybridLoad()
// before the search and detached via HybridUnset() afterwards. The detach now also
// happens when the index throws; the exception is then rethrown unchanged.
// Returns DB_ERROR if no index is loaded, otherwise the index's search status.
Status
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
                            bool hybrid) const {
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
        return Status(DB_ERROR, "index is null");
    }

    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;

    // TODO(linxj): remove here. Get conf from function
    TempMetaConf temp_conf;
    temp_conf.k = k;
    temp_conf.nprobe = nprobe;

    auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto conf = adapter->MatchSearch(temp_conf, index_->GetType());

    if (hybrid) {
        HybridLoad();
    }

    // Make sure a quantizer attached by HybridLoad() never stays attached when the
    // underlying search throws.
    auto status = [&]() {
        try {
            return index_->Search(n, data, distances, labels, conf);
        } catch (...) {
            if (hybrid) {
                HybridUnset();
            }
            throw;
        }
    }();

    if (hybrid) {
        HybridUnset();
    }

    if (!status.ok()) {
        ENGINE_LOG_ERROR << "Search error";
    }
    return status;
}

S
starlord 已提交
450 451
// Inserts the current index_ into the CPU cache under location_. Always returns OK.
Status
ExecutionEngineImpl::Cache() {
    cache::DataObjPtr entry = std::static_pointer_cast<cache::DataObj>(index_);
    auto cpu_cache = milvus::cache::CpuCacheMgr::GetInstance();
    cpu_cache->InsertItem(location_, entry);
    return Status::OK();
}

S
starlord 已提交
458 459
// Inserts the current index_ into GPU gpu_id's cache under location_. Always returns OK.
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
    cache::DataObjPtr entry = std::static_pointer_cast<cache::DataObj>(index_);
    auto gpu_cache = milvus::cache::GpuCacheMgr::GetInstance(gpu_id);
    gpu_cache->InsertItem(location_, entry);
    return Status::OK();
}

X
xj.lin 已提交
466
// TODO(linxj): remove.
S
starlord 已提交
467 468
// Loads the build-index GPU setting from the server config into gpu_num_.
// Propagates the config lookup's status on failure; returns OK otherwise.
Status
ExecutionEngineImpl::Init() {
    Status config_status = server::Config::GetInstance().GetDBConfigBuildIndexGPU(gpu_num_);
    if (config_status.ok()) {
        return Status::OK();
    }
    return config_status;
}

S
starlord 已提交
478 479
}  // namespace engine
}  // namespace milvus