muduo Poller 和 Channel
Poller
class Poller
是IO multiplexing(多路复用)的封装,是一个纯虚类。
一个 Poller
只属于一个 class EventLoop
,每一个线程最多只能有一个 class EventLoop
///
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller::Poller(EventLoop* loop)
: ownerLoop_(loop){}
virtual ~Poller();
/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;
/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;
/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;
virtual bool hasChannel(Channel* channel) const;
static Poller* newDefaultPoller(EventLoop* loop);
void assertInLoopThread() const
{
ownerLoop_->assertInLoopThread();
}
protected:
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;
private:
EventLoop* ownerLoop_;
};
ChannelMap channels_
记录了该 Poller
所监控的所有 fd(文件描述符)到每个 fd 所对应的 class Channel
(Channel 保存了此 fd,并记录了业务对此 fd 所关心的事件)之间的映射
Poller
的作用是:
- 记录监控了哪些 fd,和每个 fd 所对应的
Channel
(由updateChannel
,removeChannel
设置) - 调用
poll(int timeoutMs, ChannelList* activeChannels)
,找的被监控的所有 fd 中其所关心的事件发生了的 fd 所对应的Channel
,将发生了的事件通过调用Channel::set_revents
记录在该Channel
中,并返回这些Channel
poll
updateChannel
removeChannel
hasChannel
,这四个接口都不是线程安全的,分别只会被 EventLoop::loop
,EventLoop::updateChannel
,EventLoop::hasChannel
,EventLoop::removeChannel
调用。
EventLoop::loop
EventLoop::updateChannel
EventLoop::hasChannel
EventLoop::removeChannel
也不是线程安全的,只能在 EventLoop
对象所属的线程调用。这样就保证了被调用的时候,该线程是没有卡在 ::poll
或 ::epoll
上的,就可以安全的修改 ChannelMap channels_
和 PollFdList pollfds_
(PollPoller
)或调用 ::epoll_ctl
对 epollfd_(EPollPoller
)进行修改
muduo中同时支持poll和epoll两种多路复用机制,分别由class PollPoller
和class EPollPoller
实现
static Poller* Poller::newDefaultPoller(EventLoop* loop)
{
if (::getenv("MUDUO_USE_POLL"))
{
return new PollPoller(loop);
}
else
{
return new EPollPoller(loop);
}
}
PollPoller
///
/// IO Multiplexing with poll(2).
///
class PollPoller : public Poller
{
public:
PollPoller::PollPoller(EventLoop* loop)
: Poller(loop){}
~PollPoller() override;
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
void updateChannel(Channel* channel) override;
void removeChannel(Channel* channel) override;
private:
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
typedef std::vector<struct pollfd> PollFdList;
PollFdList pollfds_;
};
/* Data structure describing a polling request. */
struct pollfd
{
int fd; /* File descriptor to poll. */
short int events; /* Types of events poller cares about. */
short int revents; /* Types of events that actually occurred. */
};
PollFdList pollfds_
记录了 PollPoller
需要监控的所有 fd
updateChannel
void PollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0)
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
}
else
{
// update existing one
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
int idx = channel->index();
assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent())
{
// ignore this pollfd
pfd.fd = -channel->fd()-1;
}
}
}
- 新增一个
Channel
(不会不关心任何事件): 新建一个并初始化一个struct pollfd
,加入到PollFdList pollfds_
中 设置Channel
的 index 为其在PollFdList pollfds_
的 offset 便于更新的时候查找 更新ChannelMap channels_
,新添加一个映射channels_[pfd.fd] = channel
- 更新一个
Channel
(更新对 fd 所关心事件): 根据Channel
的 index 从PollFdList pollfds_
找到其对应的struct pollfd
更新pfd.events
如果不关心任何事件,则把pfd.fd
设置成负数
Poll
Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
// XXX pollfds_ shouldn't change
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
...
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happened";
fillActiveChannels(numEvents, activeChannels);
}
else if (numEvents == 0)
{
LOG_TRACE << " nothing happened";
}
else
{
// error
...
}
return now;
}
void PollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
assert(ch != channels_.end());
Channel* channel = ch->second;
assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
// pfd->revents = 0;
activeChannels->push_back(channel);
}
}
}
调用 ::poll
,::poll
返回后立刻记录下时间
- 如果
numEvents > 0
则调用fillActiveChannels
,遍历PollFdList pollfds_
找到所有的pfd->revents > 0
的struct pollfd
,然后再找到其对应的Channel
,然后channel->set_revents(pfd->revents)
,最后将其加入ChannelList* activeChannels
- 如果
numEvents == 0
说明::poll
的 timeoutMs 超时
返回::poll
返回后记录下的时间
EPollPoller
class EPollPoller : public Poller
{
public:
EPollPoller(EventLoop* loop);
~EPollPoller() override;
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
void updateChannel(Channel* channel) override;
void removeChannel(Channel* channel) override;
private:
static const int kInitEventListSize = 16;
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
void update(int operation, Channel* channel);
typedef std::vector<struct epoll_event> EventList;
int epollfd_;
EventList events_;
};
EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop),
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
}
EPollPoller::~EPollPoller()
{
::close(epollfd_);
}
EventList events_
不是保存的所有关注的 fd 列表,而是保存的每一次 epoll_wait
调用返回的活动 fd 列表,初始大小为 kInitEventListSize = 16
,每一次 epoll_wait
都会复用 EventList events_
updateChannel
void EPollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
const int index = channel->index();
if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
if (index == kNew)
{
channels_[fd] = channel;
}
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
int fd = channel->fd();
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
- 新增一个
Channel
(不会不关心任何事件)channel->index() == kNew
: 更新ChannelMap channels_
,新添加一个映射channels_[fd] = channel
设置Channel
的 index 为kNew
update(EPOLL_CTL_ADD, channel)
- 更新一个
Channel
(更新对 fd 所关心事件): 不关心任何事 -> 关心某些事:channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
关心某些事 -> 不关心任何事:channel->set_index(kDeleted);
update(EPOLL_CTL_DEL, channel);
关心某些事 -> 关心某些事:update(EPOLL_CTL_MOD, channel)
void EPollPoller::update(int operation, Channel* channel)
{
struct epoll_event event;
memZero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel;
int fd = channel->fd();
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
// error
}
}
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
} __EPOLL_PACKED;
epoll_data
是个 union ,muduo 使用的是 void *ptr
,用于存放 Channel *
,这样可以减少一步lookup
Poll
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2);
}
}
else if (numEvents == 0)
{
LOG_TRACE << "nothing happened";
}
else
{
// error
}
return now;
}
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
channel->set_revents(events_[i].events);
activeChannels->push_back(channel);
}
}
调用 ::epoll_wait
,::epoll_wait
返回后立刻记录下时间
- 如果
numEvents > 0
则调用fillActiveChannels
,EventList events_
中已经存在了numEvents
个活动 fd 各自所对应的struct epoll_event
,epoll_data
的ptr
存放了Channel *
,然后channel->set_revents(events_[i].events)
,最后将其加入ChannelList* activeChannels
如果numEvents == events_.size()
,说明events_
的大小可能不够,对它进行两倍扩容 - 如果
numEvents == 0
说明::epoll_wait
的 timeoutMs 超时
返回::epoll_wait
返回后记录下的时间
Channel
每个 Channel
对象自始至终只属于一个 EventLoop
,因此每一个 Channel
对象只属于某一个IO线程。每个 Channel
对象自始至终只负责一个 fd 的 IO事件分发,但它并不拥有这个 fd(class Socket
拥有此 fd),也不需要在析构的时候关闭 fd。Channel
会把不同的IO事件分发为不同的回调函数(readCallback_
,writeCallback_
,closeCallback_
,errorCallback_
)。Channel
的生命周期由其 owner class负责管理,它一般是其他 class 的直接或间接成员
例如 Acceptor
中的 Channel acceptChannel_
成员,TcpConnection
中的 Channel channel_
成员
///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : noncopyable
{
public:
typedef std::function<void()> EventCallback;
typedef std::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop* loop, int fd);
~Channel();
void handleEvent(Timestamp receiveTime);
void setReadCallback(ReadEventCallback cb)
{ readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb)
{ writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb)
{ closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb)
{ errorCallback_ = std::move(cb); }
/// Tie this channel to the owner object managed by shared_ptr,
/// prevent the owner object being destroyed in handleEvent.
void tie(const std::shared_ptr<void>& obj)
{
tie_ = obj;
tied_ = true;
}
int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_ = revt; } // used by pollers
bool isNoneEvent() const { return events_ == kNoneEvent; }
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }
// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }
EventLoop* ownerLoop() { return loop_; }
void remove() {
assert(isNoneEvent());
addedToLoop_ = false;
loop_->removeChannel(this);
}
private:
void update() {
addedToLoop_ = true;
loop_->updateChannel(this);
}
void handleEventWithGuard(Timestamp receiveTime);
static const int kNoneEvent = 0;
static const int kReadEvent = POLLIN | POLLPRI;
static const int kWriteEvent = POLLOUT;
EventLoop* loop_;
const int fd_;
int events_; // 用户关心的IO事件
int revents_; // 目前活动的事件,由Poller设置。it's the received event types of epoll or poll
int index_; // used by Poller.
std::weak_ptr<void> tie_; // 用于绑定这个 channel 对象所属的对象
bool tied_; // 是否绑定了这个 channel 对象所属的对象
bool eventHandling_; // 是否正在执行用户指定的事件的回调函数
bool addedToLoop_;
ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};
Channel
的作用是:
- 记录 fd,关心的 fd 事件(可读,可写,关闭,出错), fd 事件分别的回调函数
- 设置开始关心某些事件,设置停止关心某些事件
- 根据已经发生的事件,分别调用事件的回调函数
Channel
的函数成员都只能其所属的EventLoop所属的线程被调用,因此更新数据成员都不必加锁
设置关心事件
Channel::update
-> EventLoop::updateChannel
-> EPollPoller::updateChannel
或者 PollPoller::updateChannel
见上面
class Channel : noncopyable
{
public:
...
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }
...
};
void Channel::update()
{
addedToLoop_ = true;
loop_->updateChannel(this);
}
void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);
assertInLoopThread();
poller_->updateChannel(channel);
}
handleEvent()
Channel::handleEvent()
是 Channel
的核心,它由 EventLoop::loop()
调用,
它的核心功能是根据 revent_
的值分别调用不同的用户回调
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
if (tied_) // 绑定了其所属的对象
{
guard = tie_.lock();
if (guard) // 其对象没有被析构
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}