msg.c 17.7 KB
Newer Older
P
Per Liden 已提交
1 2
/*
 * net/tipc/msg.c: TIPC message header routines
3
 *
4
 * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
5
 * Copyright (c) 2005, 2010-2011, Wind River Systems
P
Per Liden 已提交
6 7
 * All rights reserved.
 *
P
Per Liden 已提交
8
 * Redistribution and use in source and binary forms, with or without
P
Per Liden 已提交
9 10
 * modification, are permitted provided that the following conditions are met:
 *
P
Per Liden 已提交
11 12 13 14 15 16 17 18
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
P
Per Liden 已提交
19
 *
P
Per Liden 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
P
Per Liden 已提交
34 35 36
 * POSSIBILITY OF SUCH DAMAGE.
 */

37
#include <net/sock.h>
P
Per Liden 已提交
38 39
#include "core.h"
#include "msg.h"
40 41
#include "addr.h"
#include "name_table.h"
P
Per Liden 已提交
42

43
/* Cap on the number of data bytes kept when tipc_msg_reverse() returns a
 * message to its sender.
 */
#define MAX_FORWARD_SIZE 1024

/* Head/tail room requested for TIPC buffers: the headroom is reserved by
 * tipc_buf_acquire(), and both values are passed to pskb_expand_head()
 * when a cloned buffer has to be made private.
 */
#define BUF_HEADROOM (LL_MAX_HEADER + 48)
#define BUF_TAILROOM 16
46

47
/* align - round @i up to the next multiple of four */
static unsigned int align(unsigned int i)
{
	const unsigned int mask = 3u;

	return (i + mask) & ~mask;
}

Y
Ying Xue 已提交
52 53 54 55 56 57 58 59 60
/**
 * tipc_buf_acquire - creates a TIPC message buffer
 * @size: message size (including TIPC header)
 *
 * Returns a new buffer with data pointers set to the specified size.
 *
 * NOTE: Headroom is reserved to allow prepending of a data link header.
 *       There may also be unrequested tailroom present at the buffer's end.
 */
61
struct sk_buff *tipc_buf_acquire(u32 size, gfp_t gfp)
Y
Ying Xue 已提交
62 63 64 65
{
	struct sk_buff *skb;
	unsigned int buf_size = (BUF_HEADROOM + size + 3) & ~3u;

66
	skb = alloc_skb_fclone(buf_size, gfp);
Y
Ying Xue 已提交
67 68 69 70 71 72 73 74
	if (skb) {
		skb_reserve(skb, BUF_HEADROOM);
		skb_put(skb, size);
		skb->next = NULL;
	}
	return skb;
}

75 76
/* tipc_msg_init - zero a message header and fill in its basic fields
 * @own_node: stored as previous node, and as originating node when the
 *            header is longer than SHORT_H_SIZE
 * @m: header to initialize; the first @hsize bytes are cleared
 * @user: message user
 * @type: message type
 * @hsize: header size; also used as the initial message size
 * @dnode: destination node, only stored when @hsize exceeds SHORT_H_SIZE
 */
void tipc_msg_init(u32 own_node, struct tipc_msg *m, u32 user, u32 type,
		   u32 hsize, u32 dnode)
{
	memset(m, 0, hsize);
	msg_set_version(m);
	msg_set_user(m, user);
	msg_set_hdr_sz(m, hsize);
	msg_set_size(m, hsize);
	msg_set_prevnode(m, own_node);
	msg_set_type(m, type);

	/* Node address fields are only written for longer headers */
	if (hsize <= SHORT_H_SIZE)
		return;

	msg_set_orignode(m, own_node);
	msg_set_destnode(m, dnode);
}

91
/* tipc_msg_create - allocate a buffer and initialize its message header
 * @user, @type: message user and type
 * @hdr_sz: header size
 * @data_sz: size of the data area following the header (left unwritten here)
 * @dnode, @onode: destination and originating node
 * @dport, @oport: destination and originating port
 * @errcode: error code to store in the header
 *
 * Allocates with GFP_ATOMIC.
 * Returns the new buffer, or NULL if allocation failed.
 */
struct sk_buff *tipc_msg_create(uint user, uint type,
				uint hdr_sz, uint data_sz, u32 dnode,
				u32 onode, u32 dport, u32 oport, int errcode)
{
	uint total_sz = hdr_sz + data_sz;
	struct sk_buff *skb = tipc_buf_acquire(total_sz, GFP_ATOMIC);
	struct tipc_msg *hdr;

	if (unlikely(!skb))
		return NULL;

	hdr = buf_msg(skb);
	tipc_msg_init(onode, hdr, user, type, hdr_sz, dnode);
	msg_set_size(hdr, total_sz);
	msg_set_origport(hdr, oport);
	msg_set_destport(hdr, dport);
	msg_set_errcode(hdr, errcode);
	if (hdr_sz > SHORT_H_SIZE) {
		msg_set_orignode(hdr, onode);
		msg_set_destnode(hdr, dnode);
	}
	return skb;
}

115
/* tipc_buf_append(): Append a buffer to the fragment list of another buffer
116 117 118
 * @*headbuf: in:  NULL for first frag, otherwise value returned from prev call
 *            out: set when successful non-complete reassembly, otherwise NULL
 * @*buf:     in:  the buffer to append. Always defined
S
stephen hemminger 已提交
119
 *            out: head buf after successful complete reassembly, otherwise NULL
120
 * Returns 1 when reassembly complete, otherwise 0
121 122 123 124 125
 */
int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
{
	struct sk_buff *head = *headbuf;
	struct sk_buff *frag = *buf;
126
	struct sk_buff *tail = NULL;
127 128
	struct tipc_msg *msg;
	u32 fragid;
129
	int delta;
130
	bool headstolen;
131

132 133 134 135 136 137
	if (!frag)
		goto err;

	msg = buf_msg(frag);
	fragid = msg_type(msg);
	frag->next = NULL;
138 139 140
	skb_pull(frag, msg_hdr_sz(msg));

	if (fragid == FIRST_FRAGMENT) {
141 142 143 144
		if (unlikely(head))
			goto err;
		if (unlikely(skb_unclone(frag, GFP_ATOMIC)))
			goto err;
145
		head = *headbuf = frag;
146
		*buf = NULL;
147 148 149 150 151 152 153 154
		TIPC_SKB_CB(head)->tail = NULL;
		if (skb_is_nonlinear(head)) {
			skb_walk_frags(head, tail) {
				TIPC_SKB_CB(head)->tail = tail;
			}
		} else {
			skb_frag_list_init(head);
		}
155 156
		return 0;
	}
157

158
	if (!head)
159 160
		goto err;

161 162 163
	if (skb_try_coalesce(head, frag, &headstolen, &delta)) {
		kfree_skb_partial(frag, headstolen);
	} else {
164
		tail = TIPC_SKB_CB(head)->tail;
165 166 167 168 169 170 171 172 173
		if (!skb_has_frag_list(head))
			skb_shinfo(head)->frag_list = frag;
		else
			tail->next = frag;
		head->truesize += frag->truesize;
		head->data_len += frag->len;
		head->len += frag->len;
		TIPC_SKB_CB(head)->tail = frag;
	}
174

175
	if (fragid == LAST_FRAGMENT) {
176 177 178
		TIPC_SKB_CB(head)->validated = false;
		if (unlikely(!tipc_msg_validate(head)))
			goto err;
179 180 181 182 183 184 185
		*buf = head;
		TIPC_SKB_CB(head)->tail = NULL;
		*headbuf = NULL;
		return 1;
	}
	*buf = NULL;
	return 0;
186
err:
187
	kfree_skb(*buf);
188 189
	kfree_skb(*headbuf);
	*buf = *headbuf = NULL;
190 191
	return 0;
}
192

193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
/* tipc_msg_validate - validate basic format of received message
 *
 * Checks that the header size lies within [MIN_H_SIZE, MAX_H_SIZE], that the
 * whole header can be pulled into the linear part of the buffer, that the
 * protocol version matches, and that the buffer holds at least as many bytes
 * as the header's size field announces. A buffer already marked as validated
 * is accepted without re-checking.
 *
 * Note: Having extra info present in the message header or data areas is OK.
 * TIPC will ignore the excess, under the assumption that it is optional info
 * introduced by a later release of the protocol.
 *
 * Returns true if the message is acceptable, otherwise false.
 */
bool tipc_msg_validate(struct sk_buff *skb)
{
	struct tipc_msg *hdr;
	int hlen, mlen;

	if (unlikely(TIPC_SKB_CB(skb)->validated))
		return true;
	if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE)))
		return false;

	hlen = msg_hdr_sz(buf_msg(skb));
	if (unlikely(hlen < MIN_H_SIZE || hlen > MAX_H_SIZE))
		return false;
	if (unlikely(!pskb_may_pull(skb, hlen)))
		return false;

	/* Fetch the header pointer only after the pull above */
	hdr = buf_msg(skb);
	if (unlikely(msg_version(hdr) != TIPC_VERSION))
		return false;

	mlen = msg_size(hdr);
	if (unlikely(mlen < hlen))
		return false;
	if (unlikely((mlen - hlen) > TIPC_MAX_USER_MSG_SIZE))
		return false;
	if (unlikely(skb->len < mlen))
		return false;

	TIPC_SKB_CB(skb)->validated = true;
	return true;
}
235 236

/**
237
 * tipc_msg_build - create buffer chain containing specified header and data
238
 * @mhdr: Message header, to be prepended to data
239
 * @m: User message
240 241
 * @dsz: Total length of user data
 * @pktmax: Max packet size that can be used
242 243
 * @list: Buffer or chain of buffers to be returned to caller
 *
244 245
 * Returns message data size or errno: -ENOMEM, -EFAULT
 */
246
int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
247
		   int offset, int dsz, int pktmax, struct sk_buff_head *list)
248 249 250 251 252 253 254 255
{
	int mhsz = msg_hdr_sz(mhdr);
	int msz = mhsz + dsz;
	int pktno = 1;
	int pktsz;
	int pktrem = pktmax;
	int drem = dsz;
	struct tipc_msg pkthdr;
256
	struct sk_buff *skb;
257 258
	char *pktpos;
	int rc;
259

260 261 262 263
	msg_set_size(mhdr, msz);

	/* No fragmentation needed? */
	if (likely(msz <= pktmax)) {
264
		skb = tipc_buf_acquire(msz, GFP_KERNEL);
265
		if (unlikely(!skb))
266
			return -ENOMEM;
267
		skb_orphan(skb);
268 269 270
		__skb_queue_tail(list, skb);
		skb_copy_to_linear_data(skb, mhdr, mhsz);
		pktpos = skb->data + mhsz;
271
		if (copy_from_iter_full(pktpos, dsz, &m->msg_iter))
272 273 274 275 276 277
			return dsz;
		rc = -EFAULT;
		goto error;
	}

	/* Prepare reusable fragment header */
278 279
	tipc_msg_init(msg_prevnode(mhdr), &pkthdr, MSG_FRAGMENTER,
		      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
280 281
	msg_set_size(&pkthdr, pktmax);
	msg_set_fragm_no(&pkthdr, pktno);
282
	msg_set_importance(&pkthdr, msg_importance(mhdr));
283 284

	/* Prepare first fragment */
285
	skb = tipc_buf_acquire(pktmax, GFP_KERNEL);
286
	if (!skb)
287
		return -ENOMEM;
288
	skb_orphan(skb);
289 290 291
	__skb_queue_tail(list, skb);
	pktpos = skb->data;
	skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
292 293
	pktpos += INT_H_SIZE;
	pktrem -= INT_H_SIZE;
294
	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
295 296 297 298 299 300 301
	pktpos += mhsz;
	pktrem -= mhsz;

	do {
		if (drem < pktrem)
			pktrem = drem;

302
		if (!copy_from_iter_full(pktpos, pktrem, &m->msg_iter)) {
303 304 305 306 307 308 309 310 311 312 313 314 315
			rc = -EFAULT;
			goto error;
		}
		drem -= pktrem;

		if (!drem)
			break;

		/* Prepare new fragment: */
		if (drem < (pktmax - INT_H_SIZE))
			pktsz = drem + INT_H_SIZE;
		else
			pktsz = pktmax;
316
		skb = tipc_buf_acquire(pktsz, GFP_KERNEL);
317
		if (!skb) {
318 319 320
			rc = -ENOMEM;
			goto error;
		}
321
		skb_orphan(skb);
322
		__skb_queue_tail(list, skb);
323 324 325
		msg_set_type(&pkthdr, FRAGMENT);
		msg_set_size(&pkthdr, pktsz);
		msg_set_fragm_no(&pkthdr, ++pktno);
326 327
		skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
		pktpos = skb->data + INT_H_SIZE;
328 329 330
		pktrem = pktsz - INT_H_SIZE;

	} while (1);
331
	msg_set_type(buf_msg(skb), LAST_FRAGMENT);
332 333
	return dsz;
error:
334 335
	__skb_queue_purge(list);
	__skb_queue_head_init(list);
336 337 338
	return rc;
}

339 340
/**
 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
341 342
 * @skb: the buffer to append to ("bundle")
 * @msg:  message to be appended
343 344 345 346
 * @mtu:  max allowable size for the bundle buffer
 * Consumes buffer if successful
 * Returns true if bundling could be performed, otherwise false
 */
347
bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu)
348
{
J
Jon Paul Maloy 已提交
349 350
	struct tipc_msg *bmsg;
	unsigned int bsz;
351
	unsigned int msz = msg_size(msg);
J
Jon Paul Maloy 已提交
352
	u32 start, pad;
353 354 355 356
	u32 max = mtu - INT_H_SIZE;

	if (likely(msg_user(msg) == MSG_FRAGMENTER))
		return false;
357
	if (!skb)
J
Jon Paul Maloy 已提交
358
		return false;
359
	bmsg = buf_msg(skb);
J
Jon Paul Maloy 已提交
360 361 362 363
	bsz = msg_size(bmsg);
	start = align(bsz);
	pad = start - bsz;

364
	if (unlikely(msg_user(msg) == TUNNEL_PROTOCOL))
365 366 367
		return false;
	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
		return false;
368
	if (unlikely(msg_user(bmsg) != MSG_BUNDLER))
369
		return false;
370
	if (unlikely(skb_tailroom(skb) < (pad + msz)))
371 372 373
		return false;
	if (unlikely(max < (start + msz)))
		return false;
374 375 376
	if ((msg_importance(msg) < TIPC_SYSTEM_IMPORTANCE) &&
	    (msg_importance(bmsg) == TIPC_SYSTEM_IMPORTANCE))
		return false;
377

378 379
	skb_put(skb, pad + msz);
	skb_copy_to_linear_data_offset(skb, start, msg, msz);
380 381 382 383 384
	msg_set_size(bmsg, start + msz);
	msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
	return true;
}

385 386
/**
 *  tipc_msg_extract(): extract bundled inner packet from buffer
387
 *  @skb: buffer to be extracted from.
388
 *  @iskb: extracted inner buffer, to be returned
389 390
 *  @pos: position in outer message of msg to be extracted.
 *        Returns position of next msg
391 392 393 394 395
 *  Consumes outer buffer when last packet extracted
 *  Returns true when when there is an extracted buffer, otherwise false
 */
bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
{
396
	struct tipc_msg *msg;
397
	int imsz, offset;
398

399
	*iskb = NULL;
400
	if (unlikely(skb_linearize(skb)))
401 402
		goto none;

403
	msg = buf_msg(skb);
404 405
	offset = msg_hdr_sz(msg) + *pos;
	if (unlikely(offset > (msg_size(msg) - MIN_H_SIZE)))
406 407
		goto none;

408 409
	*iskb = skb_clone(skb, GFP_ATOMIC);
	if (unlikely(!*iskb))
410
		goto none;
411 412 413 414
	skb_pull(*iskb, offset);
	imsz = msg_size(buf_msg(*iskb));
	skb_trim(*iskb, imsz);
	if (unlikely(!tipc_msg_validate(*iskb)))
415 416 417 418 419
		goto none;
	*pos += align(imsz);
	return true;
none:
	kfree_skb(skb);
420
	kfree_skb(*iskb);
421 422 423 424
	*iskb = NULL;
	return false;
}

425 426
/**
 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
427 428 429
 * @list: the buffer chain, where head is the buffer to replace/append
 * @skb: buffer to be created, appended to and returned in case of success
 * @msg: message to be appended
430
 * @mtu: max allowable size for the bundle buffer, inclusive header
431
 * @dnode: destination node for message. (Not always present in header)
S
stephen hemminger 已提交
432
 * Returns true if success, otherwise false
433
 */
434 435
bool tipc_msg_make_bundle(struct sk_buff **skb,  struct tipc_msg *msg,
			  u32 mtu, u32 dnode)
436
{
437
	struct sk_buff *_skb;
438 439 440 441 442 443
	struct tipc_msg *bmsg;
	u32 msz = msg_size(msg);
	u32 max = mtu - INT_H_SIZE;

	if (msg_user(msg) == MSG_FRAGMENTER)
		return false;
444
	if (msg_user(msg) == TUNNEL_PROTOCOL)
445 446 447 448 449 450
		return false;
	if (msg_user(msg) == BCAST_PROTOCOL)
		return false;
	if (msz > (max / 2))
		return false;

451
	_skb = tipc_buf_acquire(max, GFP_ATOMIC);
452
	if (!_skb)
453 454
		return false;

455 456
	skb_trim(_skb, INT_H_SIZE);
	bmsg = buf_msg(_skb);
457 458
	tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
		      INT_H_SIZE, dnode);
459 460 461 462
	if (msg_isdata(msg))
		msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE);
	else
		msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE);
463 464 465
	msg_set_seqno(bmsg, msg_seqno(msg));
	msg_set_ack(bmsg, msg_ack(msg));
	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
466 467
	tipc_msg_bundle(_skb, msg, mtu);
	*skb = _skb;
J
Jon Paul Maloy 已提交
468
	return true;
469
}
470 471 472

/**
 * tipc_msg_reverse(): swap source and destination addresses and add error code
473 474 475 476
 * @own_node: originating node id for reversed message
 * @skb:  buffer containing message to be reversed; may be replaced.
 * @err:  error code to be set in message, if any
 * Consumes buffer at failure
477 478
 * Returns true if success, otherwise false
 */
479
bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, int err)
480
{
481
	struct sk_buff *_skb = *skb;
482
	struct tipc_msg *hdr;
483
	struct tipc_msg ohdr;
484
	int dlen;
485

486
	if (skb_linearize(_skb))
487
		goto exit;
488
	hdr = buf_msg(_skb);
489
	dlen = min_t(uint, msg_data_sz(hdr), MAX_FORWARD_SIZE);
490
	if (msg_dest_droppable(hdr))
491
		goto exit;
492
	if (msg_errcode(hdr))
493
		goto exit;
494 495 496 497 498 499

	/* Take a copy of original header before altering message */
	memcpy(&ohdr, hdr, msg_hdr_sz(hdr));

	/* Never return SHORT header; expand by replacing buffer if necessary */
	if (msg_short(hdr)) {
500
		*skb = tipc_buf_acquire(BASIC_H_SIZE + dlen, GFP_ATOMIC);
501 502 503 504 505 506 507 508
		if (!*skb)
			goto exit;
		memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen);
		kfree_skb(_skb);
		_skb = *skb;
		hdr = buf_msg(_skb);
		memcpy(hdr, &ohdr, BASIC_H_SIZE);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
509
	}
510

511
	if (skb_cloned(_skb) &&
512
	    pskb_expand_head(_skb, BUF_HEADROOM, BUF_TAILROOM, GFP_ATOMIC))
513 514
		goto exit;

515 516
	/* reassign after skb header modifications */
	hdr = buf_msg(_skb);
517 518
	/* Now reverse the concerned fields */
	msg_set_errcode(hdr, err);
519
	msg_set_non_seq(hdr, 0);
520 521 522 523 524 525 526 527
	msg_set_origport(hdr, msg_destport(&ohdr));
	msg_set_destport(hdr, msg_origport(&ohdr));
	msg_set_destnode(hdr, msg_prevnode(&ohdr));
	msg_set_prevnode(hdr, own_node);
	msg_set_orignode(hdr, own_node);
	msg_set_size(hdr, msg_hdr_sz(hdr) + dlen);
	skb_trim(_skb, msg_size(hdr));
	skb_orphan(_skb);
528 529
	return true;
exit:
530 531
	kfree_skb(_skb);
	*skb = NULL;
532 533
	return false;
}
534 535

/**
536 537
 * tipc_msg_lookup_dest(): try to find new destination for named message
 * @skb: the buffer containing the message.
538
 * @err: error code to be used by caller if lookup fails
539
 * Does not consume buffer
540
 * Returns true if a destination is found, false otherwise
541
 */
542
bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
543
{
544
	struct tipc_msg *msg = buf_msg(skb);
545 546
	u32 dport, dnode;
	u32 onode = tipc_own_addr(net);
547

548 549 550 551
	if (!msg_isdata(msg))
		return false;
	if (!msg_named(msg))
		return false;
552 553
	if (msg_errcode(msg))
		return false;
554
	*err = TIPC_ERR_NO_NAME;
555 556
	if (skb_linearize(skb))
		return false;
557
	msg = buf_msg(skb);
558
	if (msg_reroute_cnt(msg))
559
		return false;
560
	dnode = addr_domain(net, msg_lookup_scope(msg));
561
	dport = tipc_nametbl_translate(net, msg_nametype(msg),
562
				       msg_nameinst(msg), &dnode);
563
	if (!dport)
564
		return false;
565
	msg_incr_reroute_cnt(msg);
566 567 568
	if (dnode != onode)
		msg_set_prevnode(msg, onode);
	msg_set_destnode(msg, dnode);
569
	msg_set_destport(msg, dport);
570
	*err = TIPC_OK;
571 572 573 574 575 576 577 578

	if (!skb_cloned(skb))
		return true;

	/* Unclone buffer in case it was bundled */
	if (pskb_expand_head(skb, BUF_HEADROOM, BUF_TAILROOM, GFP_ATOMIC))
		return false;

579
	return true;
580
}
581 582 583 584

/* tipc_msg_reassemble() - clone a buffer chain of fragments and
 *                         reassemble the clones into one message
 */
585
bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
586
{
587
	struct sk_buff *skb, *_skb;
588
	struct sk_buff *frag = NULL;
589
	struct sk_buff *head = NULL;
590
	int hdr_len;
591 592

	/* Copy header if single buffer */
593 594
	if (skb_queue_len(list) == 1) {
		skb = skb_peek(list);
595 596 597 598 599 600
		hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
		_skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
		if (!_skb)
			return false;
		__skb_queue_tail(rcvq, _skb);
		return true;
601 602 603
	}

	/* Clone all fragments and reassemble */
604 605
	skb_queue_walk(list, skb) {
		frag = skb_clone(skb, GFP_ATOMIC);
606 607 608 609 610 611 612 613
		if (!frag)
			goto error;
		frag->next = NULL;
		if (tipc_buf_append(&head, &frag))
			break;
		if (!head)
			goto error;
	}
614 615
	__skb_queue_tail(rcvq, frag);
	return true;
616 617 618
error:
	pr_warn("Failed do clone local mcast rcv buffer\n");
	kfree_skb(head);
619
	return false;
620
}
621

622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638
/* tipc_msg_pskb_copy() - copy a buffer chain and readdress the copies
 * @dst: destination node written into the header of every copy
 * @msg: chain to be copied; left unchanged
 * @cpy: queue receiving the copies
 *
 * Returns true if all buffers were copied. If a copy fails, @cpy is purged
 * and false is returned.
 */
bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
			struct sk_buff_head *cpy)
{
	struct sk_buff *orig, *copy;

	skb_queue_walk(msg, orig) {
		copy = pskb_copy(orig, GFP_ATOMIC);
		if (!copy)
			goto fail;
		msg_set_destnode(buf_msg(copy), dst);
		__skb_queue_tail(cpy, copy);
	}
	return true;
fail:
	__skb_queue_purge(cpy);
	return false;
}

639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668
/* __tipc_skb_queue_sorted(): insert pkt into list according to sequence number
 * @list: list to be inserted into
 * @seqno: sequence number of buffer to add
 * @skb: buffer to add
 *
 * A buffer whose sequence number is already present in the list is freed
 * instead of being inserted.
 */
void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
			     struct sk_buff *skb)
{
	struct sk_buff *cur, *nxt;

	/* Shortcut: new buffer belongs in front of everything queued */
	if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
		__skb_queue_head(list, skb);
		return;
	}

	/* Shortcut: new buffer belongs behind everything queued */
	if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
		__skb_queue_tail(list, skb);
		return;
	}

	/* Otherwise locate the first entry that is not lower than seqno */
	skb_queue_walk_safe(list, cur, nxt) {
		if (more(seqno, buf_seqno(cur)))
			continue;
		if (seqno != buf_seqno(cur)) {
			__skb_queue_before(list, cur, skb);
			return;
		}
		break;
	}
	kfree_skb(skb);
}