diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 2e4e9773c46bf998ba62cba4fef6ad4b4da58607..c03b4185c143c3deb74a24baacced90ee760ae80 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -316,7 +316,6 @@ static void reset_connection(struct ceph_connection *con) { /* reset connection, out_queue, msg_ and connect_seq */ /* discard existing out_queue and msg_seq */ - mutex_lock(&con->out_mutex); ceph_msg_remove_list(&con->out_queue); ceph_msg_remove_list(&con->out_sent); @@ -332,7 +331,6 @@ static void reset_connection(struct ceph_connection *con) con->out_msg = NULL; } con->in_seq = 0; - mutex_unlock(&con->out_mutex); } /* @@ -343,7 +341,9 @@ void ceph_con_close(struct ceph_connection *con) dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr)); set_bit(CLOSED, &con->state); /* in case there's queued work */ clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ + mutex_lock(&con->mutex); reset_connection(con); + mutex_unlock(&con->mutex); queue_con(con); } @@ -392,7 +392,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con) memset(con, 0, sizeof(*con)); atomic_set(&con->nref, 1); con->msgr = msgr; - mutex_init(&con->out_mutex); + mutex_init(&con->mutex); INIT_LIST_HEAD(&con->out_queue); INIT_LIST_HEAD(&con->out_sent); INIT_DELAYED_WORK(&con->work, con_work); @@ -571,11 +571,13 @@ static void prepare_connect_authorizer(struct ceph_connection *con) int auth_len = 0; int auth_protocol = 0; + mutex_unlock(&con->mutex); if (con->ops->get_authorizer) con->ops->get_authorizer(con, &auth_buf, &auth_len, &auth_protocol, &con->auth_reply_buf, &con->auth_reply_buf_len, con->auth_retry); + mutex_lock(&con->mutex); con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol); con->out_connect.authorizer_len = cpu_to_le32(auth_len); @@ -1094,10 +1096,13 @@ static int process_connect(struct ceph_connection *con) le32_to_cpu(con->out_connect.protocol_version), le32_to_cpu(con->in_reply.protocol_version)); con->error_msg = "protocol version mismatch"; - if (con->ops->bad_proto) - con->ops->bad_proto(con); reset_connection(con); set_bit(CLOSED, &con->state); /* in case there's queued work */ + + mutex_unlock(&con->mutex); + if (con->ops->bad_proto) + con->ops->bad_proto(con); + mutex_lock(&con->mutex); return -1; case CEPH_MSGR_TAG_BADAUTHORIZER: @@ -1133,9 +1138,11 @@ static int process_connect(struct ceph_connection *con) prepare_read_connect(con); /* Tell ceph about it. */ + mutex_unlock(&con->mutex); pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name)); if (con->ops->peer_reset) con->ops->peer_reset(con); + mutex_lock(&con->mutex); break; case CEPH_MSGR_TAG_RETRY_SESSION: @@ -1221,7 +1228,6 @@ static void process_ack(struct ceph_connection *con) u64 ack = le64_to_cpu(con->in_temp_ack); u64 seq; - mutex_lock(&con->out_mutex); while (!list_empty(&con->out_sent)) { m = list_first_entry(&con->out_sent, struct ceph_msg, list_head); @@ -1232,7 +1238,6 @@ static void process_ack(struct ceph_connection *con) le16_to_cpu(m->hdr.type), m); ceph_msg_remove(m); } - mutex_unlock(&con->out_mutex); prepare_read_tag(con); } @@ -1366,8 +1371,10 @@ static int read_partial_message(struct ceph_connection *con) /* find pages for data payload */ want = calc_pages_for(data_off & ~PAGE_MASK, data_len); ret = -1; + mutex_unlock(&con->mutex); if (con->ops->prepare_pages) ret = con->ops->prepare_pages(con, m, want); + mutex_lock(&con->mutex); if (ret < 0) { dout("%p prepare_pages failed, skipping payload\n", m); con->in_base_pos = -data_len - sizeof(m->footer); @@ -1454,9 +1461,8 @@ static void process_message(struct ceph_connection *con) if (con->peer_name.type == 0) con->peer_name = msg->hdr.src.name; - mutex_lock(&con->out_mutex); con->in_seq++; - mutex_unlock(&con->out_mutex); + mutex_unlock(&con->mutex); dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n", msg, le64_to_cpu(msg->hdr.seq), @@ -1467,6 +1473,8 @@ static void process_message(struct ceph_connection *con) le32_to_cpu(msg->hdr.data_len), con->in_front_crc, con->in_middle_crc, con->in_data_crc); con->ops->dispatch(con, msg); + + mutex_lock(&con->mutex); prepare_read_tag(con); } @@ -1483,7 +1491,7 @@ static int try_write(struct ceph_connection *con) dout("try_write start %p state %lu nref %d\n", con, con->state, atomic_read(&con->nref)); - mutex_lock(&con->out_mutex); + mutex_lock(&con->mutex); more: dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); @@ -1576,7 +1584,7 @@ static int try_write(struct ceph_connection *con) done: ret = 0; out: - mutex_unlock(&con->out_mutex); + mutex_unlock(&con->mutex); dout("try_write done on %p\n", con); return ret; } @@ -1600,6 +1608,8 @@ static int try_read(struct ceph_connection *con) dout("try_read start on %p\n", con); msgr = con->msgr; + mutex_lock(&con->mutex); + more: dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, con->in_base_pos); @@ -1693,6 +1703,7 @@ static int try_read(struct ceph_connection *con) done: ret = 0; out: + mutex_unlock(&con->mutex); dout("try_read done on %p\n", con); return ret; @@ -1818,6 +1829,8 @@ static void ceph_fault(struct ceph_connection *con) clear_bit(BUSY, &con->state); /* to avoid an improbable race */ + mutex_lock(&con->mutex); + con_close_socket(con); if (con->in_msg) { @@ -1827,24 +1840,24 @@ static void ceph_fault(struct ceph_connection *con) /* If there are no messages in the queue, place the connection * in a STANDBY state (i.e., don't try to reconnect just yet). */ - mutex_lock(&con->out_mutex); if (list_empty(&con->out_queue) && !con->out_keepalive_pending) { dout("fault setting STANDBY\n"); set_bit(STANDBY, &con->state); - mutex_unlock(&con->out_mutex); + mutex_unlock(&con->mutex); goto out; } /* Requeue anything that hasn't been acked, and retry after a * delay. */ list_splice_init(&con->out_sent, &con->out_queue); - mutex_unlock(&con->out_mutex); if (con->delay == 0) con->delay = BASE_DELAY_INTERVAL; else if (con->delay < MAX_DELAY_INTERVAL) con->delay *= 2; + mutex_unlock(&con->mutex); + /* explicitly schedule work to try to reconnect again later. */ dout("fault queueing %p delay %lu\n", con, con->delay); con->ops->get(con); @@ -1920,7 +1933,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) msg->hdr.dst_erank = con->peer_addr.erank; /* queue */ - mutex_lock(&con->out_mutex); + mutex_lock(&con->mutex); BUG_ON(!list_empty(&msg->list_head)); list_add_tail(&msg->list_head, &con->out_queue); dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg, @@ -1929,7 +1942,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) le32_to_cpu(msg->hdr.front_len), le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len)); - mutex_unlock(&con->out_mutex); + mutex_unlock(&con->mutex); /* if there wasn't anything waiting to send before, queue * new work */ @@ -1942,7 +1955,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) */ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) { - mutex_lock(&con->out_mutex); + mutex_lock(&con->mutex); if (!list_empty(&msg->list_head)) { dout("con_revoke %p msg %p\n", con, msg); list_del_init(&msg->list_head); @@ -1959,7 +1972,7 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg) } else { dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg); } - mutex_unlock(&con->out_mutex); + mutex_unlock(&con->mutex); } /* diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index e04c214b4f6f000fb14a7230444656d0ea3f0668..94b55de90331aa29f2f5eb58162e7285da1ff46a 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -155,8 +155,9 @@ struct ceph_connection { void *auth_reply_buf; /* where to put the authorizer reply */ int auth_reply_buf_len; + struct mutex mutex; + /* out queue */ - struct mutex out_mutex; struct list_head out_queue; struct list_head out_sent; /* sending or sent but unacked */ u64 out_seq; /* last message queued for send */