未验证 提交 3a2b1726 编写于 作者: Z zhenhe 提交者: GitHub

[ISSUE #3006]Replace ScheduledExecutorService instead of Timer to avoid...

[ISSUE #3006]Replace ScheduledExecutorService instead of Timer to avoid affecting other tasks during exception (#3001)

* 采用ScheduledExecutorService替代Timer,避免异常捕获时影响其他任务

多线程并行处理定时任务时,Timer运行多个TimeTask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用ScheduledExecutorService则没有这个问题。

* optimize imports

* Add @Override annotation

* Revert "Add @Override annotation"

This reverts commit 3ddccd88022db33361a2af08b36b0c8f5d963f48.
Co-authored-by: Nwuzh <wuzh@bosera.com>
上级 0fdc73d4
...@@ -17,18 +17,11 @@ ...@@ -17,18 +17,11 @@
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
...@@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class Consumer { public class Consumer {
public static void main(String[] args) throws MQClientException, IOException { public static void main(String[] args) throws MQClientException, IOException {
...@@ -71,11 +74,12 @@ public class Consumer { ...@@ -71,11 +74,12 @@ public class Consumer {
final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer();
final Timer timer = new Timer("BenchmarkTimerThread", true); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
timer.scheduleAtFixedRate(new TimerTask() { executorService.scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {
snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
...@@ -83,9 +87,9 @@ public class Consumer { ...@@ -83,9 +87,9 @@ public class Consumer {
snapshotList.removeFirst(); snapshotList.removeFirst();
} }
} }
}, 1000, 1000); }, 1000, 1000, TimeUnit.MILLISECONDS);
timer.scheduleAtFixedRate(new TimerTask() { executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() { private void printStats() {
if (snapshotList.size() >= 10) { if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst(); Long[] begin = snapshotList.getFirst();
...@@ -116,7 +120,7 @@ public class Consumer { ...@@ -116,7 +120,7 @@ public class Consumer {
e.printStackTrace(); e.printStackTrace();
} }
} }
}, 10000, 10000); }, 10000, 10000, TimeUnit.MILLISECONDS);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null);
......
...@@ -16,32 +16,34 @@ ...@@ -16,32 +16,34 @@
*/ */
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Random;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
public class Producer { public class Producer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
...@@ -73,7 +75,8 @@ public class Producer { ...@@ -73,7 +75,8 @@ public class Producer {
final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer();
final Timer timer = new Timer("BenchmarkTimerThread", true); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
...@@ -87,7 +90,7 @@ public class Producer { ...@@ -87,7 +90,7 @@ public class Producer {
} }
} }
timer.scheduleAtFixedRate(new TimerTask() { executorService.scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot()); snapshotList.addLast(statsBenchmark.createSnapshot());
...@@ -95,9 +98,9 @@ public class Producer { ...@@ -95,9 +98,9 @@ public class Producer {
snapshotList.removeFirst(); snapshotList.removeFirst();
} }
} }
}, 1000, 1000); }, 1000, 1000, TimeUnit.MILLISECONDS);
timer.scheduleAtFixedRate(new TimerTask() { executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() { private void printStats() {
if (snapshotList.size() >= 10) { if (snapshotList.size() >= 10) {
doPrintStats(snapshotList, statsBenchmark, false); doPrintStats(snapshotList, statsBenchmark, false);
...@@ -112,7 +115,7 @@ public class Producer { ...@@ -112,7 +115,7 @@ public class Producer {
e.printStackTrace(); e.printStackTrace();
} }
} }
}, 10000, 10000); }, 10000, 10000, TimeUnit.MILLISECONDS);
RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null;
final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null); final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null);
...@@ -224,7 +227,12 @@ public class Producer { ...@@ -224,7 +227,12 @@ public class Producer {
try { try {
sendThreadPool.shutdown(); sendThreadPool.shutdown();
sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
timer.cancel(); executorService.shutdown();
try {
executorService.awaitTermination(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (snapshotList.size() > 1) { if (snapshotList.size() > 1) {
doPrintStats(snapshotList, statsBenchmark, true); doPrintStats(snapshotList, statsBenchmark, true);
} else { } else {
......
...@@ -17,26 +17,11 @@ ...@@ -17,26 +17,11 @@
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser; import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
...@@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst; ...@@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.srvutil.ServerUtil; import org.apache.rocketmq.srvutil.ServerUtil;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TransactionProducer { public class TransactionProducer {
private static final long START_TIME = System.currentTimeMillis(); private static final long START_TIME = System.currentTimeMillis();
private static final AtomicLong MSG_COUNT = new AtomicLong(0); private static final AtomicLong MSG_COUNT = new AtomicLong(0);
...@@ -75,11 +78,12 @@ public class TransactionProducer { ...@@ -75,11 +78,12 @@ public class TransactionProducer {
final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer();
final Timer timer = new Timer("BenchmarkTimerThread", true); ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build());
final LinkedList<Snapshot> snapshotList = new LinkedList<>(); final LinkedList<Snapshot> snapshotList = new LinkedList<>();
timer.scheduleAtFixedRate(new TimerTask() { executorService.scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot()); snapshotList.addLast(statsBenchmark.createSnapshot());
...@@ -87,9 +91,9 @@ public class TransactionProducer { ...@@ -87,9 +91,9 @@ public class TransactionProducer {
snapshotList.removeFirst(); snapshotList.removeFirst();
} }
} }
}, 1000, 1000); }, 1000, 1000, TimeUnit.MILLISECONDS);
timer.scheduleAtFixedRate(new TimerTask() { executorService.scheduleAtFixedRate(new TimerTask() {
private void printStats() { private void printStats() {
if (snapshotList.size() >= 10) { if (snapshotList.size() >= 10) {
Snapshot begin = snapshotList.getFirst(); Snapshot begin = snapshotList.getFirst();
...@@ -121,7 +125,7 @@ public class TransactionProducer { ...@@ -121,7 +125,7 @@ public class TransactionProducer {
e.printStackTrace(); e.printStackTrace();
} }
} }
}, 10000, 10000); }, 10000, 10000, TimeUnit.MILLISECONDS);
final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config);
final TransactionMQProducer producer = new TransactionMQProducer( final TransactionMQProducer producer = new TransactionMQProducer(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册