学习netty之前,要先了解操作系统中的IO、零拷贝(已经附上链接了)
原生的NIO存在问题:
netty是对NIO进行封装,解决了上述问题:
适用于各种传输类型的统一API,阻塞和非阻塞Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型延迟更低;减少资源消耗;最小化不必要的内存复制。下面讲解的是netty线程模式的由来
现有的线程模式:
单 Reactor 单线程;单 Reactor多线程;主从 Reactor多线程1、每个连接都需要独立的线程完成数据的输入,业务处理,数据返回。当并发数很大,就会创建大量的线程,占用很大系统资源。
2、连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 Handler对象中的read 操作,导致上面的处理线程资源浪费。
基于I/O多路复用模型:多个客户端进行连接,先把连接请求给Reactor,多个连接共用一个阻塞对象Reactor,由Reactor负责监听和分发,当客户端连接没有数据时不会阻塞线程。
基于线程池复用线程资源:不必再为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。(解决了当并发数很大时,会创建大量线程,占用很大系统资源)
Reactor模式中核心组成:
特点:

上图解析:
特点:

上图解析:
特点:

上图解析:
生活中的体现:
单 Reactor 单线程:前台接待员和服务员是同一个人,全程为顾客服务单 Reactor 多线程:1 个前台接待员,多个服务员,接待员只负责接待主从 Reactor 多线程:多个前台接待员,多个服务生
netty线程模型主要是依据主从Reactor多线程
BossGroup中的NioEventLoop就像MainReactor可以有多个,WorkerGroup中的NioEventLoop就像SubReactor一样可以有多个。
Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写
BossGroup和WorkerGroup类型都是NioEventLoopGroup,NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop(NioEventLoopGroup可以有多个线程,即可以含有多个NioEventLoop)
NioEventLoop表示一个不断循环的执行处理任务的线程,每个 NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯
每个BossGroup下面的NioEventLoop循环执行的步骤有3步:
1、轮询 accept 事件
2、处理 accept 事件,与 client 建立连接,生成NioSocketChannel,并将其注册到某个WorkerGroup的NioEventLoop上的Selector
3、继续处理任务队列的任务,即runAllTasks
每个WorkerGroup下面的NioEventLoop循环执行的步骤有3步:
1、轮询 read,write 事件
2、处理 I/O 事件,即 read,write 事件,在对应NioSocketChannel处理
3、处理任务队列的任务,即runAllTasks
每个WorkerGroup的NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel(通道),即通过pipeline可以获取到对应通道,每个通道中都有一个channelPipeline维护了很多的处理器channelhandler。
服务端:
NettyServer
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyServer { public static void main(String[] args) throws Exception { //创建BossGroup 和 WorkerGroup //说明 //1. 创建两个线程组 bossGroup 和 workerGroup //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成 //3. 两个都是无限循环 //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数 // 默认实际 cpu核数 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8 try { //创建服务器端的启动对象,配置参数 ServerBootstrap bootstrap = new ServerBootstrap(); //使用链式编程来进行设置 bootstrap.group(bossGroup, workerGroup) //设置两个线程组 .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现 .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列等待连接个数 .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象) //给pipeline 设置处理器 @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue ch.pipeline().addLast(new NettyServerHandler()); } }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器 System.out.println(".....服务器 is ready..."); //绑定一个端口并且同步生成了一个 ChannelFuture 对象(也就是立马返回这样一个对象) //启动服务器(并绑定端口) ChannelFuture cf = bootstrap.bind(6668).sync(); //给cf 注册监听器,监控我们关心的事件 cf.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口 6668 成功"); } else { System.out.println("监听端口 6668 失败"); } } }); //对关闭通道事件 进行监听 cf.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}NettyServerHandler
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelPipeline;import io.netty.util.CharsetUtil;import java.util.concurrent.TimeUnit;/*说明1. 我们自定义一个Handler 需要继承netty 规定好的某个HandlerAdapter(规范)2. 这时我们自定义一个Handler , 才能称为一个handler */public class NettyServerHandler extends ChannelInboundHandlerAdapter { //读取数据事件(这里我们可以读取客户端发送的消息) /* 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址 2. Object msg: 就是客户端发送的数据 默认Object */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel()); System.out.println("server ctx =" + ctx); System.out.println("看看channel 和 pipeline的关系"); Channel channel = ctx.channel(); ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链表 //将 msg 转成一个 ByteBuf //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer. ByteBuf buf = (ByteBuf) msg; System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("客户端地址:" + channel.remoteAddress()); } //数据读取完毕 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { //writeAndFlush 是 write + flush //将数据写入到缓存,并刷新 //一般讲,我们对这个发送的数据进行编码 ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8)); } //发生异常后, 一般是需要关闭通道 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); }}客户端:
NettyClient
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient { public static void main(String[] args) throws Exception { //客户端需要一个事件循环组 EventLoopGroup group = new NioEventLoopGroup(); try { //创建客户端启动对象 //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap Bootstrap bootstrap = new Bootstrap(); //设置相关参数 bootstrap.group(group) //设置线程组 .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器 } }); System.out.println("客户端 ok.."); //启动客户端去连接服务器端 //关于 ChannelFuture 要分析,涉及到netty的异步模型 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync(); //对关闭通道事件 进行监听 channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } }}NettyClientHandler
import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.CharsetUtil;public class NettyClientHandler extends ChannelInboundHandlerAdapter { //当通道就绪就会触发该方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8)); } //当通道有读取事件时,会触发 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8)); System.out.println("服务器的地址: "+ ctx.channel().remoteAddress()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以立即返回一个ChannelFuture,它可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
Future-Listener 机制
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
NioSocketChannel,异步的客户端 TCP Socket 连接。NioServerSocketChannel,异步的服务器端 TCP Socket 连接。NioDatagramChannel,异步的 UDP 连接。

ChannelPipeline addFirst(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的第一个位置ChannelPipeline addLast(ChannelHandler... handlers),把一个业务处理类(handler)添加到链中的最后一个位置
ChannelHandlerContext的常用方法:
1、ChannelFuture close(),关闭通道2、ChannelOutboundInvoker flush(),刷新3、ChannelFuture writeAndFlush(Object msg),将数据写到ChannelPipeline 中当前 ChannelHandler 的下一个ChannelHandler 开始处理(出站)
Netty在创建Channel实例后,一般都需要设置 ChannelOption 参数
ByteBuf buffer = Unpooled.buffer(10);
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));
GroupChatServer
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class GroupChatServer { private int port; //监听端口 public GroupChatServer(int port) { this.port = port; } //编写run方法,处理客户端的请求 public void run() throws Exception{ //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //获取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解码器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入编码器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的业务处理handler pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("netty 服务器启动"); ChannelFuture channelFuture = b.bind(port).sync(); //监听关闭 channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatServer(7000).run(); }}GroupChatServerHandler
import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.util.concurrent.GlobalEventExecutor;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { //这样写还要自己遍历Channel //public static List<Channel> channels = new ArrayList<Channel>(); //使用一个hashmap 管理私聊(私聊本案例并未实现,只是提供个思路) //public static Map<String, Channel> channels = new HashMap<String,Channel>(); //定义一个channle 组,管理所有的channel //GlobalEventExecutor.INSTANCE) 是全局的事件执行器,是一个单例 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //handlerAdded 表示连接建立,一旦连接,第一个被执行 //将当前channel 加入到 channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //将该客户加入聊天的信息推送给其它在线的客户端 //该方法会将 channelGroup 中所有的channel 遍历,并发送消息,我们不需要自己遍历 channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n"); channelGroup.add(channel); //私聊如何实现// channels.put("userid100",channel); } //断开连接, 将xx客户离开信息推送给当前在线的客户 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了\n"); System.out.println("channelGroup size" + channelGroup.size()); } //表示channel 处于活动状态, 提示 xx上线 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //这个是给服务端看的,客户端上面已经提示xxx加入群聊了 System.out.println(ctx.channel().remoteAddress() + " 上线了~"); } //表示channel 处于不活动状态, 提示 xx离线了 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 离线了~"); } //读取数据,转发给在线的每一个客户端 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //获取到当前channel Channel channel = ctx.channel(); //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息 channelGroup.forEach(ch -> { if(channel != ch) { //不是当前的channel,转发消息 ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "\n"); }else {//回显自己发送的消息给自己 ch.writeAndFlush("[自己]发送了消息" + msg + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //关闭通道 ctx.close(); }}GroupChatClient
import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;public class GroupChatClient { //属性 private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相关handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定义的handler pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //得到channel Channel channel = channelFuture.channel(); System.out.println("-------" + channel.localAddress()+ "--------"); //客户端需要输入信息,创建一个扫描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通过channel 发送到服务器端 channel.writeAndFlush(msg + "\r\n"); } }finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatClient("127.0.0.1", 7000).run(); }}GroupChatClientHandler
import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> { //从服务器拿到的数据 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); }}IdleStateHandler是netty 提供的处理空闲状态的处理器(放在ChannelPipeline维护的ChannelHandler的双向链表上):
编解码器放在ChannelPipeline维护的ChannelHandler的双向链表上
服务端发数据给客户端:服务端—>出站(编码)—>Socket通道—>入站(解码)—>客户端
客户端发数据给服务端:客户端—>出站(编码)—>Socket通道—>入站(解码)—>服务端
编解码器(自定义编解码器时需要继承下面的其中一个类):
TCP粘包和拆包
使用优化方法(Nagle 算法),将多次间隔较小且数据量小的数据,合并成一个大的数据块,然后进行封包。这样做虽然提高了效率,但是接收端就难于分辨出完整的数据包了,出现了粘包和拆包的问题,因为面向流的通信是无消息保护边界的。
解决方案
使用自定义协议+编解码器来解决,关键就是要解决服务器端每次读取数据长度的问题,这个问题解决,就不会出现服务器多读或少读数据的问题,从而避免的 TCP 粘包、拆包。
1、RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程
2、两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样
RPC的调用流程图
RPC 的目标就是将 2 - 8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用