未验证 提交 cdf40428 编写于 作者: Z Zhendong Liu 提交者: GitHub

Merge pull request #398 from duhengforever/develop_compatibility_issue

[ISSUE #395]Resolve compatibility issues and keep consistent with the old API
......@@ -16,6 +16,22 @@
*/
package org.apache.rocketmq.client.impl.producer;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
......@@ -31,11 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
......@@ -65,23 +83,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog();
......@@ -120,10 +121,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
1,
1,
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
......@@ -131,8 +132,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void destroyTransactionEnv() {
if (this.checkExecutor != null) {
this.checkExecutor.shutdown();
this.checkRequestQueue.clear();
}
}
public void registerSendMessageHook(final SendMessageHook hook) {
......@@ -243,16 +245,30 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return null == prev || !prev.ok();
}
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
* @return
*/
@Override
public TransactionListener checkListener() {
@Deprecated
public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener();
return producer.getTransactionCheckListener();
}
return null;
}
@Override
public TransactionListener getCheckListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener();
}
return null;
}
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
......@@ -264,12 +280,20 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void run() {
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
localTransactionState = transactionCheckListener.checkLocalTransaction(message);
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
......@@ -280,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
......@@ -1096,9 +1120,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg)
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
......@@ -1124,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
......
......@@ -16,18 +16,19 @@
*/
package org.apache.rocketmq.client.impl.producer;
import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import java.util.Set;
public interface MQProducerInner {
Set<String> getPublishTopicList();
boolean isPublishTopicNeedUpdate(final String topic);
TransactionListener checkListener();
TransactionCheckListener checkListener();
TransactionListener getCheckListener();
void checkTransactionState(
final String addr,
......
......@@ -464,16 +464,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method is to send transactional messages.
*
* @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg)
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
final Object arg)
throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* This method is used to send transactional messages.
* @param msg Transactional message to send.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg,
Object arg) throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* Create a topic on broker.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface LocalTransactionExecuter {
LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}
......@@ -80,7 +80,11 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.MessageExt;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface TransactionCheckListener {
LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}
......@@ -16,17 +16,21 @@
*/
package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.concurrent.ExecutorService;
public class TransactionMQProducer extends DefaultMQProducer {
private TransactionListener transactionListener;
private TransactionCheckListener transactionCheckListener;
private int checkThreadPoolMinSize = 1;
private int checkThreadPoolMaxSize = 1;
private int checkRequestHoldMax = 2000;
private ExecutorService executorService;
private TransactionListener transactionListener;
public TransactionMQProducer() {
}
......@@ -50,21 +54,77 @@ public class TransactionMQProducer extends DefaultMQProducer {
this.defaultMQProducerImpl.destroyTransactionEnv();
}
/**
* This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
* is recommended.
*/
@Override
@Deprecated
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == this.transactionCheckListener) {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
public TransactionListener getTransactionListener() {
return transactionListener;
public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
public int getCheckThreadPoolMinSize() {
return checkThreadPoolMinSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;
}
public int getCheckThreadPoolMaxSize() {
return checkThreadPoolMaxSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
}
public int getCheckRequestHoldMax() {
return checkRequestHoldMax;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.checkRequestHoldMax = checkRequestHoldMax;
}
public ExecutorService getExecutorService() {
......@@ -74,4 +134,12 @@ public class TransactionMQProducer extends DefaultMQProducer {
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
public TransactionListener getTransactionListener() {
return transactionListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
}
}
......@@ -157,7 +157,7 @@ public class BrokerConfig {
* that can be checked.
*/
@ImportantField
private long transactionTimeOut = 3 * 1000;
private long transactionTimeOut = 6 * 1000;
/**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
......
......@@ -57,7 +57,7 @@ public class MixAll {
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
//http://jmenv.tbsite.net:8080/rocketmq/nsaddr
//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
......
......@@ -17,15 +17,6 @@
package org.apache.rocketmq.example.benchmark;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Timer;
......@@ -33,18 +24,27 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer {
private static int threadCount;
private static int messageSize;
private static boolean isCheck;
private static boolean isCheckFalse;
private static boolean ischeck;
private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]);
isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize);
......@@ -73,8 +73,8 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast();
final long sendTps =
(long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
(long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
......@@ -92,14 +92,16 @@ public class TransactionProducer {
}
}, 10000, 10000);
final TransactionListener transactionListener =
new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark);
final TransactionCheckListener transactionCheckListener =
new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionListener);
producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
for (int i = 0; i < threadCount; i++) {
sendThreadPool.execute(new Runnable() {
@Override
......@@ -109,7 +111,7 @@ public class TransactionProducer {
// Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
producer.sendMessageInTransaction(msg, null);
producer.sendMessageInTransaction(msg, tranExecuter, null);
if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
......@@ -122,7 +124,8 @@ public class TransactionProducer {
boolean updated =
statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
currentRT);
if (updated) { break; }
if (updated)
break;
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
......@@ -150,37 +153,43 @@ public class TransactionProducer {
}
}
class TransactionExecuterBImpl implements LocalTransactionExecuter {
class TransactionListenerImpl implements TransactionListener {
private boolean isCheckFalse;
private boolean ischeck;
public TransactionExecuterBImpl(boolean ischeck) {
this.ischeck = ischeck;
}
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
if (ischeck) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class TransactionCheckListenerBImpl implements TransactionCheckListener {
private boolean ischeckffalse;
private StatsBenchmarkTProducer statsBenchmarkTProducer;
private boolean isCheckLocal;
public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal,
public TransactionCheckListenerBImpl(boolean ischeckffalse,
StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.isCheckFalse = isCheckFalse;
this.isCheckLocal = isCheckLocal;
this.ischeckffalse = ischeckffalse;
this.statsBenchmarkTProducer = statsBenchmarkTProducer;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
if (isCheckFalse) {
if (ischeckffalse) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
if (isCheckLocal) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class StatsBenchmarkTProducer {
......
......@@ -48,6 +48,8 @@ public class TransactionListenerImpl implements TransactionListener {
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册