/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
 */

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

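/* Try to grab @number pages: first from the global drbd_pp_pool, then from
 * alloc_page(GFP_TRY).  On partial failure all pages obtained so far are put
 * back into the pool and NULL is returned; drbd_alloc_pages() will retry. */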
static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

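/* Move peer requests from device->net_ee whose pages are no longer in use by
 * the network layer onto @to_be_freed; see the ordering note below. */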
static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not finished, we
	   can stop examining the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}

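/* Collect finished net_ee peer requests under the req_lock and free them
 * outside of it. */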
static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @device:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

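/* Allocate a peer request from drbd_ee_mempool and, if it carries a payload,
 * the page chain backing it.  Returns NULL on allocation failure or on an
 * injected DRBD_FAULT_AL_EE fault. */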
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (has_payload && data_size) {
		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
		if (!page)
			goto fail;
	}

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	peer_req->flags = 0;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

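/* Release a peer request: free its digest (if any), return its pages via
 * drbd_free_pages(), and give the descriptor back to drbd_ee_mempool. */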
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
		       int is_net)
{
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}

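/* Receive up to @size bytes into @buf; blocks (MSG_WAITALL) unless different
 * flags are passed.  Returns the number of bytes received or a negative error. */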
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

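/* Receive @size bytes on the data socket, logging peer resets/shutdowns and
 * forcing the connection to C_BROKEN_PIPE on a short read. */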
static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}

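/* Attempt the outgoing connection to the peer, bound to the locally configured
 * address.  Returns the connected socket, or NULL on failure. */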
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

       /* explicitly bind to the configured IP as source IP
	*  for the outgoing connections.
	*  This is needed for multihomed hosts and to be
	*  able to use lo: interfaces for drbd.
	* Make sure to use 0 as port number, so linux selects
	*  a free one dynamically.
	*/
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);

};

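/* sk_state_change callback of the listen socket: complete the door_bell once a
 * connection is established, then chain to the original callback. */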
static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}

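/* Create, bind and listen on the socket the peer connects to, and hook
 * sk_state_change so an incoming connection completes ad->door_bell. */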
static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

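/* Wait (connect_int with some random jitter) for the peer to connect to our
 * listen socket, then accept and return the new socket. */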
static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	int err;

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
832
int drbd_connected(struct drbd_peer_device *peer_device)
833
{
834
	struct drbd_device *device = peer_device->device;
835
	int err;
836

837 838
	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;
839

840 841
	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
842
		&device->own_state_mutex;
843

844
	err = drbd_send_sync_param(peer_device);
845
	if (!err)
846
		err = drbd_send_sizes(peer_device, 0, 0);
847
	if (!err)
848
		err = drbd_send_uuids(peer_device);
849
	if (!err)
850
		err = drbd_send_current_state(peer_device);
851 852 853 854
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
855
	return err;
856
}
P
Philipp Reisner 已提交
857 858 859 860 861 862 863 864 865

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
866
static int conn_connect(struct drbd_connection *connection)
P
Philipp Reisner 已提交
867
{
868
	struct drbd_socket sock, msock;
869
	struct drbd_peer_device *peer_device;
870
	struct net_conf *nc;
871
	int vnr, timeout, h, ok;
872
	bool discard_my_data;
873
	enum drbd_state_rv rv;
874
	struct accept_wait_data ad = {
875
		.connection = connection,
876 877
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};
P
Philipp Reisner 已提交
878

879 880
	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
P
Philipp Reisner 已提交
881 882
		return -2;

883
	mutex_init(&sock.mutex);
884 885
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
886 887
	sock.socket = NULL;
	mutex_init(&msock.mutex);
888 889
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
890 891
	msock.socket = NULL;

892
	/* Assume that the peer only understands protocol 80 until we know better.  */
893
	connection->agreed_pro_version = 80;
P
Philipp Reisner 已提交
894

895
	if (prepare_listen_socket(connection, &ad))
896
		return 0;
P
Philipp Reisner 已提交
897 898

	do {
899
		struct socket *s;
P
Philipp Reisner 已提交
900

901
		s = drbd_try_connect(connection);
P
Philipp Reisner 已提交
902
		if (s) {
903 904
			if (!sock.socket) {
				sock.socket = s;
905
				send_first_packet(connection, &sock, P_INITIAL_DATA);
906
			} else if (!msock.socket) {
907
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
908
				msock.socket = s;
909
				send_first_packet(connection, &msock, P_INITIAL_META);
P
Philipp Reisner 已提交
910
			} else {
911
				drbd_err(connection, "Logic error in conn_connect()\n");
P
Philipp Reisner 已提交
912 913 914 915
				goto out_release_sockets;
			}
		}

916 917
		if (sock.socket && msock.socket) {
			rcu_read_lock();
918
			nc = rcu_dereference(connection->net_conf);
919 920 921 922 923
			timeout = nc->ping_timeo * HZ / 10;
			rcu_read_unlock();
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;
P
Philipp Reisner 已提交
924 925 926 927 928
			if (ok)
				break;
		}

retry:
929
		s = drbd_wait_for_connect(connection, &ad);
P
Philipp Reisner 已提交
930
		if (s) {
931
			int fp = receive_first_packet(connection, s);
932 933
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
934
			switch (fp) {
935
			case P_INITIAL_DATA:
936
				if (sock.socket) {
937
					drbd_warn(connection, "initial packet S crossed\n");
938
					sock_release(sock.socket);
939 940
					sock.socket = s;
					goto randomize;
P
Philipp Reisner 已提交
941
				}
942
				sock.socket = s;
P
Philipp Reisner 已提交
943
				break;
944
			case P_INITIAL_META:
945
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
946
				if (msock.socket) {
947
					drbd_warn(connection, "initial packet M crossed\n");
948
					sock_release(msock.socket);
949 950
					msock.socket = s;
					goto randomize;
P
Philipp Reisner 已提交
951
				}
952
				msock.socket = s;
P
Philipp Reisner 已提交
953 954
				break;
			default:
955
				drbd_warn(connection, "Error receiving initial packet\n");
P
Philipp Reisner 已提交
956
				sock_release(s);
957
randomize:
958
				if (prandom_u32() & 1)
P
Philipp Reisner 已提交
959 960 961 962
					goto retry;
			}
		}

963
		if (connection->cstate <= C_DISCONNECTING)
P
Philipp Reisner 已提交
964 965 966 967
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
968
			if (get_t_state(&connection->receiver) == EXITING)
P
Philipp Reisner 已提交
969 970 971
				goto out_release_sockets;
		}

972 973 974
		ok = drbd_socket_okay(&sock.socket);
		ok = drbd_socket_okay(&msock.socket) && ok;
	} while (!ok);
P
Philipp Reisner 已提交
975

976 977
	if (ad.s_listen)
		sock_release(ad.s_listen);
P
Philipp Reisner 已提交
978

979 980
	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
P
Philipp Reisner 已提交
981

982 983
	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;
P
Philipp Reisner 已提交
984

985 986
	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
P
Philipp Reisner 已提交
987 988

	/* NOT YET ...
989
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
990
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
991
	 * first set it to the P_CONNECTION_FEATURES timeout,
P
Philipp Reisner 已提交
992
	 * which we set to 4x the configured ping_timeout. */
993
	rcu_read_lock();
994
	nc = rcu_dereference(connection->net_conf);
995

996 997
	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
P
Philipp Reisner 已提交
998

999
	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1000
	timeout = nc->timeout * HZ / 10;
1001
	discard_my_data = nc->discard_my_data;
1002
	rcu_read_unlock();
P
Philipp Reisner 已提交
1003

1004
	msock.socket->sk->sk_sndtimeo = timeout;
P
Philipp Reisner 已提交
1005 1006

	/* we don't want delays.
L
Lucas De Marchi 已提交
1007
	 * we use TCP_CORK where appropriate, though */
1008 1009
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);
P
Philipp Reisner 已提交
1010

1011 1012 1013
	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;
P
Philipp Reisner 已提交
1014

1015
	h = drbd_do_features(connection);
P
Philipp Reisner 已提交
1016 1017 1018
	if (h <= 0)
		return h;

1019
	if (connection->cram_hmac_tfm) {
1020
		/* drbd_request_state(device, NS(conn, WFAuth)); */
1021
		switch (drbd_do_auth(connection)) {
1022
		case -1:
1023
			drbd_err(connection, "Authentication of peer failed\n");
P
Philipp Reisner 已提交
1024
			return -1;
1025
		case 0:
1026
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
1027
			return 0;
P
Philipp Reisner 已提交
1028 1029 1030
		}
	}

1031 1032
	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
P
Philipp Reisner 已提交
1033

1034
	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1035
		return -1;
P
Philipp Reisner 已提交
1036

1037 1038 1039 1040 1041 1042 1043 1044 1045 1046
	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

1047
	set_bit(STATE_SENT, &connection->flags);
1048

1049 1050 1051
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

P
Philipp Reisner 已提交
1052
	rcu_read_lock();
1053 1054
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
1055
		kref_get(&device->kref);
1056 1057
		rcu_read_unlock();

1058
		if (discard_my_data)
1059
			set_bit(DISCARD_MY_DATA, &device->flags);
1060
		else
1061
			clear_bit(DISCARD_MY_DATA, &device->flags);
1062

1063
		drbd_connected(peer_device);
1064
		kref_put(&device->kref, drbd_destroy_device);
P
Philipp Reisner 已提交
1065 1066 1067 1068
		rcu_read_lock();
	}
	rcu_read_unlock();

1069 1070 1071
	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
1072
		return 0;
1073
	}
1074

1075
	drbd_thread_start(&connection->asender);
P
Philipp Reisner 已提交
1076

1077
	mutex_lock(&connection->resource->conf_update);
1078 1079 1080 1081
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
1082
	connection->net_conf->discard_my_data = 0;
1083
	mutex_unlock(&connection->resource->conf_update);
1084

1085
	return h;
P
Philipp Reisner 已提交
1086 1087

out_release_sockets:
1088 1089
	if (ad.s_listen)
		sock_release(ad.s_listen);
1090 1091 1092 1093
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
P
Philipp Reisner 已提交
1094 1095 1096
	return -1;
}

1097
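/* Parse a received packet header (protocol 100, 95 or 80 layout) into @pi.
 * Returns -EINVAL on a bad magic value or non-zero padding. */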
static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
P
Philipp Reisner 已提交
1098
{
1099
	unsigned int header_size = drbd_header_size(connection);
1100

1101 1102 1103 1104
	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
1105
			drbd_err(connection, "Header padding is not zero\n");
1106 1107 1108 1109 1110 1111 1112
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1113 1114
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
1115 1116
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
1117 1118 1119 1120 1121
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
1122
		pi->vnr = 0;
1123
	} else {
1124
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1125
			 be32_to_cpu(*(__be32 *)header),
1126
			 connection->agreed_pro_version);
1127
		return -EINVAL;
P
Philipp Reisner 已提交
1128
	}
1129
	pi->data = header + header_size;
1130
	return 0;
1131
}
P
Philipp Reisner 已提交
1132

1133
static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1134
{
1135
	void *buffer = connection->data.rbuf;
1136
	int err;
1137

1138
	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1139
	if (err)
1140
		return err;
1141

1142 1143
	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;
P
Philipp Reisner 已提交
1144

1145
	return err;
P
Philipp Reisner 已提交
1146 1147
}

1148
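/* If the current write ordering requires it, flush the backing device of every
 * volume of this connection; fall back to WO_drain_io if a flush fails. */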
static void drbd_flush(struct drbd_connection *connection)
P
Philipp Reisner 已提交
1149 1150
{
	int rv;
1151
	struct drbd_peer_device *peer_device;
1152 1153
	int vnr;

1154
	if (connection->write_ordering >= WO_bdev_flush) {
1155
		rcu_read_lock();
1156 1157 1158
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

1159
			if (!get_ldev(device))
1160
				continue;
1161
			kref_get(&device->kref);
1162 1163
			rcu_read_unlock();

1164
			rv = blkdev_issue_flush(device->ldev->backing_bdev,
1165 1166
					GFP_NOIO, NULL);
			if (rv) {
1167
				drbd_info(device, "local disk flush failed with status %d\n", rv);
1168 1169 1170
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
1171
				drbd_bump_write_ordering(connection, WO_drain_io);
1172
			}
1173
			put_ldev(device);
1174
			kref_put(&device->kref, drbd_destroy_device);
P
Philipp Reisner 已提交
1175

1176 1177 1178
			rcu_read_lock();
			if (rv)
				break;
P
Philipp Reisner 已提交
1179
		}
1180
		rcu_read_unlock();
P
Philipp Reisner 已提交
1181 1182 1183 1184 1185
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1186
 * @connection:	DRBD connection.
P
Philipp Reisner 已提交
1187 1188 1189
 * @epoch:	Epoch object.
 * @ev:		Epoch event.
 */
1190
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
P
Philipp Reisner 已提交
1191 1192 1193
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
1194
	int epoch_size;
P
Philipp Reisner 已提交
1195 1196 1197
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

1198
	spin_lock(&connection->epoch_lock);
P
Philipp Reisner 已提交
1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do*/
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
1218
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
P
Philipp Reisner 已提交
1219
			if (!(ev & EV_CLEANUP)) {
1220 1221 1222
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
P
Philipp Reisner 已提交
1223
			}
1224 1225 1226
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
1227
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1228
				dec_unacked(epoch->connection);
1229
#endif
P
Philipp Reisner 已提交
1230

1231
			if (connection->current_epoch != epoch) {
P
Philipp Reisner 已提交
1232 1233 1234
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1235
				connection->epochs--;
P
Philipp Reisner 已提交
1236 1237 1238 1239 1240 1241 1242
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
1243
				/* atomic_set(&epoch->active, 0); is already zero */
P
Philipp Reisner 已提交
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

1255
	spin_unlock(&connection->epoch_lock);
P
Philipp Reisner 已提交
1256 1257 1258 1259 1260 1261

	return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
1262
 * @connection:	DRBD connection.
P
Philipp Reisner 已提交
1263 1264
 * @wo:		Write ordering method to try.
 */
1265
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
P
Philipp Reisner 已提交
1266
{
P
Philipp Reisner 已提交
1267
	struct disk_conf *dc;
1268
	struct drbd_peer_device *peer_device;
P
Philipp Reisner 已提交
1269
	enum write_ordering_e pwo;
1270
	int vnr;
P
Philipp Reisner 已提交
1271 1272 1273 1274 1275 1276
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

1277
	pwo = connection->write_ordering;
P
Philipp Reisner 已提交
1278
	wo = min(pwo, wo);
P
Philipp Reisner 已提交
1279
	rcu_read_lock();
1280 1281 1282
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

1283
		if (!get_ldev_if_state(device, D_ATTACHING))
1284
			continue;
1285
		dc = rcu_dereference(device->ldev->disk_conf);
1286 1287 1288 1289 1290

		if (wo == WO_bdev_flush && !dc->disk_flushes)
			wo = WO_drain_io;
		if (wo == WO_drain_io && !dc->disk_drain)
			wo = WO_none;
1291
		put_ldev(device);
1292
	}
P
Philipp Reisner 已提交
1293
	rcu_read_unlock();
1294 1295
	connection->write_ordering = wo;
	if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1296
		drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
P
Philipp Reisner 已提交
1297 1298
}

1299
/**
1300
 * drbd_submit_peer_request()
1301
 * @device:	DRBD device.
1302
 * @peer_req:	peer request
1303
 * @rw:		flag field, see bio->bi_rw
1304 1305 1306 1307 1308 1309 1310 1311 1312 1313
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
1314 1315
 */
/* TODO allocate from our own bio_set. */
1316
int drbd_submit_peer_request(struct drbd_device *device,
1317 1318
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
1319 1320 1321
{
	struct bio *bios = NULL;
	struct bio *bio;
1322 1323 1324
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
1325 1326
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1327
	int err = -ENOMEM;
1328

1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342
	if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
		/* wait for all pending IO completions, before we start
		 * zeroing things out. */
		conn_wait_active_ee_empty(first_peer_device(device)->connection);
		if (blkdev_issue_zeroout(device->ldev->backing_bdev,
			sector, ds >> 9, GFP_NOIO))
			peer_req->flags |= EE_WAS_ERROR;
		drbd_endio_write_sec_final(peer_req);
		return 0;
	}

	if (peer_req->flags & EE_IS_TRIM)
		nr_pages = 0; /* discards don't have any payload. */

1343 1344 1345
	/* In most cases, we will only need one bio.  But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
1346 1347 1348 1349 1350
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
1351 1352 1353
next_bio:
	bio = bio_alloc(GFP_NOIO, nr_pages);
	if (!bio) {
1354
		drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1355 1356
		goto fail;
	}
1357
	/* > peer_req->i.sector, unless this is the first bio */
1358
	bio->bi_iter.bi_sector = sector;
1359
	bio->bi_bdev = device->ldev->backing_bdev;
1360
	bio->bi_rw = rw;
1361
	bio->bi_private = peer_req;
1362
	bio->bi_end_io = drbd_peer_request_endio;
1363 1364 1365 1366 1367

	bio->bi_next = bios;
	bios = bio;
	++n_bios;

1368 1369 1370 1371 1372
	if (rw & REQ_DISCARD) {
		bio->bi_iter.bi_size = ds;
		goto submit;
	}

1373 1374 1375
	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
1376 1377 1378 1379
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
1380
				drbd_err(device,
1381 1382
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
1383
					len, (uint64_t)bio->bi_iter.bi_sector);
1384 1385 1386
				err = -ENOSPC;
				goto fail;
			}
1387 1388 1389 1390 1391 1392
			goto next_bio;
		}
		ds -= len;
		sector += len >> 9;
		--nr_pages;
	}
1393
	D_ASSERT(device, ds == 0);
1394 1395
submit:
	D_ASSERT(device, page == NULL);
1396

1397
	atomic_set(&peer_req->pending_bios, n_bios);
1398 1399 1400 1401 1402
	do {
		bio = bios;
		bios = bios->bi_next;
		bio->bi_next = NULL;

1403
		drbd_generic_make_request(device, fault_type, bio);
1404 1405 1406 1407 1408 1409 1410 1411 1412
	} while (bios);
	return 0;

fail:
	while (bios) {
		bio = bios;
		bios = bios->bi_next;
		bio_put(bio);
	}
1413
	return err;
1414 1415
}

1416
static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1417
					     struct drbd_peer_request *peer_req)
1418
{
1419
	struct drbd_interval *i = &peer_req->i;
1420

1421
	drbd_remove_interval(&device->write_requests, i);
1422 1423
	drbd_clear_interval(i);

A
Andreas Gruenbacher 已提交
1424
	/* Wake up any processes waiting for this peer request to complete.  */
1425
	if (i->waiting)
1426
		wake_up(&device->misc_wait);
1427 1428
}

1429
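/* Wait until the active_ee list of every volume on this connection has drained. */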
static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1430
{
1431
	struct drbd_peer_device *peer_device;
1432 1433 1434
	int vnr;

	rcu_read_lock();
1435 1436 1437
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

1438
		kref_get(&device->kref);
1439
		rcu_read_unlock();
1440
		drbd_wait_ee_list_empty(device, &device->active_ee);
1441
		kref_put(&device->kref, drbd_destroy_device);
1442 1443 1444 1445 1446
		rcu_read_lock();
	}
	rcu_read_unlock();
}

1447 1448 1449 1450 1451 1452
static struct drbd_peer_device *
conn_peer_device(struct drbd_connection *connection, int volume_number)
{
	return idr_find(&connection->peer_devices, volume_number);
}

1453
static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
1454
{
1455
	int rv;
1456
	struct p_barrier *p = pi->data;
P
Philipp Reisner 已提交
1457 1458
	struct drbd_epoch *epoch;

1459 1460 1461
	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
1462 1463 1464
	connection->current_epoch->barrier_nr = p->barrier;
	connection->current_epoch->connection = connection;
	rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
P
Philipp Reisner 已提交
1465 1466 1467 1468 1469 1470

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
1471
	switch (connection->write_ordering) {
P
Philipp Reisner 已提交
1472 1473
	case WO_none:
		if (rv == FE_RECYCLED)
1474
			return 0;
1475 1476 1477 1478 1479 1480 1481

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		if (epoch)
			break;
		else
1482
			drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1483
			/* Fall through */
P
Philipp Reisner 已提交
1484 1485 1486

	case WO_bdev_flush:
	case WO_drain_io:
1487 1488
		conn_wait_active_ee_empty(connection);
		drbd_flush(connection);
1489

1490
		if (atomic_read(&connection->current_epoch->epoch_size)) {
1491 1492 1493
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			if (epoch)
				break;
P
Philipp Reisner 已提交
1494 1495
		}

1496
		return 0;
1497
	default:
1498
		drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1499
		return -EIO;
P
Philipp Reisner 已提交
1500 1501 1502 1503 1504 1505
	}

	epoch->flags = 0;
	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

1506 1507 1508 1509 1510
	spin_lock(&connection->epoch_lock);
	if (atomic_read(&connection->current_epoch->epoch_size)) {
		list_add(&epoch->list, &connection->current_epoch->list);
		connection->current_epoch = epoch;
		connection->epochs++;
P
Philipp Reisner 已提交
1511 1512 1513 1514
	} else {
		/* The current_epoch got recycled while we allocated this one... */
		kfree(epoch);
	}
1515
	spin_unlock(&connection->epoch_lock);
P
Philipp Reisner 已提交
1516

1517
	return 0;
P
Philipp Reisner 已提交
1518 1519 1520 1521
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
1522
static struct drbd_peer_request *
1523
read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1524
	      struct packet_info *pi) __must_hold(local)
P
Philipp Reisner 已提交
1525
{
1526
	struct drbd_device *device = peer_device->device;
1527
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
1528
	struct drbd_peer_request *peer_req;
P
Philipp Reisner 已提交
1529
	struct page *page;
1530
	int dgs, ds, err;
1531
	int data_size = pi->size;
1532 1533
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;
1534
	unsigned long *data;
1535
	struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
P
Philipp Reisner 已提交
1536

1537
	dgs = 0;
1538
	if (!trim && peer_device->connection->peer_integrity_tfm) {
1539
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1540 1541 1542 1543
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 *	  here, together with its struct p_data?
		 */
1544
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1545
		if (err)
P
Philipp Reisner 已提交
1546
			return NULL;
1547
		data_size -= dgs;
P
Philipp Reisner 已提交
1548 1549
	}

1550 1551 1552 1553 1554
	if (trim) {
		D_ASSERT(peer_device, data_size == 0);
		data_size = be32_to_cpu(trim->size);
	}

1555 1556
	if (!expect(IS_ALIGNED(data_size, 512)))
		return NULL;
1557 1558
	/* prepare for larger trim requests. */
	if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1559
		return NULL;
P
Philipp Reisner 已提交
1560

1561 1562 1563
	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
1564
		drbd_err(device, "request from peer beyond end of local disk: "
1565
			"capacity: %llus < sector: %llus + size: %u\n",
1566 1567 1568 1569 1570
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);
		return NULL;
	}

P
Philipp Reisner 已提交
1571 1572 1573
	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
1574
	peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1575
	if (!peer_req)
P
Philipp Reisner 已提交
1576
		return NULL;
1577

1578
	if (trim)
1579
		return peer_req;
1580

P
Philipp Reisner 已提交
1581
	ds = data_size;
1582
	page = peer_req->pages;
1583 1584
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
1585
		data = kmap(page);
1586
		err = drbd_recv_all_warn(peer_device->connection, data, len);
1587
		if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1588
			drbd_err(device, "Fault injection: Corrupting data on receive\n");
1589 1590
			data[0] = data[0] ^ (unsigned long)-1;
		}
P
Philipp Reisner 已提交
1591
		kunmap(page);
1592
		if (err) {
1593
			drbd_free_peer_req(device, peer_req);
P
Philipp Reisner 已提交
1594 1595
			return NULL;
		}
1596
		ds -= len;
P
Philipp Reisner 已提交
1597 1598 1599
	}

	if (dgs) {
1600
		drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
P
Philipp Reisner 已提交
1601
		if (memcmp(dig_in, dig_vv, dgs)) {
1602
			drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1603
				(unsigned long long)sector, data_size);
1604
			drbd_free_peer_req(device, peer_req);
P
Philipp Reisner 已提交
1605 1606 1607
			return NULL;
		}
	}
1608
	device->recv_cnt += data_size>>9;
1609
	return peer_req;
P
Philipp Reisner 已提交
1610 1611 1612 1613 1614
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
{
	struct page *page;
	int err = 0;
	void *data;

	if (!data_size)
		return 0;

	page = drbd_alloc_pages(peer_device, 1, 1);

	data = kmap(page);
	while (data_size) {
		unsigned int len = min_t(int, data_size, PAGE_SIZE);

		err = drbd_recv_all_warn(peer_device->connection, data, len);
		if (err)
			break;
		data_size -= len;
	}
	kunmap(page);
	drbd_free_pages(peer_device->device, page, 0);
	return err;
}

static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
			   sector_t sector, int data_size)
{
	struct bio_vec bvec;
	struct bvec_iter iter;
	struct bio *bio;
	int dgs, err, expect;
	void *dig_in = peer_device->connection->int_dig_in;
	void *dig_vv = peer_device->connection->int_dig_vv;

	dgs = 0;
	if (peer_device->connection->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
		err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
		if (err)
			return err;
		data_size -= dgs;
	}

	/* optimistically update recv_cnt.  if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	peer_device->device->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);

	bio_for_each_segment(bvec, bio, iter) {
		void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
		expect = min_t(int, data_size, bvec.bv_len);
		err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
		kunmap(bvec.bv_page);
		if (err)
			return err;
		data_size -= expect;
	}

	if (dgs) {
		drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
			return -EINVAL;
		}
	}

	D_ASSERT(peer_device->device, data_size == 0);
	return 0;
}

/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err;

	D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(device, sector, peer_req->i.size);
		err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
	} else {
		/* Record failure to sync */
		drbd_rs_failed_io(device, sector, peer_req->i.size);

		err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
	}
	dec_unacked(device);

	return err;
}

static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
			    struct packet_info *pi) __releases(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
	if (!peer_req)
		goto fail;

	dec_rs_pending(device);

	inc_unacked(device);
	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&device->resource->req_lock);
	list_add(&peer_req->w.list, &device->sync_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(pi->size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
fail:
	put_ldev(device);
	return -EIO;
}

static struct drbd_request *
find_request(struct drbd_device *device, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
{
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
		return req;
	if (!missing_ok) {
		drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);
	}
	return NULL;
}

static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct drbd_request *req;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&device->resource->req_lock);
	if (unlikely(!req))
		return -EIO;

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(peer_device, req, sector, pi->size);
	if (!err)
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */

	return err;
}

static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int err;
	struct p_data *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	D_ASSERT(device, p->block_id == ID_SYNCER);

	if (get_ldev(device)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(peer_device, sector, pi);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(peer_device, pi->size);

		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
	}

	atomic_add(pi->size >> 9, &device->rs_sect_in);

	return err;
}

static void restart_conflicting_writes(struct drbd_device *device,
				       sector_t sector, int size)
{
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
			continue;
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, CONFLICT_RESOLVED, NULL);
	}
}

/*
 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	int err = 0, pcmd;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (device->state.conn >= C_SYNC_SOURCE &&
				device->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(peer_device, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(device, sector, peer_req->i.size);
		} else {
			err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this?  */
		}
		dec_unacked(device);
	}
	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
		spin_lock_irq(&device->resource->req_lock);
		D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(device, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(device, sector, peer_req->i.size);
		spin_unlock_irq(&device->resource->req_lock);
	} else
		D_ASSERT(device, drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

	return err;
}

static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	int err;

	err = drbd_send_ack(peer_device, ack, peer_req);
	dec_unacked(peer_device->device);

	return err;
}

static int e_send_superseded(struct drbd_work *w, int unused)
{
	return e_send_ack(w, P_SUPERSEDED);
}

static int e_send_retry_write(struct drbd_work *w, int unused)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_connection *connection = peer_req->peer_device->connection;

	return e_send_ack(w, connection->agreed_pro_version >= 100 ?
			     P_RETRY_WRITE : P_SUPERSEDED);
}

static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
}
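/* Example: once the counter has wrapped, seq_greater(1, 0xffffffff) is true,
 * since (s32)(1 - 0xffffffff) == 2 > 0; a freshly wrapped sequence number
 * still compares as "newer" than one just below the wrap point. */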

static u32 seq_max(u32 a, u32 b)
{
	return seq_greater(a, b) ? a : b;
}

static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
{
	struct drbd_device *device = peer_device->device;
	unsigned int newest_peer_seq;

	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
		spin_lock(&device->peer_seq_lock);
		newest_peer_seq = seq_max(device->peer_seq, peer_seq);
		device->peer_seq = newest_peer_seq;
		spin_unlock(&device->peer_seq_lock);
		/* wake up only if we actually changed device->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&device->seq_wait);
	}
}

static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
}
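/* Note that s1/s2 are sector numbers while l1/l2 are lengths in bytes, hence
 * the >>9 conversion.  For example, overlaps(8, 4096, 12, 4096) is true: the
 * ranges cover sectors [8, 16) and [12, 20), which share sectors 12..15. */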

/* maybe change sync_ee into interval trees as well? */
static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	struct drbd_peer_request *rs_req;
	bool rv = 0;

	spin_lock_irq(&device->resource->req_lock);
	list_for_each_entry(rs_req, &device->sync_ee, w.list) {
		if (overlaps(peer_req->i.sector, peer_req->i.size,
			     rs_req->i.sector, rs_req->i.size)) {
			rv = 1;
			break;
		}
	}
	spin_unlock_irq(&device->resource->req_lock);

	return rv;
}

/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than device->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update device->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
{
	struct drbd_device *device = peer_device->device;
	DEFINE_WAIT(wait);
	long timeout;
	int ret = 0, tp;

	if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
		return 0;

	spin_lock(&device->peer_seq_lock);
	for (;;) {
		if (!seq_greater(peer_seq - 1, device->peer_seq)) {
			device->peer_seq = seq_max(device->peer_seq, peer_seq);
			break;
		}

		if (signal_pending(current)) {
			ret = -ERESTARTSYS;
			break;
		}

		rcu_read_lock();
		tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
		rcu_read_unlock();

		if (!tp)
			break;

		/* Only need to wait if two_primaries is enabled */
		prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&device->peer_seq_lock);
		rcu_read_lock();
		timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
		rcu_read_unlock();
		timeout = schedule_timeout(timeout);
		spin_lock(&device->peer_seq_lock);
		if (!timeout) {
			ret = -ETIMEDOUT;
			drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
			break;
		}
	}
	spin_unlock(&device->peer_seq_lock);
	finish_wait(&device->seq_wait, &wait);
	return ret;
}

/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(u32 dpf)
{
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
}
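/* For example, a P_DATA packet flagged DP_FUA | DP_FLUSH is submitted locally
 * as a WRITE with REQ_FUA | REQ_FLUSH, so the ordering and durability
 * semantics requested by the peer's original bio carry over to our backing
 * device. */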

static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
				    unsigned int size)
{
	struct drbd_interval *i;

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		if (!i->local)
			continue;
		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
			continue;
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&device->resource->req_lock);
		if (m.bio)
			complete_master_bio(device, &m);
		spin_lock_irq(&device->resource->req_lock);
		goto repeat;
	}
}

static int handle_write_conflicts(struct drbd_device *device,
				  struct drbd_peer_request *peer_req)
{
	struct drbd_connection *connection = peer_req->peer_device->connection;
	bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;
	bool equal;
	int err;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&device->write_requests, &peer_req->i);

    repeat:
	drbd_for_each_overlap(i, &device->write_requests, sector, size) {
		if (i == &peer_req->i)
			continue;

		if (!i->local) {
			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup.  Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(device, i);
			if (err)
				goto out;
			goto repeat;
		}

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be considered overwritten
			 * and thus superseded; otherwise, it will be retried
			 * once all overlapping requests have completed.
			 */
			bool superseded = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  superseded ? "local" : "remote");

			inc_unacked(device);
			peer_req->w.cb = superseded ? e_send_superseded :
						   e_send_retry_write;
			list_add_tail(&peer_req->w.list, &device->done_ee);
			wake_asender(connection);

			err = -ENOENT;
			goto out;
		} else {
			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

			if (!equal)
				drbd_alert(device, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request has been superseded
				 * or needs to be retried.
				 * Requests that have been superseded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(device, &req->i);
				if (err) {
					_conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
					fail_postponed_requests(device, sector, size);
					goto out;
				}
				goto repeat;
			}
			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;
		}
	}
	err = 0;

    out:
	if (err)
		drbd_remove_epoch_entry_interval(device, peer_req);
	return err;
}

/* mirrored write */
static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);
	int rw = WRITE;
	u32 dp_flags;
	int err, tp;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (!get_ldev(device)) {
		int err2;

		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
		atomic_inc(&connection->current_epoch->epoch_size);
		err2 = drbd_drain_block(peer_device, pi->size);
		if (!err)
			err = err2;
		return err;
	}

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */

	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(peer_device, p->block_id, sector, pi);
	if (!peer_req) {
		put_ldev(device);
		return -EIO;
	}

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(dp_flags);
	if (pi->cmd == P_TRIM) {
		struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
		peer_req->flags |= EE_IS_TRIM;
		if (!blk_queue_discard(q))
			peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
		D_ASSERT(peer_device, peer_req->i.size > 0);
		D_ASSERT(peer_device, rw & REQ_DISCARD);
		D_ASSERT(peer_device, peer_req->pages == NULL);
	} else if (peer_req->pages == NULL) {
		D_ASSERT(device, peer_req->i.size == 0);
		D_ASSERT(device, dp_flags & DP_FLUSH);
	}

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&connection->epoch_lock);
	peer_req->epoch = connection->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&connection->epoch_lock);

	rcu_read_lock();
	tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
	rcu_read_unlock();
	if (tp) {
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(peer_device, peer_seq);
		if (err)
			goto out_interrupted;
		spin_lock_irq(&device->resource->req_lock);
		err = handle_write_conflicts(device, peer_req);
		if (err) {
			spin_unlock_irq(&device->resource->req_lock);
			if (err == -ENOENT) {
				put_ldev(device);
				return 0;
			}
			goto out_interrupted;
		}
	} else {
		update_peer_seq(peer_device, peer_seq);
		spin_lock_irq(&device->resource->req_lock);
	}
	/* if we use the zeroout fallback code, we process synchronously
	 * and we wait for all pending requests, respectively wait for
	 * active_ee to become empty in drbd_submit_peer_request();
	 * better not add ourselves here. */
	if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
		list_add(&peer_req->w.list, &device->active_ee);
	spin_unlock_irq(&device->resource->req_lock);

	if (device->state.conn == C_SYNC_TARGET)
		wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));

	if (peer_device->connection->agreed_pro_version < 100) {
		rcu_read_lock();
		switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
		case DRBD_PROT_C:
			dp_flags |= DP_SEND_WRITE_ACK;
			break;
		case DRBD_PROT_B:
			dp_flags |= DP_SEND_RECEIVE_ACK;
			break;
		}
		rcu_read_unlock();
	}

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		inc_unacked(device);
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */
	}

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
	}

	if (device->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(device, &peer_req->i, true);
	}

	err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
	if (!err)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(device, peer_req);
	spin_unlock_irq(&device->resource->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(device, &peer_req->i);

out_interrupted:
	drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return err;
}

/* We may throttle resync if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 * amount (more than 64 sectors) of activity we cannot account for with our
 * own resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
{
	struct lc_element *tmp;
	bool throttle = true;

	if (!drbd_rs_c_min_rate_throttle(device))
		return false;

	spin_lock_irq(&device->al_lock);
	tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
	if (tmp) {
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags))
			throttle = false;
		/* Do not slow down if app IO is already waiting for this extent */
	}
	spin_unlock_irq(&device->al_lock);

	return throttle;
}

bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	unsigned int c_min_rate;
	int curr_events;

	rcu_read_lock();
	c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
	rcu_read_unlock();

	/* feature disabled? */
	if (c_min_rate == 0)
		return false;

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
			atomic_read(&device->rs_sect_ev);
	if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
		unsigned long rs_left;
		int i;

		device->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		 * approx. */
		i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
			rs_left = device->ov_left;
		else
			rs_left = drbd_bm_total_weight(device) - device->rs_failed;

		dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
		if (!dt)
			dt++;
		db = device->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);
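		/* db is the number of bitmap bits resynced since mark i and dt
		 * the seconds elapsed, so dbdt is the recent resync rate in
		 * KiB/s, which is what gets compared against c_min_rate below. */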

		if (dbdt > c_min_rate)
			return true;
	}
	return false;
}

static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	sector_t capacity;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	int size, verb;
	unsigned int fault_type;
	struct p_block_req *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;
	capacity = drbd_get_capacity(device->this_bdev);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}
	if (sector + (size>>9) > capacity) {
		drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
				(unsigned long long)sector, size);
		return -EINVAL;
	}

	if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
		verb = 1;
		switch (pi->cmd) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
			break;
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
		case P_OV_REQUEST:
			drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
			break;
		case P_OV_REPLY:
			verb = 0;
			dec_rs_pending(device);
			drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
			break;
		default:
			BUG();
		}
		if (verb && __ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Can not satisfy peer's read request, "
			    "no local data.\n");

		/* drain possible payload */
		return drbd_drain_block(peer_device, pi->size);
	}

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
			true /* has real payload */, GFP_NOIO);
	if (!peer_req) {
		put_ldev(device);
		return -ENOMEM;
	}

	switch (pi->cmd) {
	case P_DATA_REQUEST:
		peer_req->w.cb = w_e_end_data_req;
		fault_type = DRBD_FAULT_DT_RD;
		/* application IO, don't drbd_rs_begin_io */
		goto submit;

	case P_RS_DATA_REQUEST:
		peer_req->w.cb = w_e_end_rsdata_req;
		fault_type = DRBD_FAULT_RS_RD;
		/* used in the sector offset progress display */
		device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		break;

	case P_OV_REPLY:
	case P_CSUM_RS_REQUEST:
		fault_type = DRBD_FAULT_RS_RD;
		di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
		if (!di)
			goto out_free_e;

		di->digest_size = pi->size;
		di->digest = (((char *)di)+sizeof(struct digest_info));

		peer_req->digest = di;
		peer_req->flags |= EE_HAS_DIGEST;

		if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
			goto out_free_e;

		if (pi->cmd == P_CSUM_RS_REQUEST) {
			D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
			peer_req->w.cb = w_e_end_csum_rs_req;
			/* used in the sector offset progress display */
			device->bm_resync_fo = BM_SECT_TO_BIT(sector);
		} else if (pi->cmd == P_OV_REPLY) {
			/* track progress, we may need to throttle */
			atomic_add(size >> 9, &device->rs_sect_in);
			peer_req->w.cb = w_e_end_ov_reply;
			dec_rs_pending(device);
			/* drbd_rs_begin_io done when we sent this request,
			 * but accounting still needs to be done. */
			goto submit_for_resync;
		}
		break;

	case P_OV_REQUEST:
		if (device->ov_start_sector == ~(sector_t)0 &&
		    peer_device->connection->agreed_pro_version >= 90) {
			unsigned long now = jiffies;
			int i;
			device->ov_start_sector = sector;
			device->ov_position = sector;
			device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
			device->rs_total = device->ov_left;
			for (i = 0; i < DRBD_SYNC_MARKS; i++) {
				device->rs_mark_left[i] = device->ov_left;
				device->rs_mark_time[i] = now;
			}
			drbd_info(device, "Online Verify start sector: %llu\n",
					(unsigned long long)sector);
		}
		peer_req->w.cb = w_e_end_ov_req;
		fault_type = DRBD_FAULT_RS_RD;
		break;

	default:
		BUG();
	}

	/* Throttle, drbd_rs_begin_io and submit should become asynchronous
	 * wrt the receiver, but it is not as straightforward as it may seem.
	 * Various places in the resync start and stop logic assume resync
	 * requests are processed in order, requeuing this on the worker thread
	 * introduces a bunch of new code for synchronization between threads.
	 *
	 * Unlimited throttling before drbd_rs_begin_io may stall the resync
	 * "forever", throttling after drbd_rs_begin_io will lock that extent
	 * for application writes for the same time.  For now, just throttle
	 * here, where the rest of the code expects the receiver to sleep for
	 * a while, anyways.
	 */

	/* Throttle before drbd_rs_begin_io, as that locks out application IO;
	 * this defers syncer requests for some time, before letting at least
	 * one request through.  The resync controller on the receiving side
	 * will adapt to the incoming rate accordingly.
	 *
	 * We cannot throttle here if remote is Primary/SyncTarget:
	 * we would also throttle its application reads.
	 * In that case, throttling is done on the SyncTarget only.
	 */
	if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
		schedule_timeout_uninterruptible(HZ/10);
	if (drbd_rs_begin_io(device, sector))
		goto out_free_e;

submit_for_resync:
	atomic_add(size >> 9, &device->rs_sect_ev);

submit:
	inc_unacked(device);
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
		return 0;

	/* don't care for the reason here */
	drbd_err(device, "submit failed, triggering re-connect\n");
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);
	/* no drbd_rs_complete_io(), we are dropping the connection anyways */

out_free_e:
	put_ldev(device);
	drbd_free_peer_req(device, peer_req);
	return -EIO;
}

/**
 * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
 */
static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int self, peer, rv = -100;
	unsigned long ch_self, ch_peer;
	enum drbd_after_sb_p after_sb_0p;

	self = device->ldev->md.uuid[UI_BITMAP] & 1;
	peer = device->p_uuid[UI_BITMAP] & 1;

	ch_peer = device->p_uuid[UI_SIZE];
	ch_self = device->comm_bm_set;

	rcu_read_lock();
	after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
	rcu_read_unlock();
	switch (after_sb_0p) {
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_CALL_HELPER:
	case ASB_VIOLENTLY:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_DISCARD_YOUNGER_PRI:
		if (self == 0 && peer == 1) {
			rv = -1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv =  1;
			break;
		}
		/* Else fall through to one of the other strategies... */
	case ASB_DISCARD_OLDER_PRI:
		if (self == 0 && peer == 1) {
			rv = 1;
			break;
		}
		if (self == 1 && peer == 0) {
			rv = -1;
			break;
		}
		/* Else fall through to one of the other strategies... */
		drbd_warn(device, "Discard younger/older primary did not find a decision\n"
		     "Using discard-least-changes instead\n");
	case ASB_DISCARD_ZERO_CHG:
		if (ch_peer == 0 && ch_self == 0) {
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
			break;
		} else {
			if (ch_peer == 0) { rv =  1; break; }
			if (ch_self == 0) { rv = -1; break; }
		}
		if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
			break;
	case ASB_DISCARD_LEAST_CHG:
		if	(ch_self < ch_peer)
			rv = -1;
		else if (ch_self > ch_peer)
			rv =  1;
		else /* ( ch_self == ch_peer ) */
		     /* Well, then use something else. */
			rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
				? -1 : 1;
		break;
	case ASB_DISCARD_LOCAL:
		rv = -1;
		break;
	case ASB_DISCARD_REMOTE:
		rv =  1;
	}

	return rv;
}
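/* Note on the return convention shared by the drbd_asb_recover_*p() helpers:
 * a positive value means this node's data survives and the peer becomes sync
 * target, a negative value means we become sync target ourselves, and -100
 * means no automatic after-split-brain decision could be reached. */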

/**
 * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
 */
static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_1p;

	rcu_read_lock();
	after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
	rcu_read_unlock();
	switch (after_sb_1p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_DISCARD_ZERO_CHG:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CONSENSUS:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_SECONDARY)
			rv = hg;
		if (hg == 1  && device->state.role == R_PRIMARY)
			rv = hg;
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCARD_SECONDARY:
		return device->state.role == R_PRIMARY ? 1 : -1;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1 && device->state.role == R_PRIMARY) {
			enum drbd_state_rv rv2;

			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			  * we might be here in C_WF_REPORT_PARAMS which is transient.
			  * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(device, "pri-lost-after-sb");
			} else {
				drbd_warn(device, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}

/**
 * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
 */
static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	int hg, rv = -100;
	enum drbd_after_sb_p after_sb_2p;

	rcu_read_lock();
	after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
	rcu_read_unlock();
	switch (after_sb_2p) {
	case ASB_DISCARD_YOUNGER_PRI:
	case ASB_DISCARD_OLDER_PRI:
	case ASB_DISCARD_LEAST_CHG:
	case ASB_DISCARD_LOCAL:
	case ASB_DISCARD_REMOTE:
	case ASB_CONSENSUS:
	case ASB_DISCARD_SECONDARY:
	case ASB_DISCARD_ZERO_CHG:
		drbd_err(device, "Configuration error.\n");
		break;
	case ASB_VIOLENTLY:
		rv = drbd_asb_recover_0p(peer_device);
		break;
	case ASB_DISCONNECT:
		break;
	case ASB_CALL_HELPER:
		hg = drbd_asb_recover_0p(peer_device);
		if (hg == -1) {
			enum drbd_state_rv rv2;

			 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
			  * we might be here in C_WF_REPORT_PARAMS which is transient.
			  * we do not need to wait for the after state change work either. */
			rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
			if (rv2 != SS_SUCCESS) {
				drbd_khelper(device, "pri-lost-after-sb");
			} else {
				drbd_warn(device, "Successfully gave up primary role.\n");
				rv = hg;
			}
		} else
			rv = hg;
	}

	return rv;
}

static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
			   u64 bits, u64 flags)
{
	if (!uuid) {
		drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
		return;
	}
	drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
	     text,
	     (unsigned long long)uuid[UI_CURRENT],
	     (unsigned long long)uuid[UI_BITMAP],
	     (unsigned long long)uuid[UI_HISTORY_START],
	     (unsigned long long)uuid[UI_HISTORY_END],
	     (unsigned long long)bits,
	     (unsigned long long)flags);
}

/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091   requires proto 91
-1096   requires proto 96
 */
static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
{
	u64 self, peer;
	int i, j;

	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);

	*rule_nr = 10;
	if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
		return 0;

	*rule_nr = 20;
	if ((self == UUID_JUST_CREATED || self == (u64)0) &&
	     peer != UUID_JUST_CREATED)
		return -2;

	*rule_nr = 30;
	if (self != UUID_JUST_CREATED &&
	    (peer == UUID_JUST_CREATED || peer == (u64)0))
		return 2;

	if (self == peer) {
		int rct, dc; /* roles at crash time */

		if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {

			if (first_peer_device(device)->connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
				drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
				drbd_uuid_move_history(device);
				device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
				device->ldev->md.uuid[UI_BITMAP] = 0;

				drbd_uuid_dump(device, "self", device->ldev->md.uuid,
					       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
				*rule_nr = 34;
			} else {
				drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
				*rule_nr = 36;
			}

			return 1;
		}

		if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {

			if (first_peer_device(device)->connection->agreed_pro_version < 91)
				return -1091;

			if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
			    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
				drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");

				device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
				device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
				device->p_uuid[UI_BITMAP] = 0UL;

				drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
				*rule_nr = 35;
			} else {
				drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
				*rule_nr = 37;
			}

			return -1;
		}

		/* Common power [off|failure] */
		rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
			(device->p_uuid[UI_FLAGS] & 2);
		/* lowest bit is set when we were primary,
		 * next bit (weight 2) is set when peer was primary */
		*rule_nr = 40;

		switch (rct) {
		case 0: /* !self_pri && !peer_pri */ return 0;
		case 1: /*  self_pri && !peer_pri */ return 1;
		case 2: /* !self_pri &&  peer_pri */ return -1;
		case 3: /*  self_pri &&  peer_pri */
			dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
			return dc ? -1 : 1;
		}
	}

	*rule_nr = 50;
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer)
		return -1;

	*rule_nr = 51;
	peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
		    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of the peer's UUIDs. */

			if (first_peer_device(device)->connection->agreed_pro_version < 91)
				return -1091;

			device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
			device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];

			drbd_info(device, "Lost last syncUUID packet, corrected:\n");
			drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);

			return -1;
		}
	}

	*rule_nr = 60;
	self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		peer = device->p_uuid[i] & ~((u64)1);
		if (self == peer)
			return -2;
	}

	*rule_nr = 70;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	if (self == peer)
		return 1;

	*rule_nr = 71;
	self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
	if (self == peer) {
		if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
		    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
		    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
		    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
			/* The last P_SYNC_UUID did not get through. Undo the last start of
			   resync as sync source modifications of our UUIDs. */

			if (first_peer_device(device)->connection->agreed_pro_version < 91)
				return -1091;

			__drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
			__drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);

			drbd_info(device, "Last syncUUID did not get through, corrected:\n");
			drbd_uuid_dump(device, "self", device->ldev->md.uuid,
				       device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);

			return 1;
		}
	}

	*rule_nr = 80;
	peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		if (self == peer)
			return 2;
	}

	*rule_nr = 90;
	self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
	peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
	if (self == peer && self != ((u64)0))
		return 100;

	*rule_nr = 100;
	for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
		self = device->ldev->md.uuid[i] & ~((u64)1);
		for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
			peer = device->p_uuid[j] & ~((u64)1);
			if (self == peer)
				return -100;
		}
	}

	return -1000;
}

/* drbd_sync_handshake() returns the new conn state on success, or
   CONN_MASK (-1) on failure.
 */
3027 3028
static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
					   enum drbd_role peer_role,
P
Philipp Reisner 已提交
3029 3030
					   enum drbd_disk_state peer_disk) __must_hold(local)
{
3031
	struct drbd_device *device = peer_device->device;
P
Philipp Reisner 已提交
3032 3033
	enum drbd_conns rv = C_MASK;
	enum drbd_disk_state mydisk;
3034
	struct net_conf *nc;
3035
	int hg, rule_nr, rr_conflict, tentative;
P
Philipp Reisner 已提交
3036

3037
	mydisk = device->state.disk;
P
Philipp Reisner 已提交
3038
	if (mydisk == D_NEGOTIATING)
3039
		mydisk = device->new_state_tmp.disk;
P
Philipp Reisner 已提交
3040

3041
	drbd_info(device, "drbd_sync_handshake:\n");
3042

3043 3044 3045 3046
	spin_lock_irq(&device->ldev->md.uuid_lock);
	drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
	drbd_uuid_dump(device, "peer", device->p_uuid,
		       device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
P
Philipp Reisner 已提交
3047

3048 3049
	hg = drbd_uuid_compare(device, &rule_nr);
	spin_unlock_irq(&device->ldev->md.uuid_lock);
P
Philipp Reisner 已提交
3050

3051
	drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
P
Philipp Reisner 已提交
3052 3053

	if (hg == -1000) {
3054
		drbd_alert(device, "Unrelated data, aborting!\n");
P
Philipp Reisner 已提交
3055 3056
		return C_MASK;
	}
3057
	if (hg < -1000) {
3058
		drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
P
Philipp Reisner 已提交
3059 3060 3061 3062 3063 3064 3065 3066 3067
		return C_MASK;
	}

	if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
	    (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
		int f = (hg == -100) || abs(hg) == 2;
		hg = mydisk > D_INCONSISTENT ? 1 : -1;
		if (f)
			hg = hg*2;
3068
		drbd_info(device, "Becoming sync %s due to disk states.\n",
P
Philipp Reisner 已提交
3069 3070 3071
		     hg > 0 ? "source" : "target");
	}

3072
	if (abs(hg) == 100)
3073
		drbd_khelper(device, "initial-split-brain");
3074

3075
	rcu_read_lock();
3076
	nc = rcu_dereference(peer_device->connection->net_conf);
3077 3078

	if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3079
		int pcount = (device->state.role == R_PRIMARY)
P
Philipp Reisner 已提交
3080 3081 3082 3083 3084
			   + (peer_role == R_PRIMARY);
		int forced = (hg == -100);

		switch (pcount) {
		case 0:
3085
			hg = drbd_asb_recover_0p(peer_device);
P
Philipp Reisner 已提交
3086 3087
			break;
		case 1:
3088
			hg = drbd_asb_recover_1p(peer_device);
P
Philipp Reisner 已提交
3089 3090
			break;
		case 2:
3091
			hg = drbd_asb_recover_2p(peer_device);
P
Philipp Reisner 已提交
3092 3093 3094
			break;
		}
		if (abs(hg) < 100) {
3095
			drbd_warn(device, "Split-Brain detected, %d primaries, "
P
Philipp Reisner 已提交
3096 3097 3098
			     "automatically solved. Sync from %s node\n",
			     pcount, (hg < 0) ? "peer" : "this");
			if (forced) {
3099
				drbd_warn(device, "Doing a full sync, since"
P
Philipp Reisner 已提交
3100 3101 3102 3103 3104 3105 3106
				     " UUIDs where ambiguous.\n");
				hg = hg*2;
			}
		}
	}

	if (hg == -100) {
		if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
			hg = -1;
		if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
			hg = 1;

		if (abs(hg) < 100)
			drbd_warn(device, "Split-Brain detected, manually solved. "
			     "Sync from %s node\n",
			     (hg < 0) ? "peer" : "this");
	}
	rr_conflict = nc->rr_conflict;
	tentative = nc->tentative;
	rcu_read_unlock();

	if (hg == -100) {
		/* FIXME this log message is not correct if we end up here
		 * after an attempted attach on a diskless node.
		 * We just refuse to attach -- well, we drop the "connection"
		 * to that disk, in a way... */
		drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
		drbd_khelper(device, "split-brain");
		return C_MASK;
	}

	if (hg > 0 && mydisk <= D_INCONSISTENT) {
		drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
		return C_MASK;
	}

	if (hg < 0 && /* by intention we do not use mydisk here. */
	    device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
		switch (rr_conflict) {
		case ASB_CALL_HELPER:
			drbd_khelper(device, "pri-lost");
			/* fall through */
		case ASB_DISCONNECT:
			drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
			return C_MASK;
		case ASB_VIOLENTLY:
			drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
P
Philipp Reisner 已提交
3147 3148 3149 3150
			     "assumption\n");
		}
	}

	if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
		if (hg == 0)
			drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
		else
			drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
				 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
				 abs(hg) >= 2 ? "full" : "bit-map based");
		return C_MASK;
	}

	if (abs(hg) >= 2) {
		drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
		if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
					BM_LOCKED_SET_ALLOWED))
			return C_MASK;
	}

	if (hg > 0) { /* become sync source. */
		rv = C_WF_BITMAP_S;
	} else if (hg < 0) { /* become sync target */
		rv = C_WF_BITMAP_T;
	} else {
		rv = C_CONNECTED;
		if (drbd_bm_total_weight(device)) {
			drbd_info(device, "No resync, but %lu bits in bitmap!\n",
			     drbd_bm_total_weight(device));
		}
	}

	return rv;
}

static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
{
	/* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
	if (peer == ASB_DISCARD_REMOTE)
		return ASB_DISCARD_LOCAL;

	/* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
	if (peer == ASB_DISCARD_LOCAL)
		return ASB_DISCARD_REMOTE;

	/* everything else is valid if they are equal on both sides. */
	return peer;
}

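/* receive_protocol() validates that the settings which must match on both
 * nodes actually do: wire protocol (A/B/C), the after-split-brain policies,
 * allow-two-primaries, discard-my-data and data-integrity-alg.  On mismatch
 * the connection is dropped; otherwise net_conf is updated under RCU. */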
static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_protocol *p = pi->data;
	enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
	int p_proto, p_discard_my_data, p_two_primaries, cf;
	struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
	char integrity_alg[SHARED_SECRET_MAX] = "";
	struct crypto_hash *peer_integrity_tfm = NULL;
	void *int_dig_in = NULL, *int_dig_vv = NULL;

	p_proto		= be32_to_cpu(p->protocol);
	p_after_sb_0p	= be32_to_cpu(p->after_sb_0p);
	p_after_sb_1p	= be32_to_cpu(p->after_sb_1p);
	p_after_sb_2p	= be32_to_cpu(p->after_sb_2p);
	p_two_primaries = be32_to_cpu(p->two_primaries);
	cf		= be32_to_cpu(p->conn_flags);
	p_discard_my_data = cf & CF_DISCARD_MY_DATA;

	if (connection->agreed_pro_version >= 87) {
		int err;

		if (pi->size > sizeof(integrity_alg))
			return -EIO;
		err = drbd_recv_all(connection, integrity_alg, pi->size);
		if (err)
			return err;
		integrity_alg[SHARED_SECRET_MAX - 1] = 0;
	}

	if (pi->cmd != P_PROTOCOL_UPDATE) {
		clear_bit(CONN_DRY_RUN, &connection->flags);

		if (cf & CF_DRY_RUN)
			set_bit(CONN_DRY_RUN, &connection->flags);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);

		if (p_proto != nc->wire_protocol) {
			drbd_err(connection, "incompatible %s settings\n", "protocol");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
			goto disconnect_rcu_unlock;
		}

		if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
			drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
			goto disconnect_rcu_unlock;
		}

		if (p_discard_my_data && nc->discard_my_data) {
			drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
			goto disconnect_rcu_unlock;
		}

		if (p_two_primaries != nc->two_primaries) {
			drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
			goto disconnect_rcu_unlock;
		}

		if (strcmp(integrity_alg, nc->integrity_alg)) {
			drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
			goto disconnect_rcu_unlock;
		}

		rcu_read_unlock();
	}

	if (integrity_alg[0]) {
		int hash_size;

		/*
		 * We can only change the peer data integrity algorithm
		 * here.  Changing our own data integrity algorithm
		 * requires that we send a P_PROTOCOL_UPDATE packet at
		 * the same time; otherwise, the peer has no way to
		 * tell between which packets the algorithm should
		 * change.
		 */

		peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
		if (!peer_integrity_tfm) {
			drbd_err(connection, "peer data-integrity-alg %s not supported\n",
				 integrity_alg);
			goto disconnect;
		}

		hash_size = crypto_hash_digestsize(peer_integrity_tfm);
		int_dig_in = kmalloc(hash_size, GFP_KERNEL);
		int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
		if (!(int_dig_in && int_dig_vv)) {
			drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
			goto disconnect;
		}
	}

	new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
	if (!new_net_conf) {
		drbd_err(connection, "Allocation of new net_conf failed\n");
		goto disconnect;
	}

	mutex_lock(&connection->data.mutex);
	mutex_lock(&connection->resource->conf_update);
	old_net_conf = connection->net_conf;
	*new_net_conf = *old_net_conf;

	new_net_conf->wire_protocol = p_proto;
	new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
	new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
	new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
	new_net_conf->two_primaries = p_two_primaries;

	rcu_assign_pointer(connection->net_conf, new_net_conf);
	mutex_unlock(&connection->resource->conf_update);
	mutex_unlock(&connection->data.mutex);

	crypto_free_hash(connection->peer_integrity_tfm);
	kfree(connection->int_dig_in);
	kfree(connection->int_dig_vv);
	connection->peer_integrity_tfm = peer_integrity_tfm;
	connection->int_dig_in = int_dig_in;
	connection->int_dig_vv = int_dig_vv;

	if (strcmp(old_net_conf->integrity_alg, integrity_alg))
		drbd_info(connection, "peer data-integrity-alg: %s\n",
			  integrity_alg[0] ? integrity_alg : "(none)");

	synchronize_rcu();
	kfree(old_net_conf);
	return 0;

disconnect_rcu_unlock:
	rcu_read_unlock();
disconnect:
	crypto_free_hash(peer_integrity_tfm);
	kfree(int_dig_in);
	kfree(int_dig_vv);
	conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	return -EIO;
}

/* helper function
 * input: alg name, feature name
 * return: NULL (alg name was "")
 *         ERR_PTR(error) if something goes wrong
 *         or the crypto hash ptr, if it worked out ok. */
static
struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
		const char *alg, const char *name)
{
	struct crypto_hash *tfm;

	if (!alg[0])
		return NULL;

	tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
	if (IS_ERR(tfm)) {
		drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
			alg, name, PTR_ERR(tfm));
		return tfm;
	}
	return tfm;
}

static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int size = pi->size;

	while (size) {
		int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
		s = drbd_recv(connection, buffer, s);
		if (s <= 0) {
			if (s < 0)
				return s;
			break;
		}
		size -= s;
	}
	if (size)
		return -EIO;
	return 0;
}

/*
 * config_unknown_volume  -  device configuration command for unknown volume
 *
 * When a device is added to an existing connection, the node on which the
 * device is added first will send configuration commands to its peer but the
 * peer will not know about the device yet.  It will warn and ignore these
 * commands.  Once the device is added on the second node, the second node will
 * send the same device configuration commands, but in the other direction.
 *
 * (We can also end up here if drbd is misconfigured.)
 */
static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
		  cmdname(pi->cmd), pi->vnr);
	return ignore_remaining_packet(connection, pi);
}

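/* receive_SyncParam() handles P_SYNC_PARAM/P_SYNC_PARAM89: depending on the
 * agreed protocol version the packet carries only the resync rate (<= 87),
 * additionally the verify-alg/csums-alg strings (>= 88/89), or also the
 * dynamic resync controller settings (>= 95).  New digests and disk/net
 * configuration are swapped in under conf_update and RCU. */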
static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
{
3410
	struct drbd_peer_device *peer_device;
3411
	struct drbd_device *device;
3412
	struct p_rs_param_95 *p;
	unsigned int header_size, data_size, exp_max_sz;
	struct crypto_hash *verify_tfm = NULL;
	struct crypto_hash *csums_tfm = NULL;
3416
	struct net_conf *old_net_conf, *new_net_conf = NULL;
P
Philipp Reisner 已提交
3417
	struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3418
	const int apv = connection->agreed_pro_version;
P
Philipp Reisner 已提交
3419
	struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3420
	int fifo_size = 0;
3421
	int err;

3423 3424
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
3425
		return config_unknown_volume(connection, pi);
3426
	device = peer_device->device;

	exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
		    : apv == 88 ? sizeof(struct p_rs_param)
					+ SHARED_SECRET_MAX
3431 3432
		    : apv <= 94 ? sizeof(struct p_rs_param_89)
		    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
P
Philipp Reisner 已提交
3433

3434
	if (pi->size > exp_max_sz) {
3435
		drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3436
		    pi->size, exp_max_sz);
3437
		return -EIO;
P
Philipp Reisner 已提交
3438 3439 3440
	}

	if (apv <= 88) {
3441
		header_size = sizeof(struct p_rs_param);
3442
		data_size = pi->size - header_size;
3443
	} else if (apv <= 94) {
3444
		header_size = sizeof(struct p_rs_param_89);
3445
		data_size = pi->size - header_size;
3446
		D_ASSERT(device, data_size == 0);
3447
	} else {
3448
		header_size = sizeof(struct p_rs_param_95);
3449
		data_size = pi->size - header_size;
3450
		D_ASSERT(device, data_size == 0);
P
Philipp Reisner 已提交
3451 3452 3453
	}

	/* initialize verify_alg and csums_alg */
3454
	p = pi->data;
P
Philipp Reisner 已提交
3455 3456
	memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);

3457
	err = drbd_recv_all(peer_device->connection, p, header_size);
3458 3459
	if (err)
		return err;
P
Philipp Reisner 已提交
3460

3461
	mutex_lock(&connection->resource->conf_update);
3462
	old_net_conf = peer_device->connection->net_conf;
3463
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
3464 3465
		new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
		if (!new_disk_conf) {
3466
			put_ldev(device);
3467
			mutex_unlock(&connection->resource->conf_update);
3468
			drbd_err(device, "Allocation of new disk_conf failed\n");
P
Philipp Reisner 已提交
3469 3470
			return -ENOMEM;
		}
P
Philipp Reisner 已提交
3471

3472
		old_disk_conf = device->ldev->disk_conf;
P
Philipp Reisner 已提交
3473
		*new_disk_conf = *old_disk_conf;
P
Philipp Reisner 已提交
3474

3475
		new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
P
Philipp Reisner 已提交
3476
	}
P
Philipp Reisner 已提交
3477 3478 3479

	if (apv >= 88) {
		if (apv == 88) {
3480
			if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3481
				drbd_err(device, "verify-alg of wrong size, "
3482 3483
					"peer wants %u, accepting only up to %u byte\n",
					data_size, SHARED_SECRET_MAX);
P
Philipp Reisner 已提交
3484 3485
				err = -EIO;
				goto reconnect;
P
Philipp Reisner 已提交
3486 3487
			}

3488
			err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
P
Philipp Reisner 已提交
3489 3490
			if (err)
				goto reconnect;
P
Philipp Reisner 已提交
3491 3492
			/* we expect NUL terminated string */
			/* but just in case someone tries to be evil */
3493
			D_ASSERT(device, p->verify_alg[data_size-1] == 0);
P
Philipp Reisner 已提交
3494 3495 3496 3497 3498
			p->verify_alg[data_size-1] = 0;

		} else /* apv >= 89 */ {
			/* we still expect NUL terminated strings */
			/* but just in case someone tries to be evil */
3499 3500
			D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
			D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
P
Philipp Reisner 已提交
3501 3502 3503 3504
			p->verify_alg[SHARED_SECRET_MAX-1] = 0;
			p->csums_alg[SHARED_SECRET_MAX-1] = 0;
		}

3505
		if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3506
			if (device->state.conn == C_WF_REPORT_PARAMS) {
3507
				drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3508
				    old_net_conf->verify_alg, p->verify_alg);
P
Philipp Reisner 已提交
3509 3510
				goto disconnect;
			}
3511
			verify_tfm = drbd_crypto_alloc_digest_safe(device,
P
Philipp Reisner 已提交
3512 3513 3514 3515 3516 3517 3518
					p->verify_alg, "verify-alg");
			if (IS_ERR(verify_tfm)) {
				verify_tfm = NULL;
				goto disconnect;
			}
		}

3519
		if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3520
			if (device->state.conn == C_WF_REPORT_PARAMS) {
3521
				drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3522
				    old_net_conf->csums_alg, p->csums_alg);
P
Philipp Reisner 已提交
3523 3524
				goto disconnect;
			}
3525
			csums_tfm = drbd_crypto_alloc_digest_safe(device,
P
Philipp Reisner 已提交
3526 3527 3528 3529 3530 3531 3532
					p->csums_alg, "csums-alg");
			if (IS_ERR(csums_tfm)) {
				csums_tfm = NULL;
				goto disconnect;
			}
		}

P
Philipp Reisner 已提交
3533
		if (apv > 94 && new_disk_conf) {
P
Philipp Reisner 已提交
3534 3535 3536 3537
			new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
			new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
			new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
			new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3538

P
Philipp Reisner 已提交
3539
			fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3540
			if (fifo_size != device->rs_plan_s->size) {
P
Philipp Reisner 已提交
3541 3542
				new_plan = fifo_alloc(fifo_size);
				if (!new_plan) {
3543
					drbd_err(device, "kmalloc of fifo_buffer failed");
3544
					put_ldev(device);
3545 3546 3547
					goto disconnect;
				}
			}
3548
		}
P
Philipp Reisner 已提交
3549

3550
		if (verify_tfm || csums_tfm) {
3551 3552
			new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
			if (!new_net_conf) {
3553
				drbd_err(device, "Allocation of new net_conf failed\n");
3554 3555 3556
				goto disconnect;
			}

3557
			*new_net_conf = *old_net_conf;
3558 3559

			if (verify_tfm) {
3560 3561
				strcpy(new_net_conf->verify_alg, p->verify_alg);
				new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3562 3563
				crypto_free_hash(peer_device->connection->verify_tfm);
				peer_device->connection->verify_tfm = verify_tfm;
3564
				drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3565 3566
			}
			if (csums_tfm) {
3567 3568
				strcpy(new_net_conf->csums_alg, p->csums_alg);
				new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3569 3570
				crypto_free_hash(peer_device->connection->csums_tfm);
				peer_device->connection->csums_tfm = csums_tfm;
3571
				drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3572
			}
3573
			rcu_assign_pointer(connection->net_conf, new_net_conf);
3574
		}
P
Philipp Reisner 已提交
3575 3576
	}

P
Philipp Reisner 已提交
3577
	if (new_disk_conf) {
3578 3579
		rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
		put_ldev(device);
P
Philipp Reisner 已提交
3580 3581 3582
	}

	if (new_plan) {
3583 3584
		old_plan = device->rs_plan_s;
		rcu_assign_pointer(device->rs_plan_s, new_plan);
P
Philipp Reisner 已提交
3585
	}
P
Philipp Reisner 已提交
3586

3587
	mutex_unlock(&connection->resource->conf_update);
P
Philipp Reisner 已提交
3588 3589 3590 3591
	synchronize_rcu();
	if (new_net_conf)
		kfree(old_net_conf);
	kfree(old_disk_conf);
P
Philipp Reisner 已提交
3592
	kfree(old_plan);
P
Philipp Reisner 已提交
3593

3594
	return 0;
P
Philipp Reisner 已提交
3595

P
Philipp Reisner 已提交
3596 3597
reconnect:
	if (new_disk_conf) {
3598
		put_ldev(device);
P
Philipp Reisner 已提交
3599 3600
		kfree(new_disk_conf);
	}
3601
	mutex_unlock(&connection->resource->conf_update);
P
Philipp Reisner 已提交
3602 3603
	return -EIO;

P
Philipp Reisner 已提交
3604
disconnect:
P
Philipp Reisner 已提交
3605 3606
	kfree(new_plan);
	if (new_disk_conf) {
3607
		put_ldev(device);
P
Philipp Reisner 已提交
3608 3609
		kfree(new_disk_conf);
	}
3610
	mutex_unlock(&connection->resource->conf_update);
P
Philipp Reisner 已提交
3611 3612 3613 3614 3615
	/* just for completeness: actually not needed,
	 * as this is not reached if csums_tfm was ok. */
	crypto_free_hash(csums_tfm);
	/* but free the verify_tfm again, if csums_tfm did not work out */
	crypto_free_hash(verify_tfm);
3616
	conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3617
	return -EIO;
P
Philipp Reisner 已提交
3618 3619 3620
}

/* warn if the arguments differ by more than 12.5% */
static void warn_if_differ_considerably(struct drbd_device *device,
	const char *s, sector_t a, sector_t b)
{
	sector_t d;
	if (a == 0 || b == 0)
		return;
	d = (a > b) ? (a - b) : (b - a);
	if (d > (a>>3) || d > (b>>3))
		drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
		     (unsigned long long)a, (unsigned long long)b);
}

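/* receive_sizes(): the peer announces its backing device and user-requested
 * sizes.  On the initial handshake the smaller of the two user limits wins;
 * a peer disk that would shrink our usable data is rejected, otherwise the
 * device size is (re)determined and a resync of grown areas may follow. */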
static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
{
3635
	struct drbd_peer_device *peer_device;
3636
	struct drbd_device *device;
3637
	struct p_sizes *p = pi->data;
3638
	enum determine_dev_size dd = DS_UNCHANGED;
P
Philipp Reisner 已提交
3639 3640
	sector_t p_size, p_usize, my_usize;
	int ldsc = 0; /* local disk size changed */
3641
	enum dds_flags ddsf;
P
Philipp Reisner 已提交
3642

3643 3644
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
3645
		return config_unknown_volume(connection, pi);
3646
	device = peer_device->device;
3647

P
Philipp Reisner 已提交
3648 3649 3650 3651 3652
	p_size = be64_to_cpu(p->d_size);
	p_usize = be64_to_cpu(p->u_size);

	/* just store the peer's disk size for now.
	 * we still need to figure out whether we accept that. */
3653
	device->p_size = p_size;
P
Philipp Reisner 已提交
3654

3655
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
3656
		rcu_read_lock();
3657
		my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
P
Philipp Reisner 已提交
3658 3659
		rcu_read_unlock();

3660 3661 3662
		warn_if_differ_considerably(device, "lower level device sizes",
			   p_size, drbd_get_max_capacity(device->ldev));
		warn_if_differ_considerably(device, "user requested size",
P
Philipp Reisner 已提交
3663
					    p_usize, my_usize);
P
Philipp Reisner 已提交
3664 3665 3666

		/* if this is the first connect, or an otherwise expected
		 * param exchange, choose the minimum */
3667
		if (device->state.conn == C_WF_REPORT_PARAMS)
P
Philipp Reisner 已提交
3668
			p_usize = min_not_zero(my_usize, p_usize);
P
Philipp Reisner 已提交
3669 3670 3671

		/* Never shrink a device with usable data during connect.
		   But allow online shrinking if we are connected. */
3672 3673 3674 3675
		if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
		    drbd_get_capacity(device->this_bdev) &&
		    device->state.disk >= D_OUTDATED &&
		    device->state.conn < C_CONNECTED) {
3676
			drbd_err(device, "The peer's disk size is too small!\n");
3677
			conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3678
			put_ldev(device);
3679
			return -EIO;
P
Philipp Reisner 已提交
3680
		}
P
Philipp Reisner 已提交
3681 3682 3683 3684 3685 3686

		if (my_usize != p_usize) {
			struct disk_conf *old_disk_conf, *new_disk_conf = NULL;

			new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
			if (!new_disk_conf) {
3687
				drbd_err(device, "Allocation of new disk_conf failed\n");
3688
				put_ldev(device);
P
Philipp Reisner 已提交
3689 3690 3691
				return -ENOMEM;
			}

3692
			mutex_lock(&connection->resource->conf_update);
3693
			old_disk_conf = device->ldev->disk_conf;
P
Philipp Reisner 已提交
3694 3695 3696
			*new_disk_conf = *old_disk_conf;
			new_disk_conf->disk_size = p_usize;

3697
			rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3698
			mutex_unlock(&connection->resource->conf_update);
P
Philipp Reisner 已提交
3699 3700 3701
			synchronize_rcu();
			kfree(old_disk_conf);

3702
			drbd_info(device, "Peer sets u_size to %lu sectors\n",
P
Philipp Reisner 已提交
3703
				 (unsigned long)my_usize);
P
Philipp Reisner 已提交
3704
		}
P
Philipp Reisner 已提交
3705

3706
		put_ldev(device);
P
Philipp Reisner 已提交
3707 3708
	}

3709 3710 3711 3712 3713 3714 3715
	device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
	drbd_reconsider_max_bio_size(device);
	/* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
	   In case we cleared the QUEUE_FLAG_DISCARD from our queue in
	   drbd_reconsider_max_bio_size(), we can be sure that after
	   drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */

3716
	ddsf = be16_to_cpu(p->dds_flags);
3717 3718 3719
	if (get_ldev(device)) {
		dd = drbd_determine_dev_size(device, ddsf, NULL);
		put_ldev(device);
3720
		if (dd == DS_ERROR)
3721
			return -EIO;
3722
		drbd_md_sync(device);
P
Philipp Reisner 已提交
3723 3724
	} else {
		/* I am diskless, need to accept the peer's size. */
3725
		drbd_set_my_capacity(device, p_size);
P
Philipp Reisner 已提交
3726 3727
	}

3728 3729 3730
	if (get_ldev(device)) {
		if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
			device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
P
Philipp Reisner 已提交
3731 3732 3733
			ldsc = 1;
		}

3734
		put_ldev(device);
P
Philipp Reisner 已提交
3735 3736
	}

3737
	if (device->state.conn > C_WF_REPORT_PARAMS) {
P
Philipp Reisner 已提交
3738
		if (be64_to_cpu(p->c_size) !=
3739
		    drbd_get_capacity(device->this_bdev) || ldsc) {
P
Philipp Reisner 已提交
3740 3741
			/* we have different sizes, probably peer
			 * needs to know my new size... */
3742
			drbd_send_sizes(peer_device, 0, ddsf);
P
Philipp Reisner 已提交
3743
		}
3744 3745 3746 3747
		if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
		    (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
			if (device->state.pdsk >= D_INCONSISTENT &&
			    device->state.disk >= D_INCONSISTENT) {
3748
				if (ddsf & DDSF_NO_RESYNC)
3749
					drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3750
				else
3751
					resync_after_online_grow(device);
3752
			} else
3753
				set_bit(RESYNC_AFTER_NEG, &device->flags);
P
Philipp Reisner 已提交
3754 3755 3756
		}
	}

3757
	return 0;
P
Philipp Reisner 已提交
3758 3759
}

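/* receive_uuids(): store the peer's UUID set for the later sync handshake.
 * If both sides are freshly created (UUID_JUST_CREATED) the initial full
 * sync may be skipped; a diskless primary adopts the peer's current UUID. */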
static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
{
3762
	struct drbd_peer_device *peer_device;
3763
	struct drbd_device *device;
3764
	struct p_uuids *p = pi->data;
P
Philipp Reisner 已提交
3765
	u64 *p_uuid;
3766
	int i, updated_uuids = 0;
P
Philipp Reisner 已提交
3767

3768 3769
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
3770
		return config_unknown_volume(connection, pi);
3771
	device = peer_device->device;
3772

P
Philipp Reisner 已提交
3773
	p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3774
	if (!p_uuid) {
3775
		drbd_err(device, "kmalloc of p_uuid failed\n");
3776 3777
		return false;
	}
P
Philipp Reisner 已提交
3778 3779 3780 3781

	for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
		p_uuid[i] = be64_to_cpu(p->uuid[i]);

3782 3783
	kfree(device->p_uuid);
	device->p_uuid = p_uuid;
P
Philipp Reisner 已提交
3784

3785 3786 3787 3788
	if (device->state.conn < C_CONNECTED &&
	    device->state.disk < D_INCONSISTENT &&
	    device->state.role == R_PRIMARY &&
	    (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3789
		drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3790
		    (unsigned long long)device->ed_uuid);
3791
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3792
		return -EIO;
P
Philipp Reisner 已提交
3793 3794
	}

3795
	if (get_ldev(device)) {
P
Philipp Reisner 已提交
3796
		int skip_initial_sync =
3797
			device->state.conn == C_CONNECTED &&
3798
			peer_device->connection->agreed_pro_version >= 90 &&
3799
			device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
P
Philipp Reisner 已提交
3800 3801
			(p_uuid[UI_FLAGS] & 8);
		if (skip_initial_sync) {
3802
			drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3803
			drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3804 3805
					"clear_n_write from receive_uuids",
					BM_LOCKED_TEST_ALLOWED);
3806 3807 3808
			_drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
			_drbd_uuid_set(device, UI_BITMAP, 0);
			_drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
P
Philipp Reisner 已提交
3809
					CS_VERBOSE, NULL);
3810
			drbd_md_sync(device);
3811
			updated_uuids = 1;
P
Philipp Reisner 已提交
3812
		}
3813 3814 3815
		put_ldev(device);
	} else if (device->state.disk < D_INCONSISTENT &&
		   device->state.role == R_PRIMARY) {
3816 3817
		/* I am a diskless primary, the peer just created a new current UUID
		   for me. */
3818
		updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
P
Philipp Reisner 已提交
3819 3820 3821 3822 3823 3824
	}

	/* Before we test for the disk state, we should wait until any possibly
	   ongoing cluster-wide state change is finished. That is important if
	   we are primary and are detaching from our disk. We need to see the
	   new disk state... */
3825 3826 3827 3828
	mutex_lock(device->state_mutex);
	mutex_unlock(device->state_mutex);
	if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
		updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3829 3830

	if (updated_uuids)
3831
		drbd_print_uuids(device, "receiver updated UUIDs to");
P
Philipp Reisner 已提交
3832

3833
	return 0;
P
Philipp Reisner 已提交
3834 3835 3836 3837 3838 3839 3840 3841 3842 3843 3844
}

/**
 * convert_state() - Converts the peer's view of the cluster state to our point of view
 * @ps:		The state as seen by the peer.
 */
static union drbd_state convert_state(union drbd_state ps)
{
	union drbd_state ms;

	static enum drbd_conns c_tab[] = {
		[C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
		[C_CONNECTED] = C_CONNECTED,

		[C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
		[C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
		[C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
		[C_VERIFY_S]       = C_VERIFY_T,
		[C_MASK]   = C_MASK,
	};

	ms.i = ps.i;

	ms.conn = c_tab[ps.conn];
	ms.peer = ps.role;
	ms.role = ps.peer;
	ms.pdsk = ps.disk;
	ms.disk = ps.pdsk;
	ms.peer_isp = (ps.aftr_isp | ps.user_isp);

	return ms;
}

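/* receive_req_state(): the peer asks us to change our state (mask/val are
 * given from its point of view and converted here).  If we are the side
 * resolving conflicts and a local state change is already in progress,
 * reply with SS_CONCURRENT_ST_CHG instead of applying the request. */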
static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
{
3869
	struct drbd_peer_device *peer_device;
3870
	struct drbd_device *device;
3871
	struct p_req_state *p = pi->data;
P
Philipp Reisner 已提交
3872
	union drbd_state mask, val;
3873
	enum drbd_state_rv rv;
P
Philipp Reisner 已提交
3874

3875 3876
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
3877
		return -EIO;
3878
	device = peer_device->device;
3879

P
Philipp Reisner 已提交
3880 3881 3882
	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

3883
	if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3884
	    mutex_is_locked(device->state_mutex)) {
3885
		drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3886
		return 0;
P
Philipp Reisner 已提交
3887 3888 3889 3890 3891
	}

	mask = convert_state(mask);
	val = convert_state(val);

3892
	rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3893
	drbd_send_sr_reply(peer_device, rv);
P
Philipp Reisner 已提交
3894

3895
	drbd_md_sync(device);
P
Philipp Reisner 已提交
3896

3897
	return 0;
P
Philipp Reisner 已提交
3898 3899
}

3900
static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
P
Philipp Reisner 已提交
3901
{
3902
	struct p_req_state *p = pi->data;
P
Philipp Reisner 已提交
3903
	union drbd_state mask, val;
3904
	enum drbd_state_rv rv;
P
Philipp Reisner 已提交
3905 3906 3907 3908

	mask.i = be32_to_cpu(p->mask);
	val.i = be32_to_cpu(p->val);

3909 3910 3911
	if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
	    mutex_is_locked(&connection->cstate_mutex)) {
		conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3912
		return 0;
P
Philipp Reisner 已提交
3913 3914 3915 3916 3917
	}

	mask = convert_state(mask);
	val = convert_state(val);

3918 3919
	rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
	conn_send_sr_reply(connection, rv);
P
Philipp Reisner 已提交
3920

3921
	return 0;
P
Philipp Reisner 已提交
3922 3923
}

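/* receive_state(): process the peer's state packet.  This is where a newly
 * established connection (or a newly attached disk) ends up calling
 * drbd_sync_handshake() to decide whether and in which direction to resync. */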
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
3926
	struct drbd_peer_device *peer_device;
3927
	struct drbd_device *device;
3928
	struct p_state *p = pi->data;
3929
	union drbd_state os, ns, peer_state;
P
Philipp Reisner 已提交
3930
	enum drbd_disk_state real_peer_disk;
3931
	enum chg_state_flags cs_flags;
P
Philipp Reisner 已提交
3932 3933
	int rv;

3934 3935
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
3936
		return config_unknown_volume(connection, pi);
3937
	device = peer_device->device;
3938

P
Philipp Reisner 已提交
3939 3940 3941 3942
	peer_state.i = be32_to_cpu(p->state);

	real_peer_disk = peer_state.disk;
	if (peer_state.disk == D_NEGOTIATING) {
3943
		real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3944
		drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
P
Philipp Reisner 已提交
3945 3946
	}

3947
	spin_lock_irq(&device->resource->req_lock);
P
Philipp Reisner 已提交
3948
 retry:
3949
	os = ns = drbd_read_state(device);
3950
	spin_unlock_irq(&device->resource->req_lock);
P
Philipp Reisner 已提交
3951

3952 3953 3954 3955
	/* If some other part of the code (asender thread, timeout)
	 * already decided to close the connection again,
	 * we must not "re-establish" it here. */
	if (os.conn <= C_TEAR_DOWN)
3956
		return -ECONNRESET;
3957

3958 3959 3960 3961 3962 3963 3964 3965
	/* If this is the "end of sync" confirmation, usually the peer disk
	 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
	 * set) resync started in PausedSyncT, or if the timing of pause-/
	 * unpause-sync events has been "just right", the peer disk may
	 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
	 */
	if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
	    real_peer_disk == D_UP_TO_DATE &&
3966 3967 3968 3969 3970 3971 3972 3973 3974 3975 3976 3977 3978 3979 3980 3981
	    os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
		/* If we are (becoming) SyncSource, but peer is still in sync
		 * preparation, ignore its uptodate-ness to avoid flapping, it
		 * will change to inconsistent once the peer reaches active
		 * syncing states.
		 * It may have changed syncer-paused flags, however, so we
		 * cannot ignore this completely. */
		if (peer_state.conn > C_CONNECTED &&
		    peer_state.conn < C_SYNC_SOURCE)
			real_peer_disk = D_INCONSISTENT;

		/* if peer_state changes to connected at the same time,
		 * it explicitly notifies us that it finished resync.
		 * Maybe we should finish it up, too? */
		else if (os.conn >= C_SYNC_SOURCE &&
			 peer_state.conn == C_CONNECTED) {
3982 3983
			if (drbd_bm_total_weight(device) <= device->rs_failed)
				drbd_resync_finished(device);
3984
			return 0;
3985 3986 3987
		}
	}

3988 3989 3990
	/* explicit verify finished notification, stop sector reached. */
	if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
	    peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3991 3992
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
3993
		return 0;
3994 3995
	}

3996 3997 3998 3999 4000 4001 4002 4003 4004
	/* peer says his disk is inconsistent, while we think it is uptodate,
	 * and this happens while the peer still thinks we have a sync going on,
	 * but we think we are already done with the sync.
	 * We ignore this to avoid flapping pdsk.
	 * This should not happen, if the peer is a recent version of drbd. */
	if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
	    os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
		real_peer_disk = D_UP_TO_DATE;

4005 4006
	if (ns.conn == C_WF_REPORT_PARAMS)
		ns.conn = C_CONNECTED;
P
Philipp Reisner 已提交
4007

4008 4009 4010
	if (peer_state.conn == C_AHEAD)
		ns.conn = C_BEHIND;

4011 4012
	if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
	    get_ldev_if_state(device, D_NEGOTIATING)) {
P
Philipp Reisner 已提交
4013 4014 4015
		int cr; /* consider resync */

		/* if we established a new connection */
4016
		cr  = (os.conn < C_CONNECTED);
P
Philipp Reisner 已提交
4017 4018
		/* if we had an established connection
		 * and one of the nodes newly attaches a disk */
4019
		cr |= (os.conn == C_CONNECTED &&
P
Philipp Reisner 已提交
4020
		       (peer_state.disk == D_NEGOTIATING ||
4021
			os.disk == D_NEGOTIATING));
P
Philipp Reisner 已提交
4022 4023
		/* if we have both been inconsistent, and the peer has been
		 * forced to be UpToDate with --overwrite-data */
4024
		cr |= test_bit(CONSIDER_RESYNC, &device->flags);
P
Philipp Reisner 已提交
4025 4026
		/* if we had been plain connected, and the admin requested to
		 * start a sync by "invalidate" or "invalidate-remote" */
4027
		cr |= (os.conn == C_CONNECTED &&
P
Philipp Reisner 已提交
4028 4029 4030 4031
				(peer_state.conn >= C_STARTING_SYNC_S &&
				 peer_state.conn <= C_WF_BITMAP_T));

		if (cr)
4032
			ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
P
Philipp Reisner 已提交
4033

4034
		put_ldev(device);
4035 4036
		if (ns.conn == C_MASK) {
			ns.conn = C_CONNECTED;
4037 4038
			if (device->state.disk == D_NEGOTIATING) {
				drbd_force_state(device, NS(disk, D_FAILED));
P
Philipp Reisner 已提交
4039
			} else if (peer_state.disk == D_NEGOTIATING) {
4040
				drbd_err(device, "Disk attach process on the peer node was aborted.\n");
P
Philipp Reisner 已提交
4041
				peer_state.disk = D_DISKLESS;
4042
				real_peer_disk = D_DISKLESS;
P
Philipp Reisner 已提交
4043
			} else {
4044
				if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4045
					return -EIO;
4046
				D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4047
				conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4048
				return -EIO;
P
Philipp Reisner 已提交
4049 4050 4051 4052
			}
		}
	}

4053
	spin_lock_irq(&device->resource->req_lock);
4054
	if (os.i != drbd_read_state(device).i)
P
Philipp Reisner 已提交
4055
		goto retry;
4056
	clear_bit(CONSIDER_RESYNC, &device->flags);
P
Philipp Reisner 已提交
4057 4058 4059
	ns.peer = peer_state.role;
	ns.pdsk = real_peer_disk;
	ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4060
	if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4061
		ns.disk = device->new_state_tmp.disk;
4062
	cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4063 4064
	if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
	    test_bit(NEW_CUR_UUID, &device->flags)) {
		/* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
		   for temporary network outages! */
4067
		spin_unlock_irq(&device->resource->req_lock);
4068
		drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4069
		tl_clear(peer_device->connection);
4070 4071
		drbd_uuid_new_current(device);
		clear_bit(NEW_CUR_UUID, &device->flags);
4072
		conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4073
		return -EIO;
4074
	}
4075 4076
	rv = _drbd_set_state(device, ns, cs_flags, NULL);
	ns = drbd_read_state(device);
4077
	spin_unlock_irq(&device->resource->req_lock);
P
Philipp Reisner 已提交
4078 4079

	if (rv < SS_SUCCESS) {
4080
		conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4081
		return -EIO;
P
Philipp Reisner 已提交
4082 4083
	}

4084 4085
	if (os.conn > C_WF_REPORT_PARAMS) {
		if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
P
Philipp Reisner 已提交
4086 4087 4088 4089
		    peer_state.disk != D_NEGOTIATING ) {
			/* we want resync, peer has not yet decided to sync... */
			/* Nowadays only used when forcing a node into primary role and
			   setting its disk to UpToDate with that */
4090 4091
			drbd_send_uuids(peer_device);
			drbd_send_current_state(peer_device);
P
Philipp Reisner 已提交
4092 4093 4094
		}
	}

4095
	clear_bit(DISCARD_MY_DATA, &device->flags);
P
Philipp Reisner 已提交
4096

4097
	drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
P
Philipp Reisner 已提交
4098

4099
	return 0;
P
Philipp Reisner 已提交
4100 4101
}

4102
static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
{
4104
	struct drbd_peer_device *peer_device;
4105
	struct drbd_device *device;
4106
	struct p_rs_uuid *p = pi->data;
4107

4108 4109
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
4110
		return -EIO;
4111
	device = peer_device->device;
P
Philipp Reisner 已提交
4112

4113 4114 4115 4116 4117
	wait_event(device->misc_wait,
		   device->state.conn == C_WF_SYNC_UUID ||
		   device->state.conn == C_BEHIND ||
		   device->state.conn < C_CONNECTED ||
		   device->state.disk < D_NEGOTIATING);
P
Philipp Reisner 已提交
4118

4119
	/* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
P
Philipp Reisner 已提交
4120 4121 4122

	/* Here the _drbd_uuid_ functions are right, current should
	   _not_ be rotated into the history */
4123 4124 4125
	if (get_ldev_if_state(device, D_NEGOTIATING)) {
		_drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
		_drbd_uuid_set(device, UI_BITMAP, 0UL);
P
Philipp Reisner 已提交
4126

4127 4128
		drbd_print_uuids(device, "updated sync uuid");
		drbd_start_resync(device, C_SYNC_TARGET);
P
Philipp Reisner 已提交
4129

4130
		put_ldev(device);
P
Philipp Reisner 已提交
4131
	} else
4132
		drbd_err(device, "Ignoring SyncUUID packet!\n");
P
Philipp Reisner 已提交
4133

4134
	return 0;
P
Philipp Reisner 已提交
4135 4136
}

4137 4138 4139 4140 4141 4142 4143
/**
 * receive_bitmap_plain
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
4144
receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4145
		     unsigned long *p, struct bm_xfer_ctx *c)
{
4147
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4148
				 drbd_header_size(peer_device->connection);
4149
	unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4150
				       c->bm_words - c->word_offset);
4151
	unsigned int want = num_words * sizeof(*p);
4152
	int err;
P
Philipp Reisner 已提交
4153

4154
	if (want != size) {
4155
		drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4156
		return -EIO;
P
Philipp Reisner 已提交
4157 4158
	}
	if (want == 0)
4159
		return 0;
4160
	err = drbd_recv_all(peer_device->connection, p, want);
4161
	if (err)
4162
		return err;
P
Philipp Reisner 已提交
4163

4164
	drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
P
Philipp Reisner 已提交
4165 4166 4167 4168 4169 4170

	c->word_offset += num_words;
	c->bit_offset = c->word_offset * BITS_PER_LONG;
	if (c->bit_offset > c->bm_bits)
		c->bit_offset = c->bm_bits;

4171
	return 1;
P
Philipp Reisner 已提交
4172 4173
}

static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
{
	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static int dcbp_get_start(struct p_compressed_bm *p)
{
	return (p->encoding & 0x80) != 0;
}

static int dcbp_get_pad_bits(struct p_compressed_bm *p)
{
	return (p->encoding >> 4) & 0x7;
}

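/* Layout of p_compressed_bm->encoding, as decoded by the helpers above:
 *   bit  7    value of the first run of bits (dcbp_get_start)
 *   bits 6-4  number of trailing pad bits in the stream (dcbp_get_pad_bits)
 *   bits 3-0  encoding scheme (dcbp_get_code), currently only RLE_VLI_Bits
 */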
/**
 * recv_bm_rle_bits
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
4196
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
4198 4199
		 struct bm_xfer_ctx *c,
		 unsigned int len)
P
Philipp Reisner 已提交
4200 4201 4202 4203 4204 4205 4206
{
	struct bitstream bs;
	u64 look_ahead;
	u64 rl;
	u64 tmp;
	unsigned long s = c->bit_offset;
	unsigned long e;
4207
	int toggle = dcbp_get_start(p);
P
Philipp Reisner 已提交
4208 4209 4210
	int have;
	int bits;

4211
	bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
P
Philipp Reisner 已提交
4212 4213 4214

	bits = bitstream_get_bits(&bs, &look_ahead, 64);
	if (bits < 0)
4215
		return -EIO;
P
Philipp Reisner 已提交
4216 4217 4218 4219

	for (have = bits; have > 0; s += rl, toggle = !toggle) {
		bits = vli_decode_bits(&rl, look_ahead);
		if (bits <= 0)
4220
			return -EIO;
P
Philipp Reisner 已提交
4221 4222 4223 4224

		if (toggle) {
			e = s + rl -1;
			if (e >= c->bm_bits) {
4225
				drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4226
				return -EIO;
P
Philipp Reisner 已提交
4227
			}
4228
			_drbd_bm_set_bits(peer_device->device, s, e);
P
Philipp Reisner 已提交
4229 4230 4231
		}

		if (have < bits) {
4232
			drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
P
Philipp Reisner 已提交
4233 4234 4235
				have, bits, look_ahead,
				(unsigned int)(bs.cur.b - p->code),
				(unsigned int)bs.buf_len);
4236
			return -EIO;
P
Philipp Reisner 已提交
4237
		}
4238 4239 4240 4241 4242
		/* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
		if (likely(bits < 64))
			look_ahead >>= bits;
		else
			look_ahead = 0;
P
Philipp Reisner 已提交
4243 4244 4245 4246
		have -= bits;

		bits = bitstream_get_bits(&bs, &tmp, 64 - have);
		if (bits < 0)
4247
			return -EIO;
P
Philipp Reisner 已提交
4248 4249 4250 4251 4252 4253 4254
		look_ahead |= tmp << have;
		have += bits;
	}

	c->bit_offset = s;
	bm_xfer_ctx_bit_to_word_offset(c);

4255
	return (s != c->bm_bits);
P
Philipp Reisner 已提交
4256 4257
}

4258 4259 4260 4261 4262 4263 4264
/**
 * decode_bitmap_c
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
4265
decode_bitmap_c(struct drbd_peer_device *peer_device,
		struct p_compressed_bm *p,
4267 4268
		struct bm_xfer_ctx *c,
		unsigned int len)
P
Philipp Reisner 已提交
4269
{
4270
	if (dcbp_get_code(p) == RLE_VLI_Bits)
4271
		return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
P
Philipp Reisner 已提交
4272 4273 4274 4275 4276

	/* other variants had been implemented for evaluation,
	 * but have been dropped as this one turned out to be "best"
	 * during all our tests. */

4277 4278
	drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
	conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4279
	return -EIO;
P
Philipp Reisner 已提交
4280 4281
}

void INFO_bm_xfer_stats(struct drbd_device *device,
		const char *direction, struct bm_xfer_ctx *c)
{
	/* what would it take to transfer it "plaintext" */
	unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
	unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
	unsigned int plain =
		header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
		c->bm_words * sizeof(unsigned long);
	unsigned int total = c->bytes[0] + c->bytes[1];
	unsigned int r;

	/* total can not be zero. but just in case: */
	if (total == 0)
		return;

	/* don't report if not compressed */
	if (total >= plain)
		return;

	/* total < plain. check for overflow, still */
	r = (total > UINT_MAX/1000) ? (total / (plain/1000))
		                    : (1000 * total / plain);

	if (r > 1000)
		r = 1000;

	r = 1000 - r;
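	/* Example (assumed numbers): a 1 MiB plain bitmap sent as 4 KiB of RLE
	 * data gives r = 1000 - (1000 * 4096) / 1048576 = 997, reported below
	 * as "compression: 99.7%". */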
	drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
P
Philipp Reisner 已提交
4311 4312 4313 4314 4315 4316 4317 4318 4319 4320 4321 4322 4323 4324 4325
	     "total %u; compression: %u.%u%%\n",
			direction,
			c->bytes[1], c->packets[1],
			c->bytes[0], c->packets[0],
			total, r/10, r % 10);
}

/* Since we are processing the bitfield from lower addresses to higher,
   it does not matter whether we process it in 32 bit or 64 bit
   chunks as long as it is little endian. (Understand it as a byte stream,
   beginning with the lowest byte...) If we used big endian
   we would need to process it from the highest address to the lowest,
   in order to be agnostic to the 32 vs 64 bits issue.

   Returns 0 on success, a negative error code otherwise. */
4326
static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
{
4328
	struct drbd_peer_device *peer_device;
4329
	struct drbd_device *device;
P
Philipp Reisner 已提交
4330
	struct bm_xfer_ctx c;
4331
	int err;
4332

4333 4334
	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
4335
		return -EIO;
4336
	device = peer_device->device;
P
Philipp Reisner 已提交
4337

4338
	drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4339 4340
	/* you are supposed to send additional out-of-sync information
	 * if you actually set bits during this phase */
P
Philipp Reisner 已提交
4341 4342

	c = (struct bm_xfer_ctx) {
4343 4344
		.bm_bits = drbd_bm_bits(device),
		.bm_words = drbd_bm_words(device),
P
Philipp Reisner 已提交
4345 4346
	};

4347
	for(;;) {
4348
		if (pi->cmd == P_BITMAP)
4349
			err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4350
		else if (pi->cmd == P_COMPRESSED_BITMAP) {
P
Philipp Reisner 已提交
4351 4352
			/* MAYBE: sanity check that we speak proto >= 90,
			 * and the feature is enabled! */
4353
			struct p_compressed_bm *p = pi->data;
P
Philipp Reisner 已提交
4354

4355
			if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4356
				drbd_err(device, "ReportCBitmap packet too large\n");
4357
				err = -EIO;
P
Philipp Reisner 已提交
4358 4359
				goto out;
			}
4360
			if (pi->size <= sizeof(*p)) {
4361
				drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4362
				err = -EIO;
4363
				goto out;
P
Philipp Reisner 已提交
4364
			}
4365
			err = drbd_recv_all(peer_device->connection, p, pi->size);
4366 4367
			if (err)
			       goto out;
4368
			err = decode_bitmap_c(peer_device, p, &c, pi->size);
P
Philipp Reisner 已提交
4369
		} else {
4370
			drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4371
			err = -EIO;
P
Philipp Reisner 已提交
4372 4373 4374
			goto out;
		}

4375
		c.packets[pi->cmd == P_BITMAP]++;
4376
		c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
P
Philipp Reisner 已提交
4377

4378 4379 4380
		if (err <= 0) {
			if (err < 0)
				goto out;
P
Philipp Reisner 已提交
4381
			break;
4382
		}
4383
		err = drbd_recv_header(peer_device->connection, pi);
4384
		if (err)
P
Philipp Reisner 已提交
4385
			goto out;
4386
	}
P
Philipp Reisner 已提交
4387

4388
	INFO_bm_xfer_stats(device, "receive", &c);
P
Philipp Reisner 已提交
4389

4390
	if (device->state.conn == C_WF_BITMAP_T) {
4391 4392
		enum drbd_state_rv rv;

4393
		err = drbd_send_bitmap(device);
4394
		if (err)
P
Philipp Reisner 已提交
4395 4396
			goto out;
		/* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4397
		rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4398
		D_ASSERT(device, rv == SS_SUCCESS);
4399
	} else if (device->state.conn != C_WF_BITMAP_S) {
P
Philipp Reisner 已提交
4400 4401
		/* admin may have requested C_DISCONNECTING,
		 * other threads may have noticed network errors */
4402
		drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4403
		    drbd_conn_str(device->state.conn));
P
Philipp Reisner 已提交
4404
	}
4405
	err = 0;
P
Philipp Reisner 已提交
4406 4407

 out:
4408 4409 4410
	drbd_bm_unlock(device);
	if (!err && device->state.conn == C_WF_BITMAP_S)
		drbd_start_resync(device, C_SYNC_SOURCE);
4411
	return err;
P
Philipp Reisner 已提交
4412 4413
}

static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
		 pi->cmd, pi->size);

	return ignore_remaining_packet(connection, pi);
}

static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
{
	/* Make sure we've acked all the TCP data associated
	 * with the data requests being unplugged */
	drbd_tcp_quickack(connection->data.socket);

	return 0;
}

static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_desc *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	switch (device->state.conn) {
	case C_WF_SYNC_UUID:
	case C_WF_BITMAP_T:
	case C_BEHIND:
			break;
	default:
		drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
				drbd_conn_str(device->state.conn));
	}

	drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));

	return 0;
}

struct data_cmd {
	int expect_payload;
	size_t pkt_size;
	int (*fn)(struct drbd_connection *, struct packet_info *);
};

static struct data_cmd drbd_cmd_handler[] = {
	[P_DATA]	    = { 1, sizeof(struct p_data), receive_Data },
	[P_DATA_REPLY]	    = { 1, sizeof(struct p_data), receive_DataReply },
	[P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
	[P_BARRIER]	    = { 0, sizeof(struct p_barrier), receive_Barrier } ,
	[P_BITMAP]	    = { 1, 0, receive_bitmap } ,
	[P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
	[P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
	[P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_SYNC_PARAM]	    = { 1, 0, receive_SyncParam },
	[P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
	[P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_UUIDS]	    = { 0, sizeof(struct p_uuids), receive_uuids },
	[P_SIZES]	    = { 0, sizeof(struct p_sizes), receive_sizes },
	[P_STATE]	    = { 0, sizeof(struct p_state), receive_state },
	[P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
	[P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
	[P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
	[P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
	[P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
	[P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
	[P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
	[P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
	[P_TRIM]	    = { 0, sizeof(struct p_trim), receive_Data },
};
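/* The receiver main loop below reads one packet header at a time, looks the
 * command up in drbd_cmd_handler[], receives the fixed-size part (pkt_size)
 * into the per-connection receive buffer and then lets the handler consume
 * any remaining payload itself; expect_payload guards against unexpected
 * trailing data. */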

static void drbdd(struct drbd_connection *connection)
P
Philipp Reisner 已提交
4492
{
4493
	struct packet_info pi;
4494
	size_t shs; /* sub header size */
4495
	int err;

4497
	while (get_t_state(&connection->receiver) == RUNNING) {
4498
		struct data_cmd *cmd;
P
Philipp Reisner 已提交
4499

4500 4501
		drbd_thread_current_set_cpu(&connection->receiver);
		if (drbd_recv_header(connection, &pi))
4502
			goto err_out;
P
Philipp Reisner 已提交
4503

4504
		cmd = &drbd_cmd_handler[pi.cmd];
4505
		if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4506
			drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4507
				 cmdname(pi.cmd), pi.cmd);
4508
			goto err_out;
4509
		}
P
Philipp Reisner 已提交
4510

4511 4512
		shs = cmd->pkt_size;
		if (pi.size > shs && !cmd->expect_payload) {
4513
			drbd_err(connection, "No payload expected %s l:%d\n",
4514
				 cmdname(pi.cmd), pi.size);
4515
			goto err_out;
P
Philipp Reisner 已提交
4516 4517
		}

4518
		if (shs) {
4519
			err = drbd_recv_all_warn(connection, pi.data, shs);
4520
			if (err)
4521
				goto err_out;
4522
			pi.size -= shs;
4523 4524
		}

4525
		err = cmd->fn(connection, &pi);
4526
		if (err) {
4527
			drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4528
				 cmdname(pi.cmd), err, pi.size);
4529
			goto err_out;
P
Philipp Reisner 已提交
4530 4531
		}
	}
4532
	return;
P
Philipp Reisner 已提交
4533

4534
    err_out:
4535
	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
P
Philipp Reisner 已提交
4536 4537
}

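/* Tear down after a lost connection: stop the asender, close the sockets,
 * run the per-volume cleanup (drbd_disconnected) for every peer device,
 * possibly fence the peer, and fall back to Unconnected or StandAlone. */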
static void conn_disconnect(struct drbd_connection *connection)
{
4540
	struct drbd_peer_device *peer_device;
4541
	enum drbd_conns oc;
P
Philipp Reisner 已提交
4542
	int vnr;
P
Philipp Reisner 已提交
4543

4544
	if (connection->cstate == C_STANDALONE)
P
Philipp Reisner 已提交
4545 4546
		return;

4547 4548 4549 4550 4551
	/* We are about to start the cleanup after connection loss.
	 * Make sure drbd_make_request knows about that.
	 * Usually we should be in some network failure state already,
	 * but just in case we are not, we fix it up here.
	 */
4552
	conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4553

P
Philipp Reisner 已提交
4554
	/* asender does not clean up anything. it must not interfere, either */
4555 4556
	drbd_thread_stop(&connection->asender);
	drbd_free_sock(connection);
4557

P
Philipp Reisner 已提交
4558
	rcu_read_lock();
4559 4560
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
4561
		kref_get(&device->kref);
P
Philipp Reisner 已提交
4562
		rcu_read_unlock();
4563
		drbd_disconnected(peer_device);
4564
		kref_put(&device->kref, drbd_destroy_device);
P
Philipp Reisner 已提交
4565 4566 4567 4568
		rcu_read_lock();
	}
	rcu_read_unlock();

4569
	if (!list_empty(&connection->current_epoch->list))
4570
		drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4571
	/* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4572 4573
	atomic_set(&connection->current_epoch->epoch_size, 0);
	connection->send.seen_any_write_yet = false;
4574

4575
	drbd_info(connection, "Connection closed\n");
4576

4577 4578
	if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
		conn_try_outdate_peer_async(connection);
4579

4580
	spin_lock_irq(&connection->resource->req_lock);
4581
	oc = connection->cstate;
4582
	if (oc >= C_UNCONNECTED)
4583
		_conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4584

4585
	spin_unlock_irq(&connection->resource->req_lock);
4586

4587
	if (oc == C_DISCONNECTING)
4588
		conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4589 4590
}

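/* Per-volume cleanup after the connection to the peer was lost. */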
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	unsigned int i;

	/* wait for current activity to cease. */
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, &device->active_ee);
	_drbd_wait_ee_list_empty(device, &device->sync_ee);
	_drbd_wait_ee_list_empty(device, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	/* We do not have data structures that would allow us to
	 * get the rs_pending_cnt down to 0 again.
	 *  * On C_SYNC_TARGET we do not have any data structures describing
	 *    the pending RSDataRequest's we have sent.
	 *  * On C_SYNC_SOURCE there is no data structure that tracks
	 *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
	 *  And no, it is not the sum of the reference counts in the
	 *  resync_LRU. The resync_LRU tracks the whole operation including
	 *  the disk-IO, while the rs_pending_cnt only tracks the blocks
	 *  on the fly. */
	drbd_rs_cancel_all(device);
	device->rs_total = 0;
	device->rs_failed = 0;
	atomic_set(&device->rs_pending_cnt, 0);
	wake_up(&device->misc_wait);

	del_timer_sync(&device->resync_timer);
	resync_timer_fn((unsigned long)device);

	/* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
	 * w_make_resync_request etc. which may still be on the worker queue
	 * to be "canceled" */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	drbd_finish_peer_reqs(device);

	/* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
	   might have issued a work again. The one before drbd_finish_peer_reqs() is
	   necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
	drbd_flush_workqueue(&peer_device->connection->sender_work);

	/* need to do it again, drbd_finish_peer_reqs() may have populated it
	 * again via drbd_try_clear_on_disk_bm(). */
	drbd_rs_cancel_all(device);

	kfree(device->p_uuid);
	device->p_uuid = NULL;

	if (!drbd_suspended(device))
		tl_clear(peer_device->connection);

	drbd_md_sync(device);

	/* serialize with bitmap writeout triggered by the state change,
	 * if any. */
	wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));

	/* tcp_close and release of sendpage pages can be deferred.  I don't
	 * want to use SO_LINGER, because apparently it can be deferred for
	 * more than 20 seconds (longest time I checked).
	 *
	 * Actually we don't care for exactly when the network stack does its
	 * put_page(), but release our reference on these pages right here.
	 */
	i = drbd_free_peer_reqs(device, &device->net_ee);
	if (i)
		drbd_info(device, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&device->pp_in_use_by_net);
	if (i)
		drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&device->pp_in_use);
	if (i)
		drbd_info(device, "pp_in_use = %d, expected 0\n", i);

	D_ASSERT(device, list_empty(&device->read_ee));
	D_ASSERT(device, list_empty(&device->active_ee));
	D_ASSERT(device, list_empty(&device->sync_ee));
	D_ASSERT(device, list_empty(&device->done_ee));

	return 0;
}

/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_features(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	struct p_connection_features *p;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	memset(p, 0, sizeof(*p));
	p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
	p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	p->feature_flags = cpu_to_be32(PRO_FEATURES);
	return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
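/*
 * Both sides send their P_CONNECTION_FEATURES packet; the agreed protocol
 * version becomes min(our PRO_VERSION_MAX, peer's protocol_max), and the
 * agreed feature set is the intersection of both sides' feature flags.
 */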
static int drbd_do_features(struct drbd_connection *connection)
{
	/* ASSERT current == connection->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;

	err = drbd_send_features(connection);
	if (err)
		return 0;

	err = drbd_recv_header(connection, &pi);
	if (err)
		return 0;

	if (pi.cmd != P_CONNECTION_FEATURES) {
		drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}

	if (pi.size != expect) {
		drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
		     expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(connection, p, expect);
	if (err)
		return 0;

	p->protocol_min = be32_to_cpu(p->protocol_min);
	p->protocol_max = be32_to_cpu(p->protocol_max);
	if (p->protocol_max == 0)
		p->protocol_max = p->protocol_min;

	if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;

	connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
	connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);

	drbd_info(connection, "Handshake successful: "
	     "Agreed network protocol version %d\n", connection->agreed_pro_version);

	drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
		  connection->agreed_features & FF_TRIM ? " " : " not ");

	return 1;

 incompat:
	drbd_err(connection, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}

#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_connection *connection)
{
	drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
#define CHALLENGE_LEN 64

/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/

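/*
 * Challenge-response authentication using the cram-hmac transform keyed
 * with the shared secret: send our random challenge, HMAC the peer's
 * challenge and return it as P_AUTH_RESPONSE, then verify the peer's
 * response against the HMAC of our own challenge.
 */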
static int drbd_do_auth(struct drbd_connection *connection)
{
	struct drbd_socket *sock;
	char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
	struct scatterlist sg;
	char *response = NULL;
	char *right_response = NULL;
	char *peers_ch = NULL;
	unsigned int key_len;
	char secret[SHARED_SECRET_MAX]; /* 64 byte */
	unsigned int resp_size;
	struct hash_desc desc;
	struct packet_info pi;
	struct net_conf *nc;
	int err, rv;

	/* FIXME: Put the challenge/response into the preallocated socket buffer.  */

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();

	desc.tfm = connection->cram_hmac_tfm;
	desc.flags = 0;

	rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	get_random_bytes(my_challenge, CHALLENGE_LEN);

	sock = &connection->data;
	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_CHALLENGE) {
		drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size > CHALLENGE_LEN * 2) {
		drbd_err(connection, "expected AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}

	if (pi.size < CHALLENGE_LEN) {
		drbd_err(connection, "AuthChallenge payload too small.\n");
		rv = -1;
		goto fail;
	}

	peers_ch = kmalloc(pi.size, GFP_NOIO);
	if (peers_ch == NULL) {
		drbd_err(connection, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
		drbd_err(connection, "Peer presented the same challenge!\n");
		rv = -1;
		goto fail;
	}

	resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
	response = kmalloc(resp_size, GFP_NOIO);
	if (response == NULL) {
		drbd_err(connection, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}

	sg_init_table(&sg, 1);
	sg_set_buf(&sg, peers_ch, pi.size);

	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	if (!conn_prepare_command(connection, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;

	err = drbd_recv_header(connection, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}

	if (pi.cmd != P_AUTH_RESPONSE) {
		drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}

	if (pi.size != resp_size) {
		drbd_err(connection, "expected AuthResponse payload of wrong size\n");
		rv = 0;
		goto fail;
	}

	err = drbd_recv_all_warn(connection, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}

	right_response = kmalloc(resp_size, GFP_NOIO);
	if (right_response == NULL) {
		drbd_err(connection, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}

	sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);

	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}

	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
		     resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
#endif

int drbd_receiver(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	int h;

	drbd_info(connection, "receiver (re)started\n");

	do {
		h = conn_connect(connection);
		if (h == 0) {
			conn_disconnect(connection);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			drbd_warn(connection, "Discarding network configuration.\n");
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	} while (h == 0);

	if (h > 0)
		drbdd(connection);

	conn_disconnect(connection);

	drbd_info(connection, "receiver terminated\n");
	return 0;
}

/* ********* acknowledge sender ******** */

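/*
 * The got_*() handlers below run in the asender thread and process
 * acknowledgement/meta packets; they are dispatched via asender_tbl[]
 * near the end of this file.
 */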
static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
		drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&connection->ping_wait);

	return 0;
}

static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_req_state_reply *p = pi->data;
	int retcode = be32_to_cpu(p->retcode);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
		D_ASSERT(device, connection->agreed_pro_version < 100);
		return got_conn_RqSReply(connection, pi);
	}

	if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &device->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &device->flags);
		drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&device->state_wait);

	return 0;
}

static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	return drbd_send_ping_ack(connection);
}

static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
{
	/* restore idle timeout */
	connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
		wake_up(&connection->ping_wait);

	return 0;
}

static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, sector);
		drbd_set_in_sync(device, sector, blksize);
		/* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(device);
	}
	dec_rs_pending(device);
	atomic_add(blksize >> 9, &device->rs_sect_in);

	return 0;
}

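/*
 * Find the request identified by (id, sector) in the given tree, apply the
 * request event 'what' to it, and complete the master bio if that state
 * transition finished the request.
 */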
static int
validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
	struct drbd_request *req;
	struct bio_and_error m;

	spin_lock_irq(&device->resource->req_lock);
	req = find_request(device, root, id, sector, missing_ok, func);
	if (unlikely(!req)) {
		spin_unlock_irq(&device->resource->req_lock);
		return -EIO;
	}
	__req_mod(req, what, &m);
	spin_unlock_irq(&device->resource->req_lock);

	if (m.bio)
		complete_master_bio(device, &m);
	return 0;
}

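/*
 * Positive (write) acknowledgements: ID_SYNCER acks only update the bitmap
 * and resync counters; everything else is mapped onto a request event and
 * applied to the matching entry in the write_requests tree.
 */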
static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		drbd_set_in_sync(device, sector, blksize);
		dec_rs_pending(device);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_SUPERSEDED:
		what = CONFLICT_RESOLVED;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->write_requests, __func__,
					     what, false);
}

static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (p->block_id == ID_SYNCER) {
		dec_rs_pending(device);
		drbd_rs_failed_io(device, sector, size);
		return 0;
	}

	err = validate_req_change_req_state(device, p->block_id, sector,
					    &device->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
		/* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
		   The master bio might already be completed, therefore the
		   request is no longer in the collision hash. */
		/* In Protocol B we might already have got a P_RECV_ACK
		   but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(device, sector, size);
	}
	return 0;
}

static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
	    (unsigned long long)sector, be32_to_cpu(p->blksize));

	return validate_req_change_req_state(device, p->block_id, sector,
					     &device->read_requests, __func__,
					     NEG_ACKED, false);
}

static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	dec_rs_pending(device);

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(device, sector, size);
			/* fall through */
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(device);
	}

	return 0;
}

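/*
 * A P_BARRIER_ACK confirms a whole epoch.  In addition, devices that are
 * C_AHEAD with no application writes in flight arm their resync timer so
 * they can switch back to being a sync source.
 */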
static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_peer_device *peer_device;
	int vnr;

	tl_release(connection, p->barrier, be32_to_cpu(p->set_size));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;

		if (device->state.conn == C_AHEAD &&
		    atomic_read(&device->ap_in_flight) == 0 &&
		    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
			device->start_resync_timer.expires = jiffies + HZ;
			add_timer(&device->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}

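/*
 * Online-verify result from the peer: record out-of-sync blocks and, once
 * the last reply has arrived (ov_left == 0), queue w_ov_finished on the
 * sender work queue.
 */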
static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
	struct drbd_peer_device *peer_device;
	struct drbd_device *device;
	struct p_block_ack *p = pi->data;
	struct drbd_device_work *dw;
	sector_t sector;
	int size;

	peer_device = conn_peer_device(connection, pi->vnr);
	if (!peer_device)
		return -EIO;
	device = peer_device->device;

	sector = be64_to_cpu(p->sector);
	size = be32_to_cpu(p->blksize);

	update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

	if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	if (!get_ldev(device))
		return 0;

	drbd_rs_complete_io(device, sector);
	dec_rs_pending(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	if (device->ov_left == 0) {
		dw = kmalloc(sizeof(*dw), GFP_NOIO);
		if (dw) {
			dw->w.cb = w_ov_finished;
			dw->device = device;
			drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
		} else {
			drbd_err(device, "kmalloc(dw) failed.");
			ov_out_of_sync_print(device);
			drbd_resync_finished(device);
		}
	}
	put_ldev(device);
	return 0;
}

static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
	return 0;
}

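/*
 * Drain the done_ee lists of all volumes, re-checking under the request
 * lock until they stay empty; returns 1 if cleanup failed for one of the
 * devices.
 */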
static int connection_finish_peer_reqs(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &connection->flags);
		flush_signals(current);

		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			kref_get(&device->kref);
			rcu_read_unlock();
			if (drbd_finish_peer_reqs(device)) {
				kref_put(&device->kref, drbd_destroy_device);
				return 1;
			}
			kref_put(&device->kref, drbd_destroy_device);
			rcu_read_lock();
		}
		set_bit(SIGNAL_ASENDER, &connection->flags);

		spin_lock_irq(&connection->resource->req_lock);
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;
			not_empty = !list_empty(&device->done_ee);
			if (not_empty)
				break;
		}
		spin_unlock_irq(&connection->resource->req_lock);
		rcu_read_unlock();
	} while (not_empty);

	return 0;
}

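/*
 * Dispatch table for the meta-data socket: expected payload size plus
 * handler per acknowledgement packet type.
 */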
struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static struct asender_cmd asender_tbl[] = {
	[P_PING]	    = { 0, got_Ping },
	[P_PING_ACK]	    = { 0, got_PingAck },
	[P_RECV_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_WRITE_ACK]	    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
	[P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
	[P_NEG_ACK]	    = { sizeof(struct p_block_ack), got_NegAck },
	[P_NEG_DREPLY]	    = { sizeof(struct p_block_ack), got_NegDReply },
	[P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_OV_RESULT]	    = { sizeof(struct p_block_ack), got_OVResult },
	[P_BARRIER_ACK]	    = { sizeof(struct p_barrier_ack), got_BarrierAck },
	[P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
	[P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
	[P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
	[P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
	[P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};

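/*
 * Main loop of the acknowledgement sender/receiver thread: send pings when
 * requested, flush completed peer requests (optionally under TCP_CORK),
 * then receive a header plus fixed-size payload from the meta socket and
 * dispatch it via asender_tbl[].  Receive errors or protocol violations
 * end in C_NETWORK_FAILURE (reconnect) or C_DISCONNECTING (disconnect).
 */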
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect   = header_size;
	bool ping_timeout_active = false;
	struct net_conf *nc;
	int ping_timeo, tcp_cork, ping_int;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		ping_timeo = nc->ping_timeo;
		tcp_cork = nc->tcp_cork;
		ping_int = nc->ping_int;
		rcu_read_unlock();

		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
			ping_timeout_active = true;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (tcp_cork)
			drbd_tcp_cork(connection->meta.socket);
		if (connection_finish_peer_reqs(connection)) {
			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
			goto reconnect;
		}
		/* but unconditionally uncork unless disabled */
		if (tcp_cork)
			drbd_tcp_uncork(connection->meta.socket);

		/* short circuit, recv_msg would return EINTR anyways. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &connection->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;
				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received,
				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			cmd = &asender_tbl[pi.cmd];
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}

			buf	 = connection->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}

	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &connection->flags);

	drbd_info(connection, "asender terminated\n");

	return 0;
}