import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.connection.jedis.JedisConnection;import org.springframework.stereotype.Service;import org.springframework.util.ReflectionUtils;import redis.clients.jedis.Jedis;import java.lang.reflect.Field;import java.util.Collections;@Servicepublic class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisConnectionFactory connectionFactory; /** * 尝试获取分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public boolean lock(String lockKey, String requestId, int expireTime) { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public boolean releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = getJedis().eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } public Jedis getJedis() { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); return jedis; }}
这里要说的数据结构不是redis对外支持的那几种,如果想了解可以去读redis的文档,这里关注下redis的内部实现数据结构。
redis对外支持string、list、set、hash、zset5种数据结构(最新版支持geo等数据结构,但不在讨论范围),内部实现以下数据结构来支撑:
内部数据结构和外部数据结构的对应关系如下:
数据类型映射
最终版本:
Redis在设计上最大的亮点是其单线程结构,并且还能提供极其强大的并发处理能力和丰富的数据结构,这点让我很激动也很是困惑的。
激动的是redis强大的并发处理能力,以及其丰富的api接口,让日常的业务需要可以更爽的完成。更让我惊叹的是单线程的设计导致redis的代码非常的小巧,整个源码大约5w行,而且不需要处理多线程引入的并发问题,整个代码理解起来也很顺畅。
困惑的是单线程的设计结构为什么能支持这么大的并发量,这一点和我们常规处理大并发的习惯性思维不同。一般在面对大并发的请求,首先想到的是用多个线程来处理,io线程和业务线程分开,业务线程使用线程池来避免频繁创建和销毁线程,即便是一次请求阻塞了也不会影响到其他请求。为什么redis会选择反其道而行之,这么做是否会局限redis的使用,在使用redis时有没有特别需要注意的点?
第二版本:
Redis的文档中多处提到定时处理的逻辑,如过期key的定期清理,aof定时写文件,但是如何在单线程中实现一个定时器呢?
在Redis中所有的IO事件都会被封装成redisServer.aeEventLoop,在Redis的启动函数中,会进行aeEventLoop事件的定时处理回调(serverCron)的注册:
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
定时事件的注册过程如下:
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->next = eventLoop->timeEventHead;
eventLoop->timeEventHead = te;
return id;
}
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 是定时处理逻辑的回调,这里会处理过期key的清理、统计信息更新、对不合理的数据库进行大小调整、关闭和清理连接生效的客户端、尝试进行AOF或RDB持久化操作等。
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//...
clientsCron();
/* Handle background operations on Redis databases. */
databasesCron();
rewriteAppendOnlyFileBackground();
backgroundSaveDoneHandler(exitcode,bysignal);
freeClientsInAsyncFreeQueue();
clientsArePaused();
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
run_with_period(1000) replicationCron();
/* Run the Redis Cluster cron. */
run_with_period(100) {
if (server.cluster_enabled) clusterCron();
}
/* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
if (server.sentinel_mode) sentinelTimer();
}
/* Cleanup expired MIGRATE cached sockets. */
run_with_period(1000) {migrateCloseTimedoutSockets();}
//...
return 1000/server.hz;
}
定时回调serverCron具体的处理业务后面再研究,先看看serverCron什么时候会被触发调用。
在Redis事件循环中aeProcessEvents会调用processTimeEvents,从名字上看出是处理定时事件。
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
/**
* 如果系统时间被调整到将来某段时间然后又被设置回正确的时间,
* 这种情况下链表中的timeEvent有可能会被随机的延迟执行,因
* 此在这个情况下把所有的timeEvent的触发时间设置为0表示及执行
*/
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now; // 设置上次运行时间为now
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/**
* 删除已经被标志位 删除 的时间事件
*/
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
// 在时间事件节点被删除前调用finlizerProce()方法
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
if (te->id > maxId) {
/**
* te->id > maxId 表示当前te指向的timeEvent为当前循环中新添加的,
* 对于新添加的节点在本次循环中不作处理。
* PS:为什么会出现这种情况呢?有可能是在timeProc()里面会注册新的timeEvent节点?
* 对于当前的Redis版本中不会出现te->id > maxId这种情况
*/
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
// 如果当前时间已经超过了对应的timeEvent节点设置的触发时间,
// 则调用timeProc()方法执行对应的任务,即serverCron
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
// 要执行多次,则计算下次执行时间
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
// 如果只需要执行一次,则把id设置为-1,再下次循环中删除
te->id = AE_DELETED_EVENT_ID;
}
}
prev = te;
te = te->next;
}
return processed;
}
processTimeEvents会对定时事件进行时间判断,如果到了设置的触发时间则会调用注册的定时回调函数
serverCron。
这里需要注意te->timeProc,即serverCron的返回值,从之前serverCron分析来看,其返回值为1000/server.hz。server.hz是Redis server执行后台任务的频率,默认为10,此值越大表示redis对定时任务的执行次数越频繁,如定期清理过期key。aeAddMillisecondsToNow会根据serverCron的返回值来计算下次定时任务触发的unix时间。
定时器的后果 —— 阻塞
至此已经很清楚,Redis中的定时业务的处理是放在主线程之中,在主线程处理完一次请求之后,接着计算是否到了业务的定时周期,如果到了则处理定时业务。
但是这会加大主线程处理请求的延时,如果在定时回调中塞入过多的处理逻辑或者某一次处理耗时严重,如由于磁盘压力导致aof写文件耗时增加,那么就会阻塞整个主线程的处理。
Redis在主线程中塞入定时处理的业务逻辑,避免再引入一个单独的定时线程,简化了代码,但是也带来阻塞主线程业务处理的风险,因此在定时回调中处理相关定时业务逻辑时需要十分小心,密切注意处理耗时和对cpu的使用。
记录一下走过的弯路.
第一版本:
codis是豌豆荚开发的分布式redis集群解决方案,kvStrore是阿里云分布式redis产品,他们不同于redis cluster的无中心化方案,在服务端设置了一层proxy,在Proxy上实现数据分布策略,数据分布策略对客户端透明。
codis官方说明中对其架构有详细的说明,这里对其架构做一个简要的描述。
codis集群架构如下
codis集群架构
数据分片
Codis 采用 Pre-sharding 的技术来实现数据的分片, 默认分成 1024 个 slots (0-1023), 对于每个key来说, 通过以下公式确定所属的 Slot Id : SlotId = crc32(key) % 1024。
每一个 slot 都会有一个且必须有一个特定的 server group id 来表示这个 slot 的数据由哪个 server group 来提供。数据的迁移也是以slot为单位的。
高可用
proxy层高可用,codis-proxy是无状态的,每个proxy和所有redis实例相连,多个proxy在zk上注册,proxy的可用性由zk进行监控。客户端可以通过轮询选择一个可用的proxy,也可以实现负载均衡。
redis实例高可用,codis-proxy通过订阅redis-sentinel来感知redis实例的可用心。当一个group的master挂掉的时候,codis-proxy会检测到但不会自动将一个slave提升为master,而是报警出来,需要管理员通过codis-dashboard开放的RESTful API手动提升。
横向扩展
采用redis的 increament操作完成锁的抢占.但是释放锁时,是每个线程都可以删除redis中的key值. 并且initLock会降上一次的操作给覆盖掉,所以也废弃掉此方法
对cpu会成为Redis的性能瓶颈的担忧是可以理解的,但是在实际使用过程中cpu或许不是制约Redis的关键因素,网络io可能才是最大的瓶颈。
看一组Redis的性能测试数据对比,Redis版本2.8.19,测试环境如下:
set接口测试
get接口测试
从对比的测试结果来看,Redis的性能还是很强悍的,单机qps最低也在5W以上,但是相同条件下KVStore比Redis on ECS要高出一倍。
分析对比结果,Redis on ECS SSD和Redis on ECS 硬盘在性能上差不多,说明磁盘并不是Redis的性能瓶颈。而Redis on ECS之所以低于KVStore主要是受限于ECS的网络io性能,并没有跑满cpu,导致并发处理上不去。
在Redis性能分析官方文档中,也对影响Redis性能的因素进行了分析和对比测试,大部分情况下网络依然是制约其性能的首要因素。
但是毕竟Redis的单线程模型对多核cpu没有完全利用,如果有这样的担心,那么在网络io没有成为瓶颈时,可以在一台机器上多部署几个Redis的实例,充分利用cpu和网卡的能力。
/** * 分布式锁 * @param range 锁的长度 允许有多少个请求抢占资源 * @param key * @return */ public boolean getLock(int range, String key) { ValueOperationsString, Integer valueOper1 = template.opsForValue(); return valueOper1.increment(key, 1) = range; } /** * 初始化锁, 设置等于0 * @param key * @param expireSeconds * @return */ public void initLock(String key, Long expireSeconds) { ValueOperationsString, Integer operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); operations.set(key, 0, expireSeconds * 1000); } /** * 释放锁 * @param key */ public void releaseLock(String key) { ValueOperationsString, Integer operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.delete(key); }
在大型互联网应用中,单机部署redis往往是不够的,单机redis在处理能力、内存容量、系统稳定上都不足以满足业务的需求,因此redis的集群部署是必不可少的。
Redis的集群部署方式平时业务开发过程中实际接触的比较少,在需要使用的时候会有专门的pe维护Redis集群。但是了解redis的高可用和集群部署方案还是很有必要的,这里对redis sentinel、redis cluster、kvstore、codis、tair的集群部署方式做一个分析对比。
采用set 和 del 完成锁的占用与释放,后经测试得知,set不是线程安全,在并发情况下常常会导致数据不一致.
但是除了客户端读写请求之外还有一些比较耗时的操作,如持久化RDB文件,持久化AOF文件等等,这些操作不能放在主线程里面处理,因此Redis会在适当的时候fork子进程来异步的处理这种任务。除了这些,Redis还有一组异步任务处理线程,用于处理不需要主线程同步处理的工作,总体上Redis的线程体系结构大致如下图:
Redis线程结构
上图中间蓝色的部分代表主线程,最左边虚线代表通过fork得到的子进程,用来处理RDB持久化以及AOF持久化等任务,最右边橙色部分代表一组异步任务处理线程,下面会详细介绍这组异步任务处理线程,即Redis异步化组件——BIO组件。
在Redis中,异步任务处理线程组被封装在BIO组件中,源文件为bio.h和bio.c。bio异步线程启动时在main方法调用,会生成BIO_NUM_OPS(3)个线程,线程函数为bioProcessBackgroundJobs。
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
exit(1);
}
bio_threads[j] = thread;
}
}
bioProcessBackgroundJobs的处理过程如下:
void *bioProcessBackgroundJobs(void *arg) {
pthread_mutex_lock(&bio_mutex[type]);
//...
while(1) {
listNode *ln;
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) {
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3);
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
//...
pthread_mutex_lock(&bio_mutex[type]);
}
}
BIO线程目前包括三个线程,处理三种类型的任务:
文件句柄关闭任务
文件句柄的释放(close)对于操作系统来说是一个比较重的操作,在Redis中,当需要重新创建新的文件句柄,废弃的文件句柄失效的时候,这个废弃的文件句柄将由异步任务处理线程来关闭。
AOF持久化任务,Redis对于AOF文件的持久化有三种策略:
这三种策略分别对应不同的业务场景和用户需求,默认的策略为aof_fsync_everysec,这个时候对于aof缓冲区内容持久化工作会交给异步任务处理线程来处理。
内存的释放也是比较重的操作,这部分工作可以交给异步任务处理线程来处理,Redis中通过一部任务释放的空间主要包括三种:
近期工作遇到需要业务场景如下,需要每天定时推送给另一系统一批数据,但是由于系统是集群部署的,会造成统一情况下任务争用的情况,所以需要增加分布式锁来保证一定时间范围内有一个Job来完成定时任务. 前期考虑的方案有采用ZooKeeper分布式任务,Quartz分布式任务调度,但是由于Zookeeper需要增加额外组件,Quartz需要增加表,并且项目中现在已经有Redis这一组件存在,所以考虑采用Redis分布式锁的情况来完成分布式任务抢占这一功能
主要有以下内容:
@Overridepublic T Long set(String key,T value, Long cacheSeconds) {if (value instanceof HashMap) {BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);valueOperations.putAll((Map) value);valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);}else{//使用map存储BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);valueOperations.put(key, value);//秒valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);}return null;}@Overridepublic void del(String key) {redisTemplate.delete(key);}
Redis文档中对于过期key的处理方式的描述有两种:被动和主动方式。
当一些客户端尝试访问它时,key会被发现并主动的过期。
但是这是不够的,因为有些过期的keys,永远不会访问他们。
无论如何,这些keys应该过期,所以定时随机测试设置keys的过期时间,并将过期的keys进行删除。
具体就是Redis每秒10次做的事情:
但是Redis的主线程是单线程,并没有一个专门的线程来负责定时对过期数据进行清理,Redis如何具体完成过期key的查找、定时任务如何设置、对过期keys删除的效果如何?Redis的文档并没有明确的说明,需要从源码中查找。
了解了Redis对于定时任务的处理过程,再来看看Redis对于过期数据的处理策略。
Redis在以下三种情况下会进行过期key的清理。
访问key时,判断是否过期并淘汰
在访问一个key时会顺便检测下这个key是否过期,如果过期则删除。
robj *lookupKeyRead(redisDb *db, robj *key) {
if (expireIfNeeded(db,key) == 1) {
if (server.masterhost == NULL) return NULL;
}
val = lookupKey(db,key,flags);
if (val == NULL)
server.stat_keyspace_misses++;
else
server.stat_keyspace_hits++;
return val;
}
定时逐出过期key
了解了Redis中定时器的实现方式和调用时机,再看看定时逐出过期key是如何具体完成的。databasesCron中调用activeExpireCycle处理过期key清理,databasesCron由serverCron调用。
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled && server.masterhost == NULL) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else if (server.masterhost != NULL) {
expireSlaveKeys();
}
/* Defrag keys gradually. */
if (server.active_defrag_enabled)
activeDefragCycle();
//...
}
activeExpireCycle比较复杂,在清除过期key的同时,还需要密切注意对cpu的使用,以免长时间占用cpu,阻塞业务处理。
注意activeExpireCycle的入参type,主要用于设置清理过期key时的cpu占用时间,如果是ACTIVE_EXPIRE_CYCLE_FAST则最长占用时间为1000微秒(1毫秒),如果不是ACTIVE_EXPIRE_CYCLE_FAST则由配置确定,默认为25000微秒(25毫秒)。
void activeExpireCycle(int type) {
//...
static int timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION;
//遍历所有db
for (j = 0; j < dbs_per_call; j++) {
//...
do {
while (num--) {
dictEntry *de;
long long ttl;
//随机选取20个key判断,如果过期则进行清除
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
}
//避免长时间占用cpu,如果cpu使用时间超过timelimit则返回
iteration++;
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
long long elapsed = ustime()-start;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
if (elapsed > timelimit) timelimit_exit = 1;
}
if (timelimit_exit) return;
//若达到了25%cpu时间,则返回
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
}
每次事件循环执行时逐出部分过期key
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
在每次事件循环之前会调用beforesleep,beforesleep会对过期key进行一次主动检查。
void beforeSleep(struct aeEventLoop *eventLoop) {
//...
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
}
注意activeExpireCycle的入参为ACTIVE_EXPIRE_CYCLE_FAST,所以这次过期key的清理的cpu占用时长为1毫秒。
Redis对过期key的清理策略总结
日常的业务系统中经常使用到redis,平时也会研究下redis的设计文档和源码,对redis的使用场景、实现方案、运维要点这些常规知识点都有所了解,但是零零碎碎总感觉不够系统,这里结合源码对自己使用redis过程中的一些经验、疑惑、思考进行归纳和总结。
本文由美高梅官方网站发布于数据统计,转载请注明出处:关于Redis的一些思考和总结