未验证 提交 79eb2e85 编写于 作者: B blackberrier 提交者: GitHub

[Improvement-5539][Master] Check status of taskInstance from cache (#5572)

* improvement:check status of taskInstance from cache

* issue5572 use timer instead of while&sleep; consider concurrent modification

* use computeifpresent instead of lock

* simplify getByTaskInstanceId function

* add ut for TaskInstanceCacheManagerImpl; fix bug in TaskInstanceCacheManagerImpl

* add Apache license header;add test class in root pom
上级 e2243d63
...@@ -530,6 +530,11 @@ public final class Constants { ...@@ -530,6 +530,11 @@ public final class Constants {
*/ */
public static final int SLEEP_TIME_MILLIS = 1000; public static final int SLEEP_TIME_MILLIS = 1000;
/**
* master task instance cache-database refresh interval
*/
public static final int CACHE_REFRESH_TIME_MILLIS = 20 * 1000;
/** /**
* heartbeat for zk info length * heartbeat for zk info length
*/ */
......
...@@ -14,8 +14,11 @@ ...@@ -14,8 +14,11 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.dolphinscheduler.server.master.cache.impl; package org.apache.dolphinscheduler.server.master.cache.impl;
import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand; import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
...@@ -25,8 +28,14 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager; ...@@ -25,8 +28,14 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -47,6 +56,24 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { ...@@ -47,6 +56,24 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
@Autowired @Autowired
private ProcessService processService; private ProcessService processService;
/**
* taskInstance cache refresh timer
*/
private Timer refreshTaskInstanceTimer = null;
@PostConstruct
public void init() {
//issue#5539 add thread to fetch task state from database in a fixed rate
this.refreshTaskInstanceTimer = new Timer(true);
refreshTaskInstanceTimer.scheduleAtFixedRate(
new RefreshTaskInstanceTimerTask(), CACHE_REFRESH_TIME_MILLIS, CACHE_REFRESH_TIME_MILLIS
);
}
@PreDestroy
public void close() {
this.refreshTaskInstanceTimer.cancel();
}
/** /**
* get taskInstance by taskInstance id * get taskInstance by taskInstance id
...@@ -56,12 +83,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { ...@@ -56,12 +83,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
*/ */
@Override @Override
public TaskInstance getByTaskInstanceId(Integer taskInstanceId) { public TaskInstance getByTaskInstanceId(Integer taskInstanceId) {
TaskInstance taskInstance = taskInstanceCache.get(taskInstanceId); return taskInstanceCache.computeIfAbsent(taskInstanceId, k -> processService.findTaskInstanceById(taskInstanceId));
if (taskInstance == null){
taskInstance = processService.findTaskInstanceById(taskInstanceId);
taskInstanceCache.put(taskInstanceId,taskInstance);
}
return taskInstance;
} }
/** /**
...@@ -106,6 +128,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { ...@@ -106,6 +128,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId()); TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus())); taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime()); taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
taskInstanceCache.put(taskExecuteResponseCommand.getTaskInstanceId(), taskInstance);
} }
/** /**
...@@ -116,4 +139,17 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager { ...@@ -116,4 +139,17 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
public void removeByTaskInstanceId(Integer taskInstanceId) { public void removeByTaskInstanceId(Integer taskInstanceId) {
taskInstanceCache.remove(taskInstanceId); taskInstanceCache.remove(taskInstanceId);
} }
class RefreshTaskInstanceTimerTask extends TimerTask {
@Override
public void run() {
for (Entry<Integer, TaskInstance> taskInstanceEntry : taskInstanceCache.entrySet()) {
TaskInstance taskInstance = processService.findTaskInstanceById(taskInstanceEntry.getKey());
if (null != taskInstance && taskInstance.getState() == ExecutionStatus.NEED_FAULT_TOLERANCE) {
taskInstanceCache.computeIfPresent(taskInstanceEntry.getKey(), (k, v) -> taskInstance);
}
}
}
}
} }
...@@ -143,7 +143,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread { ...@@ -143,7 +143,8 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
this.checkTimeoutFlag = !alertTimeout(); this.checkTimeoutFlag = !alertTimeout();
} }
// updateProcessInstance task instance // updateProcessInstance task instance
taskInstance = processService.findTaskInstanceById(taskInstance.getId()); //issue#5539 Check status of taskInstance from cache
taskInstance = taskInstanceCacheManager.getByTaskInstanceId(taskInstance.getId());
processInstance = processService.findProcessInstanceById(processInstance.getId()); processInstance = processService.findProcessInstanceById(processInstance.getId());
Thread.sleep(Constants.SLEEP_TIME_MILLIS); Thread.sleep(Constants.SLEEP_TIME_MILLIS);
} catch (Exception e) { } catch (Exception e) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.cache.impl;
import static org.apache.dolphinscheduler.common.Constants.CACHE_REFRESH_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TaskInstanceCacheManagerImplTest {
@InjectMocks
private TaskInstanceCacheManagerImpl taskInstanceCacheManager;
@Mock(name = "processService")
private ProcessService processService;
@Before
public void before() {
TaskExecuteAckCommand taskExecuteAckCommand = new TaskExecuteAckCommand();
taskExecuteAckCommand.setStatus(1);
taskExecuteAckCommand.setExecutePath("/dolphinscheduler/worker");
taskExecuteAckCommand.setHost("worker007");
taskExecuteAckCommand.setLogPath("/temp/worker.log");
taskExecuteAckCommand.setStartTime(new Date(1970, Calendar.AUGUST,7));
taskExecuteAckCommand.setTaskInstanceId(0);
taskInstanceCacheManager.cacheTaskInstance(taskExecuteAckCommand);
}
@Test
public void testInit() throws InterruptedException {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(0);
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
taskInstance.setExecutePath("/dolphinscheduler/worker");
taskInstance.setHost("worker007");
taskInstance.setLogPath("/temp/worker.log");
taskInstance.setProcessInstanceId(0);
Mockito.when(processService.findTaskInstanceById(0)).thenReturn(taskInstance);
taskInstanceCacheManager.init();
TimeUnit.MILLISECONDS.sleep(CACHE_REFRESH_TIME_MILLIS + 1000);
Assert.assertEquals(taskInstance.getState(), taskInstanceCacheManager.getByTaskInstanceId(0).getState());
}
@Test
public void getByTaskInstanceIdFromCache() {
TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(0);
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(0);
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setExecutePath("/dolphinscheduler/worker");
taskInstance.setHost("worker007");
taskInstance.setLogPath("/temp/worker.log");
taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7));
Assert.assertEquals(taskInstance.toString(), instanceGot.toString());
}
@Test
public void getByTaskInstanceIdFromDatabase() {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setExecutePath("/dolphinscheduler/worker");
taskInstance.setHost("worker007");
taskInstance.setLogPath("/temp/worker.log");
taskInstance.setStartTime(new Date(1970, Calendar.AUGUST,7));
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
TaskInstance instanceGot = taskInstanceCacheManager.getByTaskInstanceId(1);
Assert.assertEquals(taskInstance, instanceGot);
}
@Test
public void cacheTaskInstanceByTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
taskExecutionContext.setTaskInstanceId(2);
taskExecutionContext.setTaskName("blackberrier test");
taskExecutionContext.setStartTime(new Date(1970, Calendar.AUGUST,7));
taskExecutionContext.setTaskType(TaskType.SPARK.getDesc());
taskExecutionContext.setExecutePath("/tmp");
taskInstanceCacheManager.cacheTaskInstance(taskExecutionContext);
TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(2);
Assert.assertEquals(taskInstance.getId(), 2);
Assert.assertEquals(taskInstance.getName(), "blackberrier test");
Assert.assertEquals(taskInstance.getStartTime(), new Date(1970, Calendar.AUGUST, 7));
Assert.assertEquals(taskInstance.getTaskType(), TaskType.SPARK.getDesc());
Assert.assertEquals(taskInstance.getExecutePath(), "/tmp");
}
@Test
public void testCacheTaskInstanceByTaskExecuteAckCommand() {
TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
Assert.assertEquals(ExecutionStatus.RUNNING_EXECUTION, taskInstance.getState());
Assert.assertEquals(new Date(1970, Calendar.AUGUST, 7), taskInstance.getStartTime());
Assert.assertEquals("worker007", taskInstance.getHost());
Assert.assertEquals("/dolphinscheduler/worker", taskInstance.getExecutePath());
Assert.assertEquals("/temp/worker.log", taskInstance.getLogPath());
}
@Test
public void testCacheTaskInstanceByTaskExecuteResponseCommand() {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand();
responseCommand.setTaskInstanceId(0);
responseCommand.setStatus(9);
responseCommand.setEndTime(new Date(1970, Calendar.AUGUST, 8));
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
TaskInstance taskInstance = taskInstanceCacheManager.getByTaskInstanceId(0);
Assert.assertEquals(new Date(1970, Calendar.AUGUST, 8), taskInstance.getEndTime());
Assert.assertEquals(ExecutionStatus.KILL, taskInstance.getState());
}
@Test
public void removeByTaskInstanceId() {
taskInstanceCacheManager.removeByTaskInstanceId(0);
Assert.assertNull(taskInstanceCacheManager.getByTaskInstanceId(0));
}
}
\ No newline at end of file
...@@ -966,6 +966,7 @@ ...@@ -966,6 +966,7 @@
<!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>--> <!--<include>**/server/log/TaskLogDiscriminatorTest.java</include>-->
<include>**/server/log/TaskLogFilterTest.java</include> <include>**/server/log/TaskLogFilterTest.java</include>
<include>**/server/log/WorkerLogFilterTest.java</include> <include>**/server/log/WorkerLogFilterTest.java</include>
<include>**/server/master/cache/impl/TaskInstanceCacheManagerImplTest.java</include>
<include>**/server/master/config/MasterConfigTest.java</include> <include>**/server/master/config/MasterConfigTest.java</include>
<include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include> <include>**/server/master/consumer/TaskPriorityQueueConsumerTest.java</include>
<include>**/server/master/runner/MasterTaskExecThreadTest.java</include> <include>**/server/master/runner/MasterTaskExecThreadTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册