Netty-事件及其流水线

ChannelHandler家族

Channel的生命周期

状态 描述
ChannelUnregistered Channel已经被创建,但还未注册到EventLoop
ChannelRegistered Channel已经被注册到EventLoop
ChannelActive Channel处于活动状态(已经连接到远程节点)。它现在可以接受或发送数据
ChannelInactive Channel没有连接到远程节点

Channel的正常生命周期如下图

点击加载

ChannelHandler的生命周期

下表列出了interface ChannelHandler定义的生命周期操作,在ChannelHandler被添加到ChannelPipeline中或者被从ChanngelPipeline中移除时会调用这些操作。这些方法中的每一个都接受一个ChannelHandlerContext参数。

方法 描述
handlerAdded 当把ChannelHandler添加到ChannelPipeline中时被调用
handlerRemoved 当从ChannelPipeline中移除ChannelHandler时被调用
exceptionCaught 当处理过程中ChannelPipeline中有错误产生时被调用

ChannelInboundHandler接口

处理入站数据以及各种状态变化
下表列出了interface ChannelInboundHandler的生命周期方法。这些方法将会在数据被接收或者与其对应的Channel状态发生改变时调用,这些方法和Channel的生命周期密切相关。

方法名称 描述
channelRegistered 当Channel已经注册到它的EventLoop并且能够处理I/O时被调用
channelUnregistered 当Channel从它的EventLoop注销并且无法处理任何I/O时被调用
channelActive 当Channel处于活动状态时被调用;Channel已经连接/绑定并且已经就绪
channelInactive 当Channel离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete 当所有可读的字节都已经被从Channel中读取之后
channelRead 当从Channel读取数据时被调用
ChannelWritablilityChanged 当Channel的可写状态发生改变时被调用。用户可以确保写操作不会完成得太快(避免发生OutOfMemoryError)或者可以在Channel变为再次写时恢复写入。可以通过调用Channel的isWritable()方法来检测Channel的可写性。
userEventTriggered 当ChannelInboundHandler.fireUserEventTriggered()调用时,调用userEventTriggered方法,因为一个POJO被传经了ChannelPipeline
1
2
3
4
5
6
7
@Sharable
public class DiscardHandler extends ChannelInboudHandlerAdapter{
@Override
public void ChannelRead(ChannelHandlerContext ctx, Object msg){
ReferenceCountUtil.release(msg); //丢弃已接收的消息
}
}

使用SimpleChannelInboundHandler可以更加方便的管理资源。

1
2
3
4
5
6
7
@Sharable
public class SimpleDiscardHandler extends SimpleRead0(ChannelHandlerContext ctx, Object msg){
@Override
public void channelRead0(channelHandlerContext ctx, Object msg)
//资源在被读取之后将被释放,所以不能存储
///不需要任何显示的释放资源
}

由于SimpleChannelInboundHandler的channelRead0()方法消费之后会自动释放资源,所以你不应该存储指向任何消息的引用供将来使用,因为这些引用都将失效。

在出站方向这边,如果你处理了write()操作并丢弃了一个消息,那么你也应该负责释放它。

1
2
3
4
5
6
7
8
9
//丢弃并释放出站消息
@Sharable
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter{
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){
ReferenceCountUtil.release(msg);
promise.setSuccess(); //通知ChannelPromise数据已经被处理了
}
}

不仅要释放资源,还要通知ChannelPromise。否则可能会出现ChannelFutureListener收不到某个消息已经被处理了的通知的情况。
如果一个消息被消费或者丢弃了,并且没有传递给ChannelPipeline中的下一个ChannelOutboundHandler,那么用户就有责任调用ReferenceCountUtil.release()。如果消息到达了实际的传输层,那么当它被写入或者Channel关闭时,都将被自动释放。

ChannelOutboundHandler接口

处理出站数据并且允许拦截所有的操作

ChannelOutboundHandler的一个强大功能是可以按需推迟操作或者事件,这使得可以通过一些复杂的方法来处理请求。例如,如果到远程节点的写入被暂停了,那么你可以推迟冲刷操作并在稍后继续。

ChannelOutboundHandler的方法

ChannelHandlerContext ctx;
ScokAddress sa;
ChannelPromise cp;

类型 描述
bind(ctx, sa, cp) 当请求将Channel绑定到本地地址时被调用
connect(ctx, sa, sa, cp) 当请求将Channel连接到远程节点时被调用
disconnect(ctx, cp) 当请求将Channel从远程节点断开时被调用
close(ctx, cp) 当请求关闭Channel时被调用
deregister(ctx, cp) 当请求将Channel从它的EventLoop注销时被调用
read(ctx)) 当请求从Channel读取更多的数据时被调用
flush(ctx) 当请求通过Channel将入队数据冲刷到远程节点时被调用
write(ctx, object, cp) 当请求通过Channel将数据写到远程节点时被调用

ChannelPromise(cp):作用是在操作完成时得到通知。ChannelPromise是ChannelFuture的一个子类。

ChannelHandler适配器

点击加载

ChannelPipeline接口

可以这样理解,ChannelPipeline是一个拦截流经Channel的入站和出站事件的ChannelHandler实例链,那么就很容易看出这些ChannelHandler之间的交互是如何组成一个应用程序数据和事件处理逻辑核心的。
每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另一个ChannelPipeline,也不能分离其当前的。
根据事件的起源,事件将会被ChannelInboundHandler或者ChannelOutboundHanlder处理。随后,通过调用ChannelHandlerContext实现,它(事件)将被转发给下一个的ChannelHandler。

点击加载

如图所示,Netty总是将ChannelPipeline的入站口(左侧)作为头部,而将出站口(该图右侧)作为尾端。

在ChannelPipeline传播事件时,它会测试ChannelPipeline中的下一个ChannelHandler的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline将跳过该ChannelHandler并前进到下一个,直到它找到和该事件所期望的方向相匹配为止。

ChanndelHandlerContext接口

ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。

区别:调用Channel或者ChannelPipeline上的方法,它们将沿着整个ChannelPipeline上传播,而调用ChannelHandlerContext上的相同方法,则从所关联的ChannelHandler开始处理,并且只会传播给位于该ChannelPipe中的下一个能够处理该事件的ChannelHandler。

ChannelHandlerContext的API

方法名称 描述
alloc 返回和这个实例相关联的Channel所配置的ByteBufAllocator
bind 绑定到给定的SocketAddress,并返回ChannelFuture
channel 返回绑定到这个实例的Channel,并返回ChannelFuture
close 关闭Channel,并返回ChannelFuture
connect 连接给定的SocketAddress,并返回ChannelFuture
deregister 从之前分配的EventExecutor注销,并返回ChannelFuture
disconnect 从远程节点断开,并返回ChannelFuture
executor 返回调度事件的EventExecutor
handlder 返回绑定到这个实例的ChannelHandler
isRemoved 如果所关联的ChannelHandler已经被从ChannelPipeline中移除则返回true
name 返回这个实例的唯一名称
pipe 返回这个实例所关联的ChannelPipeline
read 将数据从Channel读取到第一个入站缓冲区;如果读取成功则触发一个ChannnelRead事件,并(在最后一个消息被读取后)通知ChannelInboundHandler的channelReadComplete(channelHandlerContext ctx)方法
write 通过这个实例写入消息经过ChannelPipeline
writeAndFlush 通过这个实例写入并冲刷消息,经过channelPipeline

当使用ChannelHandlerContextd的API时候,请牢记以下两点:

  • ChannelHandlerContext和ChannelHandler之间的关联(绑定)是永不会改变的,所以缓存对它的引用是安全的
  • ChannelHandlerContext的方法将产生更短的事件流

点击加载

如下代码,将通过ChannelHandlerContext获取到Channel的引用。调用Channel上的write()方法将会导致写入事件从尾端到头部地流经ChannelPipeline。

1
2
3
4
5
6
7
8
9
10
11
//从ChannelHandlerContext访问Channel
ChannelHandlerContext ctx = ...;
Channel channel = ctx.channel();
//通过Channel写入缓冲区
channel.write(Unpooled.copiedBuffer("Cherry", CharsetUtil.UTF_8));
//通过ChannelHandlerContext访问ChannelPipeline
ChannelHandlerContext ctx = ...;
ChannelPipeline pipeline = ctx.pipeline();
//通过ChannelPipeline写入缓冲区
pipeline.write(Unpooled.copiedBuffer("Cherry", CharsetUtil.UTF_8));

上面代码,被调用的Channel或ChannelPipeline上的write()方法将一直传播事件通过整个ChannelPipeline,但是在ChannelHandler级别上,事件从一个ChannelHandler到一下ChannelHandler的移动是由ChannelHandlerContext上的调用完成。
也就是我们的write(数据)中的数据会在整条流水线上传递。

点击加载

从ChannelPipe中的某个特定点进行事件传播

  • 减少将事件传经对它不感兴趣的ChannelHandler所带来的开销
  • 避免将时间传经那些可能会对它有副作用的ChannelHandler
1
2
ChannelHandlerContext ctx = ...;
ctx.write(Unpooled.copiedBuffer("Hello", CharsetUtil.UTF8));

写入的数据将在流水线上,从本ChannelHandler及其之后的ChannelHanlder。

点击加载

高级用法

缓存ChannelHandlerContext的引用

1
2
3
4
5
6
7
8
9
10
public class WriteHandler extends ChannelHandlerAdapter{
private ChannelHandlerContext ctx;
@Override
public void handlerAdd(ChannelHandlerContext ctx){
this.ctx = ctx;
}
public void send(String msg){
ctx.writeAndFlush(msg);
}
}

因为一个ChannelHandler可以从属多个ChannelPipeline,所以它也可以绑定到多个ChannelHandlerContext实例。对于这种用法指在多个ChannelPipeline中共享同一个ChannelHandler,对应的ChannelHandlerI必须使用@ChannelHandler.Sharable注解标注;否则,试图将它添加到多个ChannelPipeline时将会触发异常。显而易见,为了安全地被用于多个并发的Channel(即连接),这样的ChannelHandler必须是线程安全的。

共享同一个ChannelHandler的用处:再多个ChannelPipeline中安装同一个ChannelHandler的一个常见原因用于收集跨越多个Channel的统计信息。

异常处理