gRPC 异步双向流处理的流程伪代码

摘抄自 Stack Overflow:

https://stackoverflow.com/questions/67784384/c-grpc-clientasyncreaderwriter-how-to-check-if-data-is-available-for-read

// Base class for handlers of asynchronous bidirectional-streaming RPCs.
// Events are dispatched through this base so that the handling thread is not
// associated with a specific rpc method.
class RpcHandler {
  // The address of a TagData is used as the "tag" argument to the various
  // grpc calls; it records which handler issued the operation and which kind
  // of operation completed.
  struct TagData {
    enum class Type {
      start_done,
      read_done,
      write_done,
      // add more as needed...
    };

    RpcHandler* handler;
    Type evt;
  };

  // One TagData per kind of operation, all pointing back at the same handler.
  struct TagSet {
    explicit TagSet(RpcHandler* self)
        : start_done{self, TagData::Type::start_done},
          read_done{self, TagData::Type::read_done},
          write_done{self, TagData::Type::write_done} {}
    TagData start_done;
    TagData read_done;
    TagData write_done;
  };

 public:
  RpcHandler() : tags(this) {}

  // Every TagData in `tags` stores `this`. A copied handler would carry tags
  // that still point at the object it was copied from, so events issued by the
  // copy would be dispatched to the wrong (possibly destroyed) handler.
  RpcHandler(const RpcHandler&) = delete;
  RpcHandler& operator=(const RpcHandler&) = delete;

  virtual ~RpcHandler() = default;

  // The actual tag objects we'll be passing
  TagSet tags;

  // Called from the handling thread when the matching event is dequeued
  // with ok == true.
  virtual void on_ready() = 0;
  virtual void on_recv() = 0;
  virtual void on_write_done() = 0;

  // Called from the handling thread when any event is dequeued with
  // ok == false. The default does nothing; override it to finish or abort
  // the rpc.
  virtual void on_error() {}

  // Body of the handling thread: dequeues events from `cq` and forwards each
  // one to the handler stored in its tag. Returns once cq->Next() returns
  // false.
  //
  // Every tag delivered by `cq` must be the address of a TagData owned by a
  // handler that is still alive.
  static void handling_thread_main(grpc::CompletionQueue* cq) {
    void* raw_tag = nullptr;
    bool ok = false;

    while (cq->Next(&raw_tag, &ok)) {
      // The tag was handed to grpc as a TagData*, so static_cast is the
      // matching conversion back from void*.
      auto* tag = static_cast<TagData*>(raw_tag);
      if (!ok) {
        tag->handler->on_error();
        continue;
      }

      switch (tag->evt) {
        case TagData::Type::start_done:
          tag->handler->on_ready();
          break;
        case TagData::Type::read_done:
          tag->handler->on_recv();
          break;
        case TagData::Type::write_done:
          tag->handler->on_write_done();
          break;
      }
    }
  }
};

void do_something_with_response(Response const&);

// Handler for one bidirectional-streaming rpc exchanging Request/Response
// messages.
//
// send() may be called from any thread; the on_*() callbacks are invoked by
// RpcHandler::handling_thread_main. At most one Write is in flight at a time;
// further messages wait in queued_msgs_.
//
// NOTE: the dispatch loop does not call on_write_done() for events with
// ok == false, so after a failed write sending_ stays true and later
// messages only accumulate in the queue.
class MyHandler final : public RpcHandler {
 public:
  using responder_ptr =
      std::unique_ptr<grpc::ClientAsyncReaderWriter<Request, Response>>;

  // Takes ownership of `responder` and starts the call. The first Read and
  // any Write are issued only after the start_done event has been delivered.
  MyHandler(responder_ptr responder) : responder_(std::move(responder)) {
    // This lock is needed because StartCall() can
    // cause the handler thread to access the object.
    std::lock_guard lock(mutex_);

    responder_->StartCall(&tags.start_done);
  }

  ~MyHandler() {
    // TODO: finish/abort the streaming rpc as appropriate.
  }

  // Sends `msg`, or queues it when the call has not started yet or another
  // write is still in flight. `msg` is copied if it has to be queued.
  void send(const Request& msg) {
    std::lock_guard lock(mutex_);
    if (ready_ && !sending_) {
      sending_ = true;
      responder_->Write(msg, &tags.write_done);
    } else {
      // Either the start_done event has not arrived yet (the write is then
      // issued from on_ready()), or a write is pending (issued from
      // on_write_done()).
      // TODO: add some form of synchronous wait, or outright failure
      // if the queue starts to get too big.
      queued_msgs_.push(msg);
    }
  }

 private:
  // When the rpc is ready, queue the first read and flush the first message
  // that was queued by send() before the call had started.
  void on_ready() override {
    std::lock_guard lock(mutex_);  // To synchronize with the constructor
    ready_ = true;
    responder_->Read(&incoming_, &tags.read_done);

    if (!queued_msgs_.empty()) {
      sending_ = true;
      responder_->Write(queued_msgs_.front(), &tags.write_done);
      queued_msgs_.pop();
    }
  }

  // When a message arrives, use it, and start reading the next one
  void on_recv() override {
    // incoming_ never leaves the handling thread, so no need to lock

    // ------ If handling is cheap and stays in the handling thread.
    do_something_with_response(incoming_);
    responder_->Read(&incoming_, &tags.read_done);

    // ------ If handling is expensive or involves another thread.
    // Response msg = std::move(incoming_);
    // responder_->Read(&incoming_, &tags.read_done);
    // do_something_with_response(msg);
  }

  // When a message has been sent, send the next one if there is any
  void on_write_done() override {
    std::lock_guard lock(mutex_);
    if (!queued_msgs_.empty()) {
      // The front element is popped right after Write() returns.
      // NOTE(review): relies on grpc not referencing the message after
      // Write() returns - confirm against the grpc Write() documentation.
      responder_->Write(queued_msgs_.front(), &tags.write_done);
      queued_msgs_.pop();
    } else {
      sending_ = false;
    }
  }

  responder_ptr responder_;

  // Only ever touched by the handler thread post-construction.
  Response incoming_;

  // The three members below are guarded by mutex_.
  bool ready_ = false;    // set once the start_done event was delivered
  bool sending_ = false;  // true while a Write is in flight
  std::queue<Request> queued_msgs_;

  std::mutex mutex_;  // grpc might be thread-safe, MyHandler isn't...
};

int main() {
  // Start the thread as soon as you have a completion queue.
  auto cq = std::make_unique<grpc::CompletionQueue>();
  std::thread t(RpcHandler::handling_thread_main, cq.get());

  // Multiple concurent RPCs sharing the same handling thread:
  MyHandler handler1(serviceA->MethodA(&context, cq.get()));
  MyHandler handler2(serviceA->MethodA(&context, cq.get()));
  MyHandlerB handler3(serviceA->MethodB(&context, cq.get()));
  MyHandlerC handler4(serviceB->MethodC(&context, cq.get()));
}
原文地址:https://www.cnblogs.com/judes/p/15561681.html