/*
 * This file is subject to the terms and conditions of the GNU General Public
 * License.  See the file "COPYING" in the main directory of this archive
 * for more details.
 *
 * Copyright (c) 2004-2008 Silicon Graphics, Inc.  All Rights Reserved.
 */

/*
 * Cross Partition Communication (XPC) support - standard version.
 *
 *	XPC provides a message passing capability that crosses partition
 *	boundaries. This module is made up of two parts:
 *
 *	    partition	This part detects the presence/absence of other
 *			partitions. It provides a heartbeat and monitors
 *			the heartbeats of other partitions.
 *
 *	    channel	This part manages the channels and sends/receives
 *			messages across them to/from other partitions.
 *
 *	There are a couple of additional functions residing in XP, which
 *	provide an interface to XPC for its users.
 *
 *
 *	Caveats:
 *
 *	  . Currently on sn2, we have no way to determine which nasid an IRQ
 *	    came from. Thus, xpc_send_IRQ_sn2() does a remote amo write
 *	    followed by an IPI. The amo indicates where data is to be pulled
 *	    from, so after the IPI arrives, the remote partition checks the amo
 *	    word. The IPI can actually arrive before the amo however, so other
 *	    code must periodically check for this case. Also, remote amo
 *	    operations do not reliably time out. Thus we do a remote PIO read
 *	    solely to know whether the remote partition is down and whether we
 *	    should stop sending IPIs to it. This remote PIO read operation is
 *	    set up in a special nofault region so SAL knows to ignore (and
 *	    cleanup) any errors due to the remote amo write, PIO read, and/or
 *	    PIO write operations.
 *
 *	    If/when new hardware solves this IPI problem, we should abandon
 *	    the current approach.
 *
 */
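
/*
 * Summary of the sn2 send sequence described in the caveat above:
 *
 *	  1. the local partition writes a remote amo word telling the target
 *	     partition where the data is to be pulled from
 *	  2. the local partition sends an IPI to the target
 *	  3. the target's IPI handler checks the amo word; since the IPI can
 *	     arrive before the amo write is visible, other code rechecks
 *	     periodically
 *
 * A remote PIO read, set up in a special nofault region, is used only to
 * learn whether the target partition is down so IPIs to it can be stopped.
 */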

#include <linux/module.h>
#include <linux/sysctl.h>
#include <linux/device.h>
#include <linux/delay.h>
#include <linux/reboot.h>
#include <linux/kdebug.h>
#include <linux/kthread.h>
#include "xpc.h"

/* define two XPC debug device structures to be used with dev_dbg() et al */

struct device_driver xpc_dbg_name = {
	.name = "xpc"
};

struct device xpc_part_dbg_subname = {
	.bus_id = {0},		/* set to "part" at xpc_init() time */
	.driver = &xpc_dbg_name
};

struct device xpc_chan_dbg_subname = {
	.bus_id = {0},		/* set to "chan" at xpc_init() time */
	.driver = &xpc_dbg_name
};

struct device *xpc_part = &xpc_part_dbg_subname;
struct device *xpc_chan = &xpc_chan_dbg_subname;

static int xpc_kdebug_ignore;

/* systune related variables for /proc/sys directories */

static int xpc_hb_interval = XPC_HB_DEFAULT_INTERVAL;
static int xpc_hb_min_interval = 1;
static int xpc_hb_max_interval = 10;

static int xpc_hb_check_interval = XPC_HB_CHECK_DEFAULT_INTERVAL;
static int xpc_hb_check_min_interval = 10;
static int xpc_hb_check_max_interval = 120;

int xpc_disengage_timelimit = XPC_DISENGAGE_DEFAULT_TIMELIMIT;
static int xpc_disengage_min_timelimit;	/* = 0 */
static int xpc_disengage_max_timelimit = 120;

static ctl_table xpc_sys_xpc_hb_dir[] = {
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "hb_interval",
	 .data = &xpc_hb_interval,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .strategy = &sysctl_intvec,
	 .extra1 = &xpc_hb_min_interval,
	 .extra2 = &xpc_hb_max_interval},
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "hb_check_interval",
	 .data = &xpc_hb_check_interval,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .strategy = &sysctl_intvec,
	 .extra1 = &xpc_hb_check_min_interval,
	 .extra2 = &xpc_hb_check_max_interval},
	{}
};
static ctl_table xpc_sys_xpc_dir[] = {
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "hb",
	 .mode = 0555,
	 .child = xpc_sys_xpc_hb_dir},
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "disengage_timelimit",
	 .data = &xpc_disengage_timelimit,
	 .maxlen = sizeof(int),
	 .mode = 0644,
	 .proc_handler = &proc_dointvec_minmax,
	 .strategy = &sysctl_intvec,
	 .extra1 = &xpc_disengage_min_timelimit,
	 .extra2 = &xpc_disengage_max_timelimit},
	{}
};
static ctl_table xpc_sys_dir[] = {
	{
	 .ctl_name = CTL_UNNUMBERED,
	 .procname = "xpc",
	 .mode = 0555,
	 .child = xpc_sys_xpc_dir},
	{}
};
static struct ctl_table_header *xpc_sysctl;

/* non-zero if any remote partition disengage was timed out */
int xpc_disengage_timedout;

/* #of activate IRQs received and not yet processed */
int xpc_activate_IRQ_rcvd;
DEFINE_SPINLOCK(xpc_activate_IRQ_rcvd_lock);

/* IRQ handler notifies this wait queue on receipt of an IRQ */
DECLARE_WAIT_QUEUE_HEAD(xpc_activate_IRQ_wq);

static unsigned long xpc_hb_check_timeout;
static struct timer_list xpc_hb_timer;
void *xpc_heartbeating_to_mask;

/* notification that the xpc_hb_checker thread has exited */
static DECLARE_COMPLETION(xpc_hb_checker_exited);

/* notification that the xpc_discovery thread has exited */
static DECLARE_COMPLETION(xpc_discovery_exited);

static void xpc_kthread_waitmsgs(struct xpc_partition *, struct xpc_channel *);

static int xpc_system_reboot(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_reboot_notifier = {
	.notifier_call = xpc_system_reboot,
};

static int xpc_system_die(struct notifier_block *, unsigned long, void *);
static struct notifier_block xpc_die_notifier = {
	.notifier_call = xpc_system_die,
};

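/*
 * The function pointers below provide the hardware-specific hooks; they are
 * filled in at init time by the implementation xpc_init() selects (see
 * xpc_init_sn2() and xpc_init_uv() further down in this file).
 */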
int (*xpc_setup_partitions_sn) (void);
enum xp_retval (*xpc_get_partition_rsvd_page_pa) (void *buf, u64 *cookie,
						  unsigned long *rp_pa,
						  size_t *len);
int (*xpc_setup_rsvd_page_sn) (struct xpc_rsvd_page *rp);

void (*xpc_heartbeat_init) (void);
void (*xpc_heartbeat_exit) (void);
void (*xpc_increment_heartbeat) (void);
void (*xpc_offline_heartbeat) (void);
void (*xpc_online_heartbeat) (void);
enum xp_retval (*xpc_get_remote_heartbeat) (struct xpc_partition *part);

enum xp_retval (*xpc_make_first_contact) (struct xpc_partition *part);
void (*xpc_notify_senders_of_disconnect) (struct xpc_channel *ch);
u64 (*xpc_get_chctl_all_flags) (struct xpc_partition *part);
enum xp_retval (*xpc_setup_msg_structures) (struct xpc_channel *ch);
void (*xpc_teardown_msg_structures) (struct xpc_channel *ch);
void (*xpc_process_msg_chctl_flags) (struct xpc_partition *part, int ch_number);
int (*xpc_n_of_deliverable_payloads) (struct xpc_channel *ch);
void *(*xpc_get_deliverable_payload) (struct xpc_channel *ch);

void (*xpc_request_partition_activation) (struct xpc_rsvd_page *remote_rp,
					  unsigned long remote_rp_pa,
					  int nasid);
void (*xpc_request_partition_reactivation) (struct xpc_partition *part);
void (*xpc_request_partition_deactivation) (struct xpc_partition *part);
void (*xpc_cancel_partition_deactivation_request) (struct xpc_partition *part);

void (*xpc_process_activate_IRQ_rcvd) (void);
enum xp_retval (*xpc_setup_ch_structures_sn) (struct xpc_partition *part);
void (*xpc_teardown_ch_structures_sn) (struct xpc_partition *part);

void (*xpc_indicate_partition_engaged) (struct xpc_partition *part);
int (*xpc_partition_engaged) (short partid);
int (*xpc_any_partition_engaged) (void);
void (*xpc_indicate_partition_disengaged) (struct xpc_partition *part);
void (*xpc_assume_partition_disengaged) (short partid);

void (*xpc_send_chctl_closerequest) (struct xpc_channel *ch,
				     unsigned long *irq_flags);
void (*xpc_send_chctl_closereply) (struct xpc_channel *ch,
				   unsigned long *irq_flags);
void (*xpc_send_chctl_openrequest) (struct xpc_channel *ch,
				    unsigned long *irq_flags);
void (*xpc_send_chctl_openreply) (struct xpc_channel *ch,
				  unsigned long *irq_flags);

void (*xpc_save_remote_msgqueue_pa) (struct xpc_channel *ch,
				     unsigned long msgqueue_pa);

enum xp_retval (*xpc_send_payload) (struct xpc_channel *ch, u32 flags,
				    void *payload, u16 payload_size,
				    u8 notify_type, xpc_notify_func func,
				    void *key);
void (*xpc_received_payload) (struct xpc_channel *ch, void *payload);

/*
 * Timer function to enforce the timelimit on the partition disengage.
 */
static void
xpc_timeout_partition_disengage(unsigned long data)
{
	struct xpc_partition *part = (struct xpc_partition *)data;

	DBUG_ON(time_is_after_jiffies(part->disengage_timeout));

	(void)xpc_partition_disengaged(part);

	DBUG_ON(part->disengage_timeout != 0);
	DBUG_ON(xpc_partition_engaged(XPC_PARTID(part)));
}

/*
 * Timer to produce the heartbeat.  The timer structure's function is
 * already set when this is initially called.  A tunable is used to
 * specify when the next timeout should occur.
 */
static void
xpc_hb_beater(unsigned long dummy)
{
	xpc_increment_heartbeat();

	if (time_is_before_eq_jiffies(xpc_hb_check_timeout))
		wake_up_interruptible(&xpc_activate_IRQ_wq);

	xpc_hb_timer.expires = jiffies + (xpc_hb_interval * HZ);
	add_timer(&xpc_hb_timer);
}

static void
xpc_start_hb_beater(void)
{
	xpc_heartbeat_init();
	init_timer(&xpc_hb_timer);
	xpc_hb_timer.function = xpc_hb_beater;
	xpc_hb_beater(0);
}

static void
xpc_stop_hb_beater(void)
{
	del_timer_sync(&xpc_hb_timer);
	xpc_heartbeat_exit();
}

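/*
 * Note on the heartbeat tunables: xpc_hb_beater() above increments our own
 * heartbeat every xpc_hb_interval seconds, while the xpc_hb_checker thread
 * below examines remote heartbeats every xpc_hb_check_interval seconds.
 * Both intervals are module parameters and are also settable via the xpc
 * sysctl entries defined earlier in this file.
 */
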
/*
 * At periodic intervals, scan through all active partitions and ensure
 * their heartbeat is still active.  If not, the partition is deactivated.
 */
static void
xpc_check_remote_hb(void)
{
	struct xpc_partition *part;
	short partid;
	enum xp_retval ret;

	for (partid = 0; partid < xp_max_npartitions; partid++) {

		if (xpc_exiting)
			break;

		if (partid == xp_partition_id)
			continue;

		part = &xpc_partitions[partid];

		if (part->act_state == XPC_P_AS_INACTIVE ||
		    part->act_state == XPC_P_AS_DEACTIVATING) {
			continue;
		}

		ret = xpc_get_remote_heartbeat(part);
		if (ret != xpSuccess)
			XPC_DEACTIVATE_PARTITION(part, ret);
	}
}

/*
 * This thread is responsible for nearly all of the partition
 * activation/deactivation.
 */
static int
xpc_hb_checker(void *ignore)
{
	int force_IRQ = 0;

	/* this thread was marked active by xpc_hb_init() */

	set_cpus_allowed_ptr(current, &cpumask_of_cpu(XPC_HB_CHECK_CPU));

	/* set our heartbeating to other partitions into motion */
	xpc_hb_check_timeout = jiffies + (xpc_hb_check_interval * HZ);
	xpc_start_hb_beater();

	while (!xpc_exiting) {

		dev_dbg(xpc_part, "woke up with %d ticks rem; %d IRQs have "
			"been received\n",
			(int)(xpc_hb_check_timeout - jiffies),
			xpc_activate_IRQ_rcvd);

		/* checking of remote heartbeats is skewed by IRQ handling */
		if (time_is_before_eq_jiffies(xpc_hb_check_timeout)) {
			xpc_hb_check_timeout = jiffies +
			    (xpc_hb_check_interval * HZ);

			dev_dbg(xpc_part, "checking remote heartbeats\n");
			xpc_check_remote_hb();

			/*
			 * On sn2 we need to periodically recheck to ensure no
			 * IRQ/amo pairs have been missed.
			 */
			if (is_shub())
				force_IRQ = 1;
		}

		/* check for outstanding IRQs */
		if (xpc_activate_IRQ_rcvd > 0 || force_IRQ != 0) {
			force_IRQ = 0;
			dev_dbg(xpc_part, "processing activate IRQs "
				"received\n");
			xpc_process_activate_IRQ_rcvd();
		}

		/* wait for IRQ or timeout */
		(void)wait_event_interruptible(xpc_activate_IRQ_wq,
					       (time_is_before_eq_jiffies(
						xpc_hb_check_timeout) ||
						xpc_activate_IRQ_rcvd > 0 ||
						xpc_exiting));
	}

	xpc_stop_hb_beater();

	dev_dbg(xpc_part, "heartbeat checker is exiting\n");

	/* mark this thread as having exited */
	complete(&xpc_hb_checker_exited);
	return 0;
}

/*
 * This thread will attempt to discover other partitions to activate
 * based on info provided by SAL. This new thread is short lived and
 * will exit once discovery is complete.
 */
static int
xpc_initiate_discovery(void *ignore)
{
	xpc_discovery();

	dev_dbg(xpc_part, "discovery thread is exiting\n");

	/* mark this thread as having exited */
	complete(&xpc_discovery_exited);
	return 0;
}

/*
 * The first kthread assigned to a newly activated partition is the one
 * created by XPC HB with which it calls xpc_activating(). XPC hangs on to
 * that kthread until the partition is brought down, at which time that kthread
 * returns back to XPC HB. (The return of that kthread will signify to XPC HB
 * that XPC has dismantled all communication infrastructure for the associated
 * partition.) This kthread becomes the channel manager for that partition.
 *
 * Each active partition has a channel manager, who, besides connecting and
 * disconnecting channels, will ensure that each of the partition's connected
 * channels has the required number of assigned kthreads to get the work done.
 */
static void
xpc_channel_mgr(struct xpc_partition *part)
{
	while (part->act_state != XPC_P_AS_DEACTIVATING ||
	       atomic_read(&part->nchannels_active) > 0 ||
	       !xpc_partition_disengaged(part)) {

		xpc_process_sent_chctl_flags(part);

		/*
		 * Wait until we've been requested to activate kthreads or
		 * all of the channel's message queues have been torn down or
		 * a signal is pending.
		 *
		 * The channel_mgr_requests is set to 1 after being awakened.
		 * This is done to prevent the channel mgr from making one pass
		 * through the loop for each request, since he will
		 * be servicing all the requests in one pass. The reason it's
		 * set to 1 instead of 0 is so that other kthreads will know
		 * that the channel mgr is running and won't bother trying to
		 * wake him up.
		 */
		atomic_dec(&part->channel_mgr_requests);
		(void)wait_event_interruptible(part->channel_mgr_wq,
				(atomic_read(&part->channel_mgr_requests) > 0 ||
				 part->chctl.all_flags != 0 ||
				 (part->act_state == XPC_P_AS_DEACTIVATING &&
				 atomic_read(&part->nchannels_active) == 0 &&
				 xpc_partition_disengaged(part))));
		atomic_set(&part->channel_mgr_requests, 1);
	}
}

/*
 * Guarantee that the kzalloc'd memory is cacheline aligned.
 */
void *
xpc_kzalloc_cacheline_aligned(size_t size, gfp_t flags, void **base)
{
	/* see if kzalloc will give us cacheline aligned memory by default */
	*base = kzalloc(size, flags);
	if (*base == NULL)
		return NULL;

	if ((u64)*base == L1_CACHE_ALIGN((u64)*base))
		return *base;

	kfree(*base);

	/* nope, we'll have to do it ourselves */
	*base = kzalloc(size + L1_CACHE_BYTES, flags);
	if (*base == NULL)
		return NULL;

	return (void *)L1_CACHE_ALIGN((u64)*base);
}

/*
 * Setup the channel structures necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
static enum xp_retval
xpc_setup_ch_structures(struct xpc_partition *part)
{
	enum xp_retval ret;
	int ch_number;
	struct xpc_channel *ch;
	short partid = XPC_PARTID(part);

	/*
	 * Allocate all of the channel structures as a contiguous chunk of
	 * memory.
	 */
	DBUG_ON(part->channels != NULL);
	part->channels = kzalloc(sizeof(struct xpc_channel) * XPC_MAX_NCHANNELS,
				 GFP_KERNEL);
	if (part->channels == NULL) {
		dev_err(xpc_chan, "can't get memory for channels\n");
		return xpNoMemory;
	}

	/* allocate the remote open and close args */

	part->remote_openclose_args =
	    xpc_kzalloc_cacheline_aligned(XPC_OPENCLOSE_ARGS_SIZE,
					  GFP_KERNEL, &part->
					  remote_openclose_args_base);
	if (part->remote_openclose_args == NULL) {
		dev_err(xpc_chan, "can't get memory for remote connect args\n");
		ret = xpNoMemory;
		goto out_1;
	}

	part->chctl.all_flags = 0;
	spin_lock_init(&part->chctl_lock);

	atomic_set(&part->channel_mgr_requests, 1);
	init_waitqueue_head(&part->channel_mgr_wq);

	part->nchannels = XPC_MAX_NCHANNELS;

	atomic_set(&part->nchannels_active, 0);
	atomic_set(&part->nchannels_engaged, 0);

	for (ch_number = 0; ch_number < part->nchannels; ch_number++) {
		ch = &part->channels[ch_number];

		ch->partid = partid;
		ch->number = ch_number;
		ch->flags = XPC_C_DISCONNECTED;

		atomic_set(&ch->kthreads_assigned, 0);
		atomic_set(&ch->kthreads_idle, 0);
		atomic_set(&ch->kthreads_active, 0);

		atomic_set(&ch->references, 0);
		atomic_set(&ch->n_to_notify, 0);

		spin_lock_init(&ch->lock);
		init_completion(&ch->wdisconnect_wait);

		atomic_set(&ch->n_on_msg_allocate_wq, 0);
		init_waitqueue_head(&ch->msg_allocate_wq);
		init_waitqueue_head(&ch->idle_wq);
	}

	ret = xpc_setup_ch_structures_sn(part);
	if (ret != xpSuccess)
		goto out_2;

	/*
	 * With the setting of the partition setup_state to XPC_P_SS_SETUP,
	 * we're declaring that this partition is ready to go.
	 */
	part->setup_state = XPC_P_SS_SETUP;

	return xpSuccess;

	/* setup of ch structures failed */
out_2:
	kfree(part->remote_openclose_args_base);
	part->remote_openclose_args = NULL;
out_1:
	kfree(part->channels);
	part->channels = NULL;
	return ret;
}

/*
 * Teardown the channel structures necessary to support XPartition Communication
 * between the specified remote partition and the local one.
 */
static void
xpc_teardown_ch_structures(struct xpc_partition *part)
{
	DBUG_ON(atomic_read(&part->nchannels_engaged) != 0);
	DBUG_ON(atomic_read(&part->nchannels_active) != 0);

	/*
	 * Make this partition inaccessible to local processes by marking it
	 * as no longer setup. Then wait before proceeding with the teardown
	 * until all existing references cease.
	 */
	DBUG_ON(part->setup_state != XPC_P_SS_SETUP);
	part->setup_state = XPC_P_SS_WTEARDOWN;

	wait_event(part->teardown_wq, (atomic_read(&part->references) == 0));

	/* now we can begin tearing down the infrastructure */

	xpc_teardown_ch_structures_sn(part);

	kfree(part->remote_openclose_args_base);
	part->remote_openclose_args = NULL;
	kfree(part->channels);
	part->channels = NULL;

	part->setup_state = XPC_P_SS_TORNDOWN;
}

/*
 * When XPC HB determines that a partition has come up, it will create a new
 * kthread and that kthread will call this function to attempt to set up the
 * basic infrastructure used for Cross Partition Communication with the newly
 * upped partition.
 *
 * The kthread that was created by XPC HB and which set up the XPC
 * infrastructure will remain assigned to the partition becoming the channel
 * manager for that partition until the partition is deactivating, at which
 * time the kthread will tear down the XPC infrastructure and then exit.
 */
static int
xpc_activating(void *__partid)
{
	short partid = (u64)__partid;
	struct xpc_partition *part = &xpc_partitions[partid];
	unsigned long irq_flags;

	DBUG_ON(partid < 0 || partid >= xp_max_npartitions);

	spin_lock_irqsave(&part->act_lock, irq_flags);

	if (part->act_state == XPC_P_AS_DEACTIVATING) {
		part->act_state = XPC_P_AS_INACTIVE;
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
		part->remote_rp_pa = 0;
		return 0;
	}

	/* indicate the thread is activating */
	DBUG_ON(part->act_state != XPC_P_AS_ACTIVATION_REQ);
	part->act_state = XPC_P_AS_ACTIVATING;

	XPC_SET_REASON(part, 0, 0);
	spin_unlock_irqrestore(&part->act_lock, irq_flags);

	dev_dbg(xpc_part, "activating partition %d\n", partid);

	xpc_allow_hb(partid);

	if (xpc_setup_ch_structures(part) == xpSuccess) {
625 626 627 628 629 630 631 632 633
		(void)xpc_part_ref(part);	/* this will always succeed */

		if (xpc_make_first_contact(part) == xpSuccess) {
			xpc_mark_partition_active(part);
			xpc_channel_mgr(part);
			/* won't return until partition is deactivating */
		}

		xpc_part_deref(part);
		xpc_teardown_ch_structures(part);
	}

	xpc_disallow_hb(partid);
	xpc_mark_partition_inactive(part);

	if (part->reason == xpReactivating) {
		/* interrupting ourselves results in activating partition */
		xpc_request_partition_reactivation(part);
	}

	return 0;
}

void
xpc_activate_partition(struct xpc_partition *part)
{
	short partid = XPC_PARTID(part);
	unsigned long irq_flags;
	struct task_struct *kthread;

	spin_lock_irqsave(&part->act_lock, irq_flags);

	DBUG_ON(part->act_state != XPC_P_AS_INACTIVE);

	part->act_state = XPC_P_AS_ACTIVATION_REQ;
	XPC_SET_REASON(part, xpCloneKThread, __LINE__);

	spin_unlock_irqrestore(&part->act_lock, irq_flags);

	kthread = kthread_run(xpc_activating, (void *)((u64)partid), "xpc%02d",
			      partid);
	if (IS_ERR(kthread)) {
		spin_lock_irqsave(&part->act_lock, irq_flags);
		part->act_state = XPC_P_AS_INACTIVE;
		XPC_SET_REASON(part, xpCloneKThreadFailed, __LINE__);
		spin_unlock_irqrestore(&part->act_lock, irq_flags);
	}
}

void
xpc_activate_kthreads(struct xpc_channel *ch, int needed)
{
	int idle = atomic_read(&ch->kthreads_idle);
	int assigned = atomic_read(&ch->kthreads_assigned);
	int wakeup;

	DBUG_ON(needed <= 0);

	if (idle > 0) {
		wakeup = (needed > idle) ? idle : needed;
		needed -= wakeup;

		dev_dbg(xpc_chan, "wakeup %d idle kthreads, partid=%d, "
			"channel=%d\n", wakeup, ch->partid, ch->number);

		/* only wakeup the requested number of kthreads */
		wake_up_nr(&ch->idle_wq, wakeup);
	}

	if (needed <= 0)
		return;

	if (needed + assigned > ch->kthreads_assigned_limit) {
		needed = ch->kthreads_assigned_limit - assigned;
		if (needed <= 0)
			return;
	}

	dev_dbg(xpc_chan, "create %d new kthreads, partid=%d, channel=%d\n",
		needed, ch->partid, ch->number);

	xpc_create_kthreads(ch, needed, 0);
}

/*
 * This function is where XPC's kthreads wait for messages to deliver.
 */
static void
xpc_kthread_waitmsgs(struct xpc_partition *part, struct xpc_channel *ch)
{
	do {
		/* deliver messages to their intended recipients */

		while (xpc_n_of_deliverable_payloads(ch) > 0 &&
		       !(ch->flags & XPC_C_DISCONNECTING)) {
			xpc_deliver_payload(ch);
		}

		if (atomic_inc_return(&ch->kthreads_idle) >
		    ch->kthreads_idle_limit) {
			/* too many idle kthreads on this channel */
			atomic_dec(&ch->kthreads_idle);
			break;
		}

		dev_dbg(xpc_chan, "idle kthread calling "
			"wait_event_interruptible_exclusive()\n");

		(void)wait_event_interruptible_exclusive(ch->idle_wq,
				(xpc_n_of_deliverable_payloads(ch) > 0 ||
				 (ch->flags & XPC_C_DISCONNECTING)));

		atomic_dec(&ch->kthreads_idle);

	} while (!(ch->flags & XPC_C_DISCONNECTING));
}

static int
xpc_kthread_start(void *args)
{
	short partid = XPC_UNPACK_ARG1(args);
	u16 ch_number = XPC_UNPACK_ARG2(args);
	struct xpc_partition *part = &xpc_partitions[partid];
	struct xpc_channel *ch;
	int n_needed;
	unsigned long irq_flags;

	dev_dbg(xpc_chan, "kthread starting, partid=%d, channel=%d\n",
		partid, ch_number);

	ch = &part->channels[ch_number];

	if (!(ch->flags & XPC_C_DISCONNECTING)) {

		/* let registerer know that connection has been established */

		spin_lock_irqsave(&ch->lock, irq_flags);
		if (!(ch->flags & XPC_C_CONNECTEDCALLOUT)) {
			ch->flags |= XPC_C_CONNECTEDCALLOUT;
			spin_unlock_irqrestore(&ch->lock, irq_flags);

			xpc_connected_callout(ch);

			spin_lock_irqsave(&ch->lock, irq_flags);
			ch->flags |= XPC_C_CONNECTEDCALLOUT_MADE;
			spin_unlock_irqrestore(&ch->lock, irq_flags);

			/*
			 * It is possible that while the callout was being
			 * made that the remote partition sent some messages.
			 * If that is the case, we may need to activate
			 * additional kthreads to help deliver them. We only
			 * need one less than total #of messages to deliver.
			 */
			n_needed = xpc_n_of_deliverable_payloads(ch) - 1;
			if (n_needed > 0 && !(ch->flags & XPC_C_DISCONNECTING))
				xpc_activate_kthreads(ch, n_needed);

		} else {
			spin_unlock_irqrestore(&ch->lock, irq_flags);
		}

		xpc_kthread_waitmsgs(part, ch);
	}

	/* let registerer know that connection is disconnecting */

	spin_lock_irqsave(&ch->lock, irq_flags);
	if ((ch->flags & XPC_C_CONNECTEDCALLOUT_MADE) &&
	    !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) {
		ch->flags |= XPC_C_DISCONNECTINGCALLOUT;
		spin_unlock_irqrestore(&ch->lock, irq_flags);

		xpc_disconnect_callout(ch, xpDisconnecting);

		spin_lock_irqsave(&ch->lock, irq_flags);
		ch->flags |= XPC_C_DISCONNECTINGCALLOUT_MADE;
	}
	spin_unlock_irqrestore(&ch->lock, irq_flags);

	if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
	    atomic_dec_return(&part->nchannels_engaged) == 0) {
		xpc_indicate_partition_disengaged(part);
	}

	xpc_msgqueue_deref(ch);

	dev_dbg(xpc_chan, "kthread exiting, partid=%d, channel=%d\n",
		partid, ch_number);

	xpc_part_deref(part);
	return 0;
}

/*
 * For each partition that XPC has established communications with, there is
 * a minimum of one kernel thread assigned to perform any operation that
 * may potentially sleep or block (basically the callouts to the asynchronous
 * functions registered via xpc_connect()).
 *
 * Additional kthreads are created and destroyed by XPC as the workload
 * demands.
 *
 * A kthread is assigned to one of the active channels that exists for a given
 * partition.
 */
void
xpc_create_kthreads(struct xpc_channel *ch, int needed,
		    int ignore_disconnecting)
{
	unsigned long irq_flags;
	u64 args = XPC_PACK_ARGS(ch->partid, ch->number);
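	/*
	 * args packs ch->partid and ch->number into a single value; it is
	 * unpacked again in xpc_kthread_start() via XPC_UNPACK_ARG1() and
	 * XPC_UNPACK_ARG2().
	 */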
	struct xpc_partition *part = &xpc_partitions[ch->partid];
	struct task_struct *kthread;

	while (needed-- > 0) {

		/*
		 * The following is done on behalf of the newly created
		 * kthread. That kthread is responsible for doing the
		 * counterpart to the following before it exits.
		 */
		if (ignore_disconnecting) {
			if (!atomic_inc_not_zero(&ch->kthreads_assigned)) {
				/* kthreads assigned had gone to zero */
				BUG_ON(!(ch->flags &
					 XPC_C_DISCONNECTINGCALLOUT_MADE));
				break;
			}

		} else if (ch->flags & XPC_C_DISCONNECTING) {
			break;

		} else if (atomic_inc_return(&ch->kthreads_assigned) == 1 &&
			   atomic_inc_return(&part->nchannels_engaged) == 1) {
				xpc_indicate_partition_engaged(part);
		}
		(void)xpc_part_ref(part);
		xpc_msgqueue_ref(ch);

		kthread = kthread_run(xpc_kthread_start, (void *)args,
				      "xpc%02dc%d", ch->partid, ch->number);
		if (IS_ERR(kthread)) {
			/* the fork failed */

			/*
			 * NOTE: if (ignore_disconnecting &&
			 * !(ch->flags & XPC_C_DISCONNECTINGCALLOUT)) is true,
			 * then we'll deadlock if all other kthreads assigned
			 * to this channel are blocked in the channel's
			 * registerer, because the only thing that will unblock
			 * them is the xpDisconnecting callout that this
			 * failed kthread_run() would have made.
			 */

			if (atomic_dec_return(&ch->kthreads_assigned) == 0 &&
			    atomic_dec_return(&part->nchannels_engaged) == 0) {
				xpc_indicate_partition_disengaged(part);
			}
			xpc_msgqueue_deref(ch);
			xpc_part_deref(part);

			if (atomic_read(&ch->kthreads_assigned) <
			    ch->kthreads_idle_limit) {
				/*
				 * Flag this as an error only if we have an
				 * insufficient #of kthreads for the channel
				 * to function.
				 */
				spin_lock_irqsave(&ch->lock, irq_flags);
				XPC_DISCONNECT_CHANNEL(ch, xpLackOfResources,
						       &irq_flags);
				spin_unlock_irqrestore(&ch->lock, irq_flags);
			}
			break;
		}
	}
}

void
xpc_disconnect_wait(int ch_number)
{
	unsigned long irq_flags;
	short partid;
	struct xpc_partition *part;
	struct xpc_channel *ch;
	int wakeup_channel_mgr;

	/* now wait for all callouts to the caller's function to cease */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (!xpc_part_ref(part))
			continue;

		ch = &part->channels[ch_number];

		if (!(ch->flags & XPC_C_WDISCONNECT)) {
			xpc_part_deref(part);
			continue;
		}

		wait_for_completion(&ch->wdisconnect_wait);

		spin_lock_irqsave(&ch->lock, irq_flags);
		DBUG_ON(!(ch->flags & XPC_C_DISCONNECTED));
		wakeup_channel_mgr = 0;

		if (ch->delayed_chctl_flags) {
			if (part->act_state != XPC_P_AS_DEACTIVATING) {
				spin_lock(&part->chctl_lock);
				part->chctl.flags[ch->number] |=
				    ch->delayed_chctl_flags;
				spin_unlock(&part->chctl_lock);
				wakeup_channel_mgr = 1;
			}
			ch->delayed_chctl_flags = 0;
		}

		ch->flags &= ~XPC_C_WDISCONNECT;
		spin_unlock_irqrestore(&ch->lock, irq_flags);

		if (wakeup_channel_mgr)
			xpc_wakeup_channel_mgr(part);

		xpc_part_deref(part);
	}
}

static int
xpc_setup_partitions(void)
{
	short partid;
	struct xpc_partition *part;

	xpc_partitions = kzalloc(sizeof(struct xpc_partition) *
				 xp_max_npartitions, GFP_KERNEL);
	if (xpc_partitions == NULL) {
		dev_err(xpc_part, "can't get memory for partition structure\n");
		return -ENOMEM;
	}

	/*
	 * The first few fields of each entry of xpc_partitions[] need to
	 * be initialized now so that calls to xpc_connect() and
	 * xpc_disconnect() can be made prior to the activation of any remote
	 * partition. NOTE THAT NONE OF THE OTHER FIELDS BELONGING TO THESE
	 * ENTRIES ARE MEANINGFUL UNTIL AFTER AN ENTRY'S CORRESPONDING
	 * PARTITION HAS BEEN ACTIVATED.
	 */
	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		DBUG_ON((u64)part != L1_CACHE_ALIGN((u64)part));

		part->activate_IRQ_rcvd = 0;
		spin_lock_init(&part->act_lock);
		part->act_state = XPC_P_AS_INACTIVE;
		XPC_SET_REASON(part, 0, 0);

		init_timer(&part->disengage_timer);
		part->disengage_timer.function =
		    xpc_timeout_partition_disengage;
		part->disengage_timer.data = (unsigned long)part;

		part->setup_state = XPC_P_SS_UNSET;
		init_waitqueue_head(&part->teardown_wq);
		atomic_set(&part->references, 0);
	}

	return xpc_setup_partitions_sn();
}

static void
xpc_teardown_partitions(void)
{
	kfree(xpc_partitions);
}

static void
xpc_do_exit(enum xp_retval reason)
{
	short partid;
	int active_part_count, printed_waiting_msg = 0;
	struct xpc_partition *part;
	unsigned long printmsg_time, disengage_timeout = 0;

	/* a 'rmmod XPC' and a 'reboot' cannot both end up here together */
	DBUG_ON(xpc_exiting == 1);

	/*
	 * Let the heartbeat checker thread and the discovery thread
	 * (if one is running) know that they should exit. Also wake up
	 * the heartbeat checker thread in case it's sleeping.
	 */
	xpc_exiting = 1;
	wake_up_interruptible(&xpc_activate_IRQ_wq);

	/* wait for the discovery thread to exit */
	wait_for_completion(&xpc_discovery_exited);

	/* wait for the heartbeat checker thread to exit */
	wait_for_completion(&xpc_hb_checker_exited);

	/* sleep for a 1/3 of a second or so */
	(void)msleep_interruptible(300);

	/* wait for all partitions to become inactive */

	printmsg_time = jiffies + (XPC_DEACTIVATE_PRINTMSG_INTERVAL * HZ);
	xpc_disengage_timedout = 0;

	do {
		active_part_count = 0;

		for (partid = 0; partid < xp_max_npartitions; partid++) {
			part = &xpc_partitions[partid];

			if (xpc_partition_disengaged(part) &&
			    part->act_state == XPC_P_AS_INACTIVE) {
				continue;
			}

			active_part_count++;

			XPC_DEACTIVATE_PARTITION(part, reason);

			if (part->disengage_timeout > disengage_timeout)
				disengage_timeout = part->disengage_timeout;
		}

		if (xpc_any_partition_engaged()) {
			if (time_is_before_jiffies(printmsg_time)) {
				dev_info(xpc_part, "waiting for remote "
					 "partitions to deactivate, timeout in "
					 "%ld seconds\n", (disengage_timeout -
					 jiffies) / HZ);
				printmsg_time = jiffies +
				    (XPC_DEACTIVATE_PRINTMSG_INTERVAL * HZ);
				printed_waiting_msg = 1;
			}

		} else if (active_part_count > 0) {
			if (printed_waiting_msg) {
				dev_info(xpc_part, "waiting for local partition"
					 " to deactivate\n");
				printed_waiting_msg = 0;
			}

		} else {
			if (!xpc_disengage_timedout) {
				dev_info(xpc_part, "all partitions have "
					 "deactivated\n");
			}
			break;
		}

		/* sleep for a 1/3 of a second or so */
		(void)msleep_interruptible(300);

	} while (1);

	DBUG_ON(xpc_any_partition_engaged());
	DBUG_ON(xpc_any_hbs_allowed() != 0);

	xpc_teardown_rsvd_page();

	if (reason == xpUnloading) {
		(void)unregister_die_notifier(&xpc_die_notifier);
		(void)unregister_reboot_notifier(&xpc_reboot_notifier);
	}

	/* clear the interface to XPC's functions */
	xpc_clear_interface();

	if (xpc_sysctl)
		unregister_sysctl_table(xpc_sysctl);

	xpc_teardown_partitions();

	if (is_shub())
		xpc_exit_sn2();
	else if (is_uv())
		xpc_exit_uv();
}

/*
 * This function is called when the system is being rebooted.
 */
static int
xpc_system_reboot(struct notifier_block *nb, unsigned long event, void *unused)
{
	enum xp_retval reason;

	switch (event) {
	case SYS_RESTART:
		reason = xpSystemReboot;
		break;
	case SYS_HALT:
		reason = xpSystemHalt;
		break;
	case SYS_POWER_OFF:
		reason = xpSystemPoweroff;
		break;
	default:
		reason = xpSystemGoingDown;
	}

	xpc_do_exit(reason);
	return NOTIFY_DONE;
}

/*
 * Notify other partitions to deactivate from us by first disengaging from all
 * references to our memory.
 */
static void
xpc_die_deactivate(void)
{
	struct xpc_partition *part;
	short partid;
	int any_engaged;
	long keep_waiting;
	long wait_to_print;

	/* keep xpc_hb_checker thread from doing anything (just in case) */
	xpc_exiting = 1;

	xpc_disallow_all_hbs();	/* indicate we're deactivated */

	for (partid = 0; partid < xp_max_npartitions; partid++) {
		part = &xpc_partitions[partid];

		if (xpc_partition_engaged(partid) ||
		    part->act_state != XPC_P_AS_INACTIVE) {
			xpc_request_partition_deactivation(part);
			xpc_indicate_partition_disengaged(part);
		}
	}

	/*
	 * Though we requested that all other partitions deactivate from us,
	 * we only wait until they've all disengaged or we've reached the
	 * defined timelimit.
	 *
	 * Given that one iteration through the following while-loop takes
	 * approximately 200 microseconds, calculate the #of loops to take
	 * before bailing and the #of loops before printing a waiting message.
	 */
	keep_waiting = xpc_disengage_timelimit * 1000 * 5;
	wait_to_print = XPC_DEACTIVATE_PRINTMSG_INTERVAL * 1000 * 5;
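	/* 1 second / 200 us per loop = 5000 loops, hence the "* 1000 * 5" */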

	while (1) {
		any_engaged = xpc_any_partition_engaged();
		if (!any_engaged) {
			dev_info(xpc_part, "all partitions have deactivated\n");
			break;
		}

		if (!keep_waiting--) {
			for (partid = 0; partid < xp_max_npartitions;
			     partid++) {
				if (xpc_partition_engaged(partid)) {
					dev_info(xpc_part, "deactivate from "
						 "remote partition %d timed "
						 "out\n", partid);
				}
			}
			break;
		}

		if (!wait_to_print--) {
			dev_info(xpc_part, "waiting for remote partitions to "
				 "deactivate, timeout in %ld seconds\n",
				 keep_waiting / (1000 * 5));
			wait_to_print = XPC_DEACTIVATE_PRINTMSG_INTERVAL *
			    1000 * 5;
		}

		udelay(200);
	}
}

/*
 * This function is called when the system is being restarted or halted due
 * to some sort of system failure. If this is the case we need to notify the
 * other partitions to disengage from all references to our memory.
 * This function can also be called when our heartbeater could be offlined
 * for a time. In this case we need to notify other partitions to not worry
 * about the lack of a heartbeat.
 */
static int
xpc_system_die(struct notifier_block *nb, unsigned long event, void *unused)
{
#ifdef CONFIG_IA64		/* !!! temporary kludge */
	switch (event) {
	case DIE_MACHINE_RESTART:
	case DIE_MACHINE_HALT:
		xpc_die_deactivate();
		break;

	case DIE_KDEBUG_ENTER:
		/* Should lack of heartbeat be ignored by other partitions? */
		if (!xpc_kdebug_ignore)
			break;

		/* fall through */
	case DIE_MCA_MONARCH_ENTER:
	case DIE_INIT_MONARCH_ENTER:
		xpc_offline_heartbeat();
		break;

	case DIE_KDEBUG_LEAVE:
		/* Is lack of heartbeat being ignored by other partitions? */
		if (!xpc_kdebug_ignore)
			break;

		/* fall through */
	case DIE_MCA_MONARCH_LEAVE:
	case DIE_INIT_MONARCH_LEAVE:
		xpc_online_heartbeat();
		break;
	}
#else
	xpc_die_deactivate();
#endif

	return NOTIFY_DONE;
}

int __init
xpc_init(void)
{
	int ret;
	struct task_struct *kthread;

	snprintf(xpc_part->bus_id, BUS_ID_SIZE, "part");
	snprintf(xpc_chan->bus_id, BUS_ID_SIZE, "chan");

	if (is_shub()) {
		/*
		 * The ia64-sn2 architecture supports at most 64 partitions.
		 * And the inability to unregister remote amos restricts us
		 * further to only support exactly 64 partitions on this
		 * architecture, no less.
		 */
		if (xp_max_npartitions != 64) {
			dev_err(xpc_part, "max #of partitions not set to 64\n");
			ret = -EINVAL;
		} else {
			ret = xpc_init_sn2();
		}

	} else if (is_uv()) {
		ret = xpc_init_uv();

	} else {
		ret = -ENODEV;
	}

	if (ret != 0)
		return ret;

	ret = xpc_setup_partitions();
	if (ret != 0) {
		dev_err(xpc_part, "can't get memory for partition structure\n");
		goto out_1;
	}

	xpc_sysctl = register_sysctl_table(xpc_sys_dir);

	/*
	 * Fill the partition reserved page with the information needed by
	 * other partitions to discover we are alive and establish initial
	 * communications.
	 */
	ret = xpc_setup_rsvd_page();
	if (ret != 0) {
		dev_err(xpc_part, "can't setup our reserved page\n");
		goto out_2;
	}

	/* add ourselves to the reboot_notifier_list */
	ret = register_reboot_notifier(&xpc_reboot_notifier);
	if (ret != 0)
		dev_warn(xpc_part, "can't register reboot notifier\n");

	/* add ourselves to the die_notifier list */
	ret = register_die_notifier(&xpc_die_notifier);
	if (ret != 0)
		dev_warn(xpc_part, "can't register die notifier\n");

	/*
	 * The real work-horse behind xpc.  This processes incoming
	 * interrupts and monitors remote heartbeats.
	 */
	kthread = kthread_run(xpc_hb_checker, NULL, XPC_HB_CHECK_THREAD_NAME);
	if (IS_ERR(kthread)) {
		dev_err(xpc_part, "failed while forking hb check thread\n");
		ret = -EBUSY;
		goto out_3;
	}

	/*
	 * Startup a thread that will attempt to discover other partitions to
	 * activate based on info provided by SAL. This new thread is short
	 * lived and will exit once discovery is complete.
	 */
	kthread = kthread_run(xpc_initiate_discovery, NULL,
			      XPC_DISCOVERY_THREAD_NAME);
	if (IS_ERR(kthread)) {
		dev_err(xpc_part, "failed while forking discovery thread\n");

		/* mark this new thread as a non-starter */
		complete(&xpc_discovery_exited);

		xpc_do_exit(xpUnloading);
		return -EBUSY;
	}

	/* set the interface to point at XPC's functions */
	xpc_set_interface(xpc_initiate_connect, xpc_initiate_disconnect,
			  xpc_initiate_send, xpc_initiate_send_notify,
			  xpc_initiate_received, xpc_initiate_partid_to_nasids);

	return 0;

	/* initialization was not successful */
out_3:
	xpc_teardown_rsvd_page();

	(void)unregister_die_notifier(&xpc_die_notifier);
	(void)unregister_reboot_notifier(&xpc_reboot_notifier);
out_2:
	if (xpc_sysctl)
		unregister_sysctl_table(xpc_sysctl);

	xpc_teardown_partitions();
out_1:
	if (is_shub())
		xpc_exit_sn2();
	else if (is_uv())
		xpc_exit_uv();
	return ret;
}

module_init(xpc_init);

void __exit
xpc_exit(void)
{
	xpc_do_exit(xpUnloading);
}

module_exit(xpc_exit);

MODULE_AUTHOR("Silicon Graphics, Inc.");
MODULE_DESCRIPTION("Cross Partition Communication (XPC) support");
MODULE_LICENSE("GPL");

module_param(xpc_hb_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_interval, "Number of seconds between "
		 "heartbeat increments.");

module_param(xpc_hb_check_interval, int, 0);
MODULE_PARM_DESC(xpc_hb_check_interval, "Number of seconds between "
		 "heartbeat checks.");

module_param(xpc_disengage_timelimit, int, 0);
MODULE_PARM_DESC(xpc_disengage_timelimit, "Number of seconds to wait "
		 "for disengage to complete.");

module_param(xpc_kdebug_ignore, int, 0);
MODULE_PARM_DESC(xpc_kdebug_ignore, "Should lack of heartbeat be ignored by "
		 "other partitions when dropping into kdebug.");