提交 4875647a 编写于 作者: D David Teigland

dlm: fixes for nodir mode

The "nodir" mode (statically assign master nodes instead
of using the resource directory) has always been highly
experimental, and never seriously used.  This commit
fixes a number of problems, making nodir much more usable.

- Major change to recovery: recover all locks and restart
  all in-progress operations after recovery.  In some
  cases it's not possible to know which in-progess locks
  to recover, so recover all.  (Most require recovery
  in nodir mode anyway since rehashing changes most
  master nodes.)

- Change the way nodir mode is enabled, from a command
  line mount arg passed through gfs2, into a sysfs
  file managed by dlm_controld, consistent with the
  other config settings.

- Allow recovering MSTCPY locks on an rsb that has not
  yet been turned into a master copy.

- Ignore RCOM_LOCK and RCOM_LOCK_REPLY recovery messages
  from a previous, aborted recovery cycle.  Base this
  on the local recovery status not being in the state
  where any nodes should be sending LOCK messages for the
  current recovery cycle.

- Hold rsb lock around dlm_purge_mstcpy_locks() because it
  may run concurrently with dlm_recover_master_copy().

- Maintain highbast on process-copy lkb's (in addition to
  the master as is usual), because the lkb can switch
  back and forth between being a master and being a
  process copy as the master node changes in recovery.

- When recovering MSTCPY locks, flag rsb's that have
  non-empty convert or waiting queues for granting
  at the end of recovery.  (Rename flag from LOCKS_PURGED
  to RECOVER_GRANT and similar for the recovery function,
  because it's not only resources with purged locks
  that need grant a grant attempt.)

- Replace a couple of unnecessary assertion panics with
  error messages.
Signed-off-by: NDavid Teigland <teigland@redhat.com>
上级 6d40c4a7
...@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls) ...@@ -310,6 +310,7 @@ void dlm_callback_resume(struct dlm_ls *ls)
} }
mutex_unlock(&ls->ls_cb_mutex); mutex_unlock(&ls->ls_cb_mutex);
log_debug(ls, "dlm_callback_resume %d", count); if (count)
log_debug(ls, "dlm_callback_resume %d", count);
} }
...@@ -271,6 +271,8 @@ struct dlm_lkb { ...@@ -271,6 +271,8 @@ struct dlm_lkb {
ktime_t lkb_last_cast_time; /* for debugging */ ktime_t lkb_last_cast_time; /* for debugging */
ktime_t lkb_last_bast_time; /* for debugging */ ktime_t lkb_last_bast_time; /* for debugging */
uint64_t lkb_recover_seq; /* from ls_recover_seq */
char *lkb_lvbptr; char *lkb_lvbptr;
struct dlm_lksb *lkb_lksb; /* caller's status block */ struct dlm_lksb *lkb_lksb; /* caller's status block */
void (*lkb_astfn) (void *astparam); void (*lkb_astfn) (void *astparam);
...@@ -325,7 +327,7 @@ enum rsb_flags { ...@@ -325,7 +327,7 @@ enum rsb_flags {
RSB_NEW_MASTER, RSB_NEW_MASTER,
RSB_NEW_MASTER2, RSB_NEW_MASTER2,
RSB_RECOVER_CONVERT, RSB_RECOVER_CONVERT,
RSB_LOCKS_PURGED, RSB_RECOVER_GRANT,
}; };
static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag) static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
...@@ -571,6 +573,7 @@ struct dlm_ls { ...@@ -571,6 +573,7 @@ struct dlm_ls {
struct mutex ls_requestqueue_mutex; struct mutex ls_requestqueue_mutex;
struct dlm_rcom *ls_recover_buf; struct dlm_rcom *ls_recover_buf;
int ls_recover_nodeid; /* for debugging */ int ls_recover_nodeid; /* for debugging */
unsigned int ls_recover_locks_in; /* for log info */
uint64_t ls_rcom_seq; uint64_t ls_rcom_seq;
spinlock_t ls_rcom_spin; spinlock_t ls_rcom_spin;
struct list_head ls_recover_list; struct list_head ls_recover_list;
...@@ -597,6 +600,7 @@ struct dlm_ls { ...@@ -597,6 +600,7 @@ struct dlm_ls {
#define LSFL_UEVENT_WAIT 5 #define LSFL_UEVENT_WAIT 5
#define LSFL_TIMEWARN 6 #define LSFL_TIMEWARN 6
#define LSFL_CB_DELAY 7 #define LSFL_CB_DELAY 7
#define LSFL_NODIR 8
/* much of this is just saving user space pointers associated with the /* much of this is just saving user space pointers associated with the
lock that we pass back to the user lib with an ast */ lock that we pass back to the user lib with an ast */
...@@ -644,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls) ...@@ -644,7 +648,7 @@ static inline int dlm_recovery_stopped(struct dlm_ls *ls)
static inline int dlm_no_directory(struct dlm_ls *ls) static inline int dlm_no_directory(struct dlm_ls *ls)
{ {
return (ls->ls_exflags & DLM_LSFL_NODIR) ? 1 : 0; return test_bit(LSFL_NODIR, &ls->ls_flags);
} }
int dlm_netlink_init(void); int dlm_netlink_init(void);
......
...@@ -161,10 +161,11 @@ static const int __quecvt_compat_matrix[8][8] = { ...@@ -161,10 +161,11 @@ static const int __quecvt_compat_matrix[8][8] = {
void dlm_print_lkb(struct dlm_lkb *lkb) void dlm_print_lkb(struct dlm_lkb *lkb)
{ {
printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x " printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x "
"sts %d rq %d gr %d wait_type %d wait_nodeid %d\n", "sts %d rq %d gr %d wait_type %d wait_nodeid %d seq %llu\n",
lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags, lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode, lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid); lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_wait_nodeid,
(unsigned long long)lkb->lkb_recover_seq);
} }
static void dlm_print_rsb(struct dlm_rsb *r) static void dlm_print_rsb(struct dlm_rsb *r)
...@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb) ...@@ -251,8 +252,6 @@ static inline int is_process_copy(struct dlm_lkb *lkb)
static inline int is_master_copy(struct dlm_lkb *lkb) static inline int is_master_copy(struct dlm_lkb *lkb)
{ {
if (lkb->lkb_flags & DLM_IFL_MSTCPY)
DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0; return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
} }
...@@ -1519,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -1519,13 +1518,13 @@ static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
} }
lkb->lkb_rqmode = DLM_LOCK_IV; lkb->lkb_rqmode = DLM_LOCK_IV;
lkb->lkb_highbast = 0;
} }
static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb) static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{ {
set_lvb_lock(r, lkb); set_lvb_lock(r, lkb);
_grant_lock(r, lkb); _grant_lock(r, lkb);
lkb->lkb_highbast = 0;
} }
static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb, static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
...@@ -1887,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, ...@@ -1887,7 +1886,8 @@ static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
/* Returns the highest requested mode of all blocked conversions; sets /* Returns the highest requested mode of all blocked conversions; sets
cw if there's a blocked conversion to DLM_LOCK_CW. */ cw if there's a blocked conversion to DLM_LOCK_CW. */
static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw,
unsigned int *count)
{ {
struct dlm_lkb *lkb, *s; struct dlm_lkb *lkb, *s;
int hi, demoted, quit, grant_restart, demote_restart; int hi, demoted, quit, grant_restart, demote_restart;
...@@ -1906,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) ...@@ -1906,6 +1906,8 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
if (can_be_granted(r, lkb, 0, &deadlk)) { if (can_be_granted(r, lkb, 0, &deadlk)) {
grant_lock_pending(r, lkb); grant_lock_pending(r, lkb);
grant_restart = 1; grant_restart = 1;
if (count)
(*count)++;
continue; continue;
} }
...@@ -1939,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw) ...@@ -1939,14 +1941,17 @@ static int grant_pending_convert(struct dlm_rsb *r, int high, int *cw)
return max_t(int, high, hi); return max_t(int, high, hi);
} }
static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw) static int grant_pending_wait(struct dlm_rsb *r, int high, int *cw,
unsigned int *count)
{ {
struct dlm_lkb *lkb, *s; struct dlm_lkb *lkb, *s;
list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) { list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
if (can_be_granted(r, lkb, 0, NULL)) if (can_be_granted(r, lkb, 0, NULL)) {
grant_lock_pending(r, lkb); grant_lock_pending(r, lkb);
else { if (count)
(*count)++;
} else {
high = max_t(int, lkb->lkb_rqmode, high); high = max_t(int, lkb->lkb_rqmode, high);
if (lkb->lkb_rqmode == DLM_LOCK_CW) if (lkb->lkb_rqmode == DLM_LOCK_CW)
*cw = 1; *cw = 1;
...@@ -1975,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw) ...@@ -1975,16 +1980,20 @@ static int lock_requires_bast(struct dlm_lkb *gr, int high, int cw)
return 0; return 0;
} }
static void grant_pending_locks(struct dlm_rsb *r) static void grant_pending_locks(struct dlm_rsb *r, unsigned int *count)
{ {
struct dlm_lkb *lkb, *s; struct dlm_lkb *lkb, *s;
int high = DLM_LOCK_IV; int high = DLM_LOCK_IV;
int cw = 0; int cw = 0;
DLM_ASSERT(is_master(r), dlm_dump_rsb(r);); if (!is_master(r)) {
log_print("grant_pending_locks r nodeid %d", r->res_nodeid);
dlm_dump_rsb(r);
return;
}
high = grant_pending_convert(r, high, &cw); high = grant_pending_convert(r, high, &cw, count);
high = grant_pending_wait(r, high, &cw); high = grant_pending_wait(r, high, &cw, count);
if (high == DLM_LOCK_IV) if (high == DLM_LOCK_IV)
return; return;
...@@ -2520,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -2520,7 +2529,7 @@ static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
before we try again to grant this one. */ before we try again to grant this one. */
if (is_demoted(lkb)) { if (is_demoted(lkb)) {
grant_pending_convert(r, DLM_LOCK_IV, NULL); grant_pending_convert(r, DLM_LOCK_IV, NULL, NULL);
if (_can_be_granted(r, lkb, 1)) { if (_can_be_granted(r, lkb, 1)) {
grant_lock(r, lkb); grant_lock(r, lkb);
queue_cast(r, lkb, 0); queue_cast(r, lkb, 0);
...@@ -2548,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, ...@@ -2548,7 +2557,7 @@ static void do_convert_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
{ {
switch (error) { switch (error) {
case 0: case 0:
grant_pending_locks(r); grant_pending_locks(r, NULL);
/* grant_pending_locks also sends basts */ /* grant_pending_locks also sends basts */
break; break;
case -EAGAIN: case -EAGAIN:
...@@ -2571,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb) ...@@ -2571,7 +2580,7 @@ static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, static void do_unlock_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
int error) int error)
{ {
grant_pending_locks(r); grant_pending_locks(r, NULL);
} }
/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ /* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
...@@ -2592,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb, ...@@ -2592,7 +2601,7 @@ static void do_cancel_effects(struct dlm_rsb *r, struct dlm_lkb *lkb,
int error) int error)
{ {
if (error) if (error)
grant_pending_locks(r); grant_pending_locks(r, NULL);
} }
/* /*
...@@ -3452,8 +3461,9 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3452,8 +3461,9 @@ static int receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
goto fail; goto fail;
if (lkb->lkb_remid != ms->m_lkid) { if (lkb->lkb_remid != ms->m_lkid) {
log_error(ls, "receive_convert %x remid %x remote %d %x", log_error(ls, "receive_convert %x remid %x recover_seq %llu "
lkb->lkb_id, lkb->lkb_remid, "remote %d %x", lkb->lkb_id, lkb->lkb_remid,
(unsigned long long)lkb->lkb_recover_seq,
ms->m_header.h_nodeid, ms->m_lkid); ms->m_header.h_nodeid, ms->m_lkid);
error = -ENOENT; error = -ENOENT;
goto fail; goto fail;
...@@ -3631,6 +3641,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3631,6 +3641,7 @@ static int receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
goto out; goto out;
queue_bast(r, lkb, ms->m_bastmode); queue_bast(r, lkb, ms->m_bastmode);
lkb->lkb_highbast = ms->m_bastmode;
out: out:
unlock_rsb(r); unlock_rsb(r);
put_rsb(r); put_rsb(r);
...@@ -3710,8 +3721,13 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms) ...@@ -3710,8 +3721,13 @@ static int receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
mstype = lkb->lkb_wait_type; mstype = lkb->lkb_wait_type;
error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY); error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
if (error) if (error) {
log_error(ls, "receive_request_reply %x remote %d %x result %d",
lkb->lkb_id, ms->m_header.h_nodeid, ms->m_lkid,
ms->m_result);
dlm_dump_rsb(r);
goto out; goto out;
}
/* Optimization: the dir node was also the master, so it took our /* Optimization: the dir node was also the master, so it took our
lookup as a request and sent request reply instead of lookup reply */ lookup as a request and sent request reply instead of lookup reply */
...@@ -4122,21 +4138,28 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms, ...@@ -4122,21 +4138,28 @@ static void _receive_message(struct dlm_ls *ls, struct dlm_message *ms,
* happen in normal usage for the async messages and cancel, so * happen in normal usage for the async messages and cancel, so
* only use log_debug for them. * only use log_debug for them.
* *
* Other errors are expected and normal. * Some errors are expected and normal.
*/ */
if (error == -ENOENT && noent) { if (error == -ENOENT && noent) {
log_debug(ls, "receive %d no %x remote %d %x seq %u", log_debug(ls, "receive %d no %x remote %d %x saved_seq %u",
ms->m_type, ms->m_remid, ms->m_header.h_nodeid, ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
ms->m_lkid, saved_seq); ms->m_lkid, saved_seq);
} else if (error == -ENOENT) { } else if (error == -ENOENT) {
log_error(ls, "receive %d no %x remote %d %x seq %u", log_error(ls, "receive %d no %x remote %d %x saved_seq %u",
ms->m_type, ms->m_remid, ms->m_header.h_nodeid, ms->m_type, ms->m_remid, ms->m_header.h_nodeid,
ms->m_lkid, saved_seq); ms->m_lkid, saved_seq);
if (ms->m_type == DLM_MSG_CONVERT) if (ms->m_type == DLM_MSG_CONVERT)
dlm_dump_rsb_hash(ls, ms->m_hash); dlm_dump_rsb_hash(ls, ms->m_hash);
} }
if (error == -EINVAL) {
log_error(ls, "receive %d inval from %d lkid %x remid %x "
"saved_seq %u",
ms->m_type, ms->m_header.h_nodeid,
ms->m_lkid, ms->m_remid, saved_seq);
}
} }
/* If the lockspace is in recovery mode (locking stopped), then normal /* If the lockspace is in recovery mode (locking stopped), then normal
...@@ -4200,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid) ...@@ -4200,9 +4223,11 @@ void dlm_receive_buffer(union dlm_packet *p, int nodeid)
ls = dlm_find_lockspace_global(hd->h_lockspace); ls = dlm_find_lockspace_global(hd->h_lockspace);
if (!ls) { if (!ls) {
if (dlm_config.ci_log_debug) if (dlm_config.ci_log_debug) {
log_print("invalid lockspace %x from %d cmd %d type %d", printk_ratelimited(KERN_DEBUG "dlm: invalid lockspace "
hd->h_lockspace, nodeid, hd->h_cmd, type); "%u from %d cmd %d type %d\n",
hd->h_lockspace, nodeid, hd->h_cmd, type);
}
if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS) if (hd->h_cmd == DLM_RCOM && type == DLM_RCOM_STATUS)
dlm_send_ls_not_ready(nodeid, &p->rcom); dlm_send_ls_not_ready(nodeid, &p->rcom);
...@@ -4253,16 +4278,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb, ...@@ -4253,16 +4278,10 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb,
static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb, static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb,
int dir_nodeid) int dir_nodeid)
{ {
if (dlm_is_removed(ls, lkb->lkb_wait_nodeid)) if (dlm_no_directory(ls))
return 1;
if (!dlm_no_directory(ls))
return 0;
if (dir_nodeid == dlm_our_nodeid())
return 1; return 1;
if (dir_nodeid != lkb->lkb_wait_nodeid) if (dlm_is_removed(ls, lkb->lkb_wait_nodeid))
return 1; return 1;
return 0; return 0;
...@@ -4519,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls) ...@@ -4519,112 +4538,177 @@ int dlm_recover_waiters_post(struct dlm_ls *ls)
return error; return error;
} }
static void purge_queue(struct dlm_rsb *r, struct list_head *queue, static void purge_mstcpy_list(struct dlm_ls *ls, struct dlm_rsb *r,
int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb)) struct list_head *list)
{ {
struct dlm_ls *ls = r->res_ls;
struct dlm_lkb *lkb, *safe; struct dlm_lkb *lkb, *safe;
list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) { list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
if (test(ls, lkb)) { if (!is_master_copy(lkb))
rsb_set_flag(r, RSB_LOCKS_PURGED); continue;
del_lkb(r, lkb);
/* this put should free the lkb */ /* don't purge lkbs we've added in recover_master_copy for
if (!dlm_put_lkb(lkb)) the current recovery seq */
log_error(ls, "purged lkb not released");
} if (lkb->lkb_recover_seq == ls->ls_recover_seq)
continue;
del_lkb(r, lkb);
/* this put should free the lkb */
if (!dlm_put_lkb(lkb))
log_error(ls, "purged mstcpy lkb not released");
} }
} }
static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb) void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{ {
return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid)); struct dlm_ls *ls = r->res_ls;
}
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb) purge_mstcpy_list(ls, r, &r->res_grantqueue);
{ purge_mstcpy_list(ls, r, &r->res_convertqueue);
return is_master_copy(lkb); purge_mstcpy_list(ls, r, &r->res_waitqueue);
} }
static void purge_dead_locks(struct dlm_rsb *r) static void purge_dead_list(struct dlm_ls *ls, struct dlm_rsb *r,
struct list_head *list,
int nodeid_gone, unsigned int *count)
{ {
purge_queue(r, &r->res_grantqueue, &purge_dead_test); struct dlm_lkb *lkb, *safe;
purge_queue(r, &r->res_convertqueue, &purge_dead_test);
purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}
void dlm_purge_mstcpy_locks(struct dlm_rsb *r) list_for_each_entry_safe(lkb, safe, list, lkb_statequeue) {
{ if (!is_master_copy(lkb))
purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test); continue;
purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test); if ((lkb->lkb_nodeid == nodeid_gone) ||
dlm_is_removed(ls, lkb->lkb_nodeid)) {
del_lkb(r, lkb);
/* this put should free the lkb */
if (!dlm_put_lkb(lkb))
log_error(ls, "purged dead lkb not released");
rsb_set_flag(r, RSB_RECOVER_GRANT);
(*count)++;
}
}
} }
/* Get rid of locks held by nodes that are gone. */ /* Get rid of locks held by nodes that are gone. */
int dlm_purge_locks(struct dlm_ls *ls) void dlm_recover_purge(struct dlm_ls *ls)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
struct dlm_member *memb;
int nodes_count = 0;
int nodeid_gone = 0;
unsigned int lkb_count = 0;
/* cache one removed nodeid to optimize the common
case of a single node removed */
list_for_each_entry(memb, &ls->ls_nodes_gone, list) {
nodes_count++;
nodeid_gone = memb->nodeid;
}
log_debug(ls, "dlm_purge_locks"); if (!nodes_count)
return;
down_write(&ls->ls_root_sem); down_write(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) { list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
hold_rsb(r); hold_rsb(r);
lock_rsb(r); lock_rsb(r);
if (is_master(r)) if (is_master(r)) {
purge_dead_locks(r); purge_dead_list(ls, r, &r->res_grantqueue,
nodeid_gone, &lkb_count);
purge_dead_list(ls, r, &r->res_convertqueue,
nodeid_gone, &lkb_count);
purge_dead_list(ls, r, &r->res_waitqueue,
nodeid_gone, &lkb_count);
}
unlock_rsb(r); unlock_rsb(r);
unhold_rsb(r); unhold_rsb(r);
cond_resched();
schedule();
} }
up_write(&ls->ls_root_sem); up_write(&ls->ls_root_sem);
return 0; if (lkb_count)
log_debug(ls, "dlm_recover_purge %u locks for %u nodes",
lkb_count, nodes_count);
} }
static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket) static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
{ {
struct rb_node *n; struct rb_node *n;
struct dlm_rsb *r, *r_ret = NULL; struct dlm_rsb *r;
spin_lock(&ls->ls_rsbtbl[bucket].lock); spin_lock(&ls->ls_rsbtbl[bucket].lock);
for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) { for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
r = rb_entry(n, struct dlm_rsb, res_hashnode); r = rb_entry(n, struct dlm_rsb, res_hashnode);
if (!rsb_flag(r, RSB_LOCKS_PURGED))
if (!rsb_flag(r, RSB_RECOVER_GRANT))
continue;
rsb_clear_flag(r, RSB_RECOVER_GRANT);
if (!is_master(r))
continue; continue;
hold_rsb(r); hold_rsb(r);
rsb_clear_flag(r, RSB_LOCKS_PURGED); spin_unlock(&ls->ls_rsbtbl[bucket].lock);
r_ret = r; return r;
break;
} }
spin_unlock(&ls->ls_rsbtbl[bucket].lock); spin_unlock(&ls->ls_rsbtbl[bucket].lock);
return r_ret; return NULL;
} }
void dlm_grant_after_purge(struct dlm_ls *ls) /*
* Attempt to grant locks on resources that we are the master of.
* Locks may have become grantable during recovery because locks
* from departed nodes have been purged (or not rebuilt), allowing
* previously blocked locks to now be granted. The subset of rsb's
* we are interested in are those with lkb's on either the convert or
* waiting queues.
*
* Simplest would be to go through each master rsb and check for non-empty
* convert or waiting queues, and attempt to grant on those rsbs.
* Checking the queues requires lock_rsb, though, for which we'd need
* to release the rsbtbl lock. This would make iterating through all
* rsb's very inefficient. So, we rely on earlier recovery routines
* to set RECOVER_GRANT on any rsb's that we should attempt to grant
* locks for.
*/
void dlm_recover_grant(struct dlm_ls *ls)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
int bucket = 0; int bucket = 0;
unsigned int count = 0;
unsigned int rsb_count = 0;
unsigned int lkb_count = 0;
while (1) { while (1) {
r = find_purged_rsb(ls, bucket); r = find_grant_rsb(ls, bucket);
if (!r) { if (!r) {
if (bucket == ls->ls_rsbtbl_size - 1) if (bucket == ls->ls_rsbtbl_size - 1)
break; break;
bucket++; bucket++;
continue; continue;
} }
rsb_count++;
count = 0;
lock_rsb(r); lock_rsb(r);
if (is_master(r)) { grant_pending_locks(r, &count);
grant_pending_locks(r); lkb_count += count;
confirm_master(r, 0); confirm_master(r, 0);
}
unlock_rsb(r); unlock_rsb(r);
put_rsb(r); put_rsb(r);
schedule(); cond_resched();
} }
if (lkb_count)
log_debug(ls, "dlm_recover_grant %u locks on %u resources",
lkb_count, rsb_count);
} }
static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid, static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
...@@ -4723,11 +4807,26 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -4723,11 +4807,26 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
remid = le32_to_cpu(rl->rl_lkid); remid = le32_to_cpu(rl->rl_lkid);
error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), /* In general we expect the rsb returned to be R_MASTER, but we don't
R_MASTER, &r); have to require it. Recovery of masters on one node can overlap
recovery of locks on another node, so one node can send us MSTCPY
locks before we've made ourselves master of this rsb. We can still
add new MSTCPY locks that we receive here without any harm; when
we make ourselves master, dlm_recover_masters() won't touch the
MSTCPY locks we've received early. */
error = find_rsb(ls, rl->rl_name, le16_to_cpu(rl->rl_namelen), 0, &r);
if (error) if (error)
goto out; goto out;
if (dlm_no_directory(ls) && (dlm_dir_nodeid(r) != dlm_our_nodeid())) {
log_error(ls, "dlm_recover_master_copy remote %d %x not dir",
rc->rc_header.h_nodeid, remid);
error = -EBADR;
put_rsb(r);
goto out;
}
lock_rsb(r); lock_rsb(r);
lkb = search_remid(r, rc->rc_header.h_nodeid, remid); lkb = search_remid(r, rc->rc_header.h_nodeid, remid);
...@@ -4749,12 +4848,18 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -4749,12 +4848,18 @@ int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
attach_lkb(r, lkb); attach_lkb(r, lkb);
add_lkb(r, lkb, rl->rl_status); add_lkb(r, lkb, rl->rl_status);
error = 0; error = 0;
ls->ls_recover_locks_in++;
if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
rsb_set_flag(r, RSB_RECOVER_GRANT);
out_remid: out_remid:
/* this is the new value returned to the lock holder for /* this is the new value returned to the lock holder for
saving in its process-copy lkb */ saving in its process-copy lkb */
rl->rl_remid = cpu_to_le32(lkb->lkb_id); rl->rl_remid = cpu_to_le32(lkb->lkb_id);
lkb->lkb_recover_seq = ls->ls_recover_seq;
out_unlock: out_unlock:
unlock_rsb(r); unlock_rsb(r);
put_rsb(r); put_rsb(r);
...@@ -4786,17 +4891,20 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -4786,17 +4891,20 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
return error; return error;
} }
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
if (!is_process_copy(lkb)) { if (!is_process_copy(lkb)) {
log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d", log_error(ls, "dlm_recover_process_copy bad %x remote %d %x %d",
lkid, rc->rc_header.h_nodeid, remid, result); lkid, rc->rc_header.h_nodeid, remid, result);
dlm_print_lkb(lkb); dlm_dump_rsb(r);
unlock_rsb(r);
put_rsb(r);
dlm_put_lkb(lkb);
return -EINVAL; return -EINVAL;
} }
r = lkb->lkb_resource;
hold_rsb(r);
lock_rsb(r);
switch (result) { switch (result) {
case -EBADR: case -EBADR:
/* There's a chance the new master received our lock before /* There's a chance the new master received our lock before
......
...@@ -32,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls); ...@@ -32,9 +32,9 @@ void dlm_adjust_timeouts(struct dlm_ls *ls);
int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len, int dlm_search_rsb_tree(struct rb_root *tree, char *name, int len,
unsigned int flags, struct dlm_rsb **r_ret); unsigned int flags, struct dlm_rsb **r_ret);
int dlm_purge_locks(struct dlm_ls *ls); void dlm_recover_purge(struct dlm_ls *ls);
void dlm_purge_mstcpy_locks(struct dlm_rsb *r); void dlm_purge_mstcpy_locks(struct dlm_rsb *r);
void dlm_grant_after_purge(struct dlm_ls *ls); void dlm_recover_grant(struct dlm_ls *ls);
int dlm_recover_waiters_post(struct dlm_ls *ls); int dlm_recover_waiters_post(struct dlm_ls *ls);
void dlm_recover_waiters_pre(struct dlm_ls *ls); void dlm_recover_waiters_pre(struct dlm_ls *ls);
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc); int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc);
......
...@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len) ...@@ -74,6 +74,19 @@ static ssize_t dlm_id_store(struct dlm_ls *ls, const char *buf, size_t len)
return len; return len;
} }
static ssize_t dlm_nodir_show(struct dlm_ls *ls, char *buf)
{
return snprintf(buf, PAGE_SIZE, "%u\n", dlm_no_directory(ls));
}
static ssize_t dlm_nodir_store(struct dlm_ls *ls, const char *buf, size_t len)
{
int val = simple_strtoul(buf, NULL, 0);
if (val == 1)
set_bit(LSFL_NODIR, &ls->ls_flags);
return len;
}
static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf) static ssize_t dlm_recover_status_show(struct dlm_ls *ls, char *buf)
{ {
uint32_t status = dlm_recover_status(ls); uint32_t status = dlm_recover_status(ls);
...@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = { ...@@ -107,6 +120,12 @@ static struct dlm_attr dlm_attr_id = {
.store = dlm_id_store .store = dlm_id_store
}; };
static struct dlm_attr dlm_attr_nodir = {
.attr = {.name = "nodir", .mode = S_IRUGO | S_IWUSR},
.show = dlm_nodir_show,
.store = dlm_nodir_store
};
static struct dlm_attr dlm_attr_recover_status = { static struct dlm_attr dlm_attr_recover_status = {
.attr = {.name = "recover_status", .mode = S_IRUGO}, .attr = {.name = "recover_status", .mode = S_IRUGO},
.show = dlm_recover_status_show .show = dlm_recover_status_show
...@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = { ...@@ -121,6 +140,7 @@ static struct attribute *dlm_attrs[] = {
&dlm_attr_control.attr, &dlm_attr_control.attr,
&dlm_attr_event.attr, &dlm_attr_event.attr,
&dlm_attr_id.attr, &dlm_attr_id.attr,
&dlm_attr_nodir.attr,
&dlm_attr_recover_status.attr, &dlm_attr_recover_status.attr,
&dlm_attr_recover_nodeid.attr, &dlm_attr_recover_nodeid.attr,
NULL, NULL,
......
...@@ -492,30 +492,41 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in) ...@@ -492,30 +492,41 @@ int dlm_send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid) void dlm_receive_rcom(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
{ {
int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock); int lock_size = sizeof(struct dlm_rcom) + sizeof(struct rcom_lock);
int stop, reply = 0; int stop, reply = 0, lock = 0;
uint32_t status;
uint64_t seq; uint64_t seq;
switch (rc->rc_type) { switch (rc->rc_type) {
case DLM_RCOM_LOCK:
lock = 1;
break;
case DLM_RCOM_LOCK_REPLY:
lock = 1;
reply = 1;
break;
case DLM_RCOM_STATUS_REPLY: case DLM_RCOM_STATUS_REPLY:
case DLM_RCOM_NAMES_REPLY: case DLM_RCOM_NAMES_REPLY:
case DLM_RCOM_LOOKUP_REPLY: case DLM_RCOM_LOOKUP_REPLY:
case DLM_RCOM_LOCK_REPLY:
reply = 1; reply = 1;
}; };
spin_lock(&ls->ls_recover_lock); spin_lock(&ls->ls_recover_lock);
status = ls->ls_recover_status;
stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags); stop = test_bit(LSFL_RECOVERY_STOP, &ls->ls_flags);
seq = ls->ls_recover_seq; seq = ls->ls_recover_seq;
spin_unlock(&ls->ls_recover_lock); spin_unlock(&ls->ls_recover_lock);
if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) || if ((stop && (rc->rc_type != DLM_RCOM_STATUS)) ||
(reply && (rc->rc_seq_reply != seq))) { (reply && (rc->rc_seq_reply != seq)) ||
(lock && !(status & DLM_RS_DIR))) {
log_limit(ls, "dlm_receive_rcom ignore msg %d " log_limit(ls, "dlm_receive_rcom ignore msg %d "
"from %d %llu %llu seq %llu", "from %d %llu %llu recover seq %llu sts %x gen %u",
rc->rc_type, nodeid, rc->rc_type,
nodeid,
(unsigned long long)rc->rc_seq, (unsigned long long)rc->rc_seq,
(unsigned long long)rc->rc_seq_reply, (unsigned long long)rc->rc_seq_reply,
(unsigned long long)seq); (unsigned long long)seq,
status, ls->ls_generation);
goto out; goto out;
} }
......
...@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid) ...@@ -339,9 +339,12 @@ static void set_lock_master(struct list_head *queue, int nodeid)
{ {
struct dlm_lkb *lkb; struct dlm_lkb *lkb;
list_for_each_entry(lkb, queue, lkb_statequeue) list_for_each_entry(lkb, queue, lkb_statequeue) {
if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) if (!(lkb->lkb_flags & DLM_IFL_MSTCPY)) {
lkb->lkb_nodeid = nodeid; lkb->lkb_nodeid = nodeid;
lkb->lkb_remid = 0;
}
}
} }
static void set_master_lkbs(struct dlm_rsb *r) static void set_master_lkbs(struct dlm_rsb *r)
...@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r) ...@@ -354,18 +357,16 @@ static void set_master_lkbs(struct dlm_rsb *r)
/* /*
* Propagate the new master nodeid to locks * Propagate the new master nodeid to locks
* The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider. * The NEW_MASTER flag tells dlm_recover_locks() which rsb's to consider.
* The NEW_MASTER2 flag tells recover_lvb() and set_locks_purged() which * The NEW_MASTER2 flag tells recover_lvb() and recover_grant() which
* rsb's to consider. * rsb's to consider.
*/ */
static void set_new_master(struct dlm_rsb *r, int nodeid) static void set_new_master(struct dlm_rsb *r, int nodeid)
{ {
lock_rsb(r);
r->res_nodeid = nodeid; r->res_nodeid = nodeid;
set_master_lkbs(r); set_master_lkbs(r);
rsb_set_flag(r, RSB_NEW_MASTER); rsb_set_flag(r, RSB_NEW_MASTER);
rsb_set_flag(r, RSB_NEW_MASTER2); rsb_set_flag(r, RSB_NEW_MASTER2);
unlock_rsb(r);
} }
/* /*
...@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid) ...@@ -376,9 +377,9 @@ static void set_new_master(struct dlm_rsb *r, int nodeid)
static int recover_master(struct dlm_rsb *r) static int recover_master(struct dlm_rsb *r)
{ {
struct dlm_ls *ls = r->res_ls; struct dlm_ls *ls = r->res_ls;
int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid(); int error, ret_nodeid;
int our_nodeid = dlm_our_nodeid();
dir_nodeid = dlm_dir_nodeid(r); int dir_nodeid = dlm_dir_nodeid(r);
if (dir_nodeid == our_nodeid) { if (dir_nodeid == our_nodeid) {
error = dlm_dir_lookup(ls, our_nodeid, r->res_name, error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
...@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r) ...@@ -388,7 +389,9 @@ static int recover_master(struct dlm_rsb *r)
if (ret_nodeid == our_nodeid) if (ret_nodeid == our_nodeid)
ret_nodeid = 0; ret_nodeid = 0;
lock_rsb(r);
set_new_master(r, ret_nodeid); set_new_master(r, ret_nodeid);
unlock_rsb(r);
} else { } else {
recover_list_add(r); recover_list_add(r);
error = dlm_send_rcom_lookup(r, dir_nodeid); error = dlm_send_rcom_lookup(r, dir_nodeid);
...@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r) ...@@ -398,24 +401,33 @@ static int recover_master(struct dlm_rsb *r)
} }
/* /*
* When not using a directory, most resource names will hash to a new static * All MSTCPY locks are purged and rebuilt, even if the master stayed the same.
* master nodeid and the resource will need to be remastered. * This is necessary because recovery can be started, aborted and restarted,
* causing the master nodeid to briefly change during the aborted recovery, and
* change back to the original value in the second recovery. The MSTCPY locks
* may or may not have been purged during the aborted recovery. Another node
* with an outstanding request in waiters list and a request reply saved in the
* requestqueue, cannot know whether it should ignore the reply and resend the
* request, or accept the reply and complete the request. It must do the
* former if the remote node purged MSTCPY locks, and it must do the later if
* the remote node did not. This is solved by always purging MSTCPY locks, in
* which case, the request reply would always be ignored and the request
* resent.
*/ */
static int recover_master_static(struct dlm_rsb *r) static int recover_master_static(struct dlm_rsb *r)
{ {
int master = dlm_dir_nodeid(r); int dir_nodeid = dlm_dir_nodeid(r);
int new_master = dir_nodeid;
if (master == dlm_our_nodeid()) if (dir_nodeid == dlm_our_nodeid())
master = 0; new_master = 0;
if (r->res_nodeid != master) { lock_rsb(r);
if (is_master(r)) dlm_purge_mstcpy_locks(r);
dlm_purge_mstcpy_locks(r); set_new_master(r, new_master);
set_new_master(r, master); unlock_rsb(r);
return 1; return 1;
}
return 0;
} }
/* /*
...@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc) ...@@ -481,7 +493,9 @@ int dlm_recover_master_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
if (nodeid == dlm_our_nodeid()) if (nodeid == dlm_our_nodeid())
nodeid = 0; nodeid = 0;
lock_rsb(r);
set_new_master(r, nodeid); set_new_master(r, nodeid);
unlock_rsb(r);
recover_list_del(r); recover_list_del(r);
if (recover_list_empty(ls)) if (recover_list_empty(ls))
...@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls) ...@@ -556,8 +570,6 @@ int dlm_recover_locks(struct dlm_ls *ls)
struct dlm_rsb *r; struct dlm_rsb *r;
int error, count = 0; int error, count = 0;
log_debug(ls, "dlm_recover_locks");
down_read(&ls->ls_root_sem); down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) { list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
if (is_master(r)) { if (is_master(r)) {
...@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls) ...@@ -584,7 +596,7 @@ int dlm_recover_locks(struct dlm_ls *ls)
} }
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
log_debug(ls, "dlm_recover_locks %d locks", count); log_debug(ls, "dlm_recover_locks %d out", count);
error = dlm_wait_function(ls, &recover_list_empty); error = dlm_wait_function(ls, &recover_list_empty);
out: out:
...@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r) ...@@ -721,21 +733,19 @@ static void recover_conversion(struct dlm_rsb *r)
} }
/* We've become the new master for this rsb and waiting/converting locks may /* We've become the new master for this rsb and waiting/converting locks may
need to be granted in dlm_grant_after_purge() due to locks that may have need to be granted in dlm_recover_grant() due to locks that may have
existed from a removed node. */ existed from a removed node. */
static void set_locks_purged(struct dlm_rsb *r) static void recover_grant(struct dlm_rsb *r)
{ {
if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue)) if (!list_empty(&r->res_waitqueue) || !list_empty(&r->res_convertqueue))
rsb_set_flag(r, RSB_LOCKS_PURGED); rsb_set_flag(r, RSB_RECOVER_GRANT);
} }
void dlm_recover_rsbs(struct dlm_ls *ls) void dlm_recover_rsbs(struct dlm_ls *ls)
{ {
struct dlm_rsb *r; struct dlm_rsb *r;
int count = 0; unsigned int count = 0;
log_debug(ls, "dlm_recover_rsbs");
down_read(&ls->ls_root_sem); down_read(&ls->ls_root_sem);
list_for_each_entry(r, &ls->ls_root_list, res_root_list) { list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
...@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls) ...@@ -744,7 +754,7 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
if (rsb_flag(r, RSB_RECOVER_CONVERT)) if (rsb_flag(r, RSB_RECOVER_CONVERT))
recover_conversion(r); recover_conversion(r);
if (rsb_flag(r, RSB_NEW_MASTER2)) if (rsb_flag(r, RSB_NEW_MASTER2))
set_locks_purged(r); recover_grant(r);
recover_lvb(r); recover_lvb(r);
count++; count++;
} }
...@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls) ...@@ -754,7 +764,8 @@ void dlm_recover_rsbs(struct dlm_ls *ls)
} }
up_read(&ls->ls_root_sem); up_read(&ls->ls_root_sem);
log_debug(ls, "dlm_recover_rsbs %d rsbs", count); if (count)
log_debug(ls, "dlm_recover_rsbs %d done", count);
} }
/* Create a single list of all root rsb's to be used during recovery */ /* Create a single list of all root rsb's to be used during recovery */
......
...@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -84,6 +84,8 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail; goto fail;
} }
ls->ls_recover_locks_in = 0;
dlm_set_recover_status(ls, DLM_RS_NODES); dlm_set_recover_status(ls, DLM_RS_NODES);
error = dlm_recover_members_wait(ls); error = dlm_recover_members_wait(ls);
...@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -130,7 +132,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
* Clear lkb's for departed nodes. * Clear lkb's for departed nodes.
*/ */
dlm_purge_locks(ls); dlm_recover_purge(ls);
/* /*
* Get new master nodeid's for rsb's that were mastered on * Get new master nodeid's for rsb's that were mastered on
...@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -161,6 +163,9 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail; goto fail;
} }
log_debug(ls, "dlm_recover_locks %u in",
ls->ls_recover_locks_in);
/* /*
* Finalize state in master rsb's now that all locks can be * Finalize state in master rsb's now that all locks can be
* checked. This includes conversion resolution and lvb * checked. This includes conversion resolution and lvb
...@@ -225,7 +230,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv) ...@@ -225,7 +230,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
goto fail; goto fail;
} }
dlm_grant_after_purge(ls); dlm_recover_grant(ls);
log_debug(ls, "dlm_recover %llu generation %u done: %u ms", log_debug(ls, "dlm_recover %llu generation %u done: %u ms",
(unsigned long long)rv->seq, ls->ls_generation, (unsigned long long)rv->seq, ls->ls_generation,
......
...@@ -65,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms) ...@@ -65,6 +65,7 @@ void dlm_add_requestqueue(struct dlm_ls *ls, int nodeid, struct dlm_message *ms)
int dlm_process_requestqueue(struct dlm_ls *ls) int dlm_process_requestqueue(struct dlm_ls *ls)
{ {
struct rq_entry *e; struct rq_entry *e;
struct dlm_message *ms;
int error = 0; int error = 0;
mutex_lock(&ls->ls_requestqueue_mutex); mutex_lock(&ls->ls_requestqueue_mutex);
...@@ -78,6 +79,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls) ...@@ -78,6 +79,14 @@ int dlm_process_requestqueue(struct dlm_ls *ls)
e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list); e = list_entry(ls->ls_requestqueue.next, struct rq_entry, list);
mutex_unlock(&ls->ls_requestqueue_mutex); mutex_unlock(&ls->ls_requestqueue_mutex);
ms = &e->request;
log_limit(ls, "dlm_process_requestqueue msg %d from %d "
"lkid %x remid %x result %d seq %u",
ms->m_type, ms->m_header.h_nodeid,
ms->m_lkid, ms->m_remid, ms->m_result,
e->recover_seq);
dlm_receive_message_saved(ls, &e->request, e->recover_seq); dlm_receive_message_saved(ls, &e->request, e->recover_seq);
mutex_lock(&ls->ls_requestqueue_mutex); mutex_lock(&ls->ls_requestqueue_mutex);
...@@ -140,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid) ...@@ -140,35 +149,7 @@ static int purge_request(struct dlm_ls *ls, struct dlm_message *ms, int nodeid)
if (!dlm_no_directory(ls)) if (!dlm_no_directory(ls))
return 0; return 0;
/* with no directory, the master is likely to change as a part of return 1;
recovery; requests to/from the defunct master need to be purged */
switch (type) {
case DLM_MSG_REQUEST:
case DLM_MSG_CONVERT:
case DLM_MSG_UNLOCK:
case DLM_MSG_CANCEL:
/* we're no longer the master of this resource, the sender
will resend to the new master (see waiter_needs_recovery) */
if (dlm_hash2nodeid(ls, ms->m_hash) != dlm_our_nodeid())
return 1;
break;
case DLM_MSG_REQUEST_REPLY:
case DLM_MSG_CONVERT_REPLY:
case DLM_MSG_UNLOCK_REPLY:
case DLM_MSG_CANCEL_REPLY:
case DLM_MSG_GRANT:
/* this reply is from the former master of the resource,
we'll resend to the new master if needed */
if (dlm_hash2nodeid(ls, ms->m_hash) != nodeid)
return 1;
break;
}
return 0;
} }
void dlm_purge_requestqueue(struct dlm_ls *ls) void dlm_purge_requestqueue(struct dlm_ls *ls)
......
...@@ -556,7 +556,6 @@ struct gfs2_sb_host { ...@@ -556,7 +556,6 @@ struct gfs2_sb_host {
struct lm_lockstruct { struct lm_lockstruct {
int ls_jid; int ls_jid;
unsigned int ls_first; unsigned int ls_first;
unsigned int ls_nodir;
const struct lm_lockops *ls_ops; const struct lm_lockops *ls_ops;
dlm_lockspace_t *ls_dlm; dlm_lockspace_t *ls_dlm;
......
...@@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table) ...@@ -1209,8 +1209,6 @@ static int gdlm_mount(struct gfs2_sbd *sdp, const char *table)
fsname++; fsname++;
flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL; flags = DLM_LSFL_FS | DLM_LSFL_NEWEXCL;
if (ls->ls_nodir)
flags |= DLM_LSFL_NODIR;
/* /*
* create/join lockspace * create/join lockspace
......
...@@ -994,6 +994,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent) ...@@ -994,6 +994,7 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
ls->ls_jid = option; ls->ls_jid = option;
break; break;
case Opt_id: case Opt_id:
case Opt_nodir:
/* Obsolete, but left for backward compat purposes */ /* Obsolete, but left for backward compat purposes */
break; break;
case Opt_first: case Opt_first:
...@@ -1002,12 +1003,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent) ...@@ -1002,12 +1003,6 @@ static int gfs2_lm_mount(struct gfs2_sbd *sdp, int silent)
goto hostdata_error; goto hostdata_error;
ls->ls_first = option; ls->ls_first = option;
break; break;
case Opt_nodir:
ret = match_int(&tmp[0], &option);
if (ret || (option != 0 && option != 1))
goto hostdata_error;
ls->ls_nodir = option;
break;
case Opt_err: case Opt_err:
default: default:
hostdata_error: hostdata_error:
......
...@@ -67,7 +67,6 @@ struct dlm_lksb { ...@@ -67,7 +67,6 @@ struct dlm_lksb {
/* dlm_new_lockspace() flags */ /* dlm_new_lockspace() flags */
#define DLM_LSFL_NODIR 0x00000001
#define DLM_LSFL_TIMEWARN 0x00000002 #define DLM_LSFL_TIMEWARN 0x00000002
#define DLM_LSFL_FS 0x00000004 #define DLM_LSFL_FS 0x00000004
#define DLM_LSFL_NEWEXCL 0x00000008 #define DLM_LSFL_NEWEXCL 0x00000008
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册