目录

muduo EventLoop

目录
class EventLoop : noncopyable
{
 public:
  typedef std::function<void()> Functor;

  EventLoop();
  ~EventLoop();  // force out-line dtor, for std::unique_ptr members.

  /// Loops forever.
  /// Must be called in the same thread as creation of the object.
  void loop();

  /// Quits loop.
  /// This is not 100% thread safe, if you call through a raw pointer,
  /// better to call through shared_ptr<EventLoop> for 100% safety.
  void quit();

  ...

  /// Runs callback immediately in the loop thread.
  /// It wakes up the loop, and run the cb.
  /// If in the same loop thread, cb is run within the function.
  /// Safe to call from other threads.
  void runInLoop(Functor cb);
  /// Queues callback in the loop thread.
  /// Runs after finish pooling.
  /// Safe to call from other threads.
  void queueInLoop(Functor cb);

  size_t queueSize() const;

  /// Runs callback at 'time'.
  /// Safe to call from other threads.
  TimerId runAt(Timestamp time, TimerCallback cb);
  /// Runs callback after @c delay seconds.
  /// Safe to call from other threads.
  TimerId runAfter(double delay, TimerCallback cb);
  /// Runs callback every @c interval seconds.
  /// Safe to call from other threads.
  TimerId runEvery(double interval, TimerCallback cb);
  /// Cancels the timer.
  /// Safe to call from other threads.
  void cancel(TimerId timerId);

  // internal usage
  void wakeup();
  void updateChannel(Channel* channel);
  void removeChannel(Channel* channel);
  bool hasChannel(Channel* channel);

  void assertInLoopThread();
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

  ...

  static EventLoop* getEventLoopOfCurrentThread();

 private:
  void abortNotInLoopThread();
  void handleRead();  // waked up
  void doPendingFunctors();

  typedef std::vector<Channel*> ChannelList;

  bool looping_; // 该 eventloop 所属的线程,目前跑在 EventLoop::loop 的循环里
  std::atomic<bool> quit_; // 要求该 eventloop 所属的线程,跳出 EventLoop::loop 的循环
  bool eventHandling_; // 该 eventloop 所属的线程,目前跑在 EventLoop::loop 的循环里,poller_->poll 正在返回,正在依次调用每一个 active channels 的回调函数
  bool callingPendingFunctors_; // 正在调用 pendingFunctors_ 中的回调函数
  int64_t iteration_; // EventLoop::loop 已经循环的次数
  const pid_t threadId_;
  Timestamp pollReturnTime_;
  std::unique_ptr<Poller> poller_; 
  std::unique_ptr<TimerQueue> timerQueue_;
  int wakeupFd_;
  std::unique_ptr<Channel> wakeupChannel_;
  boost::any context_;

  // scratch variables
  ChannelList activeChannels_; // used in poller_->poll, stores active channels found by poller_
  Channel* currentActiveChannel_; // stores the current active channel who is currently executing callback

  mutable MutexLock mutex_;
  std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_); // 正在调用 pendingFunctors_ 中的回调函数
};

one loop per thread 顾名思义是每个线程只能有一个 EventLoop 对象,因此 EvenLoop 的构造函数会检查当前线程是否已经创建了其他 EventLoop 对象

// ​ __thread是GCC内置的线程局部存储设施,存取效率可以和全局变量相比。
//  __thread变量每一个线程有一份独立实体,各个线程的值互不干扰。可以用来修饰那些带有全局性且值可能变,但是又不值得用全局变量保护的变量。
__thread EventLoop* t_loopInThisThread = 0;

EventLoop* static EventLoop::getEventLoopOfCurrentThread()
{
  return t_loopInThisThread;
}

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    ...
    currentActiveChannel_(NULL)
{
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
  ...
}

创建了 EventLoop 对象的线程是 IO线程,其主要功能是运行事件循环。EventLoop 对象所属的生命周期应该和其所属的线程一样长,因此它可以是一个在线程主函数上的stack对象(线程结束后将自动析构)

loop

loopEventLoop 的核心功能。 事件循环必须在 IO线程 执行,因此 EventLoop::loop() 会检首先查这一条件。

///
/// Loops forever.
///
/// Must be called in the same thread as creation of the object.
///
void EventLoop::loop()
{
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?

  while (!quit_)
  {
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    ++iteration_;
    // TODO sort channel by priority
    eventHandling_ = true;
    for (Channel* channel : activeChannels_)
    {
      currentActiveChannel_ = channel;
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    ...
  }

  looping_ = false;
}

EventLoop::loop() 它调用 Poller::poll()(负责多路复用)获得当前活动事件的 Channel 列表,然后原地依次调用每个 ChannelhandleEvent()函数(负责任务分发,根据发生事件的类型再原地调用该 Channel 对应事件类型的回调) 一般来说 Channel 可读事件的回调函数干的都是从 fd 里读东西出来,然后把东西放到 buff 里,再去调用用户指定的回调,读取 fd 这一操作一定是在 EventLoop 线程做的。 一般来说 Channel 可写事件的回调函数干的都是往 fd 里写东西,要是都写进去了,就去调用用户指定的回调,并不再关注可写事件,要是没有都写进去就把剩余的放到buff里,继续关注可写事件,写 fd 这一操作一定是在 EventLoop 线程做的EventLoopPollerChannel构成了 Reactor 模式的核心内容 https://img-blog.csdnimg.cn/1c726eb8845044ddb800fa1df347cd26.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center 当我们想终止IO线程的事件循环时,需要调用 EventLoop::quit()

/// Quits loop.
///
/// This is not 100% thread safe, if you call through a raw pointer,
/// better to call through shared_ptr<EventLoop> for 100% safety.
void EventLoop::quit()
{
  quit_ = true;
  // There is a chance that loop() just executes while(!quit_) and exits,
  // then EventLoop destructs, then we are accessing an invalid object.
  // Can be fixed using mutex_ in both places. ????
  if (!isInLoopThread())
  {
    wakeup();
  }
}

如果是IO线程自己调用 EventLoop::quit(),说明此时没有卡在 Poller::poll中,只需要 quit_ = true,这样下一次 while (!quit_) 的时候会跳出循环 如果是其他线程,调用IO线程的 EventLoop::quit(),此时IO线程可能正卡在 Poller::poll 中,需要将其 wakeup()

wakeup

IO线程 正卡在 Poller::poll 时,需要有办法让其他线程可以唤醒 IO线程,现在Linux有了eventfd(2),可以更高效的唤醒

EventLoop::EventLoop()
  : ...
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_)),
    ...
{
  ...
  wakeupChannel_->setReadCallback(
      std::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}

int createEventfd()
{
  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  return evtfd;
}

EventLoop 的构造函数中,会先生成 wakeupFd_,构造 wakeupChannel_,然后设置 ReadCallBack 为 EventLoop::handleRead,最后设置关心可读事件,注册在 Poller

void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = sockets::read(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}

其他线程需要唤醒 IO线程 的时候,直接往 wakeupFd_ 里面写东西(uint64_t),这样 wakeupFd_有可读事件,就会从 Poller::poll 返回,然后会回调 wakeupFd_ 的 ReadCallBack ,把东西读出来 uint64_t

调配任务

EventLoop 有个非常有用的功能:在它的IO线程内执行某个用户任务回调,即 EventLoop::runInLoop,其中 Functorstd::function<void()>。如果IO线程自己调用这个函数,回调会同步进行。 这个功能常常被用来添加,修改,或删除需要监听的 Channel。例如 TcpConnection::connectEstablished TcpConnection::connectDestroyed

void EventLoop::runInLoop(Functor cb)
{
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(std::move(cb));
  }
}

如果用户在其他线程调用 EventLoop::runInLoop ,cb会被加入pendingFunctors_队列,IO线程会被唤醒来调用这个 Functor

void EventLoop::queueInLoop(Functor cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(std::move(cb));
  }

  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

void EventLoop::loop()
{
  ...
  
  while (!quit_)
  {
    ...
    
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    ...
    
    doPendingFunctors();
  }
  ...
  
}

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  {
  MutexLockGuard lock(mutex_);
  functors.swap(pendingFunctors_);
  }

  for (const Functor& functor : functors)
  {
    functor();
  }
  callingPendingFunctors_ = false;
}

EventLoop: : doPendingFunctors() 不是简单地在临界区内依次调用 Functor,而是把回调列表 swap() 到局部变量 functors 中,这样一方面减小了临界区的长度(意味着不会阻塞其他线程调用 queueInLoop() ),另一方面也避免了死锁(因为 Functor 可能再调用 queueInLoop())。

安排定时任务

EventLoop 中和 Timer 有关的接口全都可以跨线程调用,用来安排执行定时任务(TimerCallback

typedef std::function<void()> TimerCallback;

class EventLoop : noncopyable
{
 public:
  ...
  
  ///
  /// Runs callback at 'time'.
  /// Safe to call from other threads.
  ///
  TimerId runAt(Timestamp time, TimerCallback cb);
  ///
  /// Runs callback after @c delay seconds.
  /// Safe to call from other threads.
  ///
  TimerId runAfter(double delay, TimerCallback cb);
  ///
  /// Runs callback every @c interval seconds.
  /// Safe to call from other threads.
  ///
  TimerId runEvery(double interval, TimerCallback cb);
  ///
  /// Cancels the timer.
  /// Safe to call from other threads.
  ///
  void cancel(TimerId timerId);
  ...
  
};

EventLoop::runEvery 为例

TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(std::move(cb), time, interval);
}

TimerQueue 构造函数会先生成 timerfd_,构造 timerfdChannel_,然后设置 ReadCallBack 为 TimerQueue::handleRead,最后设置关心可读事件,注册在 Poller

EventLoop::EventLoop()
  : ...
    timerQueue_(new TimerQueue(this)),
    ...
{
  ...
}

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      std::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();
}

int createTimerfd()
{
  int timerfd = ::timerfd_create(CLOCK_MONOTONIC,
                                 TFD_NONBLOCK | TFD_CLOEXEC);
  return timerfd;
}

TimerQueue 需要能高效的组织目前尚未到期的 Timer:能快速根据当前时间找到已到期的 Timer,也要能高效的添加和删除 Timer。 一种做法是用二叉堆组织的优先队列,这种做法的复杂度为 O(logN) 另一种做法是使用使用二叉搜索树(std::setstd::map),把 Timer 按到期的时间排好先后顺序。操作的时间复杂度仍是 O(logN)。但是我们不能直接用 std::map<Timestamp, Timer*>,因为这样无法处理两个 Timer 到期时间相同的情况。有两个解决方案,一个是用 multimapmultiset,二是设法区分 key ,muduo 采用的是第二种做法。具体来说,以 pair<Timestamp, Timer*> 为 key,这样即便两个 Timer 到期时间相同,key 也不同

typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
///
/// A best efforts timer queue.
/// No guarantee that the callback will be on time.
///
class TimerQueue : noncopyable
{
 ...
 
  // Timer list sorted by expiration
  TimerList timers_;
  
 ...
};

Timer 记录了 callback, expiration, interval, repeat

typedef std::function<void()> TimerCallback;

class Timer : noncopyable
{
 public:
  Timer(TimerCallback cb, Timestamp when, double interval)
    : callback_(std::move(cb)),
      expiration_(when),
      interval_(interval),
      repeat_(interval > 0.0),
      sequence_(s_numCreated_.incrementAndGet())
  { }

  void run() const
  {
    callback_();
  }
  ...

 private:
  const TimerCallback callback_;
  Timestamp expiration_;
  const double interval_;
  const bool repeat_;
  const int64_t sequence_;

  static AtomicInt64 s_numCreated_;
};

TimerQueue::addTimer 是利用了 EventLoop 调配任务的特性,调配了一个 TimerQueue::addTimerInLoop 来添加定时任务

TimerId TimerQueue::addTimer(TimerCallback cb,
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(std::move(cb), when, interval);
  loop_->runInLoop(
      std::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

先将 Timer 插入到 timers_,如果新插入的 Timer 到期时间是最小的,那么 resetTimerfd

void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer);

  if (earliestChanged)
  {
    resetTimerfd(timerfd_, timer->expiration());
  }
}

bool TimerQueue::insert(Timer* timer)
{
  loop_->assertInLoopThread();
  assert(timers_.size() == activeTimers_.size());
  bool earliestChanged = false;
  Timestamp when = timer->expiration();
  TimerList::iterator it = timers_.begin();
  if (it == timers_.end() || when < it->first)
  { // 队列空,或者 when 小于队列头
    earliestChanged = true;
  }
  {
    std::pair<TimerList::iterator, bool> result
      = timers_.insert(Entry(when, timer));
    assert(result.second); (void)result;
  }
  ...

  return earliestChanged;
}

void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  memZero(&newValue, sizeof newValue);
  memZero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
}

timerfd_ 的 ReadCallBack 为 TimerQueue::handleRead

void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  ...
  
  // safe to callback outside critical section
  for (const Entry& it : expired)
  {
    it.second->run();
  }
  
  callingExpiredTimers_ = false;

  reset(expired, now);
}

std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  TimerList::iterator end = timers_.lower_bound(sentry);
  assert(end == timers_.end() || now < end->first);
  std::copy(timers_.begin(), end, back_inserter(expired));
  timers_.erase(timers_.begin(), end);
  ...
  
  return expired;
}

void TimerQueue::reset(const std::vector<Entry>& expired, Timestamp now)
{
  Timestamp nextExpire;

  for (const Entry& it : expired)
  {
    ActiveTimer timer(it.second, it.second->sequence());
    if (it.second->repeat() // 将过期了的但是,repeat 的 timer 加回 timers_
        && ...)
    {
      it.second->restart(now);
      insert(it.second);
    }
    else
    {
      // FIXME move to a free list
      delete it.second; // FIXME: no delete please
    }
  }

  if (!timers_.empty())
  {
    nextExpire = timers_.begin()->second->expiration();
  }

  if (nextExpire.valid())
  {
    resetTimerfd(timerfd_, nextExpire);
  }
}
  1. timerfd_ 里读东西出来
  2. timers_ 找到所有失效的 Timer
  3. 依次调用所有失效 Timer 的用户回调函数
  4. resetTimerfd 为下一个到期时间最小的 Timer https://img-blog.csdnimg.cn/5ba10baace8a4d779a9675500dda8f19.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAQnJhbnppbm8=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center

EventLoopThread

IO线程不一定是主线程,我们可以在任何一个线程创建并运行 EventLoop。一个程序也可以有不止一个IO线程,我们可以按优先级将不同的 socket 分给不同的IO线程,避免优先级反转。为了方便将来使用,我们定义 EventLoopThread class,这正是 one loop per thread 的本意。

class EventLoopThread : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();

 private:
  void threadFunc();

  EventLoop* loop_ GUARDED_BY(mutex_);
  Thread thread_;
  MutexLock mutex_;
  Condition cond_ GUARDED_BY(mutex_);
  ThreadInitCallback callback_;
};

EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
                                 const string& name)
  : loop_(NULL),
    thread_(std::bind(&EventLoopThread::threadFunc, this), name),
    mutex_(),
    cond_(mutex_),
    callback_(cb)
{
}

EventLoopThread 会启动自己的线程(thread_(std::bind(&EventLoopThread::threadFunc, this), name)),并在其中运行 EventLoop::loop()。启动函数 startLoop() 定义如下,这个函数会返回新线程中 EventLoop 对象的指针,这样就可以对新线程中 EventLoop 对象执行调配任务,安排定时任务等操作。

EventLoop* EventLoopThread::startLoop()
{
  thread_.start();

  EventLoop* loop = NULL;
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait(); // 用条件变量来等待线程的创建与运行
    }
    loop = loop_;
  }

  return loop;
}

线程主函数在stack上定义 EventLoop 对象,然后将其地址赋值给成员变量 loop_,最后 notify() 条件变量,唤醒 startLoop()。 如果设置了 callback_ 回调函数,即在这个线程开始的时候需要调用这个回调

void EventLoopThread::threadFunc()
{
  EventLoop loop;

  if (callback_)
  {
    callback_(&loop);
  }

  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop;
    cond_.notify();
  }

  loop.loop();
  
  MutexLockGuard lock(mutex_);
  loop_ = NULL;
}

EventLoopThread 对象析构的时候,会 loop_->quit() 使新线程从 loop.loop() 中返回

EventLoopThread::~EventLoopThread()
{
  if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.
  {
    // still a tiny chance to call destructed object, if threadFunc exits just now.
    // but when EventLoopThread destructs, usually programming is exiting anyway.
    loop_->quit();
    thread_.join();
  }
}

EventLoopThreadPool

即多个 EventLoopThread

class EventLoopThreadPool : noncopyable
{
 public:
  typedef std::function<void(EventLoop*)> ThreadInitCallback;

  EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg);
  ~EventLoopThreadPool();
  void setThreadNum(int numThreads) { numThreads_ = numThreads; }
  void start(const ThreadInitCallback& cb = ThreadInitCallback());

  // valid after calling start()
  /// round-robin
  EventLoop* getNextLoop();
  ...

 private:

  EventLoop* baseLoop_; // baseloop_ 不属于 threads_ 中任何一个 EventLoopThread
  string name_;
  bool started_;
  int numThreads_;
  int next_;
  std::vector<std::unique_ptr<EventLoopThread>> threads_;
  std::vector<EventLoop*> loops_;
};

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
  : baseLoop_(baseLoop),
    name_(nameArg),
    started_(false),
    numThreads_(0),
    next_(0)
{
}

生成 numThreads_EventLoopThread,并依次调用 EventLoopThread::startLoop,获取每个的 EventLoop 对象,保存在 loops_

void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
  assert(!started_);
  baseLoop_->assertInLoopThread();

  started_ = true;

  for (int i = 0; i < numThreads_; ++i)
  {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread* t = new EventLoopThread(cb, buf);
    threads_.push_back(std::unique_ptr<EventLoopThread>(t));
    loops_.push_back(t->startLoop());
  }
  if (numThreads_ == 0 && cb)
  {
    cb(baseLoop_);
  }
}

以 round-robin 的策略返回一个 EventLoop,如果该 EventLoopThreadPool,压根就没有任何 thread,那么就返回 baseLoop_

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}