diff --git a/core/src/scheduler/task/SearchTask.cpp b/core/src/scheduler/task/SearchTask.cpp index 2836d41dd4cc43866027b4989a2d5c2b82446c83..1f559ff5d0dffe91c2852c8b4cd49d310ca8df62 100644 --- a/core/src/scheduler/task/SearchTask.cpp +++ b/core/src/scheduler/task/SearchTask.cpp @@ -307,71 +307,71 @@ XSearchTask::MergeTopkToResultSet(const std::vector& input_ids, const s } } -void -XSearchTask::MergeTopkArray(std::vector& tar_ids, std::vector& tar_distance, uint64_t& tar_input_k, - const std::vector& src_ids, const std::vector& src_distance, - uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) { - if (src_ids.empty() || src_distance.empty()) { - return; - } - - uint64_t output_k = std::min(topk, tar_input_k + src_input_k); - std::vector id_buf(nq * output_k, -1); - std::vector dist_buf(nq * output_k, 0.0); - - uint64_t buf_k, src_k, tar_k; - uint64_t src_idx, tar_idx, buf_idx; - uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i; - - for (uint64_t i = 0; i < nq; i++) { - src_input_k_multi_i = src_input_k * i; - tar_input_k_multi_i = tar_input_k * i; - buf_k_multi_i = output_k * i; - buf_k = src_k = tar_k = 0; - while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) { - src_idx = src_input_k_multi_i + src_k; - tar_idx = tar_input_k_multi_i + tar_k; - buf_idx = buf_k_multi_i + buf_k; - if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) || - (!ascending && src_distance[src_idx] > tar_distance[tar_idx])) { - id_buf[buf_idx] = src_ids[src_idx]; - dist_buf[buf_idx] = src_distance[src_idx]; - src_k++; - } else { - id_buf[buf_idx] = tar_ids[tar_idx]; - dist_buf[buf_idx] = tar_distance[tar_idx]; - tar_k++; - } - buf_k++; - } - - if (buf_k < output_k) { - if (src_k < src_input_k) { - while (buf_k < output_k && src_k < src_input_k) { - src_idx = src_input_k_multi_i + src_k; - buf_idx = buf_k_multi_i + buf_k; - id_buf[buf_idx] = src_ids[src_idx]; - dist_buf[buf_idx] = src_distance[src_idx]; - src_k++; - buf_k++; - } - } else { - while (buf_k < output_k && tar_k < tar_input_k) { - tar_idx = tar_input_k_multi_i + tar_k; - buf_idx = buf_k_multi_i + buf_k; - id_buf[buf_idx] = tar_ids[tar_idx]; - dist_buf[buf_idx] = tar_distance[tar_idx]; - tar_k++; - buf_k++; - } - } - } - } - - tar_ids.swap(id_buf); - tar_distance.swap(dist_buf); - tar_input_k = output_k; -} +//void +//XSearchTask::MergeTopkArray(std::vector& tar_ids, std::vector& tar_distance, uint64_t& tar_input_k, +// const std::vector& src_ids, const std::vector& src_distance, +// uint64_t src_input_k, uint64_t nq, uint64_t topk, bool ascending) { +// if (src_ids.empty() || src_distance.empty()) { +// return; +// } +// +// uint64_t output_k = std::min(topk, tar_input_k + src_input_k); +// std::vector id_buf(nq * output_k, -1); +// std::vector dist_buf(nq * output_k, 0.0); +// +// uint64_t buf_k, src_k, tar_k; +// uint64_t src_idx, tar_idx, buf_idx; +// uint64_t src_input_k_multi_i, tar_input_k_multi_i, buf_k_multi_i; +// +// for (uint64_t i = 0; i < nq; i++) { +// src_input_k_multi_i = src_input_k * i; +// tar_input_k_multi_i = tar_input_k * i; +// buf_k_multi_i = output_k * i; +// buf_k = src_k = tar_k = 0; +// while (buf_k < output_k && src_k < src_input_k && tar_k < tar_input_k) { +// src_idx = src_input_k_multi_i + src_k; +// tar_idx = tar_input_k_multi_i + tar_k; +// buf_idx = buf_k_multi_i + buf_k; +// if ((ascending && src_distance[src_idx] < tar_distance[tar_idx]) || +// (!ascending && src_distance[src_idx] > tar_distance[tar_idx])) { +// id_buf[buf_idx] = src_ids[src_idx]; +// dist_buf[buf_idx] = src_distance[src_idx]; +// src_k++; +// } else { +// id_buf[buf_idx] = tar_ids[tar_idx]; +// dist_buf[buf_idx] = tar_distance[tar_idx]; +// tar_k++; +// } +// buf_k++; +// } +// +// if (buf_k < output_k) { +// if (src_k < src_input_k) { +// while (buf_k < output_k && src_k < src_input_k) { +// src_idx = src_input_k_multi_i + src_k; +// buf_idx = buf_k_multi_i + buf_k; +// id_buf[buf_idx] = src_ids[src_idx]; +// dist_buf[buf_idx] = src_distance[src_idx]; +// src_k++; +// buf_k++; +// } +// } else { +// while (buf_k < output_k && tar_k < tar_input_k) { +// tar_idx = tar_input_k_multi_i + tar_k; +// buf_idx = buf_k_multi_i + buf_k; +// id_buf[buf_idx] = tar_ids[tar_idx]; +// dist_buf[buf_idx] = tar_distance[tar_idx]; +// tar_k++; +// buf_k++; +// } +// } +// } +// } +// +// tar_ids.swap(id_buf); +// tar_distance.swap(dist_buf); +// tar_input_k = output_k; +//} } // namespace scheduler } // namespace milvus diff --git a/core/src/scheduler/task/SearchTask.h b/core/src/scheduler/task/SearchTask.h index 6a7381e0e66d5fd138026db4b17e2583f998fadd..bbbf891383440b327db7be7a5435f5d39ec5a952 100644 --- a/core/src/scheduler/task/SearchTask.h +++ b/core/src/scheduler/task/SearchTask.h @@ -42,10 +42,10 @@ class XSearchTask : public Task { MergeTopkToResultSet(const std::vector& input_ids, const std::vector& input_distance, uint64_t input_k, uint64_t nq, uint64_t topk, bool ascending, scheduler::ResultSet& result); - static void - MergeTopkArray(std::vector& tar_ids, std::vector& tar_distance, uint64_t& tar_input_k, - const std::vector& src_ids, const std::vector& src_distance, uint64_t src_input_k, - uint64_t nq, uint64_t topk, bool ascending); +// static void +// MergeTopkArray(std::vector& tar_ids, std::vector& tar_distance, uint64_t& tar_input_k, +// const std::vector& src_ids, const std::vector& src_distance, uint64_t src_input_k, +// uint64_t nq, uint64_t topk, bool ascending); public: TableFileSchemaPtr file_; diff --git a/core/unittest/db/test_search.cpp b/core/unittest/db/test_search.cpp index dc393b7a26f1e9f233865c75702eab2b320e8038..db736923a64ffed58150721fc9e600dded39dc2e 100644 --- a/core/unittest/db/test_search.cpp +++ b/core/unittest/db/test_search.cpp @@ -30,6 +30,7 @@ namespace ms = milvus::scheduler; void BuildResult(std::vector& output_ids, std::vector& output_distance, + uint64_t input_k, uint64_t topk, uint64_t nq, bool ascending) { @@ -39,9 +40,15 @@ BuildResult(std::vector& output_ids, output_distance.resize(nq * topk); for (uint64_t i = 0; i < nq; i++) { - for (uint64_t j = 0; j < topk; j++) { + //insert valid items + for (uint64_t j = 0; j < input_k; j++) { output_ids[i * topk + j] = (int64_t)(drand48() * 100000); - output_distance[i * topk + j] = ascending ? (j + drand48()) : ((topk - j) + drand48()); + output_distance[i * topk + j] = ascending ? (j + drand48()) : ((input_k - j) + drand48()); + } + //insert invalid items + for(uint64_t j = input_k; j < topk; j++) { + output_ids[i * topk + j] = -1; + output_distance[i * topk + j] = -1.0; } } } @@ -83,23 +90,32 @@ CheckTopkResult(const std::vector& input_ids_1, ASSERT_EQ(input_ids_1.size(), input_distance_1.size()); ASSERT_EQ(input_ids_2.size(), input_distance_2.size()); - uint64_t input_k1 = input_ids_1.size() / nq; - uint64_t input_k2 = input_ids_2.size() / nq; - for (int64_t i = 0; i < nq; i++) { std::vector - src_vec(input_distance_1.begin() + i * input_k1, input_distance_1.begin() + (i + 1) * input_k1); + src_vec(input_distance_1.begin() + i * topk, input_distance_1.begin() + (i + 1) * topk); src_vec.insert(src_vec.end(), - input_distance_2.begin() + i * input_k2, - input_distance_2.begin() + (i + 1) * input_k2); + input_distance_2.begin() + i * topk, + input_distance_2.begin() + (i + 1) * topk); if (ascending) { std::sort(src_vec.begin(), src_vec.end()); } else { std::sort(src_vec.begin(), src_vec.end(), std::greater()); } - uint64_t n = std::min(topk, input_k1 + input_k2); + //erase invalid items + std::vector::iterator iter; + for (iter = src_vec.begin(); iter != src_vec.end();) { + if (*iter < 0.0) + iter = src_vec.erase(iter); + else + ++iter; + } + + uint64_t n = std::min(topk, result[i].size()); for (uint64_t j = 0; j < n; j++) { + if(result[i][j].first < 0) { + continue; + } if (src_vec[j] != result[i][j].second) { std::cout << src_vec[j] << " " << result[i][j].second << std::endl; } @@ -114,8 +130,8 @@ void MergeTopkToResultSetTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uin std::vector ids1, ids2; std::vector dist1, dist2; ms::ResultSet result; - BuildResult(ids1, dist1, topk_1, nq, ascending); - BuildResult(ids2, dist2, topk_2, nq, ascending); + BuildResult(ids1, dist1, topk_1, topk, nq, ascending); + BuildResult(ids2, dist2, topk_2, topk, nq, ascending); ms::XSearchTask::MergeTopkToResultSet(ids1, dist1, topk_1, nq, topk, ascending, result); ms::XSearchTask::MergeTopkToResultSet(ids2, dist2, topk_2, nq, topk, ascending, result); CheckTopkResult(ids1, dist1, ids2, dist2, topk, nq, ascending, result); @@ -142,62 +158,64 @@ TEST(DBSearchTest, MERGE_RESULT_SET_TEST) { MergeTopkToResultSetTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false); } -void MergeTopkArrayTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) { - std::vector ids1, ids2; - std::vector dist1, dist2; - ms::ResultSet result; - BuildResult(ids1, dist1, topk_1, nq, ascending); - BuildResult(ids2, dist2, topk_2, nq, ascending); - uint64_t result_topk = std::min(topk, topk_1 + topk_2); - ms::XSearchTask::MergeTopkArray(ids1, dist1, topk_1, ids2, dist2, topk_2, nq, topk, ascending); - if (ids1.size() != result_topk * nq) { - std::cout << ids1.size() << " " << result_topk * nq << std::endl; - } - ASSERT_TRUE(ids1.size() == result_topk * nq); - ASSERT_TRUE(dist1.size() == result_topk * nq); - for (uint64_t i = 0; i < nq; i++) { - for (uint64_t k = 1; k < result_topk; k++) { - if (ascending) { - if (dist1[i * result_topk + k] < dist1[i * result_topk + k - 1]) { - std::cout << dist1[i * result_topk + k - 1] << " " << dist1[i * result_topk + k] << std::endl; - } - ASSERT_TRUE(dist1[i * result_topk + k] >= dist1[i * result_topk + k - 1]); - } else { - if (dist1[i * result_topk + k] > dist1[i * result_topk + k - 1]) { - std::cout << dist1[i * result_topk + k - 1] << " " << dist1[i * result_topk + k] << std::endl; - } - ASSERT_TRUE(dist1[i * result_topk + k] <= dist1[i * result_topk + k - 1]); - } - } - } -} - -TEST(DBSearchTest, MERGE_ARRAY_TEST) { - uint64_t NQ = 15; - uint64_t TOP_K = 64; - - /* test1, id1/dist1 valid, id2/dist2 empty */ - MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, true); - MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, false); - MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, true); - MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, false); - - /* test2, id1/dist1 valid, id2/dist2 valid */ - MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, true); - MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, false); - - /* test3, id1/dist1 small topk */ - MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, true); - MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, false); - MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, true); - MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, false); - - /* test4, id1/dist1 small topk, id2/dist2 small topk */ - MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, true); - MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false); - MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, true); - MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, false); -} +//void MergeTopkArrayTest(uint64_t topk_1, uint64_t topk_2, uint64_t nq, uint64_t topk, bool ascending) { +// std::vector ids1, ids2; +// std::vector dist1, dist2; +// ms::ResultSet result; +// BuildResult(ids1, dist1, topk_1, topk, nq, ascending); +// BuildResult(ids2, dist2, topk_2, topk, nq, ascending); +// uint64_t result_topk = std::min(topk, topk_1 + topk_2); +// ms::XSearchTask::MergeTopkArray(ids1, dist1, topk_1, ids2, dist2, topk_2, nq, topk, ascending); +// if (ids1.size() != result_topk * nq) { +// std::cout << ids1.size() << " " << result_topk * nq << std::endl; +// } +// ASSERT_TRUE(ids1.size() == result_topk * nq); +// ASSERT_TRUE(dist1.size() == result_topk * nq); +// for (uint64_t i = 0; i < nq; i++) { +// for (uint64_t k = 1; k < result_topk; k++) { +// float f0 = dist1[i * topk + k - 1]; +// float f1 = dist1[i * topk + k]; +// if (ascending) { +// if (f1 < f0) { +// std::cout << f0 << " " << f1 << std::endl; +// } +// ASSERT_TRUE(f1 >= f0); +// } else { +// if (f1 > f0) { +// std::cout << f0 << " " << f1 << std::endl; +// } +// ASSERT_TRUE(f1 <= f0); +// } +// } +// } +//} + +//TEST(DBSearchTest, MERGE_ARRAY_TEST) { +// uint64_t NQ = 15; +// uint64_t TOP_K = 64; +// +// /* test1, id1/dist1 valid, id2/dist2 empty */ +// MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, true); +// MergeTopkArrayTest(TOP_K, 0, NQ, TOP_K, false); +// MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, true); +// MergeTopkArrayTest(0, TOP_K, NQ, TOP_K, false); + +// /* test2, id1/dist1 valid, id2/dist2 valid */ +// MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, true); +// MergeTopkArrayTest(TOP_K, TOP_K, NQ, TOP_K, false); +// +// /* test3, id1/dist1 small topk */ +// MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, true); +// MergeTopkArrayTest(TOP_K/2, TOP_K, NQ, TOP_K, false); +// MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, true); +// MergeTopkArrayTest(TOP_K, TOP_K/2, NQ, TOP_K, false); +// +// /* test4, id1/dist1 small topk, id2/dist2 small topk */ +// MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, true); +// MergeTopkArrayTest(TOP_K/2, TOP_K/3, NQ, TOP_K, false); +// MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, true); +// MergeTopkArrayTest(TOP_K/3, TOP_K/2, NQ, TOP_K, false); +//} TEST(DBSearchTest, REDUCE_PERF_TEST) { int32_t index_file_num = 478; /* sift1B dataset, index files num */ @@ -217,7 +235,7 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) { /* generate testing data */ for (i = 0; i < index_file_num; i++) { - BuildResult(input_ids, input_distance, TOPK, NQ, ascending); + BuildResult(input_ids, input_distance, TOPK, TOPK, NQ, ascending); id_vec.push_back(input_ids); dist_vec.push_back(input_distance); } @@ -255,114 +273,114 @@ TEST(DBSearchTest, REDUCE_PERF_TEST) { rc1.RecordSection("reduce done"); - /////////////////////////////////////////////////////////////////////////////////////// - /* method-2 */ - std::vector> id_vec_2(index_file_num); - std::vector> dist_vec_2(index_file_num); - std::vector k_vec_2(index_file_num); - for (i = 0; i < index_file_num; i++) { - CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); - k_vec_2[i] = top_k; - } - - std::string str2 = "Method-2 " + std::to_string(max_thread_num) + " " + - std::to_string(nq) + " " + std::to_string(top_k); - milvus::TimeRecorder rc2(str2); - - for (step = 1; step < index_file_num; step *= 2) { - for (i = 0; i + step < index_file_num; i += step * 2) { - ms::XSearchTask::MergeTopkArray(id_vec_2[i], dist_vec_2[i], k_vec_2[i], - id_vec_2[i + step], dist_vec_2[i + step], k_vec_2[i + step], - nq, top_k, ascending); - } - } - ms::XSearchTask::MergeTopkToResultSet(id_vec_2[0], - dist_vec_2[0], - k_vec_2[0], - nq, - top_k, - ascending, - final_result_2); - ASSERT_EQ(final_result_2.size(), nq); - - rc2.RecordSection("reduce done"); - - for (i = 0; i < nq; i++) { - ASSERT_EQ(final_result[i].size(), final_result_2[i].size()); - for (k = 0; k < final_result[i].size(); k++) { - if (final_result[i][k].first != final_result_2[i][k].first) { - std::cout << i << " " << k << std::endl; - } - ASSERT_EQ(final_result[i][k].first, final_result_2[i][k].first); - ASSERT_EQ(final_result[i][k].second, final_result_2[i][k].second); - } - } - - /////////////////////////////////////////////////////////////////////////////////////// - /* method-3 parallel */ - std::vector> id_vec_3(index_file_num); - std::vector> dist_vec_3(index_file_num); - std::vector k_vec_3(index_file_num); - for (i = 0; i < index_file_num; i++) { - CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); - k_vec_3[i] = top_k; - } - - std::string str3 = "Method-3 " + std::to_string(max_thread_num) + " " + - std::to_string(nq) + " " + std::to_string(top_k); - milvus::TimeRecorder rc3(str3); - - for (step = 1; step < index_file_num; step *= 2) { - for (i = 0; i + step < index_file_num; i += step * 2) { - threads_list.push_back( - threadPool.enqueue(ms::XSearchTask::MergeTopkArray, - std::ref(id_vec_3[i]), - std::ref(dist_vec_3[i]), - std::ref(k_vec_3[i]), - std::ref(id_vec_3[i + step]), - std::ref(dist_vec_3[i + step]), - std::ref(k_vec_3[i + step]), - nq, - top_k, - ascending)); - } - - while (threads_list.size() > 0) { - int nready = 0; - for (auto it = threads_list.begin(); it != threads_list.end(); it = it) { - auto &p = *it; - std::chrono::milliseconds span(0); - if (p.wait_for(span) == std::future_status::ready) { - threads_list.erase(it++); - ++nready; - } else { - ++it; - } - } - - if (nready == 0) { - std::this_thread::yield(); - } - } - } - ms::XSearchTask::MergeTopkToResultSet(id_vec_3[0], - dist_vec_3[0], - k_vec_3[0], - nq, - top_k, - ascending, - final_result_3); - ASSERT_EQ(final_result_3.size(), nq); - - rc3.RecordSection("reduce done"); - - for (i = 0; i < nq; i++) { - ASSERT_EQ(final_result[i].size(), final_result_3[i].size()); - for (k = 0; k < final_result[i].size(); k++) { - ASSERT_EQ(final_result[i][k].first, final_result_3[i][k].first); - ASSERT_EQ(final_result[i][k].second, final_result_3[i][k].second); - } - } +// /////////////////////////////////////////////////////////////////////////////////////// +// /* method-2 */ +// std::vector> id_vec_2(index_file_num); +// std::vector> dist_vec_2(index_file_num); +// std::vector k_vec_2(index_file_num); +// for (i = 0; i < index_file_num; i++) { +// CopyResult(id_vec_2[i], dist_vec_2[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); +// k_vec_2[i] = top_k; +// } +// +// std::string str2 = "Method-2 " + std::to_string(max_thread_num) + " " + +// std::to_string(nq) + " " + std::to_string(top_k); +// milvus::TimeRecorder rc2(str2); +// +// for (step = 1; step < index_file_num; step *= 2) { +// for (i = 0; i + step < index_file_num; i += step * 2) { +// ms::XSearchTask::MergeTopkArray(id_vec_2[i], dist_vec_2[i], k_vec_2[i], +// id_vec_2[i + step], dist_vec_2[i + step], k_vec_2[i + step], +// nq, top_k, ascending); +// } +// } +// ms::XSearchTask::MergeTopkToResultSet(id_vec_2[0], +// dist_vec_2[0], +// k_vec_2[0], +// nq, +// top_k, +// ascending, +// final_result_2); +// ASSERT_EQ(final_result_2.size(), nq); +// +// rc2.RecordSection("reduce done"); +// +// for (i = 0; i < nq; i++) { +// ASSERT_EQ(final_result[i].size(), final_result_2[i].size()); +// for (k = 0; k < final_result[i].size(); k++) { +// if (final_result[i][k].first != final_result_2[i][k].first) { +// std::cout << i << " " << k << std::endl; +// } +// ASSERT_EQ(final_result[i][k].first, final_result_2[i][k].first); +// ASSERT_EQ(final_result[i][k].second, final_result_2[i][k].second); +// } +// } +// +// /////////////////////////////////////////////////////////////////////////////////////// +// /* method-3 parallel */ +// std::vector> id_vec_3(index_file_num); +// std::vector> dist_vec_3(index_file_num); +// std::vector k_vec_3(index_file_num); +// for (i = 0; i < index_file_num; i++) { +// CopyResult(id_vec_3[i], dist_vec_3[i], top_k, id_vec[i], dist_vec[i], TOPK, nq); +// k_vec_3[i] = top_k; +// } +// +// std::string str3 = "Method-3 " + std::to_string(max_thread_num) + " " + +// std::to_string(nq) + " " + std::to_string(top_k); +// milvus::TimeRecorder rc3(str3); +// +// for (step = 1; step < index_file_num; step *= 2) { +// for (i = 0; i + step < index_file_num; i += step * 2) { +// threads_list.push_back( +// threadPool.enqueue(ms::XSearchTask::MergeTopkArray, +// std::ref(id_vec_3[i]), +// std::ref(dist_vec_3[i]), +// std::ref(k_vec_3[i]), +// std::ref(id_vec_3[i + step]), +// std::ref(dist_vec_3[i + step]), +// std::ref(k_vec_3[i + step]), +// nq, +// top_k, +// ascending)); +// } +// +// while (threads_list.size() > 0) { +// int nready = 0; +// for (auto it = threads_list.begin(); it != threads_list.end(); it = it) { +// auto &p = *it; +// std::chrono::milliseconds span(0); +// if (p.wait_for(span) == std::future_status::ready) { +// threads_list.erase(it++); +// ++nready; +// } else { +// ++it; +// } +// } +// +// if (nready == 0) { +// std::this_thread::yield(); +// } +// } +// } +// ms::XSearchTask::MergeTopkToResultSet(id_vec_3[0], +// dist_vec_3[0], +// k_vec_3[0], +// nq, +// top_k, +// ascending, +// final_result_3); +// ASSERT_EQ(final_result_3.size(), nq); +// +// rc3.RecordSection("reduce done"); +// +// for (i = 0; i < nq; i++) { +// ASSERT_EQ(final_result[i].size(), final_result_3[i].size()); +// for (k = 0; k < final_result[i].size(); k++) { +// ASSERT_EQ(final_result[i][k].first, final_result_3[i][k].first); +// ASSERT_EQ(final_result[i][k].second, final_result_3[i][k].second); +// } +// } } } }