未验证 提交 2bb5ebaf 编写于 作者: Q qiaozhanwei 提交者: GitHub

add TaskInstanceCacheManager receive Worker report result,modify master...

add TaskInstanceCacheManager receive Worker report result,modify master polling db transfrom to cache (#2021)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access
上级 9dc9745a
...@@ -26,7 +26,7 @@ import java.io.File; ...@@ -26,7 +26,7 @@ import java.io.File;
/** /**
* common utils * common utils
*/ */
public class CommonUtils { public class CommonUtils {
private CommonUtils() { private CommonUtils() {
throw new IllegalStateException("CommonUtils class"); throw new IllegalStateException("CommonUtils class");
} }
......
...@@ -462,6 +462,14 @@ public class TaskInstance implements Serializable { ...@@ -462,6 +462,14 @@ public class TaskInstance implements Serializable {
this.workerGroupId = workerGroupId; this.workerGroupId = workerGroupId;
} }
public String getDependentResult() {
return dependentResult;
}
public void setDependentResult(String dependentResult) {
this.dependentResult = dependentResult;
}
@Override @Override
public String toString() { public String toString() {
return "TaskInstance{" + return "TaskInstance{" +
...@@ -481,27 +489,19 @@ public class TaskInstance implements Serializable { ...@@ -481,27 +489,19 @@ public class TaskInstance implements Serializable {
", logPath='" + logPath + '\'' + ", logPath='" + logPath + '\'' +
", retryTimes=" + retryTimes + ", retryTimes=" + retryTimes +
", alertFlag=" + alertFlag + ", alertFlag=" + alertFlag +
", flag=" + flag +
", processInstance=" + processInstance + ", processInstance=" + processInstance +
", processDefine=" + processDefine + ", processDefine=" + processDefine +
", pid=" + pid + ", pid=" + pid +
", appLink='" + appLink + '\'' + ", appLink='" + appLink + '\'' +
", flag=" + flag + ", flag=" + flag +
", dependency=" + dependency + ", dependency='" + dependency + '\'' +
", duration=" + duration + ", duration=" + duration +
", maxRetryTimes=" + maxRetryTimes + ", maxRetryTimes=" + maxRetryTimes +
", retryInterval=" + retryInterval + ", retryInterval=" + retryInterval +
", taskInstancePriority=" + taskInstancePriority + ", taskInstancePriority=" + taskInstancePriority +
", processInstancePriority=" + processInstancePriority + ", processInstancePriority=" + processInstancePriority +
", workGroupId=" + workerGroupId + ", dependentResult='" + dependentResult + '\'' +
", workerGroupId=" + workerGroupId +
'}'; '}';
} }
public String getDependentResult() {
return dependentResult;
}
public void setDependentResult(String dependentResult) {
this.dependentResult = dependentResult;
}
} }
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } } /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + '}'; }}
\ No newline at end of file \ No newline at end of file
......
...@@ -51,12 +51,16 @@ public class TaskExecutionContext implements Serializable{ ...@@ -51,12 +51,16 @@ public class TaskExecutionContext implements Serializable{
*/ */
private String executePath; private String executePath;
/**
* log path
*/
private String logPath;
/** /**
* task json * task json
*/ */
private String taskJson; private String taskJson;
/** /**
* process instance id * process instance id
*/ */
...@@ -228,6 +232,14 @@ public class TaskExecutionContext implements Serializable{ ...@@ -228,6 +232,14 @@ public class TaskExecutionContext implements Serializable{
this.cmdTypeIfComplement = cmdTypeIfComplement; this.cmdTypeIfComplement = cmdTypeIfComplement;
} }
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
@Override @Override
public String toString() { public String toString() {
return "TaskExecutionContext{" + return "TaskExecutionContext{" +
...@@ -236,6 +248,7 @@ public class TaskExecutionContext implements Serializable{ ...@@ -236,6 +248,7 @@ public class TaskExecutionContext implements Serializable{
", startTime=" + startTime + ", startTime=" + startTime +
", taskType='" + taskType + '\'' + ", taskType='" + taskType + '\'' +
", executePath='" + executePath + '\'' + ", executePath='" + executePath + '\'' +
", logPath='" + logPath + '\'' +
", taskJson='" + taskJson + '\'' + ", taskJson='" + taskJson + '\'' +
", processInstanceId=" + processInstanceId + ", processInstanceId=" + processInstanceId +
", scheduleTime=" + scheduleTime + ", scheduleTime=" + scheduleTime +
......
...@@ -44,6 +44,7 @@ public class TaskExecutionContextBuilder { ...@@ -44,6 +44,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setTaskName(taskInstance.getName()); taskExecutionContext.setTaskName(taskInstance.getName());
taskExecutionContext.setStartTime(taskInstance.getStartTime()); taskExecutionContext.setStartTime(taskInstance.getStartTime());
taskExecutionContext.setTaskType(taskInstance.getTaskType()); taskExecutionContext.setTaskType(taskInstance.getTaskType());
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
return this; return this;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
/**
* task instance state manager
*/
public interface TaskInstanceCacheManager {
/**
* get taskInstance by taskInstance id
*
* @param taskInstanceId taskInstanceId
* @return taskInstance
*/
TaskInstance getByTaskInstanceId(Integer taskInstanceId);
/**
* cache taskInstance
*
* @param taskExecutionContext taskExecutionContext
*/
void cacheTaskInstance(TaskExecutionContext taskExecutionContext);
/**
* cache taskInstance
*
* @param taskAckCommand taskAckCommand
*/
void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
/**
* cache taskInstance
*
* @param executeTaskResponseCommand executeTaskResponseCommand
*/
void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand);
/**
* remove taskInstance by taskInstanceId
* @param taskInstanceId taskInstanceId
*/
void removeByTaskInstanceId(Integer taskInstanceId);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* taskInstance state manager
*/
@Component
public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
/**
* taskInstance caceh
*/
private Map<Integer,TaskInstance> taskInstanceCache = new ConcurrentHashMap<>();
/**
* get taskInstance by taskInstance id
*
* @param taskInstanceId taskInstanceId
* @return taskInstance
*/
@Override
public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
return taskInstanceCache.get(taskInstanceId);
}
/**
* cache taskInstance
*
* @param taskExecutionContext taskExecutionContext
*/
@Override
public void cacheTaskInstance(TaskExecutionContext taskExecutionContext) {
TaskInstance taskInstance = getByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
if (taskInstance == null){
taskInstance = new TaskInstance();
}
taskInstance.setId(taskExecutionContext.getTaskInstanceId());
taskInstance.setName(taskExecutionContext.getTaskName());
taskInstance.setStartTime(taskExecutionContext.getStartTime());
taskInstance.setTaskType(taskInstance.getTaskType());
taskInstance.setExecutePath(taskInstance.getExecutePath());
taskInstance.setTaskJson(taskInstance.getTaskJson());
}
/**
* cache taskInstance
*
* @param taskAckCommand taskAckCommand
*/
@Override
public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
TaskInstance taskInstance = getByTaskInstanceId(taskAckCommand.getTaskInstanceId());
if (taskInstance == null){
taskInstance = new TaskInstance();
}
taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
taskInstance.setStartTime(taskAckCommand.getStartTime());
taskInstance.setHost(taskAckCommand.getHost());
taskInstance.setExecutePath(taskAckCommand.getExecutePath());
taskInstance.setLogPath(taskAckCommand.getLogPath());
}
/**
* cache taskInstance
*
* @param executeTaskResponseCommand executeTaskResponseCommand
*/
@Override
public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) {
TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
if (taskInstance == null){
taskInstance = new TaskInstance();
}
taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus()));
taskInstance.setEndTime(executeTaskResponseCommand.getEndTime());
}
/**
* remove taskInstance by taskInstanceId
* @param taskInstanceId taskInstanceId
*/
@Override
public void removeByTaskInstanceId(Integer taskInstanceId) {
taskInstanceCache.remove(taskInstanceId);
}
}
...@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType; ...@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -42,8 +44,14 @@ public class TaskAckProcessor implements NettyRequestProcessor { ...@@ -42,8 +44,14 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/ */
private final ProcessService processService; private final ProcessService processService;
/**
* taskInstance cache manager
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskAckProcessor(){ public TaskAckProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
/** /**
...@@ -55,7 +63,9 @@ public class TaskAckProcessor implements NettyRequestProcessor { ...@@ -55,7 +63,9 @@ public class TaskAckProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class); ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
logger.info("taskAckCommand : {}",taskAckCommand); logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
/** /**
* change Task state * change Task state
*/ */
...@@ -65,6 +75,7 @@ public class TaskAckProcessor implements NettyRequestProcessor { ...@@ -65,6 +75,7 @@ public class TaskAckProcessor implements NettyRequestProcessor {
taskAckCommand.getExecutePath(), taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(), taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId()); taskAckCommand.getTaskInstanceId());
} }
} }
...@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType; ...@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand; import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
...@@ -42,8 +44,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor { ...@@ -42,8 +44,14 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/ */
private final ProcessService processService; private final ProcessService processService;
/**
* taskInstance cache manager
*/
private final TaskInstanceCacheManager taskInstanceCacheManager;
public TaskResponseProcessor(){ public TaskResponseProcessor(){
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
/** /**
...@@ -56,9 +64,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor { ...@@ -56,9 +64,15 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class); ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()), responseCommand.getEndTime(), responseCommand.getTaskInstanceId()); logger.info("received command : {}", responseCommand);
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getTaskInstanceId());
} }
......
...@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner; ...@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.FileUtils; import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.AlertDao; import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
...@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory; ...@@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE;
/** /**
* master task exec base class * master task exec base class
*/ */
...@@ -163,6 +166,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -163,6 +166,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId()); String userQueue = processService.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue); taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode()); taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
taskInstance.setExecutePath(getExecLocalPath(taskInstance));
return TaskExecutionContextBuilder.get() return TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance) .buildTaskInstanceRelatedInfo(taskInstance)
...@@ -172,6 +176,19 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -172,6 +176,19 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
/**
* get execute local path
*
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/** /**
* whehter tenant is null * whehter tenant is null
* @param tenant tenant * @param tenant tenant
...@@ -187,19 +204,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -187,19 +204,6 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
} }
return false; return false;
} }
/**
* get execute local path
*
* @return execute local path
*/
private String getExecLocalPath(TaskInstance taskInstance){
return FileUtils.getProcessExecDir(taskInstance.getProcessDefine().getProjectId(),
taskInstance.getProcessDefine().getId(),
taskInstance.getProcessInstance().getId(),
taskInstance.getId());
}
/** /**
* submit master base task exec thread * submit master base task exec thread
* @return TaskInstance * @return TaskInstance
...@@ -210,7 +214,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -210,7 +214,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
int retryTimes = 1; int retryTimes = 1;
boolean submitDB = false; boolean submitDB = false;
boolean submitQueue = false; boolean submitTask = false;
TaskInstance task = null; TaskInstance task = null;
while (retryTimes <= commitRetryTimes){ while (retryTimes <= commitRetryTimes){
try { try {
...@@ -221,27 +225,60 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> { ...@@ -221,27 +225,60 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
submitDB = true; submitDB = true;
} }
} }
if(submitDB && !submitQueue){ if(submitDB && !submitTask){
// submit task to queue // dispatcht task
submitQueue = dispatch(task); submitTask = dispatchtTask(task);
} }
if(submitDB && submitQueue){ if(submitDB && submitTask){
return task; return task;
} }
if(!submitDB){ if(!submitDB){
logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes); logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
}else if(!submitQueue){ }else if(!submitTask){
logger.error("task commit to queue failed , taskId {} has already retry {} times, please check the queue", taskInstance.getId(), retryTimes); logger.error("task commit failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
} }
Thread.sleep(commitRetryInterval); Thread.sleep(commitRetryInterval);
} catch (Exception e) { } catch (Exception e) {
logger.error("task commit to mysql and queue failed",e); logger.error("task commit to mysql and dispatcht task failed",e);
} }
retryTimes += 1; retryTimes += 1;
} }
return task; return task;
} }
/**
* dispatcht task
* @param taskInstance taskInstance
* @return whether submit task success
*/
public Boolean dispatchtTask(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
return true;
}
if(taskInstance.getState().typeIsFinished()){
logger.info(String.format("submit task , but task [%s] state [%s] is already finished. ", taskInstance.getName(), taskInstance.getState().toString()));
return true;
}
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
return true;
}
logger.info("task ready to submit: {}" , taskInstance);
boolean submitTask = dispatch(taskInstance);
logger.info(String.format("master submit success, task : %s", taskInstance.getName()) );
return submitTask;
}catch (Exception e){
logger.error("submit task Exception: ", e);
logger.error("task error : %s", JSONUtils.toJson(taskInstance));
return false;
}
}
/** /**
* submit wait complete * submit wait complete
* @return true * @return true
......
...@@ -404,7 +404,7 @@ public class MasterExecThread implements Runnable { ...@@ -404,7 +404,7 @@ public class MasterExecThread implements Runnable {
} }
/** /**
* submit task to execute * TODO submit task to execute
* @param taskInstance task instance * @param taskInstance task instance
* @return TaskInstance * @return TaskInstance
*/ */
...@@ -873,7 +873,7 @@ public class MasterExecThread implements Runnable { ...@@ -873,7 +873,7 @@ public class MasterExecThread implements Runnable {
} }
logger.info("task :{}, id:{} complete, state is {} ", logger.info("task :{}, id:{} complete, state is {} ",
task.getName(), task.getId(), task.getState().toString()); task.getName(), task.getId(), task.getState().toString());
// node success , post node submit //TODO node success , post node submit
if(task.getState() == ExecutionStatus.SUCCESS){ if(task.getState() == ExecutionStatus.SUCCESS){
completeTaskList.put(task.getName(), task); completeTaskList.put(task.getName(), task);
submitPostNode(task.getName()); submitPostNode(task.getName());
......
...@@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; ...@@ -26,6 +26,9 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -43,6 +46,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -43,6 +46,12 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
*/ */
private static final Logger logger = LoggerFactory.getLogger(MasterTaskExecThread.class); private static final Logger logger = LoggerFactory.getLogger(MasterTaskExecThread.class);
/**
* taskInstance state manager
*/
private TaskInstanceCacheManager taskInstanceCacheManager;
/** /**
* constructor of MasterTaskExecThread * constructor of MasterTaskExecThread
* @param taskInstance task instance * @param taskInstance task instance
...@@ -50,6 +59,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -50,6 +59,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
*/ */
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){ public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance); super(taskInstance, processInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
} }
/** /**
...@@ -67,7 +77,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -67,7 +77,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
private Boolean alreadyKilled = false; private Boolean alreadyKilled = false;
/** /**
* submit task instance and wait complete * TODO submit task instance and wait complete
* @return true is task quit is true * @return true is task quit is true
*/ */
@Override @Override
...@@ -89,12 +99,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -89,12 +99,16 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
} }
/** /**
* wait task quit * TODO 在这里轮询数据库
* TODO wait task quit
* @return true if task quit success * @return true if task quit success
*/ */
public Boolean waitTaskQuit(){ public Boolean waitTaskQuit(){
// query new state // query new state
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
if (taskInstance == null){
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
}
logger.info("wait task: process id: {}, task id:{}, task name:{} complete", logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName()); this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out // task time out
...@@ -119,6 +133,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -119,6 +133,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
} }
// task instance finished // task instance finished
if (taskInstance.getState().typeIsFinished()){ if (taskInstance.getState().typeIsFinished()){
// if task is final result , then remove taskInstance from cache
taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
break; break;
} }
if(checkTimeout){ if(checkTimeout){
...@@ -133,7 +149,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -133,7 +149,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
} }
} }
// updateProcessInstance task instance // updateProcessInstance task instance
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) { } catch (Exception e) {
...@@ -149,6 +165,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -149,6 +165,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/** /**
* TODO Kill 任务
* task instance add queue , waiting worker to kill * task instance add queue , waiting worker to kill
*/ */
private void cancelTaskInstance(){ private void cancelTaskInstance(){
...@@ -162,6 +179,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -162,6 +179,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
} }
String queueValue = String.format("%s-%d", String queueValue = String.format("%s-%d",
host, taskInstance.getId()); host, taskInstance.getId());
// TODO 这里写
taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue); taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
logger.info("master add kill task :{} id:{} to kill queue", logger.info("master add kill task :{} id:{} to kill queue",
......
...@@ -155,6 +155,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor { ...@@ -155,6 +155,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}else{ }else{
ackCommand.setExecutePath(taskExecutionContext.getExecutePath()); ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
} }
taskExecutionContext.setLogPath(ackCommand.getLogPath());
return ackCommand; return ackCommand;
} }
......
...@@ -42,6 +42,12 @@ public class TaskKillProcessor implements NettyRequestProcessor { ...@@ -42,6 +42,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class); private final Logger logger = LoggerFactory.getLogger(TaskKillProcessor.class);
/**
* task kill process
*
* @param channel channel
* @param command command
*/
@Override @Override
public void process(Channel channel, Command command) { public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType())); Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
...@@ -51,6 +57,11 @@ public class TaskKillProcessor implements NettyRequestProcessor { ...@@ -51,6 +57,11 @@ public class TaskKillProcessor implements NettyRequestProcessor {
} }
/**
* kill task logic
*
* @param killCommand killCommand
*/
private void doKill(KillTaskRequestCommand killCommand){ private void doKill(KillTaskRequestCommand killCommand){
try { try {
if(killCommand.getProcessId() == 0 ){ if(killCommand.getProcessId() == 0 ){
...@@ -71,6 +82,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { ...@@ -71,6 +82,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
} }
} }
/**
* kill yarn job
*
* @param host host
* @param logPath logPath
* @param executePath executePath
* @param tenantCode tenantCode
*/
public void killYarnJob(String host, String logPath, String executePath, String tenantCode) { public void killYarnJob(String host, String logPath, String executePath, String tenantCode) {
try { try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
......
...@@ -103,7 +103,6 @@ public class TaskExecuteThread implements Runnable { ...@@ -103,7 +103,6 @@ public class TaskExecuteThread implements Runnable {
// set task props // set task props
TaskProps taskProps = new TaskProps(taskNode.getParams(), TaskProps taskProps = new TaskProps(taskNode.getParams(),
taskExecutionContext.getExecutePath(),
taskExecutionContext.getScheduleTime(), taskExecutionContext.getScheduleTime(),
taskExecutionContext.getTaskName(), taskExecutionContext.getTaskName(),
taskExecutionContext.getTaskType(), taskExecutionContext.getTaskType(),
...@@ -114,7 +113,10 @@ public class TaskExecuteThread implements Runnable { ...@@ -114,7 +113,10 @@ public class TaskExecuteThread implements Runnable {
taskExecutionContext.getStartTime(), taskExecutionContext.getStartTime(),
getGlobalParamsMap(), getGlobalParamsMap(),
null, null,
CommandType.of(taskExecutionContext.getCmdTypeIfComplement())); CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
OSUtils.getHost(),
taskExecutionContext.getLogPath(),
taskExecutionContext.getExecutePath());
// set task timeout // set task timeout
setTaskTimeout(taskProps, taskNode); setTaskTimeout(taskProps, taskNode);
...@@ -142,7 +144,7 @@ public class TaskExecuteThread implements Runnable { ...@@ -142,7 +144,7 @@ public class TaskExecuteThread implements Runnable {
// task result process // task result process
task.after(); task.after();
//
responseCommand.setStatus(task.getExitStatus().getCode()); responseCommand.setStatus(task.getExitStatus().getCode());
responseCommand.setEndTime(new Date()); responseCommand.setEndTime(new Date());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus()); logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
......
...@@ -18,13 +18,17 @@ package org.apache.dolphinscheduler.server.worker.task; ...@@ -18,13 +18,17 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils; import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger; import org.slf4j.Logger;
import java.io.*; import java.io.*;
...@@ -37,6 +41,8 @@ import java.util.function.Consumer; ...@@ -37,6 +41,8 @@ import java.util.function.Consumer;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.dolphinscheduler.common.Constants.*;
/** /**
* abstract command executor * abstract command executor
*/ */
...@@ -69,7 +75,7 @@ public abstract class AbstractCommandExecutor { ...@@ -69,7 +75,7 @@ public abstract class AbstractCommandExecutor {
/** /**
* task appId * task appId
*/ */
protected final int taskInstId; protected final int taskInstanceId;
/** /**
* tenant code , execute task linux user * tenant code , execute task linux user
...@@ -101,92 +107,36 @@ public abstract class AbstractCommandExecutor { ...@@ -101,92 +107,36 @@ public abstract class AbstractCommandExecutor {
*/ */
protected final List<String> logBuffer; protected final List<String> logBuffer;
/**
* log path
*/
private String logPath;
/**
* execute path
*/
private String executePath;
public AbstractCommandExecutor(Consumer<List<String>> logHandler, public AbstractCommandExecutor(Consumer<List<String>> logHandler,
String taskDir, String taskAppId,int taskInstId,String tenantCode, String envFile, String taskDir,
Date startTime, int timeout, Logger logger){ String taskAppId,
Integer taskInstanceId,
String tenantCode, String envFile,
Date startTime, int timeout, String logPath,String executePath,Logger logger){
this.logHandler = logHandler; this.logHandler = logHandler;
this.taskDir = taskDir; this.taskDir = taskDir;
this.taskAppId = taskAppId; this.taskAppId = taskAppId;
this.taskInstId = taskInstId; this.taskInstanceId = taskInstanceId;
this.tenantCode = tenantCode; this.tenantCode = tenantCode;
this.envFile = envFile; this.envFile = envFile;
this.startTime = startTime; this.startTime = startTime;
this.timeout = timeout; this.timeout = timeout;
this.logPath = logPath;
this.executePath = executePath;
this.logger = logger; this.logger = logger;
this.logBuffer = Collections.synchronizedList(new ArrayList<>()); this.logBuffer = Collections.synchronizedList(new ArrayList<>());
} }
/**
* task specific execution logic
*
* @param execCommand exec command
* @param processService process dao
* @return exit status code
*/
public int run(String execCommand, ProcessService processService) {
int exitStatusCode;
try {
if (StringUtils.isEmpty(execCommand)) {
exitStatusCode = 0;
return exitStatusCode;
}
String commandFilePath = buildCommandFilePath();
// create command file if not exists
createCommandFileIfNotExists(execCommand, commandFilePath);
//build process
buildProcess(commandFilePath);
// parse process output
parseProcessOutput(process);
// get process id
int pid = getProcessId(process);
processService.updatePidByTaskInstId(taskInstId, pid, "");
logger.info("process start, process id is: {}", pid);
// if timeout occurs, exit directly
long remainTime = getRemaintime();
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
if (status) {
exitStatusCode = process.exitValue();
logger.info("process has exited, work dir:{}, pid:{} ,exitStatusCode:{}", taskDir, pid,exitStatusCode);
//update process state to db
exitStatusCode = updateState(processService, exitStatusCode, pid, taskInstId);
} else {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
if (taskInstance == null) {
logger.error("task instance id:{} not exist", taskInstId);
} else {
ProcessUtils.kill(taskInstance);
}
exitStatusCode = -1;
logger.warn("process timeout, work dir:{}, pid:{}", taskDir, pid);
}
} catch (InterruptedException e) {
exitStatusCode = -1;
logger.error(String.format("interrupt exception: {}, task may be cancelled or killed",e.getMessage()), e);
throw new RuntimeException("interrupt exception. exitCode is : " + exitStatusCode);
} catch (Exception e) {
exitStatusCode = -1;
logger.error(e.getMessage(), e);
throw new RuntimeException("process error . exitCode is : " + exitStatusCode);
}
return exitStatusCode;
}
/** /**
* build process * build process
* *
...@@ -217,35 +167,80 @@ public abstract class AbstractCommandExecutor { ...@@ -217,35 +167,80 @@ public abstract class AbstractCommandExecutor {
} }
/** /**
* update process state to db * task specific execution logic
* * @param execCommand execCommand
* @param processService process dao * @return CommandExecuteResult
* @param exitStatusCode exit status code * @throws Exception
* @param pid process id
* @param taskInstId task instance id
* @return exit status code
*/ */
private int updateState(ProcessService processService, int exitStatusCode, int pid, int taskInstId) { public CommandExecuteResult run(String execCommand) throws Exception{
//get yarn state by log
if (exitStatusCode == 0) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstId);
logger.info("process id is {}", pid);
List<String> appIds = getAppLinks(taskInstance.getLogPath());
if (appIds.size() > 0) {
String appUrl = String.join(Constants.COMMA, appIds);
logger.info("yarn log url:{}",appUrl);
processService.updatePidByTaskInstId(taskInstId, pid, appUrl);
}
// check if all operations are completed CommandExecuteResult result = new CommandExecuteResult();
if (!isSuccessOfYarnState(appIds)) {
exitStatusCode = -1;
} if (StringUtils.isEmpty(execCommand)) {
return result;
}
String commandFilePath = buildCommandFilePath();
// create command file if not exists
createCommandFileIfNotExists(execCommand, commandFilePath);
//build process
buildProcess(commandFilePath);
// parse process output
parseProcessOutput(process);
Integer processId = getProcessId(process);
result.setProcessId(processId);
// print process id
logger.info("process start, process id is: {}", processId);
// if timeout occurs, exit directly
long remainTime = getRemaintime();
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
// SHELL task state
result.setExitStatusCode(process.exitValue());
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
taskDir,
processId
, result.getExitStatusCode());
// if SHELL task exit
if (status) {
// set appIds
List<String> appIds = getAppIds(logPath);
result.setAppIds(String.join(Constants.COMMA, appIds));
// if yarn task , yarn state is final state
result.setExitStatusCode(isSuccessOfYarnState(appIds) ? EXIT_CODE_SUCCESS : EXIT_CODE_FAILURE);
} else {
logger.error("process has failure , exitStatusCode : {} , ready to kill ...", result.getExitStatusCode());
TaskInstance taskInstance = new TaskInstance();
taskInstance.setPid(processId);
taskInstance.setHost(OSUtils.getHost());
taskInstance.setLogPath(logPath);
taskInstance.setExecutePath(executePath);
ProcessInstance processInstance = new ProcessInstance();
processInstance.setTenantCode(tenantCode);
taskInstance.setProcessInstance(processInstance);
ProcessUtils.kill(taskInstance);
result.setExitStatusCode(EXIT_CODE_FAILURE);
} }
return exitStatusCode; return result;
} }
/** /**
* cancel application * cancel application
* @throws Exception exception * @throws Exception exception
...@@ -378,10 +373,6 @@ public abstract class AbstractCommandExecutor { ...@@ -378,10 +373,6 @@ public abstract class AbstractCommandExecutor {
parseProcessOutputExecutorService.shutdown(); parseProcessOutputExecutorService.shutdown();
} }
public int getPid() {
return getProcessId(process);
}
/** /**
* check yarn state * check yarn state
* *
...@@ -389,11 +380,10 @@ public abstract class AbstractCommandExecutor { ...@@ -389,11 +380,10 @@ public abstract class AbstractCommandExecutor {
* @return is success of yarn task state * @return is success of yarn task state
*/ */
public boolean isSuccessOfYarnState(List<String> appIds) { public boolean isSuccessOfYarnState(List<String> appIds) {
boolean result = true; boolean result = true;
try { try {
for (String appId : appIds) { for (String appId : appIds) {
while(true){ while(Stopper.isRunning()){
ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId); ExecutionStatus applicationStatus = HadoopUtils.getInstance().getApplicationStatus(appId);
logger.info("appId:{}, final state:{}",appId,applicationStatus.name()); logger.info("appId:{}, final state:{}",appId,applicationStatus.name());
if (applicationStatus.equals(ExecutionStatus.FAILURE) || if (applicationStatus.equals(ExecutionStatus.FAILURE) ||
...@@ -406,7 +396,7 @@ public abstract class AbstractCommandExecutor { ...@@ -406,7 +396,7 @@ public abstract class AbstractCommandExecutor {
} }
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} }
} }
} catch (Exception e) { } catch (Exception e) {
logger.error(String.format("yarn applications: %s status failed ", appIds.toString()),e); logger.error(String.format("yarn applications: %s status failed ", appIds.toString()),e);
result = false; result = false;
...@@ -415,15 +405,20 @@ public abstract class AbstractCommandExecutor { ...@@ -415,15 +405,20 @@ public abstract class AbstractCommandExecutor {
} }
public int getProcessId() {
return getProcessId(process);
}
/** /**
* get app links * get app links
* @param fileName file name *
* @param logPath log path
* @return app id list * @return app id list
*/ */
private List<String> getAppLinks(String fileName) { private List<String> getAppIds(String logPath) {
List<String> logs = convertFile2List(fileName); List<String> logs = convertFile2List(logPath);
List<String> appIds = new ArrayList<String>(); List<String> appIds = new ArrayList<>();
/** /**
* analysis log,get submited yarn application id * analysis log,get submited yarn application id
*/ */
...@@ -565,6 +560,5 @@ public abstract class AbstractCommandExecutor { ...@@ -565,6 +560,5 @@ public abstract class AbstractCommandExecutor {
} }
protected abstract String buildCommandFilePath(); protected abstract String buildCommandFilePath();
protected abstract String commandInterpreter(); protected abstract String commandInterpreter();
protected abstract boolean checkFindApp(String line);
protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException; protected abstract void createCommandFileIfNotExists(String execCommand, String commandFile) throws IOException;
} }
...@@ -55,6 +55,17 @@ public abstract class AbstractTask { ...@@ -55,6 +55,17 @@ public abstract class AbstractTask {
protected Logger logger; protected Logger logger;
/**
* SHELL process pid
*/
protected Integer processId;
/**
* other resource manager appId , for example : YARN etc
*/
protected String appIds;
/** /**
* cancel * cancel
*/ */
...@@ -119,6 +130,22 @@ public abstract class AbstractTask { ...@@ -119,6 +130,22 @@ public abstract class AbstractTask {
this.exitStatusCode = exitStatusCode; this.exitStatusCode = exitStatusCode;
} }
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public Integer getProcessId() {
return processId;
}
public void setProcessId(Integer processId) {
this.processId = processId;
}
/** /**
* get task parameters * get task parameters
* @return AbstractParameters * @return AbstractParameters
...@@ -126,6 +153,7 @@ public abstract class AbstractTask { ...@@ -126,6 +153,7 @@ public abstract class AbstractTask {
public abstract AbstractParameters getParameters(); public abstract AbstractParameters getParameters();
/** /**
* result processing * result processing
*/ */
...@@ -146,7 +174,7 @@ public abstract class AbstractTask { ...@@ -146,7 +174,7 @@ public abstract class AbstractTask {
&& paramsMap.containsKey("v_proc_date")){ && paramsMap.containsKey("v_proc_date")){
String vProcDate = paramsMap.get("v_proc_date").getValue(); String vProcDate = paramsMap.get("v_proc_date").getValue();
if (!StringUtils.isEmpty(vProcDate)){ if (!StringUtils.isEmpty(vProcDate)){
TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getNodeName(), vProcDate); TaskRecordStatus taskRecordState = TaskRecordDao.getTaskRecordState(taskProps.getTaskName(), vProcDate);
logger.info("task record status : {}",taskRecordState); logger.info("task record status : {}",taskRecordState);
if (taskRecordState == TaskRecordStatus.FAILURE){ if (taskRecordState == TaskRecordStatus.FAILURE){
setExitStatusCode(Constants.EXIT_CODE_FAILURE); setExitStatusCode(Constants.EXIT_CODE_FAILURE);
......
...@@ -26,11 +26,6 @@ import org.slf4j.Logger; ...@@ -26,11 +26,6 @@ import org.slf4j.Logger;
* abstract yarn task * abstract yarn task
*/ */
public abstract class AbstractYarnTask extends AbstractTask { public abstract class AbstractYarnTask extends AbstractTask {
/**
* process instance
*/
/** /**
* process task * process task
*/ */
...@@ -50,21 +45,26 @@ public abstract class AbstractYarnTask extends AbstractTask { ...@@ -50,21 +45,26 @@ public abstract class AbstractYarnTask extends AbstractTask {
super(taskProps, logger); super(taskProps, logger);
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskProps.getTaskDir(), taskProps.getExecutePath(),
taskProps.getTaskAppId(), taskProps.getTaskAppId(),
taskProps.getTaskInstId(), taskProps.getTaskInstanceId(),
taskProps.getTenantCode(), taskProps.getTenantCode(),
taskProps.getEnvFile(), taskProps.getEnvFile(),
taskProps.getTaskStartTime(), taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), taskProps.getTaskTimeout(),
taskProps.getLogPath(),
taskProps.getExecutePath(),
logger); logger);
} }
@Override @Override
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // SHELL task exit code
exitStatusCode = shellCommandExecutor.run(buildCommand(), processService); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) { } catch (Exception e) {
logger.error("yarn process failure", e); logger.error("yarn process failure", e);
exitStatusCode = -1; exitStatusCode = -1;
...@@ -82,7 +82,7 @@ public abstract class AbstractYarnTask extends AbstractTask { ...@@ -82,7 +82,7 @@ public abstract class AbstractYarnTask extends AbstractTask {
cancel = true; cancel = true;
// cancel process // cancel process
shellCommandExecutor.cancelApplication(); shellCommandExecutor.cancelApplication();
TaskInstance taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstId()); TaskInstance taskInstance = processService.findTaskInstanceById(taskProps.getTaskInstanceId());
if (status && taskInstance != null){ if (status && taskInstance != null){
ProcessUtils.killYarnJob(taskInstance); ProcessUtils.killYarnJob(taskInstance);
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.task;
/**
* command execute result
*/
public class CommandExecuteResult {
/**
* command exit code
*/
private Integer exitStatusCode;
/**
* appIds
*/
private String appIds;
/**
* process id
*/
private Integer processId;
public CommandExecuteResult(){
this.exitStatusCode = 0;
}
public Integer getExitStatusCode() {
return exitStatusCode;
}
public void setExitStatusCode(Integer exitStatusCode) {
this.exitStatusCode = exitStatusCode;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public Integer getProcessId() {
return processId;
}
public void setProcessId(Integer processId) {
this.processId = processId;
}
}
...@@ -67,8 +67,10 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { ...@@ -67,8 +67,10 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
String envFile, String envFile,
Date startTime, Date startTime,
int timeout, int timeout,
String logPath,
String executePath,
Logger logger) { Logger logger) {
super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger); super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout,logPath,executePath,logger);
} }
...@@ -132,15 +134,6 @@ public class PythonCommandExecutor extends AbstractCommandExecutor { ...@@ -132,15 +134,6 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
return pythonHome; return pythonHome;
} }
/**
* check find yarn application id
* @param line line
* @return boolean
*/
@Override
protected boolean checkFindApp(String line) {
return true;
}
/** /**
......
...@@ -53,13 +53,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { ...@@ -53,13 +53,15 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
public ShellCommandExecutor(Consumer<List<String>> logHandler, public ShellCommandExecutor(Consumer<List<String>> logHandler,
String taskDir, String taskDir,
String taskAppId, String taskAppId,
int taskInstId, Integer taskInstId,
String tenantCode, String tenantCode,
String envFile, String envFile,
Date startTime, Date startTime,
int timeout, Integer timeout,
String logPath,
String executePath,
Logger logger) { Logger logger) {
super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout, logger); super(logHandler,taskDir,taskAppId,taskInstId,tenantCode, envFile, startTime, timeout,logPath,executePath,logger);
} }
...@@ -78,15 +80,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor { ...@@ -78,15 +80,6 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
return SH; return SH;
} }
/**
* check find yarn application id
* @param line line
* @return true if line contains task app id
*/
@Override
protected boolean checkFindApp(String line) {
return line.contains(taskAppId);
}
/** /**
* create command file if not exists * create command file if not exists
......
...@@ -35,12 +35,12 @@ public class TaskProps { ...@@ -35,12 +35,12 @@ public class TaskProps {
/** /**
* task node name * task node name
**/ **/
private String nodeName; private String taskName;
/** /**
* task instance id * task instance id
**/ **/
private int taskInstId; private int taskInstanceId;
/** /**
* tenant code , execute task linux user * tenant code , execute task linux user
...@@ -57,11 +57,6 @@ public class TaskProps { ...@@ -57,11 +57,6 @@ public class TaskProps {
**/ **/
private String taskParams; private String taskParams;
/**
* task dir
**/
private String taskDir;
/** /**
* queue * queue
**/ **/
...@@ -111,6 +106,22 @@ public class TaskProps { ...@@ -111,6 +106,22 @@ public class TaskProps {
*/ */
private CommandType cmdTypeIfComplement; private CommandType cmdTypeIfComplement;
/**
* host
*/
private String host;
/**
* log path
*/
private String logPath;
/**
* execute path
*/
private String executePath;
/** /**
* constructor * constructor
*/ */
...@@ -123,7 +134,7 @@ public class TaskProps { ...@@ -123,7 +134,7 @@ public class TaskProps {
* @param scheduleTime schedule time * @param scheduleTime schedule time
* @param nodeName node name * @param nodeName node name
* @param taskType task type * @param taskType task type
* @param taskInstId task instance id * @param taskInstanceId task instance id
* @param envFile env file * @param envFile env file
* @param tenantCode tenant code * @param tenantCode tenant code
* @param queue queue * @param queue queue
...@@ -133,24 +144,25 @@ public class TaskProps { ...@@ -133,24 +144,25 @@ public class TaskProps {
* @param cmdTypeIfComplement cmd type if complement * @param cmdTypeIfComplement cmd type if complement
*/ */
public TaskProps(String taskParams, public TaskProps(String taskParams,
String taskDir,
Date scheduleTime, Date scheduleTime,
String nodeName, String nodeName,
String taskType, String taskType,
int taskInstId, int taskInstanceId,
String envFile, String envFile,
String tenantCode, String tenantCode,
String queue, String queue,
Date taskStartTime, Date taskStartTime,
Map<String, String> definedParams, Map<String, String> definedParams,
String dependence, String dependence,
CommandType cmdTypeIfComplement){ CommandType cmdTypeIfComplement,
String host,
String logPath,
String executePath){
this.taskParams = taskParams; this.taskParams = taskParams;
this.taskDir = taskDir;
this.scheduleTime = scheduleTime; this.scheduleTime = scheduleTime;
this.nodeName = nodeName; this.taskName = nodeName;
this.taskType = taskType; this.taskType = taskType;
this.taskInstId = taskInstId; this.taskInstanceId = taskInstanceId;
this.envFile = envFile; this.envFile = envFile;
this.tenantCode = tenantCode; this.tenantCode = tenantCode;
this.queue = queue; this.queue = queue;
...@@ -158,7 +170,9 @@ public class TaskProps { ...@@ -158,7 +170,9 @@ public class TaskProps {
this.definedParams = definedParams; this.definedParams = definedParams;
this.dependence = dependence; this.dependence = dependence;
this.cmdTypeIfComplement = cmdTypeIfComplement; this.cmdTypeIfComplement = cmdTypeIfComplement;
this.host = host;
this.logPath = logPath;
this.executePath = executePath;
} }
public String getTenantCode() { public String getTenantCode() {
...@@ -177,12 +191,12 @@ public class TaskProps { ...@@ -177,12 +191,12 @@ public class TaskProps {
this.taskParams = taskParams; this.taskParams = taskParams;
} }
public String getTaskDir() { public String getExecutePath() {
return taskDir; return executePath;
} }
public void setTaskDir(String taskDir) { public void setExecutePath(String executePath) {
this.taskDir = taskDir; this.executePath = executePath;
} }
public Map<String, String> getDefinedParams() { public Map<String, String> getDefinedParams() {
...@@ -202,20 +216,20 @@ public class TaskProps { ...@@ -202,20 +216,20 @@ public class TaskProps {
} }
public String getNodeName() { public String getTaskName() {
return nodeName; return taskName;
} }
public void setNodeName(String nodeName) { public void setTaskName(String taskName) {
this.nodeName = nodeName; this.taskName = taskName;
} }
public int getTaskInstId() { public int getTaskInstanceId() {
return taskInstId; return taskInstanceId;
} }
public void setTaskInstId(int taskInstId) { public void setTaskInstanceId(int taskInstanceId) {
this.taskInstId = taskInstId; this.taskInstanceId = taskInstanceId;
} }
public String getQueue() { public String getQueue() {
...@@ -291,6 +305,22 @@ public class TaskProps { ...@@ -291,6 +305,22 @@ public class TaskProps {
this.cmdTypeIfComplement = cmdTypeIfComplement; this.cmdTypeIfComplement = cmdTypeIfComplement;
} }
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
/** /**
* get parameters map * get parameters map
* @return user defined params map * @return user defined params map
......
...@@ -52,6 +52,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance; ...@@ -52,6 +52,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.server.utils.DataxUtils; import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
...@@ -121,12 +122,12 @@ public class DataxTask extends AbstractTask { ...@@ -121,12 +122,12 @@ public class DataxTask extends AbstractTask {
public DataxTask(TaskProps props, Logger logger) { public DataxTask(TaskProps props, Logger logger) {
super(props, logger); super(props, logger);
this.taskDir = props.getTaskDir(); this.taskDir = props.getExecutePath();
logger.info("task dir : {}", taskDir); logger.info("task dir : {}", taskDir);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getTaskDir(), props.getTaskAppId(), this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, props.getExecutePath(), props.getTaskAppId(),
props.getTaskInstId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(), props.getTaskInstanceId(), props.getTenantCode(), props.getEnvFile(), props.getTaskStartTime(),
props.getTaskTimeout(), logger); props.getTaskTimeout(), props.getLogPath(),props.getExecutePath(),logger);
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
...@@ -160,10 +161,15 @@ public class DataxTask extends AbstractTask { ...@@ -160,10 +161,15 @@ public class DataxTask extends AbstractTask {
// run datax process // run datax process
String jsonFilePath = buildDataxJsonFile(); String jsonFilePath = buildDataxJsonFile();
String shellCommandFilePath = buildShellCommandFile(jsonFilePath); String shellCommandFilePath = buildShellCommandFile(jsonFilePath);
exitStatusCode = shellCommandExecutor.run(shellCommandFilePath, processService); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} }
catch (Exception e) { catch (Exception e) {
exitStatusCode = -1; logger.error("datax task failure", e);
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e; throw e;
} }
} }
...@@ -355,7 +361,7 @@ public class DataxTask extends AbstractTask { ...@@ -355,7 +361,7 @@ public class DataxTask extends AbstractTask {
String dataxCommand = sbr.toString(); String dataxCommand = sbr.toString();
// find process instance by task id // find process instance by task id
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
// combining local and global parameters // combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
......
...@@ -107,7 +107,7 @@ public class DependentTask extends AbstractTask { ...@@ -107,7 +107,7 @@ public class DependentTask extends AbstractTask {
try{ try{
TaskInstance taskInstance = null; TaskInstance taskInstance = null;
while(Stopper.isRunning()){ while(Stopper.isRunning()){
taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstId()); taskInstance = processService.findTaskInstanceById(this.taskProps.getTaskInstanceId());
if(taskInstance == null){ if(taskInstance == null){
exitStatusCode = -1; exitStatusCode = -1;
......
...@@ -68,7 +68,7 @@ public class FlinkTask extends AbstractYarnTask { ...@@ -68,7 +68,7 @@ public class FlinkTask extends AbstractYarnTask {
if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) {
String args = flinkParameters.getMainArgs(); String args = flinkParameters.getMainArgs();
// get process instance by task instance id // get process instance by task instance id
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
/** /**
* combining local and global parameters * combining local and global parameters
......
...@@ -138,7 +138,7 @@ public class HttpTask extends AbstractTask { ...@@ -138,7 +138,7 @@ public class HttpTask extends AbstractTask {
*/ */
protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
RequestBuilder builder = createRequestBuilder(); RequestBuilder builder = createRequestBuilder();
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(), Map<String, Property> paramsMap = ParamUtils.convert(taskProps.getUserDefParamsMap(),
taskProps.getDefinedParams(), taskProps.getDefinedParams(),
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task.python; package org.apache.dolphinscheduler.server.worker.task.python;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters;
...@@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; ...@@ -24,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
...@@ -65,16 +67,18 @@ public class PythonTask extends AbstractTask { ...@@ -65,16 +67,18 @@ public class PythonTask extends AbstractTask {
public PythonTask(TaskProps taskProps, Logger logger) { public PythonTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger); super(taskProps, logger);
this.taskDir = taskProps.getTaskDir(); this.taskDir = taskProps.getExecutePath();
this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle, this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
taskProps.getTaskDir(), taskProps.getExecutePath(),
taskProps.getTaskAppId(), taskProps.getTaskAppId(),
taskProps.getTaskInstId(), taskProps.getTaskInstanceId(),
taskProps.getTenantCode(), taskProps.getTenantCode(),
taskProps.getEnvFile(), taskProps.getEnvFile(),
taskProps.getTaskStartTime(), taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), taskProps.getTaskTimeout(),
taskProps.getLogPath(),
taskProps.getExecutePath(),
logger); logger);
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
...@@ -94,10 +98,15 @@ public class PythonTask extends AbstractTask { ...@@ -94,10 +98,15 @@ public class PythonTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
exitStatusCode = pythonCommandExecutor.run(buildCommand(), processService); CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand());
} catch (Exception e) {
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
}
catch (Exception e) {
logger.error("python task failure", e); logger.error("python task failure", e);
exitStatusCode = -1; setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e; throw e;
} }
} }
......
...@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils; ...@@ -25,6 +25,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor; import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps; import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
...@@ -74,15 +75,17 @@ public class ShellTask extends AbstractTask { ...@@ -74,15 +75,17 @@ public class ShellTask extends AbstractTask {
public ShellTask(TaskProps taskProps, Logger logger) { public ShellTask(TaskProps taskProps, Logger logger) {
super(taskProps, logger); super(taskProps, logger);
this.taskDir = taskProps.getTaskDir(); this.taskDir = taskProps.getExecutePath();
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getTaskDir(), this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskProps.getExecutePath(),
taskProps.getTaskAppId(), taskProps.getTaskAppId(),
taskProps.getTaskInstId(), taskProps.getTaskInstanceId(),
taskProps.getTenantCode(), taskProps.getTenantCode(),
taskProps.getEnvFile(), taskProps.getEnvFile(),
taskProps.getTaskStartTime(), taskProps.getTaskStartTime(),
taskProps.getTaskTimeout(), taskProps.getTaskTimeout(),
taskProps.getLogPath(),
taskProps.getExecutePath(),
logger); logger);
this.processService = SpringApplicationContext.getBean(ProcessService.class); this.processService = SpringApplicationContext.getBean(ProcessService.class);
} }
...@@ -102,10 +105,13 @@ public class ShellTask extends AbstractTask { ...@@ -102,10 +105,13 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception { public void handle() throws Exception {
try { try {
// construct process // construct process
exitStatusCode = shellCommandExecutor.run(buildCommand(), processService); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(buildCommand());
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) { } catch (Exception e) {
logger.error("shell task failure", e); logger.error("shell task failure", e);
exitStatusCode = -1; setExitStatusCode(Constants.EXIT_CODE_FAILURE);
throw e; throw e;
} }
} }
......
...@@ -316,7 +316,7 @@ public class SqlTask extends AbstractTask { ...@@ -316,7 +316,7 @@ public class SqlTask extends AbstractTask {
sendAttachment(sqlParameters.getTitle(), sendAttachment(sqlParameters.getTitle(),
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
}else{ }else{
sendAttachment(taskProps.getNodeName() + " query resultsets ", sendAttachment(taskProps.getTaskName() + " query resultsets ",
JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue)); JSONObject.toJSONString(resultJSONArray, SerializerFeature.WriteMapNullValue));
} }
} }
...@@ -384,7 +384,7 @@ public class SqlTask extends AbstractTask { ...@@ -384,7 +384,7 @@ public class SqlTask extends AbstractTask {
public void sendAttachment(String title,String content){ public void sendAttachment(String title,String content){
// process instance // process instance
ProcessInstance instance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); ProcessInstance instance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId()); List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
...@@ -471,7 +471,7 @@ public class SqlTask extends AbstractTask { ...@@ -471,7 +471,7 @@ public class SqlTask extends AbstractTask {
*/ */
private void checkUdfPermission(Integer[] udfFunIds) throws Exception{ private void checkUdfPermission(Integer[] udfFunIds) throws Exception{
// process instance // process instance
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
int userId = processInstance.getExecutorId(); int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF, processService,udfFunIds,userId,logger); PermissionCheck<Integer> permissionCheckUdf = new PermissionCheck<Integer>(AuthorizationType.UDF, processService,udfFunIds,userId,logger);
...@@ -485,7 +485,7 @@ public class SqlTask extends AbstractTask { ...@@ -485,7 +485,7 @@ public class SqlTask extends AbstractTask {
*/ */
private void checkDataSourcePermission(int dataSourceId) throws Exception{ private void checkDataSourcePermission(int dataSourceId) throws Exception{
// process instance // process instance
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstId()); ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskProps.getTaskInstanceId());
int userId = processInstance.getExecutorId(); int userId = processInstance.getExecutorId();
PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE, processService,new Integer[]{dataSourceId},userId,logger); PermissionCheck<Integer> permissionCheckDataSource = new PermissionCheck<Integer>(AuthorizationType.DATASOURCE, processService,new Integer[]{dataSourceId},userId,logger);
......
...@@ -55,13 +55,13 @@ public class ShellCommandExecutorTest { ...@@ -55,13 +55,13 @@ public class ShellCommandExecutorTest {
TaskProps taskProps = new TaskProps(); TaskProps taskProps = new TaskProps();
// processDefineId_processInstanceId_taskInstanceId // processDefineId_processInstanceId_taskInstanceId
taskProps.setTaskDir("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657"); taskProps.setExecutePath("/opt/soft/program/tmp/dolphinscheduler/exec/flow/5/36/2864/7657");
taskProps.setTaskAppId("36_2864_7657"); taskProps.setTaskAppId("36_2864_7657");
// set tenant -> task execute linux user // set tenant -> task execute linux user
taskProps.setTenantCode("hdfs"); taskProps.setTenantCode("hdfs");
taskProps.setTaskStartTime(new Date()); taskProps.setTaskStartTime(new Date());
taskProps.setTaskTimeout(360000); taskProps.setTaskTimeout(360000);
taskProps.setTaskInstId(7657); taskProps.setTaskInstanceId(7657);
......
...@@ -97,15 +97,15 @@ public class SqlExecutorTest { ...@@ -97,15 +97,15 @@ public class SqlExecutorTest {
*/ */
private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, int taskInstId) throws Exception { private void sharedTestSqlTask(String nodeName, String taskAppId, String tenantCode, int taskInstId) throws Exception {
TaskProps taskProps = new TaskProps(); TaskProps taskProps = new TaskProps();
taskProps.setTaskDir(""); taskProps.setExecutePath("");
// processDefineId_processInstanceId_taskInstanceId // processDefineId_processInstanceId_taskInstanceId
taskProps.setTaskAppId(taskAppId); taskProps.setTaskAppId(taskAppId);
// set tenant -> task execute linux user // set tenant -> task execute linux user
taskProps.setTenantCode(tenantCode); taskProps.setTenantCode(tenantCode);
taskProps.setTaskStartTime(new Date()); taskProps.setTaskStartTime(new Date());
taskProps.setTaskTimeout(360000); taskProps.setTaskTimeout(360000);
taskProps.setTaskInstId(taskInstId); taskProps.setTaskInstanceId(taskInstId);
taskProps.setNodeName(nodeName); taskProps.setTaskName(nodeName);
taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS); taskProps.setCmdTypeIfComplement(CommandType.START_PROCESS);
......
...@@ -71,9 +71,9 @@ public class DataxTaskTest { ...@@ -71,9 +71,9 @@ public class DataxTaskTest {
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService); Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp"); props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1); props.setTaskInstanceId(1);
props.setTenantCode("1"); props.setTenantCode("1");
props.setEnvFile(".dolphinscheduler_env.sh"); props.setEnvFile(".dolphinscheduler_env.sh");
props.setTaskStartTime(new Date()); props.setTaskStartTime(new Date());
...@@ -87,8 +87,8 @@ public class DataxTaskTest { ...@@ -87,8 +87,8 @@ public class DataxTaskTest {
Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource()); Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance()); Mockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
String fileName = String.format("%s/%s_node.sh", props.getTaskDir(), props.getTaskAppId()); String fileName = String.format("%s/%s_node.sh", props.getExecutePath(), props.getTaskAppId());
Mockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0); Mockito.when(shellCommandExecutor.run(fileName)).thenReturn(null);
} }
private DataSource getDataSource() { private DataSource getDataSource() {
...@@ -118,9 +118,9 @@ public class DataxTaskTest { ...@@ -118,9 +118,9 @@ public class DataxTaskTest {
public void testDataxTask() public void testDataxTask()
throws Exception { throws Exception {
TaskProps props = new TaskProps(); TaskProps props = new TaskProps();
props.setTaskDir("/tmp"); props.setExecutePath("/tmp");
props.setTaskAppId(String.valueOf(System.currentTimeMillis())); props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
props.setTaskInstId(1); props.setTaskInstanceId(1);
props.setTenantCode("1"); props.setTenantCode("1");
Assert.assertNotNull(new DataxTask(props, logger)); Assert.assertNotNull(new DataxTask(props, logger));
} }
......
...@@ -50,7 +50,7 @@ public class DependentTaskTest { ...@@ -50,7 +50,7 @@ public class DependentTaskTest {
"\"relation\":\"OR\"\n" + "\"relation\":\"OR\"\n" +
"}"; "}";
taskProps.setTaskInstId(252612); taskProps.setTaskInstanceId(252612);
taskProps.setDependence(dependString); taskProps.setDependence(dependString);
DependentTask dependentTask = new DependentTask(taskProps, logger); DependentTask dependentTask = new DependentTask(taskProps, logger);
dependentTask.init(); dependentTask.init();
......
...@@ -170,7 +170,7 @@ ...@@ -170,7 +170,7 @@
*/ */
_downloadLog () { _downloadLog () {
downloadFile('/dolphinscheduler/log/download-log', { downloadFile('/dolphinscheduler/log/download-log', {
taskInstId: this.stateId || this.logId taskInstanceId: this.stateId || this.logId
}) })
}, },
/** /**
...@@ -256,7 +256,7 @@ ...@@ -256,7 +256,7 @@
computed: { computed: {
_rtParam () { _rtParam () {
return { return {
taskInstId: this.stateId || this.logId, taskInstanceId: this.stateId || this.logId,
skipLineNum: parseInt(`${this.loadingIndex ? this.loadingIndex + '000' : 0}`), skipLineNum: parseInt(`${this.loadingIndex ? this.loadingIndex + '000' : 0}`),
limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 1 : 1}000`) limit: parseInt(`${this.loadingIndex ? this.loadingIndex + 1 : 1}000`)
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册