netty之管道处理流程

  1、我们在使用netty的是有都会存在将channelBuffer的数据处理成相应的String或者自定义数据。而这里主要是介绍管道里面存在的上行和下行的数据处理方式

  2、通过一张图片来看一下具体管道中的实现过程

  

  一个Channel中包括一个Socket、一个ChannelPipeline。一个ChannelPipeline中有一个ChannelSink和多个ChannelHandler。ChannelHandler分为两种:UpstremHandler、DownstreamHandler。

  不论是读数据还是写数据都要经过Channel中的ChannelPipeline。读数据的过程是从Socket到ChannelPipeline,由ChannelPipeline交给里面的UpstreamHandler(或者叫做InBoundHandler)从下到上依次处理 。写数据时,由要经过ChannelPipeline里面在DownStreamHandler(或者是OutBoundHandler)由上到下依次处理。

  3、因为UpstremHandler与DownstreamHandler的实现方式大同小异,我这里写的例子是UpstremHandler的例子

package com.troy.application.upstream;


import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {

    public static void main(String[] args) {

        //声明服务类
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //设定线程池
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService work = Executors.newCachedThreadPool();

        //设置工厂
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,work));

        //设置管道流
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline channelPipeline = Channels.pipeline();
                //添加处理方式
                channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),5,5,10));
                channelPipeline.addLast("handler1",new Handler1());
                channelPipeline.addLast("handler2",new Handler2());
                return channelPipeline;
            }
        });

        //设置端口
        serverBootstrap.bind(new InetSocketAddress(9000));
    }
}
package com.troy.application.upstream;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.UpstreamMessageEvent;

public class Handler1 extends SimpleChannelHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        ChannelBuffer channelBuffer = (ChannelBuffer) e.getMessage();
        byte[] array = channelBuffer.array();
        String message = new String(array);
        System.out.println("handler1"+message);
        ctx.sendUpstream(new UpstreamMessageEvent(ctx.getChannel(),message,e.getRemoteAddress()));
    }
}
package com.troy.application.upstream;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class Handler2 extends SimpleChannelHandler {
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        String message = (String) e.getMessage();
        System.out.println("handler2"+message);
    }
}

  说明:这里最重要的两个方法是sendUpstream和sendDownstream。这两个方式在上行和下行的处理基本上是一样的。在源码里面handler的处理都会存在sendUpstream和sendDownstream。这个两个方法也是多重处理的基础。

原文地址:https://www.cnblogs.com/ll409546297/p/8041872.html