group.c 21.9 KB
Newer Older
J
Jon Maloy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
/*
 * net/tipc/group.c: TIPC group messaging code
 *
 * Copyright (c) 2017, Ericsson AB
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "core.h"
#include "addr.h"
#include "group.h"
#include "bcast.h"
#include "server.h"
#include "msg.h"
#include "socket.h"
#include "node.h"
#include "name_table.h"
#include "subscr.h"

/* Flow control window sizes, counted in blocks of FLOWCTL_BLK_SZ bytes.
 * ADV_UNIT:   blocks needed for one maximum size message including header
 * ADV_IDLE:   window advertised to a member in state JOINED or PENDING
 * ADV_ACTIVE: window advertised to a member in state ACTIVE
 */
#define ADV_UNIT (((MAX_MSG_SIZE + MAX_H_SIZE) / FLOWCTL_BLK_SZ) + 1)
#define ADV_IDLE ADV_UNIT
#define ADV_ACTIVE (ADV_UNIT * 12)
J
Jon Maloy 已提交
50 51 52 53 54 55

/* State of a peer member, as tracked by the local group endpoint.
 * The declaration order is significant: tipc_group_is_sender() and
 * tipc_group_cong() compare states numerically against MBR_JOINED.
 */
enum mbr_state {
	MBR_DISCOVERED,	/* Member created, neither JOIN msg nor PUBLISH seen */
	MBR_JOINING,	/* JOIN msg received, still waiting for PUBLISH event */
	MBR_PUBLISHED,	/* PUBLISH event received, still waiting for JOIN msg */
	MBR_JOINED,	/* Both received; member is advertised ADV_IDLE */
	MBR_PENDING,	/* Wants to become active, queued in grp->pending */
	MBR_ACTIVE,	/* Queued in grp->active; advertised ADV_ACTIVE */
	MBR_RECLAIMING,	/* RECLAIM msg sent to member, waiting for REMIT msg */
	MBR_REMITTED,	/* REMIT received, but earlier msgs not yet consumed */
	MBR_LEAVING	/* LEAVE msg or WITHDRAW event received */
};

struct tipc_member {
	struct rb_node tree_node;
	struct list_head list;
J
Jon Maloy 已提交
66
	struct list_head small_win;
67
	struct sk_buff_head deferredq;
68
	struct tipc_group *group;
J
Jon Maloy 已提交
69 70
	u32 node;
	u32 port;
71
	u32 instance;
J
Jon Maloy 已提交
72
	enum mbr_state state;
73 74
	u16 advertised;
	u16 window;
J
Jon Maloy 已提交
75
	u16 bc_rcv_nxt;
76
	u16 bc_syncpt;
77
	u16 bc_acked;
78
	bool usr_pending;
J
Jon Maloy 已提交
79 80 81 82
};

struct tipc_group {
	struct rb_root members;
J
Jon Maloy 已提交
83
	struct list_head small_win;
84 85
	struct list_head pending;
	struct list_head active;
J
Jon Maloy 已提交
86 87 88 89 90 91 92 93 94
	struct tipc_nlist dests;
	struct net *net;
	int subid;
	u32 type;
	u32 instance;
	u32 domain;
	u32 scope;
	u32 portid;
	u16 member_cnt;
95 96
	u16 active_cnt;
	u16 max_active;
J
Jon Maloy 已提交
97
	u16 bc_snd_nxt;
98
	u16 bc_ackers;
J
Jon Maloy 已提交
99
	bool loopback;
100
	bool events;
J
Jon Maloy 已提交
101 102 103 104 105
};

/* Forward declaration; the function is defined after its first users */
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
				  int mtyp, struct sk_buff_head *xmitq);

106 107 108
static void tipc_group_decr_active(struct tipc_group *grp,
				   struct tipc_member *m)
{
109 110
	if (m->state == MBR_ACTIVE || m->state == MBR_RECLAIMING ||
	    m->state == MBR_REMITTED)
111 112 113
		grp->active_cnt--;
}

114 115
static int tipc_group_rcvbuf_limit(struct tipc_group *grp)
{
116
	int max_active, active_pool, idle_pool;
117 118
	int mcnt = grp->member_cnt + 1;

119 120 121 122 123 124 125 126 127
	/* Limit simultaneous reception from other members */
	max_active = min(mcnt / 8, 64);
	max_active = max(max_active, 16);
	grp->max_active = max_active;

	/* Reserve blocks for active and idle members */
	active_pool = max_active * ADV_ACTIVE;
	idle_pool = (mcnt - max_active) * ADV_IDLE;

128
	/* Scale to bytes, considering worst-case truesize/msgsize ratio */
129
	return (active_pool + idle_pool) * FLOWCTL_BLK_SZ * 4;
130 131
}

J
Jon Maloy 已提交
132 133 134 135 136
/* tipc_group_bc_snd_nxt() - read the group's next broadcast send seqno
 */
u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
{
	u16 seqno = grp->bc_snd_nxt;

	return seqno;
}

J
Jon Maloy 已提交
137
static bool tipc_group_is_receiver(struct tipc_member *m)
138
{
J
Jon Maloy 已提交
139
	return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
140 141
}

J
Jon Maloy 已提交
142
static bool tipc_group_is_sender(struct tipc_member *m)
J
Jon Maloy 已提交
143 144 145 146
{
	return m && m->state >= MBR_JOINED;
}

147 148 149 150 151 152 153
/* tipc_group_exclude() - own port number when loopback is disabled,
 * otherwise zero
 */
u32 tipc_group_exclude(struct tipc_group *grp)
{
	return grp->loopback ? 0 : grp->portid;
}

J
Jon Maloy 已提交
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
/* tipc_group_size() - number of members currently registered in the group
 */
int tipc_group_size(struct tipc_group *grp)
{
	int cnt = grp->member_cnt;

	return cnt;
}

/* tipc_group_create() - allocate and initialize a group endpoint
 * @net: network namespace
 * @portid: port number of the socket joining the group
 * @mreq: group type, instance, scope and flags given by the user
 *
 * A topology subscription covering all instances of the group type is set
 * up, so that other members can be discovered.
 * Returns the new group, or NULL if allocation or subscription failed.
 */
struct tipc_group *tipc_group_create(struct net *net, u32 portid,
				     struct tipc_group_req *mreq)
{
	struct tipc_group *grp;
	u32 type = mreq->type;

	grp = kzalloc(sizeof(*grp), GFP_ATOMIC);
	if (!grp)
		return NULL;
	tipc_nlist_init(&grp->dests, tipc_own_addr(net));
	INIT_LIST_HEAD(&grp->small_win);
	INIT_LIST_HEAD(&grp->active);
	INIT_LIST_HEAD(&grp->pending);
	grp->members = RB_ROOT;
	grp->net = net;
	grp->portid = portid;
	grp->domain = addr_domain(net, mreq->scope);
	grp->type = type;
	grp->instance = mreq->instance;
	grp->scope = mreq->scope;
	grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
	grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;

	/* A non-zero return value means the subscription succeeded */
	if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid))
		return grp;
	kfree(grp);
	return NULL;
}

/* tipc_group_delete() - tear down a group endpoint
 * @net: network namespace
 * @grp: the group to delete; must not be used after return
 *
 * A LEAVE message is sent to every known member before the member
 * structures, the destination list, the topology subscription and the
 * group itself are released.
 */
void tipc_group_delete(struct net *net, struct tipc_group *grp)
{
	struct rb_root *tree = &grp->members;
	struct tipc_member *m, *tmp;
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
		tipc_group_proto_xmit(grp, m, GRP_LEAVE_MSG, &xmitq);
		/* Release messages still waiting in sequence, or they leak */
		__skb_queue_purge(&m->deferredq);
		list_del(&m->list);
		kfree(m);
	}
	tipc_node_distr_xmit(net, &xmitq);
	tipc_nlist_purge(&grp->dests);
	tipc_topsrv_kern_unsubscr(net, grp->subid);
	kfree(grp);
}

/* tipc_group_find_member() - look up a member by its node:port identity
 * Returns the member, or NULL if there is no such entry in the tree
 */
struct tipc_member *tipc_group_find_member(struct tipc_group *grp,
					   u32 node, u32 port)
{
	const u64 wanted = (u64)node << 32 | port;
	struct rb_node *cur;

	for (cur = grp->members.rb_node; cur; ) {
		struct tipc_member *cand;
		u64 have;

		cand = container_of(cur, struct tipc_member, tree_node);
		have = (u64)cand->node << 32 | cand->port;
		if (wanted == have)
			return cand;

		/* Smaller keys are found to the left, larger to the right */
		cur = (wanted < have) ? cur->rb_left : cur->rb_right;
	}
	return NULL;
}

226 227 228 229 230 231
/* tipc_group_find_dest() - look up a member we are allowed to send to
 * Returns the member if it exists and qualifies as receiver, otherwise NULL
 */
static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
						u32 node, u32 port)
{
	struct tipc_member *m;

	m = tipc_group_find_member(grp, node, port);

	/* tipc_group_is_receiver() handles a NULL member itself */
	if (tipc_group_is_receiver(m))
		return m;
	return NULL;
}

J
Jon Maloy 已提交
237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284
/* tipc_group_find_node() - find the first member, in tree order, residing
 * on the given node. Returns NULL if the node hosts no member.
 */
static struct tipc_member *tipc_group_find_node(struct tipc_group *grp,
						u32 node)
{
	struct rb_node *pos = rb_first(&grp->members);

	while (pos) {
		struct tipc_member *cand;

		cand = container_of(pos, struct tipc_member, tree_node);
		if (cand->node == node)
			return cand;
		pos = rb_next(pos);
	}
	return NULL;
}

/* tipc_group_add_to_tree() - insert member into the group's rb-tree
 * The tree is keyed on node:port. If an entry with the same key is
 * already present the tree is left unchanged.
 */
static void tipc_group_add_to_tree(struct tipc_group *grp,
				   struct tipc_member *m)
{
	u64 nkey, key = (u64)m->node << 32 | m->port;
	struct rb_node **n, *parent = NULL;
	struct tipc_member *tmp;

	n = &grp->members.rb_node;
	while (*n) {
		parent = *n;
		tmp = container_of(parent, struct tipc_member, tree_node);
		nkey = (u64)tmp->node << 32 | tmp->port;
		if (key < nkey)
			n = &(*n)->rb_left;
		else if (key > nkey)
			n = &(*n)->rb_right;
		else
			return;
	}
	rb_link_node(&m->tree_node, parent, n);
	rb_insert_color(&m->tree_node, &grp->members);
}

/* tipc_group_create_member() - allocate a member and register it in group
 * @grp: the group
 * @node: node address of the new member
 * @port: port number of the new member
 * @state: initial state, one of enum mbr_state
 *
 * The member is added to the member tree, the group's member count is
 * stepped, and the member's node is added to the destination list.
 * Returns the new member, or NULL if allocation failed.
 */
static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
						    u32 node, u32 port,
						    int state)
{
	struct tipc_member *m;

	m = kzalloc(sizeof(*m), GFP_ATOMIC);
	if (!m)
		return NULL;
	INIT_LIST_HEAD(&m->list);
	INIT_LIST_HEAD(&m->small_win);
	__skb_queue_head_init(&m->deferredq);
	m->group = grp;
	m->node = node;
	m->port = port;
	m->bc_acked = grp->bc_snd_nxt - 1;
	grp->member_cnt++;
	tipc_group_add_to_tree(grp, m);
	tipc_nlist_add(&grp->dests, m->node);
	m->state = state;
	return m;
}

/* tipc_group_add_member() - register a newly discovered member
 * The outcome of the allocation is deliberately not reported to the caller
 */
void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port)
{
	(void)tipc_group_create_member(grp, node, port, MBR_DISCOVERED);
}

/* tipc_group_delete_member() - unregister a member from group and free it
 * The member is unlinked from tree and lists, the group's counters are
 * adjusted, and its node is removed from the destination list if no other
 * member remains there. The member's deferred queue is not touched here;
 * the caller is expected to have emptied it.
 */
static void tipc_group_delete_member(struct tipc_group *grp,
				     struct tipc_member *m)
{
	rb_erase(&m->tree_node, &grp->members);
	grp->member_cnt--;

	/* Check if we were waiting for replicast ack from this member */
	if (grp->bc_ackers && less(m->bc_acked, grp->bc_snd_nxt - 1))
		grp->bc_ackers--;

	list_del_init(&m->list);
	list_del_init(&m->small_win);
	tipc_group_decr_active(grp, m);

	/* If last member on a node, remove node from dest list */
	if (!tipc_group_find_node(grp, m->node))
		tipc_nlist_del(&grp->dests, m->node);

	kfree(m);
}

/* tipc_group_dests() - give access to the group's destination node list
 */
struct tipc_nlist *tipc_group_dests(struct tipc_group *grp)
{
	struct tipc_nlist *nodes = &grp->dests;

	return nodes;
}

void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
		     int *scope)
{
	seq->type = grp->type;
	seq->lower = grp->instance;
	seq->upper = grp->instance;
	*scope = grp->scope;
}

338 339 340 341 342
void tipc_group_update_member(struct tipc_member *m, int len)
{
	struct tipc_group *grp = m->group;
	struct tipc_member *_m, *tmp;

J
Jon Maloy 已提交
343
	if (!tipc_group_is_receiver(m))
344 345 346 347 348 349 350
		return;

	m->window -= len;

	if (m->window >= ADV_IDLE)
		return;

J
Jon Maloy 已提交
351
	list_del_init(&m->small_win);
352

J
Jon Maloy 已提交
353 354
	/* Sort member into small_window members' list */
	list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) {
355 356
		if (_m->window > m->window)
			break;
357
	}
358
	list_add_tail(&m->small_win, &_m->small_win);
359 360
}

361
/* tipc_group_update_bc_members() - account for a broadcast just sent
 * @grp: the group
 * @len: size of the broadcast, in blocks
 * @ack: true if acknowledges were requested for the broadcast
 *
 * The message is charged to the send window of every receiving member,
 * and the broadcast send sequence number is stepped.
 */
void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
{
	u16 prev = grp->bc_snd_nxt - 1;
	struct tipc_member *m;
	struct rb_node *n;
	u16 ackers = 0;

	for (n = rb_first(&grp->members); n; n = rb_next(n)) {
		m = container_of(n, struct tipc_member, tree_node);
		if (tipc_group_is_receiver(m)) {
			tipc_group_update_member(m, len);
			m->bc_acked = prev;
			ackers++;
		}
	}

	/* Mark number of acknowledges to expect, if any */
	if (ack)
		grp->bc_ackers = ackers;
	grp->bc_snd_nxt++;
}

383 384
/* tipc_group_cong() - check if a message can be sent to a given member
 * @grp: the group
 * @dnode: node address of destination member
 * @dport: port number of destination member
 * @len: size of the message, in blocks
 * @mbr: returns the destination member, or NULL if it is not a valid
 *       destination
 *
 * Returns true if the sender must wait, false if the message can be sent
 * or if the destination is unknown. When true is returned the member is
 * marked as having a user pending.
 */
bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
		     int len, struct tipc_member **mbr)
{
	struct sk_buff_head xmitq;
	struct tipc_member *m;
	int adv, state;

	m = tipc_group_find_dest(grp, dnode, dport);
	*mbr = m;
	if (!m)
		return false;
	if (m->usr_pending)
		return true;
	if (m->window >= len)
		return false;
	m->usr_pending = true;

	/* If not fully advertised, do it now to prevent mutual blocking */
	adv = m->advertised;
	state = m->state;
	if (state < MBR_JOINED)
		return true;
	if (state == MBR_JOINED && adv == ADV_IDLE)
		return true;
	if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
		return true;
	if (state == MBR_PENDING && adv == ADV_IDLE)
		return true;
	skb_queue_head_init(&xmitq);
	tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, &xmitq);
	tipc_node_distr_xmit(grp->net, &xmitq);
	return true;
}

bool tipc_group_bc_cong(struct tipc_group *grp, int len)
{
	struct tipc_member *m = NULL;
420

421 422 423 424
	/* If prev bcast was replicast, reject until all receivers have acked */
	if (grp->bc_ackers)
		return true;

J
Jon Maloy 已提交
425
	if (list_empty(&grp->small_win))
426 427
		return false;

J
Jon Maloy 已提交
428
	m = list_first_entry(&grp->small_win, struct tipc_member, small_win);
429 430 431
	if (m->window >= len)
		return false;

432
	return tipc_group_cong(grp, m->node, m->port, len, &m);
433 434
}

435 436 437 438 439 440 441 442 443
/* tipc_group_sort_msg() - sort msg into queue by bcast sequence number
 */
static void tipc_group_sort_msg(struct sk_buff *skb, struct sk_buff_head *defq)
{
	struct tipc_msg *_hdr, *hdr = buf_msg(skb);
	u16 bc_seqno = msg_grp_bc_seqno(hdr);
	struct sk_buff *_skb, *tmp;
	int mtyp = msg_type(hdr);

444
	/* Bcast/mcast may be bypassed by ucast or other bcast, - sort it in */
445 446 447 448 449 450 451 452 453 454 455 456 457 458
	if (mtyp == TIPC_GRP_BCAST_MSG || mtyp == TIPC_GRP_MCAST_MSG) {
		skb_queue_walk_safe(defq, _skb, tmp) {
			_hdr = buf_msg(_skb);
			if (!less(bc_seqno, msg_grp_bc_seqno(_hdr)))
				continue;
			__skb_queue_before(defq, _skb, skb);
			return;
		}
		/* Bcast was not bypassed, - add to tail */
	}
	/* Unicasts are never bypassed, - always add to tail */
	__skb_queue_tail(defq, skb);
}

J
Jon Maloy 已提交
459 460 461 462 463 464
/* tipc_group_filter_msg() - determine if we should accept arriving message
 */
void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
			   struct sk_buff_head *xmitq)
{
	struct sk_buff *skb = __skb_dequeue(inputq);
465
	bool ack, deliver, update, leave = false;
466
	struct sk_buff_head *defq;
J
Jon Maloy 已提交
467 468 469
	struct tipc_member *m;
	struct tipc_msg *hdr;
	u32 node, port;
470
	int mtyp, blks;
J
Jon Maloy 已提交
471 472 473 474 475 476 477 478 479 480 481 482

	if (!skb)
		return;

	hdr = buf_msg(skb);
	node =  msg_orignode(hdr);
	port = msg_origport(hdr);

	if (!msg_in_group(hdr))
		goto drop;

	m = tipc_group_find_member(grp, node, port);
J
Jon Maloy 已提交
483
	if (!tipc_group_is_sender(m))
J
Jon Maloy 已提交
484 485
		goto drop;

486 487
	if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
		goto drop;
488

489 490 491 492 493 494 495
	TIPC_SKB_CB(skb)->orig_member = m->instance;
	defq = &m->deferredq;
	tipc_group_sort_msg(skb, defq);

	while ((skb = skb_peek(defq))) {
		hdr = buf_msg(skb);
		mtyp = msg_type(hdr);
496
		blks = msg_blocks(hdr);
497
		deliver = true;
498
		ack = false;
499 500 501 502 503 504 505 506 507 508 509 510 511 512 513
		update = false;

		if (more(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
			break;

		/* Decide what to do with message */
		switch (mtyp) {
		case TIPC_GRP_MCAST_MSG:
			if (msg_nameinst(hdr) != grp->instance) {
				update = true;
				deliver = false;
			}
			/* Fall thru */
		case TIPC_GRP_BCAST_MSG:
			m->bc_rcv_nxt++;
514
			ack = msg_grp_bc_ack_req(hdr);
515 516 517
			break;
		case TIPC_GRP_UCAST_MSG:
			break;
518 519 520 521 522 523
		case TIPC_GRP_MEMBER_EVT:
			if (m->state == MBR_LEAVING)
				leave = true;
			if (!grp->events)
				deliver = false;
			break;
524 525
		default:
			break;
526 527
		}

528 529 530 531 532 533 534
		/* Execute decisions */
		__skb_dequeue(defq);
		if (deliver)
			__skb_queue_tail(inputq, skb);
		else
			kfree_skb(skb);

535 536 537
		if (ack)
			tipc_group_proto_xmit(grp, m, GRP_ACK_MSG, xmitq);

538 539
		if (leave) {
			__skb_queue_purge(defq);
J
Jon Maloy 已提交
540
			tipc_group_delete_member(grp, m);
541 542
			break;
		}
543 544
		if (!update)
			continue;
J
Jon Maloy 已提交
545

546 547
		tipc_group_update_rcv_win(grp, blks, node, port, xmitq);
	}
J
Jon Maloy 已提交
548 549 550 551 552
	return;
drop:
	kfree_skb(skb);
}

553 554 555
/* tipc_group_update_rcv_win() - account for a message consumed from member
 * @grp: the group
 * @blks: size of the consumed message, in blocks
 * @node: node address of the originating member
 * @port: port number of the originating member
 * @xmitq: queue where protocol messages to be sent are added
 *
 * The member's advertised window is reduced, and depending on its state
 * the member may be promoted, demoted, or given a new advertisement.
 * Note that the active count compared against the limits below is the
 * value sampled at function entry.
 */
void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
			       u32 port, struct sk_buff_head *xmitq)
{
	struct list_head *active = &grp->active;
	int max_active = grp->max_active;
	int reclaim_limit = max_active * 3 / 4;
	int active_cnt = grp->active_cnt;
	struct tipc_member *m, *rm, *pm;

	m = tipc_group_find_member(grp, node, port);
	if (!m)
		return;

	m->advertised -= blks;

	switch (m->state) {
	case MBR_JOINED:
		/* First, decide if member can go active */
		if (active_cnt <= max_active) {
			m->state = MBR_ACTIVE;
			list_add_tail(&m->list, active);
			grp->active_cnt++;
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		} else {
			m->state = MBR_PENDING;
			list_add_tail(&m->list, &grp->pending);
		}

		if (active_cnt < reclaim_limit)
			break;

		/* Reclaim from oldest active member, if possible */
		if (!list_empty(active)) {
			rm = list_first_entry(active, struct tipc_member, list);
			rm->state = MBR_RECLAIMING;
			list_del_init(&rm->list);
			tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
			break;
		}
		/* Nobody to reclaim from; - revert oldest pending to JOINED.
		 * The active list being empty means the member was not made
		 * active above, so pending list holds at least that member.
		 */
		pm = list_first_entry(&grp->pending, struct tipc_member, list);
		list_del_init(&pm->list);
		pm->state = MBR_JOINED;
		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
		break;
	case MBR_ACTIVE:
		/* Keep active list ordered by time of latest reception */
		if (!list_is_last(&m->list, &grp->active))
			list_move_tail(&m->list, &grp->active);
		if (m->advertised > (ADV_ACTIVE * 3 / 4))
			break;
		tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		break;
	case MBR_REMITTED:
		if (m->advertised > ADV_IDLE)
			break;
		m->state = MBR_JOINED;
		grp->active_cnt--;
		if (m->advertised < ADV_IDLE) {
			pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
		}

		if (list_empty(&grp->pending))
			return;

		/* Set oldest pending member to active and advertise */
		pm = list_first_entry(&grp->pending, struct tipc_member, list);
		pm->state = MBR_ACTIVE;
		list_move_tail(&pm->list, &grp->active);
		grp->active_cnt++;
		tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
		break;
	case MBR_RECLAIMING:
	case MBR_DISCOVERED:
	case MBR_JOINING:
	case MBR_LEAVING:
	default:
		break;
	}
}

634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667
/* tipc_group_create_event() - create a membership event message
 * @grp: the group
 * @m: the member the event is about
 * @event: TIPC_PUBLISHED or TIPC_WITHDRAWN
 * @seqno: broadcast sequence number to stamp the message with
 * @inputq: queue the message is appended to
 *
 * Nothing is queued if the message buffer cannot be allocated.
 */
static void tipc_group_create_event(struct tipc_group *grp,
				    struct tipc_member *m,
				    u32 event, u16 seqno,
				    struct sk_buff_head *inputq)
{
	u32 dnode = tipc_own_addr(grp->net);
	struct tipc_event evt;
	struct sk_buff *skb;
	struct tipc_msg *hdr;

	/* The whole struct is copied into the message below, so make sure
	 * fields and padding not assigned here don't carry stack contents
	 */
	memset(&evt, 0, sizeof(evt));
	evt.event = event;
	evt.found_lower = m->instance;
	evt.found_upper = m->instance;
	evt.port.ref = m->port;
	evt.port.node = m->node;
	evt.s.seq.type = grp->type;
	evt.s.seq.lower = m->instance;
	evt.s.seq.upper = m->instance;

	skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
			      GROUP_H_SIZE, sizeof(evt), dnode, m->node,
			      grp->portid, m->port, 0);
	if (!skb)
		return;

	hdr = buf_msg(skb);
	msg_set_nametype(hdr, grp->type);
	msg_set_grp_evt(hdr, event);
	msg_set_dest_droppable(hdr, true);
	msg_set_grp_bc_seqno(hdr, seqno);
	memcpy(msg_data(hdr), &evt, sizeof(evt));
	TIPC_SKB_CB(skb)->orig_member = m->instance;
	__skb_queue_tail(inputq, skb);
}

J
Jon Maloy 已提交
668 669 670 671 672
static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
				  int mtyp, struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr;
	struct sk_buff *skb;
673
	int adv = 0;
J
Jon Maloy 已提交
674 675 676 677 678 679 680

	skb = tipc_msg_create(GROUP_PROTOCOL, mtyp, INT_H_SIZE, 0,
			      m->node, tipc_own_addr(grp->net),
			      m->port, grp->portid, 0);
	if (!skb)
		return;

681
	if (m->state == MBR_ACTIVE)
682
		adv = ADV_ACTIVE - m->advertised;
683 684
	else if (m->state == MBR_JOINED || m->state == MBR_PENDING)
		adv = ADV_IDLE - m->advertised;
685

J
Jon Maloy 已提交
686
	hdr = buf_msg(skb);
687 688

	if (mtyp == GRP_JOIN_MSG) {
J
Jon Maloy 已提交
689
		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
690 691
		msg_set_adv_win(hdr, adv);
		m->advertised += adv;
692 693
	} else if (mtyp == GRP_LEAVE_MSG) {
		msg_set_grp_bc_syncpt(hdr, grp->bc_snd_nxt);
694 695 696
	} else if (mtyp == GRP_ADV_MSG) {
		msg_set_adv_win(hdr, adv);
		m->advertised += adv;
697 698
	} else if (mtyp == GRP_ACK_MSG) {
		msg_set_grp_bc_acked(hdr, m->bc_rcv_nxt);
699 700
	} else if (mtyp == GRP_REMIT_MSG) {
		msg_set_grp_remitted(hdr, m->window);
701
	}
J
Jon Maloy 已提交
702
	msg_set_dest_droppable(hdr, true);
J
Jon Maloy 已提交
703 704 705
	__skb_queue_tail(xmitq, skb);
}

706 707
/* tipc_group_proto_rcv() - receive and handle a group protocol message
 * @grp: the receiving group; may be NULL, in which case nothing is done
 * @usr_wakeup: set to tell the caller whether a waiting sender may proceed
 * @hdr: header of the received protocol message
 * @inputq: queue where generated member events are added
 * @xmitq: queue where protocol messages to be sent are added
 */
void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
			  struct tipc_msg *hdr, struct sk_buff_head *inputq,
			  struct sk_buff_head *xmitq)
{
	u32 node = msg_orignode(hdr);
	u32 port = msg_origport(hdr);
	struct tipc_member *m, *pm;
	u16 remitted, in_flight;

	if (!grp)
		return;

	m = tipc_group_find_member(grp, node, port);

	switch (msg_type(hdr)) {
	case GRP_JOIN_MSG:
		if (!m)
			m = tipc_group_create_member(grp, node, port,
						     MBR_JOINING);
		if (!m)
			return;
		m->bc_syncpt = msg_grp_bc_syncpt(hdr);
		m->bc_rcv_nxt = m->bc_syncpt;
		m->window += msg_adv_win(hdr);

		/* Wait until PUBLISH event is received */
		if (m->state == MBR_DISCOVERED) {
			m->state = MBR_JOINING;
		} else if (m->state == MBR_PUBLISHED) {
			m->state = MBR_JOINED;
			*usr_wakeup = true;
			m->usr_pending = false;
			tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
			tipc_group_create_event(grp, m, TIPC_PUBLISHED,
						m->bc_syncpt, inputq);
		}
		list_del_init(&m->small_win);
		tipc_group_update_member(m, 0);
		return;
	case GRP_LEAVE_MSG:
		if (!m)
			return;
		m->bc_syncpt = msg_grp_bc_syncpt(hdr);
		list_del_init(&m->list);
		list_del_init(&m->small_win);
		*usr_wakeup = true;
		/* Must be done while the state still tells if member counts */
		tipc_group_decr_active(grp, m);
		m->state = MBR_LEAVING;
		tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
					m->bc_syncpt, inputq);
		return;
	case GRP_ADV_MSG:
		if (!m)
			return;
		m->window += msg_adv_win(hdr);
		*usr_wakeup = m->usr_pending;
		m->usr_pending = false;
		list_del_init(&m->small_win);
		return;
	case GRP_ACK_MSG:
		if (!m)
			return;
		m->bc_acked = msg_grp_bc_acked(hdr);

		/* No acknowledge is expected; decrementing the counter now
		 * would make it wrap and block all further broadcasts
		 */
		if (!grp->bc_ackers)
			return;
		if (--grp->bc_ackers)
			return;
		*usr_wakeup = true;
		m->usr_pending = false;
		return;
	case GRP_RECLAIM_MSG:
		if (!m)
			return;
		*usr_wakeup = m->usr_pending;
		m->usr_pending = false;
		/* Report the remaining window before it is reset to idle size */
		tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
		m->window = ADV_IDLE;
		return;
	case GRP_REMIT_MSG:
		if (!m || m->state != MBR_RECLAIMING)
			return;

		remitted = msg_grp_remitted(hdr);

		/* Messages preceding the REMIT still in receive queue */
		if (m->advertised > remitted) {
			m->state = MBR_REMITTED;
			in_flight = m->advertised - remitted;
			m->advertised = ADV_IDLE + in_flight;
			return;
		}
		/* This should never happen */
		if (m->advertised < remitted)
			pr_warn_ratelimited("Unexpected REMIT msg\n");

		/* All messages preceding the REMIT have been read */
		m->state = MBR_JOINED;
		grp->active_cnt--;
		m->advertised = ADV_IDLE;

		/* Set oldest pending member to active and advertise */
		if (list_empty(&grp->pending))
			return;
		pm = list_first_entry(&grp->pending, struct tipc_member, list);
		pm->state = MBR_ACTIVE;
		list_move_tail(&pm->list, &grp->active);
		grp->active_cnt++;
		if (pm->advertised <= (ADV_ACTIVE * 3 / 4))
			tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
		return;
	default:
		pr_warn("Received unknown GROUP_PROTO message\n");
	}
}

819 820
/* tipc_group_member_evt() - receive and handle a member up/down event
 */
J
Jon Maloy 已提交
821
void tipc_group_member_evt(struct tipc_group *grp,
822 823
			   bool *usr_wakeup,
			   int *sk_rcvbuf,
824
			   struct tipc_msg *hdr,
825
			   struct sk_buff_head *inputq,
J
Jon Maloy 已提交
826 827 828
			   struct sk_buff_head *xmitq)
{
	struct tipc_event *evt = (void *)msg_data(hdr);
829
	u32 instance = evt->found_lower;
J
Jon Maloy 已提交
830 831
	u32 node = evt->port.node;
	u32 port = evt->port.ref;
832
	int event = evt->event;
J
Jon Maloy 已提交
833 834 835 836 837
	struct tipc_member *m;
	struct net *net;
	u32 self;

	if (!grp)
838
		return;
J
Jon Maloy 已提交
839 840 841 842

	net = grp->net;
	self = tipc_own_addr(net);
	if (!grp->loopback && node == self && port == grp->portid)
843
		return;
844

J
Jon Maloy 已提交
845 846
	m = tipc_group_find_member(grp, node, port);

847
	if (event == TIPC_PUBLISHED) {
J
Jon Maloy 已提交
848 849 850 851
		if (!m)
			m = tipc_group_create_member(grp, node, port,
						     MBR_DISCOVERED);
		if (!m)
852 853 854
			return;

		m->instance = instance;
J
Jon Maloy 已提交
855

856 857
		/* Hold back event if JOIN message not yet received */
		if (m->state == MBR_DISCOVERED) {
J
Jon Maloy 已提交
858
			m->state = MBR_PUBLISHED;
859
		} else {
860 861
			tipc_group_create_event(grp, m, TIPC_PUBLISHED,
						m->bc_syncpt, inputq);
J
Jon Maloy 已提交
862
			m->state = MBR_JOINED;
863 864
			*usr_wakeup = true;
			m->usr_pending = false;
865
		}
J
Jon Maloy 已提交
866
		tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
867
		tipc_group_update_member(m, 0);
868
	} else if (event == TIPC_WITHDRAWN) {
J
Jon Maloy 已提交
869
		if (!m)
870
			return;
871

872
		*usr_wakeup = true;
873
		m->usr_pending = false;
J
Jon Maloy 已提交
874 875
		tipc_group_decr_active(grp, m);
		m->state = MBR_LEAVING;
876
		list_del_init(&m->list);
J
Jon Maloy 已提交
877
		list_del_init(&m->small_win);
J
Jon Maloy 已提交
878 879 880 881 882

		/* Only send event if no LEAVE message can be expected */
		if (!tipc_node_is_up(net, node))
			tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
						m->bc_rcv_nxt, inputq);
J
Jon Maloy 已提交
883
	}
884
	*sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
J
Jon Maloy 已提交
885
}