​Netty-TCP 粘包和拆包问题

最近在学习李林峰先生的《Netty权威指南》, 目的是想多了解一下底层的知识; 所以在阅读时整理的基本笔记;

本章内容:

  1.TCP粘包/拆包的基础知识;

  2.使用Netty解决读半包问题;

 1.TCP粘包/拆包的基础知识;

      TCP网络传输是基于流的形式传输,所谓的流是没有界限的数据,好比河里的水,是没有断续的,TCP底层

      并不了解上层业务数据的具体含义,它会根据TCP缓存区的实际情况进行包的划分,也就是说,在业务上,

我们一个完整的包可能会被TCP分成多个包的形式进行发送,也可将多个小包封装成一个大的数据包发送出去,

这就是所谓的 TCP 粘包和拆包。

解决方式:

    (1)消息长度固定,累计读取到长度总和为定长LEN 的报文后,就认为读取到了一个完整的消息,将计数器置位,重新开始读取下一个数据报;例如每个报文的大小固定为200个字节,如果不够,空位补空格。

   (2)将回车换行符作为消息结束符,例如FTP协议,这种方式在文本协议中应用比较广泛:

   (3)将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符:

   (4)自定义协议,携带消息头消息体,在消息头中包含表示消息总长度的字段,然后进行业务逻辑处理。如:RocketMQ 自定义      NettyEncoder/NettyDecoder

Netty对以上四种应用做了统一的抽象,提供了4种解码器来解决对应的问题,,分别是:

     第一: LineBasedFrameDecoder  换行符

       依次编译bytebuf中的可读字符,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持单行的最大长度。如果连续读取到最大长度后,仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

      http://blog.csdn.net/upup918/article/details/47864933

    LineBasedFrameDecoder 主要代码演示:

    ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); 
    ch.pipeline().addLast(new StringDecoder());
public class LineBasedFrameDecoderServer {

	public void bind(int port) {

		EventLoopGroup loopGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();

		ServerBootstrap serverBootstrap = new ServerBootstrap();
		serverBootstrap.group(loopGroup, workGroup);
		serverBootstrap.channel(NioServerSocketChannel.class);
		serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024);
		serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {

				// 以下两行代码为了解决半包读问题
				ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
				ch.pipeline().addLast(new StringDecoder());

				ch.pipeline().addLast(new ChannelHandlerAdapter() {

					private int counter;

					public void channelRead(ChannelHandlerContext ctx,
							Object msg) throws Exception {
						String body = (String) msg;
						System.out.println("The time server receive order:"
								+ body + ";the counter is:" + (++counter));
						String currentTime = "QUERY TIME ORDER"
								.equalsIgnoreCase(body) ? new Date(System
								.currentTimeMillis()).toString() : "BAD ORDER";
						currentTime = currentTime
								+ System.getProperty("line.separator");
						ByteBuf resp = Unpooled.copiedBuffer(currentTime
								.getBytes());
						ctx.write(resp);
					}

					public void channelReadComplete(ChannelHandlerContext ctx) {
						ctx.flush();
					}

					public void exceptionCaught(ChannelHandlerContext ctx,
							Throwable cause) {
						ctx.close();
					}

				});
			}
		});

		try {
			ChannelFuture future = serverBootstrap.bind(port).sync();
			future.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			loopGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
		}

	}

	public static void main(String[] args) {
		System.out.println("-------------------start ");
		new LineBasedFrameDecoderServer().bind(8080);
		System.out.println("-------------------end ");
	}

}

public class LineBasedFrameDecoderClient {

	public void connnect(String inetHost, int port) {

		EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
		Bootstrap workGroup = new Bootstrap();
		workGroup.channel(NioSocketChannel.class);
		workGroup.group(eventLoopGroup);
		workGroup.option(ChannelOption.TCP_NODELAY, true);
		workGroup.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				  // 以下两行代码为了解决半包读问题  
                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));  
                ch.pipeline().addLast(new StringDecoder());  
				
				ch.pipeline().addLast(new ChannelHandlerAdapter() {

					private int counter;  
					  
				    public void channelActive(ChannelHandlerContext ctx) {  
				    	byte[] req="QUERY TIME ORDER".getBytes();
				        ByteBuf message = null;  
				        for (int i = 0; i < 100; i++) {  
				            message = Unpooled.buffer(req.length);  
				            message.writeBytes(req);  
				            ctx.writeAndFlush(message);  
				        }  
				    }  
				  
				    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
				        String body = (String) msg;  
				        System.out.println("Now is:" + body + "; the counter is:" + (++counter));  
				    }  
				  
				    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
				        ctx.close();  
				    }  
				  

				});
			}
		});
		ChannelFuture future;
		try {
			future = workGroup.connect(inetHost, port);
			future.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			eventLoopGroup.shutdownGracefully();
		}

	}

	public static void main(String[] args) {
		new LineBasedFrameDecoderClient().connnect("127.0.0.1", 8080);
	}
}

   第二: FixdLengthFrameDecoder  自动完成对定长消息的解码

          是固定长度解码器,它能按照指定的长度对消息进行自动解码,开发者不需要考虑TCP的粘包等问题。利用FixedLengthFrameDecoder解码,无论一次性接收到多少的数据,他都会按照构造函数中设置的长度进行解码;如果是半包消息,FixedLengthFrameDecoder会缓存半包消息并等待下一个包,到达后进行拼包,直到读取完整的包。

       FixdLengthFrameDecoder 主要代码演示:

 ch.pipeline().addLast(new FixedLengthFrameDecoder(5)); 
 //5 表示5个长度; 如:客户端输入“aaaaaa” ---> 服务端 只会接收到 “aaaaa” ; 所以数据实在不够的时候 以空位补空格。

   第三: DelimiterBaseFrameDecoder 自定义的分隔符

    DelimiterBaseFrameDecoder 主要代码演示:

   ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
   ch.pipeline().addLast(new    DelimiterBasedFrameDecoder(1024,delimiter));
   ch.pipeline().addLast(new StringDecoder());
   ch.pipeline().addLast(new EchoClientHandler());

      是自定义的分隔符解码,构造函数的第一个参数表示单个消息的最大长度,当达到该长度后仍然没有查到分隔符,就抛出TooLongFrameException异常,防止由于异常码流缺失分隔符导致的内存溢出。

public class DelimiterBaseFrameDecoderServer {

	public void bind(int port) {

		EventLoopGroup loopGroup = new NioEventLoopGroup();
		EventLoopGroup workGroup = new NioEventLoopGroup();

		ServerBootstrap serverBootstrap = new ServerBootstrap();
		serverBootstrap.group(loopGroup, workGroup);
		serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
		serverBootstrap.channel(NioServerSocketChannel.class);
		serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
		serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ByteBuf byteBuf = Unpooled.copiedBuffer("$_".getBytes());
				ch.pipeline().addLast(
						new DelimiterBasedFrameDecoder(1024, byteBuf));
				ch.pipeline().addLast(new StringDecoder());
				ch.pipeline().addLast(new ChannelHandlerAdapter() {

					private int counter;

					public void channelRead(ChannelHandlerContext ctx,
							Object msg) throws Exception {

						String body = (String) msg;

						System.out.println("The time server receive order:"
								+ body + ";the counter is:" + (++counter));

						body += "$_";

						ByteBuf resp = Unpooled.copiedBuffer(body.getBytes());
						ctx.write(resp);
					}

					public void channelReadComplete(ChannelHandlerContext ctx) {
						ctx.flush();
					}

					public void exceptionCaught(ChannelHandlerContext ctx,
							Throwable cause) {
						ctx.close();
					}

				});
			}
		});

		try {
			ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
			channelFuture.channel().closeFuture().sync();

		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			loopGroup.shutdownGracefully();
			workGroup.shutdownGracefully();
		}
	}
	
	public static void main(String[] args) {
		new DelimiterBaseFrameDecoderServer().bind(8080);
	}

}


public class DelimiterBaseFrameDecoderClient {

	public void connnection(String host, int port) {

		EventLoopGroup loopGroup = new NioEventLoopGroup();

		Bootstrap bootstrap = new Bootstrap();
		bootstrap.group(loopGroup);
		bootstrap.option(ChannelOption.TCP_NODELAY, true);
		bootstrap.channel(NioSocketChannel.class);
		bootstrap.handler(new ChannelInitializer<SocketChannel>() {

			@Override
			protected void initChannel(SocketChannel ch) throws Exception {

				ByteBuf byteBuf = Unpooled.copiedBuffer("$_".getBytes());
				ch.pipeline().addLast(
						new DelimiterBasedFrameDecoder(1024, byteBuf));
				ch.pipeline().addLast(new StringDecoder());
				ch.pipeline().addLast(new ChannelHandlerAdapter() {

					private int counter;

					public void channelActive(ChannelHandlerContext ctx) {
						byte[] req = "HI,Tony Welcome to Netty .$_".getBytes();
						ByteBuf message = null;
						for (int i = 0; i < 10; i++) {
							message = Unpooled.buffer(req.length);
							message.writeBytes(req);
							ctx.writeAndFlush(message);
						}
					}

					public void channelRead(ChannelHandlerContext ctx,
							Object msg) throws Exception {
						String body = (String) msg;
						System.out.println("Now is:" + body
								+ "; the counter is:" + (++counter));
					}

					public void exceptionCaught(ChannelHandlerContext ctx,
							Throwable cause) {
						ctx.close();
					}
				});

			}
		});

		ChannelFuture channelFuture;
		try {
			channelFuture = bootstrap.connect(host, port).sync();
			channelFuture.channel().closeFuture().sync();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			loopGroup.shutdownGracefully();
		}

	}

	public static void main(String[] args) {
		new DelimiterBaseFrameDecoderClient().connnection("127.0.0.1", 8080);
	}

}

其他文章:

      http://www.jianshu.com/p/5dcbc0456376

发表评论