ExecutionEngineImpl.cpp 20.8 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18
#include "db/engine/ExecutionEngineImpl.h"
S
starlord 已提交
19
#include "cache/CpuCacheMgr.h"
S
starlord 已提交
20
#include "cache/GpuCacheMgr.h"
X
xiaojun.lin 已提交
21
#include "knowhere/common/Config.h"
S
starlord 已提交
22
#include "metrics/Metrics.h"
X
xiaojun.lin 已提交
23 24
#include "scheduler/Utils.h"
#include "server/Config.h"
J
jinhai 已提交
25
#include "utils/CommonUtil.h"
S
starlord 已提交
26
#include "utils/Exception.h"
S
starlord 已提交
27
#include "utils/Log.h"
Y
youny626 已提交
28

S
starlord 已提交
29 30
#include "wrapper/ConfAdapter.h"
#include "wrapper/ConfAdapterMgr.h"
S
starlord 已提交
31 32
#include "wrapper/VecImpl.h"
#include "wrapper/VecIndex.h"
X
xj.lin 已提交
33

S
starlord 已提交
34
#include <stdexcept>
S
starlord 已提交
35
#include <utility>
W
wxyu 已提交
36
#include <vector>
S
starlord 已提交
37

J
JinHai-CN 已提交
38
//#define ON_SEARCH
S
starlord 已提交
39 40 41
namespace milvus {
namespace engine {

W
wxyu 已提交
42 43
class CachedQuantizer : public cache::DataObj {
 public:
W
wxyu 已提交
44 45
    explicit CachedQuantizer(knowhere::QuantizerPtr data) : data_(std::move(data)) {
    }
W
wxyu 已提交
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60

    knowhere::QuantizerPtr
    Data() {
        return data_;
    }

    int64_t
    Size() override {
        return data_->size;
    }

 private:
    knowhere::QuantizerPtr data_;
};

S
starlord 已提交
61 62 63
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : location_(location), dim_(dimension), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
X
xj.lin 已提交
64
    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
65
    if (!index_) {
66
        throw Exception(DB_ERROR, "Unsupported index type");
67
    }
X
xj.lin 已提交
68

X
xiaojun.lin 已提交
69 70 71
    TempMetaConf temp_conf;
    temp_conf.gpu_id = gpu_num_;
    temp_conf.dim = dimension;
S
starlord 已提交
72
    temp_conf.metric_type = (metric_type_ == MetricType::IP) ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;
X
xiaojun.lin 已提交
73 74 75 76
    auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto conf = adapter->Match(temp_conf);

    auto ec = std::static_pointer_cast<BFIndex>(index_)->Build(conf);
77 78 79
    if (ec != KNOWHERE_SUCCESS) {
        throw Exception(DB_ERROR, "Build index error");
    }
S
starlord 已提交
80 81
}

S
starlord 已提交
82 83 84
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index, const std::string& location, EngineType index_type,
                                         MetricType metric_type, int32_t nlist)
    : index_(std::move(index)), location_(location), index_type_(index_type), metric_type_(metric_type), nlist_(nlist) {
X
xj.lin 已提交
85
}
S
starlord 已提交
86

S
starlord 已提交
87 88
VecIndexPtr
ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
Y
yudong.cai 已提交
89
#ifdef MILVUS_GPU_VERSION
90 91 92
    server::Config& config = server::Config::GetInstance();
    bool gpu_resource_enable = true;
    config.GetGpuResourceConfigEnable(gpu_resource_enable);
Y
yudong.cai 已提交
93
#endif
X
xj.lin 已提交
94 95 96 97
    std::shared_ptr<VecIndex> index;
    switch (type) {
        case EngineType::FAISS_IDMAP: {
            index = GetVecIndexFactory(IndexType::FAISS_IDMAP);
S
starlord 已提交
98 99
            break;
        }
J
jinhai 已提交
100
        case EngineType::FAISS_IVFFLAT: {
Y
yudong.cai 已提交
101
#ifdef MILVUS_GPU_VERSION
102 103 104
            if (gpu_resource_enable)
                index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX);
            else
Y
youny626 已提交
105
#endif
106
                index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_CPU);
S
starlord 已提交
107 108
            break;
        }
J
jinhai 已提交
109
        case EngineType::FAISS_IVFSQ8: {
Y
yudong.cai 已提交
110
#ifdef MILVUS_GPU_VERSION
111 112 113
            if (gpu_resource_enable)
                index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX);
            else
Y
youny626 已提交
114
#endif
115
                index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_CPU);
S
starlord 已提交
116 117
            break;
        }
X
xj.lin 已提交
118 119 120 121
        case EngineType::NSG_MIX: {
            index = GetVecIndexFactory(IndexType::NSG_MIX);
            break;
        }
Y
Yukikaze-CZR 已提交
122
#ifdef CUSTOMIZATION
W
wxyu 已提交
123
        case EngineType::FAISS_IVFSQ8H: {
124 125 126 127 128
            if (gpu_resource_enable) {
                index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_HYBRID);
            } else {
                throw Exception(DB_ERROR, "No GPU resources for IVFSQ8H");
            }
W
wxyu 已提交
129 130
            break;
        }
Y
Yukikaze-CZR 已提交
131
#endif
Z
zirui.chen 已提交
132
        case EngineType::FAISS_PQ: {
Y
yudong.cai 已提交
133
#ifdef MILVUS_GPU_VERSION
134 135 136
            if (gpu_resource_enable)
                index = GetVecIndexFactory(IndexType::FAISS_IVFPQ_MIX);
            else
Z
zirui.chen 已提交
137
#endif
138
                index = GetVecIndexFactory(IndexType::FAISS_IVFPQ_CPU);
Z
zirui.chen 已提交
139 140
            break;
        }
141 142 143 144 145 146 147 148
        case EngineType::SPTAG_KDT: {
            index = GetVecIndexFactory(IndexType::SPTAG_KDT_RNT_CPU);
            break;
        }
        case EngineType::SPTAG_BKT: {
            index = GetVecIndexFactory(IndexType::SPTAG_BKT_RNT_CPU);
            break;
        }
X
xj.lin 已提交
149
        default: {
150
            ENGINE_LOG_ERROR << "Unsupported index type";
S
starlord 已提交
151 152 153
            return nullptr;
        }
    }
X
xj.lin 已提交
154
    return index;
S
starlord 已提交
155 156
}

157
void
W
wxyu 已提交
158 159 160 161 162
ExecutionEngineImpl::HybridLoad() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }

W
update  
wxyu 已提交
163 164 165 166 167
    if (index_->GetType() == IndexType::FAISS_IDMAP) {
        ENGINE_LOG_WARNING << "HybridLoad with type FAISS_IDMAP, ignore";
        return;
    }

G
groot 已提交
168
#ifdef MILVUS_GPU_VERSION
W
wxyu 已提交
169
    const std::string key = location_ + ".quantizer";
170 171

    server::Config& config = server::Config::GetInstance();
Y
yudong.cai 已提交
172
    std::vector<int64_t> gpus;
173 174 175 176 177
    Status s = config.GetGpuResourceConfigSearchResources(gpus);
    if (!s.ok()) {
        ENGINE_LOG_ERROR << s.message();
        return;
    }
W
wxyu 已提交
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215

    // cache hit
    {
        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;
        knowhere::QuantizerPtr quantizer = nullptr;

        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            if (auto cached_quantizer = cache->GetIndex(key)) {
                device_id = gpu;
                quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
            }
        }

        if (device_id != NOT_FOUND) {
            index_->SetQuantizer(quantizer);
            return;
        }
    }

    // cache miss
    {
        std::vector<int64_t> all_free_mem;
        for (auto& gpu : gpus) {
            auto cache = cache::GpuCacheMgr::GetInstance(gpu);
            auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
            all_free_mem.push_back(free_mem);
        }

        auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
        auto best_index = std::distance(all_free_mem.begin(), max_e);
        auto best_device_id = gpus[best_index];

        auto quantizer_conf = std::make_shared<knowhere::QuantizerCfg>();
        quantizer_conf->mode = 1;
        quantizer_conf->gpu_id = best_device_id;
        auto quantizer = index_->LoadQuantizer(quantizer_conf);
W
add log  
wxyu 已提交
216 217 218
        if (quantizer == nullptr) {
            ENGINE_LOG_ERROR << "quantizer is nullptr";
        }
W
wxyu 已提交
219 220 221 222
        index_->SetQuantizer(quantizer);
        auto cache_quantizer = std::make_shared<CachedQuantizer>(quantizer);
        cache::GpuCacheMgr::GetInstance(best_device_id)->InsertItem(key, cache_quantizer);
    }
G
groot 已提交
223
#endif
W
wxyu 已提交
224 225 226 227 228 229 230
}

void
ExecutionEngineImpl::HybridUnset() const {
    if (index_type_ != EngineType::FAISS_IVFSQ8H) {
        return;
    }
W
update  
wxyu 已提交
231 232 233
    if (index_->GetType() == IndexType::FAISS_IDMAP) {
        return;
    }
W
wxyu 已提交
234
    index_->UnsetQuantizer();
235 236
}

S
starlord 已提交
237
Status
S
starlord 已提交
238
ExecutionEngineImpl::AddWithIds(int64_t n, const float* xdata, const int64_t* xids) {
239 240
    auto status = index_->Add(n, xdata, xids);
    return status;
S
starlord 已提交
241 242
}

S
starlord 已提交
243 244 245
size_t
ExecutionEngineImpl::Count() const {
    if (index_ == nullptr) {
S
starlord 已提交
246
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
S
starlord 已提交
247 248
        return 0;
    }
X
xj.lin 已提交
249
    return index_->Count();
S
starlord 已提交
250 251
}

S
starlord 已提交
252 253
size_t
ExecutionEngineImpl::Size() const {
S
starlord 已提交
254
    return (size_t)(Count() * Dimension()) * sizeof(float);
S
starlord 已提交
255 256
}

S
starlord 已提交
257 258 259
size_t
ExecutionEngineImpl::Dimension() const {
    if (index_ == nullptr) {
S
starlord 已提交
260
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
S
starlord 已提交
261 262
        return dim_;
    }
X
xj.lin 已提交
263
    return index_->Dimension();
S
starlord 已提交
264 265
}

S
starlord 已提交
266 267
size_t
ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
268
    return server::CommonUtil::GetFileSize(location_);
S
starlord 已提交
269 270
}

S
starlord 已提交
271 272
Status
ExecutionEngineImpl::Serialize() {
273
    auto status = write_index(index_, location_);
274 275 276 277

    // here we reset index size by file size,
    // since some index type(such as SQ8) data size become smaller after serialized
    index_->set_size(PhysicalSize());
G
add log  
groot 已提交
278
    ENGINE_LOG_DEBUG << "Finish serialize index file: " << location_ << " size: " << index_->Size();
279

G
groot 已提交
280 281 282 283 284
    if (index_->Size() == 0) {
        std::string msg = "Failed to serialize file: " + location_ + " reason: out of disk space or memory";
        status = Status(DB_ERROR, msg);
    }

285
    return status;
S
starlord 已提交
286 287
}

S
starlord 已提交
288 289
Status
ExecutionEngineImpl::Load(bool to_cache) {
290
    index_ = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
J
jinhai 已提交
291
    bool already_in_cache = (index_ != nullptr);
S
starlord 已提交
292
    if (!already_in_cache) {
X
xj.lin 已提交
293
        try {
Y
Yu Kun 已提交
294 295
            double physical_size = PhysicalSize();
            server::CollectExecutionEngineMetrics metrics(physical_size);
X
xj.lin 已提交
296
            index_ = read_index(location_);
S
starlord 已提交
297
            if (index_ == nullptr) {
S
starlord 已提交
298 299
                std::string msg = "Failed to load index from " + location_;
                ENGINE_LOG_ERROR << msg;
S
starlord 已提交
300
                return Status(DB_ERROR, msg);
S
starlord 已提交
301 302 303
            } else {
                ENGINE_LOG_DEBUG << "Disk io from: " << location_;
            }
S
starlord 已提交
304
        } catch (std::exception& e) {
S
starlord 已提交
305
            ENGINE_LOG_ERROR << e.what();
S
starlord 已提交
306
            return Status(DB_ERROR, e.what());
X
xj.lin 已提交
307
        }
X
xj.lin 已提交
308 309
    }

J
jinhai 已提交
310
    if (!already_in_cache && to_cache) {
X
xj.lin 已提交
311 312 313
        Cache();
    }
    return Status::OK();
X
xj.lin 已提交
314 315
}

S
starlord 已提交
316
Status
W
wxyu 已提交
317
ExecutionEngineImpl::CopyToGpu(uint64_t device_id, bool hybrid) {
X
xiaojun.lin 已提交
318
#if 0
W
wxyu 已提交
319
    if (hybrid) {
X
xiaojun.lin 已提交
320
        const std::string key = location_ + ".quantizer";
W
wxyu 已提交
321
        std::vector<uint64_t> gpus{device_id};
X
xiaojun.lin 已提交
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367

        const int64_t NOT_FOUND = -1;
        int64_t device_id = NOT_FOUND;

        // cache hit
        {
            knowhere::QuantizerPtr quantizer = nullptr;

            for (auto& gpu : gpus) {
                auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                if (auto cached_quantizer = cache->GetIndex(key)) {
                    device_id = gpu;
                    quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
                }
            }

            if (device_id != NOT_FOUND) {
                // cache hit
                auto config = std::make_shared<knowhere::QuantizerCfg>();
                config->gpu_id = device_id;
                config->mode = 2;
                auto new_index = index_->LoadData(quantizer, config);
                index_ = new_index;
            }
        }

        if (device_id == NOT_FOUND) {
            // cache miss
            std::vector<int64_t> all_free_mem;
            for (auto& gpu : gpus) {
                auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
                all_free_mem.push_back(free_mem);
            }

            auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
            auto best_index = std::distance(all_free_mem.begin(), max_e);
            device_id = gpus[best_index];

            auto pair = index_->CopyToGpuWithQuantizer(device_id);
            index_ = pair.first;

            // cache
            auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
            cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
        }
W
wxyu 已提交
368 369
        return Status::OK();
    }
X
xiaojun.lin 已提交
370
#endif
Y
youny626 已提交
371

G
groot 已提交
372
#ifdef MILVUS_GPU_VERSION
Y
youny626 已提交
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
    auto index = std::static_pointer_cast<VecIndex>(cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_));
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
        if (index_ == nullptr) {
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
            return Status(DB_ERROR, "index is null");
        }

        try {
            index_ = index_->CopyToGpu(device_id);
            ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
        } catch (std::exception& e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }
390
    }
Y
youny626 已提交
391 392 393 394

    if (!already_in_cache) {
        GpuCache(device_id);
    }
G
groot 已提交
395
#endif
Y
youny626 已提交
396

397 398 399
    return Status::OK();
}

Y
Yu Kun 已提交
400 401
Status
ExecutionEngineImpl::CopyToIndexFileToGpu(uint64_t device_id) {
G
groot 已提交
402
#ifdef MILVUS_GPU_VERSION
F
fishpenguin 已提交
403
    gpu_num_ = device_id;
Y
Yu Kun 已提交
404 405 406
    auto to_index_data = std::make_shared<ToIndexData>(PhysicalSize());
    cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(to_index_data);
    milvus::cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(location_, obj);
G
groot 已提交
407
#endif
Y
Yu Kun 已提交
408 409 410
    return Status::OK();
}

S
starlord 已提交
411 412
Status
ExecutionEngineImpl::CopyToCpu() {
413
    auto index = std::static_pointer_cast<VecIndex>(cache::CpuCacheMgr::GetInstance()->GetIndex(location_));
W
wxyu 已提交
414 415 416 417
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
S
starlord 已提交
418
        if (index_ == nullptr) {
S
starlord 已提交
419
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
S
starlord 已提交
420
            return Status(DB_ERROR, "index is null");
S
starlord 已提交
421 422
        }

Y
Yu Kun 已提交
423 424 425
        try {
            index_ = index_->CopyToCpu();
            ENGINE_LOG_DEBUG << "GPU to CPU";
S
starlord 已提交
426
        } catch (std::exception& e) {
S
starlord 已提交
427
            ENGINE_LOG_ERROR << e.what();
S
starlord 已提交
428
            return Status(DB_ERROR, e.what());
Y
Yu Kun 已提交
429 430 431
        }
    }

W
wxyu 已提交
432
    if (!already_in_cache) {
Y
Yu Kun 已提交
433
        Cache();
434 435 436 437
    }
    return Status::OK();
}

438 439 440 441 442 443 444 445 446 447 448 449
// ExecutionEnginePtr
// ExecutionEngineImpl::Clone() {
//    if (index_ == nullptr) {
//        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
//        return nullptr;
//    }
//
//    auto ret = std::make_shared<ExecutionEngineImpl>(dim_, location_, index_type_, metric_type_, nlist_);
//    ret->Init();
//    ret->index_ = index_->Clone();
//    return ret;
//}
W
wxyu 已提交
450

S
starlord 已提交
451
Status
S
starlord 已提交
452
ExecutionEngineImpl::Merge(const std::string& location) {
X
xj.lin 已提交
453
    if (location == location_) {
S
starlord 已提交
454
        return Status(DB_ERROR, "Cannot Merge Self");
X
xj.lin 已提交
455 456
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
S
starlord 已提交
457

S
starlord 已提交
458
    auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
X
xj.lin 已提交
459
    if (!to_merge) {
X
xj.lin 已提交
460
        try {
Y
Yu Kun 已提交
461 462
            double physical_size = server::CommonUtil::GetFileSize(location);
            server::CollectExecutionEngineMetrics metrics(physical_size);
X
xj.lin 已提交
463
            to_merge = read_index(location);
S
starlord 已提交
464
        } catch (std::exception& e) {
S
starlord 已提交
465
            ENGINE_LOG_ERROR << e.what();
S
starlord 已提交
466
            return Status(DB_ERROR, e.what());
X
xj.lin 已提交
467
        }
X
xj.lin 已提交
468 469
    }

S
starlord 已提交
470
    if (index_ == nullptr) {
S
starlord 已提交
471
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
S
starlord 已提交
472
        return Status(DB_ERROR, "index is null");
S
starlord 已提交
473 474
    }

X
xj.lin 已提交
475
    if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) {
476 477
        auto status = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
        if (!status.ok()) {
G
groot 已提交
478
            ENGINE_LOG_ERROR << "Failed to merge: " << location << " to: " << location_;
G
groot 已提交
479 480
        } else {
            ENGINE_LOG_DEBUG << "Finish merge index file: " << location;
X
xj.lin 已提交
481
        }
482
        return status;
X
xj.lin 已提交
483
    } else {
S
starlord 已提交
484
        return Status(DB_ERROR, "file index type is not idmap");
X
xj.lin 已提交
485
    }
S
starlord 已提交
486 487 488
}

ExecutionEnginePtr
S
starlord 已提交
489
ExecutionEngineImpl::BuildIndex(const std::string& location, EngineType engine_type) {
X
xj.lin 已提交
490 491 492
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
S
starlord 已提交
493
    if (from_index == nullptr) {
S
starlord 已提交
494 495 496 497
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
        return nullptr;
    }

S
starlord 已提交
498
    auto to_index = CreatetVecIndex(engine_type);
X
xj.lin 已提交
499
    if (!to_index) {
500
        throw Exception(DB_ERROR, "Unsupported index type");
X
xj.lin 已提交
501 502
    }

X
xiaojun.lin 已提交
503 504 505 506
    TempMetaConf temp_conf;
    temp_conf.gpu_id = gpu_num_;
    temp_conf.dim = Dimension();
    temp_conf.nlist = nlist_;
S
starlord 已提交
507
    temp_conf.metric_type = (metric_type_ == MetricType::IP) ? knowhere::METRICTYPE::IP : knowhere::METRICTYPE::L2;
X
xiaojun.lin 已提交
508 509 510 511
    temp_conf.size = Count();

    auto adapter = AdapterMgr::GetInstance().GetAdapter(to_index->GetType());
    auto conf = adapter->Match(temp_conf);
X
xj.lin 已提交
512

S
starlord 已提交
513 514 515 516
    auto status = to_index->BuildAll(Count(), from_index->GetRawVectors(), from_index->GetRawIds(), conf);
    if (!status.ok()) {
        throw Exception(DB_ERROR, status.message());
    }
X
xj.lin 已提交
517

G
add log  
groot 已提交
518
    ENGINE_LOG_DEBUG << "Finish build index file: " << location << " size: " << to_index->Size();
S
starlord 已提交
519
    return std::make_shared<ExecutionEngineImpl>(to_index, location, engine_type, metric_type_, nlist_);
S
starlord 已提交
520 521
}

S
starlord 已提交
522
Status
W
wxyu 已提交
523
ExecutionEngineImpl::Search(int64_t n, const float* data, int64_t k, int64_t nprobe, float* distances, int64_t* labels,
J
JinHai-CN 已提交
524
                            bool hybrid) {
525
#if 0
J
JinHai-CN 已提交
526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577
    if (index_type_ == EngineType::FAISS_IVFSQ8H) {
        if (!hybrid) {
            const std::string key = location_ + ".quantizer";
            std::vector<uint64_t> gpus = scheduler::get_gpu_pool();

            const int64_t NOT_FOUND = -1;
            int64_t device_id = NOT_FOUND;

            // cache hit
            {
                knowhere::QuantizerPtr quantizer = nullptr;

                for (auto& gpu : gpus) {
                    auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                    if (auto cached_quantizer = cache->GetIndex(key)) {
                        device_id = gpu;
                        quantizer = std::static_pointer_cast<CachedQuantizer>(cached_quantizer)->Data();
                    }
                }

                if (device_id != NOT_FOUND) {
                    // cache hit
                    auto config = std::make_shared<knowhere::QuantizerCfg>();
                    config->gpu_id = device_id;
                    config->mode = 2;
                    auto new_index = index_->LoadData(quantizer, config);
                    index_ = new_index;
                }
            }

            if (device_id == NOT_FOUND) {
                // cache miss
                std::vector<int64_t> all_free_mem;
                for (auto& gpu : gpus) {
                    auto cache = cache::GpuCacheMgr::GetInstance(gpu);
                    auto free_mem = cache->CacheCapacity() - cache->CacheUsage();
                    all_free_mem.push_back(free_mem);
                }

                auto max_e = std::max_element(all_free_mem.begin(), all_free_mem.end());
                auto best_index = std::distance(all_free_mem.begin(), max_e);
                device_id = gpus[best_index];

                auto pair = index_->CopyToGpuWithQuantizer(device_id);
                index_ = pair.first;

                // cache
                auto cached_quantizer = std::make_shared<CachedQuantizer>(pair.second);
                cache::GpuCacheMgr::GetInstance(device_id)->InsertItem(key, cached_quantizer);
            }
        }
    }
578
#endif
J
JinHai-CN 已提交
579

S
starlord 已提交
580
    if (index_ == nullptr) {
S
starlord 已提交
581
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
S
starlord 已提交
582
        return Status(DB_ERROR, "index is null");
S
starlord 已提交
583 584
    }

Y
Yu Kun 已提交
585
    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;
X
xiaojun.lin 已提交
586 587 588 589 590 591 592 593 594

    // TODO(linxj): remove here. Get conf from function
    TempMetaConf temp_conf;
    temp_conf.k = k;
    temp_conf.nprobe = nprobe;

    auto adapter = AdapterMgr::GetInstance().GetAdapter(index_->GetType());
    auto conf = adapter->MatchSearch(temp_conf, index_->GetType());

W
wxyu 已提交
595 596 597
    if (hybrid) {
        HybridLoad();
    }
W
wxyu 已提交
598

X
xiaojun.lin 已提交
599
    auto status = index_->Search(n, data, distances, labels, conf);
W
wxyu 已提交
600

W
wxyu 已提交
601 602 603
    if (hybrid) {
        HybridUnset();
    }
W
wxyu 已提交
604

605
    if (!status.ok()) {
G
groot 已提交
606
        ENGINE_LOG_ERROR << "Search error:" << status.message();
X
xj.lin 已提交
607
    }
608
    return status;
S
starlord 已提交
609 610
}

S
starlord 已提交
611 612
Status
ExecutionEngineImpl::Cache() {
613
    cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
S
starlord 已提交
614
    milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, obj);
S
starlord 已提交
615 616 617 618

    return Status::OK();
}

S
starlord 已提交
619 620
Status
ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
G
groot 已提交
621
#ifdef MILVUS_GPU_VERSION
622
    cache::DataObjPtr obj = std::static_pointer_cast<cache::DataObj>(index_);
S
starlord 已提交
623
    milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, obj);
G
groot 已提交
624
#endif
625
    return Status::OK();
Y
Yu Kun 已提交
626 627
}

X
xj.lin 已提交
628
// TODO(linxj): remove.
S
starlord 已提交
629 630
Status
ExecutionEngineImpl::Init() {
G
groot 已提交
631
#ifdef MILVUS_GPU_VERSION
S
starlord 已提交
632
    server::Config& config = server::Config::GetInstance();
Y
yudong.cai 已提交
633
    std::vector<int64_t> gpu_ids;
634
    Status s = config.GetGpuResourceConfigBuildIndexResources(gpu_ids);
F
fishpenguin 已提交
635 636 637
    if (!s.ok()) {
        gpu_num_ = knowhere::INVALID_VALUE;
    }
638 639 640 641
    for (auto id : gpu_ids) {
        if (gpu_num_ == id) {
            return Status::OK();
        }
S
starlord 已提交
642
    }
S
starlord 已提交
643

644 645
    std::string msg = "Invalid gpu_num";
    return Status(SERVER_INVALID_ARGUMENT, msg);
G
groot 已提交
646 647 648
#else
    return Status::OK();
#endif
S
starlord 已提交
649 650
}

S
starlord 已提交
651 652
}  // namespace engine
}  // namespace milvus