未验证 提交 94abced3 编写于 作者: P panzhi 提交者: GitHub

[ISSUE #3199] Two timed task for RequestFutureTable (#3202)

Co-authored-by: Npanzhi33 <wb-pz502261@alibaba-inc.com>
上级 3e014241
......@@ -24,14 +24,14 @@ import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
......@@ -107,7 +107,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private final RPCHook rpcHook;
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private final Timer timer = new Timer("RequestHouseKeepingService", true);
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "RequestHouseKeepingService");
}
});
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
......@@ -227,16 +232,23 @@ public class DefaultMQProducerImpl implements MQProducerInner {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
this.startScheduledTask();
}
private void startScheduledTask() {
if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
RequestFutureTable.scanExpiredRequest();
} catch (Throwable e) {
log.error("scan RequestFutureTable exception", e);
}
}
}
}, 1000 * 3, 1000);
}, 1000 * 3, 1000, TimeUnit.MILLISECONDS);
}
}
private void checkConfig() throws MQClientException {
......@@ -266,7 +278,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
this.timer.cancel();
if (RequestFutureTable.getProducerNum().decrementAndGet() == 0) {
scheduledExecutorService.shutdown();
}
log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
break;
......
......@@ -22,6 +22,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.log.ClientLogger;
......@@ -30,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger;
public class RequestFutureTable {
private static InternalLogger log = ClientLogger.getLog();
private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
private static final AtomicInteger producerNum = new AtomicInteger(0);
public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
return requestFutureTable;
......@@ -59,4 +62,8 @@ public class RequestFutureTable {
}
}
}
public static AtomicInteger getProducerNum() {
return producerNum;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册