提交 597fe3ce 编写于 作者: M Michal Privoznik

daemon: Create priority workers pool

This patch annotates APIs with low or high priority.
In low set MUST be all APIs which might eventually access monitor
(and thus block indefinitely). Other APIs may be marked as high
priority. However, some must be (e.g. domainDestroy).

For high priority calls (HPC), there are some high priority workers
(HPW) created in the pool. HPW can execute only HPC, although normal
worker can process any call regardless priority. Therefore, only those
APIs which are guaranteed to end in reasonable small amount of time
can be marked as HPC.

The size of this HPC pool is static, because HPC are expected to end
quickly, therefore jobs assigned to this pool will be served quickly.
It can be configured in libvirtd.conf via prio_workers variable.
Default is set to 5.

To mark API with low or high priority, append priority:{low|high} to
it's comment in src/remote/remote_protocol.x. This is similar to
autogen|skipgen. If not marked, the generator assumes low as default.
上级 63379890
...@@ -57,6 +57,7 @@ module Libvirtd = ...@@ -57,6 +57,7 @@ module Libvirtd =
| int_entry "max_clients" | int_entry "max_clients"
| int_entry "max_requests" | int_entry "max_requests"
| int_entry "max_client_requests" | int_entry "max_client_requests"
| int_entry "prio_workers"
let logging_entry = int_entry "log_level" let logging_entry = int_entry "log_level"
| str_entry "log_filters" | str_entry "log_filters"
......
...@@ -134,6 +134,8 @@ struct daemonConfig { ...@@ -134,6 +134,8 @@ struct daemonConfig {
int max_workers; int max_workers;
int max_clients; int max_clients;
int prio_workers;
int max_requests; int max_requests;
int max_client_requests; int max_client_requests;
...@@ -886,6 +888,8 @@ daemonConfigNew(bool privileged ATTRIBUTE_UNUSED) ...@@ -886,6 +888,8 @@ daemonConfigNew(bool privileged ATTRIBUTE_UNUSED)
data->max_workers = 20; data->max_workers = 20;
data->max_clients = 20; data->max_clients = 20;
data->prio_workers = 5;
data->max_requests = 20; data->max_requests = 20;
data->max_client_requests = 5; data->max_client_requests = 5;
...@@ -1042,6 +1046,8 @@ daemonConfigLoad(struct daemonConfig *data, ...@@ -1042,6 +1046,8 @@ daemonConfigLoad(struct daemonConfig *data,
GET_CONF_INT (conf, filename, max_workers); GET_CONF_INT (conf, filename, max_workers);
GET_CONF_INT (conf, filename, max_clients); GET_CONF_INT (conf, filename, max_clients);
GET_CONF_INT (conf, filename, prio_workers);
GET_CONF_INT (conf, filename, max_requests); GET_CONF_INT (conf, filename, max_requests);
GET_CONF_INT (conf, filename, max_client_requests); GET_CONF_INT (conf, filename, max_client_requests);
...@@ -1441,6 +1447,7 @@ int main(int argc, char **argv) { ...@@ -1441,6 +1447,7 @@ int main(int argc, char **argv) {
config->auth_unix_ro == REMOTE_AUTH_POLKIT; config->auth_unix_ro == REMOTE_AUTH_POLKIT;
if (!(srv = virNetServerNew(config->min_workers, if (!(srv = virNetServerNew(config->min_workers,
config->max_workers, config->max_workers,
config->prio_workers,
config->max_clients, config->max_clients,
config->mdns_adv ? config->mdns_name : NULL, config->mdns_adv ? config->mdns_name : NULL,
use_polkit_dbus, use_polkit_dbus,
......
...@@ -257,6 +257,12 @@ ...@@ -257,6 +257,12 @@
#min_workers = 5 #min_workers = 5
#max_workers = 20 #max_workers = 20
# The number of priority workers. If all workers from above
# pool will stuck, some calls marked as high priority
# (notably domainDestroy) can be executed in this pool.
#prio_workers = 5
# Total global limit on concurrent RPC calls. Should be # Total global limit on concurrent RPC calls. Should be
# at least as large as max_workers. Beyond this, RPC requests # at least as large as max_workers. Beyond this, RPC requests
# will be read into memory and queued. This directly impact # will be read into memory and queued. This directly impact
......
...@@ -663,7 +663,7 @@ qemudStartup(int privileged) { ...@@ -663,7 +663,7 @@ qemudStartup(int privileged) {
virHashForEach(qemu_driver->domains.objs, qemuDomainSnapshotLoad, virHashForEach(qemu_driver->domains.objs, qemuDomainSnapshotLoad,
qemu_driver->snapshotDir); qemu_driver->snapshotDir);
qemu_driver->workerPool = virThreadPoolNew(0, 1, processWatchdogEvent, qemu_driver); qemu_driver->workerPool = virThreadPoolNew(0, 1, 0, processWatchdogEvent, qemu_driver);
if (!qemu_driver->workerPool) if (!qemu_driver->workerPool)
goto error; goto error;
......
...@@ -571,7 +571,7 @@ qemuProcessHandleWatchdog(qemuMonitorPtr mon ATTRIBUTE_UNUSED, ...@@ -571,7 +571,7 @@ qemuProcessHandleWatchdog(qemuMonitorPtr mon ATTRIBUTE_UNUSED,
* deleted before handling watchdog event is finished. * deleted before handling watchdog event is finished.
*/ */
virDomainObjRef(vm); virDomainObjRef(vm);
if (virThreadPoolSendJob(driver->workerPool, wdEvent) < 0) { if (virThreadPoolSendJob(driver->workerPool, 0, wdEvent) < 0) {
if (virDomainObjUnref(vm) == 0) if (virDomainObjUnref(vm) == 0)
vm = NULL; vm = NULL;
VIR_FREE(wdEvent); VIR_FREE(wdEvent);
......
...@@ -52,9 +52,14 @@ const QEMU_PROGRAM = 0x20008087; ...@@ -52,9 +52,14 @@ const QEMU_PROGRAM = 0x20008087;
const QEMU_PROTOCOL_VERSION = 1; const QEMU_PROTOCOL_VERSION = 1;
enum qemu_procedure { enum qemu_procedure {
/* Each function must have a two-word comment. The first word is /* Each function must have a three-word comment. The first word is
* whether gendispatch.pl handles daemon, the second whether * whether gendispatch.pl handles daemon, the second whether
* it handles src/remote. */ * it handles src/remote.
QEMU_PROC_MONITOR_COMMAND = 1, /* skipgen skipgen */ * The last argument describes priority of API. There are two accepted
QEMU_PROC_DOMAIN_ATTACH = 2 /* autogen autogen */ * values: low, high; Each API that might eventually access hypervisor's
* monitor (and thus block) MUST fall into low priority. However, there
* are some exceptions to this rule, e.g. domainDestroy. Other APIs MAY
* be marked as high priority. If in doubt, it's safe to choose low. */
QEMU_PROC_MONITOR_COMMAND = 1, /* skipgen skipgen priority:low */
QEMU_PROC_DOMAIN_ATTACH = 2 /* autogen autogen priority:low */
}; };
此差异已折叠。
...@@ -146,12 +146,13 @@ while (<PROTOCOL>) { ...@@ -146,12 +146,13 @@ while (<PROTOCOL>) {
} }
if ($opt_b or $opt_k) { if ($opt_b or $opt_k) {
if (!($flags =~ m/^\s*\/\*\s*(\S+)\s+(\S+)\s*(.*)\*\/\s*$/)) { if (!($flags =~ m/^\s*\/\*\s*(\S+)\s+(\S+)\s*(\|.*)?\s+(priority:(\S+))?\s*\*\/\s*$/)) {
die "invalid generator flags for ${procprefix}_PROC_${name}" die "invalid generator flags for ${procprefix}_PROC_${name}"
} }
my $genmode = $opt_b ? $1 : $2; my $genmode = $opt_b ? $1 : $2;
my $genflags = $3; my $genflags = $3;
my $priority = defined $5 ? $5 : "low";
if ($genmode eq "autogen") { if ($genmode eq "autogen") {
push(@autogen, $ProcName); push(@autogen, $ProcName);
...@@ -171,6 +172,16 @@ while (<PROTOCOL>) { ...@@ -171,6 +172,16 @@ while (<PROTOCOL>) {
} else { } else {
$calls{$name}->{streamflag} = "none"; $calls{$name}->{streamflag} = "none";
} }
# for now, we distinguish only two levels of prioroty:
# low (0) and high (1)
if ($priority eq "high") {
$calls{$name}->{priority} = 1;
} elsif ($priority eq "low") {
$calls{$name}->{priority} = 0;
} else {
die "invalid priority ${priority} for ${procprefix}_PROC_${name}"
}
} }
$calls[$id] = $calls{$name}; $calls[$id] = $calls{$name};
...@@ -260,6 +271,7 @@ if ($opt_d) { ...@@ -260,6 +271,7 @@ if ($opt_d) {
print "$_:\n"; print "$_:\n";
print " name $calls{$_}->{name} ($calls{$_}->{ProcName})\n"; print " name $calls{$_}->{name} ($calls{$_}->{ProcName})\n";
print " $calls{$_}->{args} -> $calls{$_}->{ret}\n"; print " $calls{$_}->{args} -> $calls{$_}->{ret}\n";
print " priority -> $calls{$_}->{priority}\n";
} }
} }
...@@ -935,7 +947,7 @@ elsif ($opt_b) { ...@@ -935,7 +947,7 @@ elsif ($opt_b) {
print "virNetServerProgramProc ${structprefix}Procs[] = {\n"; print "virNetServerProgramProc ${structprefix}Procs[] = {\n";
for ($id = 0 ; $id <= $#calls ; $id++) { for ($id = 0 ; $id <= $#calls ; $id++) {
my ($comment, $name, $argtype, $arglen, $argfilter, $retlen, $retfilter); my ($comment, $name, $argtype, $arglen, $argfilter, $retlen, $retfilter, $priority);
if (defined $calls[$id] && !$calls[$id]->{msg}) { if (defined $calls[$id] && !$calls[$id]->{msg}) {
$comment = "/* Method $calls[$id]->{ProcName} => $id */"; $comment = "/* Method $calls[$id]->{ProcName} => $id */";
...@@ -958,7 +970,9 @@ elsif ($opt_b) { ...@@ -958,7 +970,9 @@ elsif ($opt_b) {
$retfilter = "xdr_void"; $retfilter = "xdr_void";
} }
print "{ $comment\n ${name},\n $arglen,\n (xdrproc_t)$argfilter,\n $retlen,\n (xdrproc_t)$retfilter,\n true \n},\n"; $priority = defined $calls[$id]->{priority} ? $calls[$id]->{priority} : 0;
print "{ $comment\n ${name},\n $arglen,\n (xdrproc_t)$argfilter,\n $retlen,\n (xdrproc_t)$retfilter,\n true,\n $priority\n},\n";
} }
print "};\n"; print "};\n";
print "size_t ${structprefix}NProcs = ARRAY_CARDINALITY(${structprefix}Procs);\n"; print "size_t ${structprefix}NProcs = ARRAY_CARDINALITY(${structprefix}Procs);\n";
......
...@@ -64,6 +64,7 @@ typedef virNetServerJob *virNetServerJobPtr; ...@@ -64,6 +64,7 @@ typedef virNetServerJob *virNetServerJobPtr;
struct _virNetServerJob { struct _virNetServerJob {
virNetServerClientPtr client; virNetServerClientPtr client;
virNetMessagePtr msg; virNetMessagePtr msg;
virNetServerProgramPtr prog;
}; };
struct _virNetServer { struct _virNetServer {
...@@ -128,61 +129,41 @@ static void virNetServerHandleJob(void *jobOpaque, void *opaque) ...@@ -128,61 +129,41 @@ static void virNetServerHandleJob(void *jobOpaque, void *opaque)
{ {
virNetServerPtr srv = opaque; virNetServerPtr srv = opaque;
virNetServerJobPtr job = jobOpaque; virNetServerJobPtr job = jobOpaque;
virNetServerProgramPtr prog = NULL;
size_t i;
virNetServerLock(srv);
VIR_DEBUG("server=%p client=%p message=%p",
srv, job->client, job->msg);
for (i = 0 ; i < srv->nprograms ; i++) { VIR_DEBUG("server=%p client=%p message=%p prog=%p",
if (virNetServerProgramMatches(srv->programs[i], job->msg)) { srv, job->client, job->msg, job->prog);
prog = srv->programs[i];
break;
}
}
if (!prog) { if (virNetServerProgramDispatch(job->prog,
if (virNetServerProgramUnknownError(job->client,
job->msg,
&job->msg->header) < 0)
goto error;
else
goto cleanup;
}
virNetServerProgramRef(prog);
virNetServerUnlock(srv);
if (virNetServerProgramDispatch(prog,
srv, srv,
job->client, job->client,
job->msg) < 0) job->msg) < 0)
goto error; goto error;
virNetServerLock(srv); virNetServerLock(srv);
virNetServerProgramFree(job->prog);
cleanup:
virNetServerProgramFree(prog);
virNetServerUnlock(srv); virNetServerUnlock(srv);
virNetServerClientFree(job->client); virNetServerClientFree(job->client);
VIR_FREE(job); VIR_FREE(job);
return; return;
error: error:
virNetServerProgramFree(job->prog);
virNetMessageFree(job->msg); virNetMessageFree(job->msg);
virNetServerClientClose(job->client); virNetServerClientClose(job->client);
goto cleanup; virNetServerClientFree(job->client);
VIR_FREE(job);
} }
static int virNetServerDispatchNewMessage(virNetServerClientPtr client, static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
virNetMessagePtr msg, virNetMessagePtr msg,
void *opaque) void *opaque)
{ {
virNetServerPtr srv = opaque; virNetServerPtr srv = opaque;
virNetServerJobPtr job; virNetServerJobPtr job;
int ret; virNetServerProgramPtr prog = NULL;
unsigned int priority = 0;
size_t i;
int ret = -1;
VIR_DEBUG("server=%p client=%p message=%p", VIR_DEBUG("server=%p client=%p message=%p",
srv, client, msg); srv, client, msg);
...@@ -196,8 +177,29 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client, ...@@ -196,8 +177,29 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
job->msg = msg; job->msg = msg;
virNetServerLock(srv); virNetServerLock(srv);
if ((ret = virThreadPoolSendJob(srv->workers, job)) < 0) for (i = 0 ; i < srv->nprograms ; i++) {
if (virNetServerProgramMatches(srv->programs[i], job->msg)) {
prog = srv->programs[i];
break;
}
}
if (!prog) {
virNetServerProgramUnknownError(client, msg, &msg->header);
goto cleanup;
}
virNetServerProgramRef(prog);
job->prog = prog;
priority = virNetServerProgramGetPriority(prog, msg->header.proc);
ret = virThreadPoolSendJob(srv->workers, priority, job);
cleanup:
if (ret < 0) {
VIR_FREE(job); VIR_FREE(job);
virNetServerProgramFree(prog);
}
virNetServerUnlock(srv); virNetServerUnlock(srv);
return ret; return ret;
...@@ -274,6 +276,7 @@ static void virNetServerFatalSignal(int sig, siginfo_t *siginfo ATTRIBUTE_UNUSED ...@@ -274,6 +276,7 @@ static void virNetServerFatalSignal(int sig, siginfo_t *siginfo ATTRIBUTE_UNUSED
virNetServerPtr virNetServerNew(size_t min_workers, virNetServerPtr virNetServerNew(size_t min_workers,
size_t max_workers, size_t max_workers,
size_t priority_workers,
size_t max_clients, size_t max_clients,
const char *mdnsGroupName, const char *mdnsGroupName,
bool connectDBus ATTRIBUTE_UNUSED, bool connectDBus ATTRIBUTE_UNUSED,
...@@ -290,6 +293,7 @@ virNetServerPtr virNetServerNew(size_t min_workers, ...@@ -290,6 +293,7 @@ virNetServerPtr virNetServerNew(size_t min_workers,
srv->refs = 1; srv->refs = 1;
if (!(srv->workers = virThreadPoolNew(min_workers, max_workers, if (!(srv->workers = virThreadPoolNew(min_workers, max_workers,
priority_workers,
virNetServerHandleJob, virNetServerHandleJob,
srv))) srv)))
goto error; goto error;
......
...@@ -39,6 +39,7 @@ typedef int (*virNetServerClientInitHook)(virNetServerPtr srv, ...@@ -39,6 +39,7 @@ typedef int (*virNetServerClientInitHook)(virNetServerPtr srv,
virNetServerPtr virNetServerNew(size_t min_workers, virNetServerPtr virNetServerNew(size_t min_workers,
size_t max_workers, size_t max_workers,
size_t priority_workers,
size_t max_clients, size_t max_clients,
const char *mdnsGroupName, const char *mdnsGroupName,
bool connectDBus, bool connectDBus,
......
...@@ -108,6 +108,17 @@ static virNetServerProgramProcPtr virNetServerProgramGetProc(virNetServerProgram ...@@ -108,6 +108,17 @@ static virNetServerProgramProcPtr virNetServerProgramGetProc(virNetServerProgram
return &prog->procs[procedure]; return &prog->procs[procedure];
} }
unsigned int
virNetServerProgramGetPriority(virNetServerProgramPtr prog,
int procedure)
{
virNetServerProgramProcPtr proc = virNetServerProgramGetProc(prog, procedure);
if (!proc)
return 0;
return proc->priority;
}
static int static int
virNetServerProgramSendError(unsigned program, virNetServerProgramSendError(unsigned program,
......
...@@ -53,6 +53,7 @@ struct _virNetServerProgramProc { ...@@ -53,6 +53,7 @@ struct _virNetServerProgramProc {
size_t ret_len; size_t ret_len;
xdrproc_t ret_filter; xdrproc_t ret_filter;
bool needAuth; bool needAuth;
unsigned int priority;
}; };
virNetServerProgramPtr virNetServerProgramNew(unsigned program, virNetServerProgramPtr virNetServerProgramNew(unsigned program,
...@@ -63,6 +64,9 @@ virNetServerProgramPtr virNetServerProgramNew(unsigned program, ...@@ -63,6 +64,9 @@ virNetServerProgramPtr virNetServerProgramNew(unsigned program,
int virNetServerProgramGetID(virNetServerProgramPtr prog); int virNetServerProgramGetID(virNetServerProgramPtr prog);
int virNetServerProgramGetVersion(virNetServerProgramPtr prog); int virNetServerProgramGetVersion(virNetServerProgramPtr prog);
unsigned int virNetServerProgramGetPriority(virNetServerProgramPtr prog,
int procedure);
void virNetServerProgramRef(virNetServerProgramPtr prog); void virNetServerProgramRef(virNetServerProgramPtr prog);
int virNetServerProgramMatches(virNetServerProgramPtr prog, int virNetServerProgramMatches(virNetServerProgramPtr prog,
......
...@@ -37,7 +37,9 @@ typedef struct _virThreadPoolJob virThreadPoolJob; ...@@ -37,7 +37,9 @@ typedef struct _virThreadPoolJob virThreadPoolJob;
typedef virThreadPoolJob *virThreadPoolJobPtr; typedef virThreadPoolJob *virThreadPoolJobPtr;
struct _virThreadPoolJob { struct _virThreadPoolJob {
virThreadPoolJobPtr prev;
virThreadPoolJobPtr next; virThreadPoolJobPtr next;
unsigned int priority;
void *data; void *data;
}; };
...@@ -47,7 +49,8 @@ typedef virThreadPoolJobList *virThreadPoolJobListPtr; ...@@ -47,7 +49,8 @@ typedef virThreadPoolJobList *virThreadPoolJobListPtr;
struct _virThreadPoolJobList { struct _virThreadPoolJobList {
virThreadPoolJobPtr head; virThreadPoolJobPtr head;
virThreadPoolJobPtr *tail; virThreadPoolJobPtr tail;
virThreadPoolJobPtr firstPrio;
}; };
...@@ -57,6 +60,7 @@ struct _virThreadPool { ...@@ -57,6 +60,7 @@ struct _virThreadPool {
virThreadPoolJobFunc jobFunc; virThreadPoolJobFunc jobFunc;
void *jobOpaque; void *jobOpaque;
virThreadPoolJobList jobList; virThreadPoolJobList jobList;
size_t jobQueueDepth;
virMutex mutex; virMutex mutex;
virCond cond; virCond cond;
...@@ -66,33 +70,75 @@ struct _virThreadPool { ...@@ -66,33 +70,75 @@ struct _virThreadPool {
size_t freeWorkers; size_t freeWorkers;
size_t nWorkers; size_t nWorkers;
virThreadPtr workers; virThreadPtr workers;
size_t nPrioWorkers;
virThreadPtr prioWorkers;
virCond prioCond;
};
struct virThreadPoolWorkerData {
virThreadPoolPtr pool;
virCondPtr cond;
bool priority;
}; };
static void virThreadPoolWorker(void *opaque) static void virThreadPoolWorker(void *opaque)
{ {
virThreadPoolPtr pool = opaque; struct virThreadPoolWorkerData *data = opaque;
virThreadPoolPtr pool = data->pool;
virCondPtr cond = data->cond;
bool priority = data->priority;
virThreadPoolJobPtr job = NULL;
VIR_FREE(data);
virMutexLock(&pool->mutex); virMutexLock(&pool->mutex);
while (1) { while (1) {
while (!pool->quit && while (!pool->quit &&
!pool->jobList.head) { ((!priority && !pool->jobList.head) ||
pool->freeWorkers++; (priority && !pool->jobList.firstPrio))) {
if (virCondWait(&pool->cond, &pool->mutex) < 0) { if (!priority)
pool->freeWorkers--; pool->freeWorkers++;
if (virCondWait(cond, &pool->mutex) < 0) {
if (!priority)
pool->freeWorkers--;
goto out; goto out;
} }
pool->freeWorkers--; if (!priority)
pool->freeWorkers--;
} }
if (pool->quit) if (pool->quit)
break; break;
virThreadPoolJobPtr job = pool->jobList.head; if (priority) {
pool->jobList.head = pool->jobList.head->next; job = pool->jobList.firstPrio;
job->next = NULL; } else {
if (pool->jobList.tail == &job->next) job = pool->jobList.head;
pool->jobList.tail = &pool->jobList.head; }
if (job == pool->jobList.firstPrio) {
virThreadPoolJobPtr tmp = job->next;
while (tmp) {
if (tmp->priority) {
break;
}
tmp = tmp->next;
}
pool->jobList.firstPrio = tmp;
}
if (job->prev)
job->prev->next = job->next;
else
pool->jobList.head = job->next;
if (job->next)
job->next->prev = job->prev;
else
pool->jobList.tail = job->prev;
pool->jobQueueDepth--;
virMutexUnlock(&pool->mutex); virMutexUnlock(&pool->mutex);
(pool->jobFunc)(job->data, pool->jobOpaque); (pool->jobFunc)(job->data, pool->jobOpaque);
...@@ -101,19 +147,24 @@ static void virThreadPoolWorker(void *opaque) ...@@ -101,19 +147,24 @@ static void virThreadPoolWorker(void *opaque)
} }
out: out:
pool->nWorkers--; if (priority)
if (pool->nWorkers == 0) pool->nPrioWorkers--;
else
pool->nWorkers--;
if (pool->nWorkers == 0 && pool->nPrioWorkers==0)
virCondSignal(&pool->quit_cond); virCondSignal(&pool->quit_cond);
virMutexUnlock(&pool->mutex); virMutexUnlock(&pool->mutex);
} }
virThreadPoolPtr virThreadPoolNew(size_t minWorkers, virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
size_t maxWorkers, size_t maxWorkers,
size_t prioWorkers,
virThreadPoolJobFunc func, virThreadPoolJobFunc func,
void *opaque) void *opaque)
{ {
virThreadPoolPtr pool; virThreadPoolPtr pool;
size_t i; size_t i;
struct virThreadPoolWorkerData *data = NULL;
if (minWorkers > maxWorkers) if (minWorkers > maxWorkers)
minWorkers = maxWorkers; minWorkers = maxWorkers;
...@@ -123,8 +174,7 @@ virThreadPoolPtr virThreadPoolNew(size_t minWorkers, ...@@ -123,8 +174,7 @@ virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
return NULL; return NULL;
} }
pool->jobList.head = NULL; pool->jobList.tail = pool->jobList.head = NULL;
pool->jobList.tail = &pool->jobList.head;
pool->jobFunc = func; pool->jobFunc = func;
pool->jobOpaque = opaque; pool->jobOpaque = opaque;
...@@ -141,18 +191,51 @@ virThreadPoolPtr virThreadPoolNew(size_t minWorkers, ...@@ -141,18 +191,51 @@ virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
pool->maxWorkers = maxWorkers; pool->maxWorkers = maxWorkers;
for (i = 0; i < minWorkers; i++) { for (i = 0; i < minWorkers; i++) {
if (VIR_ALLOC(data) < 0) {
virReportOOMError();
goto error;
}
data->pool = pool;
data->cond = &pool->cond;
if (virThreadCreate(&pool->workers[i], if (virThreadCreate(&pool->workers[i],
true, true,
virThreadPoolWorker, virThreadPoolWorker,
pool) < 0) { data) < 0) {
goto error; goto error;
} }
pool->nWorkers++; pool->nWorkers++;
} }
if (prioWorkers) {
if (virCondInit(&pool->prioCond) < 0)
goto error;
if (VIR_ALLOC_N(pool->prioWorkers, prioWorkers) < 0)
goto error;
for (i = 0; i < prioWorkers; i++) {
if (VIR_ALLOC(data) < 0) {
virReportOOMError();
goto error;
}
data->pool = pool;
data->cond = &pool->prioCond;
data->priority = true;
if (virThreadCreate(&pool->prioWorkers[i],
true,
virThreadPoolWorker,
data) < 0) {
goto error;
}
pool->nPrioWorkers++;
}
}
return pool; return pool;
error: error:
VIR_FREE(data);
virThreadPoolFree(pool); virThreadPoolFree(pool);
return NULL; return NULL;
...@@ -161,17 +244,22 @@ error: ...@@ -161,17 +244,22 @@ error:
void virThreadPoolFree(virThreadPoolPtr pool) void virThreadPoolFree(virThreadPoolPtr pool)
{ {
virThreadPoolJobPtr job; virThreadPoolJobPtr job;
bool priority = false;
if (!pool) if (!pool)
return; return;
virMutexLock(&pool->mutex); virMutexLock(&pool->mutex);
pool->quit = true; pool->quit = true;
if (pool->nWorkers > 0) { if (pool->nWorkers > 0)
virCondBroadcast(&pool->cond); virCondBroadcast(&pool->cond);
ignore_value(virCondWait(&pool->quit_cond, &pool->mutex)); if (pool->nPrioWorkers > 0) {
priority = true;
virCondBroadcast(&pool->prioCond);
} }
ignore_value(virCondWait(&pool->quit_cond, &pool->mutex));
while ((job = pool->jobList.head)) { while ((job = pool->jobList.head)) {
pool->jobList.head = pool->jobList.head->next; pool->jobList.head = pool->jobList.head->next;
VIR_FREE(job); VIR_FREE(job);
...@@ -182,10 +270,19 @@ void virThreadPoolFree(virThreadPoolPtr pool) ...@@ -182,10 +270,19 @@ void virThreadPoolFree(virThreadPoolPtr pool)
virMutexDestroy(&pool->mutex); virMutexDestroy(&pool->mutex);
ignore_value(virCondDestroy(&pool->quit_cond)); ignore_value(virCondDestroy(&pool->quit_cond));
ignore_value(virCondDestroy(&pool->cond)); ignore_value(virCondDestroy(&pool->cond));
if (priority) {
VIR_FREE(pool->prioWorkers);
ignore_value(virCondDestroy(&pool->prioCond));
}
VIR_FREE(pool); VIR_FREE(pool);
} }
/*
* @priority - job priority
* Return: 0 on success, -1 otherwise
*/
int virThreadPoolSendJob(virThreadPoolPtr pool, int virThreadPoolSendJob(virThreadPoolPtr pool,
unsigned int priority,
void *jobData) void *jobData)
{ {
virThreadPoolJobPtr job; virThreadPoolJobPtr job;
...@@ -194,7 +291,7 @@ int virThreadPoolSendJob(virThreadPoolPtr pool, ...@@ -194,7 +291,7 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
if (pool->quit) if (pool->quit)
goto error; goto error;
if (pool->freeWorkers == 0 && if (pool->freeWorkers - pool->jobQueueDepth <= 0 &&
pool->nWorkers < pool->maxWorkers) { pool->nWorkers < pool->maxWorkers) {
if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) { if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
virReportOOMError(); virReportOOMError();
...@@ -216,13 +313,26 @@ int virThreadPoolSendJob(virThreadPoolPtr pool, ...@@ -216,13 +313,26 @@ int virThreadPoolSendJob(virThreadPoolPtr pool,
} }
job->data = jobData; job->data = jobData;
job->next = NULL; job->priority = priority;
*pool->jobList.tail = job;
pool->jobList.tail = &(*pool->jobList.tail)->next; job->prev = pool->jobList.tail;
if (pool->jobList.tail)
pool->jobList.tail->next = job;
pool->jobList.tail = job;
if (!pool->jobList.head)
pool->jobList.head = job;
if (priority && !pool->jobList.firstPrio)
pool->jobList.firstPrio = job;
pool->jobQueueDepth++;
virCondSignal(&pool->cond); virCondSignal(&pool->cond);
virMutexUnlock(&pool->mutex); if (priority)
virCondSignal(&pool->prioCond);
virMutexUnlock(&pool->mutex);
return 0; return 0;
error: error:
......
...@@ -35,12 +35,14 @@ typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque); ...@@ -35,12 +35,14 @@ typedef void (*virThreadPoolJobFunc)(void *jobdata, void *opaque);
virThreadPoolPtr virThreadPoolNew(size_t minWorkers, virThreadPoolPtr virThreadPoolNew(size_t minWorkers,
size_t maxWorkers, size_t maxWorkers,
size_t prioWorkers,
virThreadPoolJobFunc func, virThreadPoolJobFunc func,
void *opaque) ATTRIBUTE_NONNULL(3); void *opaque) ATTRIBUTE_NONNULL(4);
void virThreadPoolFree(virThreadPoolPtr pool); void virThreadPoolFree(virThreadPoolPtr pool);
int virThreadPoolSendJob(virThreadPoolPtr pool, int virThreadPoolSendJob(virThreadPoolPtr pool,
unsigned int priority,
void *jobdata) ATTRIBUTE_NONNULL(1) void *jobdata) ATTRIBUTE_NONNULL(1)
ATTRIBUTE_RETURN_CHECK; ATTRIBUTE_RETURN_CHECK;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册