JxtaBiDiPipe主动连接的问题

首先看几个connect方法

    /**
     * Connect to a JxtaServerPipe with default timeout
     *
     * 
@param group  group context
     * 
@param pipeAd PipeAdvertisement
     * 
@throws IOException if an io error occurs
     
*/
    
public void connect(PeerGroup group, PipeAdvertisement pipeAd) throws IOException {
        connect(group, pipeAd, timeout);
    }

    
/**
     * Connects to a remote JxtaBiDiPipe
     *
     * 
@param group   group context
     * 
@param pipeAd  PipeAdvertisement
     * 
@param timeout timeout in ms, also reset object default timeout
     *                to that of timeout
     * 
@throws IOException if an io error occurs
     
*/
    
public void connect(PeerGroup group, PipeAdvertisement pipeAd, int timeout) throws IOException {
        connect(group, 
null, pipeAd, timeout, null);
    }

    
/**
     * Connects to a remote JxtaServerPipe
     *
     * 
@param group       group context
     * 
@param peerid      peer to connect to
     * 
@param pipeAd      PipeAdvertisement
     * 
@param timeout     timeout in ms, also reset object default timeout to that of timeout
     * 
@param msgListener application PipeMsgListener
     * 
@throws IOException if an io error occurs
     
*/
    
public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener) throws IOException {
        connect(group, peerid, pipeAd, timeout, msgListener, isReliable);
    }

    
/**
     * Connects to a remote JxtaServerPipe
     *
     * 
@param group       group context
     * 
@param peerid      peer to connect to
     * 
@param pipeAd      PipeAdvertisement
     * 
@param timeout     timeout in ms, also reset object default timeout to that of timeout
     * 
@param msgListener application PipeMsgListener
     * 
@param reliable    Reliable connection
     * 
@throws IOException if an io error occurs
     
*/
    
public void connect(PeerGroup group, PeerID peerid, PipeAdvertisement pipeAd, int timeout, PipeMsgListener msgListener, boolean reliable) throws IOException
    {
        
this.peerid = peerid;
        
this.pipeAdv = pipeAd;
        
this.group = group;
        
this.msgListener = msgListener;
        
this.isReliable = reliable;

        
if (isBound())
        {
            
throw new IOException("Pipe already bound");
        }
        
if (timeout <= 0)
        {
            
throw new IllegalArgumentException("Invalid timeout :" + timeout);
        }

        initDeferredMessenger(pipeAd);
        createRLib();
//Can create early now with deferred messenger
        if (isBound())
        {
            
throw new IOException("Pipe already bound");
        }
        
if (timeout <= 0)
        {
            
throw new IllegalArgumentException("Invalid timeout :" + timeout);
        }
        waiting 
= true;
        if (msgListener == null)
        {
            
throw new IllegalArgumentException("Must use with a message listener");
        }

        pipeSvc 
= this.group.getPipeService();
        
this.timeout = (timeout == 0? Integer.MAX_VALUE : timeout;
        
if (myPipeAdv == null)
        {
            myPipeAdv 
= JxtaServerPipe.newInputPipe(group, pipeAd);
            
this.inputPipe = pipeSvc.createInputPipe(myPipeAdv, pipeMsgListener);
        }
        
new RetryingOutputPipeConnect(this);

    }

 以上这4个connect方法只有第4个执行了实际的操作,前3个只是提供了一些便利的调用。

从黄色底色的代码可以看出,所有的connect方法都要求必须提供PipeMsgListener,然而前两个connect方法只是用了null值,因此调用前两个connect方法必然会抛出异常IllegalArgumentException。
----------------------------------------------------------------------------------------------------------------------

再看下面这个方法

    /**
     * Sets message listener for a pipe spawned by the JxtaServerPipe.
     * There is a window where a message could arrive prior to listener being
     * registered therefore a message queue is created to queue messages, once
     * a listener is registered these messages will be dequeued by calling the
     * listener until the queue is empty.
     * <p/>
     * Sending messages vis {
@link #sendMessage(Message)} from within a
     * {
@code PipeMsgListener} may result in a deadlock due to contention
     * between the sending and receiving portions of BiDi pipes.
     *
     * 
@param msgListener New value of property listener.
     
*/
    
public void setMessageListener(PipeMsgListener msgListener) {
        BlockingQueue
<PipeMsgEvent> drainQueue = null;
        
synchronized (this) {
            
this.msgListener = msgListener;

            
if (null != msgListener) {
                drainQueue 
= queue;
                queue 
= null;
            } 
else {
                queue 
= new ArrayBlockingQueue<PipeMsgEvent>(windowSize);
            }
        }

        
if (null != drainQueue) {
            
while (!drainQueue.isEmpty()) {
                PipeMsgEvent event 
= drainQueue.poll();

                
if (null != event) {
                    push(event);
                }
            }
        }
    }

 从第一行注释中可以看出,该方法是用于由JxtaServerPipe生成的实例(即通过accept方法返回的)来设置消息监听器。

而像这种主动连接(相当于客户端)的pipe,不应该调用该方法。

那么这种主动连接的pipe设置消息监听器就只用通过构造函数(实际上是在构造函数中调用了connect方法)或调用connect方法,这样的限制会导致很多时候处理起来不够灵活。

原文地址:https://www.cnblogs.com/cuizhf/p/2182389.html