/*
 * Copyright (C) Sistina Software, Inc.  1997-2003 All rights reserved.
 * Copyright (C) 2004-2005 Red Hat, Inc.  All rights reserved.
 *
 * This copyrighted material is made available to anyone wishing to use,
 * modify, copy, or redistribute it subject to the terms and conditions
 * of the GNU General Public License version 2.
 */

#include "lock_dlm.h"

/*
 * Append @lp to the lockspace's submit list and wake the lock_dlm threads.
 * A lock placed on this list is re-submitted to DLM as soon as a lock_dlm
 * thread gets to it.
 */

static void queue_submit(struct gdlm_lock *lp)
{
	struct gdlm_ls *lockspace = lp->ls;
	struct list_head *submit_list = &lockspace->submit;

	/* The submit list is shared with the worker threads. */
	spin_lock(&lockspace->async_lock);
	list_add_tail(&lp->delay_list, submit_list);
	spin_unlock(&lockspace->async_lock);

	wake_up(&lockspace->thread_wait);
}

static void process_blocking(struct gdlm_lock *lp, int bast_mode)
{
	struct gdlm_ls *ls = lp->ls;
28
	unsigned int cb = 0;
29 30 31 32 33 34 35 36 37 38 39 40

	switch (gdlm_make_lmstate(bast_mode)) {
	case LM_ST_EXCLUSIVE:
		cb = LM_CB_NEED_E;
		break;
	case LM_ST_DEFERRED:
		cb = LM_CB_NEED_D;
		break;
	case LM_ST_SHARED:
		cb = LM_CB_NEED_S;
		break;
	default:
41
		gdlm_assert(0, "unknown bast mode %u", lp->bast_mode);
42 43
	}

S
Steven Whitehouse 已提交
44
	ls->fscb(ls->sdp, cb, &lp->lockname);
45 46
}

47 48 49 50 51 52 53
/*
 * Clear LFL_AST_WAIT in @lp's flags and wake any task waiting on that bit.
 */
static void wake_up_ast(struct gdlm_lock *lp)
{
	clear_bit(LFL_AST_WAIT, &lp->flags);
	/* Barrier between clearing the bit and the wakeup that follows. */
	smp_mb__after_clear_bit();
	wake_up_bit(&lp->flags, LFL_AST_WAIT);
}

54 55 56 57
static void process_complete(struct gdlm_lock *lp)
{
	struct gdlm_ls *ls = lp->ls;
	struct lm_async_cb acb;
58
	s16 prev_mode = lp->cur;
59 60 61 62

	memset(&acb, 0, sizeof(acb));

	if (lp->lksb.sb_status == -DLM_ECANCEL) {
63
		log_info("complete dlm cancel %x,%llx flags %lx",
64
		 	 lp->lockname.ln_type,
65
			 (unsigned long long)lp->lockname.ln_number,
66
			 lp->flags);
67 68 69 70 71 72 73 74 75 76

		lp->req = lp->cur;
		acb.lc_ret |= LM_OUT_CANCELED;
		if (lp->cur == DLM_LOCK_IV)
			lp->lksb.sb_lkid = 0;
		goto out;
	}

	if (test_and_clear_bit(LFL_DLM_UNLOCK, &lp->flags)) {
		if (lp->lksb.sb_status != -DLM_EUNLOCK) {
77 78
			log_info("unlock sb_status %d %x,%llx flags %lx",
				 lp->lksb.sb_status, lp->lockname.ln_type,
79 80
				 (unsigned long long)lp->lockname.ln_number,
				 lp->flags);
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
			return;
		}

		lp->cur = DLM_LOCK_IV;
		lp->req = DLM_LOCK_IV;
		lp->lksb.sb_lkid = 0;

		if (test_and_clear_bit(LFL_UNLOCK_DELETE, &lp->flags)) {
			gdlm_delete_lp(lp);
			return;
		}
		goto out;
	}

	if (lp->lksb.sb_flags & DLM_SBF_VALNOTVALID)
		memset(lp->lksb.sb_lvbptr, 0, GDLM_LVB_SIZE);

	if (lp->lksb.sb_flags & DLM_SBF_ALTMODE) {
		if (lp->req == DLM_LOCK_PR)
			lp->req = DLM_LOCK_CW;
		else if (lp->req == DLM_LOCK_CW)
			lp->req = DLM_LOCK_PR;
	}

	/*
	 * A canceled lock request.  The lock was just taken off the delayed
	 * list and was never even submitted to dlm.
	 */

	if (test_and_clear_bit(LFL_CANCEL, &lp->flags)) {
111
		log_info("complete internal cancel %x,%llx",
112
		 	 lp->lockname.ln_type,
113
			 (unsigned long long)lp->lockname.ln_number);
114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
		lp->req = lp->cur;
		acb.lc_ret |= LM_OUT_CANCELED;
		goto out;
	}

	/*
	 * An error occured.
	 */

	if (lp->lksb.sb_status) {
		/* a "normal" error */
		if ((lp->lksb.sb_status == -EAGAIN) &&
		    (lp->lkf & DLM_LKF_NOQUEUE)) {
			lp->req = lp->cur;
			if (lp->cur == DLM_LOCK_IV)
				lp->lksb.sb_lkid = 0;
			goto out;
		}

		/* this could only happen with cancels I think */
134 135
		log_info("ast sb_status %d %x,%llx flags %lx",
			 lp->lksb.sb_status, lp->lockname.ln_type,
136 137
			 (unsigned long long)lp->lockname.ln_number,
			 lp->flags);
138 139 140 141 142 143 144 145
		return;
	}

	/*
	 * This is an AST for an EX->EX conversion for sync_lvb from GFS.
	 */

	if (test_and_clear_bit(LFL_SYNC_LVB, &lp->flags)) {
146
		wake_up_ast(lp);
147 148 149 150 151 152 153 154 155 156
		return;
	}

	/*
	 * A lock has been demoted to NL because it initially completed during
	 * BLOCK_LOCKS.  Now it must be requested in the originally requested
	 * mode.
	 */

	if (test_and_clear_bit(LFL_REREQUEST, &lp->flags)) {
157
		gdlm_assert(lp->req == DLM_LOCK_NL, "%x,%llx",
158 159
			    lp->lockname.ln_type,
			    (unsigned long long)lp->lockname.ln_number);
160
		gdlm_assert(lp->prev_req > DLM_LOCK_NL, "%x,%llx",
161 162
			    lp->lockname.ln_type,
			    (unsigned long long)lp->lockname.ln_number);
163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197

		lp->cur = DLM_LOCK_NL;
		lp->req = lp->prev_req;
		lp->prev_req = DLM_LOCK_IV;
		lp->lkf &= ~DLM_LKF_CONVDEADLK;

		set_bit(LFL_NOCACHE, &lp->flags);

		if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
		    !test_bit(LFL_NOBLOCK, &lp->flags))
			gdlm_queue_delayed(lp);
		else
			queue_submit(lp);
		return;
	}

	/*
	 * A request is granted during dlm recovery.  It may be granted
	 * because the locks of a failed node were cleared.  In that case,
	 * there may be inconsistent data beneath this lock and we must wait
	 * for recovery to complete to use it.  When gfs recovery is done this
	 * granted lock will be converted to NL and then reacquired in this
	 * granted state.
	 */

	if (test_bit(DFL_BLOCK_LOCKS, &ls->flags) &&
	    !test_bit(LFL_NOBLOCK, &lp->flags) &&
	    lp->req != DLM_LOCK_NL) {

		lp->cur = lp->req;
		lp->prev_req = lp->req;
		lp->req = DLM_LOCK_NL;
		lp->lkf |= DLM_LKF_CONVERT;
		lp->lkf &= ~DLM_LKF_CONVDEADLK;

198
		log_debug("rereq %x,%llx id %x %d,%d",
199 200
			  lp->lockname.ln_type,
			  (unsigned long long)lp->lockname.ln_number,
201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
			  lp->lksb.sb_lkid, lp->cur, lp->req);

		set_bit(LFL_REREQUEST, &lp->flags);
		queue_submit(lp);
		return;
	}

	/*
	 * DLM demoted the lock to NL before it was granted so GFS must be
	 * told it cannot cache data for this lock.
	 */

	if (lp->lksb.sb_flags & DLM_SBF_DEMOTED)
		set_bit(LFL_NOCACHE, &lp->flags);

216
out:
217 218 219 220 221 222 223
	/*
	 * This is an internal lock_dlm lock
	 */

	if (test_bit(LFL_INLOCK, &lp->flags)) {
		clear_bit(LFL_NOBLOCK, &lp->flags);
		lp->cur = lp->req;
224
		wake_up_ast(lp);
225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
		return;
	}

	/*
	 * Normal completion of a lock request.  Tell GFS it now has the lock.
	 */

	clear_bit(LFL_NOBLOCK, &lp->flags);
	lp->cur = lp->req;

	acb.lc_name = lp->lockname;
	acb.lc_ret |= gdlm_make_lmstate(lp->cur);

	if (!test_and_clear_bit(LFL_NOCACHE, &lp->flags) &&
	    (lp->cur > DLM_LOCK_NL) && (prev_mode > DLM_LOCK_NL))
		acb.lc_ret |= LM_OUT_CACHEABLE;

S
Steven Whitehouse 已提交
242
	ls->fscb(ls->sdp, LM_CB_ASYNC, &acb);
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
}

/*
 * Return non-zero when there is nothing queued for the calling thread:
 * the complete and submit lists are empty and, if @blocking is set, the
 * blocking list is empty as well.  The lists are examined under
 * ls->async_lock.
 */
static inline int no_work(struct gdlm_ls *ls, int blocking)
{
	int idle;

	spin_lock(&ls->async_lock);
	idle = list_empty(&ls->complete) && list_empty(&ls->submit) &&
	       (!blocking || list_empty(&ls->blocking));
	spin_unlock(&ls->async_lock);

	return idle;
}

/*
 * Decide whether a drop-locks callback is due.
 *
 * Returns 0 when ls->drop_locks_count is zero or when the current period
 * (ls->drop_locks_period seconds since ls->drop_time) has not yet elapsed.
 * Once the period has elapsed, ls->drop_time is reset to now and the
 * result is 1 if ls->all_locks_count has reached ls->drop_locks_count,
 * 0 otherwise.
 */
static inline int check_drop(struct gdlm_ls *ls)
{
	if (ls->drop_locks_count == 0)
		return 0;

	if (!time_after(jiffies, ls->drop_time + ls->drop_locks_period * HZ))
		return 0;

	/* Period expired: start a new one regardless of the outcome. */
	ls->drop_time = jiffies;

	return (ls->all_locks_count >= ls->drop_locks_count) ? 1 : 0;
}

/*
 * Main loop of a lock_dlm worker thread.
 *
 * @data: the struct gdlm_ls this thread serves
 *
 * Each pass sleeps on ls->thread_wait until no_work() reports something
 * queued, then takes at most one lock off the blocking, complete or submit
 * list (in that order of preference) under ls->async_lock and processes it
 * outside the spinlock.  check_drop() is evaluated on every pass and, when
 * it fires, an LM_CB_DROPLOCKS callback is sent to the filesystem.
 *
 * Returns 0 once kthread_should_stop() is true.
 */
static int gdlm_thread(void *data)
{
	struct gdlm_ls *ls = data;
	struct gdlm_lock *lp;
	int blist = 0;
	uint8_t complete, blocking, submit, drop;
	DECLARE_WAITQUEUE(wait, current);

	/* Only thread1 is allowed to do blocking callbacks since gfs
	   may wait for a completion callback within a blocking cb. */

	if (current == ls->thread1)
		blist = 1;

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		add_wait_queue(&ls->thread_wait, &wait);
		if (no_work(ls, blist))
			schedule();
		remove_wait_queue(&ls->thread_wait, &wait);
		set_current_state(TASK_RUNNING);

		/*
		 * Start every pass with no lock selected so a pointer from a
		 * previous pass (possibly freed by process_complete()) is
		 * never carried over.
		 */
		lp = NULL;
		complete = blocking = submit = drop = 0;

		spin_lock(&ls->async_lock);

		if (blist && !list_empty(&ls->blocking)) {
			lp = list_entry(ls->blocking.next, struct gdlm_lock,
					blist);
			list_del_init(&lp->blist);
			/* Capture the mode and clear it on the lock. */
			blocking = lp->bast_mode;
			lp->bast_mode = 0;
		} else if (!list_empty(&ls->complete)) {
			lp = list_entry(ls->complete.next, struct gdlm_lock,
					clist);
			list_del_init(&lp->clist);
			complete = 1;
		} else if (!list_empty(&ls->submit)) {
			lp = list_entry(ls->submit.next, struct gdlm_lock,
					delay_list);
			list_del_init(&lp->delay_list);
			submit = 1;
		}

		drop = check_drop(ls);
		spin_unlock(&ls->async_lock);

		if (complete)
			process_complete(lp);

		else if (blocking)
			process_blocking(lp, blocking);

		else if (submit)
			gdlm_do_lock(lp);

		if (drop)
			ls->fscb(ls->sdp, LM_CB_DROPLOCKS, NULL);

		/* Task state is TASK_RUNNING here; let other tasks run. */
		schedule();
	}

	return 0;
}

int gdlm_init_threads(struct gdlm_ls *ls)
{
	struct task_struct *p;
	int error;

	p = kthread_run(gdlm_thread, ls, "lock_dlm1");
	error = IS_ERR(p);
	if (error) {
344
		log_error("can't start lock_dlm1 thread %d", error);
345 346 347 348 349 350 351
		return error;
	}
	ls->thread1 = p;

	p = kthread_run(gdlm_thread, ls, "lock_dlm2");
	error = IS_ERR(p);
	if (error) {
352
		log_error("can't start lock_dlm2 thread %d", error);
353 354 355 356 357 358 359 360 361 362 363 364 365 366
		kthread_stop(ls->thread1);
		return error;
	}
	ls->thread2 = p;

	return 0;
}

/*
 * Stop both lock_dlm worker threads of @ls, thread1 first and then
 * thread2, using kthread_stop().
 */
void gdlm_release_threads(struct gdlm_ls *ls)
{
	kthread_stop(ls->thread1);
	kthread_stop(ls->thread2);
}