diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 4077bf459f7a857398c277c2413dec10eb25e5e6..12189562c49db4757e37c67c340a5199d754f379 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -513,8 +513,8 @@ public class BrokerController { */ this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor); this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); -// this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor); -// this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); + this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor); + this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList); /** * QueryMessageProcessor */ diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java index 74a0176775ebfc8455702a233b206814a88b4bac..ee67e21a84b640bd59e89dcb9296144ceda37005 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SnodePullMessageProcessor.java @@ -49,10 +49,10 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -113,7 +113,7 @@ public class SnodePullMessageProcessor implements RequestProcessor { } } catch (Exception e) { log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), - requestHeader.getConsumerGroup()); + requestHeader.getConsumerGroup(), e); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark(e.getMessage()); return response; diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index d51030a159240cd9b34aa4dfd9ca438b013cf577..92c7c18e35787050a4bb54c7533d6025528fd12d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -220,7 +220,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume /** * Whether update subscription relationship when every pull */ - private boolean postSubscriptionWhenPull = false; + private boolean postSubscriptionWhenPull = true; /** * Whether the unit of subscription group diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java index b772b84fa23edfca613a6f2bf4314a7e7476d2a2..b92b6bedd3ac81aa93022e4d35414bda121fe35e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java @@ -31,7 +31,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; public class RebalancePushImpl extends RebalanceImpl { @@ -55,8 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl { * When rebalance result changed, should update subscription's version to notify broker. * Fix: inconsistency subscription may lead to consumer miss messages. */ - SubscriptionData sub = this.subscriptionInner.get(topic); - PushSubscriptionData subscriptionData = (PushSubscriptionData) sub; + SubscriptionData subscriptionData = this.subscriptionInner.get(topic); long newVersion = System.currentTimeMillis(); log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion); subscriptionData.setSubVersion(newVersion); diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java index 8713a5d507569562b067fae61c50a7d38dff586e..e4267f48a9b08aae934dafcb2d955dd3bec94188 100644 --- a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java @@ -74,6 +74,7 @@ public abstract class AbstractFlowControlService implements Interceptor { Boolean acquired = this.acquiredThreadLocal.get(); if (acquired != null && acquired) { SphO.exit(); + this.acquiredThreadLocal.remove(); } } @@ -82,6 +83,7 @@ public abstract class AbstractFlowControlService implements Interceptor { Boolean acquired = this.acquiredThreadLocal.get(); if (acquired != null && acquired) { SphO.exit(); + this.acquiredThreadLocal.remove(); } } diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/PushSubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/PushSubscriptionData.java deleted file mode 100644 index 02245d2259308120d82cb32f092989e1e063769a..0000000000000000000000000000000000000000 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/PushSubscriptionData.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.common.protocol.heartbeat; - -import java.util.HashSet; -import java.util.Set; -import org.apache.rocketmq.common.message.MessageQueue; - -public class PushSubscriptionData extends SubscriptionData { - private Set messageQueueSet = new HashSet(); - - public Set getMessageQueueSet() { - return messageQueueSet; - } - - public void setMessageQueueSet(Set messageQueueSet) { - this.messageQueueSet = messageQueueSet; - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java index 4b0a303975e8b97bf13fe36daea8f9ebd41a3504..be88ff3bf0e4b5cb0b8bf758ab44a6d01e32c142 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java @@ -25,12 +25,14 @@ import org.apache.rocketmq.common.filter.ExpressionType; import java.util.HashSet; import java.util.Set; +import org.apache.rocketmq.common.message.MessageQueue; public class SubscriptionData implements Comparable { public final static String SUB_ALL = "*"; private boolean classFilterMode = false; private String topic; private String subString; + private Set messageQueueSet = new HashSet(); private Set tagsSet = new HashSet(); private Set codeSet = new HashSet(); private long subVersion = System.currentTimeMillis(); @@ -113,6 +115,14 @@ public class SubscriptionData implements Comparable { this.expressionType = expressionType; } + public Set getMessageQueueSet() { + return messageQueueSet; + } + + public void setMessageQueueSet(Set messageQueueSet) { + this.messageQueueSet = messageQueueSet; + } + @Override public int hashCode() { final int prime = 31; diff --git a/remoting/src/test/resources/service/org.apache.rocketmq.remoting.RemotingClient b/remoting/src/test/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient similarity index 100% rename from remoting/src/test/resources/service/org.apache.rocketmq.remoting.RemotingClient rename to remoting/src/test/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingClient diff --git a/remoting/src/test/resources/service/org.apache.rocketmq.remoting.RemotingServer b/remoting/src/test/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer similarity index 100% rename from remoting/src/test/resources/service/org.apache.rocketmq.remoting.RemotingServer rename to remoting/src/test/resources/META-INF/service/org.apache.rocketmq.remoting.RemotingServer diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 33f9bbdbeda18aa57fb3c4c555e9f41806140893..d3a7c0c9353c79ae58582702d994a0b151eda7e3 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -275,6 +275,7 @@ public class SnodeController { this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java index 42ceacf254848dd67516c95131fb0a1bfbce7d47..26112efc6cf78e04ca76bc5571357edbd6f3bd8f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/ClientHousekeepingService.java @@ -54,7 +54,10 @@ public class ClientHousekeepingService implements ChannelEventListener { Channel channel = ((NettyChannelImpl) remotingChannel).getChannel(); Attribute clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY); if (clientAttribute != null) { - return clientAttribute.get().getClientRole(); + Client client = clientAttribute.get(); + if (client != null) { + return client.getClientRole(); + } } } log.warn("RemotingChannel type error: {}", remotingChannel.getClass()); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java index 729f282be22c6f1ad8a2e2f009d4c4856a06a2a6..72cfd6afacc5a701ffb9290de62fca56e24146fb 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java @@ -26,7 +26,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData; import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -47,8 +46,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { String groupId) { Set prevSubSet = this.clientSubscriptionTable.get(remotingChannel); Set keySet = new HashSet<>(); - for (SubscriptionData tmp : subscriptionDataSet) { - PushSubscriptionData subscriptionData = (PushSubscriptionData) tmp; + for (SubscriptionData subscriptionData : subscriptionDataSet) { if (subscriptionData.getTopic() != null && subscriptionData.getMessageQueueSet() != null && remotingChannel != null) { for (MessageQueue messageQueue : subscriptionData.getMessageQueueSet()) { keySet.add(messageQueue); @@ -58,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager { Set prev = pushTable.putIfAbsent(messageQueue, clientSet); clientSet = prev != null ? prev : clientSet; } - log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); + log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress()); clientSet.add(remotingChannel); } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java index 408b53093497298d425c8fe832d0b1b340281120..0d41501f7dc7eae745288bfdb439d350cd8a1eb0 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java @@ -78,6 +78,7 @@ public class SnodeConfig { private int listenPort = 11911; private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); + private boolean enablePropertyFilter = true; /** * Acl feature switch @@ -298,6 +299,14 @@ public class SnodeConfig { this.slowConsumerThreshold = slowConsumerThreshold; } + public boolean isEnablePropertyFilter() { + return enablePropertyFilter; + } + + public void setEnablePropertyFilter(boolean enablePropertyFilter) { + this.enablePropertyFilter = enablePropertyFilter; + } + public boolean isAclEnable() { return aclEnable; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java index 435d165af75454d8db851d6a4f65f903e9ec6edf..a36704c2651b58b8d732a922e8e9f8d369ea366f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/HeartbeatProcessor.java @@ -21,14 +21,18 @@ import io.netty.util.Attribute; import java.util.HashSet; import java.util.Set; import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.filter.ExpressionType; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody; import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.heartbeat.ProducerData; +import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.filter.FilterFactory; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; @@ -57,6 +61,8 @@ public class HeartbeatProcessor implements RequestProcessor { return register(remotingChannel, request); case RequestCode.UNREGISTER_CLIENT: return unregister(remotingChannel, request); + case RequestCode.CHECK_CLIENT_CONFIG: + return this.checkClientConfig(remotingChannel, request); default: break; } @@ -74,14 +80,12 @@ public class HeartbeatProcessor implements RequestProcessor { Client client = new Client(); client.setClientId(heartbeatData.getClientID()); client.setRemotingChannel(remotingChannel); - Set groupSet = new HashSet<>(); for (ProducerData producerData : heartbeatData.getProducerDataSet()) { client.setClientRole(ClientRole.Producer); - groupSet.add(producerData.getGroupName()); this.snodeController.getProducerManager().register(producerData.getGroupName(), client); } - log.info("Heartbeat consumerData: {}", heartbeatData.getConsumerDataSet()); + Set groupSet = new HashSet<>(); for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) { client.setClientRole(ClientRole.Consumer); groupSet.add(consumerData.getGroupName()); @@ -133,6 +137,43 @@ public class HeartbeatProcessor implements RequestProcessor { return response; } + public RemotingCommand checkClientConfig(RemotingChannel ctx, RemotingCommand request) { + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + + CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(), + CheckClientRequestBody.class); + + if (requestBody != null && requestBody.getSubscriptionData() != null) { + SubscriptionData subscriptionData = requestBody.getSubscriptionData(); + + if (ExpressionType.isTagType(subscriptionData.getExpressionType())) { + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + + if (!this.snodeController.getSnodeConfig().isEnablePropertyFilter()) { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark("The snode does not support consumer to filter message by " + subscriptionData.getExpressionType()); + return response; + } + + try { + FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString()); + } catch (Exception e) { + log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}", + requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage()); + response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); + response.setRemark(e.getMessage()); + return response; + } + } + + response.setCode(ResponseCode.SUCCESS); + response.setRemark(null); + return response; + } + @Override public boolean rejectRequest() { return false; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java index 9883dd0e2389e3bcb7e69905736af5bd02066868..da93a4d118c10bf3c9e8906b5952ac20b81e86a6 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java @@ -105,7 +105,7 @@ public class PushServiceImpl implements PushService { boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName); if (slowConsumer) { log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics - remotingChannel.close();//FIXME this action should be discussed + remotingChannel.close(); //FIXME this action should be discussed continue; } snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);