提交 ab479995 编写于 作者: L Linus Torvalds

Merge branch 'upstream-linus' of git://oss.oracle.com/home/sourcebo/git/ocfs2

......@@ -208,6 +208,9 @@ static inline void __dlm_set_joining_node(struct dlm_ctxt *dlm,
#define DLM_LOCK_RES_IN_PROGRESS 0x00000010
#define DLM_LOCK_RES_MIGRATING 0x00000020
/* max milliseconds to wait to sync up a network failure with a node death */
#define DLM_NODE_DEATH_WAIT_MAX (5 * 1000)
#define DLM_PURGE_INTERVAL_MS (8 * 1000)
struct dlm_lock_resource
......@@ -658,6 +661,7 @@ int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
void dlm_wait_for_recovery(struct dlm_ctxt *dlm);
int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node);
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout);
void dlm_put(struct dlm_ctxt *dlm);
struct dlm_ctxt *dlm_grab(struct dlm_ctxt *dlm);
......
......@@ -392,6 +392,11 @@ static enum dlm_status dlm_send_remote_convert_request(struct dlm_ctxt *dlm,
} else {
mlog_errno(tmpret);
if (dlm_is_host_down(tmpret)) {
/* instead of logging the same network error over
* and over, sleep here and wait for the heartbeat
* to notice the node is dead. times out after 5s. */
dlm_wait_for_node_death(dlm, res->owner,
DLM_NODE_DEATH_WAIT_MAX);
ret = DLM_RECOVERING;
mlog(0, "node %u died so returning DLM_RECOVERING "
"from convert message!\n", res->owner);
......@@ -421,7 +426,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
struct dlm_lockstatus *lksb;
enum dlm_status status = DLM_NORMAL;
u32 flags;
int call_ast = 0, kick_thread = 0;
int call_ast = 0, kick_thread = 0, ast_reserved = 0;
if (!dlm_grab(dlm)) {
dlm_error(DLM_REJECTED);
......@@ -490,6 +495,7 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
status = __dlm_lockres_state_to_status(res);
if (status == DLM_NORMAL) {
__dlm_lockres_reserve_ast(res);
ast_reserved = 1;
res->state |= DLM_LOCK_RES_IN_PROGRESS;
status = __dlmconvert_master(dlm, res, lock, flags,
cnv->requested_type,
......@@ -512,10 +518,10 @@ int dlm_convert_lock_handler(struct o2net_msg *msg, u32 len, void *data)
else
dlm_lock_put(lock);
/* either queue the ast or release it */
/* either queue the ast or release it, if reserved */
if (call_ast)
dlm_queue_ast(dlm, lock);
else
else if (ast_reserved)
dlm_lockres_release_ast(dlm, res);
if (kick_thread)
......
......@@ -220,6 +220,17 @@ static enum dlm_status dlmlock_remote(struct dlm_ctxt *dlm,
dlm_error(status);
dlm_revert_pending_lock(res, lock);
dlm_lock_put(lock);
} else if (dlm_is_recovery_lock(res->lockname.name,
res->lockname.len)) {
/* special case for the $RECOVERY lock.
* there will never be an AST delivered to put
* this lock on the proper secondary queue
* (granted), so do it manually. */
mlog(0, "%s: $RECOVERY lock for this node (%u) is "
"mastered by %u; got lock, manually granting (no ast)\n",
dlm->name, dlm->node_num, res->owner);
list_del_init(&lock->list);
list_add_tail(&lock->list, &res->granted);
}
spin_unlock(&res->spinlock);
......@@ -646,7 +657,19 @@ enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode,
mlog(0, "retrying lock with migration/"
"recovery/in progress\n");
msleep(100);
dlm_wait_for_recovery(dlm);
/* no waiting for dlm_reco_thread */
if (recovery) {
if (status == DLM_RECOVERING) {
mlog(0, "%s: got RECOVERING "
"for $REOCVERY lock, master "
"was %u\n", dlm->name,
res->owner);
dlm_wait_for_node_death(dlm, res->owner,
DLM_NODE_DEATH_WAIT_MAX);
}
} else {
dlm_wait_for_recovery(dlm);
}
goto retry_lock;
}
......
......@@ -2482,7 +2482,9 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
atomic_set(&mle->woken, 1);
spin_unlock(&mle->spinlock);
wake_up(&mle->wq);
/* final put will take care of list removal */
/* do not need events any longer, so detach
* from heartbeat */
__dlm_mle_detach_hb_events(dlm, mle);
__dlm_put_mle(mle);
}
continue;
......@@ -2537,6 +2539,9 @@ void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
spin_unlock(&res->spinlock);
dlm_lockres_put(res);
/* about to get rid of mle, detach from heartbeat */
__dlm_mle_detach_hb_events(dlm, mle);
/* dump the mle */
spin_lock(&dlm->master_lock);
__dlm_put_mle(mle);
......
......@@ -278,6 +278,24 @@ int dlm_is_node_dead(struct dlm_ctxt *dlm, u8 node)
return dead;
}
int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
{
if (timeout) {
mlog(ML_NOTICE, "%s: waiting %dms for notification of "
"death of node %u\n", dlm->name, timeout, node);
wait_event_timeout(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm, node),
msecs_to_jiffies(timeout));
} else {
mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
"of death of node %u\n", dlm->name, node);
wait_event(dlm->dlm_reco_thread_wq,
dlm_is_node_dead(dlm, node));
}
/* for now, return 0 */
return 0;
}
/* callers of the top-level api calls (dlmlock/dlmunlock) should
* block on the dlm->reco.event when recovery is in progress.
* the dlm recovery thread will set this state when it begins
......@@ -2032,6 +2050,30 @@ static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
dlm->reco.new_master);
status = -EEXIST;
} else {
status = 0;
/* see if recovery was already finished elsewhere */
spin_lock(&dlm->spinlock);
if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
status = -EINVAL;
mlog(0, "%s: got reco EX lock, but "
"node got recovered already\n", dlm->name);
if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
mlog(ML_ERROR, "%s: new master is %u "
"but no dead node!\n",
dlm->name, dlm->reco.new_master);
BUG();
}
}
spin_unlock(&dlm->spinlock);
}
/* if this node has actually become the recovery master,
* set the master and send the messages to begin recovery */
if (!status) {
mlog(0, "%s: dead=%u, this=%u, sending "
"begin_reco now\n", dlm->name,
dlm->reco.dead_node, dlm->node_num);
status = dlm_send_begin_reco_message(dlm,
dlm->reco.dead_node);
/* this always succeeds */
......
......@@ -1584,10 +1584,9 @@ static int ocfs2_commit_thread(void *arg)
while (!(kthread_should_stop() &&
atomic_read(&journal->j_num_trans) == 0)) {
wait_event_interruptible_timeout(osb->checkpoint_event,
atomic_read(&journal->j_num_trans)
|| kthread_should_stop(),
OCFS2_CHECKPOINT_INTERVAL);
wait_event_interruptible(osb->checkpoint_event,
atomic_read(&journal->j_num_trans)
|| kthread_should_stop());
status = ocfs2_commit_cache(osb);
if (status < 0)
......
......@@ -29,8 +29,6 @@
#include <linux/fs.h>
#include <linux/jbd.h>
#define OCFS2_CHECKPOINT_INTERVAL (8 * HZ)
enum ocfs2_journal_state {
OCFS2_JOURNAL_FREE = 0,
OCFS2_JOURNAL_LOADED,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册