提交 1276e0d8 编写于 作者: Q qiaozhanwei

refactor-worker merge to dev bug fix

上级 b45e70f8
...@@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters; ...@@ -27,6 +27,9 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.datax.DataxParameters; import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters; import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.task.sql.SqlParameters; import org.apache.dolphinscheduler.common.task.sql.SqlParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.thread.Stopper; import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
...@@ -44,7 +47,6 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -44,7 +47,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
...@@ -219,14 +221,18 @@ public class TaskPriorityQueueConsumer extends Thread{ ...@@ -219,14 +221,18 @@ public class TaskPriorityQueueConsumer extends Thread{
DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget());
if (dataSource != null){
dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); dataxTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource());
dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); dataxTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); dataxTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}
if (dataTarget != null){
dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); dataxTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget());
dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); dataxTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
} }
}
/** /**
...@@ -235,20 +241,26 @@ public class TaskPriorityQueueConsumer extends Thread{ ...@@ -235,20 +241,26 @@ public class TaskPriorityQueueConsumer extends Thread{
* @param taskNode taskNode * @param taskNode taskNode
*/ */
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);
DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
sqoopTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); if (dataSource != null){
sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
}
sqoopTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); if (dataTarget != null){
sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
} }
}
/** /**
* set SQL task relation * set SQL task relation
......
...@@ -21,8 +21,6 @@ import com.alibaba.fastjson.JSONObject; ...@@ -21,8 +21,6 @@ import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.model.TaskNode; import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.process.Property; import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.process.ResourceInfo;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter; import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*; import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
...@@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory; ...@@ -36,7 +34,6 @@ import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
...@@ -213,6 +210,10 @@ public class TaskExecuteThread implements Runnable { ...@@ -213,6 +210,10 @@ public class TaskExecuteThread implements Runnable {
List<String> projectRes, List<String> projectRes,
String tenantCode, String tenantCode,
Logger logger) throws Exception { Logger logger) throws Exception {
if (CollectionUtils.isEmpty(projectRes)){
return;
}
for (String resource : projectRes) { for (String resource : projectRes) {
File resFile = new File(execLocalPath, resource); File resFile = new File(execLocalPath, resource);
if (!resFile.exists()) { if (!resFile.exists()) {
......
...@@ -76,28 +76,28 @@ public class SqoopTaskTest { ...@@ -76,28 +76,28 @@ public class SqoopTaskTest {
SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class); SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class);
SqoopJobGenerator generator = new SqoopJobGenerator(); SqoopJobGenerator generator = new SqoopJobGenerator();
String script = generator.generateSqoopJob(sqoopParameters1); String script = generator.generateSqoopJob(sqoopParameters1,new TaskExecutionContext());
String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'"; String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
Assert.assertEquals(expected, script); Assert.assertEquals(expected, script);
String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class); SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class);
String script2 = generator.generateSqoopJob(sqoopParameters2); String script2 = generator.generateSqoopJob(sqoopParameters2,new TaskExecutionContext());
String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert"; String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
Assert.assertEquals(expected2, script2); Assert.assertEquals(expected2, script2);
String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}"; String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class); SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class);
String script3 = generator.generateSqoopJob(sqoopParameters3); String script3 = generator.generateSqoopJob(sqoopParameters3,new TaskExecutionContext());
String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'"; String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
Assert.assertEquals(expected3, script3); Assert.assertEquals(expected3, script3);
String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}"; String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class); SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class);
String script4 = generator.generateSqoopJob(sqoopParameters4); String script4 = generator.generateSqoopJob(sqoopParameters4,new TaskExecutionContext());
String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16"; String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(expected4, script4); Assert.assertEquals(expected4, script4);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册