muduo EventLoop
class EventLoop : noncopyable
{
public:
typedef std::function<void()> Functor;
EventLoop();
~EventLoop(); // force out-line dtor, for std::unique_ptr members.
/// Loops forever.
/// Must be called in the same thread as creation of the object.
void loop();
/// Quits loop.
/// This is not 100% thread safe, if you call through a raw pointer,
/// better to call through shared_ptr<EventLoop> for 100% safety.
void quit();
...
/// Runs callback immediately in the loop thread.
/// It wakes up the loop, and run the cb.
/// If in the same loop thread, cb is run within the function.
/// Safe to call from other threads.
void runInLoop(Functor cb);
/// Queues callback in the loop thread.
/// Runs after finish pooling.
/// Safe to call from other threads.
void queueInLoop(Functor cb);
size_t queueSize() const;
/// Runs callback at 'time'.
/// Safe to call from other threads.
TimerId runAt(Timestamp time, TimerCallback cb);
/// Runs callback after @c delay seconds.
/// Safe to call from other threads.
TimerId runAfter(double delay, TimerCallback cb);
/// Runs callback every @c interval seconds.
/// Safe to call from other threads.
TimerId runEvery(double interval, TimerCallback cb);
/// Cancels the timer.
/// Safe to call from other threads.
void cancel(TimerId timerId);
// internal usage
void wakeup();
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
void assertInLoopThread();
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
...
static EventLoop* getEventLoopOfCurrentThread();
private:
void abortNotInLoopThread();
void handleRead(); // waked up
void doPendingFunctors();
typedef std::vector<Channel*> ChannelList;
bool looping_; // 该 eventloop 所属的线程,目前跑在 EventLoop::loop 的循环里
std::atomic<bool> quit_; // 要求该 eventloop 所属的线程,跳出 EventLoop::loop 的循环
bool eventHandling_; // 该 eventloop 所属的线程,目前跑在 EventLoop::loop 的循环里,poller_->poll 正在返回,正在依次调用每一个 active channels 的回调函数
bool callingPendingFunctors_; // 正在调用 pendingFunctors_ 中的回调函数
int64_t iteration_; // EventLoop::loop 已经循环的次数
const pid_t threadId_;
Timestamp pollReturnTime_;
std::unique_ptr<Poller> poller_;
std::unique_ptr<TimerQueue> timerQueue_;
int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;
boost::any context_;
// scratch variables
ChannelList activeChannels_; // used in poller_->poll, stores active channels found by poller_
Channel* currentActiveChannel_; // stores the current active channel who is currently executing callback
mutable MutexLock mutex_;
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_); // 正在调用 pendingFunctors_ 中的回调函数
};
one loop per thread 顾名思义是每个线程只能有一个 EventLoop
对象,因此 EvenLoop
的构造函数会检查当前线程是否已经创建了其他 EventLoop
对象
// __thread是GCC内置的线程局部存储设施,存取效率可以和全局变量相比。
// __thread变量每一个线程有一份独立实体,各个线程的值互不干扰。可以用来修饰那些带有全局性且值可能变,但是又不值得用全局变量保护的变量。
__thread EventLoop* t_loopInThisThread = 0;
EventLoop* static EventLoop::getEventLoopOfCurrentThread()
{
return t_loopInThisThread;
}
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
...
currentActiveChannel_(NULL)
{
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
...
}
创建了 EventLoop
对象的线程是 IO线程,其主要功能是运行事件循环。EventLoop
对象所属的生命周期应该和其所属的线程一样长,因此它可以是一个在线程主函数上的stack对象(线程结束后将自动析构)
loop
loop
是 EventLoop
的核心功能。
事件循环必须在 IO线程 执行,因此 EventLoop::loop()
会检首先查这一条件。
///
/// Loops forever.
///
/// Must be called in the same thread as creation of the object.
///
void EventLoop::loop()
{
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
// TODO sort channel by priority
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
...
}
looping_ = false;
}
EventLoop::loop()
它调用 Poller::poll()
(负责多路复用)获得当前活动事件的 Channel
列表,然后原地依次调用每个 Channel
的 handleEvent()
函数(负责任务分发,根据发生事件的类型再原地调用该 Channel
对应事件类型的回调)
一般来说 Channel
可读事件的回调函数干的都是从 fd 里读东西出来,然后把东西放到 buff 里,再去调用用户指定的回调,读取 fd 这一操作一定是在 EventLoop
线程做的。
一般来说 Channel
可写事件的回调函数干的都是往 fd 里写东西,要是都写进去了,就去调用用户指定的回调,并不再关注可写事件,要是没有都写进去就把剩余的放到buff里,继续关注可写事件,写 fd 这一操作一定是在 EventLoop
线程做的。
EventLoop
,Poller
,Channel
构成了 Reactor 模式的核心内容
当我们想终止IO线程的事件循环时,需要调用 EventLoop::quit()
/// Quits loop.
///
/// This is not 100% thread safe, if you call through a raw pointer,
/// better to call through shared_ptr<EventLoop> for 100% safety.
void EventLoop::quit()
{
quit_ = true;
// There is a chance that loop() just executes while(!quit_) and exits,
// then EventLoop destructs, then we are accessing an invalid object.
// Can be fixed using mutex_ in both places. ????
if (!isInLoopThread())
{
wakeup();
}
}
如果是IO线程自己调用 EventLoop::quit()
,说明此时没有卡在 Poller::poll
中,只需要 quit_ = true
,这样下一次 while (!quit_)
的时候会跳出循环
如果是其他线程,调用IO线程的 EventLoop::quit()
,此时IO线程可能正卡在 Poller::poll
中,需要将其 wakeup()
wakeup
当 IO线程 正卡在 Poller::poll
时,需要有办法让其他线程可以唤醒 IO线程,现在Linux有了eventfd(2),可以更高效的唤醒
EventLoop::EventLoop()
: ...
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
...
{
...
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
return evtfd;
}
在 EventLoop
的构造函数中,会先生成 wakeupFd_
,构造 wakeupChannel_
,然后设置 ReadCallBack 为 EventLoop::handleRead
,最后设置关心可读事件,注册在 Poller
中
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}
其他线程需要唤醒 IO线程 的时候,直接往 wakeupFd_
里面写东西(uint64_t
),这样 wakeupFd_
有可读事件,就会从 Poller::poll
返回,然后会回调 wakeupFd_
的 ReadCallBack ,把东西读出来 uint64_t
调配任务
EventLoop
有个非常有用的功能:在它的IO线程内执行某个用户任务回调,即 EventLoop::runInLoop
,其中 Functor
是 std::function<void()>
。如果IO线程自己调用这个函数,回调会同步进行。
这个功能常常被用来添加,修改,或删除需要监听的 Channel
。例如 TcpConnection::connectEstablished
TcpConnection::connectDestroyed
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
如果用户在其他线程调用 EventLoop::runInLoop
,cb会被加入pendingFunctors_
队列,IO线程会被唤醒来调用这个 Functor
void EventLoop::queueInLoop(Functor cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
void EventLoop::loop()
{
...
while (!quit_)
{
...
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
...
doPendingFunctors();
}
...
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (const Functor& functor : functors)
{
functor();
}
callingPendingFunctors_ = false;
}
EventLoop: : doPendingFunctors()
不是简单地在临界区内依次调用 Functor
,而是把回调列表 swap()
到局部变量 functors
中,这样一方面减小了临界区的长度(意味着不会阻塞其他线程调用 queueInLoop()
),另一方面也避免了死锁(因为 Functor
可能再调用 queueInLoop()
)。
安排定时任务
EventLoop
中和 Timer 有关的接口全都可以跨线程调用,用来安排执行定时任务(TimerCallback
)
typedef std::function<void()> TimerCallback;
class EventLoop : noncopyable
{
public:
...
///
/// Runs callback at 'time'.
/// Safe to call from other threads.
///
TimerId runAt(Timestamp time, TimerCallback cb);
///
/// Runs callback after @c delay seconds.
/// Safe to call from other threads.
///
TimerId runAfter(double delay, TimerCallback cb);
///
/// Runs callback every @c interval seconds.
/// Safe to call from other threads.
///
TimerId runEvery(double interval, TimerCallback cb);
///
/// Cancels the timer.
/// Safe to call from other threads.
///
void cancel(TimerId timerId);
...
};
以 EventLoop::runEvery
为例
TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
Timestamp time(addTime(Timestamp::now(), interval));
return timerQueue_->addTimer(std::move(cb), time, interval);
}
TimerQueue
构造函数会先生成 timerfd_
,构造 timerfdChannel_
,然后设置 ReadCallBack 为 TimerQueue::handleRead
,最后设置关心可读事件,注册在 Poller
中
EventLoop::EventLoop()
: ...
timerQueue_(new TimerQueue(this)),
...
{
...
}
TimerQueue::TimerQueue(EventLoop* loop)
: loop_(loop),
timerfd_(createTimerfd()),
timerfdChannel_(loop, timerfd_),
timers_(),
callingExpiredTimers_(false)
{
timerfdChannel_.setReadCallback(
std::bind(&TimerQueue::handleRead, this));
// we are always reading the timerfd, we disarm it with timerfd_settime.
timerfdChannel_.enableReading();
}
int createTimerfd()
{
int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
TFD_NONBLOCK | TFD_CLOEXEC);
return timerfd;
}
TimerQueue
需要能高效的组织目前尚未到期的 Timer
:能快速根据当前时间找到已到期的 Timer
,也要能高效的添加和删除 Timer
。
一种做法是用二叉堆组织的优先队列,这种做法的复杂度为 O(logN)
另一种做法是使用使用二叉搜索树(std::set
,std::map
),把 Timer
按到期的时间排好先后顺序。操作的时间复杂度仍是 O(logN)。但是我们不能直接用 std::map<Timestamp, Timer*>
,因为这样无法处理两个 Timer
到期时间相同的情况。有两个解决方案,一个是用 multimap
或 multiset
,二是设法区分 key ,muduo 采用的是第二种做法。具体来说,以 pair<Timestamp, Timer*>
为 key,这样即便两个 Timer
到期时间相同,key 也不同
typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : noncopyable
{
...
// Timer list sorted by expiration
TimerList timers_;
...
};
Timer
记录了 callback, expiration, interval, repeat
typedef std::function<void()> TimerCallback;
class Timer : noncopyable
{
public:
Timer(TimerCallback cb, Timestamp when, double interval)
: callback_(std::move(cb)),
expiration_(when),
interval_(interval),
repeat_(interval > 0.0),
sequence_(s_numCreated_.incrementAndGet())
{ }
void run() const
{
callback_();
}
...
private:
const TimerCallback callback_;
Timestamp expiration_;
const double interval_;
const bool repeat_;
const int64_t sequence_;
static AtomicInt64 s_numCreated_;
};
TimerQueue::addTimer
是利用了 EventLoop
调配任务的特性,调配了一个 TimerQueue::addTimerInLoop
来添加定时任务
TimerId TimerQueue::addTimer(TimerCallback cb,
Timestamp when,
double interval)
{
Timer* timer = new Timer(std::move(cb), when, interval);
loop_->runInLoop(
std::bind(&TimerQueue::addTimerInLoop, this, timer));
return TimerId(timer, timer->sequence());
}
先将 Timer
插入到 timers_
,如果新插入的 Timer
到期时间是最小的,那么 resetTimerfd
void TimerQueue::addTimerInLoop(Timer* timer)
{
loop_->assertInLoopThread();
bool earliestChanged = insert(timer);
if (earliestChanged)
{
resetTimerfd(timerfd_, timer->expiration());
}
}
bool TimerQueue::insert(Timer* timer)
{
loop_->assertInLoopThread();
assert(timers_.size() == activeTimers_.size());
bool earliestChanged = false;
Timestamp when = timer->expiration();
TimerList::iterator it = timers_.begin();
if (it == timers_.end() || when < it->first)
{ // 队列空,或者 when 小于队列头
earliestChanged = true;
}
{
std::pair<TimerList::iterator, bool> result
= timers_.insert(Entry(when, timer));
assert(result.second); (void)result;
}
...
return earliestChanged;
}
void resetTimerfd(int timerfd, Timestamp expiration)
{
// wake up loop by timerfd_settime()
struct itimerspec newValue;
struct itimerspec oldValue;
memZero(&newValue, sizeof newValue);
memZero(&oldValue, sizeof oldValue);
newValue.it_value = howMuchTimeFromNow(expiration);
int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
}
timerfd_
的 ReadCallBack 为 TimerQueue::handleRead
void TimerQueue::handleRead()
{
loop_->assertInLoopThread();
Timestamp now(Timestamp::now());
readTimerfd(timerfd_, now);
std::vector<Entry> expired = getExpired(now);
callingExpiredTimers_ = true;
...
// safe to callback outside critical section
for (const Entry& it : expired)
{
it.second->run();
}
callingExpiredTimers_ = false;
reset(expired, now);
}
std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
assert(timers_.size() == activeTimers_.size());
std::vector<Entry> expired;
Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
TimerList::iterator end = timers_.lower_bound(sentry);
assert(end == timers_.end() || now < end->first);
std::copy(timers_.begin(), end, back_inserter(expired));
timers_.erase(timers_.begin(), end);
...
return expired;
}
void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
Timestamp nextExpire;
for (const Entry& it : expired)
{
ActiveTimer timer(it.second, it.second->sequence());
if (it.second->repeat() // 将过期了的但是,repeat 的 timer 加回 timers_
&& ...)
{
it.second->restart(now);
insert(it.second);
}
else
{
// FIXME move to a free list
delete it.second; // FIXME: no delete please
}
}
if (!timers_.empty())
{
nextExpire = timers_.begin()->second->expiration();
}
if (nextExpire.valid())
{
resetTimerfd(timerfd_, nextExpire);
}
}
- 从
timerfd_
里读东西出来 - 从
timers_
找到所有失效的Timer
- 依次调用所有失效
Timer
的用户回调函数 resetTimerfd
为下一个到期时间最小的Timer
EventLoopThread
IO线程不一定是主线程,我们可以在任何一个线程创建并运行 EventLoop
。一个程序也可以有不止一个IO线程,我们可以按优先级将不同的 socket 分给不同的IO线程,避免优先级反转。为了方便将来使用,我们定义 EventLoopThread
class,这正是 one loop per thread 的本意。
class EventLoopThread : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
const string& name = string());
~EventLoopThread();
EventLoop* startLoop();
private:
void threadFunc();
EventLoop* loop_ GUARDED_BY(mutex_);
Thread thread_;
MutexLock mutex_;
Condition cond_ GUARDED_BY(mutex_);
ThreadInitCallback callback_;
};
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
thread_(std::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
EventLoopThread
会启动自己的线程(thread_(std::bind(&EventLoopThread::threadFunc, this), name)
),并在其中运行 EventLoop::loop()
。启动函数 startLoop()
定义如下,这个函数会返回新线程中 EventLoop
对象的指针,这样就可以对新线程中 EventLoop
对象执行调配任务,安排定时任务等操作。
EventLoop* EventLoopThread::startLoop()
{
thread_.start();
EventLoop* loop = NULL;
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait(); // 用条件变量来等待线程的创建与运行
}
loop = loop_;
}
return loop;
}
线程主函数在stack上定义 EventLoop
对象,然后将其地址赋值给成员变量 loop_
,最后 notify()
条件变量,唤醒 startLoop()
。
如果设置了 callback_
回调函数,即在这个线程开始的时候需要调用这个回调
void EventLoopThread::threadFunc()
{
EventLoop loop;
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify();
}
loop.loop();
MutexLockGuard lock(mutex_);
loop_ = NULL;
}
EventLoopThread
对象析构的时候,会 loop_->quit()
使新线程从 loop.loop()
中返回
EventLoopThread::~EventLoopThread()
{
if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.
{
// still a tiny chance to call destructed object, if threadFunc exits just now.
// but when EventLoopThread destructs, usually programming is exiting anyway.
loop_->quit();
thread_.join();
}
}
EventLoopThreadPool
即多个 EventLoopThread
class EventLoopThreadPool : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
~EventLoopThreadPool();
void setThreadNum(int numThreads) { numThreads_ = numThreads; }
void start(const ThreadInitCallback& cb = ThreadInitCallback());
// valid after calling start()
/// round-robin
EventLoop* getNextLoop();
...
private:
EventLoop* baseLoop_; // baseloop_ 不属于 threads_ 中任何一个 EventLoopThread
string name_;
bool started_;
int numThreads_;
int next_;
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;
};
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop),
name_(nameArg),
started_(false),
numThreads_(0),
next_(0)
{
}
生成 numThreads_
个 EventLoopThread
,并依次调用 EventLoopThread::startLoop
,获取每个的 EventLoop
对象,保存在 loops_
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop());
}
if (numThreads_ == 0 && cb)
{
cb(baseLoop_);
}
}
以 round-robin 的策略返回一个 EventLoop
,如果该 EventLoopThreadPool
,压根就没有任何 thread,那么就返回 baseLoop_
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
assert(started_);
EventLoop* loop = baseLoop_;
if (!loops_.empty())
{
// round-robin
loop = loops_[next_];
++next_;
if (implicit_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}