drbd_receiver.c 153.5 KB
Newer Older
P
Philipp Reisner 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
47
#include "drbd_protocol.h"
P
Philipp Reisner 已提交
48 49 50 51
#include "drbd_req.h"

#include "drbd_vli.h"

52 53
/* Decoded on-the-wire packet header, filled in by decode_header(). */
struct packet_info {
	enum drbd_packet cmd;	/* packet type */
	unsigned int size;	/* payload size following the header */
	unsigned int vnr;	/* volume number */
	void *data;		/* payload pointer; presumably set by decode_header() — its body is not visible here */
};

P
Philipp Reisner 已提交
59 60 61 62 63 64
/* Return values of drbd_may_finish_epoch() (declared below):
 * what happened to the epoch object passed in. */
enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

65 66
static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
67
static int drbd_disconnected(struct drbd_device *device);
P
Philipp Reisner 已提交
68

69
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
70
static int e_end_block(struct drbd_work *, int);
P
Philipp Reisner 已提交
71 72 73 74


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	/* Walk forward; on exit "page" is the n-th page and "tmp" its
	 * successor, which becomes the new chain head. */
	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

/* Drop one reference on every page in the chain; returns the number of
 * pages visited. */
static int page_chain_free(struct page *page)
{
	struct page *next;
	int freed = 0;

	page_chain_for_each_safe(page, next) {
		put_page(page);
		freed++;
	}
	return freed;
}

/* Prepend the chain [chain_first .. chain_last] to *head.
 * The caller is responsible for locking around *head. */
static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	/* Paranoia check: chain_last must really be the tail of chain_first. */
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

154
/* Try to get @number pages: first from the global drbd_pp_pool, then
 * from the page allocator.  All-or-nothing: returns a chain of exactly
 * @number pages (linked via page->private) or NULL; a partial
 * allocation is handed back to the pool before returning NULL. */
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

200
/* Move completed entries from device->net_ee over to @to_be_freed.
 * Callers hold the connection's req_lock around this. */
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first not finished we can
	   stop to examine the list... */

	list_for_each_safe(le, tle, &device->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
			break;	/* pages still referenced: everything after is too */
		list_move(le, to_be_freed);
	}
}

219
/* Collect finished net_ee peer requests under the req_lock, then free
 * them outside the lock. */
static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @device:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_device *device, unsigned int number,
			      bool retry)
{
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	int mxb;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	rcu_read_lock();
	nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;	/* no net_conf: effectively no limit */
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		/* Return finished "net" requests' pages, then retry. */
		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
			break;
		}

		/* Sleep until someone frees pages and wakes drbd_pp_wait. */
		schedule();
	}
	finish_wait(&drbd_pp_wait, &wait);

	/* Account the pages; see the "over max_buffers" note above. */
	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

290
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside an other spin_lock_irq(&first_peer_device(device)->connection->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	/* Above the high-water mark, give pages back to the system
	 * instead of growing the pool further. */
	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	/* Both branches set i to the number of pages released. */
	i = atomic_sub_return(i, a);
	if (i < 0)
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
324
 drbd_free_peer_req()
325
 drbd_alloc_peer_req()
326
 drbd_free_peer_reqs()
P
Philipp Reisner 已提交
327
 drbd_ee_fix_bhs()
328
 drbd_finish_peer_reqs()
P
Philipp Reisner 已提交
329 330 331 332
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

333
/* Allocate a peer request (receiver-side request object), plus a page
 * chain covering @data_size bytes when @data_size is nonzero.
 * Returns NULL on allocation failure or injected fault. */
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_device *device, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	/* Fault injection hook for testing. */
	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (data_size) {
		/* Retry (block) only if the caller allowed waiting. */
		page = drbd_alloc_pages(device, nr_pages, (gfp_mask & __GFP_WAIT));
		if (!page)
			goto fail;
	}

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.device = device;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

381
/* Free a peer request, its digest (if any) and its page chain.
 * @is_net selects which in-use counter the pages were accounted
 * against (pp_in_use_by_net vs pp_in_use). */
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

392
/* Detach @list under the req_lock, then free every peer request on it
 * outside the lock.  Returns the number of requests freed. */
int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;	/* net_ee uses the _by_net counter */

	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * Run the completion callback of every request on done_ee and free
 * them; finished net_ee requests are reclaimed along the way.
 * Returns the first callback error, if any.
 *
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;	/* keep only the first error */
		drbd_free_peer_req(device, peer_req);
	}
	/* Wake waiters in _drbd_wait_ee_list_empty(). */
	wake_up(&device->ee_wait);

	return err;
}

446
/* Wait until @head becomes empty.  Called with the req_lock held; the
 * lock is dropped while sleeping and re-acquired before returning. */
static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	}
}

462
/* Locked wrapper around _drbd_wait_ee_list_empty(). */
static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
}

470
/* Receive up to @size bytes from @sock into the kernel buffer @buf.
 * With @flags == 0 this blocks until all bytes arrived (MSG_WAITALL).
 * Returns the number of bytes received, or a negative error. */
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	mm_segment_t oldfs;
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_iovlen = 1,
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	int rv;

	/* buf is a kernel address; widen the address-space check. */
	oldfs = get_fs();
	set_fs(KERNEL_DS);
	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
	set_fs(oldfs);

	return rv;
}

492
/* Blocking receive of @size bytes on the data socket.  Any incomplete
 * receive forces the connection to C_BROKEN_PIPE — except when we sent
 * the disconnect ourselves and the state already left
 * C_WF_REPORT_PARAMS.  Returns bytes received or a negative error. */
static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			conn_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		/* EOF: if we initiated the disconnect, give the state
		 * machine ping_timeo to leave C_WF_REPORT_PARAMS and
		 * skip the broken-pipe transition below. */
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		conn_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

525
/* Like drbd_recv(), but normalize the result: 0 on a complete read,
 * negative error otherwise (short reads become -EIO). */
static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv = drbd_recv(connection, buf, size);

	if (rv == size)
		return 0;
	return rv < 0 ? rv : -EIO;
}

538
/* drbd_recv_all() plus a warning on failure, unless a signal is the
 * likely cause of the short read. */
static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv = drbd_recv_all(connection, buf, size);

	if (rv && !signal_pending(current))
		conn_warn(connection, "short read (expected size %d)\n", (int)size);
	return rv;
}

548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 * A size of 0 leaves the kernel default untouched.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;	/* mark as explicitly set */
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

567
/* Attempt one outgoing connection to the peer.  Returns the connected
 * socket, or NULL.  Only "unexpected" errors (not timeouts or
 * peer-unreachable conditions) force the state to C_DISCONNECTING. */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* Snapshot the config under RCU; it may change concurrently. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

       /* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			conn_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

655
/* Context for waiting on incoming connections: hooked into the listen
 * socket's sk_state_change callback (see drbd_incoming_connection). */
struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;	/* completed when a connection is established */
	void (*original_sk_state_change)(struct sock *sk);	/* saved callback to restore/chain */

};

663
/* sk_state_change callback installed on the listen socket: ring the
 * door bell once established, then chain to the original callback. */
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

674
/* Create, bind and listen() on our configured address, and install the
 * drbd_incoming_connection() callback via @ad.  Returns 0 on success,
 * -EIO on any failure (possibly also forcing C_DISCONNECTING). */
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	/* Hook our state-change callback under the callback lock. */
	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

737
/* Undo the sk_state_change / sk_user_data hookup installed by
 * prepare_listen_socket(). */
static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

745
/* Wait up to connect_int (with random jitter) for an incoming
 * connection on the prepared listen socket, then accept it.
 * Returns the established socket, or NULL. */
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;	/* timed out or interrupted */

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	/* The accepted socket inherited the callback; remove it. */
	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

782
static int decode_header(struct drbd_connection *, void *, struct packet_info *);
P
Philipp Reisner 已提交
783

784
/* Send an empty @cmd packet (P_INITIAL_DATA / P_INITIAL_META in
 * conn_connect) to identify which role this socket plays. */
static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

792
/* Read and decode one packet header from @sock.
 * Returns the packet command, or a negative error code. */
static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;	/* short read */
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
814
static int drbd_socket_okay(struct socket **sock)
P
Philipp Reisner 已提交
815 816 817 818 819
{
	int rr;
	char tb[4];

	if (!*sock)
820
		return false;
P
Philipp Reisner 已提交
821

822
	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
P
Philipp Reisner 已提交
823 824

	if (rr > 0 || rr == -EAGAIN) {
825
		return true;
P
Philipp Reisner 已提交
826 827 828
	} else {
		sock_release(*sock);
		*sock = NULL;
829
		return false;
P
Philipp Reisner 已提交
830 831
	}
}
832 833
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_device *device)
{
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	/* Peers older than protocol 100 serialize state changes on the
	 * per-connection mutex instead of the per-device one. */
	device->state_mutex = first_peer_device(device)->connection->agreed_pro_version < 100 ?
		&first_peer_device(device)->connection->cstate_mutex :
		&device->own_state_mutex;

	/* Initial handshake: sync parameters, sizes, uuids, state. */
	err = drbd_send_sync_param(device);
	if (!err)
		err = drbd_send_sizes(device, 0, 0);
	if (!err)
		err = drbd_send_uuids(device);
	if (!err)
		err = drbd_send_current_state(device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
P
Philipp Reisner 已提交
858 859 860 861 862 863 864 865 866

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h, ok;
	bool discard_my_data;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	/* Two sockets per connection: "data" (bulk) and "meta" (acks/pings). */
	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	/* Loop until both sockets are established and healthy.  We both
	 * actively connect and accept; either side may provide a socket. */
	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				conn_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (sock.socket && msock.socket) {
			rcu_read_lock();
			nc = rcu_dereference(connection->net_conf);
			timeout = nc->ping_timeo * HZ / 10;
			rcu_read_unlock();
			/* Give crossed connects a chance to settle, then
			 * verify both sockets are still alive. */
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;
			if (ok)
				break;
		}

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					/* Both sides connected simultaneously;
					 * keep the accepted one. */
					conn_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					conn_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				conn_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				/* Break connect/accept symmetry by random backoff. */
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = drbd_socket_okay(&sock.socket);
		ok = drbd_socket_okay(&msock.socket) && ok;
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	/* GFP_NOIO: socket allocations must not recurse into block I/O. */
	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	/* Feature/protocol-version negotiation; <= 0 means retry or give up. */
	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			conn_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			conn_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	set_bit(STATE_SENT, &connection->flags);

	/* Per-volume handshake.  Drop the RCU lock while calling into
	 * drbd_connected(); a kref keeps the device alive meanwhile. */
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		/* Prevent a race between resync-handshake and
		 * being promoted to Primary.
		 *
		 * Grab and release the state mutex, so we know that any current
		 * drbd_set_role() is finished, and any incoming drbd_set_role
		 * will see the STATE_SENT flag, and wait for it to be cleared.
		 */
		mutex_lock(device->state_mutex);
		mutex_unlock(device->state_mutex);

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->asender);

	mutex_lock(&connection->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}

1095
/* Decode a raw packet header into @pi.  The on-wire header format is
 * selected by the agreed protocol version (h100 > h95 > h80), each with
 * its own magic number; the expected size comes from drbd_header_size().
 * Returns 0 on success, -EINVAL on bad magic or nonzero padding. */
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			conn_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		/* h100 is the only format that carries a volume number. */
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		conn_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	/* Payload starts right behind the fixed-size header. */
	pi->data = header + header_size;
	return 0;
}
P
Philipp Reisner 已提交
1130

1131
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1132
{
1133
	void *buffer = connection->data.rbuf;
1134
	int err;
1135

1136
	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1137
	if (err)
1138
		return err;
1139

1140 1141
	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;
P
Philipp Reisner 已提交
1142

1143
	return err;
P
Philipp Reisner 已提交
1144 1145
}

1146
/* Issue a cache flush to the backing device of every volume of this
 * connection, if the current write ordering policy requires flushes.
 * On the first flush failure, downgrade the policy to WO_drain_io and
 * stop iterating. */
static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	struct drbd_peer_device *peer_device;
	int vnr;

	if (connection->write_ordering >= WO_bdev_flush) {
		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			/* Hold a kref so the device survives while we drop
			 * the RCU read lock to do the (sleeping) flush. */
			kref_get(&device->kref);
			rcu_read_unlock();

			rv = blkdev_issue_flush(device->ldev->backing_bdev,
					GFP_NOIO, NULL);
			if (rv) {
				dev_info(DEV, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(connection, WO_drain_io);
			}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection:	DRBD connection.
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		/* Apply the event itself; EV_CLEANUP is a modifier bit. */
		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		/* An epoch can finish once it has writes, none of them are
		 * still active, and we either saw its barrier number or we
		 * are tearing down. */
		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				/* drbd_send_b_ack() may sleep; drop the lock. */
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				/* A finished non-current epoch is destroyed;
				 * continue with its successor in the list. */
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				/* The current epoch is reset and reused. */
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to an other write ordering method
 * @connection:	DRBD connection.
 * @wo:		Write ordering method to try.
 *
 * The effective method can only ever be downgraded (min of current and
 * requested), further limited by what each attached disk's configuration
 * allows (disk_flushes / disk_drain).
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
	struct disk_conf *dc;
	struct drbd_peer_device *peer_device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = connection->write_ordering;
	/* Never upgrade; only fall back.  (Enum order: none < drain < flush.) */
	wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (!get_ldev_if_state(device, D_ATTACHING))
			continue;
		dc = rcu_dereference(device->ldev->disk_conf);

		/* Per-disk config may force a further downgrade. */
		if (wo == WO_bdev_flush && !dc->disk_flushes)
			wo = WO_drain_io;
		if (wo == WO_drain_io && !dc->disk_drain)
			wo = WO_none;
		put_ldev(device);
	}
	rcu_read_unlock();
	connection->write_ordering = wo;
	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
		conn_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}

1297
/**
 * drbd_submit_peer_request()
 * @device:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 * @fault_type:	fault-injection class passed to drbd_generic_make_request()
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
{
	struct bio *bios = NULL;	/* singly linked chain via bi_next */
	struct bio *bio;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;	/* bytes still to be mapped */
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
	int err = -ENOMEM;

	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
		goto fail;
	}
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_iter.bi_sector = sector;
	bio->bi_bdev = device->ldev->backing_bdev;
	bio->bi_rw = rw;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
				dev_err(DEV,
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (uint64_t)bio->bi_iter.bi_sector);
				err = -ENOSPC;
				goto fail;
			}
			/* Current bio is full; start another one for the
			 * remaining pages. */
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
	D_ASSERT(page == NULL);
	D_ASSERT(ds == 0);

	/* Completion handler decrements this per bio; set before submit. */
	atomic_set(&peer_req->pending_bios, n_bios);
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(device, fault_type, bio);
	} while (bios);
	return 0;

fail:
	/* Nothing was submitted yet; release every bio we built. */
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
	return err;
}

1394
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1395
					     struct drbd_peer_request *peer_req)
1396
{
1397
	struct drbd_interval *i = &peer_req->i;
1398

1399
	drbd_remove_interval(&device->write_requests, i);
1400 1401
	drbd_clear_interval(i);

A
Andreas Gruenbacher 已提交
1402
	/* Wake up any processes waiting for this peer request to complete.  */
1403
	if (i->waiting)
1404
		wake_up(&device->misc_wait);
1405 1406
}

1407
/* Block until the active_ee list of every volume on this connection is
 * empty.  The RCU read lock is dropped around the (sleeping) wait; a kref
 * keeps each device alive across that window. */
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_wait_ee_list_empty(device, &device->active_ee);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

1425
/* Handle an incoming P_BARRIER: mark the current epoch with the peer's
 * barrier number, possibly finish/recycle it, and if needed allocate a
 * fresh epoch to collect the writes that follow. */
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
{
	int rv;
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (connection->write_ordering) {
	case WO_none:
		if (rv == FE_RECYCLED)
			return 0;

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
			conn_warn(connection, "Allocation of an epoch failed, slowing down\n");
			/* Fall through */

	case WO_bdev_flush:
	case WO_drain_io:
		/* Drain all in-flight writes and flush backing devices
		 * before the epoch boundary becomes effective. */
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);

		if (atomic_read(&connection->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
		}

		/* Current epoch already empty; no new epoch needed. */
		return 0;
	default:
		conn_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
		return -EIO;
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
	spin_unlock(&connection->epoch_lock);

	return 0;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
/* Receive @data_size bytes of payload (preceded by an integrity digest if
 * one is negotiated) into a freshly allocated peer request.  Returns the
 * peer request, or NULL on any receive/validation/allocation failure. */
static struct drbd_peer_request *
read_in_block(struct drbd_device *device, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
{
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	struct drbd_peer_request *peer_req;
	struct page *page;
	int dgs, ds, err;
	void *dig_in = first_peer_device(device)->connection->int_dig_in;
	void *dig_vv = first_peer_device(device)->connection->int_dig_vv;
	unsigned long *data;

	dgs = 0;
	if (first_peer_device(device)->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(first_peer_device(device)->connection->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(first_peer_device(device)->connection, dig_in, dgs);
		if (err)
			return NULL;
		/* The wire size included the digest; the payload is smaller. */
		data_size -= dgs;
	}

	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
		return NULL;

	/* even though we trust out peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(device, id, sector, data_size, GFP_NOIO);
	if (!peer_req)
		return NULL;

	if (!data_size)
		return peer_req;

	/* Fill the peer request's page chain from the socket, one page at
	 * a time. */
	ds = data_size;
	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		data = kmap(page);
		err = drbd_recv_all_warn(first_peer_device(device)->connection, data, len);
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
		}
		kunmap(page);
		if (err) {
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
		ds -= len;
	}

	if (dgs) {
		/* Verify the received payload against the received digest. */
		drbd_csum_ee(device, first_peer_device(device)->connection->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(device, peer_req);
			return NULL;
		}
	}
	device->recv_cnt += data_size>>9;
	return peer_req;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
1578
static int drbd_drain_block(struct drbd_device *device, int data_size)
P
Philipp Reisner 已提交
1579 1580
{
	struct page *page;
1581
	int err = 0;
P
Philipp Reisner 已提交
1582 1583
	void *data;

1584
	if (!data_size)
1585
		return 0;
1586

1587
	page = drbd_alloc_pages(device, 1, 1);
P
Philipp Reisner 已提交
1588 1589 1590

	data = kmap(page);
	while (data_size) {
1591 1592
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

1593
		err = drbd_recv_all_warn(first_peer_device(device)->connection, data, len);
1594
		if (err)
P
Philipp Reisner 已提交
1595
			break;
1596
		data_size -= len;
P
Philipp Reisner 已提交
1597 1598
	}
	kunmap(page);
1599
	drbd_free_pages(device, page, 0);
1600
	return err;
P
Philipp Reisner 已提交
1601 1602
}

1603
/* Receive a "diskless read" reply: the payload goes straight into the
 * pages of the original request's master bio instead of a peer request.
 * An optional integrity digest precedes the data and is verified at the
 * end.  Returns 0 on success or a negative error. */
static int recv_dless_read(struct drbd_device *device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int dgs, err, expect;
	void *dig_in = first_peer_device(device)->connection->int_dig_in;
	void *dig_vv = first_peer_device(device)->connection->int_dig_vv;

	dgs = 0;
	if (first_peer_device(device)->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(first_peer_device(device)->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(first_peer_device(device)->connection, dig_in, dgs);
		if (err)
			return err;
		/* Wire size included the digest; the payload is smaller. */
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	device->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_iter.bi_sector);

	/* Receive directly into each segment of the master bio. */
	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(first_peer_device(device)->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(device, first_peer_device(device)->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(data_size == 0);
	return 0;
}

1651 1652 1653 1654
/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
1655
static int e_end_resync_block(struct drbd_work *w, int unused)
P
Philipp Reisner 已提交
1656
{
1657 1658
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
1659
	struct drbd_device *device = w->device;
1660
	sector_t sector = peer_req->i.sector;
1661
	int err;
P
Philipp Reisner 已提交
1662

1663
	D_ASSERT(drbd_interval_empty(&peer_req->i));
P
Philipp Reisner 已提交
1664

1665
	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1666 1667
		drbd_set_in_sync(device, sector, peer_req->i.size);
		err = drbd_send_ack(device, P_RS_WRITE_ACK, peer_req);
P
Philipp Reisner 已提交
1668 1669
	} else {
		/* Record failure to sync */
1670
		drbd_rs_failed_io(device, sector, peer_req->i.size);
P
Philipp Reisner 已提交
1671

1672
		err  = drbd_send_ack(device, P_NEG_ACK, peer_req);
P
Philipp Reisner 已提交
1673
	}
1674
	dec_unacked(device);
P
Philipp Reisner 已提交
1675

1676
	return err;
P
Philipp Reisner 已提交
1677 1678
}

1679
/* Receive one resync data block and submit it as a local write.
 * On success the write completes asynchronously and e_end_resync_block()
 * sends the ack; on any failure the ldev reference taken by the caller
 * is dropped and -EIO is returned. */
static int recv_resync_read(struct drbd_device *device, sector_t sector, int data_size) __releases(local)
{
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(device, ID_SYNCER, sector, data_size);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	/* Queue on sync_ee before submitting, under the request lock. */
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	list_add(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	atomic_add(data_size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}

1715
static struct drbd_request *
1716
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1717
	     sector_t sector, bool missing_ok, const char *func)
1718 1719 1720
{
	struct drbd_request *req;

1721 1722
	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
1723
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1724
		return req;
1725
	if (!missing_ok) {
1726
		dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1727 1728
			(unsigned long)id, (unsigned long long)sector);
	}
1729
	return NULL;
P
Philipp Reisner 已提交
1730 1731
}

1732
static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
1733
{
1734
	struct drbd_device *device;
P
Philipp Reisner 已提交
1735 1736
	struct drbd_request *req;
	sector_t sector;
1737
	int err;
1738
	struct p_data *p = pi->data;
1739

1740
	device = vnr_to_device(connection, pi->vnr);
1741
	if (!device)
1742
		return -EIO;
P
Philipp Reisner 已提交
1743 1744 1745

	sector = be64_to_cpu(p->sector);

1746
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
1747
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1748
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
1749
	if (unlikely(!req))
1750
		return -EIO;
P
Philipp Reisner 已提交
1751

B
Bart Van Assche 已提交
1752
	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
P
Philipp Reisner 已提交
1753 1754
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
1755
	err = recv_dless_read(device, req, sector, pi->size);
1756
	if (!err)
1757
		req_mod(req, DATA_RECEIVED);
P
Philipp Reisner 已提交
1758 1759 1760 1761
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

1762
	return err;
P
Philipp Reisner 已提交
1763 1764
}

1765
static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
1766
{
1767
	struct drbd_device *device;
P
Philipp Reisner 已提交
1768
	sector_t sector;
1769
	int err;
1770
	struct p_data *p = pi->data;
1771

1772
	device = vnr_to_device(connection, pi->vnr);
1773
	if (!device)
1774
		return -EIO;
P
Philipp Reisner 已提交
1775 1776 1777 1778

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

1779
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
1780 1781
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
1782
		 * or in drbd_peer_request_endio. */
1783
		err = recv_resync_read(device, sector, pi->size);
P
Philipp Reisner 已提交
1784 1785 1786 1787
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

1788
		err = drbd_drain_block(device, pi->size);
P
Philipp Reisner 已提交
1789

1790
		drbd_send_ack_dp(device, P_NEG_ACK, p, pi->size);
P
Philipp Reisner 已提交
1791 1792
	}

1793
	atomic_add(pi->size >> 9, &device->rs_sect_in);
1794

1795
	return err;
P
Philipp Reisner 已提交
1796 1797
}

1798
static void restart_conflicting_writes(struct drbd_device *device,
1799
				       sector_t sector, int size)
P
Philipp Reisner 已提交
1800
{
1801 1802 1803
	struct drbd_interval *i;
	struct drbd_request *req;

1804
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1805 1806 1807 1808 1809 1810
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
1811 1812
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
1813
		__req_mod(req, CONFLICT_RESOLVED, NULL);
1814 1815
	}
}
P
Philipp Reisner 已提交
1816

1817 1818
/*
 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
P
Philipp Reisner 已提交
1819
 */
1820
static int e_end_block(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1821
{
1822 1823
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
1824
	struct drbd_device *device = w->device;
1825
	sector_t sector = peer_req->i.sector;
1826
	int err = 0, pcmd;
P
Philipp Reisner 已提交
1827

1828
	if (peer_req->flags & EE_SEND_WRITE_ACK) {
1829
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1830 1831
			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
				device->state.conn <= C_PAUSED_SYNC_T &&
1832
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
P
Philipp Reisner 已提交
1833
				P_RS_WRITE_ACK : P_WRITE_ACK;
1834
			err = drbd_send_ack(device, pcmd, peer_req);
P
Philipp Reisner 已提交
1835
			if (pcmd == P_RS_WRITE_ACK)
1836
				drbd_set_in_sync(device, sector, peer_req->i.size);
P
Philipp Reisner 已提交
1837
		} else {
1838
			err = drbd_send_ack(device, P_NEG_ACK, peer_req);
P
Philipp Reisner 已提交
1839 1840 1841
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this?  */
		}
1842
		dec_unacked(device);
P
Philipp Reisner 已提交
1843 1844 1845
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1846
	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1847
		spin_lock_irq(&first_peer_device(device)->connection->req_lock);
1848
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
1849
		drbd_remove_epoch_entry_interval(device, peer_req);
1850
		if (peer_req->flags & EE_RESTART_REQUESTS)
1851
			restart_conflicting_writes(device, sector, peer_req->i.size);
1852
		spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
1853
	} else
1854
		D_ASSERT(drbd_interval_empty(&peer_req->i));
P
Philipp Reisner 已提交
1855

1856
	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
P
Philipp Reisner 已提交
1857

1858
	return err;
P
Philipp Reisner 已提交
1859 1860
}

1861
static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
P
Philipp Reisner 已提交
1862
{
1863
	struct drbd_device *device = w->device;
1864 1865
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
1866
	int err;
P
Philipp Reisner 已提交
1867

1868 1869
	err = drbd_send_ack(device, ack, peer_req);
	dec_unacked(device);
P
Philipp Reisner 已提交
1870

1871
	return err;
P
Philipp Reisner 已提交
1872 1873
}

1874
static int e_send_superseded(struct drbd_work *w, int unused)
1875
{
1876
	return e_send_ack(w, P_SUPERSEDED);
1877 1878
}

1879
static int e_send_retry_write(struct drbd_work *w, int unused)
1880
{
1881
	struct drbd_connection *connection = first_peer_device(w->device)->connection;
1882

1883
	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1884
			     P_RETRY_WRITE : P_SUPERSEDED);
1885
}
P
Philipp Reisner 已提交
1886

1887 1888 1889 1890 1891 1892 1893 1894 1895
/* Is sequence number @a newer than @b, modulo 32-bit wrap-around? */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}
P
Philipp Reisner 已提交
1896

1897 1898 1899
/* Return the newer of two sequence numbers (wrap-around aware). */
static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}

1902
static void update_peer_seq(struct drbd_device *device, unsigned int peer_seq)
1903
{
1904
	unsigned int newest_peer_seq;
1905

1906
	if (test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags)) {
1907 1908 1909 1910 1911
		spin_lock(&device->peer_seq_lock);
		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
		device->peer_seq = newest_peer_seq;
		spin_unlock(&device->peer_seq_lock);
		/* wake up only if we actually changed device->peer_seq */
1912
		if (peer_seq == newest_peer_seq)
1913
			wake_up(&device->seq_wait);
1914
	}
P
Philipp Reisner 已提交
1915 1916
}

1917
/* Do the byte ranges [s1, s1+l1) and [s2, s2+l2) overlap?
 * Sectors are 512 bytes, hence the >>9 on the byte lengths. */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
}
1921

1922
/* maybe change sync_ee into interval trees as well? */
1923
static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1924 1925
{
	struct drbd_peer_request *rs_req;
1926 1927
	bool rv = 0;

1928
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
1929
	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1930 1931
		if (overlaps(peer_req->i.sector, peer_req->i.size,
			     rs_req->i.sector, rs_req->i.size)) {
1932 1933 1934 1935
			rv = 1;
			break;
		}
	}
1936
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
1937 1938 1939 1940

	return rv;
}

P
Philipp Reisner 已提交
1941 1942 1943 1944 1945 1946 1947 1948 1949
/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
1950
 * In case packet_seq is larger than device->peer_seq number, there are
P
Philipp Reisner 已提交
1951
 * outstanding packets on the msock. We wait for them to arrive.
1952
 * In case we are the logically next packet, we update device->peer_seq
P
Philipp Reisner 已提交
1953 1954 1955 1956 1957 1958 1959 1960 1961
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1962
static int wait_for_and_update_peer_seq(struct drbd_device *device, const u32 peer_seq)
P
Philipp Reisner 已提交
1963 1964 1965
{
	DEFINE_WAIT(wait);
	long timeout;
1966
	int ret = 0, tp;
1967

1968
	if (!test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags))
1969 1970
		return 0;

1971
	spin_lock(&device->peer_seq_lock);
P
Philipp Reisner 已提交
1972
	for (;;) {
1973 1974
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
P
Philipp Reisner 已提交
1975
			break;
1976
		}
1977

P
Philipp Reisner 已提交
1978 1979 1980 1981
		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}
1982 1983

		rcu_read_lock();
1984
		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
1985 1986 1987 1988 1989 1990
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
1991 1992
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
1993
		rcu_read_lock();
1994
		timeout = rcu_dereference(first_peer_device(device)->connection->net_conf)->ping_timeo*HZ/10;
1995
		rcu_read_unlock();
1996
		timeout = schedule_timeout(timeout);
1997
		spin_lock(&device->peer_seq_lock);
1998
		if (!timeout) {
P
Philipp Reisner 已提交
1999
			ret = -ETIMEDOUT;
2000
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
P
Philipp Reisner 已提交
2001 2002 2003
			break;
		}
	}
2004 2005
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
P
Philipp Reisner 已提交
2006 2007 2008
	return ret;
}

2009 2010 2011
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
2012
static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
2013
{
2014 2015 2016 2017
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
2018 2019
}

2020
/* Fail (NEG_ACK) all postponed local writes overlapping [sector, size).
 * Called with req_lock held; the lock is dropped while completing the
 * master bio, so the overlap scan restarts from scratch each iteration. */
static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		/* must not complete the master bio under req_lock */
		spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&first_peer_device(device)->connection->req_lock);
		goto repeat;
	}
}

2045
static int handle_write_conflicts(struct drbd_device *device,
2046 2047
				  struct drbd_peer_request *peer_req)
{
2048
	struct drbd_connection *connection = first_peer_device(device)->connection;
2049
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2050 2051 2052 2053 2054 2055 2056 2057 2058 2059
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
2060
	drbd_insert_interval(&device->write_requests, &peer_req->i);
2061 2062

    repeat:
2063
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2064 2065 2066 2067 2068 2069 2070 2071 2072
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
2073
			err = drbd_wait_misc(device, i);
2074 2075 2076 2077 2078 2079 2080 2081 2082
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
2083 2084 2085
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
2086
			 */
2087
			bool superseded = i->sector <= sector && i->sector +
2088 2089 2090 2091 2092 2093 2094 2095
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
2096
					  superseded ? "local" : "remote");
2097

2098
			inc_unacked(device);
2099
			peer_req->w.cb = superseded ? e_send_superseded :
2100
						   e_send_retry_write;
2101
			list_add_tail(&peer_req->w.list, &device->done_ee);
2102
			wake_asender(first_peer_device(device)->connection);
2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
2120 2121 2122
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
2123 2124 2125 2126 2127 2128
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
2129
				err = drbd_wait_misc(device, &req->i);
2130
				if (err) {
2131
					_conn_request_state(first_peer_device(device)->connection,
2132 2133
							    NS(conn, C_TIMEOUT),
							    CS_HARD);
2134
					fail_postponed_requests(device, sector, size);
2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147 2148 2149
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
2150
		drbd_remove_epoch_entry_interval(device, peer_req);
2151 2152 2153
	return err;
}

P
Philipp Reisner 已提交
2154
/* mirrored write */
2155
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
2156
{
2157
	struct drbd_device *device;
P
Philipp Reisner 已提交
2158
	sector_t sector;
2159
	struct drbd_peer_request *peer_req;
2160
	struct p_data *p = pi->data;
2161
	u32 peer_seq = be32_to_cpu(p->seq_num);
P
Philipp Reisner 已提交
2162 2163
	int rw = WRITE;
	u32 dp_flags;
2164
	int err, tp;
P
Philipp Reisner 已提交
2165

2166
	device = vnr_to_device(connection, pi->vnr);
2167
	if (!device)
2168
		return -EIO;
P
Philipp Reisner 已提交
2169

2170
	if (!get_ldev(device)) {
2171 2172
		int err2;

2173 2174
		err = wait_for_and_update_peer_seq(device, peer_seq);
		drbd_send_ack_dp(device, P_NEG_ACK, p, pi->size);
2175
		atomic_inc(&connection->current_epoch->epoch_size);
2176
		err2 = drbd_drain_block(device, pi->size);
2177 2178 2179
		if (!err)
			err = err2;
		return err;
P
Philipp Reisner 已提交
2180 2181
	}

2182 2183 2184 2185 2186
	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */
P
Philipp Reisner 已提交
2187 2188

	sector = be64_to_cpu(p->sector);
2189
	peer_req = read_in_block(device, p->block_id, sector, pi->size);
2190
	if (!peer_req) {
2191
		put_ldev(device);
2192
		return -EIO;
P
Philipp Reisner 已提交
2193 2194
	}

2195
	peer_req->w.cb = e_end_block;
P
Philipp Reisner 已提交
2196

2197
	dp_flags = be32_to_cpu(p->dp_flags);
2198
	rw |= wire_flags_to_bio(device, dp_flags);
2199 2200
	if (peer_req->pages == NULL) {
		D_ASSERT(peer_req->i.size == 0);
2201 2202
		D_ASSERT(dp_flags & DP_FLUSH);
	}
2203 2204

	if (dp_flags & DP_MAY_SET_IN_SYNC)
2205
		peer_req->flags |= EE_MAY_SET_IN_SYNC;
2206

2207 2208
	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
2209 2210
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
2211
	spin_unlock(&connection->epoch_lock);
P
Philipp Reisner 已提交
2212

2213
	rcu_read_lock();
2214
	tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2215 2216 2217
	rcu_read_unlock();
	if (tp) {
		peer_req->flags |= EE_IN_INTERVAL_TREE;
2218
		err = wait_for_and_update_peer_seq(device, peer_seq);
2219
		if (err)
P
Philipp Reisner 已提交
2220
			goto out_interrupted;
2221
		spin_lock_irq(&first_peer_device(device)->connection->req_lock);
2222
		err = handle_write_conflicts(device, peer_req);
2223
		if (err) {
2224
			spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
2225
			if (err == -ENOENT) {
2226
				put_ldev(device);
2227
				return 0;
P
Philipp Reisner 已提交
2228
			}
2229
			goto out_interrupted;
P
Philipp Reisner 已提交
2230
		}
2231
	} else {
2232
		update_peer_seq(device, peer_seq);
2233
		spin_lock_irq(&first_peer_device(device)->connection->req_lock);
2234
	}
2235
	list_add(&peer_req->w.list, &device->active_ee);
2236
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
P
Philipp Reisner 已提交
2237

2238 2239
	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
P
Philipp Reisner 已提交
2240

2241
	if (first_peer_device(device)->connection->agreed_pro_version < 100) {
2242
		rcu_read_lock();
2243
		switch (rcu_dereference(first_peer_device(device)->connection->net_conf)->wire_protocol) {
2244 2245 2246 2247 2248 2249
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
P
Philipp Reisner 已提交
2250
		}
2251
		rcu_read_unlock();
P
Philipp Reisner 已提交
2252 2253
	}

2254 2255
	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
2256
		inc_unacked(device);
P
Philipp Reisner 已提交
2257 2258
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
2259 2260 2261
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
P
Philipp Reisner 已提交
2262 2263
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
2264
		drbd_send_ack(device, P_RECV_ACK, peer_req);
P
Philipp Reisner 已提交
2265 2266
	}

2267
	if (device->state.pdsk < D_INCONSISTENT) {
P
Philipp Reisner 已提交
2268
		/* In case we have the only disk of the cluster, */
2269
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2270 2271
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2272
		drbd_al_begin_io(device, &peer_req->i, true);
P
Philipp Reisner 已提交
2273 2274
	}

2275
	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2276 2277
	if (!err)
		return 0;
P
Philipp Reisner 已提交
2278

2279 2280
	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
2281
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
2282
	list_del(&peer_req->w.list);
2283
	drbd_remove_epoch_entry_interval(device, peer_req);
2284
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
2285
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2286
		drbd_al_complete_io(device, &peer_req->i);
2287

P
Philipp Reisner 已提交
2288
out_interrupted:
2289
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2290 2291
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
2292
	return err;
P
Philipp Reisner 已提交
2293 2294
}

2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
 * (more than 64 sectors) of activity we cannot account for with our own resync
 * activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
2306
int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2307
{
2308
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2309
	unsigned long db, dt, dbdt;
2310
	struct lc_element *tmp;
2311 2312
	int curr_events;
	int throttle = 0;
P
Philipp Reisner 已提交
2313 2314 2315
	unsigned int c_min_rate;

	rcu_read_lock();
2316
	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
P
Philipp Reisner 已提交
2317
	rcu_read_unlock();
2318 2319

	/* feature disabled? */
P
Philipp Reisner 已提交
2320
	if (c_min_rate == 0)
2321 2322
		return 0;

2323 2324
	spin_lock_irq(&device->al_lock);
	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2325 2326 2327
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2328
			spin_unlock_irq(&device->al_lock);
2329 2330 2331 2332
			return 0;
		}
		/* Do not slow down if app IO is already waiting for this extent */
	}
2333
	spin_unlock_irq(&device->al_lock);
2334

2335 2336
	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
2337
			atomic_read(&device->rs_sect_ev);
2338

2339
	if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2340 2341 2342
		unsigned long rs_left;
		int i;

2343
		device->rs_last_events = curr_events;
2344 2345 2346

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
2347
		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2348

2349 2350
		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
			rs_left = device->ov_left;
2351
		else
2352
			rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2353

2354
		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2355 2356
		if (!dt)
			dt++;
2357
		db = device->rs_mark_left[i] - rs_left;
2358 2359
		dbdt = Bit2KB(db/dt);

P
Philipp Reisner 已提交
2360
		if (dbdt > c_min_rate)
2361 2362 2363 2364 2365 2366
			throttle = 1;
	}
	return throttle;
}


2367
static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
2368
{
2369
	struct drbd_device *device;
P
Philipp Reisner 已提交
2370
	sector_t sector;
2371
	sector_t capacity;
2372
	struct drbd_peer_request *peer_req;
P
Philipp Reisner 已提交
2373
	struct digest_info *di = NULL;
2374
	int size, verb;
P
Philipp Reisner 已提交
2375
	unsigned int fault_type;
2376
	struct p_block_req *p =	pi->data;
2377

2378
	device = vnr_to_device(connection, pi->vnr);
2379
	if (!device)
2380
		return -EIO;
2381
	capacity = drbd_get_capacity(device->this_bdev);
P
Philipp Reisner 已提交
2382 2383 2384 2385

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

2386
	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
P
Philipp Reisner 已提交
2387 2388
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
2389
		return -EINVAL;
P
Philipp Reisner 已提交
2390 2391 2392 2393
	}
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
2394
		return -EINVAL;
P
Philipp Reisner 已提交
2395 2396
	}

2397
	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2398
		verb = 1;
2399
		switch (pi->cmd) {
2400
		case P_DATA_REQUEST:
2401
			drbd_send_ack_rp(device, P_NEG_DREPLY, p);
2402 2403 2404 2405
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
2406
			drbd_send_ack_rp(device, P_NEG_RS_DREPLY , p);
2407 2408 2409
			break;
		case P_OV_REPLY:
			verb = 0;
2410 2411
			dec_rs_pending(device);
			drbd_send_ack_ex(device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2412 2413
			break;
		default:
2414
			BUG();
2415 2416
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
P
Philipp Reisner 已提交
2417 2418
			dev_err(DEV, "Can not satisfy peer's read request, "
			    "no local data.\n");
2419

L
Lars Ellenberg 已提交
2420
		/* drain possibly payload */
2421
		return drbd_drain_block(device, pi->size);
P
Philipp Reisner 已提交
2422 2423 2424 2425 2426
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
2427
	peer_req = drbd_alloc_peer_req(device, p->block_id, sector, size, GFP_NOIO);
2428
	if (!peer_req) {
2429
		put_ldev(device);
2430
		return -ENOMEM;
P
Philipp Reisner 已提交
2431 2432
	}

2433
	switch (pi->cmd) {
P
Philipp Reisner 已提交
2434
	case P_DATA_REQUEST:
2435
		peer_req->w.cb = w_e_end_data_req;
P
Philipp Reisner 已提交
2436
		fault_type = DRBD_FAULT_DT_RD;
2437 2438 2439
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

P
Philipp Reisner 已提交
2440
	case P_RS_DATA_REQUEST:
2441
		peer_req->w.cb = w_e_end_rsdata_req;
P
Philipp Reisner 已提交
2442
		fault_type = DRBD_FAULT_RS_RD;
2443
		/* used in the sector offset progress display */
2444
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
P
Philipp Reisner 已提交
2445 2446 2447 2448 2449
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
2450
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
P
Philipp Reisner 已提交
2451 2452 2453
		if (!di)
			goto out_free_e;

2454
		di->digest_size = pi->size;
P
Philipp Reisner 已提交
2455 2456
		di->digest = (((char *)di)+sizeof(struct digest_info));

2457 2458
		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;
2459

2460
		if (drbd_recv_all(first_peer_device(device)->connection, di->digest, pi->size))
P
Philipp Reisner 已提交
2461 2462
			goto out_free_e;

2463
		if (pi->cmd == P_CSUM_RS_REQUEST) {
2464
			D_ASSERT(first_peer_device(device)->connection->agreed_pro_version >= 89);
2465
			peer_req->w.cb = w_e_end_csum_rs_req;
2466
			/* used in the sector offset progress display */
2467
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2468
		} else if (pi->cmd == P_OV_REPLY) {
2469
			/* track progress, we may need to throttle */
2470
			atomic_add(size >> 9, &device->rs_sect_in);
2471
			peer_req->w.cb = w_e_end_ov_reply;
2472
			dec_rs_pending(device);
2473 2474 2475
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
P
Philipp Reisner 已提交
2476 2477 2478 2479
		}
		break;

	case P_OV_REQUEST:
2480
		if (device->ov_start_sector == ~(sector_t)0 &&
2481
		    first_peer_device(device)->connection->agreed_pro_version >= 90) {
2482 2483
			unsigned long now = jiffies;
			int i;
2484 2485 2486 2487
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
2488
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2489 2490
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
2491
			}
P
Philipp Reisner 已提交
2492 2493 2494
			dev_info(DEV, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
2495
		peer_req->w.cb = w_e_end_ov_req;
P
Philipp Reisner 已提交
2496 2497 2498 2499
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
2500
		BUG();
P
Philipp Reisner 已提交
2501 2502
	}

2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516 2517 2518 2519 2520 2521 2522 2523 2524
	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * on request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
2525
	if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2526
		schedule_timeout_uninterruptible(HZ/10);
2527
	if (drbd_rs_begin_io(device, sector))
2528
		goto out_free_e;
P
Philipp Reisner 已提交
2529

2530
submit_for_resync:
2531
	atomic_add(size >> 9, &device->rs_sect_ev);
2532

2533
submit:
2534
	inc_unacked(device);
2535
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
2536
	list_add_tail(&peer_req->w.list, &device->read_ee);
2537
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
P
Philipp Reisner 已提交
2538

2539
	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2540
		return 0;
P
Philipp Reisner 已提交
2541

2542 2543
	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
2544
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
2545
	list_del(&peer_req->w.list);
2546
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
2547 2548
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

P
Philipp Reisner 已提交
2549
out_free_e:
2550 2551
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
2552
	return -EIO;
P
Philipp Reisner 已提交
2553 2554
}

2555
static int drbd_asb_recover_0p(struct drbd_device *device) __must_hold(local)
P
Philipp Reisner 已提交
2556 2557 2558
{
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
2559
	enum drbd_after_sb_p after_sb_0p;
P
Philipp Reisner 已提交
2560

2561 2562
	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;
P
Philipp Reisner 已提交
2563

2564 2565
	ch_peer = device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;
P
Philipp Reisner 已提交
2566

2567
	rcu_read_lock();
2568
	after_sb_0p = rcu_dereference(first_peer_device(device)->connection->net_conf)->after_sb_0p;
2569 2570
	rcu_read_unlock();
	switch (after_sb_0p) {
P
Philipp Reisner 已提交
2571 2572 2573
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
2574
	case ASB_VIOLENTLY:
P
Philipp Reisner 已提交
2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
L
Lars Ellenberg 已提交
2599
		dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
P
Philipp Reisner 已提交
2600 2601 2602
		     "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
2603
			rv = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags)
P
Philipp Reisner 已提交
2604 2605 2606 2607 2608 2609
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
2610
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
P
Philipp Reisner 已提交
2611 2612 2613 2614 2615 2616 2617 2618
			break;
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
2619
			rv = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags)
P
Philipp Reisner 已提交
2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}

2632
static int drbd_asb_recover_1p(struct drbd_device *device) __must_hold(local)
P
Philipp Reisner 已提交
2633
{
2634
	int hg, rv = -100;
2635
	enum drbd_after_sb_p after_sb_1p;
P
Philipp Reisner 已提交
2636

2637
	rcu_read_lock();
2638
	after_sb_1p = rcu_dereference(first_peer_device(device)->connection->net_conf)->after_sb_1p;
2639 2640
	rcu_read_unlock();
	switch (after_sb_1p) {
P
Philipp Reisner 已提交
2641 2642 2643 2644 2645
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
2646
	case ASB_DISCARD_ZERO_CHG:
P
Philipp Reisner 已提交
2647 2648 2649 2650 2651
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
2652 2653
		hg = drbd_asb_recover_0p(device);
		if (hg == -1 && device->state.role == R_SECONDARY)
P
Philipp Reisner 已提交
2654
			rv = hg;
2655
		if (hg == 1  && device->state.role == R_PRIMARY)
P
Philipp Reisner 已提交
2656 2657 2658
			rv = hg;
		break;
	case ASB_VIOLENTLY:
2659
		rv = drbd_asb_recover_0p(device);
P
Philipp Reisner 已提交
2660 2661
		break;
	case ASB_DISCARD_SECONDARY:
2662
		return device->state.role == R_PRIMARY ? 1 : -1;
P
Philipp Reisner 已提交
2663
	case ASB_CALL_HELPER:
2664 2665
		hg = drbd_asb_recover_0p(device);
		if (hg == -1 && device->state.role == R_PRIMARY) {
2666 2667
			enum drbd_state_rv rv2;

P
Philipp Reisner 已提交
2668 2669 2670
			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			  * we might be here in C_WF_REPORT_PARAMS which is transient.
			  * we do not need to wait for the after state change work either. */
2671
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2672
			if (rv2 != SS_SUCCESS) {
2673
				drbd_khelper(device, "pri-lost-after-sb");
P
Philipp Reisner 已提交
2674 2675 2676 2677 2678 2679 2680 2681 2682 2683 2684
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}

2685
static int drbd_asb_recover_2p(struct drbd_device *device) __must_hold(local)
P
Philipp Reisner 已提交
2686
{
2687
	int hg, rv = -100;
2688
	enum drbd_after_sb_p after_sb_2p;
P
Philipp Reisner 已提交
2689

2690
	rcu_read_lock();
2691
	after_sb_2p = rcu_dereference(first_peer_device(device)->connection->net_conf)->after_sb_2p;
2692 2693
	rcu_read_unlock();
	switch (after_sb_2p) {
P
Philipp Reisner 已提交
2694 2695 2696 2697 2698 2699 2700
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
2701
	case ASB_DISCARD_ZERO_CHG:
P
Philipp Reisner 已提交
2702 2703 2704
		dev_err(DEV, "Configuration error.\n");
		break;
	case ASB_VIOLENTLY:
2705
		rv = drbd_asb_recover_0p(device);
P
Philipp Reisner 已提交
2706 2707 2708 2709
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CALL_HELPER:
2710
		hg = drbd_asb_recover_0p(device);
P
Philipp Reisner 已提交
2711
		if (hg == -1) {
2712 2713
			enum drbd_state_rv rv2;

P
Philipp Reisner 已提交
2714 2715 2716
			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			  * we might be here in C_WF_REPORT_PARAMS which is transient.
			  * we do not need to wait for the after state change work either. */
2717
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2718
			if (rv2 != SS_SUCCESS) {
2719
				drbd_khelper(device, "pri-lost-after-sb");
P
Philipp Reisner 已提交
2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730
			} else {
				dev_warn(DEV, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}

2731
static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
P
Philipp Reisner 已提交
2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742 2743 2744 2745 2746 2747 2748 2749 2750 2751 2752 2753 2754 2755 2756
			   u64 bits, u64 flags)
{
	if (!uuid) {
		dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
		return;
	}
	dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
	     text,
	     (unsigned long long)uuid[UI_CURRENT],
	     (unsigned long long)uuid[UI_BITMAP],
	     (unsigned long long)uuid[UI_HISTORY_START],
	     (unsigned long long)uuid[UI_HISTORY_END],
	     (unsigned long long)bits,
	     (unsigned long long)flags);
}

/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96
 */
2760
static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
P
Philipp Reisner 已提交
2761 2762 2763 2764
{
	u64 self, peer;
	int i, j;

2765 2766
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
P
Philipp Reisner 已提交
2767 2768 2769 2770 2771 2772 2773 2774 2775 2776 2777 2778 2779 2780 2781 2782 2783 2784

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

2785
		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
P
Philipp Reisner 已提交
2786

2787
			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2788
				return -1091;
P
Philipp Reisner 已提交
2789

2790 2791
			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
P
Philipp Reisner 已提交
2792
				dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2793 2794 2795
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;
P
Philipp Reisner 已提交
2796

2797 2798
				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
P
Philipp Reisner 已提交
2799 2800 2801 2802 2803 2804 2805 2806 2807
				*rule_nr = 34;
			} else {
				dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

2808
		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
P
Philipp Reisner 已提交
2809

2810
			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2811
				return -1091;
P
Philipp Reisner 已提交
2812

2813 2814
			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
P
Philipp Reisner 已提交
2815 2816
				dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

2817 2818 2819
				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;
P
Philipp Reisner 已提交
2820

2821
				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
P
Philipp Reisner 已提交
2822 2823 2824 2825 2826 2827 2828 2829 2830 2831
				*rule_nr = 35;
			} else {
				dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
2832 2833
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
P
Philipp Reisner 已提交
2834 2835 2836 2837 2838 2839 2840 2841 2842
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
2843
			dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
P
Philipp Reisner 已提交
2844 2845 2846 2847 2848
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
2849
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
P
Philipp Reisner 已提交
2850 2851 2852 2853
	if (self == peer)
		return -1;

	*rule_nr = 51;
2854
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
P
Philipp Reisner 已提交
2855
	if (self == peer) {
2856
		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2857 2858 2859
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
P
Philipp Reisner 已提交
2860 2861 2862
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

2863
			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2864
				return -1091;
P
Philipp Reisner 已提交
2865

2866 2867
			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2868

2869
			dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2870
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2871

P
Philipp Reisner 已提交
2872 2873 2874 2875 2876
			return -1;
		}
	}

	*rule_nr = 60;
2877
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
P
Philipp Reisner 已提交
2878
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2879
		peer = device->p_uuid[i] & ~((u64)1);
P
Philipp Reisner 已提交
2880 2881 2882 2883 2884
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
2885 2886
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
P
Philipp Reisner 已提交
2887 2888 2889 2890
	if (self == peer)
		return 1;

	*rule_nr = 71;
2891
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
P
Philipp Reisner 已提交
2892
	if (self == peer) {
2893
		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2894 2895 2896
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
P
Philipp Reisner 已提交
2897 2898 2899
			/* The last P_SYNC_UUID did not get though. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

2900
			if (first_peer_device(device)->connection->agreed_pro_version < 91)
2901
				return -1091;
P
Philipp Reisner 已提交
2902

2903 2904
			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
P
Philipp Reisner 已提交
2905

2906
			dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2907 2908
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
P
Philipp Reisner 已提交
2909 2910 2911 2912 2913 2914 2915

			return 1;
		}
	}


	*rule_nr = 80;
2916
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
P
Philipp Reisner 已提交
2917
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2918
		self = device->ldev->md.uuid[i] & ~((u64)1);
P
Philipp Reisner 已提交
2919 2920 2921 2922 2923
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
2924 2925
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
P
Philipp Reisner 已提交
2926 2927 2928 2929 2930
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2931
		self = device->ldev->md.uuid[i] & ~((u64)1);
P
Philipp Reisner 已提交
2932
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2933
			peer = device->p_uuid[j] & ~((u64)1);
P
Philipp Reisner 已提交
2934 2935 2936 2937 2938 2939 2940 2941 2942 2943 2944
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}

/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
 */
2945
static enum drbd_conns drbd_sync_handshake(struct drbd_device *device, enum drbd_role peer_role,
P
Philipp Reisner 已提交
2946 2947 2948 2949
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;
2950
	struct net_conf *nc;
2951
	int hg, rule_nr, rr_conflict, tentative;
P
Philipp Reisner 已提交
2952

2953
	mydisk = device->state.disk;
P
Philipp Reisner 已提交
2954
	if (mydisk == D_NEGOTIATING)
2955
		mydisk = device->new_state_tmp.disk;
P
Philipp Reisner 已提交
2956 2957

	dev_info(DEV, "drbd_sync_handshake:\n");
2958

2959 2960 2961 2962
	spin_lock_irq(&device->ldev->md.uuid_lock);
	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
	drbd_uuid_dump(device, "peer", device->p_uuid,
		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
P
Philipp Reisner 已提交
2963

2964 2965
	hg = drbd_uuid_compare(device, &rule_nr);
	spin_unlock_irq(&device->ldev->md.uuid_lock);
P
Philipp Reisner 已提交
2966 2967 2968 2969 2970 2971 2972

	dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);

	if (hg == -1000) {
		dev_alert(DEV, "Unrelated data, aborting!\n");
		return C_MASK;
	}
2973 2974
	if (hg < -1000) {
		dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
P
Philipp Reisner 已提交
2975 2976 2977 2978 2979 2980 2981 2982 2983 2984 2985 2986 2987
		return C_MASK;
	}

	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
		dev_info(DEV, "Becoming sync %s due to disk states.\n",
		     hg > 0 ? "source" : "target");
	}

2988
	if (abs(hg) == 100)
2989
		drbd_khelper(device, "initial-split-brain");
2990

2991
	rcu_read_lock();
2992
	nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
2993 2994

	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2995
		int pcount = (device->state.role == R_PRIMARY)
P
Philipp Reisner 已提交
2996 2997 2998 2999 3000
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
3001
			hg = drbd_asb_recover_0p(device);
P
Philipp Reisner 已提交
3002 3003
			break;
		case 1:
3004
			hg = drbd_asb_recover_1p(device);
P
Philipp Reisner 已提交
3005 3006
			break;
		case 2:
3007
			hg = drbd_asb_recover_2p(device);
P
Philipp Reisner 已提交
3008 3009 3010 3011 3012 3013 3014 3015 3016 3017 3018 3019 3020 3021 3022
			break;
		}
		if (abs(hg) < 100) {
			dev_warn(DEV, "Split-Brain detected, %d primaries, "
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
				dev_warn(DEV, "Doing a full sync, since"
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	if (hg == -100) {
3023
		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
P
Philipp Reisner 已提交
3024
			hg = -1;
3025
		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
P
Philipp Reisner 已提交
3026 3027 3028 3029 3030 3031 3032
			hg = 1;

		if (abs(hg) < 100)
			dev_warn(DEV, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}
3033
	rr_conflict = nc->rr_conflict;
3034
	tentative = nc->tentative;
3035
	rcu_read_unlock();
P
Philipp Reisner 已提交
3036 3037

	if (hg == -100) {
3038 3039 3040 3041
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
3042
		dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3043
		drbd_khelper(device, "split-brain");
P
Philipp Reisner 已提交
3044 3045 3046 3047 3048 3049 3050 3051 3052
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
3053
	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3054
		switch (rr_conflict) {
P
Philipp Reisner 已提交
3055
		case ASB_CALL_HELPER:
3056
			drbd_khelper(device, "pri-lost");
P
Philipp Reisner 已提交
3057 3058 3059 3060 3061 3062 3063 3064 3065 3066
			/* fall through */
		case ASB_DISCONNECT:
			dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
			     "assumption\n");
		}
	}

3067
	if (tentative || test_bit(CONN_DRY_RUN, &first_peer_device(device)->connection->flags)) {
3068 3069 3070 3071 3072 3073 3074 3075 3076
		if (hg == 0)
			dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

P
Philipp Reisner 已提交
3077 3078
	if (abs(hg) >= 2) {
		dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3079
		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3080
					BM_LOCKED_SET_ALLOWED))
P
Philipp Reisner 已提交
3081 3082 3083 3084 3085 3086 3087 3088 3089
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
3090
		if (drbd_bm_total_weight(device)) {
P
Philipp Reisner 已提交
3091
			dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3092
			     drbd_bm_total_weight(device));
P
Philipp Reisner 已提交
3093 3094 3095 3096 3097 3098
		}
	}

	return rv;
}

3099
static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
P
Philipp Reisner 已提交
3100 3101
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3102 3103
	if (peer == ASB_DISCARD_REMOTE)
		return ASB_DISCARD_LOCAL;
P
Philipp Reisner 已提交
3104 3105

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3106 3107
	if (peer == ASB_DISCARD_LOCAL)
		return ASB_DISCARD_REMOTE;
P
Philipp Reisner 已提交
3108 3109

	/* everything else is valid if they are equal on both sides. */
3110
	return peer;
P
Philipp Reisner 已提交
3111 3112
}

3113
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3114
{
3115
	struct p_protocol *p = pi->data;
3116 3117 3118 3119
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
3120
	struct crypto_hash *peer_integrity_tfm = NULL;
3121
	void *int_dig_in = NULL, *int_dig_vv = NULL;
P
Philipp Reisner 已提交
3122 3123 3124 3125 3126 3127

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
3128
	cf		= be32_to_cpu(p->conn_flags);
3129
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3130

3131
	if (connection->agreed_pro_version >= 87) {
3132
		int err;
3133

3134
		if (pi->size > sizeof(integrity_alg))
3135
			return -EIO;
3136
		err = drbd_recv_all(connection, integrity_alg, pi->size);
3137 3138
		if (err)
			return err;
3139
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
P
Philipp Reisner 已提交
3140 3141
	}

3142
	if (pi->cmd != P_PROTOCOL_UPDATE) {
3143
		clear_bit(CONN_DRY_RUN, &connection->flags);
P
Philipp Reisner 已提交
3144

3145
		if (cf & CF_DRY_RUN)
3146
			set_bit(CONN_DRY_RUN, &connection->flags);
P
Philipp Reisner 已提交
3147

3148
		rcu_read_lock();
3149
		nc = rcu_dereference(connection->net_conf);
P
Philipp Reisner 已提交
3150

3151
		if (p_proto != nc->wire_protocol) {
3152
			conn_err(connection, "incompatible %s settings\n", "protocol");
3153 3154
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3155

3156
		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3157
			conn_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3158 3159
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3160

3161
		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3162
			conn_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3163 3164
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3165

3166
		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3167
			conn_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3168 3169
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3170

3171
		if (p_discard_my_data && nc->discard_my_data) {
3172
			conn_err(connection, "incompatible %s settings\n", "discard-my-data");
3173 3174
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3175

3176
		if (p_two_primaries != nc->two_primaries) {
3177
			conn_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3178 3179
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3180

3181
		if (strcmp(integrity_alg, nc->integrity_alg)) {
3182
			conn_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3183 3184
			goto disconnect_rcu_unlock;
		}
P
Philipp Reisner 已提交
3185

3186
		rcu_read_unlock();
P
Philipp Reisner 已提交
3187 3188
	}

3189 3190 3191 3192 3193 3194 3195 3196 3197 3198 3199
	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here.  Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */
P
Philipp Reisner 已提交
3200

3201 3202
		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (!peer_integrity_tfm) {
3203
			conn_err(connection, "peer data-integrity-alg %s not supported\n",
3204 3205 3206
				 integrity_alg);
			goto disconnect;
		}
P
Philipp Reisner 已提交
3207

3208 3209 3210 3211
		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
3212
			conn_err(connection, "Allocation of buffers for data integrity checking failed\n");
P
Philipp Reisner 已提交
3213 3214 3215 3216
			goto disconnect;
		}
	}

3217 3218
	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
3219
		conn_err(connection, "Allocation of new net_conf failed\n");
3220 3221 3222
		goto disconnect;
	}

3223 3224 3225
	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->conf_update);
	old_net_conf = connection->net_conf;
3226 3227 3228 3229 3230 3231 3232 3233
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

3234 3235 3236
	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->conf_update);
	mutex_unlock(&connection->data.mutex);
3237

3238 3239 3240 3241 3242 3243
	crypto_free_hash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;
3244 3245

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3246
		conn_info(connection, "peer data-integrity-alg: %s\n",
3247 3248 3249 3250
			  integrity_alg[0] ? integrity_alg : "(none)");

	synchronize_rcu();
	kfree(old_net_conf);
3251
	return 0;
P
Philipp Reisner 已提交
3252

3253 3254
disconnect_rcu_unlock:
	rcu_read_unlock();
P
Philipp Reisner 已提交
3255
disconnect:
3256
	crypto_free_hash(peer_integrity_tfm);
3257 3258
	kfree(int_dig_in);
	kfree(int_dig_vv);
3259
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3260
	return -EIO;
P
Philipp Reisner 已提交
3261 3262 3263 3264 3265 3266 3267
}

/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
3268
static
3269
struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
P
Philipp Reisner 已提交
3270 3271 3272 3273 3274 3275 3276 3277 3278 3279 3280 3281 3282 3283 3284 3285
		const char *alg, const char *name)
{
	struct crypto_hash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			alg, name, PTR_ERR(tfm));
		return tfm;
	}
	return tfm;
}

3286
static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3287
{
3288
	void *buffer = connection->data.rbuf;
3289 3290 3291 3292
	int size = pi->size;

	while (size) {
		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3293
		s = drbd_recv(connection, buffer, s);
3294 3295 3296 3297 3298 3299 3300 3301 3302 3303 3304 3305 3306 3307 3308 3309 3310 3311 3312 3313 3314 3315 3316
		if (s <= 0) {
			if (s < 0)
				return s;
			break;
		}
		size -= s;
	}
	if (size)
		return -EIO;
	return 0;
}

/*
 * config_unknown_volume  -  device configuration command for unknown volume
 *
 * When a device is added to an existing connection, the node on which the
 * device is added first will send configuration commands to its peer but the
 * peer will not know about the device yet.  It will warn and ignore these
 * commands.  Once the device is added on the second node, the second node will
 * send the same device configuration commands, but in the other direction.
 *
 * (We can also end up here if drbd is misconfigured.)
 */
3317
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3318
{
3319
	conn_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3320
		  cmdname(pi->cmd), pi->vnr);
3321
	return ignore_remaining_packet(connection, pi);
3322 3323
}

3324
static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3325
{
3326
	struct drbd_device *device;
3327
	struct p_rs_param_95 *p;
P
Philipp Reisner 已提交
3328 3329 3330
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
3331
	struct net_conf *old_net_conf, *new_net_conf = NULL;
P
Philipp Reisner 已提交
3332
	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3333
	const int apv = connection->agreed_pro_version;
P
Philipp Reisner 已提交
3334
	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3335
	int fifo_size = 0;
3336
	int err;
P
Philipp Reisner 已提交
3337

3338
	device = vnr_to_device(connection, pi->vnr);
3339
	if (!device)
3340
		return config_unknown_volume(connection, pi);
P
Philipp Reisner 已提交
3341 3342 3343 3344

	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
3345 3346
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
P
Philipp Reisner 已提交
3347

3348
	if (pi->size > exp_max_sz) {
P
Philipp Reisner 已提交
3349
		dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3350
		    pi->size, exp_max_sz);
3351
		return -EIO;
P
Philipp Reisner 已提交
3352 3353 3354
	}

	if (apv <= 88) {
3355
		header_size = sizeof(struct p_rs_param);
3356
		data_size = pi->size - header_size;
3357
	} else if (apv <= 94) {
3358
		header_size = sizeof(struct p_rs_param_89);
3359
		data_size = pi->size - header_size;
P
Philipp Reisner 已提交
3360
		D_ASSERT(data_size == 0);
3361
	} else {
3362
		header_size = sizeof(struct p_rs_param_95);
3363
		data_size = pi->size - header_size;
P
Philipp Reisner 已提交
3364 3365 3366 3367
		D_ASSERT(data_size == 0);
	}

	/* initialize verify_alg and csums_alg */
3368
	p = pi->data;
P
Philipp Reisner 已提交
3369 3370
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

3371
	err = drbd_recv_all(first_peer_device(device)->connection, p, header_size);
3372 3373
	if (err)
		return err;
P
Philipp Reisner 已提交
3374

3375 3376
	mutex_lock(&first_peer_device(device)->connection->conf_update);
	old_net_conf = first_peer_device(device)->connection->net_conf;
3377
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
3378 3379
		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
		if (!new_disk_conf) {
3380
			put_ldev(device);
3381
			mutex_unlock(&first_peer_device(device)->connection->conf_update);
P
Philipp Reisner 已提交
3382 3383 3384
			dev_err(DEV, "Allocation of new disk_conf failed\n");
			return -ENOMEM;
		}
P
Philipp Reisner 已提交
3385

3386
		old_disk_conf = device->ldev->disk_conf;
P
Philipp Reisner 已提交
3387
		*new_disk_conf = *old_disk_conf;
P
Philipp Reisner 已提交
3388

3389
		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
P
Philipp Reisner 已提交
3390
	}
P
Philipp Reisner 已提交
3391 3392 3393

	if (apv >= 88) {
		if (apv == 88) {
3394 3395 3396 3397
			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
				dev_err(DEV, "verify-alg of wrong size, "
					"peer wants %u, accepting only up to %u byte\n",
					data_size, SHARED_SECRET_MAX);
P
Philipp Reisner 已提交
3398 3399
				err = -EIO;
				goto reconnect;
P
Philipp Reisner 已提交
3400 3401
			}

3402
			err = drbd_recv_all(first_peer_device(device)->connection, p->verify_alg, data_size);
P
Philipp Reisner 已提交
3403 3404
			if (err)
				goto reconnect;
P
Philipp Reisner 已提交
3405 3406 3407 3408 3409 3410 3411 3412 3413 3414 3415 3416 3417 3418
			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[data_size-1] == 0);
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
			D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

3419
		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3420
			if (device->state.conn == C_WF_REPORT_PARAMS) {
P
Philipp Reisner 已提交
3421
				dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3422
				    old_net_conf->verify_alg, p->verify_alg);
P
Philipp Reisner 已提交
3423 3424
				goto disconnect;
			}
3425
			verify_tfm = drbd_crypto_alloc_digest_safe(device,
P
Philipp Reisner 已提交
3426 3427 3428 3429 3430 3431 3432
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

3433
		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3434
			if (device->state.conn == C_WF_REPORT_PARAMS) {
P
Philipp Reisner 已提交
3435
				dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3436
				    old_net_conf->csums_alg, p->csums_alg);
P
Philipp Reisner 已提交
3437 3438
				goto disconnect;
			}
3439
			csums_tfm = drbd_crypto_alloc_digest_safe(device,
P
Philipp Reisner 已提交
3440 3441 3442 3443 3444 3445 3446
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

P
Philipp Reisner 已提交
3447
		if (apv > 94 && new_disk_conf) {
P
Philipp Reisner 已提交
3448 3449 3450 3451
			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3452

P
Philipp Reisner 已提交
3453
			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3454
			if (fifo_size != device->rs_plan_s->size) {
P
Philipp Reisner 已提交
3455 3456
				new_plan = fifo_alloc(fifo_size);
				if (!new_plan) {
3457
					dev_err(DEV, "kmalloc of fifo_buffer failed");
3458
					put_ldev(device);
3459 3460 3461
					goto disconnect;
				}
			}
3462
		}
P
Philipp Reisner 已提交
3463

3464
		if (verify_tfm || csums_tfm) {
3465 3466
			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
			if (!new_net_conf) {
3467 3468 3469 3470
				dev_err(DEV, "Allocation of new net_conf failed\n");
				goto disconnect;
			}

3471
			*new_net_conf = *old_net_conf;
3472 3473

			if (verify_tfm) {
3474 3475
				strcpy(new_net_conf->verify_alg, p->verify_alg);
				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3476 3477
				crypto_free_hash(first_peer_device(device)->connection->verify_tfm);
				first_peer_device(device)->connection->verify_tfm = verify_tfm;
3478 3479 3480
				dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
			}
			if (csums_tfm) {
3481 3482
				strcpy(new_net_conf->csums_alg, p->csums_alg);
				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3483 3484
				crypto_free_hash(first_peer_device(device)->connection->csums_tfm);
				first_peer_device(device)->connection->csums_tfm = csums_tfm;
3485 3486
				dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
			}
3487
			rcu_assign_pointer(connection->net_conf, new_net_conf);
3488
		}
P
Philipp Reisner 已提交
3489 3490
	}

P
Philipp Reisner 已提交
3491
	if (new_disk_conf) {
3492 3493
		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
		put_ldev(device);
P
Philipp Reisner 已提交
3494 3495 3496
	}

	if (new_plan) {
3497 3498
		old_plan = device->rs_plan_s;
		rcu_assign_pointer(device->rs_plan_s, new_plan);
P
Philipp Reisner 已提交
3499
	}
P
Philipp Reisner 已提交
3500

3501
	mutex_unlock(&first_peer_device(device)->connection->conf_update);
P
Philipp Reisner 已提交
3502 3503 3504 3505
	synchronize_rcu();
	if (new_net_conf)
		kfree(old_net_conf);
	kfree(old_disk_conf);
P
Philipp Reisner 已提交
3506
	kfree(old_plan);
P
Philipp Reisner 已提交
3507

3508
	return 0;
P
Philipp Reisner 已提交
3509

P
Philipp Reisner 已提交
3510 3511
reconnect:
	if (new_disk_conf) {
3512
		put_ldev(device);
P
Philipp Reisner 已提交
3513 3514
		kfree(new_disk_conf);
	}
3515
	mutex_unlock(&first_peer_device(device)->connection->conf_update);
P
Philipp Reisner 已提交
3516 3517
	return -EIO;

P
Philipp Reisner 已提交
3518
disconnect:
P
Philipp Reisner 已提交
3519 3520
	kfree(new_plan);
	if (new_disk_conf) {
3521
		put_ldev(device);
P
Philipp Reisner 已提交
3522 3523
		kfree(new_disk_conf);
	}
3524
	mutex_unlock(&first_peer_device(device)->connection->conf_update);
P
Philipp Reisner 已提交
3525 3526 3527 3528 3529
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
3530
	conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3531
	return -EIO;
P
Philipp Reisner 已提交
3532 3533 3534
}

/* warn if the arguments differ by more than 12.5% */
3535
static void warn_if_differ_considerably(struct drbd_device *device,
P
Philipp Reisner 已提交
3536 3537 3538 3539 3540 3541 3542 3543 3544 3545 3546
	const char *s, sector_t a, sector_t b)
{
	sector_t d;
	if (a == 0 || b == 0)
		return;
	d = (a > b) ? (a - b) : (b - a);
	if (d > (a>>3) || d > (b>>3))
		dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
		     (unsigned long long)a, (unsigned long long)b);
}

3547
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3548
{
3549
	struct drbd_device *device;
3550
	struct p_sizes *p = pi->data;
3551
	enum determine_dev_size dd = DS_UNCHANGED;
P
Philipp Reisner 已提交
3552 3553
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
3554
	enum dds_flags ddsf;
P
Philipp Reisner 已提交
3555

3556
	device = vnr_to_device(connection, pi->vnr);
3557
	if (!device)
3558
		return config_unknown_volume(connection, pi);
3559

P
Philipp Reisner 已提交
3560 3561 3562 3563 3564
	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
3565
	device->p_size = p_size;
P
Philipp Reisner 已提交
3566

3567
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
3568
		rcu_read_lock();
3569
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
P
Philipp Reisner 已提交
3570 3571
		rcu_read_unlock();

3572 3573 3574
		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",
P
Philipp Reisner 已提交
3575
					    p_usize, my_usize);
P
Philipp Reisner 已提交
3576 3577 3578

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
3579
		if (device->state.conn == C_WF_REPORT_PARAMS)
P
Philipp Reisner 已提交
3580
			p_usize = min_not_zero(my_usize, p_usize);
P
Philipp Reisner 已提交
3581 3582 3583

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
3584 3585 3586 3587
		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
		    drbd_get_capacity(device->this_bdev) &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
P
Philipp Reisner 已提交
3588
			dev_err(DEV, "The peer's disk size is too small!\n");
3589
			conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3590
			put_ldev(device);
3591
			return -EIO;
P
Philipp Reisner 已提交
3592
		}
P
Philipp Reisner 已提交
3593 3594 3595 3596 3597 3598 3599

		if (my_usize != p_usize) {
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
				dev_err(DEV, "Allocation of new disk_conf failed\n");
3600
				put_ldev(device);
P
Philipp Reisner 已提交
3601 3602 3603
				return -ENOMEM;
			}

3604
			mutex_lock(&first_peer_device(device)->connection->conf_update);
3605
			old_disk_conf = device->ldev->disk_conf;
P
Philipp Reisner 已提交
3606 3607 3608
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

3609
			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3610
			mutex_unlock(&first_peer_device(device)->connection->conf_update);
P
Philipp Reisner 已提交
3611 3612 3613 3614 3615
			synchronize_rcu();
			kfree(old_disk_conf);

			dev_info(DEV, "Peer sets u_size to %lu sectors\n",
				 (unsigned long)my_usize);
P
Philipp Reisner 已提交
3616
		}
P
Philipp Reisner 已提交
3617

3618
		put_ldev(device);
P
Philipp Reisner 已提交
3619 3620
	}

3621
	ddsf = be16_to_cpu(p->dds_flags);
3622 3623 3624
	if (get_ldev(device)) {
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
3625
		if (dd == DS_ERROR)
3626
			return -EIO;
3627
		drbd_md_sync(device);
P
Philipp Reisner 已提交
3628 3629
	} else {
		/* I am diskless, need to accept the peer's size. */
3630
		drbd_set_my_capacity(device, p_size);
P
Philipp Reisner 已提交
3631 3632
	}

3633 3634
	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(device);
3635

3636 3637 3638
	if (get_ldev(device)) {
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
P
Philipp Reisner 已提交
3639 3640 3641
			ldsc = 1;
		}

3642
		put_ldev(device);
P
Philipp Reisner 已提交
3643 3644
	}

3645
	if (device->state.conn > C_WF_REPORT_PARAMS) {
P
Philipp Reisner 已提交
3646
		if (be64_to_cpu(p->c_size) !=
3647
		    drbd_get_capacity(device->this_bdev) || ldsc) {
P
Philipp Reisner 已提交
3648 3649
			/* we have different sizes, probably peer
			 * needs to know my new size... */
3650
			drbd_send_sizes(device, 0, ddsf);
P
Philipp Reisner 已提交
3651
		}
3652 3653 3654 3655
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
3656 3657 3658
				if (ddsf & DDSF_NO_RESYNC)
					dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
				else
3659
					resync_after_online_grow(device);
3660
			} else
3661
				set_bit(RESYNC_AFTER_NEG, &device->flags);
P
Philipp Reisner 已提交
3662 3663 3664
		}
	}

3665
	return 0;
P
Philipp Reisner 已提交
3666 3667
}

3668
static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3669
{
3670
	struct drbd_device *device;
3671
	struct p_uuids *p = pi->data;
P
Philipp Reisner 已提交
3672
	u64 *p_uuid;
3673
	int i, updated_uuids = 0;
P
Philipp Reisner 已提交
3674

3675
	device = vnr_to_device(connection, pi->vnr);
3676
	if (!device)
3677
		return config_unknown_volume(connection, pi);
3678

P
Philipp Reisner 已提交
3679
	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3680 3681 3682 3683
	if (!p_uuid) {
		dev_err(DEV, "kmalloc of p_uuid failed\n");
		return false;
	}
P
Philipp Reisner 已提交
3684 3685 3686 3687

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

3688 3689
	kfree(device->p_uuid);
	device->p_uuid = p_uuid;
P
Philipp Reisner 已提交
3690

3691 3692 3693 3694
	if (device->state.conn < C_CONNECTED &&
	    device->state.disk < D_INCONSISTENT &&
	    device->state.role == R_PRIMARY &&
	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
P
Philipp Reisner 已提交
3695
		dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3696
		    (unsigned long long)device->ed_uuid);
3697
		conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3698
		return -EIO;
P
Philipp Reisner 已提交
3699 3700
	}

3701
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
3702
		int skip_initial_sync =
3703
			device->state.conn == C_CONNECTED &&
3704
			first_peer_device(device)->connection->agreed_pro_version >= 90 &&
3705
			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
P
Philipp Reisner 已提交
3706 3707 3708
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
			dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3709
			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3710 3711
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
3712 3713 3714
			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(device, UI_BITMAP, 0);
			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
P
Philipp Reisner 已提交
3715
					CS_VERBOSE, NULL);
3716
			drbd_md_sync(device);
3717
			updated_uuids = 1;
P
Philipp Reisner 已提交
3718
		}
3719 3720 3721
		put_ldev(device);
	} else if (device->state.disk < D_INCONSISTENT &&
		   device->state.role == R_PRIMARY) {
3722 3723
		/* I am a diskless primary, the peer just created a new current UUID
		   for me. */
3724
		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
P
Philipp Reisner 已提交
3725 3726 3727 3728 3729 3730
	}

	/* Before we test for the disk state, we should wait until an eventually
	   ongoing cluster wide state change is finished. That is important if
	   we are primary and are detaching from our disk. We need to see the
	   new disk state... */
3731 3732 3733 3734
	mutex_lock(device->state_mutex);
	mutex_unlock(device->state_mutex);
	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3735 3736

	if (updated_uuids)
3737
		drbd_print_uuids(device, "receiver updated UUIDs to");
P
Philipp Reisner 已提交
3738

3739
	return 0;
P
Philipp Reisner 已提交
3740 3741 3742 3743 3744 3745 3746 3747 3748 3749 3750
}

/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps:		The state as seen by the peer.
 */
static union drbd_state convert_state(union drbd_state ps)
{
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
3751
		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
P
Philipp Reisner 已提交
3752 3753 3754 3755 3756 3757 3758 3759 3760 3761 3762 3763 3764 3765 3766 3767 3768 3769 3770 3771 3772
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S]       = C_VERIFY_T,
		[C_MASK]   = C_MASK,
	};

	ms.i = ps.i;

	ms.conn = c_tab[ps.conn];
	ms.peer = ps.role;
	ms.role = ps.peer;
	ms.pdsk = ps.disk;
	ms.disk = ps.pdsk;
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);

	return ms;
}

3773
static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3774
{
3775
	struct drbd_device *device;
3776
	struct p_req_state *p = pi->data;
P
Philipp Reisner 已提交
3777
	union drbd_state mask, val;
3778
	enum drbd_state_rv rv;
P
Philipp Reisner 已提交
3779

3780
	device = vnr_to_device(connection, pi->vnr);
3781
	if (!device)
3782 3783
		return -EIO;

P
Philipp Reisner 已提交
3784 3785 3786
	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

3787
	if (test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags) &&
3788 3789
	    mutex_is_locked(device->state_mutex)) {
		drbd_send_sr_reply(device, SS_CONCURRENT_ST_CHG);
3790
		return 0;
P
Philipp Reisner 已提交
3791 3792 3793 3794 3795
	}

	mask = convert_state(mask);
	val = convert_state(val);

3796 3797
	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
	drbd_send_sr_reply(device, rv);
P
Philipp Reisner 已提交
3798

3799
	drbd_md_sync(device);
P
Philipp Reisner 已提交
3800

3801
	return 0;
P
Philipp Reisner 已提交
3802 3803
}

3804
static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3805
{
3806
	struct p_req_state *p = pi->data;
P
Philipp Reisner 已提交
3807
	union drbd_state mask, val;
3808
	enum drbd_state_rv rv;
P
Philipp Reisner 已提交
3809 3810 3811 3812

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

3813 3814 3815
	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
	    mutex_is_locked(&connection->cstate_mutex)) {
		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3816
		return 0;
P
Philipp Reisner 已提交
3817 3818 3819 3820 3821
	}

	mask = convert_state(mask);
	val = convert_state(val);

3822 3823
	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
	conn_send_sr_reply(connection, rv);
P
Philipp Reisner 已提交
3824

3825
	return 0;
P
Philipp Reisner 已提交
3826 3827
}

3828
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3829
{
3830
	struct drbd_device *device;
3831
	struct p_state *p = pi->data;
3832
	union drbd_state os, ns, peer_state;
P
Philipp Reisner 已提交
3833
	enum drbd_disk_state real_peer_disk;
3834
	enum chg_state_flags cs_flags;
P
Philipp Reisner 已提交
3835 3836
	int rv;

3837
	device = vnr_to_device(connection, pi->vnr);
3838
	if (!device)
3839
		return config_unknown_volume(connection, pi);
3840

P
Philipp Reisner 已提交
3841 3842 3843 3844
	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
3845
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
P
Philipp Reisner 已提交
3846 3847 3848
		dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
	}

3849
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
P
Philipp Reisner 已提交
3850
 retry:
3851
	os = ns = drbd_read_state(device);
3852
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
P
Philipp Reisner 已提交
3853

3854 3855 3856 3857
	/* If some other part of the code (asender thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
3858
		return -ECONNRESET;
3859

3860 3861 3862 3863 3864 3865 3866 3867
	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
3868 3869 3870 3871 3872 3873 3874 3875 3876 3877 3878 3879 3880 3881 3882 3883
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
3884 3885
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
3886
			return 0;
3887 3888 3889
		}
	}

3890 3891 3892
	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3893 3894
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
3895
		return 0;
3896 3897
	}

3898 3899 3900 3901 3902 3903 3904 3905 3906
	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

3907 3908
	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;
P
Philipp Reisner 已提交
3909

3910 3911 3912
	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

3913 3914
	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
P
Philipp Reisner 已提交
3915 3916 3917
		int cr; /* consider resync */

		/* if we established a new connection */
3918
		cr  = (os.conn < C_CONNECTED);
P
Philipp Reisner 已提交
3919 3920
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
3921
		cr |= (os.conn == C_CONNECTED &&
P
Philipp Reisner 已提交
3922
		       (peer_state.disk == D_NEGOTIATING ||
3923
			os.disk == D_NEGOTIATING));
P
Philipp Reisner 已提交
3924 3925
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
3926
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
P
Philipp Reisner 已提交
3927 3928
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
3929
		cr |= (os.conn == C_CONNECTED &&
P
Philipp Reisner 已提交
3930 3931 3932 3933
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
3934
			ns.conn = drbd_sync_handshake(device, peer_state.role, real_peer_disk);
P
Philipp Reisner 已提交
3935

3936
		put_ldev(device);
3937 3938
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
3939 3940
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
P
Philipp Reisner 已提交
3941 3942 3943
			} else if (peer_state.disk == D_NEGOTIATING) {
				dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
				peer_state.disk = D_DISKLESS;
3944
				real_peer_disk = D_DISKLESS;
P
Philipp Reisner 已提交
3945
			} else {
3946
				if (test_and_clear_bit(CONN_DRY_RUN, &first_peer_device(device)->connection->flags))
3947
					return -EIO;
3948
				D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3949
				conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3950
				return -EIO;
P
Philipp Reisner 已提交
3951 3952 3953 3954
			}
		}
	}

3955
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
3956
	if (os.i != drbd_read_state(device).i)
P
Philipp Reisner 已提交
3957
		goto retry;
3958
	clear_bit(CONSIDER_RESYNC, &device->flags);
P
Philipp Reisner 已提交
3959 3960 3961
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3962
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3963
		ns.disk = device->new_state_tmp.disk;
3964
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3965 3966
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
3967
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3968
		   for temporal network outages! */
3969
		spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
3970
		dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3971
		tl_clear(first_peer_device(device)->connection);
3972 3973
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
3974
		conn_request_state(first_peer_device(device)->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3975
		return -EIO;
3976
	}
3977 3978
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
3979
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
P
Philipp Reisner 已提交
3980 3981

	if (rv < SS_SUCCESS) {
3982
		conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3983
		return -EIO;
P
Philipp Reisner 已提交
3984 3985
	}

3986 3987
	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
P
Philipp Reisner 已提交
3988 3989 3990 3991
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
3992 3993
			drbd_send_uuids(device);
			drbd_send_current_state(device);
P
Philipp Reisner 已提交
3994 3995 3996
		}
	}

3997
	clear_bit(DISCARD_MY_DATA, &device->flags);
P
Philipp Reisner 已提交
3998

3999
	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
P
Philipp Reisner 已提交
4000

4001
	return 0;
P
Philipp Reisner 已提交
4002 4003
}

4004
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
4005
{
4006
	struct drbd_device *device;
4007
	struct p_rs_uuid *p = pi->data;
4008

4009
	device = vnr_to_device(connection, pi->vnr);
4010
	if (!device)
4011
		return -EIO;
P
Philipp Reisner 已提交
4012

4013 4014 4015 4016 4017
	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);
P
Philipp Reisner 已提交
4018

4019
	/* D_ASSERT( device->state.conn == C_WF_SYNC_UUID ); */
P
Philipp Reisner 已提交
4020 4021 4022

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
4023 4024 4025
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);
P
Philipp Reisner 已提交
4026

4027 4028
		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);
P
Philipp Reisner 已提交
4029

4030
		put_ldev(device);
P
Philipp Reisner 已提交
4031 4032 4033
	} else
		dev_err(DEV, "Ignoring SyncUUID packet!\n");

4034
	return 0;
P
Philipp Reisner 已提交
4035 4036
}

4037 4038 4039 4040 4041 4042 4043
/**
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
4044
receive_bitmap_plain(struct drbd_device *device, unsigned int size,
4045
		     unsigned long *p, struct bm_xfer_ctx *c)
P
Philipp Reisner 已提交
4046
{
4047
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4048
				 drbd_header_size(first_peer_device(device)->connection);
4049
	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4050
				       c->bm_words - c->word_offset);
4051
	unsigned int want = num_words * sizeof(*p);
4052
	int err;
P
Philipp Reisner 已提交
4053

4054 4055
	if (want != size) {
		dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4056
		return -EIO;
P
Philipp Reisner 已提交
4057 4058
	}
	if (want == 0)
4059
		return 0;
4060
	err = drbd_recv_all(first_peer_device(device)->connection, p, want);
4061
	if (err)
4062
		return err;
P
Philipp Reisner 已提交
4063

4064
	drbd_bm_merge_lel(device, c->word_offset, num_words, p);
P
Philipp Reisner 已提交
4065 4066 4067 4068 4069 4070

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

4071
	return 1;
P
Philipp Reisner 已提交
4072 4073
}

4074 4075 4076 4077 4078 4079 4080 4081 4082 4083 4084 4085 4086 4087 4088
static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
{
	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static int dcbp_get_start(struct p_compressed_bm *p)
{
	return (p->encoding & 0x80) != 0;
}

static int dcbp_get_pad_bits(struct p_compressed_bm *p)
{
	return (p->encoding >> 4) & 0x7;
}

/**
 * recv_bm_rle_bits - decode one VLI/RLE-compressed bitmap packet
 * @device: device whose bitmap bits are being set
 * @p:      compressed bitmap packet (code[] holds the VLI bitstream)
 * @c:      transfer context; bit_offset is advanced as runs are decoded
 * @len:    length in bytes of the bitstream in p->code
 *
 * Decodes alternating runs of clear/set bits; only set runs touch the
 * bitmap.  A 64-bit look-ahead window is refilled from the bitstream
 * after each variable-length run code is consumed.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_device *device,
		struct p_compressed_bm *p,
		 struct bm_xfer_ctx *c,
		 unsigned int len)
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;		/* current run length */
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
	int toggle = dcbp_get_start(p);	/* value of the current run: 0 or 1 */
	int have;	/* valid bits currently in look_ahead */
	int bits;

	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

	/* prime the 64-bit look-ahead window */
	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
		return -EIO;

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
			return -EIO;

		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
				dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
				return -EIO;
			}
			_drbd_bm_set_bits(device, s, e);
		}

		/* a run code may not span beyond the bits we actually hold */
		if (have < bits) {
			dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
			return -EIO;
		}
		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		else
			look_ahead = 0;
		have -= bits;

		/* refill the window from the bitstream */
		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
			return -EIO;
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

	/* done only when the runs exactly covered the whole bitmap */
	return (s != c->bm_bits);
}

/**
 * decode_bitmap_c - dispatch on the compressed-bitmap encoding variant
 * @device: device whose bitmap is being received
 * @p:      compressed bitmap packet
 * @c:      bitmap transfer context
 * @len:    total packet length including the p_compressed_bm header
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
decode_bitmap_c(struct drbd_device *device,
		struct p_compressed_bm *p,
		struct bm_xfer_ctx *c,
		unsigned int len)
{
	if (dcbp_get_code(p) == RLE_VLI_Bits)
		return recv_bm_rle_bits(device, p, c, len - sizeof(*p));

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

	/* unknown encoding is a protocol error: tear the connection down */
	dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(first_peer_device(device)->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
	return -EIO;
}

/**
 * INFO_bm_xfer_stats - log the compression ratio of a bitmap transfer
 * @device:    device the bitmap belongs to
 * @direction: "send" or "receive" (used verbatim in the log line)
 * @c:         transfer context with byte/packet counters
 *
 * Compares the actual bytes transferred against what a plain
 * (uncompressed) transfer would have cost and logs the savings in
 * tenths of a percent.  Silent when nothing was saved.
 */
void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];
	unsigned int r;

	/* total can not be zero. but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
		                    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	/* r is now "permille saved" */
	r = 1000 - r;
	dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}

/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter if the process it in 32 bit chunks or 64 bit
   chunks as long as it is little endian. (Understand it as byte stream,
   beginning with the lowest byte...) If we would use big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   returns 0 on failure, 1 if we successfully received it. */
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	struct bm_xfer_ctx c;
	int err;

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */

	c = (struct bm_xfer_ctx) {
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
	};

	/* loop: one iteration per received bitmap packet, until the
	 * whole bitmap has been transferred (handler returns 0) */
	for(;;) {
		if (pi->cmd == P_BITMAP)
			err = receive_bitmap_plain(device, pi->size, pi->data, &c);
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
			struct p_compressed_bm *p = pi->data;

			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
				dev_err(DEV, "ReportCBitmap packet too large\n");
				err = -EIO;
				goto out;
			}
			if (pi->size <= sizeof(*p)) {
				dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
				err = -EIO;
				goto out;
			}
			err = drbd_recv_all(first_peer_device(device)->connection, p, pi->size);
			if (err)
			       goto out;
			err = decode_bitmap_c(device, p, &c, pi->size);
		} else {
			dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
			err = -EIO;
			goto out;
		}

		/* index 1 counts plain packets, index 0 compressed ones */
		c.packets[pi->cmd == P_BITMAP]++;
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;

		if (err <= 0) {
			if (err < 0)
				goto out;
			break;	/* err == 0: bitmap fully received */
		}
		err = drbd_recv_header(first_peer_device(device)->connection, pi);
		if (err)
			goto out;
	}

	INFO_bm_xfer_stats(device, "receive", &c);

	if (device->state.conn == C_WF_BITMAP_T) {
		enum drbd_state_rv rv;

		/* as sync target, echo our bitmap back to the sync source */
		err = drbd_send_bitmap(device);
		if (err)
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
		D_ASSERT(rv == SS_SUCCESS);
	} else if (device->state.conn != C_WF_BITMAP_S) {
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
		dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
		    drbd_conn_str(device->state.conn));
	}
	err = 0;

 out:
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
	return err;
}

/* Log and drain an unknown-but-optional packet so the stream stays in sync. */
static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	conn_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
		 pi->cmd, pi->size);

	return ignore_remaining_packet(connection, pi);
}

/* Peer told us it unplugged its queue; nothing to do locally except
 * ack the TCP data promptly. */
static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	return 0;
}

4329
static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4330
{
4331
	struct drbd_device *device;
4332
	struct p_block_desc *p = pi->data;
4333

4334
	device = vnr_to_device(connection, pi->vnr);
4335
	if (!device)
4336
		return -EIO;
4337

4338
	switch (device->state.conn) {
4339 4340 4341 4342 4343 4344
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
			break;
	default:
		dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4345
				drbd_conn_str(device->state.conn));
4346 4347
	}

4348
	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4349

4350
	return 0;
4351 4352
}

/* Dispatch table entry for one data-socket packet type. */
struct data_cmd {
	int expect_payload;	/* non-zero: packet may carry extra payload beyond pkt_size */
	size_t pkt_size;	/* size of the fixed sub-header read before dispatch */
	int (*fn)(struct drbd_connection *, struct packet_info *);
};

/* Indexed by packet command; entries with a NULL fn are rejected in drbdd(). */
static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
};

4386
static void drbdd(struct drbd_connection *connection)
P
Philipp Reisner 已提交
4387
{
4388
	struct packet_info pi;
4389
	size_t shs; /* sub header size */
4390
	int err;
P
Philipp Reisner 已提交
4391

4392
	while (get_t_state(&connection->receiver) == RUNNING) {
4393
		struct data_cmd *cmd;
P
Philipp Reisner 已提交
4394

4395 4396
		drbd_thread_current_set_cpu(&connection->receiver);
		if (drbd_recv_header(connection, &pi))
4397
			goto err_out;
P
Philipp Reisner 已提交
4398

4399
		cmd = &drbd_cmd_handler[pi.cmd];
4400
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4401
			conn_err(connection, "Unexpected data packet %s (0x%04x)",
4402
				 cmdname(pi.cmd), pi.cmd);
4403
			goto err_out;
4404
		}
P
Philipp Reisner 已提交
4405

4406 4407
		shs = cmd->pkt_size;
		if (pi.size > shs && !cmd->expect_payload) {
4408
			conn_err(connection, "No payload expected %s l:%d\n",
4409
				 cmdname(pi.cmd), pi.size);
4410
			goto err_out;
P
Philipp Reisner 已提交
4411 4412
		}

4413
		if (shs) {
4414
			err = drbd_recv_all_warn(connection, pi.data, shs);
4415
			if (err)
4416
				goto err_out;
4417
			pi.size -= shs;
4418 4419
		}

4420
		err = cmd->fn(connection, &pi);
4421
		if (err) {
4422
			conn_err(connection, "error receiving %s, e: %d l: %d!\n",
4423
				 cmdname(pi.cmd), err, pi.size);
4424
			goto err_out;
P
Philipp Reisner 已提交
4425 4426
		}
	}
4427
	return;
P
Philipp Reisner 已提交
4428

4429
    err_out:
4430
	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
P
Philipp Reisner 已提交
4431 4432
}

/* Wait until all work queued on the connection's sender work queue up to
 * this point has been processed: queue a barrier work item and block on
 * its completion. */
void conn_flush_workqueue(struct drbd_connection *connection)
{
	struct drbd_wq_barrier barr;

	barr.w.cb = w_prev_work_done;
	barr.w.connection = connection;
	init_completion(&barr.done);
	drbd_queue_work(&connection->sender_work, &barr.w);
	wait_for_completion(&barr.done);
}

/*
 * conn_disconnect - tear down a lost/closing connection
 *
 * Stops the asender, closes the sockets, runs per-device cleanup
 * (drbd_disconnected) for every volume, then transitions the connection
 * state towards C_UNCONNECTED (or C_STANDALONE if the admin asked for a
 * disconnect).  Statement order matters: threads and sockets must be
 * gone before per-device cleanup, which must precede the state change.
 */
static void conn_disconnect(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	enum drbd_conns oc;
	int vnr;

	if (connection->cstate == C_STANDALONE)
		return;

	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

	/* asender does not clean up anything. it must not interfere, either */
	drbd_thread_stop(&connection->asender);
	drbd_free_sock(connection);

	/* per-volume cleanup; drop the RCU lock around the (sleeping)
	 * drbd_disconnected() call, pinning the device via its kref */
	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_disconnected(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	if (!list_empty(&connection->current_epoch->list))
		conn_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;

	conn_info(connection, "Connection closed\n");

	/* fence the peer if we were primary and its disk state is unknown */
	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);

	spin_lock_irq(&connection->req_lock);
	oc = connection->cstate;
	if (oc >= C_UNCONNECTED)
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

	spin_unlock_irq(&connection->req_lock);

	if (oc == C_DISCONNECTING)
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}

/*
 * drbd_disconnected - per-device cleanup after the connection is gone
 *
 * Waits for in-flight peer requests, cancels resync bookkeeping, flushes
 * the worker queue (twice, see below), drops the peer's UUIDs, clears
 * the transfer log and releases pages still referenced by the network
 * stack.  The ordering of these steps is deliberate.  Always returns 0.
 */
static int drbd_disconnected(struct drbd_device *device)
{
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	/* run the resync timer callback once more, synchronously */
	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(device);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(device);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(first_peer_device(device)->connection);

	drbd_md_sync(device);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(list_empty(&device->read_ee));
	D_ASSERT(list_empty(&device->active_ee));
	D_ASSERT(list_empty(&device->sync_ee));
	D_ASSERT(list_empty(&device->done_ee));

	return 0;
}

/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
4589
static int drbd_send_features(struct drbd_connection *connection)
P
Philipp Reisner 已提交
4590
{
4591 4592
	struct drbd_socket *sock;
	struct p_connection_features *p;
P
Philipp Reisner 已提交
4593

4594 4595
	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
4596
	if (!p)
4597
		return -EIO;
P
Philipp Reisner 已提交
4598 4599 4600
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4601
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
P
Philipp Reisner 已提交
4602 4603 4604 4605 4606 4607 4608 4609 4610
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		conn_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		conn_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
		     expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	/* very old peers sent only protocol_min; treat max == 0 as "min only" */
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	/* agree on the highest version both sides support */
	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);

	conn_info(connection, "Handshake successful: "
	     "Agreed network protocol version %d\n", connection->agreed_pro_version);

	return 1;

 incompat:
	conn_err(connection, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}

#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/* Stub used when the kernel lacks CONFIG_CRYPTO_HMAC: CRAM-HMAC auth
 * cannot work, so always fail hard (-1 = do not retry). */
static int drbd_do_auth(struct drbd_connection *connection)
{
	conn_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	conn_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64
4677 4678 4679 4680 4681 4682 4683

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

/*
 * Mutual CRAM-HMAC authentication over the data socket:
 * send our random challenge, HMAC the peer's challenge with the shared
 * secret and return it, then verify the peer's response against the
 * HMAC of our own challenge.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	/* copy the shared secret out under RCU; net_conf may be replaced */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc.tfm = connection->cram_hmac_tfm;
	desc.flags = 0;

	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		conn_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		conn_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	/* NOTE(review): only an upper bound is checked here; pi.size == 0
	 * would pass and lead to a zero-byte kmalloc/HMAC — confirm whether
	 * a lower bound should be enforced as well. */
	if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		conn_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		conn_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	/* response = HMAC(secret, peer's challenge) */
	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		conn_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		conn_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		conn_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	/* reuse the response buffer for the peer's answer */
	err = drbd_recv_all_warn(connection, response , resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		conn_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	/* right_response = HMAC(secret, our challenge); must match peer's answer */
	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		conn_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		conn_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif

/* Entry point of the receiver thread: keep (re)connecting until a
 * connection is established (h > 0) or configuration is discarded
 * (h == -1), then run the receive loop and clean up on exit. */
int drbdd_init(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;	/* conn_connect() result: 1 ok, 0 retry, -1 give up */

	conn_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			/* transient failure: clean up and retry after 1s */
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			conn_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0)
		drbdd(connection);

	conn_disconnect(connection);

	conn_info(connection, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

/* Peer answered our connection-wide state change request: record
 * success/failure in the connection flags and wake the waiter. */
static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		conn_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}
P
Philipp Reisner 已提交
4893

/* Peer answered our per-device state change request: record the result
 * in the device flags and wake the waiter.  Pre-protocol-100 peers
 * answer connection-wide requests with this packet, too. */
static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	/* old peers reply to a connection-wide request per device */
	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}

/* Keep-alive: answer the peer's ping immediately. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);

}

/* Peer acknowledged our ping: restore the meta socket's idle receive
 * timeout and wake anyone waiting for the ack. */
static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}

/* Checksum-based resync: peer confirmed a block is already in sync, so
 * mark it in sync locally and account it as resync progress without
 * transferring data. */
static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	/* P_RS_IS_IN_SYNC only exists since protocol 89 (csums-alg) */
	D_ASSERT(first_peer_device(device)->connection->agreed_pro_version >= 89);

	update_peer_seq(device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(device);
	/* blksize >> 9 converts bytes to 512-byte sectors */
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}

4965
/* Look up the request identified by (id, sector) in @root and apply the
 * state transition @what to it under req_lock.
 *
 * @missing_ok: if true, a request that is no longer in the tree is not an
 *              error (e.g. already completed); otherwise return -EIO.
 * Returns 0 on success, -EIO if the request was required but not found.
 */
static int
validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&first_peer_device(device)->connection->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&first_peer_device(device)->connection->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&first_peer_device(device)->connection->req_lock);

	/* Complete the master bio outside the spinlock, if __req_mod
	 * handed one back. */
	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}

4987
/* Handler for the write-ack family of meta packets (P_RECV_ACK,
 * P_WRITE_ACK, P_RS_WRITE_ACK, P_SUPERSEDED, P_RETRY_WRITE): map the
 * packet type to a request event and apply it to the matching request
 * in the write_requests tree. */
static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	update_peer_seq(device, be32_to_cpu(p->seq_num));

	/* Acks for resync writes carry ID_SYNCER instead of a request id;
	 * they only update the bitmap and the pending-resync count. */
	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(device, sector, blksize);
		dec_rs_pending(device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		/* asender_tbl routes only the five commands above here. */
		BUG();
	}

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}

5031
/* Handler for P_NEG_ACK: the peer failed to write the block.  For resync
 * writes (ID_SYNCER) record the resync failure; for application writes,
 * negatively ack the request and, if it is already gone, at least mark
 * the range out of sync. */
static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	update_peer_seq(device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(device);
		drbd_rs_failed_io(device, sector, size);
		return 0;
	}

	/* missing_ok = true: see the comment below for why the request may
	 * legitimately no longer exist. */
	err = validate_req_change_req_state(device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(device, sector, size);
	}
	return 0;
}

5065
/* Handler for P_NEG_DREPLY: the peer could not service our read request;
 * negatively ack the matching entry in the read_requests tree. */
static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	update_peer_seq(device, be32_to_cpu(p->seq_num));

	dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
	    (unsigned long long)sector, be32_to_cpu(p->blksize));

	/* missing_ok = false: a read request must still be outstanding. */
	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}

5085
/* Handler for P_NEG_RS_DREPLY and P_RS_CANCEL: the peer could not (or
 * chose not to) answer a resync read.  A real failure is recorded in the
 * resync bookkeeping; a cancel only completes the in-flight resync I/O. */
static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	device = vnr_to_device(connection, pi->vnr);
	if (!device)
		return -EIO;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(device, be32_to_cpu(p->seq_num));

	dec_rs_pending(device);

	/* D_FAILED is enough: we only touch resync metadata, not user data. */
	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			/* asender_tbl routes only the two commands above here. */
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}

5119
/* Handler for P_BARRIER_ACK: release the acked transfer-log epoch, and
 * for any volume sitting in Ahead mode with no application I/O in
 * flight, arm the timer that switches it back to SyncSource. */
static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		/* test_and_set_bit guards against arming the resync timer
		 * more than once per Ahead episode. */
		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}

5143
static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
5144
{
5145
	struct drbd_device *device;
5146
	struct p_block_ack *p = pi->data;
P
Philipp Reisner 已提交
5147 5148 5149 5150
	struct drbd_work *w;
	sector_t sector;
	int size;

5151
	device = vnr_to_device(connection, pi->vnr);
5152
	if (!device)
5153
		return -EIO;
5154

P
Philipp Reisner 已提交
5155 5156 5157
	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

5158
	update_peer_seq(device, be32_to_cpu(p->seq_num));
P
Philipp Reisner 已提交
5159 5160

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5161
		drbd_ov_out_of_sync_found(device, sector, size);
P
Philipp Reisner 已提交
5162
	else
5163
		ov_out_of_sync_print(device);
P
Philipp Reisner 已提交
5164

5165
	if (!get_ldev(device))
5166
		return 0;
5167

5168 5169
	drbd_rs_complete_io(device, sector);
	dec_rs_pending(device);
P
Philipp Reisner 已提交
5170

5171
	--device->ov_left;
5172 5173

	/* let's advance progress step marks only for every other megabyte */
5174 5175
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);
5176

5177
	if (device->ov_left == 0) {
P
Philipp Reisner 已提交
5178 5179 5180
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
5181
			w->device = device;
5182
			drbd_queue_work(&first_peer_device(device)->connection->sender_work, w);
P
Philipp Reisner 已提交
5183 5184
		} else {
			dev_err(DEV, "kmalloc(w) failed.");
5185 5186
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
P
Philipp Reisner 已提交
5187 5188
		}
	}
5189
	put_ldev(device);
5190
	return 0;
P
Philipp Reisner 已提交
5191 5192
}

5193
/* Handler for packets we deliberately ignore (e.g. P_DELAY_PROBE). */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

5198
/* Drain the done_ee lists of all volumes on this connection, repeating
 * until every list is observed empty under req_lock.
 * Returns 0 when all peer requests are finished, 1 on error from
 * drbd_finish_peer_reqs(). */
static int connection_finish_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &connection->flags);
		flush_signals(current);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			/* kref pins the device across the rcu_read_unlock()
			 * window, since drbd_finish_peer_reqs() may sleep. */
			kref_get(&device->kref);
			rcu_read_unlock();
			if (drbd_finish_peer_reqs(device)) {
				kref_put(&device->kref, drbd_destroy_device);
				return 1;
			}
			kref_put(&device->kref, drbd_destroy_device);
			rcu_read_lock();
		}
		set_bit(SIGNAL_ASENDER, &connection->flags);

		/* Re-check emptiness under the lock; anything queued while we
		 * were processing forces another round. */
		spin_lock_irq(&connection->req_lock);
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			not_empty = !list_empty(&device->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&connection->req_lock);
		rcu_read_unlock();
	} while (not_empty);

	return 0;
}

P
Philipp Reisner 已提交
5235 5236
/* Dispatch entry for one meta-socket packet type: expected payload size
 * (excluding the header) and its handler. */
struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

5240
/* Meta-socket dispatch table, indexed by packet command number.
 * Entries not listed here are zero-initialized (fn == NULL) and are
 * rejected by drbd_asender(). */
static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};
P
Philipp Reisner 已提交
5259 5260 5261

int drbd_asender(struct drbd_thread *thi)
{
5262
	struct drbd_connection *connection = thi->connection;
P
Philipp Reisner 已提交
5263
	struct asender_cmd *cmd = NULL;
5264
	struct packet_info pi;
5265
	int rv;
5266
	void *buf    = connection->meta.rbuf;
P
Philipp Reisner 已提交
5267
	int received = 0;
5268
	unsigned int header_size = drbd_header_size(connection);
5269
	int expect   = header_size;
5270 5271
	bool ping_timeout_active = false;
	struct net_conf *nc;
5272
	int ping_timeo, tcp_cork, ping_int;
P
Philipp Reisner 已提交
5273
	struct sched_param param = { .sched_priority = 2 };
P
Philipp Reisner 已提交
5274

P
Philipp Reisner 已提交
5275 5276
	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
5277
		conn_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
P
Philipp Reisner 已提交
5278

5279
	while (get_t_state(thi) == RUNNING) {
5280
		drbd_thread_current_set_cpu(thi);
P
Philipp Reisner 已提交
5281

5282
		rcu_read_lock();
5283
		nc = rcu_dereference(connection->net_conf);
5284
		ping_timeo = nc->ping_timeo;
5285
		tcp_cork = nc->tcp_cork;
5286 5287 5288
		ping_int = nc->ping_int;
		rcu_read_unlock();

5289 5290 5291
		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				conn_err(connection, "drbd_send_ping has failed\n");
P
Philipp Reisner 已提交
5292
				goto reconnect;
5293
			}
5294
			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5295
			ping_timeout_active = true;
P
Philipp Reisner 已提交
5296 5297
		}

5298 5299
		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
5300
		if (tcp_cork)
5301 5302 5303
			drbd_tcp_cork(connection->meta.socket);
		if (connection_finish_peer_reqs(connection)) {
			conn_err(connection, "connection_finish_peer_reqs() failed\n");
5304
			goto reconnect;
P
Philipp Reisner 已提交
5305 5306
		}
		/* but unconditionally uncork unless disabled */
5307
		if (tcp_cork)
5308
			drbd_tcp_uncork(connection->meta.socket);
P
Philipp Reisner 已提交
5309 5310 5311 5312 5313

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

5314 5315
		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &connection->flags);
P
Philipp Reisner 已提交
5316 5317 5318 5319 5320 5321 5322 5323 5324 5325 5326 5327 5328 5329 5330 5331 5332

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
5333
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5334 5335
				long t;
				rcu_read_lock();
5336
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5337 5338
				rcu_read_unlock();

5339 5340
				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
5341
						       t);
5342 5343 5344
				if (t)
					break;
			}
5345
			conn_err(connection, "meta connection shut down by peer.\n");
P
Philipp Reisner 已提交
5346 5347
			goto reconnect;
		} else if (rv == -EAGAIN) {
5348 5349
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
5350 5351
			if (time_after(connection->last_received,
				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5352
				continue;
5353
			if (ping_timeout_active) {
5354
				conn_err(connection, "PingAck did not arrive in time.\n");
P
Philipp Reisner 已提交
5355 5356
				goto reconnect;
			}
5357
			set_bit(SEND_PING, &connection->flags);
P
Philipp Reisner 已提交
5358 5359 5360 5361
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
5362
			conn_err(connection, "sock_recvmsg returned %d\n", rv);
P
Philipp Reisner 已提交
5363 5364 5365 5366
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
5367
			if (decode_header(connection, connection->meta.rbuf, &pi))
P
Philipp Reisner 已提交
5368
				goto reconnect;
5369
			cmd = &asender_tbl[pi.cmd];
5370
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5371
				conn_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5372
					 cmdname(pi.cmd), pi.cmd);
P
Philipp Reisner 已提交
5373 5374
				goto disconnect;
			}
5375
			expect = header_size + cmd->pkt_size;
5376
			if (pi.size != expect - header_size) {
5377
				conn_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5378
					pi.cmd, pi.size);
P
Philipp Reisner 已提交
5379
				goto reconnect;
5380
			}
P
Philipp Reisner 已提交
5381 5382
		}
		if (received == expect) {
5383
			bool err;
5384

5385
			err = cmd->fn(connection, &pi);
5386
			if (err) {
5387
				conn_err(connection, "%pf failed\n", cmd->fn);
P
Philipp Reisner 已提交
5388
				goto reconnect;
5389
			}
P
Philipp Reisner 已提交
5390

5391
			connection->last_received = jiffies;
5392

5393 5394
			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
5395
				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5396 5397
				ping_timeout_active = false;
			}
5398

5399
			buf	 = connection->meta.rbuf;
P
Philipp Reisner 已提交
5400
			received = 0;
5401
			expect	 = header_size;
P
Philipp Reisner 已提交
5402 5403 5404 5405 5406 5407
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
5408 5409
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
P
Philipp Reisner 已提交
5410 5411 5412
	}
	if (0) {
disconnect:
5413
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
P
Philipp Reisner 已提交
5414
	}
5415
	clear_bit(SIGNAL_ASENDER, &connection->flags);
P
Philipp Reisner 已提交
5416

5417
	conn_info(connection, "asender terminated\n");
P
Philipp Reisner 已提交
5418 5419 5420

	return 0;
}