socket.c 74.8 KB
Newer Older
P
Per Liden 已提交
1
/*
2
 * net/tipc/socket.c: TIPC socket API
3
 *
4
 * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5
 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
P
Per Liden 已提交
6 7
 * All rights reserved.
 *
P
Per Liden 已提交
8
 * Redistribution and use in source and binary forms, with or without
P
Per Liden 已提交
9 10
 * modification, are permitted provided that the following conditions are met:
 *
P
Per Liden 已提交
11 12 13 14 15 16 17 18
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
P
Per Liden 已提交
19
 *
P
Per Liden 已提交
20 21 22 23 24 25 26 27 28 29 30 31 32 33
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
P
Per Liden 已提交
34 35 36
 * POSSIBILITY OF SUCH DAMAGE.
 */

37
#include <linux/rhashtable.h>
P
Per Liden 已提交
38
#include "core.h"
39
#include "name_table.h"
E
Erik Hugne 已提交
40
#include "node.h"
41
#include "link.h"
42
#include "name_distr.h"
43
#include "socket.h"
44
#include "bcast.h"
45
#include "netlink.h"
46

47
#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
48
#define CONN_PROBING_INTERVAL	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
49 50 51
#define TIPC_FWD_MSG		1
#define TIPC_MAX_PORT		0xffffffff
#define TIPC_MIN_PORT		1
52

53 54
enum {
	TIPC_LISTEN = TCP_LISTEN,
55
	TIPC_ESTABLISHED = TCP_ESTABLISHED,
56
	TIPC_OPEN = TCP_CLOSE,
57
	TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
58 59
};

60 61 62 63 64 65 66
/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @conn_type: TIPC type used when connection was established
 * @conn_instance: TIPC instance used when connection was established
 * @published: non-zero if port has one or more associated names
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
67
 * @portid: unique port identity in TIPC socket hash table
68 69 70 71 72 73 74 75 76
 * @phdr: preformatted message header used when sending messages
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @probing_state:
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @link_cong: non-zero if owner must sleep because of link congestion
 * @sent_unacked: # messages sent by socket, and not yet acked by peer
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
77
 * @peer: 'connected' peer for dgram/rdm
78 79
 * @node: hash table node
 * @rcu: rcu struct for tipc_sock
80 81 82 83 84 85 86
 */
struct tipc_sock {
	struct sock sk;
	u32 conn_type;
	u32 conn_instance;
	int published;
	u32 max_pkt;
87
	u32 portid;
88 89 90 91 92 93
	struct tipc_msg phdr;
	struct list_head sock_list;
	struct list_head publications;
	u32 pub_count;
	uint conn_timeout;
	atomic_t dupl_rcvcnt;
94
	bool probe_unacked;
95
	bool link_cong;
96 97
	u16 snt_unacked;
	u16 snd_win;
98
	u16 peer_caps;
99 100
	u16 rcv_unacked;
	u16 rcv_win;
101
	struct sockaddr_tipc peer;
102 103
	struct rhash_head node;
	struct rcu_head rcu;
104
};
P
Per Liden 已提交
105

106
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
107
static void tipc_data_ready(struct sock *sk);
108
static void tipc_write_space(struct sock *sk);
109
static void tipc_sock_destruct(struct sock *sk);
110 111
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
112
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
113
static void tipc_sk_timeout(unsigned long data);
114
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
J
Jon Paul Maloy 已提交
115
			   struct tipc_name_seq const *seq);
116
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
J
Jon Paul Maloy 已提交
117
			    struct tipc_name_seq const *seq);
118
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
119 120
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
121 122 123
static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
			      size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
P
Per Liden 已提交
124

125 126 127
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
P
Per Liden 已提交
128 129
static struct proto tipc_proto;

130 131
static const struct rhashtable_params tsk_rht_params;

132
/*
133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
 * Revised TIPC socket locking policy:
 *
 * Most socket operations take the standard socket lock when they start
 * and hold it until they finish (or until they need to sleep).  Acquiring
 * this lock grants the owner exclusive access to the fields of the socket
 * data structures, with the exception of the backlog queue.  A few socket
 * operations can be done without taking the socket lock because they only
 * read socket information that never changes during the life of the socket.
 *
 * Socket operations may acquire the lock for the associated TIPC port if they
 * need to perform an operation on the port.  If any routine needs to acquire
 * both the socket lock and the port lock it must take the socket lock first
 * to avoid the risk of deadlock.
 *
 * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since invoked it runs at the BH level and cannot block.
 * Instead, it checks to see if the socket lock is currently owned by someone,
 * and either handles the message itself or adds it to the socket's backlog
 * queue; in the latter case the queued message is processed once the process
 * owning the socket lock releases it.
 *
 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
 * the problem of a blocked socket operation preventing any other operations
 * from occurring.  However, applications must be careful if they have
 * multiple threads trying to send (or receive) on the same socket, as these
 * operations might interfere with each other.  For example, doing a connect
 * and a receive at the same time might allow the receive to consume the
 * ACK message meant for the connect.  While additional work could be done
 * to try and overcome this, it doesn't seem to be worthwhile at the present.
 *
 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
 * that another operation that must be performed in a non-blocking manner is
 * not delayed for very long because the lock has already been taken.
 *
 * NOTE: This code assumes that certain fields of a port/socket pair are
 * constant over its lifetime; such fields can be examined without taking
 * the socket lock and/or port lock, and do not need to be re-read even
 * after resuming processing after waiting.  These fields include:
 *   - socket type
 *   - pointer to socket sk structure (aka tipc_sock structure)
 *   - pointer to port structure
 *   - port reference
 */

177 178 179 180 181
static u32 tsk_own_node(struct tipc_sock *tsk)
{
	return msg_prevnode(&tsk->phdr);
}

182
static u32 tsk_peer_node(struct tipc_sock *tsk)
183
{
184
	return msg_destnode(&tsk->phdr);
185 186
}

187
static u32 tsk_peer_port(struct tipc_sock *tsk)
188
{
189
	return msg_destport(&tsk->phdr);
190 191
}

192
static  bool tsk_unreliable(struct tipc_sock *tsk)
193
{
194
	return msg_src_droppable(&tsk->phdr) != 0;
195 196
}

197
static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
198
{
199
	msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
200 201
}

202
static bool tsk_unreturnable(struct tipc_sock *tsk)
203
{
204
	return msg_dest_droppable(&tsk->phdr) != 0;
205 206
}

207
static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
208
{
209
	msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
210 211
}

212
static int tsk_importance(struct tipc_sock *tsk)
213
{
214
	return msg_importance(&tsk->phdr);
215 216
}

217
static int tsk_set_importance(struct tipc_sock *tsk, int imp)
218 219 220
{
	if (imp > TIPC_CRITICAL_IMPORTANCE)
		return -EINVAL;
221
	msg_set_importance(&tsk->phdr, (u32)imp);
222 223
	return 0;
}
224

225 226 227 228 229
static struct tipc_sock *tipc_sk(const struct sock *sk)
{
	return container_of(sk, struct tipc_sock, sk);
}

230
static bool tsk_conn_cong(struct tipc_sock *tsk)
231
{
232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
	return tsk->snt_unacked >= tsk->snd_win;
}

/* tsk_blocks(): translate a buffer size in bytes to number of
 * advertisable blocks, taking into account the ratio truesize(len)/len
 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 */
static u16 tsk_adv_blocks(int len)
{
	return len / FLOWCTL_BLK_SZ / 4;
}

/* tsk_inc(): increment counter for sent or received data
 * - If block based flow control is not supported by peer we
 *   fall back to message based ditto, incrementing the counter
 */
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
		return ((msglen / FLOWCTL_BLK_SZ) + 1);
	return 1;
253 254
}

255
/**
256
 * tsk_advance_rx_queue - discard first buffer in socket receive queue
257 258
 *
 * Caller must hold socket lock
P
Per Liden 已提交
259
 */
260
static void tsk_advance_rx_queue(struct sock *sk)
P
Per Liden 已提交
261
{
262
	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
P
Per Liden 已提交
263 264
}

265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
/* tipc_sk_respond() : send response message back to sender
 */
static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
{
	u32 selector;
	u32 dnode;
	u32 onode = tipc_own_addr(sock_net(sk));

	if (!tipc_msg_reverse(onode, &skb, err))
		return;

	dnode = msg_destnode(buf_msg(skb));
	selector = msg_origport(buf_msg(skb));
	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
}

P
Per Liden 已提交
281
/**
282
 * tsk_rej_rx_queue - reject all buffers in socket receive queue
283 284
 *
 * Caller must hold socket lock
P
Per Liden 已提交
285
 */
286
static void tsk_rej_rx_queue(struct sock *sk)
P
Per Liden 已提交
287
{
288
	struct sk_buff *skb;
289

290 291
	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
		tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
P
Per Liden 已提交
292 293
}

294 295 296 297 298
static bool tipc_sk_connected(struct sock *sk)
{
	return sk->sk_socket->state == SS_CONNECTED;
}

299 300 301 302 303 304 305 306 307 308
/* tipc_sk_type_connectionless - check if the socket is datagram socket
 * @sk: socket
 *
 * Returns true if connection less, false otherwise
 */
static bool tipc_sk_type_connectionless(struct sock *sk)
{
	return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
}

309
/* tsk_peer_msg - verify if message was sent by connected port's peer
J
Jon Paul Maloy 已提交
310 311 312 313
 *
 * Handles cases where the node's network address has changed from
 * the default of <0.0.0> to its configured setting.
 */
314
static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
J
Jon Paul Maloy 已提交
315
{
316 317
	struct sock *sk = &tsk->sk;
	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
318
	u32 peer_port = tsk_peer_port(tsk);
J
Jon Paul Maloy 已提交
319 320 321
	u32 orig_node;
	u32 peer_node;

322
	if (unlikely(!tipc_sk_connected(sk)))
J
Jon Paul Maloy 已提交
323 324 325 326 327 328
		return false;

	if (unlikely(msg_origport(msg) != peer_port))
		return false;

	orig_node = msg_orignode(msg);
329
	peer_node = tsk_peer_node(tsk);
J
Jon Paul Maloy 已提交
330 331 332 333

	if (likely(orig_node == peer_node))
		return true;

334
	if (!orig_node && (peer_node == tn->own_addr))
J
Jon Paul Maloy 已提交
335 336
		return true;

337
	if (!peer_node && (orig_node == tn->own_addr))
J
Jon Paul Maloy 已提交
338 339 340 341 342
		return true;

	return false;
}

343 344 345 346 347 348 349 350 351 352
/* tipc_set_sk_state - set the sk_state of the socket
 * @sk: socket
 *
 * Caller must hold socket lock
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_set_sk_state(struct sock *sk, int state)
{
	int oldstate = sk->sk_socket->state;
353
	int oldsk_state = sk->sk_state;
354 355 356
	int res = -EINVAL;

	switch (state) {
357 358 359
	case TIPC_OPEN:
		res = 0;
		break;
360
	case TIPC_LISTEN:
361
		if (oldsk_state == TIPC_OPEN)
362 363
			res = 0;
		break;
364 365
	case TIPC_ESTABLISHED:
		if (oldstate == SS_CONNECTING ||
366
		    oldsk_state == TIPC_OPEN)
367 368
			res = 0;
		break;
369 370 371 372 373
	case TIPC_DISCONNECTING:
		if (oldstate == SS_CONNECTING ||
		    oldsk_state == TIPC_ESTABLISHED)
			res = 0;
		break;
374 375 376 377 378 379 380 381
	}

	if (!res)
		sk->sk_state = state;

	return res;
}

P
Per Liden 已提交
382
/**
383
 * tipc_sk_create - create a TIPC socket
384
 * @net: network namespace (must be default network)
P
Per Liden 已提交
385 386
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
387
 * @kern: caused by kernel or by userspace?
388
 *
389 390
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
P
Per Liden 已提交
391 392 393
 *
 * Returns 0 on success, errno otherwise
 */
394 395
static int tipc_sk_create(struct net *net, struct socket *sock,
			  int protocol, int kern)
P
Per Liden 已提交
396
{
397
	struct tipc_net *tn;
398
	const struct proto_ops *ops;
P
Per Liden 已提交
399
	struct sock *sk;
400
	struct tipc_sock *tsk;
401
	struct tipc_msg *msg;
402 403

	/* Validate arguments */
P
Per Liden 已提交
404 405 406 407 408
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	switch (sock->type) {
	case SOCK_STREAM:
409
		ops = &stream_ops;
P
Per Liden 已提交
410 411
		break;
	case SOCK_SEQPACKET:
412
		ops = &packet_ops;
P
Per Liden 已提交
413 414 415
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
416
		ops = &msg_ops;
P
Per Liden 已提交
417
		break;
418 419
	default:
		return -EPROTOTYPE;
P
Per Liden 已提交
420 421
	}

422
	/* Allocate socket's protocol area */
423
	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
424
	if (sk == NULL)
P
Per Liden 已提交
425 426
		return -ENOMEM;

427
	tsk = tipc_sk(sk);
428 429 430
	tsk->max_pkt = MAX_PKT_DEFAULT;
	INIT_LIST_HEAD(&tsk->publications);
	msg = &tsk->phdr;
431 432
	tn = net_generic(sock_net(sk), tipc_net_id);
	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
433
		      NAMED_H_SIZE, 0);
P
Per Liden 已提交
434

435 436 437
	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock_init_data(sock, sk);
438
	tipc_set_sk_state(sk, TIPC_OPEN);
439
	if (tipc_sk_insert(tsk)) {
M
Masanari Iida 已提交
440
		pr_warn("Socket create failed; port number exhausted\n");
441 442 443
		return -EINVAL;
	}
	msg_set_origport(msg, tsk->portid);
444
	setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
445
	sk->sk_backlog_rcv = tipc_backlog_rcv;
446
	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
447 448
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
449
	sk->sk_destruct = tipc_sock_destruct;
450 451
	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
	atomic_set(&tsk->dupl_rcvcnt, 0);
452

453 454 455 456
	/* Start out with safe limits until we receive an advertised window */
	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
	tsk->rcv_win = tsk->snd_win;

457
	if (tipc_sk_type_connectionless(sk)) {
458
		tsk_set_unreturnable(tsk, true);
459
		if (sock->type == SOCK_DGRAM)
460
			tsk_set_unreliable(tsk, true);
461
	}
462

P
Per Liden 已提交
463 464 465
	return 0;
}

466 467 468 469 470 471 472
static void tipc_sk_callback(struct rcu_head *head)
{
	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);

	sock_put(&tsk->sk);
}

P
Per Liden 已提交
473
/**
474
 * tipc_release - destroy a TIPC socket
P
Per Liden 已提交
475 476 477 478 479 480 481
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
482
 *
P
Per Liden 已提交
483 484 485 486 487 488
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Returns 0 on success, errno otherwise
 */
489
static int tipc_release(struct socket *sock)
P
Per Liden 已提交
490 491
{
	struct sock *sk = sock->sk;
492
	struct net *net;
493
	struct tipc_sock *tsk;
494
	struct sk_buff *skb;
495
	u32 dnode;
P
Per Liden 已提交
496

497 498 499 500 501
	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
P
Per Liden 已提交
502
		return 0;
503

504
	net = sock_net(sk);
505
	tsk = tipc_sk(sk);
506 507 508 509 510 511
	lock_sock(sk);

	/*
	 * Reject all unreceived messages, except on an active connection
	 * (which disconnects locally & sends a 'FIN+' to peer)
	 */
512
	dnode = tsk_peer_node(tsk);
P
Per Liden 已提交
513
	while (sock->state != SS_DISCONNECTING) {
514 515
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb == NULL)
P
Per Liden 已提交
516
			break;
517
		if (TIPC_SKB_CB(skb)->bytes_read)
518
			kfree_skb(skb);
519 520 521 522
		else {
			if ((sock->state == SS_CONNECTING) ||
			    (sock->state == SS_CONNECTED)) {
				sock->state = SS_DISCONNECTING;
523
				tipc_node_remove_conn(net, dnode, tsk->portid);
524
			}
525
			tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
526
		}
P
Per Liden 已提交
527 528
	}

529
	tipc_sk_withdraw(tsk, 0, NULL);
530
	sk_stop_timer(sk, &sk->sk_timer);
531
	tipc_sk_remove(tsk);
532
	if (tipc_sk_connected(sk)) {
533
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
534
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
535
				      tsk_own_node(tsk), tsk_peer_port(tsk),
536
				      tsk->portid, TIPC_ERR_NO_PORT);
537
		if (skb)
538
			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
539
		tipc_node_remove_conn(net, dnode, tsk->portid);
540
	}
P
Per Liden 已提交
541

542 543 544
	/* Reject any messages that accumulated in backlog queue */
	sock->state = SS_DISCONNECTING;
	release_sock(sk);
545 546

	call_rcu(&tsk->rcu, tipc_sk_callback);
547
	sock->sk = NULL;
P
Per Liden 已提交
548

549
	return 0;
P
Per Liden 已提交
550 551 552
}

/**
553
 * tipc_bind - associate or disassociate TIPC name(s) with a socket
P
Per Liden 已提交
554 555 556
 * @sock: socket structure
 * @uaddr: socket address describing name(s) and desired operation
 * @uaddr_len: size of socket address data structure
557
 *
P
Per Liden 已提交
558 559 560
 * Name and name sequence binding is indicated using a positive scope value;
 * a negative scope value unbinds the specified name.  Specifying no name
 * (i.e. a socket address length of 0) unbinds all names from the socket.
561
 *
P
Per Liden 已提交
562
 * Returns 0 on success, errno otherwise
563 564 565
 *
 * NOTE: This routine takes the socket lock: publishing or withdrawing
 *       names (tipc_sk_publish/tipc_sk_withdraw) mutates socket state.
P
Per Liden 已提交
566
 */
567 568
static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
		     int uaddr_len)
P
Per Liden 已提交
569
{
570
	struct sock *sk = sock->sk;
P
Per Liden 已提交
571
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
572
	struct tipc_sock *tsk = tipc_sk(sk);
573
	int res = -EINVAL;
P
Per Liden 已提交
574

575 576
	lock_sock(sk);
	if (unlikely(!uaddr_len)) {
577
		res = tipc_sk_withdraw(tsk, 0, NULL);
578 579
		goto exit;
	}
580

581 582 583 584 585 586 587 588
	if (uaddr_len < sizeof(struct sockaddr_tipc)) {
		res = -EINVAL;
		goto exit;
	}
	if (addr->family != AF_TIPC) {
		res = -EAFNOSUPPORT;
		goto exit;
	}
P
Per Liden 已提交
589 590 591

	if (addr->addrtype == TIPC_ADDR_NAME)
		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
592 593 594 595
	else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
		res = -EAFNOSUPPORT;
		goto exit;
	}
596

597
	if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
598
	    (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
599 600 601 602
	    (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
		res = -EACCES;
		goto exit;
	}
603

604
	res = (addr->scope > 0) ?
605 606
		tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
		tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
607 608 609
exit:
	release_sock(sk);
	return res;
P
Per Liden 已提交
610 611
}

612
/**
613
 * tipc_getname - get port ID of socket or peer socket
P
Per Liden 已提交
614 615 616
 * @sock: socket structure
 * @uaddr: area for returned socket address
 * @uaddr_len: area for returned length of socket address
617
 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
618
 *
P
Per Liden 已提交
619
 * Returns 0 on success, errno otherwise
620
 *
621 622
 * NOTE: This routine doesn't need to take the socket lock since it only
 *       accesses socket information that is unchanging (or which changes in
623
 *       a completely predictable manner).
P
Per Liden 已提交
624
 */
625 626
static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
			int *uaddr_len, int peer)
P
Per Liden 已提交
627 628
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
629 630
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
631
	struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
P
Per Liden 已提交
632

633
	memset(addr, 0, sizeof(*addr));
634
	if (peer) {
635
		if ((sock->state != SS_CONNECTED) &&
636
		    ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
637
			return -ENOTCONN;
638 639
		addr->addr.id.ref = tsk_peer_port(tsk);
		addr->addr.id.node = tsk_peer_node(tsk);
640
	} else {
641
		addr->addr.id.ref = tsk->portid;
642
		addr->addr.id.node = tn->own_addr;
643
	}
P
Per Liden 已提交
644 645 646 647 648 649 650

	*uaddr_len = sizeof(*addr);
	addr->addrtype = TIPC_ADDR_ID;
	addr->family = AF_TIPC;
	addr->scope = 0;
	addr->addr.name.domain = 0;

651
	return 0;
P
Per Liden 已提交
652 653 654
}

/**
655
 * tipc_poll - read and possibly block on pollmask
P
Per Liden 已提交
656 657 658 659
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: ???
 *
660 661 662 663 664 665 666 667
 * Returns pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
668 669 670
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
P
Per Liden 已提交
671
 */
672 673
static unsigned int tipc_poll(struct file *file, struct socket *sock,
			      poll_table *wait)
P
Per Liden 已提交
674
{
675
	struct sock *sk = sock->sk;
676
	struct tipc_sock *tsk = tipc_sk(sk);
677
	u32 mask = 0;
678

679
	sock_poll_wait(file, sk_sleep(sk), wait);
680

681 682
	switch ((int)sock->state) {
	case SS_CONNECTED:
683
		if (!tsk->link_cong && !tsk_conn_cong(tsk))
684 685 686 687 688 689 690 691 692
			mask |= POLLOUT;
		/* fall thru' */
	case SS_CONNECTING:
		if (!skb_queue_empty(&sk->sk_receive_queue))
			mask |= (POLLIN | POLLRDNORM);
		break;
	case SS_DISCONNECTING:
		mask = (POLLIN | POLLRDNORM | POLLHUP);
		break;
693 694 695 696 697 698 699 700 701
	default:
		switch (sk->sk_state) {
		case TIPC_OPEN:
			if (!tsk->link_cong)
				mask |= POLLOUT;
			if (tipc_sk_type_connectionless(sk) &&
			    (!skb_queue_empty(&sk->sk_receive_queue)))
				mask |= (POLLIN | POLLRDNORM);
			break;
702 703 704
		case TIPC_DISCONNECTING:
			mask = (POLLIN | POLLRDNORM | POLLHUP);
			break;
705 706 707 708 709
		case TIPC_LISTEN:
			if (!skb_queue_empty(&sk->sk_receive_queue))
				mask |= (POLLIN | POLLRDNORM);
			break;
		}
710
	}
711 712

	return mask;
P
Per Liden 已提交
713 714
}

715 716 717 718
/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @seq: destination address
719
 * @msg: message to send
720 721 722 723 724 725 726
 * @dsz: total length of message data
 * @timeo: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Returns the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
727
			  struct msghdr *msg, size_t dsz, long timeo)
728 729
{
	struct sock *sk = sock->sk;
730
	struct tipc_sock *tsk = tipc_sk(sk);
731
	struct net *net = sock_net(sk);
732
	struct tipc_msg *mhdr = &tsk->phdr;
733
	struct sk_buff_head pktchain;
A
Al Viro 已提交
734
	struct iov_iter save = msg->msg_iter;
735 736 737
	uint mtu;
	int rc;

738 739 740
	if (!timeo && tsk->link_cong)
		return -ELINKCONG;

741 742 743 744 745 746 747 748 749
	msg_set_type(mhdr, TIPC_MCAST_MSG);
	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
	msg_set_destport(mhdr, 0);
	msg_set_destnode(mhdr, 0);
	msg_set_nametype(mhdr, seq->type);
	msg_set_namelower(mhdr, seq->lower);
	msg_set_nameupper(mhdr, seq->upper);
	msg_set_hdr_sz(mhdr, MCAST_H_SIZE);

750 751
	skb_queue_head_init(&pktchain);

752
new_mtu:
753
	mtu = tipc_bcast_get_mtu(net);
754
	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
755 756 757 758
	if (unlikely(rc < 0))
		return rc;

	do {
759
		rc = tipc_bcast_xmit(net, &pktchain);
760 761 762 763 764 765 766 767
		if (likely(!rc))
			return dsz;

		if (rc == -ELINKCONG) {
			tsk->link_cong = 1;
			rc = tipc_wait_for_sndmsg(sock, &timeo);
			if (!rc)
				continue;
768
		}
769
		__skb_queue_purge(&pktchain);
A
Al Viro 已提交
770 771
		if (rc == -EMSGSIZE) {
			msg->msg_iter = save;
772
			goto new_mtu;
A
Al Viro 已提交
773
		}
774 775
		break;
	} while (1);
776 777 778
	return rc;
}

779 780 781 782 783 784
/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
785
 */
786 787
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
		       struct sk_buff_head *inputq)
788
{
789
	struct tipc_msg *msg;
790 791
	struct tipc_plist dports;
	u32 portid;
792
	u32 scope = TIPC_CLUSTER_SCOPE;
793 794 795
	struct sk_buff_head tmpq;
	uint hsz;
	struct sk_buff *skb, *_skb;
796

797
	__skb_queue_head_init(&tmpq);
798
	tipc_plist_init(&dports);
799

800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820
	skb = tipc_skb_peek(arrvq, &inputq->lock);
	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
		msg = buf_msg(skb);
		hsz = skb_headroom(skb) + msg_hdr_sz(msg);

		if (in_own_node(net, msg_orignode(msg)))
			scope = TIPC_NODE_SCOPE;

		/* Create destination port list and message clones: */
		tipc_nametbl_mc_translate(net,
					  msg_nametype(msg), msg_namelower(msg),
					  msg_nameupper(msg), scope, &dports);
		portid = tipc_plist_pop(&dports);
		for (; portid; portid = tipc_plist_pop(&dports)) {
			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
			if (_skb) {
				msg_set_destport(buf_msg(_skb), portid);
				__skb_queue_tail(&tmpq, _skb);
				continue;
			}
			pr_warn("Failed to clone mcast rcv buffer\n");
821
		}
822 823 824 825 826 827 828 829 830
		/* Append to inputq if not already done by other thread */
		spin_lock_bh(&inputq->lock);
		if (skb_peek(arrvq) == skb) {
			skb_queue_splice_tail_init(&tmpq, inputq);
			kfree_skb(__skb_dequeue(arrvq));
		}
		spin_unlock_bh(&inputq->lock);
		__skb_queue_purge(&tmpq);
		kfree_skb(skb);
831
	}
832
	tipc_sk_rcv(net, inputq);
833 834
}

835 836 837
/**
 * tipc_sk_proto_rcv - receive a connection mng protocol message
 * @tsk: receiving socket
838
 * @skb: pointer to message buffer.
839
 */
J
Jon Paul Maloy 已提交
840 841
static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
			      struct sk_buff_head *xmitq)
842
{
843
	struct sock *sk = &tsk->sk;
J
Jon Paul Maloy 已提交
844
	u32 onode = tsk_own_node(tsk);
845 846
	struct tipc_msg *hdr = buf_msg(skb);
	int mtyp = msg_type(hdr);
847
	bool conn_cong;
848

849
	/* Ignore if connection cannot be validated: */
850
	if (!tsk_peer_msg(tsk, hdr))
851 852
		goto exit;

853
	tsk->probe_unacked = false;
854

855 856
	if (mtyp == CONN_PROBE) {
		msg_set_type(hdr, CONN_PROBE_REPLY);
J
Jon Paul Maloy 已提交
857 858
		if (tipc_msg_reverse(onode, &skb, TIPC_OK))
			__skb_queue_tail(xmitq, skb);
859 860
		return;
	} else if (mtyp == CONN_ACK) {
861
		conn_cong = tsk_conn_cong(tsk);
862 863 864
		tsk->snt_unacked -= msg_conn_ack(hdr);
		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
			tsk->snd_win = msg_adv_win(hdr);
865
		if (conn_cong)
866 867 868
			sk->sk_write_space(sk);
	} else if (mtyp != CONN_PROBE_REPLY) {
		pr_warn("Received unknown CONN_PROTO msg\n");
869 870
	}
exit:
871
	kfree_skb(skb);
872 873
}

874 875 876
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
877
	struct tipc_sock *tsk = tipc_sk(sk);
878 879 880 881 882 883 884 885 886 887 888 889 890 891 892
	DEFINE_WAIT(wait);
	int done;

	do {
		int err = sock_error(sk);
		if (err)
			return err;
		if (sock->state == SS_DISCONNECTING)
			return -EPIPE;
		if (!*timeo_p)
			return -EAGAIN;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
893
		done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
894 895 896 897 898
		finish_wait(sk_sleep(sk), &wait);
	} while (!done);
	return 0;
}

P
Per Liden 已提交
899
/**
900
 * tipc_sendmsg - send message in connectionless manner
P
Per Liden 已提交
901 902
 * @sock: socket structure
 * @m: message to send
903
 * @dsz: amount of user data to be sent
904
 *
P
Per Liden 已提交
905
 * Message must have an destination specified explicitly.
906
 * Used for SOCK_RDM and SOCK_DGRAM messages,
P
Per Liden 已提交
907 908
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
909
 *
P
Per Liden 已提交
910 911
 * Returns the number of bytes sent on success, or errno otherwise
 */
912
static int tipc_sendmsg(struct socket *sock,
913
			struct msghdr *m, size_t dsz)
914 915 916 917 918 919 920 921 922 923 924 925
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendmsg(sock, m, dsz);
	release_sock(sk);

	return ret;
}

/* __tipc_sendmsg - send a connectionless message; caller holds the
 * socket lock.  Resolves the destination (explicit address, stored peer,
 * name lookup, or multicast), builds the packet chain and transmits it,
 * waiting for link congestion to clear if a timeout allows.
 */
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
{
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	struct tipc_msg *mhdr = &tsk->phdr;
	u32 dnode, dport;
	struct sk_buff_head pktchain;
	bool is_connectionless = tipc_sk_type_connectionless(sk);
	struct sk_buff *skb;
	struct tipc_name_seq *seq;
	struct iov_iter save;
	u32 mtu;
	long timeo;
	int rc;

	if (dsz > TIPC_MAX_USER_MSG_SIZE)
		return -EMSGSIZE;
	/* No explicit destination: fall back to peer set by connect() */
	if (unlikely(!dest)) {
		if (is_connectionless && tsk->peer.family == AF_TIPC)
			dest = &tsk->peer;
		else
			return -EDESTADDRREQ;
	} else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
		   dest->family != AF_TIPC) {
		return -EINVAL;
	}
	if (!is_connectionless) {
		/* Connection-oriented socket: only an implied SYN is legal */
		if (sk->sk_state == TIPC_LISTEN)
			return -EPIPE;
		if (sk->sk_state != TIPC_OPEN)
			return -EISCONN;
		if (tsk->published)
			return -EOPNOTSUPP;
		if (dest->addrtype == TIPC_ADDR_NAME) {
			tsk->conn_type = dest->addr.name.name.type;
			tsk->conn_instance = dest->addr.name.name.instance;
		}
	}
	seq = &dest->addr.nameseq;
	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);

	if (dest->addrtype == TIPC_ADDR_MCAST) {
		return tipc_sendmcast(sock, seq, m, dsz, timeo);
	} else if (dest->addrtype == TIPC_ADDR_NAME) {
		u32 type = dest->addr.name.name.type;
		u32 inst = dest->addr.name.name.instance;
		u32 domain = dest->addr.name.domain;

		dnode = domain;
		msg_set_type(mhdr, TIPC_NAMED_MSG);
		msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
		msg_set_nametype(mhdr, type);
		msg_set_nameinst(mhdr, inst);
		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
		dport = tipc_nametbl_translate(net, type, inst, &dnode);
		msg_set_destnode(mhdr, dnode);
		msg_set_destport(mhdr, dport);
		if (unlikely(!dport && !dnode))
			return -EHOSTUNREACH;
	} else if (dest->addrtype == TIPC_ADDR_ID) {
		dnode = dest->addr.id.node;
		msg_set_type(mhdr, TIPC_DIRECT_MSG);
		msg_set_lookup_scope(mhdr, 0);
		msg_set_destnode(mhdr, dnode);
		msg_set_destport(mhdr, dest->addr.id.ref);
		msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
	}

	skb_queue_head_init(&pktchain);
	/* Save iterator position so the build can be redone after -EMSGSIZE */
	save = m->msg_iter;
new_mtu:
	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
	if (rc < 0)
		return rc;

	do {
		skb = skb_peek(&pktchain);
		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
		rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
		if (likely(!rc)) {
			if (!is_connectionless)
				sock->state = SS_CONNECTING;
			return dsz;
		}
		if (rc == -ELINKCONG) {
			tsk->link_cong = 1;
			rc = tipc_wait_for_sndmsg(sock, &timeo);
			if (!rc)
				continue;
		}
		__skb_queue_purge(&pktchain);
		if (rc == -EMSGSIZE) {
			/* MTU changed under us: rebuild with the new value */
			m->msg_iter = save;
			goto new_mtu;
		}
		break;
	} while (1);

	return rc;
}

1029 1030 1031
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
1032
	struct tipc_sock *tsk = tipc_sk(sk);
1033 1034 1035 1036 1037 1038 1039
	DEFINE_WAIT(wait);
	int done;

	do {
		int err = sock_error(sk);
		if (err)
			return err;
1040
		if (sk->sk_state == TIPC_DISCONNECTING)
1041 1042 1043 1044 1045 1046 1047 1048 1049 1050
			return -EPIPE;
		else if (sock->state != SS_CONNECTED)
			return -ENOTCONN;
		if (!*timeo_p)
			return -EAGAIN;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
		done = sk_wait_event(sk, timeo_p,
1051
				     (!tsk->link_cong &&
1052
				      !tsk_conn_cong(tsk)) ||
1053
				      !tipc_sk_connected(sk));
1054 1055 1056 1057 1058
		finish_wait(sk_sleep(sk), &wait);
	} while (!done);
	return 0;
}

1059
/**
1060
 * tipc_send_stream - send stream-oriented data
P
Per Liden 已提交
1061
 * @sock: socket structure
1062 1063
 * @m: data to send
 * @dsz: total length of data to be transmitted
1064
 *
1065
 * Used for SOCK_STREAM data.
1066
 *
1067 1068
 * Returns the number of bytes sent on success (or partial success),
 * or errno if no data sent
P
Per Liden 已提交
1069
 */
1070
static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_send_stream(sock, m, dsz);
	release_sock(sk);

	return ret;
}

/* __tipc_send_stream - send stream data; caller holds the socket lock.
 * Fragments the user data into MTU-sized chunks, transmitting each chunk
 * and blocking on congestion when the timeout permits.  Returns the byte
 * count actually sent (possibly partial), or a negative errno if nothing
 * was sent.
 */
static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *mhdr = &tsk->phdr;
	struct sk_buff_head pktchain;
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	u32 portid = tsk->portid;
	int rc = -EINVAL;
	long timeo;
	u32 dnode;
	uint mtu, send, sent = 0;
	struct iov_iter save;
	int hlen = MIN_H_SIZE;

	/* Handle implied connection establishment */
	if (unlikely(dest)) {
		rc = __tipc_sendmsg(sock, m, dsz);
		hlen = msg_hdr_sz(mhdr);
		if (dsz && (dsz == rc))
			tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
		return rc;
	}
	if (dsz > (uint)INT_MAX)
		return -EMSGSIZE;

	if (unlikely(sock->state != SS_CONNECTED)) {
		if (sk->sk_state == TIPC_DISCONNECTING)
			return -EPIPE;
		else
			return -ENOTCONN;
	}

	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	if (!timeo && tsk->link_cong)
		return -ELINKCONG;

	dnode = tsk_peer_node(tsk);
	skb_queue_head_init(&pktchain);

next:
	/* Remember iterator position so a chunk can be rebuilt on -EMSGSIZE */
	save = m->msg_iter;
	mtu = tsk->max_pkt;
	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
	rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
	if (unlikely(rc < 0))
		return rc;

	do {
		if (likely(!tsk_conn_cong(tsk))) {
			rc = tipc_node_xmit(net, &pktchain, dnode, portid);
			if (likely(!rc)) {
				tsk->snt_unacked += tsk_inc(tsk, send + hlen);
				sent += send;
				if (sent == dsz)
					return dsz;
				goto next;
			}
			if (rc == -EMSGSIZE) {
				/* Link MTU shrank: refresh and rebuild */
				__skb_queue_purge(&pktchain);
				tsk->max_pkt = tipc_node_get_mtu(net, dnode,
								 portid);
				m->msg_iter = save;
				goto next;
			}
			if (rc != -ELINKCONG)
				break;

			tsk->link_cong = 1;
		}
		rc = tipc_wait_for_sndpkt(sock, &timeo);
	} while (!rc);

	__skb_queue_purge(&pktchain);
	return sent ? sent : rc;
}

1160
/**
1161
 * tipc_send_packet - send a connection-oriented message
P
Per Liden 已提交
1162
 * @sock: socket structure
1163 1164
 * @m: message to send
 * @dsz: length of data to be transmitted
1165
 *
1166
 * Used for SOCK_SEQPACKET messages.
1167
 *
1168
 * Returns the number of bytes sent on success, or errno otherwise
P
Per Liden 已提交
1169
 */
1170
static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
P
Per Liden 已提交
1171
{
1172 1173
	if (dsz > TIPC_MAX_USER_MSG_SIZE)
		return -EMSGSIZE;
P
Per Liden 已提交
1174

1175
	return tipc_send_stream(sock, m, dsz);
P
Per Liden 已提交
1176 1177
}

1178
/* tipc_sk_finish_conn - complete the setup of a connection
P
Per Liden 已提交
1179
 */
1180
static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1181
				u32 peer_node)
P
Per Liden 已提交
1182
{
1183 1184
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
1185
	struct tipc_msg *msg = &tsk->phdr;
P
Per Liden 已提交
1186

1187 1188 1189 1190 1191
	msg_set_destnode(msg, peer_node);
	msg_set_destport(msg, peer_port);
	msg_set_type(msg, TIPC_CONN_MSG);
	msg_set_lookup_scope(msg, 0);
	msg_set_hdr_sz(msg, SHORT_H_SIZE);
1192

1193
	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL);
1194
	tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1195 1196
	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1197
	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1198 1199 1200 1201 1202 1203
	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
		return;

	/* Fall back to message based flow control */
	tsk->rcv_win = FLOWCTL_MSG_WIN;
	tsk->snd_win = FLOWCTL_MSG_WIN;
P
Per Liden 已提交
1204 1205 1206 1207 1208 1209
}

/**
 * set_orig_addr - capture sender's address for received message
 * @m: descriptor for message info
 * @msg: received message header
1210
 *
P
Per Liden 已提交
1211 1212
 * Note: Address is not captured if not requested by receiver.
 */
S
Sam Ravnborg 已提交
1213
static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
P
Per Liden 已提交
1214
{
1215
	DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
P
Per Liden 已提交
1216

1217
	if (addr) {
P
Per Liden 已提交
1218 1219
		addr->family = AF_TIPC;
		addr->addrtype = TIPC_ADDR_ID;
1220
		memset(&addr->addr, 0, sizeof(addr->addr));
P
Per Liden 已提交
1221 1222
		addr->addr.id.ref = msg_origport(msg);
		addr->addr.id.node = msg_orignode(msg);
1223 1224
		addr->addr.name.domain = 0;	/* could leave uninitialized */
		addr->scope = 0;		/* could leave uninitialized */
P
Per Liden 已提交
1225 1226 1227 1228 1229
		m->msg_namelen = sizeof(struct sockaddr_tipc);
	}
}

/**
1230
 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
P
Per Liden 已提交
1231 1232
 * @m: descriptor for message info
 * @msg: received message header
1233
 * @tsk: TIPC port associated with message
1234
 *
P
Per Liden 已提交
1235
 * Note: Ancillary data is not captured if not requested by receiver.
1236
 *
P
Per Liden 已提交
1237 1238
 * Returns 0 if successful, otherwise errno
 */
1239 1240
static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
				 struct tipc_sock *tsk)
P
Per Liden 已提交
1241 1242 1243 1244
{
	u32 anc_data[3];
	u32 err;
	u32 dest_type;
1245
	int has_name;
P
Per Liden 已提交
1246 1247 1248 1249 1250 1251 1252 1253 1254 1255
	int res;

	if (likely(m->msg_controllen == 0))
		return 0;

	/* Optionally capture errored message object(s) */
	err = msg ? msg_errcode(msg) : 0;
	if (unlikely(err)) {
		anc_data[0] = err;
		anc_data[1] = msg_data_sz(msg);
1256 1257
		res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
		if (res)
P
Per Liden 已提交
1258
			return res;
1259 1260 1261 1262 1263 1264
		if (anc_data[1]) {
			res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
				       msg_data(msg));
			if (res)
				return res;
		}
P
Per Liden 已提交
1265 1266 1267 1268 1269 1270
	}

	/* Optionally capture message destination object */
	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
	switch (dest_type) {
	case TIPC_NAMED_MSG:
1271
		has_name = 1;
P
Per Liden 已提交
1272 1273 1274 1275 1276
		anc_data[0] = msg_nametype(msg);
		anc_data[1] = msg_namelower(msg);
		anc_data[2] = msg_namelower(msg);
		break;
	case TIPC_MCAST_MSG:
1277
		has_name = 1;
P
Per Liden 已提交
1278 1279 1280 1281 1282
		anc_data[0] = msg_nametype(msg);
		anc_data[1] = msg_namelower(msg);
		anc_data[2] = msg_nameupper(msg);
		break;
	case TIPC_CONN_MSG:
1283 1284 1285 1286
		has_name = (tsk->conn_type != 0);
		anc_data[0] = tsk->conn_type;
		anc_data[1] = tsk->conn_instance;
		anc_data[2] = tsk->conn_instance;
P
Per Liden 已提交
1287 1288
		break;
	default:
1289
		has_name = 0;
P
Per Liden 已提交
1290
	}
1291 1292 1293 1294 1295
	if (has_name) {
		res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
		if (res)
			return res;
	}
P
Per Liden 已提交
1296 1297 1298 1299

	return 0;
}

1300
static void tipc_sk_send_ack(struct tipc_sock *tsk)
1301
{
1302 1303
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
1304
	struct sk_buff *skb = NULL;
1305
	struct tipc_msg *msg;
1306 1307
	u32 peer_port = tsk_peer_port(tsk);
	u32 dnode = tsk_peer_node(tsk);
1308

1309
	if (!tipc_sk_connected(sk))
1310
		return;
1311 1312 1313
	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
			      dnode, tsk_own_node(tsk), peer_port,
			      tsk->portid, TIPC_OK);
1314
	if (!skb)
1315
		return;
1316
	msg = buf_msg(skb);
1317 1318 1319 1320 1321 1322 1323 1324
	msg_set_conn_ack(msg, tsk->rcv_unacked);
	tsk->rcv_unacked = 0;

	/* Adjust to and advertize the correct window limit */
	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
		msg_set_adv_win(msg, tsk->rcv_win);
	}
1325
	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1326 1327
}

1328
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
Y
Ying Xue 已提交
1329 1330 1331
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT(wait);
1332
	long timeo = *timeop;
Y
Ying Xue 已提交
1333 1334 1335 1336
	int err;

	for (;;) {
		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1337
		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
Y
Ying Xue 已提交
1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351
			if (sock->state == SS_DISCONNECTING) {
				err = -ENOTCONN;
				break;
			}
			release_sock(sk);
			timeo = schedule_timeout(timeo);
			lock_sock(sk);
		}
		err = 0;
		if (!skb_queue_empty(&sk->sk_receive_queue))
			break;
		err = -EAGAIN;
		if (!timeo)
			break;
1352 1353 1354
		err = sock_intr_errno(timeo);
		if (signal_pending(current))
			break;
Y
Ying Xue 已提交
1355 1356
	}
	finish_wait(sk_sleep(sk), &wait);
1357
	*timeop = timeo;
Y
Ying Xue 已提交
1358 1359 1360
	return err;
}

1361
/**
1362
 * tipc_recvmsg - receive packet-oriented message
P
Per Liden 已提交
1363 1364 1365
 * @m: descriptor for message info
 * @buf_len: total size of user buffer area
 * @flags: receive flags
1366
 *
P
Per Liden 已提交
1367 1368 1369 1370 1371
 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 * If the complete message doesn't fit in user area, truncate it.
 *
 * Returns size of returned message data, errno otherwise
 */
1372 1373
static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
			int flags)
P
Per Liden 已提交
1374
{
1375
	struct sock *sk = sock->sk;
1376
	struct tipc_sock *tsk = tipc_sk(sk);
P
Per Liden 已提交
1377 1378
	struct sk_buff *buf;
	struct tipc_msg *msg;
1379
	bool is_connectionless = tipc_sk_type_connectionless(sk);
Y
Ying Xue 已提交
1380
	long timeo;
P
Per Liden 已提交
1381 1382
	unsigned int sz;
	u32 err;
1383
	int res, hlen;
P
Per Liden 已提交
1384

1385
	/* Catch invalid receive requests */
P
Per Liden 已提交
1386 1387 1388
	if (unlikely(!buf_len))
		return -EINVAL;

1389
	lock_sock(sk);
P
Per Liden 已提交
1390

1391
	if (!is_connectionless && unlikely(sk->sk_state == TIPC_OPEN)) {
1392
		res = -ENOTCONN;
P
Per Liden 已提交
1393 1394 1395
		goto exit;
	}

Y
Ying Xue 已提交
1396
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1397
restart:
P
Per Liden 已提交
1398

1399
	/* Look for a message in receive queue; wait if necessary */
1400
	res = tipc_wait_for_rcvmsg(sock, &timeo);
Y
Ying Xue 已提交
1401 1402
	if (res)
		goto exit;
P
Per Liden 已提交
1403

1404 1405
	/* Look at first message in receive queue */
	buf = skb_peek(&sk->sk_receive_queue);
P
Per Liden 已提交
1406 1407
	msg = buf_msg(buf);
	sz = msg_data_sz(msg);
1408
	hlen = msg_hdr_sz(msg);
P
Per Liden 已提交
1409 1410 1411 1412
	err = msg_errcode(msg);

	/* Discard an empty non-errored message & try again */
	if ((!sz) && (!err)) {
1413
		tsk_advance_rx_queue(sk);
P
Per Liden 已提交
1414 1415 1416 1417 1418 1419 1420
		goto restart;
	}

	/* Capture sender's address (optional) */
	set_orig_addr(m, msg);

	/* Capture ancillary data (optional) */
1421
	res = tipc_sk_anc_data_recv(m, msg, tsk);
1422
	if (res)
P
Per Liden 已提交
1423 1424 1425 1426 1427 1428 1429 1430
		goto exit;

	/* Capture message data (if valid) & compute return value (always) */
	if (!err) {
		if (unlikely(buf_len < sz)) {
			sz = buf_len;
			m->msg_flags |= MSG_TRUNC;
		}
1431
		res = skb_copy_datagram_msg(buf, hlen, m, sz);
1432
		if (res)
P
Per Liden 已提交
1433 1434 1435
			goto exit;
		res = sz;
	} else {
1436 1437
		if (is_connectionless || err == TIPC_CONN_SHUTDOWN ||
		    m->msg_control)
P
Per Liden 已提交
1438 1439 1440 1441 1442
			res = 0;
		else
			res = -ECONNRESET;
	}

1443 1444 1445
	if (unlikely(flags & MSG_PEEK))
		goto exit;

1446
	if (likely(!is_connectionless)) {
1447 1448 1449
		tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
		if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
			tipc_sk_send_ack(tsk);
1450
	}
1451
	tsk_advance_rx_queue(sk);
P
Per Liden 已提交
1452
exit:
1453
	release_sock(sk);
P
Per Liden 已提交
1454 1455 1456
	return res;
}

1457
/**
1458
 * tipc_recv_stream - receive stream-oriented data
P
Per Liden 已提交
1459 1460 1461
 * @m: descriptor for message info
 * @buf_len: total size of user buffer area
 * @flags: receive flags
1462 1463
 *
 * Used for SOCK_STREAM messages only.  If not enough data is available
P
Per Liden 已提交
1464 1465 1466 1467
 * will optionally wait for more; never truncates data.
 *
 * Returns size of returned message data, errno otherwise
 */
1468 1469
static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
			    size_t buf_len, int flags)
P
Per Liden 已提交
1470
{
1471
	struct sock *sk = sock->sk;
1472
	struct tipc_sock *tsk = tipc_sk(sk);
P
Per Liden 已提交
1473 1474
	struct sk_buff *buf;
	struct tipc_msg *msg;
Y
Ying Xue 已提交
1475
	long timeo;
P
Per Liden 已提交
1476
	unsigned int sz;
1477
	int target;
P
Per Liden 已提交
1478 1479
	int sz_copied = 0;
	u32 err;
1480
	int res = 0, hlen;
P
Per Liden 已提交
1481

1482
	/* Catch invalid receive attempts */
P
Per Liden 已提交
1483 1484 1485
	if (unlikely(!buf_len))
		return -EINVAL;

1486
	lock_sock(sk);
P
Per Liden 已提交
1487

1488
	if (unlikely(sk->sk_state == TIPC_OPEN)) {
1489
		res = -ENOTCONN;
P
Per Liden 已提交
1490 1491 1492
		goto exit;
	}

1493
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
Y
Ying Xue 已提交
1494
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
P
Per Liden 已提交
1495

1496
restart:
1497
	/* Look for a message in receive queue; wait if necessary */
1498
	res = tipc_wait_for_rcvmsg(sock, &timeo);
Y
Ying Xue 已提交
1499 1500
	if (res)
		goto exit;
P
Per Liden 已提交
1501

1502 1503
	/* Look at first message in receive queue */
	buf = skb_peek(&sk->sk_receive_queue);
P
Per Liden 已提交
1504 1505
	msg = buf_msg(buf);
	sz = msg_data_sz(msg);
1506
	hlen = msg_hdr_sz(msg);
P
Per Liden 已提交
1507 1508 1509 1510
	err = msg_errcode(msg);

	/* Discard an empty non-errored message & try again */
	if ((!sz) && (!err)) {
1511
		tsk_advance_rx_queue(sk);
P
Per Liden 已提交
1512 1513 1514 1515 1516 1517
		goto restart;
	}

	/* Optionally capture sender's address & ancillary data of first msg */
	if (sz_copied == 0) {
		set_orig_addr(m, msg);
1518
		res = tipc_sk_anc_data_recv(m, msg, tsk);
1519
		if (res)
P
Per Liden 已提交
1520 1521 1522 1523 1524
			goto exit;
	}

	/* Capture message data (if valid) & compute return value (always) */
	if (!err) {
1525 1526 1527
		u32 offset = TIPC_SKB_CB(buf)->bytes_read;
		u32 needed;
		int sz_to_copy;
P
Per Liden 已提交
1528

1529
		sz -= offset;
P
Per Liden 已提交
1530
		needed = (buf_len - sz_copied);
1531
		sz_to_copy = min(sz, needed);
1532

1533
		res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
1534
		if (res)
P
Per Liden 已提交
1535
			goto exit;
1536

P
Per Liden 已提交
1537 1538 1539 1540
		sz_copied += sz_to_copy;

		if (sz_to_copy < sz) {
			if (!(flags & MSG_PEEK))
1541 1542
				TIPC_SKB_CB(buf)->bytes_read =
					offset + sz_to_copy;
P
Per Liden 已提交
1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554
			goto exit;
		}
	} else {
		if (sz_copied != 0)
			goto exit; /* can't add error msg to valid data */

		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
			res = 0;
		else
			res = -ECONNRESET;
	}

1555 1556 1557 1558 1559 1560 1561
	if (unlikely(flags & MSG_PEEK))
		goto exit;

	tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
	if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
		tipc_sk_send_ack(tsk);
	tsk_advance_rx_queue(sk);
P
Per Liden 已提交
1562 1563

	/* Loop around if more data is required */
1564 1565
	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1566
	    (sz_copied < target)) &&	/* and more is ready or required */
1567
	    (!err))			/* and haven't reached a FIN */
P
Per Liden 已提交
1568 1569 1570
		goto restart;

exit:
1571
	release_sock(sk);
1572
	return sz_copied ? sz_copied : res;
P
Per Liden 已提交
1573 1574
}

1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
/**
 * tipc_write_space - wake up thread if port congestion is released
 * @sk: socket
 */
static void tipc_write_space(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
H
Herbert Xu 已提交
1585
	if (skwq_has_sleeper(wq))
1586 1587 1588 1589 1590 1591 1592 1593 1594 1595
		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
						POLLWRNORM | POLLWRBAND);
	rcu_read_unlock();
}

/**
 * tipc_data_ready - wake up threads to indicate messages have been received
 * @sk: socket
 * @len: the length of messages
 */
1596
static void tipc_data_ready(struct sock *sk)
1597 1598 1599 1600 1601
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
H
Herbert Xu 已提交
1602
	if (skwq_has_sleeper(wq))
1603 1604 1605 1606 1607
		wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
						POLLRDNORM | POLLRDBAND);
	rcu_read_unlock();
}

1608 1609 1610 1611 1612
static void tipc_sock_destruct(struct sock *sk)
{
	__skb_queue_purge(&sk->sk_receive_queue);
}

/**
 * filter_connect - Handle all incoming messages for a connection-based socket
 * @tsk: TIPC socket
 * @skb: pointer to message buffer. Set to NULL if buffer is consumed
 *
 * Returns true if everything ok, false otherwise
 */
static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
{
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
	struct socket *sock = sk->sk_socket;
	struct tipc_msg *hdr = buf_msg(skb);

	if (unlikely(msg_mcast(hdr)))
		return false;

	switch ((int)sock->state) {
	case SS_CONNECTED:

		/* Accept only connection-based messages sent by peer */
		if (unlikely(!tsk_peer_msg(tsk, hdr)))
			return false;

		if (unlikely(msg_errcode(hdr))) {
			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
			/* Let timer expire on it's own */
			tipc_node_remove_conn(net, tsk_peer_node(tsk),
					      tsk->portid);
			sk->sk_state_change(sk);
		}
		return true;

	case SS_CONNECTING:

		/* Accept only ACK or NACK message */
		if (unlikely(!msg_connected(hdr)))
			return false;

		if (unlikely(msg_errcode(hdr))) {
			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
			sk->sk_err = ECONNREFUSED;
			return true;
		}

		if (unlikely(!msg_isdata(hdr))) {
			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
			sk->sk_err = EINVAL;
			return true;
		}

		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
		msg_set_importance(&tsk->phdr, msg_importance(hdr));
		sock->state = SS_CONNECTED;

		/* If 'ACK+' message, add to socket receive queue */
		if (msg_data_sz(hdr))
			return true;

		/* If empty 'ACK-' message, wake up sleeping connect() */
		if (waitqueue_active(sk_sleep(sk)))
			wake_up_interruptible(sk_sleep(sk));

		/* 'ACK-' message is neither accepted nor rejected: */
		msg_set_dest_droppable(hdr, 1);
		return false;

	case SS_DISCONNECTING:
		break;
	}

	switch (sk->sk_state) {
	case TIPC_OPEN:
	case TIPC_DISCONNECTING:
		break;
	case TIPC_LISTEN:
		/* Accept only SYN message */
		if (!msg_connected(hdr) && !(msg_errcode(hdr)))
			return true;
		break;
	default:
		pr_err("Unknown sk_state %u\n", sk->sk_state);
	}

	return false;
}

1700 1701 1702
/**
 * rcvbuf_limit - get proper overload limit of socket receive queue
 * @sk: socket
1703
 * @skb: message
1704
 *
1705 1706
 * For connection oriented messages, irrespective of importance,
 * default queue limit is 2 MB.
1707
 *
1708 1709
 * For connectionless messages, queue limits are based on message
 * importance as follows:
1710
 *
1711 1712 1713 1714
 * TIPC_LOW_IMPORTANCE       (2 MB)
 * TIPC_MEDIUM_IMPORTANCE    (4 MB)
 * TIPC_HIGH_IMPORTANCE      (8 MB)
 * TIPC_CRITICAL_IMPORTANCE  (16 MB)
1715 1716 1717
 *
 * Returns overload limit according to corresponding message importance
 */
1718
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1719
{
1720 1721 1722 1723 1724
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = buf_msg(skb);

	if (unlikely(!msg_connected(hdr)))
		return sk->sk_rcvbuf << msg_importance(hdr);
1725

1726 1727
	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
		return sk->sk_rcvbuf;
1728

1729
	return FLOWCTL_MSG_LIM;
1730 1731
}

1732
/**
1733 1734
 * filter_rcv - validate incoming message
 * @sk: socket
1735
 * @skb: pointer to message.
1736
 *
1737 1738 1739
 * Enqueues message on receive queue if acceptable; optionally handles
 * disconnect indication for a connected socket.
 *
1740
 * Called with socket lock already taken
1741
 *
1742
 * Returns true if message was added to socket receive queue, otherwise false
P
Per Liden 已提交
1743
 */
J
Jon Paul Maloy 已提交
1744 1745
static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
		       struct sk_buff_head *xmitq)
P
Per Liden 已提交
1746
{
1747
	struct tipc_sock *tsk = tipc_sk(sk);
1748 1749 1750 1751
	struct tipc_msg *hdr = buf_msg(skb);
	unsigned int limit = rcvbuf_limit(sk, skb);
	int err = TIPC_OK;
	int usr = msg_user(hdr);
P
Per Liden 已提交
1752

1753
	if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
J
Jon Paul Maloy 已提交
1754
		tipc_sk_proto_rcv(tsk, skb, xmitq);
1755
		return false;
1756
	}
1757

1758 1759
	if (unlikely(usr == SOCK_WAKEUP)) {
		kfree_skb(skb);
1760 1761
		tsk->link_cong = 0;
		sk->sk_write_space(sk);
1762
		return false;
1763 1764
	}

1765 1766 1767 1768 1769
	/* Drop if illegal message type */
	if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
		kfree_skb(skb);
		return false;
	}
1770

1771
	/* Reject if wrong message type for current socket state */
1772
	if (tipc_sk_type_connectionless(sk)) {
1773 1774 1775 1776 1777 1778 1779
		if (msg_connected(hdr)) {
			err = TIPC_ERR_NO_PORT;
			goto reject;
		}
	} else if (unlikely(!filter_connect(tsk, skb))) {
		err = TIPC_ERR_NO_PORT;
		goto reject;
P
Per Liden 已提交
1780 1781 1782
	}

	/* Reject message if there isn't room to queue it */
1783 1784 1785 1786
	if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
		err = TIPC_ERR_OVERLOAD;
		goto reject;
	}
P
Per Liden 已提交
1787

1788
	/* Enqueue message */
1789
	TIPC_SKB_CB(skb)->bytes_read = 0;
1790 1791
	__skb_queue_tail(&sk->sk_receive_queue, skb);
	skb_set_owner_r(skb, sk);
1792

1793
	sk->sk_data_ready(sk);
1794 1795 1796
	return true;

reject:
J
Jon Paul Maloy 已提交
1797 1798
	if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
		__skb_queue_tail(xmitq, skb);
1799
	return false;
1800
}
P
Per Liden 已提交
1801

1802
/**
1803
 * tipc_backlog_rcv - handle incoming message from backlog queue
1804
 * @sk: socket
1805
 * @skb: message
1806
 *
1807
 * Caller must hold socket lock
1808 1809 1810
 *
 * Returns 0
 */
1811
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1812
{
1813
	unsigned int truesize = skb->truesize;
J
Jon Paul Maloy 已提交
1814 1815
	struct sk_buff_head xmitq;
	u32 dnode, selector;
1816

J
Jon Paul Maloy 已提交
1817 1818 1819
	__skb_queue_head_init(&xmitq);

	if (likely(filter_rcv(sk, skb, &xmitq))) {
1820
		atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
J
Jon Paul Maloy 已提交
1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831
		return 0;
	}

	if (skb_queue_empty(&xmitq))
		return 0;

	/* Send response/rejected message */
	skb = __skb_dequeue(&xmitq);
	dnode = msg_destnode(buf_msg(skb));
	selector = msg_origport(buf_msg(skb));
	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1832 1833 1834
	return 0;
}

1835
/**
1836 1837 1838 1839 1840
 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
 *                   inputq and try adding them to socket or backlog queue
 * @inputq: list of incoming buffers with potentially different destinations
 * @sk: socket where the buffers should be enqueued
 * @dport: port number for the socket
1841 1842 1843
 *
 * Caller must hold socket lock
 */
1844
static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
J
Jon Paul Maloy 已提交
1845
			    u32 dport, struct sk_buff_head *xmitq)
1846
{
J
Jon Paul Maloy 已提交
1847 1848
	unsigned long time_limit = jiffies + 2;
	struct sk_buff *skb;
1849 1850
	unsigned int lim;
	atomic_t *dcnt;
J
Jon Paul Maloy 已提交
1851
	u32 onode;
1852 1853

	while (skb_queue_len(inputq)) {
1854
		if (unlikely(time_after_eq(jiffies, time_limit)))
1855 1856
			return;

1857 1858
		skb = tipc_skb_dequeue(inputq, dport);
		if (unlikely(!skb))
1859 1860 1861
			return;

		/* Add message directly to receive queue if possible */
1862
		if (!sock_owned_by_user(sk)) {
J
Jon Paul Maloy 已提交
1863
			filter_rcv(sk, skb, xmitq);
1864
			continue;
1865
		}
1866 1867

		/* Try backlog, compensating for double-counted bytes */
1868
		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1869
		if (!sk->sk_backlog.len)
1870 1871 1872 1873
			atomic_set(dcnt, 0);
		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
		if (likely(!sk_add_backlog(sk, skb, lim)))
			continue;
1874 1875

		/* Overload => reject message back to sender */
J
Jon Paul Maloy 已提交
1876 1877 1878
		onode = tipc_own_addr(sock_net(sk));
		if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
			__skb_queue_tail(xmitq, skb);
1879
		break;
1880
	}
1881 1882
}

1883
/**
1884 1885 1886 1887
 * tipc_sk_rcv - handle a chain of incoming buffers
 * @inputq: buffer list containing the buffers
 * Consumes all buffers in list until inputq is empty
 * Note: may be called in multiple threads referring to the same queue
1888
 */
1889
void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1890
{
J
Jon Paul Maloy 已提交
1891
	struct sk_buff_head xmitq;
1892
	u32 dnode, dport = 0;
E
Erik Hugne 已提交
1893
	int err;
1894 1895
	struct tipc_sock *tsk;
	struct sock *sk;
1896
	struct sk_buff *skb;
1897

J
Jon Paul Maloy 已提交
1898
	__skb_queue_head_init(&xmitq);
1899 1900 1901
	while (skb_queue_len(inputq)) {
		dport = tipc_skb_peek_port(inputq, dport);
		tsk = tipc_sk_lookup(net, dport);
1902

1903 1904 1905
		if (likely(tsk)) {
			sk = &tsk->sk;
			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
J
Jon Paul Maloy 已提交
1906
				tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1907 1908
				spin_unlock_bh(&sk->sk_lock.slock);
			}
J
Jon Paul Maloy 已提交
1909 1910 1911 1912 1913
			/* Send pending response/rejected messages, if any */
			while ((skb = __skb_dequeue(&xmitq))) {
				dnode = msg_destnode(buf_msg(skb));
				tipc_node_xmit_skb(net, skb, dnode, dport);
			}
1914 1915 1916
			sock_put(sk);
			continue;
		}
1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929

		/* No destination socket => dequeue skb if still there */
		skb = tipc_skb_dequeue(inputq, dport);
		if (!skb)
			return;

		/* Try secondary lookup if unresolved named message */
		err = TIPC_ERR_NO_PORT;
		if (tipc_msg_lookup_dest(net, skb, &err))
			goto xmit;

		/* Prepare for message rejection */
		if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1930
			continue;
1931
xmit:
1932
		dnode = msg_destnode(buf_msg(skb));
1933
		tipc_node_xmit_skb(net, skb, dnode, dport);
1934
	}
P
Per Liden 已提交
1935 1936
}

Y
Ying Xue 已提交
1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958
/* tipc_wait_for_connect - sleep until the socket leaves SS_CONNECTING,
 * the timeout in *timeo_p expires, a signal arrives, or a socket error
 * is pending.  Returns 0 on success, negative errno otherwise.
 */
static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT(wait);
	int done;

	do {
		int err = sock_error(sk);
		if (err)
			return err;
		if (!*timeo_p)
			return -ETIMEDOUT;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
		done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
		finish_wait(sk_sleep(sk), &wait);
	} while (!done);
	return 0;
}

P
Per Liden 已提交
1959
/**
 * tipc_connect - establish a connection to another TIPC port
 * @sock: socket structure
 * @dest: socket address for destination port
 * @destlen: size of socket address data structure
 * @flags: file-related flags associated with socket
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_connect(struct socket *sock, struct sockaddr *dest,
			int destlen, int flags)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
	struct msghdr m = {NULL,};
	/* Non-blocking connect gets a zero timeout */
	long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
	socket_state previous;
	int res = 0;

	lock_sock(sk);

	/* DGRAM/RDM connect(), just save the destaddr */
	if (tipc_sk_type_connectionless(sk)) {
		if (dst->family == AF_UNSPEC) {
			/* AF_UNSPEC clears any previously saved peer */
			memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
		} else if (destlen != sizeof(struct sockaddr_tipc)) {
			res = -EINVAL;
		} else {
			memcpy(&tsk->peer, dest, destlen);
		}
		goto exit;
	}

	/*
	 * Reject connection attempt using multicast address
	 *
	 * Note: send_msg() validates the rest of the address fields,
	 *       so there's no need to do it here
	 */
	if (dst->addrtype == TIPC_ADDR_MCAST) {
		res = -EINVAL;
		goto exit;
	}

	previous = sock->state;

	switch (sk->sk_state) {
	case TIPC_OPEN:
		/* Send a 'SYN-' to destination */
		m.msg_name = dest;
		m.msg_namelen = destlen;

		/* If connect is in non-blocking case, set MSG_DONTWAIT to
		 * indicate send_msg() is never blocked.
		 */
		if (!timeout)
			m.msg_flags = MSG_DONTWAIT;

		res = __tipc_sendmsg(sock, &m, 0);
		if ((res < 0) && (res != -EWOULDBLOCK))
			goto exit;

		/* Just entered SS_CONNECTING state; the only
		 * difference is that return value in non-blocking
		 * case is EINPROGRESS, rather than EALREADY.
		 */
		res = -EINPROGRESS;
		break;
	}

	switch (sock->state) {
	case SS_CONNECTING:
		if (previous == SS_CONNECTING)
			res = -EALREADY;
		if (!timeout)
			goto exit;
		timeout = msecs_to_jiffies(timeout);
		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
		res = tipc_wait_for_connect(sock, &timeout);
		break;
	case SS_CONNECTED:
		res = -EISCONN;
		break;
	default:
		res = -EINVAL;
		break;
	}
exit:
	release_sock(sk);
	return res;
}

2052
/**
2053
 * tipc_listen - allow socket to listen for incoming connections
P
Per Liden 已提交
2054 2055
 * @sock: socket structure
 * @len: (unused)
2056
 *
P
Per Liden 已提交
2057 2058
 * Returns 0 on success, errno otherwise
 */
2059
static int tipc_listen(struct socket *sock, int len)
P
Per Liden 已提交
2060
{
2061 2062 2063 2064
	struct sock *sk = sock->sk;
	int res;

	lock_sock(sk);
2065
	res = tipc_set_sk_state(sk, TIPC_LISTEN);
2066
	release_sock(sk);
2067

2068
	return res;
P
Per Liden 已提交
2069 2070
}

Y
Ying Xue 已提交
2071 2072 2073 2074 2075 2076 2077 2078 2079 2080 2081 2082 2083 2084
/* tipc_wait_for_accept - sleep until a connection request ('SYN') is
 * queued on the listening socket's receive queue, the timeout expires,
 * or a signal arrives.  Returns 0 when a request is available, negative
 * errno otherwise.  Called with the socket lock held; it is dropped
 * around the actual sleep.
 */
static int tipc_wait_for_accept(struct socket *sock, long timeo)
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT(wait);
	int err;

	/* True wake-one mechanism for incoming connections: only
	 * one process gets woken up, not the 'whole herd'.
	 * Since we do not 'race & poll' for established sockets
	 * anymore, the common case will execute the loop only once.
	*/
	for (;;) {
		prepare_to_wait_exclusive(sk_sleep(sk), &wait,
					  TASK_INTERRUPTIBLE);
		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
			release_sock(sk);
			timeo = schedule_timeout(timeo);
			lock_sock(sk);
		}
		err = 0;
		if (!skb_queue_empty(&sk->sk_receive_queue))
			break;
		err = -EAGAIN;
		if (!timeo)
			break;
		err = sock_intr_errno(timeo);
		if (signal_pending(current))
			break;
	}
	finish_wait(sk_sleep(sk), &wait);
	return err;
}

2104
/**
 * tipc_accept - wait for connection request
 * @sock: listening socket
 * @new_sock: new socket that is to be connected
 * @flags: file-related flags associated with socket
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
{
	struct sock *new_sk, *sk = sock->sk;
	struct sk_buff *buf;
	struct tipc_sock *new_tsock;
	struct tipc_msg *msg;
	long timeo;
	int res;

	lock_sock(sk);

	if (sk->sk_state != TIPC_LISTEN) {
		res = -EINVAL;
		goto exit;
	}
	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
	res = tipc_wait_for_accept(sock, timeo);
	if (res)
		goto exit;

	/* Peek (don't consume yet) at the queued connection request */
	buf = skb_peek(&sk->sk_receive_queue);

	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 0);
	if (res)
		goto exit;
	security_sk_clone(sock->sk, new_sock->sk);

	new_sk = new_sock->sk;
	new_tsock = tipc_sk(new_sk);
	msg = buf_msg(buf);

	/* we lock on new_sk; but lockdep sees the lock on sk */
	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);

	/*
	 * Reject any stray messages received by new socket
	 * before the socket lock was taken (very, very unlikely)
	 */
	tsk_rej_rx_queue(new_sk);

	/* Connect new socket to it's peer */
	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
	new_sock->state = SS_CONNECTED;

	/* New socket inherits importance (and name, if any) from the SYN */
	tsk_set_importance(new_tsock, msg_importance(msg));
	if (msg_named(msg)) {
		new_tsock->conn_type = msg_nametype(msg);
		new_tsock->conn_instance = msg_nameinst(msg);
	}

	/*
	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
	 * Respond to 'SYN+' by queuing it on new socket.
	 */
	if (!msg_data_sz(msg)) {
		struct msghdr m = {NULL,};

		tsk_advance_rx_queue(sk);
		__tipc_send_stream(new_sock, &m, 0);
	} else {
		__skb_dequeue(&sk->sk_receive_queue);
		__skb_queue_head(&new_sk->sk_receive_queue, buf);
		skb_set_owner_r(buf, new_sk);
	}
	release_sock(new_sk);
exit:
	release_sock(sk);
	return res;
}

/**
 * tipc_shutdown - shutdown socket connection
 * @sock: socket structure
 * @how: direction to close (must be SHUT_RDWR)
 *
 * Terminates connection (if necessary), then purges socket's receive queue.
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_shutdown(struct socket *sock, int how)
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct sk_buff *skb;
	u32 dnode = tsk_peer_node(tsk);
	u32 dport = tsk_peer_port(tsk);
	u32 onode = tipc_own_addr(net);
	u32 oport = tsk->portid;
	int res;

	if (how != SHUT_RDWR)
		return -EINVAL;

	lock_sock(sk);

	if (sock->state == SS_CONNECTING || sock->state == SS_CONNECTED) {

restart:
		dnode = tsk_peer_node(tsk);

		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb) {
			/* Partially read messages can't be reused as FIN */
			if (TIPC_SKB_CB(skb)->bytes_read) {
				kfree_skb(skb);
				goto restart;
			}
			tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
		} else {
			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
					      TIPC_CONN_MSG, SHORT_H_SIZE,
					      0, dnode, onode, dport, oport,
					      TIPC_CONN_SHUTDOWN);
			if (skb)
				tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
		}
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(net, dnode, tsk->portid);
	}

	switch (sk->sk_state) {
	case TIPC_DISCONNECTING:

		/* Discard any unreceived messages */
		__skb_queue_purge(&sk->sk_receive_queue);

		/* Wake up anyone sleeping in poll */
		sk->sk_state_change(sk);
		res = 0;
		break;

	default:
		res = -ENOTCONN;
	}

	release_sock(sk);
	return res;
}

2252
/* tipc_sk_timeout - connection supervision timer.  Sends a CONN_PROBE to
 * the peer; if a previous probe is still unacknowledged the connection is
 * torn down (or the check is retried shortly if the socket is owned by
 * user context).  Runs in BH context; @data is the tipc_sock pointer.
 */
static void tipc_sk_timeout(unsigned long data)
{
	struct tipc_sock *tsk = (struct tipc_sock *)data;
	struct sock *sk = &tsk->sk;
	struct sk_buff *skb = NULL;
	u32 peer_port, peer_node;
	u32 own_node = tsk_own_node(tsk);

	bh_lock_sock(sk);
	if (!tipc_sk_connected(sk)) {
		bh_unlock_sock(sk);
		goto exit;
	}
	peer_port = tsk_peer_port(tsk);
	peer_node = tsk_peer_node(tsk);

	if (tsk->probe_unacked) {
		/* Previous probe got no answer: peer presumed dead */
		if (!sock_owned_by_user(sk)) {
			tipc_set_sk_state(sk, TIPC_DISCONNECTING);
			tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
					      tsk_peer_port(tsk));
			sk->sk_state_change(sk);
		} else {
			/* Try again later */
			sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
		}

		bh_unlock_sock(sk);
		goto exit;
	}

	skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
			      INT_H_SIZE, 0, peer_node, own_node,
			      peer_port, tsk->portid, TIPC_OK);
	tsk->probe_unacked = true;
	sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL);
	bh_unlock_sock(sk);
	/* Transmit outside the socket lock */
	if (skb)
		tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
exit:
	sock_put(sk);	/* drop the timer's reference */
}

2295
/* tipc_sk_publish - bind a name sequence to the socket by publishing it
 * in the name table.  Not allowed on a connected socket.
 * Returns 0 on success, -EINVAL/-EADDRINUSE otherwise.
 */
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
			   struct tipc_name_seq const *seq)
{
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
	struct publication *publ;
	u32 key;

	if (tipc_sk_connected(sk))
		return -EINVAL;
	/* Key must differ from portid, which is reserved */
	key = tsk->portid + tsk->pub_count + 1;
	if (key == tsk->portid)
		return -EADDRINUSE;

	publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
				    scope, tsk->portid, key);
	if (unlikely(!publ))
		return -EINVAL;

	list_add(&publ->pport_list, &tsk->publications);
	tsk->pub_count++;
	tsk->published = 1;
	return 0;
}

2320
/* tipc_sk_withdraw - withdraw name publication(s) from the name table.
 * With a non-NULL @seq only the matching publication is withdrawn;
 * with @seq == NULL all of the socket's publications are withdrawn.
 * Returns 0 if anything was withdrawn, -EINVAL otherwise.
 */
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
			    struct tipc_name_seq const *seq)
{
	struct net *net = sock_net(&tsk->sk);
	struct publication *publ;
	struct publication *safe;
	int rc = -EINVAL;

	list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
		if (seq) {
			if (publ->scope != scope)
				continue;
			if (publ->type != seq->type)
				continue;
			if (publ->lower != seq->lower)
				continue;
			if (publ->upper != seq->upper)
				break;
			tipc_nametbl_withdraw(net, publ->type, publ->lower,
					      publ->ref, publ->key);
			rc = 0;
			break;
		}
		tipc_nametbl_withdraw(net, publ->type, publ->lower,
				      publ->ref, publ->key);
		rc = 0;
	}
	if (list_empty(&tsk->publications))
		tsk->published = 0;
	return rc;
}

2352 2353 2354
/* tipc_sk_reinit: set non-zero address in all existing sockets
 *                 when we go from standalone to network mode.
 */
void tipc_sk_reinit(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	const struct bucket_table *tbl;
	struct rhash_head *pos;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;
	int i;

	/* Walk every socket in the hash table under RCU, updating the
	 * pre-built message header of each under its socket spinlock.
	 */
	rcu_read_lock();
	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
	for (i = 0; i < tbl->size; i++) {
		rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
			spin_lock_bh(&tsk->sk.sk_lock.slock);
			msg = &tsk->phdr;
			msg_set_prevnode(msg, tn->own_addr);
			msg_set_orignode(msg, tn->own_addr);
			spin_unlock_bh(&tsk->sk.sk_lock.slock);
		}
	}
	rcu_read_unlock();
}

2378
/* tipc_sk_lookup - map a port id to its tipc_sock.
 * On success the returned socket carries an extra reference which the
 * caller must release with sock_put(); returns NULL if no match.
 */
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_sock *found;

	rcu_read_lock();
	found = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
	if (found)
		sock_hold(&found->sk);
	rcu_read_unlock();

	return found;
}

2392
/* tipc_sk_insert - assign the socket a free port id and insert it into
 * the per-netns socket hash table.  Scans the whole port range at most
 * once.  Returns 0 on success, -1 if no free port was found.
 */
static int tipc_sk_insert(struct tipc_sock *tsk)
{
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
	/* Random start point to spread port allocation */
	u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;

	while (remaining--) {
		portid++;
		if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
			portid = TIPC_MIN_PORT;
		tsk->portid = portid;
		sock_hold(&tsk->sk);	/* ref owned by the hash table */
		if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
						   tsk_rht_params))
			return 0;
		sock_put(&tsk->sk);	/* portid collision: undo and retry */
	}

	return -1;
}

2415
static void tipc_sk_remove(struct tipc_sock *tsk)
2416
{
2417
	struct sock *sk = &tsk->sk;
2418
	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2419

2420
	if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2421 2422
		WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
		__sock_put(sk);
2423 2424 2425
	}
}

2426 2427 2428 2429 2430 2431 2432
/* Parameters for the portid -> tipc_sock rhashtable */
static const struct rhashtable_params tsk_rht_params = {
	.nelem_hint = 192,
	.head_offset = offsetof(struct tipc_sock, node),
	.key_offset = offsetof(struct tipc_sock, portid),
	.key_len = sizeof(u32), /* portid */
	.max_size = 1048576,
	.min_size = 256,
	.automatic_shrinking = true,
};

2436
/* tipc_sk_rht_init - create the per-netns socket hash table.
 * Returns 0 on success, errno otherwise.
 */
int tipc_sk_rht_init(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
}

2443
/* tipc_sk_rht_destroy - tear down the per-netns socket hash table */
void tipc_sk_rht_destroy(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	/* Wait for socket readers to complete */
	synchronize_net();

	rhashtable_destroy(&tn->sk_rht);
}

P
Per Liden 已提交
2453
/**
 * tipc_setsockopt - set socket option
 * @sock: socket structure
 * @lvl: option level
 * @opt: option identifier
 * @ov: pointer to new option value
 * @ol: length of option value
 *
 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
 * (to ease compatibility).
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
			   char __user *ov, unsigned int ol)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	u32 value;
	int res;

	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
		return 0;
	if (lvl != SOL_TIPC)
		return -ENOPROTOOPT;
	if (ol < sizeof(value))
		return -EINVAL;
	res = get_user(value, (u32 __user *)ov);
	if (res)
		return res;

	lock_sock(sk);

	switch (opt) {
	case TIPC_IMPORTANCE:
		res = tsk_set_importance(tsk, value);
		break;
	case TIPC_SRC_DROPPABLE:
		/* Stream sockets must stay reliable */
		if (sock->type != SOCK_STREAM)
			tsk_set_unreliable(tsk, value);
		else
			res = -ENOPROTOOPT;
		break;
	case TIPC_DEST_DROPPABLE:
		tsk_set_unreturnable(tsk, value);
		break;
	case TIPC_CONN_TIMEOUT:
		tipc_sk(sk)->conn_timeout = value;
		/* no need to set "res", since already 0 at this point */
		break;
	default:
		res = -EINVAL;
	}

	release_sock(sk);

	return res;
}

/**
 * tipc_getsockopt - get socket option
 * @sock: socket structure
 * @lvl: option level
 * @opt: option identifier
 * @ov: receptacle for option value
 * @ol: receptacle for length of option value
 *
 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
 * (to ease compatibility).
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
			   char __user *ov, int __user *ol)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	int len;
	u32 value;
	int res;

	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
		return put_user(0, ol);
	if (lvl != SOL_TIPC)
		return -ENOPROTOOPT;
	res = get_user(len, ol);
	if (res)
		return res;

	lock_sock(sk);

	switch (opt) {
	case TIPC_IMPORTANCE:
		value = tsk_importance(tsk);
		break;
	case TIPC_SRC_DROPPABLE:
		value = tsk_unreliable(tsk);
		break;
	case TIPC_DEST_DROPPABLE:
		value = tsk_unreturnable(tsk);
		break;
	case TIPC_CONN_TIMEOUT:
		value = tsk->conn_timeout;
		/* no need to set "res", since already 0 at this point */
		break;
	case TIPC_NODE_RECVQ_DEPTH:
		value = 0; /* was tipc_queue_size, now obsolete */
		break;
	case TIPC_SOCK_RECVQ_DEPTH:
		value = skb_queue_len(&sk->sk_receive_queue);
		break;
	default:
		res = -EINVAL;
	}

	release_sock(sk);

	if (res)
		return res;	/* "get" failed */

	if (len < sizeof(value))
		return -EINVAL;

	if (copy_to_user(ov, &value, sizeof(value)))
		return -EFAULT;

	return put_user(sizeof(value), ol);
}

2582
static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
E
Erik Hugne 已提交
2583
{
2584
	struct sock *sk = sock->sk;
E
Erik Hugne 已提交
2585 2586 2587 2588 2589 2590 2591
	struct tipc_sioc_ln_req lnr;
	void __user *argp = (void __user *)arg;

	switch (cmd) {
	case SIOCGETLINKNAME:
		if (copy_from_user(&lnr, argp, sizeof(lnr)))
			return -EFAULT;
2592 2593
		if (!tipc_node_get_linkname(sock_net(sk),
					    lnr.bearer_id & 0xffff, lnr.peer,
E
Erik Hugne 已提交
2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604
					    lnr.linkname, TIPC_MAX_LINK_NAME)) {
			if (copy_to_user(argp, &lnr, sizeof(lnr)))
				return -EFAULT;
			return 0;
		}
		return -EADDRNOTAVAIL;
	default:
		return -ENOIOCTLCMD;
	}
}

2605 2606
/* Protocol switches for the various types of TIPC sockets */

2607
/* Ops for connectionless SOCK_DGRAM sockets (no accept/listen) */
static const struct proto_ops msg_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= tipc_release,
	.bind		= tipc_bind,
	.connect	= tipc_connect,
	.socketpair	= sock_no_socketpair,
	.accept		= sock_no_accept,
	.getname	= tipc_getname,
	.poll		= tipc_poll,
	.ioctl		= tipc_ioctl,
	.listen		= sock_no_listen,
	.shutdown	= tipc_shutdown,
	.setsockopt	= tipc_setsockopt,
	.getsockopt	= tipc_getsockopt,
	.sendmsg	= tipc_sendmsg,
	.recvmsg	= tipc_recvmsg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};

2628
/* Ops for connection-oriented SOCK_SEQPACKET sockets */
static const struct proto_ops packet_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= tipc_release,
	.bind		= tipc_bind,
	.connect	= tipc_connect,
	.socketpair	= sock_no_socketpair,
	.accept		= tipc_accept,
	.getname	= tipc_getname,
	.poll		= tipc_poll,
	.ioctl		= tipc_ioctl,
	.listen		= tipc_listen,
	.shutdown	= tipc_shutdown,
	.setsockopt	= tipc_setsockopt,
	.getsockopt	= tipc_getsockopt,
	.sendmsg	= tipc_send_packet,
	.recvmsg	= tipc_recvmsg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};

2649
/* Ops for byte-stream SOCK_STREAM sockets */
static const struct proto_ops stream_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= tipc_release,
	.bind		= tipc_bind,
	.connect	= tipc_connect,
	.socketpair	= sock_no_socketpair,
	.accept		= tipc_accept,
	.getname	= tipc_getname,
	.poll		= tipc_poll,
	.ioctl		= tipc_ioctl,
	.listen		= tipc_listen,
	.shutdown	= tipc_shutdown,
	.setsockopt	= tipc_setsockopt,
	.getsockopt	= tipc_getsockopt,
	.sendmsg	= tipc_send_stream,
	.recvmsg	= tipc_recv_stream,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};

2670
/* AF_TIPC address family registration */
static const struct net_proto_family tipc_family_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.create		= tipc_sk_create
};

/* TIPC protocol descriptor used for sock allocation */
static struct proto tipc_proto = {
	.name		= "TIPC",
	.owner		= THIS_MODULE,
	.obj_size	= sizeof(struct tipc_sock),
	.sysctl_rmem	= sysctl_tipc_rmem
};

/**
 * tipc_socket_init - initialize TIPC socket interface
 *
 * Returns 0 on success, errno otherwise
 */
int tipc_socket_init(void)
{
	int res;

	res = proto_register(&tipc_proto, 1);
	if (res) {
		pr_err("Failed to register TIPC protocol type\n");
		goto out;
	}

	res = sock_register(&tipc_family_ops);
	if (res) {
		pr_err("Failed to register TIPC socket type\n");
		/* Roll back the successful proto registration */
		proto_unregister(&tipc_proto);
		goto out;
	}
 out:
	return res;
}
/**
2709
 * tipc_socket_stop - stop TIPC socket interface
P
Per Liden 已提交
2710
 */
2711
void tipc_socket_stop(void)
P
Per Liden 已提交
2712 2713 2714 2715
{
	sock_unregister(tipc_family_ops.family);
	proto_unregister(&tipc_proto);
}
2716 2717

/* Caller should hold socket lock for the passed tipc socket. */
/* __tipc_nl_add_sk_con - nest the connection attributes (peer node/port
 * and, if named, connection type/instance) into a netlink dump message.
 * Returns 0 on success, -EMSGSIZE if the skb ran out of room.
 */
static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
{
	u32 peer_node;
	u32 peer_port;
	struct nlattr *nest;

	peer_node = tsk_peer_node(tsk);
	peer_port = tsk_peer_port(tsk);

	nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);

	if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
		goto msg_full;
	if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
		goto msg_full;

	if (tsk->conn_type != 0) {
		if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
			goto msg_full;
		if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
			goto msg_full;
		if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
			goto msg_full;
	}
	nla_nest_end(skb, nest);

	return 0;

msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

/* Caller should hold socket lock for the passed tipc socket. */
/* __tipc_nl_add_sk - emit one TIPC_NL_SOCK_GET multipart record for @tsk
 * into the dump skb.  Returns 0 on success, -EMSGSIZE if the record did
 * not fit (all partial output is cancelled).
 */
static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
			    struct tipc_sock *tsk)
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct sock *sk = &tsk->sk;

	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
	if (!hdr)
		goto msg_cancel;

	attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
	if (!attrs)
		goto genlmsg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
		goto attr_msg_cancel;

	if (tipc_sk_connected(sk)) {
		err = __tipc_nl_add_sk_con(skb, tsk);
		if (err)
			goto attr_msg_cancel;
	} else if (!list_empty(&tsk->publications)) {
		if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
			goto attr_msg_cancel;
	}
	nla_nest_end(skb, attrs);
	genlmsg_end(skb, hdr);

	return 0;

attr_msg_cancel:
	nla_nest_cancel(skb, attrs);
genlmsg_cancel:
	genlmsg_cancel(skb, hdr);
msg_cancel:
	return -EMSGSIZE;
}

/* tipc_nl_sk_dump - netlink dump callback listing all TIPC sockets.
 * Resumable: cb->args[0] holds the hash bucket, cb->args[1] the portid
 * of the socket whose record did not fit in the previous skb.
 */
int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	int err;
	struct tipc_sock *tsk;
	const struct bucket_table *tbl;
	struct rhash_head *pos;
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	u32 tbl_id = cb->args[0];
	u32 prev_portid = cb->args[1];

	rcu_read_lock();
	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
	for (; tbl_id < tbl->size; tbl_id++) {
		rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
			spin_lock_bh(&tsk->sk.sk_lock.slock);
			/* Skip forward to the resume point, if any */
			if (prev_portid && prev_portid != tsk->portid) {
				spin_unlock_bh(&tsk->sk.sk_lock.slock);
				continue;
			}

			err = __tipc_nl_add_sk(skb, cb, tsk);
			if (err) {
				/* skb full: remember where to resume */
				prev_portid = tsk->portid;
				spin_unlock_bh(&tsk->sk.sk_lock.slock);
				goto out;
			}
			prev_portid = 0;
			spin_unlock_bh(&tsk->sk.sk_lock.slock);
		}
	}
out:
	rcu_read_unlock();
	cb->args[0] = tbl_id;
	cb->args[1] = prev_portid;

	return skb->len;
}
2835 2836

/* Caller should hold socket lock for the passed tipc socket. */
/* __tipc_nl_add_sk_publ - emit one TIPC_NL_PUBL_GET record describing a
 * single publication (key, type, lower, upper).  Returns 0 on success,
 * -EMSGSIZE if the record did not fit.
 */
static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
				 struct netlink_callback *cb,
				 struct publication *publ)
{
	void *hdr;
	struct nlattr *attrs;

	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
	if (!hdr)
		goto msg_cancel;

	attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
	if (!attrs)
		goto genlmsg_cancel;

	if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
		goto attr_msg_cancel;

	nla_nest_end(skb, attrs);
	genlmsg_end(skb, hdr);

	return 0;

attr_msg_cancel:
	nla_nest_cancel(skb, attrs);
genlmsg_cancel:
	genlmsg_cancel(skb, hdr);
msg_cancel:
	return -EMSGSIZE;
}

/* Caller should hold socket lock for the passed tipc socket. */
/* __tipc_nl_list_sk_publ - dump the socket's publications starting from
 * the one after *last_publ (0 => from the beginning).  On a full skb,
 * *last_publ is set to the key to resume from and the error is returned.
 * NOTE(review): if the publication identified by *last_publ has been
 * withdrawn between dump rounds, the -EPIPE/consistency-flag path below
 * is relied on to restart the dump — verify against the netlink core.
 */
static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
				  struct netlink_callback *cb,
				  struct tipc_sock *tsk, u32 *last_publ)
{
	int err;
	struct publication *p;

	if (*last_publ) {
		list_for_each_entry(p, &tsk->publications, pport_list) {
			if (p->key == *last_publ)
				break;
		}
		if (p->key != *last_publ) {
			/* We never set seq or call nl_dump_check_consistent()
			 * this means that setting prev_seq here will cause the
			 * consistence check to fail in the netlink callback
			 * handler. Resulting in the last NLMSG_DONE message
			 * having the NLM_F_DUMP_INTR flag set.
			 */
			cb->prev_seq = 1;
			*last_publ = 0;
			return -EPIPE;
		}
	} else {
		p = list_first_entry(&tsk->publications, struct publication,
				     pport_list);
	}

	list_for_each_entry_from(p, &tsk->publications, pport_list) {
		err = __tipc_nl_add_sk_publ(skb, cb, p);
		if (err) {
			*last_publ = p->key;
			return err;
		}
	}
	*last_publ = 0;

	return 0;
}

/* tipc_nl_publ_dump - netlink dump callback listing one socket's name
 * publications.  The target socket is taken from the TIPC_NLA_SOCK_REF
 * attribute on the first round; cb->args[] carry portid, resume key and
 * a done flag across rounds.
 */
int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	int err;
	u32 tsk_portid = cb->args[0];
	u32 last_publ = cb->args[1];
	u32 done = cb->args[2];
	struct net *net = sock_net(skb->sk);
	struct tipc_sock *tsk;

	if (!tsk_portid) {
		/* First round: parse the socket reference from the request */
		struct nlattr **attrs;
		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];

		err = tipc_nlmsg_parse(cb->nlh, &attrs);
		if (err)
			return err;

		if (!attrs[TIPC_NLA_SOCK])
			return -EINVAL;

		err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
				       attrs[TIPC_NLA_SOCK],
				       tipc_nl_sock_policy);
		if (err)
			return err;

		if (!sock[TIPC_NLA_SOCK_REF])
			return -EINVAL;

		tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
	}

	if (done)
		return 0;

	tsk = tipc_sk_lookup(net, tsk_portid);
	if (!tsk)
		return -EINVAL;

	lock_sock(&tsk->sk);
	err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
	if (!err)
		done = 1;
	release_sock(&tsk->sk);
	sock_put(&tsk->sk);	/* ref taken by tipc_sk_lookup() */

	cb->args[0] = tsk_portid;
	cb->args[1] = last_publ;
	cb->args[2] = done;

	return skb->len;
}