提交 a220364b 编写于 作者: D duhengforever

Merge branch 'develop'

...@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig; ...@@ -100,7 +100,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStats; import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
public class BrokerController { public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME); private static final InternalLogger LOG_PROTECTION = InternalLoggerFactory.getLogger(LoggerName.PROTECTION_LOGGER_NAME);
...@@ -131,6 +130,7 @@ public class BrokerController { ...@@ -131,6 +130,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue; private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue; private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue; private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager; private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager; private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>(); private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
...@@ -146,6 +146,7 @@ public class BrokerController { ...@@ -146,6 +146,7 @@ public class BrokerController {
private ExecutorService clientManageExecutor; private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor; private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor; private ExecutorService consumerManageExecutor;
private ExecutorService endTransactionExecutor;
private boolean updateMasterHAServerAddrPeriodically = false; private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats; private BrokerStats brokerStats;
private InetSocketAddress storeHost; private InetSocketAddress storeHost;
...@@ -189,6 +190,7 @@ public class BrokerController { ...@@ -189,6 +190,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity()); this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity()); this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity()); this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName()); this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort())); this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
...@@ -289,8 +291,15 @@ public class BrokerController { ...@@ -289,8 +291,15 @@ public class BrokerController {
1000 * 60, 1000 * 60,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue, this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_",true)); new ThreadFactoryImpl("HeartbeatThread_", true));
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
this.consumerManageExecutor = this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl( Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
...@@ -536,8 +545,8 @@ public class BrokerController { ...@@ -536,8 +545,8 @@ public class BrokerController {
/** /**
* EndTransactionProcessor * EndTransactionProcessor
*/ */
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor); this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/** /**
* Default * Default
...@@ -598,10 +607,15 @@ public class BrokerController { ...@@ -598,10 +607,15 @@ public class BrokerController {
return this.headSlowTimeMills(this.queryThreadPoolQueue); return this.headSlowTimeMills(this.queryThreadPoolQueue);
} }
public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
}
public void printWaterMark() { public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue()); LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
} }
public MessageStore getMessageStore() { public MessageStore getMessageStore() {
...@@ -741,6 +755,13 @@ public class BrokerController { ...@@ -741,6 +755,13 @@ public class BrokerController {
if (this.fileWatchService != null) { if (this.fileWatchService != null) {
this.fileWatchService.shutdown(); this.fileWatchService.shutdown();
} }
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
if (this.endTransactionExecutor != null) {
this.endTransactionExecutor.shutdown();
}
} }
private void unregisterBrokerAll() { private void unregisterBrokerAll() {
...@@ -1027,4 +1048,8 @@ public class BrokerController { ...@@ -1027,4 +1048,8 @@ public class BrokerController {
AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) { AbstractTransactionalMessageCheckListener transactionalMessageCheckListener) {
this.transactionalMessageCheckListener = transactionalMessageCheckListener; this.transactionalMessageCheckListener = transactionalMessageCheckListener;
} }
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
}
} }
...@@ -92,6 +92,9 @@ public class BrokerFastFailure { ...@@ -92,6 +92,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(), cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue()); this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
} }
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) { void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
......
...@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { ...@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("queryThreadPoolQueueCapacity", runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity())); String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes())); runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills())); runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
......
...@@ -125,6 +125,7 @@ public class ServiceProvider { ...@@ -125,6 +125,7 @@ public class ServiceProvider {
public static <T> T loadClass(String name, Class<?> clazz) { public static <T> T loadClass(String name, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(), name); final InputStream is = getResourceAsStream(getContextClassLoader(), name);
if (is != null) {
BufferedReader reader; BufferedReader reader;
try { try {
try { try {
...@@ -141,7 +142,8 @@ public class ServiceProvider { ...@@ -141,7 +142,8 @@ public class ServiceProvider {
return null; return null;
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Error occured when looking for resource file " + name, e); LOG.warn("Error occurred when looking for resource file " + name, e);
}
} }
return null; return null;
} }
......
...@@ -16,6 +16,22 @@ ...@@ -16,6 +16,22 @@
*/ */
package org.apache.rocketmq.client.impl.producer; package org.apache.rocketmq.client.impl.producer;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.common.ClientErrorCode;
...@@ -31,11 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; ...@@ -31,11 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.client.producer.TransactionSendResult;
...@@ -65,23 +83,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; ...@@ -65,23 +83,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultMQProducerImpl implements MQProducerInner { public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog(); private final InternalLogger log = ClientLogger.getLog();
...@@ -120,10 +121,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -120,10 +121,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (producer.getExecutorService() != null) { if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService(); this.checkExecutor = producer.getExecutorService();
} else { } else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000); this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor( this.checkExecutor = new ThreadPoolExecutor(
1, producer.getCheckThreadPoolMinSize(),
1, producer.getCheckThreadPoolMaxSize(),
1000 * 60, 1000 * 60,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
this.checkRequestQueue); this.checkRequestQueue);
...@@ -131,8 +132,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -131,8 +132,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public void destroyTransactionEnv() { public void destroyTransactionEnv() {
if (this.checkExecutor != null) {
this.checkExecutor.shutdown(); this.checkExecutor.shutdown();
this.checkRequestQueue.clear(); }
} }
public void registerSendMessageHook(final SendMessageHook hook) { public void registerSendMessageHook(final SendMessageHook hook) {
...@@ -243,16 +245,30 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -243,16 +245,30 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return null == prev || !prev.ok(); return null == prev || !prev.ok();
} }
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
* @return
*/
@Override @Override
public TransactionListener checkListener() { @Deprecated
public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) { if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer; TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener(); return producer.getTransactionCheckListener();
} }
return null; return null;
} }
@Override
public TransactionListener getCheckListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener();
}
return null;
}
@Override @Override
public void checkTransactionState(final String addr, final MessageExt msg, public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) { final CheckTransactionStateRequestHeader header) {
...@@ -264,12 +280,20 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -264,12 +280,20 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override @Override
public void run() { public void run() {
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) { TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null; Throwable exception = null;
try { try {
localTransactionState = transactionCheckListener.checkLocalTransaction(message); if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) { } catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e; exception = e;
...@@ -280,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -280,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
group, group,
exception); exception);
} else { } else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group); log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
} }
} }
...@@ -1096,9 +1120,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1096,9 +1120,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
} }
public TransactionSendResult sendMessageInTransaction(final Message msg, public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg) final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException { throws MQClientException {
if (null == tranExecuter) { TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null); throw new MQClientException("tranExecutor is null", null);
} }
Validators.checkMessage(msg, this.defaultMQProducer); Validators.checkMessage(msg, this.defaultMQProducer);
...@@ -1124,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1124,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) { if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId); msg.setTransactionId(transactionId);
} }
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg); if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) { if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW; localTransactionState = LocalTransactionState.UNKNOW;
} }
......
...@@ -16,18 +16,19 @@ ...@@ -16,18 +16,19 @@
*/ */
package org.apache.rocketmq.client.impl.producer; package org.apache.rocketmq.client.impl.producer;
import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader; import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import java.util.Set;
public interface MQProducerInner { public interface MQProducerInner {
Set<String> getPublishTopicList(); Set<String> getPublishTopicList();
boolean isPublishTopicNeedUpdate(final String topic); boolean isPublishTopicNeedUpdate(final String topic);
TransactionListener checkListener(); TransactionCheckListener checkListener();
TransactionListener getCheckListener();
void checkTransactionState( void checkTransactionState(
final String addr, final String addr,
......
...@@ -464,16 +464,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -464,16 +464,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method is to send transactional messages. * This method is to send transactional messages.
* *
* @param msg Transactional message to send. * @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
* @param arg Argument used along with local transaction executor. * @param arg Argument used along with local transaction executor.
* @return Transaction result. * @return Transaction result.
* @throws MQClientException if there is any client error. * @throws MQClientException if there is any client error.
*/ */
@Override @Override
public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg) public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
final Object arg)
throws MQClientException { throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class"); throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
} }
/**
* This method is used to send transactional messages.
* @param msg Transactional message to send.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg,
Object arg) throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/** /**
* Create a topic on broker. * Create a topic on broker.
* *
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface LocalTransactionExecuter {
LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}
...@@ -80,7 +80,11 @@ public interface MQProducer extends MQAdmin { ...@@ -80,7 +80,11 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg) void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException; throws MQClientException, RemotingException, InterruptedException;
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException; TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
//for batch //for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.MessageExt;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface TransactionCheckListener {
LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}
...@@ -16,17 +16,21 @@ ...@@ -16,17 +16,21 @@
*/ */
package org.apache.rocketmq.client.producer; package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import java.util.concurrent.ExecutorService;
public class TransactionMQProducer extends DefaultMQProducer { public class TransactionMQProducer extends DefaultMQProducer {
private TransactionListener transactionListener; private TransactionCheckListener transactionCheckListener;
private int checkThreadPoolMinSize = 1;
private int checkThreadPoolMaxSize = 1;
private int checkRequestHoldMax = 2000;
private ExecutorService executorService; private ExecutorService executorService;
private TransactionListener transactionListener;
public TransactionMQProducer() { public TransactionMQProducer() {
} }
...@@ -50,21 +54,77 @@ public class TransactionMQProducer extends DefaultMQProducer { ...@@ -50,21 +54,77 @@ public class TransactionMQProducer extends DefaultMQProducer {
this.defaultMQProducerImpl.destroyTransactionEnv(); this.defaultMQProducerImpl.destroyTransactionEnv();
} }
/**
* This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
* is recommended.
*/
@Override
@Deprecated
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == this.transactionCheckListener) {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
@Override @Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) { if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null); throw new MQClientException("TransactionListener is null", null);
} }
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
} }
public TransactionListener getTransactionListener() { public TransactionCheckListener getTransactionCheckListener() {
return transactionListener; return transactionCheckListener;
} }
public void setTransactionListener(TransactionListener transactionListener) { /**
this.transactionListener = transactionListener; * This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
public int getCheckThreadPoolMinSize() {
return checkThreadPoolMinSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;
}
public int getCheckThreadPoolMaxSize() {
return checkThreadPoolMaxSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
}
public int getCheckRequestHoldMax() {
return checkRequestHoldMax;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.checkRequestHoldMax = checkRequestHoldMax;
} }
public ExecutorService getExecutorService() { public ExecutorService getExecutorService() {
...@@ -74,4 +134,12 @@ public class TransactionMQProducer extends DefaultMQProducer { ...@@ -74,4 +134,12 @@ public class TransactionMQProducer extends DefaultMQProducer {
public void setExecutorService(ExecutorService executorService) { public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService; this.executorService = executorService;
} }
public TransactionListener getTransactionListener() {
return transactionListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
}
} }
...@@ -63,7 +63,12 @@ public class BrokerConfig { ...@@ -63,7 +63,12 @@ public class BrokerConfig {
private int adminBrokerThreadPoolNums = 16; private int adminBrokerThreadPoolNums = 16;
private int clientManageThreadPoolNums = 32; private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32; private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32,Runtime.getRuntime().availableProcessors()); private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
/**
* Thread numbers for EndTransactionProcessor
*/
private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
private int flushConsumerOffsetInterval = 1000 * 5; private int flushConsumerOffsetInterval = 1000 * 5;
...@@ -79,6 +84,7 @@ public class BrokerConfig { ...@@ -79,6 +84,7 @@ public class BrokerConfig {
private int clientManagerThreadPoolQueueCapacity = 1000000; private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000; private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000; private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
private int filterServerNums = 0; private int filterServerNums = 0;
...@@ -111,6 +117,7 @@ public class BrokerConfig { ...@@ -111,6 +117,7 @@ public class BrokerConfig {
private long waitTimeMillsInSendQueue = 200; private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000; private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000; private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
private long waitTimeMillsInTransactionQueue = 3 * 1000;
private long startAcceptSendRequestTimeStamp = 0L; private long startAcceptSendRequestTimeStamp = 0L;
...@@ -150,13 +157,13 @@ public class BrokerConfig { ...@@ -150,13 +157,13 @@ public class BrokerConfig {
* that can be checked. * that can be checked.
*/ */
@ImportantField @ImportantField
private long transactionTimeOut = 3 * 1000; private long transactionTimeOut = 6 * 1000;
/** /**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded. * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/ */
@ImportantField @ImportantField
private int transactionCheckMax = 5; private int transactionCheckMax = 15;
/** /**
* Transaction message check interval. * Transaction message check interval.
...@@ -701,4 +708,28 @@ public class BrokerConfig { ...@@ -701,4 +708,28 @@ public class BrokerConfig {
public void setTransactionCheckInterval(long transactionCheckInterval) { public void setTransactionCheckInterval(long transactionCheckInterval) {
this.transactionCheckInterval = transactionCheckInterval; this.transactionCheckInterval = transactionCheckInterval;
} }
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
}
public int getEndTransactionPoolQueueCapacity() {
return endTransactionPoolQueueCapacity;
}
public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
}
public long getWaitTimeMillsInTransactionQueue() {
return waitTimeMillsInTransactionQueue;
}
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
} }
...@@ -18,7 +18,7 @@ package org.apache.rocketmq.common; ...@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion { public class MQVersion {
public static final int CURRENT_VERSION = Version.V4_3_0.ordinal(); public static final int CURRENT_VERSION = Version.V4_3_1.ordinal();
public static String getVersionDesc(int value) { public static String getVersionDesc(int value) {
int length = Version.values().length; int length = Version.values().length;
......
...@@ -57,7 +57,7 @@ public class MixAll { ...@@ -57,7 +57,7 @@ public class MixAll {
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr"); public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
//http://jmenv.tbsite.net:8080/rocketmq/nsaddr //http://jmenv.tbsite.net:8080/rocketmq/nsaddr
//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP; //public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
public static final String BENCHMARK_TOPIC = "BenchmarkTest"; public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER"; public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER"; public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
......
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ -z "$ROCKETMQ_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
ROCKETMQ_HOME=`dirname "$PRG"`/..
# make it fully qualified
ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
cd "$saveddir"
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.filtersrv.FiltersrvStartup $@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
if not exist "%ROCKETMQ_HOME%\bin\runbroker.cmd" echo Please set the ROCKETMQ_HOME variable in your environment! & EXIT /B 1
call "%ROCKETMQ_HOME%\bin\runserver.cmd" org.apache.rocketmq.filtersrv.FiltersrvStartup %*
IF %ERRORLEVEL% EQU 0 (
ECHO "Filtersrv starts OK"
)
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<java>
<debug>false</debug>
<javahome>${JAVA_HOME}</javahome>
<jvmtype>server</jvmtype>
<mainclass>org.apache.rocketmq.filtersrv.FiltersrvStartup</mainclass>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<classpaths>
</classpaths>
<options>
<-Xms512m>
</-Xms512m>
<-Xmx1g>
</-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="DefaultAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/filtersrv_default.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filtersrv_default.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="RocketmqFiltersrvAppender_inner"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/filtersrv.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filtersrv.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="RocketmqFiltersrvAppender" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="RocketmqFiltersrvAppender_inner"/>
<discardingThreshold>0</discardingThreshold>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<append>true</append>
<encoder>
<pattern>%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<logger name="RocketmqFiltersrv" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqFiltersrvAppender"/>
</logger>
<logger name="RocketmqCommon" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqFiltersrvAppender"/>
</logger>
<logger name="RocketmqRemoting" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqFiltersrvAppender"/>
</logger>
<root>
<level value="INFO"/>
<appender-ref ref="DefaultAppender"/>
</root>
</configuration>
...@@ -17,15 +17,6 @@ ...@@ -17,15 +17,6 @@
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Timer; import java.util.Timer;
...@@ -33,18 +24,27 @@ import java.util.TimerTask; ...@@ -33,18 +24,27 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer { public class TransactionProducer {
private static int threadCount; private static int threadCount;
private static int messageSize; private static int messageSize;
private static boolean isCheck; private static boolean ischeck;
private static boolean isCheckFalse; private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32; threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2; messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]); ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]); ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize); final Message msg = buildMessage(messageSize);
...@@ -73,8 +73,8 @@ public class TransactionProducer { ...@@ -73,8 +73,8 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast(); Long[] end = snapshotList.getLast();
final long sendTps = final long sendTps =
(long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf( System.out.printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
...@@ -92,14 +92,16 @@ public class TransactionProducer { ...@@ -92,14 +92,16 @@ public class TransactionProducer {
} }
}, 10000, 10000); }, 10000, 10000);
final TransactionListener transactionListener = final TransactionCheckListener transactionCheckListener =
new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark); new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer"); final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionListener); producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000); producer.setDefaultTopicQueueNums(1000);
producer.start(); producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
sendThreadPool.execute(new Runnable() { sendThreadPool.execute(new Runnable() {
@Override @Override
...@@ -109,7 +111,7 @@ public class TransactionProducer { ...@@ -109,7 +111,7 @@ public class TransactionProducer {
// Thread.sleep(1000); // Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis(); final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult = SendResult sendResult =
producer.sendMessageInTransaction(msg, null); producer.sendMessageInTransaction(msg, tranExecuter, null);
if (sendResult != null) { if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet(); statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet(); statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
...@@ -122,7 +124,8 @@ public class TransactionProducer { ...@@ -122,7 +124,8 @@ public class TransactionProducer {
boolean updated = boolean updated =
statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT, statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
currentRT); currentRT);
if (updated) { break; } if (updated)
break;
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get(); prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
} }
...@@ -150,37 +153,43 @@ public class TransactionProducer { ...@@ -150,37 +153,43 @@ public class TransactionProducer {
} }
} }
class TransactionExecuterBImpl implements LocalTransactionExecuter {
class TransactionListenerImpl implements TransactionListener { private boolean ischeck;
private boolean isCheckFalse;
public TransactionExecuterBImpl(boolean ischeck) {
this.ischeck = ischeck;
}
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
if (ischeck) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class TransactionCheckListenerBImpl implements TransactionCheckListener {
private boolean ischeckffalse;
private StatsBenchmarkTProducer statsBenchmarkTProducer; private StatsBenchmarkTProducer statsBenchmarkTProducer;
private boolean isCheckLocal;
public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal, public TransactionCheckListenerBImpl(boolean ischeckffalse,
StatsBenchmarkTProducer statsBenchmarkTProducer) { StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.isCheckFalse = isCheckFalse; this.ischeckffalse = ischeckffalse;
this.isCheckLocal = isCheckLocal;
this.statsBenchmarkTProducer = statsBenchmarkTProducer; this.statsBenchmarkTProducer = statsBenchmarkTProducer;
} }
@Override @Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) { public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet(); statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
if (isCheckFalse) { if (ischeckffalse) {
return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.ROLLBACK_MESSAGE;
} }
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
} }
@Override
public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
if (isCheckLocal) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
} }
class StatsBenchmarkTProducer { class StatsBenchmarkTProducer {
......
...@@ -48,6 +48,8 @@ public class TransactionListenerImpl implements TransactionListener { ...@@ -48,6 +48,8 @@ public class TransactionListenerImpl implements TransactionListener {
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
case 2: case 2:
return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
} }
} }
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
......
...@@ -131,7 +131,7 @@ public class ConsumeMessageCommand implements SubCommand { ...@@ -131,7 +131,7 @@ public class ConsumeMessageCommand implements SubCommand {
try { try {
/* Group name must be set before consumer start */ /* Group name must be set before consumer start */
if (commandLine.hasOption('g')) { if (commandLine.hasOption('g')) {
String consumerGroup = commandLine.getOptionValue('b').trim(); String consumerGroup = commandLine.getOptionValue('g').trim();
defaultMQPullConsumer.setConsumerGroup(consumerGroup); defaultMQPullConsumer.setConsumerGroup(consumerGroup);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册