ExecutionEngineImpl.cpp 10.4 KB
Newer Older
S
starlord 已提交
1 2 3 4 5
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
X
xj.lin 已提交
6
#include <stdexcept>
Y
Yu Kun 已提交
7
#include "src/cache/GpuCacheMgr.h"
X
xj.lin 已提交
8

S
starlord 已提交
9 10 11
#include "src/server/ServerConfig.h"
#include "src/metrics/Metrics.h"
#include "db/Log.h"
J
jinhai 已提交
12
#include "utils/CommonUtil.h"
S
starlord 已提交
13

X
xj.lin 已提交
14 15 16
#include "src/cache/CpuCacheMgr.h"
#include "ExecutionEngineImpl.h"
#include "wrapper/knowhere/vec_index.h"
S
starlord 已提交
17
#include "wrapper/knowhere/vec_impl.h"
X
xj.lin 已提交
18
#include "knowhere/common/exception.h"
S
starlord 已提交
19
#include "db/Exception.h"
X
xj.lin 已提交
20

S
starlord 已提交
21 22 23 24 25

namespace zilliz {
namespace milvus {
namespace engine {

X
xj.lin 已提交
26 27 28 29 30 31 32 33
namespace {
std::string GetMetricType() {
    server::ServerConfig &config = server::ServerConfig::GetInstance();
    server::ConfigNode engine_config = config.GetConfig(server::CONFIG_ENGINE);
    return engine_config.GetValue(server::CONFIG_METRICTYPE, "L2");
}
}

S
starlord 已提交
34
ExecutionEngineImpl::ExecutionEngineImpl(uint16_t dimension,
X
xj.lin 已提交
35 36 37
                                         const std::string &location,
                                         EngineType type)
    : location_(location), dim(dimension), build_type(type) {
X
xj.lin 已提交
38
    current_type = EngineType::FAISS_IDMAP;
X
xj.lin 已提交
39 40 41 42

    index_ = CreatetVecIndex(EngineType::FAISS_IDMAP);
    if (!index_) throw Exception("Create Empty VecIndex");

X
xj.lin 已提交
43 44
    Config build_cfg;
    build_cfg["dim"] = dimension;
X
xj.lin 已提交
45
    build_cfg["metric_type"] = GetMetricType();
X
xj.lin 已提交
46 47
    AutoGenParams(index_->GetType(), 0, build_cfg);
    auto ec = std::static_pointer_cast<BFIndex>(index_)->Build(build_cfg);
X
xj.lin 已提交
48
    if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); }
S
starlord 已提交
49 50
}

X
xj.lin 已提交
51 52 53 54
ExecutionEngineImpl::ExecutionEngineImpl(VecIndexPtr index,
                                         const std::string &location,
                                         EngineType type)
    : index_(std::move(index)), location_(location), build_type(type) {
X
xj.lin 已提交
55
    current_type = type;
X
xj.lin 已提交
56
}
S
starlord 已提交
57

X
xj.lin 已提交
58 59 60 61 62
VecIndexPtr ExecutionEngineImpl::CreatetVecIndex(EngineType type) {
    std::shared_ptr<VecIndex> index;
    switch (type) {
        case EngineType::FAISS_IDMAP: {
            index = GetVecIndexFactory(IndexType::FAISS_IDMAP);
S
starlord 已提交
63 64
            break;
        }
J
jinhai 已提交
65
        case EngineType::FAISS_IVFFLAT: {
X
xj.lin 已提交
66
            index = GetVecIndexFactory(IndexType::FAISS_IVFFLAT_MIX);
S
starlord 已提交
67 68
            break;
        }
J
jinhai 已提交
69 70
        case EngineType::FAISS_IVFSQ8: {
            index = GetVecIndexFactory(IndexType::FAISS_IVFSQ8_MIX);
S
starlord 已提交
71 72
            break;
        }
X
xj.lin 已提交
73 74 75 76
        case EngineType::NSG_MIX: {
            index = GetVecIndexFactory(IndexType::NSG_MIX);
            break;
        }
X
xj.lin 已提交
77
        default: {
S
starlord 已提交
78 79 80 81
            ENGINE_LOG_ERROR << "Invalid engine type";
            return nullptr;
        }
    }
X
xj.lin 已提交
82
    return index;
S
starlord 已提交
83 84 85
}

Status ExecutionEngineImpl::AddWithIds(long n, const float *xdata, const long *xids) {
X
xj.lin 已提交
86
    auto ec = index_->Add(n, xdata, xids);
X
xj.lin 已提交
87 88 89
    if (ec != server::KNOWHERE_SUCCESS) {
        return Status::Error("Add error");
    }
S
starlord 已提交
90 91 92 93
    return Status::OK();
}

size_t ExecutionEngineImpl::Count() const {
X
xj.lin 已提交
94
    return index_->Count();
S
starlord 已提交
95 96 97
}

size_t ExecutionEngineImpl::Size() const {
X
xj.lin 已提交
98
    return (size_t) (Count() * Dimension()) * sizeof(float);
S
starlord 已提交
99 100 101
}

size_t ExecutionEngineImpl::Dimension() const {
X
xj.lin 已提交
102
    return index_->Dimension();
S
starlord 已提交
103 104 105
}

size_t ExecutionEngineImpl::PhysicalSize() const {
J
jinhai 已提交
106
    return server::CommonUtil::GetFileSize(location_);
S
starlord 已提交
107 108 109
}

Status ExecutionEngineImpl::Serialize() {
X
xj.lin 已提交
110 111 112 113
    auto ec = write_index(index_, location_);
    if (ec != server::KNOWHERE_SUCCESS) {
        return Status::Error("Serialize: write to disk error");
    }
S
starlord 已提交
114 115 116
    return Status::OK();
}

J
jinhai 已提交
117
Status ExecutionEngineImpl::Load(bool to_cache) {
X
xj.lin 已提交
118
    index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
J
jinhai 已提交
119
    bool already_in_cache = (index_ != nullptr);
X
xj.lin 已提交
120 121
    auto start_time = METRICS_NOW_TIME;
    if (!index_) {
X
xj.lin 已提交
122 123 124 125 126 127 128 129 130
        try {
            index_ = read_index(location_);
            ENGINE_LOG_DEBUG << "Disk io from: " << location_;
        } catch (knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (std::exception &e) {
            return Status::Error(e.what());
        }
X
xj.lin 已提交
131 132
    }

J
jinhai 已提交
133
    if (!already_in_cache && to_cache) {
X
xj.lin 已提交
134 135 136
        Cache();
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
X
xj.lin 已提交
137

X
xj.lin 已提交
138
        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
J
jinhai 已提交
139
        double physical_size = PhysicalSize();
X
xj.lin 已提交
140

J
jinhai 已提交
141 142
        server::Metrics::GetInstance().FaissDiskLoadSizeBytesHistogramObserve(physical_size);
        server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size / double(total_time));
X
xj.lin 已提交
143 144
    }
    return Status::OK();
X
xj.lin 已提交
145 146
}

147
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
Y
Yu Kun 已提交
148 149 150 151 152 153 154 155 156 157 158 159 160
    index_ = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
    bool already_in_cache = (index_ != nullptr);
    auto start_time = METRICS_NOW_TIME;
    if (!index_) {
        try {
            index_ = index_->CopyToGpu(device_id);
            ENGINE_LOG_DEBUG << "CPU to GPU" << device_id;
        } catch (knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (std::exception &e) {
            return Status::Error(e.what());
        }
161
    }
Y
Yu Kun 已提交
162 163 164 165 166 167 168 169 170 171 172

    if (!already_in_cache) {
        GpuCache(device_id);
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        double physical_size = PhysicalSize();

        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
        server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
    }

173 174 175 176
    return Status::OK();
}

Status ExecutionEngineImpl::CopyToCpu() {
Y
Yu Kun 已提交
177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
    index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
    bool already_in_cache = (index_ != nullptr);
    auto start_time = METRICS_NOW_TIME;
    if (!index_) {
        try {
            index_ = index_->CopyToCpu();
            ENGINE_LOG_DEBUG << "GPU to CPU";
        } catch (knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (std::exception &e) {
            return Status::Error(e.what());
        }
    }

    if(!already_in_cache) {
        Cache();
        auto end_time = METRICS_NOW_TIME;
        auto total_time = METRICS_MICROSECONDS(start_time, end_time);
        double physical_size = PhysicalSize();

        server::Metrics::GetInstance().FaissDiskLoadDurationSecondsHistogramObserve(total_time);
        server::Metrics::GetInstance().FaissDiskLoadIOSpeedGaugeSet(physical_size);
200
    }
Y
Yu Kun 已提交
201

202 203 204
    return Status::OK();
}

X
xj.lin 已提交
205 206 207 208 209
Status ExecutionEngineImpl::Merge(const std::string &location) {
    if (location == location_) {
        return Status::Error("Cannot Merge Self");
    }
    ENGINE_LOG_DEBUG << "Merge index file: " << location << " to: " << location_;
S
starlord 已提交
210

X
xj.lin 已提交
211 212
    auto to_merge = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location);
    if (!to_merge) {
X
xj.lin 已提交
213 214 215 216 217 218 219 220
        try {
            to_merge = read_index(location);
        } catch (knowhere::KnowhereException &e) {
            ENGINE_LOG_ERROR << e.what();
            return Status::Error(e.what());
        } catch (std::exception &e) {
            return Status::Error(e.what());
        }
X
xj.lin 已提交
221 222
    }

X
xj.lin 已提交
223
    if (auto file_index = std::dynamic_pointer_cast<BFIndex>(to_merge)) {
X
xj.lin 已提交
224 225 226 227 228
        auto ec = index_->Add(file_index->Count(), file_index->GetRawVectors(), file_index->GetRawIds());
        if (ec != server::KNOWHERE_SUCCESS) {
            ENGINE_LOG_ERROR << "Merge: Add Error";
            return Status::Error("Merge: Add Error");
        }
X
xj.lin 已提交
229 230 231 232
        return Status::OK();
    } else {
        return Status::Error("file index type is not idmap");
    }
S
starlord 已提交
233 234 235
}

ExecutionEnginePtr
X
xj.lin 已提交
236 237 238 239 240
ExecutionEngineImpl::BuildIndex(const std::string &location) {
    ENGINE_LOG_DEBUG << "Build index file: " << location << " from: " << location_;

    auto from_index = std::dynamic_pointer_cast<BFIndex>(index_);
    auto to_index = CreatetVecIndex(build_type);
X
xj.lin 已提交
241 242 243 244
    if (!to_index) {
        throw Exception("Create Empty VecIndex");
    }

X
xj.lin 已提交
245 246
    Config build_cfg;
    build_cfg["dim"] = Dimension();
X
xj.lin 已提交
247
    build_cfg["metric_type"] = GetMetricType();
X
xj.lin 已提交
248
    build_cfg["gpu_id"] = gpu_num;
X
xj.lin 已提交
249
    build_cfg["nlist"] = nlist_;
X
xj.lin 已提交
250 251
    AutoGenParams(to_index->GetType(), Count(), build_cfg);

X
xj.lin 已提交
252 253 254
    auto ec = to_index->BuildAll(Count(),
                                 from_index->GetRawVectors(),
                                 from_index->GetRawIds(),
X
xj.lin 已提交
255
                                 build_cfg);
X
xj.lin 已提交
256
    if (ec != server::KNOWHERE_SUCCESS) { throw Exception("Build index error"); }
X
xj.lin 已提交
257 258

    return std::make_shared<ExecutionEngineImpl>(to_index, location, build_type);
S
starlord 已提交
259 260 261
}

Status ExecutionEngineImpl::Search(long n,
X
xj.lin 已提交
262 263
                                   const float *data,
                                   long k,
Y
Yu Kun 已提交
264
                                   long nprobe,
X
xj.lin 已提交
265 266
                                   float *distances,
                                   long *labels) const {
Y
Yu Kun 已提交
267 268
    ENGINE_LOG_DEBUG << "Search Params: [k]  " << k << " [nprobe] " << nprobe;
    auto ec = index_->Search(n, data, distances, labels, Config::object{{"k", k}, {"nprobe", nprobe}});
X
xj.lin 已提交
269 270 271 272
    if (ec != server::KNOWHERE_SUCCESS) {
        ENGINE_LOG_ERROR << "Search error";
        return Status::Error("Search: Search Error");
    }
S
starlord 已提交
273 274 275 276
    return Status::OK();
}

Status ExecutionEngineImpl::Cache() {
X
xj.lin 已提交
277
    zilliz::milvus::cache::CpuCacheMgr::GetInstance()->InsertItem(location_, index_);
S
starlord 已提交
278 279 280 281

    return Status::OK();
}

Y
Yu Kun 已提交
282 283 284 285
Status ExecutionEngineImpl::GpuCache(uint64_t gpu_id) {
    zilliz::milvus::cache::GpuCacheMgr::GetInstance(gpu_id)->InsertItem(location_, index_);
}

X
xj.lin 已提交
286
// TODO(linxj): remove.
S
starlord 已提交
287
Status ExecutionEngineImpl::Init() {
X
xj.lin 已提交
288 289 290 291 292 293
    using namespace zilliz::milvus::server;
    ServerConfig &config = ServerConfig::GetInstance();
    ConfigNode server_config = config.GetConfig(CONFIG_SERVER);
    gpu_num = server_config.GetInt32Value("gpu_index", 0);

    switch (build_type) {
J
jinhai 已提交
294 295
        case EngineType::FAISS_IVFSQ8:
        case EngineType::FAISS_IVFFLAT: {
X
xj.lin 已提交
296
            ConfigNode engine_config = config.GetConfig(CONFIG_ENGINE);
X
xj.lin 已提交
297
            nlist_ = engine_config.GetInt32Value(CONFIG_NLIST, 16384);
X
xj.lin 已提交
298 299 300
            break;
        }
    }
S
starlord 已提交
301 302 303 304 305 306 307 308

    return Status::OK();
}


} // namespace engine
} // namespace milvus
} // namespace zilliz