From 3a2b172660d489dad3365d777b995abafe50a64d Mon Sep 17 00:00:00 2001 From: zhenhe Date: Wed, 4 Aug 2021 13:59:25 +0800 Subject: [PATCH] [ISSUE #3006]Replace ScheduledExecutorService instead of Timer to avoid affecting other tasks during exception (#3001) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 采用ScheduledExecutorService替代Timer,避免异常捕获时影响其他任务 多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。 * optimize imports * Add @Override annotation * Revert "Add @Override annotation" This reverts commit 3ddccd88022db33361a2af08b36b0c8f5d963f48. Co-authored-by: wuzh --- .../rocketmq/example/benchmark/Consumer.java | 30 ++++++------ .../rocketmq/example/benchmark/Producer.java | 44 ++++++++++-------- .../benchmark/TransactionProducer.java | 46 ++++++++++--------- 3 files changed, 68 insertions(+), 52 deletions(-) diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index d3ac36c2..154e6ed4 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -17,18 +17,11 @@ package org.apache.rocketmq.example.benchmark; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + public class Consumer { public static void main(String[] args) throws MQClientException, IOException { @@ -71,11 +74,12 @@ public class Consumer { final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); - final Timer timer = new Timer("BenchmarkTimerThread", true); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build()); final LinkedList snapshotList = new LinkedList(); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); @@ -83,9 +87,9 @@ public class Consumer { snapshotList.removeFirst(); } } - }, 1000, 1000); + }, 1000, 1000, TimeUnit.MILLISECONDS); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { Long[] begin = snapshotList.getFirst(); @@ -116,7 +120,7 @@ public class Consumer { e.printStackTrace(); } } - }, 10000, 10000); + }, 10000, 10000, TimeUnit.MILLISECONDS); RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 32d4b9f7..b198a0fc 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -16,32 +16,34 @@ */ package org.apache.rocketmq.example.benchmark; -import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Random; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + public class Producer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { @@ -73,7 +75,8 @@ public class Producer { final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); - final Timer timer = new Timer("BenchmarkTimerThread", true); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build()); final LinkedList snapshotList = new LinkedList(); @@ -87,7 +90,7 @@ public class Producer { } } - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmark.createSnapshot()); @@ -95,9 +98,9 @@ public class Producer { snapshotList.removeFirst(); } } - }, 1000, 1000); + }, 1000, 1000, TimeUnit.MILLISECONDS); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { doPrintStats(snapshotList, statsBenchmark, false); @@ -112,7 +115,7 @@ public class Producer { e.printStackTrace(); } } - }, 10000, 10000); + }, 10000, 10000, TimeUnit.MILLISECONDS); RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null); @@ -224,7 +227,12 @@ public class Producer { try { sendThreadPool.shutdown(); sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); - timer.cancel(); + executorService.shutdown(); + try { + executorService.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + if (snapshotList.size() > 1) { doPrintStats(snapshotList, statsBenchmark, true); } else { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 1b511d8a..c4f14a48 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -17,26 +17,11 @@ package org.apache.rocketmq.example.benchmark; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; @@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.srvutil.ServerUtil; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class TransactionProducer { private static final long START_TIME = System.currentTimeMillis(); private static final AtomicLong MSG_COUNT = new AtomicLong(0); @@ -75,11 +78,12 @@ public class TransactionProducer { final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); - final Timer timer = new Timer("BenchmarkTimerThread", true); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build()); final LinkedList snapshotList = new LinkedList<>(); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmark.createSnapshot()); @@ -87,9 +91,9 @@ public class TransactionProducer { snapshotList.removeFirst(); } } - }, 1000, 1000); + }, 1000, 1000, TimeUnit.MILLISECONDS); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { Snapshot begin = snapshotList.getFirst(); @@ -121,7 +125,7 @@ public class TransactionProducer { e.printStackTrace(); } } - }, 10000, 10000); + }, 10000, 10000, TimeUnit.MILLISECONDS); final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); final TransactionMQProducer producer = new TransactionMQProducer( -- GitLab