提交 cea8c5cd 编写于 作者: A antirez

touched key for WATCH refactored into a more general thing that can be used...

touched key for WATCH refactored into a more general thing that can be used also for the cache system. Some more changes towards diskstore working.
上级 d0212210
......@@ -152,20 +152,41 @@ int selectDb(redisClient *c, int id) {
return REDIS_OK;
}
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
if (server.ds_enabled)
cacheScheduleForFlush(db,key);
}
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
if (server.ds_enabled)
dsFlushDb(dbid);
}
/*-----------------------------------------------------------------------------
* Type agnostic commands operating on the key space
*----------------------------------------------------------------------------*/
void flushdbCommand(redisClient *c) {
server.dirty += dictSize(c->db->dict);
touchWatchedKeysOnFlush(c->db->id);
signalFlushedDb(c->db->id);
dictEmpty(c->db->dict);
dictEmpty(c->db->expires);
addReply(c,shared.ok);
}
void flushallCommand(redisClient *c) {
touchWatchedKeysOnFlush(-1);
signalFlushedDb(-1);
server.dirty += emptyDb();
addReply(c,shared.ok);
if (server.bgsavechildpid != -1) {
......@@ -181,7 +202,7 @@ void delCommand(redisClient *c) {
for (j = 1; j < c->argc; j++) {
if (dbDelete(c->db,c->argv[j])) {
touchWatchedKey(c->db,c->argv[j]);
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;
deleted++;
}
......@@ -327,8 +348,8 @@ void renameGenericCommand(redisClient *c, int nx) {
dbReplace(c->db,c->argv[2],o);
}
dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[2]);
signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
server.dirty++;
addReply(c,nx ? shared.cone : shared.ok);
}
......@@ -487,13 +508,13 @@ void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
if (seconds <= 0) {
if (dbDelete(c->db,key)) server.dirty++;
addReply(c, shared.cone);
touchWatchedKey(c->db,key);
signalModifiedKey(c->db,key);
return;
} else {
time_t when = time(NULL)+seconds;
setExpire(c->db,key,when);
addReply(c,shared.cone);
touchWatchedKey(c->db,key);
signalModifiedKey(c->db,key);
server.dirty++;
return;
}
......
......@@ -120,3 +120,6 @@ int dsDel(redisDb *db, robj *key) {
int dsExists(redisDb *db, robj *key) {
}
int dsFlushDb(int dbid) {
}
......@@ -208,16 +208,16 @@ robj *tryObjectEncoding(robj *o) {
/* Ok, this object can be encoded...
*
* Can I use a shared object? Only if the object is inside a given
* range and if this is the main thread, since when VM is enabled we
* have the constraint that I/O thread should only handle non-shared
* objects, in order to avoid race conditions (we don't have per-object
* locking).
* range and if the back end in use is in-memory. For disk store every
* object in memory used as value should be independent.
*
* Note that we also avoid using shared integers when maxmemory is used
* because very object needs to have a private LRU field for the LRU
* because every object needs to have a private LRU field for the LRU
* algorithm to work well. */
if (server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
pthread_equal(pthread_self(),server.mainthread)) {
if (server.ds_enabled == 0 &&
server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS &&
pthread_equal(pthread_self(),server.mainthread))
{
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
......
......@@ -848,7 +848,6 @@ int rdbLoad(char *filename) {
FILE *fp;
uint32_t dbid;
int type, retval, rdbver;
int swap_all_values = 0;
redisDb *db = server.db+0;
char buf[1024];
time_t expiretime, now = time(NULL);
......@@ -919,28 +918,6 @@ int rdbLoad(char *filename) {
/* Set the expire time if needed */
if (expiretime != -1) setExpire(db,key,expiretime);
/* Handle swapping while loading big datasets when VM is on */
/* If we detecter we are hopeless about fitting something in memory
* we just swap every new key on disk. Directly...
* Note that's important to check for this condition before resorting
* to random sampling, otherwise we may try to swap already
* swapped keys. */
if (swap_all_values) {
dictEntry *de = dictFind(db->dict,key->ptr);
/* de may be NULL since the key already expired */
if (de) {
vmpointer *vp;
val = dictGetEntryVal(de);
if (val->refcount == 1 &&
(vp = vmSwapObjectBlocking(val)) != NULL)
dictGetEntryVal(de) = vp;
}
decrRefCount(key);
continue;
}
decrRefCount(key);
}
fclose(fp);
......
......@@ -830,6 +830,9 @@ void initServer() {
server.slaves = listCreate();
server.monitors = listCreate();
server.unblocked_clients = listCreate();
server.cache_flush_queue = listCreate();
server.cache_flush_delay = 0;
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
......
......@@ -431,7 +431,9 @@ struct redisServer {
/* Blocked clients */
unsigned int bpop_blocked_clients;
unsigned int cache_blocked_clients;
list *unblocked_clients;
list *unblocked_clients; /* list of clients to unblock before next loop */
list *cache_flush_queue; /* keys to flush on disk */
int cache_flush_delay; /* seconds to wait before flushing keys */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
......@@ -554,6 +556,14 @@ typedef struct iojob {
* field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
} iojob;
/* When diskstore is enabled and a flush operation is requested we push
* one of this structures into server.cache_flush_queue. */
typedef struct dirtykey {
redisDb *db;
robj *key;
time_t ctime; /* This is the creation time of the entry. */
} dirtykey;
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
......@@ -774,33 +784,22 @@ int dsSet(redisDb *db, robj *key, robj *val);
robj *dsGet(redisDb *db, robj *key);
int dsDel(redisDb *db, robj *key);
int dsExists(redisDb *db, robj *key);
int dsFlushDb(int dbid);
/* Disk Store Cache */
void vmInit(void);
void vmMarkPagesFree(off_t page, off_t count);
robj *vmLoadObject(robj *o);
robj *vmPreviewObject(robj *o);
int vmSwapOneObjectBlocking(void);
int vmSwapOneObjectThreaded(void);
int vmCanSwapOut(void);
void dsInit(void);
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask);
void vmCancelThreadedIOJob(robj *o);
void lockThreadedIO(void);
void unlockThreadedIO(void);
int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db);
void freeIOJob(iojob *j);
void queueIOJob(iojob *j);
int vmWriteObjectOnSwap(robj *o, off_t page);
robj *vmReadObjectFromSwap(off_t page, int type);
void waitEmptyIOJobsQueue(void);
void vmReopenSwapFile(void);
int vmFreePage(off_t page);
void zunionInterBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
void execBlockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd, int argc, robj **argv);
int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd);
int dontWaitForSwappedKey(redisClient *c, robj *key);
void handleClientsBlockedOnSwappedKey(redisDb *db, robj *key);
vmpointer *vmSwapObjectBlocking(robj *val);
int cacheFreeOneEntry(void);
/* Set data type */
robj *setTypeCreate(robj *value);
......@@ -871,6 +870,8 @@ robj *dbRandomKey(redisDb *db);
int dbDelete(redisDb *db, robj *key);
long long emptyDb();
int selectDb(redisClient *c, int id);
void signalModifiedKey(redisDb *db, robj *key);
void signalFlushedDb(int dbid);
/* Git SHA1 */
char *redisGitSHA1(void);
......
......@@ -368,7 +368,7 @@ void sortCommand(redisClient *c) {
* SORT result is empty a new key is set and maybe the old content
* replaced. */
server.dirty += 1+outputlen;
touchWatchedKey(c->db,storekey);
signalModifiedKey(c->db,storekey);
addReplyLongLong(c,outputlen);
}
......
......@@ -279,7 +279,7 @@ void hsetCommand(redisClient *c) {
hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
update = hashTypeSet(o,c->argv[2],c->argv[3]);
addReply(c, update ? shared.czero : shared.cone);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
......@@ -294,7 +294,7 @@ void hsetnxCommand(redisClient *c) {
hashTypeTryObjectEncoding(o,&c->argv[2], &c->argv[3]);
hashTypeSet(o,c->argv[2],c->argv[3]);
addReply(c, shared.cone);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}
......@@ -315,7 +315,7 @@ void hmsetCommand(redisClient *c) {
hashTypeSet(o,c->argv[i],c->argv[i+1]);
}
addReply(c, shared.ok);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
......@@ -342,7 +342,7 @@ void hincrbyCommand(redisClient *c) {
hashTypeSet(o,c->argv[2],new);
decrRefCount(new);
addReplyLongLong(c,value);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
......@@ -402,7 +402,7 @@ void hdelCommand(redisClient *c) {
if (hashTypeDelete(o,c->argv[2])) {
if (hashTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
addReply(c,shared.cone);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
} else {
addReply(c,shared.czero);
......
......@@ -274,14 +274,14 @@ void pushGenericCommand(redisClient *c, int where) {
return;
}
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
addReply(c,shared.cone);
return;
}
}
listTypePush(lobj,c->argv[2],where);
addReplyLongLong(c,listTypeLength(lobj));
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
......@@ -330,7 +330,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
} else {
/* Notify client of a failed insert */
......@@ -339,7 +339,7 @@ void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
}
} else {
listTypePush(subject,val,where);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
......@@ -427,7 +427,7 @@ void lsetCommand(redisClient *c) {
o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
decrRefCount(value);
addReply(c,shared.ok);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
......@@ -439,7 +439,7 @@ void lsetCommand(redisClient *c) {
listNodeValue(ln) = value;
incrRefCount(value);
addReply(c,shared.ok);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
} else {
......@@ -458,7 +458,7 @@ void popGenericCommand(redisClient *c, int where) {
addReplyBulk(c,value);
decrRefCount(value);
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}
......@@ -573,7 +573,7 @@ void ltrimCommand(redisClient *c) {
redisPanic("Unknown list encoding");
}
if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}
......@@ -616,7 +616,7 @@ void lremCommand(redisClient *c) {
if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
addReplyLongLong(c,removed);
if (removed) touchWatchedKey(c->db,c->argv[1]);
if (removed) signalModifiedKey(c->db,c->argv[1]);
}
/* This is the semantic of this command:
......@@ -642,7 +642,7 @@ void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value
dstobj = createZiplistObject();
dbAdd(c->db,dstkey,dstobj);
} else {
touchWatchedKey(c->db,dstkey);
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
listTypePush(dstobj,value,REDIS_HEAD);
......@@ -670,7 +670,7 @@ void rpoplpushCommand(redisClient *c) {
/* Delete the source list when it is empty */
if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}
......
......@@ -231,7 +231,7 @@ void saddCommand(redisClient *c) {
}
}
if (setTypeAdd(set,c->argv[2])) {
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.cone);
} else {
......@@ -248,7 +248,7 @@ void sremCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
if (setTypeRemove(set,c->argv[2])) {
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.cone);
} else {
......@@ -287,8 +287,8 @@ void smoveCommand(redisClient *c) {
/* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[2]);
signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
server.dirty++;
/* Create the destination set when it doesn't exist */
......@@ -341,7 +341,7 @@ void spopCommand(redisClient *c) {
setTypeRemove(set,ele);
}
if (setTypeSize(set) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
......@@ -382,7 +382,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
zfree(sets);
if (dstkey) {
if (dbDelete(c->db,dstkey)) {
touchWatchedKey(c->db,dstkey);
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
addReply(c,shared.czero);
......@@ -486,7 +486,7 @@ void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum,
decrRefCount(dstset);
addReply(c,shared.czero);
}
touchWatchedKey(c->db,dstkey);
signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
setDeferredMultiBulkLength(c,replylen,cardinality);
......@@ -578,7 +578,7 @@ void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *
decrRefCount(dstset);
addReply(c,shared.czero);
}
touchWatchedKey(c->db,dstkey);
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);
......
......@@ -38,7 +38,7 @@ void setGenericCommand(redisClient *c, int nx, robj *key, robj *val, robj *expir
} else {
incrRefCount(val);
}
touchWatchedKey(c->db,key);
signalModifiedKey(c->db,key);
server.dirty++;
removeExpire(c->db,key);
if (expire) setExpire(c->db,key,time(NULL)+seconds);
......@@ -84,7 +84,7 @@ void getsetCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
dbReplace(c->db,c->argv[1],c->argv[2]);
incrRefCount(c->argv[2]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
removeExpire(c->db,c->argv[1]);
}
......@@ -156,7 +156,7 @@ void setbitCommand(redisClient *c) {
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
((char*)o->ptr)[byte] = byteval;
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero);
}
......@@ -244,7 +244,7 @@ void setrangeCommand(redisClient *c) {
if (sdslen(value) > 0) {
o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
memcpy((char*)o->ptr+offset,value,sdslen(value));
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
addReplyLongLong(c,sdslen(o->ptr));
......@@ -331,7 +331,7 @@ void msetGenericCommand(redisClient *c, int nx) {
dbReplace(c->db,c->argv[j],c->argv[j+1]);
incrRefCount(c->argv[j+1]);
removeExpire(c->db,c->argv[j]);
touchWatchedKey(c->db,c->argv[j]);
signalModifiedKey(c->db,c->argv[j]);
}
server.dirty += (c->argc-1)/2;
addReply(c, nx ? shared.cone : shared.ok);
......@@ -361,7 +361,7 @@ void incrDecrCommand(redisClient *c, long long incr) {
}
o = createStringObjectFromLongLong(value);
dbReplace(c->db,c->argv[1],o);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.colon);
addReply(c,o);
......@@ -424,7 +424,7 @@ void appendCommand(redisClient *c) {
o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
totlen = sdslen(o->ptr);
}
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReplyLongLong(c,totlen);
}
......
......@@ -399,7 +399,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int
de = dictFind(zs->dict,ele);
redisAssert(de != NULL);
dictGetEntryVal(de) = &znode->score;
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
if (incr)
addReplyDouble(c,score);
......@@ -427,7 +427,7 @@ void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double score, int
/* Update the score in the current dict entry */
dictGetEntryVal(de) = &znode->score;
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
if (incr)
......@@ -477,7 +477,7 @@ void zremCommand(redisClient *c) {
dictDelete(zs->dict,c->argv[2]);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
touchWatchedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.cone);
}
......@@ -501,7 +501,7 @@ void zremrangebyscoreCommand(redisClient *c) {
deleted = zslDeleteRangeByScore(zs->zsl,range,zs->dict);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
if (deleted) touchWatchedKey(c->db,c->argv[1]);
if (deleted) signalModifiedKey(c->db,c->argv[1]);
server.dirty += deleted;
addReplyLongLong(c,deleted);
}
......@@ -540,7 +540,7 @@ void zremrangebyrankCommand(redisClient *c) {
deleted = zslDeleteRangeByRank(zs->zsl,start+1,end+1,zs->dict);
if (htNeedsResize(zs->dict)) dictResize(zs->dict);
if (dictSize(zs->dict) == 0) dbDelete(c->db,c->argv[1]);
if (deleted) touchWatchedKey(c->db,c->argv[1]);
if (deleted) signalModifiedKey(c->db,c->argv[1]);
server.dirty += deleted;
addReplyLongLong(c, deleted);
}
......@@ -741,14 +741,14 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
}
if (dbDelete(c->db,dstkey)) {
touchWatchedKey(c->db,dstkey);
signalModifiedKey(c->db,dstkey);
touched = 1;
server.dirty++;
}
if (dstzset->zsl->length) {
dbAdd(c->db,dstkey,dstobj);
addReplyLongLong(c, dstzset->zsl->length);
if (!touched) touchWatchedKey(c->db,dstkey);
if (!touched) signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
decrRefCount(dstobj);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册