提交 05600eb8 编写于 作者: A antirez

fixed two diskstore issues, a quasi-deadlock creating problems with I/O speed...

fixed two diskstore issues, a quasi-deadlock creating problems with I/O speed and a race condition among threads
上级 9c104c68
......@@ -203,6 +203,7 @@ int dictRehash(dict *d, int n) {
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) d->rehashidx++;
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
......
......@@ -183,7 +183,7 @@ int dsKeyToPath(redisDb *db, char *buf, robj *key) {
return (buf-origbuf)+41;
}
int dsSet(redisDb *db, robj *key, robj *val) {
int dsSet(redisDb *db, robj *key, robj *val, time_t expire) {
char buf[1024], buf2[1024];
FILE *fp;
int retval, len;
......@@ -201,7 +201,7 @@ int dsSet(redisDb *db, robj *key, robj *val) {
redisPanic("Unrecoverable diskstore error. Exiting.");
}
}
if ((retval = rdbSaveKeyValuePair(fp,db,key,val,time(NULL))) == -1)
if ((retval = rdbSaveKeyValuePair(fp,key,val,expire,time(NULL))) == -1)
return REDIS_ERR;
fclose(fp);
if (retval == 0) {
......
......@@ -418,10 +418,12 @@ void *IOThreadEntryPoint(void *arg) {
/* Get a new job to process */
if (listLength(server.io_newjobs) == 0) {
/* Wait for more work to do */
redisLog(REDIS_DEBUG,"[T] wait for signal");
pthread_cond_wait(&server.io_condvar,&server.io_mutex);
redisLog(REDIS_DEBUG,"[T] signal received");
continue;
}
redisLog(REDIS_DEBUG,"%ld IO jobs to process",
redisLog(REDIS_DEBUG,"[T] %ld IO jobs to process",
listLength(server.io_newjobs));
ln = listFirst(server.io_newjobs);
j = ln->value;
......@@ -431,7 +433,7 @@ void *IOThreadEntryPoint(void *arg) {
ln = listLast(server.io_processing); /* We use ln later to remove it */
unlockThreadedIO();
redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'",
redisLog(REDIS_DEBUG,"[T] %ld: new job type %s: %p about key '%s'",
(long) pthread_self(),
(j->type == REDIS_IOJOB_LOAD) ? "load" : "save",
(void*)j, (char*)j->key->ptr);
......@@ -444,17 +446,19 @@ void *IOThreadEntryPoint(void *arg) {
if (j->val) j->expire = expire;
} else if (j->type == REDIS_IOJOB_SAVE) {
if (j->val) {
dsSet(j->db,j->key,j->val);
dsSet(j->db,j->key,j->val,j->expire);
} else {
dsDel(j->db,j->key);
}
}
/* Done: insert the job into the processed queue */
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
redisLog(REDIS_DEBUG,"[T] %ld completed the job: %p (key %s)",
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
redisLog(REDIS_DEBUG,"[T] lock IO");
lockThreadedIO();
redisLog(REDIS_DEBUG,"[T] IO locked");
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
......@@ -501,30 +505,39 @@ int processActiveIOJobs(int max) {
while(max == -1 || max > 0) {
int io_processed_len;
redisLog(REDIS_DEBUG,"[P] lock IO");
lockThreadedIO();
redisLog(REDIS_DEBUG,"Waiting IO jobs processing: new:%d proessing:%d processed:%d",listLength(server.io_newjobs),listLength(server.io_processing),listLength(server.io_processed));
if (listLength(server.io_newjobs) == 0 &&
listLength(server.io_processing) == 0)
{
/* There is nothing more to process */
redisLog(REDIS_DEBUG,"[P] Nothing to process, unlock IO, return");
unlockThreadedIO();
break;
}
#if 0
#if 1
/* If there are new jobs we need to signal the thread to
* process the next one. FIXME: drop this if useless. */
redisLog(REDIS_DEBUG,"waitEmptyIOJobsQueue: new %d, processing %d",
redisLog(REDIS_DEBUG,"[P] waitEmptyIOJobsQueue: new %d, processing %d, processed %d",
listLength(server.io_newjobs),
listLength(server.io_processing));
listLength(server.io_processing),
listLength(server.io_processed));
if (listLength(server.io_newjobs)) {
redisLog(REDIS_DEBUG,"[P] There are new jobs, signal");
pthread_cond_signal(&server.io_condvar);
}
#endif
/* Check if we can process some finished job */
io_processed_len = listLength(server.io_processed);
redisLog(REDIS_DEBUG,"[P] Unblock IO");
unlockThreadedIO();
redisLog(REDIS_DEBUG,"[P] Wait");
usleep(10000);
if (io_processed_len) {
vmThreadedIOCompletedJob(NULL,server.io_ready_pipe_read,
(void*)0xdeadbeef,0);
......@@ -590,7 +603,7 @@ void cacheForcePointInTime(void) {
processAllPendingIOJobs();
}
void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val, time_t expire) {
iojob *j;
j = zmalloc(sizeof(*j));
......@@ -600,6 +613,7 @@ void cacheCreateIOJob(int type, redisDb *db, robj *key, robj *val) {
incrRefCount(key);
j->val = val;
if (val) incrRefCount(val);
j->expire = expire;
lockThreadedIO();
queueIOJob(j);
......@@ -780,20 +794,23 @@ int cacheScheduleIOPushJobs(int flags) {
op->type == REDIS_IO_LOAD ? "load" : "save", op->key->ptr);
if (op->type == REDIS_IO_LOAD) {
cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL);
cacheCreateIOJob(REDIS_IOJOB_LOAD,op->db,op->key,NULL,0);
} else {
time_t expire = -1;
/* Lookup the key, in order to put the current value in the IO
* Job. Otherwise if the key does not exists we schedule a disk
* store delete operation, setting the value to NULL. */
de = dictFind(op->db->dict,op->key->ptr);
if (de) {
val = dictGetEntryVal(de);
expire = getExpire(op->db,op->key);
} else {
/* Setting the value to NULL tells the IO thread to delete
* the key on disk. */
val = NULL;
}
cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val);
cacheCreateIOJob(REDIS_IOJOB_SAVE,op->db,op->key,val,expire);
}
/* Mark the operation as in progress. */
cacheScheduleIODelFlag(op->db,op->key,op->type);
......
......@@ -399,13 +399,9 @@ off_t rdbSavedObjectLen(robj *o) {
* On error -1 is returned.
* On success if the key was actaully saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val,
time_t now)
int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val,
time_t expiretime, time_t now)
{
time_t expiretime;
expiretime = getExpire(db,key);
/* Save the expire time */
if (expiretime != -1) {
/* If this key is already expired skip it */
......@@ -460,9 +456,11 @@ int rdbSave(char *filename) {
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetEntryKey(de);
robj key, *o = dictGetEntryVal(de);
time_t expire;
initStaticStringObject(key,keystr);
if (rdbSaveKeyValuePair(fp,db,&key,o,now) == -1) goto werr;
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(fp,&key,o,expire,now) == -1) goto werr;
}
dictReleaseIterator(di);
}
......
......@@ -764,7 +764,7 @@ off_t rdbSavedObjectLen(robj *o);
off_t rdbSavedObjectPages(robj *o);
robj *rdbLoadObject(int type, FILE *fp);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(FILE *fp, redisDb *db, robj *key, robj *val, time_t now);
int rdbSaveKeyValuePair(FILE *fp, robj *key, robj *val, time_t expireitme, time_t now);
int rdbLoadType(FILE *fp);
time_t rdbLoadTime(FILE *fp);
robj *rdbLoadStringObject(FILE *fp);
......@@ -805,7 +805,7 @@ void resetCommandTableStats(void);
/* Disk store */
int dsOpen(void);
int dsClose(void);
int dsSet(redisDb *db, robj *key, robj *val);
int dsSet(redisDb *db, robj *key, robj *val, time_t expire);
robj *dsGet(redisDb *db, robj *key, time_t *expire);
int dsDel(redisDb *db, robj *key);
int dsExists(redisDb *db, robj *key);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册