提交 37b6dd02 编写于 作者: C chengxiangwang

Merge branch 'develop' of https://github.com/apache/rocketmq

...@@ -10,4 +10,5 @@ devenv ...@@ -10,4 +10,5 @@ devenv
*.versionsBackup *.versionsBackup
!NOTICE-BIN !NOTICE-BIN
!LICENSE-BIN !LICENSE-BIN
.DS_Store .DS_Store
\ No newline at end of file localbin
...@@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
if (null != checkImmunityTimeStr) { if (null != checkImmunityTimeStr) {
checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
if (valueOfCurrentMinusBorn < checkImmunityTime) { if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1; newOffset = i + 1;
i++; i++;
continue; continue;
...@@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ ...@@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ
* @param removeMap Op message map to determine whether a half message was responded by producer. * @param removeMap Op message map to determine whether a half message was responded by producer.
* @param doneOpOffset Op Message which has been checked. * @param doneOpOffset Op Message which has been checked.
* @param msgExt Half message * @param msgExt Half message
* @param checkImmunityTime User defined time to avoid being detected early.
* @return Return true if put success, otherwise return false. * @return Return true if put success, otherwise return false.
*/ */
private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset, MessageExt msgExt, private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset,
long checkImmunityTime) { MessageExt msgExt) {
if (System.currentTimeMillis() - msgExt.getBornTimestamp() < checkImmunityTime) { String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); if (null == prepareQueueOffsetStr) {
if (null == prepareQueueOffsetStr) { return putImmunityMsgBackToHalfQueue(msgExt);
return putImmunityMsgBackToHalfQueue(msgExt); } else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr);
if (-1 == prepareQueueOffset) {
return false;
} else { } else {
long prepareQueueOffset = getLong(prepareQueueOffsetStr); if (removeMap.containsKey(prepareQueueOffset)) {
if (-1 == prepareQueueOffset) { long tmpOpOffset = removeMap.remove(prepareQueueOffset);
return false; doneOpOffset.add(tmpOpOffset);
return true;
} else { } else {
if (removeMap.containsKey(prepareQueueOffset)) { return putImmunityMsgBackToHalfQueue(msgExt);
long tmpOpOffset = removeMap.remove(prepareQueueOffset);
doneOpOffset.add(tmpOpOffset);
return true;
} else {
return putImmunityMsgBackToHalfQueue(msgExt);
}
} }
} }
} else {
return true;
} }
} }
......
...@@ -257,6 +257,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume ...@@ -257,6 +257,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout); return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout);
} }
@Override
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums);
}
@Override
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout);
}
@Override @Override
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
...@@ -270,6 +282,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume ...@@ -270,6 +282,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout);
} }
@Override
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback);
}
@Override
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback, long timeout)
throws MQClientException, RemotingException, InterruptedException {
this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout);
}
@Override @Override
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
......
...@@ -66,6 +66,39 @@ public interface MQPullConsumer extends MQConsumer { ...@@ -66,6 +66,39 @@ public interface MQPullConsumer extends MQConsumer {
final int maxNums, final long timeout) throws MQClientException, RemotingException, final int maxNums, final long timeout) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException; MQBrokerException, InterruptedException;
/**
* Pulling the messages, not blocking
* <p>
* support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
* </p>
*
* @param mq from which message queue
* @param selector message selector({@link MessageSelector}), can be null.
* @param offset from where to pull
* @param maxNums max pulling numbers
* @return The resulting {@code PullRequest}
*/
PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
/**
* Pulling the messages in the specified timeout
* <p>
* support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92}
* </p>
*
* @param mq from which message queue
* @param selector message selector({@link MessageSelector}), can be null.
* @param offset from where to pull
* @param maxNums max pulling numbers
* @param timeout Pulling the messages in the specified timeout
* @return The resulting {@code PullRequest}
*/
PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset,
final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
/** /**
* Pulling the messages in a async. way * Pulling the messages in a async. way
*/ */
...@@ -80,6 +113,20 @@ public interface MQPullConsumer extends MQConsumer { ...@@ -80,6 +113,20 @@ public interface MQPullConsumer extends MQConsumer {
final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
InterruptedException; InterruptedException;
/**
* Pulling the messages in a async. way. Support message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
final PullCallback pullCallback) throws MQClientException, RemotingException,
InterruptedException;
/**
* Pulling the messages in a async. way. Support message selection
*/
void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums,
final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException,
InterruptedException;
/** /**
* Pulling the messages,if no message arrival,blocking some time * Pulling the messages,if no message arrival,blocking some time
* *
......
...@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; ...@@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
...@@ -46,6 +47,7 @@ import org.apache.rocketmq.common.MixAll; ...@@ -46,6 +47,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.filter.FilterAPI;
import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
...@@ -158,17 +160,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -158,17 +160,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout); SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
} }
private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout);
}
private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression)
throws MQClientException {
if (null == mq) {
throw new MQClientException("mq is null", null);
}
try {
return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
}
private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector)
throws MQClientException {
if (null == mq) {
throw new MQClientException("mq is null", null);
}
try {
return FilterAPI.build(mq.getTopic(),
messageSelector.getExpression(), messageSelector.getExpressionType());
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
}
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
long timeout) long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK(); this.makeSureStateOK();
if (null == mq) { if (null == mq) {
throw new MQClientException("mq is null", null); throw new MQClientException("mq is null", null);
} }
if (offset < 0) { if (offset < 0) {
...@@ -183,20 +226,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -183,20 +226,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
mq, mq,
subscriptionData.getSubString(), subscriptionData.getSubString(),
0L, subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset, offset,
maxNums, maxNums,
sysFlag, sysFlag,
...@@ -369,12 +406,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -369,12 +406,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback,
long timeout) long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
}
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException {
pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis());
}
public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums,
PullCallback pullCallback,
long timeout)
throws MQClientException, RemotingException, InterruptedException {
SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout);
} }
private void pullAsyncImpl( private void pullAsyncImpl(
final MessageQueue mq, final MessageQueue mq,
final String subExpression, final SubscriptionData subscriptionData,
final long offset, final long offset,
final int maxNums, final int maxNums,
final PullCallback pullCallback, final PullCallback pullCallback,
...@@ -403,20 +455,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -403,20 +455,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
try { try {
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
final SubscriptionData subscriptionData;
try {
subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),
mq.getTopic(), subExpression);
} catch (Exception e) {
throw new MQClientException("parse subscription error", e);
}
long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;
boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
this.pullAPIWrapper.pullKernelImpl( this.pullAPIWrapper.pullKernelImpl(
mq, mq,
subscriptionData.getSubString(), subscriptionData.getSubString(),
0L, subscriptionData.getExpressionType(),
isTagType ? 0L : subscriptionData.getSubVersion(),
offset, offset,
maxNums, maxNums,
sysFlag, sysFlag,
...@@ -444,7 +490,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -444,7 +490,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException { throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
} }
public DefaultMQPullConsumer getDefaultMQPullConsumer() { public DefaultMQPullConsumer getDefaultMQPullConsumer() {
...@@ -454,7 +501,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { ...@@ -454,7 +501,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums,
PullCallback pullCallback) PullCallback pullCallback)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression);
this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true,
this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());
} }
......
...@@ -209,34 +209,6 @@ public class PullAPIWrapper { ...@@ -209,34 +209,6 @@ public class PullAPIWrapper {
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
} }
public PullResult pullKernelImpl(
final MessageQueue mq,
final String subExpression,
final long subVersion,
final long offset,
final int maxNums,
final int sysFlag,
final long commitOffset,
final long brokerSuspendMaxTimeMillis,
final long timeoutMillis,
final CommunicationMode communicationMode,
final PullCallback pullCallback
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
return pullKernelImpl(
mq,
subExpression,
ExpressionType.TAG,
subVersion, offset,
maxNums,
sysFlag,
commitOffset,
brokerSuspendMaxTimeMillis,
timeoutMillis,
communicationMode,
pullCallback
);
}
public long recalculatePullFromWhichNode(final MessageQueue mq) { public long recalculatePullFromWhichNode(final MessageQueue mq) {
if (this.isConnectBrokerByUser()) { if (this.isConnectBrokerByUser()) {
return this.defaultBrokerId; return this.defaultBrokerId;
......
...@@ -1046,6 +1046,19 @@ public class MQClientInstance { ...@@ -1046,6 +1046,19 @@ public class MQClientInstance {
if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) { if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) {
return this.brokerVersionTable.get(brokerName).get(brokerAddr); return this.brokerVersionTable.get(brokerName).get(brokerAddr);
} }
} else {
HeartbeatData heartbeatData = prepareHeartbeatData();
try {
int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000);
return version;
} catch (Exception e) {
if (this.isBrokerInNameServer(brokerAddr)) {
log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr);
} else {
log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName,
brokerAddr);
}
}
} }
return 0; return 0;
} }
......
...@@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap; ...@@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode; import org.apache.rocketmq.client.common.ClientErrorCode;
...@@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
this(defaultMQProducer, null); this(defaultMQProducer, null);
} }
...@@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer; this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook; this.rpcHook = rpcHook;
this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Runtime.getRuntime().availableProcessors(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.asyncSenderThreadPoolQueue,
new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
}
});
} }
public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) {
...@@ -456,7 +478,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -456,7 +478,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
...@@ -957,7 +979,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -957,7 +979,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
...@@ -1079,7 +1101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1079,7 +1101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException { throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis(); final long beginStartTime = System.currentTimeMillis();
ExecutorService executor = this.getCallbackExecutor(); ExecutorService executor = this.getAsyncSenderExecutor();
try { try {
executor.submit(new Runnable() { executor.submit(new Runnable() {
@Override @Override
...@@ -1243,9 +1265,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { ...@@ -1243,9 +1265,13 @@ public class DefaultMQProducerImpl implements MQProducerInner {
public void setCallbackExecutor(final ExecutorService callbackExecutor) { public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
} }
public ExecutorService getCallbackExecutor() {
return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
public ExecutorService getAsyncSenderExecutor() {
return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor;
}
public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
this.asyncSenderExecutor = asyncSenderExecutor;
} }
public SendResult send(Message msg, public SendResult send(Message msg,
......
...@@ -655,6 +655,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { ...@@ -655,6 +655,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor); this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
} }
/**
* Sets an Executor to be used for executing asynchronous send. If the Executor is not set, {@link
* DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used.
*
* @param asyncSenderExecutor the instance of Executor
*/
public void setAsyncSenderExecutor(final ExecutorService asyncSenderExecutor) {
this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
}
private MessageBatch batch(Collection<Message> msgs) throws MQClientException { private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
MessageBatch msgBatch; MessageBatch msgBatch;
try { try {
......
...@@ -167,10 +167,7 @@ public class DefaultMQProducerTest { ...@@ -167,10 +167,7 @@ public class DefaultMQProducerTest {
@Test @Test
public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(message, new SendCallback() { producer.send(message, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -186,15 +183,11 @@ public class DefaultMQProducerTest { ...@@ -186,15 +183,11 @@ public class DefaultMQProducerTest {
} }
}); });
countDownLatch.await(3000L, TimeUnit.MILLISECONDS); countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
} }
@Test @Test
public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException { public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0); final AtomicInteger cc = new AtomicInteger(0);
final CountDownLatch countDownLatch = new CountDownLatch(6); final CountDownLatch countDownLatch = new CountDownLatch(6);
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
SendCallback sendCallback = new SendCallback() { SendCallback sendCallback = new SendCallback() {
@Override @Override
...@@ -226,16 +219,13 @@ public class DefaultMQProducerTest { ...@@ -226,16 +219,13 @@ public class DefaultMQProducerTest {
producer.send(message,messageQueueSelector,null,sendCallback,1000); producer.send(message,messageQueueSelector,null,sendCallback,1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS); countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
assertThat(cc.get()).isEqualTo(6); assertThat(cc.get()).isEqualTo(6);
} }
@Test @Test
public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
final CountDownLatch countDownLatch = new CountDownLatch(1); final CountDownLatch countDownLatch = new CountDownLatch(1);
when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient));
when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor);
producer.send(bigMessage, new SendCallback() { producer.send(bigMessage, new SendCallback() {
@Override @Override
public void onSuccess(SendResult sendResult) { public void onSuccess(SendResult sendResult) {
...@@ -251,7 +241,6 @@ public class DefaultMQProducerTest { ...@@ -251,7 +241,6 @@ public class DefaultMQProducerTest {
} }
}); });
countDownLatch.await(3000L, TimeUnit.MILLISECONDS); countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
} }
@Test @Test
......
...@@ -18,5 +18,5 @@ ...@@ -18,5 +18,5 @@
package org.apache.rocketmq.common.constant; package org.apache.rocketmq.common.constant;
public class DBMsgConstants { public class DBMsgConstants {
public static final int MAX_BODY_SIZE = 64 * 1024 * 1204; //64KB public static final int MAX_BODY_SIZE = 64 * 1024 * 1024; //64KB
} }
...@@ -604,7 +604,7 @@ public class MessageStoreConfig { ...@@ -604,7 +604,7 @@ public class MessageStoreConfig {
} }
/** /**
* Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
* ASYNC_FLUSH * ASYNC_FLUSH
* *
* @return <tt>true</tt> or <tt>false</tt> * @return <tt>true</tt> or <tt>false</tt>
......
...@@ -17,12 +17,18 @@ ...@@ -17,12 +17,18 @@
package org.apache.rocketmq.test.client.consumer.filter; package org.apache.rocketmq.test.client.consumer.filter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.test.base.BaseConf; import org.apache.rocketmq.test.base.BaseConf;
import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT;
import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT;
import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer;
import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.factory.ConsumerFactory;
...@@ -39,12 +45,14 @@ public class SqlFilterIT extends BaseConf { ...@@ -39,12 +45,14 @@ public class SqlFilterIT extends BaseConf {
private static Logger logger = Logger.getLogger(SqlFilterIT.class); private static Logger logger = Logger.getLogger(SqlFilterIT.class);
private RMQNormalProducer producer = null; private RMQNormalProducer producer = null;
private String topic = null; private String topic = null;
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
@Before @Before
public void setUp() { public void setUp() {
topic = initTopic(); topic = initTopic();
logger.info(String.format("use topic: %s;", topic)); logger.info(String.format("use topic: %s;", topic));
producer = getProducer(nsAddr, topic); producer = getProducer(nsAddr, topic);
OFFSE_TABLE.clear();
} }
@After @After
...@@ -71,4 +79,65 @@ public class SqlFilterIT extends BaseConf { ...@@ -71,4 +79,65 @@ public class SqlFilterIT extends BaseConf {
assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2); assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
} }
@Test
public void testFilterPullConsumer() throws Exception {
int msgSize = 16;
String group = initConsumerGroup();
MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))");
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr(nsAddr);
consumer.start();
Thread.sleep(3000);
producer.send("TagA", msgSize);
producer.send("TagB", msgSize);
producer.send("TagC", msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
List<String> receivedMessage = new ArrayList<>(2);
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : mqs) {
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
List<MessageExt> msgs = pullResult.getMsgFoundList();
for (MessageExt msg : msgs) {
receivedMessage.add(new String(msg.getBody()));
}
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
assertThat(receivedMessage.size()).isEqualTo(msgSize * 2);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}
} }
...@@ -155,7 +155,7 @@ public class ConsumerProgressSubCommand implements SubCommand { ...@@ -155,7 +155,7 @@ public class ConsumerProgressSubCommand implements SubCommand {
} }
System.out.printf("%n"); System.out.printf("%n");
System.out.printf("Consume TPS: %s%n", consumeStats.getConsumeTps()); System.out.printf("Consume TPS: %.2f%n", consumeStats.getConsumeTps());
System.out.printf("Diff Total: %d%n", diffTotal); System.out.printf("Diff Total: %d%n", diffTotal);
} else { } else {
System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n", System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册