提交 f6696b12 编写于 作者: D duhengforever

Compatible with the original transactional message API

上级 9db2acdb
......@@ -54,6 +54,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.MixAll;
......@@ -120,7 +121,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
......@@ -244,7 +245,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return null == prev || !prev.ok();
}
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
* @return
*/
@Override
@Deprecated
public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
......@@ -255,7 +261,15 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
@Override
public TransactionListener getCheckListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener();
}
return null;
}
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = new Runnable() {
......@@ -267,11 +281,19 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void run() {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) {
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.info("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
......@@ -282,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
......@@ -1100,7 +1122,8 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
if (null == localTransactionExecuter) {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
......@@ -1126,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.info("Used new transaction API");
transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
......
......@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.producer;
import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
......@@ -27,6 +28,7 @@ public interface MQProducerInner {
boolean isPublishTopicNeedUpdate(final String topic);
TransactionCheckListener checkListener();
TransactionListener getCheckListener();
void checkTransactionState(
final String addr,
......
......@@ -476,6 +476,19 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* This method is used to send transactional messages.
* @param msg Transactional message to send.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg,
Object arg) throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* Create a topic on broker.
*
......
......@@ -18,6 +18,10 @@ package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface LocalTransactionExecuter {
LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}
......@@ -83,6 +83,9 @@ public interface MQProducer extends MQAdmin {
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
......
......@@ -17,7 +17,10 @@
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.MessageExt;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface TransactionCheckListener {
LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}
......@@ -14,14 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.transaction;
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class LocalTransactionExecuterImpl implements LocalTransactionExecuter {
@Override public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}
}
public interface TransactionListener {
/**
* When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return Transaction state
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* When no response to prepare(half) message. broker will send check message to check the transaction status, and this
* method will be invoked to get local transaction status.
*
* @param msg Check message
* @return Transaction state
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
\ No newline at end of file
......@@ -17,8 +17,11 @@
package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.remoting.RPCHook;
public class TransactionMQProducer extends DefaultMQProducer {
......@@ -29,6 +32,9 @@ public class TransactionMQProducer extends DefaultMQProducer {
private ExecutorService executorService;
private TransactionListener transactionListener;
public TransactionMQProducer() {
}
......@@ -52,7 +58,12 @@ public class TransactionMQProducer extends DefaultMQProducer {
this.defaultMQProducerImpl.destroyTransactionEnv();
}
/**
* This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>} is recommended.
*/
@Override
@Deprecated
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == this.transactionCheckListener) {
......@@ -62,10 +73,23 @@ public class TransactionMQProducer extends DefaultMQProducer {
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
......@@ -74,6 +98,10 @@ public class TransactionMQProducer extends DefaultMQProducer {
return checkThreadPoolMinSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;
}
......@@ -82,6 +110,10 @@ public class TransactionMQProducer extends DefaultMQProducer {
return checkThreadPoolMaxSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
}
......@@ -90,6 +122,10 @@ public class TransactionMQProducer extends DefaultMQProducer {
return checkRequestHoldMax;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.checkRequestHoldMax = checkRequestHoldMax;
}
......@@ -101,4 +137,12 @@ public class TransactionMQProducer extends DefaultMQProducer {
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
public TransactionListener getTransactionListener() {
return transactionListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
}
}
......@@ -16,18 +16,30 @@
*/
package org.apache.rocketmq.example.transaction;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionCheckListenerImpl implements TransactionCheckListener {
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private AtomicInteger localTrans = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
Integer status = localTrans.getAndIncrement() % 3;
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
......
......@@ -17,9 +17,8 @@
package org.apache.rocketmq.example.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
......@@ -33,7 +32,7 @@ import java.util.concurrent.TimeUnit;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionCheckListener transactionListener = new TransactionCheckListenerImpl();
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
......@@ -43,9 +42,9 @@ public class TransactionProducer {
return thread;
}
});
LocalTransactionExecuter localTransactionExecuter = new LocalTransactionExecuterImpl();
producer.setExecutorService(executorService);
producer.setTransactionCheckListener(transactionListener);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
......@@ -54,7 +53,7 @@ public class TransactionProducer {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, localTransactionExecuter,null);
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册