/*
 * net/tipc/subscr.c: TIPC subscription service
 *
 * Copyright (c) 2000-2006, Ericsson AB
 * Copyright (c) 2005, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "dbg.h"
#include "subscr.h"
#include "name_table.h"
#include "ref.h"

/**
 * struct subscriber - TIPC network topology subscriber
 * @ref: object reference to subscriber object itself
 * @lock: pointer to spinlock controlling access to subscriber object
 * @subscriber_list: adjacent subscribers in top. server's list of subscribers
 * @subscription_list: list of subscription objects for this subscriber
 * @port_ref: object reference to port used to communicate with subscriber
 */

struct subscriber {
	u32 ref;
54
	spinlock_t *lock;
P
Per Liden 已提交
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
	struct list_head subscriber_list;
	struct list_head subscription_list;
	u32 port_ref;
};

/**
 * struct top_srv - TIPC network topology subscription service
 * @user_ref: TIPC userid of subscription service
 * @setup_port: reference to TIPC port that handles subscription requests
 * @subscription_count: number of active subscriptions (not subscribers!)
 * @subscriber_list: list of ports subscribing to service
 * @lock: spinlock governing access to subscriber list
 */

struct top_srv {
	u32 user_ref;				/* TIPC userid of the service */
	u32 setup_port;				/* port receiving subscription requests */
	atomic_t subscription_count;		/* active subscriptions, not subscribers */
	struct list_head subscriber_list;	/* all current subscribers */
	spinlock_t lock;			/* guards subscriber_list */
};

/* Single file-scope instance of the topology service; re-initialised in tipc_subscr_start() */
static struct top_srv topsrv = { 0 };

/**
 * htohl - convert value to endianness used by destination
 * @in: value to convert
 * @swap: non-zero if endianness must be reversed
 *
 * Returns converted value
 */

static u32 htohl(u32 in, int swap)
{
	/*
	 * Use the public swab32() helper rather than the internal
	 * ___constant_swab32() macro, which is meant for constant expressions.
	 */
	return swap ? swab32(in) : in;
}

/**
 * subscr_send_event - send a message containing a tipc_event to the subscriber
 */

static void subscr_send_event(struct subscription *sub,
			      u32 found_lower,
			      u32 found_upper,
			      u32 event,
			      u32 port_ref,
			      u32 node)
{
	struct iovec msg_sect;

	/* The event record embedded in the subscription is the message body */

	msg_sect.iov_base = (void *)&sub->evt;
	msg_sect.iov_len = sizeof(struct tipc_event);

	/* Fill in the event fields, byte-swapped if the subscription asks for it */

	sub->evt.event = htohl(event, sub->swap);
	sub->evt.found_lower = htohl(found_lower, sub->swap);
	sub->evt.found_upper = htohl(found_upper, sub->swap);
	sub->evt.port.ref = htohl(port_ref, sub->swap);
	sub->evt.port.node = htohl(node, sub->swap);

	/* Send the single-section message on the owning subscriber's port */

	tipc_send(sub->owner->port_ref, 1, &msg_sect);
}

/**
 * tipc_subscr_overlap - test for subscription overlap with the given values
 *
 * Returns 1 if there is overlap, otherwise 0.
 */

int tipc_subscr_overlap(struct subscription *sub,
			u32 found_lower,
			u32 found_upper)
{
	/* Clamp the found range to the subscribed range ... */

	if (found_lower < sub->seq.lower)
		found_lower = sub->seq.lower;
	if (found_upper > sub->seq.upper)
		found_upper = sub->seq.upper;

	/* ... the ranges overlap iff the clamped range is non-empty */

	return found_lower <= found_upper;
}

/**
 * tipc_subscr_report_overlap - issue event if there is subscription overlap
 *
 * Protected by nameseq.lock in name_table.c
 */

void tipc_subscr_report_overlap(struct subscription *sub,
				u32 found_lower,
				u32 found_upper,
				u32 event,
				u32 port_ref,
				u32 node,
				int must)
{
	dbg("Rep overlap %u:%u,%u<->%u,%u\n", sub->seq.type, sub->seq.lower,
	    sub->seq.upper, found_lower, found_upper);

	/* Nothing to report if the ranges do not intersect */

	if (!tipc_subscr_overlap(sub, found_lower, found_upper))
		return;

	/* Unless forced by 'must', only TIPC_SUB_PORTS subscriptions get the event */

	if (!must && !(sub->filter & TIPC_SUB_PORTS))
		return;

	sub->event_cb(sub, found_lower, found_upper, event, port_ref, node);
}

/**
 * subscr_timeout - subscription timeout has occurred
 */

static void subscr_timeout(struct subscription *sub)
{
	struct subscriber *subscriber;
	u32 subscriber_ref;

	/* Validate subscriber reference (in case subscriber is terminating) */

	subscriber_ref = sub->owner->ref;
172
	subscriber = (struct subscriber *)tipc_ref_lock(subscriber_ref);
P
Per Liden 已提交
173 174 175
	if (subscriber == NULL)
		return;

176 177 178 179 180 181 182
	/* Validate timeout (in case subscription is being cancelled) */

	if (sub->timeout == TIPC_WAIT_FOREVER) {
		tipc_ref_unlock(subscriber_ref);
		return;
	}

P
Per Liden 已提交
183 184
	/* Unlink subscription from name table */

185
	tipc_nametbl_unsubscribe(sub);
P
Per Liden 已提交
186 187 188

	/* Notify subscriber of timeout, then unlink subscription */

189 190
	subscr_send_event(sub,
			  sub->evt.s.seq.lower,
P
Per Liden 已提交
191
			  sub->evt.s.seq.upper,
192 193
			  TIPC_SUBSCR_TIMEOUT,
			  0,
P
Per Liden 已提交
194 195 196 197 198
			  0);
	list_del(&sub->subscription_list);

	/* Now destroy subscription */

199
	tipc_ref_unlock(subscriber_ref);
P
Per Liden 已提交
200 201 202 203 204
	k_term_timer(&sub->timer);
	kfree(sub);
	atomic_dec(&topsrv.subscription_count);
}

/**
 * subscr_del - delete a subscription within a subscription list
 *
 * Called with subscriber locked.
 */

static void subscr_del(struct subscription *sub)
{
	/* Detach from the name table and from the owner's list before freeing */
	tipc_nametbl_unsubscribe(sub);
	list_del(&sub->subscription_list);
	kfree(sub);
	/* Keep the global subscription count in step with the deletion */
	atomic_dec(&topsrv.subscription_count);
}

/**
 * subscr_terminate - terminate communication with a subscriber
 *
 * Called with subscriber locked.  Routine must temporarily release this lock
 * to enable subscription timeout routine(s) to finish without deadlocking;
 * the lock is then reclaimed to allow caller to release it upon return.
 * (This should work even in the unlikely event some other thread creates
 * a new object reference in the interim that uses this lock; this routine will
 * simply wait for it to be released, then claim it.)
 */

static void subscr_terminate(struct subscriber *subscriber)
{
	struct subscription *sub;
	struct subscription *sub_temp;

	/* Invalidate subscriber reference */

237
	tipc_ref_discard(subscriber->ref);
P
Per Liden 已提交
238 239 240
	spin_unlock_bh(subscriber->lock);

	/* Destroy any existing subscriptions for subscriber */
241

P
Per Liden 已提交
242 243 244 245 246 247
	list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
				 subscription_list) {
		if (sub->timeout != TIPC_WAIT_FOREVER) {
			k_cancel_timer(&sub->timer);
			k_term_timer(&sub->timer);
		}
248
		dbg("Term: Removing sub %u,%u,%u from subscriber %x list\n",
P
Per Liden 已提交
249
		    sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
250
		subscr_del(sub);
P
Per Liden 已提交
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
	}

	/* Sever connection to subscriber */

	tipc_shutdown(subscriber->port_ref);
	tipc_deleteport(subscriber->port_ref);

	/* Remove subscriber from topology server's subscriber list */

	spin_lock_bh(&topsrv.lock);
	list_del(&subscriber->subscriber_list);
	spin_unlock_bh(&topsrv.lock);

	/* Now destroy subscriber */

	spin_lock_bh(subscriber->lock);
	kfree(subscriber);
}

/**
 * subscr_cancel - handle subscription cancellation request
 *
 * Called with subscriber locked.  Routine must temporarily release this lock
 * to enable the subscription timeout routine to finish without deadlocking;
 * the lock is then reclaimed to allow caller to release it upon return.
 *
 * Note that fields of 's' use subscriber's endianness!
 */

static void subscr_cancel(struct tipc_subscr *s,
			  struct subscriber *subscriber)
{
	struct subscription *sub;
	int found = 0;

	/*
	 * Find first matching subscription, exit if not found.
	 * Nothing is removed while searching, so the plain iterator suffices.
	 */

	list_for_each_entry(sub, &subscriber->subscription_list,
			    subscription_list) {
		if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
			found = 1;
			break;
		}
	}
	if (!found)
		return;

	/* Cancel subscription timer (if used), then delete subscription */

	if (sub->timeout != TIPC_WAIT_FOREVER) {
		/* Marks the subscription as cancelled for subscr_timeout() */
		sub->timeout = TIPC_WAIT_FOREVER;
		spin_unlock_bh(subscriber->lock);
		k_cancel_timer(&sub->timer);
		k_term_timer(&sub->timer);
		spin_lock_bh(subscriber->lock);
	}
	/* The subscriber argument is a pointer, so print it with %p */
	dbg("Cancel: removing sub %u,%u,%u from subscriber %p list\n",
	    sub->seq.type, sub->seq.lower, sub->seq.upper, subscriber);
	subscr_del(sub);
}

/**
 * subscr_subscribe - create subscription for subscriber
 *
 * Called with subscriber locked
 */

static void subscr_subscribe(struct tipc_subscr *s,
			     struct subscriber *subscriber)
{
	struct subscription *sub;
323
	int swap;
P
Per Liden 已提交
324

325
	/* Determine subscriber's endianness */
326

327
	swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
328 329 330

	/* Detect & process a subscription cancellation request */

331 332
	if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
		s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
333 334 335 336
		subscr_cancel(s, subscriber);
		return;
	}

P
Per Liden 已提交
337 338 339
	/* Refuse subscription if global limit exceeded */

	if (atomic_read(&topsrv.subscription_count) >= tipc_max_subscriptions) {
340 341
		warn("Subscription rejected, subscription limit reached (%u)\n",
		     tipc_max_subscriptions);
P
Per Liden 已提交
342 343 344 345 346 347
		subscr_terminate(subscriber);
		return;
	}

	/* Allocate subscription object */

348
	sub = kzalloc(sizeof(*sub), GFP_ATOMIC);
349 350
	if (!sub) {
		warn("Subscription rejected, no memory\n");
P
Per Liden 已提交
351 352 353 354 355 356
		subscr_terminate(subscriber);
		return;
	}

	/* Initialize subscription object */

357 358 359 360 361
	sub->seq.type = htohl(s->seq.type, swap);
	sub->seq.lower = htohl(s->seq.lower, swap);
	sub->seq.upper = htohl(s->seq.upper, swap);
	sub->timeout = htohl(s->timeout, swap);
	sub->filter = htohl(s->filter, swap);
362 363
	if ((!(sub->filter & TIPC_SUB_PORTS)
	     == !(sub->filter & TIPC_SUB_SERVICE))
P
Per Liden 已提交
364
	    || (sub->seq.lower > sub->seq.upper)) {
365
		warn("Subscription rejected, illegal request\n");
P
Per Liden 已提交
366 367 368 369
		kfree(sub);
		subscr_terminate(subscriber);
		return;
	}
370
	sub->event_cb = subscr_send_event;
P
Per Liden 已提交
371 372 373 374
	memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
	INIT_LIST_HEAD(&sub->subscription_list);
	INIT_LIST_HEAD(&sub->nameseq_list);
	list_add(&sub->subscription_list, &subscriber->subscription_list);
375
	sub->swap = swap;
P
Per Liden 已提交
376 377 378 379 380 381 382
	atomic_inc(&topsrv.subscription_count);
	if (sub->timeout != TIPC_WAIT_FOREVER) {
		k_init_timer(&sub->timer,
			     (Handler)subscr_timeout, (unsigned long)sub);
		k_start_timer(&sub->timer, sub->timeout);
	}
	sub->owner = subscriber;
383
	tipc_nametbl_subscribe(sub);
P
Per Liden 已提交
384 385 386 387 388 389 390 391 392 393 394 395 396
}

/**
 * subscr_conn_shutdown_event - handle termination request from subscriber
 */

static void subscr_conn_shutdown_event(void *usr_handle,
				       u32 portref,
				       struct sk_buff **buf,
				       unsigned char const *data,
				       unsigned int size,
				       int reason)
{
	struct subscriber *subscriber;
	spinlock_t *subscriber_lock;

	/* usr_handle carries the subscriber's object reference */

	subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
	if (subscriber == NULL)
		return;

	/*
	 * Save the lock pointer first: subscr_terminate() frees the
	 * subscriber but returns with the lock held
	 */

	subscriber_lock = subscriber->lock;
	subscr_terminate(subscriber);
	spin_unlock_bh(subscriber_lock);
}

/**
 * subscr_conn_msg_event - handle new subscription request from subscriber
 */

static void subscr_conn_msg_event(void *usr_handle,
				  u32 port_ref,
				  struct sk_buff **buf,
				  const unchar *data,
				  u32 size)
{
	struct subscriber *subscriber;
	spinlock_t *subscriber_lock;

	/* usr_handle carries the subscriber's object reference */

	subscriber = tipc_ref_lock((u32)(unsigned long)usr_handle);
	if (subscriber == NULL)
		return;

	/*
	 * Save the lock pointer first: the subscriber may be freed by
	 * subscr_terminate(), directly or via subscr_subscribe()
	 */

	subscriber_lock = subscriber->lock;

	/* A message of the wrong size terminates the subscriber */

	if (size != sizeof(struct tipc_subscr))
		subscr_terminate(subscriber);
	else
		subscr_subscribe((struct tipc_subscr *)data, subscriber);

	spin_unlock_bh(subscriber_lock);
}

/**
 * subscr_named_msg_event - handle request to establish a new subscriber
 */

static void subscr_named_msg_event(void *usr_handle,
				   u32 port_ref,
				   struct sk_buff **buf,
				   const unchar *data,
				   u32 size,
				   u32 importance,
				   struct tipc_portid const *orig,
				   struct tipc_name_seq const *dest)
{
	struct subscriber *subscriber;
	struct iovec msg_sect = {NULL, 0};
	spinlock_t *subscriber_lock;

	dbg("subscr_named_msg_event: orig = %x own = %x,\n",
	    orig->node, tipc_own_addr);

	/* Accept only an empty message or exactly one subscription request */

	if (size && (size != sizeof(struct tipc_subscr))) {
		warn("Subscriber rejected, invalid subscription size\n");
		return;
	}

	/* Create subscriber object */

	subscriber = kzalloc(sizeof(struct subscriber), GFP_ATOMIC);
	if (subscriber == NULL) {
		warn("Subscriber rejected, no memory\n");
		return;
	}
	INIT_LIST_HEAD(&subscriber->subscription_list);
	INIT_LIST_HEAD(&subscriber->subscriber_list);
	subscriber->ref = tipc_ref_acquire(subscriber, &subscriber->lock);
	if (subscriber->ref == 0) {
		warn("Subscriber rejected, reference table exhausted\n");
		kfree(subscriber);
		return;
	}
	/* NOTE(review): presumably tipc_ref_acquire() returns with the lock held */
	spin_unlock_bh(subscriber->lock);

	/* Establish a connection to subscriber */

	tipc_createport(topsrv.user_ref,
			(void *)(unsigned long)subscriber->ref,
			importance,
			NULL,
			NULL,
			subscr_conn_shutdown_event,
			NULL,
			NULL,
			subscr_conn_msg_event,
			NULL,
			&subscriber->port_ref);
	/* port_ref is still zero (from kzalloc) if no port was created */
	if (subscriber->port_ref == 0) {
		warn("Subscriber rejected, unable to create port\n");
		tipc_ref_discard(subscriber->ref);
		kfree(subscriber);
		return;
	}
	tipc_connect2port(subscriber->port_ref, orig);

	/* Add subscriber to topology server's subscriber list */

	tipc_ref_lock(subscriber->ref);
	spin_lock_bh(&topsrv.lock);
	list_add(&subscriber->subscriber_list, &topsrv.subscriber_list);
	spin_unlock_bh(&topsrv.lock);

	/*
	 * Subscribe now if message contains a subscription,
	 * otherwise send an empty response to complete connection handshaking
	 */

	/* Saved because subscr_subscribe() may free the subscriber */
	subscriber_lock = subscriber->lock;
	if (size)
		subscr_subscribe((struct tipc_subscr *)data, subscriber);
	else
		tipc_send(subscriber->port_ref, 1, &msg_sect);

	spin_unlock_bh(subscriber_lock);
}

519
int tipc_subscr_start(void)
P
Per Liden 已提交
520 521 522 523 524
{
	struct tipc_name_seq seq = {TIPC_TOP_SRV, TIPC_TOP_SRV, TIPC_TOP_SRV};
	int res = -1;

	memset(&topsrv, 0, sizeof (topsrv));
I
Ingo Molnar 已提交
525
	spin_lock_init(&topsrv.lock);
P
Per Liden 已提交
526 527 528
	INIT_LIST_HEAD(&topsrv.subscriber_list);

	spin_lock_bh(&topsrv.lock);
529
	res = tipc_attach(&topsrv.user_ref, NULL, NULL);
P
Per Liden 已提交
530 531 532 533 534
	if (res) {
		spin_unlock_bh(&topsrv.lock);
		return res;
	}

535 536 537 538 539 540 541 542 543 544 545 546
	res = tipc_createport(topsrv.user_ref,
			      NULL,
			      TIPC_CRITICAL_IMPORTANCE,
			      NULL,
			      NULL,
			      NULL,
			      NULL,
			      subscr_named_msg_event,
			      NULL,
			      NULL,
			      &topsrv.setup_port);
	if (res)
P
Per Liden 已提交
547 548
		goto failed;

549 550
	res = tipc_nametbl_publish_rsv(topsrv.setup_port, TIPC_NODE_SCOPE, &seq);
	if (res)
P
Per Liden 已提交
551 552 553 554 555 556
		goto failed;

	spin_unlock_bh(&topsrv.lock);
	return 0;

failed:
557
	err("Failed to create subscription service\n");
P
Per Liden 已提交
558 559 560 561 562 563
	tipc_detach(topsrv.user_ref);
	topsrv.user_ref = 0;
	spin_unlock_bh(&topsrv.lock);
	return res;
}

564
void tipc_subscr_stop(void)
P
Per Liden 已提交
565 566 567 568 569 570 571
{
	struct subscriber *subscriber;
	struct subscriber *subscriber_temp;
	spinlock_t *subscriber_lock;

	if (topsrv.user_ref) {
		tipc_deleteport(topsrv.setup_port);
572
		list_for_each_entry_safe(subscriber, subscriber_temp,
P
Per Liden 已提交
573 574
					 &topsrv.subscriber_list,
					 subscriber_list) {
575
			tipc_ref_lock(subscriber->ref);
P
Per Liden 已提交
576 577 578 579 580 581 582 583 584 585 586 587 588 589
			subscriber_lock = subscriber->lock;
			subscr_terminate(subscriber);
			spin_unlock_bh(subscriber_lock);
		}
		tipc_detach(topsrv.user_ref);
		topsrv.user_ref = 0;
	}
}


int tipc_ispublished(struct tipc_name const *name)
{
	u32 domain = 0;

590
	return(tipc_nametbl_translate(name->type, name->instance,&domain) != 0);
P
Per Liden 已提交
591 592
}