rabbitmq_1源码

rabbitmq_1源码

带着疑惑rabbitmq如何用一条tcp连接的,下面就来解答

仓库地址: github.com/valinurovam/garagemq

  1. 定位main函数

    	// Start GarageMQ broker
    	srv.Start()
    
  2. Start()里面只有这个函数有用

    go srv.listen()
    
  3. Listen()函数

    for {
    		conn, err := srv.listener.AcceptTCP()
    		if err != nil {
    			if srv.status != Running {
    				return
    			}
    			srv.stopWithError(err, "accepting connection")
    		}
    		log.WithFields(log.Fields{
    			"from": conn.RemoteAddr().String(),
    			"to":   conn.LocalAddr().String(),
    		}).Info("accepting connection")
    
    		glog.DEBUG.Println("读取tcp数据大小: ", srv.config.TCP.ReadBufSize, " 写大小: ", srv.config.TCP.WriteBufSize)
    		conn.SetReadBuffer(srv.config.TCP.ReadBufSize)
    		conn.SetWriteBuffer(srv.config.TCP.WriteBufSize)
    		conn.SetNoDelay(srv.config.TCP.Nodelay)
    
    		// 处理连接,放进server的map
    		srv.acceptConnection(conn)
    	}
    
  4. 处理连接函数,只是简单的定位连接表示,然后继续看连接处理函数

    	srv.connLock.Lock()
    	defer srv.connLock.Unlock()
    
    	connection := NewConnection(srv, conn)
    	srv.connections[connection.id] = connection
    	go connection.handleConnection()
    
  5. handleConnection()

    先读取8字节标识符

    处理管道和连接的关系

    	conn.ctx, conn.cancelCtx = context.WithCancel(context.Background())
    
    	channel := NewChannel(0, conn)
    	conn.channelsLock.Lock()
    	conn.channels[channel.id] = channel
    	conn.channelsLock.Unlock()
    
    	channel.start()
    	conn.wg.Add(1)
    	go conn.handleOutgoing()
    	conn.wg.Add(1)
    	go conn.handleIncoming()
    
  6. 继续追踪管道start,类似于tcp连接读写监控

总结

初探之后,go仓库他没有做到连接复用,但是整体思想还是可以继续研究

原文地址:https://www.cnblogs.com/maomaomaoge/p/15603637.html