fix:整理

上级 9034ce8f
package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
/**
* NioClient 客户端
* <p>
* 发消息:sc.write(Charset.defaultCharset().encode("hi!"))
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/18 18:22
*/
public class NioClient {
public static void main(String[] args) throws IOException {
//创建SocketChannel
SocketChannel sc = SocketChannel.open();
//建立和服务端的连接
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(byteBuffer);
System.out.println(count);
byteBuffer.clear();
}
}
}
package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 多线程的方式进行优化
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/18 18:22
*/
@Slf4j
public class NioServer {
public static void main(String[] args) throws IOException {
Thread.currentThread().setName("boss");
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
Selector boss = Selector.open();
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
// 1.创建固定数量的worker 并初始A
Worker[] workers = new Worker[Runtime.getRuntime().availableProcessors()];
for (int i = 0; i < workers.length; i++) {
workers[i] = new Worker("worker-" + i);
}
AtomicInteger index = new AtomicInteger();
while (true) {
boss.select();
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
log.info("connected...{}", sc.getRemoteAddress());
//2.关联selector
log.info("before register.….{}", sc.getRemoteAddress());
//负载均衡
workers[index.incrementAndGet() % workers.length].register(sc);
log.info("after register.….(}", sc.getRemoteAddress());
}
}
}
}
}
\ No newline at end of file
package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization;
import com.kwan.shuyu.until.ByteBufferUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 工作线程
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/21 15:01
*/
public class Worker implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;//还未初始化
private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public Worker(String name) {
this.name = name;
}
/**
* 初始化线程,和selector
*
* @param sc
* @throws IOException
*/
public void register(SocketChannel sc) throws IOException {
if (!start) {
selector = Selector.open();
thread = new Thread(this, name);
thread.start();
start = true;
}
queue.add(() -> {
try {
sc.register(selector, SelectionKey.OP_READ, null);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
});
selector.wakeup();
}
@Override
public void run() {
while (true) {
try {
selector.select();
final Runnable task = queue.poll();
if (task != null) {
task.run();
}
final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
final ByteBuffer buffer = ByteBuffer.allocate(16);
final SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.kwan.shuyu.heima.netty_03_nio.nio_07_optimization;
import com.kwan.shuyu.until.ByteBufferUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
* 使用wakeup,工作线程
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/21 15:01
*/
public class Worker2 implements Runnable {
private Thread thread;
private Selector selector;
private String name;
private volatile boolean start = false;//还未初始化
public Worker2(String name) {
this.name = name;
}
//初始化线程,和selector
public void register(SocketChannel sc) throws IOException {
if (!start) {
selector = Selector.open();
thread = new Thread(this, name);
thread.start();
start = true;
}
selector.wakeup();
sc.register(selector, SelectionKey.OP_READ, null);
}
@Override
public void run() {
while (true) {
try {
selector.select();
final Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
final SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
final ByteBuffer buffer = ByteBuffer.allocate(16);
final SocketChannel channel = (SocketChannel) key.channel();
channel.read(buffer);
buffer.flip();
ByteBufferUtil.debugAll(buffer);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
package com.kwan.shuyu.heima.netty_03_nio.nio_08_aio;
import com.kwan.shuyu.until.ByteBufferUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
/**
* 异步传输文件
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/21 18:37
*/
@Slf4j
public class AioFileChannel {
public static void main(String[] args) throws IOException {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("data.txt"), StandardOpenOption.READ);
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer, 0, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
log.info("read completed....{}", result);
attachment.flip();
ByteBufferUtil.debugAll(attachment);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
System.in.read();
}
}
\ No newline at end of file
package com.kwan.shuyu.heima.netty_04_netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//1.启动类
new Bootstrap()
// 2.添加 EventLoop
.group(new NioEventLoopGroup())
//3.选择客户端channel实现
.channel(NioSocketChannel.class)
// 4.添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder());
}
})
//连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
//发送数据
.writeAndFlush("hello world")
;
}
}
\ No newline at end of file
package com.kwan.shuyu.heima.netty_04_netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
public class HelloServer {
public static void main(String[] args) {
//启动器,负责组装netty组件
new ServerBootstrap()
//BossEventLoop,WorkerEventLoop(selector,thread),group组
.group(new NioEventLoopGroup())
//选择服务器的ServerSocketChannel实现
.channel(NioServerSocketChannel.class)
//boss负责处理连接 worker(child)负责处理读写,决定了worker(child)能执行哪些操作(handler)
.childHandler(
//channel代表和客户端进行数据读写的通道Initializer初始化
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
//添加具体的Handler
ch.pipeline().addLast(new StringDecoder());//将ByteBuffer转为字符串
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {//自定义Handler
@Override//读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
}).bind(8080);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册