提交 4aacfad2 编写于 作者: J jinhai

Merge branch 'branch-0.5.0' into 'branch-0.5.0'

MS-555 Remove old scheduler

See merge request megasearch/milvus!582

Former-commit-id: 7be0f04b6e77aa3fa19c418f3856edde83e617b7
......@@ -14,6 +14,7 @@ Please mark all change in change log and use the ticket from JIRA.
- MS-558 - Refine status code
- MS-562 - Add JobMgr and TaskCreator in Scheduler
- MS-566 - Refactor cmake
- MS-555 - Remove old scheduler
## New Feature
......
......@@ -24,8 +24,8 @@
#include "meta/MetaFactory.h"
#include "meta/MetaConsts.h"
#include "metrics/Metrics.h"
#include "scheduler/TaskScheduler.h"
#include "scheduler/context/DeleteContext.h"
#include "scheduler/job/SearchJob.h"
#include "scheduler/job/DeleteJob.h"
#include "scheduler/SchedInst.h"
#include "utils/TimeRecorder.h"
#include "utils/Log.h"
......@@ -133,12 +133,10 @@ Status DBImpl::DeleteTable(const std::string& table_id, const meta::DatesT& date
meta_ptr_->DeleteTable(table_id); //soft delete table
//scheduler will determine when to delete table files
TaskScheduler& scheduler = TaskScheduler::GetInstance();
DeleteContextPtr context = std::make_shared<DeleteContext>(table_id,
meta_ptr_,
ResMgrInst::GetInstance()->GetNumOfComputeResource());
scheduler.Schedule(context);
context->WaitAndDelete();
auto nres = ResMgrInst::GetInstance()->GetNumOfComputeResource();
scheduler::DeleteJobPtr job = std::make_shared<scheduler::DeleteJob>(0, table_id, meta_ptr_, nres);
JobMgrInst::GetInstance()->Put(job);
job->WaitAndDelete();
} else {
meta_ptr_->DropPartitionsByDates(table_id, dates);
}
......@@ -418,51 +416,50 @@ Status DBImpl::Size(uint64_t& result) {
Status DBImpl::QueryAsync(const std::string& table_id, const meta::TableFilesSchema& files,
uint64_t k, uint64_t nq, uint64_t nprobe, const float* vectors,
const meta::DatesT& dates, QueryResults& results) {
using namespace scheduler;
server::CollectQueryMetrics metrics(nq);
TimeRecorder rc("");
//step 1: get files to search
ENGINE_LOG_DEBUG << "Engine query begin, index file count: " << files.size() << " date range count: " << dates.size();
SearchContextPtr context = std::make_shared<SearchContext>(k, nq, nprobe, vectors);
for (auto &file : files) {
SearchJobPtr job = std::make_shared<SearchJob>(0, k, nq, nprobe, vectors);
for (auto &file : files) {
TableFileSchemaPtr file_ptr = std::make_shared<meta::TableFileSchema>(file);
context->AddIndexFile(file_ptr);
job->AddIndexFile(file_ptr);
}
//step 2: put search task to scheduler
TaskScheduler& scheduler = TaskScheduler::GetInstance();
scheduler.Schedule(context);
context->WaitResult();
if (!context->GetStatus().ok()) {
return context->GetStatus();
JobMgrInst::GetInstance()->Put(job);
job->WaitResult();
if (!job->GetStatus().ok()) {
return job->GetStatus();
}
//step 3: print time cost information
double load_cost = context->LoadCost();
double search_cost = context->SearchCost();
double reduce_cost = context->ReduceCost();
std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost);
std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost);
std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost);
if(search_cost > 0.0 || reduce_cost > 0.0) {
double total_cost = load_cost + search_cost + reduce_cost;
double load_percent = load_cost/total_cost;
double search_percent = search_cost/total_cost;
double reduce_percent = reduce_cost/total_cost;
ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
} else {
ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
<< " search cost: " << search_info
<< " reduce cost: " << reduce_info;
}
// double load_cost = context->LoadCost();
// double search_cost = context->SearchCost();
// double reduce_cost = context->ReduceCost();
// std::string load_info = TimeRecorder::GetTimeSpanStr(load_cost);
// std::string search_info = TimeRecorder::GetTimeSpanStr(search_cost);
// std::string reduce_info = TimeRecorder::GetTimeSpanStr(reduce_cost);
// if(search_cost > 0.0 || reduce_cost > 0.0) {
// double total_cost = load_cost + search_cost + reduce_cost;
// double load_percent = load_cost/total_cost;
// double search_percent = search_cost/total_cost;
// double reduce_percent = reduce_cost/total_cost;
//
// ENGINE_LOG_DEBUG << "Engine load index totally cost: " << load_info << " percent: " << load_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine search index totally cost: " << search_info << " percent: " << search_percent*100 << "%";
// ENGINE_LOG_DEBUG << "Engine reduce topk totally cost: " << reduce_info << " percent: " << reduce_percent*100 << "%";
// } else {
// ENGINE_LOG_DEBUG << "Engine load cost: " << load_info
// << " search cost: " << search_info
// << " reduce cost: " << reduce_info;
// }
//step 4: construct results
results = context->GetResult();
results = job->GetResult();
rc.ElapseFromBegin("Engine query totally cost");
return Status::OK();
......
......@@ -29,7 +29,6 @@
#include <thread>
#include <list>
#include <set>
#include "scheduler/context/SearchContext.h"
namespace zilliz {
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "TaskDispatchQueue.h"
#include "TaskDispatchStrategy.h"
#include "utils/Error.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
void
TaskDispatchQueue::Put(const ScheduleContextPtr &context) {
std::unique_lock <std::mutex> lock(mtx);
full_.wait(lock, [this] { return (queue_.size() < capacity_); });
if(context == nullptr) {
queue_.push_front(nullptr);
empty_.notify_all();
return;
}
TaskDispatchStrategy::Schedule(context, queue_);
empty_.notify_all();
}
ScheduleTaskPtr
TaskDispatchQueue::Take() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
ScheduleTaskPtr front(queue_.front());
queue_.pop_front();
full_.notify_all();
return front;
}
size_t
TaskDispatchQueue::Size() {
std::lock_guard <std::mutex> lock(mtx);
return queue_.size();
}
ScheduleTaskPtr
TaskDispatchQueue::Front() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
throw server::ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
ScheduleTaskPtr front(queue_.front());
return front;
}
ScheduleTaskPtr
TaskDispatchQueue::Back() {
std::unique_lock <std::mutex> lock(mtx);
empty_.wait(lock, [this] { return !queue_.empty(); });
if (queue_.empty()) {
std::string error_msg = "blocking queue empty";
SERVER_LOG_ERROR << error_msg;
throw server::ServerException(SERVER_BLOCKING_QUEUE_EMPTY, error_msg);
}
ScheduleTaskPtr back(queue_.back());
return back;
}
bool
TaskDispatchQueue::Empty() {
std::unique_lock <std::mutex> lock(mtx);
return queue_.empty();
}
void
TaskDispatchQueue::SetCapacity(const size_t capacity) {
capacity_ = (capacity > 0 ? capacity : capacity_);
}
}
}
}
\ No newline at end of file
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "context/IScheduleContext.h"
#include "task/IScheduleTask.h"
#include <condition_variable>
#include <iostream>
#include <queue>
#include <list>
namespace zilliz {
namespace milvus {
namespace engine {
class TaskDispatchQueue {
public:
TaskDispatchQueue() : mtx(), full_(), empty_() {}
TaskDispatchQueue(const TaskDispatchQueue &rhs) = delete;
TaskDispatchQueue &operator=(const TaskDispatchQueue &rhs) = delete;
using TaskList = std::list<ScheduleTaskPtr>;
void Put(const ScheduleContextPtr &context);
ScheduleTaskPtr Take();
ScheduleTaskPtr Front();
ScheduleTaskPtr Back();
size_t Size();
bool Empty();
void SetCapacity(const size_t capacity);
private:
mutable std::mutex mtx;
std::condition_variable full_;
std::condition_variable empty_;
TaskList queue_;
size_t capacity_ = 1000000;
};
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "TaskDispatchStrategy.h"
#include "context/SearchContext.h"
#include "context/DeleteContext.h"
#include "task/IndexLoadTask.h"
#include "task/DeleteTask.h"
#include "cache/CpuCacheMgr.h"
#include "utils/Error.h"
#include "utils/Log.h"
namespace zilliz {
namespace milvus {
namespace engine {
class ReuseCacheIndexStrategy {
public:
bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
if(context == nullptr) {
ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
return false;
}
SearchContext::Id2IndexMap index_files = context->GetIndexMap();
//some index loader alread exists
for(auto& task : task_list) {
if(task->type() != ScheduleTaskType::kIndexLoad) {
continue;
}
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
if(index_files.find(loader->file_->id_) != index_files.end()){
ENGINE_LOG_DEBUG << "Append SearchContext to exist IndexLoaderContext";
index_files.erase(loader->file_->id_);
loader->search_contexts_.push_back(context);
}
}
//index_files still contains some index files, create new loader
for(auto& pair : index_files) {
ENGINE_LOG_DEBUG << "Create new IndexLoaderContext for: " << pair.second->location_;
IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
new_loader->search_contexts_.push_back(context);
new_loader->file_ = pair.second;
auto index = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
if(index != nullptr) {
//if the index file has been in memory, increase its priority
task_list.push_front(new_loader);
} else {
//index file not in memory, put it to tail
task_list.push_back(new_loader);
}
}
return true;
}
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteTableStrategy {
public:
bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
if (context == nullptr) {
ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
return false;
}
DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
if(task_list.empty()) {
task_list.push_back(delete_task);
return true;
}
std::string table_id = context->table_id();
//put delete task to proper position
//for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1
//if user want to delete table1, the DeleteTask will be insert into No.6 position
for(std::list<ScheduleTaskPtr>::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) {
if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
continue;
}
IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
if(loader->file_->table_id_ != table_id) {
continue;
}
task_list.insert(iter.base(), delete_task);
return true;
}
//no task is searching this table, put DeleteTask to front of list so that the table will be delete asap
task_list.push_front(delete_task);
return true;
}
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
if(context_ptr == nullptr) {
ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
return false;
}
switch(context_ptr->type()) {
case ScheduleContextType::kSearch: {
SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
ReuseCacheIndexStrategy strategy;
return strategy.Schedule(search_context, task_list);
}
case ScheduleContextType::kDelete: {
DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
DeleteTableStrategy strategy;
return strategy.Schedule(delete_context, task_list);
}
default:
ENGINE_LOG_ERROR << "Invalid schedule task type";
return false;
}
}
}
}
}
\ No newline at end of file
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "context/IScheduleContext.h"
#include "task/IScheduleTask.h"
#include <list>
namespace zilliz {
namespace milvus {
namespace engine {
class TaskDispatchStrategy {
public:
static bool Schedule(const ScheduleContextPtr &context_ptr, std::list<ScheduleTaskPtr>& task_list);
};
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "server/ServerConfig.h"
#include "TaskScheduler.h"
#include "TaskDispatchQueue.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "db/engine/EngineFactory.h"
#include "scheduler/task/TaskConvert.h"
#include "scheduler/SchedInst.h"
#include "scheduler/ResourceFactory.h"
namespace zilliz {
namespace milvus {
namespace engine {
TaskScheduler::TaskScheduler()
: stopped_(true) {
Start();
}
TaskScheduler::~TaskScheduler() {
Stop();
}
TaskScheduler& TaskScheduler::GetInstance() {
static TaskScheduler s_instance;
return s_instance;
}
bool
TaskScheduler::Start() {
if(!stopped_) {
SERVER_LOG_INFO << "Task Scheduler isn't started";
return true;
}
stopped_ = false;
task_queue_.SetCapacity(2);
task_dispatch_thread_ = std::make_shared<std::thread>(&TaskScheduler::TaskDispatchWorker, this);
task_thread_ = std::make_shared<std::thread>(&TaskScheduler::TaskWorker, this);
return true;
}
bool
TaskScheduler::Stop() {
if(stopped_) {
SERVER_LOG_INFO << "Task Scheduler already stopped";
return true;
}
if(task_dispatch_thread_) {
task_dispatch_queue_.Put(nullptr);
task_dispatch_thread_->join();
task_dispatch_thread_ = nullptr;
}
if(task_thread_) {
task_queue_.Put(nullptr);
task_thread_->join();
task_thread_ = nullptr;
}
stopped_ = true;
return true;
}
bool
TaskScheduler::Schedule(ScheduleContextPtr context) {
task_dispatch_queue_.Put(context);
return true;
}
bool
TaskScheduler::TaskDispatchWorker() {
while(true) {
ScheduleTaskPtr task_ptr = task_dispatch_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop db task dispatch thread";
return true;
}
// TODO: Put task into Disk-TaskTable
auto task = TaskConvert(task_ptr);
auto disk_list = ResMgrInst::GetInstance()->GetDiskResources();
if (!disk_list.empty()) {
if (auto disk = disk_list[0].lock()) {
disk->task_table().Put(task);
}
}
}
}
bool
TaskScheduler::TaskWorker() {
while(true) {
// TODO: expected blocking forever
ScheduleTaskPtr task_ptr = task_queue_.Take();
if(task_ptr == nullptr) {
SERVER_LOG_INFO << "Stop db task worker thread";
return true;
}
//execute task
ScheduleTaskPtr next_task = task_ptr->Execute();
if(next_task != nullptr) {
task_queue_.Put(next_task);
}
}
}
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "context/IScheduleContext.h"
#include "task/IScheduleTask.h"
#include "TaskDispatchQueue.h"
#include "utils/BlockingQueue.h"
#include <thread>
namespace zilliz {
namespace milvus {
namespace engine {
class TaskScheduler {
private:
TaskScheduler();
virtual ~TaskScheduler();
public:
static TaskScheduler& GetInstance();
bool Schedule(ScheduleContextPtr context);
private:
bool Start();
bool Stop();
bool TaskDispatchWorker();
bool TaskWorker();
private:
std::shared_ptr<std::thread> task_dispatch_thread_;
std::shared_ptr<std::thread> task_thread_;
TaskDispatchQueue task_dispatch_queue_;
using TaskQueue = server::BlockingQueue<ScheduleTaskPtr>;
TaskQueue task_queue_;
bool stopped_ = true;
};
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "DeleteContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
DeleteContext::DeleteContext(const std::string &table_id, meta::MetaPtr &meta_ptr, uint64_t num_resource)
: IScheduleContext(ScheduleContextType::kDelete),
table_id_(table_id),
meta_ptr_(meta_ptr),
num_resource_(num_resource) {
}
void DeleteContext::WaitAndDelete() {
#ifdef NEW_SCHEDULER
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [&] { return done_resource == num_resource_; });
meta_ptr_->DeleteTableFiles(table_id_);
#endif
}
void DeleteContext::ResourceDone() {
{
std::lock_guard<std::mutex> lock(mutex_);
++done_resource;
}
cv_.notify_one();
}
}
}
}
\ No newline at end of file
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "IScheduleContext.h"
#include "db/meta/Meta.h"
#include <mutex>
#include <condition_variable>
namespace zilliz {
namespace milvus {
namespace engine {
class DeleteContext : public IScheduleContext {
public:
DeleteContext(const std::string& table_id, meta::MetaPtr& meta_ptr, uint64_t num_resource);
std::string table_id() const { return table_id_; }
meta::MetaPtr meta() const { return meta_ptr_; }
void WaitAndDelete();
void ResourceDone();
private:
std::string table_id_;
meta::MetaPtr meta_ptr_;
uint64_t num_resource_;
uint64_t done_resource = 0;
std::mutex mutex_;
std::condition_variable cv_;
};
using DeleteContextPtr = std::shared_ptr<DeleteContext>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
enum class ScheduleContextType {
kUnknown = 0,
kSearch,
kDelete,
};
class IScheduleContext {
public:
IScheduleContext(ScheduleContextType type)
: type_(type) {
}
virtual ~IScheduleContext() = default;
ScheduleContextType type() const { return type_; }
protected:
ScheduleContextType type_;
};
using ScheduleContextPtr = std::shared_ptr<IScheduleContext>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "SearchContext.h"
#include "utils/Log.h"
#include <chrono>
namespace zilliz {
namespace milvus {
namespace engine {
SearchContext::SearchContext(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors)
: IScheduleContext(ScheduleContextType::kSearch),
topk_(topk),
nq_(nq),
nprobe_(nprobe),
vectors_(vectors) {
//use current time to identify this context
std::chrono::system_clock::time_point tp = std::chrono::system_clock::now();
long id = tp.time_since_epoch().count();
identity_ = std::to_string(id);
}
bool
SearchContext::AddIndexFile(TableFileSchemaPtr& index_file) {
std::unique_lock <std::mutex> lock(mtx_);
if(index_file == nullptr || map_index_files_.find(index_file->id_) != map_index_files_.end()) {
return false;
}
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " add index file: " << index_file->id_;
map_index_files_[index_file->id_] = index_file;
return true;
}
void
SearchContext::IndexSearchDone(size_t index_id) {
std::unique_lock <std::mutex> lock(mtx_);
map_index_files_.erase(index_id);
done_cond_.notify_all();
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " finish index file: " << index_id;
}
void
SearchContext::WaitResult() {
std::unique_lock <std::mutex> lock(mtx_);
done_cond_.wait(lock, [this] { return map_index_files_.empty(); });
SERVER_LOG_DEBUG << "SearchContext " << identity_ << " all done";
}
}
}
}
\ No newline at end of file
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "IScheduleContext.h"
#include "db/meta/MetaTypes.h"
#include <unordered_map>
#include <vector>
#include <memory>
#include <condition_variable>
namespace zilliz {
namespace milvus {
namespace engine {
using TableFileSchemaPtr = std::shared_ptr<meta::TableFileSchema>;
class SearchContext : public IScheduleContext {
public:
SearchContext(uint64_t topk, uint64_t nq, uint64_t nprobe, const float* vectors);
bool AddIndexFile(TableFileSchemaPtr& index_file);
uint64_t topk() const { return topk_; }
uint64_t nq() const { return nq_; }
uint64_t nprobe() const { return nprobe_; }
const float* vectors() const { return vectors_; }
using Id2IndexMap = std::unordered_map<size_t, TableFileSchemaPtr>;
const Id2IndexMap& GetIndexMap() const { return map_index_files_; }
using Id2DistanceMap = std::vector<std::pair<int64_t, double>>;
using ResultSet = std::vector<Id2DistanceMap>;
const ResultSet& GetResult() const { return result_; }
ResultSet& GetResult() { return result_; }
const std::string& Identity() const { return identity_; }
const Status& GetStatus() const { return status_; }
Status& GetStatus() { return status_; }
void IndexSearchDone(size_t index_id);
void WaitResult();
void AccumLoadCost(double span) { time_cost_load_ += span; }
void AccumSearchCost(double span) { time_cost_search_ += span; }
void AccumReduceCost(double span) { time_cost_reduce_ += span; }
double LoadCost() const { return time_cost_load_; }
double SearchCost() const { return time_cost_search_; }
double ReduceCost() const { return time_cost_reduce_; }
private:
uint64_t topk_ = 0;
uint64_t nq_ = 0;
uint64_t nprobe_ = 10;
const float* vectors_ = nullptr;
Id2IndexMap map_index_files_;
ResultSet result_;
std::mutex mtx_;
std::condition_variable done_cond_;
std::string identity_; //for debug
Status status_;
double time_cost_load_ = 0.0; //time cost for load all index files, unit: us
double time_cost_search_ = 0.0; //time cost for entire search, unit: us
double time_cost_reduce_ = 0.0; //time cost for entire reduce, unit: us
};
using SearchContextPtr = std::shared_ptr<SearchContext>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
DeleteTask::DeleteTask(const DeleteContextPtr& context)
: IScheduleTask(ScheduleTaskType::kDelete),
context_(context) {
}
std::shared_ptr<IScheduleTask> DeleteTask::Execute() {
return nullptr;
}
}
}
}
\ No newline at end of file
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "IScheduleTask.h"
#include "db/scheduler/context/DeleteContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
class DeleteTask : public IScheduleTask {
public:
DeleteTask(const DeleteContextPtr& context);
virtual std::shared_ptr<IScheduleTask> Execute() override;
public:
DeleteContextPtr context_;
};
using DeleteTaskPtr = std::shared_ptr<DeleteTask>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
namespace zilliz {
namespace milvus {
namespace engine {
enum class ScheduleTaskType {
kUnknown = 0,
kIndexLoad,
kSearch,
kDelete,
};
class IScheduleTask {
public:
IScheduleTask(ScheduleTaskType type)
: type_(type) {
}
virtual ~IScheduleTask() = default;
ScheduleTaskType type() const { return type_; }
virtual std::shared_ptr<IScheduleTask> Execute() = 0;
protected:
ScheduleTaskType type_;
};
using ScheduleTaskPtr = std::shared_ptr<IScheduleTask>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "IndexLoadTask.h"
#include "SearchTask.h"
#include "db/engine/EngineFactory.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include "metrics/Metrics.h"
namespace zilliz {
namespace milvus {
namespace engine {
IndexLoadTask::IndexLoadTask()
: IScheduleTask(ScheduleTaskType::kIndexLoad) {
}
std::shared_ptr<IScheduleTask> IndexLoadTask::Execute() {
return nullptr;
}
}
}
}
\ No newline at end of file
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "IScheduleTask.h"
#include "db/scheduler/context/SearchContext.h"
namespace zilliz {
namespace milvus {
namespace engine {
class IndexLoadTask : public IScheduleTask {
public:
IndexLoadTask();
virtual std::shared_ptr<IScheduleTask> Execute() override;
public:
TableFileSchemaPtr file_;
std::vector<SearchContextPtr> search_contexts_;
};
using IndexLoadTaskPtr = std::shared_ptr<IndexLoadTask>;
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "SearchTask.h"
#include "metrics/Metrics.h"
#include "utils/Log.h"
#include "utils/TimeRecorder.h"
#include <thread>
namespace zilliz {
namespace milvus {
namespace engine {
SearchTask::SearchTask()
: IScheduleTask(ScheduleTaskType::kSearch) {
}
std::shared_ptr<IScheduleTask> SearchTask::Execute() {
return nullptr;
}
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "IScheduleTask.h"
#include "db/scheduler/context/SearchContext.h"
#include "db/engine/ExecutionEngine.h"
namespace zilliz {
namespace milvus {
namespace engine {
class SearchTask : public IScheduleTask {
public:
SearchTask();
virtual std::shared_ptr<IScheduleTask> Execute() override;
public:
size_t index_id_ = 0;
int file_type_ = 0; //for metrics
ExecutionEnginePtr index_engine_;
std::vector<SearchContextPtr> search_contexts_;
};
using SearchTaskPtr = std::shared_ptr<SearchTask>;
}
}
}
\ No newline at end of file
......@@ -73,6 +73,8 @@ private:
ResourceMgrPtr res_mgr_ = nullptr;
};
using JobMgrPtr = std::shared_ptr<JobMgr>;
}
}
}
......@@ -33,6 +33,9 @@ std::mutex ResMgrInst::mutex_;
SchedulerPtr SchedInst::instance = nullptr;
std::mutex SchedInst::mutex_;
scheduler::JobMgrPtr JobMgrInst::instance = nullptr;
std::mutex JobMgrInst::mutex_;
void
load_simple_config() {
server::ConfigNode &config = server::ServerConfig::GetInstance().GetConfig(server::CONFIG_RESOURCE);
......@@ -151,12 +154,14 @@ StartSchedulerService() {
// load_advance_config();
ResMgrInst::GetInstance()->Start();
SchedInst::GetInstance()->Start();
JobMgrInst::GetInstance()->Start();
}
void
StopSchedulerService() {
ResMgrInst::GetInstance()->Stop();
JobMgrInst::GetInstance()->Stop();
SchedInst::GetInstance()->Stop();
ResMgrInst::GetInstance()->Stop();
}
}
}
......
......@@ -19,6 +19,7 @@
#include "ResourceMgr.h"
#include "Scheduler.h"
#include "JobMgr.h"
#include <mutex>
#include <memory>
......@@ -64,6 +65,24 @@ private:
static std::mutex mutex_;
};
class JobMgrInst {
public:
static scheduler::JobMgrPtr
GetInstance() {
if (instance == nullptr) {
std::lock_guard<std::mutex> lock(mutex_);
if (instance == nullptr) {
instance = std::make_shared<scheduler::JobMgr>(ResMgrInst::GetInstance());
}
}
return instance;
}
private:
static scheduler::JobMgrPtr instance;
static std::mutex mutex_;
};
void
StartSchedulerService();
......
......@@ -15,7 +15,9 @@
// specific language governing permissions and limitations
// under the License.
#include <src/scheduler/tasklabel/BroadcastLabel.h>
#include "TaskCreator.h"
#include "tasklabel/DefaultLabel.h"
namespace zilliz {
......@@ -43,6 +45,8 @@ TaskCreator::Create(const SearchJobPtr &job) {
std::vector<TaskPtr> tasks;
for (auto &index_file : job->index_files()) {
auto task = std::make_shared<XSearchTask>(index_file.second);
task->label() = std::make_shared<engine::DefaultLabel>();
task->job_ = job;
tasks.emplace_back(task);
}
......@@ -52,8 +56,10 @@ TaskCreator::Create(const SearchJobPtr &job) {
std::vector<TaskPtr>
TaskCreator::Create(const DeleteJobPtr &job) {
std::vector<TaskPtr> tasks;
// auto task = std::make_shared<XDeleteTask>(job);
// tasks.emplace_back(task);
auto task = std::make_shared<XDeleteTask>(job);
task->label() = std::make_shared<engine::BroadcastLabel>();
task->job_ = job;
tasks.emplace_back(task);
return tasks;
}
......
......@@ -62,6 +62,7 @@ private:
};
using JobPtr = std::shared_ptr<Job>;
using JobWPtr = std::weak_ptr<Job>;
}
}
......
......@@ -28,7 +28,12 @@ SearchJob::SearchJob(zilliz::milvus::scheduler::JobId id,
uint64_t topk,
uint64_t nq,
uint64_t nprobe,
const float *vectors) : Job(id, JobType::SEARCH) {}
const float *vectors)
: Job(id, JobType::SEARCH),
topk_(topk),
nq_(nq),
nprobe_(nprobe),
vectors_(vectors) {}
bool
SearchJob::AddIndexFile(const TableFileSchemaPtr &index_file) {
......@@ -64,6 +69,11 @@ SearchJob::GetResult() {
return result_;
}
Status&
SearchJob::GetStatus() {
return status_;
}
}
}
......
......@@ -59,6 +59,9 @@ public:
ResultSet &
GetResult();
Status &
GetStatus();
public:
uint64_t
topk() const {
......@@ -94,6 +97,7 @@ private:
Id2IndexMap index_files_;
// TODO: column-base better ?
ResultSet result_;
Status status_;
std::mutex mutex_;
std::condition_variable cv_;
......
......@@ -23,8 +23,8 @@ namespace zilliz {
namespace milvus {
namespace engine {
XDeleteTask::XDeleteTask(DeleteContextPtr &delete_context)
: Task(TaskType::DeleteTask), delete_context_ptr_(delete_context) {}
XDeleteTask::XDeleteTask(const scheduler::DeleteJobPtr &delete_job)
: Task(TaskType::DeleteTask), delete_job_(delete_job) {}
void
XDeleteTask::Load(LoadType type, uint8_t device_id) {
......@@ -33,7 +33,7 @@ XDeleteTask::Load(LoadType type, uint8_t device_id) {
void
XDeleteTask::Execute() {
delete_context_ptr_->ResourceDone();
delete_job_->ResourceDone();
}
}
......
......@@ -17,7 +17,7 @@
#pragma once
#include <src/db/scheduler/context/DeleteContext.h>
#include "scheduler/job/DeleteJob.h"
#include "Task.h"
......@@ -28,7 +28,7 @@ namespace engine {
class XDeleteTask : public Task {
public:
explicit
XDeleteTask(DeleteContextPtr &delete_context);
XDeleteTask(const scheduler::DeleteJobPtr &job);
void
Load(LoadType type, uint8_t device_id) override;
......@@ -37,7 +37,7 @@ public:
Execute() override;
public:
DeleteContextPtr delete_context_ptr_;
scheduler::DeleteJobPtr delete_job_;
};
}
......
......@@ -22,6 +22,7 @@
#include "utils/Log.h"
#include <thread>
#include "scheduler/job/SearchJob.h"
namespace zilliz {
......@@ -94,7 +95,7 @@ CollectFileMetrics(int file_type, size_t file_size) {
}
}
XSearchTask::XSearchTask(TableFileSchemaPtr file)
XSearchTask::XSearchTask(meta::TableFileSchemaPtr file)
: Task(TaskType::SearchTask), file_(file) {
if (file_) {
index_engine_ = EngineFactory::Build(file_->dimension_,
......@@ -143,9 +144,10 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
s = Status(SERVER_UNEXPECTED_ERROR, error_msg);
}
for (auto &context : search_contexts_) {
context->IndexSearchDone(file_->id_);//mark as done avoid dead lock, even failed
context->GetStatus() = s;
if (auto job = job_.lock()){
auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
search_job->SearchDone(file_->id_);
search_job->GetStatus() = s;
}
return;
......@@ -156,16 +158,16 @@ XSearchTask::Load(LoadType type, uint8_t device_id) {
std::string info = "Load file id:" + std::to_string(file_->id_) + " file type:" + std::to_string(file_->file_type_)
+ " size:" + std::to_string(file_size) + " bytes from location: " + file_->location_ + " totally cost";
double span = rc.ElapseFromBegin(info);
for (auto &context : search_contexts_) {
context->AccumLoadCost(span);
}
// for (auto &context : search_contexts_) {
// context->AccumLoadCost(span);
// }
CollectFileMetrics(file_->file_type_, file_size);
//step 2: return search task for later execution
index_id_ = file_->id_;
index_type_ = file_->file_type_;
search_contexts_.swap(search_contexts_);
// search_contexts_.swap(search_contexts_);
}
void
......@@ -174,8 +176,8 @@ XSearchTask::Execute() {
return;
}
ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with "
<< search_contexts_.size() << " tasks";
// ENGINE_LOG_DEBUG << "Searching in file id:" << index_id_ << " with "
// << search_contexts_.size() << " tasks";
TimeRecorder rc("DoSearch file id:" + std::to_string(index_id_));
......@@ -183,16 +185,18 @@ XSearchTask::Execute() {
std::vector<long> output_ids;
std::vector<float> output_distance;
for (auto &context : search_contexts_) {
if (auto job = job_.lock()) {
auto search_job = std::static_pointer_cast<scheduler::SearchJob>(job);
//step 1: allocate memory
uint64_t nq = context->nq();
uint64_t topk = context->topk();
uint64_t nprobe = context->nprobe();
const float* vectors = context->vectors();
uint64_t nq = search_job->nq();
uint64_t topk = search_job->topk();
uint64_t nprobe = search_job->nprobe();
const float* vectors = search_job->vectors();
output_ids.resize(topk * nq);
output_distance.resize(topk * nq);
std::string hdr = "context " + context->Identity() +
std::string hdr = "job " + std::to_string(search_job->id()) +
" nq " + std::to_string(nq) +
" topk " + std::to_string(topk);
......@@ -201,30 +205,29 @@ XSearchTask::Execute() {
index_engine_->Search(nq, vectors, topk, nprobe, output_distance.data(), output_ids.data());
double span = rc.RecordSection(hdr + ", do search");
context->AccumSearchCost(span);
// search_job->AccumSearchCost(span);
//step 3: cluster result
SearchContext::ResultSet result_set;
scheduler::ResultSet result_set;
auto spec_k = index_engine_->Count() < topk ? index_engine_->Count() : topk;
XSearchTask::ClusterResult(output_ids, output_distance, nq, spec_k, result_set);
span = rc.RecordSection(hdr + ", cluster result");
context->AccumReduceCost(span);
// search_job->AccumReduceCost(span);
// step 4: pick up topk result
XSearchTask::TopkResult(result_set, topk, metric_l2, context->GetResult());
XSearchTask::TopkResult(result_set, topk, metric_l2, search_job->GetResult());
span = rc.RecordSection(hdr + ", reduce topk");
context->AccumReduceCost(span);
// search_job->AccumReduceCost(span);
} catch (std::exception &ex) {
ENGINE_LOG_ERROR << "SearchTask encounter exception: " << ex.what();
context->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
continue;
// search_job->IndexSearchDone(index_id_);//mark as done avoid dead lock, even search failed
}
//step 5: notify to send result to client
context->IndexSearchDone(index_id_);
search_job->SearchDone(index_id_);
}
rc.ElapseFromBegin("totally cost");
......@@ -237,7 +240,7 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
const std::vector<float> &output_distance,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set) {
scheduler::ResultSet &result_set) {
if (output_ids.size() < nq * topk || output_distance.size() < nq * topk) {
std::string msg = "Invalid id array size: " + std::to_string(output_ids.size()) +
" distance array size: " + std::to_string(output_distance.size());
......@@ -250,7 +253,7 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
std::function<void(size_t, size_t)> reduce_worker = [&](size_t from_index, size_t to_index) {
for (auto i = from_index; i < to_index; i++) {
SearchContext::Id2DistanceMap id_distance;
scheduler::Id2DistanceMap id_distance;
id_distance.reserve(topk);
for (auto k = 0; k < topk; k++) {
uint64_t index = i * topk + k;
......@@ -272,8 +275,8 @@ Status XSearchTask::ClusterResult(const std::vector<long> &output_ids,
return Status::OK();
}
Status XSearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
SearchContext::Id2DistanceMap &distance_target,
Status XSearchTask::MergeResult(scheduler::Id2DistanceMap &distance_src,
scheduler::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending) {
//Note: the score_src and score_target are already arranged by score in ascending order
......@@ -290,7 +293,7 @@ Status XSearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
size_t src_count = distance_src.size();
size_t target_count = distance_target.size();
SearchContext::Id2DistanceMap distance_merged;
scheduler::Id2DistanceMap distance_merged;
distance_merged.reserve(topk);
size_t src_index = 0, target_index = 0;
while (true) {
......@@ -346,10 +349,10 @@ Status XSearchTask::MergeResult(SearchContext::Id2DistanceMap &distance_src,
return Status::OK();
}
Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
Status XSearchTask::TopkResult(scheduler::ResultSet &result_src,
uint64_t topk,
bool ascending,
SearchContext::ResultSet &result_target) {
scheduler::ResultSet &result_target) {
if (result_target.empty()) {
result_target.swap(result_src);
return Status::OK();
......@@ -363,8 +366,8 @@ Status XSearchTask::TopkResult(SearchContext::ResultSet &result_src,
std::function<void(size_t, size_t)> ReduceWorker = [&](size_t from_index, size_t to_index) {
for (size_t i = from_index; i < to_index; i++) {
SearchContext::Id2DistanceMap &score_src = result_src[i];
SearchContext::Id2DistanceMap &score_target = result_target[i];
scheduler::Id2DistanceMap &score_src = result_src[i];
scheduler::Id2DistanceMap &score_target = result_target[i];
XSearchTask::MergeResult(score_src, score_target, topk, ascending);
}
};
......
......@@ -18,6 +18,8 @@
#pragma once
#include "Task.h"
#include "db/meta/MetaTypes.h"
#include "scheduler/job/SearchJob.h"
namespace zilliz {
......@@ -28,7 +30,7 @@ namespace engine {
class XSearchTask : public Task {
public:
explicit
XSearchTask(TableFileSchemaPtr file);
XSearchTask(meta::TableFileSchemaPtr file);
void
Load(LoadType type, uint8_t device_id) override;
......@@ -41,20 +43,20 @@ public:
const std::vector<float> &output_distence,
uint64_t nq,
uint64_t topk,
SearchContext::ResultSet &result_set);
scheduler::ResultSet &result_set);
static Status MergeResult(SearchContext::Id2DistanceMap &distance_src,
SearchContext::Id2DistanceMap &distance_target,
static Status MergeResult(scheduler::Id2DistanceMap &distance_src,
scheduler::Id2DistanceMap &distance_target,
uint64_t topk,
bool ascending);
static Status TopkResult(SearchContext::ResultSet &result_src,
static Status TopkResult(scheduler::ResultSet &result_src,
uint64_t topk,
bool ascending,
SearchContext::ResultSet &result_target);
scheduler::ResultSet &result_target);
public:
TableFileSchemaPtr file_;
meta::TableFileSchemaPtr file_;
size_t index_id_ = 0;
int index_type_ = 0;
......
......@@ -17,9 +17,9 @@
#pragma once
#include "db/scheduler/context/SearchContext.h"
#include "db/scheduler/task/IScheduleTask.h"
#include "scheduler/tasklabel/TaskLabel.h"
#include "scheduler/job/Job.h"
#include "utils/Status.h"
#include "Path.h"
#include <string>
......@@ -84,7 +84,8 @@ public:
public:
Path task_path_;
std::vector<SearchContextPtr> search_contexts_;
// std::vector<SearchContextPtr> search_contexts_;
scheduler::JobWPtr job_;
TaskType type_;
TaskLabelPtr label_ = nullptr;
};
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "TaskConvert.h"
#include "scheduler/tasklabel/DefaultLabel.h"
#include "scheduler/tasklabel/BroadcastLabel.h"
namespace zilliz {
namespace milvus {
namespace engine {
TaskPtr
TaskConvert(const ScheduleTaskPtr &schedule_task) {
switch (schedule_task->type()) {
case ScheduleTaskType::kIndexLoad: {
auto load_task = std::static_pointer_cast<IndexLoadTask>(schedule_task);
auto task = std::make_shared<XSearchTask>(load_task->file_);
task->label() = std::make_shared<DefaultLabel>();
task->search_contexts_ = load_task->search_contexts_;
return task;
}
case ScheduleTaskType::kDelete: {
auto delete_task = std::static_pointer_cast<DeleteTask>(schedule_task);
auto task = std::make_shared<XDeleteTask>(delete_task->context_);
task->label() = std::make_shared<BroadcastLabel>();
return task;
}
default: {
// TODO: unexpected !!!
return nullptr;
}
}
}
}
}
}
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "db/scheduler/task/DeleteTask.h"
#include "db/scheduler/task/IndexLoadTask.h"
#include "Task.h"
#include "SearchTask.h"
#include "DeleteTask.h"
namespace zilliz {
namespace milvus {
namespace engine {
TaskPtr
TaskConvert(const ScheduleTaskPtr &schedule_task);
}
}
}
......@@ -25,7 +25,7 @@ namespace milvus {
namespace engine {
TestTask::TestTask(TableFileSchemaPtr &file) : XSearchTask(file) {}
TestTask::TestTask(meta::TableFileSchemaPtr &file) : XSearchTask(file) {}
void
TestTask::Load(LoadType type, uint8_t device_id) {
......
......@@ -26,7 +26,8 @@ namespace engine {
class TestTask : public XSearchTask {
public:
TestTask(TableFileSchemaPtr& file);
explicit
TestTask(meta::TableFileSchemaPtr& file);
public:
void
......
......@@ -17,11 +17,13 @@
# under the License.
#-------------------------------------------------------------------------------
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} test_files)
include_directories("${CUDA_TOOLKIT_ROOT_DIR}/include")
link_directories("${CUDA_TOOLKIT_ROOT_DIR}/lib64")
set(db_test_files
${common_files}
${test_files}
......
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <thread>
#include "utils/easylogging++.h"
#include <boost/filesystem.hpp>
#include "db/scheduler/TaskScheduler.h"
#include "db/scheduler/TaskDispatchStrategy.h"
#include "db/scheduler/TaskDispatchQueue.h"
#include "db/scheduler/task/SearchTask.h"
#include "db/scheduler/task/DeleteTask.h"
#include "db/scheduler/task/IndexLoadTask.h"
using namespace zilliz::milvus;
namespace {
engine::TableFileSchemaPtr CreateTabileFileStruct(size_t id, const std::string& table_id) {
auto file = std::make_shared<engine::meta::TableFileSchema>();
file->id_ = id;
file->table_id_ = table_id;
return file;
}
}
TEST(DBSchedulerTest, TASK_QUEUE_TEST) {
engine::TaskDispatchQueue queue;
queue.SetCapacity(1000);
queue.Put(nullptr);
ASSERT_EQ(queue.Size(), 1UL);
auto ptr = queue.Take();
ASSERT_EQ(ptr, nullptr);
ASSERT_TRUE(queue.Empty());
engine::SearchContextPtr context_ptr = std::make_shared<engine::SearchContext>(1, 1, 10, nullptr);
for(size_t i = 0; i < 10; i++) {
auto file = CreateTabileFileStruct(i, "tbl");
context_ptr->AddIndexFile(file);
}
queue.Put(context_ptr);
ASSERT_EQ(queue.Size(), 10);
auto index_files = context_ptr->GetIndexMap();
ptr = queue.Front();
ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
engine::IndexLoadTaskPtr load_task = std::static_pointer_cast<engine::IndexLoadTask>(ptr);
ASSERT_EQ(load_task->file_->id_, index_files.begin()->first);
ptr = queue.Back();
ASSERT_EQ(ptr->type(), engine::ScheduleTaskType::kIndexLoad);
load_task->Execute();
}
TEST(DBSchedulerTest, SEARCH_SCHEDULER_TEST) {
std::list<engine::ScheduleTaskPtr> task_list;
bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list);
ASSERT_FALSE(ret);
for(size_t i = 10; i < 30; i++) {
engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
task_ptr->file_ = CreateTabileFileStruct(i, "tbl");
task_list.push_back(task_ptr);
}
engine::SearchContextPtr context_ptr = std::make_shared<engine::SearchContext>(1, 1, 10, nullptr);
for(size_t i = 0; i < 20; i++) {
auto file = CreateTabileFileStruct(i, "tbl");
context_ptr->AddIndexFile(file);
}
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 30);
}
TEST(DBSchedulerTest, DELETE_SCHEDULER_TEST) {
std::list<engine::ScheduleTaskPtr> task_list;
bool ret = engine::TaskDispatchStrategy::Schedule(nullptr, task_list);
ASSERT_FALSE(ret);
const std::string table_id = "to_delete_table";
for(size_t i = 0; i < 10; i++) {
engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
task_ptr->file_ = CreateTabileFileStruct(i, table_id);
task_list.push_back(task_ptr);
}
for(size_t i = 0; i < 10; i++) {
engine::IndexLoadTaskPtr task_ptr = std::make_shared<engine::IndexLoadTask>();
task_ptr->file_ = CreateTabileFileStruct(i, "other_table");
task_list.push_back(task_ptr);
}
engine::meta::MetaPtr meta_ptr;
engine::DeleteContextPtr context_ptr = std::make_shared<engine::DeleteContext>(table_id, meta_ptr, 0);
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 21);
auto temp_list = task_list;
for(size_t i = 0; ; i++) {
engine::ScheduleTaskPtr task_ptr = temp_list.front();
temp_list.pop_front();
if(task_ptr->type() == engine::ScheduleTaskType::kDelete) {
ASSERT_EQ(i, 10);
break;
}
}
context_ptr = std::make_shared<engine::DeleteContext>("no_task_table", meta_ptr, 0);
ret = engine::TaskDispatchStrategy::Schedule(context_ptr, task_list);
ASSERT_TRUE(ret);
ASSERT_EQ(task_list.size(), 22);
engine::ScheduleTaskPtr task_ptr = task_list.front();
ASSERT_EQ(task_ptr->type(), engine::ScheduleTaskType::kDelete);
}
......@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
#include "db/scheduler/task/SearchTask.h"
#include "server/ServerConfig.h"
#include "utils/TimeRecorder.h"
......@@ -50,9 +49,9 @@ void BuildResult(uint64_t nq,
}
}
void CheckResult(const engine::SearchContext::Id2DistanceMap& src_1,
const engine::SearchContext::Id2DistanceMap& src_2,
const engine::SearchContext::Id2DistanceMap& target,
void CheckResult(const scheduler::Id2DistanceMap& src_1,
const scheduler::Id2DistanceMap& src_2,
const scheduler::Id2DistanceMap& target,
bool ascending) {
for(uint64_t i = 0; i < target.size() - 1; i++) {
if(ascending) {
......@@ -81,7 +80,7 @@ void CheckResult(const engine::SearchContext::Id2DistanceMap& src_1,
void CheckCluster(const std::vector<long>& target_ids,
const std::vector<float>& target_distence,
const engine::SearchContext::ResultSet& src_result,
const scheduler::ResultSet& src_result,
int64_t nq,
int64_t topk) {
ASSERT_EQ(src_result.size(), nq);
......@@ -98,7 +97,7 @@ void CheckCluster(const std::vector<long>& target_ids,
}
}
void CheckTopkResult(const engine::SearchContext::ResultSet& src_result,
void CheckTopkResult(const scheduler::ResultSet& src_result,
bool ascending,
int64_t nq,
int64_t topk) {
......@@ -127,7 +126,7 @@ TEST(DBSearchTest, TOPK_TEST) {
bool ascending = true;
std::vector<long> target_ids;
std::vector<float> target_distence;
engine::SearchContext::ResultSet src_result;
scheduler::ResultSet src_result;
auto status = engine::XSearchTask::ClusterResult(target_ids, target_distence, NQ, TOP_K, src_result);
ASSERT_FALSE(status.ok());
ASSERT_TRUE(src_result.empty());
......@@ -137,7 +136,7 @@ TEST(DBSearchTest, TOPK_TEST) {
ASSERT_TRUE(status.ok());
ASSERT_EQ(src_result.size(), NQ);
engine::SearchContext::ResultSet target_result;
scheduler::ResultSet target_result;
status = engine::XSearchTask::TopkResult(target_result, TOP_K, ascending, target_result);
ASSERT_TRUE(status.ok());
......@@ -179,7 +178,7 @@ TEST(DBSearchTest, MERGE_TEST) {
std::vector<float> target_distence;
std::vector<long> src_ids;
std::vector<float> src_distence;
engine::SearchContext::ResultSet src_result, target_result;
scheduler::ResultSet src_result, target_result;
uint64_t src_count = 5, target_count = 8;
BuildResult(1, src_count, ascending, src_ids, src_distence);
......@@ -190,8 +189,8 @@ TEST(DBSearchTest, MERGE_TEST) {
ASSERT_TRUE(status.ok());
{
engine::SearchContext::Id2DistanceMap src = src_result[0];
engine::SearchContext::Id2DistanceMap target = target_result[0];
scheduler::Id2DistanceMap src = src_result[0];
scheduler::Id2DistanceMap target = target_result[0];
status = engine::XSearchTask::MergeResult(src, target, 10, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), 10);
......@@ -199,8 +198,8 @@ TEST(DBSearchTest, MERGE_TEST) {
}
{
engine::SearchContext::Id2DistanceMap src = src_result[0];
engine::SearchContext::Id2DistanceMap target;
scheduler::Id2DistanceMap src = src_result[0];
scheduler::Id2DistanceMap target;
status = engine::XSearchTask::MergeResult(src, target, 10, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), src_count);
......@@ -209,8 +208,8 @@ TEST(DBSearchTest, MERGE_TEST) {
}
{
engine::SearchContext::Id2DistanceMap src = src_result[0];
engine::SearchContext::Id2DistanceMap target = target_result[0];
scheduler::Id2DistanceMap src = src_result[0];
scheduler::Id2DistanceMap target = target_result[0];
status = engine::XSearchTask::MergeResult(src, target, 30, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), src_count + target_count);
......@@ -218,8 +217,8 @@ TEST(DBSearchTest, MERGE_TEST) {
}
{
engine::SearchContext::Id2DistanceMap target = src_result[0];
engine::SearchContext::Id2DistanceMap src = target_result[0];
scheduler::Id2DistanceMap target = src_result[0];
scheduler::Id2DistanceMap src = target_result[0];
status = engine::XSearchTask::MergeResult(src, target, 30, ascending);
ASSERT_TRUE(status.ok());
ASSERT_EQ(target.size(), src_count + target_count);
......@@ -235,7 +234,7 @@ TEST(DBSearchTest, PARALLEL_CLUSTER_TEST) {
bool ascending = true;
std::vector<long> target_ids;
std::vector<float> target_distence;
engine::SearchContext::ResultSet src_result;
scheduler::ResultSet src_result;
auto DoCluster = [&](int64_t nq, int64_t topk) {
TimeRecorder rc("DoCluster");
......@@ -270,11 +269,11 @@ TEST(DBSearchTest, PARALLEL_TOPK_TEST) {
std::vector<long> target_ids;
std::vector<float> target_distence;
engine::SearchContext::ResultSet src_result;
scheduler::ResultSet src_result;
std::vector<long> insufficient_ids;
std::vector<float> insufficient_distence;
engine::SearchContext::ResultSet insufficient_result;
scheduler::ResultSet insufficient_result;
auto DoTopk = [&](int64_t nq, int64_t topk,int64_t insufficient_topk, bool ascending) {
src_result.clear();
......
......@@ -80,7 +80,7 @@ void DBTest::SetUp() {
auto res_mgr = engine::ResMgrInst::GetInstance();
res_mgr->Clear();
res_mgr->Add(engine::ResourceFactory::Create("disk", "DISK", 0, true, false));
res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, true));
res_mgr->Add(engine::ResourceFactory::Create("cpu", "CPU", 0, true, false));
res_mgr->Add(engine::ResourceFactory::Create("gtx1660", "GPU", 0, true, true));
auto default_conn = engine::Connection("IO", 500.0);
......@@ -90,6 +90,8 @@ void DBTest::SetUp() {
res_mgr->Start();
engine::SchedInst::GetInstance()->Start();
engine::JobMgrInst::GetInstance()->Start();
auto options = GetOptions();
db_ = engine::DBFactory::Build(options);
}
......@@ -100,8 +102,9 @@ void DBTest::TearDown() {
BaseTest::TearDown();
engine::ResMgrInst::GetInstance()->Stop();
engine::JobMgrInst::GetInstance()->Stop();
engine::SchedInst::GetInstance()->Stop();
engine::ResMgrInst::GetInstance()->Stop();
auto options = GetOptions();
boost::filesystem::remove_all(options.meta.path);
......
......@@ -46,7 +46,7 @@ TEST(NormalTest, INST_TEST) {
const uint64_t NUM_TASK = 1000;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
auto disks = res_mgr->GetDiskResources();
ASSERT_FALSE(disks.empty());
......
......@@ -187,7 +187,7 @@ TEST_F(ResourceMgrAdvanceTest, REGISTER_SUBSCRIBER) {
flag = true;
};
mgr1_->RegisterSubscriber(callback);
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
disk_res->task_table().Put(std::make_shared<TestTask>(dummy));
sleep(1);
ASSERT_TRUE(flag);
......
......@@ -180,7 +180,7 @@ protected:
TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
......@@ -205,7 +205,7 @@ TEST_F(ResourceAdvanceTest, DISK_RESOURCE_TEST) {
TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
......@@ -230,7 +230,7 @@ TEST_F(ResourceAdvanceTest, CPU_RESOURCE_TEST) {
TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
......@@ -255,7 +255,7 @@ TEST_F(ResourceAdvanceTest, GPU_RESOURCE_TEST) {
TEST_F(ResourceAdvanceTest, TEST_RESOURCE_TEST) {
const uint64_t NUM = 100;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < NUM; ++i) {
auto task = std::make_shared<TestTask>(dummy);
tasks.push_back(task);
......
......@@ -157,7 +157,7 @@ insert_dummy_index_into_gpu_cache(uint64_t device_id) {
TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
meta::TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
insert_dummy_index_into_gpu_cache(1);
......@@ -177,7 +177,7 @@ TEST_F(SchedulerTest, ON_LOAD_COMPLETED) {
TEST_F(SchedulerTest, PUSH_TASK_TO_NEIGHBOUR_RANDOMLY_TEST) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy1 = std::make_shared<meta::TableFileSchema>();
meta::TableFileSchemaPtr dummy1 = std::make_shared<meta::TableFileSchema>();
dummy1->location_ = "location";
tasks.clear();
......@@ -248,7 +248,7 @@ protected:
TEST_F(SchedulerTest2, SPECIFIED_RESOURCE_TEST) {
const uint64_t NUM = 10;
std::vector<std::shared_ptr<TestTask>> tasks;
TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
meta::TableFileSchemaPtr dummy = std::make_shared<meta::TableFileSchema>();
dummy->location_ = "location";
for (uint64_t i = 0; i < NUM; ++i) {
......
......@@ -169,7 +169,7 @@ class TaskTableBaseTest : public ::testing::Test {
protected:
void
SetUp() override {
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
invalid_task_ = nullptr;
task1_ = std::make_shared<TestTask>(dummy);
task2_ = std::make_shared<TestTask>(dummy);
......@@ -339,7 +339,7 @@ class TaskTableAdvanceTest : public ::testing::Test {
protected:
void
SetUp() override {
TableFileSchemaPtr dummy = nullptr;
meta::TableFileSchemaPtr dummy = nullptr;
for (uint64_t i = 0; i < 8; ++i) {
auto task = std::make_shared<TestTask>(dummy);
table1_.Put(task);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册