muduo TcpServer 和 TcpClient

Socket

管理一个套接字(listen fd 或 connection fd),其生命周期和管理的套接字一样长。

// RAII wrapper owning one socket file descriptor (a listen fd or a
// connection fd); its lifetime matches that of the managed socket, and
// the destructor closes the fd.
class Socket : noncopyable
{
 public:
  explicit Socket(int sockfd)
    : sockfd_(sockfd)
  { }

  // Socket(Socket&&) // move constructor in C++11
  ~Socket();

  int fd() const { return sockfd_; }
  ...

  /// abort if address in use
  void bindAddress(const InetAddress& localaddr);
  /// abort if address in use
  void listen(); // turns this socket into a listen fd

  /// On success, returns a non-negative integer that is
  /// a descriptor for the accepted socket, which has been
  /// set to non-blocking and close-on-exec. *peeraddr is assigned.
  /// On error, -1 is returned, and *peeraddr is untouched.
  int accept(InetAddress* peeraddr); // obtains a connection fd from this listen fd

  void shutdownWrite();

  void setTcpNoDelay(bool on);
  void setReuseAddr(bool on);
  void setReusePort(bool on);
  void setKeepAlive(bool on);

 private:
  const int sockfd_;  // owned fd, closed in the destructor
};

析构的时候会::close套接字

// Destructor releases the owned fd via sockets::close (logs on failure).
Socket::~Socket()
{
  sockets::close(sockfd_);
}

// Close `sockfd`.  A failure is logged at SYSERR level but is otherwise
// ignored — closing is best-effort here.
void sockets::close(int sockfd)
{
  const int ret = ::close(sockfd);
  if (ret < 0)
  {
    LOG_SYSERR << "sockets::close";
  }
}

::bind

// Binds the managed socket to `addr`; aborts the process on failure.
void Socket::bindAddress(const InetAddress& addr)
{
  sockets::bindOrDie(sockfd_, addr.getSockAddr());
}

// Bind `sockfd` to `addr`, aborting the process (SYSFATAL) on failure.
// sizeof(sockaddr_in6) is passed for both families — large enough for
// IPv4 as well as IPv6 addresses.
void sockets::bindOrDie(int sockfd, const struct sockaddr* addr)
{
  const socklen_t addrlen = static_cast<socklen_t>(sizeof(struct sockaddr_in6));
  if (::bind(sockfd, addr, addrlen) < 0)
  {
    LOG_SYSFATAL << "sockets::bindOrDie";
  }
}

::listen

// Turns the managed socket into a listening socket; aborts on failure.
void Socket::listen()
{
  sockets::listenOrDie(sockfd_);
}

// Put `sockfd` into the listening state with the system's maximum
// backlog (SOMAXCONN); aborts the process (SYSFATAL) on failure.
void sockets::listenOrDie(int sockfd)
{
  if (::listen(sockfd, SOMAXCONN) < 0)
  {
    LOG_SYSFATAL << "sockets::listenOrDie";
  }
}

::accept4

// Accepts one connection on the managed listen fd.  On success the peer
// address is stored into *peeraddr and the new fd is returned; on error
// -1 is returned and *peeraddr is left untouched.
int Socket::accept(InetAddress* peeraddr)
{
  struct sockaddr_in6 addr;  // large enough for both IPv4 and IPv6 peers
  memZero(&addr, sizeof addr);
  int connfd = sockets::accept(sockfd_, &addr);
  if (connfd >= 0)
  {
    peeraddr->setSockAddrInet6(addr);
  }
  return connfd;
}

// Wrapper over ::accept4: the accepted fd is made non-blocking and
// close-on-exec atomically, in a single syscall.
int sockets::accept(int sockfd, struct sockaddr_in6* addr)
{
  socklen_t addrlen = static_cast<socklen_t>(sizeof *addr);
  int connfd = ::accept4(sockfd, sockaddr_cast(addr),
                         &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
  return connfd;
}

::shutdown

// Half-closes the connection: shuts down the write side (sends FIN);
// the read side stays open so the peer can still send data to us.
void Socket::shutdownWrite()
{
  sockets::shutdownWrite(sockfd_);
}

// Shut down the write half of `sockfd` (this sends a FIN to the peer);
// a failure is logged at SYSERR level but is not fatal.
void sockets::shutdownWrite(int sockfd)
{
  const int ret = ::shutdown(sockfd, SHUT_WR);
  if (ret < 0)
  {
    LOG_SYSERR << "sockets::shutdownWrite";
  }
}

Acceptor

Acceptor 用于 accept 新的 TCP 连接,当接受的新的连接之后,通过 NewConnectionCallback 回调通知调用者。它是内部 class 供 TcpServer 使用,生命周期由后者控制。

// Acceptor accepts new TCP connections on a listen fd and reports each
// one through NewConnectionCallback.  Internal class used by TcpServer,
// which owns it and controls its lifetime.
class Acceptor : noncopyable
{
 public:
  typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;

  Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
  ~Acceptor();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void listen();

  bool listening() const { return listening_; }

 private:
  void handleRead();

  EventLoop* loop_;        // the loop acceptChannel_ runs on
  Socket acceptSocket_;    // owns the listen fd
  Channel acceptChannel_;  // watches the listen fd for readable events
  NewConnectionCallback newConnectionCallback_;
  bool listening_;
  int idleFd_;             // spare fd ("/dev/null") used to survive EMFILE
};

Acceptor 构造函数接受一个 EventLoop*,Channel acceptChannel_ 就将跑在这个 loop 上。构造函数中会调用 sockets::createNonblockingOrDie,创建一个 非阻塞 的套接字(sockfd),用来初始化 Socket acceptSocket_;然后调用 Socket::bindAddress 来设置监听地址;再用刚刚创建的套接字构造 Channel acceptChannel_,并设置 ReadCallback 为 Acceptor::handleRead。

// Creates the non-blocking listen socket, binds it to listenAddr, and
// wires the read callback.  The socket is not yet listening —
// Acceptor::listen() does that later.
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    acceptChannel_(loop, acceptSocket_.fd()),
    listening_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))  // reserve an fd for the EMFILE trick
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      std::bind(&Acceptor::handleRead, this));
}

::socket

// Create a non-blocking, close-on-exec TCP socket of the given address
// family in one syscall; aborts the process (SYSFATAL) if socket(2) fails.
int sockets::createNonblockingOrDie(sa_family_t family)
{
  const int sockfd =
      ::socket(family, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP);
  if (sockfd < 0)
  {
    LOG_SYSFATAL << "sockets::createNonblockingOrDie";
  }
  return sockfd;
}

Acceptor::listen() 会调用 Socket::listen 开始监听套接字(即设置这个套接字为 listen fd),然后设置 acceptChannel_ 关心可读事件,最终会注册在 Poller 中。这里直接调用 acceptChannel_.enableReading,说明此时 loop_ 还没有开始 loop。

// Starts listening and registers read interest with the Poller.
// Must run on loop_'s thread (asserted).
void Acceptor::listen()
{
  loop_->assertInLoopThread();
  listening_ = true;
  acceptSocket_.listen();
  acceptChannel_.enableReading();
}

当套接字可读时会回调 Acceptor::handleRead,其中会调用 ::accept4 来生成一个非阻塞的 connection fd,并回调 newConnectionCallback_。若 ::accept4 时发生文件描述符耗尽(EMFILE),那么先 close 掉之前打开的 idleFd_("/dev/null"),这样就腾出来一个 fd,这时再 ::accept,然后再把刚 accept 到的 connection fd close 掉,最后重新打开 idleFd_("/dev/null")。这样算是优雅地 close 掉了 handle 不了的 connection。

// Called when the listen fd becomes readable.  Accepts one connection
// and hands it to newConnectionCallback_, or closes it if no callback is
// set.  On EMFILE (process out of fds) it temporarily closes the
// reserved idleFd_ to free one descriptor, accepts and immediately
// closes the pending connection, then re-opens /dev/null — a graceful
// rejection instead of a level-trigger busy loop.
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);  // nobody wants it; close right away
    }
  }
  else
  {
    if (errno == EMFILE)  // file-descriptor limit reached
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}

TcpConnection

TcpConnection 在地位上我觉得和 Acceptor 是一样的:

  1. 都是自己拥有一个 SocketAcceptor 管理 listen fd,TcpConnection 管理 connection fd)
  2. 构造函数都是接受一个 EventLoop*,自己跑在自己接受的 loop 上
  3. Acceptor 只关心可读事件,TcpConnection 关心可读或者可写事件,由其使用者控制

TcpConnection 被使用 shared_ptr 来管理 注意 TcpConnection 表示的是“一次TCP连接”,它是不可再生的,一旦连接断开,这个 TcpConnection 对象就没啥用了。另外 TcpConnection 没有发起连接的功能, 其构造函数的参数是已经建立好连接的 connection fd,因此其初始状态是 kConnecting

typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
typedef std::function<void (const TcpConnectionPtr&)> CloseCallback;
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;

// TcpConnection represents one established TCP connection (a connection
// fd).  Instances are managed by std::shared_ptr; enable_shared_from_this
// lets internal event handlers extend their own lifetime while running.
class TcpConnection : noncopyable,
                      public std::enable_shared_from_this<TcpConnection>
{
  public:
    void setConnectionCallback(const ConnectionCallback& cb)
    { connectionCallback_ = cb; }

    void setMessageCallback(const MessageCallback& cb)
    { messageCallback_ = cb; }

    void setWriteCompleteCallback(const WriteCompleteCallback& cb)
    { writeCompleteCallback_ = cb; }

    // Also sets the threshold at which highWaterMarkCallback_ fires.
    void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
    { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

  private:
    ConnectionCallback connectionCallback_;
    MessageCallback messageCallback_;
    WriteCompleteCallback writeCompleteCallback_;
    HighWaterMarkCallback highWaterMarkCallback_;
    CloseCallback closeCallback_;
};  // the excerpt was missing this required semicolon

// Constructs a TcpConnection around an already-connected sockfd
// (TcpConnection never initiates a connection itself).  The initial
// state is kConnecting; connectEstablished() completes the setup later
// on the owning loop.
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),        // takes ownership of the connection fd
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)        // default high-water mark: 64 MiB
{
  // Route the channel's events to this connection's handlers.
  channel_->setReadCallback(
      std::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      std::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      std::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      std::bind(&TcpConnection::handleError, this));
  socket_->setKeepAlive(true);
}

TcpConnection 使用 channel_ 来关注 connection fd 上的 event,并根据事件类型分发给 TcpConnection::handleReadTcpConnection::handleWriteTcpConnection::handleCloseTcpConnection::handleError。在其中又会分别调用用户指定的 messageCallback_writeCompleteCallback_closeCallback_

TcpServer 接受了一个新的连接,然后想让这个连接跑在某一个 EventLoop 上,这个时候就需要调配任务,这个任务的内容是:

  1. setState(kConnected)
  2. 设置channel_ 关心可读事件,最终会被注册到 Poller 中去
  3. 原地调用用户定义的回调 connectionCallback_(这个是在 TcpServer::newConnection 中“继承” TCPServer::connectionCallback_的)
// Runs on the connection's own loop once the server/client has finished
// registering it: marks the connection established, ties the channel to
// this object's shared_ptr (so event dispatch keeps it alive), registers
// read interest with the Poller, and invokes the user's connection
// callback in place.
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  channel_->tie(shared_from_this());
  channel_->enableReading();

  connectionCallback_(shared_from_this());
}

可读事件发生后在IO线程原地回调 TcpConnection::handleRead

// Readable event on the connection fd (runs on the IO thread).  Reads
// once into inputBuffer_ — with level trigger a single read(2) per event
// is correct, cheap, and fair across connections.  n > 0: hand the data
// to the user's messageCallback_; n == 0: peer closed, run the close path.
void TcpConnection::handleRead(Timestamp receiveTime)
{
  ...
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    // error
  }
}

先从 connection fd 里面读数据出来,读到 Buffer inputBuffer_ 中 (Buffer::readFd() 只调用一次 read(2),而没有反复调用 read(2) 直到其返回 EAGAIN。首先,这么做是正确的,因为muduo采用 level trigger,这么做不会丢失数据或消息。其次,对追求低延迟的程序来说,这么做是高效的,因为每次读数据只需要一次系统调用。再次,这样做照顾了多个连接的公平性,不会因为某个连接上数据量过大而影响其他连接处理消息) 4. 若从中读出来了数据,就回调用户定义的回调 messageCallback_(这个是在 TcpServer::newConnection 中“继承” TCPServer::messageCallback_的,或用户单独为这个 TcpConnection 通过 TcpConnection::setMessageCallback 设定的) 5. 若没读出来数据,说明对方先关闭了连接,read(2) 返回 0,将会调用 TcpConnection::handleClose() 执行关闭连接的逻辑

TcpConnection::handleClose 会在可读事件发生,但是读出来 0 字节时被调用,或者 close 事件发生,作为 Channel::closeCallback_ 被调用。

// Runs when read(2) returned 0, or when the Channel reported a close
// event (installed as Channel::closeCallback_).
void TcpConnection::handleClose()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
  assert(state_ == kConnected || state_ == kDisconnecting);
  // kConnected: the peer closed first (passive close).
  // kDisconnecting: we closed first — our FIN was already sent, and the
  // peer's FIN has just arrived.
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);
  channel_->disableAll();

  TcpConnectionPtr guardThis(shared_from_this());  // keeps *this alive through the callbacks
  connectionCallback_(guardThis);
  // must be the last line
  closeCallback_(guardThis);
}
  1. setState(kDisconnected)
  2. 设置该 connection fd 对应的 channel_,不再关注任何事件(这里有一个潜在的问题,如果对方是shutdown write,那么对方还可以接受数据,如果此时 outpufBuffer_ 中还有数据,说明我方还有数据没有发送,是不是应该想把 outpufBuffer_ 都写入 socket 再设置不关注写事件)
  3. 原地调用用户定义的回调 connectionCallback_(这个是在 TcpServer::newConnection 中“继承” TCPServer::connectionCallback_的,或用户单独为这个 TcpConnection 通过 TcpConnection::setConnectionCallback 设定的)
  4. 原地调用在 TcpServer::newConnection 中设置的 closeCallback_TcpServer::removeConnection
// CloseCallback installed by TcpServer::newConnection.  Hops onto the
// server's own loop so connections_ is only ever touched from one thread
// (thread safety without a lock).
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // FIXME: unsafe
  loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

// On the server's loop: drop the map's reference to the connection, then
// ask the connection's own IO loop (which may differ from loop_) to run
// connectDestroyed.  The std::bind copy of `conn` keeps the object alive
// until connectDestroyed has run.
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  size_t n = connections_.erase(conn->name());
  (void)n;  // silences the unused warning when asserts are compiled out
  assert(n == 1);
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
}

TcpServer::removeConnectionInLoop 的主要作用是把该 TcpConnectionPtrConnectionMap connections_ 中删除,为了保证线程安全,没有使用加锁的方式,而是用 loop_->runInLoop,在 TcpServer::loop_ 所属的线程操作。

TcpServer::removeConnectionInLoop 把 conn 从 ConnectionMap 中移除。这时该 TcpConnection 已经是命悬一线:如果用户不持有 TcpConnectionPtr 的话,conn 的引用计数已降到1。注意这里用 std::bindTcpConnection 的生命期长至调用 connectDestroyed() 的时刻。

因为 TcpSeverTcpConnection 可能没有跑在一个 loop 上,需要 ioLoop->queueInLoopTcpConnection::connectDestroyed 的作用是把 connection fd 从 poller 中移除

// Final teardown step: remove the channel from the Poller.  The `if`
// only fires on a forced close where handleClose() never ran (state_ is
// still kConnected); after a normal handleClose() the state is already
// kDisconnected.
void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  { // forced close: handleClose() was never executed
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}

TcpConnection::handleClose 时,state_ 就已经被设置为 kDisconnected 了,不会走进这个 if 这个函数执行完成之后,conn 会析构,最终会 close fd

// Active close of the write half.  Thread-safe entry point: the actual
// shutdown runs on the connection's loop.  State moves to
// kDisconnecting; the FIN is deferred until the output buffer drains
// (see shutdownInLoop / handleWrite).
void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ == kConnected)
  {
    setState(kDisconnecting);
    // FIXME: shared_from_this()?
    loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
  }
}

// On the IO thread: send the FIN only when no write is pending;
// otherwise handleWrite() calls this again once outputBuffer_ drains
// and state_ is kDisconnecting.
void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    // we are not writing
    socket_->shutdownWrite();
  }
}
  1. setState(kDisconnecting)
  2. loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this))。如果不关注写事件(!channel_->isWriting()),说明 outputBuffer_ 中没有待发送的数据,直接 ::shutdown(sockfd, SHUT_WR);如果 outputBuffer_ 不为空,则等到 handleWrite 把数据写完之后,看到 state_ == kDisconnecting,就会 shutdown https://img-blog.csdnimg.cn/0c5ff3d207154df192603f9f9dfc6cdf.png#pic_center

shutdown 相当于主动向对方发送一个第一个 FIN,对方的 read 会返回 0,等到对方回复第二个 FIN,就会走一遍上面被动关闭的流程,最终 close fd

// Destructor must run on loop_'s thread.  For each live connection, a
// local TcpConnectionPtr copy keeps the object alive while its own loop
// runs connectDestroyed; the map's own reference is released immediately.
TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (auto& item : connections_)
  {
    TcpConnectionPtr conn(item.second);
    item.second.reset();
    conn->getLoop()->runInLoop(
      std::bind(&TcpConnection::connectDestroyed, conn));
  }
}

TcpServer 析构的时候会直接调用 TcpConnection::connectDestroyed 将该 connection fd 从 poller 上移除,如果用户没有持有 TcpConnectionPtr,那么将会析构,最终 close fd

// (Same function quoted again.)  Final teardown step: remove the channel
// from the Poller.  The `if` only fires on a forced close where
// handleClose() never ran, i.e. state_ is still kConnected.
void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  { // forced close: handleClose() was never executed
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}

暴力关闭会走进这个 if ,原地调用用户定义的回调 connectionCallback_(这个是在 TcpServer::newConnection 中“继承” TCPServer::connectionCallback_的,或用户单独为这个 TcpConnection 通过 TcpConnection::setConnectionCallback 设定的)

发送数据接口 TcpConnection::send 是线程安全的, 如果是在IO线程调用则直接 TcpConnection::sendInLoop,否则需要 loop_->runInLoop 来把IO线程唤醒,在IO线程原地回调 TcpConnection::sendInLoop

// Thread-safe send: runs sendInLoop directly when called on the IO
// thread, otherwise posts it to the loop with a copy of the bytes.
void TcpConnection::send(const StringPiece& message)
{
  if (state_ == kConnected) // sends in any other state are silently ignored
  {
    if (loop_->isInLoopThread())
    {
      sendInLoop(message);
    }
    else
    {
      // Select the StringPiece overload explicitly for std::bind.
      void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
      loop_->runInLoop(
          std::bind(fp,
                    this,     // FIXME
                    message.as_string()));  // copy: `message` may not outlive the queued call
                    //std::forward<string>(message)));
    }
  }
}
  1. 如果 !channel_->isWriting() && outputBuffer_.readableBytes()(没有待发送的数据),则先尝试发送,如果一次都发完(没发完说明内核态 TCP 发送缓冲区满了 EWOULDBLOCK),则回调 writeCompleteCallback_
  2. 把剩余的要发送的数据 append 到 outputBuffer_ 中(如果积压的待发送的数据超过了 highWaterMark_,那就回调 highWaterMarkCallback_),并开始关心可写事件(由于 muduo 采用 level trigger,因此我们只在需要时才去关注可写事件,否则产生 busy loop)
// Writes `data` on the connection's loop.  If nothing is already queued,
// try one direct write(2); whatever remains is appended to outputBuffer_
// and the channel starts watching writability (level trigger: watch only
// while data is pending, to avoid a busy loop).  writeCompleteCallback_
// fires when everything went out in one go; highWaterMarkCallback_ fires
// when the buffered backlog first crosses highWaterMark_.
void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      if (remaining == 0 && writeCompleteCallback_)
      {
        // queued, not called in place: keeps callback ordering uniform
        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      nwrote = 0;
      if (errno != EWOULDBLOCK)  // EWOULDBLOCK: kernel send buffer full, not an error
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;
        }
      }
    }
  }

  assert(remaining <= len);
  if (!faultError && remaining > 0)
  {
    size_t oldLen = outputBuffer_.readableBytes();
    // Fire the high-water callback only on the crossing, not repeatedly.
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    if (!channel_->isWriting())
    {
      channel_->enableWriting();  // start watching writability to flush later
    }
  }
}

当 socket 变得可写时,会在IO线程原地回调TcpConnection::handleWrite(),这里我们继续发送 outputBuffer_ 中的数据。一旦发送完毕(outputBuffer_.readableBytes() == 0),立刻停止观察 writable 事件,避免 busy loop,并调用 writeCompleteCallback_。另外如果这时连接正在关闭(state_ == kDisconnecting),则调用 shutdownInLoop(),继续主动关闭过程。

// Writable event on the connection fd: flush outputBuffer_.  Once the
// buffer drains completely, stop watching writability (level trigger —
// avoids a busy loop), fire writeCompleteCallback_, and finish a pending
// active close (state_ == kDisconnecting) via shutdownInLoop().
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)  // fully flushed
      {
        channel_->disableWriting();
        if (writeCompleteCallback_)
        {
          loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)
        {
          shutdownInLoop();  // resume the deferred shutdown
        }
      }
    }
    else
    {
      // error
    }
  }
  
}

https://img-blog.csdnimg.cn/f47202a698dd4b8e87e0cf5d42895a60.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

TcpServer

TcpServer 的构造函数接收一个 EventLoop* loop_,和一个监听地址 const InetAddress&TcpServer 的作用是:

  1. 在内部构造 std::unique_ptr<Acceptor> acceptor_ 成员,让 acceptor_ 跑在 EventLoop* loop_
  2. 内部构造 std::shared_ptr<EventLoopThreadPool> threadPool_ 成员,利用 acceptor_ accept 出的套接字构造 TcpConnection 对象,让 TcpConnection 对象跑在 threadPool_ 中的一个 loop 上
class TcpServer : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;
  TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option);
  
  ...

  // 
  void setThreadNum(int numThreads)
  {threadPool_->setThreadNum(numThreads);}
  void setThreadInitCallback(const ThreadInitCallback& cb)
  { threadInitCallback_ = cb; }
  
  /// Starts the server if it's not listening.
  ///
  /// It's harmless to call it multiple times.
  /// Thread safe.
  void start();

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

 private:
  ...

  typedef std::map<string, TcpConnectionPtr> ConnectionMap;

  EventLoop* loop_;  // the acceptor loop
  const string ipPort_;
  const string name_;
  std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
  std::shared_ptr<EventLoopThreadPool> threadPool_;
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;
  ThreadInitCallback threadInitCallback_;
  AtomicInt32 started_;
  // always in loop thread
  int nextConnId_;
  ConnectionMap connections_;
};

TcpServer 内部使用 Acceptor 来获取新的 connection fd,当 accept 出新的 connection fd 之后会回调 TcpServer::newConnection

// Builds the Acceptor (on `loop`) and the IO thread pool, and routes
// every newly accepted socket to TcpServer::newConnection.
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(
      std::bind(&TcpServer::newConnection, this, _1, _2));
}

因为 TcpServer::start() 设计的是线程安全的(可以在 loop_ 所属的IO线程之外的线程调用),所以要 loop_->runInLoop。(Acceptor 跑在 loop_ 上,可以说 TcpServer 跑在 loop_ 上)

// Thread safe and idempotent: the atomic get-and-set guarantees the body
// runs only once, and Acceptor::listen is posted to loop_ so it executes
// on the acceptor's own thread even when start() is called elsewhere.
void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);

    assert(!acceptor_->listening());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}

当 accept 出新的 connection fd 之后会回调 TcpServer::newConnection

// Runs on loop_ (the acceptor thread) for each freshly accepted sockfd:
// picks an IO loop round-robin, names and registers the TcpConnection,
// wires the user callbacks, and finishes setup on the connection's loop.
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[64];
  // Format the name with the current id, then advance it.  (The excerpt
  // incremented first, which made the first connection "#2" and never
  // used id 1; upstream muduo formats before incrementing.)
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  // ioLoop may be blocked in poll; queue the final setup onto it.
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
  1. TcpServer 自带一个 std::shared_ptr<EventLoopThreadPool> threadPool_,首先以 rr 的方式选出一个 EventLoop* ioLoop(如果 threadPool_ 的 size == 0 则返回 Acceptor 在跑的 loop_)connection fd 会跑在这上面。
  2. 创建一个 TcpConnectionPtrstd::shared_ptr<TcpConnection>)保存在 std::map<string, TcpConnectionPtr> connections_
  3. 设置 conn 的 ConnectionCallback,MessageCallback,WriteCompleteCallback 分别为,用户指定的,保存在 TcpServer 中的 connectionCallback_messageCallback_writeCompleteCallback_。设置 CloseCallback 为 TcpServer::removeConnection
  4. 因为 ioLoop (connection fd 的 IO线程可能正卡在 ::epoll 上呢),和 loop_Acceptor 的IO线程,也就是当前线程)可能不是同一个对象,所以需要 ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn))
// (Same function quoted again.)  On the connection's own loop: mark the
// connection established, tie the channel to this object's shared_ptr,
// register read interest with the Poller, then invoke the user's
// connection callback in place.
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  channel_->tie(shared_from_this());
  channel_->enableReading();

  connectionCallback_(shared_from_this());
}
  1. 更改状态为 kConnected
  2. 开始监听读事件,并加入到 Poller 中去
  3. 回调刚才在 TcpServer::newConnection 中设置的,用户指定的 connectionCallback_ https://img-blog.csdnimg.cn/e49a383f2b31463dbe465f85ae2e0e23.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

见上面 服务端被动关闭连接

Connector

Connector 用于在客户端 connect 新的 TCP 连接,并通过 NewConnectionCallback 回调通知调用者。它是内部 class 供 TcpClient 使用,生命周期由后者控制。(个人认为其地位和 Acceptor 是一样的)

// Connector performs the client-side non-blocking connect and reports
// the ready sockfd through NewConnectionCallback.  Internal class used
// by TcpClient, which owns it and controls its lifetime.
class Connector : noncopyable,
                  public std::enable_shared_from_this<Connector>
{
 public:
  typedef std::function<void (int sockfd)> NewConnectionCallback;

  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void start();  // can be called in any thread
  ...

 private:
  ...
  
  void startInLoop();
  void stopInLoop();
  void connect();
  void connecting(int sockfd);
  void handleWrite();
  void handleError();
  void retry(int sockfd);
  int removeAndResetChannel();
  void resetChannel();

  EventLoop* loop_;                   // the loop channel_ runs on
  InetAddress serverAddr_;
  bool connect_; // atomic
  States state_;  // FIXME: use atomic variable
  std::unique_ptr<Channel> channel_;  // watches the connecting fd for writability
  NewConnectionCallback newConnectionCallback_;
  int retryDelayMs_;                  // current retry back-off delay
};

Connector 构造函数接受一个 EventLoop* loop_std::unique_ptr<Channel> channel_ 就将跑在这个 loop_

// Stores the target address only; no socket or Channel is created until
// the connect procedure actually starts (startInLoop -> connect).
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  : loop_(loop),
    serverAddr_(serverAddr),
    connect_(false),
    state_(kDisconnected),
    retryDelayMs_(kInitRetryDelayMs)
{
  LOG_DEBUG << "ctor[" << this << "]";
}

Connector::start 的目的就是初始化 std::unique_ptr<Channel> channel_,并让其跑在 EventLoop* loop_ 上。

// Can be called from any thread; the real work is posted to loop_.
void Connector::start()
{
  connect_ = true;
  loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

// On the loop thread: begin connecting unless stop() already cleared
// connect_ in the meantime.
void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (connect_)
  {
    connect();
  }
}

非阻塞 网络编程中,发起连接的基本方式是调用 connect(2),当 socket 变得 可写 的时候表明连接建立完毕。

调用 sockets::createNonblockingOrDie,来创建一个 非阻塞 的套接字(sockfd),然后调用 sockets::connect,来设置连接地址。

// Creates a non-blocking socket and issues connect(2), then classifies
// the errno: ok/in-progress -> wait for writability; transient errors ->
// close and retry with back-off; hard errors -> log and give up.
void Connector::connect()
{
  int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
  int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;
  switch (savedErrno)
  {
    // Connection succeeded or is under way: watch the fd for writability.
    case 0:
    case EINPROGRESS:
    case EINTR:
    case EISCONN:
      connecting(sockfd);
      break;

    // Transient failures: close this socket and retry later.
    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);
      break;

    // Hard failures: give up (no retry).
    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;
    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }
}

::connect

// Thin wrapper over ::connect.  Like bindOrDie, sizeof(sockaddr_in6) is
// passed so one call covers both IPv4 and IPv6 addresses.
int sockets::connect(int sockfd, const struct sockaddr* addr)
{
  const socklen_t addrlen = static_cast<socklen_t>(sizeof(struct sockaddr_in6));
  return ::connect(sockfd, addr, addrlen);
}

connect(2) 返回值为 0 或 EINPROGRESS、EINTR、EISCONN 则说明正在建立连接(因为是 non-blocking,所以 connect(2) 返回并不代表已经完成3次握手),则开始关注可写事件。用刚刚创建的套接字构造 std::unique_ptr<Channel> channel_,然后设置 WriteCallback 为 Connector::handleWrite

// Wraps the in-progress fd in a Channel and waits for writability — with
// a non-blocking connect, the socket becoming writable signals that the
// connect attempt has finished (successfully or not).
void Connector::connecting(int sockfd)
{
  setState(kConnecting);
  assert(!channel_);
  channel_.reset(new Channel(loop_, sockfd));
  channel_->setWriteCallback(
      std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
  channel_->setErrorCallback(
      std::bind(&Connector::handleError, this)); // FIXME: unsafe

  channel_->enableWriting();
}

当有可写事件时,回调 Connector::handleWrite()

另外,即便出现 socket 可写,也不一定意味着连接已成功建立,还需要用 getsockopt 再次确认一下。

并且要处理自连接(self-connection)。出现这种状况的原因如下。在发起连接的时候,TCP/IP 协议栈会先选择 source IP 和 source port,在没有显式调用 bind(2) 的情况下,source IP 由路由表确定,source port 由 TCP/IP 协议栈从 local port range 中选取尚未使用的 port(即ephemeral port)。如果destination IP 正好是本机,而 destination port 位于 local port range,且没有服务程序监听的话, ephemeral port 可能正好选中了 destination port,这就出现(source IP, source port) = (destination IP, destination port)的情况,即发生了自连接。处理办法是断开连接再重试,否则原本侦听 destination port 的服务进程也无法启动了。

确定 TCP 连接成功建立之后回调 newConnectionCallback_ 通知调用者

// Writability on a connecting fd means connect(2) finished — but not
// necessarily successfully: getsockopt(SO_ERROR) distinguishes, and a
// self-connection (source == destination, possible when the ephemeral
// port happens to equal the target port) is also rejected and retried.
void Connector::handleWrite()
{
  if (state_ == kConnecting)
  {
    int sockfd = removeAndResetChannel(); // unregister the Channel from the Poller
    int err = sockets::getSocketError(sockfd);
    if (err)
    {
      retry(sockfd);  // connect failed; back off and try again
    }
    else if (sockets::isSelfConnect(sockfd))
    {
      retry(sockfd);  // self-connection; drop it and retry
    }
    else
    {
      setState(kConnected);
      if (connect_)
      {
        newConnectionCallback_(sockfd);  // hand the ready fd to the owner
      }
      ...
    }
  }
  else
  {
    // what happened?
    assert(state_ == kDisconnected);
  }
}

异常情况将会重试,即 close 掉这个 socket,过段时间重调 Connector::startInLoop

// Closes the failed socket and schedules another attempt with
// exponential back-off (delay doubles up to kMaxRetryDelayMs).
// shared_from_this keeps this Connector alive until the timer fires.
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);
  if (connect_)
  {
    loop_->runAfter(retryDelayMs_/1000.0,
                    std::bind(&Connector::startInLoop, shared_from_this()));
    retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
  }
}

重试间隔逐渐延长,例如 0.5s,1s,2s,4s,直至 30s

TcpClient

有了 ConnectorTcpClient 就不难实现了,它的代码甚至与 TcpServer 有几分相似(都有 newConnectionremoveConnection 这两个成员函数),只不过每个 TcpClient 只管理一个 TcpConnection

// Builds the internal Connector and routes the connected sockfd to
// TcpClient::newConnection.  A TcpClient manages a single TcpConnection.
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  connector_->setNewConnectionCallback(
      std::bind(&TcpClient::newConnection, this, _1));
}
// Kicks off the connect procedure through the internal Connector.
void TcpClient::connect()
{
  // FIXME: check state
  LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
           << connector_->serverAddress().toIpPort();
  connect_ = true;
  connector_->start();
}  // restored: the excerpt was missing this closing brace
当 connect 出新的 connection fd 之后会回调 TcpClient::newConnection

void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));
  ++nextConnId_;
  string connName = name_ + buf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));

  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    connection_ = conn;
  }
  conn->connectEstablished();
}
  1. 创建一个 TcpConnectionPtrstd::shared_ptr<TcpConnection>)保存在 connection_
  2. 设置 conn 的 ConnectionCallback,MessageCallback,WriteCompleteCallback 分别为,用户指定的,保存在 TcpClient 中的 connectionCallback_messageCallback_writeCompleteCallback_。设置 CloseCallback 为 TcpClient::removeConnection
  3. 现在正在 loop_ 线程中,Connector 刚刚就是跑在 loop_ 线程中(现在已经被 removeAndResetChannel() 过了),conn 也将会跑在 loop_ 线程中,所以这里原地回调 TcpConnection::connectEstablished()

当 conn 被动关闭时会回调 TcpClient::removeConnection,其中会调用 TcpConnection::connectDestroyed 去把该 connection fd 从 Poller 中移除,最终 close socket

// CloseCallback for the single connection: clear connection_ under the
// mutex, queue connectDestroyed on the loop (so the fd leaves the Poller
// after the current event finishes), then optionally reconnect.
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());

  {
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    connection_.reset();
  }

  loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
  if (retry_ && connect_)
  {
    connector_->restart();
  }
}

如果指定了 retry_,那会重新走一遍 Connector 建立连接的流程

// Resets the back-off state and immediately re-runs the connect
// procedure; must be called on the loop thread (asserted).
void Connector::restart()
{
  loop_->assertInLoopThread();
  setState(kDisconnected);
  retryDelayMs_ = kInitRetryDelayMs;
  connect_ = true;
  startInLoop();
}