diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java index 2f94de20c34213d426a91f913ce6abb7fed6f690..64c28ece3e6983524281d8859886b9cdfe9ee7a3 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filter/ExpressionMessageFilter.java @@ -70,7 +70,7 @@ public class ExpressionMessageFilter implements MessageFilter { // by tags code. if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { - if (tagsCode == null || tagsCode < 0L) { + if (tagsCode == null) { return true; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java index 7978942e5bbdd5f9cbd32176153d0819da221518..e544d90a124a0d4044b4087ab2c7cfaa05edfd22 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/filter/MessageStoreWithFilterTest.java @@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.store.CommitLogDispatcher; +import org.apache.rocketmq.store.ConsumeQueueExt; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageExtBrokerInner; +import org.apache.rocketmq.store.MessageFilter; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.stats.BrokerStatsManager; @@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest { try { StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); } catch (UnknownHostException e) { - e.printStackTrace(); } try { BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); } catch (UnknownHostException e) { - e.printStackTrace(); } } @Before - public void init() { + public void init() throws Exception { filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); - try { - master = gen(filterManager); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + master = gen(filterManager); } @After @@ -107,7 +102,7 @@ public class MessageStoreWithFilterTest { public MessageExtBrokerInner buildMessage() { MessageExtBrokerInner msg = new MessageExtBrokerInner(); msg.setTopic(topic); - msg.setTags("TAG1"); + msg.setTags(System.currentTimeMillis() + "TAG"); msg.setKeys("Hello"); msg.setBody(msgBody); msg.setKeys(String.valueOf(System.currentTimeMillis())); @@ -125,7 +120,7 @@ public class MessageStoreWithFilterTest { } public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, - boolean enableCqExt, int cqExtFileSize) { + boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); @@ -155,9 +150,7 @@ public class MessageStoreWithFilterTest { new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map properties) { -// System.out.println(String.format("Msg coming: %s, %d, %d, %d", -// topic, queueId, logicOffset, tagsCode)); + long msgStoreTime, byte[] filterBitMap, Map properties) { } } , brokerConfig); @@ -166,8 +159,6 @@ public class MessageStoreWithFilterTest { @Override public void dispatch(DispatchRequest request) { try { -// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(), -// BitsArray.create(request.getBitMap()).toString())); } catch (Throwable e) { e.printStackTrace(); } @@ -183,7 +174,7 @@ public class MessageStoreWithFilterTest { } protected List putMsg(DefaultMessageStore master, int topicCount, - int msgCountPerTopic) throws Exception { + int msgCountPerTopic) throws Exception { List msgs = new ArrayList(); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; @@ -229,22 +220,10 @@ public class MessageStoreWithFilterTest { } @Test - public void testGetMessage_withFilterBitMapAndConsumerChanged() { - List msgs = null; - try { - msgs = putMsg(master, topicCount, msgPerTopic); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception { + List msgs = putMsg(master, topicCount, msgPerTopic); - // sleep to wait for consume queue has been constructed. - try { - Thread.sleep(200); - } catch (InterruptedException e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + Thread.sleep(200); // reset consumer; String topic = "topic" + 0; @@ -303,16 +282,10 @@ public class MessageStoreWithFilterTest { } @Test - public void testGetMessage_withFilterBitMap() { - List msgs = null; - try { - msgs = putMsg(master, topicCount, msgPerTopic); - // sleep to wait for consume queue has been constructed. - Thread.sleep(200); - } catch (Exception e) { - e.printStackTrace(); - assertThat(true).isFalse(); - } + public void testGetMessage_withFilterBitMap() throws Exception { + List msgs = putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(100); for (int i = 0; i < topicCount; i++) { String realTopic = topic + i; @@ -369,4 +342,32 @@ public class MessageStoreWithFilterTest { } } } + + @Test + public void testGetMessage_withFilter_checkTagsCode() throws Exception { + putMsg(master, topicCount, msgPerTopic); + + Thread.sleep(200); + + for (int i = 0; i < topicCount; i++) { + String realTopic = topic + i; + + GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000, + new MessageFilter() { + @Override + public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) { + if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) { + return false; + } + return true; + } + + @Override + public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map properties) { + return true; + } + }); + assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic); + } + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index 379162d7ddb3db1ce78e6cd071fd0544790cb310..0bf0aa9a5b9ee8f23700b699dfd5d54a8bef714c 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -569,6 +569,6 @@ public class ConsumeQueue { * Check {@code tagsCode} is address of extend file or tags code. */ public boolean isExtAddr(long tagsCode) { - return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode); + return ConsumeQueueExt.isExtAddr(tagsCode); } } diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java index a118cde73b7e946302161a4c415347c54b4ebcbe..aeb2803e23ffb2b8e50d7886640d22dc9de4dd16 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java @@ -95,7 +95,7 @@ public class ConsumeQueueExt { * Just test {@code address} is less than 0. *

*/ - public boolean isExtAddr(final long address) { + public static boolean isExtAddr(final long address) { return address <= MAX_ADDR; } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 95a017aee6ff0705599a18d5d9e172c6084dacb7..ffa8dbc2a133325835d7b0e879d2c1a164f8483a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -16,23 +16,6 @@ */ package org.apache.rocketmq.store; -import java.io.File; -import java.io.IOException; -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceThread; @@ -56,6 +39,24 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; public class DefaultMessageStore implements MessageStore { @@ -487,7 +488,7 @@ public class DefaultMessageStore implements MessageStore { break; } - boolean extRet = false; + boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { @@ -496,11 +497,12 @@ public class DefaultMessageStore implements MessageStore { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); + isTagsCodeLegal = false; } } if (messageFilter != null - && !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) { + && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; }