提交 77660194 编写于 作者: Y Yu Kun

fix PushTaskLocality bug


Former-commit-id: ef7b229f035318dc6434e480e8f51cd774960229
上级 589763ed
......@@ -70,15 +70,15 @@ resource_config:
type: GPU
memory: 6
device_id: 0
enable_loader: false
enable_executor: false
enable_loader: true
enable_executor: true
gtx1660:
type: GPU
memory: 6
device_id: 1
enable_loader: false
enable_executor: false
enable_loader: true
enable_executor: true
# connection list, length: 0~N
# format: -${resource_name}===${resource_name}
......
......@@ -124,6 +124,8 @@ Status ExecutionEngineImpl::Serialize() {
}
Status ExecutionEngineImpl::Load(bool to_cache) {
std::cout << "load\n";
index_ = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(location_);
bool already_in_cache = (index_ != nullptr);
if (!index_) {
......@@ -147,6 +149,7 @@ Status ExecutionEngineImpl::Load(bool to_cache) {
}
Status ExecutionEngineImpl::CopyToGpu(uint64_t device_id) {
std::cout << "copytogpu\n";
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(device_id)->GetIndex(location_);
bool already_in_cache = (index != nullptr);
if (already_in_cache) {
......
......@@ -120,15 +120,20 @@ Scheduler::OnCopyCompleted(const EventPtr &event) {
auto task = load_completed_event->task_table_item_->task;
auto search_task = std::static_pointer_cast<XSearchTask>(task);
auto location = search_task->index_engine_->GetLocation();
bool moved = false;
for (auto i = 0; i < res_mgr_.lock()->GetNumGpuResource(); ++i) {
auto index = zilliz::milvus::cache::GpuCacheMgr::GetInstance(i)->GetIndex(location);
if (index != nullptr) {
moved = true;
auto dest_resource = res_mgr_.lock()->GetResource(ResourceType::GPU, i);
Action::PushTaskToResource(load_completed_event->task_table_item_->task, dest_resource);
break;
}
}
if (not moved) {
Action::PushTaskToNeighbourRandomly(task, resource);
}
}
break;
}
......
......@@ -21,6 +21,8 @@ get_neighbours(const ResourcePtr &self) {
if (not node) continue;
auto resource = std::static_pointer_cast<Resource>(node);
// if (not resource->HasExecutor()) continue;
neighbours.emplace_back(resource);
}
return neighbours;
......@@ -31,11 +33,16 @@ void
Action::PushTaskToNeighbourRandomly(const TaskPtr &task,
const ResourcePtr &self) {
auto neighbours = get_neighbours(self);
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
if (not neighbours.empty()) {
std::random_device rd;
std::mt19937 mt(rd());
std::uniform_int_distribution<uint64_t> dist(0, neighbours.size() - 1);
neighbours[dist(mt)]->task_table().Put(task);
} else {
//TODO: process
}
neighbours[dist(mt)]->task_table().Put(task);
}
void
......
......@@ -23,11 +23,11 @@ namespace {
const std::string TABLE_NAME = GetTableName();
constexpr int64_t TABLE_DIMENSION = 512;
constexpr int64_t TABLE_INDEX_FILE_SIZE = 768;
constexpr int64_t BATCH_ROW_COUNT = 100000;
constexpr int64_t NQ = 10;
constexpr int64_t BATCH_ROW_COUNT = 1000000;
constexpr int64_t NQ = 100;
constexpr int64_t TOP_K = 10;
constexpr int64_t SEARCH_TARGET = 5000; //change this value, result is different
constexpr int64_t ADD_VECTOR_LOOP = 10;
constexpr int64_t ADD_VECTOR_LOOP = 1;
constexpr int64_t SECONDS_EACH_HOUR = 3600;
#define BLOCK_SPLITER std::cout << "===========================================" << std::endl;
......@@ -172,15 +172,20 @@ namespace {
record_array.push_back(pair.second);
}
std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 10, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
auto start = std::chrono::high_resolution_clock::now();
for (auto i = 0; i < 5; ++i) {
std::vector<TopKQueryResult> topk_query_result_array;
{
TimeRecorder rc(phase_name);
Status stat = conn->Search(TABLE_NAME, record_array, query_range_array, TOP_K, 32, topk_query_result_array);
std::cout << "SearchVector function call status: " << stat.ToString() << std::endl;
}
}
auto finish = std::chrono::high_resolution_clock::now();
std::cout << "SEARCHVECTOR COST: " << std::chrono::duration_cast<std::chrono::duration<double>>(finish - start).count() << "s\n";
PrintSearchResult(search_record_array, topk_query_result_array);
CheckResult(search_record_array, topk_query_result_array);
// PrintSearchResult(search_record_array, topk_query_result_array);
// CheckResult(search_record_array, topk_query_result_array);
}
}
......@@ -276,10 +281,10 @@ ClientTest::Test(const std::string& address, const std::string& port) {
{//search vectors without index
Sleep(2);
int64_t row_count = 0;
Status stat = conn->CountTable(TABLE_NAME, row_count);
std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
DoSearch(conn, search_record_array, "Search without index");
// int64_t row_count = 0;
// Status stat = conn->CountTable(TABLE_NAME, row_count);
// std::cout << TABLE_NAME << "(" << row_count << " rows)" << std::endl;
// DoSearch(conn, search_record_array, "Search without index");
}
{//wait unit build index finish
......@@ -287,7 +292,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
IndexParam index;
index.table_name = TABLE_NAME;
index.index_type = IndexType::gpu_ivfflat;
index.nlist = 1000;
index.nlist = 16384;
index.metric_type = 1;
Status stat = conn->CreateIndex(index);
std::cout << "CreateIndex function call status: " << stat.ToString() << std::endl;
......@@ -304,6 +309,7 @@ ClientTest::Test(const std::string& address, const std::string& port) {
{//search vectors after build index finish
DoSearch(conn, search_record_array, "Search after build index finish");
// std::cout << conn->DumpTaskTables() << std::endl;
}
{//delete index
......
......@@ -329,6 +329,21 @@ ClientProxy::ServerStatus() const {
}
}
std::string
ClientProxy::DumpTaskTables() const {
if (channel_ == nullptr) {
return "not connected to server";
}
try {
std::string dummy;
Status status = client_ptr_->Cmd(dummy, "tasktable");
return dummy;
} catch (std::exception &ex) {
return "connection lost";
}
}
Status
ClientProxy::DeleteByRange(milvus::Range &range, const std::string &table_name) {
try {
......
......@@ -68,6 +68,9 @@ public:
virtual std::string
ServerStatus() const override;
virtual std::string
DumpTaskTables() const override;
virtual Status
DeleteByRange(Range &range,
const std::string &table_name) override;
......
......@@ -318,6 +318,9 @@ class Connection {
virtual std::string
ServerStatus() const = 0;
virtual std::string
DumpTaskTables() const = 0;
/**
* @brief delete tables by range
*
......
......@@ -114,6 +114,11 @@ ConnectionImpl::ServerStatus() const {
return client_proxy_->ServerStatus();
}
std::string
ConnectionImpl::DumpTaskTables() const {
return client_proxy_->DumpTaskTables();
}
Status
ConnectionImpl::DeleteByRange(Range &range,
const std::string &table_name) {
......
......@@ -70,6 +70,9 @@ public:
virtual std::string
ServerStatus() const override;
virtual std::string
DumpTaskTables() const override;
virtual Status
DeleteByRange(Range &range,
const std::string &table_name) override;
......
......@@ -42,5 +42,5 @@ add_subdirectory(server)
add_subdirectory(db)
add_subdirectory(knowhere)
add_subdirectory(metrics)
#add_subdirectory(scheduler)
add_subdirectory(scheduler)
#add_subdirectory(storage)
\ No newline at end of file
......@@ -24,7 +24,7 @@ using namespace zilliz::milvus;
namespace {
static const char* TABLE_NAME = "test_group";
static constexpr int64_t TABLE_DIM = 256;
static constexpr int64_t TABLE_DIM = 512;
static constexpr int64_t VECTOR_COUNT = 250000;
static constexpr int64_t INSERT_LOOP = 10000;
static constexpr int64_t SECONDS_EACH_HOUR = 3600;
......
......@@ -64,9 +64,12 @@ void DBTest::SetUp() {
res_mgr->Clear();
res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true));
res_mgr->Add(engine::ResourceFactory::Create("gtx1660", "GPU", 0, true, true));
auto default_conn = engine::Connection("IO", 500.0);
auto PCIE = engine::Connection("IO", 11000.0);
res_mgr->Connect("disk", "cpu", default_conn);
res_mgr->Connect("cpu", "gtx1660", PCIE);
res_mgr->Start();
engine::SchedInst::GetInstance()->Start();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册