如何实现一个RPC框架2 ——用Netty实现协议通信

前言:

​ 大家好,我是秋雨清笛,一个在读学生。这两个月里我初步实现了一个简单的RPC框架。做这个RPC框架的主要目的是为了学习,让自己能在平时的CRUD之余学习到一些不一样的东西,了解更多造轮子过程中的细节。实现并不是很复杂,主要目的还是学习。

接下来我将以几篇博文来谈谈我是怎么实现它的,遇到了一些什么样的技术细节。大家可以参照以下链接的源码:

https://github.com/PanYuDi/qyqd-rpc

本篇主要内容是用Netty实现通信

一、实现服务端

  1. 定义接口
public interface RpcServer {
    /**
     * 启动Rpc服务端
     */
    void start() throws InterruptedException;
}

  1. 写NettyServer
@Slf4j
public class NettyServer implements RpcServer {
    EventLoopGroup bossGroup;
    EventLoopGroup workerGroup;
    @Override
    public void start() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new NettyChannelHandlerInitializer());
        try {
            ChannelFuture cf = bootstrap.bind(RpcConfig.PORT).sync();
            log.info("rpc server started at port " + RpcConfig.PORT);
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("occur exception when start server:", e);
        } finally {
            log.info("netty server closed");
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }


    }
}

需要说的不多,基本的Netty服务端模板,值得一提的是

.childOption(ChannelOption.SO_KEEPALIVE, true)

这一句启动了TCP长连接,为我们以后实现心跳机制和链接缓存埋下了伏笔

二、实现客户端

大家可以参考NettyClient客户端的代码,这里涉及到心跳机制的实现,具体实现我们下一篇再仔细讨论,我们要知道的是客户端唯一目的就是发送数据

public interface RpcClient {
    /**
     * 传输信息
     * @param message
     */
    public Object send(ProtocolRequestEndpointWrapper message);
}

三、编码和解码

这节我们这篇聊的重点

我们在上一篇定义了协议,现在我们要在Netty通信中实现这个协议,我们将在Netty handler调用链中实现协议的编码和解码。

Netty调用链机制分为了InboundHandler和OutboundHandler,这里不做赘诉。我们要实现的是把一个ProtocolMessage塞进Netty客户端然后经过客户端和服务端的handler能原封不动的由服务端拿到这个类

编码的实现

public class ChannelMessageEncoder extends MessageToByteEncoder<ProtocolMessage> {

    @Override
    protected void encode(ChannelHandlerContext ctx, ProtocolMessage msg, ByteBuf out) throws Exception {
        // 写入魔数
        out.writeBytes(ProtocolConstant.MAGIC);
        if(msg.getContent().length >= ProtocolConstant.MAX_FRAME_LENGTH) {
            throw new MessageCodecException("message size exceed");
        }
        // 长度包含了content长度加上其他字段的长度
        out.writeInt(msg.getContent().length + 14);
        // 写入版本号
        out.writeByte(ProtocolConstant.CURRENT_VERSION);
        // 写入消息类型
        out.writeByte(msg.getMessageType().getCode());
        // 写入requestId
        out.writeInt(msg.getRequestId());
        // 写入消息内容
        out.writeBytes(msg.getContent());
    }
}

我们使用Netty的MessageToByteEncoder,它可以将一个类写成Byte流。我们仅需按照我们定义的协议顺序依次写入

解码的实现

@Slf4j
public class ChannelMessageDecoder extends LengthFieldBasedFrameDecoder {
    public ChannelMessageDecoder() {
        super(ProtocolConstant.MAX_FRAME_LENGTH, 4,4, -8, 0);

    }
    public ChannelMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ProtocolMessage protocolMessage = new ProtocolMessage();
        Object decoded = super.decode(ctx, in);
        if (decoded instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) decoded;
            return decodeFrame(buf);
        }
        return decoded;
    }

    private ProtocolMessage decodeFrame(ByteBuf buf) {
        ProtocolMessage protocolMessage = new ProtocolMessage();
        checkMagic(buf);
        protocolMessage.setLen(buf.readInt() - 14);
        checkVersion(buf);
        protocolMessage.setMessageType(ProtocolMessageTypeEnum.getEnum(buf.readByte()));
        protocolMessage.setRequestId(buf.readInt());
        byte[] content = new byte[protocolMessage.getLen()];
        buf.readBytes(content);
        protocolMessage.setContent(content);
        return protocolMessage;
    }
    private void checkMagic(ByteBuf buf) {
        byte[] magicData = new byte[ProtocolConstant.MAGIC_LEN];
        buf.readBytes(magicData);
        for(int i = 0; i < magicData.length; i++) {
            if(magicData[i] != ProtocolConstant.MAGIC[i]) {
                throw new MessageCodecException("magic check failed");
            }
        }
    }
    private void checkVersion(ByteBuf buf) {
        byte b = buf.readByte();
        Integer version = Integer.valueOf(b);
        if(!version.equals(ProtocolConstant.CURRENT_VERSION)) {
            throw new MessageCodecException("version check failed");
        }
    }
}

Netty提供的LengthFieldBasedFrameDecoder为我们提供了方便的解码方式

它的作用主要是按照偏移量先读取报文长度,然后自动读取到整个报文

    public ChannelMessageDecoder() {
        super(ProtocolConstant.MAX_FRAME_LENGTH, 4,4, -8, 0);

    }

构造函数的后四个参数决定了怎样读取这个报文

  1. lengthFieldOffset 长度域偏移量,因为我们的长度域之前有一个4B的魔数,所以这里我们传4
  2. lengthFieldLength 长度与的大小,我们协议中定义的是4B,所以传4
  3. lengthAdjustment 决定了读取完长度后,我们从哪里开始读?因为我们还是需要读取魔数和长度,所以我们回到开始的地方,也就是-8
  4. initialBytesToStrip 指定这个字段可以忽略到请求头的一些数据,我们设置为0不去跳过

了解这个解码器的原理后就简单了,接下来我们要做的就是依次从ByteBuf中读取数据就行了,也不用担心读多了,读多了也会抛出异常的,很容易debug

四、后续的处理

下一步就是在handler中继续处理ProtocolMessage了

public class NettyRpcServerChannelHandler extends ChannelInboundHandlerAdapter {
    MessageHandler messageHandler = new ServerMessageHandlerContextFactory().create();
    public NettyRpcServerChannelHandler() {
        super();
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(msg instanceof ProtocolMessage) {
            ProtocolMessageUtils.setRequestId(((ProtocolMessage) msg).getRequestId());
            if(messageHandler.canHandle(msg)) {
                messageHandler.handle((RequestMessage) msg, ctx);
            }
        }
    }
}

这里的MessageHandler是我自己定义的抽象接口,这个handler会按照类型依次处理请求。

接下来就是具体的业务处理了,通过Netty的协议通信也基本实现了,下一篇将介绍心跳机制的实现

原文地址:https://www.cnblogs.com/PanYuDi/p/15795481.html