学习netty之前需要先弄懂NIO各个组件的基本功能
NIO 主要是由Channel,Selector,Buffer组成
Channel的实现如下:
- FileChannel – 从文件中读取数据
- DatagramChannel – UDP读写网络中读取数据
- SocketChannel – TCP读写网络中读取数据
- ServerSocketChannel – 监听新的TCP连接 每一个新进来的连接都会创建对应的SocketChannel
FileChannel的简单示例如下,去掉之前的inputstream,outputstream读写,改成了nio的byteBuffer读写
public void readFile() throws IOException, URISyntaxException {
URL url = this.getClass().getResource("/data.txt");
RandomAccessFile file = new RandomAccessFile(new File(url.toURI()), "rw");
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(20);
int read = channel.read(buffer);
while (read != -1) {
System.out.println("read: " + read);
buffer.flip();
while (buffer.hasRemaining()) {
System.out.println(buffer.get());
}
buffer.clear();
read = channel.read(buffer);
}
channel.close();
file.close();
}
可以使用nio的Files来读取每一行数据,是配合Path对象来处理每行数据的,使用lambda表达式打印每一行数据
public void readFile() throws IOException, URISyntaxException {
URL url = this.getClass().getResource("/data.txt");
Files.lines(Paths.get(url.toURI())).forEach(line -> {
System.out.println(line);
});
}
对比上来说 整洁了不少
Buffer的类型如下:没有BooleanBuffer,相应的多一个内存映射MappedByteBuffer
- ByteBuffer
- MappedByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- FloatBuffer
- LongBuffer
- ShortBuffer
flip方法将Buffer从写模式切换到读模式
Buffer分配
- ByteBuffer.allocate(215)
Buffer写入数据
- channel.read(buffer)
- buffer.put(215)
Buffer读出数据
- channel.write(buffer)
- buffer.get()
- buffer.getInt()
下面这些组件都需要慢慢琢磨学习
Selector
创建一个Selector使用如下方法
Selector selector = Selector.open();
Selector一般是配合channel使用,把channel注册到selector上,如果配合使用的时候必须使用非阻塞模式,FileChannel不能注册到Selector上。
注册监测的事件包含以下几种
- connect – 连接就绪
- accept – 接收就绪
- read – 读就绪
- write – 写就绪
如果需要监听多个事件 使用SelectionKey.OP_READ | SelectionKey.OP_WRITE 或的方式可以同时监听多个事件
SocketChannel,ServerSocketChannel
类似FileChannel可以配合Selector更好的监控渠道的状态
DatagramChannel
是一个发送UDP包的通道,无连接的网络协议,发送和接收的都是数据包
Pipe
java nio 可以通过两个线程之间单向数据连接,数据写入到sink,从source读取
sink通道的获取
Pipe.SinkChannel sinkChannel = pipe.sink();
source通道的获取
Pipe.SourceChannel sourceChannel = pipe.source();
前面都是一些零散的基础概念
主要是为了后面学习netty做准备
netty中会持有两个EventLoopGroup,其中一个boss组是来接收客户端发来的TCP链接请求的,worker组是用来完成三次我搜的链接套接字的网络IO请求。
三次握手的TCP时序图
又从正网弄了一个TCP状态跃迁图
服务器上tcp连接都会处于其中的某些状态
time_wait数相对过多,发起socket主动关闭的一方 socket将进入TIME_WAIT状态,主要是由于socket keeplive有时间限制,到一定时间时 会主动关闭socket连接,但是客户端还没有响应关闭。
每一个socket都会打开一个socketChannel,channel中的数据会通过channelPipeline来执行多个handler来处理数据。
分为输入和输出两类handler
JDK预置了interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以Netty提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用
理论看太多容易迷失怎么样具体去操作
现在比较流行lettuce框架,底层跟redis通信采用的是netty来做的
RedisClient是redis客户端连接类,看其中一个方法
connectStatefulAsync
private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
ConnectionBuilder connectionBuilder;
// ssl 的关联操作
if (redisURI.isSsl()) {
SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
sslConnectionBuilder.ssl(redisURI);
connectionBuilder = sslConnectionBuilder;
} else {
connectionBuilder = ConnectionBuilder.connectionBuilder();
}
connectionBuilder.connection(connection);
connectionBuilder.clientOptions(clientOptions);
connectionBuilder.clientResources(clientResources);
connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
// 这个protect方法就是内部包装的传统的netty启动方法
connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
channelType(connectionBuilder, redisURI);
if (clientOptions.isPingBeforeActivateConnection()) {
if (hasPassword(redisURI)) {
connectionBuilder.enableAuthPingBeforeConnect();
} else {
connectionBuilder.enablePingBeforeConnect();
}
}
ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
future = future.thenApplyAsync(channelHandler -> {
connection.async().auth(new String(redisURI.getPassword()));
return channelHandler;
}, clientResources.eventExecutorGroup());
}
if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
future.thenApply(channelHandler -> {
connection.setClientName(redisURI.getClientName());
return channelHandler;
});
}
if (redisURI.getDatabase() != 0) {
future = future.thenApplyAsync(channelHandler -> {
connection.async().select(redisURI.getDatabase());
return channelHandler;
}, clientResources.eventExecutorGroup());
}
return future.thenApply(channelHandler -> (S) connection);
}
connectionBuilder 方法
protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
RedisURI redisURI) {
Bootstrap redisBootstrap = new Bootstrap();
redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);
SocketOptions socketOptions = getOptions().getSocketOptions();
redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));
if (LettuceStrings.isEmpty(redisURI.getSocket())) {
redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
}
connectionBuilder.timeout(redisURI.getTimeout());
connectionBuilder.password(redisURI.getPassword());
connectionBuilder.bootstrap(redisBootstrap);
connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
connectionBuilder.socketAddressSupplier(socketAddressSupplier);
}
DefaultEventLoopGroupProvider中有创建NioEventLoopGroup的工作,默认大小是取系统变量io.netty.eventLoopThreads的值和cpu的个数,我的是4核,所有默认持有4个单线程的Executor执行器,task在每个里面都是由单个执行器来队列完成的。
查看服务器在跑的线程名称后发现实际只有一个
“lettuce-nioEventLoop-6-1” #965 daemon prio=5 os_prio=0 tid=0x00007fec152bc800 nid=0x75d4 runnable [0x00007fecc935f000]
应该是负载还没到,后期压力测试下看看
spring webflux
https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html