/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
#include "core.h"

#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */

const char tipc_bclink_name[] = "broadcast-link";
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
55
static void tipc_bclink_lock(struct net *net)
56
{
57 58 59
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	spin_lock_bh(&tn->bclink->lock);
60 61
}

62
static void tipc_bclink_unlock(struct net *net)
63
{
64
	struct tipc_net *tn = net_generic(net, tipc_net_id);
65

66
	spin_unlock_bh(&tn->bclink->lock);
67 68
}

69 70 71 72 73 74 75
void tipc_bclink_input(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

76 77 78 79 80
uint  tipc_bclink_get_mtu(void)
{
	return MAX_PKT_DEFAULT_MCAST;
}

S
Sam Ravnborg 已提交
81
static u32 bcbuf_acks(struct sk_buff *buf)
P
Per Liden 已提交
82
{
83
	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
P
Per Liden 已提交
84 85
}

S
Sam Ravnborg 已提交
86
static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
P
Per Liden 已提交
87
{
88
	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
P
Per Liden 已提交
89 90
}

S
Sam Ravnborg 已提交
91
static void bcbuf_decr_acks(struct sk_buff *buf)
P
Per Liden 已提交
92 93 94 95
{
	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}

96
void tipc_bclink_add_node(struct net *net, u32 addr)
97
{
98 99 100 101 102
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
	tipc_bclink_unlock(net);
103 104
}

105
void tipc_bclink_remove_node(struct net *net, u32 addr)
106
{
107 108 109 110
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
111 112 113 114 115

	/* Last node? => reset backlog queue */
	if (!tn->bclink->bcast_nodes.count)
		tipc_link_purge_backlog(&tn->bclink->link);

116
	tipc_bclink_unlock(net);
117
}
P
Per Liden 已提交
118

119
static void bclink_set_last_sent(struct net *net)
120
{
121 122 123
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

124
	bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
125 126
}

127
u32 tipc_bclink_get_last_sent(struct net *net)
128
{
129 130
	struct tipc_net *tn = net_generic(net, tipc_net_id);

131
	return tn->bcl->silent_intv_cnt;
132 133
}

134
static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
P
Per Liden 已提交
135
{
136 137
	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
						seqno : node->bclink.last_sent;
P
Per Liden 已提交
138 139
}

140
/**
141 142
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
143
 * Called with bclink_lock locked
144
 */
145
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
146
{
147 148 149
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return tn->bclink->retransmit_to;
150 151
}

/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Called with bclink_lock locked
 */
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
{
	struct sk_buff *skb;
	struct tipc_link *bcl = tn->bcl;

	/* Find first packet past @after; tipc_link_retransmit() then resends
	 * the following mod(to - after) packets starting from it.
	 */
	skb_queue_walk(&bcl->transmq, skb) {
		if (more(buf_seqno(skb), after)) {
			tipc_link_retransmit(bcl, skb, mod(to - after));
			break;
		}
	}
}
172 173 174 175 176
/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
177
void tipc_bclink_wakeup_users(struct net *net)
178
{
179
	struct tipc_net *tn = net_generic(net, tipc_net_id);
180

181
	tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
182 183
}

/**
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
 *
 * Node is locked, bclink_lock unlocked.
 */
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
	struct sk_buff *skb, *tmp;
	unsigned int released = 0;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	if (unlikely(!n_ptr->bclink.recv_permitted))
		return;

	tipc_bclink_lock(net);

	/* Bail out if tx queue is empty (no clean up is required) */
	skb = skb_peek(&tn->bcl->transmq);
	if (!skb)
		goto exit;

	/* Determine which messages need to be acknowledged */
	if (acked == INVALID_LINK_SEQ) {
		/*
		 * Contact with specified node has been lost, so need to
		 * acknowledge sent messages only (if other nodes still exist)
		 * or both sent and unsent messages (otherwise)
		 */
		if (tn->bclink->bcast_nodes.count)
			acked = tn->bcl->silent_intv_cnt;
		else
			acked = tn->bcl->snd_nxt;
	} else {
		/*
		 * Bail out if specified sequence number does not correspond
		 * to a message that has been sent and not yet acknowledged
		 */
		if (less(acked, buf_seqno(skb)) ||
		    less(tn->bcl->silent_intv_cnt, acked) ||
		    less_eq(acked, n_ptr->bclink.acked))
			goto exit;
	}

	/* Skip over packets that node has previously acknowledged */
	skb_queue_walk(&tn->bcl->transmq, skb) {
		if (more(buf_seqno(skb), n_ptr->bclink.acked))
			break;
	}

	/* Update packets that node is now acknowledging */
	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
		if (more(buf_seqno(skb), acked))
			break;
		bcbuf_decr_acks(skb);
		bclink_set_last_sent(net);
		/* Free a packet once every destination has acknowledged it */
		if (bcbuf_acks(skb) == 0) {
			__skb_unlink(skb, &tn->bcl->transmq);
			kfree_skb(skb);
			released = 1;
		}
	}
	n_ptr->bclink.acked = acked;

	/* Try resolving broadcast link congestion, if necessary */
	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
		tipc_link_push_packets(tn->bcl);
		bclink_set_last_sent(net);
	}
	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
exit:
	tipc_bclink_unlock(net);
}
/**
 * tipc_bclink_update_link_state - update broadcast link state
 *
 * RCU and node lock set
 */
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
				   u32 last_sent)
{
	struct sk_buff *buf;
	struct net *net = n_ptr->net;
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	/* Ignore "stale" link state info */
	if (less_eq(last_sent, n_ptr->bclink.last_in))
		return;

	/* Update link synchronization state; quit if in sync */
	bclink_update_last_sent(n_ptr, last_sent);

	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
		return;

	/* Update out-of-sync state; quit if loss is still unconfirmed */
	if ((++n_ptr->bclink.oos_state) == 1) {
		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
			return;
		n_ptr->bclink.oos_state++;
	}

	/* Don't NACK if one has been recently sent (or seen) */
	if (n_ptr->bclink.oos_state & 0x1)
		return;

	/* Send NACK */
	buf = tipc_buf_acquire(INT_H_SIZE);
	if (buf) {
		struct tipc_msg *msg = buf_msg(buf);
		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
		/* Request the gap up to (but not including) the first
		 * deferred packet, or everything up to last_sent if
		 * nothing has been deferred yet.
		 */
		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;

		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
			      INT_H_SIZE, n_ptr->addr);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
		msg_set_bcgap_to(msg, to);

		tipc_bclink_lock(net);
		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
		tn->bcl->stats.sent_nacks++;
		tipc_bclink_unlock(net);
		kfree_skb(buf);

		n_ptr->bclink.oos_state++;
	}
}
/* tipc_bclink_sync_state - update broadcast link synchronization state
 * from a received unicast link protocol message
 */
void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
{
	u16 last = msg_last_bcast(hdr);
	int mtyp = msg_type(hdr);

	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
		return;
	if (mtyp == STATE_MSG) {
		tipc_bclink_update_link_state(n, last);
		return;
	}
	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
	 * and transfer synch info in LINK_PROTOCOL messages.
	 */
	if (tipc_node_is_up(n))
		return;
	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
		return;
	n->bclink.last_sent = last;
	n->bclink.last_in = last;
	n->bclink.oos_state = 0;
}
/**
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
 *
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
 */
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
{
	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));

	if (unlikely(!n_ptr))
		return;

	tipc_node_lock(n_ptr);
	/* Another node just NACKed the same gap we would NACK next;
	 * setting oos_state to 2 (even) defers our own NACK one round.
	 */
	if (n_ptr->bclink.recv_permitted &&
	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
		n_ptr->bclink.oos_state = 2;
	tipc_node_unlock(n_ptr);
	tipc_node_put(n_ptr);
}
/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_bclink *bclink = tn->bclink;
	int rc = 0;
	int bc = 0;
	struct sk_buff *skb;
	struct sk_buff_head arrvq;
	struct sk_buff_head inputq;

	/* Prepare clone of message for local node */
	skb = tipc_msg_reassemble(list);
	if (unlikely(!skb))
		return -EHOSTUNREACH;

	/* Broadcast to all nodes */
	if (likely(bclink)) {
		tipc_bclink_lock(net);
		if (likely(bclink->bcast_nodes.count)) {
			rc = __tipc_link_xmit(net, bcl, list);
			if (likely(!rc)) {
				u32 len = skb_queue_len(&bcl->transmq);

				bclink_set_last_sent(net);
				bcl->stats.queue_sz_counts++;
				bcl->stats.accu_queue_sz += len;
			}
			bc = 1;
		}
		tipc_bclink_unlock(net);
	}

	/* Chain was not handed to the link layer; drop it here */
	if (unlikely(!bc))
		__skb_queue_purge(list);

	if (unlikely(rc)) {
		kfree_skb(skb);
		return rc;
	}
	/* Deliver message clone */
	__skb_queue_head_init(&arrvq);
	skb_queue_head_init(&inputq);
	__skb_queue_tail(&arrvq, skb);
	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
	return rc;
}
/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
	struct tipc_net *tn = net_generic(node->net, tipc_net_id);

	bclink_update_last_sent(node, seqno);
	node->bclink.last_in = seqno;
	node->bclink.oos_state = 0;
	tn->bcl->stats.recv_info++;

	/*
	 * Unicast an ACK periodically, ensuring that
	 * all nodes in the cluster don't ACK at the same time
	 */
	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
		tipc_link_proto_xmit(node_active_link(node, node->addr),
				     STATE_MSG, 0, 0, 0, 0);
		tn->bcl->stats.sent_acks++;
	}
}
/**
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
 *
 * RCU is locked, no other locks set
 */
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_node *node;
	u32 next_in;
	u32 seqno;
	int deferred = 0;
	int pos = 0;
	struct sk_buff *iskb;
	struct sk_buff_head *arrvq, *inputq;

	/* Screen out unwanted broadcast messages */
	if (msg_mc_netid(msg) != tn->net_id)
		goto exit;

	node = tipc_node_find(net, msg_prevnode(msg));
	if (unlikely(!node))
		goto exit;

	tipc_node_lock(node);
	if (unlikely(!node->bclink.recv_permitted))
		goto unlock;

	/* Handle broadcast protocol message */
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
		if (msg_type(msg) != STATE_MSG)
			goto unlock;
		if (msg_destnode(msg) == tn->own_addr) {
			/* NACK addressed to us: retransmit requested range */
			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
			tipc_bclink_lock(net);
			bcl->stats.recv_nacks++;
			tn->bclink->retransmit_to = node;
			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
					      msg_bcgap_to(msg));
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else {
			/* NACK for someone else: may defer our own NACK */
			tipc_node_unlock(node);
			bclink_peek_nack(net, msg);
		}
		tipc_node_put(node);
		goto exit;
	}

	/* Handle in-sequence broadcast message */
	seqno = msg_seqno(msg);
	next_in = mod(node->bclink.last_in + 1);
	arrvq = &tn->bclink->arrvq;
	inputq = &tn->bclink->inputq;

	if (likely(seqno == next_in)) {
receive:
		/* Deliver message to destination */
		if (likely(msg_isdata(msg))) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			spin_lock_bh(&inputq->lock);
			__skb_queue_tail(arrvq, buf);
			spin_unlock_bh(&inputq->lock);
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == MSG_BUNDLER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			bcl->stats.recv_bundles++;
			bcl->stats.recv_bundled += msg_msgcnt(msg);
			/* Unbundle and queue each inner message separately */
			pos = 0;
			while (tipc_msg_extract(buf, &iskb, &pos)) {
				spin_lock_bh(&inputq->lock);
				__skb_queue_tail(arrvq, iskb);
				spin_unlock_bh(&inputq->lock);
			}
			node->action_flags |= TIPC_BCAST_MSG_EVT;
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else if (msg_user(msg) == MSG_FRAGMENTER) {
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_buf_append(&node->bclink.reasm_buf, &buf);
			if (unlikely(!buf && !node->bclink.reasm_buf)) {
				tipc_bclink_unlock(net);
				goto unlock;
			}
			bcl->stats.recv_fragments++;
			if (buf) {
				/* Reassembly complete: deliver whole message */
				bcl->stats.recv_fragmented++;
				msg = buf_msg(buf);
				tipc_bclink_unlock(net);
				goto receive;
			}
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
		} else {
			/* Other users: accept for sequencing, then discard */
			tipc_bclink_lock(net);
			bclink_accept_pkt(node, seqno);
			tipc_bclink_unlock(net);
			tipc_node_unlock(node);
			kfree_skb(buf);
		}
		buf = NULL;

		/* Determine new synchronization state */
		tipc_node_lock(node);
		if (unlikely(!tipc_node_is_up(node)))
			goto unlock;

		if (node->bclink.last_in == node->bclink.last_sent)
			goto unlock;

		if (skb_queue_empty(&node->bclink.deferdq)) {
			node->bclink.oos_state = 1;
			goto unlock;
		}

		msg = buf_msg(skb_peek(&node->bclink.deferdq));
		seqno = msg_seqno(msg);
		next_in = mod(next_in + 1);
		if (seqno != next_in)
			goto unlock;

		/* Take in-sequence message from deferred queue & deliver it */
		buf = __skb_dequeue(&node->bclink.deferdq);
		goto receive;
	}

	/* Handle out-of-sequence broadcast message */
	if (less(next_in, seqno)) {
		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
					       buf);
		bclink_update_last_sent(node, seqno);
		buf = NULL;
	}

	tipc_bclink_lock(net);

	if (deferred)
		bcl->stats.deferred_recv++;
	else
		bcl->stats.duplicates++;

	tipc_bclink_unlock(net);

unlock:
	tipc_node_unlock(node);
	tipc_node_put(node);
exit:
	kfree_skb(buf);
}
601
u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
P
Per Liden 已提交
602
{
603
	return (n_ptr->bclink.recv_permitted &&
604
		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
P
Per Liden 已提交
605 606 607 608
}

/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
 *
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
 */
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
			      struct tipc_bearer *unused1,
			      struct tipc_media_addr *unused2)
{
	int bp_index;
	struct tipc_msg *msg = buf_msg(buf);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bclink *bclink = tn->bclink;

	/* Prepare broadcast link message for reliable transmission,
	 * if first time trying to send it;
	 * preparation is skipped for broadcast link protocol messages
	 * since they are sent in an unreliable manner and don't need it
	 */
	if (likely(!msg_non_seq(buf_msg(buf)))) {
		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
		msg_set_non_seq(msg, 1);
		msg_set_mc_netid(msg, tn->net_id);
		tn->bcl->stats.sent_info++;
		if (WARN_ON(!bclink->bcast_nodes.count)) {
			dump_stack();
			return 0;
		}
	}

	/* Send buffer over bearers until all targets reached */
	bcbearer->remains = bclink->bcast_nodes;

	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
		struct tipc_bearer *bp[2] = {p, s};
		struct tipc_bearer *b = bp[msg_link_selector(msg)];
		struct sk_buff *tbuf;

		if (!p)
			break; /* No more bearers to try */
		if (!b)
			b = p;
		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
			       &bcbearer->remains_new);
		if (bcbearer->remains_new.count == bcbearer->remains.count)
			continue; /* Nothing added by bearer pair */

		if (bp_index == 0) {
			/* Use original buffer for first bearer */
			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
		} else {
			/* Avoid concurrent buffer access */
			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
			if (!tbuf)
				break;
			tipc_bearer_send(net, b->identity, tbuf,
					 &b->bcast_addr);
			kfree_skb(tbuf); /* Bearer keeps a clone */
		}
		if (bcbearer->remains_new.count == 0)
			break; /* All targets reached */

		bcbearer->remains = bcbearer->remains_new;
	}

	return 0;
}
/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
			u32 node, bool action)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer = tn->bcbearer;
	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
	struct tipc_bcbearer_pair *bp_curr;
	struct tipc_bearer *b;
	int b_index;
	int pri;

	tipc_bclink_lock(net);

	/* action == true adds @node to the bearer's node map, else removes */
	if (action)
		tipc_nmap_add(nm_ptr, node);
	else
		tipc_nmap_remove(nm_ptr, node);

	/* Group bearers by priority (can assume max of two per priority) */
	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

	rcu_read_lock();
	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
		if (!b || !b->nodes.count)
			continue;

		if (!bp_temp[b->priority].primary)
			bp_temp[b->priority].primary = b;
		else
			bp_temp[b->priority].secondary = b;
	}
	rcu_read_unlock();

	/* Create array of bearer pairs for broadcasting */
	bp_curr = bcbearer->bpairs;
	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {

		if (!bp_temp[pri].primary)
			continue;

		bp_curr->primary = bp_temp[pri].primary;

		if (bp_temp[pri].secondary) {
			/* Pair bearers only if they reach the same node set;
			 * otherwise the secondary becomes its own pair
			 */
			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
					    &bp_temp[pri].secondary->nodes)) {
				bp_curr->secondary = bp_temp[pri].secondary;
			} else {
				bp_curr++;
				bp_curr->primary = bp_temp[pri].secondary;
			}
		}

		bp_curr++;
	}

	tipc_bclink_unlock(net);
}
/* __tipc_nl_add_bc_link_stat - nest broadcast link statistics into a
 * netlink message; returns 0 or -EMSGSIZE if the skb ran out of room
 */
static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
				      struct tipc_stats *stats)
{
	int i;
	struct nlattr *nest;

	struct nla_map {
		__u32 key;
		__u32 val;
	};

	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, stats->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, stats->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
		{TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
		{TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
			(stats->accu_queue_sz / stats->queue_sz_counts) : 0}
	};

	nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!nest)
		return -EMSGSIZE;

	for (i = 0; i <  ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, nest);

	return 0;
msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}
798
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
799 800 801 802 803
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct nlattr *prop;
804 805
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;
806 807 808 809

	if (!bcl)
		return 0;

810
	tipc_bclink_lock(net);
811

812
	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828
			  NLM_F_MULTI, TIPC_NL_LINK_GET);
	if (!hdr)
		return -EMSGSIZE;

	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
	if (!attrs)
		goto msg_full;

	/* The broadcast link is always up */
	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
		goto attr_msg_full;

	if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
		goto attr_msg_full;
	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
		goto attr_msg_full;
829
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
830
		goto attr_msg_full;
831
	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
832 833 834 835 836
		goto attr_msg_full;

	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
	if (!prop)
		goto attr_msg_full;
837
	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
838 839 840 841 842 843 844
		goto prop_msg_full;
	nla_nest_end(msg->skb, prop);

	err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
	if (err)
		goto attr_msg_full;

845
	tipc_bclink_unlock(net);
846 847 848 849 850 851 852 853 854 855
	nla_nest_end(msg->skb, attrs);
	genlmsg_end(msg->skb, hdr);

	return 0;

prop_msg_full:
	nla_nest_cancel(msg->skb, prop);
attr_msg_full:
	nla_nest_cancel(msg->skb, attrs);
msg_full:
856
	tipc_bclink_unlock(net);
857 858 859 860
	genlmsg_cancel(msg->skb, hdr);

	return -EMSGSIZE;
}
P
Per Liden 已提交
861

862
int tipc_bclink_reset_stats(struct net *net)
P
Per Liden 已提交
863
{
864 865 866
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

P
Per Liden 已提交
867 868 869
	if (!bcl)
		return -ENOPROTOOPT;

870
	tipc_bclink_lock(net);
P
Per Liden 已提交
871
	memset(&bcl->stats, 0, sizeof(bcl->stats));
872
	tipc_bclink_unlock(net);
873
	return 0;
P
Per Liden 已提交
874 875
}

876
int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
P
Per Liden 已提交
877
{
878 879 880
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_link *bcl = tn->bcl;

P
Per Liden 已提交
881 882 883 884 885
	if (!bcl)
		return -ENOPROTOOPT;
	if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
		return -EINVAL;

886
	tipc_bclink_lock(net);
887
	tipc_link_set_queue_limits(bcl, limit);
888
	tipc_bclink_unlock(net);
889
	return 0;
P
Per Liden 已提交
890 891
}

892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
{
	int err;
	u32 win;
	struct nlattr *props[TIPC_NLA_PROP_MAX + 1];

	if (!attrs[TIPC_NLA_LINK_PROP])
		return -EINVAL;

	err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
	if (err)
		return err;

	if (!props[TIPC_NLA_PROP_WIN])
		return -EOPNOTSUPP;

	win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);

	return tipc_bclink_set_queue_limits(net, win);
}

/* tipc_bclink_init - create and initialize the per-netns broadcast link,
 * its pseudo-bearer and pseudo-node
 *
 * Returns 0 on success, -ENOMEM on allocation failure.
 */
int tipc_bclink_init(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_bcbearer *bcbearer;
	struct tipc_bclink *bclink;
	struct tipc_link *bcl;

	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
	if (!bcbearer)
		return -ENOMEM;

	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
	if (!bclink) {
		kfree(bcbearer);
		return -ENOMEM;
	}

	bcl = &bclink->link;
	/* All sends on the broadcast link go through the pseudo-bearer */
	bcbearer->bearer.media = &bcbearer->media;
	bcbearer->media.send_msg = tipc_bcbearer_send;
	sprintf(bcbearer->media.name, "tipc-broadcast");

	spin_lock_init(&bclink->lock);
	__skb_queue_head_init(&bcl->transmq);
	__skb_queue_head_init(&bcl->backlogq);
	__skb_queue_head_init(&bcl->deferdq);
	skb_queue_head_init(&bcl->wakeupq);
	bcl->snd_nxt = 1;
	spin_lock_init(&bclink->node.lock);
	__skb_queue_head_init(&bclink->arrvq);
	skb_queue_head_init(&bclink->inputq);
	/* The broadcast link is owned by an internal pseudo-node */
	bcl->owner = &bclink->node;
	bcl->owner->net = net;
	bcl->mtu = MAX_PKT_DEFAULT_MCAST;
	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
	/* Pseudo-bearer occupies the slot just past the real bearers */
	bcl->bearer_id = MAX_BEARERS;
	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
	msg_set_prevnode(bcl->pmsg, tn->own_addr);
	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
	tn->bcbearer = bcbearer;
	tn->bclink = bclink;
	tn->bcl = bcl;
	return 0;
}
/* tipc_bclink_stop - tear down the per-netns broadcast link state
 *
 * Purges all queued packets, unpublishes the pseudo-bearer, waits for
 * RCU readers via synchronize_net(), then frees the structures.
 */
void tipc_bclink_stop(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	tipc_bclink_lock(net);
	tipc_link_purge_queues(tn->bcl);
	tipc_bclink_unlock(net);

	/* NOTE(review): init registers the pseudo-bearer at index
	 * MAX_BEARERS; BCBEARER is presumably the same index — confirm.
	 */
	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
	synchronize_net();
	kfree(tn->bcbearer);
	kfree(tn->bclink);
}
973 974 975
/**
 * tipc_nmap_add - add a node to a node map
 */
976
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
977 978 979 980 981 982 983 984 985 986 987 988 989 990
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) == 0) {
		nm_ptr->count++;
		nm_ptr->map[w] |= mask;
	}
}

/**
 * tipc_nmap_remove - remove a node from a node map
 */
991
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008
{
	int n = tipc_node(node);
	int w = n / WSIZE;
	u32 mask = (1 << (n % WSIZE));

	if ((nm_ptr->map[w] & mask) != 0) {
		nm_ptr->map[w] &= ~mask;
		nm_ptr->count--;
	}
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
1009 1010 1011
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
			   struct tipc_node_map *nm_b,
			   struct tipc_node_map *nm_diff)
1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029
{
	int stop = ARRAY_SIZE(nm_a->map);
	int w;
	int b;
	u32 map;

	memset(nm_diff, 0, sizeof(*nm_diff));
	for (w = 0; w < stop; w++) {
		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
		nm_diff->map[w] = map;
		if (map != 0) {
			for (b = 0 ; b < WSIZE; b++) {
				if (map & (1 << b))
					nm_diff->count++;
			}
		}
	}
}