提交 82f53573 编写于 作者: D duhenglucky

Modify unit test

上级 94dc4427
......@@ -44,6 +44,7 @@ public class PlainAccessValidatorTest {
@Before
public void init() {
System.setProperty("rocketmq.home.dir", "src/test/resources");
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl.yml");
plainAccessValidator = new PlainAccessValidator();
sessionCredentials = new SessionCredentials();
sessionCredentials.setAccessKey("RocketMQ");
......
......@@ -28,7 +28,6 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.acl.common.AclException;
import org.apache.rocketmq.acl.common.Permission;
import org.apache.rocketmq.acl.plain.PlainPermissionLoader.PlainAccessConfig;
import org.apache.rocketmq.common.UtilAll;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
......@@ -269,7 +268,6 @@ public class PlainPermissionLoaderTest {
public void initializeTest() {
System.setProperty("rocketmq.acl.plain.file", "/conf/plain_acl_null.yml");
new PlainPermissionLoader();
}
}
snode:
countLimit: # flow control type, only requestCount & requestSize support
countLimit: # Flow control type, only requestCount & requestSize support
- flowControlResourceName: 310
flowControlGrade: 1
flowControlBehavior: 1
......
......@@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
......@@ -34,12 +33,10 @@ public interface EnodeService {
CompletableFuture<RemotingCommand> pullMessage(final String enodeName, final RemotingCommand request);
void notifyConsumerIdsChanged(final RemotingChannel channel, final String consumerGroup);
RemotingCommand creatTopic(String enodeName,
TopicConfig topicConfig) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
void updateEnodeAddress(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException;
boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig);
......
......@@ -19,7 +19,15 @@ package org.apache.rocketmq.snode.service;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface PushService {
/**
* TODO how to resolve the slow consumer: close or ignore?
*
* @param enodeName
* @param topic
* @param queueId
* @param message
* @param response
*/
void pushMessage(final String enodeName, final String topic, final Integer queueId, final byte[] message,
final RemotingCommand response);
......
......@@ -31,14 +31,12 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
......@@ -121,27 +119,6 @@ public class EnodeServiceImpl implements EnodeService {
return future;
}
@Override
public void notifyConsumerIdsChanged(
final RemotingChannel channel,
final String consumerGroup) {
if (null == consumerGroup) {
log.error("NotifyConsumerIdsChanged consumerGroup is null");
return;
}
NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
requestHeader.setConsumerGroup(consumerGroup);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader);
try {
this.snodeController.getSnodeServer().invokeOneway(channel, request, SnodeConstant.ONE_WAY_TIMEOUT);
} catch (Exception e) {
log.error("NotifyConsumerIdsChanged consumer group: {} exception ", consumerGroup, e);
}
}
private ClusterInfo getBrokerClusterInfo(
final long timeoutMillis) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
......@@ -158,7 +135,7 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public void updateEnodeAddr(String clusterName) throws InterruptedException, RemotingTimeoutException,
public void updateEnodeAddress(String clusterName) throws InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
synchronized (this) {
ClusterInfo clusterInfo = getBrokerClusterInfo(SnodeConstant.DEFAULT_TIMEOUT_MILLS);
......
......@@ -104,8 +104,8 @@ public class PushServiceImpl implements PushService {
if (subscription.getSubscriptionData(topic) != null) {
boolean slowConsumer = snodeController.getSlowConsumerService().isSlowConsumer(sendMessageResponseHeader.getQueueOffset(), topic, queueId, consumerGroup, enodeName);
if (slowConsumer) {
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);//TODO metrics
remotingChannel.close(); //FIXME this action should be discussed
log.warn("[SlowConsumer]: {} closed as slow consumer", remotingChannel);
remotingChannel.close();
continue;
}
snodeController.getSnodeServer().push(remotingChannel, pushMessage, SnodeConstant.DEFAULT_TIMEOUT_MILLS);
......
......@@ -96,7 +96,7 @@ public class ScheduledServiceImpl implements ScheduledService {
@Override
public void run() {
try {
snodeController.getEnodeService().updateEnodeAddr(snodeConfig.getClusterName());
snodeController.getEnodeService().updateEnodeAddress(snodeConfig.getClusterName());
} catch (Exception ex) {
log.warn("Update broker addr error:{}", ex);
}
......
......@@ -27,8 +27,10 @@ public class SnodeControllerTest {
@Test
public void testSnodeRestart() {
ServerConfig serverConfig = new ServerConfig();
serverConfig.setListenPort(10912);
SnodeController snodeController = new SnodeController(
new ServerConfig(),
serverConfig,
new ClientConfig(),
new SnodeConfig());
assertThat(snodeController.initialize());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册