ExecutionEngineImpl.cpp 11.0 KB
Newer Older
G
groot 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
xj.lin 已提交
6
#include <stdexcept>
Y
Yu Kun 已提交
7
#include "src/cache/GpuCacheMgr.h"
X
xj.lin 已提交
8

G
groot 已提交
9 10
#include "src/metrics/Metrics.h"
#include "db/Log.h"
J
jinhai 已提交
11
#include "utils/CommonUtil.h"
G
groot 已提交
12

X
xj.lin 已提交
13 14 15
#include "src/cache/CpuCacheMgr.h"
#include "ExecutionEngineImpl.h"
#include "wrapper/knowhere/vec_index.h"
G
groot 已提交
16
#include "wrapper/knowhere/vec_impl.h"
X
xj.lin 已提交
17
#include "knowhere/common/exception.h"
G
groot 已提交
18
#include "db/Exception.h"
X
xj.lin 已提交
19

G
groot 已提交
20 21 22 23 24 25

namespace zilliz {
namespace milvus {
namespace engine {

// Creates an engine for the file at `location` backed by an empty FAISS_IDMAP index
// configured with `dimension` and the engine's metric type.
// Throws Exception if the empty index cannot be created or built.
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension,
                                         const std::string &location,
                                         EngineType index_type,
                                         MetricType metric_type,
                                         int32_t nlist)
    : location_(location),
      dim_(dimension),
      index_type_(index_type),
      metric_type_(metric_type),
      nlist_(nlist) {
    // Regardless of index_type, the engine starts from an empty FAISS_IDMAP index.
    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
    if (index_ == nullptr) {
        throw Exception("Create Empty VecIndex");
    }

    Config build_cfg;
    build_cfg["dim"] = dimension;
    const bool use_inner_product = (metric_type_ == MetricType::IP);
    build_cfg["metric_type"] = use_inner_product ? "IP" : "L2";
    AutoGenParams(index_->GetType(), 0, build_cfg);

    // CreatetVecIndex(FAISS_IDMAP) is relied upon to yield a BFIndex here.
    auto brute_force = std::static_pointer_cast<BFIndex>(index_);
    const auto build_result = brute_force->Build(build_cfg);
    if (build_result != server::KNOWHERE_SUCCESS) {
        throw Exception("Build index error");
    }
}

X
xj.lin 已提交
47 48
// Wraps an already-built index for the file at `location`.
// dim_ is derived from the wrapped index (0 if none is given). Dimension() falls back
// to dim_ when index_ is null, and Clone() may construct from it, so it must never be
// left indeterminate.
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
                                         const std::string &location,
                                         EngineType index_type,
                                         MetricType metric_type,
                                         int32_t nlist)
    : index_(std::move(index)),
      location_(location),
      index_type_(index_type),
      metric_type_(metric_type),
      nlist_(nlist) {
    // Assigned in the body (after index_ is initialized) so the result does not depend
    // on the member declaration order in the header.
    dim_ = 0;
    if (index_ != nullptr) {
        dim_ = static_cast<decltype(dim_)>(index_->Dimension());
    }
}
G
groot 已提交
58

X
xj.lin 已提交
59 60 61 62 63
// Instantiates an empty vector index for the given engine type via the knowhere
// wrapper factory. Logs and returns nullptr for engine types it does not handle.
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
    IndexType factory_type;
    switch (type) {
        case EngineType::FAISS_IDMAP:
            factory_type = IndexType::FAISS_IDMAP;
            break;
        case EngineType::FAISS_IVFFLAT:
            factory_type = IndexType::FAISS_IVFFLAT_MIX;
            break;
        case EngineType::FAISS_IVFSQ8:
            factory_type = IndexType::FAISS_IVFSQ8_MIX;
            break;
        case EngineType::NSG_MIX:
            factory_type = IndexType::NSG_MIX;
            break;
        default:
            ENGINE_LOG_ERROR << "Invalid engine type";
            return nullptr;
    }
    return GetVecIndexFactory(factory_type);
}

// Appends `n` vectors (`xdata`) with their ids (`xids`) to the underlying index.
// Returns an error status if there is no index or the index rejects the data.
Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) {
    // Guard like Search/Merge/CopyTo*: dereferencing a null index_ would crash.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to add vectors";
        return Status::Error("index is null");
    }

    auto ec = index_->Add(n, xdata, xids);
    if (ec != server::KNOWHERE_SUCCESS) {
        return Status::Error("Add error");
    }
    return Status::OK();
}

size_t ExecutionEngineImpl::Count() const {
G
groot 已提交
95
    if(index_ == nullptr) {
G
groot 已提交
96
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
G
groot 已提交
97 98
        return 0;
    }
X
xj.lin 已提交
99
    return index_->Count();
G
groot 已提交
100 101 102
}

size_t ExecutionEngineImpl::Size() const {
X
xj.lin 已提交
103
    return (size_t) (Count() * Dimension()) * sizeof(float);
G
groot 已提交
104 105 106
}

size_t ExecutionEngineImpl::Dimension() const {
G
groot 已提交
107
    if(index_ == nullptr) {
G
groot 已提交
108
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
G
groot 已提交
109 110
        return dim_;
    }
X
xj.lin 已提交
111
    return index_->Dimension();
G
groot 已提交
112 113 114
}

size_t ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
115
    return server::CommonUtil::GetFileSize(location_);
G
groot 已提交
116 117 118
}

// Writes the current index to location_ via write_index.
// Returns an error status if there is no index or the write fails.
Status ExecutionEngineImpl::Serialize() {
    // Consistent with the other index-consuming methods: never hand a null index on.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to serialize";
        return Status::Error("index is null");
    }

    auto ec = write_index(index_, location_);
    if (ec != server::KNOWHERE_SUCCESS) {
        return Status::Error("Serialize: write to disk error");
    }
    return Status::OK();
}

J
jinhai 已提交
126
// Loads the index for location_, preferring the CPU cache over disk.
// If it was read from disk and `to_cache` is set, the loaded index is inserted
// into the CPU cache. Returns an error status when reading fails or yields no index.
Status ExecutionEngineImpl::Load(bool to_cache) {
    index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    const bool already_in_cache = (index_ != nullptr);
    if (!already_in_cache) {
        try {
            double physical_size = PhysicalSize();
            server::CollectExecutionEngineMetrics metrics(physical_size);
            index_ = read_index(location_);
            if (index_ == nullptr) {
                // Report failure instead of returning OK without an index; this also
                // prevents a null index from being inserted into the cache below.
                std::string msg = "Failed to load index from " + location_;
                ENGINE_LOG_ERROR << msg;
                return Status::Error(msg);
            }
            ENGINE_LOG_DEBUG << "Disk io from: " << location_;
        } catch (const knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (const std::exception &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        }
    }

    if (!already_in_cache && to_cache) {
        Cache();
    }
    return Status::OK();
}

153
// Makes index_ a GPU-resident index on `device_id`.
// Reuses the copy held by that device's GPU cache when present; otherwise copies the
// current index to the GPU and inserts the result into the GPU cache.
// Returns an error status if there is no index or the copy throws.
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
    auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
        if (index_ == nullptr) {
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
            return Status::Error("index is null");
        }

        try {
            index_ = index_->CopyToGpu(device_id);
            ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
        } catch (const knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (const std::exception &e) {
            // Log generic failures too, so they are not silently turned into a status.
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        }
    }

    if (!already_in_cache) {
        GpuCache(device_id);
    }

    return Status::OK();
}

// Makes index_ a CPU-resident index.
// Reuses the copy held by the CPU cache when present; otherwise copies the current
// index to the CPU and inserts the result into the CPU cache.
// Returns an error status if there is no index or the copy throws.
Status ExecutionEngineImpl::CopyToCpu() {
    auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    bool already_in_cache = (index != nullptr);
    if (already_in_cache) {
        index_ = index;
    } else {
        if (index_ == nullptr) {
            ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
            return Status::Error("index is null");
        }

        try {
            index_ = index_->CopyToCpu();
            ENGINE_LOG_DEBUG << "GPU to CPU";
        } catch (const knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (const std::exception &e) {
            // Log generic failures too, so they are not silently turned into a status.
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        }
    }

    if (!already_in_cache) {
        Cache();
    }
    return Status::OK();
}

W
wxyu 已提交
210
// Returns a new engine for the same location/type/metric/nlist whose index is a
// clone of this engine's index. Returns nullptr when there is no index to clone.
ExecutionEnginePtr ExecutionEngineImpl::Clone() {
    if(index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
        return nullptr;
    }

    // Take the dimension from the live index so the clone does not depend on dim_
    // having been initialized by whichever constructor created this engine.
    const auto dimension = static_cast<uint16_t>(index_->Dimension());
    auto ret = std::make_shared<ExecutionEngineImpl>(dimension, location_, index_type_, metric_type_, nlist_);
    ret->Init();
    ret->index_ = index_->Clone();
    return ret;
}

X
xj.lin 已提交
222 223 224 225 226
// Appends the raw vectors/ids of the IDMAP index stored at `location` into this
// engine's index. The source is taken from the CPU cache when present, else read
// from disk. Merging a file into itself is rejected.
Status ExecutionEngineImpl::Merge(const std::string &location) {
    if (location == location_) {
        return Status::Error("Cannot Merge Self");
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;

    // Validate the destination before doing any disk I/O for the source file.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
        return Status::Error("index is null");
    }

    auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
    if (!to_merge) {
        try {
            double physical_size = server::CommonUtil::GetFileSize(location);
            server::CollectExecutionEngineMetrics metrics(physical_size);
            to_merge = read_index(location);
        } catch (const knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (const std::exception &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        }
    }

    // A failed read is a load error, not a type mismatch; report it as such.
    if (to_merge == nullptr) {
        std::string msg = "Merge: failed to load index from " + location;
        ENGINE_LOG_ERROR << msg;
        return Status::Error(msg);
    }

    auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge);
    if (file_index == nullptr) {
        return Status::Error("file index type is not idmap");
    }

    auto ec = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
    if (ec != server::KNOWHERE_SUCCESS) {
        ENGINE_LOG_ERROR << "Merge: Add Error";
        return Status::Error("Merge: Add Error");
    }
    return Status::OK();
}

// Builds an index of `engine_type` from the raw vectors/ids of this engine's
// brute-force (BFIndex) index and wraps it in a new engine for `location`.
// Returns nullptr if the current index is not a BFIndex.
// Throws Exception if the target index cannot be created or built.
ExecutionEnginePtr
ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_type) {
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    auto source = std::dynamic_pointer_cast<BFIndex>(index_);
    if (!source) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
        return nullptr;
    }

    auto target = CreatetVecIndex(engine_type);
    if (target == nullptr) {
        throw Exception("Create Empty VecIndex");
    }

    Config cfg;
    cfg["dim"] = Dimension();
    cfg["metric_type"] = (metric_type_ == MetricType::IP) ? "IP" : "L2";
    cfg["gpu_id"] = gpu_num_;
    cfg["nlist"] = nlist_;
    AutoGenParams(target->GetType(), Count(), cfg);

    const auto build_result = target->BuildAll(Count(),
                                               source->GetRawVectors(),
                                               source->GetRawIds(),
                                               cfg);
    if (build_result != server::KNOWHERE_SUCCESS) {
        throw Exception("Build index error");
    }

    return std::make_shared<ExecutionEngineImpl>(target, location, engine_type, metric_type_, nlist_);
}

// Runs a top-`k` search for `n` query vectors in `data`, probing `nprobe` lists,
// writing results into `distances` and `labels`.
// Returns an error status if there is no index or the index search fails.
Status ExecutionEngineImpl::Search(long n,
                                   const float *data,
                                   long k,
                                   long nprobe,
                                   float *distances,
                                   long *labels) const {
    if (!index_) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
        return Status::Error("index is null");
    }

    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;
    const auto search_cfg = Config::object{{"k", k}, {"nprobe", nprobe}};
    const auto search_result = index_->Search(n, data, distances, labels, search_cfg);
    if (search_result == server::KNOWHERE_SUCCESS) {
        return Status::OK();
    }

    ENGINE_LOG_ERROR << "Search error";
    return Status::Error("Search: Search Error");
}

// Inserts the current index into the CPU cache under location_.
// Returns an error status (and caches nothing) when there is no index.
Status ExecutionEngineImpl::Cache() {
    // Never publish an empty entry for this file into the shared cache.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to cache";
        return Status::Error("index is null");
    }
    zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, index_);

    return Status::OK();
}

Y
Yu Kun 已提交
316 317
// Inserts the current index into the GPU cache of `gpu_id` under location_.
// Returns an error status (and caches nothing) when there is no index.
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
    // Never publish an empty entry for this file into the shared cache.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to cache on gpu";
        return Status::Error("index is null");
    }
    zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_);

    return Status::OK();
}

X
xj.lin 已提交
322
// TODO(linxj): remove.
G
groot 已提交
323
Status ExecutionEngineImpl::Init() {
    using namespace zilliz::milvus::server;
    // gpu_num_ comes from the "gpu_index" entry of the server config section
    // (0 if absent); BuildIndex passes it on as "gpu_id".
    gpu_num_ = ServerConfig::GetInstance().GetConfig(CONFIG_SERVER).GetInt32Value("gpu_index", 0);

    return Status::OK();
}


} // namespace engine
} // namespace milvus
} // namespace zilliz