使用JDK NIO类库时开发NIO的异步服务端时,需要用到:多路复用器Selector,ServerSocketChannel,SocketChanel,ByteBuffer,SelectionKey等。如果用源生的JAVA NIO搭建服务端,无疑是十分复杂的,这不仅拖慢了项目的进度,而且开发出来的项目可能还不稳定。那么Netty是如何降低其复杂度的呢?Netty开发简单异步服务端不超过9步,下面分步骤来讲解用Netty来开发异步服务端:
第一步:创建EventLoopGroup
//第一步 创建EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();
讲解:EventLoopGroup 是EventLoop的数组,EventLoop的职责是处理所有注册到本线程多路复用器Selector上的Channel,Selector的轮询操作由EventLoop线程的run方法驱动。
第二步:创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
讲解:ServerBootstrap是Netty服务端启动的辅助类,从代码可以看出,我们在创建ServerBootstrap时,用的是一个无参构造器创建的,你也会惊讶的发现,ServerBootstrap只有一个无参构造器,因为这个辅助类需要的参数太多了,而且有些参数是用户可以选择性添加的,所以ServerBootstrap引入了Builder模式 让用户动态添加参数:
ServerBootstrap b = new ServerBootstrap();//动态添加参数,支持链式添加b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new HttpServerHandler());
第三步:设置并绑定服务端Channel(开始绑定ServerSocketChannel)
b.channel(NioServerSocketChannel.class)
讲解:这里只需要传入对应的.class类即可,ServerBootstrap会通过工厂类,利用反射帮我们创建对应的Class对象。
第四步:创建并初始化处理网络时间的职责链 ChannelPipeline
ch.pipeline().addLast(new LoggingHandler()); //请求 http解码 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //将多个消息转换为单一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //应答http编码 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //连接异步发送请求,防止内存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter());
讲解:ChannelPipeline并不是NIO服务端必须的,它本质就是一个处理网络事件的职责链,负责管理和执行ChannelHandler,我们可以在其中添加服务端接收请求的解码和发送请求的编码,以及粘包\拆包的处理等。
第五步:添加并设置ChannelHandler
ChannelHandler是Netty提供给我们定制和扩展的关键接口,利用ChannelHandler我们可以完成大多数功能的定制,例如消息编解码,心跳安全认证等,下面是我定义的Handler:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //加入自定义的handler .childHandler(new HttpServerHandler());//我定义的handlerpublic class HttpServerHandler extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //请求 http解码 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //将多个消息转换为单一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //应答http编码 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //连接异步发送请求,防止内存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter()); }//第二层对消息处理public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler { @Override public boolean acceptInboundMessage(Object msg) throws Exception { System.out.println(msg); return super.acceptInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println(msg); } }
第六步:绑定并启动监听端口:
ChannelFuture f = b.bind(8080).sync(); System.out.println("Http服务器启动完成,监听端口为:"+port); f.channel().closeFuture().sync();
讲解:在监听端口之前,Netty会做一系列的初始化和检测工作,然后启动监听端口,同时,会将ServerSocketChannel注册到多路复用器Selector上,监听客户端的连接。
第七步:Selector轮询:
讲解:Selector的轮询是由Reactor线程NioEventLoop负责调度的,然后将准备就绪的Channel放到一个集合中;
第八步:Reactor线程NioEventLoop将准备就绪的Channel去执行我们定义好的ChannelPipeline方法,并执行系统定义的ChannelHandler;
第九步:最终,执行我们根据具体业务逻辑定义好的的ChannelHandler。
完整源码如下:
HttpServer:
package kaoqin.server;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LoggingHandler;public class HttpServer { private void run(int port) throws InterruptedException{ //第一步 创建EventLoopGroup EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler()) //加入自定义的handler .childHandler(new HttpServerHandler()); //.option(ChannelOption.AUTO_READ,true); ChannelFuture f = b.bind(port).sync(); System.out.println("Http服务器启动完成,监听端口为:"+port); f.channel().closeFuture().sync(); }finally{ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new HttpServer().run(9999); }}
HttpServerHandler:
package kaoqin.server;import io.netty.channel.ChannelInitializer;import io.netty.channel.socket.SocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpRequestDecoder;import io.netty.handler.codec.http.HttpResponseEncoder;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;public class HttpServerHandler extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler()); //请求 http解码 ch.pipeline().addLast("http-decoder",new HttpRequestDecoder()); //将多个消息转换为单一的FullHttpRequest ch.pipeline().addLast("http-aggregator",new HttpObjectAggregator(65536)); //应答http编码 ch.pipeline().addLast("http-encoder",new HttpResponseEncoder()); //连接异步发送请求,防止内存溢出 ch.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); ch.pipeline().addLast("httpServerHandlerAdapter",new HttpServerHandlerAdapter()); }}
HttpServerHandlerAdapter:
package kaoqin.server;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.FullHttpRequest;public class HttpServerHandlerAdapter extends SimpleChannelInboundHandler{ @Override public boolean acceptInboundMessage(Object msg) throws Exception { System.out.println(msg); return super.acceptInboundMessage(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception { System.out.println(msg); } }