ExecutionEngineImpl.cpp 20.1 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/engine/ExecutionEngineImpl.h"
S
starlord 已提交
19
#include "cache/CpuCacheMgr.h"
S
starlord 已提交
20
#include "cache/GpuCacheMgr.h"
X
xiaojun.lin 已提交
21
#include "knowhere/common/Config.h"
S
starlord 已提交
22
#include "metrics/Metrics.h"
X
xiaojun.lin 已提交
23 24
#include "scheduler/Utils.h"
#include "server/Config.h"
J
jinhai 已提交
25
#include "utils/CommonUtil.h"
S
starlord 已提交
26
#include "utils/Exception.h"
S
starlord 已提交
27
#include "utils/Log.h"
Y
youny626 已提交
28

S
starlord 已提交
29 30
#include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h"
S
starlord 已提交
31 32
#include "wrapper/VecImpl.h"
#include "wrapper/VecIndex.h"
X
xj.lin 已提交
33

S
starlord 已提交
34
#include <stdexcept>
S
starlord 已提交
35
#include <utility>
W
wxyu 已提交
36
#include <vector>
S
starlord 已提交
37

J
JinHai-CN 已提交
38
//#define ON_SEARCH
S
starlord 已提交
39 40 41
namespace milvus {
namespace engine {

W
wxyu 已提交
42 43
class CachedQuantizer : public cache::DataObj {
 public:
W
wxyu 已提交
44 45
    explicit CachedQuantizer(knowhere::QuantizerPtr data) : data_(std::move(data)) {
    }
W
wxyu 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

    knowhere::QuantizerPtr
    Data() {
        return data_;
    }

    int64_t
    Size() override {
        return data_->size;
    }

 private:
    knowhere::QuantizerPtr data_;
};

S
starlord 已提交
61 62 63
// Constructs an engine around a newly created FAISS_IDMAP index and builds it
// for the given dimension and metric type.
// Throws Exception(DB_ERROR) if the index cannot be created or built.
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : location_(location), dim_(dimension), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
    // The backing index is always an IDMAP here, regardless of index_type.
    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
    if (index_ == nullptr) {
        throw Exception(DB_ERROR, "Unsupported index type");
    }

    const bool use_inner_product = (metric_type_ == MetricType::IP);

    TempMetaConf meta;
    meta.gpu_id = gpu_num_;
    meta.dim = dimension;
    meta.metric_type = use_inner_product ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;

    // Translate the generic settings into the config expected by this index type.
    auto conf_adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto build_conf = conf_adapter->Match(meta);

    auto idmap = std::static_pointer_cast<BFIndex>(index_);
    if (idmap->Build(build_conf) != KNOWHERE_SUCCESS) {
        throw Exception(DB_ERROR, "Build index error");
    }
}

S
starlord 已提交
82 83 84
// Constructs an engine that takes ownership of an existing index.
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : index_(std::move(index)), location_(location), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
    // The initializer list above does not set dim_, yet Dimension() falls back
    // to it whenever index_ is null. Give it a defined value here.
    // NOTE(review): assumes Dimension() is safe to call on the supplied index.
    dim_ = (index_ != nullptr) ? index_->Dimension() : 0;
}
S
starlord 已提交
86

S
starlord 已提交
87 88
// Maps an EngineType to the matching index implementation obtained from
// GetVecIndexFactory. Several engine types select a *_CPU variant when
// MILVUS_CPU_VERSION is defined and a *_MIX variant otherwise.
// Logs an error and returns nullptr for an engine type that is not handled.
VecIndexPtr
ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
    switch (type) {
        case EngineType::FAISS_IDMAP:
            return GetVecIndexFactory(IndexType::FAISS_IDMAP);

        case EngineType::FAISS_IVFFLAT:
#ifdef MILVUS_CPU_VERSION
            return GetVecIndexFactory(IndexType::FAISS_IVFFLAT_CPU);
#else
            return GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX);
#endif

        case EngineType::FAISS_IVFSQ8:
#ifdef MILVUS_CPU_VERSION
            return GetVecIndexFactory(IndexType::FAISS_IVFSQ8_CPU);
#else
            return GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX);
#endif

        case EngineType::NSG_MIX:
            return GetVecIndexFactory(IndexType::NSG_MIX);

#ifdef CUSTOMIZATION
        case EngineType::FAISS_IVFSQ8H:
            return GetVecIndexFactory(IndexType::FAISS_IVFSQ8_HYBRID);
#endif

        case EngineType::FAISS_PQ:
#ifdef MILVUS_CPU_VERSION
            return GetVecIndexFactory(IndexType::FAISS_IVFPQ_CPU);
#else
            return GetVecIndexFactory(IndexType::FAISS_IVFPQ_MIX);
#endif

        case EngineType::SPTAG_KDT:
            return GetVecIndexFactory(IndexType::SPTAG_KDT_RNT_CPU);

        case EngineType::SPTAG_BKT:
            return GetVecIndexFactory(IndexType::SPTAG_BKT_RNT_CPU);

        default:
            break;
    }

    ENGINE_LOG_ERROR << "Unsupported index type";
    return nullptr;
}

145
void
W
wxyu 已提交
146 147 148 149 150
ExecutionEngineImpl::HybridLoad() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }

W
update  
wxyu 已提交
151 152 153 154 155
    if (index_->GetType() == IndexType::FAISS_IDMAP) {
        ENGINE_LOG_WARNING << "HybridLoad with type FAISS_IDMAP, ignore";
        return;
    }

G
groot 已提交
156
#ifdef MILVUS_GPU_VERSION
W
wxyu 已提交
157
    const std::string key = location_ + ".quantizer";
158 159

    server::Config& config = server::Config::GetInstance();
Y
yudong.cai 已提交
160
    std::vector<int64_t> gpus;
161 162 163 164 165
    Status s = config.GetGpuResourceConfigSearchResources(gpus);
    if (!s.ok()) {
        ENGINE_LOG_ERROR << s.message();
        return;
    }
W
wxyu 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203

    // cache hit
    {
        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;
        knowhere::QuantizerPtr quantizer = nullptr;

        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            if (auto cached_quantizer = cache->GetIndex(key)) {
                device_id = gpu;
                quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
            }
        }

        if (device_id != NOT_FOUND) {
            index_->SetQuantizer(quantizer);
            return;
        }
    }

    // cache miss
    {
        std::vector<int64_t> all_free_mem;
        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
            all_free_mem.push_back(free_mem);
        }

        auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
        auto best_index = std::distance(all_free_mem.begin(), max_e);
        auto best_device_id = gpus[best_index];

        auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
        quantizer_conf->mode = 1;
        quantizer_conf->gpu_id = best_device_id;
        auto quantizer = index_->LoadQuantizer(quantizer_conf);
W
add log  
wxyu 已提交
204 205 206
        if (quantizer == nullptr) {
            ENGINE_LOG_ERROR << "quantizer is nullptr";
        }
W
wxyu 已提交
207 208 209 210
        index_->SetQuantizer(quantizer);
        auto cache_quantizer = std::make_shared<CachedQuantizer>(quantizer);
        cache::GpuCacheMgr::GetInstance(best_device_id)->InsertItem(key, cache_quantizer);
    }
G
groot 已提交
211
#endif
W
wxyu 已提交
212 213 214 215 216 217 218
}

void
ExecutionEngineImpl::HybridUnset() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }
W
update  
wxyu 已提交
219 220 221
    if (index_->GetType() == IndexType::FAISS_IDMAP) {
        return;
    }
W
wxyu 已提交
222
    index_->UnsetQuantizer();
223 224
}

S
starlord 已提交
225
// Adds n vectors (xdata) with their ids (xids) to the index.
// Returns DB_ERROR if the engine has no index, otherwise the status reported
// by the index.
Status
ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) {
    // Same null guard as Merge/Search/CopyToCpu, so a missing index is
    // reported rather than dereferenced.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to add vectors";
        return Status(DB_ERROR, "index is null");
    }

    auto status = index_->Add(n, xdata, xids);
    return status;
}

S
starlord 已提交
231 232 233
size_t
ExecutionEngineImpl::Count() const {
    if (index_ == nullptr) {
S
starlord 已提交
234
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
S
starlord 已提交
235 236
        return 0;
    }
X
xj.lin 已提交
237
    return index_->Count();
S
starlord 已提交
238 239
}

S
starlord 已提交
240 241
size_t
ExecutionEngineImpl::Size() const {
S
starlord 已提交
242
    return (size_t)(Count() * Dimension()) * sizeof(float);
S
starlord 已提交
243 244
}

S
starlord 已提交
245 246 247
size_t
ExecutionEngineImpl::Dimension() const {
    if (index_ == nullptr) {
S
starlord 已提交
248
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
S
starlord 已提交
249 250
        return dim_;
    }
X
xj.lin 已提交
251
    return index_->Dimension();
S
starlord 已提交
252 253
}

S
starlord 已提交
254 255
size_t
ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
256
    return server::CommonUtil::GetFileSize(location_);
S
starlord 已提交
257 258
}

S
starlord 已提交
259 260
// Writes the index to location_ and resets the index size to the resulting
// file size.
// Returns DB_ERROR if there is no index or if the recorded size ends up 0;
// otherwise returns the status of write_index.
Status
ExecutionEngineImpl::Serialize() {
    // Guard against a missing index; the code below dereferences index_.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to serialize";
        return Status(DB_ERROR, "index is null");
    }

    auto status = write_index(index_, location_);

    // here we reset index size by file size,
    // since some index type(such as SQ8) data size become smaller after serialized
    index_->set_size(PhysicalSize());
    ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size();

    // A zero size after writing is treated as a failed serialization.
    if (index_->Size() == 0) {
        std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory";
        status = Status(DB_ERROR, msg);
    }

    return status;
}

S
starlord 已提交
276 277
// Loads the index for location_: first from the CPU cache, otherwise from disk.
// An index read from disk is inserted into the CPU cache when to_cache is true.
// Returns DB_ERROR if the file cannot be read or reading throws.
Status
ExecutionEngineImpl::Load(bool to_cache) {
    index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
    if (index_ != nullptr) {
        // Cache hit: nothing to read and nothing to re-insert.
        return Status::OK();
    }

    try {
        // The metrics object lives for the duration of the disk read.
        double file_size = PhysicalSize();
        server::CollectExecutionEngineMetrics metrics(file_size);

        index_ = read_index(location_);
        if (index_ == nullptr) {
            std::string msg = "Failed to load index from " + location_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }

        ENGINE_LOG_DEBUG << "Disk io from: " << location_;
    } catch (std::exception& e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    if (to_cache) {
        Cache();
    }
    return Status::OK();
}

S
starlord 已提交
304
// Moves the index to the given GPU (GPU builds only).
// Uses the copy already held in that GPU's cache if present; otherwise copies
// the current index to the GPU and inserts the result into the GPU cache.
// Returns DB_ERROR if there is no index to copy or the copy throws.
// In non-GPU builds this is a no-op that returns OK.
Status
ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
    // Disabled hybrid (quantizer-aware) path, preserved as it was.
#if 0
    if (hybrid) {
        const std::string key = location_ + ".quantizer";
        std::vector<uint64_t> gpus{device_id};

        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;

        // cache hit
        {
            knowhere::QuantizerPtr quantizer = nullptr;

            for (auto& gpu : gpus) {
                auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                if (auto cached_quantizer = cache->GetIndex(key)) {
                    device_id = gpu;
                    quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
                }
            }

            if (device_id != NOT_FOUND) {
                // cache hit
                auto config = std::make_shared<knowhere::QuantizerCfg>();
                config->gpu_id = device_id;
                config->mode = 2;
                auto new_index = index_->LoadData(quantizer, config);
                index_ = new_index;
            }
        }

        if (device_id == NOT_FOUND) {
            // cache miss
            std::vector<int64_t> all_free_mem;
            for (auto& gpu : gpus) {
                auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
                all_free_mem.push_back(free_mem);
            }

            auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
            auto best_index = std::distance(all_free_mem.begin(), max_e);
            device_id = gpus[best_index];

            auto pair = index_->CopyToGpuWithQuantizer(device_id);
            index_ = pair.first;

            // cache
            auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
            cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
        }
        return Status::OK();
    }
#endif

#ifdef MILVUS_GPU_VERSION
    auto gpu_resident =
        std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
    if (gpu_resident != nullptr) {
        // Already cached on this device: adopt it, no copy and no re-insert.
        index_ = gpu_resident;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
        return Status(DB_ERROR, "index is null");
    }

    try {
        index_ = index_->CopyToGpu(device_id);
        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
    } catch (std::exception& e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    // Freshly copied index: register it in the device cache.
    GpuCache(device_id);
#endif

    return Status::OK();
}

Y
Yu Kun 已提交
388 389
// GPU builds: records device_id in gpu_num_ and inserts a ToIndexData object,
// constructed from the physical file size, into that GPU's cache under
// location_. Always returns OK; in non-GPU builds it does nothing else.
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
#ifdef MILVUS_GPU_VERSION
    gpu_num_ = device_id;

    cache::DataObjPtr cache_entry =
        std::static_pointer_cast<cache::DataObj>(std::make_shared<ToIndexData>(PhysicalSize()));
    milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, cache_entry);
#endif
    return Status::OK();
}

S
starlord 已提交
399 400
// Moves the index to the CPU.
// Uses the copy held in the CPU cache if present; otherwise copies the current
// index to the CPU and inserts the result into the CPU cache.
// Returns DB_ERROR if there is no index to copy or the copy throws.
Status
ExecutionEngineImpl::CopyToCpu() {
    auto cpu_resident = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
    if (cpu_resident != nullptr) {
        // Already cached: adopt it, no copy and no re-insert.
        index_ = cpu_resident;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
        return Status(DB_ERROR, "index is null");
    }

    try {
        index_ = index_->CopyToCpu();
        ENGINE_LOG_DEBUG << "GPU to CPU";
    } catch (std::exception& e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    // Freshly copied index: register it in the CPU cache.
    Cache();
    return Status::OK();
}

426 427 428 429 430 431 432 433 434 435 436 437
// ExecutionEnginePtr
// ExecutionEngineImpl::Clone() {
//    if (index_ == nullptr) {
//        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
//        return nullptr;
//    }
//
//    auto ret = std::make_shared<ExecutionEngineImpl>(dim_, location_, index_type_, metric_type_, nlist_);
//    ret->Init();
//    ret->index_ = index_->Clone();
//    return ret;
//}
W
wxyu 已提交
438

S
starlord 已提交
439
// Appends the vectors of the index stored at `location` to this engine's index.
// The other index is taken from the CPU cache, or read from disk on a miss.
// Returns DB_ERROR when: `location` equals this engine's own location, reading
// throws, the other index cannot be loaded, this engine has no index, or the
// other index is not a BFIndex. Otherwise returns the status of the Add call.
Status
ExecutionEngineImpl::Merge(const std::string& location) {
    if (location == location_) {
        return Status(DB_ERROR, "Cannot Merge Self");
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;

    auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
    if (!to_merge) {
        try {
            // The metrics object lives for the duration of the disk read.
            double physical_size = server::CommonUtil::GetFileSize(location);
            server::CollectExecutionEngineMetrics metrics(physical_size);
            to_merge = read_index(location);
        } catch (std::exception& e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }
    }

    // A null result means the file could not be loaded. Report that directly
    // (as Load() does) rather than as an index type mismatch.
    if (to_merge == nullptr) {
        std::string msg = "Failed to load index from " + location;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
        return Status(DB_ERROR, "index is null");
    }

    // Only a BFIndex exposes the raw vectors and ids needed for the merge.
    if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) {
        auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
        if (!status.ok()) {
            ENGINE_LOG_ERROR << "Merge: Add Error";
        }
        return status;
    } else {
        return Status(DB_ERROR, "file index type is not idmap");
    }
}

// Builds a new index of `engine_type` from the raw vectors held by this
// engine's index and returns a new engine wrapping it at `location`.
// Returns nullptr if the current index is not a BFIndex.
// Throws Exception(DB_ERROR) if the target index type is unsupported or the
// build fails.
ExecutionEnginePtr
ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_type) {
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    // Raw vectors and ids are only available from a BFIndex source.
    auto source = std::dynamic_pointer_cast<BFIndex>(index_);
    if (!source) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
        return nullptr;
    }

    auto target = CreatetVecIndex(engine_type);
    if (target == nullptr) {
        throw Exception(DB_ERROR, "Unsupported index type");
    }

    const bool use_inner_product = (metric_type_ == MetricType::IP);

    TempMetaConf meta;
    meta.gpu_id = gpu_num_;
    meta.dim = Dimension();
    meta.nlist = nlist_;
    meta.metric_type = use_inner_product ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;
    meta.size = Count();

    // Translate the generic settings into the config expected by the target index type.
    auto conf_adapter = AdapterMgr::GetInstance().GetAdapter(target->GetType());
    auto build_conf = conf_adapter->Match(meta);

    auto build_status = target->BuildAll(Count(), source->GetRawVectors(), source->GetRawIds(), build_conf);
    if (!build_status.ok()) {
        throw Exception(DB_ERROR, build_status.message());
    }

    ENGINE_LOG_DEBUG << "Finish build index file: " << location << " size: " << target->Size();
    return std::make_shared<ExecutionEngineImpl>(target, location, engine_type, metric_type_, nlist_);
}

S
starlord 已提交
508
// Runs a top-k search for n query vectors in `data`, writing the results into
// `distances` and `labels`.
// When `hybrid` is true, HybridLoad() is called before the search and
// HybridUnset() after it.
// Returns DB_ERROR if there is no index, otherwise the status of the search.
Status
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
                            bool hybrid) {
    // Disabled quantizer-aware path, preserved as it was.
#if 0
    if (index_type_ == EngineType::FAISS_IVFSQ8H) {
        if (!hybrid) {
            const std::string key = location_ + ".quantizer";
            std::vector<uint64_t> gpus = scheduler::get_gpu_pool();

            const int64_t NOT_FOUND = -1;
            int64_t device_id = NOT_FOUND;

            // cache hit
            {
                knowhere::QuantizerPtr quantizer = nullptr;

                for (auto& gpu : gpus) {
                    auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                    if (auto cached_quantizer = cache->GetIndex(key)) {
                        device_id = gpu;
                        quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
                    }
                }

                if (device_id != NOT_FOUND) {
                    // cache hit
                    auto config = std::make_shared<knowhere::QuantizerCfg>();
                    config->gpu_id = device_id;
                    config->mode = 2;
                    auto new_index = index_->LoadData(quantizer, config);
                    index_ = new_index;
                }
            }

            if (device_id == NOT_FOUND) {
                // cache miss
                std::vector<int64_t> all_free_mem;
                for (auto& gpu : gpus) {
                    auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                    auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
                    all_free_mem.push_back(free_mem);
                }

                auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
                auto best_index = std::distance(all_free_mem.begin(), max_e);
                device_id = gpus[best_index];

                auto pair = index_->CopyToGpuWithQuantizer(device_id);
                index_ = pair.first;

                // cache
                auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
                cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
            }
        }
    }
#endif

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
        return Status(DB_ERROR, "index is null");
    }

    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;

    // TODO(linxj): remove here. Get conf from function
    TempMetaConf search_meta;
    search_meta.k = k;
    search_meta.nprobe = nprobe;

    // The search config is resolved before any quantizer is attached.
    auto conf_adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto search_conf = conf_adapter->MatchSearch(search_meta, index_->GetType());

    if (hybrid) {
        HybridLoad();
    }

    auto search_status = index_->Search(n, data, distances, labels, search_conf);

    if (hybrid) {
        HybridUnset();
    }

    if (!search_status.ok()) {
        ENGINE_LOG_ERROR << "Search error";
    }
    return search_status;
}

S
starlord 已提交
597 598
// Inserts the current index into the CPU cache under location_. Always returns OK.
Status
ExecutionEngineImpl::Cache() {
    cache::DataObjPtr cache_entry = std::static_pointer_cast<cache::DataObj>(index_);

    auto cpu_cache = milvus::cache::CpuCacheMgr::GetInstance();
    cpu_cache->InsertItem(location_, cache_entry);

    return Status::OK();
}

S
starlord 已提交
605 606
// GPU builds: inserts the current index into the cache of GPU `gpu_id` under
// location_. Always returns OK; a no-op in non-GPU builds.
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
#ifdef MILVUS_GPU_VERSION
    cache::DataObjPtr cache_entry = std::static_pointer_cast<cache::DataObj>(index_);

    auto gpu_cache = milvus::cache::GpuCacheMgr::GetInstance(gpu_id);
    gpu_cache->InsertItem(location_, cache_entry);
#endif
    return Status::OK();
}

X
xj.lin 已提交
614
// TODO(linxj): remove.
S
starlord 已提交
615 616
// GPU builds: checks that gpu_num_ is one of the configured build-index GPU
// resources.
// Returns the config error if the resource list cannot be read,
// SERVER_INVALID_ARGUMENT if gpu_num_ is not in the list, and OK otherwise.
// Non-GPU builds always return OK.
Status
ExecutionEngineImpl::Init() {
#ifdef MILVUS_GPU_VERSION
    server::Config& config = server::Config::GetInstance();
    std::vector<int64_t> gpu_ids;
    Status s = config.GetGpuResourceConfigBuildIndexResources(gpu_ids);
    if (!s.ok()) {
        // Report the real configuration failure instead of falling through to
        // a misleading "Invalid gpu_num".
        ENGINE_LOG_ERROR << s.message();
        return s;
    }

    for (auto id : gpu_ids) {
        if (gpu_num_ == id) {
            return Status::OK();
        }
    }

    std::string msg = "Invalid gpu_num";
    return Status(SERVER_INVALID_ARGUMENT, msg);
#else
    return Status::OK();
#endif
}

S
starlord 已提交
634 635
}  // namespace engine
}  // namespace milvus