diff --git a/drivers/xen/xenbus/xenbus.h b/drivers/xen/xenbus/xenbus.h
index 51995276f5496c907a1e8455de582ea59ced4f73..149c5e7efc89e75f0fd56a01b40f0f15b462af04 100644
--- a/drivers/xen/xenbus/xenbus.h
+++ b/drivers/xen/xenbus/xenbus.h
@@ -32,6 +32,10 @@
 #ifndef _XENBUS_XENBUS_H
 #define _XENBUS_XENBUS_H
 
+#include <linux/mutex.h>
+#include <linux/uio.h>
+#include <xen/xen.h>
+
 #define XEN_BUS_ID_SIZE			20
 
 struct xen_bus_type {
@@ -52,16 +56,49 @@ enum xenstore_init {
 	XS_LOCAL,
 };
 
+struct xs_watch_event {
+	struct list_head list;
+	unsigned int len;
+	struct xenbus_watch *handle;
+	const char *path;
+	const char *token;
+	char body[];
+};
+
+enum xb_req_state {
+	xb_req_state_queued,
+	xb_req_state_wait_reply,
+	xb_req_state_got_reply,
+	xb_req_state_aborted
+};
+
+struct xb_req_data {
+	struct list_head list;
+	wait_queue_head_t wq;
+	struct xsd_sockmsg msg;
+	enum xsd_sockmsg_type type;
+	char *body;
+	const struct kvec *vec;
+	int num_vecs;
+	int err;
+	enum xb_req_state state;
+	void (*cb)(struct xb_req_data *);
+	void *par;
+};
+
 extern enum xenstore_init xen_store_domain_type;
 extern const struct attribute_group *xenbus_dev_groups[];
+extern struct mutex xs_response_mutex;
+extern struct list_head xs_reply_list;
+extern struct list_head xb_write_list;
+extern wait_queue_head_t xb_waitq;
+extern struct mutex xb_write_mutex;
 
 int xs_init(void);
 int xb_init_comms(void);
 void xb_deinit_comms(void);
-int xb_write(const void *data, unsigned int len);
-int xb_read(void *data, unsigned int len);
-int xb_data_to_read(void);
-int xb_wait_for_data_to_read(void);
+int xs_watch_msg(struct xs_watch_event *event);
+void xs_request_exit(struct xb_req_data *req);
 
 int xenbus_match(struct device *_dev, struct device_driver *_drv);
 int xenbus_dev_probe(struct device *_dev);
@@ -92,6 +129,7 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev,
 
 void xenbus_ring_ops_init(void);
 
-void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg);
+int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par);
+void xenbus_dev_queue_reply(struct xb_req_data *req);
 
 #endif
diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c
index c21ec02643e1ce760dabac846381ce17f0bab8d2..856ada5d39c9764ca6d741544961cc0f81a5afa0 100644
--- a/drivers/xen/xenbus/xenbus_comms.c
+++ b/drivers/xen/xenbus/xenbus_comms.c
@@ -34,6 +34,7 @@
 
 #include <linux/wait.h>
 #include <linux/interrupt.h>
+#include <linux/kthread.h>
 #include <linux/sched.h>
 #include <linux/err.h>
 #include <xen/xenbus.h>
@@ -42,11 +43,22 @@
 #include <xen/page.h>
 #include "xenbus.h"
 
+/* A list of replies. Currently only one will ever be outstanding. */
+LIST_HEAD(xs_reply_list);
+
+/* A list of write requests. */
+LIST_HEAD(xb_write_list);
+DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
+DEFINE_MUTEX(xb_write_mutex);
+
+/* Protect xenbus reader thread against save/restore. */
+DEFINE_MUTEX(xs_response_mutex);
+
 static int xenbus_irq;
+static struct task_struct *xenbus_task;
 
 static DECLARE_WORK(probe_work, xenbus_probe);
 
-static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
 
 static irqreturn_t wake_waiting(int irq, void *unused)
 {
@@ -84,30 +96,31 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons,
 	return buf + MASK_XENSTORE_IDX(cons);
 }
 
+static int xb_data_to_write(void)
+{
+	struct xenstore_domain_interface *intf = xen_store_interface;
+
+	return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE &&
+		!list_empty(&xb_write_list);
+}
+
 /**
  * xb_write - low level write
  * @data: buffer to send
  * @len: length of buffer
  *
- * Returns 0 on success, error otherwise.
+ * Returns number of bytes written or -err.
  */
-int xb_write(const void *data, unsigned len)
+static int xb_write(const void *data, unsigned int len)
 {
 	struct xenstore_domain_interface *intf = xen_store_interface;
 	XENSTORE_RING_IDX cons, prod;
-	int rc;
+	unsigned int bytes = 0;
 
 	while (len != 0) {
 		void *dst;
 		unsigned int avail;
 
-		rc = wait_event_interruptible(
-			xb_waitq,
-			(intf->req_prod - intf->req_cons) !=
-			XENSTORE_RING_SIZE);
-		if (rc < 0)
-			return rc;
-
 		/* Read indexes, then verify. */
 		cons = intf->req_cons;
 		prod = intf->req_prod;
@@ -115,6 +128,11 @@ int xb_write(const void *data, unsigned len)
 			intf->req_cons = intf->req_prod = 0;
 			return -EIO;
 		}
+		if (!xb_data_to_write())
+			return bytes;
+
+		/* Must write data /after/ reading the consumer index. */
+		virt_mb();
 
 		dst = get_output_chunk(cons, prod, intf->req, &avail);
 		if (avail == 0)
@@ -122,52 +140,45 @@ int xb_write(const void *data, unsigned len)
 		if (avail > len)
 			avail = len;
 
-		/* Must write data /after/ reading the consumer index. */
-		virt_mb();
-
 		memcpy(dst, data, avail);
 		data += avail;
 		len -= avail;
+		bytes += avail;
 
 		/* Other side must not see new producer until data is there. */
 		virt_wmb();
 		intf->req_prod += avail;
 
 		/* Implies mb(): other side will see the updated producer. */
-		notify_remote_via_evtchn(xen_store_evtchn);
+		if (prod <= intf->req_cons)
+			notify_remote_via_evtchn(xen_store_evtchn);
 	}
 
-	return 0;
+	return bytes;
 }
 
-int xb_data_to_read(void)
+static int xb_data_to_read(void)
 {
 	struct xenstore_domain_interface *intf = xen_store_interface;
 	return (intf->rsp_cons != intf->rsp_prod);
 }
 
-int xb_wait_for_data_to_read(void)
-{
-	return wait_event_interruptible(xb_waitq, xb_data_to_read());
-}
-
-int xb_read(void *data, unsigned len)
+static int xb_read(void *data, unsigned int len)
 {
 	struct xenstore_domain_interface *intf = xen_store_interface;
 	XENSTORE_RING_IDX cons, prod;
-	int rc;
+	unsigned int bytes = 0;
 
 	while (len != 0) {
 		unsigned int avail;
 		const char *src;
 
-		rc = xb_wait_for_data_to_read();
-		if (rc < 0)
-			return rc;
-
 		/* Read indexes, then verify. */
 		cons = intf->rsp_cons;
 		prod = intf->rsp_prod;
+		if (cons == prod)
+			return bytes;
+
 		if (!check_indexes(cons, prod)) {
 			intf->rsp_cons = intf->rsp_prod = 0;
 			return -EIO;
@@ -185,17 +196,243 @@ int xb_read(void *data, unsigned len)
 		memcpy(data, src, avail);
 		data += avail;
 		len -= avail;
+		bytes += avail;
 
 		/* Other side must not see free space until we've copied out */
 		virt_mb();
 		intf->rsp_cons += avail;
 
-		pr_debug("Finished read of %i bytes (%i to go)\n", avail, len);
-
 		/* Implies mb(): other side will see the updated consumer. */
-		notify_remote_via_evtchn(xen_store_evtchn);
-	}
+		if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
+			notify_remote_via_evtchn(xen_store_evtchn);
+	}
+
+	return bytes;
+}
+
+static int process_msg(void)
+{
+	static struct {
+		struct xsd_sockmsg msg;
+		char *body;
+		union {
+			void *alloc;
+			struct xs_watch_event *watch;
+		};
+		bool in_msg;
+		bool in_hdr;
+		unsigned int read;
+	} state;
+	struct xb_req_data *req;
+	int err;
+	unsigned int len;
+
+	if (!state.in_msg) {
+		state.in_msg = true;
+		state.in_hdr = true;
+		state.read = 0;
+
+		/*
+		 * We must disallow save/restore while reading a message.
+		 * A partial read across s/r leaves us out of sync with
+		 * xenstored.
+		 * xs_response_mutex is locked as long as we are processing one
+		 * message. state.in_msg will be true as long as we are holding
+		 * the lock here.
+		 */
+		mutex_lock(&xs_response_mutex);
+
+		if (!xb_data_to_read()) {
+			/* We raced with save/restore: pending data 'gone'. */
+			mutex_unlock(&xs_response_mutex);
+			state.in_msg = false;
+			return 0;
+		}
+	}
+
+	if (state.in_hdr) {
+		if (state.read != sizeof(state.msg)) {
+			err = xb_read((void *)&state.msg + state.read,
+				      sizeof(state.msg) - state.read);
+			if (err < 0)
+				goto out;
+			state.read += err;
+			if (state.read != sizeof(state.msg))
+				return 0;
+			if (state.msg.len > XENSTORE_PAYLOAD_MAX) {
+				err = -EINVAL;
+				goto out;
+			}
+		}
+
+		len = state.msg.len + 1;
+		if (state.msg.type == XS_WATCH_EVENT)
+			len += sizeof(*state.watch);
+
+		state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH);
+		if (!state.alloc)
+			return -ENOMEM;
+
+		if (state.msg.type == XS_WATCH_EVENT)
+			state.body = state.watch->body;
+		else
+			state.body = state.alloc;
+		state.in_hdr = false;
+		state.read = 0;
+	}
+
+	err = xb_read(state.body + state.read, state.msg.len - state.read);
+	if (err < 0)
+		goto out;
+
+	state.read += err;
+	if (state.read != state.msg.len)
+		return 0;
+
+	state.body[state.msg.len] = '\0';
+
+	if (state.msg.type == XS_WATCH_EVENT) {
+		state.watch->len = state.msg.len;
+		err = xs_watch_msg(state.watch);
+	} else {
+		err = -ENOENT;
+		mutex_lock(&xb_write_mutex);
+		list_for_each_entry(req, &xs_reply_list, list) {
+			if (req->msg.req_id == state.msg.req_id) {
+				if (req->state == xb_req_state_wait_reply) {
+					req->msg.type = state.msg.type;
+					req->msg.len = state.msg.len;
+					req->body = state.body;
+					req->state = xb_req_state_got_reply;
+					list_del(&req->list);
+					req->cb(req);
+				} else {
+					list_del(&req->list);
+					kfree(req);
+				}
+				err = 0;
+				break;
+			}
+		}
+		mutex_unlock(&xb_write_mutex);
+		if (err)
+			goto out;
+	}
+
+	mutex_unlock(&xs_response_mutex);
+
+	state.in_msg = false;
+	state.alloc = NULL;
+	return err;
+
+ out:
+	mutex_unlock(&xs_response_mutex);
+	state.in_msg = false;
+	kfree(state.alloc);
+	state.alloc = NULL;
+	return err;
+}
+
+static int process_writes(void)
+{
+	static struct {
+		struct xb_req_data *req;
+		int idx;
+		unsigned int written;
+	} state;
+	void *base;
+	unsigned int len;
+	int err = 0;
+
+	if (!xb_data_to_write())
+		return 0;
+
+	mutex_lock(&xb_write_mutex);
+
+	if (!state.req) {
+		state.req = list_first_entry(&xb_write_list,
+					     struct xb_req_data, list);
+		state.idx = -1;
+		state.written = 0;
+	}
+
+	if (state.req->state == xb_req_state_aborted)
+		goto out_err;
+
+	while (state.idx < state.req->num_vecs) {
+		if (state.idx < 0) {
+			base = &state.req->msg;
+			len = sizeof(state.req->msg);
+		} else {
+			base = state.req->vec[state.idx].iov_base;
+			len = state.req->vec[state.idx].iov_len;
+		}
+		err = xb_write(base + state.written, len - state.written);
+		if (err < 0)
+			goto out_err;
+		state.written += err;
+		if (state.written != len)
+			goto out;
+
+		state.idx++;
+		state.written = 0;
+	}
+
+	list_del(&state.req->list);
+	state.req->state = xb_req_state_wait_reply;
+	list_add_tail(&state.req->list, &xs_reply_list);
+	state.req = NULL;
+
+ out:
+	mutex_unlock(&xb_write_mutex);
+
+	return 0;
+
+ out_err:
+	state.req->msg.type = XS_ERROR;
+	state.req->err = err;
+	list_del(&state.req->list);
+	if (state.req->state == xb_req_state_aborted)
+		kfree(state.req);
+	else {
+		state.req->state = xb_req_state_got_reply;
+		wake_up(&state.req->wq);
+	}
+
+	mutex_unlock(&xb_write_mutex);
+
+	state.req = NULL;
+
+	return err;
+}
+
+static int xb_thread_work(void)
+{
+	return xb_data_to_read() || xb_data_to_write();
+}
+
+static int xenbus_thread(void *unused)
+{
+	int err;
+
+	while (!kthread_should_stop()) {
+		if (wait_event_interruptible(xb_waitq, xb_thread_work()))
+			continue;
+
+		err = process_msg();
+		if (err == -ENOMEM)
+			schedule();
+		else if (err)
+			pr_warn_ratelimited("error %d while reading message\n",
+					    err);
+
+		err = process_writes();
+		if (err)
+			pr_warn_ratelimited("error %d while writing message\n",
+					    err);
+	}
+
+	xenbus_task = NULL;
 
 	return 0;
 }
@@ -223,6 +460,7 @@ int xb_init_comms(void)
 		rebind_evtchn_irq(xen_store_evtchn, xenbus_irq);
 	} else {
 		int err;
+
 		err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting,
 						0, "xenbus", &xb_waitq);
 		if (err < 0) {
@@ -231,6 +469,13 @@ int xb_init_comms(void)
 		}
 
 		xenbus_irq = err;
+
+		if (!xenbus_task) {
+			xenbus_task = kthread_run(xenbus_thread, NULL,
+						  "xenbus");
+			if (IS_ERR(xenbus_task))
+				return PTR_ERR(xenbus_task);
+		}
 	}
 
 	return 0;
diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c
index e4b98477750771e52cf6d53d45810321e22dbaeb..4d343eed08f51e1a3d2a0628dccb256b95858fae 100644
--- a/drivers/xen/xenbus/xenbus_dev_frontend.c
+++ b/drivers/xen/xenbus/xenbus_dev_frontend.c
@@ -113,6 +113,7 @@ struct xenbus_file_priv {
 	struct list_head read_buffers;
 	wait_queue_head_t read_waitq;
 
+	struct kref kref;
 };
 
 /* Read out any raw xenbus messages queued up. */
@@ -297,6 +298,107 @@ static void watch_fired(struct xenbus_watch *watch,
 	mutex_unlock(&adap->dev_data->reply_mutex);
 }
 
+static void xenbus_file_free(struct kref *kref)
+{
+	struct xenbus_file_priv *u;
+	struct xenbus_transaction_holder *trans, *tmp;
+	struct watch_adapter *watch, *tmp_watch;
+	struct read_buffer *rb, *tmp_rb;
+
+	u = container_of(kref, struct xenbus_file_priv, kref);
+
+	/*
+	 * No need for locking here because there are no other users,
+	 * by definition.
+	 */
+
+	list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
+		xenbus_transaction_end(trans->handle, 1);
+		list_del(&trans->list);
+		kfree(trans);
+	}
+
+	list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) {
+		unregister_xenbus_watch(&watch->watch);
+		list_del(&watch->list);
+		free_watch_adapter(watch);
+	}
+
+	list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) {
+		list_del(&rb->list);
+		kfree(rb);
+	}
+	kfree(u);
+}
+
+static struct xenbus_transaction_holder *xenbus_get_transaction(
+	struct xenbus_file_priv *u, uint32_t tx_id)
+{
+	struct xenbus_transaction_holder *trans;
+
+	list_for_each_entry(trans, &u->transactions, list)
+		if (trans->handle.id == tx_id)
+			return trans;
+
+	return NULL;
+}
+
+void xenbus_dev_queue_reply(struct xb_req_data *req)
+{
+	struct xenbus_file_priv *u = req->par;
+	struct xenbus_transaction_holder *trans = NULL;
+	int rc;
+	LIST_HEAD(staging_q);
+
+	xs_request_exit(req);
+
+	mutex_lock(&u->msgbuffer_mutex);
+
+	if (req->type == XS_TRANSACTION_START) {
+		trans = xenbus_get_transaction(u, 0);
+		if (WARN_ON(!trans))
+			goto out;
+		if (req->msg.type == XS_ERROR) {
+			list_del(&trans->list);
+			kfree(trans);
+		} else {
+			rc = kstrtou32(req->body, 10, &trans->handle.id);
+			if (WARN_ON(rc))
+				goto out;
+		}
+	} else if (req->msg.type == XS_TRANSACTION_END) {
+		trans = xenbus_get_transaction(u, req->msg.tx_id);
+		if (WARN_ON(!trans))
+			goto out;
+		list_del(&trans->list);
+		kfree(trans);
+	}
+
+	mutex_unlock(&u->msgbuffer_mutex);
+
+	mutex_lock(&u->reply_mutex);
+	rc = queue_reply(&staging_q, &req->msg, sizeof(req->msg));
+	if (!rc)
+		rc = queue_reply(&staging_q, req->body, req->msg.len);
+	if (!rc) {
+		list_splice_tail(&staging_q, &u->read_buffers);
+		wake_up(&u->read_waitq);
+	} else {
+		queue_cleanup(&staging_q);
+	}
+	mutex_unlock(&u->reply_mutex);
+
+	kfree(req->body);
+	kfree(req);
+
+	kref_put(&u->kref, xenbus_file_free);
+
+	return;
+
+ out:
+	mutex_unlock(&u->msgbuffer_mutex);
+}
+
 static int xenbus_command_reply(struct xenbus_file_priv *u,
 				unsigned int msg_type, const char *reply)
 {
@@ -317,6 +419,9 @@ static int xenbus_command_reply(struct xenbus_file_priv *u,
 	wake_up(&u->read_waitq);
 	mutex_unlock(&u->reply_mutex);
 
+	if (!rc)
+		kref_put(&u->kref, xenbus_file_free);
+
 	return rc;
 }
 
@@ -324,57 +429,22 @@ static int xenbus_write_transaction(unsigned msg_type,
 				    struct xenbus_file_priv *u)
 {
 	int rc;
-	void *reply;
 	struct xenbus_transaction_holder *trans = NULL;
-	LIST_HEAD(staging_q);
 
 	if (msg_type == XS_TRANSACTION_START) {
-		trans = kmalloc(sizeof(*trans), GFP_KERNEL);
+		trans = kzalloc(sizeof(*trans), GFP_KERNEL);
 		if (!trans) {
 			rc = -ENOMEM;
 			goto out;
 		}
-	} else if (u->u.msg.tx_id != 0) {
-		list_for_each_entry(trans, &u->transactions, list)
-			if (trans->handle.id == u->u.msg.tx_id)
-				break;
-		if (&trans->list == &u->transactions)
-			return xenbus_command_reply(u, XS_ERROR, "ENOENT");
-	}
-
-	reply = xenbus_dev_request_and_reply(&u->u.msg);
-	if (IS_ERR(reply)) {
-		if (msg_type == XS_TRANSACTION_START)
-			kfree(trans);
-		rc = PTR_ERR(reply);
-		goto out;
-	}
+		list_add(&trans->list, &u->transactions);
+	} else if (u->u.msg.tx_id != 0 &&
+		   !xenbus_get_transaction(u, u->u.msg.tx_id))
+		return xenbus_command_reply(u, XS_ERROR, "ENOENT");
 
-	if (msg_type == XS_TRANSACTION_START) {
-		if (u->u.msg.type == XS_ERROR)
-			kfree(trans);
-		else {
-			trans->handle.id = simple_strtoul(reply, NULL, 0);
-			list_add(&trans->list, &u->transactions);
-		}
-	} else if (u->u.msg.type == XS_TRANSACTION_END) {
-		list_del(&trans->list);
+	rc = xenbus_dev_request_and_reply(&u->u.msg, u);
+	if (rc)
 		kfree(trans);
-	}
-
-	mutex_lock(&u->reply_mutex);
-	rc = queue_reply(&staging_q, &u->u.msg, sizeof(u->u.msg));
-	if (!rc)
-		rc = queue_reply(&staging_q, reply, u->u.msg.len);
-	if (!rc) {
-		list_splice_tail(&staging_q, &u->read_buffers);
-		wake_up(&u->read_waitq);
-	} else {
-		queue_cleanup(&staging_q);
-	}
-	mutex_unlock(&u->reply_mutex);
-
-	kfree(reply);
 
 out:
 	return rc;
@@ -506,6 +576,8 @@ static ssize_t xenbus_file_write(struct file *filp,
 	 * OK, now we have a complete message. Do something with it.
 	 */
 
+	kref_get(&u->kref);
+
 	msg_type = u->u.msg.type;
 
 	switch (msg_type) {
@@ -520,8 +592,10 @@ static ssize_t xenbus_file_write(struct file *filp,
 		ret = xenbus_write_transaction(msg_type, u);
 		break;
 	}
-	if (ret != 0)
+	if (ret != 0) {
 		rc = ret;
+		kref_put(&u->kref, xenbus_file_free);
+	}
 
 	/* Buffered message consumed */
 	u->len = 0;
@@ -546,6 +620,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp)
 	if (u == NULL)
 		return -ENOMEM;
 
+	kref_init(&u->kref);
+
 	INIT_LIST_HEAD(&u->transactions);
 	INIT_LIST_HEAD(&u->watches);
 	INIT_LIST_HEAD(&u->read_buffers);
@@ -562,32 +638,8 @@ static int xenbus_file_open(struct inode *inode, struct file *filp)
 static int xenbus_file_release(struct inode *inode, struct file *filp)
 {
 	struct xenbus_file_priv *u = filp->private_data;
-	struct xenbus_transaction_holder *trans, *tmp;
-	struct watch_adapter *watch, *tmp_watch;
-	struct read_buffer *rb, *tmp_rb;
-
-	/*
-	 * No need for locking here because there are no other users,
-	 * by definition.
-	 */
-	list_for_each_entry_safe(trans, tmp, &u->transactions, list) {
-		xenbus_transaction_end(trans->handle, 1);
-		list_del(&trans->list);
-		kfree(trans);
-	}
-
-	list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) {
-		unregister_xenbus_watch(&watch->watch);
-		list_del(&watch->list);
-		free_watch_adapter(watch);
-	}
-
-	list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) {
-		list_del(&rb->list);
-		kfree(rb);
-	}
-	kfree(u);
+
+	kref_put(&u->kref, xenbus_file_free);
 
 	return 0;
 }
diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c
index ebc768f44abe56f2f6d32b66e668b79a77941a91..e460802149555b6f0d5def7659d8e8207828eb53 100644
--- a/drivers/xen/xenbus/xenbus_xs.c
+++ b/drivers/xen/xenbus/xenbus_xs.c
@@ -43,6 +43,7 @@
 #include <linux/slab.h>
 #include <linux/fcntl.h>
 #include <linux/kthread.h>
+#include <linux/reboot.h>
 #include <linux/rwsem.h>
 #include <linux/mutex.h>
 #include <asm/xen/hypervisor.h>
@@ -50,61 +51,28 @@
 #include <xen/xen.h>
 #include "xenbus.h"
 
-struct xs_stored_msg {
-	struct list_head list;
-
-	struct xsd_sockmsg hdr;
-
-	union {
-		/* Queued replies. */
-		struct {
-			char *body;
-		} reply;
-
-		/* Queued watch events. */
-		struct {
-			struct xenbus_watch *handle;
-			const char *path;
-			const char *token;
-		} watch;
-	} u;
-};
+/*
+ * Framework to protect suspend/resume handling against normal Xenstore
+ * message handling:
+ * During suspend/resume there must be no open transaction and no pending
+ * Xenstore request.
+ * New watch events happening in this time can be ignored by firing all watches
+ * after resume.
+ */
 
-struct xs_handle {
-	/* A list of replies. Currently only one will ever be outstanding. */
-	struct list_head reply_list;
-	spinlock_t reply_lock;
-	wait_queue_head_t reply_waitq;
-
-	/*
-	 * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex.
-	 * response_mutex is never taken simultaneously with the other three.
-	 *
-	 * transaction_mutex must be held before incrementing
-	 * transaction_count. The mutex is held when a suspend is in
-	 * progress to prevent new transactions starting.
-	 *
-	 * When decrementing transaction_count to zero the wait queue
-	 * should be woken up, the suspend code waits for count to
-	 * reach zero.
-	 */
-
-	/* One request at a time. */
-	struct mutex request_mutex;
-
-	/* Protect xenbus reader thread against save/restore. */
-	struct mutex response_mutex;
-
-	/* Protect transactions against save/restore. */
-	struct mutex transaction_mutex;
-	atomic_t transaction_count;
-	wait_queue_head_t transaction_wq;
-
-	/* Protect watch (de)register against save/restore. */
-	struct rw_semaphore watch_mutex;
-};
+/* Lock protecting enter/exit critical region. */
+static DEFINE_SPINLOCK(xs_state_lock);
+/* Number of users in critical region (protected by xs_state_lock). */
+static unsigned int xs_state_users;
+/* Suspend handler waiting or already active (protected by xs_state_lock)? */
+static int xs_suspend_active;
+/* Unique Xenstore request id (protected by xs_state_lock). */
+static uint32_t xs_request_id;
 
-static struct xs_handle xs_state;
+/* Wait queue for all callers waiting for critical region to become usable. */
+static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq);
+/* Wait queue for suspend handling waiting for critical region being empty. */
+static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq);
 
 /* List of registered watches, and a lock to protect it. */
 static LIST_HEAD(watches);
@@ -114,6 +82,9 @@ static DEFINE_SPINLOCK(watches_lock);
 static LIST_HEAD(watch_events);
 static DEFINE_SPINLOCK(watch_events_lock);
 
+/* Protect watch (de)register against save/restore. */
+static DECLARE_RWSEM(xs_watch_rwsem);
+
 /*
  * Details of the xenwatch callback kernel thread. The thread waits on the
  * watch_events_waitq for work to do (queued on watch_events list). When it
@@ -124,6 +95,59 @@ static pid_t xenwatch_pid;
 static DEFINE_MUTEX(xenwatch_mutex);
 static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq);
 
+static void xs_suspend_enter(void)
+{
+	spin_lock(&xs_state_lock);
+	xs_suspend_active++;
+	spin_unlock(&xs_state_lock);
+	wait_event(xs_state_exit_wq, xs_state_users == 0);
+}
+
+static void xs_suspend_exit(void)
+{
+	spin_lock(&xs_state_lock);
+	xs_suspend_active--;
+	spin_unlock(&xs_state_lock);
+	wake_up_all(&xs_state_enter_wq);
+}
+
+static uint32_t xs_request_enter(struct xb_req_data *req)
+{
+	uint32_t rq_id;
+
+	req->type = req->msg.type;
+
+	spin_lock(&xs_state_lock);
+
+	while (!xs_state_users && xs_suspend_active) {
+		spin_unlock(&xs_state_lock);
+		wait_event(xs_state_enter_wq, xs_suspend_active == 0);
+		spin_lock(&xs_state_lock);
+	}
+
+	if (req->type == XS_TRANSACTION_START)
+		xs_state_users++;
+	xs_state_users++;
+	rq_id = xs_request_id++;
+
+	spin_unlock(&xs_state_lock);
+
+	return rq_id;
+}
+
+void xs_request_exit(struct xb_req_data *req)
+{
+	spin_lock(&xs_state_lock);
+	xs_state_users--;
+	if ((req->type == XS_TRANSACTION_START && req->msg.type == XS_ERROR) ||
+	    req->type == XS_TRANSACTION_END)
+		xs_state_users--;
+	spin_unlock(&xs_state_lock);
+
+	if (xs_suspend_active && !xs_state_users)
+		wake_up(&xs_state_exit_wq);
+}
+
 static int get_error(const char *errorstring)
 {
 	unsigned int i;
@@ -161,21 +185,24 @@ static bool xenbus_ok(void)
 	}
 	return false;
 }
-static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
+
+static bool test_reply(struct xb_req_data *req)
 {
-	struct xs_stored_msg *msg;
-	char *body;
+	if (req->state == xb_req_state_got_reply || !xenbus_ok())
+		return true;
 
-	spin_lock(&xs_state.reply_lock);
+	/* Make sure to reread req->state each time. */
+	barrier();
 
-	while (list_empty(&xs_state.reply_list)) {
-		spin_unlock(&xs_state.reply_lock);
-		if (xenbus_ok())
-			/* XXX FIXME: Avoid synchronous wait for response here. */
-			wait_event_timeout(xs_state.reply_waitq,
-					   !list_empty(&xs_state.reply_list),
-					   msecs_to_jiffies(500));
-		else {
+	return false;
+}
+
+static void *read_reply(struct xb_req_data *req)
+{
+	while (req->state != xb_req_state_got_reply) {
+		wait_event(req->wq, test_reply(req));
+
+		if (!xenbus_ok())
 			/*
 			 * If we are in the process of being shut-down there is
 			 * no point of trying to contact XenBus - it is either
@@ -183,76 +210,82 @@ static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
 			 * has been killed or is unreachable.
 			 */
 			return ERR_PTR(-EIO);
-		}
-		spin_lock(&xs_state.reply_lock);
+		if (req->err)
+			return ERR_PTR(req->err);
 	}
 
-	msg = list_entry(xs_state.reply_list.next,
-			 struct xs_stored_msg, list);
-	list_del(&msg->list);
-
-	spin_unlock(&xs_state.reply_lock);
-
-	*type = msg->hdr.type;
-	if (len)
-		*len = msg->hdr.len;
-	body = msg->u.reply.body;
-
-	kfree(msg);
+	return req->body;
+}
 
-	return body;
-}
+static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg)
+{
+	bool notify;
 
-static void transaction_start(void)
-{
-	mutex_lock(&xs_state.transaction_mutex);
-	atomic_inc(&xs_state.transaction_count);
-	mutex_unlock(&xs_state.transaction_mutex);
+	req->msg = *msg;
+	req->err = 0;
+	req->state = xb_req_state_queued;
+	init_waitqueue_head(&req->wq);
+
+	req->msg.req_id = xs_request_enter(req);
+
+	mutex_lock(&xb_write_mutex);
+	list_add_tail(&req->list, &xb_write_list);
+	notify = list_is_singular(&xb_write_list);
+	mutex_unlock(&xb_write_mutex);
+
+	if (notify)
+		wake_up(&xb_waitq);
 }
 
-static void transaction_end(void)
+static void *xs_wait_for_reply(struct xb_req_data *req, struct xsd_sockmsg *msg)
 {
-	if (atomic_dec_and_test(&xs_state.transaction_count))
-		wake_up(&xs_state.transaction_wq);
-}
+	void *ret;
 
-static void transaction_suspend(void)
-{
-	mutex_lock(&xs_state.transaction_mutex);
-	wait_event(xs_state.transaction_wq,
-		   atomic_read(&xs_state.transaction_count) == 0);
+	ret = read_reply(req);
+
+	xs_request_exit(req);
+
+	msg->type = req->msg.type;
+	msg->len = req->msg.len;
+
+	mutex_lock(&xb_write_mutex);
+	if (req->state == xb_req_state_queued ||
+	    req->state == xb_req_state_wait_reply)
+		req->state = xb_req_state_aborted;
+	else
+		kfree(req);
+	mutex_unlock(&xb_write_mutex);
+
+	return ret;
 }
 
-static void transaction_resume(void)
+static void xs_wake_up(struct xb_req_data *req)
 {
-	mutex_unlock(&xs_state.transaction_mutex);
+	wake_up(&req->wq);
 }
 
-void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg)
+int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par)
 {
-	void *ret;
-	enum xsd_sockmsg_type type = msg->type;
-	int err;
+	struct xb_req_data *req;
+	struct kvec *vec;
 
-	if (type == XS_TRANSACTION_START)
-		transaction_start();
-
-	mutex_lock(&xs_state.request_mutex);
+	req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL);
+	if (!req)
+		return -ENOMEM;
 
-	err = xb_write(msg, sizeof(*msg) + msg->len);
-	if (err) {
-		msg->type = XS_ERROR;
-		ret = ERR_PTR(err);
-	} else
-		ret = read_reply(&msg->type, &msg->len);
+	vec = (struct kvec *)(req + 1);
+	vec->iov_len = msg->len;
+	vec->iov_base = msg + 1;
 
-	mutex_unlock(&xs_state.request_mutex);
+	req->vec = vec;
+	req->num_vecs = 1;
+	req->cb = xenbus_dev_queue_reply;
+	req->par = par;
 
-	if ((msg->type == XS_TRANSACTION_END) ||
-	    ((type == XS_TRANSACTION_START) && (msg->type == XS_ERROR)))
-		transaction_end();
+	xs_send(req, msg);
 
-	return ret;
+	return 0;
 }
 EXPORT_SYMBOL(xenbus_dev_request_and_reply);
 
@@ -263,37 +296,31 @@ static void *xs_talkv(struct xenbus_transaction t,
 		      unsigned int num_vecs,
 		      unsigned int *len)
 {
+	struct xb_req_data *req;
 	struct xsd_sockmsg msg;
 	void *ret = NULL;
 	unsigned int i;
 	int err;
 
+	req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH);
+	if (!req)
+		return ERR_PTR(-ENOMEM);
+
+	req->vec = iovec;
+	req->num_vecs = num_vecs;
+	req->cb = xs_wake_up;
+
 	msg.tx_id = t.id;
-	msg.req_id = 0;
 	msg.type = type;
 	msg.len = 0;
 	for (i = 0; i < num_vecs; i++)
 		msg.len += iovec[i].iov_len;
 
-	mutex_lock(&xs_state.request_mutex);
-
-	err = xb_write(&msg, sizeof(msg));
-	if (err) {
-		mutex_unlock(&xs_state.request_mutex);
-		return ERR_PTR(err);
-	}
-
-	for (i = 0; i < num_vecs; i++) {
-		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
-		if (err) {
-			mutex_unlock(&xs_state.request_mutex);
-			return ERR_PTR(err);
-		}
-	}
-
-	ret = read_reply(&msg.type, len);
+	xs_send(req, &msg);
 
-	mutex_unlock(&xs_state.request_mutex);
+	ret = xs_wait_for_reply(req, &msg);
+	if (len)
+		*len = msg.len;
 
 	if (IS_ERR(ret))
 		return ret;
@@ -500,13 +527,9 @@ int xenbus_transaction_start(struct xenbus_transaction *t)
 {
 	char *id_str;
 
-	transaction_start();
-
 	id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL);
-	if (IS_ERR(id_str)) {
-		transaction_end();
+	if (IS_ERR(id_str))
 		return PTR_ERR(id_str);
-	}
 
 	t->id = simple_strtoul(id_str, NULL, 0);
 	kfree(id_str);
@@ -520,18 +543,13 @@ EXPORT_SYMBOL_GPL(xenbus_transaction_start);
 int xenbus_transaction_end(struct xenbus_transaction t, int abort)
 {
 	char abortstr[2];
-	int err;
 
 	if (abort)
 		strcpy(abortstr, "F");
 	else
 		strcpy(abortstr, "T");
 
-	err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
-
-	transaction_end();
-
-	return err;
+	return xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL));
 }
 EXPORT_SYMBOL_GPL(xenbus_transaction_end);
 
@@ -664,6 +682,30 @@ static struct xenbus_watch *find_watch(const char *token)
 	return NULL;
 }
 
+
+int xs_watch_msg(struct xs_watch_event *event)
+{
+	if (count_strings(event->body, event->len) != 2) {
+		kfree(event);
+		return -EINVAL;
+	}
+	event->path = (const char *)event->body;
+	event->token = (const char *)strchr(event->body, '\0') + 1;
+
+	spin_lock(&watches_lock);
+	event->handle = find_watch(event->token);
+	if (event->handle != NULL) {
+		spin_lock(&watch_events_lock);
+		list_add_tail(&event->list, &watch_events);
+		wake_up(&watch_events_waitq);
+		spin_unlock(&watch_events_lock);
+	} else
+		kfree(event);
+	spin_unlock(&watches_lock);
+
+	return 0;
+}
+
 /*
  * Certain older XenBus toolstack cannot handle reading values that are
  * not populated. Some Xen 3.4 installation are incapable of doing this
@@ -712,7 +754,7 @@ int register_xenbus_watch(struct xenbus_watch *watch)
 
 	sprintf(token, "%lX", (long)watch);
 
-	down_read(&xs_state.watch_mutex);
+	down_read(&xs_watch_rwsem);
 
 	spin_lock(&watches_lock);
 	BUG_ON(find_watch(token));
@@ -727,7 +769,7 @@ int register_xenbus_watch(struct xenbus_watch *watch)
 		spin_unlock(&watches_lock);
 	}
 
-	up_read(&xs_state.watch_mutex);
+	up_read(&xs_watch_rwsem);
 
 	return err;
 }
@@ -735,13 +777,13 @@ EXPORT_SYMBOL_GPL(register_xenbus_watch);
 
 void unregister_xenbus_watch(struct xenbus_watch *watch)
 {
-	struct xs_stored_msg *msg, *tmp;
+	struct xs_watch_event *event, *tmp;
 	char token[sizeof(watch) * 2 + 1];
 	int err;
 
 	sprintf(token, "%lX", (long)watch);
 
-	down_read(&xs_state.watch_mutex);
+	down_read(&xs_watch_rwsem);
 
 	spin_lock(&watches_lock);
 	BUG_ON(!find_watch(token));
@@ -752,7 +794,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
 	if (err)
 		pr_warn("Failed to release watch %s: %i\n", watch->node, err);
 
-	up_read(&xs_state.watch_mutex);
+	up_read(&xs_watch_rwsem);
 
 	/* Make sure there are no callbacks running currently (unless
 	   its us) */
@@ -761,12 +803,11 @@ void unregister_xenbus_watch(struct xenbus_watch *watch)
 
 	/* Cancel pending watch events. */
 	spin_lock(&watch_events_lock);
-	list_for_each_entry_safe(msg, tmp, &watch_events, list) {
-		if (msg->u.watch.handle != watch)
+	list_for_each_entry_safe(event, tmp, &watch_events, list) {
+		if (event->handle != watch)
 			continue;
-		list_del(&msg->list);
-		kfree(msg->u.watch.path);
-		kfree(msg);
+		list_del(&event->list);
+		kfree(event);
 	}
 	spin_unlock(&watch_events_lock);
 
@@ -777,10 +818,10 @@ EXPORT_SYMBOL_GPL(unregister_xenbus_watch);
 
 void xs_suspend(void)
 {
-	transaction_suspend();
-	down_write(&xs_state.watch_mutex);
-	mutex_lock(&xs_state.request_mutex);
-	mutex_lock(&xs_state.response_mutex);
+	xs_suspend_enter();
+
+	down_write(&xs_watch_rwsem);
+	mutex_lock(&xs_response_mutex);
 }
 
 void xs_resume(void)
@@ -790,31 +831,31 @@ void xs_resume(void)
 
 	xb_init_comms();
 
-	mutex_unlock(&xs_state.response_mutex);
-	mutex_unlock(&xs_state.request_mutex);
-	transaction_resume();
+	mutex_unlock(&xs_response_mutex);
+
+	xs_suspend_exit();
 
-	/* No need for watches_lock: the watch_mutex is sufficient. */
+	/* No need for watches_lock: the xs_watch_rwsem is sufficient. */
 	list_for_each_entry(watch, &watches, list) {
 		sprintf(token, "%lX", (long)watch);
 		xs_watch(watch->node, token);
 	}
 
-	up_write(&xs_state.watch_mutex);
+	up_write(&xs_watch_rwsem);
 }
 
 void xs_suspend_cancel(void)
 {
-	mutex_unlock(&xs_state.response_mutex);
-	mutex_unlock(&xs_state.request_mutex);
-	up_write(&xs_state.watch_mutex);
-	mutex_unlock(&xs_state.transaction_mutex);
+	mutex_unlock(&xs_response_mutex);
+	up_write(&xs_watch_rwsem);
+
+	xs_suspend_exit();
 }
 
 static int xenwatch_thread(void *unused)
 {
 	struct list_head *ent;
-	struct xs_stored_msg *msg;
+	struct xs_watch_event *event;
 
 	for (;;) {
 		wait_event_interruptible(watch_events_waitq,
@@ -832,12 +873,10 @@ static int xenwatch_thread(void *unused)
 		spin_unlock(&watch_events_lock);
 
 		if (ent != &watch_events) {
-			msg = list_entry(ent, struct xs_stored_msg, list);
-			msg->u.watch.handle->callback(msg->u.watch.handle,
-						      msg->u.watch.path,
-						      msg->u.watch.token);
-			kfree(msg->u.watch.path);
-			kfree(msg);
+			event = list_entry(ent, struct xs_watch_event, list);
+			event->handle->callback(event->handle, event->path,
+						event->token);
+			kfree(event);
 		}
 
 		mutex_unlock(&xenwatch_mutex);
@@ -846,126 +885,37 @@ static int xenwatch_thread(void *unused)
 	return 0;
 }
 
-static int process_msg(void)
+/*
+ * Wake up all threads waiting for a xenstore reply. In case of shutdown all
+ * pending replies will be marked as "aborted" in order to let the waiters
+ * return in spite of xenstore possibly no longer being able to reply. This
+ * will avoid blocking shutdown by a thread waiting for xenstore but being
+ * necessary for shutdown processing to proceed.
+ */
+static int xs_reboot_notify(struct notifier_block *nb,
+			    unsigned long code, void *unused)
 {
-	struct xs_stored_msg *msg;
-	char *body;
-	int err;
-
-	/*
-	 * We must disallow save/restore while reading a xenstore message.
-	 * A partial read across s/r leaves us out of sync with xenstored.
-	 */
-	for (;;) {
-		err = xb_wait_for_data_to_read();
-		if (err)
-			return err;
-		mutex_lock(&xs_state.response_mutex);
-		if (xb_data_to_read())
-			break;
-		/* We raced with save/restore: pending data 'disappeared'. */
-		mutex_unlock(&xs_state.response_mutex);
-	}
+	struct xb_req_data *req;
 
-	msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH);
-	if (msg == NULL) {
-		err = -ENOMEM;
-		goto out;
-	}
-
-	err = xb_read(&msg->hdr, sizeof(msg->hdr));
-	if (err) {
-		kfree(msg);
-		goto out;
-	}
-
-	if (msg->hdr.len > XENSTORE_PAYLOAD_MAX) {
-		kfree(msg);
-		err = -EINVAL;
-		goto out;
-	}
-
-	body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH);
-	if (body == NULL) {
-		kfree(msg);
-		err = -ENOMEM;
-		goto out;
-	}
-
-	err = xb_read(body, msg->hdr.len);
-	if (err) {
-		kfree(body);
-		kfree(msg);
-		goto out;
-	}
-	body[msg->hdr.len] = '\0';
-
-	if (msg->hdr.type == XS_WATCH_EVENT) {
-		if (count_strings(body, msg->hdr.len) != 2) {
-			err = -EINVAL;
-			kfree(msg);
-			kfree(body);
-			goto out;
-		}
-		msg->u.watch.path = (const char *)body;
-		msg->u.watch.token = (const char *)strchr(body, '\0') + 1;
-
-		spin_lock(&watches_lock);
-		msg->u.watch.handle = find_watch(msg->u.watch.token);
-		if (msg->u.watch.handle != NULL) {
-			spin_lock(&watch_events_lock);
-			list_add_tail(&msg->list, &watch_events);
-			wake_up(&watch_events_waitq);
-			spin_unlock(&watch_events_lock);
-		} else {
-			kfree(body);
-			kfree(msg);
-		}
-		spin_unlock(&watches_lock);
-	} else {
-		msg->u.reply.body = body;
-		spin_lock(&xs_state.reply_lock);
-		list_add_tail(&msg->list, &xs_state.reply_list);
-		spin_unlock(&xs_state.reply_lock);
-		wake_up(&xs_state.reply_waitq);
-	}
-
- out:
-	mutex_unlock(&xs_state.response_mutex);
-	return err;
+	mutex_lock(&xb_write_mutex);
+	list_for_each_entry(req, &xs_reply_list, list)
+		wake_up(&req->wq);
+	list_for_each_entry(req, &xb_write_list, list)
+		wake_up(&req->wq);
+	mutex_unlock(&xb_write_mutex);
+	return NOTIFY_DONE;
 }
 
-static int xenbus_thread(void *unused)
-{
-	int err;
-
-	for (;;) {
-		err = process_msg();
-		if (err)
-			pr_warn("error %d while reading message\n", err);
-		if (kthread_should_stop())
-			break;
-	}
-
-	return 0;
-}
+static struct notifier_block xs_reboot_nb = {
+	.notifier_call = xs_reboot_notify,
+};
 
 int xs_init(void)
 {
 	int err;
 	struct task_struct *task;
 
-	INIT_LIST_HEAD(&xs_state.reply_list);
-	spin_lock_init(&xs_state.reply_lock);
-	init_waitqueue_head(&xs_state.reply_waitq);
-
-	mutex_init(&xs_state.request_mutex);
-	mutex_init(&xs_state.response_mutex);
-	mutex_init(&xs_state.transaction_mutex);
-	init_rwsem(&xs_state.watch_mutex);
-	atomic_set(&xs_state.transaction_count, 0);
-	init_waitqueue_head(&xs_state.transaction_wq);
+	register_reboot_notifier(&xs_reboot_nb);
 
 	/* Initialize the shared memory rings to talk to xenstored */
 	err = xb_init_comms();
@@ -977,10 +927,6 @@ int xs_init(void)
 		return PTR_ERR(task);
 	xenwatch_pid = task->pid;
 
-	task = kthread_run(xenbus_thread, NULL, "xenbus");
-	if (IS_ERR(task))
-		return PTR_ERR(task);
-
 	/* shutdown watches for kexec boot */
 	xs_reset_watches();
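
Note (reviewer addition, not part of the patch): with this change xb_write() and xb_read() no longer sleep. They copy as much as the shared ring currently allows, return the number of bytes transferred, and leave all waiting to the new xenbus kthread. The standalone sketch below (plain userspace C; RING_SIZE, ring_write and ring_read are illustrative names, not kernel API) shows the free-running index arithmetic this relies on: prod and cons increase monotonically, prod - cons is the fill level even after the indices wrap, and masking with the power-of-two ring size turns an index into a buffer offset. Memory barriers and event-channel notification are deliberately omitted.

/* build: cc -Wall ring_sketch.c -o ring_sketch */
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define RING_SIZE 16			/* power of two, like XENSTORE_RING_SIZE */
#define MASK(idx) ((idx) & (RING_SIZE - 1))

static char ring[RING_SIZE];
static uint32_t prod, cons;		/* free-running producer/consumer indices */

/* Copy at most len bytes into the ring; return the number accepted. */
static unsigned int ring_write(const char *data, unsigned int len)
{
	unsigned int bytes = 0;

	while (len) {
		unsigned int avail = RING_SIZE - (prod - cons);

		if (!avail)
			break;	/* ring full: caller retries later, like xb_write() */
		if (avail > RING_SIZE - MASK(prod))
			avail = RING_SIZE - MASK(prod);	/* don't cross the buffer end */
		if (avail > len)
			avail = len;

		memcpy(ring + MASK(prod), data, avail);
		data += avail;
		len -= avail;
		bytes += avail;
		prod += avail;	/* the kernel issues virt_wmb() before this update */
	}
	return bytes;
}

/* Copy at most len bytes out of the ring; return the number obtained. */
static unsigned int ring_read(char *data, unsigned int len)
{
	unsigned int bytes = 0;

	while (len) {
		unsigned int avail = prod - cons;

		if (!avail)
			break;	/* ring empty: nothing to consume yet */
		if (avail > RING_SIZE - MASK(cons))
			avail = RING_SIZE - MASK(cons);
		if (avail > len)
			avail = len;

		memcpy(data, ring + MASK(cons), avail);
		data += avail;
		len -= avail;
		bytes += avail;
		cons += avail;
	}
	return bytes;
}

int main(void)
{
	char out[32];
	unsigned int n;

	n = ring_write("hello, xenstore ring!", 21);	/* 21 > 16: partial write */
	printf("wrote %u bytes\n", n);			/* prints 16 */
	n = ring_read(out, sizeof(out));
	printf("read %u bytes: %.*s\n", n, (int)n, out);
	return 0;
}

The same partial-progress convention is what lets process_writes() in the patch keep static state (current request, vector index, bytes written) across invocations and resume exactly where the previous attempt stopped once the ring drains.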