From 92778f13c213029a0204c7079b815dd99f9affb4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A7=A6=E8=8B=B1=E6=9D=B0?= <327782001@qq.com> Date: Fri, 21 Apr 2023 19:28:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E6=95=B4=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../nio_07_optimization/NioClient.java | 32 +++++++ .../nio_07_optimization/NioServer.java | 57 +++++++++++++ .../nio_07_optimization/Worker.java | 83 +++++++++++++++++++ .../nio_07_optimization/Worker2.java | 65 +++++++++++++++ .../nio_08_aio/AioFileChannel.java | 40 +++++++++ .../heima/netty_04_netty/HelloClient.java | 34 ++++++++ .../heima/netty_04_netty/HelloServer.java | 37 +++++++++ 7 files changed, 348 insertions(+) create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioClient.java create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioServer.java create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker.java create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker2.java create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_08_aio/AioFileChannel.java create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloClient.java create mode 100644 src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloServer.java diff --git a/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioClient.java b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioClient.java new file mode 100644 index 0000000..bcab468 --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioClient.java @@ -0,0 +1,32 @@ +package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + + +/** + * NioClient 客户端 + *

+ * 发消息:sc.write(Charset.defaultCharset().encode("hi!")) + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/4/18 18:22 + */ +public class NioClient { + public static void main(String[] args) throws IOException { + //创建SocketChannel + SocketChannel sc = SocketChannel.open(); + //建立和服务端的连接 + sc.connect(new InetSocketAddress("localhost", 8080)); + int count = 0; + while (true) { + final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024); + count += sc.read(byteBuffer); + System.out.println(count); + byteBuffer.clear(); + } + } +} diff --git a/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioServer.java b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioServer.java new file mode 100644 index 0000000..6c50ebb --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/NioServer.java @@ -0,0 +1,57 @@ +package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 多线程的方式进行优化 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/4/18 18:22 + */ +@Slf4j +public class NioServer { + + public static void main(String[] args) throws IOException { + Thread.currentThread().setName("boss"); + ServerSocketChannel ssc = ServerSocketChannel.open(); + ssc.configureBlocking(false); + Selector boss = Selector.open(); + SelectionKey bossKey = ssc.register(boss, 0, null); + bossKey.interestOps(SelectionKey.OP_ACCEPT); + ssc.bind(new InetSocketAddress(8080)); + // 1.创建固定数量的worker 并初始A + Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()]; + for (int i = 0; i < workers.length; i++) { + workers[i] = new Worker("worker-" + i); + } + AtomicInteger index = new AtomicInteger(); + while (true) { + boss.select(); + Iterator iter = boss.selectedKeys().iterator(); + while (iter.hasNext()) { + SelectionKey key = iter.next(); + iter.remove(); + if (key.isAcceptable()) { + SocketChannel sc = ssc.accept(); + sc.configureBlocking(false); + log.info("connected...{}", sc.getRemoteAddress()); + //2.关联selector + log.info("before register.….{}", sc.getRemoteAddress()); + //负载均衡 + workers[index.incrementAndGet() % workers.length].register(sc); + log.info("after register.….(}", sc.getRemoteAddress()); + } + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker.java b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker.java new file mode 100644 index 0000000..52ab6ff --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker.java @@ -0,0 +1,83 @@ +package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization; + +import com.kwan.shuyu.until.ByteBufferUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * 工作线程 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/4/21 15:01 + */ +public class Worker implements Runnable { + private Thread thread; + private Selector selector; + private String name; + private volatile boolean start = false;//还未初始化 + private ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + + public Worker(String name) { + this.name = name; + } + + /** + * 初始化线程,和selector + * + * @param sc + * @throws IOException + */ + public void register(SocketChannel sc) throws IOException { + if (!start) { + selector = Selector.open(); + thread = new Thread(this, name); + thread.start(); + start = true; + } + queue.add(() -> { + try { + sc.register(selector, SelectionKey.OP_READ, null); + } catch (ClosedChannelException e) { + e.printStackTrace(); + } + }); + selector.wakeup(); + } + + @Override + public void run() { + while (true) { + try { + selector.select(); + final Runnable task = queue.poll(); + if (task != null) { + task.run(); + } + final Iterator iterator = selector.selectedKeys().iterator(); + while (iterator.hasNext()) { + final SelectionKey key = iterator.next(); + iterator.remove(); + if (key.isReadable()) { + final ByteBuffer buffer = ByteBuffer.allocate(16); + final SocketChannel channel = (SocketChannel) key.channel(); + channel.read(buffer); + buffer.flip(); + ByteBufferUtil.debugAll(buffer); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} + diff --git a/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker2.java b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker2.java new file mode 100644 index 0000000..b965765 --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_07_optimization/Worker2.java @@ -0,0 +1,65 @@ +package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization; + +import com.kwan.shuyu.until.ByteBufferUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + + +/** + * 使用wakeup,工作线程 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/4/21 15:01 + */ +public class Worker2 implements Runnable { + private Thread thread; + private Selector selector; + private String name; + private volatile boolean start = false;//还未初始化 + + public Worker2(String name) { + this.name = name; + } + + //初始化线程,和selector + public void register(SocketChannel sc) throws IOException { + if (!start) { + selector = Selector.open(); + thread = new Thread(this, name); + thread.start(); + start = true; + } + selector.wakeup(); + sc.register(selector, SelectionKey.OP_READ, null); + } + + @Override + public void run() { + while (true) { + try { + selector.select(); + final Iterator iterator = selector.selectedKeys().iterator(); + while (iterator.hasNext()) { + final SelectionKey key = iterator.next(); + iterator.remove(); + if (key.isReadable()) { + final ByteBuffer buffer = ByteBuffer.allocate(16); + final SocketChannel channel = (SocketChannel) key.channel(); + channel.read(buffer); + buffer.flip(); + ByteBufferUtil.debugAll(buffer); + } + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} + diff --git a/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_08_aio/AioFileChannel.java b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_08_aio/AioFileChannel.java new file mode 100644 index 0000000..f21d645 --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_03_nio/nio_08_aio/AioFileChannel.java @@ -0,0 +1,40 @@ +package com.kwan.shuyu.heima.netty_03_nio.nio_08_aio; + +import com.kwan.shuyu.until.ByteBufferUtil; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.channels.CompletionHandler; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; + +/** + * 异步传输文件 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/4/21 18:37 + */ +@Slf4j +public class AioFileChannel { + public static void main(String[] args) throws IOException { + AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ); + ByteBuffer buffer = ByteBuffer.allocate(16); + channel.read(buffer, 0, buffer, new CompletionHandler() { + @Override + public void completed(Integer result, ByteBuffer attachment) { + log.info("read completed....{}", result); + attachment.flip(); + ByteBufferUtil.debugAll(attachment); + } + + @Override + public void failed(Throwable exc, ByteBuffer attachment) { + exc.printStackTrace(); + } + }); + System.in.read(); + } +} \ No newline at end of file diff --git a/src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloClient.java b/src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloClient.java new file mode 100644 index 0000000..dbb42e0 --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloClient.java @@ -0,0 +1,34 @@ +package com.kwan.shuyu.heima.netty_04_netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringEncoder; + +import java.net.InetSocketAddress; + +public class HelloClient { + public static void main(String[] args) throws InterruptedException { + //1.启动类 + new Bootstrap() + // 2.添加 EventLoop + .group(new NioEventLoopGroup()) + //3.选择客户端channel实现 + .channel(NioSocketChannel.class) + // 4.添加处理器 + .handler(new ChannelInitializer() { + @Override //在连接建立后被调用 + protected void initChannel(NioSocketChannel ch) throws Exception { + ch.pipeline().addLast(new StringEncoder()); + } + }) + //连接到服务器 + .connect(new InetSocketAddress("localhost", 8080)) + .sync() + .channel() + //发送数据 + .writeAndFlush("hello world") + ; + } +} \ No newline at end of file diff --git a/src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloServer.java b/src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloServer.java new file mode 100644 index 0000000..73d992f --- /dev/null +++ b/src/main/java/com/kwan/shuyu/heima/netty_04_netty/HelloServer.java @@ -0,0 +1,37 @@ +package com.kwan.shuyu.heima.netty_04_netty; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringDecoder; + +public class HelloServer { + public static void main(String[] args) { + //启动器,负责组装netty组件 + new ServerBootstrap() + //BossEventLoop,WorkerEventLoop(selector,thread),group组 + .group(new NioEventLoopGroup()) + //选择服务器的ServerSocketChannel实现 + .channel(NioServerSocketChannel.class) + //boss负责处理连接 worker(child)负责处理读写,决定了worker(child)能执行哪些操作(handler) + .childHandler( + //channel代表和客户端进行数据读写的通道Initializer初始化 + new ChannelInitializer() { + @Override + protected void initChannel(NioSocketChannel ch) throws Exception { + //添加具体的Handler + ch.pipeline().addLast(new StringDecoder());//将ByteBuffer转为字符串 + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {//自定义Handler + @Override//读事件 + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + System.out.println(msg); + } + }); + } + }).bind(8080); + } +} \ No newline at end of file -- GitLab