提交 6194ea89 编写于 作者: S Sage Weil

libceph: resubmit linger ops when pg mapping changes

The linger op registration (i.e., watch) modifies the object state.  As
such, the OSD will reply with success if it has already applied without
doing the associated side-effects (setting up the watch session state).
If we lose the ACK and resubmit, we will see success but the watch will not
be correctly registered and we won't get notifies.

To fix this, always resubmit the linger op with a new tid.  We accomplish
this by re-registering as a linger (i.e., 'registered') if we are not yet
registered.  Then the second loop will treat this just like a normal
case of re-registering.

This mirrors a similar fix on the userland ceph.git, commit 5dd68b95, and
ceph bug #2796.
Signed-off-by: NSage Weil <sage@inktank.com>
Reviewed-by: NAlex Elder <elder@inktank.com>
Reviewed-by: NYehuda Sadeh <yehuda@inktank.com>
上级 8c50c817
...@@ -891,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc, ...@@ -891,7 +891,9 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
{ {
dout("__register_linger_request %p\n", req); dout("__register_linger_request %p\n", req);
list_add_tail(&req->r_linger_item, &osdc->req_linger); list_add_tail(&req->r_linger_item, &osdc->req_linger);
list_add_tail(&req->r_linger_osd, &req->r_osd->o_linger_requests); if (req->r_osd)
list_add_tail(&req->r_linger_osd,
&req->r_osd->o_linger_requests);
} }
static void __unregister_linger_request(struct ceph_osd_client *osdc, static void __unregister_linger_request(struct ceph_osd_client *osdc,
...@@ -1305,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1305,8 +1307,9 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("kick_requests %s\n", force_resend ? " (force resend)" : ""); dout("kick_requests %s\n", force_resend ? " (force resend)" : "");
mutex_lock(&osdc->request_mutex); mutex_lock(&osdc->request_mutex);
for (p = rb_first(&osdc->requests); p; p = rb_next(p)) { for (p = rb_first(&osdc->requests); p; ) {
req = rb_entry(p, struct ceph_osd_request, r_node); req = rb_entry(p, struct ceph_osd_request, r_node);
p = rb_next(p);
err = __map_request(osdc, req, force_resend); err = __map_request(osdc, req, force_resend);
if (err < 0) if (err < 0)
continue; /* error */ continue; /* error */
...@@ -1314,12 +1317,25 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend) ...@@ -1314,12 +1317,25 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("%p tid %llu maps to no osd\n", req, req->r_tid); dout("%p tid %llu maps to no osd\n", req, req->r_tid);
needmap++; /* request a newer map */ needmap++; /* request a newer map */
} else if (err > 0) { } else if (err > 0) {
dout("%p tid %llu requeued on osd%d\n", req, req->r_tid, if (!req->r_linger) {
dout("%p tid %llu requeued on osd%d\n", req,
req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1); req->r_osd ? req->r_osd->o_osd : -1);
if (!req->r_linger)
req->r_flags |= CEPH_OSD_FLAG_RETRY; req->r_flags |= CEPH_OSD_FLAG_RETRY;
} }
} }
if (req->r_linger && list_empty(&req->r_linger_item)) {
/*
* register as a linger so that we will
* re-submit below and get a new tid
*/
dout("%p tid %llu restart on osd%d\n",
req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
__register_linger_request(osdc, req);
__unregister_request(osdc, req);
}
}
list_for_each_entry_safe(req, nreq, &osdc->req_linger, list_for_each_entry_safe(req, nreq, &osdc->req_linger,
r_linger_item) { r_linger_item) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册