From 61a0fcba5e94d2e50da3dc681102f1efc35932ca Mon Sep 17 00:00:00 2001 From: duhengforever Date: Wed, 8 Aug 2018 21:58:01 +0800 Subject: [PATCH] Reslove compatibility issues and keep consistent with the old API --- .../impl/producer/DefaultMQProducerImpl.java | 60 +++++++------- .../client/impl/producer/MQProducerInner.java | 7 +- .../client/producer/DefaultMQProducer.java | 4 +- .../producer/LocalTransactionExecuter.java | 23 +++++ .../rocketmq/client/producer/MQProducer.java | 3 +- ...ner.java => TransactionCheckListener.java} | 21 +---- .../producer/TransactionMQProducer.java | 49 ++++++++--- .../apache/rocketmq/common/BrokerConfig.java | 4 +- .../org/apache/rocketmq/common/MixAll.java | 2 +- .../benchmark/TransactionProducer.java | 83 ++++++++++--------- .../LocalTransactionExecuterImpl.java | 27 ++++++ ...java => TransactionCheckListenerImpl.java} | 26 ++---- .../transaction/TransactionProducer.java | 11 +-- 13 files changed, 192 insertions(+), 128 deletions(-) create mode 100644 client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java rename client/src/main/java/org/apache/rocketmq/client/producer/{TransactionListener.java => TransactionCheckListener.java} (53%) create mode 100644 example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java rename example/src/main/java/org/apache/rocketmq/example/transaction/{TransactionListenerImpl.java => TransactionCheckListenerImpl.java} (63%) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index e1d9f904..8d9bcd20 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -16,6 +16,22 @@ */ package org.apache.rocketmq.client.impl.producer; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -31,12 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; -import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.common.MixAll; @@ -65,23 +82,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import java.util.concurrent.RejectedExecutionException; - -import java.io.IOException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; public class DefaultMQProducerImpl implements MQProducerInner { private final InternalLogger log = ClientLogger.getLog(); @@ -122,8 +122,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { } else { this.checkRequestQueue = new LinkedBlockingQueue(2000); this.checkExecutor = new ThreadPoolExecutor( - 1, - 1, + producer.getCheckThreadPoolMinSize(), + producer.getCheckThreadPoolMaxSize(), 1000 * 60, TimeUnit.MILLISECONDS, this.checkRequestQueue); @@ -131,8 +131,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public void destroyTransactionEnv() { - this.checkExecutor.shutdown(); - this.checkRequestQueue.clear(); + if (this.checkExecutor != null) { + this.checkExecutor.shutdown(); + } } public void registerSendMessageHook(final SendMessageHook hook) { @@ -244,16 +245,17 @@ public class DefaultMQProducerImpl implements MQProducerInner { } @Override - public TransactionListener checkListener() { + public TransactionCheckListener checkListener() { if (this.defaultMQProducer instanceof TransactionMQProducer) { TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; - return producer.getTransactionListener(); + return producer.getTransactionCheckListener(); } return null; } @Override + public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { @@ -264,12 +266,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { @Override public void run() { - TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); + TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); if (transactionCheckListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { - localTransactionState = transactionCheckListener.checkLocalTransaction(message); + localTransactionState = transactionCheckListener.checkLocalTransactionState(message); } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; @@ -1096,9 +1098,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { } public TransactionSendResult sendMessageInTransaction(final Message msg, - final TransactionListener tranExecuter, final Object arg) + final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { - if (null == tranExecuter) { + if (null == localTransactionExecuter) { throw new MQClientException("tranExecutor is null", null); } Validators.checkMessage(msg, this.defaultMQProducer); @@ -1124,7 +1126,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } - localTransactionState = tranExecuter.executeLocalTransaction(msg, arg); + localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java index 52ebe1b5..dfd485dd 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/MQProducerInner.java @@ -16,18 +16,17 @@ */ package org.apache.rocketmq.client.impl.producer; -import org.apache.rocketmq.client.producer.TransactionListener; +import java.util.Set; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; -import java.util.Set; - public interface MQProducerInner { Set getPublishTopicList(); boolean isPublishTopicNeedUpdate(final String topic); - TransactionListener checkListener(); + TransactionCheckListener checkListener(); void checkTransactionState( final String addr, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 065f068c..267dbe8f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -464,12 +464,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * This method is to send transactional messages. * * @param msg Transactional message to send. + * @param tranExecuter local transaction executor. * @param arg Argument used along with local transaction executor. * @return Transaction result. * @throws MQClientException if there is any client error. */ @Override - public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg) + public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, + final Object arg) throws MQClientException { throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java new file mode 100644 index 00000000..80b55469 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/producer/LocalTransactionExecuter.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.producer; + +import org.apache.rocketmq.common.message.Message; + +public interface LocalTransactionExecuter { + LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg); +} diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java index 0776ee15..14caf6ff 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java @@ -80,7 +80,8 @@ public interface MQProducer extends MQAdmin { void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) throws MQClientException, RemotingException, InterruptedException; - TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; + TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException; //for batch SendResult send(final Collection msgs) throws MQClientException, RemotingException, MQBrokerException, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java similarity index 53% rename from client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java rename to client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java index c750e538..1cf5c4d9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java @@ -16,25 +16,8 @@ */ package org.apache.rocketmq.client.producer; -import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; -public interface TransactionListener { - /** - * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction. - * - * @param msg Half(prepare) message - * @param arg Custom business parameter - * @return Transaction state - */ - LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); - - /** - * When no response to prepare(half) message. broker will send check message to check the transaction status, and this - * method will be invoked to get local transaction status. - * - * @param msg Check message - * @return Transaction state - */ - LocalTransactionState checkLocalTransaction(final MessageExt msg); +public interface TransactionCheckListener { + LocalTransactionState checkLocalTransactionState(final MessageExt msg); } diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index c4f122c5..14fae65a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -16,14 +16,16 @@ */ package org.apache.rocketmq.client.producer; +import java.util.concurrent.ExecutorService; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; -import java.util.concurrent.ExecutorService; - public class TransactionMQProducer extends DefaultMQProducer { - private TransactionListener transactionListener; + private TransactionCheckListener transactionCheckListener; + private int checkThreadPoolMinSize = 1; + private int checkThreadPoolMaxSize = 1; + private int checkRequestHoldMax = 2000; private ExecutorService executorService; @@ -51,20 +53,45 @@ public class TransactionMQProducer extends DefaultMQProducer { } @Override - public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { - if (null == this.transactionListener) { - throw new MQClientException("TransactionListener is null", null); + public TransactionSendResult sendMessageInTransaction(final Message msg, + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { + if (null == this.transactionCheckListener) { + throw new MQClientException("localTransactionBranchCheckListener is null", null); } - return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); + return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); + } + + public TransactionCheckListener getTransactionCheckListener() { + return transactionCheckListener; + } + + public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { + this.transactionCheckListener = transactionCheckListener; + } + + public int getCheckThreadPoolMinSize() { + return checkThreadPoolMinSize; + } + + public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { + this.checkThreadPoolMinSize = checkThreadPoolMinSize; + } + + public int getCheckThreadPoolMaxSize() { + return checkThreadPoolMaxSize; + } + + public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { + this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; } - public TransactionListener getTransactionListener() { - return transactionListener; + public int getCheckRequestHoldMax() { + return checkRequestHoldMax; } - public void setTransactionListener(TransactionListener transactionListener) { - this.transactionListener = transactionListener; + public void setCheckRequestHoldMax(int checkRequestHoldMax) { + this.checkRequestHoldMax = checkRequestHoldMax; } public ExecutorService getExecutorService() { diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 442f456a..9b713d12 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -150,13 +150,13 @@ public class BrokerConfig { * that can be checked. */ @ImportantField - private long transactionTimeOut = 3 * 1000; + private long transactionTimeOut = 6 * 1000; /** * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField - private int transactionCheckMax = 5; + private int transactionCheckMax = 15; /** * Transaction message check interval. diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index ae97cc97..20d18676 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -57,7 +57,7 @@ public class MixAll { public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); //http://jmenv.tbsite.net:8080/rocketmq/nsaddr //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; - public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable + public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index 2d8a5fea..d9fafdd0 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -17,15 +17,6 @@ package org.apache.rocketmq.example.benchmark; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.TransactionListener; -import org.apache.rocketmq.client.producer.TransactionMQProducer; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; -import org.apache.rocketmq.remoting.common.RemotingHelper; - import java.io.UnsupportedEncodingException; import java.util.LinkedList; import java.util.Timer; @@ -33,18 +24,27 @@ import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.TransactionCheckListener; +import org.apache.rocketmq.client.producer.TransactionMQProducer; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.remoting.common.RemotingHelper; public class TransactionProducer { private static int threadCount; private static int messageSize; - private static boolean isCheck; - private static boolean isCheckFalse; + private static boolean ischeck; + private static boolean ischeckffalse; public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; - isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]); - isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]); + ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]); + ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]); final Message msg = buildMessage(messageSize); @@ -73,8 +73,8 @@ public class TransactionProducer { Long[] end = snapshotList.getLast(); final long sendTps = - (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); + (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); System.out.printf( "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", @@ -92,14 +92,16 @@ public class TransactionProducer { } }, 10000, 10000); - final TransactionListener transactionListener = - new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark); + final TransactionCheckListener transactionCheckListener = + new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); producer.setInstanceName(Long.toString(System.currentTimeMillis())); - producer.setTransactionListener(transactionListener); + producer.setTransactionCheckListener(transactionCheckListener); producer.setDefaultTopicQueueNums(1000); producer.start(); + final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck); + for (int i = 0; i < threadCount; i++) { sendThreadPool.execute(new Runnable() { @Override @@ -109,7 +111,7 @@ public class TransactionProducer { // Thread.sleep(1000); final long beginTimestamp = System.currentTimeMillis(); SendResult sendResult = - producer.sendMessageInTransaction(msg, null); + producer.sendMessageInTransaction(msg, tranExecuter, null); if (sendResult != null) { statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); @@ -122,7 +124,8 @@ public class TransactionProducer { boolean updated = statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, currentRT); - if (updated) { break; } + if (updated) + break; prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); } @@ -150,37 +153,43 @@ public class TransactionProducer { } } +class TransactionExecuterBImpl implements LocalTransactionExecuter { -class TransactionListenerImpl implements TransactionListener { - private boolean isCheckFalse; + private boolean ischeck; + + public TransactionExecuterBImpl(boolean ischeck) { + this.ischeck = ischeck; + } + + @Override + public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { + if (ischeck) { + return LocalTransactionState.UNKNOW; + } + return LocalTransactionState.COMMIT_MESSAGE; + } +} + +class TransactionCheckListenerBImpl implements TransactionCheckListener { + private boolean ischeckffalse; private StatsBenchmarkTProducer statsBenchmarkTProducer; - private boolean isCheckLocal; - public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal, - StatsBenchmarkTProducer statsBenchmarkTProducer) { - this.isCheckFalse = isCheckFalse; - this.isCheckLocal = isCheckLocal; + public TransactionCheckListenerBImpl(boolean ischeckffalse, + StatsBenchmarkTProducer statsBenchmarkTProducer) { + this.ischeckffalse = ischeckffalse; this.statsBenchmarkTProducer = statsBenchmarkTProducer; } @Override - public LocalTransactionState checkLocalTransaction(MessageExt msg) { + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); - if (isCheckFalse) { + if (ischeckffalse) { return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } - - @Override - public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) { - if (isCheckLocal) { - return LocalTransactionState.UNKNOW; - } - return LocalTransactionState.COMMIT_MESSAGE; - } } class StatsBenchmarkTProducer { diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java new file mode 100644 index 00000000..a3b363fd --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/LocalTransactionExecuterImpl.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.transaction; + +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; +import org.apache.rocketmq.client.producer.LocalTransactionState; +import org.apache.rocketmq.common.message.Message; + +public class LocalTransactionExecuterImpl implements LocalTransactionExecuter { + @Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) { + return LocalTransactionState.UNKNOW; + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java similarity index 63% rename from example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java rename to example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java index ce471d2c..fcab5193 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionListenerImpl.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionCheckListenerImpl.java @@ -16,30 +16,18 @@ */ package org.apache.rocketmq.example.transaction; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.producer.LocalTransactionState; -import org.apache.rocketmq.client.producer.TransactionListener; -import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.common.message.MessageExt; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -public class TransactionListenerImpl implements TransactionListener { - private AtomicInteger transactionIndex = new AtomicInteger(0); +public class TransactionCheckListenerImpl implements TransactionCheckListener { - private ConcurrentHashMap localTrans = new ConcurrentHashMap<>(); + private AtomicInteger localTrans = new AtomicInteger(0); @Override - public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { - int value = transactionIndex.getAndIncrement(); - int status = value % 3; - localTrans.put(msg.getTransactionId(), status); - return LocalTransactionState.UNKNOW; - } - - @Override - public LocalTransactionState checkLocalTransaction(MessageExt msg) { - Integer status = localTrans.get(msg.getTransactionId()); + public LocalTransactionState checkLocalTransactionState(MessageExt msg) { + Integer status = localTrans.getAndIncrement() % 3; if (null != status) { switch (status) { case 0: @@ -48,6 +36,8 @@ public class TransactionListenerImpl implements TransactionListener { return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; + default: + return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; diff --git a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java index 75c805b9..6d80b225 100644 --- a/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/transaction/TransactionProducer.java @@ -17,8 +17,9 @@ package org.apache.rocketmq.example.transaction; import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.LocalTransactionExecuter; import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.producer.TransactionCheckListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -32,7 +33,7 @@ import java.util.concurrent.TimeUnit; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { - TransactionListener transactionListener = new TransactionListenerImpl(); + TransactionCheckListener transactionListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override @@ -42,9 +43,9 @@ public class TransactionProducer { return thread; } }); - + LocalTransactionExecuter executor = new LocalTransactionExecuterImpl(); producer.setExecutorService(executorService); - producer.setTransactionListener(transactionListener); + producer.setTransactionCheckListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; @@ -53,7 +54,7 @@ public class TransactionProducer { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); - SendResult sendResult = producer.sendMessageInTransaction(msg, null); + SendResult sendResult = producer.sendMessageInTransaction(msg, executor,null); System.out.printf("%s%n", sendResult); Thread.sleep(10); -- GitLab