提交 7d5fb5df 编写于 作者: A antirez

Setting N I/O threads should mean N-1 additional + 1 main thread.

上级 28d146be
......@@ -2658,9 +2658,9 @@ int io_threads_active; /* Are the threads currently spinning waiting I/O? */
int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */
/* This is the list of clients each thread will serve when threaded I/O is
* used. We spawn N threads, and the N+1 list is used for the clients that
* are processed by the main thread itself (this is why ther is "+1"). */
list *io_threads_list[IO_THREADS_MAX_NUM+1];
* used. We spawn io_threads_num-1 threads, since one is the main thread
* itself. */
list *io_threads_list[IO_THREADS_MAX_NUM];
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
......@@ -2720,12 +2720,16 @@ void initThreadedIO(void) {
exit(1);
}
/* Spawn the I/O threads. */
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
io_threads_list[i] = listCreate();
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
......@@ -2733,14 +2737,13 @@ void initThreadedIO(void) {
}
io_threads[i] = tid;
}
io_threads_list[server.io_threads_num] = listCreate(); /* For main thread */
}
void startThreadedIO(void) {
if (tio_debug) { printf("S"); fflush(stdout); }
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
serverAssert(io_threads_active == 0);
for (int j = 0; j < server.io_threads_num; j++)
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
io_threads_active = 1;
}
......@@ -2754,7 +2757,7 @@ void stopThreadedIO(void) {
(int) listLength(server.clients_pending_read),
(int) listLength(server.clients_pending_write));
serverAssert(io_threads_active == 1);
for (int j = 0; j < server.io_threads_num; j++)
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
io_threads_active = 0;
}
......@@ -2805,7 +2808,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % (server.io_threads_num+1);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
......@@ -2813,23 +2816,23 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 0; j < server.io_threads_num; j++) {
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slide of clients. */
listRewind(io_threads_list[server.io_threads_num],&li);
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[server.io_threads_num]);
listEmpty(io_threads_list[0]);
/* Wait for all threads to end their work. */
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 0; j < server.io_threads_num; j++)
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
......@@ -2890,7 +2893,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % (server.io_threads_num+1);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
......@@ -2898,23 +2901,23 @@ int handleClientsWithPendingReadsUsingThreads(void) {
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 0; j < server.io_threads_num; j++) {
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slide of clients. */
listRewind(io_threads_list[server.io_threads_num],&li);
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[server.io_threads_num]);
listEmpty(io_threads_list[0]);
/* Wait for all threads to end their work. */
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 0; j < server.io_threads_num; j++)
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册