diff --git a/src/dict.c b/src/dict.c index 9be7fb168ed00b8b633e06a47b0b74c87e6a70fd..6b7010ba2efba11231be224f5fe0479d8ccb605f 100644 --- a/src/dict.c +++ b/src/dict.c @@ -203,6 +203,7 @@ int dictRehash(dict *d, int n) { /* Note that rehashidx can't overflow as we are sure there are more * elements because ht[0].used != 0 */ + assert(d->ht[0].size > (unsigned)d->rehashidx); while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++; de = d->ht[0].table[d->rehashidx]; /* Move all the keys in this bucket from the old to the new hash HT */ diff --git a/src/diskstore.c b/src/diskstore.c index 49c8706a48528141eb48bf3bab6dbada6bb837f4..9e86364e9c2ddd0e9cd54d8092e8bca2bcdeb76a 100644 --- a/src/diskstore.c +++ b/src/diskstore.c @@ -183,7 +183,7 @@ int dsKeyToPath(redisDb *db, char *buf, robj *key) { return (buf-origbuf)+41; } -int dsSet(redisDb *db, robj *key, robj *val) { +int dsSet(redisDb *db, robj *key, robj *val, time_t expire) { char buf[1024], buf2[1024]; FILE *fp; int retval, len; @@ -201,7 +201,7 @@ int dsSet(redisDb *db, robj *key, robj *val) { redisPanic("Unrecoverable diskstore error. Exiting."); } } - if ((retval = rdbSaveKeyValuePair(fp,db,key,val,time(NULL))) == -1) + if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1) return REDIS_ERR; fclose(fp); if (retval == 0) { diff --git a/src/dscache.c b/src/dscache.c index 8243e794827ae1f9160ee58d94dee5396b71e503..aeeae40a107ca4a3a7858d6860eead989c3c1afc 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -418,10 +418,12 @@ void *IOThreadEntryPoint(void *arg) { /* Get a new job to process */ if (listLength(server.io_newjobs) == 0) { /* Wait for more work to do */ + redisLog(REDIS_DEBUG,"[T] wait for signal"); pthread_cond_wait(&server.io_condvar,&server.io_mutex); + redisLog(REDIS_DEBUG,"[T] signal received"); continue; } - redisLog(REDIS_DEBUG,"%ld IO jobs to process", + redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process", listLength(server.io_newjobs)); ln = listFirst(server.io_newjobs); j = ln->value; @@ -431,7 +433,7 @@ void *IOThreadEntryPoint(void *arg) { ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); - redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'", + redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'", (long) pthread_self(), (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", (void*)j, (char*)j->key->ptr); @@ -444,17 +446,19 @@ void *IOThreadEntryPoint(void *arg) { if (j->val) j->expire = expire; } else if (j->type == REDIS_IOJOB_SAVE) { if (j->val) { - dsSet(j->db,j->key,j->val); + dsSet(j->db,j->key,j->val,j->expire); } else { dsDel(j->db,j->key); } } /* Done: insert the job into the processed queue */ - redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", + redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); + redisLog(REDIS_DEBUG,"[T] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"[T] IO locked"); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); @@ -501,30 +505,39 @@ int processActiveIOJobs(int max) { while(max == -1 || max > 0) { int io_processed_len; + redisLog(REDIS_DEBUG,"[P] lock IO"); lockThreadedIO(); + redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed)); + if (listLength(server.io_newjobs) == 0 && listLength(server.io_processing) == 0) { /* There is nothing more to process */ + redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return"); unlockThreadedIO(); break; } -#if 0 +#if 1 /* If there are new jobs we need to signal the thread to * process the next one. FIXME: drop this if useless. */ - redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d", + redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d", listLength(server.io_newjobs), - listLength(server.io_processing)); + listLength(server.io_processing), + listLength(server.io_processed)); if (listLength(server.io_newjobs)) { + redisLog(REDIS_DEBUG,"[P] There are new jobs, signal"); pthread_cond_signal(&server.io_condvar); } #endif /* Check if we can process some finished job */ io_processed_len = listLength(server.io_processed); + redisLog(REDIS_DEBUG,"[P] Unblock IO"); unlockThreadedIO(); + redisLog(REDIS_DEBUG,"[P] Wait"); + usleep(10000); if (io_processed_len) { vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read, (void*)0xdeadbeef,0); @@ -590,7 +603,7 @@ void cacheForcePointInTime(void) { processAllPendingIOJobs(); } -void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { +void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) { iojob *j; j = zmalloc(sizeof(*j)); @@ -600,6 +613,7 @@ void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) { incrRefCount(key); j->val = val; if (val) incrRefCount(val); + j->expire = expire; lockThreadedIO(); queueIOJob(j); @@ -780,20 +794,23 @@ int cacheScheduleIOPushJobs(int flags) { op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr); if (op->type == REDIS_IO_LOAD) { - cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL); + cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0); } else { + time_t expire = -1; + /* Lookup the key, in order to put the current value in the IO * Job. Otherwise if the key does not exists we schedule a disk * store delete operation, setting the value to NULL. */ de = dictFind(op->db->dict,op->key->ptr); if (de) { val = dictGetEntryVal(de); + expire = getExpire(op->db,op->key); } else { /* Setting the value to NULL tells the IO thread to delete * the key on disk. */ val = NULL; } - cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val); + cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire); } /* Mark the operation as in progress. */ cacheScheduleIODelFlag(op->db,op->key,op->type); diff --git a/src/rdb.c b/src/rdb.c index 83fe81e503146c231083aa3a388b62c807a9a6aa..02317fda1d42718fe1a320d4d67175e7fc6863f3 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -399,13 +399,9 @@ off_t rdbSavedObjectLen(robj *o) { * On error -1 is returned. * On success if the key was actaully saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ -int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, - time_t now) +int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, + time_t expiretime, time_t now) { - time_t expiretime; - - expiretime = getExpire(db,key); - /* Save the expire time */ if (expiretime != -1) { /* If this key is already expired skip it */ @@ -460,9 +456,11 @@ int rdbSave(char *filename) { while((de = dictNext(di)) != NULL) { sds keystr = dictGetEntryKey(de); robj key, *o = dictGetEntryVal(de); + time_t expire; initStaticStringObject(key,keystr); - if (rdbSaveKeyValuePair(fp,db,&key,o,now) == -1) goto werr; + expire = getExpire(db,&key); + if (rdbSaveKeyValuePair(fp,&key,o,expire,now) == -1) goto werr; } dictReleaseIterator(di); } diff --git a/src/redis.h b/src/redis.h index 5ae9cc1c14e009125220de47674ec2afdf811118..6d49243dbba0568765591a6e58eaa78557c1948e 100644 --- a/src/redis.h +++ b/src/redis.h @@ -764,7 +764,7 @@ off_t rdbSavedObjectLen(robj *o); off_t rdbSavedObjectPages(robj *o); robj *rdbLoadObject(int type, FILE *fp); void backgroundSaveDoneHandler(int exitcode, int bysignal); -int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now); +int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now); int rdbLoadType(FILE *fp); time_t rdbLoadTime(FILE *fp); robj *rdbLoadStringObject(FILE *fp); @@ -805,7 +805,7 @@ void resetCommandTableStats(void); /* Disk store */ int dsOpen(void); int dsClose(void); -int dsSet(redisDb *db, robj *key, robj *val); +int dsSet(redisDb *db, robj *key, robj *val, time_t expire); robj *dsGet(redisDb *db, robj *key, time_t *expire); int dsDel(redisDb *db, robj *key); int dsExists(redisDb *db, robj *key);