drbd_worker.c 62.7 KB
Newer Older
P
Philipp Reisner 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

24
*/
P
Philipp Reisner 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
39
#include "drbd_protocol.h"
P
Philipp Reisner 已提交
40 41
#include "drbd_req.h"

42 43
static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);
P
Philipp Reisner 已提交
44

45
/* endio handlers:
46
 *   drbd_md_endio (defined here)
47 48
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
49
 *   drbd_bm_endio (defined in drbd_bitmap.c)
50
 *
P
Philipp Reisner 已提交
51 52 53 54 55 56 57 58 59 60
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on an device holds a read lock. In case we have
61
   to evaluate the resync after dependencies, we grab a write lock, because
P
Philipp Reisner 已提交
62 63 64 65 66 67
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
68
void drbd_md_endio(struct bio *bio, int error)
P
Philipp Reisner 已提交
69
{
70
	struct drbd_device *device;
P
Philipp Reisner 已提交
71

72 73
	device = bio->bi_private;
	device->md_io.error = error;
P
Philipp Reisner 已提交
74

75 76 77 78 79 80 81 82 83
	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
84
	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85
	 */
86
	drbd_md_put_buffer(device);
87
	device->md_io.done = 1;
88
	wake_up(&device->misc_wait);
89
	bio_put(bio);
90 91
	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
		put_ldev(device);
P
Philipp Reisner 已提交
92 93 94 95 96
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
97
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
P
Philipp Reisner 已提交
98 99
{
	unsigned long flags = 0;
100 101
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
P
Philipp Reisner 已提交
102

103
	spin_lock_irqsave(&device->resource->req_lock, flags);
104
	device->read_cnt += peer_req->i.size >> 9;
105
	list_del(&peer_req->w.list);
106 107
	if (list_empty(&device->read_ee))
		wake_up(&device->ee_wait);
108
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
110
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
P
Philipp Reisner 已提交
111

112
	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113
	put_ldev(device);
P
Philipp Reisner 已提交
114 115 116
}

/* writes on behalf of the partner, or resync writes,
117
 * "submitted" by the receiver, final stage.  */
118
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
P
Philipp Reisner 已提交
119 120
{
	unsigned long flags = 0;
121 122
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
123
	struct drbd_interval i;
P
Philipp Reisner 已提交
124
	int do_wake;
125
	u64 block_id;
P
Philipp Reisner 已提交
126 127
	int do_al_complete_io;

128
	/* after we moved peer_req to done_ee,
P
Philipp Reisner 已提交
129 130 131
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
132
	i = peer_req->i;
133 134
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
135
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
P
Philipp Reisner 已提交
136

137
	spin_lock_irqsave(&device->resource->req_lock, flags);
138
	device->writ_cnt += peer_req->i.size >> 9;
139
	list_move_tail(&peer_req->w.list, &device->done_ee);
P
Philipp Reisner 已提交
140

141
	/*
142
	 * Do not remove from the write_requests tree here: we did not send the
143 144
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
145
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
146 147
	 * _drbd_clear_done_ee.
	 */
P
Philipp Reisner 已提交
148

149
	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
P
Philipp Reisner 已提交
150

151 152 153
	/* FIXME do we want to detach for failed REQ_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
154
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
P
Philipp Reisner 已提交
156

157
	if (block_id == ID_SYNCER)
158
		drbd_rs_complete_io(device, i.sector);
P
Philipp Reisner 已提交
159 160

	if (do_wake)
161
		wake_up(&device->ee_wait);
P
Philipp Reisner 已提交
162 163

	if (do_al_complete_io)
164
		drbd_al_complete_io(device, &i);
P
Philipp Reisner 已提交
165

166
	wake_asender(peer_device->connection);
167
	put_ldev(device);
168
}
P
Philipp Reisner 已提交
169

170 171 172
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
173
void drbd_peer_request_endio(struct bio *bio, int error)
174
{
175
	struct drbd_peer_request *peer_req = bio->bi_private;
176
	struct drbd_device *device = peer_req->peer_device->device;
177 178
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;
179
	int is_discard = !!(bio->bi_rw & REQ_DISCARD);
180

181
	if (error && __ratelimit(&drbd_ratelimit_state))
182
		drbd_warn(device, "%s: error=%d s=%llus\n",
183 184
				is_write ? (is_discard ? "discard" : "write")
					: "read", error,
185
				(unsigned long long)peer_req->i.sector);
186
	if (!error && !uptodate) {
187
		if (__ratelimit(&drbd_ratelimit_state))
188
			drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
189
					is_write ? "write" : "read",
190
					(unsigned long long)peer_req->i.sector);
191 192 193 194 195 196 197
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
198
		set_bit(__EE_WAS_ERROR, &peer_req->flags);
199 200

	bio_put(bio); /* no need for the bio anymore */
201
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
202
		if (is_write)
203
			drbd_endio_write_sec_final(peer_req);
204
		else
205
			drbd_endio_read_sec_final(peer_req);
206
	}
P
Philipp Reisner 已提交
207 208 209 210
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
211
void drbd_request_endio(struct bio *bio, int error)
P
Philipp Reisner 已提交
212
{
213
	unsigned long flags;
P
Philipp Reisner 已提交
214
	struct drbd_request *req = bio->bi_private;
215
	struct drbd_device *device = req->device;
216
	struct bio_and_error m;
P
Philipp Reisner 已提交
217 218 219 220
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
221
		drbd_warn(device, "p %s: setting error to -EIO\n",
P
Philipp Reisner 已提交
222 223 224 225 226 227 228
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259

	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which do no longer
	 * complete requests at all, not even do error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful swichover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
260
			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
261 262 263 264 265

		if (!error)
			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
	}

P
Philipp Reisner 已提交
266 267
	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
268 269 270 271 272 273
		if (bio->bi_rw & REQ_DISCARD)
			what = (error == -EOPNOTSUPP)
				? DISCARD_COMPLETED_NOTSUPP
				: DISCARD_COMPLETED_WITH_ERROR;
		else
			what = (bio_data_dir(bio) == WRITE)
274
			? WRITE_COMPLETED_WITH_ERROR
275
			: (bio_rw(bio) == READ)
276 277
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
P
Philipp Reisner 已提交
278
	} else
279
		what = COMPLETED_OK;
P
Philipp Reisner 已提交
280 281 282 283

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

284
	/* not req_mod(), we need irqsave here! */
285
	spin_lock_irqsave(&device->resource->req_lock, flags);
286
	__req_mod(req, what, &m);
287
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
288
	put_ldev(device);
289 290

	if (m.bio)
291
		complete_master_bio(device, &m);
P
Philipp Reisner 已提交
292 293
}

294
void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
295 296 297
{
	struct hash_desc desc;
	struct scatterlist sg;
298
	struct page *page = peer_req->pages;
299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
315
	len = peer_req->i.size & (PAGE_SIZE - 1);
316 317 318 319 320
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

321
void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
P
Philipp Reisner 已提交
322 323 324
{
	struct hash_desc desc;
	struct scatterlist sg;
325 326
	struct bio_vec bvec;
	struct bvec_iter iter;
P
Philipp Reisner 已提交
327 328 329 330 331 332 333

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

334 335
	bio_for_each_segment(bvec, bio, iter) {
		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
P
Philipp Reisner 已提交
336 337 338 339 340
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

341
/* MAYBE merge common code with w_e_end_ov_req */
342
static int w_e_send_csum(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
343
{
344
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
345 346
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
P
Philipp Reisner 已提交
347 348
	int digest_size;
	void *digest;
349
	int err = 0;
P
Philipp Reisner 已提交
350

351 352
	if (unlikely(cancel))
		goto out;
P
Philipp Reisner 已提交
353

354
	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
355
		goto out;
P
Philipp Reisner 已提交
356

357
	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
358 359
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
360 361
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
362
		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
363
		/* Free peer_req and pages before send.
364 365 366
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
367
		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
368
		drbd_free_peer_req(device, peer_req);
369
		peer_req = NULL;
370
		inc_rs_pending(device);
371
		err = drbd_send_drequest_csum(peer_device, sector, size,
372 373
					      digest, digest_size,
					      P_CSUM_RS_REQUEST);
374 375
		kfree(digest);
	} else {
376
		drbd_err(device, "kmalloc() of digest failed.\n");
377
		err = -ENOMEM;
378
	}
P
Philipp Reisner 已提交
379

380
out:
381
	if (peer_req)
382
		drbd_free_peer_req(device, peer_req);
P
Philipp Reisner 已提交
383

384
	if (unlikely(err))
385
		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
386
	return err;
P
Philipp Reisner 已提交
387 388 389 390
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

391
static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
P
Philipp Reisner 已提交
392
{
393
	struct drbd_device *device = peer_device->device;
394
	struct drbd_peer_request *peer_req;
P
Philipp Reisner 已提交
395

396
	if (!get_ldev(device))
397
		return -EIO;
P
Philipp Reisner 已提交
398 399 400

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
401
	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
402
				       size, true /* has real payload */, GFP_TRY);
403
	if (!peer_req)
404
		goto defer;
P
Philipp Reisner 已提交
405

406
	peer_req->w.cb = w_e_send_csum;
407
	spin_lock_irq(&device->resource->req_lock);
408
	list_add_tail(&peer_req->w.list, &device->read_ee);
409
	spin_unlock_irq(&device->resource->req_lock);
P
Philipp Reisner 已提交
410

411 412
	atomic_add(size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
413
		return 0;
P
Philipp Reisner 已提交
414

415 416 417 418
	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
419
	spin_lock_irq(&device->resource->req_lock);
420
	list_del(&peer_req->w.list);
421
	spin_unlock_irq(&device->resource->req_lock);
422

423
	drbd_free_peer_req(device, peer_req);
424
defer:
425
	put_ldev(device);
426
	return -EAGAIN;
P
Philipp Reisner 已提交
427 428
}

429
int w_resync_timer(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
430
{
431 432 433
	struct drbd_device *device =
		container_of(w, struct drbd_device, resync_work);

434
	switch (device->state.conn) {
435
	case C_VERIFY_S:
436
		make_ov_request(device, cancel);
437 438
		break;
	case C_SYNC_TARGET:
439
		make_resync_request(device, cancel);
440
		break;
P
Philipp Reisner 已提交
441 442
	}

443
	return 0;
444 445 446 447
}

void resync_timer_fn(unsigned long data)
{
448
	struct drbd_device *device = (struct drbd_device *) data;
449

450 451 452
	drbd_queue_work_if_unqueued(
		&first_peer_device(device)->connection->sender_work,
		&device->resync_work);
P
Philipp Reisner 已提交
453 454
}

455 456 457 458 459
static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
460
		fb->values[i] = value;
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

484 485 486 487
struct fifo_buffer *fifo_alloc(int fifo_size)
{
	struct fifo_buffer *fb;

488
	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
489 490 491 492 493 494 495 496 497 498
	if (!fb)
		return NULL;

	fb->head_index = 0;
	fb->size = fifo_size;
	fb->total = 0;

	return fb;
}

499
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
500
{
P
Philipp Reisner 已提交
501
	struct disk_conf *dc;
502
	unsigned int want;     /* The number of sectors we want in-flight */
503
	int req_sect; /* Number of sectors to request in this turn */
504
	int correction; /* Number of sectors more we need in-flight */
505 506 507 508
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
P
Philipp Reisner 已提交
509
	struct fifo_buffer *plan;
510

511 512
	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);
513

P
Philipp Reisner 已提交
514
	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
515

516
	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
P
Philipp Reisner 已提交
517
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
518
	} else { /* normal path */
P
Philipp Reisner 已提交
519 520
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
521 522
	}

523
	correction = want - device->rs_in_flight - plan->total;
524 525 526

	/* Plan ahead */
	cps = correction / steps;
P
Philipp Reisner 已提交
527 528
	fifo_add_val(plan, cps);
	plan->total += cps * steps;
529 530

	/* What we do in this step */
P
Philipp Reisner 已提交
531 532
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;
533 534 535 536 537

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

P
Philipp Reisner 已提交
538
	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
539 540 541 542
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
543
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
544 545
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
546 547 548 549 550
	*/

	return req_sect;
}

551
static int drbd_rs_number_requests(struct drbd_device *device)
552
{
553 554 555 556 557
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	int number, mxb;

	sect_in = atomic_xchg(&device->rs_sect_in, 0);
	device->rs_in_flight -= sect_in;
P
Philipp Reisner 已提交
558 559

	rcu_read_lock();
560
	mxb = drbd_get_max_buffers(device) / 2;
561
	if (rcu_dereference(device->rs_plan_s)->size) {
562
		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
563
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
564
	} else {
565 566
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
567
	}
P
Philipp Reisner 已提交
568
	rcu_read_unlock();
569

570 571 572 573 574
	/* Don't have more than "max-buffers"/2 in-flight.
	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
	 * potentially causing a distributed deadlock on congestion during
	 * online-verify or (checksum-based) resync, if max-buffers,
	 * socket buffer sizes and resync rate settings are mis-configured. */
575 576 577 578 579 580 581

	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
	 * "number of pages" (typically also 4k),
	 * but "rs_in_flight" is in "sectors" (512 Byte). */
	if (mxb - device->rs_in_flight/8 < number)
		number = mxb - device->rs_in_flight/8;
582

583 584 585
	return number;
}

586
static int make_resync_request(struct drbd_device *const device, int cancel)
P
Philipp Reisner 已提交
587
{
588 589
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
P
Philipp Reisner 已提交
590 591
	unsigned long bit;
	sector_t sector;
592
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
593
	int max_bio_size;
594
	int number, rollback_i, size;
595
	int align, requeue = 0;
596
	int i = 0;
P
Philipp Reisner 已提交
597 598

	if (unlikely(cancel))
599
		return 0;
P
Philipp Reisner 已提交
600

601
	if (device->rs_total == 0) {
602
		/* empty resync? */
603
		drbd_resync_finished(device);
604
		return 0;
605 606
	}

607 608 609
	if (!get_ldev(device)) {
		/* Since we only need to access device->rsync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
P
Philipp Reisner 已提交
610 611
		   to continue resync with a broken disk makes no sense at
		   all */
612
		drbd_err(device, "Disk broke down during resync!\n");
613
		return 0;
P
Philipp Reisner 已提交
614 615
	}

616 617
	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
618
	if (number <= 0)
619
		goto requeue;
P
Philipp Reisner 已提交
620 621

	for (i = 0; i < number; i++) {
622 623
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
624 625
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
626 627 628 629 630 631 632 633 634 635
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
636
		mutex_unlock(&connection->data.mutex);
637
		if (requeue)
P
Philipp Reisner 已提交
638 639 640 641
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
642
		bit  = drbd_bm_find_next(device, device->bm_resync_fo);
P
Philipp Reisner 已提交
643

644
		if (bit == DRBD_END_OF_BITMAP) {
645 646
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
647
			return 0;
P
Philipp Reisner 已提交
648 649 650 651
		}

		sector = BM_BIT_TO_SECT(bit);

652
		if (drbd_try_rs_begin_io(device, sector)) {
653
			device->bm_resync_fo = bit;
P
Philipp Reisner 已提交
654 655
			goto requeue;
		}
656
		device->bm_resync_fo = bit + 1;
P
Philipp Reisner 已提交
657

658 659
		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
P
Philipp Reisner 已提交
660 661 662
			goto next_sector;
		}

663
#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
P
Philipp Reisner 已提交
664 665 666 667 668 669 670
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
671
		rollback_i = i;
672
		while (i < number) {
673
			if (size + BM_BLOCK_SIZE > max_bio_size)
P
Philipp Reisner 已提交
674 675 676 677 678 679 680 681 682 683 684 685 686 687
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
688
			if (drbd_bm_test_bit(device, bit+1) != 1)
P
Philipp Reisner 已提交
689 690 691 692 693 694 695 696 697 698
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
699
			device->bm_resync_fo = bit + 1;
P
Philipp Reisner 已提交
700 701 702 703 704
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
705 706

		if (device->use_csums) {
707
			switch (read_for_csum(peer_device, sector, size)) {
708
			case -EIO: /* Disk failure */
709
				put_ldev(device);
710
				return -EIO;
711
			case -EAGAIN: /* allocation failed, or ldev busy */
712 713
				drbd_rs_complete_io(device, sector);
				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
714
				i = rollback_i;
P
Philipp Reisner 已提交
715
				goto requeue;
716 717 718 719 720
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
P
Philipp Reisner 已提交
721 722
			}
		} else {
723 724
			int err;

725
			inc_rs_pending(device);
726
			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
727 728
						 sector, size, ID_SYNCER);
			if (err) {
729
				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
730 731
				dec_rs_pending(device);
				put_ldev(device);
732
				return err;
P
Philipp Reisner 已提交
733 734 735 736
			}
		}
	}

737
	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
P
Philipp Reisner 已提交
738 739 740 741 742 743
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
744
		put_ldev(device);
745
		return 0;
P
Philipp Reisner 已提交
746 747 748
	}

 requeue:
749 750 751
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(device);
752
	return 0;
P
Philipp Reisner 已提交
753 754
}

755
static int make_ov_request(struct drbd_device *device, int cancel)
P
Philipp Reisner 已提交
756 757 758
{
	int number, i, size;
	sector_t sector;
759
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
760
	bool stop_sector_reached = false;
P
Philipp Reisner 已提交
761 762 763 764

	if (unlikely(cancel))
		return 1;

765
	number = drbd_rs_number_requests(device);
P
Philipp Reisner 已提交
766

767
	sector = device->ov_position;
P
Philipp Reisner 已提交
768
	for (i = 0; i < number; i++) {
769
		if (sector >= capacity)
P
Philipp Reisner 已提交
770
			return 1;
771 772 773 774 775

		/* We check for "finished" only in the reply path:
		 * w_e_end_ov_reply().
		 * We need to send at least one request out. */
		stop_sector_reached = i > 0
776 777
			&& verify_can_do_stop_sector(device)
			&& sector >= device->ov_stop_sector;
778 779
		if (stop_sector_reached)
			break;
P
Philipp Reisner 已提交
780 781 782

		size = BM_BLOCK_SIZE;

783
		if (drbd_try_rs_begin_io(device, sector)) {
784
			device->ov_position = sector;
P
Philipp Reisner 已提交
785 786 787 788 789 790
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

791
		inc_rs_pending(device);
792
		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
793
			dec_rs_pending(device);
P
Philipp Reisner 已提交
794 795 796 797
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
798
	device->ov_position = sector;
P
Philipp Reisner 已提交
799 800

 requeue:
801
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
802
	if (i == 0 || !stop_sector_reached)
803
		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
P
Philipp Reisner 已提交
804 805 806
	return 1;
}

807
int w_ov_finished(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
808
{
809 810 811 812
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);
813 814
	ov_out_of_sync_print(device);
	drbd_resync_finished(device);
P
Philipp Reisner 已提交
815

816
	return 0;
P
Philipp Reisner 已提交
817 818
}

819
static int w_resync_finished(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
820
{
821 822 823 824
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);
P
Philipp Reisner 已提交
825

826
	drbd_resync_finished(device);
P
Philipp Reisner 已提交
827

828
	return 0;
P
Philipp Reisner 已提交
829 830
}

831
static void ping_peer(struct drbd_device *device)
832
{
833
	struct drbd_connection *connection = first_peer_device(device)->connection;
834

835 836 837 838
	clear_bit(GOT_PING_ACK, &connection->flags);
	request_ping(connection);
	wait_event(connection->ping_wait,
		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
839 840
}

841
int drbd_resync_finished(struct drbd_device *device)
P
Philipp Reisner 已提交
842 843 844 845
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
846
	struct drbd_device_work *dw;
P
Philipp Reisner 已提交
847
	char *khelper_cmd = NULL;
848
	int verify_done = 0;
P
Philipp Reisner 已提交
849 850 851 852

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
853
	if (drbd_rs_del_all(device)) {
P
Philipp Reisner 已提交
854 855 856 857 858
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * is not finished by now).   Retry in 100ms. */

859
		schedule_timeout_interruptible(HZ / 10);
860 861 862 863 864 865
		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
		if (dw) {
			dw->w.cb = w_resync_finished;
			dw->device = device;
			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
					&dw->w);
P
Philipp Reisner 已提交
866 867
			return 1;
		}
868
		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
P
Philipp Reisner 已提交
869 870
	}

871
	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
P
Philipp Reisner 已提交
872 873
	if (dt <= 0)
		dt = 1;
874

875
	db = device->rs_total;
876
	/* adjust for verify start and stop sectors, respective reached position */
877 878
	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
		db -= device->ov_left;
879

P
Philipp Reisner 已提交
880
	dbdt = Bit2KB(db/dt);
881
	device->rs_paused /= HZ;
P
Philipp Reisner 已提交
882

883
	if (!get_ldev(device))
P
Philipp Reisner 已提交
884 885
		goto out;

886
	ping_peer(device);
887

888
	spin_lock_irq(&device->resource->req_lock);
889
	os = drbd_read_state(device);
P
Philipp Reisner 已提交
890

891 892
	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

P
Philipp Reisner 已提交
893 894 895 896 897 898 899 900
	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

901
	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
902
	     verify_done ? "Online verify" : "Resync",
903
	     dt + device->rs_paused, device->rs_paused, dbdt);
P
Philipp Reisner 已提交
904

905
	n_oos = drbd_bm_total_weight(device);
P
Philipp Reisner 已提交
906 907 908

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
909
			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
P
Philipp Reisner 已提交
910 911 912 913
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
914
		D_ASSERT(device, (n_oos - device->rs_failed) == 0);
P
Philipp Reisner 已提交
915 916 917 918

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

919
		if (device->use_csums && device->rs_total) {
920 921
			const unsigned long s = device->rs_same_csum;
			const unsigned long t = device->rs_total;
P
Philipp Reisner 已提交
922 923 924
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
925
			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
P
Philipp Reisner 已提交
926 927
			     "transferred %luK total %luK\n",
			     ratio,
928 929 930
			     Bit2KB(device->rs_same_csum),
			     Bit2KB(device->rs_total - device->rs_same_csum),
			     Bit2KB(device->rs_total));
P
Philipp Reisner 已提交
931 932 933
		}
	}

934
	if (device->rs_failed) {
935
		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
P
Philipp Reisner 已提交
936 937 938 939 940 941 942 943 944 945 946 947 948

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
949
			if (device->p_uuid) {
P
Philipp Reisner 已提交
950 951
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
952 953 954
					_drbd_uuid_set(device, i, device->p_uuid[i]);
				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
P
Philipp Reisner 已提交
955
			} else {
956
				drbd_err(device, "device->p_uuid is NULL! BUG\n");
P
Philipp Reisner 已提交
957 958 959
			}
		}

960 961 962
		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
963 964 965
			drbd_uuid_set_bm(device, 0UL);
			drbd_print_uuids(device, "updated UUIDs");
			if (device->p_uuid) {
966 967 968 969
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
970
					device->p_uuid[i] = device->ldev->md.uuid[i];
971
			}
P
Philipp Reisner 已提交
972 973 974
		}
	}

975
	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
P
Philipp Reisner 已提交
976
out_unlock:
977
	spin_unlock_irq(&device->resource->req_lock);
978
	put_ldev(device);
P
Philipp Reisner 已提交
979
out:
980 981 982
	device->rs_total  = 0;
	device->rs_failed = 0;
	device->rs_paused = 0;
983 984

	/* reset start sector, if we reached end of device */
985 986
	if (verify_done && device->ov_left == 0)
		device->ov_start_sector = 0;
P
Philipp Reisner 已提交
987

988
	drbd_md_sync(device);
989

P
Philipp Reisner 已提交
990
	if (khelper_cmd)
991
		drbd_khelper(device, khelper_cmd);
P
Philipp Reisner 已提交
992 993 994 995 996

	return 1;
}

/* helper */
997
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
P
Philipp Reisner 已提交
998
{
999
	if (drbd_peer_req_has_active_page(peer_req)) {
P
Philipp Reisner 已提交
1000
		/* This might happen if sendpage() has not finished */
1001
		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1002 1003
		atomic_add(i, &device->pp_in_use_by_net);
		atomic_sub(i, &device->pp_in_use);
1004
		spin_lock_irq(&device->resource->req_lock);
1005
		list_add_tail(&peer_req->w.list, &device->net_ee);
1006
		spin_unlock_irq(&device->resource->req_lock);
1007
		wake_up(&drbd_pp_wait);
P
Philipp Reisner 已提交
1008
	} else
1009
		drbd_free_peer_req(device, peer_req);
P
Philipp Reisner 已提交
1010 1011 1012 1013
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1014
 * @device:	DRBD device.
P
Philipp Reisner 已提交
1015 1016 1017
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
1018
int w_e_end_data_req(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1019
{
1020
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1021 1022
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
1023
	int err;
P
Philipp Reisner 已提交
1024 1025

	if (unlikely(cancel)) {
1026 1027
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
1028
		return 0;
P
Philipp Reisner 已提交
1029 1030
	}

1031
	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1032
		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
P
Philipp Reisner 已提交
1033 1034
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
1035
			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1036
			    (unsigned long long)peer_req->i.sector);
P
Philipp Reisner 已提交
1037

1038
		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
P
Philipp Reisner 已提交
1039 1040
	}

1041
	dec_unacked(device);
P
Philipp Reisner 已提交
1042

1043
	move_to_net_ee_or_free(device, peer_req);
P
Philipp Reisner 已提交
1044

1045
	if (unlikely(err))
1046
		drbd_err(device, "drbd_send_block() failed\n");
1047
	return err;
P
Philipp Reisner 已提交
1048 1049 1050
}

/**
1051
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
P
Philipp Reisner 已提交
1052 1053 1054
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
1055
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1056
{
1057
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058 1059
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
1060
	int err;
P
Philipp Reisner 已提交
1061 1062

	if (unlikely(cancel)) {
1063 1064
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
1065
		return 0;
P
Philipp Reisner 已提交
1066 1067
	}

1068 1069 1070
	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
P
Philipp Reisner 已提交
1071 1072
	}

1073
	if (device->state.conn == C_AHEAD) {
1074
		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1075
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1076 1077
		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(device);
1078
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
P
Philipp Reisner 已提交
1079 1080
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
1081
				drbd_err(device, "Not sending RSDataReply, "
P
Philipp Reisner 已提交
1082
				    "partner DISKLESS!\n");
1083
			err = 0;
P
Philipp Reisner 已提交
1084 1085 1086
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
1087
			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1088
			    (unsigned long long)peer_req->i.sector);
P
Philipp Reisner 已提交
1089

1090
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
P
Philipp Reisner 已提交
1091 1092

		/* update resync data with failure */
1093
		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
P
Philipp Reisner 已提交
1094 1095
	}

1096
	dec_unacked(device);
P
Philipp Reisner 已提交
1097

1098
	move_to_net_ee_or_free(device, peer_req);
P
Philipp Reisner 已提交
1099

1100
	if (unlikely(err))
1101
		drbd_err(device, "drbd_send_block() failed\n");
1102
	return err;
P
Philipp Reisner 已提交
1103 1104
}

1105
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1106
{
1107
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1108 1109
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
P
Philipp Reisner 已提交
1110 1111 1112
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
1113
	int err, eq = 0;
P
Philipp Reisner 已提交
1114 1115

	if (unlikely(cancel)) {
1116 1117
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
1118
		return 0;
P
Philipp Reisner 已提交
1119 1120
	}

1121 1122 1123
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
1124
	}
P
Philipp Reisner 已提交
1125

1126
	di = peer_req->digest;
P
Philipp Reisner 已提交
1127

1128
	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
P
Philipp Reisner 已提交
1129 1130 1131
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
1132 1133
		if (peer_device->connection->csums_tfm) {
			digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1134
			D_ASSERT(device, digest_size == di->digest_size);
P
Philipp Reisner 已提交
1135 1136 1137
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
1138
			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
P
Philipp Reisner 已提交
1139 1140 1141 1142 1143
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
1144
			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1145
			/* rs_same_csums unit is BM_BLOCK_SIZE */
1146
			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1147
			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
P
Philipp Reisner 已提交
1148
		} else {
1149
			inc_rs_pending(device);
1150 1151
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1152
			kfree(di);
1153
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
P
Philipp Reisner 已提交
1154 1155
		}
	} else {
1156
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
P
Philipp Reisner 已提交
1157
		if (__ratelimit(&drbd_ratelimit_state))
1158
			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
P
Philipp Reisner 已提交
1159 1160
	}

1161 1162
	dec_unacked(device);
	move_to_net_ee_or_free(device, peer_req);
P
Philipp Reisner 已提交
1163

1164
	if (unlikely(err))
1165
		drbd_err(device, "drbd_send_block/ack() failed\n");
1166
	return err;
P
Philipp Reisner 已提交
1167 1168
}

1169
int w_e_end_ov_req(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1170
{
1171
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172 1173
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
1174 1175
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
P
Philipp Reisner 已提交
1176 1177
	int digest_size;
	void *digest;
1178
	int err = 0;
P
Philipp Reisner 已提交
1179 1180 1181 1182

	if (unlikely(cancel))
		goto out;

1183
	digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
P
Philipp Reisner 已提交
1184
	digest = kmalloc(digest_size, GFP_NOIO);
1185
	if (!digest) {
1186
		err = 1;	/* terminate the connection in case the allocation failed */
1187
		goto out;
P
Philipp Reisner 已提交
1188 1189
	}

1190
	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1191
		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1192 1193 1194
	else
		memset(digest, 0, digest_size);

1195 1196 1197 1198
	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
1199
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1200
	drbd_free_peer_req(device, peer_req);
1201
	peer_req = NULL;
1202
	inc_rs_pending(device);
1203
	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1204
	if (err)
1205
		dec_rs_pending(device);
1206 1207
	kfree(digest);

P
Philipp Reisner 已提交
1208
out:
1209
	if (peer_req)
1210 1211
		drbd_free_peer_req(device, peer_req);
	dec_unacked(device);
1212
	return err;
P
Philipp Reisner 已提交
1213 1214
}

1215
void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
P
Philipp Reisner 已提交
1216
{
1217 1218
	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
		device->ov_last_oos_size += size>>9;
P
Philipp Reisner 已提交
1219
	} else {
1220 1221
		device->ov_last_oos_start = sector;
		device->ov_last_oos_size = size>>9;
P
Philipp Reisner 已提交
1222
	}
1223
	drbd_set_out_of_sync(device, sector, size);
P
Philipp Reisner 已提交
1224 1225
}

1226
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1227
{
1228
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1229 1230
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
P
Philipp Reisner 已提交
1231 1232
	struct digest_info *di;
	void *digest;
1233 1234
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
1235
	int digest_size;
1236
	int err, eq = 0;
1237
	bool stop_sector_reached = false;
P
Philipp Reisner 已提交
1238 1239

	if (unlikely(cancel)) {
1240 1241
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
1242
		return 0;
P
Philipp Reisner 已提交
1243 1244 1245 1246
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
1247 1248 1249
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
1250
	}
P
Philipp Reisner 已提交
1251

1252
	di = peer_req->digest;
P
Philipp Reisner 已提交
1253

1254
	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1255
		digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
P
Philipp Reisner 已提交
1256 1257
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
1258
			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
P
Philipp Reisner 已提交
1259

1260
			D_ASSERT(device, digest_size == di->digest_size);
P
Philipp Reisner 已提交
1261 1262 1263 1264 1265
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

1266 1267 1268 1269
	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
1270
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1271
	drbd_free_peer_req(device, peer_req);
P
Philipp Reisner 已提交
1272
	if (!eq)
1273
		drbd_ov_out_of_sync_found(device, sector, size);
P
Philipp Reisner 已提交
1274
	else
1275
		ov_out_of_sync_print(device);
P
Philipp Reisner 已提交
1276

1277
	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1278
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
P
Philipp Reisner 已提交
1279

1280
	dec_unacked(device);
P
Philipp Reisner 已提交
1281

1282
	--device->ov_left;
1283 1284

	/* let's advance progress step marks only for every other megabyte */
1285 1286
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);
1287

1288 1289
	stop_sector_reached = verify_can_do_stop_sector(device) &&
		(sector + (size>>9)) >= device->ov_stop_sector;
1290

1291 1292 1293
	if (device->ov_left == 0 || stop_sector_reached) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
P
Philipp Reisner 已提交
1294 1295
	}

1296
	return err;
P
Philipp Reisner 已提交
1297 1298
}

1299 1300 1301 1302 1303
/* FIXME
 * We need to track the number of pending barrier acks,
 * and to be able to wait for them.
 * See also comment in drbd_adm_attach before drbd_suspend_io.
 */
1304
static int drbd_send_barrier(struct drbd_connection *connection)
P
Philipp Reisner 已提交
1305
{
1306
	struct p_barrier *p;
1307
	struct drbd_socket *sock;
P
Philipp Reisner 已提交
1308

1309 1310
	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
1311 1312
	if (!p)
		return -EIO;
1313
	p->barrier = connection->send.current_epoch_nr;
1314
	p->pad = 0;
1315
	connection->send.current_epoch_writes = 0;
1316

1317
	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
P
Philipp Reisner 已提交
1318 1319
}

1320
int w_send_write_hint(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1321
{
1322 1323
	struct drbd_device *device =
		container_of(w, struct drbd_device, unplug_work);
1324 1325
	struct drbd_socket *sock;

P
Philipp Reisner 已提交
1326
	if (cancel)
1327
		return 0;
1328
	sock = &first_peer_device(device)->connection->data;
1329
	if (!drbd_prepare_command(first_peer_device(device), sock))
1330
		return -EIO;
1331
	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
P
Philipp Reisner 已提交
1332 1333
}

1334
static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1335
{
1336 1337 1338 1339
	if (!connection->send.seen_any_write_yet) {
		connection->send.seen_any_write_yet = true;
		connection->send.current_epoch_nr = epoch;
		connection->send.current_epoch_writes = 0;
1340 1341 1342
	}
}

1343
static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1344 1345
{
	/* re-init if first write on this connection */
1346
	if (!connection->send.seen_any_write_yet)
1347
		return;
1348 1349 1350 1351
	if (connection->send.current_epoch_nr != epoch) {
		if (connection->send.current_epoch_writes)
			drbd_send_barrier(connection);
		connection->send.current_epoch_nr = epoch;
1352 1353 1354
	}
}

1355
int w_send_out_of_sync(struct drbd_work *w, int cancel)
1356 1357
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
1358
	struct drbd_device *device = req->device;
1359 1360
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device->connection;
1361
	int err;
1362 1363

	if (unlikely(cancel)) {
1364
		req_mod(req, SEND_CANCELED);
1365
		return 0;
1366
	}
1367
	req->pre_send_jif = jiffies;
1368

1369
	/* this time, no connection->send.current_epoch_writes++;
1370 1371 1372
	 * If it was sent, it was the closing barrier for the last
	 * replicated epoch, before we went into AHEAD mode.
	 * No more barriers will be sent, until we leave AHEAD mode again. */
1373
	maybe_send_barrier(connection, req->epoch);
1374

1375
	err = drbd_send_out_of_sync(peer_device, req);
1376
	req_mod(req, OOS_HANDED_TO_NETWORK);
1377

1378
	return err;
1379 1380
}

P
Philipp Reisner 已提交
1381 1382 1383 1384 1385
/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
1386
int w_send_dblock(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1387 1388
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
1389
	struct drbd_device *device = req->device;
1390 1391
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
1392
	int err;
P
Philipp Reisner 已提交
1393 1394

	if (unlikely(cancel)) {
1395
		req_mod(req, SEND_CANCELED);
1396
		return 0;
P
Philipp Reisner 已提交
1397
	}
1398
	req->pre_send_jif = jiffies;
P
Philipp Reisner 已提交
1399

1400 1401 1402
	re_init_if_first_write(connection, req->epoch);
	maybe_send_barrier(connection, req->epoch);
	connection->send.current_epoch_writes++;
1403

1404
	err = drbd_send_dblock(peer_device, req);
1405
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
P
Philipp Reisner 已提交
1406

1407
	return err;
P
Philipp Reisner 已提交
1408 1409 1410 1411 1412 1413 1414
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
1415
int w_send_read_req(struct drbd_work *w, int cancel)
P
Philipp Reisner 已提交
1416 1417
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
1418
	struct drbd_device *device = req->device;
1419 1420
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
1421
	int err;
P
Philipp Reisner 已提交
1422 1423

	if (unlikely(cancel)) {
1424
		req_mod(req, SEND_CANCELED);
1425
		return 0;
P
Philipp Reisner 已提交
1426
	}
1427
	req->pre_send_jif = jiffies;
P
Philipp Reisner 已提交
1428

1429 1430
	/* Even read requests may close a write epoch,
	 * if there was any yet. */
1431
	maybe_send_barrier(connection, req->epoch);
1432

1433
	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1434
				 (unsigned long)req);
P
Philipp Reisner 已提交
1435

1436
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
P
Philipp Reisner 已提交
1437

1438
	return err;
P
Philipp Reisner 已提交
1439 1440
}

1441
int w_restart_disk_io(struct drbd_work *w, int cancel)
1442 1443
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
1444
	struct drbd_device *device = req->device;
1445

1446
	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1447
		drbd_al_begin_io(device, &req->i);
1448 1449

	drbd_req_make_private_bio(req, req->master_bio);
1450
	req->private_bio->bi_bdev = device->ldev->backing_bdev;
1451 1452
	generic_make_request(req->private_bio);

1453
	return 0;
1454 1455
}

1456
static int _drbd_may_sync_now(struct drbd_device *device)
P
Philipp Reisner 已提交
1457
{
1458
	struct drbd_device *odev = device;
1459
	int resync_after;
P
Philipp Reisner 已提交
1460 1461

	while (1) {
1462
		if (!odev->ldev || odev->state.disk == D_DISKLESS)
1463
			return 1;
P
Philipp Reisner 已提交
1464
		rcu_read_lock();
1465
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
P
Philipp Reisner 已提交
1466
		rcu_read_unlock();
1467
		if (resync_after == -1)
P
Philipp Reisner 已提交
1468
			return 1;
1469
		odev = minor_to_device(resync_after);
1470
		if (!odev)
1471
			return 1;
P
Philipp Reisner 已提交
1472 1473 1474 1475 1476 1477 1478 1479 1480 1481
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1482
 * @device:	DRBD device.
P
Philipp Reisner 已提交
1483 1484 1485
 *
 * Called from process context only (admin command and after_state_ch).
 */
1486
static int _drbd_pause_after(struct drbd_device *device)
P
Philipp Reisner 已提交
1487
{
1488
	struct drbd_device *odev;
P
Philipp Reisner 已提交
1489 1490
	int i, rv = 0;

1491
	rcu_read_lock();
1492
	idr_for_each_entry(&drbd_devices, odev, i) {
P
Philipp Reisner 已提交
1493 1494 1495 1496 1497 1498
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}
1499
	rcu_read_unlock();
P
Philipp Reisner 已提交
1500 1501 1502 1503 1504 1505

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
1506
 * @device:	DRBD device.
P
Philipp Reisner 已提交
1507 1508 1509
 *
 * Called from process context only (admin command and worker).
 */
1510
static int _drbd_resume_next(struct drbd_device *device)
P
Philipp Reisner 已提交
1511
{
1512
	struct drbd_device *odev;
P
Philipp Reisner 已提交
1513 1514
	int i, rv = 0;

1515
	rcu_read_lock();
1516
	idr_for_each_entry(&drbd_devices, odev, i) {
P
Philipp Reisner 已提交
1517 1518 1519 1520 1521 1522 1523 1524 1525
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
1526
	rcu_read_unlock();
P
Philipp Reisner 已提交
1527 1528 1529
	return rv;
}

void resume_next_sg(struct drbd_device *device)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(device);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_device *device)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(device);
	write_unlock_irq(&global_state_lock);
}

/* caller must hold global_state_lock */
enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
{
	struct drbd_device *odev;
	int resync_after;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || o_minor > MINORMASK)
		return ERR_RESYNC_AFTER;

	/* check for loops */
	odev = minor_to_device(o_minor);
	while (1) {
		if (odev == device)
			return ERR_RESYNC_AFTER_CYCLE;

		/* You are free to depend on diskless, non-existing,
		 * or not yet/no longer existing minors.
		 * We only reject dependency loops.
		 * We cannot follow the dependency chain beyond a detached or
		 * missing minor.
		 */
		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
			return NO_ERROR;

		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		/* dependency chain ends here, no cycles. */
		if (resync_after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_device(resync_after);
	}
}

/* caller must hold global_state_lock */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changes;

	do {
		changes  = _drbd_pause_after(device);
		changes |= _drbd_resume_next(device);
	} while (changes);
}

void drbd_rs_controller_reset(struct drbd_device *device)
{
	struct fifo_buffer *plan;

	atomic_set(&device->rs_sect_in, 0);
	atomic_set(&device->rs_sect_ev, 0);
	device->rs_in_flight = 0;

	/* Updating the RCU protected object in place is necessary since
	   this function gets called from atomic context.
	   It is valid since all other updates also lead to a completely
	   empty fifo */
	rcu_read_lock();
	plan = rcu_dereference(device->rs_plan_s);
	plan->total = 0;
	fifo_set(plan, 0);
	rcu_read_unlock();
}

void start_resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;
	drbd_device_post_work(device, RS_START);
}

static void do_start_resync(struct drbd_device *device)
{
	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
		drbd_warn(device, "postponing start_resync ...\n");
		device->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&device->start_resync_timer);
		return;
	}

	drbd_start_resync(device, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
}

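/* Checksum-based resync is used only if the peer supports it (protocol 89+),
 * a csums algorithm is configured, and it is either enabled for every resync
 * or we are resyncing after a Primary crash. */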
static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
{
	bool csums_after_crash_only;
	rcu_read_lock();
	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
	rcu_read_unlock();
	return connection->agreed_pro_version >= 89 &&		/* supported? */
		connection->csums_tfm &&			/* configured? */
		(csums_after_crash_only == 0			/* use for each resync? */
		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
}

/**
 * drbd_start_resync() - Start the resync process
 * @device:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	union drbd_state ns;
	int r;

	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
		drbd_err(device, "Resync already running!\n");
		return;
	}

	if (!test_bit(B_RS_H_DONE, &device->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified.  Before going to
			   C_SYNC_TARGET, give the before-resync-target handler a chance
			   to veto, since becoming SyncTarget makes our data inconsistent. */
			r = drbd_khelper(device, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				drbd_info(device, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(device, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					conn_request_state(connection,
							   NS(conn, C_DISCONNECTING), CS_HARD);
					return;
				}
			}
		}
	}

	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take long */
		if (!mutex_trylock(device->state_mutex)) {
			set_bit(B_RS_H_DONE, &device->flags);
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}
	clear_bit(B_RS_H_DONE, &device->flags);

	/* req_lock: serialize with drbd_send_and_submit() and others
	 * global_state_lock: for stable sync-after dependencies */
	spin_lock_irq(&device->resource->req_lock);
	write_lock(&global_state_lock);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED
	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
		write_unlock(&global_state_lock);
		spin_unlock_irq(&device->resource->req_lock);
		mutex_unlock(device->state_mutex);
		return;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed    = 0;
		device->rs_paused    = 0;
		device->rs_same_csum = 0;
		device->rs_last_events = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total     = tw;
		device->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		_drbd_pause_after(device);
		/* Forget potentially stale cached per resync extent bit-counts.
		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	write_unlock(&global_state_lock);
	spin_unlock_irq(&device->resource->req_lock);

	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = 0;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(peer_device);

		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
	mutex_unlock(device->state_mutex);
}

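/* Lazily write out changed bitmap pages and broadcast sync progress.  If the
 * resync just completed and we are still in a sync state, finish it up. */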
static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (resync_done && is_sync_state(device->state.conn))
		drbd_resync_finished(device);

	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}

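/* Final cleanup of the local disk: destroy the resync and activity log
 * caches and free the ldev members, then wake up anyone waiting for the
 * detach to complete. */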
static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	__acquire(local);
	drbd_free_ldev(device->ldev);
	device->ldev = NULL;
	__release(local);

	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}

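/* Detach path, run via the GO_DISKLESS work bit while the disk is D_FAILED:
 * write out what we can of the bitmap, then force D_DISKLESS. */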
static void go_diskless(struct drbd_device *device)
{
	D_ASSERT(device, device->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */

	/* Try to write changed bitmap pages, read errors may have just
	 * set some bits outside the area covered by the activity log.
	 *
	 * If we have an IO error during the bitmap writeout,
	 * we will want a full sync next time, just in case.
	 * (Do we want a specific meta data flag for this?)
	 *
	 * If that does not make it to stable storage either,
	 * we cannot do anything about that anymore.
	 *
	 * We still need to check if both bitmap and ldev are present, we may
	 * end up here after a failed attach, before ldev was even assigned.
	 */
	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
		 * while we detach.
		 * Any modifications would not be expected anymore, though.
		 */
		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
					"detach", BM_LOCKED_TEST_ALLOWED)) {
			if (test_bit(WAS_READ_ERROR, &device->flags)) {
				drbd_md_set_flag(device, MDF_FULL_SYNC);
				drbd_md_sync(device);
			}
		}
	}

	drbd_force_state(device, NS(disk, D_DISKLESS));
}

static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}

/* only called from drbd_worker thread, no locking */
void __update_timing_details(
		struct drbd_thread_timing_details *tdp,
		unsigned int *cb_nr,
		void *cb,
		const char *fn, const unsigned int line)
{
	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
	struct drbd_thread_timing_details *td = tdp + i;

	td->start_jif = jiffies;
	td->cb_addr = cb;
	td->caller_fn = fn;
	td->line = line;
	td->cb_nr = *cb_nr;

	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
	td = tdp + i;
	memset(td, 0, sizeof(*td));

	++(*cb_nr);
}

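/* Dispatch the device work bits queued via drbd_device_post_work():
 * meta-data sync, bitmap/resync updates, detach, disk destroy, resync start. */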
#define WORK_PENDING(work_bit, todo)	(todo & (1UL << work_bit))
static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
	if (WORK_PENDING(MD_SYNC, todo))
		do_md_sync(device);
	if (WORK_PENDING(RS_DONE, todo) ||
	    WORK_PENDING(RS_PROGRESS, todo))
		update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
	if (WORK_PENDING(GO_DISKLESS, todo))
		go_diskless(device);
	if (WORK_PENDING(DESTROY_DISK, todo))
		drbd_ldev_destroy(device);
	if (WORK_PENDING(RS_START, todo))
		do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)

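/* Atomically fetch and clear the pending device work bits, so each queued
 * work item is handled exactly once. */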
static unsigned long get_work_bits(unsigned long *flags)
{
	unsigned long old, new;
	do {
		old = *flags;
		new = old & ~DRBD_DEVICE_WORK_MASK;
	} while (cmpxchg(flags, old, new) != old);
	return old & DRBD_DEVICE_WORK_MASK;
}

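/* Handle per-device work bits for all volumes of this connection.  Each
 * device is pinned with a kref so the work can run outside the RCU read
 * side critical section. */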
static void do_unqueued_work(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		unsigned long todo = get_work_bits(&device->flags);
		if (!todo)
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		do_device_work(device, todo);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

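/* Move the whole sender work queue (dequeue_work_batch) or just its first
 * item (dequeue_work_item) onto the caller's work_list; both return true if
 * work_list is non-empty afterwards. */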
static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_tail_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	if (!list_empty(&queue->q))
		list_move(queue->q.next, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

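/* Wait until there is sender work to do.  While waiting, uncork the data
 * socket, close the current epoch with a barrier if needed, and return early
 * when device work is pending or the worker is told to stop. */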
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_item(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			drbd_tcp_uncork(connection->data.socket);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		/* dequeue single item only,
		 * we still use drbd_queue_work_front() in some places */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  Next incoming request epoch will be connection ->
		 * current transfer log epoch number.  If that is different
		 * from the epoch of the last request we communicated, it is
		 * safe to send the epoch separating barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for other things but new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			drbd_tcp_cork(connection->data.socket);
		else if (!uncork)
			drbd_tcp_uncork(connection->data.socket);
	}
	mutex_unlock(&connection->data.mutex);
}

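/* Main loop of the per-connection worker thread: wait for sender work, handle
 * queued device work bits, and run the queued work callbacks; on exit, drain
 * the remaining work and clean up all volumes of this connection. */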
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		while (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		while (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		}
		dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}