resource_test.cpp 4.3 KB
Newer Older
W
wxyu 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/

#include "scheduler/resource/Resource.h"
#include "scheduler/resource/DiskResource.h"
#include "scheduler/resource/CpuResource.h"
#include "scheduler/resource/GpuResource.h"
#include "scheduler/task/Task.h"
#include "scheduler/task/TestTask.h"
#include "scheduler/ResourceFactory.h"
#include <gtest/gtest.h>


namespace zilliz {
namespace milvus {
namespace engine {

class ResourceTest : public testing::Test {
protected:
    void
    SetUp() override {
25 26 27
        disk_resource_ = ResourceFactory::Create("ssd", "DISK", 0);
        cpu_resource_ = ResourceFactory::Create("cpu", "CPU", 0);
        gpu_resource_ = ResourceFactory::Create("gpu", "GPU", 0);
28 29 30
        resources_.push_back(disk_resource_);
        resources_.push_back(cpu_resource_);
        resources_.push_back(gpu_resource_);
W
wxyu 已提交
31

32 33 34 35 36 37 38 39 40 41
        auto subscriber = [&](EventPtr event) {
            if (event->Type() == EventType::COPY_COMPLETED) {
                std::lock_guard<std::mutex> lock(load_mutex_);
                ++load_count_;
                cv_.notify_one();
            }

            if (event->Type() == EventType::FINISH_TASK) {
                std::lock_guard<std::mutex> lock(load_mutex_);
                ++exec_count_;
W
wxyu 已提交
42 43
                cv_.notify_one();
            }
W
wxyu 已提交
44 45 46 47 48
        };

        disk_resource_->RegisterSubscriber(subscriber);
        cpu_resource_->RegisterSubscriber(subscriber);
        gpu_resource_->RegisterSubscriber(subscriber);
W
wxyu 已提交
49 50 51 52 53 54 55 56 57 58 59

        disk_resource_->Start();
        cpu_resource_->Start();
        gpu_resource_->Start();
    }

    void
    TearDown() override {
        disk_resource_->Stop();
        cpu_resource_->Stop();
        gpu_resource_->Stop();
W
wxyu 已提交
60 61 62
    }

    void
63 64 65 66 67 68 69 70 71
    WaitLoader(uint64_t count) {
        std::unique_lock<std::mutex> lock(load_mutex_);
        cv_.wait(lock, [&] { return load_count_ == count; });
    }

    void
    WaitExecutor(uint64_t count) {
        std::unique_lock<std::mutex> lock(exec_mutex_);
        cv_.wait(lock, [&] { return exec_count_ == count; });
W
wxyu 已提交
72 73 74 75 76
    }

    ResourcePtr disk_resource_;
    ResourcePtr cpu_resource_;
    ResourcePtr gpu_resource_;
77 78 79 80 81
    std::vector<ResourcePtr> resources_;
    uint64_t load_count_ = 0;
    uint64_t exec_count_ = 0;
    std::mutex load_mutex_;
    std::mutex exec_mutex_;
W
wxyu 已提交
82 83 84 85
    std::condition_variable cv_;
};

TEST_F(ResourceTest, cpu_resource_test) {
86 87
    const uint64_t NUM = 100;
    std::vector<std::shared_ptr<TestTask>> tasks;
88
    TableFileSchemaPtr dummy = nullptr;
89
    for (uint64_t i = 0; i < NUM; ++i) {
90
        auto task = std::make_shared<TestTask>(dummy);
91 92 93 94
        tasks.push_back(task);
        cpu_resource_->task_table().Put(task);
    }

W
wxyu 已提交
95
    cpu_resource_->WakeupLoader();
96 97 98 99 100 101 102 103
    WaitLoader(NUM);
//    std::cout << "after WakeupLoader" << std::endl;
//    std::cout << cpu_resource_->task_table().Dump();

    for (uint64_t i = 0; i < NUM; ++i) {
        ASSERT_EQ(tasks[i]->load_count_, 1);
    }

W
wxyu 已提交
104
    cpu_resource_->WakeupExecutor();
105 106 107 108 109 110 111
    WaitExecutor(NUM);
//    std::cout << "after WakeupExecutor" << std::endl;
//    std::cout << cpu_resource_->task_table().Dump();

    for (uint64_t i = 0; i < NUM; ++i) {
        ASSERT_EQ(tasks[i]->exec_count_, 1);
    }
W
wxyu 已提交
112 113 114
}

TEST_F(ResourceTest, gpu_resource_test) {
115 116
    const uint64_t NUM = 100;
    std::vector<std::shared_ptr<TestTask>> tasks;
117
    TableFileSchemaPtr dummy = nullptr;
118
    for (uint64_t i = 0; i < NUM; ++i) {
119
        auto task = std::make_shared<TestTask>(dummy);
120 121 122 123
        tasks.push_back(task);
        gpu_resource_->task_table().Put(task);
    }

W
wxyu 已提交
124
    gpu_resource_->WakeupLoader();
125 126 127 128 129 130 131 132
    WaitLoader(NUM);
//    std::cout << "after WakeupLoader" << std::endl;
//    std::cout << cpu_resource_->task_table().Dump();

    for (uint64_t i = 0; i < NUM; ++i) {
        ASSERT_EQ(tasks[i]->load_count_, 1);
    }

W
wxyu 已提交
133
    gpu_resource_->WakeupExecutor();
134 135 136 137 138 139 140
    WaitExecutor(NUM);
//    std::cout << "after WakeupExecutor" << std::endl;
//    std::cout << cpu_resource_->task_table().Dump();

    for (uint64_t i = 0; i < NUM; ++i) {
        ASSERT_EQ(tasks[i]->exec_count_, 1);
    }
W
wxyu 已提交
141 142 143 144 145 146
}


}
}
}