/*
 * net/tipc/socket.c: TIPC socket API
 *
 * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <linux/rhashtable.h>
#include <linux/jhash.h>
#include "core.h"
#include "name_table.h"
#include "node.h"
#include "link.h"
#include "name_distr.h"
#include "socket.h"

#define SS_LISTENING		-1	/* socket is listening */
#define SS_READY		-2	/* socket is connectionless */

#define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
#define CONN_PROBING_INTERVAL	msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_FWD_MSG		1
#define TIPC_CONN_OK		0
#define TIPC_CONN_PROBING	1
#define TIPC_MAX_PORT		0xffffffff
#define TIPC_MIN_PORT		1

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @connected: non-zero if port is currently connected to a peer port
 * @conn_type: TIPC type used when connection was established
 * @conn_instance: TIPC instance used when connection was established
 * @published: non-zero if port has one or more associated names
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @sock_list: adjacent sockets in TIPC's global list of sockets
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @probing_state: connection probing state (TIPC_CONN_OK or TIPC_CONN_PROBING)
 * @probing_intv: interval between connection probes
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @link_cong: non-zero if owner must sleep because of link congestion
 * @sent_unacked: # messages sent by socket, and not yet acked by peer
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @node: hash table node
 * @rcu: rcu struct for tipc_sock
 */
struct tipc_sock {
	struct sock sk;
	int connected;
	u32 conn_type;
	u32 conn_instance;
	int published;
	u32 max_pkt;
	u32 portid;
	struct tipc_msg phdr;
	struct list_head sock_list;
	struct list_head publications;
	u32 pub_count;
	u32 probing_state;
	unsigned long probing_intv;
	uint conn_timeout;
	atomic_t dupl_rcvcnt;
	bool link_cong;
	uint sent_unacked;
	uint rcv_unacked;
	struct rhash_head node;
	struct rcu_head rcu;
};

static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static void tipc_sk_timeout(unsigned long data);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
			   struct tipc_name_seq const *seq);
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
			    struct tipc_name_seq const *seq);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;

static struct proto tipc_proto;
static struct proto tipc_proto_kern;

static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
	[TIPC_NLA_SOCK_UNSPEC]		= { .type = NLA_UNSPEC },
	[TIPC_NLA_SOCK_ADDR]		= { .type = NLA_U32 },
	[TIPC_NLA_SOCK_REF]		= { .type = NLA_U32 },
	[TIPC_NLA_SOCK_CON]		= { .type = NLA_NESTED },
	[TIPC_NLA_SOCK_HAS_PUBL]	= { .type = NLA_FLAG }
};

/*
 * Revised TIPC socket locking policy:
 *
 * Most socket operations take the standard socket lock when they start
 * and hold it until they finish (or until they need to sleep).  Acquiring
 * this lock grants the owner exclusive access to the fields of the socket
 * data structures, with the exception of the backlog queue.  A few socket
 * operations can be done without taking the socket lock because they only
 * read socket information that never changes during the life of the socket.
 *
 * Socket operations may acquire the lock for the associated TIPC port if they
 * need to perform an operation on the port.  If any routine needs to acquire
 * both the socket lock and the port lock it must take the socket lock first
 * to avoid the risk of deadlock.
 *
 * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since it runs at BH level and cannot block.
 * Instead, it checks to see if the socket lock is currently owned by someone,
 * and either handles the message itself or adds it to the socket's backlog
 * queue; in the latter case the queued message is processed once the process
 * owning the socket lock releases it.
 *
 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
 * the problem of a blocked socket operation preventing any other operations
 * from occurring.  However, applications must be careful if they have
 * multiple threads trying to send (or receive) on the same socket, as these
 * operations might interfere with each other.  For example, doing a connect
 * and a receive at the same time might allow the receive to consume the
 * ACK message meant for the connect.  While additional work could be done
 * to try and overcome this, it doesn't seem to be worthwhile at the present.
 *
 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
 * that another operation that must be performed in a non-blocking manner is
 * not delayed for very long because the lock has already been taken.
 *
 * NOTE: This code assumes that certain fields of a port/socket pair are
 * constant over its lifetime; such fields can be examined without taking
 * the socket lock and/or port lock, and do not need to be re-read even
 * after resuming processing after waiting.  These fields include:
 *   - socket type
 *   - pointer to socket sk structure (aka tipc_sock structure)
 *   - pointer to port structure
 *   - port reference
 */
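
/* Added illustration (not part of the original code): the pattern described
 * above, as followed by the blocking socket calls later in this file:
 *
 *	lock_sock(sk);		(grants exclusive access to the socket fields)
 *	... examine/update socket state, possibly sleeping via sk_wait_event ...
 *	release_sock(sk);	(also runs any packets that were parked on the
 *				 backlog queue through tipc_backlog_rcv())
 */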

static u32 tsk_own_node(struct tipc_sock *tsk)
{
	return msg_prevnode(&tsk->phdr);
}

static u32 tsk_peer_node(struct tipc_sock *tsk)
{
	return msg_destnode(&tsk->phdr);
}

static u32 tsk_peer_port(struct tipc_sock *tsk)
{
	return msg_destport(&tsk->phdr);
}

static bool tsk_unreliable(struct tipc_sock *tsk)
{
	return msg_src_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
{
	msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
}

static bool tsk_unreturnable(struct tipc_sock *tsk)
{
	return msg_dest_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
{
	msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
}

static int tsk_importance(struct tipc_sock *tsk)
{
	return msg_importance(&tsk->phdr);
}

static int tsk_set_importance(struct tipc_sock *tsk, int imp)
{
	if (imp > TIPC_CRITICAL_IMPORTANCE)
		return -EINVAL;
	msg_set_importance(&tsk->phdr, (u32)imp);
	return 0;
}

static struct tipc_sock *tipc_sk(const struct sock *sk)
{
	return container_of(sk, struct tipc_sock, sk);
}

static int tsk_conn_cong(struct tipc_sock *tsk)
{
	return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
}

/**
 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 *
 * Caller must hold socket lock
 */
static void tsk_advance_rx_queue(struct sock *sk)
{
	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
}

/**
 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 *
 * Caller must hold socket lock
 */
static void tsk_rej_rx_queue(struct sock *sk)
{
	struct sk_buff *skb;
	u32 dnode;
	u32 own_node = tsk_own_node(tipc_sk(sk));

	while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
		if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
			tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
	}
}

/* tsk_peer_msg - verify if message was sent by connected port's peer
 *
 * Handles cases where the node's network address has changed from
 * the default of <0.0.0> to its configured setting.
 */
static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
{
	struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
	u32 peer_port = tsk_peer_port(tsk);
	u32 orig_node;
	u32 peer_node;

	if (unlikely(!tsk->connected))
		return false;

	if (unlikely(msg_origport(msg) != peer_port))
		return false;

	orig_node = msg_orignode(msg);
	peer_node = tsk_peer_node(tsk);

	if (likely(orig_node == peer_node))
		return true;

	if (!orig_node && (peer_node == tn->own_addr))
		return true;

	if (!peer_node && (orig_node == tn->own_addr))
		return true;

	return false;
}

/**
 * tipc_sk_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_sk_create(struct net *net, struct socket *sock,
			  int protocol, int kern)
{
	struct tipc_net *tn;
	const struct proto_ops *ops;
	socket_state state;
	struct sock *sk;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;

	/* Validate arguments */
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	switch (sock->type) {
	case SOCK_STREAM:
		ops = &stream_ops;
		state = SS_UNCONNECTED;
		break;
	case SOCK_SEQPACKET:
		ops = &packet_ops;
		state = SS_UNCONNECTED;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		ops = &msg_ops;
		state = SS_READY;
		break;
	default:
		return -EPROTOTYPE;
	}

	/* Allocate socket's protocol area */
	if (!kern)
		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
	else
		sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);

	if (sk == NULL)
		return -ENOMEM;

	tsk = tipc_sk(sk);
	tsk->max_pkt = MAX_PKT_DEFAULT;
	INIT_LIST_HEAD(&tsk->publications);
	msg = &tsk->phdr;
	tn = net_generic(sock_net(sk), tipc_net_id);
	tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
		      NAMED_H_SIZE, 0);

	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock->state = state;
	sock_init_data(sock, sk);
	if (tipc_sk_insert(tsk)) {
		pr_warn("Socket create failed; port number exhausted\n");
		return -EINVAL;
	}
	msg_set_origport(msg, tsk->portid);
	setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
	sk->sk_backlog_rcv = tipc_backlog_rcv;
	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
	tsk->sent_unacked = 0;
	atomic_set(&tsk->dupl_rcvcnt, 0);

	if (sock->state == SS_READY) {
		tsk_set_unreturnable(tsk, true);
		if (sock->type == SOCK_DGRAM)
			tsk_set_unreliable(tsk, true);
	}
	return 0;
}
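
/* Added illustration (user-space, not part of the original code): both calls
 * below reach tipc_sk_create() with protocol == 0; the switch above picks the
 * proto_ops table and the initial socket state for each type.
 *
 *	int rdm = socket(AF_TIPC, SOCK_RDM, 0);		connectionless, SS_READY
 *	int strm = socket(AF_TIPC, SOCK_STREAM, 0);	connection-oriented
 */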

/**
 * tipc_sock_create_local - create TIPC socket from inside TIPC module
 * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
 *
 * We cannot use sock_create_kern here because it bumps module user count.
 * Since socket owner and creator is the same module we must make sure
 * that module count remains zero for module local sockets, otherwise
 * we cannot do rmmod.
 *
 * Returns 0 on success, errno otherwise
 */
int tipc_sock_create_local(struct net *net, int type, struct socket **res)
{
	int rc;

	rc = sock_create_lite(AF_TIPC, type, 0, res);
	if (rc < 0) {
		pr_err("Failed to create kernel socket\n");
		return rc;
	}
	tipc_sk_create(net, *res, 0, 1);

	return 0;
}

/**
 * tipc_sock_release_local - release socket created by tipc_sock_create_local
 * @sock: the socket to be released.
 *
 * Module reference count is not incremented when such sockets are created,
 * so we must keep it from being decremented when they are released.
 */
void tipc_sock_release_local(struct socket *sock)
{
	tipc_release(sock);
	sock->ops = NULL;
	sock_release(sock);
}

/**
 * tipc_sock_accept_local - accept a connection on a socket created
 * with tipc_sock_create_local. Use this function to avoid that
 * module reference count is inadvertently incremented.
 *
 * @sock:    the accepting socket
 * @newsock: reference to the new socket to be created
 * @flags:   socket flags
 */

int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
			   int flags)
{
	struct sock *sk = sock->sk;
	int ret;

	ret = sock_create_lite(sk->sk_family, sk->sk_type,
			       sk->sk_protocol, newsock);
	if (ret < 0)
		return ret;

	ret = tipc_accept(sock, *newsock, flags);
	if (ret < 0) {
		sock_release(*newsock);
		return ret;
	}
	(*newsock)->ops = sock->ops;
	return ret;
}

static void tipc_sk_callback(struct rcu_head *head)
{
	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);

	sock_put(&tsk->sk);
}

/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct net *net;
	struct tipc_sock *tsk;
	struct sk_buff *skb;
	u32 dnode, probing_state;

	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
		return 0;

	net = sock_net(sk);
	tsk = tipc_sk(sk);
	lock_sock(sk);

	/*
	 * Reject all unreceived messages, except on an active connection
	 * (which disconnects locally & sends a 'FIN+' to peer)
	 */
	dnode = tsk_peer_node(tsk);
	while (sock->state != SS_DISCONNECTING) {
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb == NULL)
			break;
		if (TIPC_SKB_CB(skb)->handle != NULL)
			kfree_skb(skb);
		else {
			if ((sock->state == SS_CONNECTING) ||
			    (sock->state == SS_CONNECTED)) {
				sock->state = SS_DISCONNECTING;
				tsk->connected = 0;
				tipc_node_remove_conn(net, dnode, tsk->portid);
			}
			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
					     TIPC_ERR_NO_PORT))
				tipc_link_xmit_skb(net, skb, dnode, 0);
		}
	}

	tipc_sk_withdraw(tsk, 0, NULL);
	probing_state = tsk->probing_state;
	if (del_timer_sync(&sk->sk_timer) &&
	    probing_state != TIPC_CONN_PROBING)
		sock_put(sk);
	tipc_sk_remove(tsk);
	if (tsk->connected) {
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
				      tsk_own_node(tsk), tsk_peer_port(tsk),
				      tsk->portid, TIPC_ERR_NO_PORT);
		if (skb)
			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
		tipc_node_remove_conn(net, dnode, tsk->portid);
	}

	/* Discard any remaining (connection-based) messages in receive queue */
	__skb_queue_purge(&sk->sk_receive_queue);

	/* Reject any messages that accumulated in backlog queue */
	sock->state = SS_DISCONNECTING;
	release_sock(sk);

	call_rcu(&tsk->rcu, tipc_sk_callback);
	sock->sk = NULL;

	return 0;
}

/**
 * tipc_bind - associate or disassociate TIPC name(s) with a socket
 * @sock: socket structure
 * @uaddr: socket address describing name(s) and desired operation
 * @uaddr_len: size of socket address data structure
 *
 * Name and name sequence binding is indicated using a positive scope value;
 * a negative scope value unbinds the specified name.  Specifying no name
 * (i.e. a socket address length of 0) unbinds all names from the socket.
 *
 * Returns 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 *       access any non-constant socket information.
 */
static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
		     int uaddr_len)
{
	struct sock *sk = sock->sk;
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
	struct tipc_sock *tsk = tipc_sk(sk);
	int res = -EINVAL;

	lock_sock(sk);
	if (unlikely(!uaddr_len)) {
		res = tipc_sk_withdraw(tsk, 0, NULL);
		goto exit;
	}

	if (uaddr_len < sizeof(struct sockaddr_tipc)) {
		res = -EINVAL;
		goto exit;
	}
	if (addr->family != AF_TIPC) {
		res = -EAFNOSUPPORT;
		goto exit;
	}

	if (addr->addrtype == TIPC_ADDR_NAME)
		addr->addr.nameseq.upper = addr->addr.nameseq.lower;
	else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
		res = -EAFNOSUPPORT;
		goto exit;
	}

	if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
	    (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
	    (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
		res = -EACCES;
		goto exit;
	}

	res = (addr->scope > 0) ?
		tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
		tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
exit:
	release_sock(sk);
	return res;
}
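
/* Added illustration (user-space, not part of the original code): binding a
 * name sequence to a TIPC socket; the service type 18888 and instance range
 * below are arbitrary example values.
 *
 *	struct sockaddr_tipc addr = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_ADDR_NAMESEQ,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.nameseq = { .type = 18888, .lower = 0, .upper = 99 },
 *	};
 *	bind(sd, (struct sockaddr *)&addr, sizeof(addr));
 */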

607
/**
608
 * tipc_getname - get port ID of socket or peer socket
P
Per Liden 已提交
609 610 611
 * @sock: socket structure
 * @uaddr: area for returned socket address
 * @uaddr_len: area for returned length of socket address
612
 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
613
 *
P
Per Liden 已提交
614
 * Returns 0 on success, errno otherwise
615
 *
616 617
 * NOTE: This routine doesn't need to take the socket lock since it only
 *       accesses socket information that is unchanging (or which changes in
618
 *       a completely predictable manner).
P
Per Liden 已提交
619
 */
620 621
static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
			int *uaddr_len, int peer)
P
Per Liden 已提交
622 623
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
624
	struct tipc_sock *tsk = tipc_sk(sock->sk);
625
	struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
P
Per Liden 已提交
626

627
	memset(addr, 0, sizeof(*addr));
628
	if (peer) {
629 630 631
		if ((sock->state != SS_CONNECTED) &&
			((peer != 2) || (sock->state != SS_DISCONNECTING)))
			return -ENOTCONN;
632 633
		addr->addr.id.ref = tsk_peer_port(tsk);
		addr->addr.id.node = tsk_peer_node(tsk);
634
	} else {
635
		addr->addr.id.ref = tsk->portid;
636
		addr->addr.id.node = tn->own_addr;
637
	}
P
Per Liden 已提交
638 639 640 641 642 643 644

	*uaddr_len = sizeof(*addr);
	addr->addrtype = TIPC_ADDR_ID;
	addr->family = AF_TIPC;
	addr->scope = 0;
	addr->addr.name.domain = 0;

645
	return 0;
P
Per Liden 已提交
646 647 648
}

/**
649
 * tipc_poll - read and possibly block on pollmask
P
Per Liden 已提交
650 651 652 653
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: ???
 *
654 655 656 657 658 659 660 661 662
 * Returns pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * TIPC sets the returned events as follows:
663 664 665 666
 *
 * socket state		flags set
 * ------------		---------
 * unconnected		no read flags
667
 *			POLLOUT if port is not congested
668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686
 *
 * connecting		POLLIN/POLLRDNORM if ACK/NACK in rx queue
 *			no write flags
 *
 * connected		POLLIN/POLLRDNORM if data in rx queue
 *			POLLOUT if port is not congested
 *
 * disconnecting	POLLIN/POLLRDNORM/POLLHUP
 *			no write flags
 *
 * listening		POLLIN if SYN in rx queue
 *			no write flags
 *
 * ready		POLLIN/POLLRDNORM if data in rx queue
 * [connectionless]	POLLOUT (since port cannot be congested)
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
P
Per Liden 已提交
687
 */
688 689
static unsigned int tipc_poll(struct file *file, struct socket *sock,
			      poll_table *wait)
P
Per Liden 已提交
690
{
691
	struct sock *sk = sock->sk;
692
	struct tipc_sock *tsk = tipc_sk(sk);
693
	u32 mask = 0;
694

695
	sock_poll_wait(file, sk_sleep(sk), wait);
696

697
	switch ((int)sock->state) {
698
	case SS_UNCONNECTED:
699
		if (!tsk->link_cong)
700 701
			mask |= POLLOUT;
		break;
702 703
	case SS_READY:
	case SS_CONNECTED:
704
		if (!tsk->link_cong && !tsk_conn_cong(tsk))
705 706 707 708 709 710 711 712 713 714 715
			mask |= POLLOUT;
		/* fall thru' */
	case SS_CONNECTING:
	case SS_LISTENING:
		if (!skb_queue_empty(&sk->sk_receive_queue))
			mask |= (POLLIN | POLLRDNORM);
		break;
	case SS_DISCONNECTING:
		mask = (POLLIN | POLLRDNORM | POLLHUP);
		break;
	}
716 717

	return mask;
P
Per Liden 已提交
718 719
}
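
/* Added illustration (user-space, not part of the original code): waiting for
 * data or for congestion to clear, per the state/flag table above.
 *
 *	struct pollfd pfd = { .fd = sd, .events = POLLIN | POLLOUT };
 *	poll(&pfd, 1, -1);
 */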

720 721 722 723
/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @seq: destination address
724
 * @msg: message to send
725 726 727 728 729 730 731
 * @dsz: total length of message data
 * @timeo: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Returns the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
732
			  struct msghdr *msg, size_t dsz, long timeo)
733 734
{
	struct sock *sk = sock->sk;
735
	struct tipc_sock *tsk = tipc_sk(sk);
736
	struct net *net = sock_net(sk);
737
	struct tipc_msg *mhdr = &tsk->phdr;
738
	struct sk_buff_head *pktchain = &sk->sk_write_queue;
A
Al Viro 已提交
739
	struct iov_iter save = msg->msg_iter;
740 741 742 743 744 745 746 747 748 749 750 751 752 753
	uint mtu;
	int rc;

	msg_set_type(mhdr, TIPC_MCAST_MSG);
	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
	msg_set_destport(mhdr, 0);
	msg_set_destnode(mhdr, 0);
	msg_set_nametype(mhdr, seq->type);
	msg_set_namelower(mhdr, seq->lower);
	msg_set_nameupper(mhdr, seq->upper);
	msg_set_hdr_sz(mhdr, MCAST_H_SIZE);

new_mtu:
	mtu = tipc_bclink_get_mtu();
754
	rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
755 756 757 758
	if (unlikely(rc < 0))
		return rc;

	do {
759
		rc = tipc_bclink_xmit(net, pktchain);
760 761 762 763
		if (likely(rc >= 0)) {
			rc = dsz;
			break;
		}
A
Al Viro 已提交
764 765
		if (rc == -EMSGSIZE) {
			msg->msg_iter = save;
766
			goto new_mtu;
A
Al Viro 已提交
767
		}
768 769
		if (rc != -ELINKCONG)
			break;
770
		tipc_sk(sk)->link_cong = 1;
771 772
		rc = tipc_wait_for_sndmsg(sock, &timeo);
		if (rc)
773
			__skb_queue_purge(pktchain);
774 775 776 777
	} while (!rc);
	return rc;
}
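
/* Added illustration (user-space, not part of the original code): a multicast
 * send reaching every socket bound within the example name sequence
 * 18888 {0..99}; such a sendto() ends up in tipc_sendmcast() above, via
 * tipc_sendmsg().
 *
 *	struct sockaddr_tipc dst = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_ADDR_MCAST,
 *		.addr.nameseq = { .type = 18888, .lower = 0, .upper = 99 },
 *	};
 *	sendto(sd, buf, len, 0, (struct sockaddr *)&dst, sizeof(dst));
 */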

778 779 780 781 782 783
/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
784
 */
785 786
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
		       struct sk_buff_head *inputq)
787
{
788
	struct tipc_msg *msg;
789 790
	struct tipc_plist dports;
	u32 portid;
791
	u32 scope = TIPC_CLUSTER_SCOPE;
792 793 794
	struct sk_buff_head tmpq;
	uint hsz;
	struct sk_buff *skb, *_skb;
795

796
	__skb_queue_head_init(&tmpq);
797
	tipc_plist_init(&dports);
798

799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819
	skb = tipc_skb_peek(arrvq, &inputq->lock);
	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
		msg = buf_msg(skb);
		hsz = skb_headroom(skb) + msg_hdr_sz(msg);

		if (in_own_node(net, msg_orignode(msg)))
			scope = TIPC_NODE_SCOPE;

		/* Create destination port list and message clones: */
		tipc_nametbl_mc_translate(net,
					  msg_nametype(msg), msg_namelower(msg),
					  msg_nameupper(msg), scope, &dports);
		portid = tipc_plist_pop(&dports);
		for (; portid; portid = tipc_plist_pop(&dports)) {
			_skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
			if (_skb) {
				msg_set_destport(buf_msg(_skb), portid);
				__skb_queue_tail(&tmpq, _skb);
				continue;
			}
			pr_warn("Failed to clone mcast rcv buffer\n");
820
		}
821 822 823 824 825 826 827 828 829
		/* Append to inputq if not already done by other thread */
		spin_lock_bh(&inputq->lock);
		if (skb_peek(arrvq) == skb) {
			skb_queue_splice_tail_init(&tmpq, inputq);
			kfree_skb(__skb_dequeue(arrvq));
		}
		spin_unlock_bh(&inputq->lock);
		__skb_queue_purge(&tmpq);
		kfree_skb(skb);
830
	}
831
	tipc_sk_rcv(net, inputq);
832 833
}

834 835 836
/**
 * tipc_sk_proto_rcv - receive a connection mng protocol message
 * @tsk: receiving socket
837
 * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
838
 */
839
static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
840
{
841
	struct tipc_msg *msg = buf_msg(*skb);
842
	int conn_cong;
843 844
	u32 dnode;
	u32 own_node = tsk_own_node(tsk);
845
	/* Ignore if connection cannot be validated: */
846
	if (!tsk_peer_msg(tsk, msg))
847 848
		goto exit;

849
	tsk->probing_state = TIPC_CONN_OK;
850 851

	if (msg_type(msg) == CONN_ACK) {
852
		conn_cong = tsk_conn_cong(tsk);
853 854
		tsk->sent_unacked -= msg_msgcnt(msg);
		if (conn_cong)
855
			tsk->sk.sk_write_space(&tsk->sk);
856
	} else if (msg_type(msg) == CONN_PROBE) {
857 858 859 860
		if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
			msg_set_type(msg, CONN_PROBE_REPLY);
			return;
		}
861 862 863
	}
	/* Do nothing if msg_type() == CONN_PROBE_REPLY */
exit:
864 865
	kfree_skb(*skb);
	*skb = NULL;
866 867
}

868 869 870
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
871
	struct tipc_sock *tsk = tipc_sk(sk);
872 873 874 875 876 877 878 879 880 881 882 883 884 885 886
	DEFINE_WAIT(wait);
	int done;

	do {
		int err = sock_error(sk);
		if (err)
			return err;
		if (sock->state == SS_DISCONNECTING)
			return -EPIPE;
		if (!*timeo_p)
			return -EAGAIN;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
887
		done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
888 889 890 891 892
		finish_wait(sk_sleep(sk), &wait);
	} while (!done);
	return 0;
}

P
Per Liden 已提交
893
/**
894
 * tipc_sendmsg - send message in connectionless manner
895
 * @iocb: if NULL, indicates that socket lock is already held
P
Per Liden 已提交
896 897
 * @sock: socket structure
 * @m: message to send
898
 * @dsz: amount of user data to be sent
899
 *
P
Per Liden 已提交
900
 * Message must have a destination specified explicitly.
901
 * Used for SOCK_RDM and SOCK_DGRAM messages,
P
Per Liden 已提交
902 903
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
904
 *
P
Per Liden 已提交
905 906
 * Returns the number of bytes sent on success, or errno otherwise
 */
907
static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
908
			struct msghdr *m, size_t dsz)
P
Per Liden 已提交
909
{
910
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
911
	struct sock *sk = sock->sk;
912
	struct tipc_sock *tsk = tipc_sk(sk);
913
	struct net *net = sock_net(sk);
914
	struct tipc_msg *mhdr = &tsk->phdr;
915
	u32 dnode, dport;
916
	struct sk_buff_head *pktchain = &sk->sk_write_queue;
917
	struct sk_buff *skb;
918
	struct tipc_name_seq *seq = &dest->addr.nameseq;
A
Al Viro 已提交
919
	struct iov_iter save;
920
	u32 mtu;
921
	long timeo;
E
Erik Hugne 已提交
922
	int rc;
P
Per Liden 已提交
923 924 925

	if (unlikely(!dest))
		return -EDESTADDRREQ;
926

927 928
	if (unlikely((m->msg_namelen < sizeof(*dest)) ||
		     (dest->family != AF_TIPC)))
P
Per Liden 已提交
929
		return -EINVAL;
930 931

	if (dsz > TIPC_MAX_USER_MSG_SIZE)
932
		return -EMSGSIZE;
P
Per Liden 已提交
933

934 935 936
	if (iocb)
		lock_sock(sk);

937
	if (unlikely(sock->state != SS_READY)) {
938
		if (sock->state == SS_LISTENING) {
939
			rc = -EPIPE;
940 941 942
			goto exit;
		}
		if (sock->state != SS_UNCONNECTED) {
943
			rc = -EISCONN;
944 945
			goto exit;
		}
946
		if (tsk->published) {
947
			rc = -EOPNOTSUPP;
948 949
			goto exit;
		}
950
		if (dest->addrtype == TIPC_ADDR_NAME) {
951 952
			tsk->conn_type = dest->addr.name.name.type;
			tsk->conn_instance = dest->addr.name.name.instance;
953
		}
P
Per Liden 已提交
954 955
	}

956
	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
957 958

	if (dest->addrtype == TIPC_ADDR_MCAST) {
959
		rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
960 961 962 963 964 965 966 967 968 969 970 971
		goto exit;
	} else if (dest->addrtype == TIPC_ADDR_NAME) {
		u32 type = dest->addr.name.name.type;
		u32 inst = dest->addr.name.name.instance;
		u32 domain = dest->addr.name.domain;

		dnode = domain;
		msg_set_type(mhdr, TIPC_NAMED_MSG);
		msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
		msg_set_nametype(mhdr, type);
		msg_set_nameinst(mhdr, inst);
		msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
972
		dport = tipc_nametbl_translate(net, type, inst, &dnode);
973 974 975 976 977
		msg_set_destnode(mhdr, dnode);
		msg_set_destport(mhdr, dport);
		if (unlikely(!dport && !dnode)) {
			rc = -EHOSTUNREACH;
			goto exit;
978
		}
979 980 981 982 983 984 985 986 987
	} else if (dest->addrtype == TIPC_ADDR_ID) {
		dnode = dest->addr.id.node;
		msg_set_type(mhdr, TIPC_DIRECT_MSG);
		msg_set_lookup_scope(mhdr, 0);
		msg_set_destnode(mhdr, dnode);
		msg_set_destport(mhdr, dest->addr.id.ref);
		msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
	}

A
Al Viro 已提交
988
	save = m->msg_iter;
989
new_mtu:
990
	mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
991
	rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
992 993 994 995
	if (rc < 0)
		goto exit;

	do {
996
		skb = skb_peek(pktchain);
997
		TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
998
		rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
999 1000
		if (likely(rc >= 0)) {
			if (sock->state != SS_READY)
1001
				sock->state = SS_CONNECTING;
1002
			rc = dsz;
1003
			break;
1004
		}
A
Al Viro 已提交
1005 1006
		if (rc == -EMSGSIZE) {
			m->msg_iter = save;
1007
			goto new_mtu;
A
Al Viro 已提交
1008
		}
1009
		if (rc != -ELINKCONG)
1010
			break;
1011
		tsk->link_cong = 1;
1012
		rc = tipc_wait_for_sndmsg(sock, &timeo);
1013
		if (rc)
1014
			__skb_queue_purge(pktchain);
1015
	} while (!rc);
1016 1017 1018
exit:
	if (iocb)
		release_sock(sk);
1019 1020

	return rc;
P
Per Liden 已提交
1021 1022
}
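
/* Added illustration (user-space, not part of the original code): a datagram
 * sent to a named service, exercising the TIPC_ADDR_NAME branch above; the
 * type/instance values are arbitrary and domain 0 requests the default
 * zone-wide lookup.
 *
 *	struct sockaddr_tipc dst = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_ADDR_NAME,
 *		.addr.name.name = { .type = 18888, .instance = 17 },
 *		.addr.name.domain = 0,
 *	};
 *	sendto(sd, buf, len, 0, (struct sockaddr *)&dst, sizeof(dst));
 */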

1023 1024 1025
static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
1026
	struct tipc_sock *tsk = tipc_sk(sk);
1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044
	DEFINE_WAIT(wait);
	int done;

	do {
		int err = sock_error(sk);
		if (err)
			return err;
		if (sock->state == SS_DISCONNECTING)
			return -EPIPE;
		else if (sock->state != SS_CONNECTED)
			return -ENOTCONN;
		if (!*timeo_p)
			return -EAGAIN;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
		done = sk_wait_event(sk, timeo_p,
1045
				     (!tsk->link_cong &&
1046 1047
				      !tsk_conn_cong(tsk)) ||
				     !tsk->connected);
1048 1049 1050 1051 1052
		finish_wait(sk_sleep(sk), &wait);
	} while (!done);
	return 0;
}

1053
/**
1054 1055
 * tipc_send_stream - send stream-oriented data
 * @iocb: (unused)
P
Per Liden 已提交
1056
 * @sock: socket structure
1057 1058
 * @m: data to send
 * @dsz: total length of data to be transmitted
1059
 *
1060
 * Used for SOCK_STREAM data.
1061
 *
1062 1063
 * Returns the number of bytes sent on success (or partial success),
 * or errno if no data sent
P
Per Liden 已提交
1064
 */
1065 1066
static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
			    struct msghdr *m, size_t dsz)
P
Per Liden 已提交
1067
{
1068
	struct sock *sk = sock->sk;
1069
	struct net *net = sock_net(sk);
1070
	struct tipc_sock *tsk = tipc_sk(sk);
1071
	struct tipc_msg *mhdr = &tsk->phdr;
1072
	struct sk_buff_head *pktchain = &sk->sk_write_queue;
1073
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1074
	u32 portid = tsk->portid;
1075
	int rc = -EINVAL;
1076
	long timeo;
1077 1078
	u32 dnode;
	uint mtu, send, sent = 0;
A
Al Viro 已提交
1079
	struct iov_iter save;
P
Per Liden 已提交
1080 1081

	/* Handle implied connection establishment */
1082 1083 1084
	if (unlikely(dest)) {
		rc = tipc_sendmsg(iocb, sock, m, dsz);
		if (dsz && (dsz == rc))
1085
			tsk->sent_unacked = 1;
1086 1087 1088
		return rc;
	}
	if (dsz > (uint)INT_MAX)
1089 1090
		return -EMSGSIZE;

1091 1092
	if (iocb)
		lock_sock(sk);
P
Per Liden 已提交
1093

1094 1095
	if (unlikely(sock->state != SS_CONNECTED)) {
		if (sock->state == SS_DISCONNECTING)
1096
			rc = -EPIPE;
1097
		else
1098
			rc = -ENOTCONN;
1099 1100
		goto exit;
	}
1101

1102
	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1103
	dnode = tsk_peer_node(tsk);
1104 1105

next:
A
Al Viro 已提交
1106
	save = m->msg_iter;
1107
	mtu = tsk->max_pkt;
1108
	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1109
	rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
1110 1111
	if (unlikely(rc < 0))
		goto exit;
1112
	do {
1113
		if (likely(!tsk_conn_cong(tsk))) {
1114
			rc = tipc_link_xmit(net, pktchain, dnode, portid);
1115
			if (likely(!rc)) {
1116
				tsk->sent_unacked++;
1117 1118 1119 1120 1121 1122
				sent += send;
				if (sent == dsz)
					break;
				goto next;
			}
			if (rc == -EMSGSIZE) {
1123 1124
				tsk->max_pkt = tipc_node_get_mtu(net, dnode,
								 portid);
A
Al Viro 已提交
1125
				m->msg_iter = save;
1126 1127 1128 1129
				goto next;
			}
			if (rc != -ELINKCONG)
				break;
1130
			tsk->link_cong = 1;
1131 1132
		}
		rc = tipc_wait_for_sndpkt(sock, &timeo);
1133
		if (rc)
1134
			__skb_queue_purge(pktchain);
1135
	} while (!rc);
1136
exit:
1137 1138
	if (iocb)
		release_sock(sk);
1139
	return sent ? sent : rc;
P
Per Liden 已提交
1140 1141
}

1142
/**
1143 1144
 * tipc_send_packet - send a connection-oriented message
 * @iocb: if NULL, indicates that socket lock is already held
P
Per Liden 已提交
1145
 * @sock: socket structure
1146 1147
 * @m: message to send
 * @dsz: length of data to be transmitted
1148
 *
1149
 * Used for SOCK_SEQPACKET messages.
1150
 *
1151
 * Returns the number of bytes sent on success, or errno otherwise
P
Per Liden 已提交
1152
 */
1153 1154
static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
			    struct msghdr *m, size_t dsz)
P
Per Liden 已提交
1155
{
1156 1157
	if (dsz > TIPC_MAX_USER_MSG_SIZE)
		return -EMSGSIZE;
P
Per Liden 已提交
1158

1159
	return tipc_send_stream(iocb, sock, m, dsz);
P
Per Liden 已提交
1160 1161
}

1162
/* tipc_sk_finish_conn - complete the setup of a connection
P
Per Liden 已提交
1163
 */
1164
static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1165
				u32 peer_node)
P
Per Liden 已提交
1166
{
1167 1168
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
1169
	struct tipc_msg *msg = &tsk->phdr;
P
Per Liden 已提交
1170

1171 1172 1173 1174 1175
	msg_set_destnode(msg, peer_node);
	msg_set_destport(msg, peer_port);
	msg_set_type(msg, TIPC_CONN_MSG);
	msg_set_lookup_scope(msg, 0);
	msg_set_hdr_sz(msg, SHORT_H_SIZE);
1176

1177
	tsk->probing_intv = CONN_PROBING_INTERVAL;
1178 1179
	tsk->probing_state = TIPC_CONN_OK;
	tsk->connected = 1;
1180
	sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1181 1182
	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
P
Per Liden 已提交
1183 1184 1185 1186 1187 1188
}

/**
 * set_orig_addr - capture sender's address for received message
 * @m: descriptor for message info
 * @msg: received message header
1189
 *
P
Per Liden 已提交
1190 1191
 * Note: Address is not captured if not requested by receiver.
 */
S
Sam Ravnborg 已提交
1192
static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
P
Per Liden 已提交
1193
{
1194
	DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
P
Per Liden 已提交
1195

1196
	if (addr) {
P
Per Liden 已提交
1197 1198
		addr->family = AF_TIPC;
		addr->addrtype = TIPC_ADDR_ID;
1199
		memset(&addr->addr, 0, sizeof(addr->addr));
P
Per Liden 已提交
1200 1201
		addr->addr.id.ref = msg_origport(msg);
		addr->addr.id.node = msg_orignode(msg);
1202 1203
		addr->addr.name.domain = 0;	/* could leave uninitialized */
		addr->scope = 0;		/* could leave uninitialized */
P
Per Liden 已提交
1204 1205 1206 1207 1208
		m->msg_namelen = sizeof(struct sockaddr_tipc);
	}
}

/**
1209
 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
P
Per Liden 已提交
1210 1211
 * @m: descriptor for message info
 * @msg: received message header
1212
 * @tsk: TIPC port associated with message
1213
 *
P
Per Liden 已提交
1214
 * Note: Ancillary data is not captured if not requested by receiver.
1215
 *
P
Per Liden 已提交
1216 1217
 * Returns 0 if successful, otherwise errno
 */
1218 1219
static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
				 struct tipc_sock *tsk)
P
Per Liden 已提交
1220 1221 1222 1223
{
	u32 anc_data[3];
	u32 err;
	u32 dest_type;
1224
	int has_name;
P
Per Liden 已提交
1225 1226 1227 1228 1229 1230 1231 1232 1233 1234
	int res;

	if (likely(m->msg_controllen == 0))
		return 0;

	/* Optionally capture errored message object(s) */
	err = msg ? msg_errcode(msg) : 0;
	if (unlikely(err)) {
		anc_data[0] = err;
		anc_data[1] = msg_data_sz(msg);
1235 1236
		res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
		if (res)
P
Per Liden 已提交
1237
			return res;
1238 1239 1240 1241 1242 1243
		if (anc_data[1]) {
			res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
				       msg_data(msg));
			if (res)
				return res;
		}
P
Per Liden 已提交
1244 1245 1246 1247 1248 1249
	}

	/* Optionally capture message destination object */
	dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
	switch (dest_type) {
	case TIPC_NAMED_MSG:
1250
		has_name = 1;
P
Per Liden 已提交
1251 1252 1253 1254 1255
		anc_data[0] = msg_nametype(msg);
		anc_data[1] = msg_namelower(msg);
		anc_data[2] = msg_namelower(msg);
		break;
	case TIPC_MCAST_MSG:
1256
		has_name = 1;
P
Per Liden 已提交
1257 1258 1259 1260 1261
		anc_data[0] = msg_nametype(msg);
		anc_data[1] = msg_namelower(msg);
		anc_data[2] = msg_nameupper(msg);
		break;
	case TIPC_CONN_MSG:
1262 1263 1264 1265
		has_name = (tsk->conn_type != 0);
		anc_data[0] = tsk->conn_type;
		anc_data[1] = tsk->conn_instance;
		anc_data[2] = tsk->conn_instance;
P
Per Liden 已提交
1266 1267
		break;
	default:
1268
		has_name = 0;
P
Per Liden 已提交
1269
	}
1270 1271 1272 1273 1274
	if (has_name) {
		res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
		if (res)
			return res;
	}
P
Per Liden 已提交
1275 1276 1277 1278

	return 0;
}

1279
static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1280
{
1281
	struct net *net = sock_net(&tsk->sk);
1282
	struct sk_buff *skb = NULL;
1283
	struct tipc_msg *msg;
1284 1285
	u32 peer_port = tsk_peer_port(tsk);
	u32 dnode = tsk_peer_node(tsk);
1286

1287
	if (!tsk->connected)
1288
		return;
1289 1290 1291
	skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
			      dnode, tsk_own_node(tsk), peer_port,
			      tsk->portid, TIPC_OK);
1292
	if (!skb)
1293
		return;
1294
	msg = buf_msg(skb);
1295
	msg_set_msgcnt(msg, ack);
1296
	tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1297 1298
}

1299
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
Y
Ying Xue 已提交
1300 1301 1302
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT(wait);
1303
	long timeo = *timeop;
Y
Ying Xue 已提交
1304 1305 1306 1307
	int err;

	for (;;) {
		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1308
		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
Y
Ying Xue 已提交
1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327
			if (sock->state == SS_DISCONNECTING) {
				err = -ENOTCONN;
				break;
			}
			release_sock(sk);
			timeo = schedule_timeout(timeo);
			lock_sock(sk);
		}
		err = 0;
		if (!skb_queue_empty(&sk->sk_receive_queue))
			break;
		err = sock_intr_errno(timeo);
		if (signal_pending(current))
			break;
		err = -EAGAIN;
		if (!timeo)
			break;
	}
	finish_wait(sk_sleep(sk), &wait);
1328
	*timeop = timeo;
Y
Ying Xue 已提交
1329 1330 1331
	return err;
}

1332
/**
1333
 * tipc_recvmsg - receive packet-oriented message
P
Per Liden 已提交
1334 1335 1336 1337
 * @iocb: (unused)
 * @m: descriptor for message info
 * @buf_len: total size of user buffer area
 * @flags: receive flags
1338
 *
P
Per Liden 已提交
1339 1340 1341 1342 1343
 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 * If the complete message doesn't fit in user area, truncate it.
 *
 * Returns size of returned message data, errno otherwise
 */
1344 1345
static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
			struct msghdr *m, size_t buf_len, int flags)
P
Per Liden 已提交
1346
{
1347
	struct sock *sk = sock->sk;
1348
	struct tipc_sock *tsk = tipc_sk(sk);
P
Per Liden 已提交
1349 1350
	struct sk_buff *buf;
	struct tipc_msg *msg;
Y
Ying Xue 已提交
1351
	long timeo;
P
Per Liden 已提交
1352 1353 1354 1355
	unsigned int sz;
	u32 err;
	int res;

1356
	/* Catch invalid receive requests */
P
Per Liden 已提交
1357 1358 1359
	if (unlikely(!buf_len))
		return -EINVAL;

1360
	lock_sock(sk);
P
Per Liden 已提交
1361

1362 1363
	if (unlikely(sock->state == SS_UNCONNECTED)) {
		res = -ENOTCONN;
P
Per Liden 已提交
1364 1365 1366
		goto exit;
	}

Y
Ying Xue 已提交
1367
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1368
restart:
P
Per Liden 已提交
1369

1370
	/* Look for a message in receive queue; wait if necessary */
1371
	res = tipc_wait_for_rcvmsg(sock, &timeo);
Y
Ying Xue 已提交
1372 1373
	if (res)
		goto exit;
P
Per Liden 已提交
1374

1375 1376
	/* Look at first message in receive queue */
	buf = skb_peek(&sk->sk_receive_queue);
P
Per Liden 已提交
1377 1378 1379 1380 1381 1382
	msg = buf_msg(buf);
	sz = msg_data_sz(msg);
	err = msg_errcode(msg);

	/* Discard an empty non-errored message & try again */
	if ((!sz) && (!err)) {
1383
		tsk_advance_rx_queue(sk);
P
Per Liden 已提交
1384 1385 1386 1387 1388 1389 1390
		goto restart;
	}

	/* Capture sender's address (optional) */
	set_orig_addr(m, msg);

	/* Capture ancillary data (optional) */
1391
	res = tipc_sk_anc_data_recv(m, msg, tsk);
1392
	if (res)
P
Per Liden 已提交
1393 1394 1395 1396 1397 1398 1399 1400
		goto exit;

	/* Capture message data (if valid) & compute return value (always) */
	if (!err) {
		if (unlikely(buf_len < sz)) {
			sz = buf_len;
			m->msg_flags |= MSG_TRUNC;
		}
1401
		res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1402
		if (res)
P
Per Liden 已提交
1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414
			goto exit;
		res = sz;
	} else {
		if ((sock->state == SS_READY) ||
		    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
			res = 0;
		else
			res = -ECONNRESET;
	}

	/* Consume received message (optional) */
	if (likely(!(flags & MSG_PEEK))) {
1415
		if ((sock->state != SS_READY) &&
1416
		    (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1417
			tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1418 1419
			tsk->rcv_unacked = 0;
		}
1420
		tsk_advance_rx_queue(sk);
1421
	}
P
Per Liden 已提交
1422
exit:
1423
	release_sock(sk);
P
Per Liden 已提交
1424 1425 1426
	return res;
}

1427
/**
1428
 * tipc_recv_stream - receive stream-oriented data
P
Per Liden 已提交
1429 1430 1431 1432
 * @iocb: (unused)
 * @m: descriptor for message info
 * @buf_len: total size of user buffer area
 * @flags: receive flags
1433 1434
 *
 * Used for SOCK_STREAM messages only.  If not enough data is available
P
Per Liden 已提交
1435 1436 1437 1438
 * will optionally wait for more; never truncates data.
 *
 * Returns size of returned message data, errno otherwise
 */
1439 1440
static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
			    struct msghdr *m, size_t buf_len, int flags)
P
Per Liden 已提交
1441
{
1442
	struct sock *sk = sock->sk;
1443
	struct tipc_sock *tsk = tipc_sk(sk);
P
Per Liden 已提交
1444 1445
	struct sk_buff *buf;
	struct tipc_msg *msg;
Y
Ying Xue 已提交
1446
	long timeo;
P
Per Liden 已提交
1447
	unsigned int sz;
1448
	int sz_to_copy, target, needed;
P
Per Liden 已提交
1449 1450
	int sz_copied = 0;
	u32 err;
1451
	int res = 0;
P
Per Liden 已提交
1452

1453
	/* Catch invalid receive attempts */
P
Per Liden 已提交
1454 1455 1456
	if (unlikely(!buf_len))
		return -EINVAL;

1457
	lock_sock(sk);
P
Per Liden 已提交
1458

Y
Ying Xue 已提交
1459
	if (unlikely(sock->state == SS_UNCONNECTED)) {
1460
		res = -ENOTCONN;
P
Per Liden 已提交
1461 1462 1463
		goto exit;
	}

1464
	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
Y
Ying Xue 已提交
1465
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
P
Per Liden 已提交
1466

1467
restart:
1468
	/* Look for a message in receive queue; wait if necessary */
1469
	res = tipc_wait_for_rcvmsg(sock, &timeo);
Y
Ying Xue 已提交
1470 1471
	if (res)
		goto exit;
P
Per Liden 已提交
1472

1473 1474
	/* Look at first message in receive queue */
	buf = skb_peek(&sk->sk_receive_queue);
P
Per Liden 已提交
1475 1476 1477 1478 1479 1480
	msg = buf_msg(buf);
	sz = msg_data_sz(msg);
	err = msg_errcode(msg);

	/* Discard an empty non-errored message & try again */
	if ((!sz) && (!err)) {
1481
		tsk_advance_rx_queue(sk);
P
Per Liden 已提交
1482 1483 1484 1485 1486 1487
		goto restart;
	}

	/* Optionally capture sender's address & ancillary data of first msg */
	if (sz_copied == 0) {
		set_orig_addr(m, msg);
1488
		res = tipc_sk_anc_data_recv(m, msg, tsk);
1489
		if (res)
P
Per Liden 已提交
1490 1491 1492 1493 1494
			goto exit;
	}

	/* Capture message data (if valid) & compute return value (always) */
	if (!err) {
1495
		u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
P
Per Liden 已提交
1496

1497
		sz -= offset;
P
Per Liden 已提交
1498 1499
		needed = (buf_len - sz_copied);
		sz_to_copy = (sz <= needed) ? sz : needed;
1500

1501 1502
		res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
					    m, sz_to_copy);
1503
		if (res)
P
Per Liden 已提交
1504
			goto exit;
1505

P
Per Liden 已提交
1506 1507 1508 1509
		sz_copied += sz_to_copy;

		if (sz_to_copy < sz) {
			if (!(flags & MSG_PEEK))
1510 1511
				TIPC_SKB_CB(buf)->handle =
				(void *)(unsigned long)(offset + sz_to_copy);
P
Per Liden 已提交
1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525
			goto exit;
		}
	} else {
		if (sz_copied != 0)
			goto exit; /* can't add error msg to valid data */

		if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
			res = 0;
		else
			res = -ECONNRESET;
	}

	/* Consume received message (optional) */
	if (likely(!(flags & MSG_PEEK))) {
1526
		if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1527
			tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1528 1529
			tsk->rcv_unacked = 0;
		}
1530
		tsk_advance_rx_queue(sk);
1531
	}
P
Per Liden 已提交
1532 1533

	/* Loop around if more data is required */
1534 1535
	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
	    (!skb_queue_empty(&sk->sk_receive_queue) ||
1536
	    (sz_copied < target)) &&	/* and more is ready or required */
1537 1538
	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
	    (!err))			/* and haven't reached a FIN */
P
Per Liden 已提交
1539 1540 1541
		goto restart;

exit:
1542
	release_sock(sk);
1543
	return sz_copied ? sz_copied : res;
P
Per Liden 已提交
1544 1545
}

1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566
/**
 * tipc_write_space - wake up thread if port congestion is released
 * @sk: socket
 */
static void tipc_write_space(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
						POLLWRNORM | POLLWRBAND);
	rcu_read_unlock();
}

/**
 * tipc_data_ready - wake up threads to indicate messages have been received
 * @sk: socket
 * @len: the length of messages
 */
1567
static void tipc_data_ready(struct sock *sk)
1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
						POLLRDNORM | POLLRDBAND);
	rcu_read_unlock();
}

1579 1580
/**
 * filter_connect - Handle all incoming messages for a connection-based socket
1581
 * @tsk: TIPC socket
1582
 * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1583
 *
S
stephen hemminger 已提交
1584
 * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1585
 */
1586
static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
1587
{
1588
	struct sock *sk = &tsk->sk;
1589
	struct net *net = sock_net(sk);
1590
	struct socket *sock = sk->sk_socket;
1591
	struct tipc_msg *msg = buf_msg(*skb);
1592
	int retval = -TIPC_ERR_NO_PORT;
1593 1594 1595 1596 1597 1598 1599

	if (msg_mcast(msg))
		return retval;

	switch ((int)sock->state) {
	case SS_CONNECTED:
		/* Accept only connection-based messages sent by peer */
1600
		if (tsk_peer_msg(tsk, msg)) {
1601 1602
			if (unlikely(msg_errcode(msg))) {
				sock->state = SS_DISCONNECTING;
1603
				tsk->connected = 0;
1604
				/* let timer expire on its own */
1605
				tipc_node_remove_conn(net, tsk_peer_node(tsk),
1606
						      tsk->portid);
1607 1608 1609 1610 1611 1612
			}
			retval = TIPC_OK;
		}
		break;
	case SS_CONNECTING:
		/* Accept only ACK or NACK message */
1613 1614 1615 1616

		if (unlikely(!msg_connected(msg)))
			break;

1617 1618
		if (unlikely(msg_errcode(msg))) {
			sock->state = SS_DISCONNECTING;
1619
			sk->sk_err = ECONNREFUSED;
1620 1621 1622 1623
			retval = TIPC_OK;
			break;
		}

1624
		if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1625
			sock->state = SS_DISCONNECTING;
1626
			sk->sk_err = EINVAL;
1627
			retval = TIPC_OK;
1628 1629 1630
			break;
		}

1631 1632
		tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
		msg_set_importance(&tsk->phdr, msg_importance(msg));
1633 1634
		sock->state = SS_CONNECTED;

1635 1636 1637 1638 1639 1640
		/* If an incoming message is an 'ACK-', it should be
		 * discarded here because it doesn't contain useful
		 * data. In addition, we should try to wake up
		 * connect() routine if sleeping.
		 */
		if (msg_data_sz(msg) == 0) {
1641 1642
			kfree_skb(*skb);
			*skb = NULL;
1643 1644 1645 1646
			if (waitqueue_active(sk_sleep(sk)))
				wake_up_interruptible(sk_sleep(sk));
		}
		retval = TIPC_OK;
1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661
		break;
	case SS_LISTENING:
	case SS_UNCONNECTED:
		/* Accept only SYN message */
		if (!msg_connected(msg) && !(msg_errcode(msg)))
			retval = TIPC_OK;
		break;
	case SS_DISCONNECTING:
		break;
	default:
		pr_err("Unknown socket state %u\n", sock->state);
	}
	return retval;
}

1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672
/**
 * rcvbuf_limit - get proper overload limit of socket receive queue
 * @sk: socket
 * @buf: message
 *
 * For all connection-oriented messages, irrespective of importance,
 * the default overload value (i.e. 67 MB) is set as the limit.
 *
 * For all connectionless messages, the default queue limits are
 * as follows:
 *
1673 1674 1675 1676
 * TIPC_LOW_IMPORTANCE       (4 MB)
 * TIPC_MEDIUM_IMPORTANCE    (8 MB)
 * TIPC_HIGH_IMPORTANCE      (16 MB)
 * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1677 1678 1679 1680 1681 1682 1683 1684
 *
 * Returns overload limit according to corresponding message importance
 */
static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);

	if (msg_connected(msg))
		return sysctl_tipc_rmem[2];

	return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
		msg_importance(msg);
}
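
/*
 * A worked example of the connectionless limits above (a sketch, assuming the
 * sk_rcvbuf value of 32 MB implied by the 4..32 MB figures in the comment and
 * TIPC_LOW/MEDIUM/HIGH/CRITICAL_IMPORTANCE being 0..3):
 *
 *   limit = sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE << msg_importance(msg)
 *
 *   TIPC_LOW_IMPORTANCE      (0): 32 MB >> 3 << 0 =  4 MB
 *   TIPC_MEDIUM_IMPORTANCE   (1): 32 MB >> 3 << 1 =  8 MB
 *   TIPC_HIGH_IMPORTANCE     (2): 32 MB >> 3 << 2 = 16 MB
 *   TIPC_CRITICAL_IMPORTANCE (3): 32 MB >> 3 << 3 = 32 MB
 */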

/**
 * filter_rcv - validate incoming message
 * @sk: socket
 * @skb: pointer to message. Set to NULL if buffer is consumed.
 *
 * Enqueues message on receive queue if acceptable; optionally handles
 * disconnect indication for a connected socket.
 *
 * Called with socket lock already taken
 *
 * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
 */
static int filter_rcv(struct sock *sk, struct sk_buff **skb)
{
	struct socket *sock = sk->sk_socket;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *msg = buf_msg(*skb);
	unsigned int limit = rcvbuf_limit(sk, *skb);
	int rc = TIPC_OK;

	if (unlikely(msg_user(msg) == CONN_MANAGER)) {
		tipc_sk_proto_rcv(tsk, skb);
		return TIPC_OK;
	}

	if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
		kfree_skb(*skb);
		tsk->link_cong = 0;
		sk->sk_write_space(sk);
		*skb = NULL;
		return TIPC_OK;
	}

	/* Reject message if it is wrong sort of message for socket */
	if (msg_type(msg) > TIPC_DIRECT_MSG)
		return -TIPC_ERR_NO_PORT;

	if (sock->state == SS_READY) {
		if (msg_connected(msg))
			return -TIPC_ERR_NO_PORT;
	} else {
		rc = filter_connect(tsk, skb);
		if (rc != TIPC_OK || !*skb)
			return rc;
	}

	/* Reject message if there isn't room to queue it */
	if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
		return -TIPC_ERR_OVERLOAD;

	/* Enqueue message */
	TIPC_SKB_CB(*skb)->handle = NULL;
	__skb_queue_tail(&sk->sk_receive_queue, *skb);
	skb_set_owner_r(*skb, sk);

	sk->sk_data_ready(sk);
	*skb = NULL;
	return TIPC_OK;
}

/**
 * tipc_backlog_rcv - handle incoming message from backlog queue
 * @sk: socket
 * @skb: message
 *
 * Caller must hold socket lock
 *
 * Returns 0
 */
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
	int err;
	atomic_t *dcnt;
	u32 dnode;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	uint truesize = skb->truesize;

	err = filter_rcv(sk, &skb);
	if (likely(!skb)) {
		dcnt = &tsk->dupl_rcvcnt;
		if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
			atomic_add(truesize, dcnt);
		return 0;
	}
	if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
		tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
	return 0;
}

/**
 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
 *                   inputq and try adding them to socket or backlog queue
 * @inputq: list of incoming buffers with potentially different destinations
 * @sk: socket where the buffers should be enqueued
 * @dport: port number for the socket
 * @_skb: returned buffer to be forwarded or rejected, if applicable
 *
 * Caller must hold socket lock
 *
 * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
 * or -TIPC_ERR_NO_PORT
 */
static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
			   u32 dport, struct sk_buff **_skb)
{
	unsigned int lim;
	atomic_t *dcnt;
	int err;
	struct sk_buff *skb;
	unsigned long time_limit = jiffies + 2;

	while (skb_queue_len(inputq)) {
		if (unlikely(time_after_eq(jiffies, time_limit)))
			return TIPC_OK;
		skb = tipc_skb_dequeue(inputq, dport);
		if (unlikely(!skb))
			return TIPC_OK;
		if (!sock_owned_by_user(sk)) {
			err = filter_rcv(sk, &skb);
			if (likely(!skb))
				continue;
			*_skb = skb;
			return err;
		}
		dcnt = &tipc_sk(sk)->dupl_rcvcnt;
		if (sk->sk_backlog.len)
			atomic_set(dcnt, 0);
		lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
		if (likely(!sk_add_backlog(sk, skb, lim)))
			continue;
		*_skb = skb;
		return -TIPC_ERR_OVERLOAD;
	}
	return TIPC_OK;
}

/**
 * tipc_sk_rcv - handle a chain of incoming buffers
 * @inputq: buffer list containing the buffers
 * Consumes all buffers in list until inputq is empty
 * Note: may be called in multiple threads referring to the same queue
 * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
 * Only node local calls check the return value, sending single-buffer queues
 */
int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
{
	u32 dnode, dport = 0;
	int err = -TIPC_ERR_NO_PORT;
	struct sk_buff *skb;
	struct tipc_sock *tsk;
	struct tipc_net *tn;
	struct sock *sk;

	while (skb_queue_len(inputq)) {
		skb = NULL;
		dport = tipc_skb_peek_port(inputq, dport);
		tsk = tipc_sk_lookup(net, dport);
		if (likely(tsk)) {
			sk = &tsk->sk;
			if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
				err = tipc_sk_enqueue(inputq, sk, dport, &skb);
				spin_unlock_bh(&sk->sk_lock.slock);
				dport = 0;
			}
			sock_put(sk);
		} else {
			skb = tipc_skb_dequeue(inputq, dport);
		}
		if (likely(!skb))
			continue;
		if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
			goto xmit;
		if (!err) {
			dnode = msg_destnode(buf_msg(skb));
			goto xmit;
		}
		tn = net_generic(net, tipc_net_id);
		if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
			continue;
xmit:
		tipc_link_xmit_skb(net, skb, dnode, dport);
	}
	return err ? -EHOSTUNREACH : 0;
}

static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT(wait);
	int done;

	do {
		int err = sock_error(sk);
		if (err)
			return err;
		if (!*timeo_p)
			return -ETIMEDOUT;
		if (signal_pending(current))
			return sock_intr_errno(*timeo_p);

		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
		done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
		finish_wait(sk_sleep(sk), &wait);
	} while (!done);
	return 0;
}

/**
 * tipc_connect - establish a connection to another TIPC port
 * @sock: socket structure
 * @dest: socket address for destination port
 * @destlen: size of socket address data structure
 * @flags: file-related flags associated with socket
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_connect(struct socket *sock, struct sockaddr *dest,
			int destlen, int flags)
{
	struct sock *sk = sock->sk;
	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
	struct msghdr m = {NULL,};
	long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
	socket_state previous;
	int res;

	lock_sock(sk);

	/* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
	if (sock->state == SS_READY) {
		res = -EOPNOTSUPP;
		goto exit;
	}

	/*
	 * Reject connection attempt using multicast address
	 *
	 * Note: send_msg() validates the rest of the address fields,
	 *       so there's no need to do it here
	 */
	if (dst->addrtype == TIPC_ADDR_MCAST) {
		res = -EINVAL;
		goto exit;
	}

	previous = sock->state;
	switch (sock->state) {
	case SS_UNCONNECTED:
		/* Send a 'SYN-' to destination */
		m.msg_name = dest;
		m.msg_namelen = destlen;

		/* If connect is in non-blocking case, set MSG_DONTWAIT to
		 * indicate send_msg() is never blocked.
		 */
		if (!timeout)
			m.msg_flags = MSG_DONTWAIT;

		res = tipc_sendmsg(NULL, sock, &m, 0);
		if ((res < 0) && (res != -EWOULDBLOCK))
			goto exit;

		/* Just entered SS_CONNECTING state; the only
		 * difference is that return value in non-blocking
		 * case is EINPROGRESS, rather than EALREADY.
		 */
		res = -EINPROGRESS;
	case SS_CONNECTING:
		if (previous == SS_CONNECTING)
			res = -EALREADY;
		if (!timeout)
			goto exit;
		timeout = msecs_to_jiffies(timeout);
		/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
		res = tipc_wait_for_connect(sock, &timeout);
		break;
	case SS_CONNECTED:
		res = -EISCONN;
		break;
	default:
		res = -EINVAL;
		break;
	}
exit:
	release_sock(sk);
	return res;
}
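
/*
 * A minimal userspace sketch of the client side of the connect() path handled
 * above. The service type/instance values are hypothetical and chosen only
 * for illustration:
 *
 *	#include <stdio.h>
 *	#include <string.h>
 *	#include <sys/socket.h>
 *	#include <linux/tipc.h>
 *
 *	int tipc_connect_example(void)
 *	{
 *		struct sockaddr_tipc srv;
 *		int sd = socket(AF_TIPC, SOCK_STREAM, 0);
 *
 *		memset(&srv, 0, sizeof(srv));
 *		srv.family = AF_TIPC;
 *		srv.addrtype = TIPC_ADDR_NAME;
 *		srv.addr.name.name.type = 18888;     // hypothetical service type
 *		srv.addr.name.name.instance = 17;
 *		srv.addr.name.domain = 0;            // look up in whole zone
 *
 *		if (sd < 0 ||
 *		    connect(sd, (struct sockaddr *)&srv, sizeof(srv)) < 0) {
 *			perror("tipc connect");
 *			return -1;
 *		}
 *		return sd;
 *	}
 */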

/**
 * tipc_listen - allow socket to listen for incoming connections
 * @sock: socket structure
 * @len: (unused)
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_listen(struct socket *sock, int len)
{
	struct sock *sk = sock->sk;
	int res;

	lock_sock(sk);

	if (sock->state != SS_UNCONNECTED)
		res = -EINVAL;
	else {
		sock->state = SS_LISTENING;
		res = 0;
	}

	release_sock(sk);
	return res;
}

static int tipc_wait_for_accept(struct socket *sock, long timeo)
{
	struct sock *sk = sock->sk;
	DEFINE_WAIT(wait);
	int err;

	/* True wake-one mechanism for incoming connections: only
	 * one process gets woken up, not the 'whole herd'.
	 * Since we do not 'race & poll' for established sockets
	 * anymore, the common case will execute the loop only once.
	*/
	for (;;) {
		prepare_to_wait_exclusive(sk_sleep(sk), &wait,
					  TASK_INTERRUPTIBLE);
		if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
			release_sock(sk);
			timeo = schedule_timeout(timeo);
			lock_sock(sk);
		}
		err = 0;
		if (!skb_queue_empty(&sk->sk_receive_queue))
			break;
		err = -EINVAL;
		if (sock->state != SS_LISTENING)
			break;
		err = sock_intr_errno(timeo);
		if (signal_pending(current))
			break;
		err = -EAGAIN;
		if (!timeo)
			break;
	}
	finish_wait(sk_sleep(sk), &wait);
	return err;
}

/**
 * tipc_accept - wait for connection request
 * @sock: listening socket
 * @new_sock: new socket that is to be connected
 * @flags: file-related flags associated with socket
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
{
	struct sock *new_sk, *sk = sock->sk;
	struct sk_buff *buf;
	struct tipc_sock *new_tsock;
	struct tipc_msg *msg;
	long timeo;
	int res;

	lock_sock(sk);

	if (sock->state != SS_LISTENING) {
		res = -EINVAL;
		goto exit;
	}
	timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
	res = tipc_wait_for_accept(sock, timeo);
	if (res)
		goto exit;

	buf = skb_peek(&sk->sk_receive_queue);

	res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
	if (res)
		goto exit;

	new_sk = new_sock->sk;
	new_tsock = tipc_sk(new_sk);
	msg = buf_msg(buf);

	/* we lock on new_sk; but lockdep sees the lock on sk */
	lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);

	/*
	 * Reject any stray messages received by new socket
	 * before the socket lock was taken (very, very unlikely)
	 */
	tsk_rej_rx_queue(new_sk);

	/* Connect new socket to its peer */
	tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
	new_sock->state = SS_CONNECTED;

	tsk_set_importance(new_tsock, msg_importance(msg));
	if (msg_named(msg)) {
		new_tsock->conn_type = msg_nametype(msg);
		new_tsock->conn_instance = msg_nameinst(msg);
	}

	/*
	 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
	 * Respond to 'SYN+' by queuing it on new socket.
	 */
	if (!msg_data_sz(msg)) {
		struct msghdr m = {NULL,};

		tsk_advance_rx_queue(sk);
		tipc_send_packet(NULL, new_sock, &m, 0);
	} else {
		__skb_dequeue(&sk->sk_receive_queue);
		__skb_queue_head(&new_sk->sk_receive_queue, buf);
		skb_set_owner_r(buf, new_sk);
	}
	release_sock(new_sk);
exit:
	release_sock(sk);
	return res;
}
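
/*
 * The listening side, sketched from userspace (again with hypothetical name
 * values). bind() publishes the name, and listen()/accept() end up in
 * tipc_listen()/tipc_accept() above:
 *
 *	#include <string.h>
 *	#include <sys/socket.h>
 *	#include <linux/tipc.h>
 *
 *	int tipc_accept_example(void)
 *	{
 *		struct sockaddr_tipc srv;
 *		int sd = socket(AF_TIPC, SOCK_STREAM, 0);
 *
 *		memset(&srv, 0, sizeof(srv));
 *		srv.family = AF_TIPC;
 *		srv.addrtype = TIPC_ADDR_NAMESEQ;
 *		srv.addr.nameseq.type = 18888;       // hypothetical service type
 *		srv.addr.nameseq.lower = 17;
 *		srv.addr.nameseq.upper = 17;
 *		srv.scope = TIPC_ZONE_SCOPE;
 *
 *		if (sd < 0 ||
 *		    bind(sd, (struct sockaddr *)&srv, sizeof(srv)) < 0 ||
 *		    listen(sd, 5) < 0)
 *			return -1;
 *		return accept(sd, NULL, NULL);
 *	}
 */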

/**
 * tipc_shutdown - shutdown socket connection
 * @sock: socket structure
 * @how: direction to close (must be SHUT_RDWR)
 *
 * Terminates connection (if necessary), then purges socket's receive queue.
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_shutdown(struct socket *sock, int how)
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct sk_buff *skb;
	u32 dnode;
	int res;

	if (how != SHUT_RDWR)
		return -EINVAL;

	lock_sock(sk);

	switch (sock->state) {
	case SS_CONNECTING:
	case SS_CONNECTED:

restart:
		/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb) {
			if (TIPC_SKB_CB(skb)->handle != NULL) {
				kfree_skb(skb);
				goto restart;
			}
			if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
					     TIPC_CONN_SHUTDOWN))
				tipc_link_xmit_skb(net, skb, dnode,
						   tsk->portid);
			tipc_node_remove_conn(net, dnode, tsk->portid);
		} else {
			dnode = tsk_peer_node(tsk);

			skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
					      TIPC_CONN_MSG, SHORT_H_SIZE,
					      0, dnode, tsk_own_node(tsk),
					      tsk_peer_port(tsk),
					      tsk->portid, TIPC_CONN_SHUTDOWN);
			tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
		}
		tsk->connected = 0;
		sock->state = SS_DISCONNECTING;
		tipc_node_remove_conn(net, dnode, tsk->portid);
		/* fall through */

	case SS_DISCONNECTING:

		/* Discard any unreceived messages */
		__skb_queue_purge(&sk->sk_receive_queue);

		/* Wake up anyone sleeping in poll */
		sk->sk_state_change(sk);
		res = 0;
		break;

	default:
		res = -ENOTCONN;
	}

	release_sock(sk);
	return res;
}

static void tipc_sk_timeout(unsigned long data)
{
	struct tipc_sock *tsk = (struct tipc_sock *)data;
	struct sock *sk = &tsk->sk;
	struct sk_buff *skb = NULL;
	u32 peer_port, peer_node;
	u32 own_node = tsk_own_node(tsk);

	bh_lock_sock(sk);
	if (!tsk->connected) {
		bh_unlock_sock(sk);
		goto exit;
	}
	peer_port = tsk_peer_port(tsk);
	peer_node = tsk_peer_node(tsk);

	if (tsk->probing_state == TIPC_CONN_PROBING) {
		/* Previous probe not answered -> self abort */
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0,
				      own_node, peer_node, tsk->portid,
				      peer_port, TIPC_ERR_NO_PORT);
	} else {
		skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
				      INT_H_SIZE, 0, peer_node, own_node,
				      peer_port, tsk->portid, TIPC_OK);
		tsk->probing_state = TIPC_CONN_PROBING;
		sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
	}
	bh_unlock_sock(sk);
	if (skb)
		tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
exit:
	sock_put(sk);
}

static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
			   struct tipc_name_seq const *seq)
{
	struct net *net = sock_net(&tsk->sk);
	struct publication *publ;
	u32 key;

	if (tsk->connected)
		return -EINVAL;
	key = tsk->portid + tsk->pub_count + 1;
	if (key == tsk->portid)
		return -EADDRINUSE;

	publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
				    scope, tsk->portid, key);
	if (unlikely(!publ))
		return -EINVAL;

	list_add(&publ->pport_list, &tsk->publications);
	tsk->pub_count++;
	tsk->published = 1;
	return 0;
}

static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
			    struct tipc_name_seq const *seq)
{
	struct net *net = sock_net(&tsk->sk);
	struct publication *publ;
	struct publication *safe;
	int rc = -EINVAL;

	list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
		if (seq) {
			if (publ->scope != scope)
				continue;
			if (publ->type != seq->type)
				continue;
			if (publ->lower != seq->lower)
				continue;
			if (publ->upper != seq->upper)
				break;
			tipc_nametbl_withdraw(net, publ->type, publ->lower,
					      publ->ref, publ->key);
			rc = 0;
			break;
		}
		tipc_nametbl_withdraw(net, publ->type, publ->lower,
				      publ->ref, publ->key);
		rc = 0;
	}
	if (list_empty(&tsk->publications))
		tsk->published = 0;
	return rc;
}

/* tipc_sk_reinit: set non-zero address in all existing sockets
 *                 when we go from standalone to network mode.
 */
void tipc_sk_reinit(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	const struct bucket_table *tbl;
	struct rhash_head *pos;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;
	int i;

	rcu_read_lock();
	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
	for (i = 0; i < tbl->size; i++) {
		rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
			spin_lock_bh(&tsk->sk.sk_lock.slock);
			msg = &tsk->phdr;
			msg_set_prevnode(msg, tn->own_addr);
			msg_set_orignode(msg, tn->own_addr);
			spin_unlock_bh(&tsk->sk.sk_lock.slock);
		}
	}
	rcu_read_unlock();
}

static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_sock *tsk;

	rcu_read_lock();
	tsk = rhashtable_lookup(&tn->sk_rht, &portid);
	if (tsk)
		sock_hold(&tsk->sk);
	rcu_read_unlock();

	return tsk;
}

static int tipc_sk_insert(struct tipc_sock *tsk)
{
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
	u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;

	while (remaining--) {
		portid++;
		if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
			portid = TIPC_MIN_PORT;
		tsk->portid = portid;
		sock_hold(&tsk->sk);
		if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
			return 0;
		sock_put(&tsk->sk);
	}

	return -1;
}

static void tipc_sk_remove(struct tipc_sock *tsk)
{
	struct sock *sk = &tsk->sk;
	struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);

	if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
		WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
		__sock_put(sk);
	}
}

int tipc_sk_rht_init(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct rhashtable_params rht_params = {
		.nelem_hint = 192,
		.head_offset = offsetof(struct tipc_sock, node),
		.key_offset = offsetof(struct tipc_sock, portid),
		.key_len = sizeof(u32), /* portid */
		.hashfn = jhash,
		.max_shift = 20, /* 1M */
		.min_shift = 8,  /* 256 */
		.grow_decision = rht_grow_above_75,
		.shrink_decision = rht_shrink_below_30,
	};

	return rhashtable_init(&tn->sk_rht, &rht_params);
}

void tipc_sk_rht_destroy(struct net *net)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	/* Wait for socket readers to complete */
	synchronize_net();

	rhashtable_destroy(&tn->sk_rht);
}

/**
 * tipc_setsockopt - set socket option
 * @sock: socket structure
 * @lvl: option level
 * @opt: option identifier
 * @ov: pointer to new option value
 * @ol: length of option value
 *
 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
 * (to ease compatibility).
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
			   char __user *ov, unsigned int ol)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	u32 value;
	int res;

	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
		return 0;
	if (lvl != SOL_TIPC)
		return -ENOPROTOOPT;
	if (ol < sizeof(value))
		return -EINVAL;
	res = get_user(value, (u32 __user *)ov);
	if (res)
		return res;

	lock_sock(sk);

	switch (opt) {
	case TIPC_IMPORTANCE:
		res = tsk_set_importance(tsk, value);
		break;
	case TIPC_SRC_DROPPABLE:
		if (sock->type != SOCK_STREAM)
			tsk_set_unreliable(tsk, value);
		else
			res = -ENOPROTOOPT;
		break;
	case TIPC_DEST_DROPPABLE:
		tsk_set_unreturnable(tsk, value);
		break;
	case TIPC_CONN_TIMEOUT:
		tipc_sk(sk)->conn_timeout = value;
		/* no need to set "res", since already 0 at this point */
		break;
	default:
		res = -EINVAL;
	}

	release_sock(sk);

	return res;
}
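
/*
 * Userspace view of the options handled above, sketched for illustration
 * (the values are arbitrary examples):
 *
 *	#include <sys/socket.h>
 *	#include <linux/tipc.h>
 *
 *	void tipc_sockopt_example(int sd)
 *	{
 *		__u32 timeout = 10000;	// connect timeout in ms
 *		__u32 depth;
 *		socklen_t len = sizeof(depth);
 *
 *		setsockopt(sd, SOL_TIPC, TIPC_CONN_TIMEOUT,
 *			   &timeout, sizeof(timeout));
 *		getsockopt(sd, SOL_TIPC, TIPC_SOCK_RECVQ_DEPTH, &depth, &len);
 *	}
 */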

/**
 * tipc_getsockopt - get socket option
 * @sock: socket structure
 * @lvl: option level
 * @opt: option identifier
 * @ov: receptacle for option value
 * @ol: receptacle for length of option value
 *
 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
 * (to ease compatibility).
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
			   char __user *ov, int __user *ol)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	int len;
	u32 value;
	int res;

	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
		return put_user(0, ol);
	if (lvl != SOL_TIPC)
		return -ENOPROTOOPT;
	res = get_user(len, ol);
	if (res)
		return res;

	lock_sock(sk);

	switch (opt) {
	case TIPC_IMPORTANCE:
		value = tsk_importance(tsk);
		break;
	case TIPC_SRC_DROPPABLE:
		value = tsk_unreliable(tsk);
		break;
	case TIPC_DEST_DROPPABLE:
		value = tsk_unreturnable(tsk);
		break;
	case TIPC_CONN_TIMEOUT:
		value = tsk->conn_timeout;
		/* no need to set "res", since already 0 at this point */
		break;
	case TIPC_NODE_RECVQ_DEPTH:
		value = 0; /* was tipc_queue_size, now obsolete */
		break;
	case TIPC_SOCK_RECVQ_DEPTH:
		value = skb_queue_len(&sk->sk_receive_queue);
		break;
	default:
		res = -EINVAL;
	}

	release_sock(sk);

	if (res)
		return res;	/* "get" failed */

	if (len < sizeof(value))
		return -EINVAL;

	if (copy_to_user(ov, &value, sizeof(value)))
		return -EFAULT;

	return put_user(sizeof(value), ol);
}

static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
{
	struct sock *sk = sock->sk;
	struct tipc_sioc_ln_req lnr;
	void __user *argp = (void __user *)arg;

	switch (cmd) {
	case SIOCGETLINKNAME:
		if (copy_from_user(&lnr, argp, sizeof(lnr)))
			return -EFAULT;
		if (!tipc_node_get_linkname(sock_net(sk),
					    lnr.bearer_id & 0xffff, lnr.peer,
					    lnr.linkname, TIPC_MAX_LINK_NAME)) {
			if (copy_to_user(argp, &lnr, sizeof(lnr)))
				return -EFAULT;
			return 0;
		}
		return -EADDRNOTAVAIL;
	default:
		return -ENOIOCTLCMD;
	}
}
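
/*
 * SIOCGETLINKNAME from userspace, sketched for illustration (the peer address
 * and bearer id are made-up parameters):
 *
 *	#include <stdio.h>
 *	#include <string.h>
 *	#include <sys/ioctl.h>
 *	#include <linux/sockios.h>
 *	#include <linux/tipc.h>
 *
 *	void tipc_linkname_example(int sd, __u32 peer_node, __u32 bearer_id)
 *	{
 *		struct tipc_sioc_ln_req req;
 *
 *		memset(&req, 0, sizeof(req));
 *		req.peer = peer_node;
 *		req.bearer_id = bearer_id;
 *		if (!ioctl(sd, SIOCGETLINKNAME, &req))
 *			printf("link: %s\n", req.linkname);
 *	}
 */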

/* Protocol switches for the various types of TIPC sockets */

static const struct proto_ops msg_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= tipc_release,
	.bind		= tipc_bind,
	.connect	= tipc_connect,
	.socketpair	= sock_no_socketpair,
	.accept		= sock_no_accept,
	.getname	= tipc_getname,
	.poll		= tipc_poll,
	.ioctl		= tipc_ioctl,
	.listen		= sock_no_listen,
	.shutdown	= tipc_shutdown,
	.setsockopt	= tipc_setsockopt,
	.getsockopt	= tipc_getsockopt,
	.sendmsg	= tipc_sendmsg,
	.recvmsg	= tipc_recvmsg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};

static const struct proto_ops packet_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= tipc_release,
	.bind		= tipc_bind,
	.connect	= tipc_connect,
	.socketpair	= sock_no_socketpair,
	.accept		= tipc_accept,
	.getname	= tipc_getname,
	.poll		= tipc_poll,
	.ioctl		= tipc_ioctl,
	.listen		= tipc_listen,
	.shutdown	= tipc_shutdown,
	.setsockopt	= tipc_setsockopt,
	.getsockopt	= tipc_getsockopt,
	.sendmsg	= tipc_send_packet,
	.recvmsg	= tipc_recvmsg,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};

static const struct proto_ops stream_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.release	= tipc_release,
	.bind		= tipc_bind,
	.connect	= tipc_connect,
	.socketpair	= sock_no_socketpair,
	.accept		= tipc_accept,
	.getname	= tipc_getname,
	.poll		= tipc_poll,
	.ioctl		= tipc_ioctl,
	.listen		= tipc_listen,
	.shutdown	= tipc_shutdown,
	.setsockopt	= tipc_setsockopt,
	.getsockopt	= tipc_getsockopt,
	.sendmsg	= tipc_send_stream,
	.recvmsg	= tipc_recv_stream,
	.mmap		= sock_no_mmap,
	.sendpage	= sock_no_sendpage
};

static const struct net_proto_family tipc_family_ops = {
	.owner		= THIS_MODULE,
	.family		= AF_TIPC,
	.create		= tipc_sk_create
};

static struct proto tipc_proto = {
	.name		= "TIPC",
	.owner		= THIS_MODULE,
	.obj_size	= sizeof(struct tipc_sock),
	.sysctl_rmem	= sysctl_tipc_rmem
};

static struct proto tipc_proto_kern = {
	.name		= "TIPC",
	.obj_size	= sizeof(struct tipc_sock),
	.sysctl_rmem	= sysctl_tipc_rmem
};

/**
 * tipc_socket_init - initialize TIPC socket interface
 *
 * Returns 0 on success, errno otherwise
 */
int tipc_socket_init(void)
{
	int res;

	res = proto_register(&tipc_proto, 1);
	if (res) {
		pr_err("Failed to register TIPC protocol type\n");
		goto out;
	}

	res = sock_register(&tipc_family_ops);
	if (res) {
		pr_err("Failed to register TIPC socket type\n");
		proto_unregister(&tipc_proto);
		goto out;
	}
 out:
	return res;
}

/**
 * tipc_socket_stop - stop TIPC socket interface
 */
void tipc_socket_stop(void)
{
	sock_unregister(tipc_family_ops.family);
	proto_unregister(&tipc_proto);
}

/* Caller should hold socket lock for the passed tipc socket. */
static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
{
	u32 peer_node;
	u32 peer_port;
	struct nlattr *nest;

	peer_node = tsk_peer_node(tsk);
	peer_port = tsk_peer_port(tsk);

	nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);

	if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
		goto msg_full;
	if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
		goto msg_full;

	if (tsk->conn_type != 0) {
		if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
			goto msg_full;
		if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
			goto msg_full;
		if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
			goto msg_full;
	}
	nla_nest_end(skb, nest);

	return 0;

msg_full:
	nla_nest_cancel(skb, nest);

	return -EMSGSIZE;
}

/* Caller should hold socket lock for the passed tipc socket. */
static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
			    struct tipc_sock *tsk)
{
	int err;
	void *hdr;
	struct nlattr *attrs;
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);

	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
	if (!hdr)
		goto msg_cancel;

	attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
	if (!attrs)
		goto genlmsg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
		goto attr_msg_cancel;

	if (tsk->connected) {
		err = __tipc_nl_add_sk_con(skb, tsk);
		if (err)
			goto attr_msg_cancel;
	} else if (!list_empty(&tsk->publications)) {
		if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
			goto attr_msg_cancel;
	}
	nla_nest_end(skb, attrs);
	genlmsg_end(skb, hdr);

	return 0;

attr_msg_cancel:
	nla_nest_cancel(skb, attrs);
genlmsg_cancel:
	genlmsg_cancel(skb, hdr);
msg_cancel:
	return -EMSGSIZE;
}

int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	int err;
	struct tipc_sock *tsk;
	const struct bucket_table *tbl;
	struct rhash_head *pos;
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	u32 tbl_id = cb->args[0];
	u32 prev_portid = cb->args[1];

	rcu_read_lock();
	tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
	for (; tbl_id < tbl->size; tbl_id++) {
		rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
			spin_lock_bh(&tsk->sk.sk_lock.slock);
			if (prev_portid && prev_portid != tsk->portid) {
				spin_unlock_bh(&tsk->sk.sk_lock.slock);
				continue;
			}

			err = __tipc_nl_add_sk(skb, cb, tsk);
			if (err) {
				prev_portid = tsk->portid;
				spin_unlock_bh(&tsk->sk.sk_lock.slock);
				goto out;
			}
			prev_portid = 0;
			spin_unlock_bh(&tsk->sk.sk_lock.slock);
		}
	}
out:
	rcu_read_unlock();
	cb->args[0] = tbl_id;
	cb->args[1] = prev_portid;

	return skb->len;
}

/* Caller should hold socket lock for the passed tipc socket. */
static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
				 struct netlink_callback *cb,
				 struct publication *publ)
{
	void *hdr;
	struct nlattr *attrs;

	hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
			  &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
	if (!hdr)
		goto msg_cancel;

	attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
	if (!attrs)
		goto genlmsg_cancel;

	if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
		goto attr_msg_cancel;
	if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
		goto attr_msg_cancel;

	nla_nest_end(skb, attrs);
	genlmsg_end(skb, hdr);

	return 0;

attr_msg_cancel:
	nla_nest_cancel(skb, attrs);
genlmsg_cancel:
	genlmsg_cancel(skb, hdr);
msg_cancel:
	return -EMSGSIZE;
}

/* Caller should hold socket lock for the passed tipc socket. */
static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
				  struct netlink_callback *cb,
				  struct tipc_sock *tsk, u32 *last_publ)
{
	int err;
	struct publication *p;

	if (*last_publ) {
		list_for_each_entry(p, &tsk->publications, pport_list) {
			if (p->key == *last_publ)
				break;
		}
		if (p->key != *last_publ) {
			/* We never set seq or call nl_dump_check_consistent()
			 * this means that setting prev_seq here will cause the
			 * consistence check to fail in the netlink callback
			 * handler. Resulting in the last NLMSG_DONE message
			 * having the NLM_F_DUMP_INTR flag set.
			 */
			cb->prev_seq = 1;
			*last_publ = 0;
			return -EPIPE;
		}
	} else {
		p = list_first_entry(&tsk->publications, struct publication,
				     pport_list);
	}

	list_for_each_entry_from(p, &tsk->publications, pport_list) {
		err = __tipc_nl_add_sk_publ(skb, cb, p);
		if (err) {
			*last_publ = p->key;
			return err;
		}
	}
	*last_publ = 0;

	return 0;
}

int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	int err;
	u32 tsk_portid = cb->args[0];
	u32 last_publ = cb->args[1];
	u32 done = cb->args[2];
	struct net *net = sock_net(skb->sk);
	struct tipc_sock *tsk;

	if (!tsk_portid) {
		struct nlattr **attrs;
		struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];

		err = tipc_nlmsg_parse(cb->nlh, &attrs);
		if (err)
			return err;

		err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
				       attrs[TIPC_NLA_SOCK],
				       tipc_nl_sock_policy);
		if (err)
			return err;

		if (!sock[TIPC_NLA_SOCK_REF])
			return -EINVAL;

		tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
	}

	if (done)
		return 0;

	tsk = tipc_sk_lookup(net, tsk_portid);
	if (!tsk)
		return -EINVAL;

	lock_sock(&tsk->sk);
	err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
	if (!err)
		done = 1;
	release_sock(&tsk->sk);
	sock_put(&tsk->sk);

	cb->args[0] = tsk_portid;
	cb->args[1] = last_publ;
	cb->args[2] = done;

	return skb->len;
}