这篇文章将讲述如何使用 gRPC 异步/非阻塞 C++ API 编写一个简单的服务端(server)和客户端(client)。在读取这篇文章之前,你需要先了解 Protocol Buffer 和 gRPC 基础,你可在本博客搜索到它们的相关文章。

这篇文章将围绕 gRPC 的官方例子 Greeter 展开学习,你可在 grpc/examples/protos/helloworld.proto 中查看服务的定义,在 grpc/examples/cpp/helloworld/ 中查看完整代码。

概览

gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下:

  • 在 RPC 调用上绑定一个 CompletionQueue
  • 做一些事情,例如读或写这样的事情,用一个唯一的 void* 标记表示
  • 调用 CompletionQueue::Next 等待操作完成,如果返回之前的一个标记,则表示对应的操作已经完成。

异步客户端

要使用异步客户端调用远程方法,首先要创建 channel 和 stub,这个过程和 grpc/examples/cpp/helloworld/greeter_client.cc 中差不多(同步示例)。一旦你创建了 stub,就可以做以下事情来进行异步调用了:

  • 初始化 RPC 并为其创建句柄。将 RPC 绑定到一个 CompletionQueue 上。

    CompletionQueue cq;
    std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc(
        stub_->AsyncSayHello(&context, request, &cq)); 
  • 使用一个唯一的标记(这里用的是 (void*)1),请求回复和最终状态

    Status status;
    rpc->Finish(&reply, &status, (void*)1);
  • 等待完成队列返回标记(tag)。一旦返回了之前对应 Finish() 函数中传递的标记,应答和状态就可以被返回了。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
        // check reply and status
    }

你可以在 grpc/examples/cpp/helloworld/greeter_async_client.cc 中看到完整的客户端示例。

异步服务端

服务器实现一个带有标记(tag)的 RPC 调用请求,然后等待完成队列返回标记。异步处理 RPC 的基本流程是:

  • 构建一个 server 并导出异步服务

    helloworld::Greeter::AsyncService service;
    ServerBuilder builder;
    builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials());
    builder.RegisterAsyncService(&service);
    auto cq = builder.AddCompletionQueue();
    auto server = builder.BuildAndStart();
  • 请求一个 RPC,并提供唯一的标记(tag)

    ServerContext context;
    HelloRequest request;
    ServerAsyncResponseWriter<HelloReply> responder;
    service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
  • 等待完成队列返回标记。一旦检索到标记,context 、request 和 responder 就准备好了。

    HelloReply reply;
    Status status;
    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)1) {
        // set reply and status
        responder.Finish(reply, status, (void*)2);
    }
  • 等待完成队列返回标记。RPC 在标记返回时完成。

    void* got_tag;
    bool ok = false;
    cq.Next(&got_tag, &ok);
    if (ok && got_tag == (void*)2) {
        // clean up
    }

但是,这个基本流程没有考虑到服务端同时处理多个请求。要解决这个问题,完整的异步服务端示例使用一个CallData 对象来维护每个 RPC 的状态,并使用该对象的地址作为调用的唯一标记。

class CallData {
    public:
    // Take in the "service" instance (in this case representing an asynchronous
    // server) and the completion queue "cq" used for asynchronous communication
    // with the gRPC runtime.
    CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq) 
        : service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
        // Invoke the serving logic right away.
        Proceed();
    }

    void Proceed() {
        if (status_ == CREATE) {
            // As part of the initial CREATE state, we *request* that the system
            // start processing SayHello requests. In this request, "this" acts are
            // the tag uniquely identifying the request (so that different CallData
            // instances can serve different requests concurrently), in this case
            // the memory address of this CallData instance.
            service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
                                  this);
            // Make this instance progress to the PROCESS state.
            status_ = PROCESS;
        } else if (status_ == PROCESS) {
            // Spawn a new CallData instance to serve new clients while we process
            // the one for this CallData. The instance will deallocate itself as
            // part of its FINISH state.
            new CallData(service_, cq_);

                // The actual processing.
            std::string prefix("Hello ");
            reply_.set_message(prefix + request_.name());

            // And we are done! Let the gRPC runtime know we've finished, using the
            // memory address of this instance as the uniquely identifying tag for
            // the event.
            responder_.Finish(reply_, Status::OK, this);
            status_ = FINISH;
          } else {
            GPR_ASSERT(status_ == FINISH);
            // Once in the FINISH state, deallocate ourselves (CallData).
            delete this;
        }
    }
}

为简单起见,服务端对所有事件只使用一个完成队列,并在 HandleRpcs 中运行一个主循环来查询队列:

void HandleRpcs() {
    // Spawn a new CallData instance to serve new clients.
    new CallData(&service_, cq_.get());
    void* tag;  // uniquely identifies a request.
    bool ok;
    while (true) {
        // Block waiting to read the next event from the completion queue. The
        // event is uniquely identified by its tag, which in this case is the
        // memory address of a CallData instance.
          cq_->Next(&tag, &ok);
          GPR_ASSERT(ok);
          static_cast<CallData*>(tag)->Proceed();
    }
}

关闭服务端

我们在使用一个完成队列来获取异步通知,在服务端被关闭之后,也必须小心地关闭它。

记住,我们在 ServerImpl::Run() 函数中通过运行 cq_ = builder.AddCompletionQueue() 来获得完成队列实例cq_ ,看看 ServerBuilder::AddCompletionQueue 的文档,我们可以看到:

… Caller is required to shutdown the server prior to shutting down the returned completion queue.

有关详细信息,请参考 ServerBuilder::AddCompletionQueue 的完整文档字符串。在我们的示例中,ServerImpl 的析构函数如下所示:

~ServerImpl() {
    server_->Shutdown();
    // Always shutdown the completion queue after the server.
    cq_->Shutdown();
}

你可以在 grpc/examples/cpp/helloworld/greeter_async_server.cc 中看到完整的服务端示例。

实现多个服务

前面官方提到的示例中只实现了一个 SayHello RPC 服务,如果想要实现多个 RPC服务该怎么办呢?下面的将讲述如何对示例中的代码进行修改,使他再支持一个名为 SayBye 的服务。

这个方法就是为每个 RPC 服务都实现一个不同的 CallData 类。但是,当你从 cq_->Next() 获取标记时,你知道它是指向这些类之一的对象的指针,但是你不知道它的确切类型。

为了克服这个问题,你可以让它们都继承一个具有 virtual Proceed() 成员函数的类,再根据需要在每个子类中实现它,当您获得一个标记时,将其转换为 CallData 并调用 Proceed()

class CallData {
 public:
  virtual void Proceed() = 0;
};

class HelloCallData final : public CallData {...};
class ByeCallData final : public CallData {...};

...
new HelloCallData(...);
new ByeCallData(...);
cq_->Next(&tag, &ok);
static_cast<CallData*>(tag)->Proceed();
...

多线程

对于如何在多线程中使用异步 RPC API 完成队列,官方的的文档说明是:

Right now, the best performance trade-off is having numcpu's threads and one completion queue per thread.

当前,最好的权衡性能的方法是使用创建 cpu 个数的线程数,并在每个线程中都使用一个完成队列。

标签: protobuf, Protocol Buffer, gRPC, RPC, 异步

添加新评论