diff --git a/fs/io-wq.c b/fs/io-wq.c index 965022fe9961c9c9a660d2694e563dcc297a5c6b..1d01edada8aab43a03098086ec099f5b4a2c7783 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -120,6 +120,9 @@ struct io_wq { refcount_t refs; struct completion done; + atomic_t worker_refs; + struct completion worker_done; + struct hlist_node cpuhp_node; pid_t task_pid; @@ -189,7 +192,8 @@ static void io_worker_exit(struct io_worker *worker) raw_spin_unlock_irq(&wqe->lock); kfree_rcu(worker, rcu); - io_wq_put(wqe->wq); + if (atomic_dec_and_test(&wqe->wq->worker_refs)) + complete(&wqe->wq->worker_done); } static inline bool io_wqe_run_queue(struct io_wqe *wqe) @@ -648,14 +652,15 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) init_completion(&worker->ref_done); init_completion(&worker->started); - refcount_inc(&wq->refs); + atomic_inc(&wq->worker_refs); if (index == IO_WQ_ACCT_BOUND) pid = io_wq_fork_thread(task_thread_bound, worker); else pid = io_wq_fork_thread(task_thread_unbound, worker); if (pid < 0) { - io_wq_put(wq); + if (atomic_dec_and_test(&wq->worker_refs)) + complete(&wq->worker_done); kfree(worker); return false; } @@ -736,6 +741,7 @@ static int io_wq_manager(void *data) { struct io_wq *wq = data; char buf[TASK_COMM_LEN]; + int node; sprintf(buf, "iou-mgr-%d", wq->task_pid); set_task_comm(current, buf); @@ -753,6 +759,15 @@ static int io_wq_manager(void *data) } while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)); io_wq_check_workers(wq); + + rcu_read_lock(); + for_each_node(node) + io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); + rcu_read_unlock(); + + /* we might not ever have created any workers */ + if (atomic_read(&wq->worker_refs)) + wait_for_completion(&wq->worker_done); wq->manager = NULL; io_wq_put(wq); do_exit(0); @@ -796,6 +811,7 @@ static int io_wq_fork_manager(struct io_wq *wq) if (wq->manager) return 0; + reinit_completion(&wq->worker_done); clear_bit(IO_WQ_BIT_EXIT, &wq->state); refcount_inc(&wq->refs); current->flags |= PF_IO_WORKER; @@ -1050,6 +1066,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data) init_completion(&wq->done); refcount_set(&wq->refs, 1); + init_completion(&wq->worker_done); + atomic_set(&wq->worker_refs, 0); + ret = io_wq_fork_manager(wq); if (!ret) return wq; @@ -1077,11 +1096,6 @@ static void io_wq_destroy(struct io_wq *wq) if (wq->manager) wake_up_process(wq->manager); - rcu_read_lock(); - for_each_node(node) - io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL); - rcu_read_unlock(); - spin_lock_irq(&wq->hash->wait.lock); for_each_node(node) { struct io_wqe *wqe = wq->wqes[node];