提交 e548e9b9 编写于 作者: Y Yan, Zheng 提交者: Ilya Dryomov

ceph: re-send flushing caps (which are revoked) in reconnect stage

If flushing caps were revoked, we should re-send the cap flush in the
client reconnect stage. This guarantees that the MDS processes the cap
flush message before issuing the flushing caps to other clients.
Signed-off-by: Yan, Zheng <zyan@redhat.com>
上级 a2971c8c
...@@ -1486,6 +1486,7 @@ static int __mark_caps_flushing(struct inode *inode, ...@@ -1486,6 +1486,7 @@ static int __mark_caps_flushing(struct inode *inode,
cf = kmalloc(sizeof(*cf), GFP_ATOMIC); cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
cf->caps = flushing; cf->caps = flushing;
cf->kick = false;
spin_lock(&mdsc->cap_dirty_lock); spin_lock(&mdsc->cap_dirty_lock);
list_del_init(&ci->i_dirty_item); list_del_init(&ci->i_dirty_item);
...@@ -2101,7 +2102,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc, ...@@ -2101,7 +2102,8 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
static int __kick_flushing_caps(struct ceph_mds_client *mdsc, static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session, struct ceph_mds_session *session,
struct ceph_inode_info *ci) struct ceph_inode_info *ci,
bool kick_all)
{ {
struct inode *inode = &ci->vfs_inode; struct inode *inode = &ci->vfs_inode;
struct ceph_cap *cap; struct ceph_cap *cap;
...@@ -2127,7 +2129,9 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -2127,7 +2129,9 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) { for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
cf = rb_entry(n, struct ceph_cap_flush, i_node); cf = rb_entry(n, struct ceph_cap_flush, i_node);
if (cf->tid >= first_tid) if (cf->tid < first_tid)
continue;
if (kick_all || cf->kick)
break; break;
} }
if (!n) { if (!n) {
...@@ -2136,6 +2140,8 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -2136,6 +2140,8 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
} }
cf = rb_entry(n, struct ceph_cap_flush, i_node); cf = rb_entry(n, struct ceph_cap_flush, i_node);
cf->kick = false;
first_tid = cf->tid + 1; first_tid = cf->tid + 1;
dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode, dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
...@@ -2149,6 +2155,49 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -2149,6 +2155,49 @@ static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
return delayed; return delayed;
} }
/*
 * Early pass over a session's flushing caps, run while reconnecting to an MDS.
 *
 * send_mds_reconnect() calls this just before it hands the reconnect reply to
 * ceph_con_send().  Every inode on session->s_cap_flushing is visited:
 *
 *  - If the inode's auth cap is missing or belongs to another session, the
 *    inconsistency is logged and the inode is skipped.
 *  - If some of the caps being flushed are no longer issued (they were
 *    revoked), all pending cap flushes are re-sent right away through
 *    __kick_flushing_caps(..., kick_all = true).  When that call returns
 *    zero the inode is skipped; otherwise processing falls through.
 *  - Each cap flush still in the inode's flush tree is then flagged with
 *    kick = true, which is the flag __kick_flushing_caps() checks when it is
 *    called with kick_all = false.
 *
 * @mdsc:    MDS client state, passed through to __kick_flushing_caps().
 * @session: MDS session whose s_cap_flushing list is walked.
 */
void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
				   struct ceph_mds_session *session)
{
	struct ceph_inode_info *ci;

	dout("early_kick_flushing_caps mds%d\n", session->s_mds);

	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
		struct ceph_cap *auth_cap;
		struct rb_node *node;

		spin_lock(&ci->i_ceph_lock);

		auth_cap = ci->i_auth_cap;
		if (!auth_cap || auth_cap->session != session) {
			pr_err("%p auth cap %p not mds%d ???\n",
			       &ci->vfs_inode, auth_cap, session->s_mds);
			spin_unlock(&ci->i_ceph_lock);
			continue;
		}

		/*
		 * Some flushing caps are no longer issued, i.e. they were
		 * revoked.  Re-send the cap flush now, during the reconnect
		 * stage, so that the MDS processes the flush message before
		 * it issues those caps to another client.
		 */
		if (ci->i_flushing_caps !=
		    (auth_cap->issued & ci->i_flushing_caps)) {
			/* __kick_flushing_caps() is called without i_ceph_lock held */
			spin_unlock(&ci->i_ceph_lock);
			if (!__kick_flushing_caps(mdsc, session, ci, true))
				continue;
			spin_lock(&ci->i_ceph_lock);
		}

		/* Flag every remaining cap flush of this inode for a later kick. */
		node = rb_first(&ci->i_cap_flush_tree);
		while (node) {
			rb_entry(node, struct ceph_cap_flush, i_node)->kick = true;
			node = rb_next(node);
		}

		spin_unlock(&ci->i_ceph_lock);
	}
}
void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session) struct ceph_mds_session *session)
{ {
...@@ -2158,7 +2207,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, ...@@ -2158,7 +2207,7 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
dout("kick_flushing_caps mds%d\n", session->s_mds); dout("kick_flushing_caps mds%d\n", session->s_mds);
list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) { list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
int delayed = __kick_flushing_caps(mdsc, session, ci); int delayed = __kick_flushing_caps(mdsc, session, ci, false);
if (delayed) { if (delayed) {
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci); __cap_delay_requeue(mdsc, ci);
...@@ -2191,7 +2240,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc, ...@@ -2191,7 +2240,7 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
spin_unlock(&ci->i_ceph_lock); spin_unlock(&ci->i_ceph_lock);
delayed = __kick_flushing_caps(mdsc, session, ci); delayed = __kick_flushing_caps(mdsc, session, ci, true);
if (delayed) { if (delayed) {
spin_lock(&ci->i_ceph_lock); spin_lock(&ci->i_ceph_lock);
__cap_delay_requeue(mdsc, ci); __cap_delay_requeue(mdsc, ci);
......
...@@ -2982,6 +2982,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc, ...@@ -2982,6 +2982,9 @@ static void send_mds_reconnect(struct ceph_mds_client *mdsc,
reply->hdr.data_len = cpu_to_le32(pagelist->length); reply->hdr.data_len = cpu_to_le32(pagelist->length);
ceph_msg_data_add_pagelist(reply, pagelist); ceph_msg_data_add_pagelist(reply, pagelist);
ceph_early_kick_flushing_caps(mdsc, session);
ceph_con_send(&session->s_con, reply); ceph_con_send(&session->s_con, reply);
mutex_unlock(&session->s_mutex); mutex_unlock(&session->s_mutex);
......
...@@ -189,9 +189,10 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap) ...@@ -189,9 +189,10 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
struct ceph_cap_flush { struct ceph_cap_flush {
u64 tid; u64 tid;
int caps; int caps;
struct rb_node g_node; bool kick;
struct rb_node g_node; // global
union { union {
struct rb_node i_node; struct rb_node i_node; // inode
struct list_head list; struct list_head list;
}; };
}; };
...@@ -868,6 +869,8 @@ extern void ceph_queue_caps_release(struct inode *inode); ...@@ -868,6 +869,8 @@ extern void ceph_queue_caps_release(struct inode *inode);
extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc); extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
extern int ceph_fsync(struct file *file, loff_t start, loff_t end, extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
int datasync); int datasync);
extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session);
extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc, extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
struct ceph_mds_session *session); struct ceph_mds_session *session);
extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册