From 50be9b97bc975439cb34703acad5ac108952e672 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 15 Sep 2011 15:46:29 +0200 Subject: [PATCH] Use a different thread for every different type of background job --- src/aof.c | 2 +- src/bio.c | 98 +++++++++++++++++++++++++++++++++++-------------------- src/bio.h | 9 +++-- 3 files changed, 67 insertions(+), 42 deletions(-) diff --git a/src/aof.c b/src/aof.c index 7b15acb82..b1b0d9dce 100644 --- a/src/aof.c +++ b/src/aof.c @@ -771,7 +771,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { redisLog(REDIS_NOTICE, "Background AOF rewrite successful"); /* Asynchronously close the overwritten AOF. */ - if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd); + if (oldfd != -1) bioCreateBackgroundJob(REDIS_BIO_CLOSE_FILE,(void*)(long)oldfd,NULL,NULL); redisLog(REDIS_VERBOSE, "Background AOF rewrite signal handler took %lldus", ustime()-now); diff --git a/src/bio.c b/src/bio.c index 882d4183b..145a2324f 100644 --- a/src/bio.c +++ b/src/bio.c @@ -16,7 +16,10 @@ * ------ * * The design is trivial, we have a structure representing a job to perform - * and a single thread performing all the I/O operations in the queue. + * and a different thread and job queue for every job type. + * Every thread wait for new jobs in its queue, and process every job + * sequentially. + * * Currently there is no way for the creator of the job to be notified about * the completion of the operation, this will only be added when/if needed. */ @@ -24,22 +27,24 @@ #include "redis.h" #include "bio.h" -static pthread_mutex_t bio_mutex; -static pthread_cond_t bio_condvar; -static list *bio_jobs; +static pthread_mutex_t bio_mutex[REDIS_BIO_NUM_OPS]; +static pthread_cond_t bio_condvar[REDIS_BIO_NUM_OPS]; +static list *bio_jobs[REDIS_BIO_NUM_OPS]; /* The following array is used to hold the number of pending jobs for every * OP type. This allows us to export the bioPendingJobsOfType() API that is * useful when the main thread wants to perform some operation that may involve * objects shared with the background thread. The main thread will just wait * that there are no longer jobs of this type to be executed before performing * the sensible operation. This data is also useful for reporting. */ -static unsigned long long *bio_pending; +static unsigned long long bio_pending[REDIS_BIO_NUM_OPS]; /* This structure represents a background Job. It is only used locally to this * file as the API deos not expose the internals at all. */ struct bio_job { - int type; /* Job type, for instance BIO_JOB_CLOSE */ - void *data; /* Job specific arguments pointer. */ + time_t time; /* Time at which the job was created. */ + /* Job specific arguments pointers. If we need to pass more than three + * arguments we can just pass a pointer to a structure or alike. */ + void *arg1, *arg2, *arg3; }; void *bioProcessBackgroundJobs(void *arg); @@ -56,11 +61,12 @@ void bioInit(void) { int j; /* Initialization of state vars and objects */ - pthread_mutex_init(&bio_mutex,NULL); - pthread_cond_init(&bio_condvar,NULL); - bio_jobs = listCreate(); - bio_pending = zmalloc(sizeof(*bio_pending)*REDIS_BIO_MAX_OP_ID); - for (j = 0; j < REDIS_BIO_MAX_OP_ID; j++) bio_pending[j] = 0; + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + pthread_mutex_init(&bio_mutex[j],NULL); + pthread_cond_init(&bio_condvar[j],NULL); + bio_jobs[j] = listCreate(); + bio_pending[j] = 0; + } /* Set the stack size as by default it may be small in some system */ pthread_attr_init(&attr); @@ -69,52 +75,57 @@ void bioInit(void) { while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2; pthread_attr_setstacksize(&attr, stacksize); - /* Ready to spawn our thread */ - if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,NULL) != 0) { - redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); - exit(1); + /* Ready to spawn our threads. We use the single argument the thread + * function accepts in order to pass the job ID the thread is + * responsible of. */ + for (j = 0; j < REDIS_BIO_NUM_OPS; j++) { + void *arg = (void*)(unsigned long) j; + if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { + redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs."); + exit(1); + } } } -void bioCreateBackgroundJob(int type, void *data) { +void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) { struct bio_job *job = zmalloc(sizeof(*job)); - job->type = type; - job->data = data; - pthread_mutex_lock(&bio_mutex); - listAddNodeTail(bio_jobs,job); + job->time = time(NULL); + job->arg1 = arg1; + job->arg2 = arg2; + job->arg3 = arg3; + pthread_mutex_lock(&bio_mutex[type]); + listAddNodeTail(bio_jobs[type],job); bio_pending[type]++; - pthread_cond_signal(&bio_condvar); - pthread_mutex_unlock(&bio_mutex); + pthread_cond_signal(&bio_condvar[type]); + pthread_mutex_unlock(&bio_mutex[type]); } void *bioProcessBackgroundJobs(void *arg) { struct bio_job *job; - REDIS_NOTUSED(arg); + unsigned long type = (unsigned long) arg; pthread_detach(pthread_self()); - pthread_mutex_lock(&bio_mutex); + pthread_mutex_lock(&bio_mutex[type]); while(1) { listNode *ln; - int type; /* The loop always starts with the lock hold. */ - if (listLength(bio_jobs) == 0) { - pthread_cond_wait(&bio_condvar,&bio_mutex); + if (listLength(bio_jobs[type]) == 0) { + pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]); continue; } /* Pop the job from the queue. */ - ln = listFirst(bio_jobs); + ln = listFirst(bio_jobs[type]); job = ln->value; - type = job->type; - listDelNode(bio_jobs,ln); + listDelNode(bio_jobs[type],ln); /* It is now possible to unlock the background system as we know have * a stand alone job structure to process.*/ - pthread_mutex_unlock(&bio_mutex); + pthread_mutex_unlock(&bio_mutex[type]); /* Process the job accordingly to its type. */ if (type == REDIS_BIO_CLOSE_FILE) { - close((long)job->data); + close((long)job->arg1); } else { redisPanic("Wrong job type in bioProcessBackgroundJobs()."); } @@ -122,7 +133,7 @@ void *bioProcessBackgroundJobs(void *arg) { /* Lock again before reiterating the loop, if there are no longer * jobs to process we'll block again in pthread_cond_wait(). */ - pthread_mutex_lock(&bio_mutex); + pthread_mutex_lock(&bio_mutex[type]); bio_pending[type]--; } } @@ -130,9 +141,9 @@ void *bioProcessBackgroundJobs(void *arg) { /* Return the number of pending jobs of the specified type. */ unsigned long long bioPendingJobsOfType(int type) { unsigned long long val; - pthread_mutex_lock(&bio_mutex); + pthread_mutex_lock(&bio_mutex[type]); val = bio_pending[type]; - pthread_mutex_unlock(&bio_mutex); + pthread_mutex_unlock(&bio_mutex[type]); return val; } @@ -164,3 +175,18 @@ void bioWaitPendingJobsLE(int type, unsigned long long num) { if (bioPendingJobsOfType(type) <= num) break; } } + +/* Return the older job of the specified type. */ +time_t bioOlderJobOfType(int type) { + time_t time; + listNode *ln; + struct bio_job *job; + + pthread_mutex_lock(&bio_mutex[type]); + ln = listFirst(bio_jobs[type]); + job = ln->value; + time = job->time; + pthread_mutex_unlock(&bio_mutex[type]); + return time; +} + diff --git a/src/bio.h b/src/bio.h index ded0d2dce..ebd01ee28 100644 --- a/src/bio.h +++ b/src/bio.h @@ -1,11 +1,10 @@ /* Exported API */ void bioInit(void); -void bioCreateBackgroundJob(int type, void *data); +void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3); unsigned long long bioPendingJobsOfType(int type); void bioWaitPendingJobsLE(int type, unsigned long long num); +time_t bioOlderJobOfType(int type); /* Background job opcodes */ -#define REDIS_BIO_ZERO_OP_ID 0 /* We don't use zero as it is the most likely - * passed value in case of bugs/races. */ -#define REDIS_BIO_CLOSE_FILE 1 /* Deferred close(2) syscall. */ -#define REDIS_BIO_MAX_OP_ID 1 +#define REDIS_BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */ +#define REDIS_BIO_NUM_OPS 1 -- GitLab