voidlinsertCommand(redisClient *c) { // 编码 refval 对象 c->argv[4] = tryObjectEncoding(c->argv[4]); if (strcasecmp(c->argv[2]->ptr,"after") == 0) { pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL); } elseif (strcasecmp(c->argv[2]->ptr,"before") == 0) { pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD); } else { addReply(c,shared.syntaxerr); } } voidpushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) { robj *subject; listTypeIterator *iter; listTypeEntry entry; int inserted = 0; // 取出列表对象 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL || checkType(c,subject,REDIS_LIST)) return; // 执行的是 LINSERT 命令 if (refval != NULL) { /* We're not sure if this value can be inserted yet, but we cannot * convert the list inside the iterator. We don't want to loop over * the list twice (once to see if the value can be inserted and once * to do the actual insert), so we assume this value can be inserted * and convert the ziplist to a regular list if necessary. */ // 看保存值 value 是否需要将列表编码转换为双端链表 listTypeTryConversion(subject,val); /* Seek refval from head to tail */ // 在列表中查找 refval 对象 iter = listTypeInitIterator(subject,0,REDIS_TAIL); while (listTypeNext(iter,&entry)) { if (listTypeEqual(&entry,refval)) { // 找到了,将值插入到节点的前面或后面 listTypeInsert(&entry,val,where); inserted = 1; break; } } listTypeReleaseIterator(iter); if (inserted) { /* Check if the length exceeds the ziplist length threshold. */ // 查看插入之后是否需要将编码转换为双端链表 if (subject->encoding == REDIS_ENCODING_ZIPLIST && ziplistLen(subject->ptr) > server.list_max_ziplist_entries) listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"linsert", c->argv[1],c->db->id); server.dirty++; } else { /* Notify client of a failed insert */ // refval 不存在,插入失败 addReply(c,shared.cnegone); return; } // 执行的是 LPUSHX 或 RPUSHX 命令 } else { char *event = (where == REDIS_HEAD) ? "lpush" : "rpush"; listTypePush(subject,val,where); signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id); server.dirty++; } addReplyLongLong(c,listTypeLength(subject)); }
voidbrpopCommand(redisClient *c) { blockingPopGenericCommand(c,REDIS_TAIL); } /* Blocking RPOP/LPOP */ voidblockingPopGenericCommand(redisClient *c, int where) { robj *o; mstime_t timeout; int j; // 取出 timeout 参数 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS) != REDIS_OK) return; // 遍历所有列表键 for (j = 1; j < c->argc-1; j++) { // 取出列表键 o = lookupKeyWrite(c->db,c->argv[j]); // 有非空列表? if (o != NULL) { if (o->type != REDIS_LIST) { addReply(c,shared.wrongtypeerr); return; } else { // 非空列表 if (listTypeLength(o) != 0) { /* Non empty list, this is like a non normal [LR]POP. */ char *event = (where == REDIS_HEAD) ? "lpop" : "rpop"; // 弹出值 robj *value = listTypePop(o,where); redisAssert(value != NULL); // 回复客户端 addReplyMultiBulkLen(c,2); // 回复弹出元素的列表 addReplyBulk(c,c->argv[j]); // 回复弹出值 addReplyBulk(c,value); decrRefCount(value); notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event, c->argv[j],c->db->id); // 删除空列表 if (listTypeLength(o) == 0) { dbDelete(c->db,c->argv[j]); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del", c->argv[j],c->db->id); } signalModifiedKey(c->db,c->argv[j]); server.dirty++; /* Replicate it as an [LR]POP instead of B[LR]POP. */ // 传播一个 [LR]POP 而不是 B[LR]POP rewriteClientCommandVector(c,2, (where == REDIS_HEAD) ? shared.lpop : shared.rpop, c->argv[j]); return; } } } } /* If we are inside a MULTI/EXEC and the list is empty the only thing * we can do is treating it as a timeout (even with timeout 0). */ // 如果命令在一个事务中执行,那么为了不产生死等待 // 服务器只能向客户端发送一个空回复 if (c->flags & REDIS_MULTI) { addReply(c,shared.nullmultibulk); return; } /* If the list is empty or the key does not exists we must block */ // 所有输入列表键都不存在,只能阻塞了 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL); } voidblockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) { dictEntry *de; list *l; int j; // 设置阻塞状态的超时和目标选项 c->bpop.timeout = timeout; // target 在执行 RPOPLPUSH 命令时使用 c->bpop.target = target; if (target != NULL) incrRefCount(target); // 关联阻塞客户端和键的相关信息 for (j = 0; j < numkeys; j++) { /* If the key already exists in the dict ignore it. */ // c->bpop.keys 是一个集合(值为 NULL 的字典) // 它记录所有造成客户端阻塞的键 // 以下语句在键不存在于集合的时候,将它添加到集合 if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue; incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ // c->db->blocking_keys 字典的键为造成客户端阻塞的键 // 而值则是一个链表,链表中包含了所有被阻塞的客户端 // 以下程序将阻塞键和被阻塞客户端关联起来 de = dictFind(c->db->blocking_keys,keys[j]); if (de == NULL) { // 链表不存在,新创建一个,并将它关联到字典中 int retval; /* For every key we take a list of clients blocked for it */ l = listCreate(); retval = dictAdd(c->db->blocking_keys,keys[j],l); incrRefCount(keys[j]); redisAssertWithInfo(c,keys[j],retval == DICT_OK); } else { l = dictGetVal(de); } // 将客户端填接到被阻塞客户端的链表中 listAddNodeTail(l,c); } blockClient(c,REDIS_BLOCKED_LIST); }
voidbrpoplpushCommand(redisClient *c) { mstime_t timeout; // 取出 timeout 参数 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS) != REDIS_OK) return; // 取出列表键 robj *key = lookupKeyWrite(c->db, c->argv[1]); // 键为空,阻塞 if (key == NULL) { if (c->flags & REDIS_MULTI) { /* Blocking against an empty list in a multi state * returns immediately. */ addReply(c, shared.nullbulk); } else { /* The list is empty and the client blocks. */ blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]); } // 键非空,执行 RPOPLPUSH } else { if (key->type != REDIS_LIST) { addReply(c, shared.wrongtypeerr); } else { /* The list exists and has elements, so * the regular rpoplpushCommand is executed. */ redisAssertWithInfo(c,key,listTypeLength(key) > 0); rpoplpushCommand(c); } } }
voidlrangeCommand(redisClient *c) { robj *o; long start, end, llen, rangelen; // 取出索引值 start 和 end if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return; // 取出列表对象 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,REDIS_LIST)) return; // 取出列表长度 llen = listTypeLength(o); /* convert negative indexes */ // 将负数索引转换成正数索引 if (start < 0) start = llen+start; if (end < 0) end = llen+end; if (start < 0) start = 0; /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { addReply(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ addReplyMultiBulkLen(c,rangelen); if (o->encoding == REDIS_ENCODING_ZIPLIST) { unsignedchar *p = ziplistIndex(o->ptr,start); unsignedchar *vstr; unsignedint vlen; longlong vlong; // 遍历 ziplist ,并将指定索引上的值添加到回复中 while(rangelen--) { ziplistGet(p,&vstr,&vlen,&vlong); if (vstr) { addReplyBulkCBuffer(c,vstr,vlen); } else { addReplyBulkLongLong(c,vlong); } p = ziplistNext(o->ptr,p); } } elseif (o->encoding == REDIS_ENCODING_LINKEDLIST) { listNode *ln; /* If we are nearest to the end of the list, reach the element * starting from tail and going backward, as it is faster. */ if (start > llen/2) start -= llen; ln = listIndex(o->ptr,start); // 遍历双端链表,将指定索引上的值添加到回复 while(rangelen--) { addReplyBulk(c,ln->value); ln = ln->next; } } else { redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!"); } }
voidrpoplpushCommand(redisClient *c) { robj *sobj, *value; // 来源列表 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL || checkType(c,sobj,REDIS_LIST)) return; // 空列表,没有元素可 pop ,直接返回 if (listTypeLength(sobj) == 0) { /* This may only happen after loading very old RDB files. Recent * versions of Redis delete keys of empty lists. */ addReply(c,shared.nullbulk); // 源列表非空 } else { // 目标对象 robj *dobj = lookupKeyWrite(c->db,c->argv[2]); robj *touchedkey = c->argv[1]; // 检查目标对象是否列表 if (dobj && checkType(c,dobj,REDIS_LIST)) return; // 从源列表中弹出值 value = listTypePop(sobj,REDIS_TAIL); /* We saved touched key, and protect it, since rpoplpushHandlePush * may change the client command argument vector (it does not * currently). */ incrRefCount(touchedkey); // 将值推入目标列表中,如果目标列表不存在,那么创建一个新列表 rpoplpushHandlePush(c,c->argv[2],dobj,value); /* listTypePop returns an object with its refcount incremented */ decrRefCount(value); /* Delete the source list when it is empty */ notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"rpop",touchedkey,c->db->id); // 如果源列表已经为空,那么将它删除 if (listTypeLength(sobj) == 0) { dbDelete(c->db,touchedkey); notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del", touchedkey,c->db->id); } signalModifiedKey(c->db,touchedkey); decrRefCount(touchedkey); server.dirty++; } }