提交 4649a8ed 编写于 作者: D duhenglucky

Add sql filter support for snode

上级 044222b5
......@@ -49,10 +49,10 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.RequestTask;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......@@ -113,7 +113,7 @@ public class SnodePullMessageProcessor implements RequestProcessor {
}
} catch (Exception e) {
log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),
requestHeader.getConsumerGroup());
requestHeader.getConsumerGroup(), e);
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark(e.getMessage());
return response;
......
......@@ -220,7 +220,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
/**
* Whether update subscription relationship when every pull
*/
private boolean postSubscriptionWhenPull = false;
private boolean postSubscriptionWhenPull = true;
/**
* Whether the unit of subscription group
......
......@@ -230,6 +230,7 @@ public class SnodeController {
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
......
......@@ -78,6 +78,7 @@ public class SnodeConfig {
private int listenPort = 11911;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean enablePropertyFilter = true;
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
this.snodeHeartBeatInterval = snodeHeartBeatInterval;
......@@ -291,4 +292,12 @@ public class SnodeConfig {
public void setSlowConsumerThreshold(int slowConsumerThreshold) {
this.slowConsumerThreshold = slowConsumerThreshold;
}
public boolean isEnablePropertyFilter() {
return enablePropertyFilter;
}
public void setEnablePropertyFilter(boolean enablePropertyFilter) {
this.enablePropertyFilter = enablePropertyFilter;
}
}
......@@ -21,14 +21,18 @@ import io.netty.util.Attribute;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.filter.ExpressionType;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.common.protocol.header.UnregisterClientResponseHeader;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.heartbeat.ProducerData;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.filter.FilterFactory;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
......@@ -57,6 +61,8 @@ public class HeartbeatProcessor implements RequestProcessor {
return register(remotingChannel, request);
case RequestCode.UNREGISTER_CLIENT:
return unregister(remotingChannel, request);
case RequestCode.CHECK_CLIENT_CONFIG:
return this.checkClientConfig(remotingChannel, request);
default:
break;
}
......@@ -131,6 +137,43 @@ public class HeartbeatProcessor implements RequestProcessor {
return response;
}
public RemotingCommand checkClientConfig(RemotingChannel ctx, RemotingCommand request) {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
CheckClientRequestBody requestBody = CheckClientRequestBody.decode(request.getBody(),
CheckClientRequestBody.class);
if (requestBody != null && requestBody.getSubscriptionData() != null) {
SubscriptionData subscriptionData = requestBody.getSubscriptionData();
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
if (!this.snodeController.getSnodeConfig().isEnablePropertyFilter()) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("The snode does not support consumer to filter message by " + subscriptionData.getExpressionType());
return response;
}
try {
FilterFactory.INSTANCE.get(subscriptionData.getExpressionType()).compile(subscriptionData.getSubString());
} catch (Exception e) {
log.warn("Client {}@{} filter message, but failed to compile expression! sub={}, error={}",
requestBody.getClientId(), requestBody.getGroup(), requestBody.getSubscriptionData(), e.getMessage());
response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);
response.setRemark(e.getMessage());
return response;
}
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
@Override
public boolean rejectRequest() {
return false;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册