未验证 提交 bb3885cf 编写于 作者: Q qiaozhanwei 提交者: GitHub

master add kill task logic (#2058)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify
上级 9d36eaf6
......@@ -57,4 +57,14 @@ public enum DbType {
public String getDescp() {
return descp;
}
public static DbType of(int type){
for(DbType ty : values()){
if(ty.getCode() == type){
return ty;
}
}
throw new IllegalArgumentException("invalid type : " + type);
}
}
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */public class KillTaskRequestCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * processId */ private int processId; /** * host */ private String host; /** * tenantCode */ private String tenantCode; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public String getTenantCode() { return tenantCode; } public void setTenantCode(String tenantCode) { this.tenantCode = tenantCode; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } }
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */public class KillTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public KillTaskRequestCommand() { } public KillTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
......
......@@ -17,10 +17,11 @@
package org.apache.dolphinscheduler.server.builder;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* TaskExecutionContext builder
......@@ -47,6 +48,8 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
return this;
}
......@@ -54,7 +57,7 @@ public class TaskExecutionContextBuilder {
/**
* build processInstance related info
*
* @param processInstance
* @param processInstance processInstance
* @return TaskExecutionContextBuilder
*/
public TaskExecutionContextBuilder buildProcessInstanceRelatedInfo(ProcessInstance processInstance){
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import java.io.Serializable;
/**
* master/worker task transport
*/
public class DataxTaskExecutionContext implements Serializable{
/**
* dataSourceId
*/
private int dataSourceId;
/**
* sourcetype
*/
private int sourcetype;
/**
* sourceConnectionParams
*/
private String sourceConnectionParams;
/**
* dataTargetId
*/
private int dataTargetId;
/**
* targetType
*/
private int targetType;
/**
* targetConnectionParams
*/
private String targetConnectionParams;
public int getDataSourceId() {
return dataSourceId;
}
public void setDataSourceId(int dataSourceId) {
this.dataSourceId = dataSourceId;
}
public int getSourcetype() {
return sourcetype;
}
public void setSourcetype(int sourcetype) {
this.sourcetype = sourcetype;
}
public String getSourceConnectionParams() {
return sourceConnectionParams;
}
public void setSourceConnectionParams(String sourceConnectionParams) {
this.sourceConnectionParams = sourceConnectionParams;
}
public int getDataTargetId() {
return dataTargetId;
}
public void setDataTargetId(int dataTargetId) {
this.dataTargetId = dataTargetId;
}
public int getTargetType() {
return targetType;
}
public void setTargetType(int targetType) {
this.targetType = targetType;
}
public String getTargetConnectionParams() {
return targetConnectionParams;
}
public void setTargetConnectionParams(String targetConnectionParams) {
this.targetConnectionParams = targetConnectionParams;
}
@Override
public String toString() {
return "DataxTaskExecutionContext{" +
"dataSourceId=" + dataSourceId +
", sourcetype=" + sourcetype +
", sourceConnectionParams='" + sourceConnectionParams + '\'' +
", dataTargetId=" + dataTargetId +
", targetType=" + targetType +
", targetConnectionParams='" + targetConnectionParams + '\'' +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import java.io.Serializable;
import java.util.List;
/**
* SQL Task ExecutionContext
*/
public class SQLTaskExecutionContext implements Serializable {
/**
* warningGroupId
*/
private int warningGroupId;
/**
* udf function list
*/
private List<UdfFunc> udfFuncList;
public int getWarningGroupId() {
return warningGroupId;
}
public void setWarningGroupId(int warningGroupId) {
this.warningGroupId = warningGroupId;
}
public List<UdfFunc> getUdfFuncList() {
return udfFuncList;
}
public void setUdfFuncList(List<UdfFunc> udfFuncList) {
this.udfFuncList = udfFuncList;
}
@Override
public String toString() {
return "SQLTaskExecutionContext{" +
"warningGroupId=" + warningGroupId +
", udfFuncList=" + udfFuncList +
'}';
}
}
......@@ -15,12 +15,10 @@
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.entity;
package org.apache.dolphinscheduler.server.entity;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
......@@ -144,7 +142,6 @@ public class TaskExecutionContext implements Serializable{
*/
private Map<String, String> definedParams;
/**
* task AppId
*/
......@@ -165,6 +162,20 @@ public class TaskExecutionContext implements Serializable{
*/
private String workerGroup;
/**
* sql TaskExecutionContext
*/
private SQLTaskExecutionContext sqlTaskExecutionContext;
/**
* datax TaskExecutionContext
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
public String getWorkerGroup() {
return workerGroup;
}
......@@ -373,6 +384,21 @@ public class TaskExecutionContext implements Serializable{
this.appIds = appIds;
}
public SQLTaskExecutionContext getSqlTaskExecutionContext() {
return sqlTaskExecutionContext;
}
public void setSqlTaskExecutionContext(SQLTaskExecutionContext sqlTaskExecutionContext) {
this.sqlTaskExecutionContext = sqlTaskExecutionContext;
}
public DataxTaskExecutionContext getDataxTaskExecutionContext() {
return dataxTaskExecutionContext;
}
public void setDataxTaskExecutionContext(DataxTaskExecutionContext dataxTaskExecutionContext) {
this.dataxTaskExecutionContext = dataxTaskExecutionContext;
}
@Override
public String toString() {
......@@ -402,6 +428,9 @@ public class TaskExecutionContext implements Serializable{
", taskAppId='" + taskAppId + '\'' +
", taskTimeoutStrategy=" + taskTimeoutStrategy +
", taskTimeout=" + taskTimeout +
", workerGroup='" + workerGroup + '\'' +
", sqlTaskExecutionContext=" + sqlTaskExecutionContext +
", dataxTaskExecutionContext=" + dataxTaskExecutionContext +
'}';
}
}
......@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistry;
import org.apache.dolphinscheduler.server.master.runner.MasterSchedulerThread;
......@@ -127,6 +128,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//
......
......@@ -20,7 +20,7 @@ package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* task instance state manager
......
......@@ -20,7 +20,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.springframework.beans.factory.annotation.Autowired;
......
......@@ -67,8 +67,10 @@ public class ExecutorDispatcher implements InitializingBean {
}
/**
* task dispatch
* task dispatch
*
* @param context context
* @return result
* @throws ExecuteException
*/
public Boolean dispatch(final ExecutionContext context) throws ExecuteException {
......
......@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
/**
......
......@@ -27,6 +27,7 @@ public abstract class AbstractExecutorManager<T> implements ExecutorManager<T>{
/**
* before execute , add time monitor , timeout
*
* @param context context
* @throws ExecuteException
*/
......
......@@ -27,6 +27,7 @@ public interface ExecutorManager<T> {
/**
* before execute
*
* @param executeContext executeContext
* @throws ExecuteException
*/
......@@ -35,12 +36,13 @@ public interface ExecutorManager<T> {
/**
* execute task
* @param context context
* @return T
* @throws ExecuteException
*/
T execute(ExecutionContext context) throws ExecuteException;
/**
* after execute
* after execute
* @param context context
* @throws ExecuteException
*/
......
......@@ -23,9 +23,9 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
......@@ -72,9 +72,11 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
}
/**
* execute logic
* execute logic
* @param context context
* @return result
* @throws ExecuteException
*/
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* netty executor manager
*/
@Service
public class NettyKillManager extends AbstractExecutorManager<Boolean>{
private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
/**
* netty remote client
*/
private final NettyRemotingClient nettyRemotingClient;
public NettyKillManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
/**
* register KILL_TASK_RESPONSE command type TaskKillResponseProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
Host host = context.getHost();
Command command = buildCommand(context);
try {
doExecute(host, command);
return true;
}catch (ExecuteException ex) {
logger.error(String.format("execute context : %s error", context.getContext()), ex);
return false;
}
}
private Command buildCommand(ExecutionContext context) {
KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
return requestCommand.convert2Command();
}
/**
* execute logic
* @param host host
* @param command command
* @throws ExecuteException
*/
private void doExecute(final Host host, final Command command) throws ExecuteException {
/**
* retry count,default retry 3
*/
int retryCount = 3;
boolean success = false;
do {
try {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {}
}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
}
}
}
......@@ -24,6 +24,12 @@ import java.util.Collection;
*/
public class LowerWeightRoundRobin implements Selector<HostWeight>{
/**
* select
* @param sources sources
* @return HostWeight
*/
@Override
public HostWeight select(Collection<HostWeight> sources){
int totalWeight = 0;
int lowWeight = 0;
......
......@@ -68,7 +68,7 @@ public class TaskFuture {
}
/**
* wait for response
* wait for response
* @return command
* @throws InterruptedException
*/
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task response processor
*/
public class TaskKillResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);
/**
* task final result response
* need master process , state persistence
*
* @param channel channel
* @param command command
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
logger.info("received command : {}", responseCommand);
logger.info("已经接受到了worker杀任务的回应");
}
}
......@@ -72,6 +72,8 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
processService.changeTaskState(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId());
}
......
......@@ -67,9 +67,10 @@ public class MasterRegistry {
private final String startTime;
/**
* construct
* construct
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
* @param heartBeatInterval heartBeatInterval
*/
public MasterRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this.zookeeperRegistryCenter = zookeeperRegistryCenter;
......
......@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
......@@ -147,7 +147,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param taskInstance taskInstance
* @return TaskExecutionContext
*/
private TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance){
taskInstance = processService.getTaskInstanceDetailByTaskId(taskInstance.getId());
Integer userId = taskInstance.getProcessDefine() == null ? 0 : taskInstance.getProcessDefine().getUserId();
......
......@@ -142,8 +142,9 @@ public class MasterExecThread implements Runnable {
/**
* constructor of MasterExecThread
* @param processInstance process instance
* @param processService process dao
* @param processInstance processInstance
* @param processService processService
* @param nettyRemotingClient nettyRemotingClient
*/
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient){
this.processService = processService;
......
......@@ -26,15 +26,19 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_KILL;
/**
* master task exec thread
......@@ -52,6 +56,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
*/
private TaskInstanceCacheManager taskInstanceCacheManager;
private NettyKillManager nettyKillManager;
/**
* constructor of MasterTaskExecThread
* @param taskInstance task instance
......@@ -60,6 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
}
/**
......@@ -78,6 +86,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* TODO submit task instance and wait complete
*
* @return true is task quit is true
*/
@Override
......@@ -99,14 +108,14 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
/**
* TODO 在这里轮询数据库
* TODO polling db
*
* wait task quit
* @return true if task quit success
*/
public Boolean waitTaskQuit(){
// query new state
taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());
// task time out
......@@ -147,7 +156,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
}
}
// updateProcessInstance task instance
taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) {
......@@ -163,23 +172,26 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* TODO Kill 任务
* TODO Kill TASK
*
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance(){
private void cancelTaskInstance() throws Exception{
if(alreadyKilled){
return ;
}
alreadyKilled = true;
String host = taskInstance.getHost();
if(host == null){
host = Constants.NULL;
}
String queueValue = String.format("%s-%d",
host, taskInstance.getId());
// TODO 这里写
taskQueue.sadd(DOLPHINSCHEDULER_TASKS_KILL, queueValue);
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
Host host = new Host();
host.setIp(taskInstance.getHost());
host.setPort(12346);
executionContext.setHost(host);
nettyKillManager.execute(executionContext);
logger.info("master add kill task :{} id:{} to kill queue",
taskInstance.getName(), taskInstance.getId() );
......@@ -197,7 +209,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
/**
* get remain time(s)
* get remain time?s?
*
* @return remain time
*/
......
......@@ -23,6 +23,11 @@ public interface Monitor {
/**
* monitor server and restart
*
* @param masterPath masterPath
* @param workerPath workerPath
* @param port port
* @param installPath installPath
*/
void monitor(String masterPath, String workerPath, Integer port, String installPath);
}
......@@ -74,7 +74,7 @@ public class ZookeeperNodeManager implements InitializingBean {
private ZookeeperRegistryCenter registryCenter;
/**
* init listener
* init listener
* @throws Exception
*/
@Override
......@@ -234,8 +234,8 @@ public class ZookeeperNodeManager implements InitializingBean {
/**
* get worker group nodes
* @param workerGroup
* @return
* @param workerGroup workerGroup
* @return worker nodes
*/
public Set<String> getWorkerGroupNodes(String workerGroup){
workerGroupLock.lock();
......
......@@ -129,7 +129,7 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group directly
* @return
* @return worker group nodes
*/
public Set<String> getWorkerGroupDirectly() {
List<String> workers = getChildrenKeys(getWorkerPath());
......@@ -166,8 +166,8 @@ public class ZookeeperRegistryCenter implements InitializingBean {
/**
* get worker group path
* @param workerGroup
* @return
* @param workerGroup workerGroup
* @return worker group path
*/
public String getWorkerGroupPath(String workerGroup) {
return WORKER_PATH + "/" + workerGroup;
......
......@@ -111,7 +111,8 @@ public class ParamUtils {
/**
* get parameters map
* @return user defined params map
* @param definedParams definedParams
* @return parameters map
*/
public static Map<String,Property> getUserDefParamsMap(Map<String,String> definedParams) {
if (definedParams != null) {
......
......@@ -21,9 +21,8 @@ import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -17,7 +17,8 @@
package org.apache.dolphinscheduler.server.worker.cache;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
* TaskExecutionContextCacheManager
......
......@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.cache.impl;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.springframework.stereotype.Service;
......
......@@ -33,13 +33,12 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.alibaba.fastjson.JSONObject;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
......@@ -28,9 +29,9 @@ import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
......@@ -75,32 +76,35 @@ public class TaskKillProcessor implements NettyRequestProcessor {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
/**
* kill task logic
*
* @param killCommand killCommand
* @param context context
* @return execute result
*/
private Boolean doKill(KillTaskRequestCommand killCommand){
private Boolean doKill(TaskExecutionContext context){
try {
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
TaskExecutionContext taskExecutionContext = taskExecutionContextCacheManager.getByTaskInstanceId(context.getTaskInstanceId());
context.setProcessId(taskExecutionContext.getProcessId());
Integer processId = taskExecutionContext.getProcessId();
if (processId == null || processId.equals(0)){
logger.error("process kill failed, process id :{}, task id:{}", processId, killCommand.getTaskInstanceId());
logger.error("process kill failed, process id :{}, task id:{}", processId, context.getTaskInstanceId());
return false;
}
killCommand.setProcessId(processId);
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(killCommand.getProcessId()));
String cmd = String.format("sudo kill -9 %s", ProcessUtils.getPidsStr(context.getProcessId()));
logger.info("process id:{}, cmd:{}", killCommand.getProcessId(), cmd);
logger.info("process id:{}, cmd:{}", context.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
// find log and kill yarn job
killYarnJob(killCommand.getHost(), killCommand.getLogPath(), killCommand.getExecutePath(), killCommand.getTenantCode());
killYarnJob(context.getHost(), context.getLogPath(), context.getExecutePath(), context.getTenantCode());
return true;
} catch (Exception e) {
......@@ -115,29 +119,37 @@ public class TaskKillProcessor implements NettyRequestProcessor {
KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
logger.info("received command : {}", killTaskRequestCommand);
Boolean killStatus = doKill(killTaskRequestCommand);
KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(killTaskRequestCommand,killStatus);
String contextJson = killTaskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
Boolean killStatus = doKill(taskExecutionContext);
killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand);
}
/**
* build KillTaskResponseCommand
*
* @param killTaskRequestCommand killTaskRequestCommand
* @param taskExecutionContext taskExecutionContext
* @param killStatus killStatus
* @return KillTaskResponseCommand
* @return build KillTaskResponseCommand
*/
private KillTaskResponseCommand buildKillTaskResponseCommand(KillTaskRequestCommand killTaskRequestCommand,
private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
Boolean killStatus) {
KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
killTaskResponseCommand.setTaskInstanceId(killTaskRequestCommand.getTaskInstanceId());
killTaskResponseCommand.setHost(killTaskRequestCommand.getHost());
killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
killTaskResponseCommand.setHost(taskExecutionContext.getHost());
killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
killTaskResponseCommand.setProcessId(killTaskRequestCommand.getProcessId());
killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId());
killTaskResponseCommand.setAppIds(appIds);
return null;
return killTaskResponseCommand;
}
/**
......@@ -156,6 +168,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
String log = null;
try {
logClient = new LogClientService();
logger.info("view log host : {},logPath : {}", host,logPath);
log = logClient.viewLog(host, Constants.RPC_PORT, logPath);
} finally {
if(logClient != null){
......
......@@ -76,9 +76,11 @@ public class WorkerRegistry {
private String workerGroup;
/**
* construct
* construct
*
* @param zookeeperRegistryCenter zookeeperRegistryCenter
* @param port port
* @param heartBeatInterval heartBeatInterval
*/
public WorkerRegistry(ZookeeperRegistryCenter zookeeperRegistryCenter, int port, long heartBeatInterval){
this(zookeeperRegistryCenter, port, heartBeatInterval, DEFAULT_WORKER_GROUP);
......
......@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
......
......@@ -21,17 +21,13 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
......@@ -66,11 +62,6 @@ public abstract class AbstractCommandExecutor {
*/
protected Consumer<List<String>> logHandler;
/**
* timeout
*/
protected int timeout;
/**
* logger
*/
......@@ -132,6 +123,7 @@ public abstract class AbstractCommandExecutor {
/**
* task specific execution logic
*
* @param execCommand execCommand
* @return CommandExecuteResult
* @throws Exception
......@@ -174,8 +166,6 @@ public abstract class AbstractCommandExecutor {
// waiting for the run to finish
boolean status = process.waitFor(remainTime, TimeUnit.SECONDS);
// SHELL task state
result.setExitStatusCode(process.exitValue());
logger.info("process has exited, execute path:{}, processId:{} ,exitStatusCode:{}",
taskExecutionContext.getExecutePath(),
......@@ -195,6 +185,9 @@ public abstract class AbstractCommandExecutor {
ProcessUtils.kill(taskExecutionContext);
result.setExitStatusCode(EXIT_CODE_FAILURE);
}
// SHELL task state
result.setExitStatusCode(process.exitValue());
return result;
}
......@@ -378,7 +371,7 @@ public abstract class AbstractCommandExecutor {
List<String> appIds = new ArrayList<>();
/**
* analysis logget submited yarn application id
* analysis log?get submited yarn application id
*/
for (String log : logs) {
String appId = findAppId(log);
......@@ -440,13 +433,13 @@ public abstract class AbstractCommandExecutor {
/**
* get remain time(s)
* get remain time?s?
*
* @return remain time
*/
private long getRemaintime() {
long usedTime = (System.currentTimeMillis() - taskExecutionContext.getStartTime().getTime()) / 1000;
long remainTime = timeout - usedTime;
long remainTime = taskExecutionContext.getTaskTimeout() - usedTime;
if (remainTime < 0) {
throw new RuntimeException("task execution time out");
......
......@@ -30,7 +30,7 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.TaskRecordDao;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
......
......@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -17,7 +17,7 @@
package org.apache.dolphinscheduler.server.worker.task;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.slf4j.Logger;
import java.io.File;
......@@ -25,7 +25,6 @@ import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
......
......@@ -19,7 +19,7 @@ package org.apache.dolphinscheduler.server.worker.task;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.utils.EnumUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.datax.DataxTask;
import org.apache.dolphinscheduler.server.worker.task.flink.FlinkTask;
import org.apache.dolphinscheduler.server.worker.task.http.HttpTask;
......
......@@ -129,19 +129,21 @@ public class TaskProps {
/**
* constructor
* @param taskParams task params
* @param taskDir task dir
* @param scheduleTime schedule time
* @param nodeName node name
* @param taskType task type
* @param taskInstanceId task instance id
* @param envFile env file
* @param tenantCode tenant code
* @param queue queue
* @param taskStartTime task start time
* @param definedParams defined params
* @param dependence dependence
* @param cmdTypeIfComplement cmd type if complement
* @param taskParams taskParams
* @param scheduleTime scheduleTime
* @param nodeName nodeName
* @param taskType taskType
* @param taskInstanceId taskInstanceId
* @param envFile envFile
* @param tenantCode tenantCode
* @param queue queue
* @param taskStartTime taskStartTime
* @param definedParams definedParams
* @param dependence dependence
* @param cmdTypeIfComplement cmdTypeIfComplement
* @param host host
* @param logPath logPath
* @param executePath executePath
*/
public TaskProps(String taskParams,
Date scheduleTime,
......
......@@ -39,6 +39,7 @@ import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
......@@ -50,13 +51,13 @@ import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.DataxUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
......@@ -98,11 +99,6 @@ public class DataxTask extends AbstractTask {
*/
private DataxParameters dataXParameters;
/**
* task dir
*/
private String taskDir;
/**
* shell command executor
*/
......@@ -113,11 +109,6 @@ public class DataxTask extends AbstractTask {
*/
private TaskExecutionContext taskExecutionContext;
/**
* processService
*/
private ProcessService processService;
/**
* constructor
* @param taskExecutionContext taskExecutionContext
......@@ -127,13 +118,9 @@ public class DataxTask extends AbstractTask {
super(taskExecutionContext, logger);
this.taskExecutionContext = taskExecutionContext;
logger.info("task dir : {}", taskDir);
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle,
taskExecutionContext,logger);
processService = SpringApplicationContext.getBean(ProcessService.class);
}
/**
......@@ -151,12 +138,11 @@ public class DataxTask extends AbstractTask {
/**
* run DataX process
*
*
* @throws Exception
*/
@Override
public void handle()
throws Exception {
public void handle() throws Exception {
try {
// set the name of the current thread
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
......@@ -180,8 +166,8 @@ public class DataxTask extends AbstractTask {
/**
* cancel DataX process
*
* @param cancelApplication
*
* @param cancelApplication cancelApplication
* @throws Exception
*/
@Override
......@@ -200,7 +186,9 @@ public class DataxTask extends AbstractTask {
private String buildDataxJsonFile()
throws Exception {
// generate json
String fileName = String.format("%s/%s_job.json", taskDir, taskExecutionContext.getTaskAppId());
String fileName = String.format("%s/%s_job.json",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
......@@ -230,13 +218,14 @@ public class DataxTask extends AbstractTask {
*/
private List<JSONObject> buildDataxJobContentJson()
throws SQLException {
DataSource dataSource = processService.findDataSourceById(dataXParameters.getDataSource());
BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
DataxTaskExecutionContext dataxTaskExecutionContext = taskExecutionContext.getDataxTaskExecutionContext();
DataSource dataTarget = processService.findDataSourceById(dataXParameters.getDataTarget());
BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(dataTarget.getType(),
dataTarget.getConnectionParams());
BaseDataSource dataSourceCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getSourcetype()),
dataxTaskExecutionContext.getSourceConnectionParams());
BaseDataSource dataTargetCfg = DataSourceFactory.getDatasource(DbType.of(dataxTaskExecutionContext.getTargetType()),
dataxTaskExecutionContext.getTargetConnectionParams());
List<JSONObject> readerConnArr = new ArrayList<>();
JSONObject readerConn = new JSONObject();
......@@ -250,7 +239,7 @@ public class DataxTask extends AbstractTask {
readerParam.put("connection", readerConnArr);
JSONObject reader = new JSONObject();
reader.put("name", DataxUtils.getReaderPluginName(dataSource.getType()));
reader.put("name", DataxUtils.getReaderPluginName(DbType.of(dataxTaskExecutionContext.getSourcetype())));
reader.put("parameter", readerParam);
List<JSONObject> writerConnArr = new ArrayList<>();
......@@ -263,7 +252,9 @@ public class DataxTask extends AbstractTask {
writerParam.put("username", dataTargetCfg.getUser());
writerParam.put("password", dataTargetCfg.getPassword());
writerParam.put("column",
parsingSqlColumnNames(dataSource.getType(), dataTarget.getType(), dataSourceCfg, dataXParameters.getSql()));
parsingSqlColumnNames(DbType.of(dataxTaskExecutionContext.getSourcetype()),
DbType.of(dataxTaskExecutionContext.getTargetType()),
dataSourceCfg, dataXParameters.getSql()));
writerParam.put("connection", writerConnArr);
if (CollectionUtils.isNotEmpty(dataXParameters.getPreStatements())) {
......@@ -275,7 +266,7 @@ public class DataxTask extends AbstractTask {
}
JSONObject writer = new JSONObject();
writer.put("name", DataxUtils.getWriterPluginName(dataTarget.getType()));
writer.put("name", DataxUtils.getWriterPluginName(DbType.of(dataxTaskExecutionContext.getTargetType())));
writer.put("parameter", writerParam);
List<JSONObject> contentList = new ArrayList<>();
......@@ -348,7 +339,9 @@ public class DataxTask extends AbstractTask {
private String buildShellCommandFile(String jobConfigFilePath)
throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId());
String fileName = String.format("%s/%s_node.sh",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
......@@ -364,9 +357,6 @@ public class DataxTask extends AbstractTask {
sbr.append(jobConfigFilePath);
String dataxCommand = sbr.toString();
// find process instance by task id
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
// combining local and global parameters
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
......
......@@ -23,12 +23,10 @@ import org.apache.dolphinscheduler.common.task.flink.FlinkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.FlinkArgsUtils;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import java.util.ArrayList;
......
......@@ -30,13 +30,9 @@ import org.apache.dolphinscheduler.common.task.http.HttpParameters;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.config.RequestConfig;
......@@ -67,10 +63,7 @@ public class HttpTask extends AbstractTask {
*/
private HttpParameters httpParameters;
/**
* process service
*/
private ProcessService processService;
/**
* Convert mill seconds to second unit
......@@ -146,7 +139,6 @@ public class HttpTask extends AbstractTask {
*/
protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException {
RequestBuilder builder = createRequestBuilder();
ProcessInstance processInstance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
// replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
......
......@@ -25,7 +25,7 @@ import org.apache.dolphinscheduler.common.task.mr.MapreduceParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
......
......@@ -31,10 +31,9 @@ import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
......
......@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.python.PythonParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.util.Map;
......
......@@ -24,14 +24,11 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.io.File;
......@@ -54,11 +51,6 @@ public class ShellTask extends AbstractTask {
*/
private ShellParameters shellParameters;
/**
* task dir
*/
private String taskDir;
/**
* shell command executor
*/
......@@ -122,7 +114,10 @@ public class ShellTask extends AbstractTask {
*/
private String buildCommand() throws Exception {
// generate scripts
String fileName = String.format("%s/%s_node.sh", taskDir, taskExecutionContext.getTaskAppId());
String fileName = String.format("%s/%s_node.sh",
taskExecutionContext.getExecutePath(),
taskExecutionContext.getTaskAppId());
Path path = new File(fileName).toPath();
if (Files.exists(path)) {
......@@ -148,7 +143,7 @@ public class ShellTask extends AbstractTask {
shellParameters.setRawScript(script);
logger.info("raw script : {}", shellParameters.getRawScript());
logger.info("task dir : {}", taskDir);
logger.info("task execute path : {}", taskExecutionContext.getExecutePath());
Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
......
......@@ -24,11 +24,10 @@ import org.apache.dolphinscheduler.common.task.spark.SparkParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.SparkArgsUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.slf4j.Logger;
import java.util.ArrayList;
......
......@@ -19,7 +19,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.alert.utils.MailUtils;
import org.apache.dolphinscheduler.common.Constants;
......@@ -33,18 +32,15 @@ import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.utils.UDFUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.permission.PermissionCheck;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
import java.sql.*;
......@@ -64,22 +60,10 @@ public class SqlTask extends AbstractTask {
* sql parameters
*/
private SqlParameters sqlParameters;
/**
* process service
*/
private ProcessService processService;
/**
* alert dao
*/
private AlertDao alertDao;
/**
* datasource
*/
private DataSource dataSource;
/**
* base datasource
*/
......@@ -102,7 +86,7 @@ public class SqlTask extends AbstractTask {
if (!sqlParameters.checkParameters()) {
throw new RuntimeException("sql task params is not valid");
}
this.processService = SpringApplicationContext.getBean(ProcessService.class);
this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
}
......@@ -111,6 +95,7 @@ public class SqlTask extends AbstractTask {
// set the name of the current thread
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName);
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {}",
sqlParameters.getType(),
......@@ -121,37 +106,15 @@ public class SqlTask extends AbstractTask {
sqlParameters.getShowType(),
sqlParameters.getConnParams());
// not set data source
if (sqlParameters.getDatasource() == 0){
logger.error("datasource id not exists");
exitStatusCode = -1;
return;
}
dataSource= processService.findDataSourceById(sqlParameters.getDatasource());
// data source is null
if (dataSource == null){
logger.error("datasource not exists");
exitStatusCode = -1;
return;
}
logger.info("datasource name : {} , type : {} , desc : {} , user_id : {} , parameter : {}",
dataSource.getName(),
dataSource.getType(),
dataSource.getNote(),
dataSource.getUserId(),
dataSource.getConnectionParams());
Connection con = null;
List<String> createFuncs = null;
try {
// load class
DataSourceFactory.loadClass(dataSource.getType());
DataSourceFactory.loadClass(DbType.valueOf(sqlParameters.getType()));
// get datasource
baseDataSource = DataSourceFactory.getDatasource(dataSource.getType(),
dataSource.getConnectionParams());
baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(sqlParameters.getType()),
sqlParameters.getConnParams());
// ready to execute SQL and parameter entity Map
SqlBinds mainSqlBinds = getSqlAndSqlParamsMap(sqlParameters.getSql());
......@@ -175,9 +138,8 @@ public class SqlTask extends AbstractTask {
for(int i=0;i<ids.length;i++){
idsArray[i]=Integer.parseInt(ids[i]);
}
List<UdfFunc> udfFuncList = processService.queryUdfFunListByids(idsArray);
createFuncs = UDFUtils.createFuncs(udfFuncList, taskExecutionContext.getTenantCode(), logger);
SQLTaskExecutionContext sqlTaskExecutionContext = taskExecutionContext.getSqlTaskExecutionContext();
createFuncs = UDFUtils.createFuncs(sqlTaskExecutionContext.getUdfFuncList(), taskExecutionContext.getTenantCode(), logger);
}
// execute sql task
......@@ -262,7 +224,7 @@ public class SqlTask extends AbstractTask {
CommonUtils.loadKerberosConf();
// if hive , load connection params if exists
if (HIVE == dataSource.getType()) {
if (HIVE == DbType.valueOf(sqlParameters.getType())) {
Properties paramProp = new Properties();
paramProp.setProperty(USER, baseDataSource.getUser());
paramProp.setProperty(PASSWORD, baseDataSource.getPassword());
......@@ -387,10 +349,7 @@ public class SqlTask extends AbstractTask {
*/
public void sendAttachment(String title,String content){
// process instance
ProcessInstance instance = processService.findProcessInstanceByTaskId(taskExecutionContext.getTaskInstanceId());
List<User> users = alertDao.queryUserByAlertGroupId(instance.getWarningGroupId());
List<User> users = alertDao.queryUserByAlertGroupId(taskExecutionContext.getSqlTaskExecutionContext().getWarningGroupId());
// receiving group list
List<String> receviersList = new ArrayList<String>();
......
......@@ -25,8 +25,8 @@ import org.apache.dolphinscheduler.dao.AlertDao;
import org.apache.dolphinscheduler.dao.DaoFactory;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.CuratorFramework;
......@@ -236,6 +236,8 @@ public class ZKMasterClient extends AbstractZKClient {
/**
* monitor master
* @param event event
* @param path path
*/
public void handleMasterEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
......@@ -256,6 +258,8 @@ public class ZKMasterClient extends AbstractZKClient {
/**
* monitor worker
* @param event event
* @param path path
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
......
......@@ -72,6 +72,8 @@ public class ZKWorkerClient extends AbstractZKClient {
/**
* monitor worker
* @param event event
* @param path path
*/
public void handleWorkerEvent(TreeCacheEvent event, String path){
switch (event.getType()) {
......
......@@ -1411,8 +1411,12 @@ public class ProcessService {
*/
public void changeTaskState(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstId) {
TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
saveTaskInstance(taskInstance);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册