Pigeon源码分析(三) -- 客户端发送tcp底层源码分析

经过之前的分析,我们知道,一个请求先是经过层层的责任链,最后才会发出去。而决定发送的消息格式是在责任链中的一环完成的

InvokerProcessHandlerFactory # init()

/**
 * Registers the invoker-side process filters and builds the invocation handler from them.
 *
 * <p>The filters are registered in the order written below; the monitor filter is only
 * registered when {@code Constants.MONITOR_ENABLE} is set. Once {@code isInitialized}
 * has been set, later calls return immediately.
 */
public static void init() {
    // Already set up: nothing to register again.
    if (isInitialized) {
        return;
    }

    if (Constants.MONITOR_ENABLE) {
        registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
    }
    registerBizProcessFilter(new TraceFilter());
    registerBizProcessFilter(new FaultInjectionFilter());
    registerBizProcessFilter(new DegradationFilter());
    registerBizProcessFilter(new ClusterInvokeFilter());
    registerBizProcessFilter(new GatewayInvokeFilter());
    registerBizProcessFilter(new ContextPrepareInvokeFilter());
    registerBizProcessFilter(new SecurityFilter());
    registerBizProcessFilter(new RemoteCallInvokeFilter());

    // Build the handler from everything registered above, then mark setup as done.
    bizInvocationHandler = createInvocationHandler(bizProcessFilters);
    isInitialized = true;
}

我们看这个 ClusterInvokeFilter 

/**
 * Invoker-side filter that hands the call over to the {@code Cluster} selected
 * for the cluster type configured on the invoker.
 */
public class ClusterInvokeFilter extends InvocationInvokeFilter {

    private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class);

    /**
     * Selects the cluster for the invoker's configured cluster type and delegates the invocation to it.
     *
     * @param handler           the handler passed through to the selected cluster
     * @param invocationContext the invoker context carrying the invoker config
     * @return the response produced by the selected cluster
     * @throws IllegalArgumentException if {@code ClusterFactory.selectCluster} returns {@code null}
     *                                  for the configured cluster type
     * @throws Throwable                anything thrown by the selected cluster's {@code invoke}
     */
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
        if (cluster == null) {
            // Report the configured type: `cluster` itself is null in this branch, so
            // concatenating it would always produce "Unsupported cluster type:null".
            throw new IllegalArgumentException("Unsupported cluster type:" + invokerConfig.getCluster());
        }
        return cluster.invoke(handler, invocationContext);
    }

}

这里以常见的 FailfastCluster 为例

/**
 * Excerpt of the cluster's invoke method; the remainder of the body is elided by the
 * "......" line at the end of this excerpt.
 *
 * <p>The visible part obtains the request through
 * {@code InvokerUtils.createRemoteCallRequest} and reads the timeout-retry flag
 * from the invoker config.
 */
@Override
    public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
            throws Throwable {
        InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
        // Obtain the request for this call from the context and the invoker config.
        InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);

        boolean timeoutRetry = invokerConfig.isTimeoutRetry();
......

  InvokerUtils

/**
 * Returns the request held by the invoker context, creating and storing one first if the
 * context does not have a request yet.
 *
 * @param invokerContext the context that is asked for, and may receive, the request
 * @param invokerConfig  supplies the serialize setting used to pick the serializer
 * @return the context's existing request, or the newly created one
 */
public static InvocationRequest createRemoteCallRequest(InvokerContext invokerContext,
                                                            InvokerConfig<?> invokerConfig) {
    InvocationRequest existing = invokerContext.getRequest();
    if (existing != null) {
        return existing;
    }

    // No request yet: the serializer chosen by the config decides which request is created.
    InvocationRequest created = SerializerFactory
            .getSerializer(invokerConfig.getSerialize())
            .newRequest(invokerContext);
    invokerContext.setRequest(created);
    return created;
}

  拿到序列化器再决定请求的类型,序列化器主要分为两大类,一类是ThriftSerializer,一类是非ThriftSerializer

   ThriftSerializer

 /**
  * Creates the request for the given invoker context; this implementation always
  * returns a new {@code GenericRequest}.
  *
  * @param invokerContext the context passed to the {@code GenericRequest} constructor
  * @return a new {@code GenericRequest}
  */
 public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException {
        return new GenericRequest(invokerContext);
    }

  AbstractSerializer

/**
 * Creates the request for the given invoker context by delegating to
 * {@code InvocationUtils.newRequest}.
 *
 * @param invokerContext the context forwarded to {@code InvocationUtils.newRequest}
 * @return whatever {@code InvocationUtils.newRequest} returns
 */
public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException {
        return InvocationUtils.newRequest(invokerContext);
    }

最终调用的是  DefaultInvocationBuilder

/**
 * Creates the request for the given invoker context; this implementation always
 * returns a new {@code DefaultRequest}.
 *
 * @param invokerContext the context passed to the {@code DefaultRequest} constructor
 * @return a new {@code DefaultRequest}
 */
public InvocationRequest newRequest(InvokerContext invokerContext) {
        return new DefaultRequest(invokerContext);
    }

到这里就知道了,请求的类型就两大类,一类是GenericRequest 一类是 DefaultRequest

二 编码阶段

我们看客户端的channelHandlers。习惯了netty4的写法刚开始看还真不习惯 呵呵。

/**
 * Pipeline factory for the Netty client: every call to {@link #getPipeline()} builds a
 * new pipeline holding the handlers listed there, added in that order.
 */
public class NettyClientPipelineFactory implements ChannelPipelineFactory {

    // Client handed to the NettyClientHandler of every pipeline built by this factory.
    private NettyClient client;

    // Created once and passed to the CRC32 and compress handlers of every pipeline.
    private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();

    /**
     * @param client the client passed to each pipeline's {@code NettyClientHandler}
     */
    public NettyClientPipelineFactory(NettyClient client) {
        this.client = client;
    }

    /**
     * Builds a pipeline with new handler instances, added in the order written below.
     *
     * @return the assembled pipeline
     */
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
        // NOTE(review): the relative order of these handlers presumably matters for how
        // inbound and outbound messages are processed — confirm before reordering.
        pipeline.addLast("framePrepender", new FramePrepender());
        pipeline.addLast("frameDecoder", new FrameDecoder());
        pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
        pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
        pipeline.addLast("invokerDecoder", new InvokerDecoder());
        pipeline.addLast("invokerEncoder", new InvokerEncoder());
        pipeline.addLast("clientHandler", new NettyClientHandler(this.client));
        return pipeline;
    }

}

主要分析这个类  InvokerEncoder。继续分析,最终编码的逻辑在 

AbstractEncoder # encode

/**
 * Excerpt of the encoder's entry point; the excerpt stops inside the {@code try} block.
 *
 * <p>For an {@code InvocationSerializable} message the visible part picks one of two frame
 * encoders: {@code _doEncode} when the message is a {@code UnifiedInvocation}, otherwise
 * {@code doEncode}. The resulting frame is wrapped in a {@code CodecEvent} whose second
 * constructor argument is {@code true} for the unified branch and {@code false} otherwise.
 */
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
        if (msg instanceof InvocationSerializable) {

            InvocationSerializable _msg = (InvocationSerializable) msg;
            try {

                ChannelBuffer frame;
                CodecEvent codecEvent;

                if (msg instanceof UnifiedInvocation) {// this is the request type that corresponds to Thrift
                    frame = _doEncode(channel, (UnifiedInvocation) _msg);
                    codecEvent = new CodecEvent(frame, true);
                } else {
                    frame = doEncode(channel, _msg);// this is the general (non-unified) type
                    codecEvent = new CodecEvent(frame, false);
                }
/**
 * Excerpt of the frame encoder for {@code UnifiedInvocation} messages; the end of the
 * method is not shown.
 *
 * <p>Write order visible here: magic, protocol version byte, serialize byte, a 4-byte
 * length placeholder, then the serialized message. The placeholder is overwritten at the
 * end with {@code readableBytes() - CodecConstants._FRONT_LENGTH_}.
 */
protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg)
            throws IOException {

        ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
                channel.getConfig().getBufferFactory()));

        //magic
        os.write(CodecConstants._MAGIC);//(byte) 0xAB (byte) 0xBA
        os.writeByte(msg.getProtocolVersion());// third byte: protocol version
        //serialize
        byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize());
        //serialize
        os.writeByte(serialize);// fourth byte: serialization type
        //totalLength
        os.writeInt(Integer.MAX_VALUE);// bytes 5-8: length field, written as a placeholder for now

        serialize(msg.getSerialize(), os, msg, channel);

        ChannelBuffer frame = os.buffer();
        //totalLength
        frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() -
                CodecConstants._FRONT_LENGTH_);// overwrites the placeholder at bytes 5-8 with the real length

 再分析一下非Thrift类型的消息请求编码

/**
 * Frame encoder for messages that are not {@code UnifiedInvocation}.
 *
 * <p>Write order: magic, serialize byte, a 4-byte length placeholder, the serialized
 * message, the 8-byte sequence, then {@code CodecConstants.EXPAND}. The placeholder is
 * finally overwritten with {@code readableBytes() - CodecConstants.FRONT_LENGTH}, and
 * {@code doAfter} is called with the total readable size before the frame is returned.
 *
 * @param channel supplies the buffer factory for the dynamic buffer
 * @param msg     the message to serialize into the frame
 * @return the encoded frame
 * @throws IOException declared for failures while writing to the output stream
 */
protected ChannelBuffer doEncode(Channel channel, InvocationSerializable msg)
            throws IOException {
        ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
                channel.getConfig().getBufferFactory()));
        //magic
        os.write(CodecConstants.MAGIC); // 0x39 0x3A
        //serialize
        os.writeByte(msg.getSerialize());// serialization type; note the header here is only 3 bytes, unlike the 4-byte header written by _doEncode
        //bodyLength
        os.writeInt(Integer.MAX_VALUE);// length field, written as a placeholder for now

        serialize(msg.getSerialize(), os, msg, channel);
        //body
        ChannelBuffer frame = os.buffer();
        //sequence
        frame.writeLong(msg.getSequence());// after the body, write the 8-byte sequence number
        //expand
        frame.writeBytes(CodecConstants.EXPAND);// then write the three expand bytes
     // NOTE(review): the three declarations below are inlined by the article, presumably the bytes of CodecConstants.EXPAND — confirm

public static final byte EXPAND_FIRST = 0x1D;
public static final byte EXPAND_SECOND = 0x1E;
public static final byte EXPAND_THIRD = 0x1F;

//bodyLength
        frame.setInt(CodecConstants.HEAD_LENGTH, frame.readableBytes() -
                CodecConstants.FRONT_LENGTH);// overwrite the int at bytes 4-7 with the real length
        doAfter(msg, frame.readableBytes());
        return frame;
    }
原文地址:https://www.cnblogs.com/juniorMa/p/14842724.html