gRPC C++ 异步使用入门
这篇文章将讲述如何使用 gRPC 异步/非阻塞 C++ API 编写一个简单的服务端(server)和客户端(client)。在读取这篇文章之前,你需要先了解 Protocol Buffer 和 gRPC 基础,你可在本博客搜索到它们的相关文章。
这篇文章将围绕 gRPC 的官方例子 Greeter 展开学习,你可在 grpc/examples/protos/helloworld.proto
中查看服务的定义,在 grpc/examples/cpp/helloworld/
中查看完整代码。
概览
gRPC 使用 CompletionQueue API 进行异步操作。基本工作流程如下:
- 在 RPC 调用上绑定一个
CompletionQueue
- 做一些事情,例如读或写这样的事情,用一个唯一的
void*
标记表示 - 调用
CompletionQueue::Next
等待操作完成,如果返回之前的一个标记,则表示对应的操作已经完成。
异步客户端
要使用异步客户端调用远程方法,首先要创建 channel 和 stub,这个过程和 grpc/examples/cpp/helloworld/greeter_client.cc
中差不多(同步示例)。一旦你创建了 stub,就可以做以下事情来进行异步调用了:
初始化 RPC 并为其创建句柄。将 RPC 绑定到一个
CompletionQueue
上。CompletionQueue cq; std::unique_ptr<ClientAsyncResponseReader<HelloReply> > rpc( stub_->AsyncSayHello(&context, request, &cq));
使用一个唯一的标记(这里用的是
(void*)1
),请求回复和最终状态Status status; rpc->Finish(&reply, &status, (void*)1);
等待完成队列返回标记(tag)。一旦返回了之前对应
Finish()
函数中传递的标记,应答和状态就可以被返回了。void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)1) { // check reply and status }
你可以在 grpc/examples/cpp/helloworld/greeter_async_client.cc
中看到完整的客户端示例。
异步服务端
服务器实现一个带有标记(tag)的 RPC 调用请求,然后等待完成队列返回标记。异步处理 RPC 的基本流程是:
构建一个 server 并导出异步服务
helloworld::Greeter::AsyncService service; ServerBuilder builder; builder.AddListeningPort("0.0.0.0:50051", InsecureServerCredentials()); builder.RegisterAsyncService(&service); auto cq = builder.AddCompletionQueue(); auto server = builder.BuildAndStart();
请求一个 RPC,并提供唯一的标记(tag)
ServerContext context; HelloRequest request; ServerAsyncResponseWriter<HelloReply> responder; service.RequestSayHello(&context, &request, &responder, &cq, &cq, (void*)1);
等待完成队列返回标记。一旦检索到标记,context 、request 和 responder 就准备好了。
HelloReply reply; Status status; void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)1) { // set reply and status responder.Finish(reply, status, (void*)2); }
等待完成队列返回标记。RPC 在标记返回时完成。
void* got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (ok && got_tag == (void*)2) { // clean up }
但是,这个基本流程没有考虑到服务端同时处理多个请求。要解决这个问题,完整的异步服务端示例使用一个CallData
对象来维护每个 RPC 的状态,并使用该对象的地址作为调用的唯一标记。
class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
responder_.Finish(reply_, Status::OK, this);
status_ = FINISH;
} else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
}
为简单起见,服务端对所有事件只使用一个完成队列,并在 HandleRpcs
中运行一个主循环来查询队列:
void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag; // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
cq_->Next(&tag, &ok);
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}
关闭服务端
我们在使用一个完成队列来获取异步通知,在服务端被关闭之后,也必须小心地关闭它。
记住,我们在 ServerImpl::Run()
函数中通过运行 cq_ = builder.AddCompletionQueue()
来获得完成队列实例cq_
,看看 ServerBuilder::AddCompletionQueue
的文档,我们可以看到:
… Caller is required to shutdown the server prior to shutting down the returned completion queue.
有关详细信息,请参考 ServerBuilder::AddCompletionQueue
的完整文档字符串。在我们的示例中,ServerImpl
的析构函数如下所示:
~ServerImpl() {
server_->Shutdown();
// Always shutdown the completion queue after the server.
cq_->Shutdown();
}
你可以在 grpc/examples/cpp/helloworld/greeter_async_server.cc
中看到完整的服务端示例。
实现多个服务
前面官方提到的示例中只实现了一个 SayHello
RPC 服务,如果想要实现多个 RPC服务该怎么办呢?下面的将讲述如何对示例中的代码进行修改,使他再支持一个名为 SayBye
的服务。
这个方法就是为每个 RPC 服务都实现一个不同的 CallData
类。但是,当你从 cq_->Next()
获取标记时,你知道它是指向这些类之一的对象的指针,但是你不知道它的确切类型。
为了克服这个问题,你可以让它们都继承一个具有 virtual Proceed()
成员函数的类,再根据需要在每个子类中实现它,当您获得一个标记时,将其转换为 CallData
并调用 Proceed()
。
class CallData {
public:
virtual void Proceed() = 0;
};
class HelloCallData final : public CallData {...};
class ByeCallData final : public CallData {...};
...
new HelloCallData(...);
new ByeCallData(...);
cq_->Next(&tag, &ok);
static_cast<CallData*>(tag)->Proceed();
...
多线程
对于如何在多线程中使用异步 RPC API 完成队列,官方的的文档说明是:
Right now, the best performance trade-off is having numcpu's threads and one completion queue per thread.
当前,最好的权衡性能的方法是使用创建 cpu 个数的线程数,并在每个线程中都使用一个完成队列。