message.c 12.6 KB
Newer Older
A
Andy Grover 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
/*
 * Copyright (c) 2006 Oracle.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 *
 */
#include <linux/kernel.h>
34
#include <linux/slab.h>
35
#include <linux/export.h>
36 37 38
#include <linux/skbuff.h>
#include <linux/list.h>
#include <linux/errqueue.h>
A
Andy Grover 已提交
39 40 41 42 43 44 45 46

#include "rds.h"

/*
 * Payload size in bytes of each extension header, indexed by its
 * RDS_EXTHDR_* type code.  The table is only ever read, so it is const.
 */
static const unsigned int rds_exthdr_size[__RDS_EXTHDR_MAX] = {
[RDS_EXTHDR_NONE]	= 0,
[RDS_EXTHDR_VERSION]	= sizeof(struct rds_ext_header_version),
[RDS_EXTHDR_RDMA]	= sizeof(struct rds_ext_header_rdma),
[RDS_EXTHDR_RDMA_DEST]	= sizeof(struct rds_ext_header_rdma_dest),
[RDS_EXTHDR_NPATHS]	= sizeof(u16),
[RDS_EXTHDR_GEN_NUM]	= sizeof(u32),
};


void rds_message_addref(struct rds_message *rm)
{
54 55
	rdsdebug("addref rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
	refcount_inc(&rm->m_refcount);
A
Andy Grover 已提交
56
}
A
Andy Grover 已提交
57
EXPORT_SYMBOL_GPL(rds_message_addref);
A
Andy Grover 已提交
58

59 60
/*
 * Append @cookie to the zerocopy cookie array stored in @skb's control
 * buffer.
 *
 * Returns false and leaves the array untouched when it already holds
 * RDS_MAX_ZCOOKIES entries; returns true once the cookie has been stored
 * and the entry count bumped.
 */
static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
{
	struct rds_zcopy_cookies *zc = (struct rds_zcopy_cookies *)skb->cb;
	int used = zc->num;

	if (used == RDS_MAX_ZCOOKIES)
		return false;

	zc->cookies[used] = cookie;
	zc->num = used + 1;
	return true;
}

static void rds_rm_zerocopy_callback(struct rds_sock *rs,
				     struct rds_znotifier *znotif)
{
	struct sk_buff *skb, *tail;
	unsigned long flags;
	struct sk_buff_head *q;
	u32 cookie = znotif->z_cookie;
78
	struct rds_zcopy_cookies *ck;
79

80
	q = &rs->rs_zcookie_queue;
81 82 83 84 85 86 87
	spin_lock_irqsave(&q->lock, flags);
	tail = skb_peek_tail(q);

	if (tail && skb_zcookie_add(tail, cookie)) {
		spin_unlock_irqrestore(&q->lock, flags);
		mm_unaccount_pinned_pages(&znotif->z_mmp);
		consume_skb(rds_skb_from_znotifier(znotif));
88
		/* caller invokes rds_wake_sk_sleep() */
89 90 91 92
		return;
	}

	skb = rds_skb_from_znotifier(znotif);
93 94
	ck = (struct rds_zcopy_cookies *)skb->cb;
	memset(ck, 0, sizeof(*ck));
95 96 97 98 99
	WARN_ON(!skb_zcookie_add(skb, cookie));

	__skb_queue_tail(q, skb);

	spin_unlock_irqrestore(&q->lock, flags);
100
	/* caller invokes rds_wake_sk_sleep() */
101 102 103 104

	mm_unaccount_pinned_pages(&znotif->z_mmp);
}

A
Andy Grover 已提交
105 106 107 108 109
/*
 * This relies on dma_map_sg() not touching sg[].page during merging.
 */
static void rds_message_purge(struct rds_message *rm)
{
110
	unsigned long i, flags;
111
	bool zcopy = false;
A
Andy Grover 已提交
112 113 114 115

	if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
		return;

116 117
	spin_lock_irqsave(&rm->m_rs_lock, flags);
	if (rm->m_rs) {
118 119 120 121 122
		struct rds_sock *rs = rm->m_rs;

		if (rm->data.op_mmp_znotifier) {
			zcopy = true;
			rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
123
			rds_wake_sk_sleep(rs);
124 125 126
			rm->data.op_mmp_znotifier = NULL;
		}
		sock_put(rds_rs_to_sk(rs));
127 128 129
		rm->m_rs = NULL;
	}
	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
A
Andy Grover 已提交
130

131 132 133 134 135 136 137 138 139
	for (i = 0; i < rm->data.op_nents; i++) {
		/* XXX will have to put_page for page refs */
		if (!zcopy)
			__free_page(sg_page(&rm->data.op_sg[i]));
		else
			put_page(sg_page(&rm->data.op_sg[i]));
	}
	rm->data.op_nents = 0;

A
Andy Grover 已提交
140 141 142 143
	if (rm->rdma.op_active)
		rds_rdma_free_op(&rm->rdma);
	if (rm->rdma.op_rdma_mr)
		rds_mr_put(rm->rdma.op_rdma_mr);
144 145 146 147 148

	if (rm->atomic.op_active)
		rds_atomic_free_op(&rm->atomic);
	if (rm->atomic.op_rdma_mr)
		rds_mr_put(rm->atomic.op_rdma_mr);
A
Andy Grover 已提交
149 150 151 152
}

/*
 * Drop one reference on @rm.  When the last reference goes away the
 * message must already be off both the socket and connection lists; its
 * resources are purged and the message itself is freed.
 */
void rds_message_put(struct rds_message *rm)
{
	rdsdebug("put rm %p ref %d\n", rm, refcount_read(&rm->m_refcount));
	WARN(!refcount_read(&rm->m_refcount), "danger refcount zero on %p\n", rm);

	if (!refcount_dec_and_test(&rm->m_refcount))
		return;

	/* Last reference: nobody may still have the message queued. */
	BUG_ON(!list_empty(&rm->m_sock_item));
	BUG_ON(!list_empty(&rm->m_conn_item));
	rds_message_purge(rm);

	kfree(rm);
}
EXPORT_SYMBOL_GPL(rds_message_put);
A
Andy Grover 已提交
164 165 166 167 168 169 170 171 172 173

/*
 * Initialise @hdr for a new message: flags cleared, source and destination
 * ports set from @sport/@dport, @seq stored in big-endian form, and the
 * extension header area marked empty.
 */
void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
				 __be16 dport, u64 seq)
{
	hdr->h_sport = sport;
	hdr->h_dport = dport;
	hdr->h_sequence = cpu_to_be64(seq);
	hdr->h_flags = 0;
	hdr->h_exthdr[0] = RDS_EXTHDR_NONE;
}
EXPORT_SYMBOL_GPL(rds_message_populate_header);
A
Andy Grover 已提交
175

176 177
int rds_message_add_extension(struct rds_header *hdr, unsigned int type,
			      const void *data, unsigned int len)
A
Andy Grover 已提交
178 179 180 181 182 183 184 185
{
	unsigned int ext_len = sizeof(u8) + len;
	unsigned char *dst;

	/* For now, refuse to add more than one extension header */
	if (hdr->h_exthdr[0] != RDS_EXTHDR_NONE)
		return 0;

186
	if (type >= __RDS_EXTHDR_MAX || len != rds_exthdr_size[type])
A
Andy Grover 已提交
187 188 189 190 191 192 193 194 195 196 197 198
		return 0;

	if (ext_len >= RDS_HEADER_EXT_SPACE)
		return 0;
	dst = hdr->h_exthdr;

	*dst++ = type;
	memcpy(dst, data, len);

	dst[len] = RDS_EXTHDR_NONE;
	return 1;
}
A
Andy Grover 已提交
199
EXPORT_SYMBOL_GPL(rds_message_add_extension);
A
Andy Grover 已提交
200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254

/*
 * Walk the extension headers of a message, one per call.
 *
 * @pos is the caller's cursor into the extension area and must start at 0;
 * @buflen holds the capacity of @buf on entry and the number of bytes
 * actually copied on return.  The return value is the extension type, or
 * RDS_EXTHDR_NONE (with *buflen set to 0) once there is nothing more to
 * read or the data is malformed.
 *
 * Typical use:
 *
 * unsigned int pos = 0;
 *
 * while (1) {
 *	buflen = sizeof(buffer);
 *	type = rds_message_next_extension(hdr, &pos, buffer, &buflen);
 *	if (type == RDS_EXTHDR_NONE)
 *		break;
 *	...
 * }
 */
int rds_message_next_extension(struct rds_header *hdr,
		unsigned int *pos, void *buf, unsigned int *buflen)
{
	const u8 *ext = hdr->h_exthdr;
	unsigned int cursor = *pos;
	unsigned int type, payload;

	if (cursor >= RDS_HEADER_EXT_SPACE)
		goto exhausted;

	/* The payload length is implied by the extension type. */
	type = ext[cursor++];
	if (type == RDS_EXTHDR_NONE || type >= __RDS_EXTHDR_MAX)
		goto exhausted;

	payload = rds_exthdr_size[type];
	if (cursor + payload > RDS_HEADER_EXT_SPACE)
		goto exhausted;

	/* Advance past the whole extension even if @buf is too small. */
	*pos = cursor + payload;
	if (payload < *buflen)
		*buflen = payload;
	memcpy(buf, ext + cursor, *buflen);
	return type;

exhausted:
	*pos = RDS_HEADER_EXT_SPACE;
	*buflen = 0;
	return RDS_EXTHDR_NONE;
}

/*
 * Add an RDS_EXTHDR_RDMA_DEST extension carrying @r_key and @offset, both
 * converted to big-endian, to @hdr.  Returns whatever
 * rds_message_add_extension() returns: 1 on success, 0 on refusal.
 */
int rds_message_add_rdma_dest_extension(struct rds_header *hdr, u32 r_key, u32 offset)
{
	struct rds_ext_header_rdma_dest dest = {
		.h_rdma_rkey	= cpu_to_be32(r_key),
		.h_rdma_offset	= cpu_to_be32(offset),
	};

	return rds_message_add_extension(hdr, RDS_EXTHDR_RDMA_DEST,
					 &dest, sizeof(dest));
}
EXPORT_SYMBOL_GPL(rds_message_add_rdma_dest_extension);
A
Andy Grover 已提交
256

257 258 259 260 261 262
/*
 * Each rds_message is allocated with extra space for the scatterlist entries
 * rds ops will need. This is to minimize memory allocation count. Then, each rds op
 * can grab SGs when initializing its part of the rds_message.
 *
 * @extra_len is the number of trailing bytes reserved for those entries and
 * @gfp the allocation flags.  Returns a zeroed message holding one
 * reference, or NULL if @extra_len is too large or the allocation fails.
 */
struct rds_message *rds_message_alloc(unsigned int extra_len, gfp_t gfp)
{
	struct rds_message *msg;

	/* Keep the combined size within what kmalloc can serve. */
	if (extra_len > KMALLOC_MAX_SIZE - sizeof(struct rds_message))
		return NULL;

	msg = kzalloc(sizeof(struct rds_message) + extra_len, gfp);
	if (!msg)
		return NULL;

	msg->m_used_sgs = 0;
	msg->m_total_sgs = extra_len / sizeof(struct scatterlist);

	refcount_set(&msg->m_refcount, 1);
	INIT_LIST_HEAD(&msg->m_sock_item);
	INIT_LIST_HEAD(&msg->m_conn_item);
	spin_lock_init(&msg->m_rs_lock);
	init_waitqueue_head(&msg->m_flush_wait);

	return msg;
}

286 287 288 289 290 291 292 293 294
/*
 * RDS ops use this to grab SG entries from the rm's sg pool.
 */
struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents)
{
	struct scatterlist *sg_first = (struct scatterlist *) &rm[1];
	struct scatterlist *sg_ret;

	WARN_ON(rm->m_used_sgs + nents > rm->m_total_sgs);
295
	WARN_ON(!nents);
296

297 298 299
	if (rm->m_used_sgs + nents > rm->m_total_sgs)
		return NULL;

300
	sg_ret = &sg_first[rm->m_used_sgs];
301
	sg_init_table(sg_ret, nents);
302 303 304 305 306
	rm->m_used_sgs += nents;

	return sg_ret;
}

A
Andy Grover 已提交
307 308 309 310
struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len)
{
	struct rds_message *rm;
	unsigned int i;
311 312
	int num_sgs = ceil(total_len, PAGE_SIZE);
	int extra_bytes = num_sgs * sizeof(struct scatterlist);
A
Andy Grover 已提交
313

314
	rm = rds_message_alloc(extra_bytes, GFP_NOWAIT);
315
	if (!rm)
A
Andy Grover 已提交
316 317 318 319
		return ERR_PTR(-ENOMEM);

	set_bit(RDS_MSG_PAGEVEC, &rm->m_flags);
	rm->m_inc.i_hdr.h_len = cpu_to_be32(total_len);
320 321
	rm->data.op_nents = ceil(total_len, PAGE_SIZE);
	rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
322 323
	if (!rm->data.op_sg) {
		rds_message_put(rm);
324
		return ERR_PTR(-ENOMEM);
325
	}
A
Andy Grover 已提交
326

327 328
	for (i = 0; i < rm->data.op_nents; ++i) {
		sg_set_page(&rm->data.op_sg[i],
A
Andy Grover 已提交
329 330 331 332 333 334 335
				virt_to_page(page_addrs[i]),
				PAGE_SIZE, 0);
	}

	return rm;
}

336
int rds_message_zcopy_from_user(struct rds_message *rm, struct iov_iter *from)
A
Andy Grover 已提交
337 338 339
{
	unsigned long sg_off;
	struct scatterlist *sg;
340
	int ret = 0;
S
Sowmini Varadhan 已提交
341
	int length = iov_iter_count(from);
342 343
	int total_copied = 0;
	struct sk_buff *skb;
A
Andy Grover 已提交
344

345
	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
A
Andy Grover 已提交
346 347 348 349

	/*
	 * now allocate and copy in the data payload.
	 */
350
	sg = rm->data.op_sg;
A
Andy Grover 已提交
351 352
	sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */

353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
	skb = alloc_skb(0, GFP_KERNEL);
	if (!skb)
		return -ENOMEM;
	BUILD_BUG_ON(sizeof(skb->cb) < max_t(int, sizeof(struct rds_znotifier),
					     sizeof(struct rds_zcopy_cookies)));
	rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
	if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
				    length)) {
		ret = -ENOMEM;
		goto err;
	}
	while (iov_iter_count(from)) {
		struct page *pages;
		size_t start;
		ssize_t copied;

		copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
					    1, &start);
		if (copied < 0) {
			struct mmpin *mmp;
			int i;

			for (i = 0; i < rm->data.op_nents; i++)
				put_page(sg_page(&rm->data.op_sg[i]));
			mmp = &rm->data.op_mmp_znotifier->z_mmp;
			mm_unaccount_pinned_pages(mmp);
			ret = -EFAULT;
S
Sowmini Varadhan 已提交
380 381
			goto err;
		}
382 383 384 385 386 387 388 389 390
		total_copied += copied;
		iov_iter_advance(from, copied);
		length -= copied;
		sg_set_page(sg, pages, copied, start);
		rm->data.op_nents++;
		sg++;
	}
	WARN_ON_ONCE(length != 0);
	return ret;
S
Sowmini Varadhan 已提交
391
err:
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412
	consume_skb(skb);
	rm->data.op_mmp_znotifier = NULL;
	return ret;
}

/*
 * Fill @rm's data scatterlist with the payload described by @from.
 *
 * With @zcopy set the work is delegated to rds_message_zcopy_from_user().
 * Otherwise page space is obtained on demand through
 * rds_page_remainder_alloc() and the user data is copied into it.
 *
 * Returns 0 on success, the error from rds_page_remainder_alloc() if page
 * space cannot be obtained, or -EFAULT if a copy comes up short.
 */
int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
			       bool zcopy)
{
	struct scatterlist *sg = rm->data.op_sg;
	unsigned long fill = 0;	/* bytes already used in the current entry */
	unsigned long chunk, done;
	int ret;

	rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));

	if (zcopy)
		return rds_message_zcopy_from_user(rm, from);

	/* now allocate and copy in the data payload.  */
	while (iov_iter_count(from)) {
		/* A still-zeroed entry has no page yet: get one. */
		if (!sg_page(sg)) {
			ret = rds_page_remainder_alloc(sg, iov_iter_count(from),
						       GFP_HIGHUSER);
			if (ret)
				return ret;
			rm->data.op_nents++;
			fill = 0;
		}

		chunk = min_t(unsigned long, iov_iter_count(from),
			      sg->length - fill);

		rds_stats_add(s_copy_from_user, chunk);
		done = copy_page_from_iter(sg_page(sg), sg->offset + fill,
					   chunk, from);
		if (done != chunk)
			return -EFAULT;

		fill += chunk;

		/* Entry is full: move on to the next one. */
		if (fill == sg->length)
			sg++;
	}

	return 0;
}

442
int rds_message_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to)
A
Andy Grover 已提交
443 444 445 446 447 448 449 450 451 452 453 454
{
	struct rds_message *rm;
	struct scatterlist *sg;
	unsigned long to_copy;
	unsigned long vec_off;
	int copied;
	int ret;
	u32 len;

	rm = container_of(inc, struct rds_message, m_inc);
	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

455
	sg = rm->data.op_sg;
A
Andy Grover 已提交
456 457 458
	vec_off = 0;
	copied = 0;

459
	while (iov_iter_count(to) && copied < len) {
460 461
		to_copy = min_t(unsigned long, iov_iter_count(to),
				sg->length - vec_off);
A
Andy Grover 已提交
462 463
		to_copy = min_t(unsigned long, to_copy, len - copied);

464 465 466 467 468
		rds_stats_add(s_copy_to_user, to_copy);
		ret = copy_page_to_iter(sg_page(sg), sg->offset + vec_off,
					to_copy, to);
		if (ret != to_copy)
			return -EFAULT;
A
Andy Grover 已提交
469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487

		vec_off += to_copy;
		copied += to_copy;

		if (vec_off == sg->length) {
			vec_off = 0;
			sg++;
		}
	}

	return copied;
}

/*
 * If the message is still on the send queue, wait until the transport
 * is done with it. This is particularly important for RDMA operations.
 */
void rds_message_wait(struct rds_message *rm)
{
C
Chris Mason 已提交
488
	wait_event_interruptible(rm->m_flush_wait,
A
Andy Grover 已提交
489 490 491 492 493 494
			!test_bit(RDS_MSG_MAPPED, &rm->m_flags));
}

/*
 * Clear RDS_MSG_MAPPED on @rm and wake anyone sleeping on
 * rm->m_flush_wait, which is where rds_message_wait() blocks.
 */
void rds_message_unmapped(struct rds_message *rm)
{
	clear_bit(RDS_MSG_MAPPED, &rm->m_flags);

	/* Waiters re-check the flag once woken. */
	wake_up_interruptible(&rm->m_flush_wait);
}
EXPORT_SYMBOL_GPL(rds_message_unmapped);
A
Andy Grover 已提交
498