未验证 提交 e31b48fd 编写于 作者: Q qiaozhanwei 提交者: GitHub

1,get workergroup from zk modify 2,SpringConnectionFactory repeat load modify (#2201)

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* 1, master persistent task
2. extract  master and worker communication model

* add license

* modify javadoc error

* TaskExecutionContext create modify

* buildAckCommand taskInstanceId not set modify

* java doc error modify

* add comment

* ExecutorManager interface add generic type

* add TaskInstanceCacheManager receive Worker report result

* TaskInstance setExecutePath

* add TaskInstanceCacheManager to receive Worker Task result report

* TaskInstanceCacheManager add remove method

* add license

* add dispatcht task method

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* AbstractCommandExecutor remove db access

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* taskInstanceCache is null ,need load from db

* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify

* worker remove db

* ShellTask modify

* master persistence processId and appIds

* master persistence processId and appIds

* master add kill task logic

* master add kill task logic

* master add kill task logic

* javadoc error modify

* remove chinese log

* executeDirectly method add Override

* remote module modify

* TaskKillResponseProcessor command type modify

* create buildKillCommand

* host add host:port format

* host add host:port format

* TaskAckProcessor modify

* TaskAckProcessor modify

* task prioriry refator

* remove ITaskQueue

* task prioriry refator

* remove ITaskQueue

* TaskPriority refactor

* remove logs

* WorkerServer refactor

* MasterSchedulerService modify

* WorkerConfig listen port modify

* modify master and worker listen port

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* cancelTaskInstance set TaskExecutionContext host,logPath,executePath

* Encapsulate the parameters required by sqltask

* 1,Encapsulate the parameters required by sqltask
2,SQLTask optimization

* AbstractTask modify

* ProcedureTask optimization

* MasterSchedulerService modify

* TaskUpdateQueueConsumer modify

* test

* DataxTask process run debug

* DataxTask process run debug

* add protobuf dependency,MR、Spark task etc need this

* TaskUpdateQueueConsumer modify

* TaskExecutionContextBuilder set TaskInstance workgroup

* WorkerGroupService queryAllGroup modify
query available work group

* 1,get workergroup from zk modify
2,SpringConnectionFactory repeat load modify
Co-authored-by: Nqiaozhanwei <qiaozhanwei@analysys.com.cn>
上级 11dde203
...@@ -33,10 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -33,10 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.util.Date; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** /**
* work group service * work group service
...@@ -184,10 +181,22 @@ public class WorkerGroupService extends BaseService { ...@@ -184,10 +181,22 @@ public class WorkerGroupService extends BaseService {
* @return all worker group list * @return all worker group list
*/ */
public Map<String,Object> queryAllGroup() { public Map<String,Object> queryAllGroup() {
Map<String, Object> result = new HashMap<>(5); Map<String, Object> result = new HashMap<>();
String WORKER_PATH = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker"; String workerPath = zookeeperCachedOperator.getZookeeperConfig().getDsRoot()+"/nodes" +"/worker";
List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(WORKER_PATH); List<String> workerGroupList = zookeeperCachedOperator.getChildrenKeys(workerPath);
result.put(Constants.DATA_LIST, workerGroupList);
// available workerGroup list
List<String> availableWorkerGroupList = new ArrayList<>();
for (String workerGroup : workerGroupList){
String workerGroupPath= workerPath + "/" + workerGroup;
List<String> childrenNodes = zookeeperCachedOperator.getChildrenKeys(workerGroupPath);
if (CollectionUtils.isNotEmpty(childrenNodes)){
availableWorkerGroupList.add(workerGroup);
}
}
result.put(Constants.DATA_LIST, availableWorkerGroupList);
putMsg(result, Status.SUCCESS); putMsg(result, Status.SUCCESS);
return result; return result;
} }
......
...@@ -123,7 +123,6 @@ public class SpringConnectionFactory { ...@@ -123,7 +123,6 @@ public class SpringConnectionFactory {
@Bean @Bean
public SqlSessionFactory sqlSessionFactory() throws Exception { public SqlSessionFactory sqlSessionFactory() throws Exception {
MybatisConfiguration configuration = new MybatisConfiguration(); MybatisConfiguration configuration = new MybatisConfiguration();
configuration.addMappers("org.apache.dolphinscheduler.dao.mapper");
configuration.setMapUnderscoreToCamelCase(true); configuration.setMapUnderscoreToCamelCase(true);
configuration.setCacheEnabled(false); configuration.setCacheEnabled(false);
configuration.setCallSettersOnNulls(true); configuration.setCallSettersOnNulls(true);
......
...@@ -17,15 +17,12 @@ ...@@ -17,15 +17,12 @@
package org.apache.dolphinscheduler.server.builder; package org.apache.dolphinscheduler.server.builder;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.entity.*;
import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.DataxTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.ProcedureTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext; import org.apache.dolphinscheduler.server.entity.SQLTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext; import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.List;
/** /**
* TaskExecutionContext builder * TaskExecutionContext builder
*/ */
...@@ -51,7 +48,7 @@ public class TaskExecutionContextBuilder { ...@@ -51,7 +48,7 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath()); taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath()); taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson()); taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP); taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup());
return this; return this;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册