diff --git a/src/dscache.c b/src/dscache.c index fc358c5290e57899ae59dfc8941c600de6fc540e..fe3f1c1dd20fbd25f5a777cb11a8ff6205ecf9f1 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -87,6 +87,8 @@ * * - If dsSet() fails on the write thread log the error and reschedule the * key for flush. + * + * - Check why INCR will not update the LRU info for the object. */ /* Virtual Memory is composed mainly of two subsystems: @@ -133,6 +135,7 @@ void dsInit(void) { server.io_processed = listCreate(); server.io_ready_clients = listCreate(); pthread_mutex_init(&server.io_mutex,NULL); + pthread_cond_init(&server.io_condvar,NULL); server.io_active_threads = 0; if (pipe(pipefds) == -1) { redisLog(REDIS_WARNING,"Unable to intialized DS: pipe(2): %s. Exiting." @@ -329,13 +332,14 @@ void *IOThreadEntryPoint(void *arg) { REDIS_NOTUSED(arg); pthread_detach(pthread_self()); + lockThreadedIO(); while(1) { + /* Wait for more work to do */ + pthread_cond_wait(&server.io_condvar,&server.io_mutex); /* Get a new job to process */ - lockThreadedIO(); if (listLength(server.io_newjobs) == 0) { - /* No new jobs in queue, exit. */ + /* No new jobs in queue, reiterate. */ unlockThreadedIO(); - sleep(1); continue; } ln = listFirst(server.io_newjobs); @@ -345,6 +349,7 @@ void *IOThreadEntryPoint(void *arg) { listAddNodeTail(server.io_processing,j); ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); + redisLog(REDIS_DEBUG,"Thread %ld: new job type %s: %p about key '%s'", (long) pthread_self(), (j->type == REDIS_IOJOB_LOAD) ? "load" : "save", @@ -367,15 +372,17 @@ void *IOThreadEntryPoint(void *arg) { /* Done: insert the job into the processed queue */ redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); + lockThreadedIO(); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); - unlockThreadedIO(); /* Signal the main thread there is new stuff to process */ redisAssert(write(server.io_ready_pipe_write,"x",1) == 1); } - return NULL; /* never reached */ + /* never reached, but that's the full pattern... */ + unlockThreadedIO(); + return NULL; } void spawnIOThread(void) { @@ -449,6 +456,7 @@ void dsCreateIOJob(int type, redisDb *db, robj *key, robj *val) { lockThreadedIO(); queueIOJob(j); + pthread_cond_signal(&server.io_condvar); unlockThreadedIO(); } diff --git a/src/networking.c b/src/networking.c index 524c396c38b97529c45c014223cd57fe5d6a740f..0774f7ee174030404180103ecba6fe9b05d7d3f3 100644 --- a/src/networking.c +++ b/src/networking.c @@ -167,7 +167,7 @@ void _addReplyStringToList(redisClient *c, char *s, size_t len) { void addReply(redisClient *c, robj *obj) { if (_installWriteEvent(c) != REDIS_OK) return; - redisAssert(!server.ds_enabled || obj->storage == REDIS_DS_MEMORY); + redisAssert(!server.ds_enabled || obj->storage != REDIS_DS_SAVING); /* This is an important place where we can avoid copy-on-write * when there is a saving child running, avoiding touching the diff --git a/src/object.c b/src/object.c index f4c34dcfd8ab039d2f5801632b5dfff9bff38ae7..a132665e99ce6366e686ba7ca3182b4ba4538cd0 100644 --- a/src/object.c +++ b/src/object.c @@ -32,6 +32,7 @@ robj *createStringObject(char *ptr, size_t len) { robj *createStringObjectFromLongLong(long long value) { robj *o; if (value >= 0 && value < REDIS_SHARED_INTEGERS && + !server.ds_enabled && pthread_equal(pthread_self(),server.mainthread)) { incrRefCount(shared.integers[value]); o = shared.integers[value]; @@ -214,7 +215,7 @@ robj *tryObjectEncoding(robj *o) { * Note that we also avoid using shared integers when maxmemory is used * because every object needs to have a private LRU field for the LRU * algorithm to work well. */ - if (server.ds_enabled == 0 && + if (!server.ds_enabled && server.maxmemory == 0 && value >= 0 && value < REDIS_SHARED_INTEGERS && pthread_equal(pthread_self(),server.mainthread)) { diff --git a/src/redis.h b/src/redis.h index 054ee930d5f88fba7a3e2911926529203c49889b..3a5a274b02558820486149d9e52eaba2f4e9fdf7 100644 --- a/src/redis.h +++ b/src/redis.h @@ -459,7 +459,7 @@ struct redisServer { list *io_processed; /* List of VM I/O jobs already processed */ list *io_ready_clients; /* Clients ready to be unblocked. All keys loaded */ pthread_mutex_t io_mutex; /* lock to access io_jobs/io_done/io_thread_job */ - pthread_mutex_t io_swapfile_mutex; /* So we can lseek + write */ + pthread_cond_t io_condvar; /* I/O threads conditional variable */ pthread_attr_t io_threads_attr; /* attributes for threads creation */ int io_active_threads; /* Number of running I/O threads */ int vm_max_threads; /* Max number of I/O threads running at the same time */