提交 62e65d63 编写于 作者: H hujie

ad unit test

......@@ -10,11 +10,11 @@ XX
XXXX
Follow this checklist to help us incorporate your contribution quickly and easily:
Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.
- [x] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/ROCKETMQ/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
- [ ] Format the pull request title like `[ROCKETMQ-XXX] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
- [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
- [ ] Write necessary unit-test to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
- [ ] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass.
- [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
- [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
- [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
- [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
- [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test` to make sure integration-test pass.
- [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
......@@ -137,6 +137,7 @@ public class BrokerController {
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
private final BlockingQueue<Runnable> consumerManagerThreadPoolQueue;
private final BlockingQueue<Runnable> endTransactionThreadPoolQueue;
private final FilterServerManager filterServerManager;
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
......@@ -152,6 +153,7 @@ public class BrokerController {
private ExecutorService clientManageExecutor;
private ExecutorService heartbeatExecutor;
private ExecutorService consumerManageExecutor;
private ExecutorService endTransactionExecutor;
private boolean updateMasterHAServerAddrPeriodically = false;
private BrokerStats brokerStats;
private InetSocketAddress storeHost;
......@@ -197,6 +199,7 @@ public class BrokerController {
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
......@@ -297,8 +300,19 @@ public class BrokerController {
1000 * 60,
TimeUnit.MILLISECONDS,
this.heartbeatThreadPoolQueue,
new ThreadFactoryImpl("HeartbeatThread_",true));
new ThreadFactoryImpl("HeartbeatThread_", true));
<<<<<<< HEAD
=======
this.endTransactionExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getEndTransactionThreadPoolNums(),
this.brokerConfig.getEndTransactionThreadPoolNums(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.endTransactionThreadPoolQueue,
new ThreadFactoryImpl("EndTransactionThread_"));
>>>>>>> 53a63460d3a1599a6c51058bb51a73746233022d
this.consumerManageExecutor =
Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(
"ConsumerManageThread_"));
......@@ -585,8 +599,8 @@ public class BrokerController {
/**
* EndTransactionProcessor
*/
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.endTransactionExecutor);
/**
* Default
......@@ -647,10 +661,15 @@ public class BrokerController {
return this.headSlowTimeMills(this.queryThreadPoolQueue);
}
public long headSlowTimeMills4EndTransactionThreadPoolQueue() {
return this.headSlowTimeMills(this.endTransactionThreadPoolQueue);
}
public void printWaterMark() {
LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());
LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());
}
public MessageStore getMessageStore() {
......@@ -790,6 +809,13 @@ public class BrokerController {
if (this.fileWatchService != null) {
this.fileWatchService.shutdown();
}
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(false);
}
if (this.endTransactionExecutor != null) {
this.endTransactionExecutor.shutdown();
}
}
private void unregisterBrokerAll() {
......@@ -1077,7 +1103,12 @@ public class BrokerController {
this.transactionalMessageCheckListener = transactionalMessageCheckListener;
}
<<<<<<< HEAD
public AclPlugController getAclPlugController() {
return this.aclPlugController;
=======
public BlockingQueue<Runnable> getEndTransactionThreadPoolQueue() {
return endTransactionThreadPoolQueue;
>>>>>>> 53a63460d3a1599a6c51058bb51a73746233022d
}
}
......@@ -92,6 +92,9 @@ public class BrokerFastFailure {
cleanExpiredRequestInQueue(this.brokerController.getHeartbeatThreadPoolQueue(),
this.brokerController.getBrokerConfig().getWaitTimeMillsInHeartbeatQueue());
cleanExpiredRequestInQueue(this.brokerController.getEndTransactionThreadPoolQueue(), this
.brokerController.getBrokerConfig().getWaitTimeMillsInTransactionQueue());
}
void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> blockingQueue, final long maxWaitTimeMillsInQueue) {
......
......@@ -1210,6 +1210,10 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
runtimeInfo.put("queryThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getQueryThreadPoolQueueCapacity()));
runtimeInfo.put("EndTransactionQueueSize", String.valueOf(this.brokerController.getEndTransactionThreadPoolQueue().size()));
runtimeInfo.put("EndTransactionThreadPoolQueueCapacity",
String.valueOf(this.brokerController.getBrokerConfig().getEndTransactionPoolQueueCapacity()));
runtimeInfo.put("dispatchBehindBytes", String.valueOf(this.brokerController.getMessageStore().dispatchBehindBytes()));
runtimeInfo.put("pageCacheLockTimeMills", String.valueOf(this.brokerController.getMessageStore().lockTimeMills()));
......
......@@ -125,6 +125,7 @@ public class ServiceProvider {
public static <T> T loadClass(String name, Class<?> clazz) {
final InputStream is = getResourceAsStream(getContextClassLoader(), name);
if (is != null) {
BufferedReader reader;
try {
try {
......@@ -141,7 +142,8 @@ public class ServiceProvider {
return null;
}
} catch (Exception e) {
LOG.error("Error occured when looking for resource file " + name, e);
LOG.warn("Error occurred when looking for resource file " + name, e);
}
}
return null;
}
......
......@@ -16,6 +16,22 @@
*/
package org.apache.rocketmq.client.impl.producer;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
......@@ -31,11 +47,13 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.latency.MQFaultStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
......@@ -65,23 +83,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import java.util.concurrent.RejectedExecutionException;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DefaultMQProducerImpl implements MQProducerInner {
private final InternalLogger log = ClientLogger.getLog();
......@@ -120,10 +121,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (producer.getExecutorService() != null) {
this.checkExecutor = producer.getExecutorService();
} else {
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(2000);
this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax());
this.checkExecutor = new ThreadPoolExecutor(
1,
1,
producer.getCheckThreadPoolMinSize(),
producer.getCheckThreadPoolMaxSize(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.checkRequestQueue);
......@@ -131,8 +132,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public void destroyTransactionEnv() {
if (this.checkExecutor != null) {
this.checkExecutor.shutdown();
this.checkRequestQueue.clear();
}
}
public void registerSendMessageHook(final SendMessageHook hook) {
......@@ -243,16 +245,30 @@ public class DefaultMQProducerImpl implements MQProducerInner {
return null == prev || !prev.ok();
}
/**
* This method will be removed in the version 5.0.0 and <code>getCheckListener</code> is recommended.
* @return
*/
@Override
public TransactionListener checkListener() {
@Deprecated
public TransactionCheckListener checkListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener();
return producer.getTransactionCheckListener();
}
return null;
}
@Override
public TransactionListener getCheckListener() {
if (this.defaultMQProducer instanceof TransactionMQProducer) {
TransactionMQProducer producer = (TransactionMQProducer) defaultMQProducer;
return producer.getTransactionListener();
}
return null;
}
@Override
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
......@@ -264,12 +280,20 @@ public class DefaultMQProducerImpl implements MQProducerInner {
@Override
public void run() {
TransactionListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
if (transactionCheckListener != null) {
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
localTransactionState = transactionCheckListener.checkLocalTransaction(message);
if (transactionCheckListener != null) {
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
......@@ -280,7 +304,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
group,
exception);
} else {
log.warn("checkTransactionState, pick transactionCheckListener by group[{}] failed", group);
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
......@@ -1096,9 +1120,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
}
public TransactionSendResult sendMessageInTransaction(final Message msg,
final TransactionListener tranExecuter, final Object arg)
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
if (null == tranExecuter) {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
......@@ -1124,7 +1149,12 @@ public class DefaultMQProducerImpl implements MQProducerInner {
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
......
......@@ -16,18 +16,19 @@
*/
package org.apache.rocketmq.client.impl.producer;
import java.util.Set;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.header.CheckTransactionStateRequestHeader;
import java.util.Set;
public interface MQProducerInner {
Set<String> getPublishTopicList();
boolean isPublishTopicNeedUpdate(final String topic);
TransactionListener checkListener();
TransactionCheckListener checkListener();
TransactionListener getCheckListener();
void checkTransactionState(
final String addr,
......
......@@ -464,16 +464,31 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
* This method is to send transactional messages.
*
* @param msg Transactional message to send.
* @param tranExecuter local transaction executor.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException if there is any client error.
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg, final Object arg)
public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter,
final Object arg)
throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* This method is used to send transactional messages.
* @param msg Transactional message to send.
* @param arg Argument used along with local transaction executor.
* @return Transaction result.
* @throws MQClientException
*/
@Override
public TransactionSendResult sendMessageInTransaction(Message msg,
Object arg) throws MQClientException {
throw new RuntimeException("sendMessageInTransaction not implement, please use TransactionMQProducer class");
}
/**
* Create a topic on broker.
*
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.Message;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface LocalTransactionExecuter {
LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg);
}
......@@ -80,7 +80,11 @@ public interface MQProducer extends MQAdmin {
void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
throws MQClientException, RemotingException, InterruptedException;
TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException;
TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException;
//for batch
SendResult send(final Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.producer;
import org.apache.rocketmq.common.message.MessageExt;
/**
* This interface will be removed in the version 5.0.0, interface {@link TransactionListener} is recommended.
*/
@Deprecated
public interface TransactionCheckListener {
LocalTransactionState checkLocalTransactionState(final MessageExt msg);
}
......@@ -16,17 +16,21 @@
*/
package org.apache.rocketmq.client.producer;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.concurrent.ExecutorService;
public class TransactionMQProducer extends DefaultMQProducer {
private TransactionListener transactionListener;
private TransactionCheckListener transactionCheckListener;
private int checkThreadPoolMinSize = 1;
private int checkThreadPoolMaxSize = 1;
private int checkRequestHoldMax = 2000;
private ExecutorService executorService;
private TransactionListener transactionListener;
public TransactionMQProducer() {
}
......@@ -50,21 +54,77 @@ public class TransactionMQProducer extends DefaultMQProducer {
this.defaultMQProducerImpl.destroyTransactionEnv();
}
/**
* This method will be removed in the version 5.0.0, method <code>sendMessageInTransaction(Message,Object)</code>}
* is recommended.
*/
@Override
@Deprecated
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException {
if (null == this.transactionCheckListener) {
throw new MQClientException("localTransactionBranchCheckListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
}
@Override
public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
public TransactionSendResult sendMessageInTransaction(final Message msg,
final Object arg) throws MQClientException {
if (null == this.transactionListener) {
throw new MQClientException("TransactionListener is null", null);
}
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg);
return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}
public TransactionListener getTransactionListener() {
return transactionListener;
public TransactionCheckListener getTransactionCheckListener() {
return transactionCheckListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) {
this.transactionCheckListener = transactionCheckListener;
}
public int getCheckThreadPoolMinSize() {
return checkThreadPoolMinSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) {
this.checkThreadPoolMinSize = checkThreadPoolMinSize;
}
public int getCheckThreadPoolMaxSize() {
return checkThreadPoolMaxSize;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) {
this.checkThreadPoolMaxSize = checkThreadPoolMaxSize;
}
public int getCheckRequestHoldMax() {
return checkRequestHoldMax;
}
/**
* This method will be removed in the version 5.0.0 and set a custom thread pool is recommended.
*/
@Deprecated
public void setCheckRequestHoldMax(int checkRequestHoldMax) {
this.checkRequestHoldMax = checkRequestHoldMax;
}
public ExecutorService getExecutorService() {
......@@ -74,4 +134,12 @@ public class TransactionMQProducer extends DefaultMQProducer {
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
public TransactionListener getTransactionListener() {
return transactionListener;
}
public void setTransactionListener(TransactionListener transactionListener) {
this.transactionListener = transactionListener;
}
}
......@@ -64,6 +64,14 @@ public class BrokerConfig {
private int clientManageThreadPoolNums = 32;
private int consumerManageThreadPoolNums = 32;
private int heartbeatThreadPoolNums = Math.min(32, Runtime.getRuntime().availableProcessors());
<<<<<<< HEAD
=======
/**
* Thread numbers for EndTransactionProcessor
*/
private int endTransactionThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors() * 2;
>>>>>>> 53a63460d3a1599a6c51058bb51a73746233022d
private int flushConsumerOffsetInterval = 1000 * 5;
......@@ -79,6 +87,7 @@ public class BrokerConfig {
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
private int heartbeatThreadPoolQueueCapacity = 50000;
private int endTransactionPoolQueueCapacity = 100000;
private int filterServerNums = 0;
......@@ -111,6 +120,7 @@ public class BrokerConfig {
private long waitTimeMillsInSendQueue = 200;
private long waitTimeMillsInPullQueue = 5 * 1000;
private long waitTimeMillsInHeartbeatQueue = 31 * 1000;
private long waitTimeMillsInTransactionQueue = 3 * 1000;
private long startAcceptSendRequestTimeStamp = 0L;
......@@ -150,13 +160,13 @@ public class BrokerConfig {
* that can be checked.
*/
@ImportantField
private long transactionTimeOut = 3 * 1000;
private long transactionTimeOut = 6 * 1000;
/**
* The maximum number of times the message was checked, if exceed this value, this message will be discarded.
*/
@ImportantField
private int transactionCheckMax = 5;
private int transactionCheckMax = 15;
/**
* Transaction message check interval.
......@@ -704,6 +714,7 @@ public class BrokerConfig {
this.transactionCheckInterval = transactionCheckInterval;
}
<<<<<<< HEAD
public boolean isAclPlug() {
return isAclPlug;
}
......@@ -712,4 +723,29 @@ public class BrokerConfig {
this.isAclPlug = isAclPlug;
}
=======
public int getEndTransactionThreadPoolNums() {
return endTransactionThreadPoolNums;
}
public void setEndTransactionThreadPoolNums(int endTransactionThreadPoolNums) {
this.endTransactionThreadPoolNums = endTransactionThreadPoolNums;
}
public int getEndTransactionPoolQueueCapacity() {
return endTransactionPoolQueueCapacity;
}
public void setEndTransactionPoolQueueCapacity(int endTransactionPoolQueueCapacity) {
this.endTransactionPoolQueueCapacity = endTransactionPoolQueueCapacity;
}
public long getWaitTimeMillsInTransactionQueue() {
return waitTimeMillsInTransactionQueue;
}
public void setWaitTimeMillsInTransactionQueue(long waitTimeMillsInTransactionQueue) {
this.waitTimeMillsInTransactionQueue = waitTimeMillsInTransactionQueue;
}
>>>>>>> 53a63460d3a1599a6c51058bb51a73746233022d
}
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
public static final int CURRENT_VERSION = Version.V4_3_0.ordinal();
public static final int CURRENT_VERSION = Version.V4_3_1.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
......
......@@ -57,7 +57,7 @@ public class MixAll {
public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
//http://jmenv.tbsite.net:8080/rocketmq/nsaddr
//public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "AUTO_CREATE_TOPIC_KEY"; // Will be created at broker when isAutoCreateTopicEnable
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102"; // Will be created at broker when isAutoCreateTopicEnable
public static final String BENCHMARK_TOPIC = "BenchmarkTest";
public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
......
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
if [ -z "$ROCKETMQ_HOME" ] ; then
## resolve links - $0 may be a link to maven's home
PRG="$0"
# need this for relative symlinks
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG="`dirname "$PRG"`/$link"
fi
done
saveddir=`pwd`
ROCKETMQ_HOME=`dirname "$PRG"`/..
# make it fully qualified
ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`
cd "$saveddir"
fi
export ROCKETMQ_HOME
sh ${ROCKETMQ_HOME}/bin/runserver.sh org.apache.rocketmq.filtersrv.FiltersrvStartup $@
@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
if not exist "%ROCKETMQ_HOME%\bin\runbroker.cmd" echo Please set the ROCKETMQ_HOME variable in your environment! & EXIT /B 1
call "%ROCKETMQ_HOME%\bin\runserver.cmd" org.apache.rocketmq.filtersrv.FiltersrvStartup %*
IF %ERRORLEVEL% EQU 0 (
ECHO "Filtersrv starts OK"
)
\ No newline at end of file
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<java>
<debug>false</debug>
<javahome>${JAVA_HOME}</javahome>
<jvmtype>server</jvmtype>
<mainclass>org.apache.rocketmq.filtersrv.FiltersrvStartup</mainclass>
<properties>
<java.ext.dirs>${cpd}/../lib</java.ext.dirs>
<rocketmq.home.dir>${cpd}/..</rocketmq.home.dir>
</properties>
<classpaths>
</classpaths>
<options>
<-Xms512m>
</-Xms512m>
<-Xmx1g>
</-Xmx1g>
<-XX:NewSize>256M</-XX:NewSize>
<-XX:MaxNewSize>512M</-XX:MaxNewSize>
<-XX:PermSize>128M</-XX:PermSize>
<-XX:MaxPermSize>128M</-XX:MaxPermSize>
</options>
</java>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<appender name="DefaultAppender"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/filtersrv_default.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filtersrv_default.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="RocketmqFiltersrvAppender_inner"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${user.home}/logs/rocketmqlogs/filtersrv.log</file>
<append>true</append>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/filtersrv.%i.log.gz</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>5</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<appender name="RocketmqFiltersrvAppender" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="RocketmqFiltersrvAppender_inner"/>
<discardingThreshold>0</discardingThreshold>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<append>true</append>
<encoder>
<pattern>%d{yyy-MM-dd HH\:mm\:ss,SSS} %p %t - %m%n</pattern>
<charset class="java.nio.charset.Charset">UTF-8</charset>
</encoder>
</appender>
<logger name="RocketmqFiltersrv" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqFiltersrvAppender"/>
</logger>
<logger name="RocketmqCommon" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqFiltersrvAppender"/>
</logger>
<logger name="RocketmqRemoting" additivity="false">
<level value="INFO"/>
<appender-ref ref="RocketmqFiltersrvAppender"/>
</logger>
<root>
<level value="INFO"/>
<appender-ref ref="DefaultAppender"/>
</root>
</configuration>
......@@ -17,15 +17,6 @@
package org.apache.rocketmq.example.benchmark;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.LinkedList;
import java.util.Timer;
......@@ -33,18 +24,27 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionCheckListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class TransactionProducer {
private static int threadCount;
private static int messageSize;
private static boolean isCheck;
private static boolean isCheckFalse;
private static boolean ischeck;
private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
isCheck = args.length >= 3 && Boolean.parseBoolean(args[2]);
isCheckFalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
final Message msg = buildMessage(messageSize);
......@@ -73,8 +73,8 @@ public class TransactionProducer {
Long[] end = snapshotList.getLast();
final long sendTps =
(long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]);
(long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n",
......@@ -92,14 +92,16 @@ public class TransactionProducer {
}
}, 10000, 10000);
final TransactionListener transactionListener =
new TransactionListenerImpl(isCheckFalse, isCheck, statsBenchmark);
final TransactionCheckListener transactionCheckListener =
new TransactionCheckListenerBImpl(ischeckffalse, statsBenchmark);
final TransactionMQProducer producer = new TransactionMQProducer("benchmark_transaction_producer");
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionListener(transactionListener);
producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
for (int i = 0; i < threadCount; i++) {
sendThreadPool.execute(new Runnable() {
@Override
......@@ -109,7 +111,7 @@ public class TransactionProducer {
// Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
producer.sendMessageInTransaction(msg, null);
producer.sendMessageInTransaction(msg, tranExecuter, null);
if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
......@@ -122,7 +124,8 @@ public class TransactionProducer {
boolean updated =
statsBenchmark.getSendMessageMaxRT().compareAndSet(prevMaxRT,
currentRT);
if (updated) { break; }
if (updated)
break;
prevMaxRT = statsBenchmark.getSendMessageMaxRT().get();
}
......@@ -150,37 +153,43 @@ public class TransactionProducer {
}
}
class TransactionExecuterBImpl implements LocalTransactionExecuter {
class TransactionListenerImpl implements TransactionListener {
private boolean isCheckFalse;
private boolean ischeck;
public TransactionExecuterBImpl(boolean ischeck) {
this.ischeck = ischeck;
}
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
if (ischeck) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class TransactionCheckListenerBImpl implements TransactionCheckListener {
private boolean ischeckffalse;
private StatsBenchmarkTProducer statsBenchmarkTProducer;
private boolean isCheckLocal;
public TransactionListenerImpl(boolean isCheckFalse, boolean isCheckLocal,
public TransactionCheckListenerBImpl(boolean ischeckffalse,
StatsBenchmarkTProducer statsBenchmarkTProducer) {
this.isCheckFalse = isCheckFalse;
this.isCheckLocal = isCheckLocal;
this.ischeckffalse = ischeckffalse;
this.statsBenchmarkTProducer = statsBenchmarkTProducer;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
statsBenchmarkTProducer.getCheckRequestSuccessCount().incrementAndGet();
if (isCheckFalse) {
if (ischeckffalse) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState executeLocalTransaction(final Message msg, final Object arg) {
if (isCheckLocal) {
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
class StatsBenchmarkTProducer {
......
......@@ -48,6 +48,8 @@ public class TransactionListenerImpl implements TransactionListener {
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
......
......@@ -131,7 +131,7 @@ public class ConsumeMessageCommand implements SubCommand {
try {
/* Group name must be set before consumer start */
if (commandLine.hasOption('g')) {
String consumerGroup = commandLine.getOptionValue('b').trim();
String consumerGroup = commandLine.getOptionValue('g').trim();
defaultMQPullConsumer.setConsumerGroup(consumerGroup);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册