提交 254d4324 编写于 作者: V vsair 提交者: vsair

[ROCKETMQ-284] ExpressionMessageFilter will pass some message.

Author: vsair <liuxuedee@gmail.com>

Closes #160 from vsair/ROCKETMQ-284.
上级 84583086
...@@ -70,7 +70,7 @@ public class ExpressionMessageFilter implements MessageFilter { ...@@ -70,7 +70,7 @@ public class ExpressionMessageFilter implements MessageFilter {
// by tags code. // by tags code.
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null || tagsCode < 0L) { if (tagsCode == null) {
return true; return true;
} }
......
...@@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageDecoder; ...@@ -24,12 +24,14 @@ import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.CommitLogDispatcher; import org.apache.rocketmq.store.CommitLogDispatcher;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest; import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageArrivingListener; import org.apache.rocketmq.store.MessageArrivingListener;
import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageFilter;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
...@@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest { ...@@ -77,24 +79,17 @@ public class MessageStoreWithFilterTest {
try { try {
StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123); StoreHost = new InetSocketAddress(InetAddress.getLocalHost(), 8123);
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
e.printStackTrace();
} }
try { try {
BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0); BornHost = new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 0);
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
e.printStackTrace();
} }
} }
@Before @Before
public void init() { public void init() throws Exception {
filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic); filterManager = ConsumerFilterManagerTest.gen(topicCount, msgPerTopic);
try { master = gen(filterManager);
master = gen(filterManager);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
} }
@After @After
...@@ -107,7 +102,7 @@ public class MessageStoreWithFilterTest { ...@@ -107,7 +102,7 @@ public class MessageStoreWithFilterTest {
public MessageExtBrokerInner buildMessage() { public MessageExtBrokerInner buildMessage() {
MessageExtBrokerInner msg = new MessageExtBrokerInner(); MessageExtBrokerInner msg = new MessageExtBrokerInner();
msg.setTopic(topic); msg.setTopic(topic);
msg.setTags("TAG1"); msg.setTags(System.currentTimeMillis() + "TAG");
msg.setKeys("Hello"); msg.setKeys("Hello");
msg.setBody(msgBody); msg.setBody(msgBody);
msg.setKeys(String.valueOf(System.currentTimeMillis())); msg.setKeys(String.valueOf(System.currentTimeMillis()));
...@@ -125,7 +120,7 @@ public class MessageStoreWithFilterTest { ...@@ -125,7 +120,7 @@ public class MessageStoreWithFilterTest {
} }
public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize,
boolean enableCqExt, int cqExtFileSize) { boolean enableCqExt, int cqExtFileSize) {
MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize);
messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize);
...@@ -155,9 +150,7 @@ public class MessageStoreWithFilterTest { ...@@ -155,9 +150,7 @@ public class MessageStoreWithFilterTest {
new MessageArrivingListener() { new MessageArrivingListener() {
@Override @Override
public void arriving(String topic, int queueId, long logicOffset, long tagsCode, public void arriving(String topic, int queueId, long logicOffset, long tagsCode,
long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
// System.out.println(String.format("Msg coming: %s, %d, %d, %d",
// topic, queueId, logicOffset, tagsCode));
} }
} }
, brokerConfig); , brokerConfig);
...@@ -166,8 +159,6 @@ public class MessageStoreWithFilterTest { ...@@ -166,8 +159,6 @@ public class MessageStoreWithFilterTest {
@Override @Override
public void dispatch(DispatchRequest request) { public void dispatch(DispatchRequest request) {
try { try {
// System.out.println(String.format("offset:%d, bitMap:%s", request.getCommitLogOffset(),
// BitsArray.create(request.getBitMap()).toString()));
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(); e.printStackTrace();
} }
...@@ -183,7 +174,7 @@ public class MessageStoreWithFilterTest { ...@@ -183,7 +174,7 @@ public class MessageStoreWithFilterTest {
} }
protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount, protected List<MessageExtBrokerInner> putMsg(DefaultMessageStore master, int topicCount,
int msgCountPerTopic) throws Exception { int msgCountPerTopic) throws Exception {
List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>(); List<MessageExtBrokerInner> msgs = new ArrayList<MessageExtBrokerInner>();
for (int i = 0; i < topicCount; i++) { for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i; String realTopic = topic + i;
...@@ -229,22 +220,10 @@ public class MessageStoreWithFilterTest { ...@@ -229,22 +220,10 @@ public class MessageStoreWithFilterTest {
} }
@Test @Test
public void testGetMessage_withFilterBitMapAndConsumerChanged() { public void testGetMessage_withFilterBitMapAndConsumerChanged() throws Exception {
List<MessageExtBrokerInner> msgs = null; List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
try {
msgs = putMsg(master, topicCount, msgPerTopic);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
// sleep to wait for consume queue has been constructed. Thread.sleep(200);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
assertThat(true).isFalse();
}
// reset consumer; // reset consumer;
String topic = "topic" + 0; String topic = "topic" + 0;
...@@ -303,16 +282,10 @@ public class MessageStoreWithFilterTest { ...@@ -303,16 +282,10 @@ public class MessageStoreWithFilterTest {
} }
@Test @Test
public void testGetMessage_withFilterBitMap() { public void testGetMessage_withFilterBitMap() throws Exception {
List<MessageExtBrokerInner> msgs = null; List<MessageExtBrokerInner> msgs = putMsg(master, topicCount, msgPerTopic);
try {
msgs = putMsg(master, topicCount, msgPerTopic); Thread.sleep(100);
// sleep to wait for consume queue has been constructed.
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
assertThat(true).isFalse();
}
for (int i = 0; i < topicCount; i++) { for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i; String realTopic = topic + i;
...@@ -369,4 +342,32 @@ public class MessageStoreWithFilterTest { ...@@ -369,4 +342,32 @@ public class MessageStoreWithFilterTest {
} }
} }
} }
@Test
public void testGetMessage_withFilter_checkTagsCode() throws Exception {
putMsg(master, topicCount, msgPerTopic);
Thread.sleep(200);
for (int i = 0; i < topicCount; i++) {
String realTopic = topic + i;
GetMessageResult getMessageResult = master.getMessage("test", realTopic, queueId, 0, 10000,
new MessageFilter() {
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (tagsCode != null && tagsCode <= ConsumeQueueExt.MAX_ADDR) {
return false;
}
return true;
}
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
return true;
}
});
assertThat(getMessageResult.getMessageCount()).isEqualTo(msgPerTopic);
}
}
} }
...@@ -569,6 +569,6 @@ public class ConsumeQueue { ...@@ -569,6 +569,6 @@ public class ConsumeQueue {
* Check {@code tagsCode} is address of extend file or tags code. * Check {@code tagsCode} is address of extend file or tags code.
*/ */
public boolean isExtAddr(long tagsCode) { public boolean isExtAddr(long tagsCode) {
return isExtReadEnable() && this.consumeQueueExt.isExtAddr(tagsCode); return ConsumeQueueExt.isExtAddr(tagsCode);
} }
} }
...@@ -95,7 +95,7 @@ public class ConsumeQueueExt { ...@@ -95,7 +95,7 @@ public class ConsumeQueueExt {
* Just test {@code address} is less than 0. * Just test {@code address} is less than 0.
* </p> * </p>
*/ */
public boolean isExtAddr(final long address) { public static boolean isExtAddr(final long address) {
return address <= MAX_ADDR; return address <= MAX_ADDR;
} }
......
...@@ -16,23 +16,6 @@ ...@@ -16,23 +16,6 @@
*/ */
package org.apache.rocketmq.store; package org.apache.rocketmq.store;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.ServiceThread; import org.apache.rocketmq.common.ServiceThread;
...@@ -56,6 +39,24 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager; ...@@ -56,6 +39,24 @@ import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.rocketmq.store.config.BrokerRole.SLAVE; import static org.apache.rocketmq.store.config.BrokerRole.SLAVE;
public class DefaultMessageStore implements MessageStore { public class DefaultMessageStore implements MessageStore {
...@@ -487,7 +488,7 @@ public class DefaultMessageStore implements MessageStore { ...@@ -487,7 +488,7 @@ public class DefaultMessageStore implements MessageStore {
break; break;
} }
boolean extRet = false; boolean extRet = false, isTagsCodeLegal = true;
if (consumeQueue.isExtAddr(tagsCode)) { if (consumeQueue.isExtAddr(tagsCode)) {
extRet = consumeQueue.getExt(tagsCode, cqExtUnit); extRet = consumeQueue.getExt(tagsCode, cqExtUnit);
if (extRet) { if (extRet) {
...@@ -496,11 +497,12 @@ public class DefaultMessageStore implements MessageStore { ...@@ -496,11 +497,12 @@ public class DefaultMessageStore implements MessageStore {
// can't find ext content.Client will filter messages by tag also. // can't find ext content.Client will filter messages by tag also.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",
tagsCode, offsetPy, sizePy, topic, group); tagsCode, offsetPy, sizePy, topic, group);
isTagsCodeLegal = false;
} }
} }
if (messageFilter != null if (messageFilter != null
&& !messageFilter.isMatchedByConsumeQueue(tagsCode, extRet ? cqExtUnit : null)) { && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
if (getResult.getBufferTotalSize() == 0) { if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE; status = GetMessageStatus.NO_MATCHED_MESSAGE;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册