ExecutionEngineImpl.cpp 11.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

S
starlord 已提交
18 19 20 21 22
#include "ExecutionEngineImpl.h"
#include "cache/GpuCacheMgr.h"
#include "cache/CpuCacheMgr.h"
#include "metrics/Metrics.h"
#include "utils/Log.h"
J
jinhai 已提交
23
#include "utils/CommonUtil.h"
S
starlord 已提交
24
#include "utils/Exception.h"
S
starlord 已提交
25

X
xiaojun.lin 已提交
26 27
#include "src/wrapper/vec_index.h"
#include "src/wrapper/vec_impl.h"
X
xiaojun.lin 已提交
28
#include "knowhere/common/Exception.h"
X
xj.lin 已提交
29

S
starlord 已提交
30
#include <stdexcept>
S
starlord 已提交
31 32 33 34 35 36

namespace zilliz {
namespace milvus {
namespace engine {

// Constructs an engine that owns a newly created FAISS_IDMAP index.
//
// dimension   - stored in dim_ and written to the build config as "dim".
// location    - stored in location_.
// index_type  - stored in index_type_ (the index created here is always
//               FAISS_IDMAP regardless of this value).
// metric_type - stored in metric_type_; MetricType::IP selects "IP" in the
//               build config, every other value selects "L2".
// nlist       - stored in nlist_.
//
// Throws Exception(DB_ERROR, ...) when the index cannot be created or when
// building it does not return KNOWHERE_SUCCESS.
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension,
                                         const std::string &location,
                                         EngineType index_type,
                                         MetricType metric_type,
                                         int32_t nlist)
    : location_(location),
      dim_(dimension),
      index_type_(index_type),
      metric_type_(metric_type),
      nlist_(nlist) {
    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
    if (index_ == nullptr) {
        throw Exception(DB_ERROR, "Could not create VecIndex");
    }

    const char *metric_name = (metric_type_ == MetricType::IP) ? "IP" : "L2";

    Config idmap_cfg;
    idmap_cfg["dim"] = dimension;
    idmap_cfg["metric_type"] = metric_name;
    AutoGenParams(index_->GetType(), 0, idmap_cfg);

    // The index was created as FAISS_IDMAP just above, so it is treated as a BFIndex here.
    auto idmap_index = std::static_pointer_cast<BFIndex>(index_);
    auto build_status = idmap_index->Build(idmap_cfg);
    if (build_status != KNOWHERE_SUCCESS) {
        throw Exception(DB_ERROR, "Build index error");
    }
}

X
xj.lin 已提交
62 63
// Constructs an engine around an already existing index.
//
// index       - moved into index_; may be null.
// location    - stored in location_.
// index_type  - stored in index_type_.
// metric_type - stored in metric_type_.
// nlist       - stored in nlist_.
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
                                         const std::string &location,
                                         EngineType index_type,
                                         MetricType metric_type,
                                         int32_t nlist)
    : index_(std::move(index)),
      location_(location),
      index_type_(index_type),
      metric_type_(metric_type),
      nlist_(nlist) {
    // dim_ is not part of this constructor's initializer list, but Dimension()
    // falls back to it when index_ is null and Clone() passes it to the other
    // constructor. Give it a defined value here: the dimension reported by the
    // supplied index, or 0 when no index was supplied.
    dim_ = (index_ != nullptr) ? index_->Dimension() : 0;
}
S
starlord 已提交
73

X
xj.lin 已提交
74 75 76 77 78
// Maps an EngineType to the IndexType requested from GetVecIndexFactory.
//
// FAISS_IDMAP   -> IndexType::FAISS_IDMAP
// FAISS_IVFFLAT -> IndexType::FAISS_IVFFLAT_MIX
// FAISS_IVFSQ8  -> IndexType::FAISS_IVFSQ8_MIX
// NSG_MIX       -> IndexType::NSG_MIX
//
// Any other engine type is logged as an error and yields nullptr.
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
    switch (type) {
        case EngineType::FAISS_IDMAP:
            return GetVecIndexFactory(IndexType::FAISS_IDMAP);

        case EngineType::FAISS_IVFFLAT:
            return GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX);

        case EngineType::FAISS_IVFSQ8:
            return GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX);

        case EngineType::NSG_MIX:
            return GetVecIndexFactory(IndexType::NSG_MIX);

        default:
            ENGINE_LOG_ERROR << "Invalid engine type";
            return nullptr;
    }
}

// Adds vectors to the underlying index.
//
// n, xdata and xids are forwarded unchanged to index_->Add().
//
// Returns Status::OK() on success, or a DB_ERROR status when the engine has no
// index or when Add() does not return KNOWHERE_SUCCESS.
Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) {
    // index_ can be null (e.g. after a failed Load()); the other accessors in
    // this class guard against that, so do the same here instead of crashing.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to add";
        return Status(DB_ERROR, "index is null");
    }

    auto ec = index_->Add(n, xdata, xids);
    if (ec != KNOWHERE_SUCCESS) {
        return Status(DB_ERROR, "Add error");
    }
    return Status::OK();
}

size_t ExecutionEngineImpl::Count() const {
S
starlord 已提交
110
    if(index_ == nullptr) {
S
starlord 已提交
111
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return count 0";
S
starlord 已提交
112 113
        return 0;
    }
X
xj.lin 已提交
114
    return index_->Count();
S
starlord 已提交
115 116 117
}

size_t ExecutionEngineImpl::Size() const {
X
xj.lin 已提交
118
    return (size_t) (Count() * Dimension()) * sizeof(float);
S
starlord 已提交
119 120 121
}

size_t ExecutionEngineImpl::Dimension() const {
S
starlord 已提交
122
    if(index_ == nullptr) {
S
starlord 已提交
123
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, return dimension " << dim_;
S
starlord 已提交
124 125
        return dim_;
    }
X
xj.lin 已提交
126
    return index_->Dimension();
S
starlord 已提交
127 128 129
}

size_t ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
130
    return server::CommonUtil::GetFileSize(location_);
S
starlord 已提交
131 132 133
}

// Writes the underlying index to location_ via write_index().
//
// Returns Status::OK() on success, or a DB_ERROR status when the engine has no
// index or when write_index() does not return KNOWHERE_SUCCESS.
Status ExecutionEngineImpl::Serialize() {
    // Guard against a null index, as the other methods of this class do,
    // rather than handing a null pointer to write_index().
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to serialize";
        return Status(DB_ERROR, "index is null");
    }

    auto ec = write_index(index_, location_);
    if (ec != KNOWHERE_SUCCESS) {
        return Status(DB_ERROR, "Serialize: write to disk error");
    }
    return Status::OK();
}

J
jinhai 已提交
141
// Makes index_ point at the index stored under location_.
//
// The CPU cache is consulted first. On a miss the index is read with
// read_index(); if to_cache is true the freshly read index is then put into
// the CPU cache through Cache().
//
// Returns Status::OK() on success, or a DB_ERROR status when read_index()
// returns null or throws a std::exception.
Status ExecutionEngineImpl::Load(bool to_cache) {
    index_ = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    if (index_ != nullptr) {
        // Cache hit: nothing to read and nothing to (re-)insert.
        return Status::OK();
    }

    try {
        double file_bytes = PhysicalSize();
        // Kept alive for the whole read; presumably collects metrics for this
        // scope -- confirm against CollectExecutionEngineMetrics.
        server::CollectExecutionEngineMetrics io_metrics(file_bytes);

        index_ = read_index(location_);
        if (index_ == nullptr) {
            std::string msg = "Failed to load index from " + location_;
            ENGINE_LOG_ERROR << msg;
            return Status(DB_ERROR, msg);
        }
        ENGINE_LOG_DEBUG << "Disk io from: " << location_;
    } catch (std::exception &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    if (to_cache) {
        Cache();
    }
    return Status::OK();
}

168
// Switches index_ to a GPU copy for the given device.
//
// If the GPU cache of device_id already holds an entry for location_, that
// entry becomes index_. Otherwise the current index_ is replaced by the result
// of its own CopyToGpu(device_id) and the result is inserted into the GPU
// cache through GpuCache().
//
// Returns Status::OK() on success, or a DB_ERROR status when there is no index
// to copy or when the copy throws a std::exception.
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
    auto cached_index = cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    if (cached_index != nullptr) {
        index_ = cached_index;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to gpu";
        return Status(DB_ERROR, "index is null");
    }

    try {
        index_ = index_->CopyToGpu(device_id);
        ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
    } catch (std::exception &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    // Only reached on a cache miss followed by a successful copy.
    GpuCache(device_id);
    return Status::OK();
}

// Switches index_ to a CPU copy.
//
// If the CPU cache already holds an entry for location_, that entry becomes
// index_. Otherwise the current index_ is replaced by the result of its own
// CopyToCpu() and the result is inserted into the CPU cache through Cache().
//
// Returns Status::OK() on success, or a DB_ERROR status when there is no index
// to copy or when the copy throws a std::exception.
Status ExecutionEngineImpl::CopyToCpu() {
    auto cached_index = cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    if (cached_index != nullptr) {
        index_ = cached_index;
        return Status::OK();
    }

    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to copy to cpu";
        return Status(DB_ERROR, "index is null");
    }

    try {
        index_ = index_->CopyToCpu();
        ENGINE_LOG_DEBUG << "GPU to CPU";
    } catch (std::exception &e) {
        ENGINE_LOG_ERROR << e.what();
        return Status(DB_ERROR, e.what());
    }

    // Only reached on a cache miss followed by a successful copy.
    Cache();
    return Status::OK();
}

W
wxyu 已提交
221
// Creates a new engine with the same location, index type, metric type and
// nlist as this one, calls Init() on it and gives it a clone of this engine's
// index.
//
// Returns nullptr (after logging an error) when this engine has no index.
// Exceptions thrown by the dimension-taking constructor propagate to the caller.
ExecutionEnginePtr ExecutionEngineImpl::Clone() {
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to clone";
        return nullptr;
    }

    // Use Dimension() rather than dim_: index_ is known to be non-null here, so
    // Dimension() reports the live index's dimension, whereas dim_ is only
    // assigned by the dimension-taking constructor and may be unset on engines
    // created from an existing index.
    const auto dimension = static_cast<uint16_t>(Dimension());

    auto ret = std::make_shared<ExecutionEngineImpl>(dimension, location_, index_type_, metric_type_, nlist_);
    ret->Init();
    ret->index_ = index_->Clone();
    return ret;
}

X
xj.lin 已提交
233 234
// Appends the raw vectors and ids of the index stored at `location` to this
// engine's index.
//
// The index to merge is taken from the CPU cache when present, otherwise it is
// read with read_index(). It must be a BFIndex.
//
// Returns Status::OK() on success, or a DB_ERROR status when:
//   - location equals this engine's own location_,
//   - this engine has no index,
//   - the index to merge cannot be loaded (null result or std::exception),
//   - the loaded index is not a BFIndex,
//   - index_->Add() does not return KNOWHERE_SUCCESS.
Status ExecutionEngineImpl::Merge(const std::string &location) {
    if (location == location_) {
        return Status(DB_ERROR, "Cannot Merge Self");
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;

    // Checked before touching the cache or the disk: without a destination
    // index there is nothing the loaded data could be added to.
    if (index_ == nullptr) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to merge";
        return Status(DB_ERROR, "index is null");
    }

    auto to_merge = cache::CpuCacheMgr::GetInstance()->GetIndex(location);
    if (to_merge == nullptr) {
        try {
            double physical_size = server::CommonUtil::GetFileSize(location);
            // Kept alive for the whole read; presumably collects metrics for
            // this scope -- confirm against CollectExecutionEngineMetrics.
            server::CollectExecutionEngineMetrics metrics(physical_size);
            to_merge = read_index(location);
        } catch (const std::exception &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status(DB_ERROR, e.what());
        }
    }

    // read_index() can come back empty (Load() handles the same case); report
    // that as a load failure instead of the misleading "not idmap" error below.
    if (to_merge == nullptr) {
        std::string msg = "Failed to load index from " + location;
        ENGINE_LOG_ERROR << msg;
        return Status(DB_ERROR, msg);
    }

    auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge);
    if (file_index == nullptr) {
        return Status(DB_ERROR, "file index type is not idmap");
    }

    auto ec = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
    if (ec != KNOWHERE_SUCCESS) {
        ENGINE_LOG_ERROR << "Merge: Add Error";
        return Status(DB_ERROR, "Merge: Add Error");
    }
    return Status::OK();
}

// Builds a new index of type engine_type from the raw vectors and ids held by
// this engine's index, and returns a new engine wrapping it.
//
// location    - location given to the returned engine.
// engine_type - passed to CreatetVecIndex() to select the index to build.
//
// Returns nullptr (after logging an error) when this engine's index is not a
// BFIndex (which includes the case of a null index).
// Throws Exception(DB_ERROR, ...) when the target index cannot be created or
// when BuildAll() does not return KNOWHERE_SUCCESS.
ExecutionEnginePtr
ExecutionEngineImpl::BuildIndex(const std::string &location, EngineType engine_type) {
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    auto raw_source = std::dynamic_pointer_cast<BFIndex>(index_);
    if (!raw_source) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: from_index is null, failed to build index";
        return nullptr;
    }

    auto target_index = CreatetVecIndex(engine_type);
    if (target_index == nullptr) {
        throw Exception(DB_ERROR, "Could not create VecIndex");
    }

    const char *metric_name = (metric_type_ == MetricType::IP) ? "IP" : "L2";

    Config build_cfg;
    build_cfg["dim"] = Dimension();
    build_cfg["metric_type"] = metric_name;
    build_cfg["gpu_id"] = gpu_num_;
    build_cfg["nlist"] = nlist_;
    AutoGenParams(target_index->GetType(), Count(), build_cfg);

    auto build_status = target_index->BuildAll(Count(),
                                               raw_source->GetRawVectors(),
                                               raw_source->GetRawIds(),
                                               build_cfg);
    if (build_status != KNOWHERE_SUCCESS) {
        throw Exception(DB_ERROR, "Build index error");
    }

    return std::make_shared<ExecutionEngineImpl>(target_index, location, engine_type, metric_type_, nlist_);
}

// Runs a search on the underlying index.
//
// n, data, distances and labels are forwarded unchanged to index_->Search();
// k and nprobe are passed in a config object under the keys "k" and "nprobe".
//
// Returns Status::OK() on success, or a DB_ERROR status when the engine has no
// index or when the search does not return KNOWHERE_SUCCESS.
Status ExecutionEngineImpl::Search(long n,
                                   const float *data,
                                   long k,
                                   long nprobe,
                                   float *distances,
                                   long *labels) const {
    if (!index_) {
        ENGINE_LOG_ERROR << "ExecutionEngineImpl: index is null, failed to search";
        return Status(DB_ERROR, "index is null");
    }

    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;

    auto search_params = Config::object{{"k", k}, {"nprobe", nprobe}};
    auto search_status = index_->Search(n, data, distances, labels, search_params);
    if (search_status != KNOWHERE_SUCCESS) {
        ENGINE_LOG_ERROR << "Search error";
        return Status(DB_ERROR, "Search: Search Error");
    }

    return Status::OK();
}

// Inserts index_ into the CPU cache under the key location_, wrapped in a
// DataObj constructed with PhysicalSize(). Always returns Status::OK().
Status ExecutionEngineImpl::Cache() {
    cache::DataObjPtr cache_entry = std::make_shared<cache::DataObj>(index_, PhysicalSize());
    cache::CpuCacheMgr::GetInstance()->InsertItem(location_, cache_entry);
    return Status::OK();
}

Y
Yu Kun 已提交
327
// Inserts index_ into the GPU cache of device gpu_id under the key location_,
// wrapped in a DataObj constructed with PhysicalSize(). Always returns
// Status::OK().
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
    cache::DataObjPtr cache_entry = std::make_shared<cache::DataObj>(index_, PhysicalSize());
    cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, cache_entry);
    return Status::OK();
}

X
xj.lin 已提交
334
// TODO(linxj): remove.
S
starlord 已提交
335
Status ExecutionEngineImpl::Init() {
X
xj.lin 已提交
336 337
    using namespace zilliz::milvus::server;
    ServerConfig &config = ServerConfig::GetInstance();
S
starlord 已提交
338 339
    ConfigNode server_config = config.GetConfig(CONFIG_DB);
    gpu_num_ = server_config.GetInt32Value(CONFIG_DB_BUILD_INDEX_GPU, 0);
S
starlord 已提交
340 341 342 343 344 345 346 347

    return Status::OK();
}


} // namespace engine
} // namespace milvus
} // namespace zilliz