投稿    登录
欢迎加入Nice Coder,与众多Coder分享经验,群号:530244901

【原创】谈谈java的NIO和AIO

java wjx@admin.cc 261浏览 0评论

预备知识:

      再谈NIO和AIO之前。不得不先笼统的谈一下IO,IO就是input和output,我们这里特指是网络io,UNINX提供了五种IO模型,分别如下:(考虑到信号驱动IO用的不多,就不再赘述,感兴趣可以参考《UNINX网络编程》)

 阻塞式IO:

      这是最基本的也是最直观的io模型,我们在java套接字中,通过Socket和ServerSocket就可以实现阻塞式io,在客服务器端接收到客户端连接的时候,会一直等待客户端的数据传输,在客户端的数据包还没有完全传过来之前,客户端线程会一直等待数据包。这个称之为阻塞式io,如果遇到客户端的网络环境差,服务器长时间接收不到数据,这个线程就一直在傻傻的等着,这是非常耗费资源的。

 非阻塞IO:

      非阻塞io就是所谓的NIO这里N的意思应该是No Block(非阻塞)注意,非阻塞式IO并不是异步IO,非阻塞和阻塞式IO都是同步IO。非阻塞式IO是这样工作的,客户端在接收数据包的时候,会一直去轮询这个状态,看看数据包是否准备好。而不是阻塞在那里。

 IO复用模型:

      系统准备多个IO节点,注册到一个多路选择器上,选择器不断去轮询这些IO节点,当有某个节点就绪的时候,便会执行回调函数。

 异步IO:

      接收到客户端的连接之后,服务器端不会去一直等待数据传输,而是去做其他的东西,在数据传输完成之后,执行一个回调函数去进行处理。

异步/同步/阻塞/非阻塞 IO的区别和联系:

      同步异步是相对的,阻塞和非阻塞是相对的。

      同步和异步是针对通讯机制来说的,同步就是A发起一个请求,就会一直等待这个请求返回结果之后再继续向下执行,异步则是A发起一个请求,直接去执行其他的部分,这个请求在结束之后,自己去调用一个回调函数。这叫做异步。也就是说同步是调用者自己去调用处理的结果,异步是被调用函数去调用另一个函数去处理自己。

      阻塞和非阻塞是针对线程的状态来说的,阻塞就是在发起一个请求之后,这个线程不去做其他的事情,而是挂起,一直到这个请求结束,非阻塞就是这个请求发出之后,线程可以去做其他的事情。区别就是线程可不可以去做其他的事情。

JAVA  NIO

      很多人把NIO翻译成new IO,不过我感觉翻译为No-block io比较接近他的本意,也就是非阻塞式io,NIO虽然是非阻塞的,可是他是同步的,关于为什么,我们在看了例子之后再说。

      简单介绍一下NIO的执行方式。在介绍NIO的执行方式之前,先介绍一下几个概念

 缓冲区Buffer:

      缓冲区是一个对象,里面存的是数据,NIO进行通讯,传递的数据,都包装到Buffer中,Buffer是一个抽象类。子类有ByteBuffer、CharBuffer等,常用的是字节缓冲区,也就是ByteBuffer

 通道Channel:

      channel是一个通道,通道就是通流某种物质的管道,在这里就是通流数据,他和流的不同之处就在于,流是单向的,只能向一个方向流动,而通道是一个管道,有两端,是双向的,可以进行读操作,也可以写操作,或者两者同时进行。

 多路复用器Selector:

      多路复用器是一个大管家,他管理这通道,通道把自己注册到Selector上面,Selector会轮询注册到自己的管道,通过判断这个管道的不同的状态,来进行相应的操作。

下面是示例代码,在这个示例代码中,客户端向服务器端发送Hello server,I am client服务器端回应hello client!This is server,代码只是实例demo,异常都做抛出处理,并且没有关闭连接。

服务器端代码:

package top.wangjingxin.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by 王镜鑫 on 2017/1/17.
 */
public class NioTestService
{
    public static void main(String[] args) throws IOException
    {
        int port = 8080;
        new Thread(new NioServiceHandler(port)).start();//线程开始
    }
}
class NioServiceHandler implements Runnable
{
    Selector selector;
    ServerSocketChannel serverSocketChannel;

    public NioServiceHandler(int port) throws IOException
    {
        selector = Selector.open();//实例化多路复选器
        serverSocketChannel = ServerSocketChannel.open();//初始化服务器端通道
        serverSocketChannel.configureBlocking(false);//配置服务器端通道为非阻塞式
        serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//配置监听的端口
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//把服务器通道注册到多路复选器上,设置当前的状态为“可以接受客户端”
    }
    public void run()
    {
        SelectionKey key = null;//SelectorKey包装着通道,放到多路选择器中。
        while (true)
        {
            try
            {
                selector.select(1000);//设置等待某个通道准备的最长阻塞时间。
            } catch (IOException e)
            {
                e.printStackTrace();
            }
            Set<SelectionKey> keys = selector.selectedKeys();//获取多路选择器上的已选择的键集,这个键集是就绪状态的通道的集合,还有一个方法叫keys()返回的是注册到这个多路复选器上的键,不管是就绪状态的还是非就绪状态的。
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext())//遍历该就绪通道的键集
            {
                key = it.next();
                it.remove();//移除这个键,必须手动移除,因为这个键处理过后就不能再继续存在了。
                try
                {
                    handlerKey(key);//处理这个键
                } catch (IOException e)
                {
                    e.printStackTrace();
                }
            }
        }
    }
    private void handlerKey(SelectionKey key) throws IOException
    {
        if(key.isValid())//如果这个键是有效的
        {
            if(key.isAcceptable())//如果这个键注册的是Acceptable,也就是说这个键是服务器端。
            {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//将这个键转化成一个服务器,直接强制转换
                SocketChannel sc = ssc.accept();//然后获取这个服务器接收到的客户端,因为键集中都是已经就绪的通道,所以必定会得到一个连接到这个服务器的一个客户端。
                sc.configureBlocking(false);//配置这个客户端是非阻塞的。
                sc.register(selector,SelectionKey.OP_READ);//将这个客户端注册到多路复选器上,监听READ操作的就绪与否。
            }
            if(key.isReadable())//如果这个键是一个可以读的键,也就是说这个键是在获取客户端之后,客户端的读取操作已经就绪。现在就可以进行读取了。
            {
                SocketChannel sc = (SocketChannel) key.channel();//将这个键转化成一个客户端通道
                ByteBuffer requestBuffer = ByteBuffer.allocate(1024);//定义一个1024大小的字节缓冲区。
                int requestLength = sc.read(requestBuffer);//从客户端读取数据,读到字节缓冲区中
                if(requestLength>0)//如果的确读到东西了
                {
                    requestBuffer.flip();//初始化这个字节缓冲区。在上面进行读入之后,这个字节缓冲区的限制指针和位置指针都已经改变了。这个操作是将其还原、
                    byte[] request = new byte[requestBuffer.remaining()];//定义一个字节数组,大小就是这个字节缓冲区的字节的多少。
                    requestBuffer.get(request);//把字节缓冲区的内容复制到字节数组中。
                    String body = new String(request,"utf8");//用utf8编码,将字节数组中的内容编码为字符串。
                    System.out.println("request:"+body);
                    if(body.equalsIgnoreCase("Hello server,I am client"))//如果客户端给的请求内容正确。那么就直接反馈。
                    {
                        byte[] response = "hello client!This is server".getBytes();//将回复的内容转化成字节数组。
                        ByteBuffer responseBuffer = ByteBuffer.allocate(response.length);//定义一个字节缓冲区,用作回复
                        responseBuffer.put(response);//将字节数组的内容复制到字节缓冲区中。
                        responseBuffer.flip();//初始化这个字节数组。
                        sc.write(responseBuffer);//写到客户端。
                    }
                }
            }
        }
    }
}

下面是客户端的代码。

package top.wangjingxin.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by 王镜鑫 on 2017/1/17.
 */
public class NioTestClient
{
    public static void main(String[] args) throws IOException
    {
        String host = "127.0.0.1";//定义ip
        int port = 8080;//定义端口
        new Thread(new NioClientHandler(host,port)).start();//线程启动。
    }
}
class NioClientHandler implements Runnable
{
    String host;
    int port;
    Selector selector;
    SocketChannel socketChannel;
    public NioClientHandler(String host,int port) throws IOException
    {
        this.port = port;
        this.host = host;
        selector = Selector.open();//实例化多路复选器
        socketChannel = SocketChannel.open();//实例化客户端
        socketChannel.configureBlocking(false);//配置客户端为非阻塞式
    }
    public void run()
    {
        try
        {
            connect();//客户端去连接服务器端
            while (true)
            {
                selector.select(1000);//配置就绪通道的等待时间
                Set<SelectionKey> keys = selector.selectedKeys();//获取就绪通道的键集
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while (it.hasNext())
                {
                    key = it.next();
                    it.remove();
                    handlerKey(key);//处理该键
                }
            }
        } catch (IOException e)
        {
            e.printStackTrace();
        }
    }
    private void handlerKey(SelectionKey key) throws IOException
    {
        if(key.isValid())
        {
            SocketChannel sc = (SocketChannel) key.channel();//因为注册到多路复选器的肯定是个客户端,所以直接把键转化成客户端
            if(key.isReadable())//如果客户端现在的状态是可以读取的。也就是说现在的回复得到了响应。
            {
                ByteBuffer responseBuffer = ByteBuffer.allocate(1024);//定义一个大小为1024的字节缓冲区
                int responseLength = sc.read(responseBuffer);//在客户端读数据存到字节数组中
                if(responseLength>0)//转化成字符串,不再赘述
                {
                    responseBuffer.flip();
                    byte[] bytes = new byte[responseBuffer.remaining()];
                    responseBuffer.get(bytes);
                    String responseBody = new String(bytes,"utf8");
                    System.out.println("response:"+responseBody);
                }
            }
            if(key.isConnectable())//如果现在客户端OP_CONNECT操作是就绪状态,这时候可能是还在连接。
            {
                if(sc.finishConnect())//如果已经连接成功
                {
                    sc.register(selector,SelectionKey.OP_READ);//注册为可以读取的状态
                    write(sc);//写入请求。
                }
            }
        }
    }
    private void write(SocketChannel socketChannel) throws IOException
    {
        byte[] request = "Hello server,I am client".getBytes();
        ByteBuffer requestBuffer = ByteBuffer.allocate(request.length);
        requestBuffer.put(request);
        requestBuffer.flip();
        socketChannel.write(requestBuffer);
    }
    private void connect() throws IOException
    {
        if(socketChannel.connect(new InetSocketAddress(host,port)))//如果直接连接成功
        {
            socketChannel.register(selector, SelectionKey.OP_READ);//注册读取操作。
            write(socketChannel);//写入请求到服务器端。
        }else
        {
            socketChannel.register(selector,SelectionKey.OP_CONNECT);//否则注册可连接接操作。
        }
    }
}

上面就是NIO的一个简单实例,可见,相对于BIO,也就是阻塞式IO,NIO麻烦的多。总的来说,工作方式如下:

服务器端:

  1、打开ServerSocketChannel

  2、绑定监听的端口

  3、创建Selector

  4、将ServerSocketChannel的Accept操作注册到多路选择器上

  5、遍历多路选择器上就绪的Key,处理该key

  6、如果该键是可以Accept状态,读取服务器端的请求,然后注册为可写

  7、如果是Read状态,就写入响应

客户端:

  1、打开SocketChannel

  2、设置监听的主机和端口,设置异步

  3、创建Selector

  4、连接客户端,如果连接成功,则注册到多路复用器上,并且发送请求,否则注册Connect状态

  5、遍历多路选择器上就绪的Key,处理该Key

  6、如果是Connect状态,说明正在连接,判断是否连接成功,如果连接成功,就注册为Read状态,写入请求

  7、如果是Read状态,就入去响应

以上就是NIO的工作方式,总结一下工作方式,客户端和服务器端都是通道,通道具有事件,可以将事件注册到多路复选器上,事件有就绪和非就绪两种状态,就绪的状态会放到多路复选器的就绪键的集合中,起一个线程不断地去轮询就绪的状态,根据不同的状态做不同的处理。这个就是他的核心思想。

      NIO相比较普通的BIO,他的先进之处就在于,后者在建立连接,读取数据的时候,是去一直等待连接建立结束,数据读取结束的。而后者是注册一个状态,然后就放任不管,然后等他建立连接结束,读取数据结束的时候将该状态放到多路复选器的就绪键集中,而且有一个线程不断地去轮询就绪键集,不断地去处理就绪的键,注意! 这里说的读取数据是系统从网络读取数据到本地,这个过程是由系统完成的。代码中的read其实是直接从缓冲区中读取。BIO一直等待建立连接,读取数据结束,在未结束之前,该线程不去做其他的事情,会被挂起,所以称之为阻塞式。 而NIO是将状态注册到多路复选器中之后,不去管,线程去做其他的事情,所以叫非阻塞。那上面说的NIO是同步非阻塞是什么意思呢?对于NIO,注册到多路复选器上的某个状态就绪之后,这个状态对应的键会放到一个就绪的键集中,然后等待多路复选器去轮训到这个就绪的状态,然后调用某一个函数去处理他,而不是他自己主动去调用某个回调函数处理自己,所以从这个角度来看,NIO是同步的。

JAVA AIO

 AIO就是异步IO,A就是asynchronous的意思,因为NIO1.0虽然面向缓冲,利用多路复选器实现了同步非阻塞IO,可是在NIO1.0中需要使用一个线程不断去轮询就绪集合,开销也是比较大的,所以在jdk1.7中扩展了NIO,称之为NIO2.0,NIO2.0中引入了AIO,此外NIO2.0中还引入了异步文件通道。下面先介绍一下AIO相关知识。

 CompletionHandler接口:

      这个接口是一个专门用来定义回调操作的接口,实现了这个接口的类可以作为某个操作的回调函数,该接口定义了两个方法,一个是completed一个是fail,分别对应某个操作成功和失败的回调策略。在AIO中我们将会大量的使用该接口。

 CountDownLatch类:

      这个类在 java.util.cocurrent包下,作用就是调用它的await方法,这个线程就会一直等待,这个是用来辅助测试AIO的,想了解更多的话请继续关注博主的博客。

下面是示例代码,代码中,客户端发送Hello Aio Server!,服务器端接收到之后,响应hello aio client!

下面是服务器端代码

package top.wangjingxin.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
 * Created by 王镜鑫 on 2017/1/17.
 */
public class AioTestServer
{
    public static void main(String[] args) throws IOException
    {
        int port = 8080;//设置端口号
        new Thread(new AioServerHandler(port)).start();//线程启动
    }
}
class AioServerHandler implements Runnable
{
    int port;
    CountDownLatch latch = null;
    AsynchronousServerSocketChannel serverSocketChannel = null;//定义异步服务器
    public AioServerHandler(int port) throws IOException
    {
        this.port = port;
        serverSocketChannel = AsynchronousServerSocketChannel.open();//实例化异步服务器
        serverSocketChannel.bind(new InetSocketAddress(port));//配置信息
    }
    public void run()
    {
        latch = new CountDownLatch(1);
        serverSocketChannel.accept(this,new AioAcceptCompleteHandler());//定义服务器的接收客户端连接操作,设置回调函数是AioAcceptCompleteHandler,第一个参数是一个附加参数,可以自定义,不过要求处理器的第二个泛型参数必须是这个附加参数的父类或者祖宗类    
        try
        {
            latch.await();//阻塞住这个线程,方便测试,开发的时候不会这样。具体怎么阻塞的请自行查阅资料
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
class AioAcceptCompleteHandler implements CompletionHandler<AsynchronousSocketChannel,AioServerHandler>//这个类就是accept操作的回调函数,实现了CompletionHandler接口,泛型信息中,第一个泛型必须是异步客户端类 {
    public void completed(AsynchronousSocketChannel result, AioServerHandler attachment)//传入两个参数,一个是接受的异步客户端,一个是accept方法中传入的自定义的附加参数
    {
        attachment.serverSocketChannel.accept(attachment,this);//这里为什么还得再重新调用accept方法,一会在解释。
        ByteBuffer request = ByteBuffer.allocate(1024);//定义一个1024大小的字节缓冲区
        result.read(request,request,new AioReadCompletionHandler(result));//调用接收到的异步客户端的读取方法,第一个参数是把请求信息读取到刚才定义的字节缓冲区中,第二个参数是一个自定义的附加参数,然后配置一个回调函数,来处理读取之后的操作。这个回调函数仍然实现了CompletionHandler接口。
    }
    public void failed(Throwable exc, AioServerHandler attachment)//失败的就不写了,因为这是个测试demo,不考虑鲁棒性
    {

    }
}
class AioReadCompletionHandler implements CompletionHandler<Integer,ByteBuffer>//读取操作的回调函数,两个泛型参数第一个规定必须是Integer,第二个必须是附加参数的父类或者祖宗类。
{
    AsynchronousSocketChannel socketChannel = null;
    public AioReadCompletionHandler(AsynchronousSocketChannel socketChannel)
    {
        this.socketChannel = socketChannel;//构造方法中把客户端的引用传过来。
    }
    public void completed(Integer result, ByteBuffer attachment)//这里是进行读取成功之后的回调。
    {
        attachment.flip();
        byte[] request = new byte[attachment.remaining()];
        attachment.get(request);
        try
        {
            String requestBody = new String(request,"utf8");//将请求转化为字符串
            System.out.println("request:"+requestBody);
            write();//执行写操作,为该请求提供响应
        } catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
    }
    private void write()
    {
        byte[] response = "hello aio client!".getBytes();
        ByteBuffer responseBuffer = ByteBuffer.allocate(response.length);
        responseBuffer.put(response);
        responseBuffer.flip();
        socketChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>()//这里是写响应到客户端之后的回调函数,同样,泛型的第二个参数必须是附加参数的祖宗类或者父类,这里直接用了匿名类。
        {
            public void completed(Integer result, ByteBuffer attachment)
            {
                if(attachment.hasRemaining())//如果还没有写完
                {
                    socketChannel.write(attachment,attachment,this);//那么就继续写入
                }
            }
            public void failed(Throwable exc, ByteBuffer attachment)
            {

            }
        });
    }
    public void failed(Throwable exc, ByteBuffer attachment)
    {

    }
}

下面是客户端的代码

package top.wangjingxin.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
 * Created by 王镜鑫 on 2017/1/17.
 */
public class AioTestClient
{
    public static void main(String[] args) throws IOException
    {
        int port = 8080;
        String host = "127.0.0.1";
        new Thread(new AioClientHandler(host,port)).start();//线程启动
    }
}
class AioClientHandler implements Runnable
{
    String host;
    int port;
    AsynchronousSocketChannel socketChannel;
    CountDownLatch latch;
    public AioClientHandler(String host,int port) throws IOException
    {
        this.host = host;
        this.port = port;
        socketChannel = AsynchronousSocketChannel.open();//实例化一个异步客户端
    }
    public void run()
    {
        latch = new CountDownLatch(1);
        socketChannel.connect(new InetSocketAddress(host,port),this,new AioConnectCompleteHandler());//进行连接,泛型和参数要求和服务器端一样。
        try
        {
            latch.await();//阻塞住当前线程
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}
class AioConnectCompleteHandler implements CompletionHandler<Void,AioClientHandler>//连接完成之后的回调函数,泛型第一个参数类型必须是Void
{
    public void completed(Void result, AioClientHandler attachment)
    {
        byte[] request = "Hello Aio Server!".getBytes();
        ByteBuffer requestBuffer = ByteBuffer.allocate(request.length);
        requestBuffer.put(request);
        requestBuffer.flip();
        attachment.socketChannel.write(requestBuffer,requestBuffer,new AioWriteCompletionHandler());//连接好了之后就开始向服务器端写东西,写了东西就得回调。其中回调的泛型参数老规矩。
    }
    public void failed(Throwable exc, AioClientHandler attachment)
    {

    }
}
class AioWriteCompletionHandler implements CompletionHandler<Integer,ByteBuffer>//这里就是写入成功之后的回调,这个回调中可以获取响应的内容。
{
    public void completed(Integer result, ByteBuffer attachment)
    {
        attachment.flip();
        byte[] response = new byte[attachment.remaining()];
        attachment.get(response);
        try
        {
            String responseBody = new String(response,"utf8");
            System.out.println("response:"+responseBody);
        } catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
    }
    public void failed(Throwable exc, ByteBuffer attachment)
    {

    }
}

以上就是AIO的工作流程,我们可以看到。虽然大量使用泛型,匿名类,但是原理很清晰简单,就是对客户端和服务器端的各种操作进行回调函数的注册。在完成某个操作之后,就会自己去调用该注册到该操作的回调函数,达到异步的效果。上面还留了一个问题,就是在进行accept操作的回调函数时,他又调用了服务器端的accept操作,原因是这样的。因为一个异步服务器端可以接收成千上万的客户端,所以需要在成功接入一个客户端之后继续调用accept方法,再异步接收其他的客户端的连接,最终形成一个循环。

以上就是本人对NIO和AIO的认知,能力有限,难免疏漏,欢迎批评指正。

转载请注明:王镜鑫的个人博客 » 【原创】谈谈java的NIO和AIO

喜欢 (8)or分享 (0)

您的支持是博主写作最大的动力,如果您喜欢我的文章,感觉我的文章对您有帮助,请狠狠点击下面的

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址