提交 61a0fcba 编写于 作者: D duhengforever

Reslove compatibility issues and keep consistent with the old API

上级 7cae5839
...@@ -16,6 +16,22 @@ ...@@ -16,6 +16,22 @@
*/ */
package org.apache.rocketmq.client.impl.producer; package org.apache.rocketmq.client.impl.producer;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.common.ClientErrorCode;
...@@ -31,12 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; ...@@ -31,12 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
...@@ -65,23 +82,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; ...@@ -65,23 +82,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultMQProducerImpl implements MQProducerInner { public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
...@@ -122,8 +122,8 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -122,8 +122,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} else { } else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000); this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
this.checkExecutor = new ThreadPoolExecutor( this.checkExecutor = new ThreadPoolExecutor(
1, producer.getCheckThreadPoolMinSize(),
1, producer.getCheckThreadPoolMaxSize(),
1000 * 60, 1000 * 60,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
this.checkRequestQueue); this.checkRequestQueue);
...@@ -131,8 +131,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -131,8 +131,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public void destroyTransactionEnv() { public void destroyTransactionEnv() {
this.checkExecutor.shutdown(); if (this.checkExecutor != null) {
this.checkRequestQueue.clear(); this.checkExecutor.shutdown();
}
} }
public void registerSendMessageHook(final SendMessageHook hook) { public void registerSendMessageHook(final SendMessageHook hook) {
...@@ -244,16 +245,17 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -244,16 +245,17 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
@Override @Override
public TransactionListener checkListener() { public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) { if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener(); return producer.getTransactionCheckListener();
} }
return null; return null;
} }
@Override @Override
public void checkTransactionState(final String addr, final MessageExt msg, public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) { final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() { Runnable request = new Runnable() {
...@@ -264,12 +266,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -264,12 +266,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override @Override
public void run() { public void run() {
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) { if (transactionCheckListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null; Throwable exception = null;
try { try {
localTransactionState = transactionCheckListener.checkLocalTransaction(message); localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} catch (Throwable e) { } catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e; exception = e;
...@@ -1096,9 +1098,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1096,9 +1098,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public TransactionSendResult sendMessageInTransaction(final Message msg, public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg) final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException { throws MQClientException {
if (null == tranExecuter) { if (null == localTransactionExecuter) {
throw new MQClientException("tranExecutor is null", null); throw new MQClientException("tranExecutor is null", null);
} }
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
...@@ -1124,7 +1126,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1124,7 +1126,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) { if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId); msg.setTransactionId(transactionId);
} }
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg); localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
if (null == localTransactionState) { if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW; localTransactionState = LocalTransactionState.UNKNOW;
} }
......
...@@ -16,18 +16,17 @@ ...@@ -16,18 +16,17 @@
*/ */
package org.apache.rocketmq.client.impl.producer; package org.apache.rocketmq.client.impl.producer;
import org.apache.rocketmq.client.producer.TransactionListener; import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import java.util.Set;
public interface MQProducerInner { public interface MQProducerInner {
Set<String> getPublishTopicList(); Set<String> getPublishTopicList();
boolean isPublishTopicNeedUpdate(final String topic); boolean isPublishTopicNeedUpdate(final String topic);
TransactionListener checkListener(); TransactionCheckListener checkListener();
void checkTransactionState( void checkTransactionState(
final String addr, final String addr,
......
...@@ -464,12 +464,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -464,12 +464,14 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method is to send transactional messages. * This method is to send transactional messages.
* *
* @param msg Transactional message to send. * @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
* @param arg Argument used along with local transaction executor. * @param arg Argument used along with local transaction executor.
* @return Transaction result. * @return Transaction result.
* @throws MQClientException if there is any client error. * @throws MQClientException if there is any client error.
*/ */
@Override @Override
public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg) public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
final Object arg)
throws MQClientException { throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
public interface LocalTransactionExecuter {
LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}
...@@ -80,7 +80,8 @@ public interface MQProducer extends MQAdmin { ...@@ -80,7 +80,8 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException; throws MQClientException, RemotingException, InterruptedException;
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
//for batch //for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
......
...@@ -16,25 +16,8 @@ ...@@ -16,25 +16,8 @@
*/ */
package org.apache.rocketmq.client.producer; package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
public interface TransactionListener { public interface TransactionCheckListener {
/** LocalTransactionState checkLocalTransactionState(final MessageExt msg);
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
} }
...@@ -16,14 +16,16 @@ ...@@ -16,14 +16,16 @@
*/ */
package org.apache.rocketmq.client.producer; package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import java.util.concurrent.ExecutorService;
public class TransactionMQProducer extends DefaultMQProducer { public class TransactionMQProducer extends DefaultMQProducer {
private TransactionListener transactionListener; private TransactionCheckListener transactionCheckListener;
private int checkThreadPoolMinSize = 1;
private int checkThreadPoolMaxSize = 1;
private int checkRequestHoldMax = 2000;
private ExecutorService executorService; private ExecutorService executorService;
...@@ -51,20 +53,45 @@ public class TransactionMQProducer extends DefaultMQProducer { ...@@ -51,20 +53,45 @@ public class TransactionMQProducer extends DefaultMQProducer {
} }
@Override @Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { public TransactionSendResult sendMessageInTransaction(final Message msg,
if (null == this.transactionListener) { final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
throw new MQClientException("TransactionListener is null", null); if (null == this.transactionCheckListener) {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
} }
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;
}
public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
public int getCheckThreadPoolMinSize() {
return checkThreadPoolMinSize;
}
public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;
}
public int getCheckThreadPoolMaxSize() {
return checkThreadPoolMaxSize;
}
public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
} }
public TransactionListener getTransactionListener() { public int getCheckRequestHoldMax() {
return transactionListener; return checkRequestHoldMax;
} }
public void setTransactionListener(TransactionListener transactionListener) { public void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.transactionListener = transactionListener; this.checkRequestHoldMax = checkRequestHoldMax;
} }
public ExecutorService getExecutorService() { public ExecutorService getExecutorService() {
......
...@@ -150,13 +150,13 @@ public class BrokerConfig { ...@@ -150,13 +150,13 @@ public class BrokerConfig {
* that can be checked. * that can be checked.
*/ */
@ImportantField @ImportantField
private long transactionTimeOut = 3 * 1000; private long transactionTimeOut = 6 * 1000;
/** /**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded. * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/ */
@ImportantField @ImportantField
private int transactionCheckMax = 5; private int transactionCheckMax = 15;
/** /**
* Transaction message check interval. * Transaction message check interval.
......
...@@ -57,7 +57,7 @@ public class MixAll { ...@@ -57,7 +57,7 @@ public class MixAll {
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
//http://jmenv.tbsite.net:8080/rocketmq/nsaddr //http://jmenv.tbsite.net:8080/rocketmq/nsaddr
//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
......
...@@ -17,15 +17,6 @@ ...@@ -17,15 +17,6 @@
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Timer; import java.util.Timer;
...@@ -33,18 +24,27 @@ import java.util.TimerTask; ...@@ -33,18 +24,27 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer { public class TransactionProducer {
private static int threadCount; private static int threadCount;
private static int messageSize; private static int messageSize;
private static boolean isCheck; private static boolean ischeck;
private static boolean isCheckFalse; private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]); ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]); ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize); final Message msg = buildMessage(messageSize);
...@@ -73,8 +73,8 @@ public class TransactionProducer { ...@@ -73,8 +73,8 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast(); Long[] end = snapshotList.getLast();
final long sendTps = final long sendTps =
(long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf( System.out.printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
...@@ -92,14 +92,16 @@ public class TransactionProducer { ...@@ -92,14 +92,16 @@ public class TransactionProducer {
} }
}, 10000, 10000); }, 10000, 10000);
final TransactionListener transactionListener = final TransactionCheckListener transactionCheckListener =
new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark); new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionListener); producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000); producer.setDefaultTopicQueueNums(1000);
producer.start(); producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
sendThreadPool.execute(new Runnable() { sendThreadPool.execute(new Runnable() {
@Override @Override
...@@ -109,7 +111,7 @@ public class TransactionProducer { ...@@ -109,7 +111,7 @@ public class TransactionProducer {
// Thread.sleep(1000); // Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis(); final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult = SendResult sendResult =
producer.sendMessageInTransaction(msg, null); producer.sendMessageInTransaction(msg, tranExecuter, null);
if (sendResult != null) { if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
...@@ -122,7 +124,8 @@ public class TransactionProducer { ...@@ -122,7 +124,8 @@ public class TransactionProducer {
boolean updated = boolean updated =
statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
currentRT); currentRT);
if (updated) { break; } if (updated)
break;
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
} }
...@@ -150,37 +153,43 @@ public class TransactionProducer { ...@@ -150,37 +153,43 @@ public class TransactionProducer {
} }
} }
class TransactionExecuterBImpl implements LocalTransactionExecuter {
class TransactionListenerImpl implements TransactionListener { private boolean ischeck;
private boolean isCheckFalse;
public TransactionExecuterBImpl(boolean ischeck) {
this.ischeck = ischeck;
}
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
if (ischeck) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class TransactionCheckListenerBImpl implements TransactionCheckListener {
private boolean ischeckffalse;
private StatsBenchmarkTProducer statsBenchmarkTProducer; private StatsBenchmarkTProducer statsBenchmarkTProducer;
private boolean isCheckLocal;
public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal, public TransactionCheckListenerBImpl(boolean ischeckffalse,
StatsBenchmarkTProducer statsBenchmarkTProducer) { StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.isCheckFalse = isCheckFalse; this.ischeckffalse = ischeckffalse;
this.isCheckLocal = isCheckLocal;
this.statsBenchmarkTProducer = statsBenchmarkTProducer; this.statsBenchmarkTProducer = statsBenchmarkTProducer;
} }
@Override @Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) { public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
if (isCheckFalse) { if (ischeckffalse) {
return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.ROLLBACK_MESSAGE;
} }
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
} }
@Override
public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
if (isCheckLocal) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
} }
class StatsBenchmarkTProducer { class StatsBenchmarkTProducer {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.transaction;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
public class LocalTransactionExecuterImpl implements LocalTransactionExecuter {
@Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}
}
...@@ -16,30 +16,18 @@ ...@@ -16,30 +16,18 @@
*/ */
package org.apache.rocketmq.example.transaction; package org.apache.rocketmq.example.transaction;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap; public class TransactionCheckListenerImpl implements TransactionCheckListener {
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); private AtomicInteger localTrans = new AtomicInteger(0);
@Override @Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
int value = transactionIndex.getAndIncrement(); Integer status = localTrans.getAndIncrement() % 3;
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) { if (null != status) {
switch (status) { switch (status) {
case 0: case 0:
...@@ -48,6 +36,8 @@ public class TransactionListenerImpl implements TransactionListener { ...@@ -48,6 +36,8 @@ public class TransactionListenerImpl implements TransactionListener {
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
case 2: case 2:
return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
} }
} }
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
......
...@@ -17,8 +17,9 @@ ...@@ -17,8 +17,9 @@
package org.apache.rocketmq.example.transaction; package org.apache.rocketmq.example.transaction;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
...@@ -32,7 +33,7 @@ import java.util.concurrent.TimeUnit; ...@@ -32,7 +33,7 @@ import java.util.concurrent.TimeUnit;
public class TransactionProducer { public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException { public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl(); TransactionCheckListener transactionListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override @Override
...@@ -42,9 +43,9 @@ public class TransactionProducer { ...@@ -42,9 +43,9 @@ public class TransactionProducer {
return thread; return thread;
} }
}); });
LocalTransactionExecuter executor = new LocalTransactionExecuterImpl();
producer.setExecutorService(executorService); producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener); producer.setTransactionCheckListener(transactionListener);
producer.start(); producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
...@@ -53,7 +54,7 @@ public class TransactionProducer { ...@@ -53,7 +54,7 @@ public class TransactionProducer {
Message msg = Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null); SendResult sendResult = producer.sendMessageInTransaction(msg, executor,null);
System.out.printf("%s%n", sendResult); System.out.printf("%s%n", sendResult);
Thread.sleep(10); Thread.sleep(10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册