diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java index 0809f31129fd6c06826d7937a5fd7c17ebe13087..535c274989f801a7bccf4627b06793eab83e959a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java @@ -18,10 +18,7 @@ package org.apache.dolphinscheduler.server.builder; import org.apache.dolphinscheduler.dao.entity.*; -import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; -import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.server.entity.*; /** * TaskExecutionContext builder @@ -111,14 +108,25 @@ public class TaskExecutionContextBuilder { /** * build procedureTask related info * - * @param procedureTaskExecutionContext - * @return + * @param procedureTaskExecutionContext procedureTaskExecutionContext + * @return TaskExecutionContextBuilder */ public TaskExecutionContextBuilder buildProcedureTaskRelatedInfo(ProcedureTaskExecutionContext procedureTaskExecutionContext){ taskExecutionContext.setProcedureTaskExecutionContext(procedureTaskExecutionContext); return this; } + /** + * build sqoopTask related info + * + * @param sqoopTaskExecutionContext sqoopTaskExecutionContext + * @return TaskExecutionContextBuilder + */ + public TaskExecutionContextBuilder buildSqoopTaskRelatedInfo(SqoopTaskExecutionContext sqoopTaskExecutionContext){ + taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext); + return this; + } + /** * create diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java index 30927669155777976857811ff8509744cb941ff0..735795f4637612f379d356fa0b212748af169f73 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java @@ -159,6 +159,7 @@ public class TaskPriorityQueueConsumer extends Thread{ SQLTaskExecutionContext sqlTaskExecutionContext = new SQLTaskExecutionContext(); DataxTaskExecutionContext dataxTaskExecutionContext = new DataxTaskExecutionContext(); ProcedureTaskExecutionContext procedureTaskExecutionContext = new ProcedureTaskExecutionContext(); + SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext(); // SQL task @@ -178,6 +179,10 @@ public class TaskPriorityQueueConsumer extends Thread{ setProcedureTaskRelation(procedureTaskExecutionContext, taskNode); } + if (taskType == TaskType.SQOOP){ + setSqoopTaskRelation(sqoopTaskExecutionContext,taskNode); + } + return TaskExecutionContextBuilder.get() .buildTaskInstanceRelatedInfo(taskInstance) @@ -222,6 +227,28 @@ public class TaskPriorityQueueConsumer extends Thread{ dataxTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); } + + /** + * set datax task relation + * @param sqoopTaskExecutionContext sqoopTaskExecutionContext + * @param taskNode taskNode + */ + private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) { + DataxParameters dataxParameters = JSONObject.parseObject(taskNode.getParams(), DataxParameters.class); + + DataSource dataSource = processService.findDataSourceById(dataxParameters.getDataSource()); + DataSource dataTarget = processService.findDataSourceById(dataxParameters.getDataTarget()); + + + sqoopTaskExecutionContext.setDataSourceId(dataxParameters.getDataSource()); + sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode()); + sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams()); + + sqoopTaskExecutionContext.setDataTargetId(dataxParameters.getDataTarget()); + sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode()); + sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams()); + } + /** * set SQL task relation * @param sqlTaskExecutionContext sqlTaskExecutionContext