关于Redis的一些思考和总结

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.connection.jedis.JedisConnection;import org.springframework.stereotype.Service;import org.springframework.util.ReflectionUtils;import redis.clients.jedis.Jedis;import java.lang.reflect.Field;import java.util.Collections;@Servicepublic class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisConnectionFactory connectionFactory; /** * 尝试获取分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public boolean lock(String lockKey, String requestId, int expireTime) { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public boolean releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = getJedis().eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } public Jedis getJedis() { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); return jedis; }}

3、数据结构的选择


这里要说的数据结构不是redis对外支持的那几种,如果想了解可以去读redis的文档,这里关注下redis的内部实现数据结构。

redis对外支持string、list、set、hash、zset5种数据结构(最新版支持geo等数据结构,但不在讨论范围),内部实现以下数据结构来支撑:

  • 整数:REDIS_ENCODING_INT
  • 字符串:REDIS_ENCODING_RAW
  • 双端链表:REDIS_ENCODING_LINKEDLIST
  • 跳跃表:REDIS_ENCODING_SKIPLIST
  • 压缩列表:REDIS_ENCODING_ZIPLIST
  • 字典:REDIS_ENCODING_HT
  • 整数集合:REDIS_ENCODING_INTSET

内部数据结构和外部数据结构的对应关系如下:

图片 1

数据类型映射

最终版本:

1、Redis的线程结构


Redis在设计上最大的亮点是其单线程结构,并且还能提供极其强大的并发处理能力和丰富的数据结构,这点让我很激动也很是困惑的。

激动的是redis强大的并发处理能力,以及其丰富的api接口,让日常的业务需要可以更爽的完成。更让我惊叹的是单线程的设计导致redis的代码非常的小巧,整个源码大约5w行,而且不需要处理多线程引入的并发问题,整个代码理解起来也很顺畅。

困惑的是单线程的设计结构为什么能支持这么大的并发量,这一点和我们常规处理大并发的习惯性思维不同。一般在面对大并发的请求,首先想到的是用多个线程来处理,io线程和业务线程分开,业务线程使用线程池来避免频繁创建和销毁线程,即便是一次请求阻塞了也不会影响到其他请求。为什么redis会选择反其道而行之,这么做是否会局限redis的使用,在使用redis时有没有特别需要注意的点?

第二版本:

2.2 定时器?

Redis的文档中多处提到定时处理的逻辑,如过期key的定期清理,aof定时写文件,但是如何在单线程中实现一个定时器呢?

在Redis中所有的IO事件都会被封装成redisServer.aeEventLoop,在Redis的启动函数中,会进行aeEventLoop事件的定时处理回调(serverCron)的注册:

if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}

定时事件的注册过程如下:

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
    aeTimeProc *proc, void *clientData,
    aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;

    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->next = eventLoop->timeEventHead;
    eventLoop->timeEventHead = te;
    return id;
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 是定时处理逻辑的回调,这里会处理过期key的清理、统计信息更新、对不合理的数据库进行大小调整、关闭和清理连接生效的客户端、尝试进行AOF或RDB持久化操作等。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {

    //...

    clientsCron();

    /* Handle background operations on Redis databases. */
    databasesCron();

    rewriteAppendOnlyFileBackground();

    backgroundSaveDoneHandler(exitcode,bysignal);

    freeClientsInAsyncFreeQueue();

    clientsArePaused();

    /* Replication cron function -- used to reconnect to master,
    * detect transfer failures, start background RDB transfers and so forth. */
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    run_with_period(1000) {migrateCloseTimedoutSockets();}

    //...

    return 1000/server.hz;
}

定时回调serverCron具体的处理业务后面再研究,先看看serverCron什么时候会被触发调用。
在Redis事件循环中aeProcessEvents会调用processTimeEvents,从名字上看出是处理定时事件。

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);

    /**
    * 如果系统时间被调整到将来某段时间然后又被设置回正确的时间,
    * 这种情况下链表中的timeEvent有可能会被随机的延迟执行,因
    * 此在这个情况下把所有的timeEvent的触发时间设置为0表示及执行
    */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }

    eventLoop->lastTime = now; // 设置上次运行时间为now

    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        /**
        * 删除已经被标志位 删除 的时间事件
        */
        if (te->id == AE_DELETED_EVENT_ID) {
        aeTimeEvent *next = te->next;
        if (prev == NULL)
            eventLoop->timeEventHead = te->next;
        else
            prev->next = te->next;
        if (te->finalizerProc)
            // 在时间事件节点被删除前调用finlizerProce()方法
            te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        if (te->id > maxId) {
            /**
            * te->id > maxId 表示当前te指向的timeEvent为当前循环中新添加的,
            * 对于新添加的节点在本次循环中不作处理。
            * PS:为什么会出现这种情况呢?有可能是在timeProc()里面会注册新的timeEvent节点?
            * 对于当前的Redis版本中不会出现te->id > maxId这种情况
            */
            te = te->next;
            continue;
        }

        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;

            // 如果当前时间已经超过了对应的timeEvent节点设置的触发时间,
            // 则调用timeProc()方法执行对应的任务,即serverCron
            retval = te->timeProc(eventLoop, id, te->clientData);

            processed++;
            if (retval != AE_NOMORE) {
                // 要执行多次,则计算下次执行时间
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                // 如果只需要执行一次,则把id设置为-1,再下次循环中删除
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        prev = te;
        te = te->next;
    }
    return processed;
}

processTimeEvents会对定时事件进行时间判断,如果到了设置的触发时间则会调用注册的定时回调函数
serverCron。

这里需要注意te->timeProc,即serverCron的返回值,从之前serverCron分析来看,其返回值为1000/server.hz。server.hz是Redis server执行后台任务的频率,默认为10,此值越大表示redis对定时任务的执行次数越频繁,如定期清理过期key。aeAddMillisecondsToNow会根据serverCron的返回值来计算下次定时任务触发的unix时间。

定时器的后果 —— 阻塞

至此已经很清楚,Redis中的定时业务的处理是放在主线程之中,在主线程处理完一次请求之后,接着计算是否到了业务的定时周期,如果到了则处理定时业务。

但是这会加大主线程处理请求的延时,如果在定时回调中塞入过多的处理逻辑或者某一次处理耗时严重,如由于磁盘压力导致aof写文件耗时增加,那么就会阻塞整个主线程的处理。

Redis在主线程中塞入定时处理的业务逻辑,避免再引入一个单独的定时线程,简化了代码,但是也带来阻塞主线程业务处理的风险,因此在定时回调中处理相关定时业务逻辑时需要十分小心,密切注意处理耗时和对cpu的使用。

记录一下走过的弯路.

  • Redis的线程结构,单线程结构、异步化组件、最佳线程数、性能瓶颈、阻塞场景
  • 过期和淘汰策略,令人困惑的expire、单线程中的定时器、淘汰策略和时机
  • 数据结构的选择,内部数据结构实现、hash/B+/LSM的特点和场景对比
  • 高可用和集群部署的方式

第一版本:

4.3 codis/kvStrore

codis是豌豆荚开发的分布式redis集群解决方案,kvStrore是阿里云分布式redis产品,他们不同于redis cluster的无中心化方案,在服务端设置了一层proxy,在Proxy上实现数据分布策略,数据分布策略对客户端透明。

codis官方说明中对其架构有详细的说明,这里对其架构做一个简要的描述。

codis集群架构如下

图片 2

codis集群架构

数据分片

  • Codis 采用 Pre-sharding 的技术来实现数据的分片, 默认分成 1024 个 slots (0-1023), 对于每个key来说, 通过以下公式确定所属的 Slot Id : SlotId = crc32(key) % 1024。

    每一个 slot 都会有一个且必须有一个特定的 server group id 来表示这个 slot 的数据由哪个 server group 来提供。数据的迁移也是以slot为单位的。

高可用

  • proxy层高可用,codis-proxy是无状态的,每个proxy和所有redis实例相连,多个proxy在zk上注册,proxy的可用性由zk进行监控。客户端可以通过轮询选择一个可用的proxy,也可以实现负载均衡。

  • redis实例高可用,codis-proxy通过订阅redis-sentinel来感知redis实例的可用心。当一个group的master挂掉的时候,codis-proxy会检测到但不会自动将一个slave提升为master,而是报警出来,需要管理员通过codis-dashboard开放的RESTful API手动提升。

横向扩展

  • Codis支持水平扩容/缩容,扩容可以直接界面的 "Auto Rebalance" 按钮,缩容只需要将要下线的实例拥有的slot迁移到其它实例,然后在界面上删除下线的group即可。

采用redis的 increament操作完成锁的抢占.但是释放锁时,是每个线程都可以删除redis中的key值. 并且initLock会降上一次的操作给覆盖掉,所以也废弃掉此方法

1.4 瓶颈 —— cpu or 网络?

对cpu会成为Redis的性能瓶颈的担忧是可以理解的,但是在实际使用过程中cpu或许不是制约Redis的关键因素,网络io可能才是最大的瓶颈。

看一组Redis的性能测试数据对比,Redis版本2.8.19,测试环境如下:

  • KVStore(阿里云Redis):E5-2682 2.5GHz 1G内存 1G网卡
  • Redis on ECS SSD: E5-2682 2.5GHz 1G内存 1G网
  • Redis on ECS 硬盘: E5-2682 2.5GHz 1G内存 1G网

图片 3

set接口测试

图片 4

get接口测试

从对比的测试结果来看,Redis的性能还是很强悍的,单机qps最低也在5W以上,但是相同条件下KVStore比Redis on ECS要高出一倍。

分析对比结果,Redis on ECS SSD和Redis on ECS 硬盘在性能上差不多,说明磁盘并不是Redis的性能瓶颈。而Redis on ECS之所以低于KVStore主要是受限于ECS的网络io性能,并没有跑满cpu,导致并发处理上不去。

Redis性能分析官方文档中,也对影响Redis性能的因素进行了分析和对比测试,大部分情况下网络依然是制约其性能的首要因素。

但是毕竟Redis的单线程模型对多核cpu没有完全利用,如果有这样的担心,那么在网络io没有成为瓶颈时,可以在一台机器上多部署几个Redis的实例,充分利用cpu和网卡的能力。

/** * 分布式锁 * @param range 锁的长度 允许有多少个请求抢占资源 * @param key * @return */ public boolean getLock(int range, String key) { ValueOperationsString, Integer valueOper1 = template.opsForValue(); return valueOper1.increment(key, 1) = range; } /** * 初始化锁, 设置等于0 * @param key * @param expireSeconds * @return */ public void initLock(String key, Long expireSeconds) { ValueOperationsString, Integer operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); operations.set(key, 0, expireSeconds * 1000); } /** * 释放锁 * @param key */ public void releaseLock(String key) { ValueOperationsString, Integer operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.delete(key); }

4、集群部署


在大型互联网应用中,单机部署redis往往是不够的,单机redis在处理能力、内存容量、系统稳定上都不足以满足业务的需求,因此redis的集群部署是必不可少的。

Redis的集群部署方式平时业务开发过程中实际接触的比较少,在需要使用的时候会有专门的pe维护Redis集群。但是了解redis的高可用和集群部署方案还是很有必要的,这里对redis sentinel、redis cluster、kvstore、codis、tair的集群部署方式做一个分析对比。

采用set 和 del 完成锁的占用与释放,后经测试得知,set不是线程安全,在并发情况下常常会导致数据不一致.

1.2 异步化组件

但是除了客户端读写请求之外还有一些比较耗时的操作,如持久化RDB文件,持久化AOF文件等等,这些操作不能放在主线程里面处理,因此Redis会在适当的时候fork子进程来异步的处理这种任务。除了这些,Redis还有一组异步任务处理线程,用于处理不需要主线程同步处理的工作,总体上Redis的线程体系结构大致如下图:

图片 5

Redis线程结构

上图中间蓝色的部分代表主线程,最左边虚线代表通过fork得到的子进程,用来处理RDB持久化以及AOF持久化等任务,最右边橙色部分代表一组异步任务处理线程,下面会详细介绍这组异步任务处理线程,即Redis异步化组件——BIO组件。

在Redis中,异步任务处理线程组被封装在BIO组件中,源文件为bio.h和bio.c。bio异步线程启动时在main方法调用,会生成BIO_NUM_OPS(3)个线程,线程函数为bioProcessBackgroundJobs。

void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);

    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            exit(1);
        }

        bio_threads[j] = thread;
    }
}

bioProcessBackgroundJobs的处理过程如下:

void *bioProcessBackgroundJobs(void *arg) {

    pthread_mutex_lock(&bio_mutex[type]);

    //...

    while(1) {
        listNode *ln;

        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;

        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }

        zfree(job);

        //...
        pthread_mutex_lock(&bio_mutex[type]);
    }
}

BIO线程目前包括三个线程,处理三种类型的任务:

  • 文件句柄关闭任务

    文件句柄的释放(close)对于操作系统来说是一个比较重的操作,在Redis中,当需要重新创建新的文件句柄,废弃的文件句柄失效的时候,这个废弃的文件句柄将由异步任务处理线程来关闭。

  • AOF持久化任务,Redis对于AOF文件的持久化有三种策略:

    • 关闭AOF功能
    • aof_fsync_everysec策略,即每秒一次,实际上并不是一定一秒钟一次
    • aof_fsync_always策略,即每次IO事件处理完毕,都将AOF持久化

    这三种策略分别对应不同的业务场景和用户需求,默认的策略为aof_fsync_everysec,这个时候对于aof缓冲区内容持久化工作会交给异步任务处理线程来处理。

  • 内存的释放也是比较重的操作,这部分工作可以交给异步任务处理线程来处理,Redis中通过一部任务释放的空间主要包括三种:

    • 对象空间的释放,当当前内存够用的时候,主线程不会同步释放废弃对象的内存,交给异步任务处理线程来释放,当然需要配合lazyfree_lazy_server_del参数使用。
    • DB空间的异步释放,当需要删除DB的时候,Redis会申请一个新的哈希表作为新的DB,将DB内存的释放的工作交给异步任务处理线程来处理。
    • slots-leys空间释放,在Redis Cluster模式下,slots-keys是由Redis实现的跳跃表数据结构支撑的,当Redis Cluster需要刷新slots-keys的时候,首先创建一个新的跳跃表结构作为新的slots-keys,然后将老的slots-keys结构释放的工作交给异步任务处理线程来处理。

近期工作遇到需要业务场景如下,需要每天定时推送给另一系统一批数据,但是由于系统是集群部署的,会造成统一情况下任务争用的情况,所以需要增加分布式锁来保证一定时间范围内有一个Job来完成定时任务. 前期考虑的方案有采用ZooKeeper分布式任务,Quartz分布式任务调度,但是由于Zookeeper需要增加额外组件,Quartz需要增加表,并且项目中现在已经有Redis这一组件存在,所以考虑采用Redis分布式锁的情况来完成分布式任务抢占这一功能

主要有以下内容:

@Overridepublic T Long set(String key,T value, Long cacheSeconds) {if (value instanceof HashMap) {BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);valueOperations.putAll((Map) value);valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);}else{//使用map存储BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);valueOperations.put(key, value);//秒valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);}return null;}@Overridepublic void del(String key) {redisTemplate.delete(key);}

2.1 令人困惑的expire

Redis文档中对于过期key的处理方式的描述有两种:被动和主动方式。

当一些客户端尝试访问它时,key会被发现并主动的过期。
但是这是不够的,因为有些过期的keys,永远不会访问他们。 无论如何,这些keys应该过期,所以定时随机测试设置keys的过期时间,并将过期的keys进行删除。
具体就是Redis每秒10次做的事情:

  • 测试随机的20个keys进行相关过期检测。
  • 删除所有已经过期的keys。
  • 如果有多于25%的keys过期,重复步奏1。

但是Redis的主线程是单线程,并没有一个专门的线程来负责定时对过期数据进行清理,Redis如何具体完成过期key的查找、定时任务如何设置、对过期keys删除的效果如何?Redis的文档并没有明确的说明,需要从源码中查找。

2.3 数据过期策略

了解了Redis对于定时任务的处理过程,再来看看Redis对于过期数据的处理策略。
Redis在以下三种情况下会进行过期key的清理。

访问key时,判断是否过期并淘汰

在访问一个key时会顺便检测下这个key是否过期,如果过期则删除。

    robj *lookupKeyRead(redisDb *db, robj *key) {
        if (expireIfNeeded(db,key) == 1) {
            if (server.masterhost == NULL) return NULL;
        }

        val = lookupKey(db,key,flags);
        if (val == NULL)
            server.stat_keyspace_misses++;
        else
            server.stat_keyspace_hits++;
        return val;
    }

定时逐出过期key

了解了Redis中定时器的实现方式和调用时机,再看看定时逐出过期key是如何具体完成的。databasesCron中调用activeExpireCycle处理过期key清理,databasesCron由serverCron调用。

    void databasesCron(void) {
        /* Expire keys by random sampling. Not required for slaves
        * as master will synthesize DELs for us. */
        if (server.active_expire_enabled && server.masterhost == NULL) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else if (server.masterhost != NULL) {
            expireSlaveKeys();
        }

        /* Defrag keys gradually. */
        if (server.active_defrag_enabled)
            activeDefragCycle();

        //...
    }

activeExpireCycle比较复杂,在清除过期key的同时,还需要密切注意对cpu的使用,以免长时间占用cpu,阻塞业务处理。

注意activeExpireCycle的入参type,主要用于设置清理过期key时的cpu占用时间,如果是ACTIVE_EXPIRE_CYCLE_FAST则最长占用时间为1000微秒(1毫秒),如果不是ACTIVE_EXPIRE_CYCLE_FAST则由配置确定,默认为25000微秒(25毫秒)。

    void activeExpireCycle(int type) {
        //...

        static int timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;

        if (type == ACTIVE_EXPIRE_CYCLE_FAST)
            timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; 

        //遍历所有db
        for (j = 0; j < dbs_per_call; j++) {
            //...
            do {        
                while (num--) {
                    dictEntry *de;
                    long long ttl;

                    //随机选取20个key判断,如果过期则进行清除
                    if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                    ttl = dictGetSignedIntegerVal(de)-now;
                    if (activeExpireCycleTryExpire(db,de,now)) expired++;
                    if (ttl > 0) {
                        /* We want the average TTL of keys yet not expired. */
                        ttl_sum += ttl;
                        ttl_samples++;
                    }
                }

                //避免长时间占用cpu,如果cpu使用时间超过timelimit则返回
                iteration++;
                if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                    long long elapsed = ustime()-start;

                    latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
                    if (elapsed > timelimit) timelimit_exit = 1;
                }

                if (timelimit_exit) return;

            //若达到了25%cpu时间,则返回
            } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
        }
    }

每次事件循环执行时逐出部分过期key

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }

在每次事件循环之前会调用beforesleep,beforesleep会对过期key进行一次主动检查。

    void beforeSleep(struct aeEventLoop *eventLoop) {
        //...

        if (server.active_expire_enabled && server.masterhost == NULL)
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    }

注意activeExpireCycle的入参为ACTIVE_EXPIRE_CYCLE_FAST,所以这次过期key的清理的cpu占用时长为1毫秒。

Redis对过期key的清理策略总结

  • Redis配置项hz定义了serverCron任务的执行周期,默认为10,即cpu空闲时每秒执行10次。
  • 每次过期key清理的视觉不超过cpu时间的25%,即若hz=1,则一次清理时间最大为250ms,若hz=10,则一次清理时间最大为25ms。
  • 清理时依次遍历所有的db。
  • 从db中随机取20个key,判断是否过期,若过期则清理。
  • 若有5个以上key过期,则重复步骤4,否则遍历下一个db。
  • 在清理过程中,若达到了25%cpu时间,则退出清理过程。

日常的业务系统中经常使用到redis,平时也会研究下redis的设计文档和源码,对redis的使用场景、实现方案、运维要点这些常规知识点都有所了解,但是零零碎碎总感觉不够系统,这里结合源码对自己使用redis过程中的一些经验、疑惑、思考进行归纳和总结。

本文由美高梅官方网站发布于数据统计,转载请注明出处:关于Redis的一些思考和总结

TAG标签:
Ctrl+D 将本页面保存为书签,全面了解最新资讯,方便快捷。