知识点:
- Netty框架如何引导服务端监听网络端口并读写消息
- Netty框架如何连接远程服务器并读写消息
- Netty框架ChannelInboundHandlerAdapter部分事件使用方法
- Netty框架Channel管道使用方法
前言
上一篇对Netty框架做了一个大概的介绍,并对核心部件Channel、ChannelHeadler、Future、事件从概念与作用上做了说明,另外还与Java NIO 在编码上做了一个简单的对比,引出了EventLoop。本篇使用一个小示例来了解下Netty框架怎么使用,真正的用起来。
交互图

服务端示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
| public static void main(String[] args) throws InterruptedException { NioEventLoopGroup master = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(master); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MessageChannel()); } }); ChannelFuture future = bootstrap.bind(30888).sync(); future.channel().closeFuture().sync(); master.shutdownGracefully().sync(); }
public static class MessageChannel extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; String message = buf.toString(CharsetUtil.UTF_8); System.out.println("Client message:" + message); ctx.writeAndFlush(ctx.alloc().buffer().writeBytes(("Copy message:" + message).getBytes())); if ("bye".equalsIgnoreCase(message.replace("\n", ""))) { ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("bye".getBytes())); ctx.close(); } }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client connect:" + ctx.channel().remoteAddress()); super.channelActive(ctx); ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Connect success!".getBytes())); }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
|
首先创建ServiceBootstrap实例。因为你准备使用NIO网络模型传输数据,所以使用NioEventLoopGroup来接受和处理新的连接,并且将Channel的类型指定为NioServiceSocketChannel。在这之后使用本地地址与指定的端口将服务器绑定到这个地址上监听新的连接请求。
在请求的处理上使用特殊的类ChannelInitializer。这是一个关键的类型,当有一个新的连接被接受时会创建一个新的Channel实例,而ChannelInitializer会把MessageChannel类型的实例添加至Channel的Pipeline中;而MessageChannel将会处理入站消息并输出一些信息给客户端。(这里使用MessageChannel模拟业务处理逻辑)
- MessageChannel 实现了业务逻辑
- main方法引导服务启动
引导过程中的步骤如下:
- 创建一个ServiceBootstrap实例以引导和绑定服务器监听的地址与端口
- 创建并分配一个NioEventLoopGroup实例以进行事件的处理
- 当有新连接时使用一个ChannelInitializer实例化一个Channel
- 使用ServiceBootstrap.bind()方法启动服务器监听
客户端示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public static void main(String[] arg) throws InterruptedException { NioEventLoopGroup group=new NioEventLoopGroup() Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group) bootstrap.option(ChannelOption.SO_KEEPALIVE,true); bootstrap.remoteAddress("127.0.0.1", 30888); final MessageChannel channel = new MessageChannel(); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(channel); } }); ChannelFuture channelFuture = bootstrap.connect().sync(); channelFuture.channel().closeFuture().sync(); group.shutdownGracefully(); }
@ChannelHandler.Sharable public static class MessageChannel extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Client received: " + msg.toString(CharsetUtil.UTF_8)); }
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("正在连接..."); ctx.channel().write(Unpooled.copiedBuffer("客户端的消息\n".getBytes())); ctx.channel().flush(); super.channelActive(ctx); }
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接关闭..."); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } }
|
客户端的代码逻辑与服务端的逻辑基本相同,在Bootstrap类型上使用了Bootstrap类而不是ServiceBootStrap类,这和我们之前讲Java NIO时基本相同,就不多阐述了。这里可以看到不同的是MessageChannel类型客户端继承自SimpleChannelInboundHandler类型,服务端继承自ChannelInboundHandlerAdapter类型;其实没有什么区别,它们都是继承自ChannelInboundHandlerAdapter类型,只是SimpleChannelInboundHandler更方便处理。