From 4a68bfbe1c816f9b1f2f43c1f01218689b57ed6f Mon Sep 17 00:00:00 2001 From: Hua Jiang Date: Wed, 21 Jul 2021 18:36:20 +0800 Subject: [PATCH] [Improvement-5852][server] Support two parameters related to task for the rest of type of tasks. (#5867) * provide two system parameters to support the rest of type of tasks * provide two system parameters to support the rest of type of tasks * improve test conversion --- .../server/utils/ParamUtils.java | 73 ++++--------------- .../server/worker/task/datax/DataxTask.java | 10 +-- .../server/worker/task/flink/FlinkTask.java | 10 +-- .../server/worker/task/http/HttpTask.java | 11 +-- .../server/worker/task/mr/MapReduceTask.java | 10 +-- .../worker/task/procedure/ProcedureTask.java | 8 +- .../server/worker/task/python/PythonTask.java | 18 ++--- .../server/worker/task/shell/ShellTask.java | 5 +- .../server/worker/task/spark/SparkTask.java | 10 +-- .../server/worker/task/sql/SqlTask.java | 12 +-- .../server/worker/task/sqoop/SqoopTask.java | 9 +-- .../server/master/ParamsTest.java | 49 +------------ .../server/utils/ParamUtilsTest.java | 63 +++++++++++----- 13 files changed, 85 insertions(+), 203 deletions(-) diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java index cbf663fce..57abf0b4e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java @@ -43,69 +43,15 @@ public class ParamUtils { /** * parameter conversion - * @param globalParams global params - * @param globalParamsMap global params map - * @param localParams local params - * @param commandType command type - * @param scheduleTime schedule time - * @return global params - */ - public static Map convert(Map globalParams, - Map globalParamsMap, - Map localParams, - Map varParams, - CommandType commandType, - Date scheduleTime) { - if (globalParams == null && localParams == null) { - return null; - } - // if it is a complement, - // you need to pass in the task instance id to locate the time - // of the process instance complement - Map timeParams = BusinessTimeUtils - .getBusinessTime(commandType, - scheduleTime); - - if (globalParamsMap != null) { - timeParams.putAll(globalParamsMap); - } - - if (globalParams != null && localParams != null) { - localParams.putAll(globalParams); - globalParams = localParams; - } else if (globalParams == null && localParams != null) { - globalParams = localParams; - } - if (varParams != null) { - varParams.putAll(globalParams); - globalParams = varParams; - } - Iterator> iter = globalParams.entrySet().iterator(); - while (iter.hasNext()) { - Map.Entry en = iter.next(); - Property property = en.getValue(); - - if (StringUtils.isNotEmpty(property.getValue()) - && property.getValue().startsWith("$")) { - /** - * local parameter refers to global parameter with the same name - * note: the global parameters of the process instance here are solidified parameters, - * and there are no variables in them. - */ - String val = property.getValue(); - val = ParameterUtils.convertParameterPlaceholders(val, timeParams); - property.setValue(val); - } - } - - return globalParams; - } - - /** - * parameter conversion + * Warning: + * When you first invoke the function of convert, the variables of localParams and varPool in the ShellParameters will be modified. + * But in the whole system the variables of localParams and varPool have been used in other functions. I'm not sure if this current + * situation is wrong. So I cannot modify the original logic. + * * @param taskExecutionContext the context of this task instance * @param parameters the parameters * @return global params + * */ public static Map convert(TaskExecutionContext taskExecutionContext, AbstractParameters parameters) { Preconditions.checkNotNull(taskExecutionContext); @@ -115,8 +61,11 @@ public class ParamUtils { CommandType commandType = CommandType.of(taskExecutionContext.getCmdTypeIfComplement()); Date scheduleTime = taskExecutionContext.getScheduleTime(); + // combining local and global parameters Map localParams = parameters.getLocalParametersMap(); + Map varParams = parameters.getVarPoolMap(); + if (globalParams == null && localParams == null) { return null; } @@ -141,6 +90,10 @@ public class ParamUtils { } else if (globalParams == null && localParams != null) { globalParams = localParams; } + if (varParams != null) { + varParams.putAll(globalParams); + globalParams = varParams; + } Iterator> iter = globalParams.entrySet().iterator(); while (iter.hasNext()) { Map.Entry en = iter.next(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java index b785cb5d4..84b2c27b5 100755 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.datax; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.process.Property; @@ -154,13 +153,8 @@ public class DataxTask extends AbstractTask { String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); Thread.currentThread().setName(threadLoggerInfoName); - // combining local and global parameters - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - dataXParameters.getLocalParametersMap(), - dataXParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // replace placeholder,and combine local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); // run datax procesDataSourceService.s String jsonFilePath = buildDataxJsonFile(paramsMap); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java index 27e5b42f4..863b91aaf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/flink/FlinkTask.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.task.flink; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.task.AbstractParameters; @@ -80,13 +79,8 @@ public class FlinkTask extends AbstractYarnTask { if (StringUtils.isNotEmpty(flinkParameters.getMainArgs())) { String args = flinkParameters.getMainArgs(); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - flinkParameters.getLocalParametersMap(), - flinkParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); logger.info("param Map : {}", paramsMap); if (paramsMap != null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java index 01ac50bd0..4e3474157 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/http/HttpTask.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.task.http; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.HttpMethod; import org.apache.dolphinscheduler.common.enums.HttpParametersType; import org.apache.dolphinscheduler.common.process.HttpProperty; @@ -137,13 +136,9 @@ public class HttpTask extends AbstractTask { protected CloseableHttpResponse sendRequest(CloseableHttpClient client) throws IOException { RequestBuilder builder = createRequestBuilder(); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - httpParameters.getLocalParametersMap(), - httpParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // replace placeholder,and combine local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); + List httpPropertyList = new ArrayList<>(); if (CollectionUtils.isNotEmpty(httpParameters.getHttpParams())) { for (HttpProperty httpProperty : httpParameters.getHttpParams()) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java index ce908df59..5e8f3ca93 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/mr/MapReduceTask.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.worker.task.mr; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.ProgramType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; @@ -84,13 +83,8 @@ public class MapReduceTask extends AbstractYarnTask { mapreduceParameters.setQueue(taskExecutionContext.getQueue()); setMainJarName(); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - mapreduceParameters.getLocalParametersMap(), - mapreduceParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // replace placeholder,and combine local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); if (paramsMap != null) { String args = ParameterUtils.convertParameterPlaceholders(mapreduceParameters.getMainArgs(), ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java index 3748c7a49..1a1573ca9 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/procedure/ProcedureTask.java @@ -30,7 +30,6 @@ import static org.apache.dolphinscheduler.common.enums.DataType.VARCHAR; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.datasource.ConnectionParam; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Direct; @@ -119,12 +118,7 @@ public class ProcedureTask extends AbstractTask { connection = DatasourceUtil.getConnection(dbType, connectionParam); // combining local and global parameters - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - procedureParameters.getLocalParametersMap(), - procedureParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); // call method stmt = connection.prepareCall(procedureParameters.getMethod()); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java index e784a79b2..0ee480d7d 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java @@ -14,25 +14,26 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.dolphinscheduler.server.worker.task.python; +package org.apache.dolphinscheduler.server.worker.task.python; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.python.PythonParameters; -import org.apache.dolphinscheduler.common.utils.*; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; +import org.apache.dolphinscheduler.common.utils.VarPoolUtils; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.utils.ParamUtils; import org.apache.dolphinscheduler.server.worker.task.AbstractTask; import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult; import org.apache.dolphinscheduler.server.worker.task.PythonCommandExecutor; -import org.slf4j.Logger; import java.util.Map; +import org.slf4j.Logger; + /** * python task */ @@ -115,13 +116,8 @@ public class PythonTask extends AbstractTask { private String buildCommand() throws Exception { String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n"); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - pythonParameters.getLocalParametersMap(), - pythonParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); try { rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java index f7887df41..32c2ad18f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java @@ -21,7 +21,6 @@ import static java.util.Calendar.DAY_OF_MONTH; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.shell.ShellParameters; @@ -42,10 +41,8 @@ import java.nio.file.StandardOpenOption; import java.nio.file.attribute.FileAttribute; import java.nio.file.attribute.PosixFilePermission; import java.nio.file.attribute.PosixFilePermissions; -import java.util.ArrayList; import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; @@ -166,7 +163,7 @@ public class ShellTask extends AbstractTask { private String parseScript(String script) { // combining local and global parameters - Map paramsMap = ParamUtils.convert(taskExecutionContext,shellParameters); + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job if (taskExecutionContext.getScheduleTime() != null) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java index a5a641cca..6939439ef 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/spark/SparkTask.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.task.spark; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.SparkVersion; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.ResourceInfo; @@ -109,13 +108,8 @@ public class SparkTask extends AbstractYarnTask { // other parameters args.addAll(SparkArgsUtils.buildArgs(sparkParameters)); - // replace placeholder - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - sparkParameters.getLocalParametersMap(), - sparkParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // replace placeholder, and combining local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); String command = null; diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java index b174734e0..9dd8b516e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sql/SqlTask.java @@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.worker.task.sql; import org.apache.dolphinscheduler.common.Constants; import org.apache.dolphinscheduler.common.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.common.datasource.DatasourceUtil; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy; @@ -166,14 +165,8 @@ public class SqlTask extends AbstractTask { Map sqlParamsMap = new HashMap<>(); StringBuilder sqlBuilder = new StringBuilder(); - // find process instance by task id - - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), - taskExecutionContext.getDefinedParams(), - sqlParameters.getLocalParametersMap(), - sqlParameters.getVarPoolMap(), - CommandType.of(taskExecutionContext.getCmdTypeIfComplement()), - taskExecutionContext.getScheduleTime()); + // combining local and global parameters + Map paramsMap = ParamUtils.convert(taskExecutionContext,getParameters()); // spell SQL according to the final user-defined variable if (paramsMap == null) { @@ -276,7 +269,6 @@ public class SqlTask extends AbstractTask { } } - public String setNonQuerySqlReturn(String updateResult, List properties) { String result = null; for (Property info :properties) { diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java index 1d1b32de0..2f3e48dc4 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java @@ -17,7 +17,6 @@ package org.apache.dolphinscheduler.server.worker.task.sqoop; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters; @@ -73,12 +72,8 @@ public class SqoopTask extends AbstractYarnTask { SqoopJobGenerator generator = new SqoopJobGenerator(); String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext); - Map paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()), - sqoopTaskExecutionContext.getDefinedParams(), - sqoopParameters.getLocalParametersMap(), - sqoopParameters.getVarPoolMap(), - CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()), - sqoopTaskExecutionContext.getScheduleTime()); + // combining local and global parameters + Map paramsMap = ParamUtils.convert(sqoopTaskExecutionContext,getParameters()); if (paramsMap != null) { String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap)); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java index 12613c61c..c3fa0fc54 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/ParamsTest.java @@ -14,27 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.dolphinscheduler.server.master; import org.apache.dolphinscheduler.common.enums.CommandType; -import org.apache.dolphinscheduler.common.enums.DataType; -import org.apache.dolphinscheduler.common.enums.Direct; -import org.apache.dolphinscheduler.common.process.Property; -import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.ParameterUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; -import org.apache.dolphinscheduler.server.utils.ParamUtils; import java.util.Calendar; import java.util.Date; -import java.util.HashMap; import java.util.Map; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * user define param */ @@ -42,9 +36,8 @@ public class ParamsTest { private static final Logger logger = LoggerFactory.getLogger(ParamsTest.class); - @Test - public void systemParamsTest()throws Exception{ + public void systemParamsTest() throws Exception { String command = "${system.biz.date}"; // start process @@ -56,12 +49,10 @@ public class ParamsTest { logger.info("start process : {}",command); - Calendar calendar = Calendar.getInstance(); calendar.setTime(new Date()); calendar.add(Calendar.DAY_OF_MONTH, -5); - command = "${system.biz.date}"; // complement data timeParams = BusinessTimeUtils @@ -71,40 +62,4 @@ public class ParamsTest { logger.info("complement data : {}",command); } - - @Test - public void convertTest() throws Exception { - Map globalParams = new HashMap<>(); - Property property = new Property(); - property.setProp("global_param"); - property.setDirect(Direct.IN); - property.setType(DataType.VARCHAR); - property.setValue("${system.biz.date}"); - globalParams.put("global_param", property); - - Map globalParamsMap = new HashMap<>(); - globalParamsMap.put("global_param", "${system.biz.date}"); - - Map localParams = new HashMap<>(); - Property localProperty = new Property(); - localProperty.setProp("local_param"); - localProperty.setDirect(Direct.IN); - localProperty.setType(DataType.VARCHAR); - localProperty.setValue("${global_param}"); - localParams.put("local_param", localProperty); - - Map varPoolParams = new HashMap<>(); - Property varProperty = new Property(); - varProperty.setProp("local_param"); - varProperty.setDirect(Direct.IN); - varProperty.setType(DataType.VARCHAR); - varProperty.setValue("${global_param}"); - varPoolParams.put("varPool", varProperty); - - Map paramsMap = ParamUtils.convert(globalParams, globalParamsMap, - localParams,varPoolParams, CommandType.START_PROCESS, new Date()); - logger.info(JSONUtils.toJsonString(paramsMap)); - - - } } \ No newline at end of file diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java index 99a6eb211..4d7bc93b4 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import org.apache.dolphinscheduler.common.Constants; -import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.Direct; import org.apache.dolphinscheduler.common.enums.TaskType; @@ -85,7 +84,7 @@ public class ParamUtilsTest { localParams.put("local_param", localProperty); Property varProperty = new Property(); - varProperty.setProp("local_param"); + varProperty.setProp("varPool"); varProperty.setDirect(Direct.IN); varProperty.setType(DataType.VARCHAR); varProperty.setValue("${global_param}"); @@ -93,42 +92,72 @@ public class ParamUtilsTest { } /** - * Test convert + * This is basic test case for ParamUtils.convert. + * Warning: + * As you can see,this case invokes the function of convert in different situations. When you first invoke the function of convert, + * the variables of localParams and varPool in the ShellParameters will be modified. But in the whole system the variables of localParams + * and varPool have been used in other functions. I'm not sure if this current situation is wrong. So I cannot modify the original logic. */ @Test public void testConvert() { //The expected value - String expected = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + String expected = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; + //The expected value when globalParams is null but localParams is not null - String expected1 = "{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," - + "\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + String expected1 = "{\"varPool\":{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}," + "\"local_param\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"}}"; //Define expected date , the month is 0-base Calendar calendar = Calendar.getInstance(); calendar.set(2019, 11, 30); Date date = calendar.getTime(); + List globalParamList = globalParams.values().stream().collect(Collectors.toList()); + List localParamList = localParams.values().stream().collect(Collectors.toList()); + List varPoolParamList = varPoolParams.values().stream().collect(Collectors.toList()); + + TaskExecutionContext taskExecutionContext = new TaskExecutionContext(); + taskExecutionContext.setTaskInstanceId(1); + taskExecutionContext.setTaskName("params test"); + taskExecutionContext.setTaskType(TaskType.SHELL.getDesc()); + taskExecutionContext.setHost("127.0.0.1:1234"); + taskExecutionContext.setExecutePath("/tmp/test"); + taskExecutionContext.setLogPath("/log"); + taskExecutionContext.setProcessInstanceId(1); + taskExecutionContext.setExecutorId(1); + taskExecutionContext.setCmdTypeIfComplement(0); + taskExecutionContext.setScheduleTime(date); + taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList)); + taskExecutionContext.setDefinedParams(globalParamsMap); + taskExecutionContext.setVarPool("[{\"prop\":\"varPool\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${global_param}\"}]"); + taskExecutionContext.setTaskParams( + "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" ${task_execution_path}\\\"\\n\"," + + "\"localParams\":" + + "[],\"resourceList\":[]}"); + + ShellParameters shellParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ShellParameters.class); + shellParameters.setLocalParams(localParamList); + + String varPoolParamsJson = JSONUtils.toJsonString(varPoolParams,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS); + shellParameters.setVarPool(taskExecutionContext.getVarPool()); + shellParameters.dealOutParam(varPoolParamsJson); + //Invoke convert - Map paramsMap = ParamUtils.convert(globalParams, globalParamsMap, localParams, varPoolParams,CommandType.START_PROCESS, date); + Map paramsMap = ParamUtils.convert(taskExecutionContext, shellParameters); String result = JSONUtils.toJsonString(paramsMap); assertEquals(expected, result); - for (Map.Entry entry : paramsMap.entrySet()) { - - String key = entry.getKey(); - Property prop = entry.getValue(); - logger.info(key + " : " + prop.getValue()); - } - //Invoke convert with null globalParams - Map paramsMap1 = ParamUtils.convert(null, globalParamsMap, localParams,varPoolParams, CommandType.START_PROCESS, date); + taskExecutionContext.setDefinedParams(null); + Map paramsMap1 = ParamUtils.convert(taskExecutionContext, shellParameters); + String result1 = JSONUtils.toJsonString(paramsMap1); assertEquals(expected1, result1); - //Null check, invoke convert with null globalParams and null localParams - Map paramsMap2 = ParamUtils.convert(null, globalParamsMap, null, varPoolParams,CommandType.START_PROCESS, date); + // Null check, invoke convert with null globalParams and null localParams + shellParameters.setLocalParams(null); + Map paramsMap2 = ParamUtils.convert(taskExecutionContext, shellParameters); assertNull(paramsMap2); } -- GitLab