Redis定时任务

软件发布|下载排行|最新软件

当前位置:首页IT学院IT技术

Redis定时任务

心城以北   2022-05-23 我要评论

本文主要是基于 redis 6.2 源码进行分析定时事件的数据结构和常见操作。

数据结构

在 redis 中通过 aeTimeEvent 结构来创建定时任务事件,代码如下:

/* Time event structure */
typedef struct aeTimeEvent {
    // 标识符
    long long id; /* time event identifier. */
    // 定时纳秒数
    monotime when;
    // 定时回调函数
    aeTimeProc *timeProc;
    // 注销定时器时候的回调函数
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
             * freed in recursive time event calls. */
} aeTimeEvent;

常见操作

1. 创建定时事件

redis 中最重要的定时函数且是周期执行的函数,使用的是 serverCron 函数。在 redis 中由于定时任务比较少,因此并没有严格的按照过期时间来排序的,而是按照 id自增 + 头插法 来保证基本有序。

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
  serverPanic("Can't create event loop timers.");
  exit(1);
}

//创建定时器对象
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    te->when = getMonotonicUs() + milliseconds * 1000;
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    // 头插法 
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

2. 触发定时事件

redis 中是采用 IO 复用来进行定时任务的。

查找距离现在最近的定时事件,见 usUntilEarliestTimer

​
/* How many microseconds until the first timer should fire.
 * If there are no timers, -1 is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
 */
static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    if (te == NULL) return -1;

    aeTimeEvent *earliest = NULL;
    while (te) {
        if (!earliest || te->when < earliest->when)
            earliest = te;
        te = te->next;
    }

    monotime now = getMonotonicUs();
    return (now >= earliest->when) ? 0 : earliest->when - now;
}

​

这里时间复杂度可能比较高,实际中需要结合具体场景使用。

更新剩余过期时间,想想为啥呢?因为我们前面提到过,IO 复用有可能因为 IO 事件返回,所以需要更新。

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
  usUntilTimer = usUntilEarliestTimer(eventLoop);

if (usUntilTimer >= 0) {
  tv.tv_sec = usUntilTimer / 1000000;
  tv.tv_usec = usUntilTimer % 1000000;
  tvp = &tv;
} else {
  if (flags & AE_DONT_WAIT) {
    // 不等待
    tv.tv_sec = tv.tv_usec = 0;
    tvp = &tv;
  } else {
    /* Otherwise we can block */
    tvp = NULL; /* wait forever */
  }
}

3. 执行定时事件

一次性的执行完直接删除,周期性的执行完在重新添加到链表。

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
  int processed = 0;
  aeTimeEvent *te;
  long long maxId;

  te = eventLoop->timeEventHead;
  maxId = eventLoop->timeEventNextId-1;
  monotime now = getMonotonicUs();
  
  // 删除定时器
  while(te) {
    long long id;
        
    // 下一轮中对事件进行删除
    /* Remove events scheduled for deletion. */
    if (te->id == AE_DELETED_EVENT_ID) {
      aeTimeEvent *next = te->next;
      /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
      if (te->refcount) {
        te = next;
        continue;
      }
      if (te->prev)
        te->prev->next = te->next;
      else
        eventLoop->timeEventHead = te->next;
      if (te->next)
        te->next->prev = te->prev;
      if (te->finalizerProc) {
        te->finalizerProc(eventLoop, te->clientData);
        now = getMonotonicUs();
      }
      zfree(te);
      te = next;
      continue;
    }
    
    if (te->id > maxId) {
      te = te->next;
      continue;
    }

    if (te->when <= now) {
      int retval;

      id = te->id;
      te->refcount++;
      // timeProc 函数返回值 retVal 为时间事件执行的间隔
      retval = te->timeProc(eventLoop, id, te->clientData);
      te->refcount--;
      processed++;
      now = getMonotonicUs();
      if (retval != AE_NOMORE) {
        te->when = now + retval * 1000;
      } else {
        // 如果超时了,那么标记为删除
        te->id = AE_DELETED_EVENT_ID;
      }
    }
    // 执行下一个
    te = te->next;
  }
  return processed;
}

总结

优点:实现简单
缺点:如果定时任务很多,效率比较低。

Copyright 2022 版权所有 软件发布 访问手机版

声明:所有软件和文章来自软件开发商或者作者 如有异议 请与本站联系 联系我们