提交 0656327c 编写于 作者: R Rubik-W 提交者: lenboo

Fixbug datax task (#2909)

* fix:  local param bug

* fix: UT bug
Co-authored-by: NRubik-W <whm_777@163.com>
上级 334b5cef
...@@ -20,6 +20,7 @@ import java.util.ArrayList; ...@@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.ResourceInfo; import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
...@@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters { ...@@ -31,7 +32,7 @@ public class DataxParameters extends AbstractParameters {
/** /**
* if custom json config,eg 0, 1 * if custom json config,eg 0, 1
*/ */
private Integer customConfig; private int customConfig;
/** /**
* if customConfig eq 1 ,then json is usable * if customConfig eq 1 ,then json is usable
...@@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters { ...@@ -88,11 +89,11 @@ public class DataxParameters extends AbstractParameters {
*/ */
private int jobSpeedRecord; private int jobSpeedRecord;
public Integer getCustomConfig() { public int getCustomConfig() {
return customConfig; return customConfig;
} }
public void setCustomConfig(Integer customConfig) { public void setCustomConfig(int customConfig) {
this.customConfig = customConfig; this.customConfig = customConfig;
} }
...@@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters { ...@@ -184,11 +185,9 @@ public class DataxParameters extends AbstractParameters {
this.jobSpeedRecord = jobSpeedRecord; this.jobSpeedRecord = jobSpeedRecord;
} }
@Override @Override
public boolean checkParameters() { public boolean checkParameters() {
if (customConfig == null) return false; if (customConfig == Flag.NO.ordinal()) {
if (customConfig == 0) {
return dataSource != 0 return dataSource != 0
&& dataTarget != 0 && dataTarget != 0
&& StringUtils.isNotEmpty(sql) && StringUtils.isNotEmpty(sql)
......
...@@ -48,7 +48,7 @@ public class ParameterUtils { ...@@ -48,7 +48,7 @@ public class ParameterUtils {
* @return convert parameters place holders * @return convert parameters place holders
*/ */
public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) { public static String convertParameterPlaceholders(String parameterString, Map<String, String> parameterMap) {
if (StringUtils.isEmpty(parameterString)) { if (StringUtils.isEmpty(parameterString) || parameterMap == null) {
return parameterString; return parameterString;
} }
......
...@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants; ...@@ -41,6 +41,7 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType; import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters; import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
...@@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask { ...@@ -149,9 +150,16 @@ public class DataxTask extends AbstractTask {
String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId()); String threadLoggerInfoName = String.format("TaskLogInfo-%s", taskExecutionContext.getTaskAppId());
Thread.currentThread().setName(threadLoggerInfoName); Thread.currentThread().setName(threadLoggerInfoName);
// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
// run datax process // run datax process
String jsonFilePath = buildDataxJsonFile(); String jsonFilePath = buildDataxJsonFile(paramsMap);
String shellCommandFilePath = buildShellCommandFile(jsonFilePath); String shellCommandFilePath = buildShellCommandFile(jsonFilePath, paramsMap);
CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath); CommandExecuteResult commandExecuteResult = shellCommandExecutor.run(shellCommandFilePath);
setExitStatusCode(commandExecuteResult.getExitStatusCode()); setExitStatusCode(commandExecuteResult.getExitStatusCode());
...@@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask { ...@@ -184,7 +192,7 @@ public class DataxTask extends AbstractTask {
* @return datax json file name * @return datax json file name
* @throws Exception if error throws Exception * @throws Exception if error throws Exception
*/ */
private String buildDataxJsonFile() private String buildDataxJsonFile(Map<String, Property> paramsMap)
throws Exception { throws Exception {
// generate json // generate json
String fileName = String.format("%s/%s_job.json", String fileName = String.format("%s/%s_job.json",
...@@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask { ...@@ -197,26 +205,9 @@ public class DataxTask extends AbstractTask {
return fileName; return fileName;
} }
if (dataXParameters.getCustomConfig() == Flag.YES.ordinal()){
if (dataXParameters.getCustomConfig() == 1){
json = dataXParameters.getJson().replaceAll("\\r\\n", "\n"); json = dataXParameters.getJson().replaceAll("\\r\\n", "\n");
/**
* combining local and global parameters
*/
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null){
json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
}
}else { }else {
JSONObject job = new JSONObject(); JSONObject job = new JSONObject();
job.put("content", buildDataxJobContentJson()); job.put("content", buildDataxJobContentJson());
job.put("setting", buildDataxJobSettingJson()); job.put("setting", buildDataxJobSettingJson());
...@@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask { ...@@ -227,6 +218,9 @@ public class DataxTask extends AbstractTask {
json = root.toString(); json = root.toString();
} }
// replace placeholder
json = ParameterUtils.convertParameterPlaceholders(json, ParamUtils.convert(paramsMap));
logger.debug("datax job json : {}", json); logger.debug("datax job json : {}", json);
// create datax json file // create datax json file
...@@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask { ...@@ -359,7 +353,7 @@ public class DataxTask extends AbstractTask {
* @return shell command file name * @return shell command file name
* @throws Exception if error throws Exception * @throws Exception if error throws Exception
*/ */
private String buildShellCommandFile(String jobConfigFilePath) private String buildShellCommandFile(String jobConfigFilePath, Map<String, Property> paramsMap)
throws Exception { throws Exception {
// generate scripts // generate scripts
String fileName = String.format("%s/%s_node.%s", String fileName = String.format("%s/%s_node.%s",
...@@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask { ...@@ -380,18 +374,9 @@ public class DataxTask extends AbstractTask {
sbr.append(DATAX_HOME_EVN); sbr.append(DATAX_HOME_EVN);
sbr.append(" "); sbr.append(" ");
sbr.append(jobConfigFilePath); sbr.append(jobConfigFilePath);
String dataxCommand = sbr.toString();
// combining local and global parameters
// replace placeholder // replace placeholder
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()), String dataxCommand = ParameterUtils.convertParameterPlaceholders(sbr.toString(), ParamUtils.convert(paramsMap));
taskExecutionContext.getDefinedParams(),
dataXParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());
if (paramsMap != null) {
dataxCommand = ParameterUtils.convertParameterPlaceholders(dataxCommand, ParamUtils.convert(paramsMap));
}
logger.debug("raw script : {}", dataxCommand); logger.debug("raw script : {}", dataxCommand);
......
...@@ -21,9 +21,9 @@ import java.lang.reflect.Method; ...@@ -21,9 +21,9 @@ import java.lang.reflect.Method;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DbType; import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource; import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory; import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
...@@ -273,14 +273,15 @@ public class DataxTaskTest { ...@@ -273,14 +273,15 @@ public class DataxTaskTest {
setTaskParems(0); setTaskParems(0);
buildDataJson(); buildDataJson();
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
} }
} }
public void buildDataJson() throws Exception { public void buildDataJson() throws Exception {
Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile"); Method method = DataxTask.class.getDeclaredMethod("buildDataxJsonFile", new Class[]{Map.class});
method.setAccessible(true); method.setAccessible(true);
String filePath = (String) method.invoke(dataxTask, null); String filePath = (String) method.invoke(dataxTask, new Object[]{null});
Assert.assertNotNull(filePath); Assert.assertNotNull(filePath);
} }
...@@ -358,9 +359,9 @@ public class DataxTaskTest { ...@@ -358,9 +359,9 @@ public class DataxTaskTest {
public void testBuildShellCommandFile() public void testBuildShellCommandFile()
throws Exception { throws Exception {
try { try {
Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class); Method method = DataxTask.class.getDeclaredMethod("buildShellCommandFile", String.class, Map.class);
method.setAccessible(true); method.setAccessible(true);
Assert.assertNotNull(method.invoke(dataxTask, "test.json")); Assert.assertNotNull(method.invoke(dataxTask, "test.json", null));
} }
catch (Exception e) { catch (Exception e) {
Assert.fail(e.getMessage()); Assert.fail(e.getMessage());
......
...@@ -236,6 +236,12 @@ ...@@ -236,6 +236,12 @@
_onPostStatements (a) { _onPostStatements (a) {
this.postStatements = a this.postStatements = a
}, },
/**
* return localParams
*/
_onLocalParams (a) {
this.localParams = a
},
/** /**
* verification * verification
*/ */
...@@ -246,6 +252,11 @@ ...@@ -246,6 +252,11 @@
return false return false
} }
// localParams Subcomponent verification
if (!this.$refs.refLocalParams._verifProp()) {
return false
}
// storage // storage
this.$emit('on-params', { this.$emit('on-params', {
customConfig: this.customConfig, customConfig: this.customConfig,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册