muduo 使用网络库和 Protobuf 来构建 RPC 框架
通过阅读 muduo 中 RPC 的示例程序,分析如何利用一个网络库和 Protobuf 来构建一个 RPC 框架
pb generated sources
分析pb生成了哪些源码,思考需要用网络库去实现哪部分来实现通信
sudoku.proto
// Package name; the generated C++ code is referenced as ::sudoku::... .
package sudoku;
// Ask protoc to also generate the generic service classes (C++, Java, Python).
option cc_generic_services = true;
option java_generic_services = true;
option py_generic_services = true;
// Input message of the Solve rpc; carries the checkerboard as a string.
message SudokuRequest {
required string checkerboard = 1;
}
// Output message of the Solve rpc.
message SudokuResponse {
optional bool solved = 1 [default=false];
optional string checkerboard = 2;
}
// Service with a single rpc, Solve.
service SudokuService {
rpc Solve (SudokuRequest) returns (SudokuResponse);
}
sudoku.pb.h & sudoku.pb.cc
SudokuService
// Generated service interface for `service SudokuService`.
// It derives from the protobuf Service base class and declares one virtual
// method per rpc of the .proto definition (here only Solve).
class SudokuService : public ::PROTOBUF_NAMESPACE_ID::Service {
protected:
// This class should be treated as an abstract interface.
inline SudokuService() {};
public:
virtual ~SudokuService();
// Client-side stub type that belongs to this service.
typedef SudokuService_Stub Stub;
// Static accessor for the ServiceDescriptor of this service type.
static const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* descriptor();
// The rpc declared in the .proto file. It is virtual so that a server-side
// implementation or the client stub can override it.
virtual void Solve(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::sudoku::SudokuRequest* request,
::sudoku::SudokuResponse* response,
::google::protobuf::Closure* done);
// implements Service ----------------------------------------------
const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* GetDescriptor();
// Generic entry point: invokes the rpc identified by `method`.
void CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::PROTOBUF_NAMESPACE_ID::Message* request,
::PROTOBUF_NAMESPACE_ID::Message* response,
::google::protobuf::Closure* done);
// Prototype (default instance) of the input / output message of `method`.
const ::PROTOBUF_NAMESPACE_ID::Message& GetRequestPrototype(
const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const;
const ::PROTOBUF_NAMESPACE_ID::Message& GetResponsePrototype(
const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(SudokuService);
};
SudokuService
override 了父类 Service
的纯虚函数
// Abstract base interface for protocol-buffer-based RPC services. Services
// themselves are abstract interfaces (implemented either by servers or as
// stubs), but they subclass this base interface. The methods of this
// interface can be used to call the methods of the Service without knowing
// its exact type at compile time (analogous to Reflection).
class PROTOBUF_EXPORT Service {
public:
inline Service() {}
virtual ~Service();
...
// Get the ServiceDescriptor describing this service and its methods.
virtual const ServiceDescriptor* GetDescriptor() = 0;
// Invoke the method described by `method` with the given request and
// response objects; `done` is the completion closure.
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller, const Message* request,
Message* response, Closure* done) = 0;
// Return the prototype instance of the request message type of `method`.
virtual const Message& GetRequestPrototype(
const MethodDescriptor* method) const = 0;
// Return the prototype instance of the response message type of `method`.
virtual const Message& GetResponsePrototype(
const MethodDescriptor* method) const = 0;
...
};
descriptor
每一种具体的 Service
类(SudokuService)或 Method
(Solve) 或 Message
类 (SudokuRequest, SudokuResponse)类都会对应一个全局唯一的 ServiceDescriptor
常量实例或MethodDescriptor
常量实例或Descriptor
常量实例,用来获取该 ConcreteType 的各种信息
ServiceDescriptor
通过GetDescriptor()
这个接口,可以获取到指向SudokuService
对应的全局唯一的ServiceDescriptor
常量实例的指针(const ServiceDescriptor*)
// Virtual accessor required by Service; simply forwards to the static
// descriptor() of SudokuService.
const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* SudokuService::GetDescriptor() {
return descriptor();
}
通过ServiceDescriptor
就可以获取service name,以及指向service所包含的methods各自对应的一个全局唯一MethodDescriptor
常量实例的指针(const MethodDescriptor*)
// Excerpt of the protobuf ServiceDescriptor class: exposes the name of a
// service and gives access to the MethodDescriptor of each of its methods.
class PROTOBUF_EXPORT ServiceDescriptor {
public:
...
// The name of the service, not including its containing scope.
const std::string& name() const;
// The fully-qualified name of the service, scope delimited by periods.
const std::string& full_name() const;
...
// The number of methods this service defines.
int method_count() const;
// Gets a MethodDescriptor by index, where 0 <= index < method_count().
// These are returned in the order they were defined in the .proto file.
const MethodDescriptor* method(int index) const;
// Look up a MethodDescriptor by name.
const MethodDescriptor* FindMethodByName(ConstStringParam name) const;
...
};
MethodDescriptor
通过MethodDescriptor
可以获取method name,指向该method所属的service所对应的全局唯一的ServiceDescriptor
常量实例的指针(const ServiceDescriptor*),以及指向该method的input message和output message各自所对应的全局唯一的Descriptor
常量实例指针(const Descriptor*)
// Excerpt of the protobuf MethodDescriptor class: exposes the name of a
// method, the service it belongs to, and the descriptors of its input and
// output message types.
class PROTOBUF_EXPORT MethodDescriptor {
public:
...
// Name of this method, not including containing scope.
const std::string& name() const;
// The fully-qualified name of the method, scope delimited by periods.
const std::string& full_name() const;
...
// Gets the service to which this method belongs. Never nullptr.
const ServiceDescriptor* service() const;
// Gets the type of protocol message which this method accepts as input.
const Descriptor* input_type() const;
// Gets the type of protocol message which this message produces as output.
const Descriptor* output_type() const;
...
};
Descriptor
method
Solve
是SudokuService
中唯一的method,它是个虚函数,需要业务来继承SudokuService
这个类并且 override Solve
方法,来实现业务逻辑(接受 request,set response)。done->Run()
(Closure
是个纯虚类,需要RPC框架来实现)的时候应该要能把response序列化后返回给client(返回RPC响应报文)。
class SudokuServiceImpl : public SudokuService
{
public:
void Solve(::google::protobuf::RpcController* controller,
const ::sudoku::SudokuRequest* request,
::sudoku::SudokuResponse* response,
::google::protobuf::Closure* done) override
{
LOG_INFO << "SudokuServiceImpl::Solve";
response->set_solved(true);
response->set_checkerboard("1234567");
done->Run();
}
};
// Abstract callback interface of protobuf: Run() is pure virtual and has to
// be provided by a subclass.
class PROTOBUF_EXPORT Closure {
public:
Closure() {}
virtual ~Closure();
// Executes the callback.
virtual void Run() = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Closure);
};
CallMethod
传入一个MethodDescriptor
和该 method 的input Message
,output Message
。根据method->index()
来switch,调用真正的 method,在调用之前,先把input Message
,output Message
强转成真正的input类型和output类型
// Generic dispatch entry point: selects the concrete rpc by method->index()
// and forwards to it after down-casting the generic request/response to the
// concrete message types of that rpc.
void SudokuService::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::PROTOBUF_NAMESPACE_ID::Message* request,
::PROTOBUF_NAMESPACE_ID::Message* response,
::google::protobuf::Closure* done) {
// Sanity check: `method` must belong to this service.
GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_sudoku_2eproto[0]);
switch(method->index()) {
// Index 0 is Solve, the only rpc of the service.
case 0:
Solve(controller,
::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::sudoku::SudokuRequest*>(
request),
::PROTOBUF_NAMESPACE_ID::internal::DownCast<::sudoku::SudokuResponse*>(
response),
done);
break;
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
break;
}
}
GetRequestPrototype / GetResponsePrototype
每一种具体的Message
类(SudokuRequest
,SudokuResponse
),都会有一个全局唯一的常量实例,通过静态函数default_instance()
获取。这个全局唯一的常量实例叫做具体的Message
类的Prototype(很形象,原型机)
GetRequestPrototype
和GetResponsePrototype
接受一个MethodDescriptor
,根据method->index()
来switch,返回该method 的 input Message
的Prototype,和 output Message
的Prototype
// Returns the prototype (default instance) of the request message type of
// `method`, selected by method->index().
const ::PROTOBUF_NAMESPACE_ID::Message& SudokuService::GetRequestPrototype(
const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const {
// Sanity check: `method` must belong to this service.
GOOGLE_DCHECK_EQ(method->service(), descriptor());
switch(method->index()) {
// Index 0 is Solve, whose input type is SudokuRequest.
case 0:
return ::sudoku::SudokuRequest::default_instance();
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
// NOTE(review): presumably only here so that every path returns a value.
return *::PROTOBUF_NAMESPACE_ID::MessageFactory::generated_factory()
->GetPrototype(method->input_type());
}
}
// Returns the prototype (default instance) of the response message type of
// `method`, selected by method->index().
const ::PROTOBUF_NAMESPACE_ID::Message& SudokuService::GetResponsePrototype(
const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const {
// Sanity check: `method` must belong to this service.
GOOGLE_DCHECK_EQ(method->service(), descriptor());
switch(method->index()) {
// Index 0 is Solve, whose output type is SudokuResponse.
case 0:
return ::sudoku::SudokuResponse::default_instance();
default:
GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
// NOTE(review): presumably only here so that every path returns a value.
return *::PROTOBUF_NAMESPACE_ID::MessageFactory::generated_factory()
->GetPrototype(method->output_type());
}
}
基类Message
有纯虚函数New()
,具体的Message
类(SudokuRequest
,SudokuResponse
)override 了New()
函数,返回一个具体的Message
类实例。这样我们就可以不用管Message
具体是什么类,而生成一个具体的Message
类实例
// Excerpt of the protobuf Message base class, showing only New().
class PROTOBUF_EXPORT Message : public MessageLite {
public:
constexpr Message() {}
// Basic Operations ------------------------------------------------
// Construct a new instance of the same type. Ownership is passed to the
// caller. (This is also defined in MessageLite, but is defined again here
// for return-type covariance.)
Message* New() const override = 0;
...
}
我们需要一个什么样的RpcServer?
int main()
{
LOG_INFO << "pid = " << getpid();
EventLoop loop;
InetAddress listenAddr(9981);
sudoku::SudokuServiceImpl impl;
RpcServer server(&loop, listenAddr);
server.registerService(&impl);
server.start();
loop.loop();
google::protobuf::ShutdownProtobufLibrary();
}
RpcServer
应该可以注册各种Service
。
在收到一个完整的RPC请求报文后,首先解析出Service的名字和Method的名字
- 通过Service名字找到被注册的Service类,
- 调用
ServiceDescriptor* Service::GetDescriptor()
获取ServiceDescriptor
, - 调用
MethodDescriptor* ServiceDescriptor::FindMethodByName
获取MethodDescriptor
, - 调用
GetRequestPrototype
,GetResponsePrototype
获取 Request 和 Response 的 Prototype - 各自New一个 Request 和 Response 的实例
- 从请求报文中,反序列化出Request
- 最后调用
Service::CallMethod
,执行业务逻辑 done->Run()
的时候应该要能把response序列化后返回给client(返回RPC响应报文)
SudokuService_Stub
// Generated client-side stub of SudokuService. It holds an RpcChannel and
// overrides Solve so that a call is forwarded to that channel.
class SudokuService_Stub : public SudokuService {
public:
SudokuService_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
// Same as above, with an explicit statement of who owns `channel`.
SudokuService_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel,
::PROTOBUF_NAMESPACE_ID::Service::ChannelOwnership ownership);
~SudokuService_Stub();
// Returns the channel this stub sends its calls through.
inline ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel() { return channel_; }
// implements SudokuService ------------------------------------------
void Solve(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::sudoku::SudokuRequest* request,
::sudoku::SudokuResponse* response,
::google::protobuf::Closure* done);
private:
::PROTOBUF_NAMESPACE_ID::RpcChannel* channel_;
// NOTE(review): per the accompanying notes, when true the destructor
// deletes channel_ - confirm against the generated .cc file.
bool owns_channel_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(SudokuService_Stub);
};
SudokuService_Stub
的构造函数接受一个RpcChannel
,如果owns_channel_ == true
,则需要在析构函数中 delete RpcChannel
method
Solve是SudokuService_Stub中唯一的method,这个函数即是client向server发起一个RPC请求的入口。如果done != nullptr
,那说明这次RPC调用是异步的,这个done里包含了回调函数
// Client-side entry point of the Solve rpc: hands the call to the channel
// together with the MethodDescriptor at index 0 of the service descriptor.
void SudokuService_Stub::Solve(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
const ::sudoku::SudokuRequest* request,
::sudoku::SudokuResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
里面实际上是调用RpcChannel
的CallMethod
接口,因为Solve是SudokuService的第一个method,所以是descriptor()->method(0)
RpcChannel
RpcChannel
是一个纯虚类,需要RPC框架来实现
// Abstract protobuf interface of a channel to a remote service. CallMethod
// is pure virtual and has to be provided by the RPC framework.
class PROTOBUF_EXPORT RpcChannel {
public:
inline RpcChannel() {}
virtual ~RpcChannel();
// Call the given method of the remote service. The signature of this
// procedure looks the same as Service::CallMethod(), but the requirements
// are less strict in one important way: the request and response objects
// need not be of any specific class as long as their descriptors are
// method->input_type() and method->output_type().
virtual void CallMethod(const MethodDescriptor* method,
RpcController* controller, const Message* request,
Message* response, Closure* done) = 0;
private:
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};
CallMethod
接受一个MethodDescriptor
和该method的input Message
,output Message
,以及回调函数done,发起RPC调用
我们需要一个什么样的RpcChannel?
// Completion callback of the Solve rpc: logs the calling thread id and the
// content of the received response.
void solved(sudoku::SudokuResponse* resp)
{
  const auto callerTid = gettid();
  LOG_INFO << "event loop thread, thread id = " << callerTid;

  const auto dump = resp->DebugString();
  LOG_INFO << "solved:\n" << dump;
}
int main(int argc, char* argv[])
{
LOG_INFO << "pid = " << getpid();
if (argc > 1)
{
EventLoopThread loopThread;
InetAddress serverAddr(argv[1], 9981);
sudoku::SudokuRequest request;
request.set_checkerboard("001010");
sudoku::SudokuResponse* response = new sudoku::SudokuResponse;
RpcChannelPtr channel(new RpcChannel(loopThread.startLoop(), serverAddr));
sudoku::SudokuService_Stub stub(channel.get());
stub.Solve(NULL, &request, response, NewCallback(solved, response));
LOG_INFO << "main thread, thread id = " << gettid() << " is going to sleep";
CurrentThread::sleepUsec(3000*1000);
LOG_INFO << "main thread, thread id = " << gettid() << " is going to exit";
}
else
{
printf("Usage: %s host_ip\n", argv[0]);
}
google::protobuf::ShutdownProtobufLibrary();
}
在RpcChannel::CallMethod
中
- 调用
MethodDescriptor::name()
获取method名字 - 调用
MethodDescriptor::service()->full_name()
获取service名字 - 序列化request
- 将上面三步的结果组成RPC请求报文并发送
- (有回调,异步RPC调用),那么应该把这个回调函数注册在某个地方,
CallMethod
立刻返回。等到网络库收到完整RPC响应报文后再调用回调
RpcController?
RpcServer
// RpcServer hosts protobuf services on top of a TcpServer.
//
// Services are registered under their fully-qualified name. Incoming bytes
// are handed to the codec, which calls onRpcMessage() once a complete
// RpcMessage has been decoded.
class RpcServer
{
 public:
  RpcServer(EventLoop* loop,
            const InetAddress& listenAddr);
  ...
  // Registers `service` under the full name reported by its descriptor.
  // Only the pointer is stored, so the service must outlive this server.
  // Registering a second service with the same full name replaces the first.
  void registerService(::google::protobuf::Service* service)
  {
    const google::protobuf::ServiceDescriptor* desc = service->GetDescriptor();
    services_[desc->full_name()] = service;
  }

  // Starts the underlying TcpServer.
  void start()
  {
    server_.start();
  }

 private:
  // Everything doneCallback() needs in order to send the reply of one call.
  struct doneCallbackArgs
  {
    doneCallbackArgs(::google::protobuf::Message* r,
                     int64_t i, const TcpConnectionPtr& c)
      : response(r), id(i), conn(c) {}

    ::google::protobuf::Message* response;  // deleted in doneCallback()
    int64_t id;                             // id of the request being answered
    const TcpConnectionPtr conn;            // connection the request came from
  };

  // Logs connection up/down events.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << "RpcServer - " << conn->peerAddress().toIpPort() << " -> "
             << conn->localAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");
  }

  // Called by the codec for every complete RpcMessage.
  void onRpcMessage(const TcpConnectionPtr& conn,
                    const RpcMessagePtr& messagePtr,
                    Timestamp receiveTime);

  // Sends the response of a finished call back to the client.
  void doneCallback(const doneCallbackArgs args);

  TcpServer server_;
  // Maps the full service name to the registered service object.
  std::map<std::string, ::google::protobuf::Service*> services_;
  RpcCodec codec_;
};
RpcServer
的核心成员是一个TcpServer
用来通信,和一个std::map<std::string, ::google::protobuf::Service*>
建立 service_name 到注册过的具体 service 类的映射。RpcCodec codec_
属于一个无状态的工具类,负责解析出完整的TCP请求报文和发送一个完整的TCP响应报文
当有东西从建立的这个TcpConnection
吐出来后会调RpcCodec::onMessage
先看一下,codec_
在RpcServer
中是怎么初始化的
// Wires the server together: bytes read from a connection go to the codec,
// and the codec hands every complete RpcMessage to RpcServer::onRpcMessage.
RpcServer::RpcServer(EventLoop* loop,
                     const InetAddress& listenAddr)
  : server_(loop, listenAddr, "RpcServer"),
    codec_(std::bind(&RpcServer::onRpcMessage, this, _1, _2, _3))
{
  // Raw data is fed to the codec first.
  server_.setMessageCallback(
      std::bind(&RpcCodec::onMessage, &codec_, _1, _2, _3));

  // Connection up/down events are only logged.
  server_.setConnectionCallback(
      std::bind(&RpcServer::onConnection, this, _1));
}
所以可以看出codec_
是用来处理TCP的分包(因为一有东西从连接里出来就会调RpcCodec::onMessage
),等收到一个完整的TCP报文的时候再回调RpcServer::onRpcMessage
我们先不去看codec_
内部,而是先去走完收到一个完整的TCP报文之后的流程
一个完整的TCP报文实际上也是一个Protobuf Message
// Kind of an RpcMessage.
enum MessageType
{
REQUEST = 1;
RESPONSE = 2;
ERROR = 3; // not used
}
// Error reported in a RESPONSE message.
enum ErrorCode
{
NO_ERROR = 0;
WRONG_PROTO = 1;
NO_SERVICE = 2;
NO_METHOD = 3;
INVALID_REQUEST = 4;
INVALID_RESPONSE = 5;
TIMEOUT = 6;
}
// Envelope of every RPC request and response sent over the connection.
message RpcMessage
{
required MessageType type = 1;
// Matches a response to the request it answers.
required fixed64 id = 2;
// Names of the service and method to call (set in requests).
optional string service = 3;
optional string method = 4;
// Serialized request / response message.
optional bytes request = 5;
optional bytes response = 6;
optional ErrorCode error = 7;
}
- 通过Service名字找到被注册的Service类,
- 调用
ServiceDescriptor* Service::GetDescriptor()
获取ServiceDescriptor
, - 调用
MethodDescriptor* ServiceDescriptor::FindMethodByName
获取MethodDescriptor
, - 调用
GetRequestPrototype
,获取 Request 的 Prototype,并New一个 Request 的实例 - 从请求报文中,反序列化出Request
- 调用
GetResponsePrototype
,获取 Response 的 Prototype,并New一个 Response 的实例 - 最后调用
Service::CallMethod
,执行业务逻辑 done->Run()
的时候应该要能把response序列化后返回给client(返回RPC响应报文)
void RpcServer::onRpcMessage(const TcpConnectionPtr& conn,
const RpcMessagePtr& messagePtr,
Timestamp receiveTime)
{
assert(conn == conn_);
RpcMessage& message = *messagePtr;
if (message.type() == RESPONSE)
{
}
else if (message.type() == REQUEST)
{
// FIXME: extract to a function
ErrorCode error = WRONG_PROTO;
if (services_)
{
// 1. 通过Service名字找到被注册的Service类
std::map<std::string, google::protobuf::Service*>::const_iterator it = services_->find(message.service());
if (it != services_->end())
{
google::protobuf::Service* service = it->second;
assert(service != NULL);
// 2. 调用`ServiceDescriptor* Service::GetDescriptor()`获取`ServiceDescriptor`,
const google::protobuf::ServiceDescriptor* desc = service->GetDescriptor();
// 3. 调用`MethodDescriptor* ServiceDescriptor::FindMethodByName`获取`MethodDescriptor`,
const google::protobuf::MethodDescriptor* method
= desc->FindMethodByName(message.method());
if (method)
{
// 4. 调用`GetRequestPrototype`,获取 Resuqest 的 Prototype,并New一个 Resuqest 的实例
std::unique_ptr<google::protobuf::Message> request(service->GetRequestPrototype(method).New());
// 5. 从请求报文中,反序列化出Resuqest
if (request->ParseFromString(message.request()))
{
// 6. 调用`GetResponsePrototype`,获取 Response 的 Prototype,并New一个 Response 的实例
google::protobuf::Message* response = service->GetResponsePrototype(method).New();
// response is deleted in doneCallback
int64_t id = message.id();
// 7. 最后调用`Service::CallMethod`,执行业务逻辑
service->CallMethod(method, NULL, get_pointer(request), response,
google::protobuf::NewCallback(this, &RpcServer::doneCallback, doneCallbackArgs(response, id, conn)));
error = NO_ERROR;
}
else
{
error = INVALID_REQUEST;
}
}
else
{
error = NO_METHOD;
}
}
else
{
error = NO_SERVICE;
}
}
else
{
error = NO_SERVICE;
}
if (error != NO_ERROR)
{
RpcMessage response;
response.set_type(RESPONSE);
response.set_id(message.id());
response.set_error(error);
codec_.send(conn_, response);
}
}
else if (message.type() == ERROR)
{
}
}
// 8. `done->Run()`的时候应该要能把response序列化后返回给client(返回RPC响应报文)
void RpcServer::doneCallback(const doneCallbackArgs args)
{
std::unique_ptr<google::protobuf::Message> d(args.response);
RpcMessage message;
message.set_type(RESPONSE);
message.set_id(args.id);
message.set_response(args.response->SerializeAsString()); // FIXME: error check
codec_.send(args.conn, message);
}
RpcChannel
// Client-side implementation of the protobuf RpcChannel interface on top of
// a TcpClient. Every call gets an id; the response object and the user
// callback are kept in outstandings_ until the matching response arrives.
class RpcChannel : public ::google::protobuf::RpcChannel
{
public:
RpcChannel() = delete;
RpcChannel(EventLoop* loop, const InetAddress& serverAddr);
...
// RpcChannel only support async RPC call (done != nullptr) now
// FIXME support sync RPC call (done == nullptr)
void CallMethod(const ::google::protobuf::MethodDescriptor* method,
::google::protobuf::RpcController* controller,
const ::google::protobuf::Message* request,
::google::protobuf::Message* response,
::google::protobuf::Closure* done) override;
private:
TcpConnectionPtr connect(); // FIXME make this func async
// Called by the codec for every complete RpcMessage.
void onRpcMessage(const TcpConnectionPtr& conn,
const RpcMessagePtr& messagePtr,
Timestamp receiveTime);
void onConnection(const TcpConnectionPtr& conn);
// State of a call that has been sent but not answered yet.
struct OutstandingCall
{
::google::protobuf::Message* response;
::google::protobuf::Closure* done;
};
RpcCodec codec_;
// Source of the id placed in each request.
AtomicInt64 id_;
TcpClient client_;
MutexLock mutex_;
// Constructed from mutex_, so it must be declared after it.
Condition cond_;
// Maps a request id to its pending call.
std::map<int64_t, OutstandingCall> outstandings_ GUARDED_BY(mutex_);
};
// Sets up the channel: bytes read from the connection go to the codec, and
// the codec hands every complete RpcMessage to RpcChannel::onRpcMessage.
RpcChannel::RpcChannel(EventLoop* loop, const InetAddress& serverAddr)
  : codec_(std::bind(&RpcChannel::onRpcMessage, this, _1, _2, _3)),
    client_(loop, serverAddr, "RpcChannel"),
    cond_(mutex_)
{
  LOG_INFO << "RpcChannel::ctor - " << this;

  // Connection state changes are reported to onConnection().
  client_.setConnectionCallback(
      std::bind(&RpcChannel::onConnection, this, _1));

  // Raw data is fed to the codec first.
  client_.setMessageCallback(
      std::bind(&RpcCodec::onMessage, &codec_, _1, _2, _3));
}
RpcChannel
的核心成员是一个TcpClient
用来通信,一个AtomicInt64 id_
为每一个从此channel
发出去的RPC请求报文分配一个全局递增的id,和一个std::map<int64_t, OutstandingCall>
建立 id 到用户定义回调函数和response Message
对象的映射。RpcCodec codec_
的功能同上,属于一个无状态的工具类,负责发送出完整的TCP请求报文和解析出一个完整的TCP响应报文
当调用RpcChannel::CallMethod
,如果没有建连则会先去建立TCP连接,这个建连接口RpcChannel::connect()
是个同步接口(可以看RpcChannel::connect()
和RpcChannel::onConnection
内部实现)。
建连完毕之后先调用MethodDescriptor::name()
获取method名字,调用MethodDescriptor::service()->full_name()
获取service名字,然后组装一个RPC请求报文,然后把用户的回调函数和response Message
对象记录在outstandings_
,最后通过codec_
发送RPC请求报文
// Sends one RPC request over the channel.
//
// Connects first if there is no connection yet, builds a REQUEST message
// with a fresh id, records `response` and `done` under that id and then
// sends the message. `controller` is not used.
void RpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                            google::protobuf::RpcController* controller,
                            const ::google::protobuf::Message* request,
                            ::google::protobuf::Message* response,
                            ::google::protobuf::Closure* done)
{
  TcpConnectionPtr conn = client_.connection();
  if (!conn)
  {
    conn = connect();
  }

  RpcMessage message;
  message.set_type(REQUEST);
  const int64_t id = id_.incrementAndGet();
  message.set_id(id);
  message.set_service(method->service()->full_name());
  message.set_method(method->name());
  message.set_request(request->SerializeAsString()); // FIXME: error check

  // Register the pending call before sending, so that the response can be
  // matched as soon as it arrives.
  {
    MutexLockGuard lock(mutex_);
    OutstandingCall& pending = outstandings_[id];
    pending.response = response;
    pending.done = done;
  }

  codec_.send(conn, message);
}
等收到一个完整的TCP报文的时候再调用RpcChannel::onRpcMessage
根据RPC响应报文里的id,从outstandings_
找到对应的用户回调和response Message
对象。然后从报文中反序列化出response对象,调用用户回调,最后析构response对象
void RpcChannel::onRpcMessage(const TcpConnectionPtr& conn,
const RpcMessagePtr& messagePtr,
Timestamp receiveTime)
{
assert(conn == client_.connection());
//printf("%s\n", message.DebugString().c_str());
RpcMessage& message = *messagePtr;
if (message.type() == RESPONSE)
{
int64_t id = message.id();
assert(message.has_response() || message.has_error());
OutstandingCall out = { NULL, NULL };
{
MutexLockGuard lock(mutex_);
std::map<int64_t, OutstandingCall>::iterator it = outstandings_.find(id);
if (it != outstandings_.end())
{
out = it->second;
outstandings_.erase(it);
}
}
if (out.response)
{
std::unique_ptr<google::protobuf::Message> d(out.response);
if (message.has_response())
{
out.response->ParseFromString(message.response());
}
if (out.done)
{
out.done->Run();
}
}
}
else if (message.type() == REQUEST)
{
}
else if (message.type() == ERROR)
{
}
}