// Actor model: asynchronous method invocation (pseudo-code)

#ifdef _CODEPASTE
// Payload type that travels through the message queue.  The pseudo-code
// leaves its contents unspecified.
// NOTE(review): Supplier_Handler::route_message calls msg.address(), which
// is not declared here -- confirm where that accessor is meant to live.
class Message
{
};
// Servant side of the message queue: declares the queue operations and the
// predicates that the Put/Get method requests use as guards.
class MQ_Servant
{
public:
    // Construct the servant from the requested queue size <mq_size>.
    MQ_Servant(size_t mq_size);

    // Predicates reporting whether the queue is empty / full.
    bool empty_i() const;
    bool full_i() const;

    // Message queue implementation operations: store <msg> in the queue,
    // and hand a message back out of it.
    void put_i(const Message &msg);
    Message get_i();

private:
    // The internal queue representation (for example a circular array
    // or a linked list) is left out of this pseudo-code.
};
// Future for the result of an asynchronous method invocation.  Copies of
// a <Message_Future> are bound to the same <Message_Future_Rep>, and
// converting the future to a <Message> blocks until the result is ready.
class Message_Future
{
public:
    // Default constructor.  It has to be declared explicitly: the
    // user-declared constructors below suppress the implicit one, and
    // MQ_Proxy::get() default-constructs its result before handing it
    // to the Get method request.
    // NOTE(review): presumably yields a future with no result yet --
    // confirm against the <Message_Future_Rep> implementation.
    Message_Future(void);
    // Copy constructor binds <this> and <f> to the
    // same <Message_Future_Rep>, which is created if
    // necessary.
    Message_Future(const Message_Future &f);
    // Constructor that initializes <Message_Future> to
    // point to <Message> <m> immediately.  Left implicit on purpose:
    // Get::call() assigns a <Message> straight to a <Message_Future>.
    Message_Future(const Message &m);
    // Assignment operator that binds <this> and <f>
    // to the same <Message_Future_Rep>, which is
    // created if necessary.
    void operator= (const Message_Future &f);
    // ... other constructors/destructors, etc.,
    // Type conversion, which blocks
    // waiting to obtain the result of the
    // asynchronous method invocation.
    operator Message ();
};
class Consumer_Handler
{
public:
    Consumer_Handler(void);
    // Put the message into the queue.
    void put(const Message &msg) {
        message_queue_.put(msg);
    }
private:
    // Proxy to the Active Object.
    MQ_Proxy message_queue_;
    // Connection to the remote consumer.
    SOCK_Stream connection_;
    // Entry point into the new thread.
    static void *svc_run(void *arg);
};
// Start the service thread for this handler: svc_run() receives <this>
// as its argument, pulls messages from the message queue and sends them
// to the consumer.
Consumer_Handler::Consumer_Handler(void)
{
    Thread_Manager::instance()->spawn(svc_run, this);
}
void *
Consumer_Handler::svc_run(void *args)
{
    Consumer_Handler *this_obj =
        reinterpret_cast<Consumer_Handler *> (args);
    for (;;) {
        // Conversion of Message_Future from the
        // get() method into a Message causes the
        // thread to block until a message is
        // available.
        Message msg = this_obj->message_queue_.get();
        // Transmit message to the consumer.
        this_obj->connection_.send(msg);
    }
}
Supplier_Handler::route_message(const Message &msg)
{
    // Locate the appropriate consumer based on the
    // address information in the Message.
    Consumer_Handler *ch =
        routing_table_.find(msg.address());
    // Put the Message into the Consumer Handler’s queue.
    ch->put(msg);
};
// Scheduler of the Active Object.  Clients insert Method_Requests through
// enqueue(); a thread spawned by the constructor runs dispatch(), which
// executes every queued request whose guard() holds.
class MQ_Scheduler
{
public:
    // Initialize the Activation_Queue to have the
    // specified capacity and make the Scheduler
    // run in its own thread of control.
    MQ_Scheduler(size_t high_water_mark)
        : act_queue_(new Activation_Queue
        (high_water_mark))
    {
        // Spawn a separate thread to dispatch
        // method requests.
        Thread_Manager::instance()->spawn(svc_run,
            this);
    }
    // ... Other constructors/destructors, etc.,
    // Insert the Method Request into
    // the Activation_Queue. This method
    // runs in the thread of its client, i.e.,
    // in the Proxy's thread.
    void enqueue(Method_Request *method_request) {
        // The member name was missing from this call; forward to
        // Activation_Queue::enqueue with its default timeout.
        act_queue_->enqueue(method_request);
    }
    // Dispatch the Method Requests on their Servant
    // in the Scheduler's thread.  Never returns.
    virtual void dispatch(void) {
        // Iterate continuously in a
        // separate thread.
        for (;;) {
            Activation_Queue::iterator i;
            // The iterator's <begin> call blocks
            // when the <Activation_Queue> is empty.
            // NOTE(review): <i> is advanced after dequeue(); confirm
            // that Activation_Queue iterators stay valid across a
            // dequeue of the element they refer to.
            for (i = act_queue_->begin();
                i != act_queue_->end();
                i++) {
                // Select a Method Request 'mr'
                // whose guard evaluates to true.
                Method_Request *mr = *i;
                if (mr->guard()) {
                    // Remove <mr> from the queue first
                    // in case <call> throws an exception.
                    act_queue_->dequeue(mr);
                    // <mr> is no longer reachable through the queue,
                    // so free it on the exceptional path as well
                    // before letting the exception propagate.
                    try {
                        mr->call();
                    } catch (...) {
                        delete mr;
                        throw;
                    }
                    // Deleting through the base pointer relies on
                    // Method_Request having a virtual destructor.
                    delete mr;
                }
            }
        }
    }
protected:
    // Queue of pending Method_Requests.
    Activation_Queue *act_queue_;
    // Entry point into the new thread; <args> is the MQ_Scheduler
    // that spawned it.
    static void *svc_run(void *args) {
        MQ_Scheduler *this_obj =
            static_cast<MQ_Scheduler *> (args);
        this_obj->dispatch();
        // dispatch() loops forever, but a function returning void*
        // must not flow off its end.
        return 0;
    }
};

// Abstract base for the requests queued on the scheduler.  Each concrete
// request supplies a guard (its synchronization constraint) and the call
// that carries out the method.
class Method_Request
{
public:
    // Requests are deleted through a Method_Request pointer by
    // MQ_Scheduler::dispatch, so the destructor must be virtual;
    // otherwise that delete is undefined behaviour.
    virtual ~Method_Request(void) {}
    // Evaluate the synchronization constraint.
    virtual bool guard(void) const = 0;
    // Implement the method.
    virtual void call(void) = 0;
};
// Method request that stores one message in the servant.
class Put : public Method_Request
{
public:
    // Remember the target servant <rep> and keep a copy of the
    // message <arg> to insert later.
    Put(MQ_Servant *rep, Message arg)
        : servant_(rep),
          arg_(arg)
    {
    }

    // Synchronization constraint: <put_i> may only run while the
    // queue is not full.
    virtual bool guard(void) const
    {
        const bool has_room = !servant_->full_i();
        return has_room;
    }

    // Insert the stored message into the servant.
    virtual void call(void)
    {
        servant_->put_i(arg_);
    }

private:
    // Servant the request operates on.
    MQ_Servant *servant_;
    // Message to insert.
    Message arg_;
};
// Method request that takes one message out of the servant and binds it
// to a future.
class Get : public Method_Request
{
public:
    // Remember the target servant <rep> and copy the future <f> that
    // will receive the result.
    Get(MQ_Servant *rep, const Message_Future &f)
        : servant_(rep),
          result_(f)
    {
    }

    // Synchronization constraint: <get_i> may only run once the
    // queue is not empty.
    virtual bool guard(void) const
    {
        const bool has_message = !servant_->empty_i();
        return has_message;
    }

    // Dequeue a message and bind it to the future result object.
    virtual void call(void)
    {
        result_ = servant_->get_i();
    }

private:
    // Servant the request operates on.
    MQ_Servant *servant_;
    // Future that receives the dequeued message.
    Message_Future result_;
};
class Activation_Queue
{
public:
    // Block for an "infinite" amount of time
    // waiting for <enqueue> and <dequeue> methods
    // to complete.
    const int INFINITE = -1;
    // Define a "trait".
    typedef Activation_Queue_Iterator
        iterator;
    // Constructor creates the queue with the
    // specified high water mark that determines
    // its capacity.
    Activation_Queue(size_t high_water_mark);
    // Insert <method_request> into the queue, waiting
    // up to <msec_timeout> amount of time for space
    // to become available in the queue.
    void enqueue(Method_Request *method_request,
        long msec_timeout = INFINITE);
    // Remove <method_request> from the queue, waiting
    // up to <msec_timeout> amount of time for a
    // <method_request> to appear in the queue.
    void dequeue(Method_Request *method_request,
        long msec_timeout = INFINITE);
private:
    // Synchronization mechanisms, e.g., condition
    // variables and mutexes, and the queue
    // implementation, e.g., an array or a linked
    // list, go here.
    // ...
};
// Proxy of the Active Object: turns each client call into a
// Method_Request and hands it to the scheduler.
class MQ_Proxy
{
public:
    // Upper bound used as the default message queue size.
    enum { MAX_SIZE = 100 };

    // Create the servant and the scheduler, both sized by <size>.
    // The initializers are listed in member declaration order, which
    // is the order in which they run.
    MQ_Proxy(size_t size = MAX_SIZE)
        : servant_(new MQ_Servant(size)),
          scheduler_(new MQ_Scheduler(size))
    {
    }

    // Schedule a <put> of <m> to execute on the active object.
    void put(const Message &m)
    {
        scheduler_->enqueue(new Put(servant_, m));
    }

    // Schedule an asynchronous <get> on the active object and return
    // the Message_Future that stands for its eventual result.
    Message_Future get(void)
    {
        Message_Future future;
        scheduler_->enqueue(new Get(servant_, future));
        return future;
    }

    // ... empty() and full() predicate implementations ...

protected:
    // The Servant that implements the
    // Active Object methods.
    MQ_Servant *servant_;
    // A scheduler for the Message Queue.
    MQ_Scheduler *scheduler_;
};
#endif
// Original article: https://www.cnblogs.com/xuyouzhu/p/15214187.html