SqlExecutorTest.java 5.2 KB
Newer Older
L
ligang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Q
qiaozhanwei 已提交
17
package org.apache.dolphinscheduler.server.worker.sql;
L
ligang 已提交
18

19
import com.alibaba.fastjson.JSONObject;
Q
qiaozhanwei 已提交
20
import org.apache.dolphinscheduler.common.Constants;
L
lgcareer 已提交
21
import org.apache.dolphinscheduler.common.enums.CommandType;
Q
qiaozhanwei 已提交
22 23
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode;
T
Tboy 已提交
24
import org.apache.dolphinscheduler.common.utils.SpringApplicationContext;
Q
qiaozhanwei 已提交
25 26 27 28 29 30
import org.apache.dolphinscheduler.dao.ProcessDao;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
L
ligang 已提交
31
import org.junit.Before;
32
import org.junit.Ignore;
L
ligang 已提交
33 34 35 36 37 38 39 40 41
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
 *  python shell command executor test
 */
42
@Ignore
L
ligang 已提交
43 44 45 46 47 48 49 50
public class SqlExecutorTest {

    private static final Logger logger = LoggerFactory.getLogger(SqlExecutorTest.class);

    private ProcessDao processDao = null;

    @Before
    public void before(){
51
        processDao = SpringApplicationContext.getBean(ProcessDao.class);
L
ligang 已提交
52 53 54 55
    }

    @Test
    public void test() throws Exception {
B
Baoqi 已提交
56 57 58
        String nodeName = "mysql sql test";
        String taskAppId = "51_11282_263978";
        String tenantCode = "hdfs";
L
lgcareer 已提交
59
        int taskInstId = 7;
B
Baoqi 已提交
60 61 62 63 64 65 66 67
        sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
    }

    @Test
    public void testClickhouse() throws Exception {
        String nodeName = "ClickHouse sql test";
        String taskAppId = "1_11_20";
        String tenantCode = "default";
B
Baoqi 已提交
68 69 70 71 72 73 74 75 76 77
        int taskInstId = 20;
        sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
    }

    @Test
    public void testOracle() throws Exception {
        String nodeName = "oracle sql test";
        String taskAppId = "2_13_25";
        String tenantCode = "demo";
        int taskInstId = 25;
B
Baoqi 已提交
78 79
        sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
    }
B
Baoqi 已提交
80 81 82 83 84 85 86 87 88

    @Test
    public void testSQLServer() throws Exception {
        String nodeName = "SQL Server sql test";
        String taskAppId = "3_14_27";
        String tenantCode = "demo";
        int taskInstId = 27;
        sharedTestSqlTask(nodeName, taskAppId, tenantCode, taskInstId);
    }
L
ligang 已提交
89

B
Baoqi 已提交
90 91 92 93 94 95 96 97
    /**
     * Basic test template for SQLTasks, mainly test different types of DBMS types
     * @param nodeName node name for selected task
     * @param taskAppId task app id
     * @param tenantCode tenant code
     * @param taskInstId task instance id
     * @throws Exception
     */
B
Baoqi 已提交
98
    private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, int taskInstId) throws Exception {
L
ligang 已提交
99 100 101
        TaskProps taskProps = new TaskProps();
        taskProps.setTaskDir("");
        // processDefineId_processInstanceId_taskInstanceId
B
Baoqi 已提交
102
        taskProps.setTaskAppId(taskAppId);
L
ligang 已提交
103
        // set tenant -> task execute linux user
B
Baoqi 已提交
104
        taskProps.setTenantCode(tenantCode);
L
ligang 已提交
105 106
        taskProps.setTaskStartTime(new Date());
        taskProps.setTaskTimeout(360000);
B
Baoqi 已提交
107 108
        taskProps.setTaskInstId(taskInstId);
        taskProps.setNodeName(nodeName);
L
lgcareer 已提交
109
        taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS);
L
ligang 已提交
110 111


B
Baoqi 已提交
112
        TaskInstance taskInstance = processDao.findTaskInstanceById(taskInstId);
L
ligang 已提交
113 114 115 116 117 118 119

        String taskJson = taskInstance.getTaskJson();
        TaskNode taskNode = JSONObject.parseObject(taskJson, TaskNode.class);
        taskProps.setTaskParams(taskNode.getParams());


        // custom logger
120
        Logger taskLogger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
L
ligang 已提交
121 122 123 124 125
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));


L
lgcareer 已提交
126
        AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
L
ligang 已提交
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147

        logger.info("task info : {}", task);

        // job init
        task.init();

        // job handle
        task.handle();
        ExecutionStatus status = ExecutionStatus.SUCCESS;

        if (task.getExitStatusCode() == Constants.EXIT_CODE_SUCCESS){
            status = ExecutionStatus.SUCCESS;
        }else if (task.getExitStatusCode() == Constants.EXIT_CODE_KILL){
            status = ExecutionStatus.KILL;
        }else {
            status = ExecutionStatus.FAILURE;
        }

        logger.info(status.toString());
    }
}