文章 12
评论 4
浏览 32306
C++高性能服务端编程——muduo网络库源码阅读(二)事件响应与任务管理

C++高性能服务端编程——muduo网络库源码阅读(二)事件响应与任务管理

概述

上一篇文章简要介绍了reactor事件驱动模型和muduo网络库的总体架构,并且简述了事件循环核心类EventLoop的整体执行逻辑。容易看出,loop函数中 poller_对象的 poll方法和Channel的 handleEvent方法是我们需要重点关注的对象,它们分别对应对事件响应和处理的代码实现。除此以外,EventLoop中任务队列的管理和线程安全的细节也是我们需要关注的。本文将结合相关的源码,对muduo的事件响应、处理以及任务管理机制做一个分析。

muduo的IO多路复用机制

muduo实现了Linux中poll和epoll两种IO多路复用机制,同时用了一些额外的数据结构对其进行封装,以配合muduo的整体逻辑实现。在muduo中,Poller和Channel两个类是实现该机制的关键。

Channel

前面已经提过,muduo中Channel类相当于上一篇文章reactor模型中的handler,每个文件描述符都对应一个Channel,由它负责注册对应事件的回调函数。需要注意的是,Channel并不拥有这个文件描述符,只是拥有文件描述符的事件信息和回调逻辑。换句话说,Channel更像是一个代理类,由它去做具体的事情,但文件描述符的生命周期不由它管理,下面是Channel类的声明

class Channel : noncopyable
{
 public:
  typedef std::function<void()> EventCallback;
  typedef std::function<void(Timestamp)> ReadEventCallback;

  Channel(EventLoop* loop, int fd);
  ~Channel();

  void handleEvent(Timestamp receiveTime);
  void setReadCallback(ReadEventCallback cb)
  { readCallback_ = std::move(cb); }
  void setWriteCallback(EventCallback cb)
  { writeCallback_ = std::move(cb); }
  void setCloseCallback(EventCallback cb)
  { closeCallback_ = std::move(cb); }
  void setErrorCallback(EventCallback cb)
  { errorCallback_ = std::move(cb); }

  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  void tie(const std::shared_ptr<void>&);

  int fd() const { return fd_; }
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  void enableReading() { events_ |= kReadEvent; update(); }
  void disableReading() { events_ &= ~kReadEvent; update(); }
  void enableWriting() { events_ |= kWriteEvent; update(); }
  void disableWriting() { events_ &= ~kWriteEvent; update(); }
  void disableAll() { events_ = kNoneEvent; update(); }
  bool isWriting() const { return events_ & kWriteEvent; }
  bool isReading() const { return events_ & kReadEvent; }

  // for Poller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  // for debug
  string reventsToString() const;
  string eventsToString() const;

  void doNotLogHup() { logHup_ = false; }

  EventLoop* ownerLoop() { return loop_; }
  void remove();

 private:
  static string eventsToString(int fd, int ev);

  void update();
  void handleEventWithGuard(Timestamp receiveTime);

  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* loop_;
  const int  fd_;
  int        events_;
  int        revents_; // it's the received event types of epoll or poll
  int        index_; // used by Poller.
  bool       logHup_;

  std::weak_ptr<void> tie_;
  bool tied_;
  bool eventHandling_;
  bool addedToLoop_;
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback closeCallback_;
  EventCallback errorCallback_;
};

它的构造函数比较简单,只是各个成员变量进行了初始化,即

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),
    logHup_(true),
    tied_(false),
    eventHandling_(false),
    addedToLoop_(false)
{
}

Channel类中有几个比较重要的回调函数,从命名看分别对应读事件、写事件、出错、文件关闭发生时的回调,并提供了相应的接口去设置这些回调函数,其中读事件回调相比其他回调函数多了一个 Timestamp(muduo中一个记录时间戳的类)参数,传递读事件发生时的时刻。

Channel中最重要的三个成员变量是 fd_events_revents_,分别对应文件描述符、希望Poller监听的事件以及实际发生的事件,此外还有一个 index_变量用于辅助Poller的查找,Channel中提供了一系列方法来设置和检查 events_变量的值(这里统一采用了 <poll.h>头文件中的宏定义表示,在数值上epoll和poll的事件宏是一致的),并预先设定了几个常量的值,分别为

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

这三个常量分别表示无、可读、可写事件,设置监听事件也是通过 events_变量与这些常量进行位运算实现,通过调用 enableReading/enableWriting方法就可以设置监听可读/可写事件,设置完成后需要调用 update方法更新Poller中的文件描述符设置。接下来,我们看Channel中的核心方法 handleEvent

void Channel::handleEvent(Timestamp receiveTime)
{
  std::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}

这个方法中有一个 tie_变量,这里是用来保证线程安全的一个辅助变量,它是一个指向任意类型的weak_ptr

std::weak_ptr<void> tie_;

可以调用Channel的 tie方法给 tie_变量绑定一个具体的对象

void Channel::tie(const std::shared_ptr<void>& obj)
{
  tie_ = obj;
  tied_ = true;
}

绑定后当前Channel对象就成了这个对象的从属,handleEvent方法中首先调用了 tie_lock方法提升权限(具体可参考《关于C++智能指针的一些思考和总结》),如果 tie_绑定的对象已经被释放,那么当前Channel对象也随之失效,不再进行后面的操作,提升成功后会调用 handleEventWithGuard方法,具体为

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  LOG_TRACE << reventsToString();
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}

这个方法描述了Channel对具体事件的处理逻辑,其中 POLLHUPPOLLERRPOLLNVAL事件表示错误发生(logHup_变量表示是否为 POLLHUP事件记录日志),调用 errorCallback_POLLINPOLLPRIPOLLRDHUP均表示可读事件发生,调用 readCallback_POLLOUT表示可写事件发生,调用 writeCallback_(这些宏定义的含义可以参考《Unix网络编程:卷一》6.10节,本文不再赘述)。通过这样的逻辑,就可以利用回调函数来控制不同类型文件描述符的事件处理。

Poller类与poll/epoll

上一篇文章简单提过Poller类,这是一个抽象基类,规定了一个统一的调用接口,在实现具体的poll/epoll机制时需要继承这个基类并重写方法,用户在使用时就可以按照统一的调用方式而无需关心内部的具体实现,先看Poller类的声明

class Poller : noncopyable
{
 public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop);
  virtual ~Poller();

  /// Polls the I/O events.
  /// Must be called in the loop thread.
  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  virtual void updateChannel(Channel* channel) = 0;

  /// Remove the channel, when it destructs.
  /// Must be called in the loop thread.
  virtual void removeChannel(Channel* channel) = 0;

  virtual bool hasChannel(Channel* channel) const;

  static Poller* newDefaultPoller(EventLoop* loop);

  void assertInLoopThread() const
  {
    ownerLoop_->assertInLoopThread();
  }

 protected:
  typedef std::map<int, Channel*> ChannelMap;
  ChannelMap channels_;

 private:
  EventLoop* ownerLoop_;
};

Poller对象的生命周期与它所属的EventLoop对象一致(这一点从EventLoop类的定义中可以看出),同样它有一枚指向拥有它的EventLoop对象的指针用于交互,Poller的构造函数只做了为之赋值这一操作。Poller中有三个纯虚函数 pollupdateChannelremoveChannel由子类实现,muduo中Poller的两个派生类 PollPollerEPollPoller分别实现了Linux的poll和epoll机制,下面我们分别分析两个类的实现细节。

PollPoller

PollPoller除了重写了三个方法以外,还新增了一个存储 pollfd结构体的容器

std::vector<struct pollfd> pollfds_;

PollPoller所监控的每一个文件描述符(Channel)都对应一个 pollfd结构体,它的 poll函数代码为

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happened";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happened";
  }
  else
  {
    if (savedErrno != EINTR)
    {
      errno = savedErrno;
      LOG_SYSERR << "PollPoller::poll()";
    }
  }
  return now;
}

除去一些日志和异常判断的操作外,poll函数整体逻辑为先调用Linux的poll函数获得它监控的一组文件描述符上有多少事件发生,然后调用 fillActiveChannels函数获取所有活跃的Channel指针,具体代码为

void PollPoller::fillActiveChannels(int numEvents,
                                    ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

实现上并没有什么特别之处,即遍历整个容器选出发生事件的描述符,然后从 channels_(基类的数据结构)查找出其所对应的Channel指针,然后为其设置实际发生的事件

channel->set_revents(pfd->revents);

updateChannelremoveChannel的功能分别为添加新的Channel/更新已有Channel信息和删除Channel,代码如下(原来的代码中用了大量的 assert来保证程序的准确性,这里篇幅所限略去)

void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
    // 略去一堆assert
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // 略去assert
    int idx = channel->index();
    struct pollfd& pfd = pollfds_[idx];
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
      pfd.fd = -channel->fd()-1;
    }
  }
}

void PollPoller::removeChannel(Channel* channel)
{
  //略去assert
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  int idx = channel->index();
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  size_t n = channels_.erase(channel->fd());
  assert(n == 1); (void)n;
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    pollfds_.pop_back();
  }
  else
  {
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      channelAtEnd = -channelAtEnd-1;
    }
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}

前面介绍Channel定义时提到过它有一个 index_成员变量,这个变量就是为了辅助Poller管理Channel而添加的,updateChannel首先做了一个判断,如果Channel的index小于0(即为-1),那么它就是新增的,为它新建一个 pollfd结构体并加入 pollfds_容器中,最后将这个Channel的 index_设置为结构体在容器中的索引。如果索引大于0,说明是已经存在的Channel,这时只要取出Channel的 index_对应位置的 pollfd结构体并将它所监听的事件修改为Channel现在需要监听的事件,并重置实际事件即可

pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;

removeChannel函数的基本逻辑也是根据Channel的索引来删除 pollfd结构体,但值得一提的是,这里将要删除的Channel和容器中最后一个Channel交换了位置和索引,最后只需要将容器最后的Channel弹出即可。通过这样的技巧可以将删除Channel操作的时间复杂度降为常数,removeChannel函数整体时间复杂度为 std::map查找的时间复杂度O(logN)。

EPollPoller

EpollPoller的逻辑要比PollPoller简单一些,因为epoll函数可以直接返回发生的事件以及对应的Channel,因此Channel中的index含义也有所不同。EpollPoller中有一个 epoll_event结构体的容器,用来存储 epoll_wait函数所返回的事件结构体

std::vector<struct epoll_event> events_;

考虑到 std::vector内存连续的特点,调用方式即为

int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);

EPollPoller::updateChannel方法也和 PollPoller::updateChannel方法有所不同,这里Channel的index不再表示事件结构体的索引,而是只取三个值表示某个Channel为新增、已存在或已删除,具体值定义如下

const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;

EPollPoller::updateChannel方法的具体代码如下(略去assert)

void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  const int index = channel->index();
  LOG_TRACE << "fd = " << channel->fd()
    << " events = " << channel->events() << " index = " << index;
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      channels_[fd] = channel;
    }

    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;
    if (channel->isNoneEvent())
    {
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}

当Channel的index值为 kNewkDeleted时,表示为新注册或之前被删除的Channel,于是将Channel指针加入容器中,并设置其索引为 kAdded。反之,如果为 kAdded则说明需要更新已经存在的Channel参数。添加、删除或更改 epoll_event结构体是通过 epoll_ctl函数实现,这里用一个 update方法做了一个封装

void EPollPoller::update(int operation, Channel* channel)
{
  struct epoll_event event;
  memZero(&event, sizeof event);
  event.events = channel->events();
  event.data.ptr = channel;
  int fd = channel->fd();
  LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
    << " fd = " << fd << " event = { " << channel->eventsToString() << " }";
  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
  {
    if (operation == EPOLL_CTL_DEL)
    {
      LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
    }
    else
    {
      LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
    }
  }
}

至此,Poller类及其派生类的代码和IO多路复用的总体实现基本介绍完毕,可以研读muduo代码学习更细节的内容,本文篇幅所限就不再细究。

EventLoop任务管理

本节介绍一下muduo中EventLoop类的任务管理机制,这里的“任务”其实是一个笔者所总结的一个概念,具体来说就是一个封装了某个需要执行的函数及其参数的函数对象(即 std::function)。muduo的线程模型总体来说是“one loop per thread + thread pool”,即程序运行时每个线程有且仅有一个EventLoop对象,主线程通过一个线程池来维护这些拥有EventLoop对象的子线程(主线程也有一个EventLoop对象,具体细节会在后面的文章中介绍)。相应的,为了保证线程安全性,muduo中几乎所有涉及到操作线程内数据的函数都是通过该线程内EventLoop对象的接口调用,这样可以将不同线程间的操作统一到一个线程内从而实现线程安全(同时会增加一些内存拷贝的开销,但总的来说不会明显影响性能),实现上就用到了前文提到的EventLoop类中的任务队列。EventLoop支持普通任务和定时任务的添加,下面逐一介绍具体的实现细节。

普通任务

这里的“普通任务”指的是那些不带有定时信息,希望工作线程尽可能快地处理完毕的任务,这种任务一般通过调用 EventLoop::runInLoop方法来执行,具体的代码为

void EventLoop::runInLoop(Functor cb)
{
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(std::move(cb));
  }
}

Functor是函数对象类型,头文件中声明为

typedef std::function<void()> Functor;

可见EventLoop中所有任务都以无返回值且无参数的函数形式调用并执行,EventLoop::runInLoop方法首先判断调用线程是否为EventLoop所在线程,如果是则直接执行任务,否则调用 EventLoop::queueInLoop方法将其加入任务队列

void EventLoop::queueInLoop(Functor cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(std::move(cb));
  }

  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

如前所述,pendingFunctors_容器所有线程都可以操作,因此需要用互斥锁来保证线程安全。添加完任务后,函数又做了一个判断,如果不是EventLoop所在线程调用或正在执行任务队列中的函数,则调用 wakeup函数唤醒当前线程。EventLoop的唤醒机制是用Linux中的 eventfd来实现的,需要包含头文件 <sys/eventfd.h>,通过调用 eventfd函数创建,在muduo中为

int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);

这一句表示创建了一个 eventfd“文件”,并将其设置为非阻塞且随进程退出关闭,函数返回一个文件描述符。这个文件描述符同样对应一个Channel对象,并需要注册到Poller中监听其事件,声明为

int wakeupFd_;
std::unique_ptr<Channel> wakeupChannel_;

wakeupFd_的生命周期由EventLoop对象管理,EventLoop类在构造函数中创建该文件,并为其创建Channel对象以及注册事件回调

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_)),
    currentActiveChannel_(NULL)
{
  LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
  wakeupChannel_->setReadCallback(
      std::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}

EventLoop的构造函数除了初始化一些成员变量外,还将 EventLoop::handleRead函数设置为 wakeupChannel_的读事件回调,并设置监听可读事件(对于 wakeupFd_这个文件描述符而言,我们只需要监听其读事件即可)。EventLoop会在析构函数中关闭该文件,并移除 wakeupChannel_

EventLoop::~EventLoop()
{
  LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_
            << " destructs in thread " << CurrentThread::tid();
  wakeupChannel_->disableAll();
  wakeupChannel_->remove();
  ::close(wakeupFd_);
  t_loopInThisThread = NULL;
}

eventfd中写入8个字节的数据就可以使其产生可读事件,因此前面的 wakeup函数定义为

void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = sockets::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

处理可读事件的回调函数 EventLoop::handleRead也很简单,只需要将数据读出并做一些简单的判断即可(代码自己看吧,不贴出来了😂...... )。总结下来,eventfd类似一个通知机制,可以通过写入8个字节数据的方式通知阻塞在 Poller::poll函数上的线程,将其唤醒。介绍完这些,EventLoop::queueInLoop函数的逻辑就很好理解了,可以分下面两种情况讨论

  • 在EventLoop线程中调用:一种可能是在执行事件回调时(即调用Channel::handleEvent函数时),需要调用该函数添加任务,这时因为执行完事件回调后会调用doPendingFunctors函数执行任务队列中的函数(上一篇文章介绍过具体细节),因此添加的事件可以很快(这里的“很快”指的是线程在执行任务前不会被阻塞)得到执行,不需要唤醒。另一种可能是在执行pendingFunctors_中的任务时调用(即前面代码中callingPendingFunctors_变量为真),这时就需要唤醒,因为任务执行完毕后,当前线程会继续调用poll函数等待事件发生。设想这样一种情况,线程在执行任务时向pendingFunctors_中添加了新的任务,所有任务执行完后Poller所监听的所有文件描述符都没有事件发生,这样最差情况下就需要等待timeoutMs毫秒的时间(即线程阻塞在pollepoll_wait函数上直到超时),新添加的任务就不能很快得到执行,因此需要通过前面介绍的通知机制唤醒线程。
  • 在其他线程调用:其他线程向本线程添加任务时,由于不知道本线程的具体状态,因此就需要调用wakeup函数以确保本线程不会被阻塞。

定时任务

muduo支持通过定时器添加在特定时间执行的任务,定时器类定义在net文件夹下的Timer.h和Timer.cc文件中,它的实现比较简单,这里就不再详细介绍。下面,我们重点讨论定时器管理类TimerQueue的实现。

在介绍这个类的细节前,我们先看Linux中的 timerfd,muduo中利用这个定时器文件来实现事件循环对定时任务的响应。使用 timerfd需要包含头文件 <sys/timerfd.h>,并通过 timerfd_create创建一个 timerfd

int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);

第一个标志位表示将其设置成以绝对时间为准的单调递增定时器,第二个参数表示它为非阻塞且会随着进程退出而被关闭。使用时,可以利用 timerfd_settime设定一个从设定时间开始的等待时长,到达该时长后 timerfd就会产生可读事件,muduo中封装了一个 resetTimerfd函数来设置时长

void resetTimerfd(int timerfd, Timestamp expiration)
{
  // wake up loop by timerfd_settime()
  struct itimerspec newValue;
  struct itimerspec oldValue;
  memZero(&newValue, sizeof newValue);
  memZero(&oldValue, sizeof oldValue);
  newValue.it_value = howMuchTimeFromNow(expiration);
  int ret = ::timerfd_settime(timerfd, 0, &newValue, &oldValue);
  if (ret)
  {
    LOG_SYSERR << "timerfd_settime()";
  }
}

expiration参数表示一个时间点(应晚于当前时刻),该函数会计算当前时刻到这个时间点的时长,并设置 newValue结构体的值,调用 timerfd_settime函数设置时长。

TimerQueue类也是在构造函数中创建 timerfd,并创建对应的Channel以及注册回调函数

TimerQueue::TimerQueue(EventLoop* loop)
  : loop_(loop),
    timerfd_(createTimerfd()),
    timerfdChannel_(loop, timerfd_),
    timers_(),
    callingExpiredTimers_(false)
{
  timerfdChannel_.setReadCallback(
      std::bind(&TimerQueue::handleRead, this));
  // we are always reading the timerfd, we disarm it with timerfd_settime.
  timerfdChannel_.enableReading();
}

timerfdChannel_的生命周期是由TimerQueue类所控制的,它的可读事件回调函数为 TimerQueue::handleRead,并且被设置为只监听可读事件。外部通过 TimerQueue::addTimer方法添加新的定时器

TimerId TimerQueue::addTimer(TimerCallback cb,
                             Timestamp when,
                             double interval)
{
  Timer* timer = new Timer(std::move(cb), when, interval);
  loop_->runInLoop(
      std::bind(&TimerQueue::addTimerInLoop, this, timer));
  return TimerId(timer, timer->sequence());
}

void TimerQueue::addTimerInLoop(Timer* timer)
{
  loop_->assertInLoopThread();
  bool earliestChanged = insert(timer);

  if (earliestChanged)
  {
    resetTimerfd(timerfd_, timer->expiration());
  }
}

可以看到这里的代码逻辑和前文介绍的一致,TimerQueue::addTimer方法并不直接添加定时器,而是通过 EventLoop::runInLoop方法间接调用 TimerQueue::addTimerInLoop方法来添加定时器,代码中的 insert函数是TimerQueue的一个私有成员函数,在插入新的定时器时比较新定时器和定时器队列中最早失效的定时器时间先后,如果新定时器失效时间较早,那么就设置 earliestChanged变量为true,这表示最早失效时间改变,需要调用 resetTimerfd函数重新设置时长。

需要指出的是,TimerQueue有一个 timers_成员变量,这是一个 std::set容器,存储定时器的时间和指针对,定义为

typedef std::pair<Timestamp, Timer*> Entry;
typedef std::set<Entry> TimerList;
TimerList timers_;

TimerListEntry类型的集合容器,我们知道 std::set中的元素默认情况下是按照顺序从小到大排列的,在不指定比较器的情况下,先比较 Timestamp类型变量的大小,如果一致再比较 Timer*类型的变量大小,由此确定其大小顺序。这要求这两种类型都支持比较大小的运算符(一般情况下重载 <即可),muduo中的 Timestamp类重载了大小关系的所有比较运算符,时间较早者为小的一方。timers_容器在添加新的定时器时,会将定时器按时间前后顺序排列,如果时间戳相同则根据指针值大小排列。

现在我们来看下 timerfdChannel_中注册的可读事件回调,即 TimerQueue::handleRead

void TimerQueue::handleRead()
{
  loop_->assertInLoopThread();
  Timestamp now(Timestamp::now());
  readTimerfd(timerfd_, now);

  std::vector<Entry> expired = getExpired(now);

  callingExpiredTimers_ = true;
  cancelingTimers_.clear();
  // safe to callback outside critical section
  for (const Entry& it : expired)
  {
    it.second->run();
  }
  callingExpiredTimers_ = false;

  reset(expired, now);
}

通过静态方法 Timestamp::now()获得当前时刻,并传入 getExpired函数中获取所有早于该时刻或在该时刻失效的定时器,然后调用每个定时器的 run 方法来执行它的定时任务, TimerQueue::getExpired的定义为

std::vector<TimerQueue::Entry> TimerQueue::getExpired(Timestamp now)
{
  assert(timers_.size() == activeTimers_.size());
  std::vector<Entry> expired;
  Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX));
  TimerList::iterator end = timers_.lower_bound(sentry);
  assert(end == timers_.end() || now < end->first);
  std::copy(timers_.begin(), end, back_inserter(expired));
  timers_.erase(timers_.begin(), end);

  for (const Entry& it : expired)
  {
    ActiveTimer timer(it.second, it.second->sequence());
    size_t n = activeTimers_.erase(timer);
    assert(n == 1); (void)n;
  }

  assert(timers_.size() == activeTimers_.size());
  return expired;
}

这个函数的整体逻辑并不难理解,重点在于理解下面这两句

Entry sentry(now, reinterpret_cast<Timer*>(UINTPTR_MAX)); 
TimerList::iterator end = timers_.lower_bound(sentry);

一般的编译器实现中,指针类型的值是一个无符号整型变量,32位操作系统上为4个字节,64位系统上为8个字节,UINTPTR_MAX宏表示指针的最大值。前文提到过,同样失效时间的定时器会通过比较指针值的大小确定大小顺序。在获得所有失效定时器时应当考虑到会有失效时刻恰好等于当前时刻 now的定时器,显然这个定时器是符合条件的,为了确保它能被取出,这里新建了一个临时变量 sentry,它的第二个值大小为 UINTPTR_MAX,并用 reinterpret_cast将其强制转化为 Timer*类型,这个变量本身是没有物理意义的,也不能使用,但所有时间戳等于 now的定时器必然都是“小于”它的,利用这个变量和 std::setlower_bound方法就可以将这些定时器选中。

前面实际上分析了TimerQueue类中的关键实现思想,了解这些后,TimerQueue的其他接口很容易就能看明白,这里就略去分析。现在回过头看如何向EventLoop对象添加定时任务,EventLoop类提供了 EventLoop::runAtEventLoop::runAfterEventLoop::runEvery三个接口,分别表示添加特定时刻执行的任务、从现在开始经过特定时长后执行的任务以及按照特定时长周期性执行的任务,代码如下

TimerId EventLoop::runAt(Timestamp time, TimerCallback cb)
{
  return timerQueue_->addTimer(std::move(cb), time, 0.0);
}

TimerId EventLoop::runAfter(double delay, TimerCallback cb)
{
  Timestamp time(addTime(Timestamp::now(), delay));
  return runAt(time, std::move(cb));
}

TimerId EventLoop::runEvery(double interval, TimerCallback cb)
{
  Timestamp time(addTime(Timestamp::now(), interval));
  return timerQueue_->addTimer(std::move(cb), time, interval);
}

这里需要解释的是,EventLoop::runEvery函数是通过设置 Timer对象的 interval变量(结合前文的 TimerQueue::addTimer代码理解,这个变量表示执行周期)来实现周期性执行任务的。TimerQueue类在处理完某个时刻到期的所有定时任务以后,会检查每个定时器是否设置了 interval变量,如果有就重启该定时器,具体可查看 TimerQueue::reset方法。

最后总结一下定时任务的实现逻辑:通过EventLoop类提供的接口注册定时器,定时器由TimerQueue对象管理,某个时刻队列中可能存在多个定时器,以这些定时器中最早过期定时器的过期时长设置 timerfd的等待时长。达到设定的时刻后,timerfd会产生可读事件,EventLoop会调用 timerfdChannel_中注册的读事件回调函数处理所有到期的定时任务。

总结

这篇文章主要介绍了muduo中的IO多路复用机制实现和任务管理机制,相比上一篇文章多了很多细节上的内容。本文所介绍的机制是理解muduo架构的关键,也是muduo中reactor模型的基础,后面文章将介绍的其他muduo类的实现也都与这些机制密切相关。


标题:C++高性能服务端编程——muduo网络库源码阅读(二)事件响应与任务管理
作者:coollwd
地址:http://coollwd.top/articles/2020/10/13/1603440103012.html

Everything that kills me makes me feel alive

取消