未验证 提交 6a6f2e7d 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1544 from areyouok/4.5.2.fix

[ISSUE #1519]Optimise performance/stability of  transaction message
...@@ -55,7 +55,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor { ...@@ -55,7 +55,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader = final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class); (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.info("Transaction request:{}", requestHeader); LOGGER.debug("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) { if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. "); LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
......
...@@ -30,6 +30,7 @@ import java.util.concurrent.ArrayBlockingQueue; ...@@ -30,6 +30,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public abstract class AbstractTransactionalMessageCheckListener { public abstract class AbstractTransactionalMessageCheckListener {
...@@ -48,7 +49,7 @@ public abstract class AbstractTransactionalMessageCheckListener { ...@@ -48,7 +49,7 @@ public abstract class AbstractTransactionalMessageCheckListener {
thread.setName("Transaction-msg-check-thread"); thread.setName("Transaction-msg-check-thread");
return thread; return thread;
} }
}); }, new CallerRunsPolicy());
public AbstractTransactionalMessageCheckListener() { public AbstractTransactionalMessageCheckListener() {
} }
......
...@@ -141,6 +141,7 @@ public class TransactionalMessageBridge { ...@@ -141,6 +141,7 @@ public class TransactionalMessageBridge {
getMessageResult.getStatus(), topic, group, offset); getMessageResult.getStatus(), topic, group, offset);
break; break;
case NO_MESSAGE_IN_QUEUE: case NO_MESSAGE_IN_QUEUE:
case OFFSET_OVERFLOW_ONE:
pullStatus = PullStatus.NO_NEW_MSG; pullStatus = PullStatus.NO_NEW_MSG;
LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset); getMessageResult.getStatus(), topic, group, offset);
...@@ -149,7 +150,6 @@ public class TransactionalMessageBridge { ...@@ -149,7 +150,6 @@ public class TransactionalMessageBridge {
case NO_MATCHED_LOGIC_QUEUE: case NO_MATCHED_LOGIC_QUEUE:
case OFFSET_FOUND_NULL: case OFFSET_FOUND_NULL:
case OFFSET_OVERFLOW_BADLY: case OFFSET_OVERFLOW_BADLY:
case OFFSET_OVERFLOW_ONE:
case OFFSET_TOO_SMALL: case OFFSET_TOO_SMALL:
pullStatus = PullStatus.OFFSET_ILLEGAL; pullStatus = PullStatus.OFFSET_ILLEGAL;
LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
......
...@@ -159,7 +159,8 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -159,7 +159,8 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
} }
if (removeMap.containsKey(i)) { if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i); log.info("Half offset {} has been committed/rolled back", i);
removeMap.remove(i); Long removedOpOffset = removeMap.remove(i);
doneOpOffset.add(removedOpOffset);
} else { } else {
GetResult getResult = getHalfMsg(messageQueue, i); GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg(); MessageExt msgExt = getResult.getMsg();
...@@ -223,7 +224,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -223,7 +224,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
listener.resolveHalfMsg(msgExt); listener.resolveHalfMsg(msgExt);
} else { } else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult); messageQueue, pullResult);
continue; continue;
} }
...@@ -292,7 +293,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -292,7 +293,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
} }
for (MessageExt opMessageExt : opMsg) { for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset)); Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(), log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset); opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) { if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
if (queueOffset < miniOffset) { if (queueOffset < miniOffset) {
...@@ -460,7 +461,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -460,7 +461,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
@Override @Override
public boolean deletePrepareMessage(MessageExt msgExt) { public boolean deletePrepareMessage(MessageExt msgExt) {
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) { if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt); log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true; return true;
} else { } else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId()); log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
......
...@@ -103,6 +103,10 @@ public class Consumer { ...@@ -103,6 +103,10 @@ public class Consumer {
}, 10000, 10000); }, 10000, 10000);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
consumer.setNamesrvAddr(ns);
}
consumer.setInstanceName(Long.toString(System.currentTimeMillis())); consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
if (filterType == null || expression == null) { if (filterType == null || expression == null) {
......
...@@ -24,6 +24,11 @@ import java.util.TimerTask; ...@@ -24,6 +24,11 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
...@@ -33,20 +38,19 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer; ...@@ -33,20 +38,19 @@ import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.srvutil.ServerUtil;
public class TransactionProducer { public class TransactionProducer {
private static int threadCount;
private static int messageSize;
private static boolean ischeck;
private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; Options options = ServerUtil.buildCommandlineOptions(new Options());
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize); final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32;
final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048;
final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : false;
final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r')) : true;
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount); final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
...@@ -98,6 +102,10 @@ public class TransactionProducer { ...@@ -98,6 +102,10 @@ public class TransactionProducer {
producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionCheckListener(transactionCheckListener); producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000); producer.setDefaultTopicQueueNums(1000);
if (commandLine.hasOption('n')) {
String ns = commandLine.getOptionValue('n');
producer.setNamesrvAddr(ns);
}
producer.start(); producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
...@@ -111,7 +119,7 @@ public class TransactionProducer { ...@@ -111,7 +119,7 @@ public class TransactionProducer {
// Thread.sleep(1000); // Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis(); final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult = SendResult sendResult =
producer.sendMessageInTransaction(msg, tranExecuter, null); producer.sendMessageInTransaction(buildMessage(messageSize, topic), tranExecuter, null);
if (sendResult != null) { if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
...@@ -138,18 +146,45 @@ public class TransactionProducer { ...@@ -138,18 +146,45 @@ public class TransactionProducer {
} }
} }
private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException { private static Message buildMessage(final int messageSize, String topic) {
try {
Message msg = new Message(); Message msg = new Message();
msg.setTopic("BenchmarkTest"); msg.setTopic(topic);
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (int i = 0; i < messageSize; i += 10) { for (int i = 0; i < messageSize; i += 10) {
sb.append("hello baby"); sb.append("hello baby");
} }
msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
return msg; return msg;
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
public static Options buildCommandlineOptions(final Options options) {
Option opt = new Option("w", "threadCount", true, "Thread count, Default: 32");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("s", "messageSize", true, "Message Size, Default: 2048");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("c", "check", true, "Check the message, Default: false");
opt.setRequired(false);
options.addOption(opt);
opt = new Option("r", "checkResult", true, "Message check result, Default: true");
opt.setRequired(false);
options.addOption(opt);
return options;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册