diff --git a/src/debug.c b/src/debug.c index fff8d7277f8728ded6d1a37ef5c4f012edb7232d..88c88ca91afeebf61e84742fe26c3f5678b04d12 100644 --- a/src/debug.c +++ b/src/debug.c @@ -200,60 +200,28 @@ void debugCommand(redisClient *c) { } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) { dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); robj *val; + char *strenc; + char *storage; if (!de) { addReply(c,shared.nokeyerr); return; } val = dictGetEntryVal(de); - if (!server.vm_enabled || (val->storage == REDIS_VM_MEMORY || - val->storage == REDIS_VM_SWAPPING)) { - char *strenc; - - strenc = strEncoding(val->encoding); - addReplyStatusFormat(c, - "Value at:%p refcount:%d " - "encoding:%s serializedlength:%lld " - "lru:%d lru_seconds_idle:%lu", - (void*)val, val->refcount, - strenc, (long long) rdbSavedObjectLen(val), - val->lru, estimateObjectIdleTime(val)); - } else { - vmpointer *vp = (vmpointer*) val; - addReplyStatusFormat(c, - "Value swapped at: page %llu " - "using %llu pages", - (unsigned long long) vp->page, - (unsigned long long) vp->usedpages); - } - } else if (!strcasecmp(c->argv[1]->ptr,"swapin") && c->argc == 3) { - lookupKeyRead(c->db,c->argv[2]); - addReply(c,shared.ok); - } else if (!strcasecmp(c->argv[1]->ptr,"swapout") && c->argc == 3) { - dictEntry *de = dictFind(c->db->dict,c->argv[2]->ptr); - robj *val; - vmpointer *vp; - - if (!server.vm_enabled) { - addReplyError(c,"Virtual Memory is disabled"); - return; - } - if (!de) { - addReply(c,shared.nokeyerr); - return; - } - val = dictGetEntryVal(de); - /* Swap it */ - if (val->storage != REDIS_VM_MEMORY) { - addReplyError(c,"This key is not in memory"); - } else if (val->refcount != 1) { - addReplyError(c,"Object is shared"); - } else if ((vp = vmSwapObjectBlocking(val)) != NULL) { - dictGetEntryVal(de) = vp; - addReply(c,shared.ok); - } else { - addReply(c,shared.err); + strenc = strEncoding(val->encoding); + switch(val->storage) { + case REDIS_DS_MEMORY: storage = "memory"; break; + case REDIS_DS_DIRTY: storage = "dirty"; break; + case REDIS_DS_SAVING: storage = "saving"; break; + default: storage = "unknown"; break; } + addReplyStatusFormat(c, + "Value at:%p refcount:%d " + "encoding:%s serializedlength:%lld " + "lru:%d lru_seconds_idle:%lu storage:%s", + (void*)val, val->refcount, + strenc, (long long) rdbSavedObjectLen(val), + val->lru, estimateObjectIdleTime(val), storage); } else if (!strcasecmp(c->argv[1]->ptr,"populate") && c->argc == 3) { long keys, j; robj *key, *val; diff --git a/src/diskstore.c b/src/diskstore.c index acc7c16f93ed6cb9c151fda11c0eac1ffcaa50d8..3904310d3c5c4a070ff0e8a1762014c043826c10 100644 --- a/src/diskstore.c +++ b/src/diskstore.c @@ -115,5 +115,8 @@ int dsSet(redisDb *db, robj *key, robj *val) { robj *dsGet(redisDb *db, robj *key) { } +int dsDel(redisDb *db, robj *key) { +} + int dsExists(redisDb *db, robj *key) { } diff --git a/src/dscache.c b/src/dscache.c index 2bda0b509016ddeff0c0afe077fdda3cdee0d6dd..5570e9c5dba59f4019d7b579cfa6f6c93a3bf212 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -227,6 +227,7 @@ int cacheFreeOneEntry(void) { dbDelete(best_db,kobj); decrRefCount(kobj); } + return REDIS_OK; } /* Return true if it's safe to swap out objects in a given moment. @@ -240,7 +241,8 @@ int dsCanTouchDiskStore(void) { void freeIOJob(iojob *j) { decrRefCount(j->key); - decrRefCount(j->val); + /* j->val can be NULL if the job is about deleting the key from disk. */ + if (j->val) decrRefCount(j->val); zfree(j); } @@ -279,13 +281,17 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ - redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); + redisLog(REDIS_DEBUG,"COMPLETED Job type %s, key: %s", + (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", + (unsigned char*)j->key->ptr); de = dictFind(j->db->dict,j->key->ptr); redisAssert(de != NULL); if (j->type == REDIS_IOJOB_LOAD) { + /* Create the key-value pair in the in-memory database */ dbAdd(j->db,j->key,j->val); + /* Handle clients waiting for this key to be loaded. */ + handleClientsBlockedOnSwappedKey(j->db,j->key); freeIOJob(j); - /* FIXME: notify clients waiting for this key */ } else if (j->type == REDIS_IOJOB_SAVE) { redisAssert(j->val->storage == REDIS_DS_SAVING); j->val->storage = REDIS_DS_MEMORY; @@ -330,22 +336,23 @@ void *IOThreadEntryPoint(void *arg) { j = ln->value; listDelNode(server.io_newjobs,ln); /* Add the job in the processing queue */ - j->thread = pthread_self(); listAddNodeTail(server.io_processing,j); ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); - redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'", - (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr); + redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'", + (long) pthread_self(), + (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", + (void*)j, (char*)j->key->ptr); /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { - vmpointer *vp = (vmpointer*)j->id; - j->val = vmReadObjectFromSwap(j->page,vp->vtype); - } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { - j->pages = rdbSavedObjectPages(j->val); - } else if (j->type == REDIS_IOJOB_DO_SWAP) { - if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR) - j->canceled = 1; + j->val = dsGet(j->db,j->key); + redisAssert(j->val != NULL); + } else if (j->type == REDIS_IOJOB_SAVE) { + if (j->val) + dsSet(j->db,j->key,j->val); + else + dsDel(j->db,j->key); } /* Done: insert the job into the processed queue */ @@ -420,52 +427,50 @@ void queueIOJob(iojob *j) { spawnIOThread(); } -int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { +void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { iojob *j; j = zmalloc(sizeof(*j)); - j->type = REDIS_IOJOB_PREPARE_SWAP; + j->type = type; j->db = db; j->key = key; incrRefCount(key); - j->id = j->val = val; + j->val = val; incrRefCount(val); - j->canceled = 0; - j->thread = (pthread_t) -1; - val->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); - return REDIS_OK; } /* ============ Virtual Memory - Blocking clients on missing keys =========== */ /* This function makes the clinet 'c' waiting for the key 'key' to be loaded. - * If there is not already a job loading the key, it is craeted. - * The key is added to the io_keys list in the client structure, and also + * If the key is already in memory we don't need to block, regardless + * of the storage of the value object for this key: + * + * - If it's REDIS_DS_MEMORY we have the key in memory. + * - If it's REDIS_DS_DIRTY they key was modified, but still in memory. + * - if it's REDIS_DS_SAVING the key is being saved by an IO Job. When + * the client will lookup the key it will block if the key is still + * in this stage but it's more or less the best we can do. + * FIXME: we should try if it's actually better to suspend the client + * accessing an object that is being saved, and awake it only when + * the saving was completed. + * + * Otherwise if the key is not in memory, we block the client and start + * an IO Job to load it: + * + * the key is added to the io_keys list in the client structure, and also * in the hash table mapping swapped keys to waiting clients, that is, * server.io_waited_keys. */ int waitForSwappedKey(redisClient *c, robj *key) { struct dictEntry *de; - robj *o; list *l; - /* If the key does not exist or is already in RAM we don't need to - * block the client at all. */ + /* Return ASAP if the key is in memory */ de = dictFind(c->db->dict,key->ptr); - if (de == NULL) return 0; - o = dictGetEntryVal(de); - if (o->storage == REDIS_VM_MEMORY) { - return 0; - } else if (o->storage == REDIS_VM_SWAPPING) { - /* We were swapping the key, undo it! */ - vmCancelThreadedIOJob(o); - return 0; - } - - /* OK: the key is either swapped, or being loaded just now. */ + if (de != NULL) return 0; /* Add the key to the list of keys this client is waiting for. * This maps clients to keys they are waiting for. */ @@ -488,25 +493,8 @@ int waitForSwappedKey(redisClient *c, robj *key) { listAddNodeTail(l,c); /* Are we already loading the key from disk? If not create a job */ - if (o->storage == REDIS_VM_SWAPPED) { - iojob *j; - vmpointer *vp = (vmpointer*)o; - - o->storage = REDIS_VM_LOADING; - j = zmalloc(sizeof(*j)); - j->type = REDIS_IOJOB_LOAD; - j->db = c->db; - j->id = (robj*)vp; - j->key = key; - incrRefCount(key); - j->page = vp->page; - j->val = NULL; - j->canceled = 0; - j->thread = (pthread_t) -1; - lockThreadedIO(); - queueIOJob(j); - unlockThreadedIO(); - } + if (de == NULL) + dsCreateIOJob(REDIS_IOJOB_LOAD,c->db,key,NULL); return 1; } @@ -584,7 +572,7 @@ int blockClientOnSwappedKeys(redisClient *c, struct redisCommand *cmd) { if (listLength(c->io_keys)) { c->flags |= REDIS_IO_WAIT; aeDeleteFileEvent(server.el,c->fd,AE_READABLE); - server.vm_blocked_clients++; + server.cache_blocked_clients++; return 1; } else { return 0; diff --git a/src/redis.h b/src/redis.h index e12b1c18c04dd040ef28feb8499be90219db069d..1557e2604fd3296c2ecd870591d05659150de831 100644 --- a/src/redis.h +++ b/src/redis.h @@ -772,6 +772,7 @@ int dsOpen(void); int dsClose(void); int dsSet(redisDb *db, robj *key, robj *val); robj *dsGet(redisDb *db, robj *key); +int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key); /* Disk Store Cache */