源码rabbit_2_通过架构来看数据结构

源码rabbit_2_通过架构来看数据结构

架构参考文章: https://www.cnblogs.com/arthinking/p/15422958.html

纠正上一篇博客理解上的错误:并不是所有客户端共用同一条 TCP 连接,而是每个客户端各自拥有一条 TCP 连接;在同一个客户端内部,订阅不同主题等操作不需要再新建连接,而是在这条连接上创建虚拟通道(channel)来复用它(这一点和 MQTT 的主题很像)。

生产端:一般地,同一个客户端(client)里面的每个生产者(producer)各自创建一个专门的通道(channel),这些通道复用同一个 TCP 连接(connection)。每个生产者都可以往 Broker 发布消息;虚拟主机是在建立连接时指定的(对应下文 Connection 结构体里的 virtualHost 字段),发布消息时需指定该虚拟主机上的交换机,并且消息需要带上 routing_key。

架构图

https://cdn.itzhai.com/image-20211010222005614-a.png-itzhai

数据结构解析

连接(Connection)

// Connection represents AMQP-connection.
//
// It bundles the underlying TCP socket (netConn), a back-reference to the
// owning Server, a map of channels keyed by a uint16 id, an outgoing frame
// queue, the maxChannels/maxFrameSize limits, the virtual host the
// connection is attached to (virtualHost / vhostName), the user name, and
// server-level and connection-level metrics.
//
// NOTE(review): channelsLock and statusLock presumably guard channels and
// status respectively (inferred from naming and field adjacency) — confirm
// against the methods that use them.
// NOTE(review): the meaning of the status values is not visible in this
// snippet; see the constants defined alongside the methods.
type Connection struct {
	id               uint64
	server           *Server
	netConn          *net.TCPConn
	logger           *log.Entry
	channelsLock     sync.RWMutex
	channels         map[uint16]*Channel
	outgoing         chan *amqp.Frame
	clientProperties *amqp.Table
	maxChannels      uint16
	maxFrameSize     uint32
	statusLock       sync.RWMutex
	status           int
	qos              *qos.AmqpQos
	virtualHost      *VirtualHost
	vhostName        string
	closeCh          chan bool
	srvMetrics       *SrvMetricsState
	metrics          *ConnMetricsState
	userName         string

	// Lifecycle plumbing: a shared WaitGroup pointer plus a context and its
	// cancel function.
	// NOTE(review): the context package docs advise against storing a
	// Context in a struct; presumably this is deliberate so the connection's
	// goroutines can be cancelled together — confirm.
	wg        *sync.WaitGroup
	ctx       context.Context
	cancelCtx context.CancelFunc

	// Heartbeat settings. Note that heartbeatTimer is a *time.Ticker, not a
	// *time.Timer, despite its name.
	// NOTE(review): units of heartbeatInterval/heartbeatTimeout are not
	// visible here — presumably seconds; confirm.
	heartbeatInterval uint16
	heartbeatTimeout  uint16
	heartbeatTimer    *time.Ticker

	// Channel carrying time.Time values.
	// NOTE(review): presumably timestamps of the most recent outgoing
	// frame, used by the heartbeat logic — verify against the writer side.
	lastOutgoingTS chan time.Time
}

虚拟主机(VirtualHost)

// VirtualHost represents AMQP virtual host
// Each virtual host is "parent" for its queues and exchanges
//
// Exchanges and queues are held in two maps keyed by string, each declared
// next to its own RWMutex (exLock / quLock). The struct also keeps two
// message storages (msgStorageP / msgStorageT), a server-level storage
// (srvStorage), the server config, a back-reference to the Server, and a
// string channel named autoDeleteQueue.
//
// NOTE(review): the map keys are presumably exchange/queue names, and
// exLock/quLock presumably guard the respective maps — confirm.
// NOTE(review): the P/T suffixes presumably stand for persistent and
// transient message storage — confirm.
// NOTE(review): autoDeleteQueue presumably receives names of queues that
// should be auto-deleted — verify against the sender and receiver.
type VirtualHost struct {
	name            string
	system          bool
	exLock          sync.RWMutex
	exchanges       map[string]*exchange.Exchange
	quLock          sync.RWMutex
	queues          map[string]*queue.Queue
	msgStorageP     *msgstorage.MsgStorage
	msgStorageT     *msgstorage.MsgStorage
	srv             *Server
	srvStorage      *srvstorage.SrvStorage
	srvConfig       *config.Config
	logger          *log.Entry
	autoDeleteQueue chan string
}

队列

// Queue is an implementation of the AMQP-queue entity
//
// It embeds safequeue.SafeQueue, so that type's methods are promoted onto
// Queue. On top of it the struct carries the queue's name and flags
// (exclusive, autoDelete, durable), the id of a connection (connID), a slice
// of consumers with an index into it (currentConsumer), two message storages
// (persistent and transient), metrics, and the bookkeeping used for swapping
// messages between RAM and disk.
//
// NOTE(review): cmrLock and actLock presumably guard consumers and active
// respectively (inferred from naming and adjacency) — confirm.
// NOTE(review): connID is presumably the owning connection for exclusive
// queues — confirm.
// NOTE(review): currentConsumer presumably implements round-robin delivery
// over consumers — verify against the delivery loop.
type Queue struct {
	safequeue.SafeQueue
	name        string
	connID      uint64
	exclusive   bool
	autoDelete  bool
	durable     bool
	cmrLock     sync.RWMutex
	consumers   []interfaces.Consumer
	consumeExcl bool
	call        chan struct{}
	wasConsumed bool
	shardSize   int
	actLock     sync.RWMutex
	active      bool
	// persistent storage
	msgPStorage interfaces.MsgStorage
	// transient storage
	msgTStorage     interfaces.MsgStorage
	currentConsumer int
	metrics         *MetricsState
	autoDeleteQueue chan string
	queueLength     int64

	// lock for sync load swapped-messages from disk
	loadSwapLock           sync.Mutex
	maxMessagesInRAM       uint64
	lastStoredMsgID        uint64
	lastMemMsgID           uint64
	swappedToDisk          bool
	maybeLoadFromStorageCh chan struct{}
	wg                     *sync.WaitGroup
}

通道(Channel)

// Channel is an implementation of the AMQP-channel entity
// Within a single socket connection, there can be multiple
// independent threads of control, called "channels"
//
// A Channel has a uint16 id and back-references to its Connection and the
// Server. It owns an incoming and an outgoing frame queue, a map of
// consumers keyed by string, two QoS objects (qos and consumerQos), two
// delivery-tag counters, a slice of pending confirms (confirmQueue), and a
// map of unacknowledged messages keyed by uint64 (ackStore).
//
// NOTE(review): cmrLock, confirmLock and ackLock presumably guard consumers,
// confirmQueue and ackStore respectively (inferred from naming and
// adjacency) — confirm.
// NOTE(review): the consumers key is presumably the consumer tag and the
// ackStore key the delivery tag — confirm.
// NOTE(review): the meaning of the status values is not visible in this
// snippet.
type Channel struct {
	active             bool
	confirmMode        bool
	id                 uint16
	conn               *Connection
	server             *Server
	incoming           chan *amqp.Frame
	outgoing           chan *amqp.Frame
	logger             *log.Entry
	status             int
	protoVersion       string
	currentMessage     *amqp.Message
	cmrLock            sync.RWMutex
	consumers          map[string]*consumer.Consumer
	qos                *qos.AmqpQos
	consumerQos        *qos.AmqpQos
	deliveryTag        uint64
	confirmDeliveryTag uint64
	confirmLock        sync.Mutex
	confirmQueue       []*amqp.ConfirmMeta
	ackLock            sync.Mutex
	ackStore           map[uint64]*UnackedMessage
	metrics            *ChannelMetricsState

	// Pool of buffers.
	// NOTE(review): presumably reused when serializing frames — confirm.
	bufferPool *pool.BufferPool

	// Boolean channel.
	// NOTE(review): presumably used to signal channel shutdown — confirm
	// who sends and who receives.
	closeCh chan bool
}
原文地址:https://www.cnblogs.com/maomaomaoge/p/15606319.html