提交 bc1c37aa 编写于 作者: H huzongtang

Integrate Acl for snode branch

......@@ -513,8 +513,8 @@ public class BrokerController {
*/
this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
// this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor);
// this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, this.snodePullMessageProcessor, pullMessageExecutor);
this.snodePullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
* QueryMessageProcessor
*/
......
......@@ -49,10 +49,10 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -113,7 +113,7 @@ public class SnodePullMessageProcessor implements RequestProcessor {
}
} catch (Exception e) {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
requestHeader.getConsumerGroup(), e);
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark(e.getMessage());
return response;
......
......@@ -220,7 +220,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Whether update subscription relationship when every pull
*/
private boolean postSubscriptionWhenPull = false;
private boolean postSubscriptionWhenPull = true;
/**
* Whether the unit of subscription group
......
......@@ -31,7 +31,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
public class RebalancePushImpl extends RebalanceImpl {
......@@ -55,8 +54,7 @@ public class RebalancePushImpl extends RebalanceImpl {
* When rebalance result changed, should update subscription's version to notify broker.
* Fix: inconsistency subscription may lead to consumer miss messages.
*/
SubscriptionData sub = this.subscriptionInner.get(topic);
PushSubscriptionData subscriptionData = (PushSubscriptionData) sub;
SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
......
......@@ -74,6 +74,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired) {
SphO.exit();
this.acquiredThreadLocal.remove();
}
}
......@@ -82,6 +83,7 @@ public abstract class AbstractFlowControlService implements Interceptor {
Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired) {
SphO.exit();
this.acquiredThreadLocal.remove();
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.protocol.heartbeat;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
public class PushSubscriptionData extends SubscriptionData {
private Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
public Set<MessageQueue> getMessageQueueSet() {
return messageQueueSet;
}
public void setMessageQueueSet(Set<MessageQueue> messageQueueSet) {
this.messageQueueSet = messageQueueSet;
}
}
......@@ -25,12 +25,14 @@ import org.apache.rocketmq.common.filter.ExpressionType;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageQueue;
public class SubscriptionData implements Comparable<SubscriptionData> {
public final static String SUB_ALL = "*";
private boolean classFilterMode = false;
private String topic;
private String subString;
private Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
private Set<String> tagsSet = new HashSet<String>();
private Set<Integer> codeSet = new HashSet<Integer>();
private long subVersion = System.currentTimeMillis();
......@@ -113,6 +115,14 @@ public class SubscriptionData implements Comparable<SubscriptionData> {
this.expressionType = expressionType;
}
public Set<MessageQueue> getMessageQueueSet() {
return messageQueueSet;
}
public void setMessageQueueSet(Set<MessageQueue> messageQueueSet) {
this.messageQueueSet = messageQueueSet;
}
@Override
public int hashCode() {
final int prime = 31;
......
......@@ -275,6 +275,7 @@ public class SnodeController {
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
......
......@@ -54,7 +54,10 @@ public class ClientHousekeepingService implements ChannelEventListener {
Channel channel = ((NettyChannelImpl) remotingChannel).getChannel();
Attribute<Client> clientAttribute = channel.attr(SnodeConstant.NETTY_CLIENT_ATTRIBUTE_KEY);
if (clientAttribute != null) {
return clientAttribute.get().getClientRole();
Client client = clientAttribute.get();
if (client != null) {
return client.getClientRole();
}
}
}
log.warn("RemotingChannel type error: {}", remotingChannel.getClass());
......
......@@ -26,7 +26,6 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.PushSubscriptionData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -47,8 +46,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
String groupId) {
Set<MessageQueue> prevSubSet = this.clientSubscriptionTable.get(remotingChannel);
Set<MessageQueue> keySet = new HashSet<>();
for (SubscriptionData tmp : subscriptionDataSet) {
PushSubscriptionData subscriptionData = (PushSubscriptionData) tmp;
for (SubscriptionData subscriptionData : subscriptionDataSet) {
if (subscriptionData.getTopic() != null && subscriptionData.getMessageQueueSet() != null && remotingChannel != null) {
for (MessageQueue messageQueue : subscriptionData.getMessageQueueSet()) {
keySet.add(messageQueue);
......@@ -58,7 +56,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
Set<RemotingChannel> prev = pushTable.putIfAbsent(messageQueue, clientSet);
clientSet = prev != null ? prev : clientSet;
}
log.info("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
log.debug("Register push session message queue: {}, group: {} remoting: {}", messageQueue, groupId, remotingChannel.remoteAddress());
clientSet.add(remotingChannel);
}
}
......
......@@ -78,6 +78,7 @@ public class SnodeConfig {
private int listenPort = 11911;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean enablePropertyFilter = true;
/**
* Acl feature switch
......@@ -298,6 +299,14 @@ public class SnodeConfig {
this.slowConsumerThreshold = slowConsumerThreshold;
}
public boolean isEnablePropertyFilter() {
return enablePropertyFilter;
}
public void setEnablePropertyFilter(boolean enablePropertyFilter) {
this.enablePropertyFilter = enablePropertyFilter;
}
public boolean isAclEnable() {
return aclEnable;
}
......
......@@ -21,14 +21,18 @@ import io.netty.util.Attribute;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -57,6 +61,8 @@ public class HeartbeatProcessor implements RequestProcessor {
return register(remotingChannel, request);
case RequestCode.UNREGISTER_CLIENT:
return unregister(remotingChannel, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(remotingChannel, request);
default:
break;
}
......@@ -74,14 +80,12 @@ public class HeartbeatProcessor implements RequestProcessor {
Client client = new Client();
client.setClientId(heartbeatData.getClientID());
client.setRemotingChannel(remotingChannel);
Set<String> groupSet = new HashSet<>();
for (ProducerData producerData : heartbeatData.getProducerDataSet()) {
client.setClientRole(ClientRole.Producer);
groupSet.add(producerData.getGroupName());
this.snodeController.getProducerManager().register(producerData.getGroupName(), client);
}
log.info("Heartbeat consumerData: {}", heartbeatData.getConsumerDataSet());
Set<String> groupSet = new HashSet<>();
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
client.setClientRole(ClientRole.Consumer);
groupSet.add(consumerData.getGroupName());
......@@ -133,6 +137,43 @@ public class HeartbeatProcessor implements RequestProcessor {
return response;
}
public RemotingCommand checkClientConfig(RemotingChannel ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
CheckClientRequestBody.class);
if (requestBody != null && requestBody.getSubscriptionData() != null) {
SubscriptionData subscriptionData = requestBody.getSubscriptionData();
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
if (!this.snodeController.getSnodeConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The snode does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
try {
FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
} catch (Exception e) {
log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark(e.getMessage());
return response;
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
@Override
public boolean rejectRequest() {
return false;
......
......@@ -105,7 +105,7 @@ public class PushServiceImpl implements PushService {
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName);
if (slowConsumer) {
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics
remotingChannel.close();//FIXME this action should be discussed
remotingChannel.close(); //FIXME this action should be discussed
continue;
}
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册