提交 6fd801de 编写于 作者: 小傅哥's avatar 小傅哥

feat: 模拟RPC请求

上级 0dcdb8eb
...@@ -72,6 +72,22 @@ ...@@ -72,6 +72,22 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- Protostuff -->
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>com.dyuproject.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
<version>1.0.10</version>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>2.4</version>
</dependency>
<dependency> <dependency>
<groupId>cn.bugstack</groupId> <groupId>cn.bugstack</groupId>
......
package cn.bugstack.dev.tech.dubbo.consumer.config;
import cn.bugstack.dev.tech.dubbo.api.IUserService;
import cn.bugstack.dev.tech.dubbo.api.dto.UserResDTO;
import cn.bugstack.dev.tech.dubbo.api.types.Constants;
import cn.bugstack.dev.tech.dubbo.api.types.Response;
import com.alibaba.fastjson2.JSON;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.stereotype.Component;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
@Slf4j
@Component("rpcProxyBeanFactory")
public class RPCProxyBeanFactory implements FactoryBean<IUserService>, Runnable {
private Channel channel;
// 缓存数据,实际RPC会对每次的调用生成一个ID来标记获取
private Object responseCache;
public RPCProxyBeanFactory() throws InterruptedException {
new Thread(this).start();
while (null == channel) {
Thread.sleep(150);
log.info("Rpc Socket 链接等待...");
}
}
@Override
public IUserService getObject() throws Exception {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<?>[] classes = {IUserService.class};
InvocationHandler handler = new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(this, args);
}
Map<String, Object> request = new HashMap<>();
request.put("clazz", IUserService.class);
request.put("methodName", method.getName());
request.put("paramTypes", method.getParameterTypes());
request.put("args", args);
channel.writeAndFlush(request);
// 模拟超时等待,一般RPC接口请求,都有一个超时等待时长。
Thread.sleep(350);
return responseCache;
}
};
return (IUserService) Proxy.newProxyInstance(classLoader, classes, handler);
}
@Override
public Class<?> getObjectType() {
return IUserService.class;
}
@Override
public void run() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ObjectEncoder());
channel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
channel.pipeline().addLast(new SimpleChannelInboundHandler<Map<String, Object>>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Map<String, Object> data) throws Exception {
responseCache = data.get("data");
}
});
}
});
ChannelFuture channelFuture = b.connect("127.0.0.1", 22881).syncUninterruptibly();
this.channel = channelFuture.channel();
channelFuture.channel().closeFuture().syncUninterruptibly();
} finally {
workerGroup.shutdownGracefully();
}
}
}
...@@ -5,6 +5,15 @@ import cn.bugstack.dev.tech.dubbo.api.dto.UserReqDTO; ...@@ -5,6 +5,15 @@ import cn.bugstack.dev.tech.dubbo.api.dto.UserReqDTO;
import cn.bugstack.dev.tech.dubbo.api.dto.UserResDTO; import cn.bugstack.dev.tech.dubbo.api.dto.UserResDTO;
import cn.bugstack.dev.tech.dubbo.api.types.Response; import cn.bugstack.dev.tech.dubbo.api.types.Response;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import jdk.nashorn.internal.ir.annotations.Reference;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.config.annotation.DubboReference; import org.apache.dubbo.config.annotation.DubboReference;
import org.junit.Test; import org.junit.Test;
...@@ -12,6 +21,8 @@ import org.junit.runner.RunWith; ...@@ -12,6 +21,8 @@ import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@Slf4j @Slf4j
@RunWith(SpringRunner.class) @RunWith(SpringRunner.class)
@SpringBootTest @SpringBootTest
...@@ -21,6 +32,9 @@ public class ApiTest { ...@@ -21,6 +32,9 @@ public class ApiTest {
@DubboReference(interfaceClass = IUserService.class, version = "1.0.0") @DubboReference(interfaceClass = IUserService.class, version = "1.0.0")
private IUserService userService; private IUserService userService;
@Resource(name = "rpcProxyBeanFactory")
private IUserService proxyUserService;
@Test @Test
public void test_userService() { public void test_userService() {
UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build(); UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build();
...@@ -28,4 +42,11 @@ public class ApiTest { ...@@ -28,4 +42,11 @@ public class ApiTest {
log.info("测试结果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO)); log.info("测试结果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));
} }
@Test
public void test_proxyUserService(){
UserReqDTO reqDTO = UserReqDTO.builder().userId("10001").build();
Response<UserResDTO> resDTO = proxyUserService.queryUserInfo(reqDTO);
log.info("测试结果 req: {} res: {}", JSON.toJSONString(reqDTO), JSON.toJSONString(resDTO));
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册