From 94abced3f7b391a560421247ad0c70401d4bd6a8 Mon Sep 17 00:00:00 2001 From: panzhi Date: Sat, 31 Jul 2021 02:06:50 +0800 Subject: [PATCH] [ISSUE #3199] Two timed task for RequestFutureTable (#3202) Co-authored-by: panzhi33 --- .../impl/producer/DefaultMQProducerImpl.java | 40 +++++++++++++------ .../client/producer/RequestFutureTable.java | 7 ++++ 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index fac3ed35..00ee3b0a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -24,14 +24,14 @@ import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -107,7 +107,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { private final RPCHook rpcHook; private final BlockingQueue asyncSenderThreadPoolQueue; private final ExecutorService defaultAsyncSenderExecutor; - private final Timer timer = new Timer("RequestHouseKeepingService", true); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RequestHouseKeepingService"); + } + }); protected BlockingQueue checkRequestQueue; protected ExecutorService checkExecutor; private ServiceState serviceState = ServiceState.CREATE_JUST; @@ -227,16 +232,23 @@ public class DefaultMQProducerImpl implements MQProducerInner { this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); - this.timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - try { - RequestFutureTable.scanExpiredRequest(); - } catch (Throwable e) { - log.error("scan RequestFutureTable exception", e); + this.startScheduledTask(); + + } + + private void startScheduledTask() { + if (RequestFutureTable.getProducerNum().incrementAndGet() == 1) { + this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + RequestFutureTable.scanExpiredRequest(); + } catch (Throwable e) { + log.error("scan RequestFutureTable exception", e); + } } - } - }, 1000 * 3, 1000); + }, 1000 * 3, 1000, TimeUnit.MILLISECONDS); + } } private void checkConfig() throws MQClientException { @@ -266,7 +278,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (shutdownFactory) { this.mQClientFactory.shutdown(); } - this.timer.cancel(); + if (RequestFutureTable.getProducerNum().decrementAndGet() == 0) { + scheduledExecutorService.shutdown(); + } log.info("the producer [{}] shutdown OK", this.defaultMQProducer.getProducerGroup()); this.serviceState = ServiceState.SHUTDOWN_ALREADY; break; diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java index 3d4caa20..52cda3e8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java @@ -22,6 +22,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.exception.RequestTimeoutException; import org.apache.rocketmq.client.log.ClientLogger; @@ -30,6 +32,7 @@ import org.apache.rocketmq.logging.InternalLogger; public class RequestFutureTable { private static InternalLogger log = ClientLogger.getLog(); private static ConcurrentHashMap requestFutureTable = new ConcurrentHashMap(); + private static final AtomicInteger producerNum = new AtomicInteger(0); public static ConcurrentHashMap getRequestFutureTable() { return requestFutureTable; @@ -59,4 +62,8 @@ public class RequestFutureTable { } } } + + public static AtomicInteger getProducerNum() { + return producerNum; + } } -- GitLab