提交 0355dfc4 编写于 作者: B break60

Merge branch 'dev-1.3.0' of https://github.com/apache/incubator-dolphinscheduler into dev-1.3.0

......@@ -507,7 +507,7 @@ public final class Constants {
/**
* heartbeat for zk info length
*/
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 9;
public static final int HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH = 10;
/**
......
......@@ -18,8 +18,6 @@ package org.apache.dolphinscheduler.common.utils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.model.Server;
import java.util.Date;
/**
* heartbeat for ZK reigster res info
*/
......@@ -109,6 +107,8 @@ public class ResInfo {
Double.parseDouble(masterArray[2])));
masterServer.setCreateTime(DateUtils.stringToDate(masterArray[6]));
masterServer.setLastHeartbeatTime(DateUtils.stringToDate(masterArray[7]));
//set process id
masterServer.setId(Integer.parseInt(masterArray[9]));
return masterServer;
}
......
......@@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.datasource;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -51,6 +52,9 @@ public class MySQLDataSource extends BaseDataSource {
@Override
protected String filterOther(String other){
if(StringUtils.isBlank(other)){
return "";
}
if(other.contains(sensitiveParam)){
int index = other.indexOf(sensitiveParam);
String tmp = sensitiveParam;
......
......@@ -203,7 +203,7 @@
<packager>dolphinscheduler</packager>
<!-- <version>${project.version}</version> -->
<prefix>/opt/soft</prefix>
<autoRequires>false</autoRequires>
<defineStatements>
<!-- disable compile python when rpm build -->
<defineStatement>__os_install_post %(echo '%{__os_install_post}' | sed -e 's!/usr/lib[^[:space:]]*/brp-python-bytecompile[[:space:]].*$!!g')</defineStatement>
......
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.master.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
......@@ -131,6 +132,7 @@ public class MasterRegistry {
* @return
*/
private String getLocalAddress(){
return OSUtils.getHost() + ":" + masterConfig.getListenPort();
return OSUtils.getHost() + Constants.COLON + masterConfig.getListenPort();
}
}
......@@ -27,6 +27,7 @@ import org.apache.dolphinscheduler.common.model.TaskNode;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
......@@ -38,7 +39,6 @@ import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Date;
import java.util.Set;
......@@ -142,6 +142,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
if(this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP){
cancelTaskInstance();
}
if(processInstance.getState() == ExecutionStatus.READY_PAUSE){
pauseTask();
}
// task instance finished
if (taskInstance.getState().typeIsFinished()){
// if task is final result , then remove taskInstance from cache
......@@ -176,20 +179,33 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
return true;
}
/**
* pause task if task have not been dispatched to worker, do not dispatch anymore.
*
*/
public void pauseTask() {
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if(taskInstance == null){
return;
}
if(StringUtils.isBlank(taskInstance.getHost())){
taskInstance.setState(ExecutionStatus.PAUSE);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
}
}
/**
* task instance add queue , waiting worker to kill
*/
private void cancelTaskInstance() throws Exception{
if(alreadyKilled){
return ;
return;
}
alreadyKilled = true;
String taskInstanceWorkerGroup = taskInstance.getWorkerGroup();
// not exists
if (!existsValidWorkerGroup(taskInstanceWorkerGroup)){
taskInstance = processService.findTaskInstanceById(taskInstance.getId());
if(StringUtils.isBlank(taskInstance.getHost())){
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
......
......@@ -73,7 +73,9 @@ public class HeartBeatTask extends Thread{
builder.append(reservedMemory).append(Constants.COMMA);
builder.append(startTime).append(Constants.COMMA);
builder.append(DateUtils.dateToString(new Date())).append(Constants.COMMA);
builder.append(status);
builder.append(status).append(COMMA);
//save process id
builder.append(OSUtils.getProcessID());
zookeeperRegistryCenter.getZookeeperCachedOperator().update(heartBeatPath, builder.toString());
} catch (Throwable ex){
logger.error("error write heartbeat info", ex);
......
......@@ -19,6 +19,7 @@ package org.apache.dolphinscheduler.server.worker.registry;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
......@@ -148,6 +149,7 @@ public class WorkerRegistry {
* @return
*/
private String getLocalAddress(){
return OSUtils.getHost() + ":" + workerConfig.getListenPort();
return OSUtils.getHost() + Constants.COLON + workerConfig.getListenPort();
}
}
......@@ -235,7 +235,6 @@ public class TaskPriorityQueueConsumerTest {
dataSource.setUpdateTime(new Date());
Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
Thread.sleep(10000);
}
......
......@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.runner;
import junit.framework.Assert;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.consumer.TaskPriorityQueueConsumer;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
......@@ -27,12 +27,13 @@ import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.apache.dolphinscheduler.server.registry.ZookeeperRegistryCenter;
import org.apache.dolphinscheduler.server.zk.SpringZKServer;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.springframework.context.ApplicationContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
......@@ -45,6 +46,7 @@ import java.util.Set;
ZookeeperNodeManager.class, ZookeeperCachedOperator.class, ZookeeperConfig.class})
public class MasterTaskExecThreadTest {
@Test
public void testExistsValidWorkerGroup1(){
ZookeeperRegistryCenter zookeeperRegistryCenter = Mockito.mock(ZookeeperRegistryCenter.class);
......@@ -76,5 +78,36 @@ public class MasterTaskExecThreadTest {
masterTaskExecThread.existsValidWorkerGroup("test1");
}
@Test
public void testPauseTask(){
ProcessService processService = Mockito.mock(ProcessService.class);
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskInstance taskInstance = getTaskInstance();
Mockito.when(processService.findTaskInstanceById(252612))
.thenReturn(taskInstance);
Mockito.when(processService.updateTaskInstance(taskInstance))
.thenReturn(true);
MasterTaskExecThread masterTaskExecThread = new MasterTaskExecThread(taskInstance);
masterTaskExecThread.pauseTask();
org.junit.Assert.assertEquals(ExecutionStatus.PAUSE, taskInstance.getState());
}
private TaskInstance getTaskInstance(){
TaskInstance taskInstance = new TaskInstance();
taskInstance.setTaskType("SHELL");
taskInstance.setId(252612);
taskInstance.setName("C");
taskInstance.setProcessInstanceId(10111);
taskInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
return taskInstance;
}
}
......@@ -123,12 +123,15 @@ public abstract class AbstractZKClient extends ZookeeperCachedOperator {
String parentPath = getZNodeParentPath(zkNodeType);
List<Server> masterServers = new ArrayList<>();
int i = 0;
for (Map.Entry<String, String> entry : masterMap.entrySet()) {
Server masterServer = ResInfo.parseHeartbeatForZKInfo(entry.getValue());
masterServer.setZkDirectory(parentPath + "/"+ entry.getKey());
masterServer.setId(i);
i ++;
String key = entry.getKey();
masterServer.setZkDirectory(parentPath + "/"+ key);
//set host and port
String[] hostAndPort=key.split(COLON);
String[] hosts=hostAndPort[0].split(DIVISION_STRING);
masterServer.setHost(hosts[hosts.length-1]);// fetch the last one
masterServer.setPort(Integer.parseInt(hostAndPort[1]));
masterServers.add(masterServer);
}
return masterServers;
......
......@@ -22,7 +22,7 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.id}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div>
<div class="right">
......
......@@ -22,7 +22,7 @@
<div class="row-title">
<div class="left">
<span class="sp">IP: {{item.host}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.port}}</span>
<span class="sp">{{$t('Process Pid')}}: {{item.id}}</span>
<span class="sp">{{$t('Zk registration directory')}}: {{item.zkDirectory}}</span>
</div>
<div class="right">
......
......@@ -168,7 +168,7 @@ export default {
'Project Name': '项目名称',
'Please enter name': '请输入名称',
'Owned Users': '所属用户',
'Process Pid': '进程pid',
'Process Pid': '进程Pid',
'Zk registration directory': 'zk注册目录',
cpuUsage: 'cpuUsage',
memoryUsage: 'memoryUsage',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册