这篇文章将为大家详细讲解有关网状的中管道如何添加汉德勒,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
pipeline无疑是 netty中非常重要的一环,在io处理中netty通过已存放在pipeline中的handler有序处理socket中的信息,而pipeline中包含的都是一个个的handler,它是怎么添加进去的呢
创建代码如下:其中有handler和childhandler
公开作废开始()
eventloopgroup boss group=new nioeventloopgroup();
eventloopgroup group=new nioeventloopgroup();
尝试{
ServerBootstrapsb=newServerBootstrap();
某人的选择(渠道发展. SO_BACKLOG,1024);
//绑定线程池
某人团体
//指定使用的频道。通道(NioServerSocketChannel.class)
//绑定监听端口。本地地址(this.port)。handler(NewLoggingHandler(LogLevel .INFO))
//绑定客户端连接时候触发操作。childHandler(新的channel initializer socket channel(){ 0
@覆盖
铌
sp; protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
//字符串解码
pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
//字符串编码
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS));
}
});
// 服务器异步创建绑定
ChannelFuture cf = sb.bind().sync();
log.info(NettyServer.class + " 启动正在监听: " + cf.channel().localAddress());
// 关闭服务器通道
cf.channel().closeFuture().sync();
} finally {
// 释放线程池资源
group.shutdownGracefully().sync();
bossGroup.shutdownGracefully().sync();
}
}
顺着服务端bind方法点进去会看到如下:创建完channel实例后 会调用一个init方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
init方法为抽象方法 服务端实现如下
void init(Channel channel) throws Exception { //省略若干 ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); } //看这里 添加了一个ChannelInitializer实例 实例中实现了initChannel 获取配置的handler添加到pipeline p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
而pipeline的addlast方法如下
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); //这方法很简单 添加到链表中 但是此时添加的是上文的ChannelInitializer 而不是我们定义的handler addLast0(newCtx); // If the registered is false it means that the channel was not registered on an eventLoop yet. // In this case we add the context to the pipeline and add a task that will call // ChannelHandler.handlerAdded(...) once the channel is registered. if (!registered) { newCtx.setAddPending(); callHandlerCallbackLater(newCtx, true); return this; } EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { callHandlerAddedInEventLoop(newCtx, executor); return this; } } //这个方法就是关键 将ChannelInitializer中的handler添加到pipeline中 以及删除ChannelInitializer //但请注意 首次注册不会走到这里 在前面的判断就会返回 会在注册的逻辑中触发 最终调用的一致 callHandlerAdded0(newCtx); return this; }
查看ChannelInitializer中的关键代码如下 OK 通过如下代码我们就看到 调用了ChannelInitializer的initChannel方法 就是在最开始我们定义的那个,然后里面就用继续调用pipeline add我们真正的handler
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) { // We are done with init the Channel, removing the initializer now. removeState(ctx); } } } private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { //看这里删除pipeline中的 ChannelInitializer ChannelInitializer 其实就是一个handler 只是它重写了handlerAdded方法 ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } return true; } return false; }
关于“netty中pipeline如何添加handler”这篇文章就分享到这里了,希望
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/150152.html