未验证 提交 0cbf5c75 编写于 作者: W Wenjun Ruan 提交者: GitHub

Merge pull request #44 from ruanwenjun/dev_wenjun_cpAsyncTaskExecuteThread

Refactor worker execute task process, add async task implementation
......@@ -19,10 +19,11 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
......@@ -92,6 +93,23 @@ public class OSUtils {
return Double.parseDouble(df.format(memoryUsage));
}
/**
* get disk usage
* Keep 2 decimal
*
* @return disk free size, unit: GB
*/
public static double diskAvailable() {
File file = new File(".");
long freeSpace = file.getFreeSpace(); //unallocated / free disk space in bytes.
double diskAvailable = freeSpace / 1024.0 / 1024 / 1024;
DecimalFormat df = new DecimalFormat(TWO_DECIMAL);
df.setRoundingMode(RoundingMode.HALF_UP);
return Double.parseDouble(df.format(diskAvailable));
}
/**
* get available physical memory size
* <p>
......@@ -249,6 +267,25 @@ public class OSUtils {
return users;
}
/**
* whether the user exists in linux
*
* @return boolean
*/
public static boolean existTenantCodeInLinux(String tenantCode) {
try{
String result = exeCmd("id "+ tenantCode);
if (!StringUtils.isEmpty(result)){
return result.contains("uid=");
}
}catch (Exception e){
//because ShellExecutor method throws exception to the linux return status is not 0
//not exist user return status is 1
logger.error(e.getMessage(), e);
}
return false;
}
/**
* create user
*
......
......@@ -43,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
// todo: make this class to be utils
@Component
public class TaskPluginManager {
private static final Logger logger = LoggerFactory.getLogger(TaskPluginManager.class);
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.plugin.task.api;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
......@@ -85,12 +86,14 @@ public abstract class AbstractTask {
return null;
}
public abstract void handle() throws TaskException;
/**
* task handle
*
* @throws Exception exception
* This is used for Async task
*/
public abstract void handle() throws Exception;
public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() throws TaskException {
throw new TaskException("This is not supported");
}
/**
* cancel application
......
......@@ -42,7 +42,7 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
......@@ -52,7 +52,7 @@ public abstract class AbstractYarnTask extends AbstractTaskExecutor {
} catch (Exception e) {
logger.error("yarn process failure", e);
exitStatusCode = -1;
throw e;
throw new TaskException("Execute task failed", e);
}
}
......
......@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.plugin.task.api;
import lombok.NonNull;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecuteType;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
......@@ -31,4 +33,8 @@ public interface TaskChannel {
ResourceParametersHelper getResources(String parameters);
default @NonNull TaskExecuteType getTaskExecuteType() {
return TaskExecuteType.SYNC;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.async;
public interface AsyncTaskCallbackFunction {
void executeRunning();
void executeSuccess();
void executeFailed();
void executeThrowing(Throwable throwable);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.async;
import lombok.NonNull;
import java.time.Duration;
public interface AsyncTaskExecuteFunction {
@NonNull AsyncTaskExecutionStatus getTaskExecuteStatus();
@NonNull Duration getTaskExecuteInterval();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.async;
import lombok.Data;
import lombok.NonNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Data
public class AsyncTaskExecutionContext implements Delayed {
private final int taskInstanceId;
private final @NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction;
private final @NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction;
public AsyncTaskExecutionContext(int taskInstanceId,
@NonNull AsyncTaskExecuteFunction asyncTaskExecuteFunction,
@NonNull AsyncTaskCallbackFunction asyncTaskCallbackFunction) {
this.taskInstanceId = taskInstanceId;
this.asyncTaskExecuteFunction = asyncTaskExecuteFunction;
this.asyncTaskCallbackFunction = asyncTaskCallbackFunction;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.toSeconds(asyncTaskExecuteFunction.getTaskExecuteInterval().toMillis());
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
return 1;
}
return Long.compare(this.getDelay(TimeUnit.SECONDS), o.getDelay(TimeUnit.SECONDS));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.plugin.task.api.async;
public enum AsyncTaskExecutionStatus {
RUNNING,
SUCCESS,
FAILED,
;
}
package org.apache.dolphinscheduler.plugin.task.api.enums;
public enum TaskExecuteType {
SYNC(0, "Will use SyncWorkerDelayTaskExecuteRunnable to execute the task"),
ASYNC(1, "Will use AsyncWorkerDelayTaskExecuteRunnable to execute the task"),
;
private final int code;
private final String desc;
TaskExecuteType(int code, String desc) {
this.code = code;
this.desc = desc;
}
public int getCode() {
return code;
}
public String getDesc() {
return desc;
}
}
......@@ -17,21 +17,30 @@
package org.apache.dolphinscheduler.plugin.task.datax;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
......@@ -39,9 +48,6 @@ import org.apache.dolphinscheduler.spi.enums.Flag;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
......@@ -57,7 +63,6 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -65,17 +70,9 @@ import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
import com.alibaba.druid.sql.ast.expr.SQLPropertyExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectItem;
import com.alibaba.druid.sql.ast.statement.SQLSelectQueryBlock;
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
import com.alibaba.druid.sql.ast.statement.SQLUnionQuery;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import static org.apache.dolphinscheduler.plugin.datasource.api.utils.PasswordUtils.decodePassword;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
public class DataxTask extends AbstractTaskExecutor {
/**
......@@ -147,7 +144,7 @@ public class DataxTask extends AbstractTaskExecutor {
* @throws Exception if error throws Exception
*/
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// replace placeholder,and combine local and global parameters
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
......@@ -162,7 +159,7 @@ public class DataxTask extends AbstractTaskExecutor {
setProcessId(commandExecuteResult.getProcessId());
} catch (Exception e) {
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute DataX task failed", e);
}
}
......
......@@ -24,6 +24,7 @@ import static com.fasterxml.jackson.databind.MapperFeature.REQUIRE_SETTERS_FOR_G
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
......@@ -111,7 +112,7 @@ public class EmrTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws InterruptedException {
public void handle() throws TaskException {
ClusterStatus clusterStatus = null;
try {
RunJobFlowRequest runJobFlowRequest = createRunJobFlowRequest();
......@@ -132,6 +133,9 @@ public class EmrTask extends AbstractTaskExecutor {
} catch (EmrTaskException | SdkBaseException e) {
logger.error("emr task submit failed with error", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TaskException("Execute emr task failed", e);
} finally {
final int exitStatusCode = calculateExitStatusCode(clusterStatus);
setExitStatusCode(exitStatusCode);
......
......@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.plugin.task.http;
import static org.apache.dolphinscheduler.plugin.task.http.HttpTaskConstants.APPLICATION_JSON;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
......@@ -89,7 +90,7 @@ public class HttpTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
long startTime = System.currentTimeMillis();
String formatTimeStamp = DateUtils.formatTimeStamp(startTime);
String statusCode = null;
......@@ -108,7 +109,7 @@ public class HttpTask extends AbstractTaskExecutor {
appendMessage(e.toString());
exitStatusCode = -1;
logger.error("httpUrl[" + httpParameters.getUrl() + "] connection failed:" + output, e);
throw e;
throw new TaskException("Execute http task failed", e);
}
}
......
......@@ -18,28 +18,26 @@
package org.apache.dolphinscheduler.plugin.task.jupyter;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.PropertyUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JupyterTask extends AbstractTaskExecutor {
/**
......@@ -79,7 +77,7 @@ public class JupyterTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// SHELL task exit code
TaskResponse response = shellCommandExecutor.run(buildCommand());
......@@ -89,7 +87,7 @@ public class JupyterTask extends AbstractTaskExecutor {
} catch (Exception e) {
logger.error("jupyter task execution failure", e);
exitStatusCode = -1;
throw e;
throw new TaskException("Execute jupyter task failed", e);
}
}
......
......@@ -17,24 +17,23 @@
package org.apache.dolphinscheduler.plugin.task.mlflow;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.MapUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
/**
* shell task
*/
......@@ -79,7 +78,7 @@ public class MlflowTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
......@@ -89,9 +88,9 @@ public class MlflowTask extends AbstractTaskExecutor {
setProcessId(commandExecuteResult.getProcessId());
mlflowParameters.dealOutParam(shellCommandExecutor.getVarPool());
} catch (Exception e) {
logger.error("shell task error", e);
logger.error("Mlflow task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute Mlflow task failed", e);
}
}
......
......@@ -17,14 +17,14 @@
package org.apache.dolphinscheduler.plugin.task.pigeon;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.dolphinscheduler.spi.utils.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
......@@ -33,6 +33,8 @@ import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.HttpURLConnection;
import java.net.URI;
......@@ -42,9 +44,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
/**
* TIS DataX Task
**/
......@@ -74,7 +73,7 @@ public class PigeonTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
// Trigger PIGEON DataX pipeline
logger.info("start execute PIGEON task");
long startTime = System.currentTimeMillis();
......@@ -150,6 +149,7 @@ public class PigeonTask extends AbstractTaskExecutor {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new TaskException("Execute pigeon task failed", e);
}
}
......
......@@ -17,19 +17,16 @@
package org.apache.dolphinscheduler.plugin.task.procedure;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import org.apache.dolphinscheduler.plugin.datasource.api.plugin.DataSourceClientProvider;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParamUtils;
import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.spi.datasource.ConnectionParam;
import org.apache.dolphinscheduler.spi.enums.DbType;
......@@ -44,6 +41,9 @@ import java.sql.Types;
import java.util.HashMap;
import java.util.Map;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
/**
* procedure task
*/
......@@ -84,7 +84,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
logger.info("procedure type : {}, datasource : {}, method : {} , localParams : {}",
procedureParameters.getType(),
procedureParameters.getDatasource(),
......@@ -123,7 +123,7 @@ public class ProcedureTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(EXIT_CODE_FAILURE);
logger.error("procedure task error", e);
throw e;
throw new TaskException("Execute procedure task failed", e);
} finally {
close(stmt, connection);
}
......
......@@ -98,7 +98,7 @@ public class PythonTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// generate the content of this python script
String pythonScriptContent = buildPythonScriptContent();
......
......@@ -17,11 +17,9 @@
package org.apache.dolphinscheduler.plugin.task.seatunnel;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
......@@ -31,8 +29,6 @@ import org.apache.dolphinscheduler.plugin.task.api.parser.ParameterUtils;
import org.apache.dolphinscheduler.plugin.task.api.utils.OSUtils;
import org.apache.dolphinscheduler.spi.utils.JSONUtils;
import org.apache.commons.collections4.MapUtils;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
......@@ -40,10 +36,12 @@ import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_X;
/**
* seatunnel task
*/
......@@ -90,7 +88,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
......@@ -102,7 +100,7 @@ public class SeatunnelTask extends AbstractTaskExecutor {
} catch (Exception e) {
logger.error("seatunnel task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute Seatunnel task failed", e);
}
}
......
......@@ -22,6 +22,7 @@ import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.RWXR_XR_
import org.apache.dolphinscheduler.plugin.task.api.AbstractTaskExecutor;
import org.apache.dolphinscheduler.plugin.task.api.ShellCommandExecutor;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskResponse;
......@@ -88,7 +89,7 @@ public class ShellTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
try {
// construct process
String command = buildCommand();
......@@ -100,7 +101,7 @@ public class ShellTask extends AbstractTaskExecutor {
} catch (Exception e) {
logger.error("shell task error", e);
setExitStatusCode(EXIT_CODE_FAILURE);
throw e;
throw new TaskException("Execute shell task error", e);
}
}
......
......@@ -116,7 +116,7 @@ public class SqlTask extends AbstractTaskExecutor {
}
@Override
public void handle() throws Exception {
public void handle() throws TaskException {
logger.info("Full sql parameters: {}", sqlParameters);
logger.info("sql type : {}, datasource : {}, sql : {} , localParams : {},udfs : {},showType : {},connParams : {},varPool : {} ,query max result limit {}",
sqlParameters.getType(),
......@@ -157,8 +157,8 @@ public class SqlTask extends AbstractTaskExecutor {
} catch (Exception e) {
setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
logger.error("sql task error: {}", e.toString());
throw e;
logger.error("sql task error", e);
throw new TaskException("Execute sql task failed", e);
}
}
......
......@@ -17,39 +17,84 @@
package org.apache.dolphinscheduler.server.worker.metrics;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import lombok.experimental.UtilityClass;
@UtilityClass
public class WorkerServerMetrics {
private static final Counter WORKER_OVERLOAD_COUNTER =
private final Counter workerOverloadCounter =
Counter.builder("ds.worker.overload.count")
.description("overloaded workers count")
.register(Metrics.globalRegistry);
private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER =
private final Counter workerFullSubmitQueueCounter =
Counter.builder("ds.worker.full.submit.queue.count")
.description("full worker submit queues count")
.register(Metrics.globalRegistry);
public static void incWorkerOverloadCount() {
WORKER_OVERLOAD_COUNTER.increment();
private final Counter workerResourceDownloadSuccessCounter =
Counter.builder("ds.worker.resource.download.count")
.tag("status", "success")
.description("worker resource download success count")
.register(Metrics.globalRegistry);
private final Counter workerResourceDownloadFailCounter =
Counter.builder("ds.worker.resource.download.count")
.tag("status", "fail")
.description("worker resource download failure count")
.register(Metrics.globalRegistry);
private final Timer workerResourceDownloadDurationTimer =
Timer.builder("ds.worker.resource.download.duration")
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
.publishPercentileHistogram()
.description("time cost of resource download on workers")
.register(Metrics.globalRegistry);
private final DistributionSummary workerResourceDownloadSizeDistribution =
DistributionSummary.builder("ds.worker.resource.download.size")
.baseUnit("bytes")
.publishPercentiles(0.5, 0.75, 0.95, 0.99)
.publishPercentileHistogram()
.description("size of downloaded resource files on worker")
.register(Metrics.globalRegistry);
public void incWorkerOverloadCount() {
workerOverloadCounter.increment();
}
public void incWorkerSubmitQueueIsFullCount() {
workerFullSubmitQueueCounter.increment();
}
public static void incWorkerSubmitQueueIsFullCount() {
WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER.increment();
public void incWorkerResourceDownloadSuccessCount() {
workerResourceDownloadSuccessCounter.increment();
}
public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
public void incWorkerResourceDownloadFailureCount() {
workerResourceDownloadFailCounter.increment();
}
public void recordWorkerResourceDownloadTime(final long milliseconds) {
workerResourceDownloadDurationTimer.record(milliseconds, TimeUnit.MILLISECONDS);
}
public void recordWorkerResourceDownloadSize(final long size) {
workerResourceDownloadSizeDistribution.record(size);
}
public void registerWorkerRunningTaskGauge(final Supplier<Number> supplier) {
Gauge.builder("ds.task.running", supplier)
.description("number of running tasks on workers")
.register(Metrics.globalRegistry);
}
}
......@@ -17,14 +17,14 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
......@@ -36,26 +36,16 @@ import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.TaskMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerDelayTaskExecuteRunnable;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnableFactoryBuilder;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang.SystemUtils;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import io.netty.channel.Channel;
/**
* Used to handle {@link CommandType#TASK_DISPATCH_REQUEST}
*/
......@@ -91,12 +81,12 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
@Autowired(required = false)
private StorageOperate storageOperate;
@Counted(value = "dolphinscheduler_task_execution_count", description = "task execute total count")
@Timed(value = "dolphinscheduler_task_execution_timer", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Counted(value = "ds.task.execution.count", description = "task execute total count")
@Timed(value = "ds.task.execution.duration", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_DISPATCH_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskDispatchCommand taskDispatchCommand = JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class);
......@@ -104,7 +94,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
logger.error("task execute request command content is null");
return;
}
final String masterAddress = taskDispatchCommand.getMessageSenderAddress();
final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
logger.info("task execute request message: {}", taskDispatchCommand);
TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
......@@ -114,100 +104,40 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
TaskMetrics.incrTaskTypeExecuteCount(taskExecutionContext.getTaskType());
// set cache, it will be used when kill task
TaskExecutionContextCacheManager.cacheTaskExecutionContext(taskExecutionContext);
// todo custom logger
taskExecutionContext.setHost(workerConfig.getWorkerAddress());
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(),
taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}",
execLocalPath,
taskExecutionContext.getTaskInstanceId(),
ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
return;
}
}
// delay task process
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L);
long remainTime = DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L);
if (remainTime > 0) {
logger.info("delay the execution of task instance {}, delay time: {} s",
taskExecutionContext.getTaskInstanceId(),
remainTime);
logger.info("Current taskInstance is choose delay execution, delay time: {}s", remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
workerMessageSender.sendMessage(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
workerMessageSender.sendMessage(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
}
WorkerDelayTaskExecuteRunnable workerTaskExecuteRunnable = WorkerTaskExecuteRunnableFactoryBuilder.createWorkerDelayTaskExecuteRunnableFactory(
taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate)
.createWorkerTaskExecuteRunnable();
// submit task to manager
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext,
masterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate));
boolean offer = workerManager.offer(workerTaskExecuteRunnable);
if (!offer) {
logger.warn("submit task to wait queue error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getWaitSubmitQueueSize(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_EXECUTE_RESULT);
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
/**
* get execute local path
*
* @param taskExecutionContext taskExecutionContext
* @return execute local path
*/
private String getExecLocalPath(TaskExecutionContext taskExecutionContext) {
return FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
}
}
......@@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.worker.processor;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
......@@ -34,26 +39,17 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerTaskExecuteRunnable;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.commons.lang3.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* task kill processor
......@@ -81,7 +77,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand killCommand = JSONUtils.parseObject(command.getBody(), TaskKillRequestCommand.class);
if (killCommand == null) {
logger.error("task kill request command is null");
......@@ -90,8 +86,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
logger.info("task kill command : {}", killCommand);
int taskInstanceId = killCommand.getTaskInstanceId();
TaskExecutionContext taskExecutionContext
= TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
TaskExecutionContext taskExecutionContext =
TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
......@@ -108,10 +104,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return;
}
// if processId > 0, it should call cancelApplication to cancel remote application too.
this.cancelApplication(taskInstanceId);
Pair<Boolean, List<String>> result = doKill(taskExecutionContext);
taskExecutionContext.setCurrentExecutionStatus(
result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
sendTaskKillResponseCommand(channel, taskExecutionContext);
......@@ -159,12 +157,12 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @param taskInstanceId
*/
protected void cancelApplication(int taskInstanceId) {
TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
if (taskExecuteThread == null) {
WorkerTaskExecuteRunnable workerTaskExecuteRunnable = workerManager.getTaskExecuteThread(taskInstanceId);
if (workerTaskExecuteRunnable == null) {
logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
return;
}
AbstractTask task = taskExecuteThread.getTask();
AbstractTask task = workerTaskExecuteRunnable.getTask();
if (task == null) {
logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;
......@@ -189,7 +187,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
}
try {
String pidsStr = ProcessUtils.getPidsStr(processId);
if (!StringUtils.isEmpty(pidsStr)) {
if (!Strings.isNullOrEmpty(pidsStr)) {
String cmd = String.format("kill -9 %s", pidsStr);
cmd = OSUtils.getSudoCmd(tenantCode, cmd);
logger.info("process id:{}, cmd:{}", processId, cmd);
......@@ -217,9 +215,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
host.getPort());
String log = logClient.viewLog(host.getIp(), host.getPort(), logPath);
List<String> appIds = Collections.emptyList();
if (!StringUtils.isEmpty(log)) {
if (!Strings.isNullOrEmpty(log)) {
appIds = LoggerUtils.getAppIds(log, logger);
if (StringUtils.isEmpty(executePath)) {
if (Strings.isNullOrEmpty(executePath)) {
logger.error("task instance execute path is empty");
throw new RuntimeException("task instance execute path is empty");
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecuteFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionStatus;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Component
public class AsyncConditionTaskLooper extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(AsyncConditionTaskLooper.class);
private final ExecutorService asyncCheckThreadPool = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2, new NamedThreadFactory("AsyncTaskCheckThreadPool"));
protected AsyncConditionTaskLooper() {
super("AsyncConditionTaskLooper");
}
@Override
public synchronized void start() {
logger.info("Master Event execute service starting");
super.start();
logger.info("Master Event execute service started");
}
@Override
public void run() {
while (Stopper.isRunning()) {
try {
AsyncTaskExecutionContext asyncTaskExecutionContext = AsyncTaskDelayQueue.pollAsyncTask();
if (asyncTaskExecutionContext == null) {
continue;
}
if (TaskExecutionContextCacheManager.getByTaskInstanceId(asyncTaskExecutionContext.getTaskInstanceId()) == null) {
logger.warn("Cannot find the taskInstance from TaskExecutionContextCacheManager, the task may already been killed");
continue;
}
asyncCheckThreadPool.submit(() -> {
final AsyncTaskExecuteFunction asyncTaskExecuteFunction = asyncTaskExecutionContext.getAsyncTaskExecuteFunction();
final AsyncTaskCallbackFunction asyncTaskCallbackFunction = asyncTaskExecutionContext.getAsyncTaskCallbackFunction();
try {
AsyncTaskExecutionStatus asyncTaskExecutionStatus = asyncTaskExecuteFunction.getTaskExecuteStatus();
switch (asyncTaskExecutionStatus) {
case RUNNING:
asyncTaskCallbackFunction.executeRunning();
break;
case SUCCESS:
asyncTaskCallbackFunction.executeSuccess();
break;
case FAILED:
asyncTaskCallbackFunction.executeFailed();
break;
}
} catch (Exception ex) {
asyncTaskCallbackFunction.executeThrowing(ex);
}
});
} catch (InterruptedException e) {
logger.info("AsyncConditionTaskLooper has been interrupted, will break this loop", e);
break;
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionContext;
import javax.annotation.Nullable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
@UtilityClass
public class AsyncTaskDelayQueue {
private final DelayQueue<AsyncTaskExecutionContext> asyncTaskCheckDelayQueue = new DelayQueue<>();
public void addAsyncTask(@NonNull AsyncTaskExecutionContext asyncTaskExecutionContext) {
asyncTaskCheckDelayQueue.add(asyncTaskExecutionContext);
}
public @Nullable AsyncTaskExecutionContext pollAsyncTask() throws InterruptedException {
return asyncTaskCheckDelayQueue.poll(1, TimeUnit.MINUTES);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskCallbackFunction;
import org.apache.dolphinscheduler.plugin.task.api.async.AsyncTaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
public class AsyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable {
private AsyncTaskExecutionContext asyncTaskExecutionContext;
public AsyncWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String workflowMasterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
}
@Override
public void executeTask() throws TaskException {
if (task == null) {
throw new TaskException("The task plugin instance is null");
}
// we execute the handle method here, but for async task, this method will not block
task.handle();
// submit the task to async task queue
asyncTaskExecutionContext = new AsyncTaskExecutionContext(
taskExecutionContext.getTaskInstanceId(),
task.getAsyncTaskExecuteFunction(),
new AsyncTaskCallbackFunctionImpl(this)
);
AsyncTaskDelayQueue.addAsyncTask(asyncTaskExecutionContext);
}
public AsyncTaskExecutionContext getAsyncTaskExecutionContext() {
return asyncTaskExecutionContext;
}
@Override
protected void afterExecute() throws TaskException {
// do nothing, since this task doesn't really finished
}
@Override
public void afterThrowing(Throwable throwable) throws TaskException {
// need to clear from the async queue
super.afterThrowing(throwable);
}
public static class AsyncTaskCallbackFunctionImpl implements AsyncTaskCallbackFunction {
private final AsyncWorkerDelayTaskExecuteRunnable asyncWorkerDelayTaskExecuteRunnable;
public AsyncTaskCallbackFunctionImpl(@NonNull AsyncWorkerDelayTaskExecuteRunnable asyncWorkerDelayTaskExecuteRunnable) {
this.asyncWorkerDelayTaskExecuteRunnable = asyncWorkerDelayTaskExecuteRunnable;
}
@Override
public void executeRunning() {
AsyncTaskDelayQueue.addAsyncTask(asyncWorkerDelayTaskExecuteRunnable.getAsyncTaskExecutionContext());
}
@Override
public void executeSuccess() {
executeFinished();
}
@Override
public void executeFailed() {
executeFinished();
}
@Override
public void executeThrowing(Throwable throwable) {
asyncWorkerDelayTaskExecuteRunnable.afterThrowing(throwable);
}
private void executeFinished() {
if (asyncWorkerDelayTaskExecuteRunnable.task == null) {
throw new TaskException("The current task instance is null");
}
asyncWorkerDelayTaskExecuteRunnable.sendAlertIfNeeded();
asyncWorkerDelayTaskExecuteRunnable.sendTaskResult();
TaskExecutionContextCacheManager.removeByTaskInstanceId(asyncWorkerDelayTaskExecuteRunnable.getTaskExecutionContext().getTaskInstanceId());
asyncWorkerDelayTaskExecuteRunnable.clearTaskExecPathIfNeeded();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
public class AsyncWorkerDelayTaskExecuteRunnableFactory extends WorkerDelayTaskExecuteRunnableFactory<AsyncWorkerDelayTaskExecuteRunnable> {
public AsyncWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String workflowMasterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
}
@Override
public AsyncWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() {
return new AsyncWorkerDelayTaskExecuteRunnable(
taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate
);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
public class SyncWorkerDelayTaskExecuteRunnable extends WorkerDelayTaskExecuteRunnable {
public SyncWorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String workflowMaster,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
super(taskExecutionContext, workerConfig, workflowMaster, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
}
@Override
public void executeTask() throws TaskException {
if (task == null) {
throw new TaskException("The task plugin instance is not initialized");
}
task.handle();
}
@Override
protected void afterExecute() {
super.afterExecute();
}
@Override
protected void afterThrowing(Throwable throwable) throws TaskException {
super.afterThrowing(throwable);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
public class SyncWorkerDelayTaskExecuteRunnableFactory extends WorkerDelayTaskExecuteRunnableFactory<SyncWorkerDelayTaskExecuteRunnable> {
protected SyncWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String workflowMasterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
super(taskExecutionContext, workerConfig, workflowMasterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
}
@Override
public SyncWorkerDelayTaskExecuteRunnable createWorkerTaskExecuteRunnable() {
return new SyncWorkerDelayTaskExecuteRunnable(
taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Strings;
import lombok.NonNull;
/**
* task scheduler thread
*/
public class TaskExecuteThread implements Runnable, Delayed {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/**
* task instance
*/
private final TaskExecutionContext taskExecutionContext;
private final String masterAddress;
private final StorageOperate storageOperate;
/**
* abstract task
*/
private AbstractTask task;
/**
* task callback service
*/
private final WorkerMessageSender workerMessageSender;
/**
* alert client server
*/
private final AlertClientService alertClientService;
private TaskPluginManager taskPluginManager;
/**
* constructor
*
* @param taskExecutionContext taskExecutionContext
* @param workerMessageSender used for worker send message to master
*/
public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.masterAddress = masterAddress;
this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.storageOperate = storageOperate;
}
public TaskExecuteThread(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.masterAddress = masterAddress;
this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
}
@Override
public void run() {
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
logger.info("Task dry run success");
return;
}
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RUNNING);
// copy hdfs/minio file to local
List<Pair<String, String>> fileDownloads = downloadCheck(taskExecutionContext.getExecutePath(),
taskExecutionContext.getResources());
if (!fileDownloads.isEmpty()) {
downloadResource(taskExecutionContext.getExecutePath(), logger, fileDownloads);
}
taskExecutionContext.setEnvFile(CommonUtils.getSystemEnvPath());
taskExecutionContext.setTaskAppId(String.format("%s_%s",
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()));
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
throw new ServiceException(String.format("%s Task Plugin Not Found,Please Check Config File.", taskExecutionContext.getTaskType()));
}
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
// set the name of the current thread
Thread.currentThread().setName(taskLogName);
task = taskChannel.createTask(taskExecutionContext);
// task init
this.task.init();
//init varPool
this.task.getParameters().setVarPool(taskExecutionContext.getVarPool());
// task handle
this.task.handle();
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext,
masterAddress,
CommandType.TASK_EXECUTE_RESULT);
clearTaskExecPath();
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
private void sendAlert(TaskAlertInfo taskAlertInfo, int status) {
int strategy = status == ExecutionStatus.SUCCESS.getCode() ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
}
/**
* when task finish, clear execute path.
*/
private void clearTaskExecPath() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = taskExecutionContext.getExecutePath();
if (StringUtils.isEmpty(execLocalPath)) {
logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
return;
}
if (SINGLE_SLASH.equals(execLocalPath)) {
logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName());
return;
}
try {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
if (e instanceof NoSuchFileException) {
// this is expected
} else {
logger.error("Delete exec dir failed.", e);
}
}
}
}
/**
* kill task
*/
public void kill() {
if (task != null) {
try {
task.cancelApplication(true);
ProcessUtils.killYarnJob(taskExecutionContext);
} catch (Exception e) {
logger.error("Kill task failed", e);
}
}
}
/**
* download resource file
*
* @param execLocalPath execLocalPath
* @param fileDownloads projectRes
* @param logger logger
*/
public void downloadResource(String execLocalPath, Logger logger, List<Pair<String, String>> fileDownloads) {
for (Pair<String, String> fileDownload : fileDownloads) {
try {
// query the tenant code of the resource according to the name of the resource
String fullName = fileDownload.getLeft();
String tenantCode = fileDownload.getRight();
String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
logger.info("get resource file from hdfs :{}", resHdfsPath);
storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ServiceException(e.getMessage());
}
}
}
/**
* download resource check
*
* @param execLocalPath
* @param projectRes
* @return
*/
public List<Pair<String, String>> downloadCheck(String execLocalPath, Map<String, String> projectRes) {
if (MapUtils.isEmpty(projectRes)) {
return Collections.emptyList();
}
List<Pair<String, String>> downloadFile = new ArrayList<>();
projectRes.forEach((key, value) -> {
File resFile = new File(execLocalPath, key);
boolean notExist = !resFile.exists();
if (notExist) {
downloadFile.add(Pair.of(key, value));
} else {
logger.info("file : {} exists ", resFile.getName());
}
});
if (!downloadFile.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
}
return downloadFile;
}
/**
* get current TaskExecutionContext
*
* @return TaskExecutionContext
*/
public TaskExecutionContext getTaskExecutionContext() {
return this.taskExecutionContext;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(DateUtils.getRemainTime(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
return 1;
}
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
public AbstractTask getTask() {
return task;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public abstract class WorkerDelayTaskExecuteRunnable extends WorkerTaskExecuteRunnable implements Delayed {
protected WorkerDelayTaskExecuteRunnable(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
super(taskExecutionContext, workerConfig, masterAddress, workerMessageSender, alertClientService, taskPluginManager, storageOperate);
}
@Override
public long getDelay(TimeUnit unit) {
TaskExecutionContext taskExecutionContext = getTaskExecutionContext();
return unit.convert(
DateUtils.getRemainTime(
taskExecutionContext.getFirstSubmitTime(), taskExecutionContext.getDelayTime() * 60L), TimeUnit.SECONDS);
}
@Override
public int compareTo(Delayed o) {
if (o == null) {
return 1;
}
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import javax.annotation.Nullable;
public abstract class WorkerDelayTaskExecuteRunnableFactory<T extends WorkerDelayTaskExecuteRunnable> implements WorkerTaskExecuteRunnableFactory<T> {
protected final @NonNull TaskExecutionContext taskExecutionContext;
protected final @NonNull WorkerConfig workerConfig;
protected final @NonNull String workflowMasterAddress;
protected final @NonNull WorkerMessageSender workerMessageSender;
protected final @NonNull AlertClientService alertClientService;
protected final @NonNull TaskPluginManager taskPluginManager;
protected final @Nullable StorageOperate storageOperate;
protected WorkerDelayTaskExecuteRunnableFactory(
@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String workflowMasterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.workflowMasterAddress = workflowMasterAddress;
this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
}
public abstract T createWorkerTaskExecuteRunnable();
}
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -33,6 +34,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class WorkerExecService {
/**
* logger of WorkerExecService
*/
......@@ -48,19 +50,21 @@ public class WorkerExecService {
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
public WorkerExecService(ExecutorService execService,
ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
WorkerServerMetrics.registerWorkerRunningTaskGauge(taskExecuteThreadMap::size);
}
public void submit(TaskExecuteThread taskExecuteThread) {
public void submit(final WorkerTaskExecuteRunnable taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
......@@ -69,9 +73,9 @@ public class WorkerExecService {
@Override
public void onFailure(Throwable throwable) {
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}",
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
throwable);
taskExecuteThread.getTaskExecutionContext().getProcessInstanceId(),
taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(),
throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
};
......@@ -87,4 +91,8 @@ public class WorkerExecService {
return ((ThreadPoolExecutor) this.execService).getQueue().size();
}
}
\ No newline at end of file
public Map<Integer, WorkerTaskExecuteRunnable> getTaskExecuteThreadMap() {
return taskExecuteThreadMap;
}
}
\ No newline at end of file
......@@ -17,18 +17,17 @@
package org.apache.dolphinscheduler.server.worker.runner;
import com.facebook.presto.jdbc.internal.javax.annotation.Nullable;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
/**
* Manage tasks
*/
......@@ -37,30 +36,27 @@ public class WorkerManagerThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(WorkerManagerThread.class);
/**
* task queue
*/
private final BlockingQueue<TaskExecuteThread> waitSubmitQueue;
private final DelayQueue<WorkerDelayTaskExecuteRunnable> waitSubmitQueue;
/**
* thread executor service
*/
private final WorkerExecService workerExecService;
private final int workerExecThreads;
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, WorkerTaskExecuteRunnable> taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) {
workerExecThreads = workerConfig.getExecThreads();
this.waitSubmitQueue = new DelayQueue<>();
workerExecService = new WorkerExecService(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads()),
taskExecuteThreadMap);
workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
taskExecuteThreadMap);
}
public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
return this.taskExecuteThreadMap.get(taskInstanceId);
public @Nullable WorkerTaskExecuteRunnable getTaskExecuteThread(Integer taskInstanceId) {
return taskExecuteThreadMap.get(taskInstanceId);
}
/**
......@@ -78,7 +74,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size
*/
public int getThreadPoolQueueSize() {
return this.workerExecService.getThreadPoolQueueSize();
return workerExecService.getThreadPoolQueueSize();
}
/**
......@@ -87,20 +83,13 @@ public class WorkerManagerThread implements Runnable {
*/
public void killTaskBeforeExecuteByInstanceId(Integer taskInstanceId) {
waitSubmitQueue.stream()
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext()
.getTaskInstanceId() == taskInstanceId)
.forEach(waitSubmitQueue::remove);
.filter(taskExecuteThread -> taskExecuteThread.getTaskExecutionContext()
.getTaskInstanceId() == taskInstanceId)
.forEach(waitSubmitQueue::remove);
}
/**
* submit task
*
* @param taskExecuteThread taskExecuteThread
* @return submit result
*/
public boolean offer(TaskExecuteThread taskExecuteThread) {
return waitSubmitQueue.offer(taskExecuteThread);
public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
return waitSubmitQueue.offer(workerDelayTaskExecuteRunnable);
}
public void start() {
......@@ -114,15 +103,20 @@ public class WorkerManagerThread implements Runnable {
@Override
public void run() {
Thread.currentThread().setName("Worker-Execute-Manager-Thread");
TaskExecuteThread taskExecuteThread;
while (Stopper.isRunning()) {
try {
taskExecuteThread = waitSubmitQueue.take();
workerExecService.submit(taskExecuteThread);
final WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable = waitSubmitQueue.take();
workerExecService.submit(workerDelayTaskExecuteRunnable);
} catch (Exception e) {
logger.error("An unexpected interrupt is happened, "
+ "the exception will be ignored and this thread will continue to run", e);
+ "the exception will be ignored and this thread will continue to run", e);
}
}
}
public void clearTask() {
waitSubmitQueue.clear();
workerExecService.getTaskExecuteThreadMap().values().forEach(WorkerTaskExecuteRunnable::cancelTask);
workerExecService.getTaskExecuteThreadMap().clear();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import com.google.common.base.Strings;
import lombok.NonNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.TaskPluginException;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.utils.Checker;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Date;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
public abstract class WorkerTaskExecuteRunnable implements Runnable {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, WorkerTaskExecuteRunnable.class));
protected final @NonNull TaskExecutionContext taskExecutionContext;
protected final @NonNull WorkerConfig workerConfig;
protected final @NonNull String masterAddress;
protected final @NonNull WorkerMessageSender workerMessageSender;
protected final @NonNull AlertClientService alertClientService;
protected final @NonNull TaskPluginManager taskPluginManager;
protected final @Nullable StorageOperate storageOperate;
protected @Nullable AbstractTask task;
protected WorkerTaskExecuteRunnable(
@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String masterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
this.taskExecutionContext = taskExecutionContext;
this.workerConfig = workerConfig;
this.masterAddress = masterAddress;
this.workerMessageSender = workerMessageSender;
this.alertClientService = alertClientService;
this.taskPluginManager = taskPluginManager;
this.storageOperate = storageOperate;
String taskLogName = LoggerUtils.buildTaskId(taskExecutionContext.getFirstSubmitTime(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskLogName(taskLogName);
logger.info("Set task logger name: {}", taskLogName);
}
protected abstract void executeTask();
protected void afterExecute() throws TaskException {
if (task == null) {
throw new TaskException("The current task instance is null");
}
sendAlertIfNeeded();
sendTaskResult();
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
logger.info("Remove the current task execute context from worker cache");
clearTaskExecPathIfNeeded();
}
protected void afterThrowing(Throwable throwable) throws TaskException {
cancelTask();
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info("Get a exception when execute the task, will send the task execute result to master, the current task execute result is {}", ExecutionStatus.FAILURE);
}
public void cancelTask() {
// cancel the task
if (task != null) {
try {
task.cancelApplication(true);
ProcessUtils.killYarnJob(taskExecutionContext);
} catch (Exception e) {
logger.error("Task execute failed and cancel the application failed, this will not affect the taskInstance status, but you need to check manual", e);
}
}
}
@Override
public void run() {
try {
// set the thread name to make sure the log be written to the task log file
Thread.currentThread().setName(taskExecutionContext.getTaskLogName());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("Begin to pulling task");
initializeTask();
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info("The current execute mode is dry run, will stop the subsequent process and set the taskInstance status to success");
return;
}
beforeExecute();
executeTask();
afterExecute();
} catch (Throwable ex) {
logger.error("Task execute failed, due to meet an exception", ex);
afterThrowing(ex);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
}
protected void initializeTask() {
logger.info("Begin to initialize task");
Date taskStartTime = new Date();
taskExecutionContext.setStartTime(taskStartTime);
logger.info("Set task startTime: {}", taskStartTime);
String systemEnvPath = CommonUtils.getSystemEnvPath();
taskExecutionContext.setEnvFile(systemEnvPath);
logger.info("Set task envFile: {}", systemEnvPath);
String taskAppId = String.format("%s_%s", taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setTaskAppId(taskAppId);
logger.info("Set task appId: {}", taskAppId);
logger.info("End initialize task");
}
protected void beforeExecute() {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RUNNING);
logger.info("Set task status to {}", ExecutionStatus.RUNNING_EXECUTION);
Checker.checkTenantExist(workerConfig, taskExecutionContext);
logger.info("TenantCode:{} check success", taskExecutionContext.getTenantCode());
Checker.createProcessLocalPathIfAbsent(taskExecutionContext);
logger.info("ProcessExecDir:{} check success", taskExecutionContext.getExecutePath());
Checker.downloadResourcesIfNeeded(storageOperate, taskExecutionContext, logger);
logger.info("Resources:{} check success", taskExecutionContext.getResources());
TaskChannel taskChannel = taskPluginManager.getTaskChannelMap().get(taskExecutionContext.getTaskType());
if (null == taskChannel) {
throw new TaskPluginException(String.format("%s task plugin not found, please check config file.", taskExecutionContext.getTaskType()));
}
task = taskChannel.createTask(taskExecutionContext);
if (task == null) {
throw new TaskPluginException(String.format("%s task is null, please check the task plugin is correct", taskExecutionContext.getTaskType()));
}
logger.info("Task plugin: {} create success", taskExecutionContext.getTaskType());
task.init();
logger.info("Success initialized task plugin instance success");
task.getParameters().setVarPool(taskExecutionContext.getVarPool());
logger.info("Success set taskVarPool: {}", taskExecutionContext.getVarPool());
}
protected void sendAlertIfNeeded() {
if (!task.getNeedAlert()) {
return;
}
logger.info("The current task need to send alert, begin to send alert");
ExecutionStatus status = task.getExitStatus();
TaskAlertInfo taskAlertInfo = task.getTaskAlertInfo();
int strategy = status == ExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
logger.info("Success send alert");
}
protected void sendTaskResult() {
taskExecutionContext.setCurrentExecutionStatus(task.getExitStatus());
taskExecutionContext.setEndTime(new Date());
taskExecutionContext.setProcessId(task.getProcessId());
taskExecutionContext.setAppIds(task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(task.getParameters().getVarPool()));
workerMessageSender.sendMessageWithRetry(taskExecutionContext, masterAddress, CommandType.TASK_EXECUTE_RESULT);
logger.info("Send task execute result to master, the current task status: {}", taskExecutionContext.getCurrentExecutionStatus());
}
protected void clearTaskExecPathIfNeeded() {
String execLocalPath = taskExecutionContext.getExecutePath();
if (!CommonUtils.isDevelopMode()) {
logger.info("The current execute mode isn't develop mode, will clear the task execute file: {}", execLocalPath);
// get exec dir
if (Strings.isNullOrEmpty(execLocalPath)) {
logger.warn("The task execute file is {} no need to clear", taskExecutionContext.getTaskName());
return;
}
if (SINGLE_SLASH.equals(execLocalPath)) {
logger.warn("The task execute file is '/', direct deletion is not allowed");
return;
}
try {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("Success clear the task execute file: {}", execLocalPath);
} catch (IOException e) {
if (e instanceof NoSuchFileException) {
// this is expected
} else {
logger.error("Delete task execute file: {} failed, this will not affect the task status, but you need to clear this manually", execLocalPath, e);
}
}
} else {
logger.info("The current execute mode is develop mode, will not clear the task execute file: {}", execLocalPath);
}
}
public @NonNull TaskExecutionContext getTaskExecutionContext() {
return taskExecutionContext;
}
public @Nullable AbstractTask getTask() {
return task;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
public interface WorkerTaskExecuteRunnableFactory<T> {
T createWorkerTaskExecuteRunnable();
}
......@@ -17,71 +17,50 @@
package org.apache.dolphinscheduler.server.worker.runner;
import lombok.NonNull;
import lombok.experimental.UtilityClass;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClientTest;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.commons.lang3.tuple.Pair;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@RunWith(PowerMockRunner.class)
public class TaskExecuteThreadTest {
private static final Logger LOGGER = LoggerFactory.getLogger(WorkerRegistryClientTest.class);
@Mock
private TaskExecutionContext taskExecutionContext;
@Mock
private WorkerMessageSender workerMessageSender;
@Mock
private AlertClientService alertClientService;
@Mock
private StorageOperate storageOperate;
@Mock
private TaskPluginManager taskPluginManager;
@Test
public void checkTest() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext,
"127.0.0.1:5678",
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
String path = "/";
Map<String, String> projectRes = new HashMap<>();
projectRes.put("shell", "shell.sh");
List<Pair<String, String>> downloads = new ArrayList<>();
try {
downloads = taskExecuteThread.downloadCheck(path, projectRes);
} catch (Exception e) {
Assert.assertNotNull(e);
}
downloads.add(Pair.of("shell", "shell.sh"));
try{
taskExecuteThread.downloadResource(path, LOGGER, downloads);
}catch (Exception e){
@UtilityClass
public class WorkerTaskExecuteRunnableFactoryBuilder {
public static WorkerDelayTaskExecuteRunnableFactory<?> createWorkerDelayTaskExecuteRunnableFactory(@NonNull TaskExecutionContext taskExecutionContext,
@NonNull WorkerConfig workerConfig,
@NonNull String workflowMasterAddress,
@NonNull WorkerMessageSender workerMessageSender,
@NonNull AlertClientService alertClientService,
@NonNull TaskPluginManager taskPluginManager,
@Nullable StorageOperate storageOperate) {
String taskType = taskExecutionContext.getTaskType();
TaskChannel taskChannel = taskPluginManager.getTaskChannel(taskType);
switch (taskChannel.getTaskExecuteType()) {
case SYNC:
return new SyncWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
case ASYNC:
return new AsyncWorkerDelayTaskExecuteRunnableFactory(taskExecutionContext,
workerConfig,
workflowMasterAddress,
workerMessageSender,
alertClientService,
taskPluginManager,
storageOperate);
default:
throw new IllegalArgumentException(String.format("The current taskExecuteType: %s is invalidated", taskChannel.getTaskExecuteType()));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.utils;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.dolphinscheduler.common.exception.StorageOperateNoConfiguredException;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.metrics.WorkerServerMetrics;
import org.slf4j.Logger;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class Checker {
public static void checkTenantExist(WorkerConfig workerConfig, TaskExecutionContext taskExecutionContext) {
try {
boolean osUserExistFlag;
// if Using distributed is true and Currently supported systems are linux,Should not let it
// automatically
// create tenants,so TenantAutoCreate has no effect
if (workerConfig.isTenantDistributedUser() && SystemUtils.IS_OS_LINUX) {
// use the id command to judge in linux
osUserExistFlag = OSUtils.existTenantCodeInLinux(taskExecutionContext.getTenantCode());
} else if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
// if not exists this user, then create
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
} else {
osUserExistFlag = OSUtils.getUserList().contains(taskExecutionContext.getTenantCode());
}
if (!osUserExistFlag) {
throw new TaskException(String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
}
} catch (TaskException ex) {
throw ex;
} catch (Exception ex) {
throw new TaskException(String.format("TenantCode: %s doesn't exist", taskExecutionContext.getTenantCode()));
}
}
public static void createProcessLocalPathIfAbsent(TaskExecutionContext taskExecutionContext) throws TaskException {
try {
// local execute path
String execLocalPath = FileUtils.getProcessExecDir(
taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setExecutePath(execLocalPath);
FileUtils.createWorkDirIfAbsent(execLocalPath);
} catch (Throwable ex) {
throw new TaskException("Cannot create process execute dir", ex);
}
}
public static void downloadResourcesIfNeeded(StorageOperate storageOperate, TaskExecutionContext taskExecutionContext, Logger logger) {
String execLocalPath = taskExecutionContext.getExecutePath();
Map<String, String> projectRes = taskExecutionContext.getResources();
if (MapUtils.isEmpty(projectRes)) {
return;
}
List<Pair<String, String>> downloadFiles = new ArrayList<>();
projectRes.forEach((key, value) -> {
File resFile = new File(execLocalPath, key);
boolean notExist = !resFile.exists();
if (notExist) {
downloadFiles.add(Pair.of(key, value));
} else {
logger.info("file : {} exists ", resFile.getName());
}
});
if (!downloadFiles.isEmpty() && !PropertyUtils.getResUploadStartupState()) {
throw new StorageOperateNoConfiguredException("Storage service config does not exist!");
}
if (CollectionUtils.isNotEmpty(downloadFiles)) {
for (Pair<String, String> fileDownload : downloadFiles) {
try {
// query the tenant code of the resource according to the name of the resource
String fullName = fileDownload.getLeft();
String tenantCode = fileDownload.getRight();
String resPath = storageOperate.getResourceFileName(tenantCode, fullName);
logger.info("get resource file from path:{}", resPath);
long resourceDownloadStartTime = System.currentTimeMillis();
storageOperate.download(tenantCode, resPath, execLocalPath + File.separator + fullName, false, true);
WorkerServerMetrics
.recordWorkerResourceDownloadTime(System.currentTimeMillis() - resourceDownloadStartTime);
WorkerServerMetrics.recordWorkerResourceDownloadSize(
Files.size(Paths.get(execLocalPath, fullName)));
WorkerServerMetrics.incWorkerResourceDownloadSuccessCount();
} catch (Exception e) {
WorkerServerMetrics.incWorkerResourceDownloadFailureCount();
throw new TaskException(String.format("Download resource file: %s error", fileDownload), e);
}
}
}
}
}
......@@ -17,157 +17,31 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskDispatchCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.concurrent.ExecutorService;
/**
* test task execute processor
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, WorkerConfig.class, FileUtils.class, JsonSerializer.class,
JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskDispatchProcessorTest {
private TaskExecutionContext taskExecutionContext;
private WorkerMessageSender workerMessageSender;
private ExecutorService workerExecService;
private StorageOperate storageOperate;
private WorkerConfig workerConfig;
private Command command;
private Command ackCommand;
private TaskDispatchCommand taskRequestCommand;
private AlertClientService alertClientService;
private WorkerManagerThread workerManager;
@Before
public void before() throws Exception {
// init task execution context
taskExecutionContext = getTaskExecutionContext();
workerConfig = new WorkerConfig();
workerConfig.setExecThreads(1);
workerConfig.setListenPort(1234);
command = new Command();
command.setType(CommandType.TASK_DISPATCH_REQUEST);
ackCommand = new TaskExecuteRunningCommand("127.0.0.1:1234",
"127.0.0.1:5678",
System.currentTimeMillis()).convert2Command();
taskRequestCommand = new TaskDispatchCommand(taskExecutionContext,
"127.0.0.1:5678",
"127.0.0.1:1234",
System.currentTimeMillis());
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class))).thenReturn(null);
PowerMockito.mockStatic(ChannelUtils.class);
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
workerMessageSender = PowerMockito.mock(WorkerMessageSender.class);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(WorkerMessageSender.class)).thenReturn(workerMessageSender);
PowerMockito.when(SpringApplicationContext.getBean(WorkerConfig.class)).thenReturn(workerConfig);
workerManager = PowerMockito.mock(WorkerManagerThread.class);
storageOperate = PowerMockito.mock(StorageOperate.class);
PowerMockito.when(workerManager.offer(new TaskExecuteThread(taskExecutionContext,
"127.0.0.1:5678",
workerMessageSender,
alertClientService,
storageOperate))).thenReturn(Boolean.TRUE);
PowerMockito.when(SpringApplicationContext.getBean(WorkerManagerThread.class)).thenReturn(workerManager);
PowerMockito.mockStatic(ThreadUtils.class);
PowerMockito.when(ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread",
workerConfig.getExecThreads())).thenReturn(
workerExecService);
PowerMockito.mockStatic(JsonSerializer.class);
PowerMockito.when(JsonSerializer.deserialize(command.getBody(), TaskDispatchCommand.class)).thenReturn(
taskRequestCommand);
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskDispatchCommand.class)).thenReturn(
taskRequestCommand);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId())).thenReturn(
taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
SimpleTaskExecuteThread simpleTaskExecuteThread = new SimpleTaskExecuteThread(new TaskExecutionContext(),
workerMessageSender,
"127.0.0.1:5678",
LoggerFactory.getLogger(
TaskDispatchProcessorTest.class),
alertClientService,
storageOperate);
PowerMockito.whenNew(TaskExecuteThread.class).withAnyArguments().thenReturn(simpleTaskExecuteThread);
}
@Test
public void testNormalExecution() {
TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
}
@Test
public void testDelayExecution() {
taskExecutionContext.setDelayTime(1);
TaskDispatchProcessor processor = new TaskDispatchProcessor();
processor.process(null, command);
Assert.assertEquals(ExecutionStatus.DELAY_EXECUTION, taskExecutionContext.getCurrentExecutionStatus());
}
public TaskExecutionContext getTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
......@@ -185,20 +59,4 @@ public class TaskDispatchProcessorTest {
return taskExecutionContext;
}
private static class SimpleTaskExecuteThread extends TaskExecuteThread {
public SimpleTaskExecuteThread(TaskExecutionContext taskExecutionContext,
WorkerMessageSender workerMessageSender,
String masterAddress,
Logger taskLogger,
AlertClientService alertClientService,
StorageOperate storageOperate) {
super(taskExecutionContext, masterAddress, workerMessageSender, alertClientService, storageOperate);
}
@Override
public void run() {
//
}
}
}
package org.apache.dolphinscheduler.server.worker.runner;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.Silent.class)
public class SyncWorkerDelayTaskExecuteRunnableTest {
@Test
public void executeTask() {
}
@Test
public void afterExecute() {
}
@Test
public void afterThrowing() {
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册