rocketMQ 请求和响应

java 中的对象,请求和响应都是一个 RemotingCommand 对象。

RemotingCommand
  code 请求码,请求码/响应码
  CommandCustomHeader 对应的请求头
  byte[] 请求体,非必需

如下是一个 RegisterBrokerRequestHeader 对应的 RemotingCommand,RegisterBrokerRequestHeader 对象中的属性会被放入到 extFields 中

// org.apache.rocketmq.remoting.protocol.RemotingCommand#makeCustomHeaderToNet
{
    "code": 103,
    "extFields": {
        "brokerId": "0",
        "bodyCrc32": "546992350",
        "clusterName": "DefaultCluster",
        "brokerAddr": "172.18.232.137:10911",
        "haServerAddr": "172.18.232.137:10912",
        "compressed": "false",
        "brokerName": "broker-a"
    },
    "flag": 0,
    "language": "JAVA",
    "opaque": 42,
    "serializeTypeCurrentRPC": "JSON",
    "version": 275
}

请求响应对

Register Broker request
Register Broker response

对象序列化

header:使用 fastjson 转化为 json,然后 getByte
body:业务自行序列化

RemotingCommand
head length, head data, body data

网络发送

tcp 包

total length, head length, head data, body data
// 编码成 tcp
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand>
// 对 tcp 包进行解码
public class NettyDecoder extends LengthFieldBasedFrameDecoder

请求和响应对

/**
 * This map caches all on-going requests.
 * opaque 是请求 id,AtomicInteger 持续增加
 */
protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
    new ConcurrentHashMap<Integer, ResponseFuture>(256);

// 同时有定时任务,扫描过期的请求
NettyRemotingAbstract#scanResponseTable

请求的发送,分为同步/异步/oneway

同步和异步,需要服务端的响应,使用请求 id 来把请求和响应对应起来

NettyRemotingAbstract#invokeAsyncImpl
NettyRemotingAbstract#invokeSyncImpl

异步和 oneway,在极端情况下,会导致 on-going 的请求太多,导致 OOM,因此需要设限,使用的是 Semaphore

原文地址:https://www.cnblogs.com/allenwas3/p/12251974.html