java nio

参照网址 http://ifeve.com/overview/

学习netty之前需要先弄懂NIO各个组件的基本功能

NIO 主要是由Channel,Selector,Buffer组成

Channel的实现如下:

  • FileChannel – 从文件中读取数据
  • DatagramChannel – UDP读写网络中读取数据
  • SocketChannel – TCP读写网络中读取数据
  • ServerSocketChannel – 监听新的TCP连接 每一个新进来的连接都会创建对应的SocketChannel

FileChannel的简单示例如下,去掉之前的inputstream,outputstream读写,改成了nio的byteBuffer读写

    public void readFile() throws IOException, URISyntaxException {

        URL url = this.getClass().getResource("/data.txt");
        RandomAccessFile file = new RandomAccessFile(new File(url.toURI()), "rw");
        FileChannel channel = file.getChannel();

        ByteBuffer buffer = ByteBuffer.allocate(20);
        int read = channel.read(buffer);

        while (read != -1) {

            System.out.println("read: " + read);
            buffer.flip();

            while (buffer.hasRemaining()) {
                System.out.println(buffer.get());
            }

            buffer.clear();
            read = channel.read(buffer);

        }

        channel.close();
        file.close();

    }

可以使用nio的Files来读取每一行数据,是配合Path对象来处理每行数据的,使用lambda表达式打印每一行数据

    public void readFile() throws IOException, URISyntaxException {

        URL url = this.getClass().getResource("/data.txt");

        Files.lines(Paths.get(url.toURI())).forEach(line -> {
            System.out.println(line);
        });

    }

对比上来说 整洁了不少

Buffer的类型如下:没有BooleanBuffer,相应的多一个内存映射MappedByteBuffer

  • ByteBuffer
  • MappedByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • FloatBuffer
  • LongBuffer
  • ShortBuffer

flip方法将Buffer从写模式切换到读模式
cap,pos,lim

Buffer分配

  • ByteBuffer.allocate(215)

Buffer写入数据

  • channel.read(buffer)
  • buffer.put(215)

Buffer读出数据

  • channel.write(buffer)
  • buffer.get()
  • buffer.getInt()

下面这些组件都需要慢慢琢磨学习

Selector
创建一个Selector使用如下方法
Selector selector = Selector.open();

Selector一般是配合channel使用,把channel注册到selector上,如果配合使用的时候必须使用非阻塞模式,FileChannel不能注册到Selector上。
注册监测的事件包含以下几种

  • connect – 连接就绪
  • accept – 接收就绪
  • read – 读就绪
  • write – 写就绪

如果需要监听多个事件 使用SelectionKey.OP_READ | SelectionKey.OP_WRITE 或的方式可以同时监听多个事件

SocketChannel,ServerSocketChannel

类似FileChannel可以配合Selector更好的监控渠道的状态

DatagramChannel

是一个发送UDP包的通道,无连接的网络协议,发送和接收的都是数据包

Pipe

java nio 可以通过两个线程之间单向数据连接,数据写入到sink,从source读取

nio pipe

sink通道的获取
Pipe.SinkChannel sinkChannel = pipe.sink();

source通道的获取
Pipe.SourceChannel sourceChannel = pipe.source();

前面都是一些零散的基础概念
主要是为了后面学习netty做准备

netty中会持有两个EventLoopGroup,其中一个boss组是来接收客户端发来的TCP链接请求的,worker组是用来完成三次我搜的链接套接字的网络IO请求。
三次握手的TCP时序图
tcp status

又从正网弄了一个TCP状态跃迁图
tcp status

服务器上tcp连接都会处于其中的某些状态
tcp server status

time_wait数相对过多,发起socket主动关闭的一方 socket将进入TIME_WAIT状态,主要是由于socket keeplive有时间限制,到一定时间时 会主动关闭socket连接,但是客户端还没有响应关闭。

每一个socket都会打开一个socketChannel,channel中的数据会通过channelPipeline来执行多个handler来处理数据。
tcp server status
分为输入和输出两类handler

JDK预置了interface java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以Netty提供了它自己的实现——ChannelFuture,用于在执行异步操作的时候使用

理论看太多容易迷失怎么样具体去操作
现在比较流行lettuce框架,底层跟redis通信采用的是netty来做的

RedisClient是redis客户端连接类,看其中一个方法

connectStatefulAsync

    private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            DefaultEndpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {

        ConnectionBuilder connectionBuilder;
        // ssl 的关联操作
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }

        connectionBuilder.connection(connection);
        connectionBuilder.clientOptions(clientOptions);
        connectionBuilder.clientResources(clientResources);
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);

        // 这个protect方法就是内部包装的传统的netty启动方法
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
        channelType(connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            if (hasPassword(redisURI)) {
                connectionBuilder.enableAuthPingBeforeConnect();
            } else {
                connectionBuilder.enablePingBeforeConnect();
            }
        }

        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);

        if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().auth(new String(redisURI.getPassword()));

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            future.thenApply(channelHandler -> {
                connection.setClientName(redisURI.getClientName());
                return channelHandler;
            });

        }

        if (redisURI.getDatabase() != 0) {

            future = future.thenApplyAsync(channelHandler -> {

                connection.async().select(redisURI.getDatabase());

                return channelHandler;
            }, clientResources.eventExecutorGroup());
        }

        return future.thenApply(channelHandler -> (S) connection);
    }

connectionBuilder 方法

    protected void connectionBuilder(Supplier<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
            RedisURI redisURI) {

        Bootstrap redisBootstrap = new Bootstrap();
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
        redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);

        SocketOptions socketOptions = getOptions().getSocketOptions();

        redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));

        if (LettuceStrings.isEmpty(redisURI.getSocket())) {
            redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
            redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
        }

        connectionBuilder.timeout(redisURI.getTimeout());
        connectionBuilder.password(redisURI.getPassword());

        connectionBuilder.bootstrap(redisBootstrap);
        connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
        connectionBuilder.socketAddressSupplier(socketAddressSupplier);
    }

DefaultEventLoopGroupProvider中有创建NioEventLoopGroup的工作,默认大小是取系统变量io.netty.eventLoopThreads的值和cpu的个数,我的是4核,所有默认持有4个单线程的Executor执行器,task在每个里面都是由单个执行器来队列完成的。
查看服务器在跑的线程名称后发现实际只有一个
“lettuce-nioEventLoop-6-1” #965 daemon prio=5 os_prio=0 tid=0x00007fec152bc800 nid=0x75d4 runnable [0x00007fecc935f000]
应该是负载还没到,后期压力测试下看看

spring webflux

https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html