/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.escheduler.common.queue;


import cn.escheduler.common.Constants;
import cn.escheduler.common.utils.Bytes;
import cn.escheduler.common.utils.IpUtils;
import cn.escheduler.common.utils.OSUtils;
import cn.escheduler.common.zk.AbstractZKClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * A singleton task queue implementation backed by ZooKeeper.
 */
public class TaskQueueZkImpl extends AbstractZKClient implements ITaskQueue {

    private static final Logger logger = LoggerFactory.getLogger(TaskQueueZkImpl.class);

    /**
     * Lazily created singleton. Declared {@code volatile} so that the unsynchronized
     * first read in {@link #getInstance()} can never observe a partially constructed object.
     */
    private static volatile TaskQueueZkImpl instance;

    /**
     * Private constructor: instances are only handed out through {@link #getInstance()}.
     * Runs {@code init()} as part of construction.
     */
    private TaskQueueZkImpl(){
        init();
    }

    /**
     * Return the shared instance, creating it on first use.
     *
     * @return the single {@code TaskQueueZkImpl} instance
     */
    public static TaskQueueZkImpl getInstance(){
        // read the volatile field once on the fast path
        TaskQueueZkImpl current = instance;
        if (current == null) {
            synchronized (TaskQueueZkImpl.class) {
                // re-check under the lock: another thread may have created it meanwhile
                current = instance;
                if (current == null) {
                    current = new TaskQueueZkImpl();
                    instance = current;
                }
            }
        }
        return current;
    }


    /**
     * List the names of all child nodes stored under the given task queue.
     *
     * @param key   task queue name
     * @return the child node names; an empty list if the lookup throws
     */
    @Override
    public List<String> getAllTasks(String key) {
        List<String> tasks;
        try {
            tasks = getZkClient().getChildren().forPath(getTasksPath(key));
        } catch (Exception e) {
            logger.error("get all tasks from tasks queue exception",e);
            // failures are reported as "no tasks"
            tasks = new ArrayList<String>();
        }
        return tasks;
    }

    /**
     * Check whether a task node exists in the task queue.
     *
     * @param key       queue name
     * @param task      ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
     * @return true if the node exists; false if it does not exist or the check throws
     */
    @Override
    public boolean checkTaskExists(String key, String task) {
        String taskPath = getTasksPath(key) + Constants.SINGLE_SLASH + task;

        try {
            Stat stat = getZkClient().checkExists().forPath(taskPath);

            if (null == stat) {
                logger.info("check task:{} not exist in task queue",task);
                return false;
            }

            logger.info("check task {} exists in task queue ",task);
            return true;

        } catch (Exception e) {
            // SLF4J placeholder + trailing throwable: the task id is substituted and the stack trace kept
            logger.error("task {} check exists in task queue exception", task, e);
        }

        return false;
    }


    /**
     * Add a task to the task queue by creating a persistent node named after the value.
     * The node data is the byte form of the same value. Failures are logged, not thrown.
     *
     * @param key      task queue name
     * @param value    ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
     */
    @Override
    public void add(String key, String value) {
        try {
            final String nodePath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
            final CuratorFramework client = getZkClient();
            final String created = client.create()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(nodePath, Bytes.toBytes(value));

            logger.info("add task : {} to tasks queue , result success",created);
        } catch (Exception e) {
            logger.error("add task to tasks queue exception",e);
        }

    }


    /**
     * An element pops out of the queue <p>
     * note:
130
     *   ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}_host1,host2,...
L
ligang 已提交
131 132
     *   The tasks with the highest priority are selected by comparing the priorities of the above four levels from high to low.
     *
133
     *   流程优先级_流程实例id_任务优先级_任务id_任务执行的机器id1,任务执行的机器id2,...       high <- low
L
ligang 已提交
134
     * @param  key  task queue name
135 136
     * @param  tasksNum    how many elements to poll
     * @return the task ids  to be executed
L
ligang 已提交
137 138
     */
    @Override
139
    public List<String> poll(String key, int tasksNum) {
L
ligang 已提交
140 141 142 143 144 145 146
        try{
            CuratorFramework zk = getZkClient();
            String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
            List<String> list = zk.getChildren().forPath(getTasksPath(key));

            if(list != null && list.size() > 0){

147 148 149
                String workerIp = OSUtils.getHost();
                String workerIpLongStr = String.valueOf(IpUtils.ipToLong(workerIp));

L
ligang 已提交
150 151
                int size = list.size();

152 153 154

                Set<String> taskTreeSet = new TreeSet<>();

L
ligang 已提交
155
                for (int i = 0; i < size; i++) {
156

L
ligang 已提交
157 158 159
                    String taskDetail = list.get(i);
                    String[] taskDetailArrs = taskDetail.split(Constants.UNDERLINE);

160 161 162
                    //向前版本兼容
                    if(taskDetailArrs.length >= 4){

L
ligang 已提交
163 164
                        //format ${processInstancePriority}_${processInstanceId}_${taskInstancePriority}_${taskId}
                        String formatTask = String.format("%s_%010d_%s_%010d", taskDetailArrs[0], Long.parseLong(taskDetailArrs[1]), taskDetailArrs[2], Long.parseLong(taskDetailArrs[3]));
165 166 167 168 169 170 171 172 173 174
                        if(taskDetailArrs.length > 4){
                            String taskHosts = taskDetailArrs[4];

                            //task can assign to any worker host if equals default ip value of worker server
                            if(!taskHosts.equals(Constants.DEFAULT_WORKER_ID)){
                                String[] taskHostsArr = taskHosts.split(Constants.COMMA);

                                if(!Arrays.asList(taskHostsArr).contains(workerIpLongStr)){
                                    continue;
                                }
L
ligang 已提交
175 176
                            }
                        }
177 178 179

                        taskTreeSet.add(formatTask);

L
ligang 已提交
180 181
                    }

182
                }
L
ligang 已提交
183

184
                List<String> taskslist = getTasksListFromTreeSet(tasksNum, taskTreeSet);
L
ligang 已提交
185

186
                logger.info("consume tasks: {},there still have {} tasks need to be executed", Arrays.toString(taskslist.toArray()), size - taskslist.size());
L
ligang 已提交
187

188 189 190
                return taskslist;
            }else{
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
L
ligang 已提交
191 192 193 194 195
            }

        } catch (Exception e) {
            logger.error("add task to tasks queue exception",e);
        }
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
        return new ArrayList<String>();
    }


    /**
     * get task list from tree set
     *
     * @param tasksNum
     * @param taskTreeSet
     */
    public List<String> getTasksListFromTreeSet(int tasksNum, Set<String> taskTreeSet) {
        Iterator<String> iterator = taskTreeSet.iterator();
        int j = 0;
        List<String> taskslist = new ArrayList<>(tasksNum);
        while(iterator.hasNext()){
            if(j++ < tasksNum){
                String task = iterator.next();
                taskslist.add(task);
            }
        }
        return taskslist;
L
ligang 已提交
217 218
    }

219

B
baoliang 已提交
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
    @Override
    public void removeNode(String key, String nodeValue){

        CuratorFramework zk = getZkClient();
        String tasksQueuePath = getTasksPath(key) + Constants.SINGLE_SLASH;
        String taskIdPath = tasksQueuePath + nodeValue;
        logger.info("consume task {}", taskIdPath);
        try{
            zk.delete().forPath(taskIdPath);
        }catch(Exception e){
            logger.error(String.format("delete task:%s from zookeeper fail, exception:" ,nodeValue) ,e);
        }

    }

L
ligang 已提交
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393


    /**
     * Set-style add, provided for compatibility with the redis implementation:
     * creates a persistent node named {@code value} under the given path unless it already exists.
     * A null or blank value is only logged; failures are logged, not thrown.
     *
     * @param key   The key is the kill/cancel queue path name
     * @param value host-taskId  The name of the zookeeper node
     */
    @Override
    public void sadd(String key,String value) {
        try {
            // guard: nothing to add for a null/blank value
            if (value == null || value.trim().length() == 0) {
                logger.warn("add host-taskId:{} to tasks set is empty ",value);
                return;
            }

            final String parentPath = getTasksPath(key) + Constants.SINGLE_SLASH;
            final CuratorFramework client = getZkClient();
            final String nodePath = parentPath + value;

            if (client.checkExists().forPath(nodePath) != null) {
                logger.info("task {} exists in tasks set ",value);
                return;
            }

            final String created = client.create()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(nodePath, Bytes.toBytes(value));
            logger.info("add task:{} to tasks set result:{} ",value,created);

        } catch (Exception e) {
            logger.error("add task to tasks set exception",e);
        }
    }


    /**
     * Delete the node named {@code value} under the given path, if it exists.
     * A missing node is only logged; failures are logged, not thrown.
     *
     * @param key   The key is the kill/cancel queue path name
     * @param value host-taskId-taskType The name of the zookeeper node
     */
    @Override
    public void srem(String key, String value) {
        try{
            String path = getTasksPath(key) + Constants.SINGLE_SLASH;
            CuratorFramework zk = getZkClient();
            Stat stat = zk.checkExists().forPath(path + value);

            if(null != stat){
                zk.delete().forPath(path + value);
                logger.info("delete task:{} from tasks set ",value);
            }else{
                logger.info("delete task:{} from tasks set fail, there is no this task",value);
            }

        }catch(Exception e){
            // parameterized logging: the value is never interpreted as a format string,
            // so a '%' in the node name cannot make the error handler itself throw
            logger.error("delete task:{} exception", value, e);
        }
    }


    /**
     * Collect the names of all child nodes under the given path into a set.
     *
     * @param key  The key is the kill/cancel queue path name
     * @return the child node names; an empty set if the lookup throws
     */
    @Override
    public Set<String> smembers(String key) {
        final Set<String> members = new HashSet<>();

        try {
            members.addAll(getZkClient().getChildren().forPath(getTasksPath(key)));
        } catch (Exception e) {
            logger.error("get all tasks from tasks queue exception",e);
        }

        return members;
    }



    /**
     * Ensure the parent nodes of the tasks queue and of the kill queue exist,
     * creating each missing one as a persistent node (with parents as needed).
     * Any failure is logged and ends the initialization.
     */
    private void init(){
        try {
            final String[] parentPaths = {
                    getTasksPath(Constants.SCHEDULER_TASKS_QUEUE),
                    getTasksPath(Constants.SCHEDULER_TASKS_KILL)
            };

            for (int i = 0; i < parentPaths.length; i++) {
                final String parentPath = parentPaths[i];

                // already there: nothing to create
                if (zkClient.checkExists().forPath(parentPath) != null) {
                    continue;
                }

                zkClient.create()
                        .creatingParentContainersIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(parentPath);
                logger.info("create tasks queue parent node success : {} ",parentPath);
            }

        } catch (Exception e) {
            logger.error("create zk node failure",e);
        }
    }


    /**
     * Clear the tasks queue and the kill queue: delete every child node under each
     * of the two parent paths that exists. The parent nodes themselves are kept.
     * The first failure is logged and stops the clean-up.
     */
    @Override
    public void delete(){
        try {
            final String[] parentPaths = {
                    getTasksPath(Constants.SCHEDULER_TASKS_QUEUE),
                    getTasksPath(Constants.SCHEDULER_TASKS_KILL)
            };

            for (int i = 0; i < parentPaths.length; i++) {
                final String parentPath = parentPaths[i];

                // skip a queue whose parent node was never created
                if (zkClient.checkExists().forPath(parentPath) == null) {
                    continue;
                }

                final List<String> children = zkClient.getChildren().forPath(parentPath);
                for (String child : children) {
                    zkClient.delete().forPath(parentPath + Constants.SINGLE_SLASH + child);
                    logger.info("delete task from tasks queue : {}/{} ",parentPath,child);
                }
            }

        } catch (Exception e) {
            logger.error("delete all tasks in tasks queue failure",e);
        }
    }


    /**
     * Expose the CuratorFramework client held in the {@code zkClient} field.
     *
     * @return the current value of {@code zkClient}
     */
    public CuratorFramework getZkClient() {
        final CuratorFramework client = zkClient;
        return client;
    }


    /**
     * Build the zookeeper path of a task queue: the configured scheduler root,
     * a slash, then the queue name.
     *
     * @param key  task queue name
     * @return the path of the queue's parent node
     */
    public String getTasksPath(String key){
        final String schedulerRoot = conf.getString(Constants.ZOOKEEPER_SCHEDULER_ROOT);
        return schedulerRoot + Constants.SINGLE_SLASH + key;
    }


}