/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) channel support.
 *
 *	This is the part of XPC that manages the channels and
 *	sends/receives messages across them to/from other partitions.
 *
 */

#include <linux/device.h>
#include "xpc.h"

/*
 * Process a connect message from a remote partition.
 *
 * Note: xpc_process_connect() expects to be called with ch->lock held
 * (via spin_lock_irqsave) and will return with it still held.
 */
static void
xpc_process_connect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	enum xp_retval ret;

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_OPENREQUEST) ||
	    !(ch->flags & XPC_C_ROPENREQUEST)) {
		/* nothing more to do for now */
		return;
	}
	DBUG_ON(!(ch->flags & XPC_C_CONNECTING));

	if (!(ch->flags & XPC_C_SETUP)) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		ret = xpc_allocate_msgqueues(ch);
		spin_lock_irqsave(&ch->lock, *irq_flags);

		if (ret != xpSuccess)
			XPC_DISCONNECT_CHANNEL(ch, ret, irq_flags);

		ch->flags |= XPC_C_SETUP;

		if (ch->flags & (XPC_C_CONNECTED | XPC_C_DISCONNECTING))
			return;

		DBUG_ON(ch->local_msgqueue == NULL);
		DBUG_ON(ch->remote_msgqueue == NULL);
	}

	if (!(ch->flags & XPC_C_OPENREPLY)) {
		ch->flags |= XPC_C_OPENREPLY;
		xpc_send_chctl_openreply(ch, irq_flags);
	}

	if (!(ch->flags & XPC_C_ROPENREPLY))
		return;

	DBUG_ON(ch->remote_msgqueue_pa == 0);

	ch->flags = (XPC_C_CONNECTED | XPC_C_SETUP);	/* clear all else */

	dev_info(xpc_chan, "channel %d to partition %d connected\n",
		 ch->number, ch->partid);

	spin_unlock_irqrestore(&ch->lock, *irq_flags);
	xpc_create_kthreads(ch, 1, 0);
	spin_lock_irqsave(&ch->lock, *irq_flags);
}
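
/*
 * For orientation (added note, inferred from the flag handling above, not
 * normative): the connect handshake that xpc_process_connect() drives is
 *
 *	OPENREQUEST sent and received in both directions (XPC_C_OPENREQUEST
 *	and XPC_C_ROPENREQUEST set), then OPENREPLY sent and received in both
 *	directions (XPC_C_OPENREPLY and XPC_C_ROPENREPLY set).
 *
 * Only after all four flags are set is the channel marked XPC_C_CONNECTED
 * and a kthread created to begin delivering messages.
 */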

/*
 * ch->lock is expected to be held (via spin_lock_irqsave()) on entry.
 */
static void
xpc_process_disconnect(struct xpc_channel *ch, unsigned long *irq_flags)
{
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	u32 channel_was_connected = (ch->flags & XPC_C_WASCONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (!(ch->flags & XPC_C_DISCONNECTING))
		return;

	DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

	/* make sure all activity has settled down first */

	if (atomic_read(&ch->kthreads_assigned) > 0 ||
	    atomic_read(&ch->references) > 0) {
		return;
	}
	DBUG_ON((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		!(ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE));

	if (part->act_state == XPC_P_DEACTIVATING) {
		/* can't proceed until the other side disengages from us */
		if (xpc_partition_engaged(ch->partid))
			return;

	} else {

		/* as long as the other side is up do the full protocol */

		if (!(ch->flags & XPC_C_RCLOSEREQUEST))
			return;

		if (!(ch->flags & XPC_C_CLOSEREPLY)) {
			ch->flags |= XPC_C_CLOSEREPLY;
			xpc_send_chctl_closereply(ch, irq_flags);
		}

		if (!(ch->flags & XPC_C_RCLOSEREPLY))
			return;
	}

	/* wake those waiting for notify completion */
	if (atomic_read(&ch->n_to_notify) > 0) {
		/* we do callout while holding ch->lock, callout can't block */
		xpc_notify_senders_of_disconnect(ch);
	}

	/* both sides are disconnected now */

	if (ch->flags & XPC_C_DISCONNECTINGCALLOUT_MADE) {
		spin_unlock_irqrestore(&ch->lock, *irq_flags);
		xpc_disconnect_callout(ch, xpDisconnected);
		spin_lock_irqsave(&ch->lock, *irq_flags);
	}

	/* it's now safe to free the channel's message queues */
	xpc_free_msgqueues(ch);

	/*
	 * Mark the channel disconnected and clear all other flags, including
	 * XPC_C_SETUP (because of the call to xpc_free_msgqueues()) but not
	 * including XPC_C_WDISCONNECT (if it was set).
	 */
	ch->flags = (XPC_C_DISCONNECTED | (ch->flags & XPC_C_WDISCONNECT));

	atomic_dec(&part->nchannels_active);

	if (channel_was_connected) {
		dev_info(xpc_chan, "channel %d to partition %d disconnected, "
			 "reason=%d\n", ch->number, ch->partid, ch->reason);
	}

	if (ch->flags & XPC_C_WDISCONNECT) {
		/* we won't lose the CPU since we're holding ch->lock */
		complete(&ch->wdisconnect_wait);
	} else if (ch->delayed_chctl_flags) {
		if (part->act_state != XPC_P_DEACTIVATING) {
			/* time to take action on any delayed chctl flags */
			spin_lock(&part->chctl_lock);
			part->chctl.flags[ch->number] |=
			    ch->delayed_chctl_flags;
			spin_unlock(&part->chctl_lock);
		}
		ch->delayed_chctl_flags = 0;
	}
}
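
/*
 * For orientation (added note, inferred from the flag handling above, not
 * normative): disconnecting mirrors connecting. CLOSEREQUEST is sent and
 * received in both directions, then CLOSEREPLY is sent and received in both
 * directions; only then is the channel marked XPC_C_DISCONNECTED. The one
 * exception is a deactivating partition, where we simply wait for the other
 * side to disengage instead of running the full protocol.
 */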

/*
 * Process a change in the channel's remote connection state.
 */
static void
xpc_process_openclose_chctl_flags(struct xpc_partition *part, int ch_number,
				  u8 chctl_flags)
{
	unsigned long irq_flags;
	struct xpc_openclose_args *args =
	    &part->remote_openclose_args[ch_number];
	struct xpc_channel *ch = &part->channels[ch_number];
	enum xp_retval reason;

	spin_lock_irqsave(&ch->lock, irq_flags);

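	/*
	 * A CLOSEREQUEST that arrives packed together with its CLOSEREPLY is
	 * handled in one pass below, after which we jump back here ("goto
	 * again") to process whatever chctl flags remain, e.g. an OPENREQUEST
	 * for a new connection.
	 */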
again:

	if ((ch->flags & XPC_C_DISCONNECTED) &&
	    (ch->flags & XPC_C_WDISCONNECT)) {
		/*
		 * Delay processing chctl flags until thread waiting disconnect
		 * has had a chance to see that the channel is disconnected.
		 */
		ch->delayed_chctl_flags |= chctl_flags;
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		return;
	}

	if (chctl_flags & XPC_CHCTL_CLOSEREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREQUEST (reason=%d) received "
			"from partid=%d, channel=%d\n", args->reason,
			ch->partid, ch->number);

		/*
		 * If RCLOSEREQUEST is set, we're probably waiting for
		 * RCLOSEREPLY. We should find it and a ROPENREQUEST packed
		 * with this RCLOSEREQUEST in the chctl_flags.
		 */

		if (ch->flags & XPC_C_RCLOSEREQUEST) {
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTING));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));
			DBUG_ON(!(ch->flags & XPC_C_CLOSEREPLY));
			DBUG_ON(ch->flags & XPC_C_RCLOSEREPLY);

			DBUG_ON(!(chctl_flags & XPC_CHCTL_CLOSEREPLY));
			chctl_flags &= ~XPC_CHCTL_CLOSEREPLY;
			ch->flags |= XPC_C_RCLOSEREPLY;

			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
			DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
			goto again;
		}

		if (ch->flags & XPC_C_DISCONNECTED) {
			if (!(chctl_flags & XPC_CHCTL_OPENREQUEST)) {
				if (part->chctl.flags[ch_number] &
				    XPC_CHCTL_OPENREQUEST) {

					DBUG_ON(ch->delayed_chctl_flags != 0);
					spin_lock(&part->chctl_lock);
					part->chctl.flags[ch_number] |=
					    XPC_CHCTL_CLOSEREQUEST;
					spin_unlock(&part->chctl_lock);
				}
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
			ch->flags |= (XPC_C_CONNECTING | XPC_C_ROPENREQUEST);
		}

		chctl_flags &= ~(XPC_CHCTL_OPENREQUEST | XPC_CHCTL_OPENREPLY);

		/*
		 * The meaningful CLOSEREQUEST connection state fields are:
		 *      reason = reason connection is to be closed
		 */

		ch->flags |= XPC_C_RCLOSEREQUEST;

		if (!(ch->flags & XPC_C_DISCONNECTING)) {
			reason = args->reason;
			if (reason <= xpSuccess || reason > xpUnknownReason)
				reason = xpUnknownReason;
			else if (reason == xpUnregistering)
				reason = xpOtherUnregistering;

			XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

			DBUG_ON(chctl_flags & XPC_CHCTL_CLOSEREPLY);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		xpc_process_disconnect(ch, &irq_flags);
	}

	if (chctl_flags & XPC_CHCTL_CLOSEREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_CLOSEREPLY received from partid="
			"%d, channel=%d\n", ch->partid, ch->number);

		if (ch->flags & XPC_C_DISCONNECTED) {
			DBUG_ON(part->act_state != XPC_P_DEACTIVATING);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_CLOSEREQUEST));

		if (!(ch->flags & XPC_C_RCLOSEREQUEST)) {
			if (part->chctl.flags[ch_number] &
			    XPC_CHCTL_CLOSEREQUEST) {

				DBUG_ON(ch->delayed_chctl_flags != 0);
				spin_lock(&part->chctl_lock);
				part->chctl.flags[ch_number] |=
				    XPC_CHCTL_CLOSEREPLY;
				spin_unlock(&part->chctl_lock);
			}
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= XPC_C_RCLOSEREPLY;

		if (ch->flags & XPC_C_CLOSEREPLY) {
			/* both sides have finished disconnecting */
			xpc_process_disconnect(ch, &irq_flags);
		}
	}

	if (chctl_flags & XPC_CHCTL_OPENREQUEST) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREQUEST (msg_size=%d, "
			"local_nentries=%d) received from partid=%d, "
			"channel=%d\n", args->msg_size, args->local_nentries,
			ch->partid, ch->number);

		if (part->act_state == XPC_P_DEACTIVATING ||
		    (ch->flags & XPC_C_ROPENREQUEST)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_WDISCONNECT)) {
			ch->delayed_chctl_flags |= XPC_CHCTL_OPENREQUEST;
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		DBUG_ON(!(ch->flags & (XPC_C_DISCONNECTED |
				       XPC_C_OPENREQUEST)));
		DBUG_ON(ch->flags & (XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
				     XPC_C_OPENREPLY | XPC_C_CONNECTED));

		/*
		 * The meaningful OPENREQUEST connection state fields are:
		 *      msg_size = size of channel's messages in bytes
		 *      local_nentries = remote partition's local_nentries
		 */
		if (args->msg_size == 0 || args->local_nentries == 0) {
			/* assume OPENREQUEST was delayed by mistake */
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		ch->flags |= (XPC_C_ROPENREQUEST | XPC_C_CONNECTING);
		ch->remote_nentries = args->local_nentries;

		if (ch->flags & XPC_C_OPENREQUEST) {
			if (args->msg_size != ch->msg_size) {
				XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
				return;
			}
		} else {
			ch->msg_size = args->msg_size;

			XPC_SET_REASON(ch, 0, 0);
			ch->flags &= ~XPC_C_DISCONNECTED;

			atomic_inc(&part->nchannels_active);
		}

		xpc_process_connect(ch, &irq_flags);
	}

	if (chctl_flags & XPC_CHCTL_OPENREPLY) {

		dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY (local_msgqueue_pa="
			"0x%lx, local_nentries=%d, remote_nentries=%d) "
			"received from partid=%d, channel=%d\n",
			args->local_msgqueue_pa, args->local_nentries,
			args->remote_nentries, ch->partid, ch->number);

		if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED)) {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}
		if (!(ch->flags & XPC_C_OPENREQUEST)) {
			XPC_DISCONNECT_CHANNEL(ch, xpOpenCloseError,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return;
		}

		DBUG_ON(!(ch->flags & XPC_C_ROPENREQUEST));
		DBUG_ON(ch->flags & XPC_C_CONNECTED);

		/*
		 * The meaningful OPENREPLY connection state fields are:
		 *      local_msgqueue_pa = physical address of remote
		 *                          partition's local_msgqueue
		 *      local_nentries = remote partition's local_nentries
		 *      remote_nentries = remote partition's remote_nentries
		 */
		DBUG_ON(args->local_msgqueue_pa == 0);
		DBUG_ON(args->local_nentries == 0);
		DBUG_ON(args->remote_nentries == 0);

		ch->flags |= XPC_C_ROPENREPLY;
		ch->remote_msgqueue_pa = args->local_msgqueue_pa;

		if (args->local_nentries < ch->remote_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"remote_nentries=%d, old remote_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->local_nentries, ch->remote_nentries,
				ch->partid, ch->number);

			ch->remote_nentries = args->local_nentries;
		}
		if (args->remote_nentries < ch->local_nentries) {
			dev_dbg(xpc_chan, "XPC_CHCTL_OPENREPLY: new "
				"local_nentries=%d, old local_nentries=%d, "
				"partid=%d, channel=%d\n",
				args->remote_nentries, ch->local_nentries,
				ch->partid, ch->number);

			ch->local_nentries = args->remote_nentries;
		}

		xpc_process_connect(ch, &irq_flags);
	}

	spin_unlock_irqrestore(&ch->lock, irq_flags);
}

/*
 * Attempt to establish a channel connection to a remote partition.
 */
static enum xp_retval
xpc_connect_channel(struct xpc_channel *ch)
{
	unsigned long irq_flags;
	struct xpc_registration *registration = &xpc_registrations[ch->number];

	if (mutex_trylock(&registration->mutex) == 0)
		return xpRetry;

	if (!XPC_CHANNEL_REGISTERED(ch->number)) {
		mutex_unlock(&registration->mutex);
		return xpUnregistered;
	}

	spin_lock_irqsave(&ch->lock, irq_flags);

	DBUG_ON(ch->flags & XPC_C_CONNECTED);
	DBUG_ON(ch->flags & XPC_C_OPENREQUEST);

	if (ch->flags & XPC_C_DISCONNECTING) {
		spin_unlock_irqrestore(&ch->lock, irq_flags);
		mutex_unlock(&registration->mutex);
		return ch->reason;
	}

	/* add info from the channel connect registration to the channel */

	ch->kthreads_assigned_limit = registration->assigned_limit;
	ch->kthreads_idle_limit = registration->idle_limit;
	DBUG_ON(atomic_read(&ch->kthreads_assigned) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_idle) != 0);
	DBUG_ON(atomic_read(&ch->kthreads_active) != 0);

	ch->func = registration->func;
	DBUG_ON(registration->func == NULL);
	ch->key = registration->key;

	ch->local_nentries = registration->nentries;

	if (ch->flags & XPC_C_ROPENREQUEST) {
		if (registration->msg_size != ch->msg_size) {
			/* the local and remote sides aren't the same */

			/*
			 * Because XPC_DISCONNECT_CHANNEL() can block we're
			 * forced to up the registration sema before we unlock
			 * the channel lock. But that's okay here because we're
			 * done with the part that required the registration
			 * sema. XPC_DISCONNECT_CHANNEL() requires that the
			 * channel lock be locked and will unlock and relock
			 * the channel lock as needed.
			 */
			mutex_unlock(&registration->mutex);
			XPC_DISCONNECT_CHANNEL(ch, xpUnequalMsgSizes,
					       &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			return xpUnequalMsgSizes;
		}
	} else {
		ch->msg_size = registration->msg_size;

		XPC_SET_REASON(ch, 0, 0);
		ch->flags &= ~XPC_C_DISCONNECTED;

		atomic_inc(&xpc_partitions[ch->partid].nchannels_active);
	}

	mutex_unlock(&registration->mutex);

	/* initiate the connection */

	ch->flags |= (XPC_C_OPENREQUEST | XPC_C_CONNECTING);
	xpc_send_chctl_openrequest(ch, &irq_flags);

	xpc_process_connect(ch, &irq_flags);

	spin_unlock_irqrestore(&ch->lock, irq_flags);

	return xpSuccess;
}

void
xpc_process_sent_chctl_flags(struct xpc_partition *part)
{
	unsigned long irq_flags;
	union xpc_channel_ctl_flags chctl;
	struct xpc_channel *ch;
	int ch_number;
	u32 ch_flags;

	chctl.all_flags = xpc_get_chctl_all_flags(part);

	/*
	 * Initiate channel connections for registered channels.
	 *
	 * For each connected channel that has pending messages activate idle
	 * kthreads and/or create new kthreads as needed.
	 */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		/*
		 * Process any open or close related chctl flags, and then deal
		 * with connecting or disconnecting the channel as required.
		 */

		if (chctl.flags[ch_number] & XPC_OPENCLOSE_CHCTL_FLAGS) {
			xpc_process_openclose_chctl_flags(part, ch_number,
							chctl.flags[ch_number]);
		}

		ch_flags = ch->flags;	/* need an atomic snapshot of flags */

		if (ch_flags & XPC_C_DISCONNECTING) {
			spin_lock_irqsave(&ch->lock, irq_flags);
			xpc_process_disconnect(ch, &irq_flags);
			spin_unlock_irqrestore(&ch->lock, irq_flags);
			continue;
		}

		if (part->act_state == XPC_P_DEACTIVATING)
			continue;

		if (!(ch_flags & XPC_C_CONNECTED)) {
			if (!(ch_flags & XPC_C_OPENREQUEST)) {
				DBUG_ON(ch_flags & XPC_C_SETUP);
				(void)xpc_connect_channel(ch);
			} else {
				spin_lock_irqsave(&ch->lock, irq_flags);
				xpc_process_connect(ch, &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			continue;
		}

		/*
		 * Process any message related chctl flags; this may involve
		 * the activation of kthreads to deliver any pending messages
		 * sent from the other partition.
		 */

		if (chctl.flags[ch_number] & XPC_MSG_CHCTL_FLAGS)
			xpc_process_msg_chctl_flags(part, ch_number);
	}
}

/*
 * XPC's heartbeat code calls this function to inform XPC that a partition is
 * going down.  XPC responds by tearing down the XPartition Communication
 * infrastructure used for the just downed partition.
 *
 * XPC's heartbeat code will never call this function and xpc_partition_up()
 * at the same time. Nor will it ever make multiple calls to either function
 * at the same time.
 */
void
xpc_partition_going_down(struct xpc_partition *part, enum xp_retval reason)
{
	unsigned long irq_flags;
	int ch_number;
	struct xpc_channel *ch;

	dev_dbg(xpc_chan, "deactivating partition %d, reason=%d\n",
		XPC_PARTID(part), reason);

	if (!xpc_part_ref(part)) {
		/* infrastructure for this partition isn't currently set up */
		return;
	}

	/* disconnect channels associated with the partition going down */

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		xpc_msgqueue_ref(ch);
		spin_lock_irqsave(&ch->lock, irq_flags);

		XPC_DISCONNECT_CHANNEL(ch, reason, &irq_flags);

		spin_unlock_irqrestore(&ch->lock, irq_flags);
		xpc_msgqueue_deref(ch);
	}

	xpc_wakeup_channel_mgr(part);

	xpc_part_deref(part);
}

/*
 * Called by XP at the time of channel connection registration to cause
 * XPC to establish connections to all currently active partitions.
 */
void
xpc_initiate_connect(int ch_number)
{
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];

			/*
			 * Initiate the establishment of a connection on the
			 * newly registered channel to the remote partition.
			 */
			xpc_wakeup_channel_mgr(part);
			xpc_part_deref(part);
		}
	}
}
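
/*
 * Added illustrative note: registration itself happens through xpc_connect()
 * (declared in xp.h), which records the registration and then reaches
 * xpc_initiate_connect() above. A hypothetical registerer might do
 *
 *	ret = xpc_connect(MY_CH, my_func, NULL, MY_MSG_SIZE, my_nentries,
 *			  my_assigned_limit, my_idle_limit);
 *
 * where MY_CH, my_func, MY_MSG_SIZE, my_nentries and the two limits are the
 * registerer's own definitions; the exact argument list is sketched from
 * xp.h and should be checked against it.
 */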

void
xpc_connected_callout(struct xpc_channel *ch)
{
	/* let the registerer know that a connection has been established */

	if (ch->func != NULL) {
		dev_dbg(xpc_chan, "ch->func() called, reason=xpConnected, "
			"partid=%d, channel=%d\n", ch->partid, ch->number);

		ch->func(xpConnected, ch->partid, ch->number,
			 (void *)(u64)ch->local_nentries, ch->key);

		dev_dbg(xpc_chan, "ch->func() returned, reason=xpConnected, "
			"partid=%d, channel=%d\n", ch->partid, ch->number);
	}
}

/*
 * Called by XP at the time of channel connection unregistration to cause
 * XPC to teardown all current connections for the specified channel.
 *
 * Before returning xpc_initiate_disconnect() will wait until all connections
 * on the specified channel have been closed/torn down. So the caller can be
 * assured that they will not be receiving any more callouts from XPC to the
 * function they registered via xpc_connect().
 *
 * Arguments:
 *
 *	ch_number - channel # to unregister.
 */
void
xpc_initiate_disconnect(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;

	DBUG_ON(ch_number < 0 || ch_number >= XPC_MAX_NCHANNELS);

	/* initiate the channel disconnect for every active partition */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_part_ref(part)) {
			ch = &part->channels[ch_number];
			xpc_msgqueue_ref(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);

			if (!(ch->flags & XPC_C_DISCONNECTED)) {
				ch->flags |= XPC_C_WDISCONNECT;

				XPC_DISCONNECT_CHANNEL(ch, xpUnregistering,
						       &irq_flags);
			}

			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);
		}
	}

	xpc_disconnect_wait(ch_number);
}

/*
 * Disconnect a channel and reflect it back to all who may be waiting.
 *
 * An OPEN is not allowed until XPC_C_DISCONNECTING is cleared by
 * xpc_process_disconnect(), and if set, XPC_C_WDISCONNECT is cleared by
 * xpc_disconnect_wait().
 *
 * THE CHANNEL IS TO BE LOCKED BY THE CALLER AND WILL REMAIN LOCKED UPON RETURN.
 */
void
xpc_disconnect_channel(const int line, struct xpc_channel *ch,
		       enum xp_retval reason, unsigned long *irq_flags)
{
	u32 channel_was_connected = (ch->flags & XPC_C_CONNECTED);

	DBUG_ON(!spin_is_locked(&ch->lock));

	if (ch->flags & (XPC_C_DISCONNECTING | XPC_C_DISCONNECTED))
		return;

	DBUG_ON(!(ch->flags & (XPC_C_CONNECTING | XPC_C_CONNECTED)));

	dev_dbg(xpc_chan, "reason=%d, line=%d, partid=%d, channel=%d\n",
		reason, line, ch->partid, ch->number);

	XPC_SET_REASON(ch, reason, line);

	ch->flags |= (XPC_C_CLOSEREQUEST | XPC_C_DISCONNECTING);
	/* some of these may not have been set */
	ch->flags &= ~(XPC_C_OPENREQUEST | XPC_C_OPENREPLY |
		       XPC_C_ROPENREQUEST | XPC_C_ROPENREPLY |
		       XPC_C_CONNECTING | XPC_C_CONNECTED);

	xpc_send_chctl_closerequest(ch, irq_flags);

	if (channel_was_connected)
		ch->flags |= XPC_C_WASCONNECTED;

	spin_unlock_irqrestore(&ch->lock, *irq_flags);

	/* wake all idle kthreads so they can exit */
	if (atomic_read(&ch->kthreads_idle) > 0) {
		wake_up_all(&ch->idle_wq);

	} else if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
		   !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		/* start a kthread that will do the xpDisconnecting callout */
		xpc_create_kthreads(ch, 1, 1);
	}

	/* wake those waiting to allocate an entry from the local msg queue */
	if (atomic_read(&ch->n_on_msg_allocate_wq) > 0)
		wake_up(&ch->msg_allocate_wq);

	spin_lock_irqsave(&ch->lock, *irq_flags);
}

void
xpc_disconnect_callout(struct xpc_channel *ch, enum xp_retval reason)
{
	/*
	 * Let the channel's registerer know that the channel is being
	 * disconnected. We don't want to do this if the registerer was never
	 * informed of a connection being made.
	 */

	if (ch->func != NULL) {
		dev_dbg(xpc_chan, "ch->func() called, reason=%d, partid=%d, "
			"channel=%d\n", reason, ch->partid, ch->number);

		ch->func(reason, ch->partid, ch->number, NULL, ch->key);

		dev_dbg(xpc_chan, "ch->func() returned, reason=%d, partid=%d, "
			"channel=%d\n", reason, ch->partid, ch->number);
	}
}

/*
 * Wait for a message entry to become available for the specified channel,
 * but don't wait any longer than 1 jiffy.
 */
enum xp_retval
xpc_allocate_msg_wait(struct xpc_channel *ch)
{
	enum xp_retval ret;

	if (ch->flags & XPC_C_DISCONNECTING) {
		DBUG_ON(ch->reason == xpInterrupted);
		return ch->reason;
	}

	atomic_inc(&ch->n_on_msg_allocate_wq);
	ret = interruptible_sleep_on_timeout(&ch->msg_allocate_wq, 1);
	atomic_dec(&ch->n_on_msg_allocate_wq);

	if (ch->flags & XPC_C_DISCONNECTING) {
		ret = ch->reason;
		DBUG_ON(ch->reason == xpInterrupted);
	} else if (ret == 0) {
		ret = xpTimeout;
	} else {
		ret = xpInterrupted;
	}

	return ret;
}

/*
 * Send a message that contains the user's payload on the specified channel
 * connected to the specified partition.
 *
 * NOTE that this routine can sleep waiting for a message entry to become
 * available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * Once sent, this routine will not wait for the message to be received, nor
 * will notification be given when it does happen.
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # to send message on.
 *	flags - see xp.h for valid flags.
 *	payload - pointer to the payload which is to be sent.
 *	payload_size - size of the payload in bytes.
 */
enum xp_retval
xpc_initiate_send(short partid, int ch_number, u32 flags, void *payload,
		  u16 payload_size)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	enum xp_retval ret = xpUnknownReason;

	dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
		partid, ch_number);

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
	DBUG_ON(payload == NULL);

	if (xpc_part_ref(part)) {
		ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
				   payload_size, 0, NULL, NULL);
		xpc_part_deref(part);
	}

	return ret;
}
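
/*
 * Added illustrative note: a registerer of hypothetical channel MY_CH could
 * send a fire-and-forget message with
 *
 *	ret = xpc_initiate_send(partid, MY_CH, XPC_NOWAIT, &msg, sizeof(msg));
 *	if (ret != xpSuccess)
 *		...	(no entry free, channel not connected, etc.)
 *
 * Callers normally reach this function through the xpc_send() wrapper in
 * xp.h rather than calling it directly.
 */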

/*
 * Send a message that contains the user's payload on the specified channel
 * connected to the specified partition.
 *
 * NOTE that this routine can sleep waiting for a message entry to become
 * available. To not sleep, pass in the XPC_NOWAIT flag.
 *
 * This routine will not wait for the message to be sent or received.
 *
 * Once the remote end of the channel has received the message, the function
 * passed as an argument to xpc_initiate_send_notify() will be called. This
 * allows the sender to free up or re-use any buffers referenced by the
 * message, but does NOT mean the message has been processed at the remote
 * end by a receiver.
 *
 * If this routine returns an error, the caller's function will NOT be called.
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # to send message on.
 *	flags - see xp.h for valid flags.
 *	payload - pointer to the payload which is to be sent.
 *	payload_size - size of the payload in bytes.
 *	func - function to call with asynchronous notification of message
 *		  receipt. THIS FUNCTION MUST BE NON-BLOCKING.
 *	key - user-defined key to be passed to the function when it's called.
 */
enum xp_retval
xpc_initiate_send_notify(short partid, int ch_number, u32 flags, void *payload,
			 u16 payload_size, xpc_notify_func func, void *key)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	enum xp_retval ret = xpUnknownReason;

	dev_dbg(xpc_chan, "payload=0x%p, partid=%d, channel=%d\n", payload,
		partid, ch_number);

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);
	DBUG_ON(payload == NULL);
	DBUG_ON(func == NULL);

	if (xpc_part_ref(part)) {
		ret = xpc_send_msg(&part->channels[ch_number], flags, payload,
				   payload_size, XPC_N_CALL, func, key);
		xpc_part_deref(part);
	}
	return ret;
}
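
/*
 * Added illustrative note: the notify function must not block. Following the
 * xpc_notify_func typedef in xp.h, a hypothetical one might look like
 *
 *	static void my_notify(enum xp_retval reason, short partid,
 *			      int ch_number, void *key)
 *	{
 *		if (reason == xpMsgDelivered)
 *			;	(buffers referenced by the message may
 *				 now be re-used)
 *	}
 */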

/*
 * Deliver a message to its intended recipient.
 */
void
xpc_deliver_msg(struct xpc_channel *ch)
{
	struct xpc_msg *msg;

	msg = xpc_get_deliverable_msg(ch);
	if (msg != NULL) {

		/*
		 * This ref is taken to protect the payload itself from being
		 * freed before the user is finished with it, which the user
		 * indicates by calling xpc_initiate_received().
		 */
		xpc_msgqueue_ref(ch);

		atomic_inc(&ch->kthreads_active);

		if (ch->func != NULL) {
			dev_dbg(xpc_chan, "ch->func() called, msg=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				msg, (signed long)msg->number, ch->partid,
				ch->number);

			/* deliver the message to its intended recipient */
			ch->func(xpMsgReceived, ch->partid, ch->number,
				 &msg->payload, ch->key);

			dev_dbg(xpc_chan, "ch->func() returned, msg=0x%p, "
				"msg_number=%ld, partid=%d, channel=%d\n",
				msg, (signed long)msg->number, ch->partid,
				ch->number);
		}

		atomic_dec(&ch->kthreads_active);
	}
}

/*
 * Acknowledge receipt of a delivered message.
 *
 * If a message has XPC_M_INTERRUPT set, send an interrupt to the partition
 * that sent the message.
 *
 * This function, although called by users, does not call xpc_part_ref() to
 * ensure that the partition infrastructure is in place. It relies on the
 * fact that we called xpc_msgqueue_ref() in xpc_deliver_msg().
 *
 * Arguments:
 *
 *	partid - ID of partition to which the channel is connected.
 *	ch_number - channel # message received on.
 *	payload - pointer to the payload area allocated via
 *			xpc_initiate_send() or xpc_initiate_send_notify().
 */
void
xpc_initiate_received(short partid, int ch_number, void *payload)
{
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	struct xpc_msg *msg = XPC_MSG_ADDRESS(payload);

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);
	DBUG_ON(ch_number < 0 || ch_number >= part->nchannels);

	ch = &part->channels[ch_number];
	xpc_received_msg(ch, msg);

	/* the call to xpc_msgqueue_ref() was done by xpc_deliver_msg()  */
	xpc_msgqueue_deref(ch);
}
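
/*
 * Added illustrative note: on the receiving side the registerer's channel
 * function, invoked from xpc_deliver_msg() above with reason xpMsgReceived,
 * must eventually hand the payload back so its message entry can be
 * recycled. A hypothetical function (following the xpc_channel_func typedef
 * in xp.h) might be
 *
 *	static void my_func(enum xp_retval reason, short partid,
 *			    int ch_number, void *data, void *key)
 *	{
 *		if (reason == xpMsgReceived) {
 *			consume_payload(data);	(hypothetical helper)
 *			xpc_initiate_received(partid, ch_number, data);
 *		}
 *	}
 */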