TaskDispatchStrategy.cpp 4.4 KB
Newer Older
G
groot 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
/*******************************************************************************
 * Copyright 上海赜睿信息科技有限公司(Zilliz) - All Rights Reserved
 * Unauthorized copying of this file, via any medium is strictly prohibited.
 * Proprietary and confidential.
 ******************************************************************************/
#include "TaskDispatchStrategy.h"
#include "context/SearchContext.h"
#include "context/DeleteContext.h"
#include "task/IndexLoadTask.h"
#include "task/DeleteTask.h"
#include "cache/CpuCacheMgr.h"
#include "utils/Error.h"
#include "db/Log.h"

namespace zilliz {
namespace milvus {
namespace engine {

class ReuseCacheIndexStrategy {
public:
    bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
        if(context == nullptr) {
            return false;
        }

        SearchContext::Id2IndexMap index_files = context->GetIndexMap();
        //some index loader alread exists
        for(auto& task : task_list) {
            if(task->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
            if(index_files.find(loader->file_->id_) != index_files.end()){
                ENGINE_LOG_INFO << "Append SearchContext to exist IndexLoaderContext";
                index_files.erase(loader->file_->id_);
                loader->search_contexts_.push_back(context);
            }
        }

        //index_files still contains some index files, create new loader
        for(auto& pair : index_files) {
            ENGINE_LOG_INFO << "Create new IndexLoaderContext for: " << pair.second->location_;
            IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
            new_loader->search_contexts_.push_back(context);
            new_loader->file_ = pair.second;

            auto index  = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
            if(index != nullptr) {
                //if the index file has been in memory, increase its priority
                task_list.push_front(new_loader);
            } else {
                //index file not in memory, put it to tail
                task_list.push_back(new_loader);
            }
        }

        return true;
    }
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteTableStrategy {
public:
    bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
        if (context == nullptr) {
            return false;
        }

        DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
        if(task_list.empty()) {
            task_list.push_back(delete_task);
            return true;
        }

        std::string table_id = context->table_id();
        for(auto iter = task_list.begin(); iter != task_list.end(); ++iter) {
            if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            //put delete task to proper position
            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
            if(loader->file_->table_id_ == table_id) {

                task_list.insert(++iter, delete_task);
                break;
            }
        }

        return true;
    }
};


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
                                    std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
    if(context_ptr == nullptr) {
        return false;
    }

    switch(context_ptr->type()) {
        case ScheduleContextType::kSearch: {
            SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
            ReuseCacheIndexStrategy strategy;
            return strategy.Schedule(search_context, task_list);
        }
        case ScheduleContextType::kDelete: {
            DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
            DeleteTableStrategy strategy;
            return strategy.Schedule(delete_context, task_list);
        }
        default:
            ENGINE_LOG_ERROR << "Invalid schedule task type";
            return false;
    }
}

}
}
}