muduo TcpServer 和 TcpClient
Socket
管理一个套接字(listen fd 或 connection fd),其生命周期和管理的套接字一样长。
/// Owns one socket descriptor (a listen fd or a connection fd).
/// The descriptor is closed in the destructor (Socket::~Socket calls
/// sockets::close), so the fd lives exactly as long as this object.
class Socket : noncopyable
{
public:
explicit Socket(int sockfd)
: sockfd_(sockfd)
{ }
// Socket(Socket&&) // move constructor in C++11
~Socket();
int fd() const { return sockfd_; }
...
/// Binds the socket to localaddr.
/// abort if address in use (sockets::bindOrDie aborts via LOG_SYSFATAL on any ::bind failure)
void bindAddress(const InetAddress& localaddr);
/// Marks the socket as passive with backlog SOMAXCONN.
/// Aborts via LOG_SYSFATAL (sockets::listenOrDie) if ::listen fails.
void listen(); // i.e. turns this socket into a listen fd
/// On success, returns a non-negative integer that is
/// a descriptor for the accepted socket, which has been
/// set to non-blocking and close-on-exec. *peeraddr is assigned.
/// On error, -1 is returned, and *peeraddr is untouched.
int accept(InetAddress* peeraddr); // takes one connection fd off this listen fd
/// Half-closes the connection with ::shutdown(SHUT_WR); failures are only logged.
void shutdownWrite();
void setTcpNoDelay(bool on);
void setReuseAddr(bool on);
void setReusePort(bool on);
void setKeepAlive(bool on);
private:
const int sockfd_;
};
析构的时候会::close
套接字
// The Socket owns its descriptor: destroying the object closes the fd.
Socket::~Socket()
{
  sockets::close(fd());
}
/// Closes sockfd. A failing ::close is reported with LOG_SYSERR and otherwise ignored.
void sockets::close(int sockfd)
{
  const int rc = ::close(sockfd);
  if (rc >= 0)
  {
    return;
  }
  LOG_SYSERR << "sockets::close";
}
bindAddress & listen
::bind
// Binds this socket to addr; sockets::bindOrDie aborts the process on failure.
void Socket::bindAddress(const InetAddress& addr)
{
  const struct sockaddr* sa = addr.getSockAddr();
  sockets::bindOrDie(sockfd_, sa);
}
/// Binds sockfd to addr, aborting the process (LOG_SYSFATAL) if ::bind fails.
/// The length passed is always sizeof(struct sockaddr_in6), the larger of the
/// two address structures used here.
/// NOTE(review): for an AF_INET addr this relies on the kernel accepting an
/// addrlen larger than sizeof(sockaddr_in) (Linux does) -- confirm elsewhere.
void sockets::bindOrDie(int sockfd, const struct sockaddr* addr)
{
  const socklen_t addrlen = static_cast<socklen_t>(sizeof(struct sockaddr_in6));
  if (::bind(sockfd, addr, addrlen) < 0)
  {
    LOG_SYSFATAL << "sockets::bindOrDie";
  }
}
::listen
void Socket::listen()
{
sockets::listenOrDie(sockfd_);
}
/// Marks sockfd as passive with the system's SOMAXCONN backlog; aborts
/// (LOG_SYSFATAL) if ::listen fails.
void sockets::listenOrDie(int sockfd)
{
  if (::listen(sockfd, SOMAXCONN) < 0)
  {
    LOG_SYSFATAL << "sockets::listenOrDie";
  }
}
accept
::accept4
// Accepts one connection from this listen fd. On success the peer address is
// stored in *peeraddr; on failure -1 is returned and *peeraddr is left as is.
int Socket::accept(InetAddress* peeraddr)
{
  struct sockaddr_in6 peer;
  memZero(&peer, sizeof peer);
  const int connfd = sockets::accept(sockfd_, &peer);
  if (connfd < 0)
  {
    return connfd;
  }
  peeraddr->setSockAddrInet6(peer);
  return connfd;
}
/// Accepts a connection with accept4(2). The new fd is created non-blocking and
/// close-on-exec in the same call. Returns the fd, or -1 with errno set.
int sockets::accept(int sockfd, struct sockaddr_in6* addr)
{
  socklen_t len = static_cast<socklen_t>(sizeof(struct sockaddr_in6));
  return ::accept4(sockfd, sockaddr_cast(addr), &len, SOCK_NONBLOCK | SOCK_CLOEXEC);
}
shutdownWrite
::shutdown
void Socket::shutdownWrite()
{
sockets::shutdownWrite(sockfd_);
}
/// ::shutdown(sockfd, SHUT_WR): stops sending while keeping the read side open.
/// A failure is logged with LOG_SYSERR only.
void sockets::shutdownWrite(int sockfd)
{
  const int rc = ::shutdown(sockfd, SHUT_WR);
  if (rc < 0)
  {
    LOG_SYSERR << "sockets::shutdownWrite";
  }
}
Acceptor
Acceptor
用于 accept
新的 TCP 连接,当接受的新的连接之后,通过 NewConnectionCallback
回调通知调用者。它是内部 class 供 TcpServer
使用,生命周期由后者控制。
/// Accepts new TCP connections on a listening socket and reports each accepted
/// fd through NewConnectionCallback. Internal helper whose lifetime is
/// controlled by TcpServer.
class Acceptor : noncopyable
{
 public:
  using NewConnectionCallback = std::function<void (int sockfd, const InetAddress&)>;

  Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
  ~Acceptor();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  {
    newConnectionCallback_ = cb;
  }

  void listen();

  bool listening() const
  {
    return listening_;
  }

 private:
  void handleRead();

  // Declaration order matters: acceptChannel_ is built from acceptSocket_.fd().
  EventLoop* loop_;                              // loop acceptChannel_ runs on
  Socket acceptSocket_;                          // owns the listen fd
  Channel acceptChannel_;                        // watches the listen fd for readability
  NewConnectionCallback newConnectionCallback_;
  bool listening_;
  int idleFd_;                                   // spare "/dev/null" fd given up on EMFILE
};
Acceptor
构造函数接受一个 EventLoop*
,Channel acceptChannel_
就将跑在这个 loop 上
构造函数中会调用 sockets::createNonblockingOrDie
,来创建一个 非阻塞 的套接字(sockfd),用来初始化 Socket acceptSocket_
,然后调用 Socket::bindAddress
,来设置监听地址。
用刚刚创建的套接字构造 Channel acceptChannel_
,然后设置 ReadCallBack 为 Acceptor::handleRead
// Creates the non-blocking listen socket, reserves a spare fd (idleFd_) for the
// EMFILE workaround in handleRead(), applies the reuse options and binds.
// Listening itself only starts in Acceptor::listen().
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    acceptChannel_(loop, acceptSocket_.fd()),
    listening_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  // A readable listen fd means a pending connection: dispatch to handleRead().
  acceptChannel_.setReadCallback(std::bind(&Acceptor::handleRead, this));
  // Reuse options (presumably SO_REUSEADDR / SO_REUSEPORT) stay ahead of the
  // bind, since such options influence bind().
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
}
::socket
/// Creates a TCP socket of the given family that is non-blocking and
/// close-on-exec from the start. Aborts (LOG_SYSFATAL) if ::socket fails.
int sockets::createNonblockingOrDie(sa_family_t family)
{
  const int type = SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC;
  const int sockfd = ::socket(family, type, IPPROTO_TCP);
  if (sockfd >= 0)
  {
    return sockfd;
  }
  LOG_SYSFATAL << "sockets::createNonblockingOrDie";
  return sockfd;
}
Acceptor::listen()
会调用 Socket::listen
开始监听套接字(即设置这个套接字为 listen fd),然后设置 acceptChannel_
关心可读事件,最终会注册在 Poller
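中。这里可以直接调用 acceptChannel_.enableReading
,是因为 Acceptor::listen() 总是在 loop_
所属的 IO 线程中执行(函数开头有 loop_->assertInLoopThread(),TcpServer::start() 也是通过 loop_->runInLoop 调用它的),而不是因为 loop_ 还没有开始 loop。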
// Puts the socket into the listening state, then starts watching it for
// readability (pending connections) in loop_'s poller. Must run on loop_'s
// thread; TcpServer::start() reaches it through loop_->runInLoop.
void Acceptor::listen()
{
loop_->assertInLoopThread();
listening_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}
当套接字可读时会回调 Acceptor::handleRead
,其中会调用 ::accept4
,来生成一个非阻塞的 connection fd,并回调 newConnectionCallback_
若 ::accept4
时发生文件描述符耗尽(EMFILE
),那么先 close
掉之前打开的 idleFd_
("/dev/null"
),这样就腾出来一个 fd,这时再 ::accept4
,然后再把刚 accept 的 connection fd close
掉,最后再重新打开 idleFd_
("/dev/null"
)。这样算是优雅的 close
掉了 handle 不了的 connection 了
// Read callback of the listen fd: accepts one connection and hands it to
// newConnectionCallback_ (or closes it when no callback is set).
// On EMFILE the pending connection would otherwise stay in the backlog and keep
// the level-triggered fd readable forever. So the spare idleFd_ is released,
// the connection is accepted and dropped at once, and the spare fd is re-opened.
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  const int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd < 0)
  {
    if (errno == EMFILE)
    {
      ::close(idleFd_);                                     // free one descriptor
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);   // take the pending connection
      ::close(idleFd_);                                     // ...and reject it immediately
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);  // re-reserve the spare fd
    }
    return;
  }
  if (!newConnectionCallback_)
  {
    sockets::close(connfd);
    return;
  }
  newConnectionCallback_(connfd, peerAddr);
}
TcpConnection
TcpConnection
在地位上我觉得和 Acceptor
是一样的:
- 都是自己拥有一个
Socket
(Acceptor
管理 listen fd,TcpConnection
管理 connection fd) - 构造函数都是接受一个
EventLoop*
,自己跑在自己接受的 loop 上 Acceptor
只关心可读事件,TcpConnection
关心可读或者可写事件,由其使用者控制
TcpConnection
被使用 shared_ptr
来管理
注意 TcpConnection
表示的是“一次TCP连接”,它是不可再生的,一旦连接断开,这个 TcpConnection
对象就没啥用了。另外 TcpConnection
没有发起连接的功能, 其构造函数的参数是已经建立好连接的 connection fd,因此其初始状态是 kConnecting
// TcpConnection is only defined further down, so it has to be forward-declared
// before these aliases can name it.
class TcpConnection;
/// Handle for a connection; TcpConnection objects are always owned through shared_ptr.
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
/// Connection came up or went down.
typedef std::function<void (const TcpConnectionPtr&)> ConnectionCallback;
/// Internal: lets the owner (TcpServer / TcpClient) release a closed connection.
typedef std::function<void (const TcpConnectionPtr&)> CloseCallback;
/// The output buffer has been completely written to the socket.
typedef std::function<void (const TcpConnectionPtr&)> WriteCompleteCallback;
/// Pending output reached the high-water mark; size_t is the buffered byte count.
typedef std::function<void (const TcpConnectionPtr&, size_t)> HighWaterMarkCallback;
class TcpConnection : noncopyable,
public std::enable_shared_from_this<TcpConnection>
{
public:
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{ highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
private:
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
}
// Wraps an already-connected sockfd. The state starts at kConnecting and only
// becomes kConnected in connectEstablished(), which runs on `loop`.
TcpConnection::TcpConnection(EventLoop* loop,
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),
    reading_(true),
    socket_(new Socket(sockfd)),          // takes ownership: ~Socket closes the fd
    channel_(new Channel(loop, sockfd)),
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)          // 64 MiB
{
  socket_->setKeepAlive(true);

  // Route every event on the fd to the matching handler of this object.
  Channel& ch = *channel_;
  ch.setReadCallback(std::bind(&TcpConnection::handleRead, this, _1));
  ch.setWriteCallback(std::bind(&TcpConnection::handleWrite, this));
  ch.setCloseCallback(std::bind(&TcpConnection::handleClose, this));
  ch.setErrorCallback(std::bind(&TcpConnection::handleError, this));
}
TcpConnection
使用 channel_
来关注 connection fd 上的 event,并根据事件类型分发给 TcpConnection::handleRead
或 TcpConnection::handleWrite
或 TcpConnection::handleClose
或 TcpConnection::handleError
。在其中又会分别调用用户指定的 messageCallback_
,writeCompleteCallback_
,closeCallback_
connectEstablished
当 TcpServer
接受了一个新的连接,然后想让这个连接跑在某一个 EventLoop 上,这个时候就需要调配任务,这个任务的内容是:
- setState(kConnected)
- 设置channel_ 关心可读事件,最终会被注册到
Poller
中去 - 原地调用用户定义的回调 connectionCallback_(这个是在
TcpServer::newConnection
中“继承”TCPServer::connectionCallback_
的)
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();
connectionCallback_(shared_from_this());
}
可读事件
可读事件发生后在IO线程原地回调 TcpConnection::handleRead
// Readable-event handler, called on the connection's IO loop thread.
// NOTE(review): the elided part ("...") presumably asserts the loop thread and
// declares savedErrno -- confirm against the full source.
void TcpConnection::handleRead(Timestamp receiveTime)
{
...
// Exactly one readFd() call per event; with a level-triggered poller any
// remaining data simply triggers the next event.
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
// Hand the buffered bytes and the time the event was polled to the user.
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
// read returned 0: the peer closed its side of the connection.
handleClose();
}
else
{
// error
// NOTE(review): this branch is elided in the excerpt; presumably it restores
// errno from savedErrno and calls handleError() -- confirm.
}
}
先从 connection fd 里面读数据出来,读到 Buffer inputBuffer_
中
(Buffer::readFd()
只调用一次 read(2)
,而没有反复调用 read(2)
直到其返回 EAGAIN。首先,这么做是正确的,因为muduo采用 level trigger,这么做不会丢失数据或消息。其次,对追求低延迟的程序来说,这么做是高效的,因为每次读数据只需要一次系统调用。再次,这样做照顾了多个连接的公平性,不会因为某个连接上数据量过大而影响其他连接处理消息)
4. 若从中读出来了数据,就回调用户定义的回调 messageCallback_
(这个是在 TcpServer::newConnection
中“继承” TCPServer::messageCallback_
的,或用户单独为这个 TcpConnection
通过 TcpConnection::setMessageCallback
设定的)
5. 若没读出来数据,说明对方先关闭了连接,read(2)
返回 0,将会调用 TcpConnection::handleClose()
执行关闭连接的逻辑
服务端被动关闭连接
TcpConnection::handleClose
会在可读事件发生,但是读出来 0 字节时被调用,或者 close 事件发生,作为 Channel::closeCallback_
被调用。
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// kConnected 代表是被动关闭链接
// kDisconnecting 代表是主动关闭连接,我方 FIN 已经发送过去,这时刚刚收到对方的 FIN
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis);
// must be the last line
closeCallback_(guardThis);
}
setState(kDisconnected)
- 设置该 connection fd 对应的
channel_
,不再关注任何事件(这里有一个潜在的问题:如果对方只是 shutdown write,那么对方还可以接收数据;如果此时 outputBuffer_
中还有数据,说明我方还有数据没有发送,是不是应该先把 outputBuffer_
都写入 socket 再设置不关注写事件) - 原地调用用户定义的回调
connectionCallback_
(这个是在TcpServer::newConnection
中“继承”TCPServer::connectionCallback_
的,或用户单独为这个TcpConnection
通过 TcpConnection::setConnectionCallback
设定的) - 原地调用在
TcpServer::newConnection
中设置的closeCallback_
(TcpServer::removeConnection
)
// CloseCallback installed on every server-side connection (see newConnection).
// It may be invoked on an IO-pool thread, while connections_ is only touched on
// loop_, so the actual removal is forwarded to loop_.
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe (presumably because the bound raw `this` must outlive the queued call)
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
// Runs on loop_: forgets the connection, then schedules its final teardown on
// the connection's own IO loop. The bound copy of conn keeps the TcpConnection
// alive until connectDestroyed() has run.
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  const size_t erased = connections_.erase(conn->name());
  (void)erased;  // only read by the assert (unused when NDEBUG is defined)
  assert(erased == 1);
  conn->getLoop()->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
TcpServer::removeConnectionInLoop
的主要作用是把该 TcpConnectionPtr
从 ConnectionMap connections_
中删除,为了保证线程安全,没有使用加锁的方式,而是用 loop_->runInLoop
,在 TcpServer::loop_
所属的线程操作。
TcpServer::removeConnectionInLoop
把 conn 从 ConnectionMap
中移除。这时该 TcpConnection
已经是命悬一线:如果用户不持有 TcpConnectionPtr
的话,conn 的引用计数已降到1。注意这里用 std::bind
让TcpConnection
的生命期长至调用 connectDestroyed()
的时刻。
connectDestroyed
因为 TcpServer
和 TcpConnection
可能没有跑在一个 loop 上,需要 ioLoop->queueInLoop
跑 TcpConnection::connectDestroyed
的作用是把 connection fd 从 poller 中移除
// Last step in a connection's life, run on its own loop: removes channel_ from
// the poller. Afterwards the TcpConnection can be destroyed, which closes the fd.
void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  // Still kConnected means handleClose() never ran: the connection is being
  // torn down forcibly (e.g. from ~TcpServer), so finish the state change and
  // tell the user here.
  const bool forcedClose = (state_ == kConnected);
  if (forcedClose)
  {
    setState(kDisconnected);
    channel_->disableAll();
    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}
TcpConnection::handleClose
时,state_
就已经被设置为 kDisconnected
了,不会走进这个 if
这个函数执行完成之后,conn 会析构,最终会 close fd
服务端主动关闭连接
shutdown
// Starts an active close: switches to kDisconnecting and lets the IO loop send
// FIN once all queued output has been flushed (see shutdownInLoop/handleWrite).
// Calls in any state other than kConnected are ignored.
void TcpConnection::shutdown()
{
  // FIXME: use compare and swap
  if (state_ != kConnected)
  {
    return;
  }
  setState(kDisconnecting);
  // FIXME: shared_from_this()?
  loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
// Half-closes the socket unless output is still pending. While channel_ watches
// for writability there is unsent data, and handleWrite() calls back here once
// outputBuffer_ drains in state kDisconnecting.
void TcpConnection::shutdownInLoop()
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    return;
  }
  socket_->shutdownWrite();
}
- setState(kDisconnecting)
- loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this))
如果不关注写事件(
!channel_->isWriting()
),说明没有待写的数据,直接::shutdown(sockfd, SHUT_WR)
如果不为空,等到写完之后,看到state_ == kDisconnecting
,就会 shutdown
shutdown 相当于主动向对方发送第一个 FIN,对方的 read 会返回 0;等对方也关闭连接、发来第二个 FIN 时,我方 read 返回 0,就会走一遍上面被动关闭的流程(此时 state_ 为 kDisconnecting),最终 close fd
暴力
// Forcibly tears down every live connection. Each map entry is released here,
// and the functor bound below becomes the server's last reference. That keeps
// each TcpConnection alive until connectDestroyed() has run on its own loop.
TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
  for (auto it = connections_.begin(); it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn(it->second);
    it->second.reset();
    conn->getLoop()->runInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
  }
}
TcpServer
析构的时候会直接调用 TcpConnection::connectDestroyed
将该 connection fd 从 poller 上移除,如果用户没有持有 TcpConnectionPtr
,那么 TcpConnection 将会析构,最终 close fd
// Last step in a connection's life, run on its own loop: removes channel_ from
// the poller. Afterwards the TcpConnection can be destroyed, which closes the fd.
void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  // Still kConnected means handleClose() never ran: the connection is being
  // torn down forcibly (e.g. from ~TcpServer), so finish the state change and
  // tell the user here.
  const bool forcedClose = (state_ == kConnected);
  if (forcedClose)
  {
    setState(kDisconnected);
    channel_->disableAll();
    connectionCallback_(shared_from_this());
  }
  channel_->remove();
}
暴力关闭会走进这个 if ,原地调用用户定义的回调 connectionCallback_
(这个是在 TcpServer::newConnection
中“继承” TCPServer::connectionCallback_
的,或用户单独为这个 TcpConnection
通过 TcpConnection::setConnectionCallback
设定的)
发送
发送数据接口 TcpConnection::send
是线程安全的,
如果是在IO线程调用则直接 TcpConnection::sendInLoop
,否则需要 loop_->runInLoop
来把IO线程唤醒,在IO线程原地回调 TcpConnection::sendInLoop
// Thread-safe send. On the IO thread it writes through sendInLoop directly.
// From any other thread it copies the bytes into a std::string owned by the
// queued functor (presumably StringPiece does not own its data). Messages sent
// while the state is not kConnected are silently dropped.
void TcpConnection::send(const StringPiece& message)
{
  if (state_ != kConnected)
  {
    return;
  }
  if (loop_->isInLoopThread())
  {
    sendInLoop(message);
    return;
  }
  // sendInLoop is overloaded; the member-pointer type picks the StringPiece one.
  void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
  loop_->runInLoop(
      std::bind(fp,
                this, // FIXME
                message.as_string()));
  //std::forward<string>(message)));
}
- 如果
!channel_->isWriting() && outputBuffer_.readableBytes() == 0
(没有待发送的数据),则先尝试直接发送,如果一次就全部发完(没发完说明内核态 TCP 发送缓冲区已满,write 只写入了一部分或返回 EWOULDBLOCK
),则回调writeCompleteCallback_
- 把剩余的要发送的数据 append 到
outputBuffer_
中(如果积压的待发送的数据超过了highWaterMark_
,那就回调highWaterMarkCallback_
),并开始关心可写事件(由于 muduo 采用 level trigger,因此我们只在需要时才去关注可写事件,否则产生 busy loop)
// Writes len bytes from data; must run on loop_'s thread.
// If nothing is queued and the channel is not waiting for writability, one
// direct write(2) is attempted. Whatever is left over is appended to
// outputBuffer_ and writable events are enabled so handleWrite() can finish
// (all of it when the kernel buffer is full, none of it when the peer is gone).
// writeCompleteCallback_ / highWaterMarkCallback_ are queued, never called inline.
void TcpConnection::sendInLoop(const void* data, size_t len)
{
  loop_->assertInLoopThread();
  ssize_t nwrote = 0;
  size_t remaining = len;
  bool faultError = false;
  if (state_ == kDisconnected)
  {
    LOG_WARN << "disconnected, give up writing";
    return;
  }
  // if no thing in output queue, try writing directly
  // (with bytes already queued, a direct write would reorder the stream)
  if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
  {
    nwrote = sockets::write(channel_->fd(), data, len);
    if (nwrote >= 0)
    {
      remaining = len - nwrote;
      if (remaining == 0 && writeCompleteCallback_)
      {
        loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
      }
    }
    else // nwrote < 0
    {
      // Capture errno right away. The logging below makes library calls that
      // may overwrite errno; re-reading it afterwards could miss EPIPE/ECONNRESET
      // and wrongly buffer data for a dead peer.
      const int savedErrno = errno;
      nwrote = 0;
      if (savedErrno != EWOULDBLOCK)  // EWOULDBLOCK: send buffer full, queue it all
      {
        LOG_SYSERR << "TcpConnection::sendInLoop";
        if (savedErrno == EPIPE || savedErrno == ECONNRESET) // FIXME: any others?
        {
          faultError = true;  // peer is gone: don't buffer the rest
        }
      }
    }
  }

  assert(remaining <= len);
  if (!faultError && remaining > 0)
  {
    size_t oldLen = outputBuffer_.readableBytes();
    // Notify only when this append crosses highWaterMark_ from below, not on
    // every send while the backlog stays above it.
    if (oldLen + remaining >= highWaterMark_
        && oldLen < highWaterMark_
        && highWaterMarkCallback_)
    {
      loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
    }
    outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
    // Level-triggered poller: watch writability only while data is pending;
    // handleWrite() stops watching once outputBuffer_ drains.
    if (!channel_->isWriting())
    {
      channel_->enableWriting();
    }
  }
}
当 socket 变得可写时,会在IO线程原地回调TcpConnection::handleWrite()
,这里我们继续发送 outputBuffer_
中的数据。一旦发送完毕(outputBuffer_.readableBytes() == 0),立刻停止观察 writable 事件,避免 busy loop,并调用 writeCompleteCallback_
。另外如果这时连接正在关闭(state_ == kDisconnecting
),则调用 shutdownInLoop()
,继续主动关闭过程。
// Writable-event handler: pushes as much of outputBuffer_ as the socket takes.
// Once the buffer is empty it stops watching writability (avoiding a busy loop),
// queues writeCompleteCallback_, and continues a pending active close.
void TcpConnection::handleWrite()
{
  loop_->assertInLoopThread();
  if (!channel_->isWriting())
  {
    return;
  }
  const ssize_t n = sockets::write(channel_->fd(),
                                   outputBuffer_.peek(),
                                   outputBuffer_.readableBytes());
  if (n <= 0)
  {
    // error
    return;
  }
  outputBuffer_.retrieve(n);
  if (outputBuffer_.readableBytes() != 0)
  {
    return;  // more left: keep watching for writability
  }
  channel_->disableWriting();
  if (writeCompleteCallback_)
  {
    loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
  }
  if (state_ == kDisconnecting)
  {
    shutdownInLoop();
  }
}
状态机
TcpServer
TcpServer
的构造函数接收一个 EventLoop* loop_
,和一个监听地址 const InetAddress&
。
TcpServer
的作用是:
- 在内部构造
std::unique_ptr<Acceptor> acceptor_
成员,让acceptor_
跑在EventLoop* loop_
。 - 内部构造
std::shared_ptr<EventLoopThreadPool> threadPool_
成员,利用acceptor_
accept 出的套接字构造TcpConnection
对象,让TcpConnection
对象跑在threadPool_
中的一个 loop 上
class TcpServer : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option);
...
//
void setThreadNum(int numThreads)
{threadPool_->setThreadNum(numThreads);}
void setThreadInitCallback(const ThreadInitCallback& cb)
{ threadInitCallback_ = cb; }
/// Starts the server if it's not listening.
///
/// It's harmless to call it multiple times.
/// Thread safe.
void start();
/// Set connection callback.
/// Not thread safe.
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
/// Set message callback.
/// Not thread safe.
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
/// Set write complete callback.
/// Not thread safe.
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
private:
...
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_; // the acceptor loop
const string ipPort_;
const string name_;
std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
std::shared_ptr<EventLoopThreadPool> threadPool_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
AtomicInt32 started_;
// always in loop thread
int nextConnId_;
ConnectionMap connections_;
};
接受新连接
TcpServer
内部使用 Acceptor
来获取新的 connection fd,当 accept 出新的 connection fd 之后会回调 TcpServer::newConnection
// Builds the acceptor (bound to listenAddr, not yet listening) and the IO thread
// pool. Listening only starts in start().
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  // Every fd accepted on loop_ is turned into a TcpConnection by newConnection().
  acceptor_->setNewConnectionCallback(
      [this](int sockfd, const InetAddress& peerAddr) { newConnection(sockfd, peerAddr); });
}
start
因为 TcpServer::start()
设计的是线程安全的(可以在 loop_
所属的IO线程之外的线程调用),所以要 loop_->runInLoop
。(Acceptor
跑在 loop_
上,可以说 TcpServer
跑在 loop_
上)
// Idempotent and callable from any thread. Only the first caller starts the
// thread pool and schedules Acceptor::listen on loop_ (which asserts it runs
// on that loop's thread).
void TcpServer::start()
{
  if (started_.getAndSet(1) != 0)
  {
    return;
  }
  threadPool_->start(threadInitCallback_);
  assert(!acceptor_->listening());
  loop_->runInLoop(std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
newConnection
当 accept 出新的 connection fd 之后会回调 TcpServer::newConnection
// Acceptor callback, run on loop_: wraps the accepted fd in a TcpConnection,
// records it in connections_, and starts it on an IO loop from the pool.
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  // Round-robin pick of the loop that will own this connection
  // (loop_ itself when the pool has no threads).
  EventLoop* ioLoop = threadPool_->getNextLoop();
  // Name: "<name_>-<ip:port>#<id>". Format with the current id first and only
  // then advance the counter. nextConnId_ starts at 1, so incrementing first
  // would name the first connection "#2" and never use "#1".
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  // The server holds a reference until removeConnectionInLoop() erases it.
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
  // ioLoop may belong to another thread (possibly blocked in its poller), so
  // connectEstablished is dispatched through runInLoop.
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
TcpServer
自带一个std::shared_ptr<EventLoopThreadPool> threadPool_
,首先以 rr 的方式选出一个EventLoop* ioLoop
(如果threadPool_
的 size == 0 则返回Acceptor
在跑的loop_
)connection fd 会跑在这上面。- 创建一个
TcpConnectionPtr
(std::shared_ptr<TcpConnection>
)保存在std::map<string, TcpConnectionPtr> connections_
- 设置
conn
的 ConnectionCallback,MessageCallback,WriteCompleteCallback 分别为,用户指定的,保存在TcpServer
中的connectionCallback_
,messageCallback_
,writeCompleteCallback_
。设置 CloseCallback 为TcpServer::removeConnection
。 - 因为
ioLoop
(connection fd 的 IO线程可能正卡在::epoll
上呢),和loop_
(Acceptor
的IO线程,也就是当前线程)可能不是同一个对象,所以需要ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn))
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();
connectionCallback_(shared_from_this());
}
- 更改状态为
kConnected
- 开始监听读事件,并加入到 Poller 中去
- 回调刚才在
TcpServer::newConnection
中设置的,用户指定的connectionCallback_
断开连接
TcpServer::removeConnection
见上面 服务端被动关闭连接
Connector
Connector
用于在客户端 connect 新的 TCP 连接,并通过 NewConnectionCallback
回调通知调用者。它是内部 class 供 TcpClient 使用,生命周期由后者控制。(个人认为其地位和 Acceptor
一样的)
/// Actively opens a TCP connection to serverAddr_ with a non-blocking
/// connect(2). Failed attempts are retried with a growing delay, and the
/// connected fd is handed to NewConnectionCallback. Internal helper owned by
/// TcpClient; the client-side counterpart of Acceptor.
class Connector : noncopyable,
public std::enable_shared_from_this<Connector>
{
public:
typedef std::function<void (int sockfd)> NewConnectionCallback;
Connector(EventLoop* loop, const InetAddress& serverAddr);
~Connector();
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
void start(); // can be called in any thread
...
private:
...
void startInLoop();
void stopInLoop();
void connect();
void connecting(int sockfd);
void handleWrite();
void handleError();
void retry(int sockfd);
int removeAndResetChannel();
void resetChannel();
EventLoop* loop_;
InetAddress serverAddr_;
bool connect_; // atomic
// NOTE(review): despite the comment above, connect_ is a plain bool. start()
// writes it from any thread while loop_ reads it -- consider std::atomic<bool>.
States state_; // FIXME: use atomic variable
std::unique_ptr<Channel> channel_; // created in connecting(), released via removeAndResetChannel()
NewConnectionCallback newConnectionCallback_;
int retryDelayMs_; // current retry delay; doubled by retry(), capped at kMaxRetryDelayMs
};
Connector
构造函数接受一个 EventLoop* loop_
,std::unique_ptr<Channel> channel_
就将跑在这个 loop_
上
// Only records the target address; no socket is created until
// start() -> startInLoop() -> connect().
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
serverAddr_(serverAddr),
connect_(false),
state_(kDisconnected),
retryDelayMs_(kInitRetryDelayMs)
{
LOG_DEBUG << "ctor[" << this << "]";
}
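Connector::start
只是设置 connect_ = true,然后通过 loop_->runInLoop 在 loop_ 所属的线程中执行 Connector::startInLoop;std::unique_ptr<Channel> channel_
是在之后的 Connector::connecting 中才创建的,并跑在 EventLoop* loop_
上。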
// Requests a connection. Meant to be callable from any thread (see the
// declaration); the work is forwarded to startInLoop() on loop_.
void Connector::start()
{
connect_ = true;
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe (binds raw this)
}
// Runs on loop_: begins a connection attempt unless connect_ has been cleared
// (presumably by a stop request) since start() queued this call.
void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);
  if (!connect_)
  {
    return;
  }
  connect();
}
在 非阻塞 网络编程中,发起连接的基本方式是调用 connect(2)
,当 socket 变得 可写 的时候表明连接建立完毕。
调用 sockets::createNonblockingOrDie
,来创建一个 非阻塞 的套接字(sockfd),然后调用 sockets::connect
,来设置连接地址。
// Makes one non-blocking connect(2) attempt to serverAddr_ and classifies the
// outcome by errno:
//  - 0 / EINPROGRESS / EINTR / EISCONN: in progress (or done); connecting()
//    waits for the socket to become writable.
//  - EAGAIN / EADDRINUSE / EADDRNOTAVAIL / ECONNREFUSED / ENETUNREACH:
//    treated as transient; retry() closes the fd and schedules another attempt.
//  - anything else: logged, the fd is closed, and no retry happens.
void Connector::connect()
{
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS:
case EINTR:
case EISCONN:
connecting(sockfd);
break;
case EAGAIN:
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd);
break;
case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
::connect
/// ::connect with addrlen fixed at sizeof(struct sockaddr_in6), the larger of
/// the two address structures used here. Returns ::connect's result (errno on failure).
/// NOTE(review): for AF_INET this relies on the kernel accepting an oversized
/// addrlen (Linux does) -- confirm for other platforms.
int sockets::connect(int sockfd, const struct sockaddr* addr)
{
  const socklen_t addrlen = static_cast<socklen_t>(sizeof(struct sockaddr_in6));
  return ::connect(sockfd, addr, addrlen);
}
若 connect(2)
返回值为 0 或 EINPROGRESS
或 EINTR
或 EISCONN
则说明正在建立连接(因为是 non-blocking,所以 connect(2)
返回并不代表已经完成3次握手),则开始关注可写事件(channel_->enableWriting())。
用刚刚创建的套接字构造 std::unique_ptr<Channel> channel_
,然后设置 WriteCallback 为 Connector::handleWrite
void Connector::connecting(int sockfd)
{
setState(kConnecting);
assert(!channel_);
channel_.reset(new Channel(loop_, sockfd));
channel_->setWriteCallback(
std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(
std::bind(&Connector::handleError, this)); // FIXME: unsafe
channel_->enableWriting();
}
当有可写事件时,回调 Connector::handleWrite()
另外,即便出现 socket 可写,也不一定意味着连接已成功建立,还需要用 getsockopt
再次确认一下。
并且要处理自连接(self-connection)。出现这种状况的原因如下。在发起连接的时候,TCP/IP 协议栈会先选择 source IP 和 source port,在没有显式调用 bind(2)
的情况下,source IP 由路由表确定,source port 由 TCP/IP 协议栈从 local port range 中选取尚未使用的 port(即ephemeral port)。如果destination IP 正好是本机,而 destination port 位于 local port range,且没有服务程序监听的话, ephemeral port 可能正好选中了 destination port,这就出现(source IP, source port) = (destination IP, destination port)的情况,即发生了自连接。处理办法是断开连接再重试,否则原本侦听 destination port 的服务进程也无法启动了。
确定 TCP 连接成功建立之后回调 newConnectionCallback_
通知调用者
// Writable-event handler for the pending non-blocking connect.
// Writability alone does not prove success, so the socket error is checked
// (sockets::getSocketError, presumably getsockopt SO_ERROR) and self-connections
// are rejected; either case leads to retry(). On success the fd goes to
// newConnectionCallback_ (only if connect_ is still set).
void Connector::handleWrite()
{
if (state_ == kConnecting)
{
int sockfd = removeAndResetChannel(); // removes this Channel from the Poller and returns its fd
int err = sockets::getSocketError(sockfd);
if (err)
{
retry(sockfd);
}
else if (sockets::isSelfConnect(sockfd))
{
retry(sockfd);
}
else
{
setState(kConnected);
if (connect_)
{
newConnectionCallback_(sockfd);
}
...
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
异常情况将会重试,即 close 掉这个 socket,过段时间重调 Connector::startInLoop
// Gives up on sockfd and, while connect_ is set, schedules startInLoop() after
// retryDelayMs_, then doubles the delay (capped at kMaxRetryDelayMs).
// shared_from_this() keeps the Connector alive until the timer fires.
void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);
  if (!connect_)
  {
    return;
  }
  const double delaySeconds = retryDelayMs_ / 1000.0;
  loop_->runAfter(delaySeconds, std::bind(&Connector::startInLoop, shared_from_this()));
  retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
重试间隔逐渐延长,例如 0.5s,1s,2s,4s,直至 30s
TcpClient
有了 Connector
,TcpClient
就不难实现了,它的代码甚至与 TcpServer
有几分相似(都有 newConnection
和 removeConnection
这两个成员函数),只不过每个 TcpClient
只管理一个 TcpConnection
// Sets up the Connector for serverAddr; no connection is attempted until connect().
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  // Each fd the Connector manages to connect becomes a TcpConnection in newConnection().
  connector_->setNewConnectionCallback([this](int sockfd) { newConnection(sockfd); });
}
建立新的连接
connect
// Starts connecting to the server. Connector::start() forwards the actual
// attempt to the client's loop. The function body was missing its closing
// brace, so the definition ran on into the text that follows.
void TcpClient::connect()
{
  // FIXME: check state
  LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
           << connector_->serverAddress().toIpPort();
  connect_ = true;
  connector_->start();
}
TcpClient::newConnection
当 connect 出新的 connection fd 之后会回调 TcpClient::newConnection
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();
InetAddress peerAddr(sockets::getPeerAddr(sockfd));
++nextConnId_;
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
{
MutexLockGuard lock(mutex_);
connection_ = conn;
}
conn->connectEstablished();
}
- 创建一个
TcpConnectionPtr
(std::shared_ptr<TcpConnection>
)保存在connection_
- 设置
conn
的 ConnectionCallback,MessageCallback,WriteCompleteCallback 分别为,用户指定的,保存在TcpClient
中的connectionCallback_
,messageCallback_
,writeCompleteCallback_
。设置 CloseCallback 为TcpClient::removeConnection
。 - 现在正在
loop_
线程中,Connector
刚刚就是跑在loop_
线程中(现在已经被removeAndResetChannel()
过了),conn
也将会跑在loop_
线程中,所以这里原地回调TcpConnection::connectEstablished()
断开连接
TcpClient::removeConnection
当 conn 被动关闭时会回调 TcpClient::removeConnection
,其中会调用 TcpConnection::connectDestroyed
去把该 connection fd 从 Poller 中移除,最终 close socket
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
assert(loop_ == conn->getLoop());
{
MutexLockGuard lock(mutex_);
assert(connection_ == conn);
connection_.reset();
}
loop_->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
if (retry_ && connect_)
{
connector_->restart();
}
}
如果指定了 retry_
,那会重新走一遍 Connector
建立连接的流程
// Starts over after a closed connection (TcpClient::removeConnection calls this
// when retry_ is set). Resets the retry delay to kInitRetryDelayMs and
// immediately makes a new attempt. Must run on loop_'s thread.
void Connector::restart()
{
loop_->assertInLoopThread();
setState(kDisconnected);
retryDelayMs_ = kInitRetryDelayMs;
connect_ = true;
startInLoop();
}