TaskManager.java 3.7 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.dolphinscheduler.server.worker.task;
L
ligang 已提交
19

Q
qiaozhanwei 已提交
20
import org.apache.dolphinscheduler.common.enums.TaskType;
21
import org.apache.dolphinscheduler.common.utils.EnumUtils;
Q
qiaozhanwei 已提交
22
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
23
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
Q
qiaozhanwei 已提交
24 25 26
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
import org.apache.dolphinscheduler.server.worker.task.mr.MapReduceTask;
27
import org.apache.dolphinscheduler.server.worker.task.procedure.ProcedureTask;
Q
qiaozhanwei 已提交
28 29 30 31
import org.apache.dolphinscheduler.server.worker.task.python.PythonTask;
import org.apache.dolphinscheduler.server.worker.task.shell.ShellTask;
import org.apache.dolphinscheduler.server.worker.task.spark.SparkTask;
import org.apache.dolphinscheduler.server.worker.task.sql.SqlTask;
_和's avatar
_和 已提交
32
import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopTask;
33 34
import org.apache.dolphinscheduler.service.alert.AlertClientService;

L
ligang 已提交
35 36 37
import org.slf4j.Logger;

/**
38
 * task manaster
L
ligang 已提交
39 40 41
 */
public class TaskManager {

42 43 44 45 46 47 48
    /**
     * create new task
     * @param taskExecutionContext  taskExecutionContext
     * @param logger    logger
     * @return AbstractTask
     * @throws IllegalArgumentException illegal argument exception
     */
49
    public static AbstractTask newTask(TaskExecutionContext taskExecutionContext, Logger logger, AlertClientService alertClientService) throws IllegalArgumentException {
50 51 52 53 54 55
        TaskType anEnum = EnumUtils.getEnum(TaskType.class, taskExecutionContext.getTaskType());
        if (anEnum == null) {
            logger.error("not support task type: {}", taskExecutionContext.getTaskType());
            throw new IllegalArgumentException("not support task type");
        }
        switch (anEnum) {
56 57 58 59 60 61
            case SHELL:
            case WATERDROP:
                return new ShellTask(taskExecutionContext, logger);
            case PROCEDURE:
                return new ProcedureTask(taskExecutionContext, logger);
            case SQL:
62
                return new SqlTask(taskExecutionContext, logger, alertClientService);
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
            case MR:
                return new MapReduceTask(taskExecutionContext, logger);
            case SPARK:
                return new SparkTask(taskExecutionContext, logger);
            case FLINK:
                return new FlinkTask(taskExecutionContext, logger);
            case PYTHON:
                return new PythonTask(taskExecutionContext, logger);
            case HTTP:
                return new HttpTask(taskExecutionContext, logger);
            case DATAX:
                return new DataxTask(taskExecutionContext, logger);
            case SQOOP:
                return new SqoopTask(taskExecutionContext, logger);
            default:
78
                logger.error("not support task type: {}", taskExecutionContext.getTaskType());
79 80
                throw new IllegalArgumentException("not support task type");
        }
L
ligang 已提交
81 82
    }
}