diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java index c9e85ed1a508e9b2850977e3ed23c0637bda68ea..1d5943d43135cde7fca78b06e672cdd68bbe38e7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java @@ -55,7 +55,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final EndTransactionRequestHeader requestHeader = (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); - LOGGER.info("Transaction request:{}", requestHeader); + LOGGER.debug("Transaction request:{}", requestHeader); if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) { response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); LOGGER.warn("Message store is slave mode, so end transaction is forbidden. "); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index 62507cdfdba6ba5c75dd5001d00b8b68f2b9e926..35d811207ba6687949877daa251730c3f26c77f6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -30,6 +30,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.TimeUnit; public abstract class AbstractTransactionalMessageCheckListener { @@ -48,7 +49,7 @@ public abstract class AbstractTransactionalMessageCheckListener { thread.setName("Transaction-msg-check-thread"); return thread; } - }); + }, new CallerRunsPolicy()); public AbstractTransactionalMessageCheckListener() { } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java index 84a62761bd61a084723abeaf37fedeeb5213735c..453a335a40b22ce370bd52f03b1dd718e8b3b9a0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java @@ -141,6 +141,7 @@ public class TransactionalMessageBridge { getMessageResult.getStatus(), topic, group, offset); break; case NO_MESSAGE_IN_QUEUE: + case OFFSET_OVERFLOW_ONE: pullStatus = PullStatus.NO_NEW_MSG; LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", getMessageResult.getStatus(), topic, group, offset); @@ -149,7 +150,6 @@ public class TransactionalMessageBridge { case NO_MATCHED_LOGIC_QUEUE: case OFFSET_FOUND_NULL: case OFFSET_OVERFLOW_BADLY: - case OFFSET_OVERFLOW_ONE: case OFFSET_TOO_SMALL: pullStatus = PullStatus.OFFSET_ILLEGAL; LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index e1549b15177e0a2de5fb4160d31d13aee9798f0e..71b575eec098dfd2a2f7ada3b3749ba896700f15 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -159,7 +159,8 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ } if (removeMap.containsKey(i)) { log.info("Half offset {} has been committed/rolled back", i); - removeMap.remove(i); + Long removedOpOffset = removeMap.remove(i); + doneOpOffset.add(removedOpOffset); } else { GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); @@ -223,7 +224,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); - log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, + log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } @@ -292,7 +293,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ } for (MessageExt opMessageExt : opMsg) { Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); - log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), + log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset); if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) { if (queueOffset < miniOffset) { @@ -460,7 +461,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ @Override public boolean deletePrepareMessage(MessageExt msgExt) { if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) { - log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt); + log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt); return true; } else { log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId()); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index d431d3ecc6ad6c05dae4ebe8b7796d930c4905fb..4724a1d7ab69b7d9338cd130e12f2fef257d728d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -103,6 +103,10 @@ public class Consumer { }, 10000, 10000); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); + if (commandLine.hasOption('n')) { + String ns = commandLine.getOptionValue('n'); + consumer.setNamesrvAddr(ns); + } consumer.setInstanceName(Long.toString(System.currentTimeMillis())); if (filterType == null || expression == null) { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index d9fafdd08e88ab4fcdad9226698f573db2ca7553..04707e1a28935349b74c4d50a849c8d8d7247933 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -24,6 +24,11 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.PosixParser; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; @@ -33,20 +38,19 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.srvutil.ServerUtil; public class TransactionProducer { - private static int threadCount; - private static int messageSize; - private static boolean ischeck; - private static boolean ischeckffalse; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { - threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; - messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; - ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]); - ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]); + Options options = ServerUtil.buildCommandlineOptions(new Options()); + CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser()); - final Message msg = buildMessage(messageSize); + final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; + final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32; + final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048; + final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : false; + final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r')) : true; final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); @@ -98,6 +102,10 @@ public class TransactionProducer { producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setTransactionCheckListener(transactionCheckListener); producer.setDefaultTopicQueueNums(1000); + if (commandLine.hasOption('n')) { + String ns = commandLine.getOptionValue('n'); + producer.setNamesrvAddr(ns); + } producer.start(); final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); @@ -111,7 +119,7 @@ public class TransactionProducer { // Thread.sleep(1000); final long beginTimestamp = System.currentTimeMillis(); SendResult sendResult = - producer.sendMessageInTransaction(msg, tranExecuter, null); + producer.sendMessageInTransaction(buildMessage(messageSize, topic), tranExecuter, null); if (sendResult != null) { statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); @@ -138,18 +146,45 @@ public class TransactionProducer { } } - private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { - Message msg = new Message(); - msg.setTopic("BenchmarkTest"); + private static Message buildMessage(final int messageSize, String topic) { + try { + Message msg = new Message(); + msg.setTopic(topic); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < messageSize; i += 10) { - sb.append("hello baby"); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < messageSize; i += 10) { + sb.append("hello baby"); + } + msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); + return msg; + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); } + } + + public static Options buildCommandlineOptions(final Options options) { + Option opt = new Option("w", "threadCount", true, "Thread count, Default: 32"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("s", "messageSize", true, "Message Size, Default: 2048"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("c", "check", true, "Check the message, Default: false"); + opt.setRequired(false); + options.addOption(opt); + + opt = new Option("r", "checkResult", true, "Message check result, Default: true"); + opt.setRequired(false); + options.addOption(opt); - msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); - return msg; + return options; } }