diff --git a/src/dscache.c b/src/dscache.c index 5adbeeeda6c3c0ae785fc78917b193a0a06afcc8..e2fd7e30224dfc08a89bf10ce81186085b924058 100644 --- a/src/dscache.c +++ b/src/dscache.c @@ -107,13 +107,13 @@ * as a fully non-blocking VM. */ +void spawnIOThread(void); + /* =================== Virtual Memory - Blocking Side ====================== */ void dsInit(void) { - off_t totsize; int pipefds[2]; size_t stacksize; - struct flock fl; zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ @@ -239,42 +239,22 @@ int dsCanTouchDiskStore(void) { /* =================== Virtual Memory - Threaded I/O ======================= */ void freeIOJob(iojob *j) { - if ((j->type == REDIS_IOJOB_PREPARE_SWAP || - j->type == REDIS_IOJOB_DO_SWAP || - j->type == REDIS_IOJOB_LOAD) && j->val != NULL) - { - /* we fix the storage type, otherwise decrRefCount() will try to - * kill the I/O thread Job (that does no longer exists). */ - if (j->val->storage == REDIS_VM_SWAPPING) - j->val->storage = REDIS_VM_MEMORY; - decrRefCount(j->val); - } decrRefCount(j->key); zfree(j); } /* Every time a thread finished a Job, it writes a byte into the write side * of an unix pipe in order to "awake" the main thread, and this function - * is called. - * - * Note that this is called both by the event loop, when a I/O thread - * sends a byte in the notification pipe, and is also directly called from - * waitEmptyIOJobsQueue(). - * - * In the latter case we don't want to swap more, so we use the - * "privdata" argument setting it to a not NULL value to signal this - * condition. */ + * is called. */ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[1]; - int retval, processed = 0, toprocess = -1, trytoswap = 1; + int retval, processed = 0, toprocess = -1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); - if (privdata != NULL) trytoswap = 0; /* check the comments above... */ - /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ while((retval = read(fd,buf,1)) == 1) { @@ -295,11 +275,7 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, j = ln->value; listDelNode(server.io_processed,ln); unlockThreadedIO(); - /* If this job is marked as canceled, just ignore it */ - if (j->canceled) { - freeIOJob(j); - continue; - } + /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); @@ -322,27 +298,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, handleClientsBlockedOnSwappedKey(db,j->key); freeIOJob(j); zfree(vp); - } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { - /* Now we know the amount of pages required to swap this object. - * Let's find some space for it, and queue this task again - * rebranded as REDIS_IOJOB_DO_SWAP. */ - if (!vmCanSwapOut() || - vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) - { - /* Ooops... no space or we can't swap as there is - * a fork()ed Redis trying to save stuff on disk. */ - j->val->storage = REDIS_VM_MEMORY; /* undo operation */ - freeIOJob(j); - } else { - /* Note that we need to mark this pages as used now, - * if the job will be canceled, we'll mark them as freed - * again. */ - vmMarkPagesUsed(j->page,j->pages); - j->type = REDIS_IOJOB_DO_SWAP; - lockThreadedIO(); - queueIOJob(j); - unlockThreadedIO(); - } } else if (j->type == REDIS_IOJOB_DO_SWAP) { vmpointer *vp; diff --git a/src/redis.h b/src/redis.h index 8dd461698e054db121abb30c9a86913c19429c2a..e12b1c18c04dd040ef28feb8499be90219db069d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -125,6 +125,7 @@ #define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 +#define REDIS_THREAD_STACK_SIZE (1024*1024*4) /* Client flags */ #define REDIS_SLAVE 1 /* This client is a slave server */ @@ -542,22 +543,15 @@ typedef struct zset { } zset; /* VM threaded I/O request message */ -#define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */ -#define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */ -#define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */ +#define REDIS_IOJOB_LOAD 0 +#define REDIS_IOJOB_SAVE 1 + typedef struct iojob { int type; /* Request type, REDIS_IOJOB_* */ redisDb *db;/* Redis database */ - robj *key; /* This I/O request is about swapping this key */ - robj *id; /* Unique identifier of this job: - this is the object to swap for REDIS_IOREQ_*_SWAP, or the - vmpointer objct for REDIS_IOREQ_LOAD. */ - robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this - * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */ - off_t page; /* Swap page where to read/write the object */ - off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */ - int canceled; /* True if this command was canceled by blocking side of VM */ - pthread_t thread; /* ID of the thread processing this entry */ + robj *key; /* This I/O request is about this key */ + robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this + * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */ } iojob; /* Structure to hold list iteration abstraction. */