RabbitMQ学习之ConntectionFactory与Conntection的认知

在发送和接收消息重要的类有:ConnectionFactory, Connection,Channel和 QueueingConsumer。

ConntectionFactory类是方便创建与AMQP代理相关联的Connection;下面来看看ConntectionFactory是如何创建一个Contention.首先通过new ConnectionFactory()创建一个ConnectionFactory;并设置此连接工厂的主机设置为broke IP。通过ConnectionFactory的newConnection()方法 创建一个Connection; newConnection方法通过得到当前连接的地址及端口号来获得一个Address,通过createFrameHandler的方法 来得到FrameHandler;再通过new AMQConnection(this, frameHandler)来得到Connection并启动。
    代码清单1创建Connection的源码(ConnectionFactory.Java中)

[java] view plain copy
 
 print?
  1. protected FrameHandler createFrameHandler(Address addr)  
  2.         throws IOException {  
  3.   
  4.         String hostName = addr.getHost();//获取主机IP  
  5.         int portNumber = portOrDefault(addr.getPort());//获取端口  
  6.         Socket socket = null;  
  7.         try {  
  8.             socket = factory.createSocket();  
  9.             configureSocket(socket);  
  10.             socket.connect(new InetSocketAddress(hostName, portNumber),  
  11.                     connectionTimeout);  
  12.             return createFrameHandler(socket);  
  13.         } catch (IOException ioe) {  
  14.             quietTrySocketClose(socket);  
  15.             throw ioe;  
  16.         }  
  17.     }  
  18.   
  19.     private static void quietTrySocketClose(Socket socket) {  
  20.         if (socket != null)  
  21.             try { socket.close(); } catch (Exception _) {/*ignore exceptions*/}  
  22.     }  
  23.   
  24.     protected FrameHandler createFrameHandler(Socket sock)  
  25.         throws IOException  
  26.     {  
  27.         return new SocketFrameHandler(sock);  
  28.     }  
  29.   
  30.     /** 
  31.      *  Provides a hook to insert custom configuration of the sockets 
  32.      *  used to connect to an AMQP server before they connect. 
  33.      * 
  34.      *  The default behaviour of this method is to disable Nagle's 
  35.      *  algorithm to get more consistently low latency.  However it 
  36.      *  may be overridden freely and there is no requirement to retain 
  37.      *  this behaviour. 
  38.      * 
  39.      *  @param socket The socket that is to be used for the Connection 
  40.      */  
  41.     protected void configureSocket(Socket socket) throws IOException{  
  42.         // disable Nagle's algorithm, for more consistently low latency  
  43.         socket.setTcpNoDelay(true);  
  44.     }  
  45.   
  46.     /** 
  47.      * Create a new broker connection 
  48.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order 
  49.      * @return an interface to the connection 
  50.      * @throws IOException if it encounters a problem 
  51.      */  
  52.     public Connection newConnection(Address[] addrs) throws IOException {  
  53.         return newConnection(null, addrs);  
  54.     }  
  55.   
  56.     /** 
  57.      * Create a new broker connection 
  58.      * @param executor thread execution service for consumers on the connection 
  59.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order 
  60.      * @return an interface to the connection 
  61.      * @throws IOException if it encounters a problem 
  62.      */  
  63.     public Connection newConnection(ExecutorService executor, Address[] addrs)  
  64.         throws IOException  
  65.     {  
  66.         IOException lastException = null;  
  67.         for (Address addr : addrs) {  
  68.             try {  
  69.                 FrameHandler frameHandler = createFrameHandler(addr);  
  70.                 AMQConnection conn =  
  71.                     new AMQConnection(username,  
  72.                                       password,  
  73.                                       frameHandler,  
  74.                                       executor,  
  75.                                       virtualHost,  
  76.                                       getClientProperties(),  
  77.                                       requestedFrameMax,  
  78.                                       requestedChannelMax,  
  79.                                       requestedHeartbeat,  
  80.                                       saslConfig);  
  81.                 conn.start();  
  82.                 return conn;  
  83.             } catch (IOException e) {  
  84.                 lastException = e;  
  85.             }  
  86.         }  
  87.   
  88.         throw (lastException != null) ? lastException  
  89.                                       : new IOException("failed to connect");  
  90.     }  
  91.   
  92.     /** 
  93.      * Create a new broker connection 
  94.      * @return an interface to the connection 
  95.      * @throws IOException if it encounters a problem 
  96.      */  
  97.     public Connection newConnection() throws IOException {  
  98.         return newConnection(null,  
  99.                              new Address[] {new Address(getHost(), getPort())}  
  100.                             );  
  101.     }  
  102.   
  103.     /** 
  104.      * Create a new broker connection 
  105.      * @param executor thread execution service for consumers on the connection 
  106.      * @return an interface to the connection 
  107.      * @throws IOException if it encounters a problem 
  108.      */  
  109.     public Connection newConnection(ExecutorService executor) throws IOException {  
  110.         return newConnection(executor,  
  111.                              new Address[] {new Address(getHost(), getPort())}  
  112.                             );  
  113.     }  

  代码清单2 连接启动

[java] view plain copy
 
 print?
  1. /** 
  2.      * Start up the connection, including the MainLoop thread. 
  3.      * Sends the protocol 
  4.      * version negotiation header, and runs through 
  5.      * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then 
  6.      * calls Connection.Open and waits for the OpenOk. Sets heart-beat 
  7.      * and frame max values after tuning has taken place. 
  8.      * @throws IOException if an error is encountered 
  9.      * either before, or during, protocol negotiation; 
  10.      * sub-classes {@link ProtocolVersionMismatchException} and 
  11.      * {@link PossibleAuthenticationFailureException} will be thrown in the 
  12.      * corresponding circumstances. If an exception is thrown, connection 
  13.      * resources allocated can all be garbage collected when the connection 
  14.      * object is no longer referenced. 
  15.      */  
  16.     public void start()  
  17.         throws IOException  
  18.     {  
  19.         this._running = true;  
  20.         // Make sure that the first thing we do is to send the header,  
  21.         // which should cause any socket errors to show up for us, rather  
  22.         // than risking them pop out in the MainLoop  
  23.         AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =  
  24.             new AMQChannel.SimpleBlockingRpcContinuation();  
  25.         // We enqueue an RPC continuation here without sending an RPC  
  26.         // request, since the protocol specifies that after sending  
  27.         // the version negotiation header, the client (connection  
  28.         // initiator) is to wait for a connection.start method to  
  29.         // arrive.  
  30.         _channel0.enqueueRpc(connStartBlocker);  
  31.         try {  
  32.             // The following two lines are akin to AMQChannel's  
  33.             // transmit() method for this pseudo-RPC.  
  34.             _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);  
  35.             _frameHandler.sendHeader();  
  36.         } catch (IOException ioe) {  
  37.             _frameHandler.close();  
  38.             throw ioe;  
  39.         }  
  40.   
  41.         // start the main loop going  
  42.         new MainLoop("AMQP Connection " + getHostAddress() + ":" + getPort()).start();  
  43.         // after this point clear-up of MainLoop is triggered by closing the frameHandler.  
  44.   
  45.         AMQP.Connection.Start connStart = null;  
  46.         AMQP.Connection.Tune connTune = null;  
  47.         try {  
  48.             connStart =  
  49.                 (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();  
  50.   
  51.             _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());  
  52.   
  53.             Version serverVersion =  
  54.                 new Version(connStart.getVersionMajor(),  
  55.                             connStart.getVersionMinor());  
  56.   
  57.             if (!Version.checkVersion(clientVersion, serverVersion)) {  
  58.                 throw new ProtocolVersionMismatchException(clientVersion,  
  59.                                                            serverVersion);  
  60.             }  
  61.   
  62.             String[] mechanisms = connStart.getMechanisms().toString().split(" ");  
  63.             SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);  
  64.             if (sm == null) {  
  65.                 throw new IOException("No compatible authentication mechanism found - " +  
  66.                         "server offered [" + connStart.getMechanisms() + "]");  
  67.             }  
  68.   
  69.             LongString challenge = null;  
  70.             LongString response = sm.handleChallenge(null, this.username, this.password);  
  71.   
  72.             do {  
  73.                 Method method = (challenge == null)  
  74.                     ? new AMQP.Connection.StartOk.Builder()  
  75.                                     .clientProperties(_clientProperties)  
  76.                                     .mechanism(sm.getName())  
  77.                                     .response(response)  
  78.                           .build()  
  79.                     : new AMQP.Connection.SecureOk.Builder().response(response).build();  
  80.   
  81.                 try {  
  82.                     Method serverResponse = _channel0.rpc(method).getMethod();  
  83.                     if (serverResponse instanceof AMQP.Connection.Tune) {  
  84.                         connTune = (AMQP.Connection.Tune) serverResponse;  
  85.                     } else {  
  86.                         challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();  
  87.                         response = sm.handleChallenge(challenge, this.username, this.password);  
  88.                     }  
  89.                 } catch (ShutdownSignalException e) {  
  90.                     throw new PossibleAuthenticationFailureException(e);  
  91.                 }  
  92.             } while (connTune == null);  
  93.         } catch (ShutdownSignalException sse) {  
  94.             _frameHandler.close();  
  95.             throw AMQChannel.wrap(sse);  
  96.         } catch(IOException ioe) {  
  97.             _frameHandler.close();  
  98.             throw ioe;  
  99.         }  
  100.   
  101.         try {  
  102.             int channelMax =  
  103.                 negotiatedMaxValue(this.requestedChannelMax,  
  104.                                    connTune.getChannelMax());  
  105.             _channelManager = new ChannelManager(this._workService, channelMax);  
  106.   
  107.             int frameMax =  
  108.                 negotiatedMaxValue(this.requestedFrameMax,  
  109.                                    connTune.getFrameMax());  
  110.             this._frameMax = frameMax;  
  111.   
  112.             int heartbeat =  
  113.                 negotiatedMaxValue(this.requestedHeartbeat,  
  114.                                    connTune.getHeartbeat());  
  115.   
  116.             setHeartbeat(heartbeat);  
  117.   
  118.             _channel0.transmit(new AMQP.Connection.TuneOk.Builder()  
  119.                                 .channelMax(channelMax)  
  120.                                 .frameMax(frameMax)  
  121.                                 .heartbeat(heartbeat)  
  122.                               .build());  
  123.             _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()  
  124.                                       .virtualHost(_virtualHost)  
  125.                                     .build());  
  126.         } catch (IOException ioe) {  
  127.             _heartbeatSender.shutdown();  
  128.             _frameHandler.close();  
  129.             throw ioe;  
  130.         } catch (ShutdownSignalException sse) {  
  131.             _heartbeatSender.shutdown();  
  132.             _frameHandler.close();  
  133.             throw AMQChannel.wrap(sse);  
  134.         }  
  135.   
  136.         // We can now respond to errors having finished tailoring the connection  
  137.         this._inConnectionNegotiation = false;  
  138.   
  139.         return;  
  140.     }  


转载:http://wubin850219.iteye.com/blog/1007984

原文地址:https://www.cnblogs.com/telwanggs/p/7124719.html