bcast.c 30.0 KB
Newer Older
P
Per Liden 已提交
1 2
/*
 * net/tipc/bcast.c: TIPC broadcast code
3
 *
4
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
P
Per Liden 已提交
5
 * Copyright (c) 2004, Intel Corporation.
6
 * Copyright (c) 2005, 2010-2011, Wind River Systems
P
Per Liden 已提交
7 8
 * All rights reserved.
 *
P
Per Liden 已提交
9
 * Redistribution and use in source and binary forms, with or without
P
Per Liden 已提交
10 11
 * modification, are permitted provided that the following conditions are met:
 *
P
Per Liden 已提交
12 13 14 15 16 17 18 19
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
P
Per Liden 已提交
20
 *
P
Per Liden 已提交
21 22 23 24 25 26 27 28 29 30 31 32 33 34
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
P
Per Liden 已提交
35 36 37
 * POSSIBILITY OF SUCH DAMAGE.
 */

38
#include <linux/tipc_config.h>
39 40
#include "socket.h"
#include "msg.h"
P
Per Liden 已提交
41
#include "bcast.h"
42
#include "name_distr.h"
43 44
#include "link.h"
#include "node.h"
P
Per Liden 已提交
45

/* Broadcast link tunables */
#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define	BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */
#define	BCLINK_WIN_MIN	        32	/* bcast minimum link window size */

/* Well-known name of the cluster-wide broadcast link */
const char tipc_bclink_name[] = "broadcast-link";

/**
 * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
 * @primary: pointer to primary bearer
 * @secondary: pointer to secondary bearer
 *
 * Bearers must have same priority and same set of reachable destinations
 * to be paired.
 */
struct tipc_bcbearer_pair {
	struct tipc_bearer *primary;
	struct tipc_bearer *secondary;
};

#define	BCBEARER		MAX_BEARERS

/**
 * struct tipc_bcbearer - bearer used by broadcast link
 * @bearer: (non-standard) broadcast bearer structure
 * @media: (non-standard) broadcast media structure
 * @bpairs: array of bearer pairs
 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
 * @remains: temporary node map used by tipc_bcbearer_send()
 * @remains_new: temporary node map used tipc_bcbearer_send()
 *
 * Note: The fields labelled "temporary" are incorporated into the bearer
 * to avoid consuming potentially limited stack space through the use of
 * large local variables within multicast routines.  Concurrent access is
 * prevented through use of the spinlock "bcast_lock".
 */
struct tipc_bcbearer {
	struct tipc_bearer bearer;
	struct tipc_media media;
	struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
	struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
	struct tipc_node_map remains;
	struct tipc_node_map remains_new;
};

/**
 * struct tipc_bc_base - link used for broadcast messages
 * @lock: spinlock governing access to structure
 * @link: (non-standard) broadcast link structure
 * @node: (non-standard) node structure representing b'cast link's peer node
 * @arrvq: queue of arriving multicast buffers awaiting delivery
 * @inputq: queue of buffers ready for socket-level delivery
 * @bcast_nodes: map of broadcast-capable nodes
 * @retransmit_to: node that most recently requested a retransmit
 *
 * Handles sequence numbering, fragmentation, bundling, etc.
 */
struct tipc_bc_base {
	spinlock_t lock; /* spinlock protecting broadcast structs */
	struct tipc_link link;
	struct tipc_node node;
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;
	struct tipc_node_map bcast_nodes;
	struct tipc_node *retransmit_to;
};

/**
 * tipc_nmap_equal - test for equality of node maps
 */
static int tipc_nmap_equal(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b)
{
	return !memcmp(nm_a, nm_b, sizeof(*nm_a));
}

/* Node-map helpers defined later in this file */
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
125
static void tipc_bclink_lock(struct net *net)
126
{
127 128
	struct tipc_net *tn = net_generic(net, tipc_net_id);

129
	spin_lock_bh(&tn->bcbase->lock);
130 131
}

132
static void tipc_bclink_unlock(struct net *net)
133
{
134
	struct tipc_net *tn = net_generic(net, tipc_net_id);
135

136
	spin_unlock_bh(&tn->bcbase->lock);
137 138
}

139 140 141 142
void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

143
	tipc_sk_mcast_rcv(net, &tn->bcbase->arrvq, &tn->bcbase->inputq);
144 145
}

146
uint  tipc_bcast_get_mtu(void)
147 148 149 150
{
	return MAX_PKT_DEFAULT_MCAST;
}

S
Sam Ravnborg 已提交
151
static u32 bcbuf_acks(struct sk_buff *buf)
P
Per Liden 已提交
152
{
153
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
P
Per Liden 已提交
154 155
}

S
Sam Ravnborg 已提交
156
static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
P
Per Liden 已提交
157
{
158
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
P
Per Liden 已提交
159 160
}

/* bcbuf_decr_acks - one destination node has acknowledged this buffer */
static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

166
void tipc_bclink_add_node(struct net *net, u32 addr)
167
{
168 169 170
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
171
	tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
172
	tipc_bclink_unlock(net);
173 174
}

175
void tipc_bclink_remove_node(struct net *net, u32 addr)
176
{
177 178 179
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
180
	tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
181 182

	/* Last node? => reset backlog queue */
183 184
	if (!tn->bcbase->bcast_nodes.count)
		tipc_link_purge_backlog(&tn->bcbase->link);
185

186
	tipc_bclink_unlock(net);
187
}
P
Per Liden 已提交
188

189
static void bclink_set_last_sent(struct net *net)
190
{
191 192 193
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

194
	bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
195 196
}

197
u32 tipc_bclink_get_last_sent(struct net *net)
198
{
199 200
	struct tipc_net *tn = net_generic(net, tipc_net_id);

201
	return tn->bcl->silent_intv_cnt;
202 203
}

204
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
P
Per Liden 已提交
205
{
206 207
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
						seqno : node->bclink.last_sent;
P
Per Liden 已提交
208 209
}

210
/**
211 212
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
213
 * Called with bclink_lock locked
214
 */
215
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
216
{
217 218
	struct tipc_net *tn = net_generic(net, tipc_net_id);

219
	return tn->bcbase->retransmit_to;
220 221
}

222
/**
P
Per Liden 已提交
223 224 225
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
226
 *
227
 * Called with bclink_lock locked
P
Per Liden 已提交
228
 */
229
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
P
Per Liden 已提交
230
{
231
	struct sk_buff *skb;
232
	struct tipc_link *bcl = tn->bcl;
P
Per Liden 已提交
233

J
Jon Paul Maloy 已提交
234
	skb_queue_walk(&bcl->transmq, skb) {
235 236
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
237
			break;
238
		}
239
	}
P
Per Liden 已提交
240 241
}

242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265
/**
 * bclink_prepare_wakeup - prepare users for wakeup after congestion
 * @bcl: broadcast link
 * @resultq: queue for users which can be woken up
 * Move a number of waiting users, as permitted by available space in
 * the send queue, from link wait queue to specified queue for wakeup
 */
static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
{
	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
	int imp, lim;
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
		imp = TIPC_SKB_CB(skb)->chain_imp;
		lim = bcl->window + bcl->backlog[imp].limit;
		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
		if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
			continue;
		skb_unlink(skb, &bcl->wakeupq);
		skb_queue_tail(resultq, skb);
	}
}

266 267 268 269 270
/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
271
void tipc_bclink_wakeup_users(struct net *net)
272
{
273
	struct tipc_net *tn = net_generic(net, tipc_net_id);
274 275
	struct tipc_link *bcl = tn->bcl;
	struct sk_buff_head resultq;
276

277 278 279
	skb_queue_head_init(&resultq);
	bclink_prepare_wakeup(bcl, &resultq);
	tipc_sk_rcv(net, &resultq);
280 281
}

282
/**
283
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
P
Per Liden 已提交
284 285
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
286
 *
287
 * Node is locked, bclink_lock unlocked.
P
Per Liden 已提交
288
 */
289
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
P
Per Liden 已提交
290
{
291
	struct sk_buff *skb, *tmp;
P
Per Liden 已提交
292
	unsigned int released = 0;
293 294
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);
P
Per Liden 已提交
295

296 297 298
	if (unlikely(!n_ptr->bclink.recv_permitted))
		return;

299
	tipc_bclink_lock(net);
300

301
	/* Bail out if tx queue is empty (no clean up is required) */
J
Jon Paul Maloy 已提交
302
	skb = skb_peek(&tn->bcl->transmq);
303
	if (!skb)
304 305 306 307 308 309 310 311 312
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
313
		if (tn->bcbase->bcast_nodes.count)
314
			acked = tn->bcl->silent_intv_cnt;
315
		else
316
			acked = tn->bcl->snd_nxt;
317 318 319 320 321
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
322
		if (less(acked, buf_seqno(skb)) ||
323
		    less(tn->bcl->silent_intv_cnt, acked) ||
324 325 326 327 328
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
J
Jon Paul Maloy 已提交
329
	skb_queue_walk(&tn->bcl->transmq, skb) {
330 331 332
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}
P
Per Liden 已提交
333 334

	/* Update packets that node is now acknowledging */
J
Jon Paul Maloy 已提交
335
	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
336 337
		if (more(buf_seqno(skb), acked))
			break;
J
Jon Paul Maloy 已提交
338 339
		bcbuf_decr_acks(skb);
		bclink_set_last_sent(net);
340
		if (bcbuf_acks(skb) == 0) {
J
Jon Paul Maloy 已提交
341
			__skb_unlink(skb, &tn->bcl->transmq);
342
			kfree_skb(skb);
P
Per Liden 已提交
343 344 345 346 347 348
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
J
Jon Paul Maloy 已提交
349
	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
350 351
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
352
	}
353
	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
354
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
355
exit:
356
	tipc_bclink_unlock(net);
P
Per Liden 已提交
357 358
}

359
/**
360
 * tipc_bclink_update_link_state - update broadcast link state
361
 *
Y
Ying Xue 已提交
362
 * RCU and node lock set
P
Per Liden 已提交
363
 */
364
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
365
				   u32 last_sent)
P
Per Liden 已提交
366
{
367
	struct sk_buff *buf;
368
	struct net *net = n_ptr->net;
369
	struct tipc_net *tn = net_generic(net, tipc_net_id);
P
Per Liden 已提交
370

371 372 373
	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;
P
Per Liden 已提交
374

375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
P
Per Liden 已提交
390 391
		return;

392
	/* Send NACK */
393
	buf = tipc_buf_acquire(INT_H_SIZE);
P
Per Liden 已提交
394
	if (buf) {
395
		struct tipc_msg *msg = buf_msg(buf);
J
Jon Paul Maloy 已提交
396
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
397
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
398

399
		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
400
			      INT_H_SIZE, n_ptr->addr);
401
		msg_set_non_seq(msg, 1);
402
		msg_set_mc_netid(msg, tn->net_id);
403 404
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
405
		msg_set_bcgap_to(msg, to);
P
Per Liden 已提交
406

407
		tipc_bclink_lock(net);
408
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
409 410
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
411
		kfree_skb(buf);
P
Per Liden 已提交
412

413
		n_ptr->bclink.oos_state++;
P
Per Liden 已提交
414 415 416
	}
}

417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
	u16 last = msg_last_bcast(hdr);
	int mtyp = msg_type(hdr);

	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
		return;
	if (mtyp == STATE_MSG) {
		tipc_bclink_update_link_state(n, last);
		return;
	}
	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
	 * and transfer synch info in LINK_PROTOCOL messages.
	 */
	if (tipc_node_is_up(n))
		return;
	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
		return;
	n->bclink.last_sent = last;
	n->bclink.last_in = last;
	n->bclink.oos_state = 0;
}

440
/**
441
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
P
Per Liden 已提交
442
 *
443 444
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
P
Per Liden 已提交
445
 */
446
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
P
Per Liden 已提交
447
{
448
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
P
Per Liden 已提交
449

450
	if (unlikely(!n_ptr))
P
Per Liden 已提交
451
		return;
452

453
	tipc_node_lock(n_ptr);
454
	if (n_ptr->bclink.recv_permitted &&
455 456 457
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;
458
	tipc_node_unlock(n_ptr);
459
	tipc_node_put(n_ptr);
P
Per Liden 已提交
460 461
}

462
/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
463
 *                    and to identified node local sockets
464
 * @net: the applicable net namespace
465
 * @list: chain of buffers containing message
466 467 468
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
469
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
470
{
471 472
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
473
	struct tipc_bc_base *bclink = tn->bcbase;
474 475
	int rc = 0;
	int bc = 0;
476
	struct sk_buff *skb;
477 478
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;
479 480

	/* Prepare clone of message for local node */
481
	skb = tipc_msg_reassemble(list);
482
	if (unlikely(!skb))
483
		return -EHOSTUNREACH;
484

485
	/* Broadcast to all nodes */
486
	if (likely(bclink)) {
487
		tipc_bclink_lock(net);
488
		if (likely(bclink->bcast_nodes.count)) {
489
			rc = __tipc_link_xmit(net, bcl, list);
490
			if (likely(!rc)) {
J
Jon Paul Maloy 已提交
491
				u32 len = skb_queue_len(&bcl->transmq);
492

493
				bclink_set_last_sent(net);
494
				bcl->stats.queue_sz_counts++;
495
				bcl->stats.accu_queue_sz += len;
496 497 498
			}
			bc = 1;
		}
499
		tipc_bclink_unlock(net);
500 501 502
	}

	if (unlikely(!bc))
503
		__skb_queue_purge(list);
504

505
	if (unlikely(rc)) {
506
		kfree_skb(skb);
507 508 509 510 511 512 513
		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
514 515 516
	return rc;
}

517
/**
518 519
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
520
 * Called with both sending node's lock and bclink_lock taken.
521 522 523
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
524 525
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

526 527 528
	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
529
	tn->bcl->stats.recv_info++;
530 531 532 533 534

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
535
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
536
		tipc_link_proto_xmit(node_active_link(node, node->addr),
537
				     STATE_MSG, 0, 0, 0, 0);
538
		tn->bcl->stats.sent_acks++;
539 540 541
	}
}

542
/**
543
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
544
 *
Y
Ying Xue 已提交
545
 * RCU is locked, no other locks set
P
Per Liden 已提交
546
 */
547
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
548
{
549
	struct tipc_net *tn = net_generic(net, tipc_net_id);
550
	struct tipc_link *bcl = tn->bcl;
P
Per Liden 已提交
551
	struct tipc_msg *msg = buf_msg(buf);
552
	struct tipc_node *node;
P
Per Liden 已提交
553 554
	u32 next_in;
	u32 seqno;
555
	int deferred = 0;
556 557
	int pos = 0;
	struct sk_buff *iskb;
558
	struct sk_buff_head *arrvq, *inputq;
P
Per Liden 已提交
559

560
	/* Screen out unwanted broadcast messages */
561
	if (msg_mc_netid(msg) != tn->net_id)
562 563
		goto exit;

564
	node = tipc_node_find(net, msg_prevnode(msg));
565 566 567 568
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
569
	if (unlikely(!node->bclink.recv_permitted))
570
		goto unlock;
P
Per Liden 已提交
571

572
	/* Handle broadcast protocol message */
P
Per Liden 已提交
573
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
574 575
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
576
		if (msg_destnode(msg) == tn->own_addr) {
577
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
578
			tipc_bclink_lock(net);
P
Per Liden 已提交
579
			bcl->stats.recv_nacks++;
580
			tn->bcbase->retransmit_to = node;
581
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
P
Per Liden 已提交
582
					      msg_bcgap_to(msg));
583
			tipc_bclink_unlock(net);
584
			tipc_node_unlock(node);
P
Per Liden 已提交
585
		} else {
586
			tipc_node_unlock(node);
587
			bclink_peek_nack(net, msg);
P
Per Liden 已提交
588
		}
589
		tipc_node_put(node);
590
		goto exit;
P
Per Liden 已提交
591 592
	}

593
	/* Handle in-sequence broadcast message */
P
Per Liden 已提交
594
	seqno = msg_seqno(msg);
595
	next_in = mod(node->bclink.last_in + 1);
596 597
	arrvq = &tn->bcbase->arrvq;
	inputq = &tn->bcbase->inputq;
P
Per Liden 已提交
598 599

	if (likely(seqno == next_in)) {
600
receive:
601
		/* Deliver message to destination */
P
Per Liden 已提交
602
		if (likely(msg_isdata(msg))) {
603
			tipc_bclink_lock(net);
604
			bclink_accept_pkt(node, seqno);
605 606 607 608
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
609
			tipc_bclink_unlock(net);
610
			tipc_node_unlock(node);
P
Per Liden 已提交
611
		} else if (msg_user(msg) == MSG_BUNDLER) {
612
			tipc_bclink_lock(net);
613
			bclink_accept_pkt(node, seqno);
P
Per Liden 已提交
614 615
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
616 617 618 619 620 621 622
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
623
			tipc_bclink_unlock(net);
624
			tipc_node_unlock(node);
P
Per Liden 已提交
625
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
626
			tipc_bclink_lock(net);
627
			bclink_accept_pkt(node, seqno);
628 629 630 631 632
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf)) {
				tipc_bclink_unlock(net);
				goto unlock;
			}
P
Per Liden 已提交
633
			bcl->stats.recv_fragments++;
634
			if (buf) {
P
Per Liden 已提交
635
				bcl->stats.recv_fragmented++;
636
				msg = buf_msg(buf);
637
				tipc_bclink_unlock(net);
E
Erik Hugne 已提交
638 639
				goto receive;
			}
640
			tipc_bclink_unlock(net);
641
			tipc_node_unlock(node);
P
Per Liden 已提交
642
		} else {
643
			tipc_bclink_lock(net);
644
			bclink_accept_pkt(node, seqno);
645
			tipc_bclink_unlock(net);
646
			tipc_node_unlock(node);
647
			kfree_skb(buf);
P
Per Liden 已提交
648
		}
649
		buf = NULL;
650 651

		/* Determine new synchronization state */
652
		tipc_node_lock(node);
653 654 655
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

656
		if (node->bclink.last_in == node->bclink.last_sent)
657 658
			goto unlock;

J
Jon Paul Maloy 已提交
659
		if (skb_queue_empty(&node->bclink.deferdq)) {
660 661 662 663
			node->bclink.oos_state = 1;
			goto unlock;
		}

J
Jon Paul Maloy 已提交
664
		msg = buf_msg(skb_peek(&node->bclink.deferdq));
665 666 667 668 669 670
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
J
Jon Paul Maloy 已提交
671
		buf = __skb_dequeue(&node->bclink.deferdq);
672 673 674 675 676
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
J
Jon Paul Maloy 已提交
677
		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
678
					       buf);
679
		bclink_update_last_sent(node, seqno);
680
		buf = NULL;
681
	}
682

683
	tipc_bclink_lock(net);
684

685 686
	if (deferred)
		bcl->stats.deferred_recv++;
687 688
	else
		bcl->stats.duplicates++;
689

690
	tipc_bclink_unlock(net);
691

692
unlock:
693
	tipc_node_unlock(node);
694
	tipc_node_put(node);
695
exit:
696
	kfree_skb(buf);
P
Per Liden 已提交
697 698
}

699
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
P
Per Liden 已提交
700
{
701
	return (n_ptr->bclink.recv_permitted &&
702
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
P
Per Liden 已提交
703 704 705 706
}


/**
707
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
708
 *
709 710
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
711
 *
712 713
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
P
Per Liden 已提交
714
 */
715 716
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
A
Adrian Bunk 已提交
717
			      struct tipc_media_addr *unused2)
P
Per Liden 已提交
718 719
{
	int bp_index;
720
	struct tipc_msg *msg = buf_msg(buf);
721
	struct tipc_net *tn = net_generic(net, tipc_net_id);
722
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
723
	struct tipc_bc_base *bclink = tn->bcbase;
P
Per Liden 已提交
724

725
	/* Prepare broadcast link message for reliable transmission,
726 727 728 729
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
P
Per Liden 已提交
730
	if (likely(!msg_non_seq(buf_msg(buf)))) {
731
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
732
		msg_set_non_seq(msg, 1);
733
		msg_set_mc_netid(msg, tn->net_id);
734
		tn->bcl->stats.sent_info++;
735
		if (WARN_ON(!bclink->bcast_nodes.count)) {
736 737 738
			dump_stack();
			return 0;
		}
P
Per Liden 已提交
739 740 741
	}

	/* Send buffer over bearers until all targets reached */
742
	bcbearer->remains = bclink->bcast_nodes;
P
Per Liden 已提交
743 744

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
745 746
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
747 748
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
749
		struct sk_buff *tbuf;
P
Per Liden 已提交
750 751

		if (!p)
752
			break; /* No more bearers to try */
753 754
		if (!b)
			b = p;
755
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
756
			       &bcbearer->remains_new);
757
		if (bcbearer->remains_new.count == bcbearer->remains.count)
758
			continue; /* Nothing added by bearer pair */
P
Per Liden 已提交
759

760 761
		if (bp_index == 0) {
			/* Use original buffer for first bearer */
762
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
763 764
		} else {
			/* Avoid concurrent buffer access */
765
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
766 767
			if (!tbuf)
				break;
768 769
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
770 771
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
772
		if (bcbearer->remains_new.count == 0)
773
			break; /* All targets reached */
P
Per Liden 已提交
774

775
		bcbearer->remains = bcbearer->remains_new;
P
Per Liden 已提交
776
	}
777

778
	return 0;
P
Per Liden 已提交
779 780 781
}

/**
782
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
P
Per Liden 已提交
783
 */
784 785
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
P
Per Liden 已提交
786
{
787
	struct tipc_net *tn = net_generic(net, tipc_net_id);
788
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
789 790
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
Y
Ying Xue 已提交
791
	struct tipc_bearer *b;
P
Per Liden 已提交
792 793 794
	int b_index;
	int pri;

795
	tipc_bclink_lock(net);
P
Per Liden 已提交
796

797 798 799 800 801
	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

P
Per Liden 已提交
802 803 804
	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

Y
Ying Xue 已提交
805
	rcu_read_lock();
P
Per Liden 已提交
806
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
807
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
808
		if (!b || !b->nodes.count)
P
Per Liden 已提交
809 810 811 812 813 814 815
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
Y
Ying Xue 已提交
816
	rcu_read_unlock();
P
Per Liden 已提交
817 818 819 820 821

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

P
Per Liden 已提交
822
	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
P
Per Liden 已提交
823 824 825 826 827 828 829

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
830 831
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
P
Per Liden 已提交
832 833 834 835 836 837 838 839 840 841
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

842
	tipc_bclink_unlock(net);
P
Per Liden 已提交
843 844
}

845 846
static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i <  ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

896
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
897 898 899 900 901
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
902 903
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
904 905 906 907

	if (!bcl)
		return 0;

908
	tipc_bclink_lock(net);
909

910
	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
927
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
928
		goto attr_msg_full;
929
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
930 931 932 933 934
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
935
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
936 937 938 939 940 941 942
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

943
	tipc_bclink_unlock(net);
944 945 946 947 948 949 950 951 952 953
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
954
	tipc_bclink_unlock(net);
955 956 957 958
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}
P
Per Liden 已提交
959

960
int tipc_bclink_reset_stats(struct net *net)
P
Per Liden 已提交
961
{
962 963 964
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

P
Per Liden 已提交
965 966 967
	if (!bcl)
		return -ENOPROTOOPT;

968
	tipc_bclink_lock(net);
P
Per Liden 已提交
969
	memset(&bcl->stats, 0, sizeof(bcl->stats));
970
	tipc_bclink_unlock(net);
971
	return 0;
P
Per Liden 已提交
972 973
}

974
int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
P
Per Liden 已提交
975
{
976 977 978
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

P
Per Liden 已提交
979 980
	if (!bcl)
		return -ENOPROTOOPT;
981 982 983
	if (limit < BCLINK_WIN_MIN)
		limit = BCLINK_WIN_MIN;
	if (limit > TIPC_MAX_LINK_WIN)
P
Per Liden 已提交
984
		return -EINVAL;
985
	tipc_bclink_lock(net);
986
	tipc_link_set_queue_limits(bcl, limit);
987
	tipc_bclink_unlock(net);
988
	return 0;
P
Per Liden 已提交
989 990
}

991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	int err;
	u32 win;
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

	if (!attrs[TIPC_NLA_LINK_PROP])
		return -EINVAL;

	err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
	if (err)
		return err;

	if (!props[TIPC_NLA_PROP_WIN])
		return -EOPNOTSUPP;

	win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);

	return tipc_bclink_set_queue_limits(net, win);
}

1012
int tipc_bcast_init(struct net *net)
P
Per Liden 已提交
1013
{
1014
	struct tipc_net *tn = net_generic(net, tipc_net_id);
1015
	struct tipc_bcbearer *bcbearer;
1016
	struct tipc_bc_base *bclink;
1017
	struct tipc_link *bcl;
1018

1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029
	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
P
Per Liden 已提交
1030
	bcbearer->bearer.media = &bcbearer->media;
1031
	bcbearer->media.send_msg = tipc_bcbearer_send;
1032
	sprintf(bcbearer->media.name, "tipc-broadcast");
P
Per Liden 已提交
1033

1034
	spin_lock_init(&bclink->lock);
J
Jon Paul Maloy 已提交
1035 1036 1037
	__skb_queue_head_init(&bcl->transmq);
	__skb_queue_head_init(&bcl->backlogq);
	__skb_queue_head_init(&bcl->deferdq);
1038
	skb_queue_head_init(&bcl->wakeupq);
1039
	bcl->snd_nxt = 1;
I
Ingo Molnar 已提交
1040
	spin_lock_init(&bclink->node.lock);
1041 1042
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
P
Per Liden 已提交
1043
	bcl->owner = &bclink->node;
1044
	bcl->owner->net = net;
1045
	bcl->mtu = MAX_PKT_DEFAULT_MCAST;
1046
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
1047
	bcl->bearer_id = MAX_BEARERS;
1048
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
1049 1050
	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
	msg_set_prevnode(bcl->pmsg, tn->own_addr);
1051
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
1052
	tn->bcbearer = bcbearer;
1053
	tn->bcbase = bclink;
1054
	tn->bcl = bcl;
1055
	return 0;
P
Per Liden 已提交
1056 1057
}

/**
 * tipc_bcast_stop - tear down the broadcast link infrastructure
 * @net: the applicable net namespace
 *
 * Reverses tipc_bcast_init(): drains the broadcast link's queues,
 * unpublishes the pseudo-bearer and frees the allocations.
 */
void tipc_bcast_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);

	/* Unpublish the pseudo-bearer, then wait a full RCU grace period
	 * (synchronize_net) before freeing, so no reader can still hold a
	 * pointer to bcbearer when it is released. Order matters here.
	 */
	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
	synchronize_net();
	kfree(tn->bcbearer);
	kfree(tn->bcbase);
}

1072 1073 1074
/**
 * tipc_nmap_add - add a node to a node map
 */
1075
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */
1090
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
1108 1109 1110
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}