未验证 提交 85ab55de 编写于 作者: X xingchun-chen 提交者: GitHub

Merge pull request #4149 from yh2388/fix-task-execpath-claer

[Fix][Server] Fix clear task execute path is related to master.
......@@ -192,8 +192,6 @@ public class MasterExecThread implements Runnable {
processService.updateProcessInstance(processInstance);
}finally {
taskExecService.shutdown();
// post handle
postHandle();
}
}
......@@ -381,27 +379,6 @@ public class MasterExecThread implements Runnable {
}
}
/**
* process post handle
*/
private void postHandle() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils
.getProcessExecDir(processInstance.getProcessDefinition().getProjectId(),
processInstance.getProcessDefinitionId(),
processInstance.getId());
try {
FileUtils.deleteDirectory(new File(execLocalPath));
} catch (IOException e) {
logger.error("delete exec dir failed ", e);
}
}
}
/**
* submit task to execute
* @param taskInstance task instance
......
......@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import com.alibaba.fastjson.JSONObject;
......@@ -25,6 +26,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.ResponceCache;
......@@ -34,16 +36,21 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.TaskManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.*;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task scheduler thread
* task scheduler thread
*/
public class TaskExecuteThread implements Runnable {
......@@ -53,17 +60,17 @@ public class TaskExecuteThread implements Runnable {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/**
* task instance
* task instance
*/
private TaskExecutionContext taskExecutionContext;
/**
* abstract task
* abstract task
*/
private AbstractTask task;
/**
* task callback service
* task callback service
*/
private TaskCallbackService taskCallbackService;
......@@ -132,7 +139,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
}catch (Exception e){
} catch (Exception e) {
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
......@@ -141,18 +148,47 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setAppIds(task.getAppIds());
} finally {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
clearTaskExecPath();
}
}
/**
* when task finish, clear execute path.
*/
private void clearTaskExecPath() {
logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
if (!CommonUtils.isDevelopMode()) {
// get exec dir
String execLocalPath = taskExecutionContext.getExecutePath();
if (StringUtils.isEmpty(execLocalPath)) {
logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
return;
}
if ("/".equals(execLocalPath)) {
logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName());
return;
}
try {
org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
logger.info("exec local path: {} cleared.", execLocalPath);
} catch (IOException e) {
logger.error("delete exec dir failed : {}", e.getMessage(), e);
}
}
}
/**
* get global paras map
* @return
*/
private Map<String, String> getGlobalParamsMap() {
Map<String,String> globalParamsMap = new HashMap<>(16);
Map<String, String> globalParamsMap = new HashMap<>(16);
// global params string
String globalParamsStr = taskExecutionContext.getGlobalParams();
......@@ -165,17 +201,17 @@ public class TaskExecuteThread implements Runnable {
/**
* set task timeout
*
* @param taskExecutionContext TaskExecutionContext
* @param taskNode
*/
private void setTaskTimeout(TaskExecutionContext taskExecutionContext, TaskNode taskNode) {
// the default timeout is the maximum value of the integer
taskExecutionContext.setTaskTimeout(Integer.MAX_VALUE);
TaskTimeoutParameter taskTimeoutParameter = taskNode.getTaskTimeoutParameter();
if (taskTimeoutParameter.getEnable()){
if (taskTimeoutParameter.getEnable()) {
// get timeout strategy
taskExecutionContext.setTaskTimeoutStrategy(taskTimeoutParameter.getStrategy().getCode());
switch (taskTimeoutParameter.getStrategy()){
switch (taskTimeoutParameter.getStrategy()) {
case WARN:
break;
case FAILED:
......@@ -196,38 +232,32 @@ public class TaskExecuteThread implements Runnable {
}
}
/**
* kill task
* kill task
*/
public void kill(){
if (task != null){
public void kill() {
if (task != null) {
try {
task.cancelApplication(true);
}catch (Exception e){
logger.error(e.getMessage(),e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
/**
* download resource file
*
* @param execLocalPath
* @param projectRes
* @param logger
*/
private void downloadResource(String execLocalPath,
Map<String,String> projectRes,
Map<String, String> projectRes,
Logger logger) throws Exception {
if (MapUtils.isEmpty(projectRes)){
if (MapUtils.isEmpty(projectRes)) {
return;
}
Set<Map.Entry<String, String>> resEntries = projectRes.entrySet();
for (Map.Entry<String,String> resource : resEntries) {
for (Map.Entry<String, String> resource : resEntries) {
String fullName = resource.getKey();
String tenantCode = resource.getValue();
File resFile = new File(execLocalPath, fullName);
......@@ -238,8 +268,8 @@ public class TaskExecuteThread implements Runnable {
logger.info("get resource file from hdfs :{}", resHdfsPath);
HadoopUtils.getInstance().copyHdfsToLocal(resHdfsPath, execLocalPath + File.separator + fullName, false, true);
}catch (Exception e){
logger.error(e.getMessage(),e);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e.getMessage());
}
} else {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import static org.powermock.api.mockito.PowerMockito.mock;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.cache.impl.TaskExecutionContextCacheManagerImpl;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
@RunWith(PowerMockRunner.class)
@PrepareForTest({TaskExecuteThread.class})
public class TaskExecuteThreadTest {
private static final Logger logger = LoggerFactory.getLogger(TaskExecuteThreadTest.class);
TaskExecutionContext taskExecutionContext;
TaskCallbackService taskCallbackService;
ApplicationContext applicationContext;
TaskExecutionContextCacheManagerImpl taskExecutionContextCacheManager;
private ProcessService processService;
@Before
public void init() throws Exception {
taskExecutionContext = PowerMockito.mock(TaskExecutionContext.class);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
applicationContext = PowerMockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
taskExecutionContextCacheManager = new TaskExecutionContextCacheManagerImpl();
Mockito.when(applicationContext.getBean(TaskExecutionContextCacheManagerImpl.class)).thenReturn(taskExecutionContextCacheManager);
}
@Test
public void testTaskClearExecPath() throws Exception {
processService = mock(ProcessService.class);
ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
TaskCallbackService taskCallbackService = Mockito.mock(TaskCallbackService.class);
TaskExecuteThread taskExecuteThread = PowerMockito.spy(new TaskExecuteThread(taskExecutionContext, taskCallbackService, logger));
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/");
Assert.assertTrue(true);
}
@Test
public void testClearTaskExecPath() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, logger);
Mockito.when(CommonUtils.isDevelopMode()).thenReturn(false);
Mockito.when(taskExecutionContext.getTaskJson()).thenThrow(new RuntimeException("测试异常后finally执行"));
try {
taskExecuteThread.run();
} catch (Exception ignored) {
//ignored
}
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn(null);
try {
taskExecuteThread.run();
} catch (Exception ignored) {
//ignored
}
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/");
try {
taskExecuteThread.run();
} catch (Exception ignored) {
//ignored
}
Mockito.when(taskExecutionContext.getExecutePath()).thenReturn("/data/test-testClearTaskExecPath");
try {
taskExecuteThread.run();
} catch (Exception ignored) {
//ignored
}
Assert.assertTrue(true);
}
@Test
public void testNotClearTaskExecPath() {
TaskExecuteThread taskExecuteThread = new TaskExecuteThread(taskExecutionContext, taskCallbackService, logger);
Mockito.when(CommonUtils.isDevelopMode()).thenReturn(true);
taskExecuteThread.run();
Assert.assertTrue(true);
}
}
......@@ -794,6 +794,7 @@
<include>**/server/log/WorkerLogFilterTest.java</include>
<include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
<include>**/server/master/runner/MasterTaskExecThreadTest.java</include>
<include>**/server/worker/runner/TaskExecuteThreadTest.java</include>
<include>**/server/master/dispatch/executor/NettyExecutorManagerTest.java</include>
<include>**/server/master/dispatch/host/assign/LowerWeightRoundRobinTest.java</include>
<include>**/server/master/dispatch/host/assign/RandomSelectorTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册