ExecutionEngineImpl.cpp 20.0 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/engine/ExecutionEngineImpl.h"
S
starlord 已提交
19
#include "cache/CpuCacheMgr.h"
S
starlord 已提交
20
#include "cache/GpuCacheMgr.h"
X
xiaojun.lin 已提交
21
#include "knowhere/common/Config.h"
S
starlord 已提交
22
#include "metrics/Metrics.h"
X
xiaojun.lin 已提交
23 24
#include "scheduler/Utils.h"
#include "server/Config.h"
J
jinhai 已提交
25
#include "utils/CommonUtil.h"
S
starlord 已提交
26
#include "utils/Exception.h"
S
starlord 已提交
27
#include "utils/Log.h"
Y
youny626 已提交
28

S
starlord 已提交
29 30
#include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h"
S
starlord 已提交
31 32
#include "wrapper/VecImpl.h"
#include "wrapper/VecIndex.h"
X
xj.lin 已提交
33

S
starlord 已提交
34
#include <stdexcept>
S
starlord 已提交
35
#include <utility>
W
wxyu 已提交
36
#include <vector>
S
starlord 已提交
37

J
JinHai-CN 已提交
38
//#define ON_SEARCH
S
starlord 已提交
39 40 41
namespace milvus {
namespace engine {

W
wxyu 已提交
42 43
class CachedQuantizer : public cache::DataObj {
 public:
W
wxyu 已提交
44 45
    explicit CachedQuantizer(knowhere::QuantizerPtr data) : data_(std::move(data)) {
    }
W
wxyu 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

    knowhere::QuantizerPtr
    Data() {
        return data_;
    }

    int64_t
    Size() override {
        return data_->size;
    }

 private:
    knowhere::QuantizerPtr data_;
};

S
starlord 已提交
61 62 63
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : location_(location), dim_(dimension), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
X
xj.lin 已提交
64
    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
65
    if (!index_) {
66
        throw Exception(DB_ERROR, "Unsupported index type");
67
    }
X
xj.lin 已提交
68

X
xiaojun.lin 已提交
69 70 71
    TempMetaConf temp_conf;
    temp_conf.gpu_id = gpu_num_;
    temp_conf.dim = dimension;
S
starlord 已提交
72
    temp_conf.metric_type = (metric_type_ == MetricType::IP) ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;
X
xiaojun.lin 已提交
73 74 75 76
    auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto conf = adapter->Match(temp_conf);

    auto ec = std::static_pointer_cast<BFIndex>(index_)->Build(conf);
77 78 79
    if (ec != KNOWHERE_SUCCESS) {
        throw Exception(DB_ERROR, "Build index error");
    }
S
starlord 已提交
80 81
}

S
starlord 已提交
82 83 84
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : index_(std::move(index)), location_(location), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
X
xj.lin 已提交
85
}
S
starlord 已提交
86

S
starlord 已提交
87 88
VecIndexPtr
ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
89 90 91
    server::Config& config = server::Config::GetInstance();
    bool gpu_resource_enable = true;
    config.GetGpuResourceConfigEnable(gpu_resource_enable);
X
xj.lin 已提交
92 93 94 95
    std::shared_ptr<VecIndex> index;
    switch (type) {
        case EngineType::FAISS_IDMAP: {
            index = GetVecIndexFactory(IndexType::FAISS_IDMAP);
S
starlord 已提交
96 97
            break;
        }
J
jinhai 已提交
98
        case EngineType::FAISS_IVFFLAT: {
Y
yudong.cai 已提交
99
#ifdef MILVUS_GPU_VERSION
100 101 102
            if (gpu_resource_enable)
                index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX);
            else
Y
youny626 已提交
103
#endif
104
                index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_CPU);
S
starlord 已提交
105 106
            break;
        }
J
jinhai 已提交
107
        case EngineType::FAISS_IVFSQ8: {
Y
yudong.cai 已提交
108
#ifdef MILVUS_GPU_VERSION
109 110 111
            if (gpu_resource_enable)
                index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX);
            else
Y
youny626 已提交
112
#endif
113
                index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_CPU);
S
starlord 已提交
114 115
            break;
        }
X
xj.lin 已提交
116 117 118 119
        case EngineType::NSG_MIX: {
            index = GetVecIndexFactory(IndexType::NSG_MIX);
            break;
        }
Y
Yukikaze-CZR 已提交
120
#ifdef CUSTOMIZATION
W
wxyu 已提交
121 122 123 124
        case EngineType::FAISS_IVFSQ8H: {
            index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_HYBRID);
            break;
        }
Y
Yukikaze-CZR 已提交
125
#endif
Z
zirui.chen 已提交
126
        case EngineType::FAISS_PQ: {
Y
yudong.cai 已提交
127
#ifdef MILVUS_GPU_VERSION
128 129 130
            if (gpu_resource_enable)
                index = GetVecIndexFactory(IndexType::FAISS_IVFPQ_MIX);
            else
Z
zirui.chen 已提交
131
#endif
132
                index = GetVecIndexFactory(IndexType::FAISS_IVFPQ_CPU);
Z
zirui.chen 已提交
133 134
            break;
        }
135 136 137 138 139 140 141 142
        case EngineType::SPTAG_KDT: {
            index = GetVecIndexFactory(IndexType::SPTAG_KDT_RNT_CPU);
            break;
        }
        case EngineType::SPTAG_BKT: {
            index = GetVecIndexFactory(IndexType::SPTAG_BKT_RNT_CPU);
            break;
        }
X
xj.lin 已提交
143
        default: {
144
            ENGINE_LOG_ERROR << "Unsupported index type";
S
starlord 已提交
145 146 147
            return nullptr;
        }
    }
X
xj.lin 已提交
148
    return index;
S
starlord 已提交
149 150
}

151
void
W
wxyu 已提交
152 153 154 155 156
ExecutionEngineImpl::HybridLoad() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }

W
update  
wxyu 已提交
157 158 159 160 161
    if (index_->GetType() == IndexType::FAISS_IDMAP) {
        ENGINE_LOG_WARNING << "HybridLoad with type FAISS_IDMAP, ignore";
        return;
    }

G
groot 已提交
162
#ifdef MILVUS_GPU_VERSION
W
wxyu 已提交
163
    const std::string key = location_ + ".quantizer";
164 165

    server::Config& config = server::Config::GetInstance();
Y
yudong.cai 已提交
166
    std::vector<int64_t> gpus;
167 168 169 170 171
    Status s = config.GetGpuResourceConfigSearchResources(gpus);
    if (!s.ok()) {
        ENGINE_LOG_ERROR << s.message();
        return;
    }
W
wxyu 已提交
172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209

    // cache hit
    {
        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;
        knowhere::QuantizerPtr quantizer = nullptr;

        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            if (auto cached_quantizer = cache->GetIndex(key)) {
                device_id = gpu;
                quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
            }
        }

        if (device_id != NOT_FOUND) {
            index_->SetQuantizer(quantizer);
            return;
        }
    }

    // cache miss
    {
        std::vector<int64_t> all_free_mem;
        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
            all_free_mem.push_back(free_mem);
        }

        auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
        auto best_index = std::distance(all_free_mem.begin(), max_e);
        auto best_device_id = gpus[best_index];

        auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
        quantizer_conf->mode = 1;
        quantizer_conf->gpu_id = best_device_id;
        auto quantizer = index_->LoadQuantizer(quantizer_conf);
W
add log  
wxyu 已提交
210 211 212
        if (quantizer == nullptr) {
            ENGINE_LOG_ERROR << "quantizer is nullptr";
        }
W
wxyu 已提交
213 214 215 216
        index_->SetQuantizer(quantizer);
        auto cache_quantizer = std::make_shared<CachedQuantizer>(quantizer);
        cache::GpuCacheMgr::GetInstance(best_device_id)->InsertItem(key, cache_quantizer);
    }
G
groot 已提交
217
#endif
W
wxyu 已提交
218 219 220 221 222 223 224
}

void
ExecutionEngineImpl::HybridUnset() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }
W
update  
wxyu 已提交
225 226 227
    if (index_->GetType() == IndexType::FAISS_IDMAP) {
        return;
    }
W
wxyu 已提交
228
    index_->UnsetQuantizer();
229 230
}

S
starlord 已提交
231
Status
S
starlord 已提交
232
ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) {
233 234
    auto status = index_->Add(n, xdata, xids);
    return status;
S
starlord 已提交
235 236
}

S
starlord 已提交
237 238 239
size_t
ExecutionEngineImpl::Count() const {
    if (index_ == nullptr) {
S
starlord 已提交
240
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
S
starlord 已提交
241 242
        return 0;
    }
X
xj.lin 已提交
243
    return index_->Count();
S
starlord 已提交
244 245
}

S
starlord 已提交
246 247
size_t
ExecutionEngineImpl::Size() const {
S
starlord 已提交
248
    return (size_t)(Count() * Dimension()) * sizeof(float);
S
starlord 已提交
249 250
}

S
starlord 已提交
251 252 253
size_t
ExecutionEngineImpl::Dimension() const {
    if (index_ == nullptr) {
S
starlord 已提交
254
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
S
starlord 已提交
255 256
        return dim_;
    }
X
xj.lin 已提交
257
    return index_->Dimension();
S
starlord 已提交
258 259
}

S
starlord 已提交
260 261
size_t
ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
262
    return server::CommonUtil::GetFileSize(location_);
S
starlord 已提交
263 264
}

S
starlord 已提交
265 266
Status
ExecutionEngineImpl::Serialize() {
267
    auto status = write_index(index_, location_);
268 269 270 271 272

    // here we reset index size by file size,
    // since some index type(such as SQ8) data size become smaller after serialized
    index_->set_size(PhysicalSize());

273
    return status;
S
starlord 已提交
274 275
}

S
starlord 已提交
276 277
Status
ExecutionEngineImpl::Load(bool to_cache) {
278
    index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
J
jinhai 已提交
279
    bool already_in_cache = (index_ != nullptr);
S
starlord 已提交
280
    if (!already_in_cache) {
X
xj.lin 已提交
281
        try {
Y
Yu Kun 已提交
282 283
            double physical_size = PhysicalSize();
            server::CollectExecutionEngineMetrics metrics(physical_size);
X
xj.lin 已提交
284
            index_ = read_index(location_);
S
starlord 已提交
285
            if (index_ == nullptr) {
S
starlord 已提交
286 287
                std::string msg = "Failed to load index from " + location_;
                ENGINE_LOG_ERROR << msg;
S
starlord 已提交
288
                return Status(DB_ERROR, msg);
S
starlord 已提交
289 290 291
            } else {
                ENGINE_LOG_DEBUG << "Disk io from: " << location_;
            }
S
starlord 已提交
292
        } catch (std::exception& e) {
S
starlord 已提交
293
            ENGINE_LOG_ERROR << e.what();
S
starlord 已提交
294
            return Status(DB_ERROR, e.what());
X
xj.lin 已提交
295
        }
X
xj.lin 已提交
296 297
    }

J
jinhai 已提交
298
    if (!already_in_cache && to_cache) {
X
xj.lin 已提交
299 300 301
        Cache();
    }
    return Status::OK();
X
xj.lin 已提交
302 303
}

S
starlord 已提交
304
Status
W
wxyu 已提交
305
ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
X
xiaojun.lin 已提交
306
#if 0
W
wxyu 已提交
307
    if (hybrid) {
X
xiaojun.lin 已提交
308
        const std::string key = location_ + ".quantizer";
W
wxyu 已提交
309
        std::vector<uint64_t> gpus{device_id};
X
xiaojun.lin 已提交
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355

        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;

        // cache hit
        {
            knowhere::QuantizerPtr quantizer = nullptr;

            for (auto& gpu : gpus) {
                auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                if (auto cached_quantizer = cache->GetIndex(key)) {
                    device_id = gpu;
                    quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
                }
            }

            if (device_id != NOT_FOUND) {
                // cache hit
                auto config = std::make_shared<knowhere::QuantizerCfg>();
                config->gpu_id = device_id;
                config->mode = 2;
                auto new_index = index_->LoadData(quantizer, config);
                index_ = new_index;
            }
        }

        if (device_id == NOT_FOUND) {
            // cache miss
            std::vector<int64_t> all_free_mem;
            for (auto& gpu : gpus) {
                auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
                all_free_mem.push_back(free_mem);
            }

            auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
            auto best_index = std::distance(all_free_mem.begin(), max_e);
            device_id = gpus[best_index];

            auto pair = index_->CopyToGpuWithQuantizer(device_id);
            index_ = pair.first;

            // cache
            auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
            cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
        }
W
wxyu 已提交
356 357
        return Status::OK();
    }
X
xiaojun.lin 已提交
358
#endif
Y
youny626 已提交
359

G
groot 已提交
360
#ifdef MILVUS_GPU_VERSION
Y
youny626 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
    auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
        if (index_ == nullptr) {
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
            return Status(DB_ERROR, "index is null");
        }

        try {
            index_ = index_->CopyToGpu(device_id);
            ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
        } catch (std::exception& e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }
378
    }
Y
youny626 已提交
379 380 381 382

    if (!already_in_cache) {
        GpuCache(device_id);
    }
G
groot 已提交
383
#endif
Y
youny626 已提交
384

385 386 387
    return Status::OK();
}

Y
Yu Kun 已提交
388 389
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
G
groot 已提交
390
#ifdef MILVUS_GPU_VERSION
F
fishpenguin 已提交
391
    gpu_num_ = device_id;
Y
Yu Kun 已提交
392 393 394
    auto to_index_data = std::make_shared<ToIndexData>(PhysicalSize());
    cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
    milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
G
groot 已提交
395
#endif
Y
Yu Kun 已提交
396 397 398
    return Status::OK();
}

S
starlord 已提交
399 400
Status
ExecutionEngineImpl::CopyToCpu() {
401
    auto index = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
W
wxyu 已提交
402 403 404 405
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
S
starlord 已提交
406
        if (index_ == nullptr) {
S
starlord 已提交
407
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
S
starlord 已提交
408
            return Status(DB_ERROR, "index is null");
S
starlord 已提交
409 410
        }

Y
Yu Kun 已提交
411 412 413
        try {
            index_ = index_->CopyToCpu();
            ENGINE_LOG_DEBUG << "GPU to CPU";
S
starlord 已提交
414
        } catch (std::exception& e) {
S
starlord 已提交
415
            ENGINE_LOG_ERROR << e.what();
S
starlord 已提交
416
            return Status(DB_ERROR, e.what());
Y
Yu Kun 已提交
417 418 419
        }
    }

W
wxyu 已提交
420
    if (!already_in_cache) {
Y
Yu Kun 已提交
421
        Cache();
422 423 424 425
    }
    return Status::OK();
}

426 427 428 429 430 431 432 433 434 435 436 437
// ExecutionEnginePtr
// ExecutionEngineImpl::Clone() {
//    if (index_ == nullptr) {
//        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
//        return nullptr;
//    }
//
//    auto ret = std::make_shared<ExecutionEngineImpl>(dim_, location_, index_type_, metric_type_, nlist_);
//    ret->Init();
//    ret->index_ = index_->Clone();
//    return ret;
//}
W
wxyu 已提交
438

S
starlord 已提交
439
Status
S
starlord 已提交
440
ExecutionEngineImpl::Merge(const std::string& location) {
X
xj.lin 已提交
441
    if (location == location_) {
S
starlord 已提交
442
        return Status(DB_ERROR, "Cannot Merge Self");
X
xj.lin 已提交
443 444
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
S
starlord 已提交
445

S
starlord 已提交
446
    auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
X
xj.lin 已提交
447
    if (!to_merge) {
X
xj.lin 已提交
448
        try {
Y
Yu Kun 已提交
449 450
            double physical_size = server::CommonUtil::GetFileSize(location);
            server::CollectExecutionEngineMetrics metrics(physical_size);
X
xj.lin 已提交
451
            to_merge = read_index(location);
S
starlord 已提交
452
        } catch (std::exception& e) {
S
starlord 已提交
453
            ENGINE_LOG_ERROR << e.what();
S
starlord 已提交
454
            return Status(DB_ERROR, e.what());
X
xj.lin 已提交
455
        }
X
xj.lin 已提交
456 457
    }

S
starlord 已提交
458
    if (index_ == nullptr) {
S
starlord 已提交
459
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
S
starlord 已提交
460
        return Status(DB_ERROR, "index is null");
S
starlord 已提交
461 462
    }

X
xj.lin 已提交
463
    if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) {
464 465
        auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
        if (!status.ok()) {
X
xj.lin 已提交
466 467
            ENGINE_LOG_ERROR << "Merge: Add Error";
        }
468
        return status;
X
xj.lin 已提交
469
    } else {
S
starlord 已提交
470
        return Status(DB_ERROR, "file index type is not idmap");
X
xj.lin 已提交
471
    }
S
starlord 已提交
472 473 474
}

ExecutionEnginePtr
S
starlord 已提交
475
ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_type) {
X
xj.lin 已提交
476 477 478
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
S
starlord 已提交
479
    if (from_index == nullptr) {
S
starlord 已提交
480 481 482 483
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
        return nullptr;
    }

S
starlord 已提交
484
    auto to_index = CreatetVecIndex(engine_type);
X
xj.lin 已提交
485
    if (!to_index) {
486
        throw Exception(DB_ERROR, "Unsupported index type");
X
xj.lin 已提交
487 488
    }

X
xiaojun.lin 已提交
489 490 491 492
    TempMetaConf temp_conf;
    temp_conf.gpu_id = gpu_num_;
    temp_conf.dim = Dimension();
    temp_conf.nlist = nlist_;
S
starlord 已提交
493
    temp_conf.metric_type = (metric_type_ == MetricType::IP) ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;
X
xiaojun.lin 已提交
494 495 496 497
    temp_conf.size = Count();

    auto adapter = AdapterMgr::GetInstance().GetAdapter(to_index->GetType());
    auto conf = adapter->Match(temp_conf);
X
xj.lin 已提交
498

S
starlord 已提交
499 500 501 502
    auto status = to_index->BuildAll(Count(), from_index->GetRawVectors(), from_index->GetRawIds(), conf);
    if (!status.ok()) {
        throw Exception(DB_ERROR, status.message());
    }
X
xj.lin 已提交
503

S
starlord 已提交
504
    return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
S
starlord 已提交
505 506
}

S
starlord 已提交
507
Status
W
wxyu 已提交
508
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
J
JinHai-CN 已提交
509
                            bool hybrid) {
510
#if 0
J
JinHai-CN 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562
    if (index_type_ == EngineType::FAISS_IVFSQ8H) {
        if (!hybrid) {
            const std::string key = location_ + ".quantizer";
            std::vector<uint64_t> gpus = scheduler::get_gpu_pool();

            const int64_t NOT_FOUND = -1;
            int64_t device_id = NOT_FOUND;

            // cache hit
            {
                knowhere::QuantizerPtr quantizer = nullptr;

                for (auto& gpu : gpus) {
                    auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                    if (auto cached_quantizer = cache->GetIndex(key)) {
                        device_id = gpu;
                        quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
                    }
                }

                if (device_id != NOT_FOUND) {
                    // cache hit
                    auto config = std::make_shared<knowhere::QuantizerCfg>();
                    config->gpu_id = device_id;
                    config->mode = 2;
                    auto new_index = index_->LoadData(quantizer, config);
                    index_ = new_index;
                }
            }

            if (device_id == NOT_FOUND) {
                // cache miss
                std::vector<int64_t> all_free_mem;
                for (auto& gpu : gpus) {
                    auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                    auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
                    all_free_mem.push_back(free_mem);
                }

                auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
                auto best_index = std::distance(all_free_mem.begin(), max_e);
                device_id = gpus[best_index];

                auto pair = index_->CopyToGpuWithQuantizer(device_id);
                index_ = pair.first;

                // cache
                auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
                cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
            }
        }
    }
563
#endif
J
JinHai-CN 已提交
564

S
starlord 已提交
565
    if (index_ == nullptr) {
S
starlord 已提交
566
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
S
starlord 已提交
567
        return Status(DB_ERROR, "index is null");
S
starlord 已提交
568 569
    }

Y
Yu Kun 已提交
570
    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;
X
xiaojun.lin 已提交
571 572 573 574 575 576 577 578 579

    // TODO(linxj): remove here. Get conf from function
    TempMetaConf temp_conf;
    temp_conf.k = k;
    temp_conf.nprobe = nprobe;

    auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto conf = adapter->MatchSearch(temp_conf, index_->GetType());

W
wxyu 已提交
580 581 582
    if (hybrid) {
        HybridLoad();
    }
W
wxyu 已提交
583

X
xiaojun.lin 已提交
584
    auto status = index_->Search(n, data, distances, labels, conf);
W
wxyu 已提交
585

W
wxyu 已提交
586 587 588
    if (hybrid) {
        HybridUnset();
    }
W
wxyu 已提交
589

590
    if (!status.ok()) {
X
xj.lin 已提交
591 592
        ENGINE_LOG_ERROR << "Search error";
    }
593
    return status;
S
starlord 已提交
594 595
}

S
starlord 已提交
596 597
Status
ExecutionEngineImpl::Cache() {
598
    cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
S
starlord 已提交
599
    milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
S
starlord 已提交
600 601 602 603

    return Status::OK();
}

S
starlord 已提交
604 605
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
G
groot 已提交
606
#ifdef MILVUS_GPU_VERSION
607
    cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
S
starlord 已提交
608
    milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
G
groot 已提交
609
#endif
610
    return Status::OK();
Y
Yu Kun 已提交
611 612
}

X
xj.lin 已提交
613
// TODO(linxj): remove.
S
starlord 已提交
614 615
Status
ExecutionEngineImpl::Init() {
G
groot 已提交
616
#ifdef MILVUS_GPU_VERSION
S
starlord 已提交
617
    server::Config& config = server::Config::GetInstance();
Y
yudong.cai 已提交
618
    std::vector<int64_t> gpu_ids;
619
    Status s = config.GetGpuResourceConfigBuildIndexResources(gpu_ids);
620 621 622 623
    for (auto id : gpu_ids) {
        if (gpu_num_ == id) {
            return Status::OK();
        }
S
starlord 已提交
624
    }
S
starlord 已提交
625

626 627
    std::string msg = "Invalid gpu_num";
    return Status(SERVER_INVALID_ARGUMENT, msg);
G
groot 已提交
628 629 630
#else
    return Status::OK();
#endif
S
starlord 已提交
631 632
}

S
starlord 已提交
633 634
}  // namespace engine
}  // namespace milvus