muduo 使用网络库和 Protobuf 来构建 RPC 框架

通过阅读 muduo 中 RPC 的示例程序来分析:如何利用一个网络库和 Protobuf 来构建一个 RPC 框架

pb generated sources

分析pb生成了哪些源码,思考需要用网络库去实现哪部分来实现通信

// Package of this .proto file; the generated C++ types quoted in this article
// are referenced as ::sudoku::SudokuRequest and ::sudoku::SudokuResponse.
package sudoku;
// Turn on generation of the generic service classes for C++, Java and Python.
option cc_generic_services = true;
option java_generic_services = true;
option py_generic_services = true;

// Input message of SudokuService.Solve: a single required string field.
message SudokuRequest {
  required string checkerboard = 1;
}

// Output message of SudokuService.Solve.
message SudokuResponse {
  // Defaults to false when the field is not set.
  optional bool solved = 1 [default=false];
  optional string checkerboard = 2;
}

// The RPC service: one method, Solve, taking a SudokuRequest and returning a
// SudokuResponse.
service SudokuService {
  rpc Solve (SudokuRequest) returns (SudokuResponse);
}
// Service class generated from `service SudokuService` in the .proto file.
// It derives from the generic protobuf Service base, declares one virtual
// method per rpc (here only Solve) and re-declares the Service interface
// functions listed under "implements Service" below.
class SudokuService : public ::PROTOBUF_NAMESPACE_ID::Service {
 protected:
  // This class should be treated as an abstract interface.
  inline SudokuService() {};
 public:
  virtual ~SudokuService();

  // Alias for the client-side stub class of this service.
  typedef SudokuService_Stub Stub;

  // Static accessor for the ServiceDescriptor of this service type.
  static const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* descriptor();

  // The rpc declared in the .proto file. It is virtual so that a subclass
  // (a server-side implementation, or the stub) can supply its own body.
  virtual void Solve(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::sudoku::SudokuRequest* request,
                       ::sudoku::SudokuResponse* response,
                       ::google::protobuf::Closure* done);

  // implements Service ----------------------------------------------

  // Instance-level accessor for the descriptor.
  const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* GetDescriptor();
  // Generic entry point: invokes the rpc identified by `method` with
  // type-erased request/response messages.
  void CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
                  ::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                  const ::PROTOBUF_NAMESPACE_ID::Message* request,
                  ::PROTOBUF_NAMESPACE_ID::Message* response,
                  ::google::protobuf::Closure* done);
  // Return the prototype (default instance) of the request / response
  // message type of `method`.
  const ::PROTOBUF_NAMESPACE_ID::Message& GetRequestPrototype(
    const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const;
  const ::PROTOBUF_NAMESPACE_ID::Message& GetResponsePrototype(
    const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(SudokuService);
};

SudokuService override 了父类 Service的纯虚函数

// Abstract base interface for protocol-buffer-based RPC services.  Services
// themselves are abstract interfaces (implemented either by servers or as
// stubs), but they subclass this base interface.  The methods of this
// interface can be used to call the methods of the Service without knowing
// its exact type at compile time (analogous to Reflection).
// (Excerpt; `...` marks members omitted from the quotation.)
class PROTOBUF_EXPORT Service {
 public:
  inline Service() {}
  virtual ~Service();
  ...

  // Get the ServiceDescriptor describing this service and its methods.
  virtual const ServiceDescriptor* GetDescriptor() = 0;

  // Invoke the method identified by `method`, passing the request and
  // response as base-class Message pointers; `done` is the completion closure.
  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

  // Return a reference to a Message of the request / response type that
  // `method` uses.
  virtual const Message& GetRequestPrototype(
      const MethodDescriptor* method) const = 0;
  virtual const Message& GetResponsePrototype(
      const MethodDescriptor* method) const = 0;
...

};

每一种具体的 Service 类(SudokuService)或 Method(Solve) 或 Message类 (SudokuRequest, SudokuResponse)类都会对应一个全局唯一ServiceDescriptor常量实例或MethodDescriptor常量实例或Descriptor常量实例,用来获取该 ConcreteType 的各种信息

通过GetDescriptor()这个接口,可以获取到指向SudokuService对应的全局唯一ServiceDescriptor常量实例的指针(const ServiceDescriptor*)

// Instance-level accessor required by the Service interface; it simply
// forwards to the static SudokuService::descriptor().
const ::PROTOBUF_NAMESPACE_ID::ServiceDescriptor* SudokuService::GetDescriptor() {
  return descriptor();
}

通过ServiceDescriptor就可以获取service name,以及指向service所包含的methods各自对应的一个全局唯一MethodDescriptor常量实例的指针(const MethodDescriptor*)

// Describes one service: exposes its name and gives access to the
// MethodDescriptor of each of its methods, by index or by name.
// (Excerpt; `...` marks members omitted from the quotation.)
class PROTOBUF_EXPORT ServiceDescriptor {
 public:
  ...

  // The name of the service, not including its containing scope.
  const std::string& name() const;
  // The fully-qualified name of the service, scope delimited by periods.
  const std::string& full_name() const;
  ...

  // The number of methods this service defines.
  int method_count() const;
  // Gets a MethodDescriptor by index, where 0 <= index < method_count().
  // These are returned in the order they were defined in the .proto file.
  const MethodDescriptor* method(int index) const;

  // Look up a MethodDescriptor by name.
  const MethodDescriptor* FindMethodByName(ConstStringParam name) const;
  ...
};

通过MethodDescriptor可以获取method name,指向该method所属的service所对应的全局唯一ServiceDescriptor常量实例的指针(const ServiceDescriptor*),以及指向该method的input message和output message各自所对应的全局唯一Descriptor常量实例指针(const Descriptor*)

// Describes one method of a service: its name, the service it belongs to, and
// the Descriptors of its input and output message types.
// (Excerpt; `...` marks members omitted from the quotation.)
class PROTOBUF_EXPORT MethodDescriptor {
 public:
  ...

  // Name of this method, not including containing scope.
  const std::string& name() const;
  // The fully-qualified name of the method, scope delimited by periods.
  const std::string& full_name() const;
  ...
  
  // Gets the service to which this method belongs.  Never nullptr.
  const ServiceDescriptor* service() const;

  // Gets the type of protocol message which this method accepts as input.
  const Descriptor* input_type() const;
  // Gets the type of protocol message which this message produces as output.
  const Descriptor* output_type() const;

  ...
};

Solve 是 SudokuService 中唯一的 method,它是个虚函数,需要业务来继承 SudokuService 这个类并且 override Solve 方法,来实现业务逻辑(接受 request,set response)。done->Run()(Closure 是个纯虚类,需要 RPC 框架来实现)的时候应该要能把 response 序列化后返回给 client(返回 RPC 响应报文)。

// Server-side implementation of SudokuService used by the example.
// Solve does not look at the request: it fills the response with fixed values
// and then runs the completion closure.
class SudokuServiceImpl : public SudokuService
{
 public:
  // controller and request are unused here; response is filled in and done is
  // invoked exactly once before returning.
  void Solve(::google::protobuf::RpcController* controller,
             const ::sudoku::SudokuRequest* request,
             ::sudoku::SudokuResponse* response,
             ::google::protobuf::Closure* done) override
  {
    LOG_INFO << "SudokuServiceImpl::Solve";

    // Populate the reply with the example's canned answer.
    response->set_checkerboard("1234567");
    response->set_solved(true);

    // Signal completion to whoever supplied the closure.
    done->Run();
  }
};

// Abstract callback object: subclasses provide Run(). Copying is disabled via
// GOOGLE_DISALLOW_EVIL_CONSTRUCTORS.
class PROTOBUF_EXPORT Closure {
 public:
  Closure() {}
  virtual ~Closure();

  // Invoke the callback.
  virtual void Run() = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(Closure);
};

CallMethod 接受一个 MethodDescriptor 和该 method 的 input Message、output Message。它根据 method->index() 来 switch,调用真正的 method;在调用之前,先把 input Message、output Message 强转成真正的 input 类型和 output 类型

// Generic dispatch for SudokuService: selects the concrete rpc by
// method->index() and forwards to it after down-casting the type-erased
// request/response to the concrete message types of that rpc.
void SudokuService::CallMethod(const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method,
                             ::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                             const ::PROTOBUF_NAMESPACE_ID::Message* request,
                             ::PROTOBUF_NAMESPACE_ID::Message* response,
                             ::google::protobuf::Closure* done) {
  // Debug check: the method must belong to this service.
  GOOGLE_DCHECK_EQ(method->service(), file_level_service_descriptors_sudoku_2eproto[0]);
  switch(method->index()) {
    case 0:
      // Index 0 is Solve, the only rpc of this service.
      Solve(controller,
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<const ::sudoku::SudokuRequest*>(
                 request),
             ::PROTOBUF_NAMESPACE_ID::internal::DownCast<::sudoku::SudokuResponse*>(
                 response),
             done);
      break;
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      break;
  }
}

每一种具体的 Message 类(SudokuRequest、SudokuResponse),都会有一个全局唯一的常量实例,通过静态函数 default_instance() 获取。这个全局唯一的常量实例叫做具体的 Message 类的 Prototype(很形象,原型机)。GetRequestPrototype 和 GetResponsePrototype 接受一个 MethodDescriptor,根据 method->index() 来 switch,分别返回该 method 的 input Message 的 Prototype 和 output Message 的 Prototype

// Returns the prototype (default instance) of the request message type used
// by `method`, selected by method->index().
const ::PROTOBUF_NAMESPACE_ID::Message& SudokuService::GetRequestPrototype(
    const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const {
  // Debug check: the method must belong to this service.
  GOOGLE_DCHECK_EQ(method->service(), descriptor());
  switch(method->index()) {
    case 0:
      // Solve takes a SudokuRequest.
      return ::sudoku::SudokuRequest::default_instance();
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      // NOTE(review): presumably only reached if the FATAL log does not
      // terminate the process; falls back to the generated factory — confirm.
      return *::PROTOBUF_NAMESPACE_ID::MessageFactory::generated_factory()
          ->GetPrototype(method->input_type());
  }
}

// Returns the prototype (default instance) of the response message type used
// by `method`, selected by method->index().
const ::PROTOBUF_NAMESPACE_ID::Message& SudokuService::GetResponsePrototype(
    const ::PROTOBUF_NAMESPACE_ID::MethodDescriptor* method) const {
  // Debug check: the method must belong to this service.
  GOOGLE_DCHECK_EQ(method->service(), descriptor());
  switch(method->index()) {
    case 0:
      // Solve produces a SudokuResponse.
      return ::sudoku::SudokuResponse::default_instance();
    default:
      GOOGLE_LOG(FATAL) << "Bad method index; this should never happen.";
      // NOTE(review): presumably only reached if the FATAL log does not
      // terminate the process; falls back to the generated factory — confirm.
      return *::PROTOBUF_NAMESPACE_ID::MessageFactory::generated_factory()
          ->GetPrototype(method->output_type());
  }
}

基类 Message 有纯虚函数 New(),具体的 Message 类(SudokuRequest、SudokuResponse)override 了 New() 函数,返回一个具体的 Message 类实例。这样我们就可以不用管 Message 具体是什么类,而生成一个具体的 Message 类实例

// Base class of the generated message types.
// (Excerpt; `...` marks members omitted from the quotation.)
class PROTOBUF_EXPORT Message : public MessageLite {
 public:
  constexpr Message() {}

  // Basic Operations ------------------------------------------------

  // Construct a new instance of the same type.  Ownership is passed to the
  // caller.  (This is also defined in MessageLite, but is defined again here
  // for return-type covariance.)
  Message* New() const override = 0;
  ...
}
// Server entry point: registers the sudoku service on an RpcServer listening
// on port 9981 and runs the event loop.
int main()
{
  const int kListenPort = 9981;

  LOG_INFO << "pid = " << getpid();

  // Declaration order is deliberate: `impl` is declared before `server` so it
  // is destroyed after the server that holds a pointer to it.
  EventLoop loop;
  InetAddress listenAddr(kListenPort);
  sudoku::SudokuServiceImpl impl;
  RpcServer server(&loop, listenAddr);

  server.registerService(&impl);
  server.start();

  // Runs until the loop is asked to quit.
  loop.loop();

  google::protobuf::ShutdownProtobufLibrary();
  return 0;
}

RpcServer应该可以注册各种Service。 在收到一个完整的RPC请求报文后,首先解析出Service的名字和Method的名字

  1. 通过Service名字找到被注册的Service类,
  2. 调用ServiceDescriptor* Service::GetDescriptor()获取ServiceDescriptor
  3. 调用MethodDescriptor* ServiceDescriptor::FindMethodByName获取MethodDescriptor
  4. 调用GetRequestPrototype和GetResponsePrototype获取 Request 和 Response 的 Prototype
  5. 各自New一个 Request 和 Response 的实例
  6. 从请求报文中,反序列化出Request
  7. 最后调用Service::CallMethod,执行业务逻辑
  8. done->Run()的时候应该要能把response序列化后返回给client(返回RPC响应报文)
// Client-side stub generated for SudokuService. It derives from SudokuService
// and holds the RpcChannel it was constructed with.
class SudokuService_Stub : public SudokuService {
 public:
  // Construct a stub on top of `channel`; the second overload additionally
  // takes an ownership flag for the channel.
  SudokuService_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel);
  SudokuService_Stub(::PROTOBUF_NAMESPACE_ID::RpcChannel* channel,
                   ::PROTOBUF_NAMESPACE_ID::Service::ChannelOwnership ownership);
  ~SudokuService_Stub();

  // Accessor for the underlying channel.
  inline ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel() { return channel_; }

  // implements SudokuService ------------------------------------------

  // Client-side entry point of the Solve rpc.
  void Solve(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                       const ::sudoku::SudokuRequest* request,
                       ::sudoku::SudokuResponse* response,
                       ::google::protobuf::Closure* done);
 private:
  ::PROTOBUF_NAMESPACE_ID::RpcChannel* channel_;
  // NOTE(review): presumably controls whether the destructor deletes
  // channel_ — the destructor body is not shown here; confirm.
  bool owns_channel_;
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(SudokuService_Stub);
};

SudokuService_Stub的构造函数接受一个RpcChannel,如果owns_channel_ == true,则需要在析构函数中 delete RpcChannel

Solve是SudokuService_Stub中唯一的method,这个函数即是client向server发起一个RPC请求的入口。如果done != nullptr,那说明这次RPC调用是异步的,这个done里包含了回调函数

// Client-side Solve: does no work itself, it hands the call to the channel
// together with the MethodDescriptor at index 0 of this service (Solve).
void SudokuService_Stub::Solve(::PROTOBUF_NAMESPACE_ID::RpcController* controller,
                              const ::sudoku::SudokuRequest* request,
                              ::sudoku::SudokuResponse* response,
                              ::google::protobuf::Closure* done) {
  channel_->CallMethod(descriptor()->method(0),
                       controller, request, response, done);
}

里面实际上是调用 RpcChannel 的 CallMethod 接口,因为 Solve 是 SudokuService 的第一个 method,所以是 descriptor()->method(0)

RpcChannel是一个纯虚类,需要RPC框架来实现

// Abstract interface that a stub uses to issue a call; an RPC framework
// supplies the concrete implementation of CallMethod. Copying is disabled via
// GOOGLE_DISALLOW_EVIL_CONSTRUCTORS.
class PROTOBUF_EXPORT RpcChannel {
 public:
  inline RpcChannel() {}
  virtual ~RpcChannel();

  // Call the given method of the remote service.  The signature of this
  // procedure looks the same as Service::CallMethod(), but the requirements
  // are less strict in one important way:  the request and response objects
  // need not be of any specific class as long as their descriptors are
  // method->input_type() and method->output_type().
  virtual void CallMethod(const MethodDescriptor* method,
                          RpcController* controller, const Message* request,
                          Message* response, Closure* done) = 0;

 private:
  GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(RpcChannel);
};

CallMethod接受一个MethodDescriptor和该method的input Message,output Message,以及回调函数done,发起RPC调用

// Completion callback for the Solve call in the client example: logs the id
// of the thread it runs on, then the textual dump of the response.
void solved(sudoku::SudokuResponse* resp)
{
    LOG_INFO << "event loop thread, thread id = " << gettid();

    const auto dump = resp->DebugString();
    LOG_INFO << "solved:\n" << dump;
}

int main(int argc, char* argv[])
{
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoopThread loopThread;
    InetAddress serverAddr(argv[1], 9981);

    sudoku::SudokuRequest request;
    request.set_checkerboard("001010");
    sudoku::SudokuResponse* response = new sudoku::SudokuResponse;

    RpcChannelPtr channel(new RpcChannel(loopThread.startLoop(), serverAddr));
    sudoku::SudokuService_Stub stub(channel.get());
    stub.Solve(NULL, &request, response, NewCallback(solved, response));

    LOG_INFO << "main thread, thread id = " << gettid() << " is going to sleep";
    CurrentThread::sleepUsec(3000*1000);
    LOG_INFO << "main thread, thread id = " << gettid() << " is going to exit";

  }
  else
  {
    printf("Usage: %s host_ip\n", argv[0]);
  }
  google::protobuf::ShutdownProtobufLibrary();
}

RpcChannel::CallMethod

  1. 调用MethodDescriptor::name()获取method名字
  2. 调用MethodDescriptor::service()->full_name()获取service的全名(server端也是用full_name()来注册和查找service的)
  3. 序列化request
  4. 将上面三步的结果组成RPC请求报文并发送
  5. 如果done != nullptr(有回调,异步RPC调用),那么应该把这个回调函数注册在某个地方,CallMethod立刻返回。等到网络库收到完整RPC响应报文后再调用回调

RpcServer

// RPC server built on a TcpServer. Services are registered by their
// fully-qualified name; each decoded RpcMessage is delivered to onRpcMessage,
// and doneCallback sends the reply for a finished call through codec_.
// (Excerpt; `...` marks members omitted from the quotation.)
class RpcServer
{
 public:
  RpcServer(EventLoop* loop,
            const InetAddress& listenAddr);
  ...
  
  // Registers `service` under the full name reported by its descriptor.
  // Only the raw pointer is stored in services_, so the service object has to
  // stay alive for as long as the server may dispatch to it. Registering a
  // second service with the same full name replaces the earlier entry.
  void registerService(::google::protobuf::Service* service)
  {
    const google::protobuf::ServiceDescriptor* desc = service->GetDescriptor();
    services_[desc->full_name()] = service;
  }
  
  // Starts the underlying TcpServer.
  void start()
  {
    server_.start();
  }

 private:
    // Everything doneCallback needs to answer one call: the response message
    // to serialize, the id of the request being answered, and the connection
    // to send the reply on.
    struct doneCallbackArgs
    {
        doneCallbackArgs(::google::protobuf::Message* r,
                         int64_t i, const TcpConnectionPtr c)
        : response(r), id(i), conn(c) {}

        ::google::protobuf::Message* response;
        int64_t id;
        const TcpConnectionPtr conn;
    };

  // Logs connection up/down events together with both endpoint addresses.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << "RpcServer - " << conn->peerAddress().toIpPort() << " -> "
      << conn->localAddress().toIpPort() << " is "
      << (conn->connected() ? "UP" : "DOWN");
  }

  // Invoked with each complete RpcMessage decoded from a connection.
  void onRpcMessage(const TcpConnectionPtr& conn,
                    const RpcMessagePtr& messagePtr,
                    Timestamp receiveTime);

  // Completion handler for a dispatched call; sends the response back.
  void doneCallback(const doneCallbackArgs args);

  TcpServer server_;
  // Full service name -> registered service.
  std::map<std::string, ::google::protobuf::Service*> services_;
  RpcCodec codec_;
};

RpcServer的核心成员是一个TcpServer用来通信,和一个std::map<std::string, ::google::protobuf::Service*>建立 service_name 到注册过的具体 service 类的映射。RpcCodec codec_属于一个无状态的工具类,负责解析出完整的TCP请求报文和发送一个完整的TCP响应报文

当有东西从建立的这个 TcpConnection 吐出来后会调 RpcCodec::onMessage。先看一下 codec_ 在 RpcServer 中是怎么初始化的

// Wires the server together:
//  - codec_ is constructed with RpcServer::onRpcMessage as its callback;
//  - the TcpServer's connection callback is RpcServer::onConnection;
//  - the TcpServer's message callback is RpcCodec::onMessage on codec_, so
//    incoming data goes to the codec first.
RpcServer::RpcServer(EventLoop* loop,
                     const InetAddress& listenAddr)
  : server_(loop, listenAddr, "RpcServer"),
    codec_(std::bind(&RpcServer::onRpcMessage, this, _1, _2, _3))
{
  server_.setConnectionCallback(
      std::bind(&RpcServer::onConnection, this, _1));
  server_.setMessageCallback(
          std::bind(&RpcCodec::onMessage, &codec_, _1, _2, _3));
}

所以可以看出 codec_ 是用来处理 TCP 的分包(因为一有东西从连接里出来就会调 RpcCodec::onMessage),等收到一个完整的 TCP 报文的时候再回调 RpcServer::onRpcMessage。我们先不去看 codec_ 内部,而是先去走完收到一个完整的 TCP 报文之后的流程

一个完整的TCP报文实际上也是一个Protobuf Message

// Kind of an RpcMessage frame.
enum MessageType
{
  REQUEST = 1;
  RESPONSE = 2;
  ERROR = 3; // not used
}

// Error codes that a RESPONSE may carry in RpcMessage.error.
enum ErrorCode
{
  NO_ERROR = 0;
  WRONG_PROTO = 1;
  NO_SERVICE = 2;
  NO_METHOD = 3;
  INVALID_REQUEST = 4;
  INVALID_RESPONSE = 5;
  TIMEOUT = 6;
}

// The message exchanged on the wire for both requests and responses.
message RpcMessage
{
  // Frame kind and call id; both are required.
  required MessageType type = 1;
  required fixed64 id = 2;

  // Request side: service name, method name and the serialized request.
  optional string service = 3;
  optional string method = 4;
  optional bytes request = 5;

  // Response side: the serialized response.
  optional bytes response = 6;

  // Response side: error code.
  optional ErrorCode error = 7;
}
  1. 通过Service名字找到被注册的Service类,
  2. 调用ServiceDescriptor* Service::GetDescriptor()获取ServiceDescriptor
  3. 调用MethodDescriptor* ServiceDescriptor::FindMethodByName获取MethodDescriptor
  4. 调用GetRequestPrototype,获取 Request 的 Prototype,并New一个 Request 的实例
  5. 从请求报文中,反序列化出Request
  6. 调用GetResponsePrototype,获取 Response 的 Prototype,并New一个 Response 的实例
  7. 最后调用Service::CallMethod,执行业务逻辑
  8. done->Run()的时候应该要能把response序列化后返回给client(返回RPC响应报文)
void RpcServer::onRpcMessage(const TcpConnectionPtr& conn,
                              const RpcMessagePtr& messagePtr,
                              Timestamp receiveTime)
{
  assert(conn == conn_);
  RpcMessage& message = *messagePtr;
  if (message.type() == RESPONSE)
  {
  }
  else if (message.type() == REQUEST)
  {
    // FIXME: extract to a function
    ErrorCode error = WRONG_PROTO;
    if (services_)
    {
      // 1. 通过Service名字找到被注册的Service类
      std::map<std::string, google::protobuf::Service*>::const_iterator it = services_->find(message.service());
      if (it != services_->end())
      {
        google::protobuf::Service* service = it->second;
        assert(service != NULL);
        // 2. 调用`ServiceDescriptor* Service::GetDescriptor()`获取`ServiceDescriptor`,
        const google::protobuf::ServiceDescriptor* desc = service->GetDescriptor();
        // 3. 调用`MethodDescriptor* ServiceDescriptor::FindMethodByName`获取`MethodDescriptor`,
        const google::protobuf::MethodDescriptor* method
          = desc->FindMethodByName(message.method());
        if (method)
        {
          // 4. 调用`GetRequestPrototype`,获取 Resuqest 的 Prototype,并New一个 Resuqest 的实例
          std::unique_ptr<google::protobuf::Message> request(service->GetRequestPrototype(method).New());
          // 5. 从请求报文中,反序列化出Resuqest
          if (request->ParseFromString(message.request()))
          {
            // 6. 调用`GetResponsePrototype`,获取 Response 的 Prototype,并New一个 Response 的实例
            google::protobuf::Message* response = service->GetResponsePrototype(method).New();
            // response is deleted in doneCallback
            int64_t id = message.id();
            // 7. 最后调用`Service::CallMethod`,执行业务逻辑
            service->CallMethod(method, NULL, get_pointer(request), response,
                                            google::protobuf::NewCallback(this, &RpcServer::doneCallback, doneCallbackArgs(response, id, conn)));
            error = NO_ERROR;
          }
          else
          {
            error = INVALID_REQUEST;
          }
        }
        else
        {
          error = NO_METHOD;
        }
      }
      else
      {
        error = NO_SERVICE;
      }
    }
    else
    {
      error = NO_SERVICE;
    }
    if (error != NO_ERROR)
    {
      RpcMessage response;
      response.set_type(RESPONSE);
      response.set_id(message.id());
      response.set_error(error);
      codec_.send(conn_, response);
    }
  }
  else if (message.type() == ERROR)
  {
  }
}

// 8. `done->Run()`的时候应该要能把response序列化后返回给client(返回RPC响应报文)
void RpcServer::doneCallback(const doneCallbackArgs args)
{
    std::unique_ptr<google::protobuf::Message> d(args.response);
    RpcMessage message;
    message.set_type(RESPONSE);
    message.set_id(args.id);
    message.set_response(args.response->SerializeAsString()); // FIXME: error check
    codec_.send(args.conn, message);
}

RpcChannel

// Client-side channel: implements the protobuf RpcChannel interface on top of
// a TcpClient. Every call gets an id from id_, and the response object and
// completion closure of each in-flight call are kept in outstandings_ until
// the matching reply arrives.
// (Excerpt; `...` marks members omitted from the quotation.)
class RpcChannel : public ::google::protobuf::RpcChannel
{
 public:
  RpcChannel() = delete;
  RpcChannel(EventLoop* loop, const InetAddress& serverAddr);
  ...
  
  // RpcChannel only support async RPC call (done != nullptr) now
  // FIXME support sync RPC call (done == nullptr)
  void CallMethod(const ::google::protobuf::MethodDescriptor* method,
                  ::google::protobuf::RpcController* controller,
                  const ::google::protobuf::Message* request,
                  ::google::protobuf::Message* response,
                  ::google::protobuf::Closure* done) override;

 private:
  // Returns the connection to the server.
  TcpConnectionPtr connect(); // FIXME make this func async

  // Invoked with each complete RpcMessage decoded from the connection.
  void onRpcMessage(const TcpConnectionPtr& conn,
                    const RpcMessagePtr& messagePtr,
                    Timestamp receiveTime);

  void onConnection(const TcpConnectionPtr& conn);

  // Bookkeeping for one in-flight call: where to put the reply and what to
  // run once it has arrived.
  struct OutstandingCall
  {
    ::google::protobuf::Message* response;
    ::google::protobuf::Closure* done;
  };

  RpcCodec codec_;
  // Source of call ids.
  AtomicInt64 id_;
  TcpClient client_;
  MutexLock mutex_;
  // NOTE(review): presumably used by connect()/onConnection to wait for the
  // connection to be established — their bodies are not shown here; confirm.
  Condition cond_;

  // Call id -> in-flight call; accessed under mutex_.
  std::map<int64_t, OutstandingCall> outstandings_ GUARDED_BY(mutex_);
};

// Wires the channel together:
//  - codec_ is constructed with RpcChannel::onRpcMessage as its callback;
//  - the TcpClient's message callback is RpcCodec::onMessage on codec_, so
//    incoming data goes to the codec first;
//  - the TcpClient's connection callback is RpcChannel::onConnection.
// The constructor only sets up callbacks; no connect call is made here.
RpcChannel::RpcChannel(EventLoop* loop, const InetAddress& serverAddr)
  : codec_(std::bind(&RpcChannel::onRpcMessage, this, _1, _2, _3)),
    client_(loop, serverAddr, "RpcChannel"),
    cond_(mutex_)
{
  LOG_INFO << "RpcChannel::ctor - " << this;
  client_.setMessageCallback(
          std::bind(&RpcCodec ::onMessage, &codec_, _1, _2, _3));
  client_.setConnectionCallback(
          std::bind(&RpcChannel::onConnection, this, _1));
}

RpcChannel的核心成员是一个TcpClient用来通信,一个AtomicInt64 id_为每一个从此channel发出去的RPC请求报文分配一个全局递增的id,和一个std::map<int64_t, OutstandingCall>建立 id 到用户定义回调函数和response Message 对象的映射。RpcCodec codec_的功能同上,属于一个无状态的工具类,负责发送出完整的TCP请求报文和解析出一个完整的TCP响应报文

当调用RpcChannel::CallMethod时,如果没有建连则会先去建立TCP连接,这个建连接口RpcChannel::connect()是个同步接口(可以看RpcChannel::connect()和RpcChannel::onConnection的内部实现)。

建连完毕之后先调用MethodDescriptor::name()获取method名字,调用MethodDescriptor::service()->full_name()获取service的全名,然后组装一个RPC请求报文,然后把用户的回调函数和response Message 对象记录在outstandings_,最后通过codec_发送RPC请求报文

// Issues one RPC on this channel.
//
// Obtains the connection (calling connect() when the client has none yet),
// allocates a fresh call id, builds a REQUEST RpcMessage carrying the full
// service name, the method name and the serialized request, records
// {response, done} under the id in outstandings_, and finally sends the
// message through codec_. `controller` is not used.
void RpcChannel::CallMethod(const ::google::protobuf::MethodDescriptor* method,
                            google::protobuf::RpcController* controller,
                            const ::google::protobuf::Message* request,
                            ::google::protobuf::Message* response,
                            ::google::protobuf::Closure* done)
{
  TcpConnectionPtr link = client_.connection();
  if (!link)
  {
    link = connect();
  }

  const int64_t callId = id_.incrementAndGet();

  RpcMessage outgoing;
  outgoing.set_type(REQUEST);
  outgoing.set_id(callId);
  outgoing.set_service(method->service()->full_name());
  outgoing.set_method(method->name());
  outgoing.set_request(request->SerializeAsString()); // FIXME: error check

  // Register the pending call before sending, so the entry is already in
  // place when the reply is processed.
  {
    MutexLockGuard guard(mutex_);
    OutstandingCall& slot = outstandings_[callId];
    slot.response = response;
    slot.done = done;
  }

  codec_.send(link, outgoing);
}

等收到一个完整的TCP报文的时候再调用RpcChannel::onRpcMessage。根据RPC响应报文里的id,从outstandings_找到对应的用户回调和response Message对象。然后从报文中反序列化出response对象,调用用户回调,最后析构response对象

void RpcChannel::onRpcMessage(const TcpConnectionPtr& conn,
                              const RpcMessagePtr& messagePtr,
                              Timestamp receiveTime)
{
  assert(conn == client_.connection());
  //printf("%s\n", message.DebugString().c_str());
  RpcMessage& message = *messagePtr;
  if (message.type() == RESPONSE)
  {
    int64_t id = message.id();
    assert(message.has_response() || message.has_error());

    OutstandingCall out = { NULL, NULL };

    {
      MutexLockGuard lock(mutex_);
      std::map<int64_t, OutstandingCall>::iterator it = outstandings_.find(id);
      if (it != outstandings_.end())
      {
        out = it->second;
        outstandings_.erase(it);
      }
    }

    if (out.response)
    {
      std::unique_ptr<google::protobuf::Message> d(out.response);
      if (message.has_response())
      {
        out.response->ParseFromString(message.response());
      }
      if (out.done)
      {
        out.done->Run();
      }
    }
  }
  else if (message.type() == REQUEST)
  {
  }
  else if (message.type() == ERROR)
  {
  }
}