diff --git a/.gitignore b/.gitignore index 80c6f569862c2916f5fc8b11ed6d3690064e3938..bd19c1f640a54a82a52a69e04cbe6a6ee80f4328 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ devenv *.versionsBackup !NOTICE-BIN !LICENSE-BIN -.DS_Store \ No newline at end of file +.DS_Store +localbin diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java index 15e5c84ff2df5dc8cbc486f85022fff7811d3e20..1c227af150200adde921551b7575b965df96c58c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java @@ -198,7 +198,7 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { - if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) { + if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; @@ -315,33 +315,26 @@ public class TransactionalMessageServiceImpl implements TransactionalMessageServ * @param removeMap Op message map to determine whether a half message was responded by producer. * @param doneOpOffset Op Message which has been checked. * @param msgExt Half message - * @param checkImmunityTime User defined time to avoid being detected early. * @return Return true if put success, otherwise return false. */ - private boolean checkPrepareQueueOffset(HashMap removeMap, List doneOpOffset, MessageExt msgExt, - long checkImmunityTime) { - if (System.currentTimeMillis() - msgExt.getBornTimestamp() < checkImmunityTime) { - String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); - if (null == prepareQueueOffsetStr) { - return putImmunityMsgBackToHalfQueue(msgExt); + private boolean checkPrepareQueueOffset(HashMap removeMap, List doneOpOffset, + MessageExt msgExt) { + String prepareQueueOffsetStr = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); + if (null == prepareQueueOffsetStr) { + return putImmunityMsgBackToHalfQueue(msgExt); + } else { + long prepareQueueOffset = getLong(prepareQueueOffsetStr); + if (-1 == prepareQueueOffset) { + return false; } else { - long prepareQueueOffset = getLong(prepareQueueOffsetStr); - if (-1 == prepareQueueOffset) { - return false; + if (removeMap.containsKey(prepareQueueOffset)) { + long tmpOpOffset = removeMap.remove(prepareQueueOffset); + doneOpOffset.add(tmpOpOffset); + return true; } else { - if (removeMap.containsKey(prepareQueueOffset)) { - long tmpOpOffset = removeMap.remove(prepareQueueOffset); - doneOpOffset.add(tmpOpOffset); - return true; - } else { - return putImmunityMsgBackToHalfQueue(msgExt); - } + return putImmunityMsgBackToHalfQueue(msgExt); } - } - - } else { - return true; } } diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index cd70670307846d35208248ca0930d4f1c874fdd3..6befbf3b5e92f4bc7531892a77e7cb60cc00a528 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -257,6 +257,18 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume return this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, timeout); } + @Override + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums); + } + + @Override + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, timeout); + } + @Override public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { @@ -270,6 +282,20 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume this.defaultMQPullConsumerImpl.pull(mq, subExpression, offset, maxNums, pullCallback, timeout); } + @Override + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback); + } + + @Override + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback, long timeout) + throws MQClientException, RemotingException, InterruptedException { + this.defaultMQPullConsumerImpl.pull(mq, messageSelector, offset, maxNums, pullCallback, timeout); + } + @Override public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 33002c9837176982f9015093a6b945a512b04111..28b807c2ed89ac460d9cfd0e61a35dc144c42242 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -66,6 +66,39 @@ public interface MQPullConsumer extends MQConsumer { final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException; + /** + * Pulling the messages, not blocking + *

+ * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} + *

+ * + * @param mq from which message queue + * @param selector message selector({@link MessageSelector}), can be null. + * @param offset from where to pull + * @param maxNums max pulling numbers + * @return The resulting {@code PullRequest} + */ + PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset, + final int maxNums) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + + /** + * Pulling the messages in the specified timeout + *

+ * support other message selection, such as {@link org.apache.rocketmq.common.filter.ExpressionType#SQL92} + *

+ * + * @param mq from which message queue + * @param selector message selector({@link MessageSelector}), can be null. + * @param offset from where to pull + * @param maxNums max pulling numbers + * @param timeout Pulling the messages in the specified timeout + * @return The resulting {@code PullRequest} + */ + PullResult pull(final MessageQueue mq, final MessageSelector selector, final long offset, + final int maxNums, final long timeout) throws MQClientException, RemotingException, MQBrokerException, + InterruptedException; + /** * Pulling the messages in a async. way */ @@ -80,6 +113,20 @@ public interface MQPullConsumer extends MQConsumer { final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException; + /** + * Pulling the messages in a async. way. Support message selection + */ + void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums, + final PullCallback pullCallback) throws MQClientException, RemotingException, + InterruptedException; + + /** + * Pulling the messages in a async. way. Support message selection + */ + void pull(final MessageQueue mq, final MessageSelector selector, final long offset, final int maxNums, + final PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, + InterruptedException; + /** * Pulling the messages,if no message arrival,blocking some time * diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index 420d89b2fd0fb5e7abe8f7881eeee8c0c3d3e265..39c43d592d7772d335e166fa88af6a8322fd9438 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -46,6 +47,7 @@ import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.filter.FilterAPI; import org.apache.rocketmq.common.help.FAQUrl; import org.apache.rocketmq.logging.InternalLogger; @@ -158,17 +160,58 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public PullResult pull(MessageQueue mq, String subExpression, long offset, int maxNums, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.pullSyncImpl(mq, subExpression, offset, maxNums, false, timeout); + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout); } - private PullResult pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block, + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + return pull(mq, messageSelector, offset, maxNums, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); + } + + public PullResult pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, long timeout) + throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, false, timeout); + } + + private SubscriptionData getSubscriptionData(MessageQueue mq, String subExpression) + throws MQClientException { + + if (null == mq) { + throw new MQClientException("mq is null", null); + } + + try { + return FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), + mq.getTopic(), subExpression); + } catch (Exception e) { + throw new MQClientException("parse subscription error", e); + } + } + + private SubscriptionData getSubscriptionData(MessageQueue mq, MessageSelector messageSelector) + throws MQClientException { + + if (null == mq) { + throw new MQClientException("mq is null", null); + } + + try { + return FilterAPI.build(mq.getTopic(), + messageSelector.getExpression(), messageSelector.getExpressionType()); + } catch (Exception e) { + throw new MQClientException("parse subscription error", e); + } + } + + private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); if (null == mq) { throw new MQClientException("mq is null", null); - } if (offset < 0) { @@ -183,20 +226,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); - SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; + boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), - 0L, + subscriptionData.getExpressionType(), + isTagType ? 0L : subscriptionData.getSubVersion(), offset, maxNums, sysFlag, @@ -369,12 +406,27 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void pull(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback, long timeout) throws MQClientException, RemotingException, InterruptedException { - this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, false, timeout); + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout); + } + + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback) + throws MQClientException, RemotingException, InterruptedException { + pull(mq, messageSelector, offset, maxNums, pullCallback, this.defaultMQPullConsumer.getConsumerPullTimeoutMillis()); + } + + public void pull(MessageQueue mq, MessageSelector messageSelector, long offset, int maxNums, + PullCallback pullCallback, + long timeout) + throws MQClientException, RemotingException, InterruptedException { + SubscriptionData subscriptionData = getSubscriptionData(mq, messageSelector); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, false, timeout); } private void pullAsyncImpl( final MessageQueue mq, - final String subExpression, + final SubscriptionData subscriptionData, final long offset, final int maxNums, final PullCallback pullCallback, @@ -403,20 +455,14 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { try { int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); - final SubscriptionData subscriptionData; - try { - subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), - mq.getTopic(), subExpression); - } catch (Exception e) { - throw new MQClientException("parse subscription error", e); - } - long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; + boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), - 0L, + subscriptionData.getExpressionType(), + isTagType ? 0L : subscriptionData.getSubVersion(), offset, maxNums, sysFlag, @@ -444,7 +490,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + return this.pullSyncImpl(mq, subscriptionData, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } public DefaultMQPullConsumer getDefaultMQPullConsumer() { @@ -454,7 +501,8 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { public void pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums, PullCallback pullCallback) throws MQClientException, RemotingException, InterruptedException { - this.pullAsyncImpl(mq, subExpression, offset, maxNums, pullCallback, true, + SubscriptionData subscriptionData = getSubscriptionData(mq, subExpression); + this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, pullCallback, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java index b650e35e02f8b908c480a76cd5daf51131dc5e32..1d2d24fa3b9d05aea77137fc33439a4bff3e777b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/PullAPIWrapper.java @@ -209,34 +209,6 @@ public class PullAPIWrapper { throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); } - public PullResult pullKernelImpl( - final MessageQueue mq, - final String subExpression, - final long subVersion, - final long offset, - final int maxNums, - final int sysFlag, - final long commitOffset, - final long brokerSuspendMaxTimeMillis, - final long timeoutMillis, - final CommunicationMode communicationMode, - final PullCallback pullCallback - ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { - return pullKernelImpl( - mq, - subExpression, - ExpressionType.TAG, - subVersion, offset, - maxNums, - sysFlag, - commitOffset, - brokerSuspendMaxTimeMillis, - timeoutMillis, - communicationMode, - pullCallback - ); - } - public long recalculatePullFromWhichNode(final MessageQueue mq) { if (this.isConnectBrokerByUser()) { return this.defaultBrokerId; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 9ffaed0a4f9086a87d88603f00f0335330af560d..80347d1052ed31f19abe8bb743d4b37ceb4a534d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1046,6 +1046,19 @@ public class MQClientInstance { if (this.brokerVersionTable.get(brokerName).containsKey(brokerAddr)) { return this.brokerVersionTable.get(brokerName).get(brokerAddr); } + } else { + HeartbeatData heartbeatData = prepareHeartbeatData(); + try { + int version = this.mQClientAPIImpl.sendHearbeat(brokerAddr, heartbeatData, 3000); + return version; + } catch (Exception e) { + if (this.isBrokerInNameServer(brokerAddr)) { + log.info("send heart beat to broker[{} {}] failed", brokerName, brokerAddr); + } else { + log.info("send heart beat to broker[{} {}] exception, because the broker not up, forget it", brokerName, + brokerAddr); + } + } } return 0; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 7ace9d5b07d13bb850e2c51bc3eb4d9ee9d517ab..90f4f7876352c613dbce7e8d31e6557585a3fc0f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -30,8 +30,10 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.QueryResult; import org.apache.rocketmq.client.Validators; import org.apache.rocketmq.client.common.ClientErrorCode; @@ -101,6 +103,10 @@ public class DefaultMQProducerImpl implements MQProducerInner { private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy(); + private final BlockingQueue asyncSenderThreadPoolQueue; + private final ExecutorService defaultAsyncSenderExecutor; + private ExecutorService asyncSenderExecutor; + public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) { this(defaultMQProducer, null); } @@ -108,6 +114,22 @@ public class DefaultMQProducerImpl implements MQProducerInner { public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) { this.defaultMQProducer = defaultMQProducer; this.rpcHook = rpcHook; + + this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue(50000); + this.defaultAsyncSenderExecutor = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors(), + 1000 * 60, + TimeUnit.MILLISECONDS, + this.asyncSenderThreadPoolQueue, + new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet()); + } + }); } public void registerCheckForbiddenHook(CheckForbiddenHook checkForbiddenHook) { @@ -456,7 +478,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override @@ -957,7 +979,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override @@ -1079,7 +1101,7 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, InterruptedException { final long beginStartTime = System.currentTimeMillis(); - ExecutorService executor = this.getCallbackExecutor(); + ExecutorService executor = this.getAsyncSenderExecutor(); try { executor.submit(new Runnable() { @Override @@ -1243,9 +1265,13 @@ public class DefaultMQProducerImpl implements MQProducerInner { public void setCallbackExecutor(final ExecutorService callbackExecutor) { this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor); } - public ExecutorService getCallbackExecutor() { - return this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor(); + public ExecutorService getAsyncSenderExecutor() { + return null == asyncSenderExecutor ? defaultAsyncSenderExecutor : asyncSenderExecutor; + } + + public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) { + this.asyncSenderExecutor = asyncSenderExecutor; } public SendResult send(Message msg, diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 9732d0eb84458062d203db0af55df8e37043b1a3..f57e52c3efaa93abe55b1d6db40d0d085c435066 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -655,6 +655,16 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor); } + /** + * Sets an Executor to be used for executing asynchronous send. If the Executor is not set, {@link + * DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used. + * + * @param asyncSenderExecutor the instance of Executor + */ + public void setAsyncSenderExecutor(final ExecutorService asyncSenderExecutor) { + this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor); + } + private MessageBatch batch(Collection msgs) throws MQClientException { MessageBatch msgBatch; try { diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index c225afd6842b24adc49818575d54a2d5ca5e016d..9540755fe339f37651f983fbe1ab524e7fa94004 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -167,10 +167,7 @@ public class DefaultMQProducerTest { @Test public void testSendMessageAsync_Success() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - ExecutorService callbackExecutor = Executors.newSingleThreadExecutor(); final CountDownLatch countDownLatch = new CountDownLatch(1); - when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient)); - when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor); producer.send(message, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -186,15 +183,11 @@ public class DefaultMQProducerTest { } }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - callbackExecutor.shutdown(); } @Test public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException { final AtomicInteger cc = new AtomicInteger(0); final CountDownLatch countDownLatch = new CountDownLatch(6); - ExecutorService callbackExecutor = Executors.newSingleThreadExecutor(); - when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient)); - when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor); SendCallback sendCallback = new SendCallback() { @Override @@ -226,16 +219,13 @@ public class DefaultMQProducerTest { producer.send(message,messageQueueSelector,null,sendCallback,1000); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - callbackExecutor.shutdown(); assertThat(cc.get()).isEqualTo(6); } @Test public void testSendMessageAsync_BodyCompressed() throws RemotingException, InterruptedException, MQBrokerException, MQClientException { - ExecutorService callbackExecutor = Executors.newSingleThreadExecutor(); + final CountDownLatch countDownLatch = new CountDownLatch(1); - when(mQClientAPIImpl.getRemotingClient()).thenReturn((nettyRemotingClient)); - when(nettyRemotingClient.getCallbackExecutor()).thenReturn(callbackExecutor); producer.send(bigMessage, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { @@ -251,7 +241,6 @@ public class DefaultMQProducerTest { } }); countDownLatch.await(3000L, TimeUnit.MILLISECONDS); - callbackExecutor.shutdown(); } @Test diff --git a/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java b/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java index 8273aaa788b4516b494c01b2b00f71554ffaee96..8d2b34497657d8152fa519c2569dc9116d782624 100644 --- a/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java +++ b/common/src/main/java/org/apache/rocketmq/common/constant/DBMsgConstants.java @@ -18,5 +18,5 @@ package org.apache.rocketmq.common.constant; public class DBMsgConstants { - public static final int MAX_BODY_SIZE = 64 * 1024 * 1204; //64KB + public static final int MAX_BODY_SIZE = 64 * 1024 * 1024; //64KB } diff --git a/docs/cn/index.md b/docs/cn/index.md new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/docs/en/index.md b/docs/en/index.md new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index 02aa84a3e6b590b43901e529b810b4812144ff72..8d60321ed8f2529699e07e8b96f845bd81785665 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -604,7 +604,7 @@ public class MessageStoreConfig { } /** - * Enable transient commitLog store poll only if transientStorePoolEnable is true and the FlushDiskType is + * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is * ASYNC_FLUSH * * @return true or false diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 79f15dcc99da9ea04b20c756ffc12ddc03199b53..a0f6555ced072f0f28fb78bdff3bd7735a910cde 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -17,12 +17,18 @@ package org.apache.rocketmq.test.client.consumer.filter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.log4j.Logger; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.test.base.BaseConf; -import org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT; -import org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT; -import org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer; import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; import org.apache.rocketmq.test.client.rmq.RMQSqlConsumer; import org.apache.rocketmq.test.factory.ConsumerFactory; @@ -39,12 +45,14 @@ public class SqlFilterIT extends BaseConf { private static Logger logger = Logger.getLogger(SqlFilterIT.class); private RMQNormalProducer producer = null; private String topic = null; + private static final Map OFFSE_TABLE = new HashMap(); @Before public void setUp() { topic = initTopic(); logger.info(String.format("use topic: %s;", topic)); producer = getProducer(nsAddr, topic); + OFFSE_TABLE.clear(); } @After @@ -71,4 +79,65 @@ public class SqlFilterIT extends BaseConf { assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2); } + + @Test + public void testFilterPullConsumer() throws Exception { + int msgSize = 16; + + String group = initConsumerGroup(); + MessageSelector selector = MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))"); + DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group); + consumer.setNamesrvAddr(nsAddr); + consumer.start(); + Thread.sleep(3000); + producer.send("TagA", msgSize); + producer.send("TagB", msgSize); + producer.send("TagC", msgSize); + Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size()); + + List receivedMessage = new ArrayList<>(2); + Set mqs = consumer.fetchSubscribeMessageQueues(topic); + for (MessageQueue mq : mqs) { + SINGLE_MQ: + while (true) { + try { + PullResult pullResult = + consumer.pull(mq, selector, getMessageQueueOffset(mq), 32); + putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); + switch (pullResult.getPullStatus()) { + case FOUND: + List msgs = pullResult.getMsgFoundList(); + for (MessageExt msg : msgs) { + receivedMessage.add(new String(msg.getBody())); + } + break; + case NO_MATCHED_MSG: + break; + case NO_NEW_MSG: + break SINGLE_MQ; + case OFFSET_ILLEGAL: + break; + default: + break; + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + assertThat(receivedMessage.size()).isEqualTo(msgSize * 2); + } + + private static long getMessageQueueOffset(MessageQueue mq) { + Long offset = OFFSE_TABLE.get(mq); + if (offset != null) + return offset; + + return 0; + } + + private static void putMessageQueueOffset(MessageQueue mq, long offset) { + OFFSE_TABLE.put(mq, offset); + } } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java index a1b3c1a227d60e83fe2d2e45d9ff0c642313ab15..b946ee141ebc507da3df465b9a9fa31c043790a7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java @@ -155,7 +155,7 @@ public class ConsumerProgressSubCommand implements SubCommand { } System.out.printf("%n"); - System.out.printf("Consume TPS: %s%n", consumeStats.getConsumeTps()); + System.out.printf("Consume TPS: %.2f%n", consumeStats.getConsumeTps()); System.out.printf("Diff Total: %d%n", diffTotal); } else { System.out.printf("%-32s %-6s %-24s %-5s %-14s %-7s %s%n",