BIO、NIO、AIO区别及实战

    本章节将补上上次IO系列提过的BIO、NIO、AIO学习,本章将从传统的BIO到NIO再到AIO自浅至深介绍,并对相应的技术点进行分析及代码。

  首先了解IO的基本概念:  

      1.   基本 概念

           IO 是主存和外部设备 ( 硬盘、终端和网络等 ) 拷贝数据的过程。 IO 是操作系统的底层功能实现,底层通过 I/O 指令进行完成。

所有语言运行时系统提供执行 I/O 较高级别的工具。 (c 的 printf scanf,java 的面向对象封装 )

     2.    Java 标准 io 回顾

            Java 标准 IO 类库是 io 面向对象的一种抽象。基于本地方法的底层实现,我们无须关注底层实现。InputStream\OutputStream( 字节流 ) :一次传送一个字节。 Reader\Writer( 字符流 ) :一次一个字符。

  1、BIO编程

   1.1、传统的BIO编程

       网络编程的基本模型是C/S模型,即两个进程间的通信。

       服务端提供IP和监听端口,客户端通过连接操作想服务端监听的地址发起连接请求,通过三次握手连接,如果连接成功建立,双方就可以通过套接字进行通信。

       传统的同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。 

       简单的描述一下BIO的服务端通信模型:采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理没处理完成后,通过输出流返回应答给客户端,线程销毁。即典型的一请求一应答通宵模型。

    传统BIO通信模型图:

blob.png

        该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1的正比关系,Java中的线程也是比较宝贵的系统资源,线程数量快速膨胀后,系统的性能将急剧下降,随着访问量的继续增大,系统最终就死-掉-了。

同步阻塞式I/O创建的Server源码:

 public class Server {
		final static int PROT = 8765;	
		public static void main(String[] args) {			
			ServerSocket server = null;
			try {
				server = new ServerSocket(PROT);
				System.out.println(" server start .. ");
				//进行阻塞
				Socket socket = server.accept();
				//新建一个线程执行客户端的任务
				new Thread(new ServerHandler(socket)).start();
				
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				if(server != null){
					try {
						server.close();
					} catch (IOException e) {
						e.printStackTrace();
					}
				}
				server = null;
			}
		}

    客户端消息处理线程ServerHandler源码:

public class ServerHandler implements Runnable{
	private Socket socket ;	
	public ServerHandler(Socket socket){
		this.socket = socket;
	}	
	@Override
	public void run() {
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
			out = new PrintWriter(this.socket.getOutputStream(), true);
			String body = null;
			while(true){
				body = in.readLine();
				if(body == null) break;
				System.out.println("Server :" + body);
				out.println("服务器端回送响的应数据.");
			}
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			if(socket != null){
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			socket = null;
		}
		
		
	}

}

  同步阻塞式I/O创建的Client源码:

public class Client {
	final static String ADDRESS = "127.0.0.1";
	final static int PORT = 8765;	
	public static void main(String[] args) {	
		Socket socket = null;
		BufferedReader in = null;
		PrintWriter out = null;
		try {
			socket = new Socket(ADDRESS, PORT);
			in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			out = new PrintWriter(socket.getOutputStream(), true);
			
			//向服务器端发送数据
			out.println("接收到客户端的请求数据...");
			out.println("接收到客户端的请求数据1111...");
			String response = in.readLine();
			System.out.println("Client: " + response);
			
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			if(in != null){
				try {
					in.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			if(out != null){
				try {
					out.close();
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
			if(socket != null){
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
			socket = null;
		}
	}
}

      从以上代码,很容易看出,BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能,甚至罢工)。

 1.2、伪异步I/O编程

       为了改进这种一连接一线程的模型,我们可以使用线程池来管理这些线程(需要了解更多请参考前面提供的文章),实现1个或多个线程处理N个客户端的模型(但是底层还是使用的同步阻塞I/O),通常被称为“伪异步I/O模型“。

       伪异步I/O模型图:

blob.png

public final class ServerBetter {  
    //默认的端口号  
    private static int DEFAULT_PORT = 12345;  
    //单例的ServerSocket  
    private static ServerSocket server;  
    //线程池 懒汉式的单例  
    private static ExecutorService executorService = Executors.newFixedThreadPool(60);  
    //根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值  
    public static void start() throws IOException{  
        //使用默认值  
        start(DEFAULT_PORT);  
    }  
    //这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了  
    public synchronized static void start(int port) throws IOException{  
        if(server != null) return;  
        try{  
            //通过构造函数创建ServerSocket  
            //如果端口合法且空闲,服务端就监听成功  
            server = new ServerSocket(port);  
            System.out.println("服务器已启动,端口号:" + port);  
            Socket socket;  
            //通过无线循环监听客户端连接  
            //如果没有客户端接入,将阻塞在accept操作上。  
            while(true){  
                socket = server.accept();  
                //当有新的客户端接入时,会执行下面的代码  
                //然后创建一个新的线程处理这条Socket链路  
                executorService.execute(new ServerHandler(socket));  
            }  
        }finally{  
            //一些必要的清理工作  
            if(server != null){  
                System.out.println("服务器已关闭。");  
                server.close();  
                server = null;  
            }  
        }  
    }  
}

      我们知道,如果使用CachedThreadPool线程池(不限制线程数量,如果不清楚请参考之前的系列文章),其实除了能自动帮我们管理线程(复用),看起来也就像是1:1的客户端:线程数模型,而使用FixedThreadPool我们就有效的控制了线程的最大数量,保证了系统有限的资源的控制,实现了N:M的伪异步I/O模型。

    但是,正因为限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对Socket的输入流就行读取时,会一直阻塞,直到发生:

    有数据可读

    可用数据以及读取完毕

    发生空指针或I/O异常

    所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。

    而后面即将介绍的NIO,就能解决这个难题。

2、NIO 编程

     JDK 1.4中的java.nio.*包中引入新的Java I/O库,其目的是提高速度。实际上,“旧”的I/O包已经使用NIO重新实现过,即使我们不显式的使用NIO编程,也能从中受益。速度的提高在文件I/O和网络I/O中都可能会发生,但本文只讨论后者。

   NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。 

  在 jdk1.4 里提供的新 api 。 Sun 官方标榜的特性如下:

–     为所有的原始类型提供 (Buffer) 缓存支持。

–     字符集编码解码解决方案。

–     Channel :一个新的原始 I/O 抽象。

–     支持锁和内存映射文件的文件访问接口。

–     提供多路 (non-bloking) 非阻塞式的高伸缩性网络 I/O 。

 2.1、缓冲区 Buffer

      Buffer是一个对象,包含一些要写入或者读出的数据。

      在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。

    缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。

    具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。

 2.2、通道 Channel

    我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。

    底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。

    Channel主要分两大类:

    SelectableChannel:用户网络读写

    FileChannel:用于文件操作

    后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。

blob.png  

图:channel和buffer关系

 详细的简介:http://www.iteye.com/topic/834447

2.3、多路复用器 Selector

     Selector是Java  NIO 编程的基础。

    Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

    一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。

2.4、NIO服务端代码演示:

    代码比传统的Socket编程看起来要复杂不少。

    直接贴代码吧,以注释的形式给出代码说明。

    NIO创建的Server源码:

public class Server {  
    private static int DEFAULT_PORT = 12345;  
    private static ServerHandle serverHandle;  
    public static void start(){  
        start(DEFAULT_PORT);  
    }  
    public static synchronized void start(int port){  
        if(serverHandle!=null)  
            serverHandle.stop();  
        serverHandle = new ServerHandle(port);  
        new Thread(serverHandle,"Server").start();  
    }  
    public static void main(String[] args){  
        start();  
    }  
}

ServerHandle:

public class ServerHandle implements Runnable{  
    private Selector selector;  
    private ServerSocketChannel serverChannel;  
    private volatile boolean started;  
    /** 
     * 构造方法 
     * @param port 指定要监听的端口号 
     */  
    public ServerHandle(int port) {  
        try{  
            //创建选择器  
            selector = Selector.open();  
            //打开监听通道  
            serverChannel = ServerSocketChannel.open();  
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式  
            serverChannel.configureBlocking(false);//开启非阻塞模式  
            //绑定端口 backlog设为1024  
            serverChannel.socket().bind(new InetSocketAddress(port),1024);  
            //监听客户端连接请求  
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);  
            //标记服务器已开启  
            started = true;  
            System.out.println("服务器已启动,端口号:" + port);  
        }catch(IOException e){  
            e.printStackTrace();  
            System.exit(1);  
        }  
    }  
    public void stop(){  
        started = false;  
    }  
	
    @Override  
    public void run() {  
        //循环遍历selector  
        while(started){  
            try{  
                //无论是否有读写事件发生,selector每隔1s被唤醒一次  
                selector.select(1000);  
                //阻塞,只有当至少一个注册的事件发生的时候才会继续.  
//              selector.select();  
                Set<SelectionKey> keys = selector.selectedKeys();  
                Iterator<SelectionKey> it = keys.iterator();  
                SelectionKey key = null;  
                while(it.hasNext()){  
                    key = it.next();  
                    it.remove();  
                    try{  
                        handleInput(key);  
                    }catch(Exception e){  
                        if(key != null){  
                            key.cancel();  
                            if(key.channel() != null){  
                                key.channel().close();  
                            }  
                        }  
                    }  
                }  
            }catch(Throwable t){  
                t.printStackTrace();  
            }  
        }  
        //selector关闭后会自动释放里面管理的资源  
        if(selector != null)  
            try{  
                selector.close();  
            }catch (Exception e) {  
                e.printStackTrace();  
            }  
    }  
    private void handleInput(SelectionKey key) throws IOException{  
        if(key.isValid()){  
            //处理新接入的请求消息  
            if(key.isAcceptable()){  
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();  
                //通过ServerSocketChannel的accept创建SocketChannel实例  
                //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立  
                SocketChannel sc = ssc.accept();  
                //设置为非阻塞的  
                sc.configureBlocking(false);  
                //注册为读  
                sc.register(selector, SelectionKey.OP_READ);  
            }  
            //读消息  
            if(key.isReadable()){  
                SocketChannel sc = (SocketChannel) key.channel();  
                //创建ByteBuffer,并开辟一个1M的缓冲区  
                ByteBuffer buffer = ByteBuffer.allocate(1024);  
                //读取请求码流,返回读取到的字节数  
                int readBytes = sc.read(buffer);  
                //读取到字节,对字节进行编解码  
                if(readBytes>0){  
                    //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作  
                    buffer.flip();  
                    //根据缓冲区可读字节数创建字节数组  
                    byte[] bytes = new byte[buffer.remaining()];  
                    //将缓冲区可读字节数组复制到新建的数组中  
                    buffer.get(bytes);  
                    String expression = new String(bytes,"UTF-8");  
                    System.out.println("服务器收到消息:" + expression);  
                    //处理数据  
                    String result = null;  
                    try{  
                        result = Calculator.cal(expression).toString();  
                    }catch(Exception e){  
                        result = "计算错误:" + e.getMessage();  
                    }  
                    //发送应答消息  
                    doWrite(sc,result);  
                }  
                //没有读取到字节 忽略  
//              else if(readBytes==0);  
                //链路已经关闭,释放资源  
                else if(readBytes<0){  
                    key.cancel();  
                    sc.close();  
                }  
            }  
        }  
    }  
    //异步发送应答消息  
    private void doWrite(SocketChannel channel,String response) throws IOException{  
        //将消息编码为字节数组  
        byte[] bytes = response.getBytes();  
        //根据数组容量创建ByteBuffer  
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);  
        //将字节数组复制到缓冲区  
        writeBuffer.put(bytes);  
        //flip操作  
        writeBuffer.flip();  
        //发送缓冲区的字节数组  
        channel.write(writeBuffer);  
        //****此处不含处理“写半包”的代码  
    }  
}

 

可以看到,创建NIO服务端的主要步骤如下:

    打开ServerSocketChannel,监听客户端连接

    绑定监听端口,设置连接为非阻塞模式

    创建Reactor线程,创建多路复用器并启动线程

    将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件

    Selector轮询准备就绪的key

    Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路

    设置客户端链路为非阻塞模式

    将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息

    异步读取客户端消息到缓冲区

    对Buffer编解码,处理半包消息,将解码成功的消息封装成Task

    将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端

    因为应答消息的发送,SocketChannel也是异步非阻塞的,所以不能保证一次能吧需要发送的数据发送完,此时就会出现写半包的问题。我们需要注册写操作,不断轮询Selector将没有发送完的消息发送完毕,然后通过Buffer的hasRemain()方法判断消息是否发送完成。

2.5、NIO客户端代码演示:

    还是直接上代码吧,过程也不需要太多解释了,跟服务端代码有点类似。

   Client: 

public class Client {  
    private static String DEFAULT_HOST = "127.0.0.1";  
    private static int DEFAULT_PORT = 12345;  
    private static ClientHandle clientHandle;  
    public static void start(){  
        start(DEFAULT_HOST,DEFAULT_PORT);  
    }  
    public static synchronized void start(String ip,int port){  
        if(clientHandle!=null)  
            clientHandle.stop();  
        clientHandle = new ClientHandle(ip,port);  
        new Thread(clientHandle,"Server").start();  
    }  
    //向服务器发送消息  
    public static boolean sendMsg(String msg) throws Exception{  
        if(msg.equals("q")) return false;  
        clientHandle.sendMsg(msg);  
        return true;  
    }  
    public static void main(String[] args){  
        start();  
    }  
}

ClientHandle:

public class ClientHandle implements Runnable{  
    private String host;  
    private int port;  
    private Selector selector;  
    private SocketChannel socketChannel;  
    private volatile boolean started;  
  
    public ClientHandle(String ip,int port) {  
        this.host = ip;  
        this.port = port;  
        try{  
            //创建选择器  
            selector = Selector.open();  
            //打开监听通道  
            socketChannel = SocketChannel.open();  
            //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式  
            socketChannel.configureBlocking(false);//开启非阻塞模式  
            started = true;  
        }catch(IOException e){  
            e.printStackTrace();  
            System.exit(1);  
        }  
    }  
    public void stop(){  
        started = false;  
    }  
    @Override  
    public void run() {  
        try{  
            doConnect();  
        }catch(IOException e){  
            e.printStackTrace();  
            System.exit(1);  
        }  
        //循环遍历selector  
        while(started){  
            try{  
                //无论是否有读写事件发生,selector每隔1s被唤醒一次  
                selector.select(1000);  
                //阻塞,只有当至少一个注册的事件发生的时候才会继续.  
//              selector.select();  
                Set<SelectionKey> keys = selector.selectedKeys();  
                Iterator<SelectionKey> it = keys.iterator();  
                SelectionKey key = null;  
                while(it.hasNext()){  
                    key = it.next();  
                    it.remove();  
                    try{  
                        handleInput(key);  
                    }catch(Exception e){  
                        if(key != null){  
                            key.cancel();  
                            if(key.channel() != null){  
                                key.channel().close();  
                            }  
                        }  
                    }  
                }  
            }catch(Exception e){  
                e.printStackTrace();  
                System.exit(1);  
            }  
        }  
        //selector关闭后会自动释放里面管理的资源  
        if(selector != null)  
            try{  
                selector.close();  
            }catch (Exception e) {  
                e.printStackTrace();  
            }  
    }  
    private void handleInput(SelectionKey key) throws IOException{  
        if(key.isValid()){  
            SocketChannel sc = (SocketChannel) key.channel();  
            if(key.isConnectable()){  
                if(sc.finishConnect());  
                else System.exit(1);  
            }  
            //读消息  
            if(key.isReadable()){  
                //创建ByteBuffer,并开辟一个1M的缓冲区  
                ByteBuffer buffer = ByteBuffer.allocate(1024);  
                //读取请求码流,返回读取到的字节数  
                int readBytes = sc.read(buffer);  
                //读取到字节,对字节进行编解码  
                if(readBytes>0){  
                    //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作  
                    buffer.flip();  
                    //根据缓冲区可读字节数创建字节数组  
                    byte[] bytes = new byte[buffer.remaining()];  
                    //将缓冲区可读字节数组复制到新建的数组中  
                    buffer.get(bytes);  
                    String result = new String(bytes,"UTF-8");  
                    System.out.println("客户端收到消息:" + result);  
                }  
                //没有读取到字节 忽略  
//              else if(readBytes==0);  
                //链路已经关闭,释放资源  
                else if(readBytes<0){  
                    key.cancel();  
                    sc.close();  
                }  
            }  
        }  
    }  
    //异步发送消息  
    private void doWrite(SocketChannel channel,String request) throws IOException{  
        //将消息编码为字节数组  
        byte[] bytes = request.getBytes();  
        //根据数组容量创建ByteBuffer  
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);  
        //将字节数组复制到缓冲区  
        writeBuffer.put(bytes);  
        //flip操作  
        writeBuffer.flip();  
        //发送缓冲区的字节数组  
        channel.write(writeBuffer);  
        //****此处不含处理“写半包”的代码  
    }  
    private void doConnect() throws IOException{  
        if(socketChannel.connect(new InetSocketAddress(host,port)));  
        else socketChannel.register(selector, SelectionKey.OP_CONNECT);  
    }  
    public void sendMsg(String msg) throws Exception{  
        socketChannel.register(selector, SelectionKey.OP_READ);  
        doWrite(socketChannel, msg);  
    }  
}

2.6、演示结果

  首先运行服务器,顺便也运行一个客户端:

public class Test {  
    //测试主方法  
    @SuppressWarnings("resource")  
    public static void main(String[] args) throws Exception{  
        //运行服务器  
        Server.start();  
        //避免客户端先于服务器启动前执行代码  
        Thread.sleep(100);  
        //运行客户端   
        Client.start();  
        while(Client.sendMsg(new Scanner(System.in).nextLine()));  
    }  
}

3.AIO编程

    NIO 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。

    异步的套接字通道时真正的异步非阻塞I/O,对应于UNIX网络编程中的事件驱动I/O(AIO)。他不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。

    直接上代码吧。

public class Server {
	//线程池
	private ExecutorService executorService;
	//线程组
	private AsynchronousChannelGroup threadGroup;
	//服务器通道
	public AsynchronousServerSocketChannel assc;
	
	public Server(int port){
		try {
			//创建一个缓存池
			executorService = Executors.newCachedThreadPool();
			//创建线程组
			threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
			//创建服务器通道
			assc = AsynchronousServerSocketChannel.open(threadGroup);
			//进行绑定
			assc.bind(new InetSocketAddress(port));
			
			System.out.println("server start , port : " + port);
			//进行阻塞
			assc.accept(this, new ServerCompletionHandler());
			//一直阻塞 不让服务器停止
			Thread.sleep(Integer.MAX_VALUE);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		Server server = new Server(8765);
	}
	
}


public class ServerCompletionHandler 
         implements CompletionHandler<AsynchronousSocketChannel, Server> {

	@Override
	public void completed(AsynchronousSocketChannel asc, Server attachment) {
		//当有下一个客户端接入的时候 直接调用Server的accept方法,这样反复执行下去,保证多个客户端都可以阻塞
		attachment.assc.accept(attachment, this);
		read(asc);
	}

	private void read(final AsynchronousSocketChannel asc) {
		//读取数据
		ByteBuffer buf = ByteBuffer.allocate(1024);
		asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer resultSize, ByteBuffer attachment) {
				//进行读取之后,重置标识位
				attachment.flip();
				//获得读取的字节数
				System.out.println("Server -> " + "收到客户端的数据长度为:" + resultSize);
				//获取读取的数据
				String resultData = new String(attachment.array()).trim();
				System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData);
				String response = "服务器响应, 收到了客户端发来的数据: " + resultData;
				write(asc, response);
			}
			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				exc.printStackTrace();
			}
		});
	}
	
	private void write(AsynchronousSocketChannel asc, String response) {
		try {
			ByteBuffer buf = ByteBuffer.allocate(1024);
			buf.put(response.getBytes());
			buf.flip();
			asc.write(buf).get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void failed(Throwable exc, Server attachment) {
		exc.printStackTrace();
	}

}




public class Client implements Runnable{

	private AsynchronousSocketChannel asc ;
	
	public Client() throws Exception {
		asc = AsynchronousSocketChannel.open();
	}
	
	public void connect(){
		asc.connect(new InetSocketAddress("127.0.0.1", 8765));
	}
	
	public void write(String request){
		try {
			asc.write(ByteBuffer.wrap(request.getBytes())).get();
			read();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private void read() {
		ByteBuffer buf = ByteBuffer.allocate(1024);
		try {
			asc.read(buf).get();
			buf.flip();
			byte[] respByte = new byte[buf.remaining()];
			buf.get(respByte);
			System.out.println(new String(respByte,"utf-8").trim());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		} catch (UnsupportedEncodingException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void run() {
		while(true){
			
		}
	}
	
	public static void main(String[] args) throws Exception {
		Client c1 = new Client();
		c1.connect();
		
		Client c2 = new Client();
		c2.connect();
		
		Client c3 = new Client();
		c3.connect();
		
		new Thread(c1, "c1").start();
		new Thread(c2, "c2").start();
		new Thread(c3, "c3").start();
		
		Thread.sleep(1000);
		
		c1.write("c1 aaa");
		c2.write("c2 bbbb");
		c3.write("c3 ccccc");
	}
	
}

AIO是真正的异步非阻塞的,所以,在面对超级大量的客户端,更能得心应手。


下面就比较一下,几种I/O编程的优缺点。

4、各种I/O的对比

      先以一张表来直观的对比一下:

     blob.png

    具体选择什么样的模型或者NIO框架,完全基于业务的实际应用场景和性能需求,如果客户端很少,服务器负荷不重,就没有必要选择开发起来相对不那么简单的NIO做服务端;相反,就应考虑使用NIO或者相关的框架了。

参考文章:

http://www.iteye.com/topic/834447

http://www.iteye.com/magazines/132-Java-NIO

发表评论