TaskDispatchStrategy.cpp 5.0 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "TaskDispatchStrategy.h"
#include "context/SearchContext.h"
#include "context/DeleteContext.h"
#include "task/IndexLoadTask.h"
#include "task/DeleteTask.h"
#include "cache/CpuCacheMgr.h"
#include "utils/Error.h"
#include "db/Log.h"

namespace zilliz {
namespace milvus {
namespace engine {

class ReuseCacheIndexStrategy {
public:
    bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
        if(context == nullptr) {
J
jinhai 已提交
23
            ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
G
groot 已提交
24 25 26 27 28 29 30 31 32 33 34 35
            return false;
        }

        SearchContext::Id2IndexMap index_files = context->GetIndexMap();
        //some index loader alread exists
        for(auto& task : task_list) {
            if(task->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
            if(index_files.find(loader->file_->id_) != index_files.end()){
36
                ENGINE_LOG_DEBUG << "Append SearchContext to exist IndexLoaderContext";
G
groot 已提交
37 38 39 40 41 42 43
                index_files.erase(loader->file_->id_);
                loader->search_contexts_.push_back(context);
            }
        }

        //index_files still contains some index files, create new loader
        for(auto& pair : index_files) {
44
            ENGINE_LOG_DEBUG << "Create new IndexLoaderContext for: " << pair.second->location_;
G
groot 已提交
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
            IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
            new_loader->search_contexts_.push_back(context);
            new_loader->file_ = pair.second;

            auto index  = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
            if(index != nullptr) {
                //if the index file has been in memory, increase its priority
                task_list.push_front(new_loader);
            } else {
                //index file not in memory, put it to tail
                task_list.push_back(new_loader);
            }
        }

        return true;
    }
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteTableStrategy {
public:
    bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
        if (context == nullptr) {
J
jinhai 已提交
68
            ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
G
groot 已提交
69 70 71 72 73 74 75 76 77 78
            return false;
        }

        DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
        if(task_list.empty()) {
            task_list.push_back(delete_task);
            return true;
        }

        std::string table_id = context->table_id();
G
groot 已提交
79 80 81 82 83

        //put delete task to proper position
        //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1
        //if user want to delete table1, the DeleteTask will be insert into No.6 position
        for(std::list<ScheduleTaskPtr>::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) {
G
groot 已提交
84 85 86 87 88
            if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
G
groot 已提交
89 90
            if(loader->file_->table_id_ != table_id) {
                continue;
G
groot 已提交
91
            }
G
groot 已提交
92 93 94

            task_list.insert(iter.base(), delete_task);
            return true;
G
groot 已提交
95 96
        }

G
groot 已提交
97 98
        //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap
        task_list.push_front(delete_task);
G
groot 已提交
99 100 101 102 103 104 105 106 107
        return true;
    }
};


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
                                    std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
    if(context_ptr == nullptr) {
J
jinhai 已提交
108
        ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
G
groot 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
        return false;
    }

    switch(context_ptr->type()) {
        case ScheduleContextType::kSearch: {
            SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
            ReuseCacheIndexStrategy strategy;
            return strategy.Schedule(search_context, task_list);
        }
        case ScheduleContextType::kDelete: {
            DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
            DeleteTableStrategy strategy;
            return strategy.Schedule(delete_context, task_list);
        }
        default:
            ENGINE_LOG_ERROR << "Invalid schedule task type";
            return false;
    }
}

}
}
}