提交 b997c974 编写于 作者: V vongosling

Polish remoting module's UT

上级 63de56c7
...@@ -131,8 +131,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ...@@ -131,8 +131,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override @Override
public void start() { public void start() {
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(// this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
nettyServerConfig.getServerWorkerThreads(), // nettyServerConfig.getServerWorkerThreads(),
new ThreadFactory() { new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0); private AtomicInteger threadIndex = new AtomicInteger(0);
......
...@@ -69,9 +69,6 @@ public class RemotingCommand { ...@@ -69,9 +69,6 @@ public class RemotingCommand {
} }
} }
/**
*/
private int code; private int code;
private LanguageCode language = LanguageCode.JAVA; private LanguageCode language = LanguageCode.JAVA;
private int version = 0; private int version = 0;
...@@ -80,13 +77,9 @@ public class RemotingCommand { ...@@ -80,13 +77,9 @@ public class RemotingCommand {
private String remark; private String remark;
private HashMap<String, String> extFields; private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader; private transient CommandCustomHeader customHeader;
/**
*/
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
/**
*/
private transient byte[] body; private transient byte[] body;
protected RemotingCommand() { protected RemotingCommand() {
...@@ -117,9 +110,6 @@ public class RemotingCommand { ...@@ -117,9 +110,6 @@ public class RemotingCommand {
return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader); return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, "not set any response code", classHeader);
} }
/**
*/
public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) { public static RemotingCommand createResponseCommand(int code, String remark, Class<? extends CommandCustomHeader> classHeader) {
RemotingCommand cmd = new RemotingCommand(); RemotingCommand cmd = new RemotingCommand();
cmd.markResponseType(); cmd.markResponseType();
...@@ -411,9 +401,7 @@ public class RemotingCommand { ...@@ -411,9 +401,7 @@ public class RemotingCommand {
return encodeHeader(this.body != null ? this.body.length : 0); return encodeHeader(this.body != null ? this.body.length : 0);
} }
/**
*/
public ByteBuffer encodeHeader(final int bodyLength) { public ByteBuffer encodeHeader(final int bodyLength) {
// 1> header length size // 1> header length size
int length = 4; int length = 4;
......
...@@ -42,10 +42,8 @@ public class RocketMQSerializable { ...@@ -42,10 +42,8 @@ public class RocketMQSerializable {
extLen = extFieldsBytes.length; extLen = extFieldsBytes.length;
} }
// ################### cal total length
int totalLen = calTotalLen(remarkLen, extLen); int totalLen = calTotalLen(remarkLen, extLen);
// ################### content
ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen);
// int code(~32767) // int code(~32767)
headerBuffer.putShort((short) cmd.getCode()); headerBuffer.putShort((short) cmd.getCode());
...@@ -76,7 +74,6 @@ public class RocketMQSerializable { ...@@ -76,7 +74,6 @@ public class RocketMQSerializable {
} }
public static byte[] mapSerialize(HashMap<String, String> map) { public static byte[] mapSerialize(HashMap<String, String> map) {
// keySize+key+valSize+val
// keySize+key+valSize+val // keySize+key+valSize+val
if (null == map || map.isEmpty()) if (null == map || map.isEmpty())
return null; return null;
...@@ -174,10 +171,10 @@ public class RocketMQSerializable { ...@@ -174,10 +171,10 @@ public class RocketMQSerializable {
HashMap<String, String> map = new HashMap<String, String>(); HashMap<String, String> map = new HashMap<String, String>();
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
short keySize = 0; short keySize;
byte[] keyContent = null; byte[] keyContent;
int valSize = 0; int valSize;
byte[] valContent = null; byte[] valContent;
while (byteBuffer.hasRemaining()) { while (byteBuffer.hasRemaining()) {
keySize = byteBuffer.getShort(); keySize = byteBuffer.getShort();
keyContent = new byte[keySize]; keyContent = new byte[keySize];
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
//
// Remoting protocol V0.1 draft
//
// protocol <length> <header length> <header data> <body data>
// 1 2 3 4
//
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
//
// Remoting protocol V0.1 draft
//
// protocol <length> <header length> <header data> <body data>
// 1 2 3 4
//
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* $Id: MixTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.remoting;
import org.junit.Test;
public class MixTest {
@Test
public void test_extFieldsValue() {
}
}
...@@ -15,12 +15,10 @@ ...@@ -15,12 +15,10 @@
* limitations under the License. * limitations under the License.
*/ */
/**
* $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z vintagewang@apache.org $
*/
package org.apache.rocketmq.remoting; package org.apache.rocketmq.remoting;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import org.apache.rocketmq.remoting.annotation.CFNullable; import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.exception.RemotingCommandException;
...@@ -34,14 +32,16 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer; ...@@ -34,14 +32,16 @@ import org.apache.rocketmq.remoting.netty.NettyRemotingServer;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig;
import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
public class NettyRPCTest { public class RemotingServerTest {
private static RemotingServer remotingServer; private static RemotingServer remotingServer;
private static RemotingClient remotingClient; private static RemotingClient remotingClient;
...@@ -49,12 +49,9 @@ public class NettyRPCTest { ...@@ -49,12 +49,9 @@ public class NettyRPCTest {
NettyServerConfig config = new NettyServerConfig(); NettyServerConfig config = new NettyServerConfig();
RemotingServer remotingServer = new NettyRemotingServer(config); RemotingServer remotingServer = new NettyRemotingServer(config);
remotingServer.registerProcessor(0, new NettyRequestProcessor() { remotingServer.registerProcessor(0, new NettyRequestProcessor() {
private int i = 0;
@Override @Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
System.out.println("processRequest=" + request + " " + (i++)); request.setRemark("Hi " + ctx.channel().remoteAddress());
request.setRemark("hello, I am respponse " + ctx.channel().remoteAddress());
return request; return request;
} }
...@@ -63,7 +60,9 @@ public class NettyRPCTest { ...@@ -63,7 +60,9 @@ public class NettyRPCTest {
return false; return false;
} }
}, Executors.newCachedThreadPool()); }, Executors.newCachedThreadPool());
remotingServer.start(); remotingServer.start();
return remotingServer; return remotingServer;
} }
...@@ -75,7 +74,7 @@ public class NettyRPCTest { ...@@ -75,7 +74,7 @@ public class NettyRPCTest {
} }
@BeforeClass @BeforeClass
public static void initialize() throws InterruptedException { public static void setup() throws InterruptedException {
remotingServer = createRemotingServer(); remotingServer = createRemotingServer();
remotingClient = createRemotingClient(); remotingClient = createRemotingClient();
} }
...@@ -87,94 +86,49 @@ public class NettyRPCTest { ...@@ -87,94 +86,49 @@ public class NettyRPCTest {
} }
@Test @Test
public void test_RPC_Sync() throws InterruptedException, RemotingConnectException, public void testInvokeSync() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException { RemotingSendRequestException, RemotingTimeoutException {
RequestHeader requestHeader = new RequestHeader();
requestHeader.setCount(1);
requestHeader.setMessageTitle("Welcome");
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
assertTrue(response != null);
assertThat(response.getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(response.getExtFields()).hasSize(2);
for (int i = 0; i < 100; i++) {
TestRequestHeader requestHeader = new TestRequestHeader();
requestHeader.setCount(i);
requestHeader.setMessageTitle("HelloMessageTitle");
RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3000);
System.out.println("invoke result = " + response);
assertTrue(response != null);
}
} }
@Test @Test
public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException, public void testInvokeOneway() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
for (int i = 0; i < 100; i++) { RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand request = RemotingCommand.createRequestCommand(0, null); request.setRemark("messi");
request.setRemark(String.valueOf(i)); remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
remotingClient.invokeOneway("localhost:8888", request, 1000 * 3);
}
} }
@Test @Test
public void test_RPC_Async() throws InterruptedException, RemotingConnectException, public void testInvokeAsync() throws InterruptedException, RemotingConnectException,
RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException {
for (int i = 0; i < 100; i++) { final CountDownLatch latch = new CountDownLatch(1);
RemotingCommand request = RemotingCommand.createRequestCommand(0, null); RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
request.setRemark(String.valueOf(i)); request.setRemark("messi");
remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() { remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
System.out.println(responseFuture.getResponseCommand());
}
});
}
}
@Test
public void test_server_call_client() throws InterruptedException, RemotingConnectException,
RemotingSendRequestException, RemotingTimeoutException {
remotingServer.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
try {
return remotingServer.invokeSync(ctx.channel(), request, 1000 * 10);
} catch (InterruptedException | RemotingSendRequestException | RemotingTimeoutException e) {
e.printStackTrace();
}
return null;
}
@Override @Override
public boolean rejectRequest() { public void operationComplete(ResponseFuture responseFuture) {
return false; latch.countDown();
} assertTrue(responseFuture != null);
}, Executors.newCachedThreadPool()); assertThat(responseFuture.getResponseCommand().getLanguage()).isEqualTo(LanguageCode.JAVA);
assertThat(responseFuture.getResponseCommand().getExtFields()).hasSize(2);
remotingClient.registerProcessor(0, new NettyRequestProcessor() {
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) {
System.out.println("client receive server request = " + request);
request.setRemark("client remark");
return request;
} }
});
@Override latch.await();
public boolean rejectRequest() {
return false;
}
}, Executors.newCachedThreadPool());
for (int i = 0; i < 3; i++) {
RemotingCommand request = RemotingCommand.createRequestCommand(0, null);
RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3);
System.out.println("invoke result = " + response);
assertTrue(response != null);
}
} }
} }
class TestRequestHeader implements CommandCustomHeader { class RequestHeader implements CommandCustomHeader {
@CFNullable @CFNullable
private Integer count; private Integer count;
...@@ -202,32 +156,3 @@ class TestRequestHeader implements CommandCustomHeader { ...@@ -202,32 +156,3 @@ class TestRequestHeader implements CommandCustomHeader {
} }
} }
class TestResponseHeader implements CommandCustomHeader {
@CFNullable
private Integer count;
@CFNullable
private String messageTitle;
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
@Override
public void checkFields() throws RemotingCommandException {
}
public String getMessageTitle() {
return messageTitle;
}
public void setMessageTitle(String messageTitle) {
this.messageTitle = messageTitle;
}
}
...@@ -26,9 +26,9 @@ public class RocketMQSerializableTest { ...@@ -26,9 +26,9 @@ public class RocketMQSerializableTest {
public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithoutExtFields() { public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithoutExtFields() {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333"); System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
RemotingCommand cmd = RemotingCommand.createRequestCommand(code, int code = 103;
new SampleCommandCustomHeader()); RemotingCommand cmd = RemotingCommand.createRequestCommand(code, new SampleCommandCustomHeader());
cmd.setSerializeTypeCurrentRPC(SerializeType.ROCKETMQ); cmd.setSerializeTypeCurrentRPC(SerializeType.ROCKETMQ);
byte[] result = RocketMQSerializable.rocketMQProtocolEncode(cmd); byte[] result = RocketMQSerializable.rocketMQProtocolEncode(cmd);
...@@ -57,7 +57,8 @@ public class RocketMQSerializableTest { ...@@ -57,7 +57,8 @@ public class RocketMQSerializableTest {
public void testRocketMQProtocolEncodeAndDecode_WithRemarkWithoutExtFields() { public void testRocketMQProtocolEncodeAndDecode_WithRemarkWithoutExtFields() {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333"); System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
int code = 103;
RemotingCommand cmd = RemotingCommand.createRequestCommand(code, RemotingCommand cmd = RemotingCommand.createRequestCommand(code,
new SampleCommandCustomHeader()); new SampleCommandCustomHeader());
cmd.setSerializeTypeCurrentRPC(SerializeType.ROCKETMQ); cmd.setSerializeTypeCurrentRPC(SerializeType.ROCKETMQ);
...@@ -94,7 +95,8 @@ public class RocketMQSerializableTest { ...@@ -94,7 +95,8 @@ public class RocketMQSerializableTest {
public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithExtFields() { public void testRocketMQProtocolEncodeAndDecode_WithoutRemarkWithExtFields() {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333"); System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, "2333");
int code = 103; //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER //org.apache.rocketmq.common.protocol.RequestCode.REGISTER_BROKER
int code = 103;
RemotingCommand cmd = RemotingCommand.createRequestCommand(code, RemotingCommand cmd = RemotingCommand.createRequestCommand(code,
new SampleCommandCustomHeader()); new SampleCommandCustomHeader());
cmd.setSerializeTypeCurrentRPC(SerializeType.ROCKETMQ); cmd.setSerializeTypeCurrentRPC(SerializeType.ROCKETMQ);
...@@ -129,7 +131,7 @@ public class RocketMQSerializableTest { ...@@ -129,7 +131,7 @@ public class RocketMQSerializableTest {
@Test @Test
public void testIsBlank_NotBlank() { public void testIsBlank_NotBlank() {
assertThat(RocketMQSerializable.isBlank("aeiou")).isFalse(); assertThat(RocketMQSerializable.isBlank("bar")).isFalse();
assertThat(RocketMQSerializable.isBlank(" A ")).isFalse(); assertThat(RocketMQSerializable.isBlank(" A ")).isFalse();
} }
...@@ -146,6 +148,6 @@ public class RocketMQSerializableTest { ...@@ -146,6 +148,6 @@ public class RocketMQSerializableTest {
private int parseToInt(byte[] array, int index) { private int parseToInt(byte[] array, int index) {
return array[index] * 16777216 + array[++index] * 65536 + array[++index] * 256 return array[index] * 16777216 + array[++index] * 65536 + array[++index] * 256
+ array[++index]; + array[++index];
} }
} }
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.subclass;
import org.junit.Test;
public class TestSubClassAuto {
@Test
public void test_sub() {
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册