提交 2075eba1 编写于 作者: C CaptainB

refactor: 测试结束时清理待执行队列

上级 001753a1
......@@ -2,33 +2,34 @@ package io.metersphere.streaming.commons.utils;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Future;
public class ReportTasks {
private static final ConcurrentHashMap<String, CopyOnWriteArraySet<Runnable>> reportTasks = new ConcurrentHashMap<>();
private static final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private static final ConcurrentHashMap<String, CopyOnWriteArraySet<Future<?>>> reportTasks = new ConcurrentHashMap<>();
public static void addTask(String reportId, Runnable task) {
CopyOnWriteArraySet<Runnable> tasks = reportTasks.get(reportId);
public static void addTask(String reportId, Future<?> future) {
CopyOnWriteArraySet<Future<?>> tasks = reportTasks.get(reportId);
if (tasks == null) {
tasks = new CopyOnWriteArraySet<>();
reportTasks.put(reportId, tasks);
}
tasks.add(task);
tasks.add(future);
LogUtil.info("添加任务: reportId: {}, taskSize: {}", reportId, tasks.size());
}
public static void clearTasks(String reportId) {
taskQueue.removeAll(getTasks(reportId));
for (Future<?> task : getTasks(reportId)) {
try {
task.cancel(true);
} catch (Exception e) {
LogUtil.error("取消任务失败: ", e);
}
}
reportTasks.remove(reportId);
LogUtil.info("清理任务: reportId: {}", reportId);
}
private static CopyOnWriteArraySet<Runnable> getTasks(String reportId) {
private static CopyOnWriteArraySet<Future<?>> getTasks(String reportId) {
return reportTasks.getOrDefault(reportId, new CopyOnWriteArraySet<>());
}
public static LinkedBlockingQueue<Runnable> getTaskQueue() {
return taskQueue;
}
}
......@@ -31,7 +31,7 @@ public class ReportConsumer {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 20,
0L, TimeUnit.MILLISECONDS,
ReportTasks.getTaskQueue());
new LinkedBlockingQueue<>());
private final ThreadPoolExecutor saveExecutor = new ThreadPoolExecutor(30, 30,
0L, TimeUnit.MILLISECONDS,
......@@ -57,11 +57,10 @@ public class ReportConsumer {
}
String key = reportId + "_" + resourceIndex;
LogUtil.info("处理报告: reportId_resourceIndex: {}", key);
Runnable task = getRealtimeTask(content, reportId, resourceIndex);
executor.submit(task);
// 保存每个报告的任务队列
ReportTasks.addTask(reportId, task);
Runnable task = getRealtimeTask(content, reportId, resourceIndex);
Future<?> future = executor.submit(task);
ReportTasks.addTask(reportId, future);
}
private Runnable getRealtimeTask(List<ReportResult> content, String reportId, Integer resourceIndex) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册