/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

38 39
#include "socket.h"
#include "msg.h"
P
Per Liden 已提交
40
#include "bcast.h"
41
#include "name_distr.h"
42
#include "core.h"
P
Per Liden 已提交
43

44 45
#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */
P
Per Liden 已提交
46

47
const char tipc_bclink_name[] = "broadcast-link";
P
Per Liden 已提交
48

49 50 51
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
52 53
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
P
Per Liden 已提交
54

55
static void tipc_bclink_lock(struct net *net)
56
{
57 58 59
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_lock_bh(&tn->bclink->lock);
60 61
}

62
static void tipc_bclink_unlock(struct net *net)
63
{
64
	struct tipc_net *tn = net_generic(net, tipc_net_id);
65 66
	struct tipc_node *node = NULL;

67 68
	if (likely(!tn->bclink->flags)) {
		spin_unlock_bh(&tn->bclink->lock);
69 70 71
		return;
	}

72 73 74
	if (tn->bclink->flags & TIPC_BCLINK_RESET) {
		tn->bclink->flags &= ~TIPC_BCLINK_RESET;
		node = tipc_bclink_retransmit_to(net);
75
	}
76
	spin_unlock_bh(&tn->bclink->lock);
77 78 79 80 81

	if (node)
		tipc_link_reset_all(node);
}

82 83 84 85 86
/* tipc_bclink_get_mtu - return the (fixed) broadcast link packet size */
uint tipc_bclink_get_mtu(void)
{
	return MAX_PKT_DEFAULT_MCAST;
}

87
void tipc_bclink_set_flags(struct net *net, unsigned int flags)
88
{
89 90 91
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tn->bclink->flags |= flags;
92 93
}

S
Sam Ravnborg 已提交
94
static u32 bcbuf_acks(struct sk_buff *buf)
P
Per Liden 已提交
95
{
96
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
P
Per Liden 已提交
97 98
}

S
Sam Ravnborg 已提交
99
/* bcbuf_set_acks - store the outstanding-ack count in the skb control block */
static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

S
Sam Ravnborg 已提交
104
/* bcbuf_decr_acks - one more node has acknowledged this buffer */
static void bcbuf_decr_acks(struct sk_buff *buf)
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

109
/* tipc_bclink_add_node - add a node to the broadcast destination map */
void tipc_bclink_add_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
}

118
/* tipc_bclink_remove_node - remove a node from the broadcast destination map */
void tipc_bclink_remove_node(struct net *net, u32 addr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
}
P
Per Liden 已提交
126

127
static void bclink_set_last_sent(struct net *net)
128
{
129 130 131
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

132 133 134 135 136 137
	if (bcl->next_out)
		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
	else
		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
}

138
u32 tipc_bclink_get_last_sent(struct net *net)
139
{
140 141 142
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bcl->fsm_msg_cnt;
143 144
}

145
/* bclink_update_last_sent - advance a peer node's last-sent sequence number,
 * never moving it backwards
 */
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
						seqno : node->bclink.last_sent;
}


152
/**
153 154
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
155
 * Called with bclink_lock locked
156
 */
157
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
158
{
159 160 161
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bclink->retransmit_to;
162 163
}

164
/**
P
Per Liden 已提交
165 166 167
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
168
 *
169
 * Called with bclink_lock locked
P
Per Liden 已提交
170
 */
171
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
P
Per Liden 已提交
172
{
173
	struct sk_buff *skb;
174
	struct tipc_link *bcl = tn->bcl;
P
Per Liden 已提交
175

176
	skb_queue_walk(&bcl->outqueue, skb) {
177 178
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
179
			break;
180
		}
181
	}
P
Per Liden 已提交
182 183
}

184 185 186 187 188
/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
189
void tipc_bclink_wakeup_users(struct net *net)
190
{
191
	struct tipc_net *tn = net_generic(net, tipc_net_id);
192 193
	struct sk_buff *skb;

194
	while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks)))
195
		tipc_sk_rcv(net, skb);
196 197
}

198
/**
199
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
P
Per Liden 已提交
200 201
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
202
 *
203
 * Node is locked, bclink_lock unlocked.
P
Per Liden 已提交
204
 */
205
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
P
Per Liden 已提交
206
{
207
	struct sk_buff *skb, *tmp;
P
Per Liden 已提交
208 209
	struct sk_buff *next;
	unsigned int released = 0;
210 211
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);
P
Per Liden 已提交
212

213
	tipc_bclink_lock(net);
214
	/* Bail out if tx queue is empty (no clean up is required) */
215
	skb = skb_peek(&tn->bcl->outqueue);
216
	if (!skb)
217 218 219 220 221 222 223 224 225
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
226 227
		if (tn->bclink->bcast_nodes.count)
			acked = tn->bcl->fsm_msg_cnt;
228
		else
229
			acked = tn->bcl->next_out_no;
230 231 232 233 234
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
235
		if (less(acked, buf_seqno(skb)) ||
236
		    less(tn->bcl->fsm_msg_cnt, acked) ||
237 238 239 240 241
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
242
	skb_queue_walk(&tn->bcl->outqueue, skb) {
243 244 245
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}
P
Per Liden 已提交
246 247

	/* Update packets that node is now acknowledging */
248
	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
249 250
		if (more(buf_seqno(skb), acked))
			break;
P
Per Liden 已提交
251

252 253
		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
		if (skb != tn->bcl->next_out) {
254 255 256
			bcbuf_decr_acks(skb);
		} else {
			bcbuf_set_acks(skb, 0);
257 258
			tn->bcl->next_out = next;
			bclink_set_last_sent(net);
259 260
		}

261
		if (bcbuf_acks(skb) == 0) {
262
			__skb_unlink(skb, &tn->bcl->outqueue);
263
			kfree_skb(skb);
P
Per Liden 已提交
264 265 266 267 268 269
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
270 271 272
	if (unlikely(tn->bcl->next_out)) {
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
273
	}
274
	if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks)))
275 276
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;

277
exit:
278
	tipc_bclink_unlock(net);
P
Per Liden 已提交
279 280
}

281
/**
282
 * tipc_bclink_update_link_state - update broadcast link state
283
 *
Y
Ying Xue 已提交
284
 * RCU and node lock set
P
Per Liden 已提交
285
 */
286 287
void tipc_bclink_update_link_state(struct net *net, struct tipc_node *n_ptr,
				   u32 last_sent)
P
Per Liden 已提交
288
{
289
	struct sk_buff *buf;
290
	struct tipc_net *tn = net_generic(net, tipc_net_id);
P
Per Liden 已提交
291

292 293 294
	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;
P
Per Liden 已提交
295

296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
P
Per Liden 已提交
311 312
		return;

313
	/* Send NACK */
314
	buf = tipc_buf_acquire(INT_H_SIZE);
P
Per Liden 已提交
315
	if (buf) {
316
		struct tipc_msg *msg = buf_msg(buf);
317 318
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
319

320
		tipc_msg_init(net, msg, BCAST_PROTOCOL, STATE_MSG,
321
			      INT_H_SIZE, n_ptr->addr);
322
		msg_set_non_seq(msg, 1);
323
		msg_set_mc_netid(msg, tn->net_id);
324 325
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
326
		msg_set_bcgap_to(msg, to);
P
Per Liden 已提交
327

328
		tipc_bclink_lock(net);
329
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
330 331
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
332
		kfree_skb(buf);
P
Per Liden 已提交
333

334
		n_ptr->bclink.oos_state++;
P
Per Liden 已提交
335 336 337
	}
}

338
/**
339
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
P
Per Liden 已提交
340
 *
341 342
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
P
Per Liden 已提交
343
 */
344
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
P
Per Liden 已提交
345
{
346
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
P
Per Liden 已提交
347

348
	if (unlikely(!n_ptr))
P
Per Liden 已提交
349
		return;
350

351
	tipc_node_lock(n_ptr);
352

353
	if (n_ptr->bclink.recv_permitted &&
354 355 356 357
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;

358
	tipc_node_unlock(n_ptr);
P
Per Liden 已提交
359 360
}

361 362
/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
 *                    and to identified node local sockets
363
 * @net: the applicable net namespace
364
 * @list: chain of buffers containing message
365 366 367
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
368
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
369
{
370 371 372
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_bclink *bclink = tn->bclink;
373 374
	int rc = 0;
	int bc = 0;
375
	struct sk_buff *skb;
376 377

	/* Prepare clone of message for local node */
378 379 380
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb)) {
		__skb_queue_purge(list);
381 382 383 384 385
		return -EHOSTUNREACH;
	}

	/* Broadcast to all other nodes */
	if (likely(bclink)) {
386
		tipc_bclink_lock(net);
387
		if (likely(bclink->bcast_nodes.count)) {
388
			rc = __tipc_link_xmit(net, bcl, list);
389
			if (likely(!rc)) {
390 391
				u32 len = skb_queue_len(&bcl->outqueue);

392
				bclink_set_last_sent(net);
393
				bcl->stats.queue_sz_counts++;
394
				bcl->stats.accu_queue_sz += len;
395 396 397
			}
			bc = 1;
		}
398
		tipc_bclink_unlock(net);
399 400 401
	}

	if (unlikely(!bc))
402
		__skb_queue_purge(list);
403 404 405

	/* Deliver message clone */
	if (likely(!rc))
406
		tipc_sk_mcast_rcv(net, skb);
407
	else
408
		kfree_skb(skb);
409 410 411 412

	return rc;
}

413
/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
	tn->bcl->stats.recv_info++;

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
		tipc_link_proto_xmit(node->active_links[node->addr & 1],
				     STATE_MSG, 0, 0, 0, 0, 0);
		tn->bcl->stats.sent_acks++;
	}
}

438
/**
439
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
440
 *
Y
Ying Xue 已提交
441
 * RCU is locked, no other locks set
P
Per Liden 已提交
442
 */
443
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
444
{
445
	struct tipc_net *tn = net_generic(net, tipc_net_id);
446
	struct tipc_link *bcl = tn->bcl;
P
Per Liden 已提交
447
	struct tipc_msg *msg = buf_msg(buf);
448
	struct tipc_node *node;
P
Per Liden 已提交
449 450
	u32 next_in;
	u32 seqno;
451
	int deferred = 0;
P
Per Liden 已提交
452

453
	/* Screen out unwanted broadcast messages */
454
	if (msg_mc_netid(msg) != tn->net_id)
455 456
		goto exit;

457
	node = tipc_node_find(net, msg_prevnode(msg));
458 459 460 461
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
462
	if (unlikely(!node->bclink.recv_permitted))
463
		goto unlock;
P
Per Liden 已提交
464

465
	/* Handle broadcast protocol message */
P
Per Liden 已提交
466
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
467 468
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
469
		if (msg_destnode(msg) == tn->own_addr) {
470 471
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_node_unlock(node);
472
			tipc_bclink_lock(net);
P
Per Liden 已提交
473
			bcl->stats.recv_nacks++;
474 475
			tn->bclink->retransmit_to = node;
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
P
Per Liden 已提交
476
					      msg_bcgap_to(msg));
477
			tipc_bclink_unlock(net);
P
Per Liden 已提交
478
		} else {
479
			tipc_node_unlock(node);
480
			bclink_peek_nack(net, msg);
P
Per Liden 已提交
481
		}
482
		goto exit;
P
Per Liden 已提交
483 484
	}

485
	/* Handle in-sequence broadcast message */
P
Per Liden 已提交
486
	seqno = msg_seqno(msg);
487
	next_in = mod(node->bclink.last_in + 1);
P
Per Liden 已提交
488 489

	if (likely(seqno == next_in)) {
490
receive:
491
		/* Deliver message to destination */
P
Per Liden 已提交
492
		if (likely(msg_isdata(msg))) {
493
			tipc_bclink_lock(net);
494
			bclink_accept_pkt(node, seqno);
495
			tipc_bclink_unlock(net);
496
			tipc_node_unlock(node);
497
			if (likely(msg_mcast(msg)))
498
				tipc_sk_mcast_rcv(net, buf);
499
			else
500
				kfree_skb(buf);
P
Per Liden 已提交
501
		} else if (msg_user(msg) == MSG_BUNDLER) {
502
			tipc_bclink_lock(net);
503
			bclink_accept_pkt(node, seqno);
P
Per Liden 已提交
504 505
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
506
			tipc_bclink_unlock(net);
507
			tipc_node_unlock(node);
508
			tipc_link_bundle_rcv(net, buf);
P
Per Liden 已提交
509
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
510 511
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf))
512
				goto unlock;
513
			tipc_bclink_lock(net);
514
			bclink_accept_pkt(node, seqno);
P
Per Liden 已提交
515
			bcl->stats.recv_fragments++;
516
			if (buf) {
P
Per Liden 已提交
517
				bcl->stats.recv_fragmented++;
518
				msg = buf_msg(buf);
519
				tipc_bclink_unlock(net);
E
Erik Hugne 已提交
520 521
				goto receive;
			}
522
			tipc_bclink_unlock(net);
523
			tipc_node_unlock(node);
524
		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
525
			tipc_bclink_lock(net);
526
			bclink_accept_pkt(node, seqno);
527
			tipc_bclink_unlock(net);
528
			tipc_node_unlock(node);
529
			tipc_named_rcv(net, buf);
P
Per Liden 已提交
530
		} else {
531
			tipc_bclink_lock(net);
532
			bclink_accept_pkt(node, seqno);
533
			tipc_bclink_unlock(net);
534
			tipc_node_unlock(node);
535
			kfree_skb(buf);
P
Per Liden 已提交
536
		}
537
		buf = NULL;
538 539

		/* Determine new synchronization state */
540
		tipc_node_lock(node);
541 542 543
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

544
		if (node->bclink.last_in == node->bclink.last_sent)
545 546
			goto unlock;

547
		if (skb_queue_empty(&node->bclink.deferred_queue)) {
548 549 550 551
			node->bclink.oos_state = 1;
			goto unlock;
		}

552
		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
553 554 555 556 557 558
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
559
		buf = __skb_dequeue(&node->bclink.deferred_queue);
560 561 562 563 564
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
565
		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
566
					       buf);
567
		bclink_update_last_sent(node, seqno);
568
		buf = NULL;
569
	}
570

571
	tipc_bclink_lock(net);
572

573 574
	if (deferred)
		bcl->stats.deferred_recv++;
575 576
	else
		bcl->stats.duplicates++;
577

578
	tipc_bclink_unlock(net);
579

580
unlock:
581
	tipc_node_unlock(node);
582
exit:
583
	kfree_skb(buf);
P
Per Liden 已提交
584 585
}

586
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
P
Per Liden 已提交
587
{
588
	return (n_ptr->bclink.recv_permitted &&
589
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
P
Per Liden 已提交
590 591 592 593
}


/**
594
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
595
 *
596 597
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
598
 *
599 600
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
P
Per Liden 已提交
601
 */
602 603
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
A
Adrian Bunk 已提交
604
			      struct tipc_media_addr *unused2)
P
Per Liden 已提交
605 606
{
	int bp_index;
607
	struct tipc_msg *msg = buf_msg(buf);
608
	struct tipc_net *tn = net_generic(net, tipc_net_id);
609 610
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bclink *bclink = tn->bclink;
P
Per Liden 已提交
611

612
	/* Prepare broadcast link message for reliable transmission,
613 614 615 616
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
P
Per Liden 已提交
617
	if (likely(!msg_non_seq(buf_msg(buf)))) {
618
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
619
		msg_set_non_seq(msg, 1);
620
		msg_set_mc_netid(msg, tn->net_id);
621
		tn->bcl->stats.sent_info++;
622

623
		if (WARN_ON(!bclink->bcast_nodes.count)) {
624 625 626
			dump_stack();
			return 0;
		}
P
Per Liden 已提交
627 628 629
	}

	/* Send buffer over bearers until all targets reached */
630
	bcbearer->remains = bclink->bcast_nodes;
P
Per Liden 已提交
631 632

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
633 634
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
635 636
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
637
		struct sk_buff *tbuf;
P
Per Liden 已提交
638 639

		if (!p)
640
			break; /* No more bearers to try */
641 642
		if (!b)
			b = p;
643
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
644
			       &bcbearer->remains_new);
645
		if (bcbearer->remains_new.count == bcbearer->remains.count)
646
			continue; /* Nothing added by bearer pair */
P
Per Liden 已提交
647

648 649
		if (bp_index == 0) {
			/* Use original buffer for first bearer */
650
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
651 652
		} else {
			/* Avoid concurrent buffer access */
653
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
654 655
			if (!tbuf)
				break;
656 657
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
658 659
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
660
		if (bcbearer->remains_new.count == 0)
661
			break; /* All targets reached */
P
Per Liden 已提交
662

663
		bcbearer->remains = bcbearer->remains_new;
P
Per Liden 已提交
664
	}
665

666
	return 0;
P
Per Liden 已提交
667 668 669
}

/**
670
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
P
Per Liden 已提交
671
 */
672 673
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
P
Per Liden 已提交
674
{
675
	struct tipc_net *tn = net_generic(net, tipc_net_id);
676
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
677 678
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
Y
Ying Xue 已提交
679
	struct tipc_bearer *b;
P
Per Liden 已提交
680 681 682
	int b_index;
	int pri;

683
	tipc_bclink_lock(net);
P
Per Liden 已提交
684

685 686 687 688 689
	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

P
Per Liden 已提交
690 691 692
	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

Y
Ying Xue 已提交
693
	rcu_read_lock();
P
Per Liden 已提交
694
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
695
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
696
		if (!b || !b->nodes.count)
P
Per Liden 已提交
697 698 699 700 701 702 703
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
Y
Ying Xue 已提交
704
	rcu_read_unlock();
P
Per Liden 已提交
705 706 707 708 709

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

P
Per Liden 已提交
710
	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
P
Per Liden 已提交
711 712 713 714 715 716 717

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
718 719
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
P
Per Liden 已提交
720 721 722 723 724 725 726 727 728 729
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

730
	tipc_bclink_unlock(net);
P
Per Liden 已提交
731 732
}

733 734
static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i <  ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

784
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
785 786 787 788 789
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
790 791
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
792 793 794 795

	if (!bcl)
		return 0;

796
	tipc_bclink_lock(net);
797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830

	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no))
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no))
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

831
	tipc_bclink_unlock(net);
832 833 834 835 836 837 838 839 840 841
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
842
	tipc_bclink_unlock(net);
843 844 845 846
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}
P
Per Liden 已提交
847

848
int tipc_bclink_stats(struct net *net, char *buf, const u32 buf_size)
P
Per Liden 已提交
849
{
850 851
	int ret;
	struct tipc_stats *s;
852 853
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
P
Per Liden 已提交
854 855 856 857

	if (!bcl)
		return 0;

858
	tipc_bclink_lock(net);
P
Per Liden 已提交
859

860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881
	s = &bcl->stats;

	ret = tipc_snprintf(buf, buf_size, "Link <%s>\n"
			    "  Window:%u packets\n",
			    bcl->name, bcl->queue_limit[0]);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     s->recv_info, s->recv_fragments,
			     s->recv_fragmented, s->recv_bundles,
			     s->recv_bundled);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX packets:%u fragments:%u/%u bundles:%u/%u\n",
			     s->sent_info, s->sent_fragments,
			     s->sent_fragmented, s->sent_bundles,
			     s->sent_bundled);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  RX naks:%u defs:%u dups:%u\n",
			     s->recv_nacks, s->deferred_recv, s->duplicates);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
			     "  TX naks:%u acks:%u dups:%u\n",
			     s->sent_nacks, s->sent_acks, s->retransmitted);
	ret += tipc_snprintf(buf + ret, buf_size - ret,
882 883
			     "  Congestion link:%u  Send queue max:%u avg:%u\n",
			     s->link_congs, s->max_queue_sz,
884 885
			     s->queue_sz_counts ?
			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
P
Per Liden 已提交
886

887
	tipc_bclink_unlock(net);
888
	return ret;
P
Per Liden 已提交
889 890
}

891
int tipc_bclink_reset_stats(struct net *net)
P
Per Liden 已提交
892
{
893 894 895
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

P
Per Liden 已提交
896 897 898
	if (!bcl)
		return -ENOPROTOOPT;

899
	tipc_bclink_lock(net);
P
Per Liden 已提交
900
	memset(&bcl->stats, 0, sizeof(bcl->stats));
901
	tipc_bclink_unlock(net);
902
	return 0;
P
Per Liden 已提交
903 904
}

905
/* tipc_bclink_set_queue_limits - set broadcast link window size
 * Returns -ENOPROTOOPT if no broadcast link, -EINVAL if @limit out of range.
 */
int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

	tipc_bclink_lock(net);
	tipc_link_set_queue_limits(bcl, limit);
	tipc_bclink_unlock(net);
	return 0;
}

921
int tipc_bclink_init(struct net *net)
P
Per Liden 已提交
922
{
923
	struct tipc_net *tn = net_generic(net, tipc_net_id);
924 925 926
	struct tipc_bcbearer *bcbearer;
	struct tipc_bclink *bclink;
	struct tipc_link *bcl;
927

928 929 930 931 932 933 934 935 936 937 938
	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
P
Per Liden 已提交
939
	bcbearer->bearer.media = &bcbearer->media;
940
	bcbearer->media.send_msg = tipc_bcbearer_send;
941
	sprintf(bcbearer->media.name, "tipc-broadcast");
P
Per Liden 已提交
942

943
	spin_lock_init(&bclink->lock);
944
	__skb_queue_head_init(&bcl->outqueue);
945
	__skb_queue_head_init(&bcl->deferred_queue);
946
	skb_queue_head_init(&bcl->waiting_sks);
P
Per Liden 已提交
947
	bcl->next_out_no = 1;
I
Ingo Molnar 已提交
948
	spin_lock_init(&bclink->node.lock);
949
	__skb_queue_head_init(&bclink->node.waiting_sks);
P
Per Liden 已提交
950
	bcl->owner = &bclink->node;
951
	bcl->owner->net = net;
952
	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
953
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
954
	bcl->bearer_id = MAX_BEARERS;
955
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
P
Per Liden 已提交
956
	bcl->state = WORKING_WORKING;
957
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
958 959 960
	tn->bcbearer = bcbearer;
	tn->bclink = bclink;
	tn->bcl = bcl;
961
	return 0;
P
Per Liden 已提交
962 963
}

964
void tipc_bclink_stop(struct net *net)
P
Per Liden 已提交
965
{
966 967
	struct tipc_net *tn = net_generic(net, tipc_net_id);

968 969 970
	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);
971

972
	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
973
	synchronize_net();
974 975
	kfree(tn->bcbearer);
	kfree(tn->bclink);
P
Per Liden 已提交
976 977
}

978 979 980
/**
 * tipc_nmap_add - add a node to a node map
 */
981
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
982 983 984 985 986 987 988 989 990 991 992 993 994 995
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */
996
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
1014 1015 1016
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}
1035 1036 1037 1038

/**
 * tipc_port_list_add - add a port to a port list, ensuring no duplicates
 */
1039
void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port)
1040
{
1041
	struct tipc_port_list *item = pl_ptr;
1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059
	int i;
	int item_sz = PLSIZE;
	int cnt = pl_ptr->count;

	for (; ; cnt -= item_sz, item = item->next) {
		if (cnt < PLSIZE)
			item_sz = cnt;
		for (i = 0; i < item_sz; i++)
			if (item->ports[i] == port)
				return;
		if (i < PLSIZE) {
			item->ports[i] = port;
			pl_ptr->count++;
			return;
		}
		if (!item->next) {
			item->next = kmalloc(sizeof(*item), GFP_ATOMIC);
			if (!item->next) {
1060
				pr_warn("Incomplete multicast delivery, no memory\n");
1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071
				return;
			}
			item->next->next = NULL;
		}
	}
}

/**
 * tipc_port_list_free - free dynamically created entries in port_list chain
 *
 */
1072
void tipc_port_list_free(struct tipc_port_list *pl_ptr)
1073
{
1074 1075
	struct tipc_port_list *item;
	struct tipc_port_list *next;
1076 1077 1078 1079 1080 1081

	for (item = pl_ptr->next; item; item = next) {
		next = item->next;
		kfree(item);
	}
}