提交 f34a6cd8 编写于 作者: A antirez

still more work for diskstore

上级 f2da3a62
...@@ -107,13 +107,13 @@ ...@@ -107,13 +107,13 @@
* as a fully non-blocking VM. * as a fully non-blocking VM.
*/ */
void spawnIOThread(void);
/* =================== Virtual Memory - Blocking Side ====================== */ /* =================== Virtual Memory - Blocking Side ====================== */
void dsInit(void) { void dsInit(void) {
off_t totsize;
int pipefds[2]; int pipefds[2];
size_t stacksize; size_t stacksize;
struct flock fl;
zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */ zmalloc_enable_thread_safeness(); /* we need thread safe zmalloc() */
...@@ -239,42 +239,22 @@ int dsCanTouchDiskStore(void) { ...@@ -239,42 +239,22 @@ int dsCanTouchDiskStore(void) {
/* =================== Virtual Memory - Threaded I/O ======================= */ /* =================== Virtual Memory - Threaded I/O ======================= */
void freeIOJob(iojob *j) { void freeIOJob(iojob *j) {
if ((j->type == REDIS_IOJOB_PREPARE_SWAP ||
j->type == REDIS_IOJOB_DO_SWAP ||
j->type == REDIS_IOJOB_LOAD) && j->val != NULL)
{
/* we fix the storage type, otherwise decrRefCount() will try to
* kill the I/O thread Job (that does no longer exists). */
if (j->val->storage == REDIS_VM_SWAPPING)
j->val->storage = REDIS_VM_MEMORY;
decrRefCount(j->val);
}
decrRefCount(j->key); decrRefCount(j->key);
zfree(j); zfree(j);
} }
/* Every time a thread finished a Job, it writes a byte into the write side /* Every time a thread finished a Job, it writes a byte into the write side
* of an unix pipe in order to "awake" the main thread, and this function * of an unix pipe in order to "awake" the main thread, and this function
* is called. * is called. */
*
* Note that this is called both by the event loop, when a I/O thread
* sends a byte in the notification pipe, and is also directly called from
* waitEmptyIOJobsQueue().
*
* In the latter case we don't want to swap more, so we use the
* "privdata" argument setting it to a not NULL value to signal this
* condition. */
void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask) int mask)
{ {
char buf[1]; char buf[1];
int retval, processed = 0, toprocess = -1, trytoswap = 1; int retval, processed = 0, toprocess = -1;
REDIS_NOTUSED(el); REDIS_NOTUSED(el);
REDIS_NOTUSED(mask); REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata); REDIS_NOTUSED(privdata);
if (privdata != NULL) trytoswap = 0; /* check the comments above... */
/* For every byte we read in the read side of the pipe, there is one /* For every byte we read in the read side of the pipe, there is one
* I/O job completed to process. */ * I/O job completed to process. */
while((retval = read(fd,buf,1)) == 1) { while((retval = read(fd,buf,1)) == 1) {
...@@ -295,11 +275,7 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, ...@@ -295,11 +275,7 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
j = ln->value; j = ln->value;
listDelNode(server.io_processed,ln); listDelNode(server.io_processed,ln);
unlockThreadedIO(); unlockThreadedIO();
/* If this job is marked as canceled, just ignore it */
if (j->canceled) {
freeIOJob(j);
continue;
}
/* Post process it in the main thread, as there are things we /* Post process it in the main thread, as there are things we
* can do just here to avoid race conditions and/or invasive locks */ * can do just here to avoid race conditions and/or invasive locks */
redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr); redisLog(REDIS_DEBUG,"COMPLETED Job type: %d, ID %p, key: %s", j->type, (void*)j->id, (unsigned char*)j->key->ptr);
...@@ -322,27 +298,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, ...@@ -322,27 +298,6 @@ void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
handleClientsBlockedOnSwappedKey(db,j->key); handleClientsBlockedOnSwappedKey(db,j->key);
freeIOJob(j); freeIOJob(j);
zfree(vp); zfree(vp);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
/* Now we know the amount of pages required to swap this object.
* Let's find some space for it, and queue this task again
* rebranded as REDIS_IOJOB_DO_SWAP. */
if (!vmCanSwapOut() ||
vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
{
/* Ooops... no space or we can't swap as there is
* a fork()ed Redis trying to save stuff on disk. */
j->val->storage = REDIS_VM_MEMORY; /* undo operation */
freeIOJob(j);
} else {
/* Note that we need to mark this pages as used now,
* if the job will be canceled, we'll mark them as freed
* again. */
vmMarkPagesUsed(j->page,j->pages);
j->type = REDIS_IOJOB_DO_SWAP;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
} else if (j->type == REDIS_IOJOB_DO_SWAP) { } else if (j->type == REDIS_IOJOB_DO_SWAP) {
vmpointer *vp; vmpointer *vp;
......
...@@ -125,6 +125,7 @@ ...@@ -125,6 +125,7 @@
#define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */ #define REDIS_DS_SAVING 2 /* There is an IO Job created for this obj. */
#define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1 #define REDIS_MAX_COMPLETED_JOBS_PROCESSED 1
#define REDIS_THREAD_STACK_SIZE (1024*1024*4)
/* Client flags */ /* Client flags */
#define REDIS_SLAVE 1 /* This client is a slave server */ #define REDIS_SLAVE 1 /* This client is a slave server */
...@@ -542,22 +543,15 @@ typedef struct zset { ...@@ -542,22 +543,15 @@ typedef struct zset {
} zset; } zset;
/* VM threaded I/O request message */ /* VM threaded I/O request message */
#define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */ #define REDIS_IOJOB_LOAD 0
#define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */ #define REDIS_IOJOB_SAVE 1
#define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
typedef struct iojob { typedef struct iojob {
int type; /* Request type, REDIS_IOJOB_* */ int type; /* Request type, REDIS_IOJOB_* */
redisDb *db;/* Redis database */ redisDb *db;/* Redis database */
robj *key; /* This I/O request is about swapping this key */ robj *key; /* This I/O request is about this key */
robj *id; /* Unique identifier of this job: robj *val; /* the value to swap for REDIS_IOJOB_SAVE, otherwise this
this is the object to swap for REDIS_IOREQ_*_SWAP, or the * field is populated by the I/O thread for REDIS_IOJOB_LOAD. */
vmpointer objct for REDIS_IOREQ_LOAD. */
robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
* field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
off_t page; /* Swap page where to read/write the object */
off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
int canceled; /* True if this command was canceled by blocking side of VM */
pthread_t thread; /* ID of the thread processing this entry */
} iojob; } iojob;
/* Structure to hold list iteration abstraction. */ /* Structure to hold list iteration abstraction. */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册