Netty4自定义通讯协议

      Netty中,通讯的双方建立连接后,会把数据按照ByteBuf的方式进行传输,例如http协议中,就是通过HttpRequestDecoder对ByteBuf数据流进行处理,转换成http的对象。基于这个思路,我自定义一种通讯协议:Server和客户端直接传输java对象。

实现的原理是通过Encoder把java对象转换成ByteBuf流进行传输,通过Decoder把ByteBuf转换成java对象进行处理,处理逻辑如下图所示:

客户端传输的java bean为Request:

public class Request {
	
	/**
	 * 请求模块
	 */
	private short module;
	
	/**
	 * 命令号
	 */
	private short cmd;
	
	/**
	 * 数据部分
	 */
	private byte[] data;

	public short getModule() {
		return module;
	}

	public void setModule(short module) {
		this.module = module;
	}

	public short getCmd() {
		return cmd;
	}

	public void setCmd(short cmd) {
		this.cmd = cmd;
	}

	public byte[] getData() {
		return data;
	}

	public void setData(byte[] data) {
		this.data = data;
	}
	
	
	public int getDataLength(){
		if(data == null){
			return 0;
		}
		return data.length;
	}

	@Override
	public String toString() {
		return "Request [module=" + module + ", cmd=" + cmd + ", data="
				+ Arrays.toString(data) + "]";
	}
	
	
}

RequestEncoder 编码类:

/**
 * <pre>
 * 数据包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * |  包头	|  模块号      |  命令号    |   长度     |   数据       |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 *
 */
public class RequestEncoder extends MessageToByteEncoder<Request>{

	@Override
	protected void encode(ChannelHandlerContext ctx, Request message, ByteBuf buffer) 
	throws Exception {

		System.out.println("--------------RequestEncoder--------------");
		
		//包头
		buffer.writeInt(ConstantValue.HEADER_FLAG);
		//module
		buffer.writeShort(message.getModule());
		//cmd
		buffer.writeShort(message.getCmd());
		//长度
		int lenth = message.getData()==null? 0 : message.getData().length;
		if(lenth <= 0){
			buffer.writeInt(lenth);
		}else{
			buffer.writeInt(lenth);
			buffer.writeBytes(message.getData());
		}
	}
}

RequestDecoder 解码类:

/**
 * 数据包解码器
 * <pre>
 * 数据包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * |  包头	|  模块号      |  命令号    |   长度     |   数据       |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 * 包头4字节
 * 模块号2字节 
 * 命令号2字节
 * 长度4字节(数据部分占有字节数量)
 */
public class RequestDecoder extends ByteToMessageDecoder {
	
	/**
	 * 数据包基本长度
	 */
	public static int BASE_LENTH = 4 + 2 + 2 + 4;

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) 
	throws Exception {
		System.out.println("--------------RequestDecoder--------------");
		while(true){
			if(buffer.readableBytes() >= BASE_LENTH){
				//第一个可读数据包的起始位置
				int beginIndex;
				
				while(true) {
					//包头开始游标点
					beginIndex = buffer.readerIndex();
					//标记初始读游标位置
					buffer.markReaderIndex();
					if (buffer.readInt() == ConstantValue.HEADER_FLAG) {
						break;
					}
					//未读到包头标识略过一个字节
					buffer.resetReaderIndex();
					buffer.readByte();
					
					//不满足
					if(buffer.readableBytes() < BASE_LENTH){
						return ;
					}
				}
				//读取命令号
				short module = buffer.readShort();
				short cmd = buffer.readShort();
				
				//读取数据长度 
				int lenth = buffer.readInt();
				if(lenth < 0 ){
					ctx.channel().close();
				}
				
				//数据包还没到齐
				if(buffer.readableBytes() < lenth){
					buffer.readerIndex(beginIndex);
					return ;
				}
				
				//读数据部分
				byte[] data = new byte[lenth];
				buffer.readBytes(data);
				
				Request message = new Request();
				message.setModule(module);
				message.setCmd(cmd);
				message.setData(data);
				//解析出消息对象,继续往下面的handler传递
				out.add(message);
			}else{
				break;
			}
		}
		
		//数据不完整,等待完整的数据包
		return ;
	}
}

服务端响应的java bean为Response:

/**
 * 返回对象
 *
 */
public class Response {
	/**
	 * 请求模块
	 */
	private short module;
	
	/**
	 * 命令号
	 */
	private short cmd;
	
	/**
	 * 状态码
	 */
	private int stateCode;
	
	/**
	 * 数据部分
	 */
	private byte[] data;

	public short getModule() {
		return module;
	}

	public void setModule(short module) {
		this.module = module;
	}

	public short getCmd() {
		return cmd;
	}

	public void setCmd(short cmd) {
		this.cmd = cmd;
	}

	public int getStateCode() {
		return stateCode;
	}

	public void setStateCode(int stateCode) {
		this.stateCode = stateCode;
	}

	public byte[] getData() {
		return data;
	}

	public void setData(byte[] data) {
		this.data = data;
	}
	
	public int getDataLength(){
		if(data == null){
			return 0;
		}
		return data.length;
	}
}

ResponseEecoder 编码类:

/**
 * <pre>
 * 数据包格式
 * +——----——+——-----——+——----——+——----——+——----——+——----——+
 * |  包头	|  模块号      |  命令号    |  结果码    |  长度       |   数据     |  
 * +——----——+——-----——+——----——+——----——+——----——+——----——+
 * </pre>
 *
 */
public class ResponseEncoder extends MessageToByteEncoder<Response>{

	@Override
	protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf buffer) 
	throws Exception {
		System.out.println("--------------ResponseEncoder--------------");
		System.out.println("返回请求:" + "module:" +response.getModule() 
		+" cmd:" + response.getCmd() + " resultCode:" + response.getStateCode());
		
		//包头
		buffer.writeInt(ConstantValue.HEADER_FLAG);
		//module和cmd
		buffer.writeShort(response.getModule());
		buffer.writeShort(response.getCmd());
		//结果码
		buffer.writeInt(response.getStateCode());
		//长度
		int lenth = response.getData()==null? 0 : response.getData().length;
		if(lenth <= 0){
			buffer.writeInt(lenth);
		}else{
			buffer.writeInt(lenth);
			buffer.writeBytes(response.getData());
		}
	}
}

ResponseDecoder解码类:

/**
 * <pre>
 * 数据包格式
 * +——----——+——-----——+——----——+——----——+——----——+——----——+
 * |  包头	|  模块号      |  命令号    |  结果码    |  长度       |   数据     |  
 * +——----——+——-----——+——----——+——----——+——----——+——----——+
 * </pre>
 *
 */
public class ResponseDecoder extends ByteToMessageDecoder{
	
	/**
	 * 数据包基本长度
	 */
	public static int BASE_LENTH = 4 + 2 + 2 + 4 + 4;

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out)
	 throws Exception {
		System.out.println("--------------ResponseDecoder--------------");
		while(true){
			if(buffer.readableBytes() >= BASE_LENTH){
				//第一个可读数据包的起始位置
				int beginIndex;
				
				while(true) {
					//包头开始游标点
					beginIndex = buffer.readerIndex();
					//标记初始读游标位置
					buffer.markReaderIndex();
					if (buffer.readInt() == ConstantValue.HEADER_FLAG) {
						break;
					}
					//未读到包头标识略过一个字节
					buffer.resetReaderIndex();
					buffer.readByte();
					
					//不满足
					if(buffer.readableBytes() < BASE_LENTH){
						return ;
					}
				}
				//读取模块号命令号
				short module = buffer.readShort();
				short cmd = buffer.readShort();
				
				int stateCode = buffer.readInt();
				
				//读取数据长度 
				int lenth = buffer.readInt();
				if(lenth < 0 ){
					ctx.channel().close();
				}
				
				//数据包还没到齐
				if(buffer.readableBytes() < lenth){
					buffer.readerIndex(beginIndex);
					return ;
				}
				
				//读数据部分
				byte[] data = new byte[lenth];
				buffer.readBytes(data);
				
				Response response = new Response();
				response.setModule(module);
				response.setCmd(cmd);
				response.setStateCode(stateCode);
				response.setData(data);
				//解析出消息对象,继续往下面的handler传递
				out.add(response);
			}else{
				break;
			}
		}
		//数据不完整,等待完整的数据包
		return ;
	}

}

/**
 * 静态值
 */
public interface ConstantValue {
	/**
	 * 包头标识
	 */
	public static int HEADER_FLAG = -21415431;
}

测试:

 服务端实现 Server:

/**
 * netty服务端入门
 */
public class Server {

	/**
	 * 启动
	 */
	public void start() {

		// 服务类
		ServerBootstrap b = new ServerBootstrap();

		// 创建boss和worker
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			// 设置循环线程组事例
			b.group(bossGroup, workerGroup);

			// 设置channel工厂
			b.channel(NioServerSocketChannel.class);

			// 设置管道
			b.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new RequestDecoder());
					ch.pipeline().addLast(new ResponseEncoder());
					ch.pipeline().addLast(new ServerHandler());
				}
			});

			b.option(ChannelOption.SO_BACKLOG, 2048);// 链接缓冲池队列大小
			System.out.println("start!!!");
			ChannelFuture f = b.bind(10102).sync();
			
			f.channel().closeFuture().sync();  

			System.out.println("start!!!");
		} catch (Exception e) {
			e.printStackTrace();
		}finally {  
            workerGroup.shutdownGracefully();  
            bossGroup.shutdownGracefully();  
        }  
	}
	
	public static void main(String[] args) {
		Server server=new Server();
		server.start();
	}

}

/**
 * 消息接受处理类
 * 
 */
public class ServerHandler extends SimpleChannelInboundHandler<Request> {

	/**
	 * 接收消息
	 */
	@Override
	public void messageReceived(ChannelHandlerContext ctx, Request request)
			throws Exception {

		System.out.println("====================服务端接收到客户端发送的消息===============");
		System.out.println(request.toString());

		Response response = new Response();
		response.setCmd(request.getCmd());
		response.setData("你好客户端".getBytes());
		response.setModule(request.getModule());
		response.setStateCode(1);
		
		Thread.sleep(2000);
		
		ctx.writeAndFlush(response);

	}

	/**
	 * 断线移除会话
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		System.out.println("====================断线移除会话===============");
	}
}

 客户端实现 Client:

/**
 * netty客户端入门
 * 
 */
public class Client {

	/**
	 * 服务类
	 */
	Bootstrap bootstrap = new Bootstrap();

	/**
	 * 会话
	 */
	private Channel channel;

	/**
	 * 线程池
	 */
	private EventLoopGroup workerGroup = new NioEventLoopGroup();

	/**
	 * 初始化
	 */
	@PostConstruct
	public void init() {
		
		// 设置循环线程组事例
		bootstrap.group(workerGroup);

		// 设置channel工厂
		bootstrap.channel(NioSocketChannel.class);

		// 设置管道
		bootstrap.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline().addLast(new ResponseDecoder());
				ch.pipeline().addLast(new RequestEncoder());
				ch.pipeline().addLast(new ClientHandler());
			}
		});
	}

	/**
	 * 连接
	 * 
	 * @param ip
	 * @param port
	 * @throws InterruptedException
	 */
	public void connect() throws InterruptedException {

		// 连接服务端
		ChannelFuture connect = bootstrap.connect(new InetSocketAddress("127.0.0.1", 10102));
		connect.sync();
		channel = connect.channel();
	}

	/**
	 * 关闭
	 */
	public void shutdown() {
		workerGroup.shutdownGracefully();
	}

	/**
	 * 获取会话
	 * 
	 * @return
	 */
	public Channel getChannel() {
		return channel;
	}
	
	/**
	 * 发送消息
	 * @param request
	 * @throws InterruptedException 
	 */
	public void sendRequest(Request request) throws InterruptedException{
		if(channel == null || !channel.isActive()){
			connect();
		}
		channel.writeAndFlush(request);
		System.out.println("=============writeAndFlush===========");
	}
	
	public static void main(String[] args) throws InterruptedException {
		
		Client client=new Client();
		client.init();
		client.connect();
		
		Request request=new Request();
		request.setCmd((short)1);
		request.setData("增加数据".getBytes());
		request.setModule((short)2);
		
		client.sendRequest(request);
		Thread.sleep(100000);
	}
	
}
/**
 * 消息接受处理类
 *
 */
public class ClientHandler extends SimpleChannelInboundHandler<Response> {
	

	/**
	 * 接收消息
	 */
	public void messageReceived(ChannelHandlerContext ctx, Response response)
	 throws Exception {
		
		System.out.println("############################客户端接收到消息#######################");
	}
	
	

	/**
	 * 断开链接
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
	}


}

发表评论