TaskDispatchStrategy.cpp 5.4 KB
Newer Older
J
jinhai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

G
groot 已提交
18 19 20 21 22 23 24
#include "TaskDispatchStrategy.h"
#include "context/SearchContext.h"
#include "context/DeleteContext.h"
#include "task/IndexLoadTask.h"
#include "task/DeleteTask.h"
#include "cache/CpuCacheMgr.h"
#include "utils/Error.h"
G
groot 已提交
25
#include "utils/Log.h"
G
groot 已提交
26 27 28 29 30 31 32 33 34

namespace zilliz {
namespace milvus {
namespace engine {

class ReuseCacheIndexStrategy {
public:
    bool Schedule(const SearchContextPtr &context, std::list<ScheduleTaskPtr>& task_list) {
        if(context == nullptr) {
J
jinhai 已提交
35
            ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
G
groot 已提交
36 37 38 39 40 41 42 43 44 45 46 47
            return false;
        }

        SearchContext::Id2IndexMap index_files = context->GetIndexMap();
        //some index loader alread exists
        for(auto& task : task_list) {
            if(task->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(task);
            if(index_files.find(loader->file_->id_) != index_files.end()){
G
groot 已提交
48
                ENGINE_LOG_DEBUG << "Append SearchContext to exist IndexLoaderContext";
G
groot 已提交
49 50 51 52 53 54 55
                index_files.erase(loader->file_->id_);
                loader->search_contexts_.push_back(context);
            }
        }

        //index_files still contains some index files, create new loader
        for(auto& pair : index_files) {
G
groot 已提交
56
            ENGINE_LOG_DEBUG << "Create new IndexLoaderContext for: " << pair.second->location_;
G
groot 已提交
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
            IndexLoadTaskPtr new_loader = std::make_shared<IndexLoadTask>();
            new_loader->search_contexts_.push_back(context);
            new_loader->file_ = pair.second;

            auto index  = zilliz::milvus::cache::CpuCacheMgr::GetInstance()->GetIndex(pair.second->location_);
            if(index != nullptr) {
                //if the index file has been in memory, increase its priority
                task_list.push_front(new_loader);
            } else {
                //index file not in memory, put it to tail
                task_list.push_back(new_loader);
            }
        }

        return true;
    }
};

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class DeleteTableStrategy {
public:
    bool Schedule(const DeleteContextPtr &context, std::list<ScheduleTaskPtr> &task_list) {
        if (context == nullptr) {
J
jinhai 已提交
80
            ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
G
groot 已提交
81 82 83 84 85 86 87 88 89 90
            return false;
        }

        DeleteTaskPtr delete_task = std::make_shared<DeleteTask>(context);
        if(task_list.empty()) {
            task_list.push_back(delete_task);
            return true;
        }

        std::string table_id = context->table_id();
G
groot 已提交
91 92 93 94 95

        //put delete task to proper position
        //for example: task_list has 10 IndexLoadTask, only the No.5 IndexLoadTask is for table1
        //if user want to delete table1, the DeleteTask will be insert into No.6 position
        for(std::list<ScheduleTaskPtr>::reverse_iterator iter = task_list.rbegin(); iter != task_list.rend(); ++iter) {
G
groot 已提交
96 97 98 99 100
            if((*iter)->type() != ScheduleTaskType::kIndexLoad) {
                continue;
            }

            IndexLoadTaskPtr loader = std::static_pointer_cast<IndexLoadTask>(*iter);
G
groot 已提交
101 102
            if(loader->file_->table_id_ != table_id) {
                continue;
G
groot 已提交
103
            }
G
groot 已提交
104 105 106

            task_list.insert(iter.base(), delete_task);
            return true;
G
groot 已提交
107 108
        }

G
groot 已提交
109 110
        //no task is searching this table, put DeleteTask to front of list so that the table will be delete asap
        task_list.push_front(delete_task);
G
groot 已提交
111 112 113 114 115 116 117 118 119
        return true;
    }
};


////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool TaskDispatchStrategy::Schedule(const ScheduleContextPtr &context_ptr,
                                    std::list<zilliz::milvus::engine::ScheduleTaskPtr> &task_list) {
    if(context_ptr == nullptr) {
J
jinhai 已提交
120
        ENGINE_LOG_ERROR << "Task Dispatch context doesn't exist";
G
groot 已提交
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
        return false;
    }

    switch(context_ptr->type()) {
        case ScheduleContextType::kSearch: {
            SearchContextPtr search_context = std::static_pointer_cast<SearchContext>(context_ptr);
            ReuseCacheIndexStrategy strategy;
            return strategy.Schedule(search_context, task_list);
        }
        case ScheduleContextType::kDelete: {
            DeleteContextPtr delete_context = std::static_pointer_cast<DeleteContext>(context_ptr);
            DeleteTableStrategy strategy;
            return strategy.Schedule(delete_context, task_list);
        }
        default:
            ENGINE_LOG_ERROR << "Invalid schedule task type";
            return false;
    }
}

}
}
}