diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index d3ac36c2046042090bc6bac1f08ce306810bb846..154e6ed4dac017a953fc46c22ce33f2d40e68ec7 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -17,18 +17,11 @@ package org.apache.rocketmq.example.benchmark; -import java.io.IOException; -import java.util.LinkedList; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -42,6 +35,16 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.srvutil.ServerUtil; +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + public class Consumer { public static void main(String[] args) throws MQClientException, IOException { @@ -71,11 +74,12 @@ public class Consumer { final StatsBenchmarkConsumer statsBenchmarkConsumer = new StatsBenchmarkConsumer(); - final Timer timer = new Timer("BenchmarkTimerThread", true); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build()); final LinkedList snapshotList = new LinkedList(); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); @@ -83,9 +87,9 @@ public class Consumer { snapshotList.removeFirst(); } } - }, 1000, 1000); + }, 1000, 1000, TimeUnit.MILLISECONDS); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { Long[] begin = snapshotList.getFirst(); @@ -116,7 +120,7 @@ public class Consumer { e.printStackTrace(); } } - }, 10000, 10000); + }, 10000, 10000, TimeUnit.MILLISECONDS); RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely(), msgTraceEnable, null); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 32d4b9f7462eb9e3a5ad373295fffc6932970e3a..b198a0fc99808cf0eeac4a81e005e004ec760cad 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -16,32 +16,34 @@ */ package org.apache.rocketmq.example.benchmark; -import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.Random; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.srvutil.ServerUtil; +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.Random; +import java.util.TimerTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; + public class Producer { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { @@ -73,7 +75,8 @@ public class Producer { final StatsBenchmarkProducer statsBenchmark = new StatsBenchmarkProducer(); - final Timer timer = new Timer("BenchmarkTimerThread", true); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build()); final LinkedList snapshotList = new LinkedList(); @@ -87,7 +90,7 @@ public class Producer { } } - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmark.createSnapshot()); @@ -95,9 +98,9 @@ public class Producer { snapshotList.removeFirst(); } } - }, 1000, 1000); + }, 1000, 1000, TimeUnit.MILLISECONDS); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { doPrintStats(snapshotList, statsBenchmark, false); @@ -112,7 +115,7 @@ public class Producer { e.printStackTrace(); } } - }, 10000, 10000); + }, 10000, 10000, TimeUnit.MILLISECONDS); RPCHook rpcHook = aclEnable ? AclClient.getAclRPCHook() : null; final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer", rpcHook, msgTraceEnable, null); @@ -224,7 +227,12 @@ public class Producer { try { sendThreadPool.shutdown(); sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); - timer.cancel(); + executorService.shutdown(); + try { + executorService.awaitTermination(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + if (snapshotList.size() > 1) { doPrintStats(snapshotList, statsBenchmark, true); } else { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 1b511d8a9e5ffef29452badc44176ad5833855a7..c4f14a48b9b3ed795bbf92ee40f44144669ec109 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -17,26 +17,11 @@ package org.apache.rocketmq.example.benchmark; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendResult; @@ -48,6 +33,24 @@ import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.srvutil.ServerUtil; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.LinkedHashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.TimerTask; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + public class TransactionProducer { private static final long START_TIME = System.currentTimeMillis(); private static final AtomicLong MSG_COUNT = new AtomicLong(0); @@ -75,11 +78,12 @@ public class TransactionProducer { final StatsBenchmarkTProducer statsBenchmark = new StatsBenchmarkTProducer(); - final Timer timer = new Timer("BenchmarkTimerThread", true); + ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, + new BasicThreadFactory.Builder().namingPattern("BenchmarkTimerThread-%d").daemon(true).build()); final LinkedList snapshotList = new LinkedList<>(); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmark.createSnapshot()); @@ -87,9 +91,9 @@ public class TransactionProducer { snapshotList.removeFirst(); } } - }, 1000, 1000); + }, 1000, 1000, TimeUnit.MILLISECONDS); - timer.scheduleAtFixedRate(new TimerTask() { + executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { Snapshot begin = snapshotList.getFirst(); @@ -121,7 +125,7 @@ public class TransactionProducer { e.printStackTrace(); } } - }, 10000, 10000); + }, 10000, 10000, TimeUnit.MILLISECONDS); final TransactionListener transactionCheckListener = new TransactionListenerImpl(statsBenchmark, config); final TransactionMQProducer producer = new TransactionMQProducer(