提交 149fc42b 编写于 作者: D duhenglucky

Add unit test

上级 2fb8b5f2
......@@ -188,6 +188,7 @@ public class DefaultMQProducerTest {
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
}
@Test
public void testSendMessageAsync() throws RemotingException, MQClientException, InterruptedException {
final AtomicInteger cc = new AtomicInteger(0);
......@@ -218,12 +219,12 @@ public class DefaultMQProducerTest {
Message message = new Message();
message.setTopic("test");
message.setBody("hello world".getBytes());
producer.send(new Message(),sendCallback);
producer.send(message,sendCallback,1000);
producer.send(message,new MessageQueue(),sendCallback);
producer.send(new Message(),new MessageQueue(),sendCallback,1000);
producer.send(new Message(),messageQueueSelector,null,sendCallback);
producer.send(message,messageQueueSelector,null,sendCallback,1000);
producer.send(new Message(), sendCallback);
producer.send(message, sendCallback, 1000);
producer.send(message, new MessageQueue(), sendCallback);
producer.send(new Message(), new MessageQueue(), sendCallback, 1000);
producer.send(new Message(), messageQueueSelector, null, sendCallback);
producer.send(message, messageQueueSelector, null, sendCallback, 1000);
countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
callbackExecutor.shutdown();
......
......@@ -54,7 +54,7 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
@CFNullable
private boolean m; //batch
private String n; //enode addr
private String n; //enode name
public static SendMessageRequestHeader createSendMessageRequestHeaderV1(final SendMessageRequestHeaderV2 v2) {
SendMessageRequestHeader v1 = new SendMessageRequestHeader();
......@@ -209,4 +209,23 @@ public class SendMessageRequestHeaderV2 implements CommandCustomHeader {
public void setN(String n) {
this.n = n;
}
@Override public String toString() {
return "SendMessageRequestHeaderV2{" +
"a='" + a + '\'' +
", b='" + b + '\'' +
", c='" + c + '\'' +
", d=" + d +
", e=" + e +
", f=" + f +
", g=" + g +
", h=" + h +
", i='" + i + '\'' +
", j=" + j +
", k=" + k +
", l=" + l +
", m=" + m +
", n='" + n + '\'' +
'}';
}
}
\ No newline at end of file
......@@ -16,7 +16,8 @@
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.apache</groupId>
......@@ -158,7 +159,7 @@
</executions>
<configuration>
<rules>
<banCircularDependencies />
<banCircularDependencies/>
</rules>
<fail>true</fail>
</configuration>
......@@ -575,7 +576,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>19.0</version>
<version>27.0.1-jre</version>
</dependency>
<dependency>
<groupId>io.openmessaging</groupId>
......
......@@ -275,4 +275,16 @@ public class SnodeController {
public PushService getPushService() {
return pushService;
}
public void setNnodeService(NnodeService nnodeService) {
this.nnodeService = nnodeService;
}
public void setRemotingClient(RemotingClient remotingClient) {
this.remotingClient = remotingClient;
}
public void setEnodeService(EnodeService enodeService) {
this.enodeService = enodeService;
}
}
......@@ -35,7 +35,7 @@ import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class SendMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
private final SnodeController snodeController;
private SnodeController snodeController;
public SendMessageProcessor(final SnodeController snodeController) {
this.snodeController = snodeController;
......@@ -53,6 +53,7 @@ public class SendMessageProcessor implements RequestProcessor {
boolean isSendBack = false;
if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2);
enodeName = sendMessageRequestHeaderV2.getN();
} else {
isSendBack = true;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.SmoothRateLimiter;
public class FlowControlService {
RateLimiter rateLimiter = SmoothRateLimiter.create()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class SnodeControllerTest {
@Test
public void testSnodeRestart() {
SnodeController snodeController = new SnodeController(
new ServerConfig(),
new ClientConfig(),
new SnodeConfig());
assertThat(snodeController.initialize());
snodeController.start();
snodeController.shutdown();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class SnodeTestBase {
public SendMessageRequestHeaderV2 createSendMsgRequestHeader(String group, String topic) {
SendMessageRequestHeaderV2 requestHeader = new SendMessageRequestHeaderV2();
requestHeader.setA(group);
requestHeader.setB(topic);
requestHeader.setC(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setD(3);
requestHeader.setE(1);
requestHeader.setF(0);
requestHeader.setG(System.currentTimeMillis());
requestHeader.setH(124);
return requestHeader;
}
public RemotingCommand createSendMesssageCommand(String group, String topic) {
SendMessageRequestHeaderV2 requestHeader = createSendMsgRequestHeader(group, topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeader);
request.setBody(new byte[] {'a'});
return request;
}
public RemotingCommand createSuccessResponse() {
RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(1234);
SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
responseHeader.setMsgId("123");
responseHeader.setQueueId(1);
responseHeader.setQueueOffset(123L);
response.addExtField(MessageConst.PROPERTY_MSG_REGION, "RegionHZ");
response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, "true");
response.addExtField("queueId", String.valueOf(responseHeader.getQueueId()));
response.addExtField("msgId", responseHeader.getMsgId());
response.addExtField("queueOffset", String.valueOf(responseHeader.getQueueOffset()));
return response;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.CodecHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.NnodeService;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class SendMessageProcessorTest {
private SendMessageProcessor sendMessageProcessor;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
@Mock
private RemotingChannel remotingChannel;
private String topic = "SnodeTopic";
private String group = "SnodeGroup";
private String enodeName = "enodeName";
@Mock
private EnodeService enodeService;
@Mock
private NnodeService nnodeService;
@Before
public void init() {
snodeController.setNnodeService(nnodeService);
snodeController.setEnodeService(enodeService);
sendMessageProcessor = new SendMessageProcessor(snodeController);
}
@Test
public void testProcessRequest() throws RemotingCommandException {
snodeController.setEnodeService(enodeService);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageCommand();
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2);
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
private SendMessageRequestHeaderV2 createSendMsgRequestHeader() {
SendMessageRequestHeaderV2 requestHeader = new SendMessageRequestHeaderV2();
requestHeader.setA(group);
requestHeader.setB(topic);
requestHeader.setC(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
requestHeader.setD(3);
requestHeader.setE(1);
requestHeader.setF(0);
requestHeader.setG(System.currentTimeMillis());
requestHeader.setH(124);
requestHeader.setN("enodeName");
return requestHeader;
}
private RemotingCommand createSendMesssageCommand() {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, sendMessageRequestHeaderV2);
request.setBody(new byte[] {'a'});
CodecHelper.makeCustomHeaderToNet(request);
return request;
}
private void assertSendMessageResult(int responseCode) throws RemotingCommandException {
}
RemotingCommand createSendMessageResponse(int responseCode) {
return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
}
CompletableFuture<RemotingCommand> createResponseCompletableFuture(int responseCode) {
CompletableFuture<RemotingCommand> completableFuture = new CompletableFuture<>();
return completableFuture;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.SnodeTestBase;
import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class EnodeServiceImplTest extends SnodeTestBase {
private EnodeService enodeService;
@Spy
private SnodeController snodeController = new SnodeController(new ServerConfig(), new ClientConfig(), new SnodeConfig());
@Mock
private NnodeService nnodeService;
@Mock
private NettyRemotingClient remotingClient;
private String enodeName = "enodeName";
private String topic = "SnodeTopic";
private String group = "SnodeGroup";
@Before
public void init() {
enodeService = new EnodeServiceImpl(snodeController);
}
@Test
public void sendMessageTest() throws Exception {
snodeController.setNnodeService(nnodeService);
snodeController.setRemotingClient(remotingClient);
when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("1024");
CompletableFuture<RemotingCommand> responseCF = new CompletableFuture<>();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse());
callback.operationComplete(responseFuture);
responseCF.complete(createSuccessResponse());
return null;
}
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
RemotingCommand response = enodeService.sendMessage(enodeName, createSendMesssageCommand(group, topic)).get(3000L, TimeUnit.MILLISECONDS);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
@Test
public void pullMessageTest() throws Exception {
snodeController.setNnodeService(nnodeService);
snodeController.setRemotingClient(remotingClient);
when(snodeController.getNnodeService().getAddressByEnodeName(anyString(), anyBoolean())).thenReturn("1024");
CompletableFuture<RemotingCommand> responseCF = new CompletableFuture<>();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
RemotingCommand remotingCommand = createSuccessResponse();
GetMessageResult getMessageResult = createGetMessageResult();
remotingCommand.encodeHeader(getMessageResult.getBufferTotalSize());
responseFuture.setResponseCommand(remotingCommand);
responseCF.complete(remotingCommand);
callback.operationComplete(responseFuture);
return null;
}
}).when(remotingClient).invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
RemotingCommand response = enodeService.pullMessage(enodeName, createPullMessage()).get(3000L, TimeUnit.MILLISECONDS);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
}
private GetMessageResult createGetMessageResult() {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(GetMessageStatus.FOUND);
getMessageResult.setMinOffset(100);
getMessageResult.setMaxOffset(1024);
getMessageResult.setNextBeginOffset(516);
return getMessageResult;
}
private RemotingCommand createPullMessage() {
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setCommitOffset(123L);
requestHeader.setConsumerGroup(group);
requestHeader.setMaxMsgNums(100);
requestHeader.setQueueId(1);
requestHeader.setQueueOffset(456L);
requestHeader.setSubscription("*");
requestHeader.setTopic(topic);
requestHeader.setSysFlag(0);
requestHeader.setSubVersion(100L);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
return request;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册