提交 41adafa0 编写于 作者: T Trond Myklebust

Merge branch 'bh-remove'

...@@ -183,8 +183,9 @@ struct rpc_task_setup { ...@@ -183,8 +183,9 @@ struct rpc_task_setup {
#define RPC_NR_PRIORITY (1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW) #define RPC_NR_PRIORITY (1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW)
struct rpc_timer { struct rpc_timer {
struct timer_list timer;
struct list_head list; struct list_head list;
unsigned long expires;
struct delayed_work dwork;
}; };
/* /*
......
...@@ -56,6 +56,7 @@ struct sock_xprt { ...@@ -56,6 +56,7 @@ struct sock_xprt {
*/ */
unsigned long sock_state; unsigned long sock_state;
struct delayed_work connect_worker; struct delayed_work connect_worker;
struct work_struct error_worker;
struct work_struct recv_worker; struct work_struct recv_worker;
struct mutex recv_mutex; struct mutex recv_mutex;
struct sockaddr_storage srcaddr; struct sockaddr_storage srcaddr;
...@@ -84,6 +85,10 @@ struct sock_xprt { ...@@ -84,6 +85,10 @@ struct sock_xprt {
#define XPRT_SOCK_CONNECTING 1U #define XPRT_SOCK_CONNECTING 1U
#define XPRT_SOCK_DATA_READY (2) #define XPRT_SOCK_DATA_READY (2)
#define XPRT_SOCK_UPD_TIMEOUT (3) #define XPRT_SOCK_UPD_TIMEOUT (3)
#define XPRT_SOCK_WAKE_ERROR (4)
#define XPRT_SOCK_WAKE_WRITE (5)
#define XPRT_SOCK_WAKE_PENDING (6)
#define XPRT_SOCK_WAKE_DISCONNECT (7)
#endif /* __KERNEL__ */ #endif /* __KERNEL__ */
......
...@@ -47,7 +47,7 @@ static mempool_t *rpc_buffer_mempool __read_mostly; ...@@ -47,7 +47,7 @@ static mempool_t *rpc_buffer_mempool __read_mostly;
static void rpc_async_schedule(struct work_struct *); static void rpc_async_schedule(struct work_struct *);
static void rpc_release_task(struct rpc_task *task); static void rpc_release_task(struct rpc_task *task);
static void __rpc_queue_timer_fn(struct timer_list *t); static void __rpc_queue_timer_fn(struct work_struct *);
/* /*
* RPC tasks sit here while waiting for conditions to improve. * RPC tasks sit here while waiting for conditions to improve.
...@@ -88,13 +88,19 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task) ...@@ -88,13 +88,19 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
task->tk_timeout = 0; task->tk_timeout = 0;
list_del(&task->u.tk_wait.timer_list); list_del(&task->u.tk_wait.timer_list);
if (list_empty(&queue->timer_list.list)) if (list_empty(&queue->timer_list.list))
del_timer(&queue->timer_list.timer); cancel_delayed_work(&queue->timer_list.dwork);
} }
static void static void
rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires) rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
{ {
timer_reduce(&queue->timer_list.timer, expires); unsigned long now = jiffies;
queue->timer_list.expires = expires;
if (time_before_eq(expires, now))
expires = 0;
else
expires -= now;
mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
} }
/* /*
...@@ -108,6 +114,7 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task, ...@@ -108,6 +114,7 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
task->tk_pid, jiffies_to_msecs(timeout - jiffies)); task->tk_pid, jiffies_to_msecs(timeout - jiffies));
task->tk_timeout = timeout; task->tk_timeout = timeout;
if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
rpc_set_queue_timer(queue, timeout); rpc_set_queue_timer(queue, timeout);
list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list); list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
} }
...@@ -251,7 +258,8 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c ...@@ -251,7 +258,8 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
queue->maxpriority = nr_queues - 1; queue->maxpriority = nr_queues - 1;
rpc_reset_waitqueue_priority(queue); rpc_reset_waitqueue_priority(queue);
queue->qlen = 0; queue->qlen = 0;
timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0); queue->timer_list.expires = 0;
INIT_DEFERRABLE_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
INIT_LIST_HEAD(&queue->timer_list.list); INIT_LIST_HEAD(&queue->timer_list.list);
rpc_assign_waitqueue_name(queue, qname); rpc_assign_waitqueue_name(queue, qname);
} }
...@@ -270,7 +278,7 @@ EXPORT_SYMBOL_GPL(rpc_init_wait_queue); ...@@ -270,7 +278,7 @@ EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
void rpc_destroy_wait_queue(struct rpc_wait_queue *queue) void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
{ {
del_timer_sync(&queue->timer_list.timer); cancel_delayed_work_sync(&queue->timer_list.dwork);
} }
EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue); EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
...@@ -425,9 +433,9 @@ void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -425,9 +433,9 @@ void rpc_sleep_on_timeout(struct rpc_wait_queue *q, struct rpc_task *task,
/* /*
* Protect the queue operations. * Protect the queue operations.
*/ */
spin_lock_bh(&q->lock); spin_lock(&q->lock);
__rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority); __rpc_sleep_on_priority_timeout(q, task, timeout, task->tk_priority);
spin_unlock_bh(&q->lock); spin_unlock(&q->lock);
} }
EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout); EXPORT_SYMBOL_GPL(rpc_sleep_on_timeout);
...@@ -443,9 +451,9 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -443,9 +451,9 @@ void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
/* /*
* Protect the queue operations. * Protect the queue operations.
*/ */
spin_lock_bh(&q->lock); spin_lock(&q->lock);
__rpc_sleep_on_priority(q, task, task->tk_priority); __rpc_sleep_on_priority(q, task, task->tk_priority);
spin_unlock_bh(&q->lock); spin_unlock(&q->lock);
} }
EXPORT_SYMBOL_GPL(rpc_sleep_on); EXPORT_SYMBOL_GPL(rpc_sleep_on);
...@@ -459,9 +467,9 @@ void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q, ...@@ -459,9 +467,9 @@ void rpc_sleep_on_priority_timeout(struct rpc_wait_queue *q,
/* /*
* Protect the queue operations. * Protect the queue operations.
*/ */
spin_lock_bh(&q->lock); spin_lock(&q->lock);
__rpc_sleep_on_priority_timeout(q, task, timeout, priority); __rpc_sleep_on_priority_timeout(q, task, timeout, priority);
spin_unlock_bh(&q->lock); spin_unlock(&q->lock);
} }
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout); EXPORT_SYMBOL_GPL(rpc_sleep_on_priority_timeout);
...@@ -476,9 +484,9 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task, ...@@ -476,9 +484,9 @@ void rpc_sleep_on_priority(struct rpc_wait_queue *q, struct rpc_task *task,
/* /*
* Protect the queue operations. * Protect the queue operations.
*/ */
spin_lock_bh(&q->lock); spin_lock(&q->lock);
__rpc_sleep_on_priority(q, task, priority); __rpc_sleep_on_priority(q, task, priority);
spin_unlock_bh(&q->lock); spin_unlock(&q->lock);
} }
EXPORT_SYMBOL_GPL(rpc_sleep_on_priority); EXPORT_SYMBOL_GPL(rpc_sleep_on_priority);
...@@ -556,9 +564,9 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq, ...@@ -556,9 +564,9 @@ void rpc_wake_up_queued_task_on_wq(struct workqueue_struct *wq,
{ {
if (!RPC_IS_QUEUED(task)) if (!RPC_IS_QUEUED(task))
return; return;
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
rpc_wake_up_task_on_wq_queue_locked(wq, queue, task); rpc_wake_up_task_on_wq_queue_locked(wq, queue, task);
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
} }
/* /*
...@@ -568,9 +576,9 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task ...@@ -568,9 +576,9 @@ void rpc_wake_up_queued_task(struct rpc_wait_queue *queue, struct rpc_task *task
{ {
if (!RPC_IS_QUEUED(task)) if (!RPC_IS_QUEUED(task))
return; return;
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
rpc_wake_up_task_queue_locked(queue, task); rpc_wake_up_task_queue_locked(queue, task);
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
} }
EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task); EXPORT_SYMBOL_GPL(rpc_wake_up_queued_task);
...@@ -603,9 +611,9 @@ rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue, ...@@ -603,9 +611,9 @@ rpc_wake_up_queued_task_set_status(struct rpc_wait_queue *queue,
{ {
if (!RPC_IS_QUEUED(task)) if (!RPC_IS_QUEUED(task))
return; return;
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
rpc_wake_up_task_queue_set_status_locked(queue, task, status); rpc_wake_up_task_queue_set_status_locked(queue, task, status);
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
} }
/* /*
...@@ -668,12 +676,12 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq, ...@@ -668,12 +676,12 @@ struct rpc_task *rpc_wake_up_first_on_wq(struct workqueue_struct *wq,
dprintk("RPC: wake_up_first(%p \"%s\")\n", dprintk("RPC: wake_up_first(%p \"%s\")\n",
queue, rpc_qname(queue)); queue, rpc_qname(queue));
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
task = __rpc_find_next_queued(queue); task = __rpc_find_next_queued(queue);
if (task != NULL) if (task != NULL)
task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue, task = rpc_wake_up_task_on_wq_queue_action_locked(wq, queue,
task, func, data); task, func, data);
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
return task; return task;
} }
...@@ -712,7 +720,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -712,7 +720,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
{ {
struct list_head *head; struct list_head *head;
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
while (!list_empty(head)) { while (!list_empty(head)) {
...@@ -726,7 +734,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue) ...@@ -726,7 +734,7 @@ void rpc_wake_up(struct rpc_wait_queue *queue)
break; break;
head--; head--;
} }
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
} }
EXPORT_SYMBOL_GPL(rpc_wake_up); EXPORT_SYMBOL_GPL(rpc_wake_up);
...@@ -741,7 +749,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) ...@@ -741,7 +749,7 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
{ {
struct list_head *head; struct list_head *head;
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
head = &queue->tasks[queue->maxpriority]; head = &queue->tasks[queue->maxpriority];
for (;;) { for (;;) {
while (!list_empty(head)) { while (!list_empty(head)) {
...@@ -756,13 +764,15 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status) ...@@ -756,13 +764,15 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
break; break;
head--; head--;
} }
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
} }
EXPORT_SYMBOL_GPL(rpc_wake_up_status); EXPORT_SYMBOL_GPL(rpc_wake_up_status);
static void __rpc_queue_timer_fn(struct timer_list *t) static void __rpc_queue_timer_fn(struct work_struct *work)
{ {
struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer); struct rpc_wait_queue *queue = container_of(work,
struct rpc_wait_queue,
timer_list.dwork.work);
struct rpc_task *task, *n; struct rpc_task *task, *n;
unsigned long expires, now, timeo; unsigned long expires, now, timeo;
...@@ -932,13 +942,13 @@ static void __rpc_execute(struct rpc_task *task) ...@@ -932,13 +942,13 @@ static void __rpc_execute(struct rpc_task *task)
* rpc_task pointer may still be dereferenced. * rpc_task pointer may still be dereferenced.
*/ */
queue = task->tk_waitqueue; queue = task->tk_waitqueue;
spin_lock_bh(&queue->lock); spin_lock(&queue->lock);
if (!RPC_IS_QUEUED(task)) { if (!RPC_IS_QUEUED(task)) {
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
continue; continue;
} }
rpc_clear_running(task); rpc_clear_running(task);
spin_unlock_bh(&queue->lock); spin_unlock(&queue->lock);
if (task_is_async) if (task_is_async)
return; return;
......
...@@ -302,9 +302,9 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -302,9 +302,9 @@ static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task) if (test_bit(XPRT_LOCKED, &xprt->state) && xprt->snd_task == task)
return 1; return 1;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
retval = xprt->ops->reserve_xprt(xprt, task); retval = xprt->ops->reserve_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
return retval; return retval;
} }
...@@ -381,9 +381,9 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta ...@@ -381,9 +381,9 @@ static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *ta
{ {
if (xprt->snd_task != task) if (xprt->snd_task != task)
return; return;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task); xprt->ops->release_xprt(xprt, task);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
/* /*
...@@ -435,9 +435,9 @@ xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req) ...@@ -435,9 +435,9 @@ xprt_request_get_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
if (req->rq_cong) if (req->rq_cong)
return true; return true;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
ret = __xprt_get_cong(xprt, req) != 0; ret = __xprt_get_cong(xprt, req) != 0;
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
return ret; return ret;
} }
EXPORT_SYMBOL_GPL(xprt_request_get_cong); EXPORT_SYMBOL_GPL(xprt_request_get_cong);
...@@ -464,9 +464,9 @@ static void ...@@ -464,9 +464,9 @@ static void
xprt_clear_congestion_window_wait(struct rpc_xprt *xprt) xprt_clear_congestion_window_wait(struct rpc_xprt *xprt)
{ {
if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) { if (test_and_clear_bit(XPRT_CWND_WAIT, &xprt->state)) {
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
__xprt_lock_write_next_cong(xprt); __xprt_lock_write_next_cong(xprt);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
} }
...@@ -563,9 +563,9 @@ bool xprt_write_space(struct rpc_xprt *xprt) ...@@ -563,9 +563,9 @@ bool xprt_write_space(struct rpc_xprt *xprt)
if (!test_bit(XPRT_WRITE_SPACE, &xprt->state)) if (!test_bit(XPRT_WRITE_SPACE, &xprt->state))
return false; return false;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
ret = xprt_clear_write_space_locked(xprt); ret = xprt_clear_write_space_locked(xprt);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
return ret; return ret;
} }
EXPORT_SYMBOL_GPL(xprt_write_space); EXPORT_SYMBOL_GPL(xprt_write_space);
...@@ -634,9 +634,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req) ...@@ -634,9 +634,9 @@ int xprt_adjust_timeout(struct rpc_rqst *req)
req->rq_retries = 0; req->rq_retries = 0;
xprt_reset_majortimeo(req); xprt_reset_majortimeo(req);
/* Reset the RTT counters == "slow start" */ /* Reset the RTT counters == "slow start" */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval); rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
status = -ETIMEDOUT; status = -ETIMEDOUT;
} }
...@@ -668,11 +668,11 @@ static void xprt_autoclose(struct work_struct *work) ...@@ -668,11 +668,11 @@ static void xprt_autoclose(struct work_struct *work)
void xprt_disconnect_done(struct rpc_xprt *xprt) void xprt_disconnect_done(struct rpc_xprt *xprt)
{ {
dprintk("RPC: disconnected transport %p\n", xprt); dprintk("RPC: disconnected transport %p\n", xprt);
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt_clear_connected(xprt); xprt_clear_connected(xprt);
xprt_clear_write_space_locked(xprt); xprt_clear_write_space_locked(xprt);
xprt_wake_pending_tasks(xprt, -ENOTCONN); xprt_wake_pending_tasks(xprt, -ENOTCONN);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
EXPORT_SYMBOL_GPL(xprt_disconnect_done); EXPORT_SYMBOL_GPL(xprt_disconnect_done);
...@@ -684,7 +684,7 @@ EXPORT_SYMBOL_GPL(xprt_disconnect_done); ...@@ -684,7 +684,7 @@ EXPORT_SYMBOL_GPL(xprt_disconnect_done);
void xprt_force_disconnect(struct rpc_xprt *xprt) void xprt_force_disconnect(struct rpc_xprt *xprt)
{ {
/* Don't race with the test_bit() in xprt_clear_locked() */ /* Don't race with the test_bit() in xprt_clear_locked() */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
set_bit(XPRT_CLOSE_WAIT, &xprt->state); set_bit(XPRT_CLOSE_WAIT, &xprt->state);
/* Try to schedule an autoclose RPC call */ /* Try to schedule an autoclose RPC call */
if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0) if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
...@@ -692,7 +692,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt) ...@@ -692,7 +692,7 @@ void xprt_force_disconnect(struct rpc_xprt *xprt)
else if (xprt->snd_task) else if (xprt->snd_task)
rpc_wake_up_queued_task_set_status(&xprt->pending, rpc_wake_up_queued_task_set_status(&xprt->pending,
xprt->snd_task, -ENOTCONN); xprt->snd_task, -ENOTCONN);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
EXPORT_SYMBOL_GPL(xprt_force_disconnect); EXPORT_SYMBOL_GPL(xprt_force_disconnect);
...@@ -726,7 +726,7 @@ xprt_request_retransmit_after_disconnect(struct rpc_task *task) ...@@ -726,7 +726,7 @@ xprt_request_retransmit_after_disconnect(struct rpc_task *task)
void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
{ {
/* Don't race with the test_bit() in xprt_clear_locked() */ /* Don't race with the test_bit() in xprt_clear_locked() */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
if (cookie != xprt->connect_cookie) if (cookie != xprt->connect_cookie)
goto out; goto out;
if (test_bit(XPRT_CLOSING, &xprt->state)) if (test_bit(XPRT_CLOSING, &xprt->state))
...@@ -737,7 +737,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie) ...@@ -737,7 +737,7 @@ void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
queue_work(xprtiod_workqueue, &xprt->task_cleanup); queue_work(xprtiod_workqueue, &xprt->task_cleanup);
xprt_wake_pending_tasks(xprt, -EAGAIN); xprt_wake_pending_tasks(xprt, -EAGAIN);
out: out:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
static bool static bool
...@@ -759,18 +759,13 @@ xprt_init_autodisconnect(struct timer_list *t) ...@@ -759,18 +759,13 @@ xprt_init_autodisconnect(struct timer_list *t)
{ {
struct rpc_xprt *xprt = from_timer(xprt, t, timer); struct rpc_xprt *xprt = from_timer(xprt, t, timer);
spin_lock(&xprt->transport_lock);
if (!RB_EMPTY_ROOT(&xprt->recv_queue)) if (!RB_EMPTY_ROOT(&xprt->recv_queue))
goto out_abort; return;
/* Reset xprt->last_used to avoid connect/autodisconnect cycling */ /* Reset xprt->last_used to avoid connect/autodisconnect cycling */
xprt->last_used = jiffies; xprt->last_used = jiffies;
if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
goto out_abort;
spin_unlock(&xprt->transport_lock);
queue_work(xprtiod_workqueue, &xprt->task_cleanup);
return; return;
out_abort: queue_work(xprtiod_workqueue, &xprt->task_cleanup);
spin_unlock(&xprt->transport_lock);
} }
bool xprt_lock_connect(struct rpc_xprt *xprt, bool xprt_lock_connect(struct rpc_xprt *xprt,
...@@ -779,7 +774,7 @@ bool xprt_lock_connect(struct rpc_xprt *xprt, ...@@ -779,7 +774,7 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
{ {
bool ret = false; bool ret = false;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
if (!test_bit(XPRT_LOCKED, &xprt->state)) if (!test_bit(XPRT_LOCKED, &xprt->state))
goto out; goto out;
if (xprt->snd_task != task) if (xprt->snd_task != task)
...@@ -787,13 +782,13 @@ bool xprt_lock_connect(struct rpc_xprt *xprt, ...@@ -787,13 +782,13 @@ bool xprt_lock_connect(struct rpc_xprt *xprt,
xprt->snd_task = cookie; xprt->snd_task = cookie;
ret = true; ret = true;
out: out:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
return ret; return ret;
} }
void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
{ {
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
if (xprt->snd_task != cookie) if (xprt->snd_task != cookie)
goto out; goto out;
if (!test_bit(XPRT_LOCKED, &xprt->state)) if (!test_bit(XPRT_LOCKED, &xprt->state))
...@@ -802,7 +797,7 @@ void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie) ...@@ -802,7 +797,7 @@ void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
xprt->ops->release_xprt(xprt, NULL); xprt->ops->release_xprt(xprt, NULL);
xprt_schedule_autodisconnect(xprt); xprt_schedule_autodisconnect(xprt);
out: out:
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
wake_up_bit(&xprt->state, XPRT_LOCKED); wake_up_bit(&xprt->state, XPRT_LOCKED);
} }
...@@ -1412,14 +1407,14 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) ...@@ -1412,14 +1407,14 @@ xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task)
xprt_inject_disconnect(xprt); xprt_inject_disconnect(xprt);
task->tk_flags |= RPC_TASK_SENT; task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt->stat.sends++; xprt->stat.sends++;
xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs; xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
xprt->stat.bklog_u += xprt->backlog.qlen; xprt->stat.bklog_u += xprt->backlog.qlen;
xprt->stat.sending_u += xprt->sending.qlen; xprt->stat.sending_u += xprt->sending.qlen;
xprt->stat.pending_u += xprt->pending.qlen; xprt->stat.pending_u += xprt->pending.qlen;
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
req->rq_connect_cookie = connect_cookie; req->rq_connect_cookie = connect_cookie;
out_dequeue: out_dequeue:
...@@ -1766,13 +1761,13 @@ void xprt_release(struct rpc_task *task) ...@@ -1766,13 +1761,13 @@ void xprt_release(struct rpc_task *task)
xprt = req->rq_xprt; xprt = req->rq_xprt;
xprt_request_dequeue_all(task, req); xprt_request_dequeue_all(task, req);
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt->ops->release_xprt(xprt, task); xprt->ops->release_xprt(xprt, task);
if (xprt->ops->release_request) if (xprt->ops->release_request)
xprt->ops->release_request(task); xprt->ops->release_request(task);
xprt->last_used = jiffies; xprt->last_used = jiffies;
xprt_schedule_autodisconnect(xprt); xprt_schedule_autodisconnect(xprt);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
if (req->rq_buffer) if (req->rq_buffer)
xprt->ops->buf_free(task); xprt->ops->buf_free(task);
xprt_inject_disconnect(xprt); xprt_inject_disconnect(xprt);
......
...@@ -1360,10 +1360,10 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep) ...@@ -1360,10 +1360,10 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
else if (credits > buf->rb_max_requests) else if (credits > buf->rb_max_requests)
credits = buf->rb_max_requests; credits = buf->rb_max_requests;
if (buf->rb_credits != credits) { if (buf->rb_credits != credits) {
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
buf->rb_credits = credits; buf->rb_credits = credits;
xprt->cwnd = credits << RPC_CWNDSHIFT; xprt->cwnd = credits << RPC_CWNDSHIFT;
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
req = rpcr_to_rdmar(rqst); req = rpcr_to_rdmar(rqst);
......
...@@ -72,9 +72,9 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp, ...@@ -72,9 +72,9 @@ int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt, __be32 *rdma_resp,
else if (credits > r_xprt->rx_buf.rb_bc_max_requests) else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
credits = r_xprt->rx_buf.rb_bc_max_requests; credits = r_xprt->rx_buf.rb_bc_max_requests;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt->cwnd = credits << RPC_CWNDSHIFT; xprt->cwnd = credits << RPC_CWNDSHIFT;
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
spin_lock(&xprt->queue_lock); spin_lock(&xprt->queue_lock);
ret = 0; ret = 0;
......
...@@ -226,9 +226,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id, ...@@ -226,9 +226,9 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id,
* Enqueue the new transport on the accept queue of the listening * Enqueue the new transport on the accept queue of the listening
* transport * transport
*/ */
spin_lock_bh(&listen_xprt->sc_lock); spin_lock(&listen_xprt->sc_lock);
list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q); list_add_tail(&newxprt->sc_accept_q, &listen_xprt->sc_accept_q);
spin_unlock_bh(&listen_xprt->sc_lock); spin_unlock(&listen_xprt->sc_lock);
set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags); set_bit(XPT_CONN, &listen_xprt->sc_xprt.xpt_flags);
svc_xprt_enqueue(&listen_xprt->sc_xprt); svc_xprt_enqueue(&listen_xprt->sc_xprt);
...@@ -401,7 +401,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -401,7 +401,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt); listen_rdma = container_of(xprt, struct svcxprt_rdma, sc_xprt);
clear_bit(XPT_CONN, &xprt->xpt_flags); clear_bit(XPT_CONN, &xprt->xpt_flags);
/* Get the next entry off the accept list */ /* Get the next entry off the accept list */
spin_lock_bh(&listen_rdma->sc_lock); spin_lock(&listen_rdma->sc_lock);
if (!list_empty(&listen_rdma->sc_accept_q)) { if (!list_empty(&listen_rdma->sc_accept_q)) {
newxprt = list_entry(listen_rdma->sc_accept_q.next, newxprt = list_entry(listen_rdma->sc_accept_q.next,
struct svcxprt_rdma, sc_accept_q); struct svcxprt_rdma, sc_accept_q);
...@@ -409,7 +409,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt) ...@@ -409,7 +409,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
} }
if (!list_empty(&listen_rdma->sc_accept_q)) if (!list_empty(&listen_rdma->sc_accept_q))
set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags); set_bit(XPT_CONN, &listen_rdma->sc_xprt.xpt_flags);
spin_unlock_bh(&listen_rdma->sc_lock); spin_unlock(&listen_rdma->sc_lock);
if (!newxprt) if (!newxprt)
return NULL; return NULL;
......
...@@ -880,7 +880,7 @@ static int xs_nospace(struct rpc_rqst *req) ...@@ -880,7 +880,7 @@ static int xs_nospace(struct rpc_rqst *req)
req->rq_slen); req->rq_slen);
/* Protect against races with write_space */ /* Protect against races with write_space */
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
/* Don't race with disconnect */ /* Don't race with disconnect */
if (xprt_connected(xprt)) { if (xprt_connected(xprt)) {
...@@ -890,7 +890,7 @@ static int xs_nospace(struct rpc_rqst *req) ...@@ -890,7 +890,7 @@ static int xs_nospace(struct rpc_rqst *req)
} else } else
ret = -ENOTCONN; ret = -ENOTCONN;
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
/* Race breaker in case memory is freed before above code is called */ /* Race breaker in case memory is freed before above code is called */
if (ret == -EAGAIN) { if (ret == -EAGAIN) {
...@@ -1211,6 +1211,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt) ...@@ -1211,6 +1211,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state); clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
}
static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
{
set_bit(nr, &transport->sock_state);
queue_work(xprtiod_workqueue, &transport->error_worker);
} }
static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
...@@ -1231,6 +1240,7 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt) ...@@ -1231,6 +1240,7 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
*/ */
static void xs_error_report(struct sock *sk) static void xs_error_report(struct sock *sk)
{ {
struct sock_xprt *transport;
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
int err; int err;
...@@ -1238,13 +1248,14 @@ static void xs_error_report(struct sock *sk) ...@@ -1238,13 +1248,14 @@ static void xs_error_report(struct sock *sk)
if (!(xprt = xprt_from_sock(sk))) if (!(xprt = xprt_from_sock(sk)))
goto out; goto out;
transport = container_of(xprt, struct sock_xprt, xprt);
err = -sk->sk_err; err = -sk->sk_err;
if (err == 0) if (err == 0)
goto out; goto out;
dprintk("RPC: xs_error_report client %p, error=%d...\n", dprintk("RPC: xs_error_report client %p, error=%d...\n",
xprt, -err); xprt, -err);
trace_rpc_socket_error(xprt, sk->sk_socket, err); trace_rpc_socket_error(xprt, sk->sk_socket, err);
xprt_wake_pending_tasks(xprt, err); xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
out: out:
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
} }
...@@ -1333,6 +1344,7 @@ static void xs_destroy(struct rpc_xprt *xprt) ...@@ -1333,6 +1344,7 @@ static void xs_destroy(struct rpc_xprt *xprt)
cancel_delayed_work_sync(&transport->connect_worker); cancel_delayed_work_sync(&transport->connect_worker);
xs_close(xprt); xs_close(xprt);
cancel_work_sync(&transport->recv_worker); cancel_work_sync(&transport->recv_worker);
cancel_work_sync(&transport->error_worker);
xs_xprt_free(xprt); xs_xprt_free(xprt);
module_put(THIS_MODULE); module_put(THIS_MODULE);
} }
...@@ -1386,9 +1398,9 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt, ...@@ -1386,9 +1398,9 @@ static void xs_udp_data_read_skb(struct rpc_xprt *xprt,
} }
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, copied); xprt_adjust_cwnd(xprt, task, copied);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
spin_lock(&xprt->queue_lock); spin_lock(&xprt->queue_lock);
xprt_complete_rqst(task, copied); xprt_complete_rqst(task, copied);
__UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS); __UDPX_INC_STATS(sk, UDP_MIB_INDATAGRAMS);
...@@ -1498,7 +1510,6 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1498,7 +1510,6 @@ static void xs_tcp_state_change(struct sock *sk)
trace_rpc_socket_state_change(xprt, sk->sk_socket); trace_rpc_socket_state_change(xprt, sk->sk_socket);
switch (sk->sk_state) { switch (sk->sk_state) {
case TCP_ESTABLISHED: case TCP_ESTABLISHED:
spin_lock(&xprt->transport_lock);
if (!xprt_test_and_set_connected(xprt)) { if (!xprt_test_and_set_connected(xprt)) {
xprt->connect_cookie++; xprt->connect_cookie++;
clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state); clear_bit(XPRT_SOCK_CONNECTING, &transport->sock_state);
...@@ -1507,9 +1518,8 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1507,9 +1518,8 @@ static void xs_tcp_state_change(struct sock *sk)
xprt->stat.connect_count++; xprt->stat.connect_count++;
xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_time += (long)jiffies -
xprt->stat.connect_start; xprt->stat.connect_start;
xprt_wake_pending_tasks(xprt, -EAGAIN); xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
} }
spin_unlock(&xprt->transport_lock);
break; break;
case TCP_FIN_WAIT1: case TCP_FIN_WAIT1:
/* The client initiated a shutdown of the socket */ /* The client initiated a shutdown of the socket */
...@@ -1525,7 +1535,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1525,7 +1535,7 @@ static void xs_tcp_state_change(struct sock *sk)
/* The server initiated a shutdown of the socket */ /* The server initiated a shutdown of the socket */
xprt->connect_cookie++; xprt->connect_cookie++;
clear_bit(XPRT_CONNECTED, &xprt->state); clear_bit(XPRT_CONNECTED, &xprt->state);
xs_tcp_force_close(xprt); xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
/* fall through */ /* fall through */
case TCP_CLOSING: case TCP_CLOSING:
/* /*
...@@ -1547,7 +1557,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1547,7 +1557,7 @@ static void xs_tcp_state_change(struct sock *sk)
xprt_clear_connecting(xprt); xprt_clear_connecting(xprt);
clear_bit(XPRT_CLOSING, &xprt->state); clear_bit(XPRT_CLOSING, &xprt->state);
/* Trigger the socket release */ /* Trigger the socket release */
xs_tcp_force_close(xprt); xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
} }
out: out:
read_unlock_bh(&sk->sk_callback_lock); read_unlock_bh(&sk->sk_callback_lock);
...@@ -1556,6 +1566,7 @@ static void xs_tcp_state_change(struct sock *sk) ...@@ -1556,6 +1566,7 @@ static void xs_tcp_state_change(struct sock *sk)
static void xs_write_space(struct sock *sk) static void xs_write_space(struct sock *sk)
{ {
struct socket_wq *wq; struct socket_wq *wq;
struct sock_xprt *transport;
struct rpc_xprt *xprt; struct rpc_xprt *xprt;
if (!sk->sk_socket) if (!sk->sk_socket)
...@@ -1564,12 +1575,13 @@ static void xs_write_space(struct sock *sk) ...@@ -1564,12 +1575,13 @@ static void xs_write_space(struct sock *sk)
if (unlikely(!(xprt = xprt_from_sock(sk)))) if (unlikely(!(xprt = xprt_from_sock(sk))))
return; return;
transport = container_of(xprt, struct sock_xprt, xprt);
rcu_read_lock(); rcu_read_lock();
wq = rcu_dereference(sk->sk_wq); wq = rcu_dereference(sk->sk_wq);
if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0) if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
goto out; goto out;
if (xprt_write_space(xprt)) xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
sk->sk_write_pending--; sk->sk_write_pending--;
out: out:
rcu_read_unlock(); rcu_read_unlock();
...@@ -1664,9 +1676,9 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t ...@@ -1664,9 +1676,9 @@ static void xs_udp_set_buffer_size(struct rpc_xprt *xprt, size_t sndsize, size_t
*/ */
static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task) static void xs_udp_timer(struct rpc_xprt *xprt, struct rpc_task *task)
{ {
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
xprt_adjust_cwnd(xprt, task, -ETIMEDOUT); xprt_adjust_cwnd(xprt, task, -ETIMEDOUT);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
static int xs_get_random_port(void) static int xs_get_random_port(void)
...@@ -2201,13 +2213,13 @@ static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, ...@@ -2201,13 +2213,13 @@ static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
unsigned int opt_on = 1; unsigned int opt_on = 1;
unsigned int timeo; unsigned int timeo;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ); keepidle = DIV_ROUND_UP(xprt->timeout->to_initval, HZ);
keepcnt = xprt->timeout->to_retries + 1; keepcnt = xprt->timeout->to_retries + 1;
timeo = jiffies_to_msecs(xprt->timeout->to_initval) * timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
(xprt->timeout->to_retries + 1); (xprt->timeout->to_retries + 1);
clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); clear_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
/* TCP Keepalive options */ /* TCP Keepalive options */
kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
...@@ -2232,7 +2244,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, ...@@ -2232,7 +2244,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
struct rpc_timeout to; struct rpc_timeout to;
unsigned long initval; unsigned long initval;
spin_lock_bh(&xprt->transport_lock); spin_lock(&xprt->transport_lock);
if (reconnect_timeout < xprt->max_reconnect_timeout) if (reconnect_timeout < xprt->max_reconnect_timeout)
xprt->max_reconnect_timeout = reconnect_timeout; xprt->max_reconnect_timeout = reconnect_timeout;
if (connect_timeout < xprt->connect_timeout) { if (connect_timeout < xprt->connect_timeout) {
...@@ -2249,7 +2261,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt, ...@@ -2249,7 +2261,7 @@ static void xs_tcp_set_connect_timeout(struct rpc_xprt *xprt,
xprt->connect_timeout = connect_timeout; xprt->connect_timeout = connect_timeout;
} }
set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state); set_bit(XPRT_SOCK_UPD_TIMEOUT, &transport->sock_state);
spin_unlock_bh(&xprt->transport_lock); spin_unlock(&xprt->transport_lock);
} }
static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock) static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
...@@ -2461,6 +2473,56 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task) ...@@ -2461,6 +2473,56 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
delay); delay);
} }
static void xs_wake_disconnect(struct sock_xprt *transport)
{
if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
xs_tcp_force_close(&transport->xprt);
}
static void xs_wake_write(struct sock_xprt *transport)
{
if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
xprt_write_space(&transport->xprt);
}
static void xs_wake_error(struct sock_xprt *transport)
{
int sockerr;
int sockerr_len = sizeof(sockerr);
if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
return;
mutex_lock(&transport->recv_mutex);
if (transport->sock == NULL)
goto out;
if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
goto out;
if (kernel_getsockopt(transport->sock, SOL_SOCKET, SO_ERROR,
(char *)&sockerr, &sockerr_len) != 0)
goto out;
if (sockerr < 0)
xprt_wake_pending_tasks(&transport->xprt, sockerr);
out:
mutex_unlock(&transport->recv_mutex);
}
static void xs_wake_pending(struct sock_xprt *transport)
{
if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
}
static void xs_error_handle(struct work_struct *work)
{
struct sock_xprt *transport = container_of(work,
struct sock_xprt, error_worker);
xs_wake_disconnect(transport);
xs_wake_write(transport);
xs_wake_error(transport);
xs_wake_pending(transport);
}
/** /**
* xs_local_print_stats - display AF_LOCAL socket-specifc stats * xs_local_print_stats - display AF_LOCAL socket-specifc stats
* @xprt: rpc_xprt struct containing statistics * @xprt: rpc_xprt struct containing statistics
...@@ -2873,6 +2935,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args) ...@@ -2873,6 +2935,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
xprt->timeout = &xs_local_default_timeout; xprt->timeout = &xs_local_default_timeout;
INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket); INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
switch (sun->sun_family) { switch (sun->sun_family) {
...@@ -2943,6 +3006,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args) ...@@ -2943,6 +3006,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
xprt->timeout = &xs_udp_default_timeout; xprt->timeout = &xs_udp_default_timeout;
INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
switch (addr->sa_family) { switch (addr->sa_family) {
...@@ -3024,6 +3088,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args) ...@@ -3024,6 +3088,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
(xprt->timeout->to_retries + 1); (xprt->timeout->to_retries + 1);
INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn); INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
INIT_WORK(&transport->error_worker, xs_error_handle);
INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
switch (addr->sa_family) { switch (addr->sa_family) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册