/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2009 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) support - standard version.
 *
 *	XPC provides a message passing capability that crosses partition
 *	boundaries. This module is made up of two parts:
 *
 *	    partition	This part detects the presence/absence of other
 *			partitions. It provides a heartbeat and monitors
 *			the heartbeats of other partitions.
 *
 *	    channel	This part manages the channels and sends/receives
 *			messages across them to/from other partitions.
 *
 *	There are a couple of additional functions residing in XP, which
 *	provide an interface to XPC for its users.
 *
 *
 *	Caveats:
 *
 *	  . Currently on sn2, we have no way to determine which nasid an IRQ
 *	    came from. Thus, xpc_send_IRQ_sn2() does a remote amo write
 *	    followed by an IPI. The amo indicates where data is to be pulled
 *	    from, so after the IPI arrives, the remote partition checks the amo
 *	    word. The IPI can actually arrive before the amo however, so other
 *	    code must periodically check for this case. Also, remote amo
 *	    operations do not reliably time out. Thus we do a remote PIO read
 *	    solely to know whether the remote partition is down and whether we
 *	    should stop sending IPIs to it. This remote PIO read operation is
 *	    set up in a special nofault region so SAL knows to ignore (and
 *	    cleanup) any errors due to the remote amo write, PIO read, and/or
 *	    PIO write operations.
 *
 *	    If/when new hardware solves this IPI problem, we should abandon
 *	    the current approach.
 *
 */

#include <linux/module.h>
#include <linux/sysctl.h>
#include <linux/device.h>
#include <linux/delay.h>
#include <linux/reboot.h>
#include <linux/kdebug.h>
#include <linux/kthread.h>
#include "xpc.h"

/* define two XPC debug device structures to be used with dev_dbg() et al */

struct device_driver xpc_dbg_name = {
	.name = "xpc"
};

struct device xpc_part_dbg_subname = {
	.init_name = "",	/* set to "part" at xpc_init() time */
	.driver = &xpc_dbg_name
};

struct device xpc_chan_dbg_subname = {
	.init_name = "",	/* set to "chan" at xpc_init() time */
	.driver = &xpc_dbg_name
};

struct device *xpc_part = &xpc_part_dbg_subname;
struct device *xpc_chan = &xpc_chan_dbg_subname;

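/* non-zero means our heartbeat is marked offline while in kdebug so that
 * other partitions do not deactivate us (see xpc_system_die()) */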
static int xpc_kdebug_ignore;

/* systune related variables for /proc/sys directories */

static int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL;
static int xpc_hb_min_interval = 1;
static int xpc_hb_max_interval = 10;

static int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_INTERVAL;
static int xpc_hb_check_min_interval = 10;
static int xpc_hb_check_max_interval = 120;

int xpc_disengage_timelimit = XPC_DISENGAGE_DEFAULT_TIMELIMIT;
static int xpc_disengage_min_timelimit;	/* = 0 */
static int xpc_disengage_max_timelimit = 120;

static ctl_table xpc_sys_xpc_hb_dir[] = {
	{
	 .procname = "hb_interval",
	 .data = &xpc_hb_interval,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .extra1 = &xpc_hb_min_interval,
	 .extra2 = &xpc_hb_max_interval},
	{
	 .procname = "hb_check_interval",
	 .data = &xpc_hb_check_interval,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .extra1 = &xpc_hb_check_min_interval,
	 .extra2 = &xpc_hb_check_max_interval},
	{}
};
static ctl_table xpc_sys_xpc_dir[] = {
	{
	 .procname = "hb",
	 .mode = 0555,
	 .child = xpc_sys_xpc_hb_dir},
	{
	 .procname = "disengage_timelimit",
	 .data = &xpc_disengage_timelimit,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .extra1 = &xpc_disengage_min_timelimit,
	 .extra2 = &xpc_disengage_max_timelimit},
	{}
};
static ctl_table xpc_sys_dir[] = {
	{
	 .procname = "xpc",
	 .mode = 0555,
	 .child = xpc_sys_xpc_dir},
	{}
};
static struct ctl_table_header *xpc_sysctl;

/* non-zero if any remote partition disengage timed out */
int xpc_disengage_timedout;

/* #of activate IRQs received and not yet processed */
int xpc_activate_IRQ_rcvd;
DEFINE_SPINLOCK(xpc_activate_IRQ_rcvd_lock);

/* IRQ handler notifies this wait queue on receipt of an IRQ */
DECLARE_WAIT_QUEUE_HEAD(xpc_activate_IRQ_wq);

static unsigned long xpc_hb_check_timeout;
static struct timer_list xpc_hb_timer;

/* notification that the xpc_hb_checker thread has exited */
static DECLARE_COMPLETION(xpc_hb_checker_exited);

/* notification that the xpc_discovery thread has exited */
static DECLARE_COMPLETION(xpc_discovery_exited);

static void xpc_kthread_waitmsgs(struct xpc_partition *, struct xpc_channel *);

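/* reboot and die notifiers, registered at xpc_init() time */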
static int xpc_system_reboot(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_reboot_notifier = {
	.notifier_call = xpc_system_reboot,
};

static int xpc_system_die(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_die_notifier = {
	.notifier_call = xpc_system_die,
};

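/* architecture-specific operations, filled in by xpc_init_sn2()/xpc_init_uv() */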
struct xpc_arch_operations xpc_arch_ops;

/*
 * Timer function to enforce the timelimit on the partition disengage.
 */
static void
xpc_timeout_partition_disengage(unsigned long data)
{
	struct xpc_partition *part = (struct xpc_partition *)data;

	DBUG_ON(time_is_after_jiffies(part->disengage_timeout));

	(void)xpc_partition_disengaged(part);

	DBUG_ON(part->disengage_timeout != 0);
	DBUG_ON(xpc_arch_ops.partition_engaged(XPC_PARTID(part)));
}

/*
 * Timer to produce the heartbeat.  The timer structure's function is
 * already set when this is initially called.  A tunable is used to
 * specify when the next timeout should occur.
 */
static void
xpc_hb_beater(unsigned long dummy)
{
	xpc_arch_ops.increment_heartbeat();

	if (time_is_before_eq_jiffies(xpc_hb_check_timeout))
		wake_up_interruptible(&xpc_activate_IRQ_wq);

	xpc_hb_timer.expires = jiffies + (xpc_hb_interval * HZ);
	add_timer(&xpc_hb_timer);
}

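/*
 * Start the local heartbeat.  xpc_hb_beater() rearms its own timer, so the
 * heartbeat continues until xpc_stop_hb_beater() deletes the timer.
 */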
static void
xpc_start_hb_beater(void)
{
	xpc_arch_ops.heartbeat_init();
	init_timer(&xpc_hb_timer);
	xpc_hb_timer.function = xpc_hb_beater;
	xpc_hb_beater(0);
}

static void
xpc_stop_hb_beater(void)
{
	del_timer_sync(&xpc_hb_timer);
	xpc_arch_ops.heartbeat_exit();
}

/*
 * At periodic intervals, scan through all active partitions and ensure
 * their heartbeat is still active.  If not, the partition is deactivated.
 */
static void
xpc_check_remote_hb(void)
{
	struct xpc_partition *part;
	short partid;
	enum xp_retval ret;

	for (partid = 0; partid < xp_max_npartitions; partid++) {

		if (xpc_exiting)
			break;

		if (partid == xp_partition_id)
			continue;

		part = &xpc_partitions[partid];

		if (part->act_state == XPC_P_AS_INACTIVE ||
		    part->act_state == XPC_P_AS_DEACTIVATING) {
			continue;
		}

		ret = xpc_arch_ops.get_remote_heartbeat(part);
		if (ret != xpSuccess)
			XPC_DEACTIVATE_PARTITION(part, ret);
	}
}

/*
 * This thread is responsible for nearly all of the partition
 * activation/deactivation.
 */
static int
xpc_hb_checker(void *ignore)
{
	int force_IRQ = 0;

	/* this thread was marked active by xpc_hb_init() */

	set_cpus_allowed_ptr(current, cpumask_of(XPC_HB_CHECK_CPU));

	/* set our heartbeating to other partitions into motion */
	xpc_hb_check_timeout = jiffies + (xpc_hb_check_interval * HZ);
	xpc_start_hb_beater();

	while (!xpc_exiting) {

		dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have "
			"been received\n",
			(int)(xpc_hb_check_timeout - jiffies),
			xpc_activate_IRQ_rcvd);

		/* checking of remote heartbeats is skewed by IRQ handling */
		if (time_is_before_eq_jiffies(xpc_hb_check_timeout)) {
			xpc_hb_check_timeout = jiffies +
			    (xpc_hb_check_interval * HZ);

			dev_dbg(xpc_part, "checking remote heartbeats\n");
			xpc_check_remote_hb();

			/*
			 * On sn2 we need to periodically recheck to ensure no
			 * IRQ/amo pairs have been missed.
			 */
			if (is_shub())
				force_IRQ = 1;
		}

		/* check for outstanding IRQs */
		if (xpc_activate_IRQ_rcvd > 0 || force_IRQ != 0) {
			force_IRQ = 0;
			dev_dbg(xpc_part, "processing activate IRQs "
				"received\n");
			xpc_arch_ops.process_activate_IRQ_rcvd();
		}

		/* wait for IRQ or timeout */
		(void)wait_event_interruptible(xpc_activate_IRQ_wq,
					       (time_is_before_eq_jiffies(
						xpc_hb_check_timeout) ||
						xpc_activate_IRQ_rcvd > 0 ||
						xpc_exiting));
	}

	xpc_stop_hb_beater();

	dev_dbg(xpc_part, "heartbeat checker is exiting\n");

	/* mark this thread as having exited */
	complete(&xpc_hb_checker_exited);
	return 0;
}

/*
 * This thread will attempt to discover other partitions to activate
 * based on info provided by SAL. This new thread is short lived and
 * will exit once discovery is complete.
 */
static int
xpc_initiate_discovery(void *ignore)
{
	xpc_discovery();

	dev_dbg(xpc_part, "discovery thread is exiting\n");

	/* mark this thread as having exited */
	complete(&xpc_discovery_exited);
	return 0;
}

/*
 * The first kthread assigned to a newly activated partition is the one
 * created by XPC HB with which it calls xpc_activating(). XPC hangs on to
 * that kthread until the partition is brought down, at which time that kthread
 * returns to XPC HB. (The return of that kthread will signify to XPC HB
 * that XPC has dismantled all communication infrastructure for the associated
 * partition.) This kthread becomes the channel manager for that partition.
 *
 * Each active partition has a channel manager, who, besides connecting and
 * disconnecting channels, will ensure that each of the partition's connected
 * channels has the required number of assigned kthreads to get the work done.
 */
static void
xpc_channel_mgr(struct xpc_partition *part)
{
	while (part->act_state != XPC_P_AS_DEACTIVATING ||
	       atomic_read(&part->nchannels_active) > 0 ||
	       !xpc_partition_disengaged(part)) {

		xpc_process_sent_chctl_flags(part);

		/*
		 * Wait until we've been requested to activate kthreads or
		 * all of the channel's message queues have been torn down or
		 * a signal is pending.
		 *
		 * The channel_mgr_requests is set to 1 after being awakened,
		 * This is done to prevent the channel mgr from making one pass
		 * through the loop for each request, since he will
		 * be servicing all the requests in one pass. The reason it's
		 * set to 1 instead of 0 is so that other kthreads will know
		 * that the channel mgr is running and won't bother trying to
		 * wake him up.
		 */
		atomic_dec(&part->channel_mgr_requests);
		(void)wait_event_interruptible(part->channel_mgr_wq,
				(atomic_read(&part->channel_mgr_requests) > 0 ||
				 part->chctl.all_flags != 0 ||
				 (part->act_state == XPC_P_AS_DEACTIVATING &&
				 atomic_read(&part->nchannels_active) == 0 &&
				 xpc_partition_disengaged(part))));
		atomic_set(&part->channel_mgr_requests, 1);
	}
}

/*
 * Guarantee that the kzalloc'd memory is cacheline aligned.
 */
void *
xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
{
	/* see if kzalloc will give us cacheline aligned memory by default */
	*base = kzalloc(size, flags);
	if (*base == NULL)
		return NULL;

	if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
		return *base;

	kfree(*base);

	/* nope, we'll have to do it ourselves */
	*base = kzalloc(size + L1_CACHE_BYTES, flags);
	if (*base == NULL)
		return NULL;

	return (void *)L1_CACHE_ALIGN((u64)*base);
}

/*
 * Set up the channel structures necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
static enum xp_retval
xpc_setup_ch_structures(struct xpc_partition *part)
{
	enum xp_retval ret;
	int ch_number;
	struct xpc_channel *ch;
	short partid = XPC_PARTID(part);

	/*
	 * Allocate all of the channel structures as a contiguous chunk of
	 * memory.
	 */
	DBUG_ON(part->channels != NULL);
	part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_MAX_NCHANNELS,
				 GFP_KERNEL);
	if (part->channels == NULL) {
		dev_err(xpc_chan, "can't get memory for channels\n");
		return xpNoMemory;
	}

	/* allocate the remote open and close args */

	part->remote_openclose_args =
	    xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE,
					  GFP_KERNEL, &part->
					  remote_openclose_args_base);
	if (part->remote_openclose_args == NULL) {
		dev_err(xpc_chan, "can't get memory for remote connect args\n");
		ret = xpNoMemory;
		goto out_1;
	}

	part->chctl.all_flags = 0;
	spin_lock_init(&part->chctl_lock);

	atomic_set(&part->channel_mgr_requests, 1);
	init_waitqueue_head(&part->channel_mgr_wq);

	part->nchannels = XPC_MAX_NCHANNELS;

	atomic_set(&part->nchannels_active, 0);
	atomic_set(&part->nchannels_engaged, 0);

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		ch->partid = partid;
		ch->number = ch_number;
		ch->flags = XPC_C_DISCONNECTED;

		atomic_set(&ch->kthreads_assigned, 0);
		atomic_set(&ch->kthreads_idle, 0);
		atomic_set(&ch->kthreads_active, 0);

		atomic_set(&ch->references, 0);
		atomic_set(&ch->n_to_notify, 0);

		spin_lock_init(&ch->lock);
		init_completion(&ch->wdisconnect_wait);

		atomic_set(&ch->n_on_msg_allocate_wq, 0);
		init_waitqueue_head(&ch->msg_allocate_wq);
		init_waitqueue_head(&ch->idle_wq);
	}

	ret = xpc_arch_ops.setup_ch_structures(part);
	if (ret != xpSuccess)
		goto out_2;

	/*
	 * With the setting of the partition setup_state to XPC_P_SS_SETUP,
	 * we're declaring that this partition is ready to go.
	 */
	part->setup_state = XPC_P_SS_SETUP;

	return xpSuccess;

	/* setup of ch structures failed */
out_2:
	kfree(part->remote_openclose_args_base);
	part->remote_openclose_args = NULL;
out_1:
	kfree(part->channels);
	part->channels = NULL;
	return ret;
}

/*
 * Tear down the channel structures necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
static void
xpc_teardown_ch_structures(struct xpc_partition *part)
{
	DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
	DBUG_ON(atomic_read(&part->nchannels_active) != 0);

	/*
	 * Make this partition inaccessible to local processes by marking it
	 * as no longer setup. Then wait before proceeding with the teardown
	 * until all existing references cease.
	 */
	DBUG_ON(part->setup_state != XPC_P_SS_SETUP);
	part->setup_state = XPC_P_SS_WTEARDOWN;

	wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));

	/* now we can begin tearing down the infrastructure */

	xpc_arch_ops.teardown_ch_structures(part);

	kfree(part->remote_openclose_args_base);
	part->remote_openclose_args = NULL;
	kfree(part->channels);
	part->channels = NULL;

	part->setup_state = XPC_P_SS_TORNDOWN;
}

/*
 * When XPC HB determines that a partition has come up, it will create a new
 * kthread and that kthread will call this function to attempt to set up the
 * basic infrastructure used for Cross Partition Communication with the newly
 * upped partition.
 *
 * The kthread that was created by XPC HB and which set up the XPC
 * infrastructure will remain assigned to the partition, becoming the channel
 * manager for that partition, until the partition is deactivating, at which
 * time the kthread will tear down the XPC infrastructure and then exit.
 */
static int
xpc_activating(void *__partid)
{
	short partid = (u64)__partid;
	struct xpc_partition *part = &xpc_partitions[partid];
	unsigned long irq_flags;

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);

	spin_lock_irqsave(&part->act_lock, irq_flags);

	if (part->act_state == XPC_P_AS_DEACTIVATING) {
		part->act_state = XPC_P_AS_INACTIVE;
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
		part->remote_rp_pa = 0;
		return 0;
	}

	/* indicate the thread is activating */
	DBUG_ON(part->act_state != XPC_P_AS_ACTIVATION_REQ);
	part->act_state = XPC_P_AS_ACTIVATING;

	XPC_SET_REASON(part, 0, 0);
	spin_unlock_irqrestore(&part->act_lock, irq_flags);

	dev_dbg(xpc_part, "activating partition %d\n", partid);

	xpc_arch_ops.allow_hb(partid);

	if (xpc_setup_ch_structures(part) == xpSuccess) {
		(void)xpc_part_ref(part);	/* this will always succeed */

		if (xpc_arch_ops.make_first_contact(part) == xpSuccess) {
			xpc_mark_partition_active(part);
			xpc_channel_mgr(part);
			/* won't return until partition is deactivating */
		}

		xpc_part_deref(part);
		xpc_teardown_ch_structures(part);
	}

	xpc_arch_ops.disallow_hb(partid);
	xpc_mark_partition_inactive(part);

	if (part->reason == xpReactivating) {
		/* interrupting ourselves results in activating partition */
		xpc_arch_ops.request_partition_reactivation(part);
	}

	return 0;
}

void
xpc_activate_partition(struct xpc_partition *part)
{
	short partid = XPC_PARTID(part);
	unsigned long irq_flags;
	struct task_struct *kthread;

	spin_lock_irqsave(&part->act_lock, irq_flags);

	DBUG_ON(part->act_state != XPC_P_AS_INACTIVE);

	part->act_state = XPC_P_AS_ACTIVATION_REQ;
	XPC_SET_REASON(part, xpCloneKThread, __LINE__);

	spin_unlock_irqrestore(&part->act_lock, irq_flags);

	kthread = kthread_run(xpc_activating, (void *)((u64)partid), "xpc%02d",
			      partid);
	if (IS_ERR(kthread)) {
		spin_lock_irqsave(&part->act_lock, irq_flags);
		part->act_state = XPC_P_AS_INACTIVE;
		XPC_SET_REASON(part, xpCloneKThreadFailed, __LINE__);
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
	}
}

void
xpc_activate_kthreads(struct xpc_channel *ch, int needed)
{
	int idle = atomic_read(&ch->kthreads_idle);
	int assigned = atomic_read(&ch->kthreads_assigned);
	int wakeup;

	DBUG_ON(needed <= 0);

	if (idle > 0) {
		wakeup = (needed > idle) ? idle : needed;
		needed -= wakeup;

		dev_dbg(xpc_chan, "wakeup %d idle kthreads, partid=%d, "
			"channel=%d\n", wakeup, ch->partid, ch->number);

		/* only wakeup the requested number of kthreads */
		wake_up_nr(&ch->idle_wq, wakeup);
	}

	if (needed <= 0)
		return;

	if (needed + assigned > ch->kthreads_assigned_limit) {
		needed = ch->kthreads_assigned_limit - assigned;
		if (needed <= 0)
			return;
	}

	dev_dbg(xpc_chan, "create %d new kthreads, partid=%d, channel=%d\n",
		needed, ch->partid, ch->number);

	xpc_create_kthreads(ch, needed, 0);
}

/*
 * This function is where XPC's kthreads wait for messages to deliver.
 */
static void
xpc_kthread_waitmsgs(struct xpc_partition *part, struct xpc_channel *ch)
{
	int (*n_of_deliverable_payloads) (struct xpc_channel *) =
		xpc_arch_ops.n_of_deliverable_payloads;

	do {
		/* deliver messages to their intended recipients */

		while (n_of_deliverable_payloads(ch) > 0 &&
		       !(ch->flags & XPC_C_DISCONNECTING)) {
			xpc_deliver_payload(ch);
		}

		if (atomic_inc_return(&ch->kthreads_idle) >
		    ch->kthreads_idle_limit) {
			/* too many idle kthreads on this channel */
			atomic_dec(&ch->kthreads_idle);
			break;
		}

		dev_dbg(xpc_chan, "idle kthread calling "
			"wait_event_interruptible_exclusive()\n");

		(void)wait_event_interruptible_exclusive(ch->idle_wq,
				(n_of_deliverable_payloads(ch) > 0 ||
				 (ch->flags & XPC_C_DISCONNECTING)));

		atomic_dec(&ch->kthreads_idle);

	} while (!(ch->flags & XPC_C_DISCONNECTING));
}

static int
xpc_kthread_start(void *args)
{
	short partid = XPC_UNPACK_ARG1(args);
	u16 ch_number = XPC_UNPACK_ARG2(args);
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	int n_needed;
	unsigned long irq_flags;
	int (*n_of_deliverable_payloads) (struct xpc_channel *) =
		xpc_arch_ops.n_of_deliverable_payloads;

	dev_dbg(xpc_chan, "kthread starting, partid=%d, channel=%d\n",
		partid, ch_number);

	ch = &part->channels[ch_number];

	if (!(ch->flags & XPC_C_DISCONNECTING)) {

		/* let registerer know that connection has been established */

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (!(ch->flags & XPC_C_CONNECTEDCALLOUT)) {
			ch->flags |= XPC_C_CONNECTEDCALLOUT;
			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_connected_callout(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);
			ch->flags |= XPC_C_CONNECTEDCALLOUT_MADE;
			spin_unlock_irqrestore(&ch->lock, irq_flags);

			/*
			 * It is possible that while the callout was being
			 * made that the remote partition sent some messages.
			 * If that is the case, we may need to activate
			 * additional kthreads to help deliver them. We only
			 * need one less than total #of messages to deliver.
			 */
			n_needed = n_of_deliverable_payloads(ch) - 1;
			if (n_needed > 0 && !(ch->flags & XPC_C_DISCONNECTING))
				xpc_activate_kthreads(ch, n_needed);

		} else {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
		}

		xpc_kthread_waitmsgs(part, ch);
	}

	/* let registerer know that connection is disconnecting */

	spin_lock_irqsave(&ch->lock, irq_flags);
	if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
	    !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		ch->flags |= XPC_C_DISCONNECTINGCALLOUT;
		spin_unlock_irqrestore(&ch->lock, irq_flags);

		xpc_disconnect_callout(ch, xpDisconnecting);

		spin_lock_irqsave(&ch->lock, irq_flags);
		ch->flags |= XPC_C_DISCONNECTINGCALLOUT_MADE;
	}
	spin_unlock_irqrestore(&ch->lock, irq_flags);

	if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
	    atomic_dec_return(&part->nchannels_engaged) == 0) {
		xpc_arch_ops.indicate_partition_disengaged(part);
	}

	xpc_msgqueue_deref(ch);

	dev_dbg(xpc_chan, "kthread exiting, partid=%d, channel=%d\n",
		partid, ch_number);

	xpc_part_deref(part);
	return 0;
}

/*
 * For each partition that XPC has established communications with, there is
 * a minimum of one kernel thread assigned to perform any operation that
 * may potentially sleep or block (basically the callouts to the asynchronous
 * functions registered via xpc_connect()).
 *
 * Additional kthreads are created and destroyed by XPC as the workload
 * demands.
 *
 * A kthread is assigned to one of the active channels that exists for a given
 * partition.
 */
void
xpc_create_kthreads(struct xpc_channel *ch, int needed,
		    int ignore_disconnecting)
{
	unsigned long irq_flags;
	u64 args = XPC_PACK_ARGS(ch->partid, ch->number);
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	struct task_struct *kthread;
	void (*indicate_partition_disengaged) (struct xpc_partition *) =
		xpc_arch_ops.indicate_partition_disengaged;

	while (needed-- > 0) {

		/*
		 * The following is done on behalf of the newly created
		 * kthread. That kthread is responsible for doing the
		 * counterpart to the following before it exits.
		 */
		if (ignore_disconnecting) {
			if (!atomic_inc_not_zero(&ch->kthreads_assigned)) {
				/* kthreads assigned had gone to zero */
				BUG_ON(!(ch->flags &
					 XPC_C_DISCONNECTINGCALLOUT_MADE));
				break;
			}

		} else if (ch->flags & XPC_C_DISCONNECTING) {
			break;

		} else if (atomic_inc_return(&ch->kthreads_assigned) == 1 &&
			   atomic_inc_return(&part->nchannels_engaged) == 1) {
			xpc_arch_ops.indicate_partition_engaged(part);
		}
		(void)xpc_part_ref(part);
		xpc_msgqueue_ref(ch);

		kthread = kthread_run(xpc_kthread_start, (void *)args,
				      "xpc%02dc%d", ch->partid, ch->number);
		if (IS_ERR(kthread)) {
			/* the fork failed */

			/*
			 * NOTE: if (ignore_disconnecting &&
			 * !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) is true,
			 * then we'll deadlock if all other kthreads assigned
			 * to this channel are blocked in the channel's
			 * registerer, because the only thing that will unblock
			 * them is the xpDisconnecting callout that this
			 * failed kthread_run() would have made.
			 */

			if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
			    atomic_dec_return(&part->nchannels_engaged) == 0) {
				indicate_partition_disengaged(part);
			}
			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);

			if (atomic_read(&ch->kthreads_assigned) <
			    ch->kthreads_idle_limit) {
				/*
				 * Flag this as an error only if we have an
				 * insufficient #of kthreads for the channel
				 * to function.
				 */
				spin_lock_irqsave(&ch->lock, irq_flags);
				XPC_DISCONNECT_CHANNEL(ch, xpLackOfResources,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			break;
		}
	}
}

void
xpc_disconnect_wait(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;
	int wakeup_channel_mgr;

	/* now wait for all callouts to the caller's function to cease */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (!xpc_part_ref(part))
			continue;

		ch = &part->channels[ch_number];

		if (!(ch->flags & XPC_C_WDISCONNECT)) {
			xpc_part_deref(part);
			continue;
		}

		wait_for_completion(&ch->wdisconnect_wait);

		spin_lock_irqsave(&ch->lock, irq_flags);
		DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
		wakeup_channel_mgr = 0;

		if (ch->delayed_chctl_flags) {
			if (part->act_state != XPC_P_AS_DEACTIVATING) {
				spin_lock(&part->chctl_lock);
				part->chctl.flags[ch->number] |=
				    ch->delayed_chctl_flags;
				spin_unlock(&part->chctl_lock);
				wakeup_channel_mgr = 1;
			}
			ch->delayed_chctl_flags = 0;
		}

		ch->flags &= ~XPC_C_WDISCONNECT;
		spin_unlock_irqrestore(&ch->lock, irq_flags);

		if (wakeup_channel_mgr)
			xpc_wakeup_channel_mgr(part);

		xpc_part_deref(part);
	}
}

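/*
 * Allocate and initialize the xpc_partitions[] array, then have the
 * architecture-specific code do its per-partition setup.
 */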
static int
xpc_setup_partitions(void)
{
	short partid;
	struct xpc_partition *part;

	xpc_partitions = kzalloc(sizeof(struct xpc_partition) *
				 xp_max_npartitions, GFP_KERNEL);
	if (xpc_partitions == NULL) {
		dev_err(xpc_part, "can't get memory for partition structure\n");
		return -ENOMEM;
	}

	/*
	 * The first few fields of each entry of xpc_partitions[] need to
	 * be initialized now so that calls to xpc_connect() and
	 * xpc_disconnect() can be made prior to the activation of any remote
	 * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
	 * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
	 * PARTITION HAS BEEN ACTIVATED.
	 */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		DBUG_ON((u64)part != L1_CACHE_ALIGN((u64)part));

		part->activate_IRQ_rcvd = 0;
		spin_lock_init(&part->act_lock);
		part->act_state = XPC_P_AS_INACTIVE;
		XPC_SET_REASON(part, 0, 0);

		init_timer(&part->disengage_timer);
		part->disengage_timer.function =
		    xpc_timeout_partition_disengage;
		part->disengage_timer.data = (unsigned long)part;

		part->setup_state = XPC_P_SS_UNSET;
		init_waitqueue_head(&part->teardown_wq);
		atomic_set(&part->references, 0);
	}

	return xpc_arch_ops.setup_partitions();
}

static void
xpc_teardown_partitions(void)
{
	xpc_arch_ops.teardown_partitions();
	kfree(xpc_partitions);
}

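/*
 * Shut XPC down gracefully: stop the heartbeat checker and discovery threads,
 * deactivate all remote partitions and wait for them to disengage, then tear
 * down everything set up by xpc_init().
 */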
static void
xpc_do_exit(enum xp_retval reason)
{
	short partid;
	int active_part_count, printed_waiting_msg = 0;
	struct xpc_partition *part;
	unsigned long printmsg_time, disengage_timeout = 0;

	/* a 'rmmod XPC' and a 'reboot' cannot both end up here together */
	DBUG_ON(xpc_exiting == 1);

	/*
	 * Let the heartbeat checker thread and the discovery thread
	 * (if one is running) know that they should exit. Also wake up
	 * the heartbeat checker thread in case it's sleeping.
	 */
	xpc_exiting = 1;
	wake_up_interruptible(&xpc_activate_IRQ_wq);

	/* wait for the discovery thread to exit */
	wait_for_completion(&xpc_discovery_exited);

	/* wait for the heartbeat checker thread to exit */
	wait_for_completion(&xpc_hb_checker_exited);

	/* sleep for a 1/3 of a second or so */
	(void)msleep_interruptible(300);

	/* wait for all partitions to become inactive */

	printmsg_time = jiffies + (XPC_DEACTIVATE_PRINTMSG_INTERVAL * HZ);
	xpc_disengage_timedout = 0;

	do {
		active_part_count = 0;

		for (partid = 0; partid < xp_max_npartitions; partid++) {
			part = &xpc_partitions[partid];

			if (xpc_partition_disengaged(part) &&
			    part->act_state == XPC_P_AS_INACTIVE) {
				continue;
			}

			active_part_count++;

			XPC_DEACTIVATE_PARTITION(part, reason);

			if (part->disengage_timeout > disengage_timeout)
				disengage_timeout = part->disengage_timeout;
		}

		if (xpc_arch_ops.any_partition_engaged()) {
			if (time_is_before_jiffies(printmsg_time)) {
				dev_info(xpc_part, "waiting for remote "
					 "partitions to deactivate, timeout in "
					 "%ld seconds\n", (disengage_timeout -
					 jiffies) / HZ);
				printmsg_time = jiffies +
				    (XPC_DEACTIVATE_PRINTMSG_INTERVAL * HZ);
				printed_waiting_msg = 1;
			}

		} else if (active_part_count > 0) {
			if (printed_waiting_msg) {
				dev_info(xpc_part, "waiting for local partition"
					 " to deactivate\n");
				printed_waiting_msg = 0;
			}

		} else {
			if (!xpc_disengage_timedout) {
				dev_info(xpc_part, "all partitions have "
					 "deactivated\n");
			}
			break;
		}

		/* sleep for a 1/3 of a second or so */
		(void)msleep_interruptible(300);

	} while (1);

	DBUG_ON(xpc_arch_ops.any_partition_engaged());

	xpc_teardown_rsvd_page();

	if (reason == xpUnloading) {
		(void)unregister_die_notifier(&xpc_die_notifier);
		(void)unregister_reboot_notifier(&xpc_reboot_notifier);
	}

	/* clear the interface to XPC's functions */
	xpc_clear_interface();

	if (xpc_sysctl)
		unregister_sysctl_table(xpc_sysctl);

	xpc_teardown_partitions();

	if (is_shub())
		xpc_exit_sn2();
	else if (is_uv())
		xpc_exit_uv();
}

/*
 * This function is called when the system is being rebooted.
 */
static int
xpc_system_reboot(struct notifier_block *nb, unsigned long event, void *unused)
{
	enum xp_retval reason;

	switch (event) {
	case SYS_RESTART:
		reason = xpSystemReboot;
		break;
	case SYS_HALT:
		reason = xpSystemHalt;
		break;
	case SYS_POWER_OFF:
		reason = xpSystemPoweroff;
		break;
	default:
		reason = xpSystemGoingDown;
	}

	xpc_do_exit(reason);
	return NOTIFY_DONE;
}

/*
 * Notify other partitions to deactivate from us by first disengaging from all
 * references to our memory.
 */
static void
xpc_die_deactivate(void)
{
	struct xpc_partition *part;
	short partid;
	int any_engaged;
	long keep_waiting;
	long wait_to_print;

	/* keep xpc_hb_checker thread from doing anything (just in case) */
	xpc_exiting = 1;

	xpc_arch_ops.disallow_all_hbs();   /*indicate we're deactivated */

	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_arch_ops.partition_engaged(partid) ||
		    part->act_state != XPC_P_AS_INACTIVE) {
			xpc_arch_ops.request_partition_deactivation(part);
			xpc_arch_ops.indicate_partition_disengaged(part);
		}
	}

	/*
	 * Though we requested that all other partitions deactivate from us,
	 * we only wait until they've all disengaged or we've reached the
	 * defined timelimit.
	 *
	 * Given that one iteration through the following while-loop takes
	 * approximately 200 microseconds, calculate the #of loops to take
	 * before bailing and the #of loops before printing a waiting message.
	 */
	keep_waiting = xpc_disengage_timelimit * 1000 * 5;
	wait_to_print = XPC_DEACTIVATE_PRINTMSG_INTERVAL * 1000 * 5;

	while (1) {
		any_engaged = xpc_arch_ops.any_partition_engaged();
		if (!any_engaged) {
			dev_info(xpc_part, "all partitions have deactivated\n");
			break;
		}

		if (!keep_waiting--) {
			for (partid = 0; partid < xp_max_npartitions;
			     partid++) {
				if (xpc_arch_ops.partition_engaged(partid)) {
					dev_info(xpc_part, "deactivate from "
						 "remote partition %d timed "
						 "out\n", partid);
				}
			}
			break;
		}

		if (!wait_to_print--) {
			dev_info(xpc_part, "waiting for remote partitions to "
				 "deactivate, timeout in %ld seconds\n",
				 keep_waiting / (1000 * 5));
			wait_to_print = XPC_DEACTIVATE_PRINTMSG_INTERVAL *
			    1000 * 5;
		}

		udelay(200);
	}
}

/*
 * This function is called when the system is being restarted or halted due
 * to some sort of system failure. If this is the case we need to notify the
 * other partitions to disengage from all references to our memory.
 * This function can also be called when our heartbeater could be offlined
 * for a time. In this case we need to notify other partitions to not worry
 * about the lack of a heartbeat.
 */
static int
xpc_system_die(struct notifier_block *nb, unsigned long event, void *unused)
{
#ifdef CONFIG_IA64		/* !!! temporary kludge */
	switch (event) {
	case DIE_MACHINE_RESTART:
	case DIE_MACHINE_HALT:
		xpc_die_deactivate();
		break;

	case DIE_KDEBUG_ENTER:
		/* Should lack of heartbeat be ignored by other partitions? */
		if (!xpc_kdebug_ignore)
			break;

		/* fall through */
	case DIE_MCA_MONARCH_ENTER:
	case DIE_INIT_MONARCH_ENTER:
		xpc_arch_ops.offline_heartbeat();
		break;

	case DIE_KDEBUG_LEAVE:
		/* Is lack of heartbeat being ignored by other partitions? */
		if (!xpc_kdebug_ignore)
			break;

		/* fall through */
	case DIE_MCA_MONARCH_LEAVE:
	case DIE_INIT_MONARCH_LEAVE:
		xpc_arch_ops.online_heartbeat();
		break;
	}
#else
	xpc_die_deactivate();
#endif

	return NOTIFY_DONE;
}

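/*
 * Module initialization: set up the architecture-specific support (sn2 or
 * uv), the partition structures and the reserved page, register the reboot
 * and die notifiers, then start the heartbeat checker and discovery threads.
 */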
int __init
xpc_init(void)
{
	int ret;
	struct task_struct *kthread;

	dev_set_name(xpc_part, "part");
	dev_set_name(xpc_chan, "chan");

	if (is_shub()) {
		/*
		 * The ia64-sn2 architecture supports at most 64 partitions.
		 * And the inability to unregister remote amos restricts us
		 * further to only support exactly 64 partitions on this
		 * architecture, no less.
		 */
		if (xp_max_npartitions != 64) {
			dev_err(xpc_part, "max #of partitions not set to 64\n");
			ret = -EINVAL;
		} else {
			ret = xpc_init_sn2();
		}

	} else if (is_uv()) {
		ret = xpc_init_uv();

	} else {
		ret = -ENODEV;
	}

	if (ret != 0)
		return ret;

	ret = xpc_setup_partitions();
	if (ret != 0) {
		dev_err(xpc_part, "can't get memory for partition structure\n");
		goto out_1;
	}

	xpc_sysctl = register_sysctl_table(xpc_sys_dir);

	/*
	 * Fill the partition reserved page with the information needed by
	 * other partitions to discover we are alive and establish initial
	 * communications.
	 */
	ret = xpc_setup_rsvd_page();
	if (ret != 0) {
		dev_err(xpc_part, "can't setup our reserved page\n");
		goto out_2;
	}

	/* add ourselves to the reboot_notifier_list */
	ret = register_reboot_notifier(&xpc_reboot_notifier);
	if (ret != 0)
		dev_warn(xpc_part, "can't register reboot notifier\n");

	/* add ourselves to the die_notifier list */
	ret = register_die_notifier(&xpc_die_notifier);
	if (ret != 0)
		dev_warn(xpc_part, "can't register die notifier\n");

	/*
	 * The real work-horse behind xpc.  This processes incoming
	 * interrupts and monitors remote heartbeats.
	 */
	kthread = kthread_run(xpc_hb_checker, NULL, XPC_HB_CHECK_THREAD_NAME);
	if (IS_ERR(kthread)) {
		dev_err(xpc_part, "failed while forking hb check thread\n");
		ret = -EBUSY;
		goto out_3;
	}

	/*
	 * Startup a thread that will attempt to discover other partitions to
	 * activate based on info provided by SAL. This new thread is short
	 * lived and will exit once discovery is complete.
	 */
	kthread = kthread_run(xpc_initiate_discovery, NULL,
			      XPC_DISCOVERY_THREAD_NAME);
	if (IS_ERR(kthread)) {
		dev_err(xpc_part, "failed while forking discovery thread\n");

		/* mark this new thread as a non-starter */
		complete(&xpc_discovery_exited);

		xpc_do_exit(xpUnloading);
		return -EBUSY;
	}

	/* set the interface to point at XPC's functions */
	xpc_set_interface(xpc_initiate_connect, xpc_initiate_disconnect,
			  xpc_initiate_send, xpc_initiate_send_notify,
			  xpc_initiate_received, xpc_initiate_partid_to_nasids);
1293 1294

	return 0;

	/* initialization was not successful */
out_3:
	xpc_teardown_rsvd_page();

	(void)unregister_die_notifier(&xpc_die_notifier);
	(void)unregister_reboot_notifier(&xpc_reboot_notifier);
out_2:
	if (xpc_sysctl)
		unregister_sysctl_table(xpc_sysctl);

	xpc_teardown_partitions();
out_1:
	if (is_shub())
		xpc_exit_sn2();
	else if (is_uv())
		xpc_exit_uv();
	return ret;
}

module_init(xpc_init);

void __exit
xpc_exit(void)
{
	xpc_do_exit(xpUnloading);
}

module_exit(xpc_exit);

MODULE_AUTHOR("Silicon Graphics, Inc.");
MODULE_DESCRIPTION("Cross Partition Communication (XPC) support");
MODULE_LICENSE("GPL");

module_param(xpc_hb_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_interval, "Number of seconds between "
		 "heartbeat increments.");

module_param(xpc_hb_check_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_check_interval, "Number of seconds between "
		 "heartbeat checks.");

module_param(xpc_disengage_timelimit, int, 0);
MODULE_PARM_DESC(xpc_disengage_timelimit, "Number of seconds to wait "
		 "for disengage to complete.");

module_param(xpc_kdebug_ignore, int, 0);
MODULE_PARM_DESC(xpc_kdebug_ignore, "Should lack of heartbeat be ignored by "
		 "other partitions when dropping into kdebug.");