飞道的博客

Redis series: everything you need to know about key expiration, final part (2)



Preface

Following the previous article, which covered the expiration-related commands and how expired keys are stored, this article can be considered the final installment on expired keys: it focuses on the management strategies for expired keys.

Strategies for cleaning up expired keys

Expired-key cleanup falls into two broad categories:
1. Passive cleanup
2. Active cleanup

Passive cleanup

Redis is fundamentally a key/value system, so every command involves some key, and every key access can trigger passive cleanup of expired keys: if the accessed key turns out to have expired, the expiration flow runs for that key.

Let's use the GET command as a concrete example. After entering the GET code path, the following function is called:

/* The return value of the function is 0 if the key is still valid,
 * otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
    /* Check whether the key has expired. */
    if (!keyIsExpired(db,key)) return 0;

    /* If we are running in the context of a slave, instead of
     * evicting the expired key from the database, we return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    /* If we are not the master, return immediately; the client still
     * gets a null reply, as explained one level up the call chain. */
    if (server.masterhost != NULL) return 1;

    /* Delete the key */
    /* Count the expired key in the server stats. */
    server.stat_expiredkeys++;
    /* Propagate the expiration to the AOF and to the replicas. */
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    /* Publish an "expired" event; this feeds keyspace notifications
     * and things like transaction WATCH invalidation. */
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    /* Delete synchronously or asynchronously depending on config. */
    int retval = server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                               dbSyncDelete(db,key);
    if (retval) signalModifiedKey(NULL,db,key);
    return retval;
}

The entry point for GET can be found in t_string.c. The main call chain is:
getCommand->getGenericCommand->lookupKeyReadOrReply->
lookupKeyRead->lookupKeyReadWithFlags->expireIfNeeded

So the business flow is:
check whether the key has expired -> if expired, delete it (this deletion only happens on the master) -> return a null reply to the client, and record the lookup in Redis's hit-rate statistics.

**Summary:** clearly, passive cleanup alone falls far short of reclaiming memory promptly. Expired keys could pile up until memory fills and the LRU or LFU eviction policy has to kick in, which can cause a large pause in the system. A better approach is to clean up unneeded memory during idle time, which is exactly what the active cleanup strategy below does.

Active cleanup flow

Before walking through the active cleanup flow, look at this comment from the Redis source (the header comment of serverCron):

/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */

There is a key piece of information here: the active deletion strategy runs inside the main loop.
The main loop handles all of the data processing; when we say Redis is single-threaded, we mean exactly this loop, which is why data operations currently never conflict with one another.

Since active deletion runs in the main loop, the strategy has to meet a few requirements:
1. It must not block the main loop.
2. It should run when the server is otherwise idle, as far as possible.
3. It should adapt to the situation; for example, when expired keys occupy a large share of memory, we want it to reclaim more.

With these three requirements in mind, let's look at how the Redis author designed it.

The active deletion strategy itself has two modes:

  1. Slow cycle
  2. Fast cycle

Slow cycle:
The slow cycle runs at a low frequency, but each run lasts longer.

Many articles frame this in terms of CPU usage. In a single-threaded process, time is CPU usage: in the extreme case the slow cycle runs for at most 250 ms per second, i.e. it consumes at most 25% of the main loop's CPU.

Fast cycle:
The fast cycle runs more frequently, but each run is shorter.

The fast cycle is triggered from beforeSleep, but not unconditionally: it runs only when specific conditions are met, explained in detail in the code below.

/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * Every expire cycle tests multiple databases: the next call will start
 * again from the next db, with the exception of exists for time limit: in that
 * case we restart again from the last database we were processing. Anyway
 * no more than CRON_DBS_PER_CALL databases are tested at every iteration.
 *
 * The function can perform more or less work, depending on the "type"
 * argument. It can execute a "fast cycle" or a "slow cycle". The slow
 * cycle is the main way we collect expired cycles: this happens with
 * the "server.hz" frequency (usually 10 hertz).
 *
 * However the slow cycle can exit for timeout, since it used too much time.
 * For this reason the function is also invoked to perform a fast cycle
 * at every event loop cycle, in the beforeSleep() function. The fast cycle
 * will try to perform less work, but will do it much more often.
 *
 * The following are the details of the two expire cycles and their stop
 * conditions:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 * The cycle will also refuse to run at all if the latest slow cycle did not
 * terminate because of a time limit condition.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. In the
 * fast cycle, the check of every database is interrupted once the number
 * of already expired keys in the database is estimated to be lower than
 * a given percentage, in order to avoid doing too much work to gain too
 * little memory.
 *
 * The configured expire "effort" will modify the baseline parameters in
 * order to do more work in both the fast and slow expire cycles.
 */

#define ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP 20 /* Keys for each DB loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds. */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* Max % of CPU to use. */
#define ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE 10 /* % of stale keys after which
                                                   we do extra efforts. */
/**
 * Periodically evict expired keys from the per-db expires dictionaries to
 * free memory. To keep each pass efficient, the loop bails out when the
 * ratio of expired keys is low; when many keys expire at once, the pass is
 * still bounded in time and CPU usage.
 * @param type ACTIVE_EXPIRE_CYCLE_SLOW or ACTIVE_EXPIRE_CYCLE_FAST
 */
void activeExpireCycle(int type) {
    /* Adjust the running parameters according to the configured expire
     * effort. The default effort is 1, and the maximum configurable effort
     * is 10. */
    unsigned long
    effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
    /* How many keys to sample from each dictionary per loop. */
    config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
                           ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
    /* Fast-cycle duration: 1000 microseconds by default, scaled by the
     * effort setting up to at most 3250 microseconds. */
    config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
                                 ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
    /* Slow-cycle CPU share: at most 43% with maximum effort. */
    config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
                                  2*effort,

    config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
                                    effort;

    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    /* These statics keep state across invocations. */
    static unsigned int current_db = 0; /* Last DB tested. */
    /* Whether the previous call exited because of the time limit. */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = CRON_DBS_PER_CALL;
    /* Start time, time limit and elapsed time of this pass. */
    long long start = ustime(), timelimit, elapsed;

    /* When clients are paused the dataset should be static not just from the
     * POV of clients not being able to write, but also from the POV of
     * expires and evictions of keys not being performed. */
    /* If all clients are paused, don't expire anything. */
    if (clientsArePaused()) return;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exit
         * for time limit, unless the percentage of estimated stale keys is
         * too high. Also never repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        /* Run a fast cycle only if the previous cycle hit its time limit,
         * or the estimated stale-key ratio is above the threshold. */
        if (!timelimit_exit &&
            server.stat_expired_stale_perc < config_cycle_acceptable_stale)
            return;
        /* Also wait at least two fast-cycle durations (2 ms by default)
         * since the last fast cycle before running another one. */
        if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
            return;

        last_fast_cycle = start;
    }

    /* We usually should test CRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    /* If we hit the time limit last time, scan all of the DBs this time. */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
     * time per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. */
    /* Upper bound in microseconds for this call. With the default
     * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC of 25 (i.e. 25% of CPU time) and
     * hz = 10, the budget is 250 ms per second split across 10 calls,
     * so each call may take at most 25 ms. */
    timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = config_cycle_fast_duration; /* in microseconds. */

    /* Accumulate some global stats as we expire keys, to have some idea
     * about the number of keys that are already logically expired, but still
     * existing inside the database. */
    long total_sampled = 0;
    long total_expired = 0;

    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        /* Expired and checked in a single loop. */
        unsigned long expired, sampled;

        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        /* If this call runs out of time, the next one starts from the next
         * db, so every db gets a share of the cleanup, not just the first. */
        current_db++;

        /* Continue to expire if at the end of the cycle there are still
         * a big percentage of keys to expire, compared to the number of keys
         * we scanned. The percentage, stored in config_cycle_acceptable_stale
         * is not fixed, but depends on the Redis configured "expire effort". */
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;
            iteration++;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots, sampling the key
             * space is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. */
            /* If fewer than 1% of the slots are in use, sampling is not
             * worth the cost, so skip this db for now. */
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            sampled = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > config_keys_per_loop)
                num = config_keys_per_loop;

            /* Here we access the low level representation of the hash table
             * for speed concerns: this makes this code coupled with dict.c,
             * but it hardly changed in ten years.
             *
             * Note that certain places of the hash table may be empty,
             * so we want also a stop condition about the number of
             * buckets that we scanned. However scanning for free buckets
             * is very fast: we are in the cache line scanning a sequential
             * array of NULL pointers, so we can scan a lot more buckets
             * than keys in the same time. */
            /* Cap the number of buckets visited so we don't spend the whole
             * budget walking empty buckets. */
            long max_buckets = num*20;
            long checked_buckets = 0;

            while (sampled < num && checked_buckets < max_buckets) {
                for (int table = 0; table < 2; table++) {
                    /* Only look at the second table while rehashing. */
                    if (table == 1 && !dictIsRehashing(db->expires)) break;
                    /* Per-db cursor remembering where the last scan stopped. */
                    unsigned long idx = db->expires_cursor;
                    /* Equivalent to a modulo over the table size. */
                    idx &= db->expires->ht[table].sizemask;
                    dictEntry *de = db->expires->ht[table].table[idx];
                    long long ttl;

                    /* Scan the current bucket of the current table. */
                    checked_buckets++;
                    /* Walk the chain of entries in this bucket of the
                     * expires dictionary. */
                    while(de) {
                        /* Get the next entry now since this entry may get
                         * deleted. */
                        dictEntry *e = de;
                        de = de->next;

                        ttl = dictGetSignedIntegerVal(e)-now;
                        /* If expired, delete it and count it. */
                        if (activeExpireCycleTryExpire(db,e,now)) expired++;
                        /* If not expired, accumulate the remaining TTL
                         * for the average-TTL statistics. */
                        if (ttl > 0) {
                            /* We want the average TTL of keys yet
                             * not expired. */
                            ttl_sum += ttl;
                            ttl_samples++;
                        }
                        sampled++;
                    }
                }
                /* Advance the cursor. */
                db->expires_cursor++;
            }
            /* Accumulate totals across all dbs visited in this call. */
            total_expired += expired;
            total_sampled += sampled;

            /* Update the average TTL stats for this database. */
            if (ttl_samples) {
                /* Average remaining TTL of the sampled keys. */
                long long avg_ttl = ttl_sum/ttl_samples;

                /* Do a simple running average with a few samples.
                 * We just use the current estimate with a weight of 2%
                 * and the previous estimate with a weight of 98%. */
                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
            }

            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of milliseconds return to the
             * caller waiting for the other active expire cycle. */
            /* Check the time limit once every 16 iterations. */
            if ((iteration & 0xf) == 0) {
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            /* We don't repeat the cycle for the current database if there are
             * an acceptable amount of stale keys (logically expired but yet
             * not reclaimed). */
        } /* Keep looping on this db while the expired ratio stays above
           * config_cycle_acceptable_stale (10% by default); once it drops
           * below that, move on to the next db. */
        while (sampled == 0 ||
                 (expired*100/sampled) > config_cycle_acceptable_stale);
    }

    elapsed = ustime()-start;
    server.stat_expire_cycle_time_used += elapsed;
    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

    /* Update our estimate of keys existing but yet to be expired.
     * Running average with this sample accounting for 5%. */
    double current_perc;
    if (total_sampled) {
        current_perc = (double)total_expired/total_sampled;
    } else
        current_perc = 0;
    /* The stale-key percentage is a running average (5% new sample, 95%
     * history), so the value you see in monitoring is a smoothed estimate,
     * not an instantaneous ratio. */
    server.stat_expired_stale_perc = (current_perc*0.05)+
                                     (server.stat_expired_stale_perc*0.95);
}

The overall flow is shown in the diagram below:

A few points are worth noting while reading the code:

  1. The fast cycle, as mentioned above, is triggered from beforeSleep. So where does beforeSleep itself get triggered?

beforeSleep is invoked from the main event loop, and can also be invoked while processing RDB/AOF work, but note one thing: expired keys are only handled from the main loop (presumably for data-consistency reasons).

  2. The fast cycle runs only if the previous cycle exited because of its time limit (or the estimated stale-key ratio is too high) and at least two fast-cycle durations have passed since the last fast cycle; by default one fast cycle lasts 1000 microseconds.
  3. The author's code is also quite clever in that each run moves on to a different db: even if one db still has a high ratio of expired keys, the next run moves past it, so every db in Redis gets a chance to reclaim memory.
  4. The active-expire-effort setting tunes how long each expire cycle may run and lowers the acceptable stale-key ratio: the larger the effort, the more time is spent on expired keys. See the code comments above for the exact numbers.

Summary:

All the code above is from Redis 6.0.7, the latest release at the time of writing. The main data-processing flow is still single-threaded, and throughout the code the author carefully budgets time, including the time spent per expire cycle, balancing latency against cleanup efficiency; the code above reflects this thoroughly. But the future is multi-core. My personal guess is that Redis will eventually break away from this strictly conflict-free model and become truly multi-threaded for data processing. With the current structure that would mean a major architectural change: once single-threaded data handling becomes multi-threaded and operations no longer block one another, the design concern shifts from bounding execution time per unit of time to minimizing contention between these operations.


Reposted from: https://blog.csdn.net/qq_33361976/article/details/108685615