SearchTask.cpp 19.6 KB
Newer Older
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
J
jinhai 已提交
2
//
3 4
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
J
jinhai 已提交
5
//
6 7 8 9 10
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
J
jinhai 已提交
11

Z
update  
Zhiru Zhu 已提交
12 13
#include "scheduler/task/SearchTask.h"

S
shengjh 已提交
14
#include <fiu-local.h>
Z
update  
Zhiru Zhu 已提交
15

S
starlord 已提交
16
#include <algorithm>
Z
Zhiru Zhu 已提交
17
#include <memory>
S
starlord 已提交
18
#include <string>
19
#include <thread>
20
#include <unordered_map>
S
starlord 已提交
21
#include <utility>
22

23
#include "db/Utils.h"
Z
Zhiru Zhu 已提交
24 25
#include "db/engine/EngineFactory.h"
#include "metrics/Metrics.h"
S
shengjh 已提交
26
#include "scheduler/SchedInst.h"
Z
Zhiru Zhu 已提交
27
#include "scheduler/job/SearchJob.h"
28
#include "segment/SegmentReader.h"
29
#include "utils/CommonUtil.h"
Z
Zhiru Zhu 已提交
30 31
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
32
#include "utils/ValidationUtil.h"
Z
Zhiru Zhu 已提交
33

34
namespace milvus {
W
wxyu 已提交
35
namespace scheduler {
36 37 38 39

static constexpr size_t PARALLEL_REDUCE_THRESHOLD = 10000;
static constexpr size_t PARALLEL_REDUCE_BATCH = 1000;

40
// TODO(wxyu): remove unused code
S
starlord 已提交
41 42
// bool
// NeedParallelReduce(uint64_t nq, uint64_t topk) {
43 44 45 46 47 48 49 50 51 52
//    server::ServerConfig &config = server::ServerConfig::GetInstance();
//    server::ConfigNode &db_config = config.GetConfig(server::CONFIG_DB);
//    bool need_parallel = db_config.GetBoolValue(server::CONFIG_DB_PARALLEL_REDUCE, false);
//    if (!need_parallel) {
//        return false;
//    }
//
//    return nq * topk >= PARALLEL_REDUCE_THRESHOLD;
//}
//
S
starlord 已提交
53 54
// void
// ParallelReduce(std::function<void(size_t, size_t)> &reduce_function, size_t max_index) {
55 56 57 58 59 60
//    size_t reduce_batch = PARALLEL_REDUCE_BATCH;
//
//    auto thread_count = std::thread::hardware_concurrency() - 1; //not all core do this work
//    if (thread_count > 0) {
//        reduce_batch = max_index / thread_count + 1;
//    }
61
//    LOG_ENGINE_DEBUG_ << "use " << thread_count <<
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
//                     " thread parallelly do reduce, each thread process " << reduce_batch << " vectors";
//
//    std::vector<std::shared_ptr<std::thread> > thread_array;
//    size_t from_index = 0;
//    while (from_index < max_index) {
//        size_t to_index = from_index + reduce_batch;
//        if (to_index > max_index) {
//            to_index = max_index;
//        }
//
//        auto reduce_thread = std::make_shared<std::thread>(reduce_function, from_index, to_index);
//        thread_array.push_back(reduce_thread);
//
//        from_index = to_index;
//    }
//
//    for (auto &thread_ptr : thread_array) {
//        thread_ptr->join();
//    }
//}
82 83 84

void
CollectFileMetrics(int file_type, size_t file_size) {
Y
yudong.cai 已提交
85
    server::MetricsBase& inst = server::Metrics::GetInstance();
86
    switch (file_type) {
J
Jin Hai 已提交
87 88
        case SegmentSchema::RAW:
        case SegmentSchema::TO_INDEX: {
Y
yudong.cai 已提交
89 90 91
            inst.RawFileSizeHistogramObserve(file_size);
            inst.RawFileSizeTotalIncrement(file_size);
            inst.RawFileSizeGaugeSet(file_size);
92 93 94
            break;
        }
        default: {
Y
yudong.cai 已提交
95 96 97
            inst.IndexFileSizeHistogramObserve(file_size);
            inst.IndexFileSizeTotalIncrement(file_size);
            inst.IndexFileSizeGaugeSet(file_size);
98 99 100 101 102
            break;
        }
    }
}

J
Jin Hai 已提交
103
XSearchTask::XSearchTask(const std::shared_ptr<server::Context>& context, SegmentSchemaPtr file, TaskLabelPtr label)
Z
Zhiru Zhu 已提交
104
    : Task(TaskType::SearchTask, std::move(label)), context_(context), file_(file) {
W
wxyu 已提交
105
    if (file_) {
G
groot 已提交
106 107
        // distance -- value 0 means two vectors equal, ascending reduce, L2/HAMMING/JACCARD/TONIMOTO ...
        // similarity -- infinity value means two vectors equal, descending reduce, IP
X
fix  
xiaojun.lin 已提交
108 109
        if (file_->metric_type_ == static_cast<int>(MetricType::IP) &&
            file_->engine_type_ != static_cast<int>(EngineType::FAISS_PQ)) {
G
groot 已提交
110
            ascending_reduce = false;
111
        }
112 113

        EngineType engine_type;
J
Jin Hai 已提交
114 115 116
        if (file->file_type_ == SegmentSchema::FILE_TYPE::RAW ||
            file->file_type_ == SegmentSchema::FILE_TYPE::TO_INDEX ||
            file->file_type_ == SegmentSchema::FILE_TYPE::BACKUP) {
117 118
            engine_type = engine::utils::IsBinaryMetricType(file->metric_type_) ? EngineType::FAISS_BIN_IDMAP
                                                                                : EngineType::FAISS_IDMAP;
119 120 121 122
        } else {
            engine_type = (EngineType)file->engine_type_;
        }

123 124 125 126
        milvus::json json_params;
        if (!file_->index_params_.empty()) {
            json_params = milvus::json::parse(file_->index_params_);
        }
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
        //        if (auto job = job_.lock()) {
        //            auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
        //            query::GeneralQueryPtr general_query = search_job->general_query();
        //            if (general_query != nullptr) {
        //                std::unordered_map<std::string, engine::DataType> types;
        //                auto attr_type = search_job->attr_type();
        //                auto type_it = attr_type.begin();
        //                for (; type_it != attr_type.end(); type_it++) {
        //                    types.insert(std::make_pair(type_it->first, (engine::DataType)(type_it->second)));
        //                }
        //                index_engine_ =
        //                    EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
        //                                         (MetricType)file_->metric_type_, types, json_params);
        //            }
        //        }
142
        index_engine_ = EngineFactory::Build(file_->dimension_, file_->location_, engine_type,
143
                                             (MetricType)file_->metric_type_, json_params);
W
wxyu 已提交
144
    }
W
wxyu 已提交
145 146
}

147 148
void
XSearchTask::Load(LoadType type, uint8_t device_id) {
G
groot 已提交
149
    milvus::server::ContextFollower tracer(context_, "XSearchTask::Load " + std::to_string(file_->id_));
Z
Zhiru Zhu 已提交
150

151
    TimeRecorder rc(LogOut("[%s][%ld]", "search", 0));
W
wxyu 已提交
152 153
    Status stat = Status::OK();
    std::string error_msg;
154
    std::string type_str;
155 156

    try {
S
shengjh 已提交
157
        fiu_do_on("XSearchTask.Load.throw_std_exception", throw std::exception());
W
wxyu 已提交
158
        if (type == LoadType::DISK2CPU) {
W
wxyu 已提交
159
            stat = index_engine_->Load();
160
            type_str = "DISK2CPU";
W
wxyu 已提交
161
        } else if (type == LoadType::CPU2GPU) {
W
wxyu 已提交
162 163 164 165 166
            bool hybrid = false;
            if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H) {
                hybrid = true;
            }
            stat = index_engine_->CopyToGpu(device_id, hybrid);
167
            type_str = "CPU2GPU" + std::to_string(device_id);
W
wxyu 已提交
168
        } else if (type == LoadType::GPU2CPU) {
W
wxyu 已提交
169
            stat = index_engine_->CopyToCpu();
170
            type_str = "GPU2CPU";
W
wxyu 已提交
171
        } else {
W
wxyu 已提交
172 173
            error_msg = "Wrong load type";
            stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
W
wxyu 已提交
174
        }
S
starlord 已提交
175 176
    } catch (std::exception& ex) {
        // typical error: out of disk space or permition denied
W
wxyu 已提交
177
        error_msg = "Failed to load index file: " + std::string(ex.what());
178
        LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] Encounter execption: %s", "search", 0, error_msg.c_str());
W
wxyu 已提交
179 180
        stat = Status(SERVER_UNEXPECTED_ERROR, error_msg);
    }
S
shengjh 已提交
181
    fiu_do_on("XSearchTask.Load.out_of_memory", stat = Status(SERVER_UNEXPECTED_ERROR, "out of memory"));
W
wxyu 已提交
182 183

    if (!stat.ok()) {
184 185
        Status s;
        if (stat.ToString().find("out of memory") != std::string::npos) {
186
            error_msg = "out of memory: " + type_str + " : " + stat.message();
187 188
            s = Status(SERVER_OUT_OF_MEMORY, error_msg);
        } else {
189
            error_msg = "Failed to load index file: " + type_str + " : " + stat.message();
190 191
            s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
        }
192

S
starlord 已提交
193
        if (auto job = job_.lock()) {
W
wxyu 已提交
194 195 196
            auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
            search_job->SearchDone(file_->id_);
            search_job->GetStatus() = s;
197 198 199 200 201
        }

        return;
    }

202
    size_t file_size = index_engine_->Size();
203

G
groot 已提交
204
    std::string info = "Search task load file id:" + std::to_string(file_->id_) + " " + type_str +
J
JinHai-CN 已提交
205
                       " file type:" + std::to_string(file_->file_type_) + " size:" + std::to_string(file_size) +
S
starlord 已提交
206
                       " bytes from location: " + file_->location_ + " totally cost";
G
groot 已提交
207
    rc.ElapseFromBegin(info);
208 209 210

    CollectFileMetrics(file_->file_type_, file_size);

S
starlord 已提交
211
    // step 2: return search task for later execution
212 213
    index_id_ = file_->id_;
    index_type_ = file_->file_type_;
S
starlord 已提交
214
    //    search_contexts_.swap(search_contexts_);
215 216 217 218
}

void
XSearchTask::Execute() {
219
    milvus::server::ContextFollower tracer(context_, "XSearchTask::Execute " + std::to_string(index_id_));
Z
Zhiru Zhu 已提交
220

221
    //    LOG_ENGINE_DEBUG_ << "Searching in file id:" << index_id_ << " with "
S
starlord 已提交
222
    //                     << search_contexts_.size() << " tasks";
223

224 225
    //    TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
    TimeRecorder rc(LogOut("[%s][%ld] DoSearch file id:%ld", "search", 0, index_id_));
226

Y
Yu Kun 已提交
227
    server::CollectDurationMetrics metrics(index_type_);
228

S
starlord 已提交
229
    std::vector<int64_t> output_ids;
J
jinhai 已提交
230
    std::vector<float> output_distance;
W
wxyu 已提交
231 232 233

    if (auto job = job_.lock()) {
        auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
G
groot 已提交
234 235 236 237 238 239

        if (index_engine_ == nullptr) {
            search_job->SearchDone(index_id_);
            return;
        }

S
starlord 已提交
240
        // step 1: allocate memory
241 242
        query::GeneralQueryPtr general_query = search_job->general_query();

W
wxyu 已提交
243 244
        uint64_t nq = search_job->nq();
        uint64_t topk = search_job->topk();
245

246
        const milvus::json& extra_params = search_job->extra_params();
G
groot 已提交
247
        const engine::VectorsData& vectors = search_job->vectors();
Y
yudong.cai 已提交
248 249 250

        output_ids.resize(topk * nq);
        output_distance.resize(topk * nq);
S
starlord 已提交
251 252
        std::string hdr =
            "job " + std::to_string(search_job->id()) + " nq " + std::to_string(nq) + " topk " + std::to_string(topk);
253 254

        try {
S
shengjh 已提交
255
            fiu_do_on("XSearchTask.Execute.throw_std_exception", throw std::exception());
S
starlord 已提交
256
            // step 2: search
W
wxyu 已提交
257 258 259 260 261
            bool hybrid = false;
            if (index_engine_->IndexEngineType() == engine::EngineType::FAISS_IVFSQ8H &&
                ResMgrInst::GetInstance()->GetResource(path().Last())->type() == ResourceType::CPU) {
                hybrid = true;
            }
G
groot 已提交
262
            Status s;
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
            if (general_query != nullptr) {
                std::unordered_map<std::string, engine::DataType> types;
                auto attr_type = search_job->attr_type();
                auto type_it = attr_type.begin();
                for (; type_it != attr_type.end(); type_it++) {
                    types.insert(std::make_pair(type_it->first, (engine::DataType)(type_it->second)));
                }
                faiss::ConcurrentBitsetPtr bitset;
                s = index_engine_->ExecBinaryQuery(general_query, bitset, types, nq, topk, output_distance, output_ids);

                if (!s.ok()) {
                    search_job->GetStatus() = s;
                    search_job->SearchDone(index_id_);
                    return;
                }

                auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk;
                if (spec_k == 0) {
                    LOG_ENGINE_WARNING_ << "Searching in an empty file. file location = " << file_->location_;
S
shengjun.li 已提交
282
                } else {
283 284 285 286 287 288 289 290 291
                    std::unique_lock<std::mutex> lock(search_job->mutex());
                    search_job->vector_count() = nq;
                    XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, ascending_reduce,
                                                      search_job->GetResultIds(), search_job->GetResultDistances());
                }
                search_job->SearchDone(index_id_);
                index_engine_ = nullptr;
                return;
            }
G
groot 已提交
292
            if (!vectors.float_data_.empty()) {
293
                s = index_engine_->Search(nq, vectors.float_data_.data(), topk, extra_params, output_distance.data(),
G
groot 已提交
294 295
                                          output_ids.data(), hybrid);
            } else if (!vectors.binary_data_.empty()) {
296
                s = index_engine_->Search(nq, vectors.binary_data_.data(), topk, extra_params, output_distance.data(),
G
groot 已提交
297 298
                                          output_ids.data(), hybrid);
            }
299

S
shengjh 已提交
300 301
            fiu_do_on("XSearchTask.Execute.search_fail", s = Status(SERVER_UNEXPECTED_ERROR, ""));

302 303 304 305 306
            if (!s.ok()) {
                search_job->GetStatus() = s;
                search_job->SearchDone(index_id_);
                return;
            }
307

C
Cai Yudong 已提交
308 309
            // double span = rc.RecordSection(hdr + ", do search");
            // search_job->AccumSearchCost(span);
310

Y
yudong.cai 已提交
311
            // step 3: pick up topk result
312
            auto spec_k = file_->row_count_ < topk ? file_->row_count_ : topk;
Z
update  
Zhiru Zhu 已提交
313
            if (spec_k == 0) {
314 315
                LOG_ENGINE_WARNING_ << LogOut("[%s][%ld] Searching in an empty file. file location = %s", "search", 0,
                                              file_->location_.c_str());
S
shengjun.li 已提交
316
            } else {
G
groot 已提交
317
                std::unique_lock<std::mutex> lock(search_job->mutex());
G
groot 已提交
318
                XSearchTask::MergeTopkToResultSet(output_ids, output_distance, spec_k, nq, topk, ascending_reduce,
G
groot 已提交
319
                                                  search_job->GetResultIds(), search_job->GetResultDistances());
320 321 322
                LOG_ENGINE_DEBUG_ << "Merged result: "
                                  << "nq = " << nq << ", topk = " << topk << ", len of ids = " << output_ids.size()
                                  << ", len of distance = " << output_distance.size();
G
groot 已提交
323
            }
324

C
Cai Yudong 已提交
325 326
            // span = rc.RecordSection(hdr + ", reduce topk");
            // search_job->AccumReduceCost(span);
S
starlord 已提交
327
        } catch (std::exception& ex) {
328
            LOG_ENGINE_ERROR_ << LogOut("[%s][%ld] SearchTask encounter exception: %s", "search", 0, ex.what());
C
cqy123456 已提交
329
            search_job->GetStatus() = Status(SERVER_UNEXPECTED_ERROR, ex.what());
330 331
        }

Y
yudong.cai 已提交
332
        // step 4: notify to send result to client
W
wxyu 已提交
333
        search_job->SearchDone(index_id_);
334 335 336
    }

    rc.ElapseFromBegin("totally cost");
337 338 339

    // release index in resource
    index_engine_ = nullptr;
340 341
}

Y
yudong.cai 已提交
342
void
G
groot 已提交
343 344 345 346
XSearchTask::MergeTopkToResultSet(const scheduler::ResultIds& src_ids, const scheduler::ResultDistances& src_distances,
                                  size_t src_k, size_t nq, size_t topk, bool ascending, scheduler::ResultIds& tar_ids,
                                  scheduler::ResultDistances& tar_distances) {
    if (src_ids.empty()) {
347
        LOG_ENGINE_DEBUG_ << LogOut("[%s][%d] Search result is empty.", "search", 0);
G
groot 已提交
348
        return;
Y
yudong.cai 已提交
349 350
    }

G
groot 已提交
351 352 353 354 355 356
    size_t tar_k = tar_ids.size() / nq;
    size_t buf_k = std::min(topk, src_k + tar_k);

    scheduler::ResultIds buf_ids(nq * buf_k, -1);
    scheduler::ResultDistances buf_distances(nq * buf_k, 0.0);

Y
yudong.cai 已提交
357
    for (uint64_t i = 0; i < nq; i++) {
G
groot 已提交
358 359 360 361 362 363 364 365 366 367 368 369
        size_t buf_k_j = 0, src_k_j = 0, tar_k_j = 0;
        size_t buf_idx, src_idx, tar_idx;

        size_t buf_k_multi_i = buf_k * i;
        size_t src_k_multi_i = topk * i;
        size_t tar_k_multi_i = tar_k * i;

        while (buf_k_j < buf_k && src_k_j < src_k && tar_k_j < tar_k) {
            src_idx = src_k_multi_i + src_k_j;
            tar_idx = tar_k_multi_i + tar_k_j;
            buf_idx = buf_k_multi_i + buf_k_j;

370 371
            if ((tar_ids[tar_idx] == -1) ||  // initialized value
                (ascending && src_distances[src_idx] < tar_distances[tar_idx]) ||
G
groot 已提交
372 373 374 375 376 377 378 379
                (!ascending && src_distances[src_idx] > tar_distances[tar_idx])) {
                buf_ids[buf_idx] = src_ids[src_idx];
                buf_distances[buf_idx] = src_distances[src_idx];
                src_k_j++;
            } else {
                buf_ids[buf_idx] = tar_ids[tar_idx];
                buf_distances[buf_idx] = tar_distances[tar_idx];
                tar_k_j++;
Y
youny626 已提交
380
            }
G
groot 已提交
381 382
            buf_k_j++;
        }
Y
youny626 已提交
383

G
groot 已提交
384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401
        if (buf_k_j < buf_k) {
            if (src_k_j < src_k) {
                while (buf_k_j < buf_k && src_k_j < src_k) {
                    buf_idx = buf_k_multi_i + buf_k_j;
                    src_idx = src_k_multi_i + src_k_j;
                    buf_ids[buf_idx] = src_ids[src_idx];
                    buf_distances[buf_idx] = src_distances[src_idx];
                    src_k_j++;
                    buf_k_j++;
                }
            } else {
                while (buf_k_j < buf_k && tar_k_j < tar_k) {
                    buf_idx = buf_k_multi_i + buf_k_j;
                    tar_idx = tar_k_multi_i + tar_k_j;
                    buf_ids[buf_idx] = tar_ids[tar_idx];
                    buf_distances[buf_idx] = tar_distances[tar_idx];
                    tar_k_j++;
                    buf_k_j++;
Y
yudong.cai 已提交
402
                }
403 404 405
            }
        }
    }
G
groot 已提交
406 407
    tar_ids.swap(buf_ids);
    tar_distances.swap(buf_distances);
Y
yudong.cai 已提交
408
}
409

410 411 412 413 414 415 416 417 418 419
const std::string&
XSearchTask::GetLocation() const {
    return file_->location_;
}

size_t
XSearchTask::GetIndexId() const {
    return file_->id_;
}

S
starlord 已提交
420 421
// void
// XSearchTask::MergeTopkArray(std::vector<int64_t>& tar_ids, std::vector<float>& tar_distance, uint64_t& tar_input_k,
422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484
//                            const std::vector<int64_t>& src_ids, const std::vector<float>& src_distance,
//                            uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) {
//    if (src_ids.empty() || src_distance.empty()) {
//        return;
//    }
//
//    uint64_t output_k = std::min(topk, tar_input_k + src_input_k);
//    std::vector<int64_t> id_buf(nq * output_k, -1);
//    std::vector<float> dist_buf(nq * output_k, 0.0);
//
//    uint64_t buf_k, src_k, tar_k;
//    uint64_t src_idx, tar_idx, buf_idx;
//    uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i;
//
//    for (uint64_t i = 0; i < nq; i++) {
//        src_input_k_multi_i = src_input_k * i;
//        tar_input_k_multi_i = tar_input_k * i;
//        buf_k_multi_i = output_k * i;
//        buf_k = src_k = tar_k = 0;
//        while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) {
//            src_idx = src_input_k_multi_i + src_k;
//            tar_idx = tar_input_k_multi_i + tar_k;
//            buf_idx = buf_k_multi_i + buf_k;
//            if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) ||
//                (!ascending && src_distance[src_idx] > tar_distance[tar_idx])) {
//                id_buf[buf_idx] = src_ids[src_idx];
//                dist_buf[buf_idx] = src_distance[src_idx];
//                src_k++;
//            } else {
//                id_buf[buf_idx] = tar_ids[tar_idx];
//                dist_buf[buf_idx] = tar_distance[tar_idx];
//                tar_k++;
//            }
//            buf_k++;
//        }
//
//        if (buf_k < output_k) {
//            if (src_k < src_input_k) {
//                while (buf_k < output_k && src_k < src_input_k) {
//                    src_idx = src_input_k_multi_i + src_k;
//                    buf_idx = buf_k_multi_i + buf_k;
//                    id_buf[buf_idx] = src_ids[src_idx];
//                    dist_buf[buf_idx] = src_distance[src_idx];
//                    src_k++;
//                    buf_k++;
//                }
//            } else {
//                while (buf_k < output_k && tar_k < tar_input_k) {
//                    tar_idx = tar_input_k_multi_i + tar_k;
//                    buf_idx = buf_k_multi_i + buf_k;
//                    id_buf[buf_idx] = tar_ids[tar_idx];
//                    dist_buf[buf_idx] = tar_distance[tar_idx];
//                    tar_k++;
//                    buf_k++;
//                }
//            }
//        }
//    }
//
//    tar_ids.swap(id_buf);
//    tar_distance.swap(dist_buf);
//    tar_input_k = output_k;
//}
485

S
starlord 已提交
486 487
}  // namespace scheduler
}  // namespace milvus