/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

*/

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/* endio handlers:
 *   drbd_md_endio (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   drbd_bm_endio (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_endio(struct bio *bio)
{
	struct drbd_device *device;

	device = bio->bi_private;
	device->md_io.error = bio->bi_error;

	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
	 */
	drbd_md_put_buffer(device);
	device->md_io.done = 1;
	wake_up(&device->misc_wait);
	bio_put(bio);
	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
		put_ldev(device);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&device->read_ee))
		wake_up(&device->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_connection *connection = peer_device->connection;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;
	peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removal from the tree happens in "drbd_process_done_ee", within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);

	if (connection->cstate >= C_WF_REPORT_PARAMS) {
		kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
		if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
			kref_put(&device->kref, drbd_destroy_device);
	}
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);

	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_device *device = peer_req->peer_device->device;
	int is_write = bio_data_dir(bio) == WRITE;
	int is_discard = !!(bio->bi_rw & REQ_DISCARD);

	if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
		drbd_warn(device, "%s: error=%d s=%llus\n",
				is_write ? (is_discard ? "discard" : "write")
					: "read", bio->bi_error,
				(unsigned long long)peer_req->i.sector);

	if (bio->bi_error)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}

void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
{
	panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
		device->minor, device->resource->name, device->vnr);
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_device *device = req->device;
	struct bio_and_error m;
	enum drbd_req_event what;

	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which do no longer
	 * complete requests at all, not even do error completions.  In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

		if (!bio->bi_error)
			drbd_panic_after_delayed_completion_of_aborted_request(device);
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(bio->bi_error)) {
		if (bio->bi_rw & REQ_DISCARD)
			what = (bio->bi_error == -EOPNOTSUPP)
				? DISCARD_COMPLETED_NOTSUPP
				: DISCARD_COMPLETED_WITH_ERROR;
		else
			what = (bio_data_dir(bio) == WRITE)
			? WRITE_COMPLETED_WITH_ERROR
			: (bio_rw(bio) == READ)
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(bio->bi_error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&device->resource->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
	put_ldev(device);

	if (m.bio)
		complete_master_bio(device, &m);
}

void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
	AHASH_REQUEST_ON_STACK(req, tfm);
	struct scatterlist sg;
	struct page *page = peer_req->pages;
	struct page *tmp;
	unsigned len;

	ahash_request_set_tfm(req, tfm);
	ahash_request_set_callback(req, 0, NULL, NULL);

	sg_init_table(&sg, 1);
	crypto_ahash_init(req);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		ahash_request_set_crypt(req, &sg, NULL, sg.length);
		crypto_ahash_update(req);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	ahash_request_set_crypt(req, &sg, digest, sg.length);
	crypto_ahash_finup(req);
	ahash_request_zero(req);
}
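
/*
 * Tail handling in drbd_csum_ee() above: i.size & (PAGE_SIZE - 1) is the
 * number of bytes used in the last page of the chain; the "?: PAGE_SIZE"
 * fallback covers requests whose size is an exact multiple of PAGE_SIZE.
 */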

void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
{
	AHASH_REQUEST_ON_STACK(req, tfm);
	struct scatterlist sg;
	struct bio_vec bvec;
	struct bvec_iter iter;

	ahash_request_set_tfm(req, tfm);
	ahash_request_set_callback(req, 0, NULL, NULL);

	sg_init_table(&sg, 1);
	crypto_ahash_init(req);

	bio_for_each_segment(bvec, bio, iter) {
		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
		ahash_request_set_crypt(req, &sg, NULL, sg.length);
		crypto_ahash_update(req);
	}
	ahash_request_set_crypt(req, NULL, digest, 0);
	crypto_ahash_final(req);
	ahash_request_zero(req);
}

/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
		/* Free peer_req and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
		drbd_free_peer_req(device, peer_req);
		peer_req = NULL;
		inc_rs_pending(device);
		err = drbd_send_drequest_csum(peer_device, sector, size,
					      digest, digest_size,
					      P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		drbd_err(device, "kmalloc() of digest failed.\n");
		err = -ENOMEM;
	}

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
	return err;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	if (!get_ldev(device))
		return -EIO;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
				       size, true /* has real payload */, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
defer:
	put_ldev(device);
	return -EAGAIN;
}
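
/*
 * A -EAGAIN from read_for_csum() above is not fatal: make_resync_request()
 * rolls back its loop counter, resets bm_resync_fo and re-arms the resync
 * timer, so the checksum read is simply retried on a later pass.
 */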

int w_resync_timer(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, resync_work);

	switch (device->state.conn) {
	case C_VERIFY_S:
		make_ov_request(device, cancel);
		break;
	case C_SYNC_TARGET:
		make_resync_request(device, cancel);
		break;
	}

	return 0;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;

	drbd_queue_work_if_unqueued(
		&first_peer_device(device)->connection->sender_work,
		&device->resync_work);
}

static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

struct fifo_buffer *fifo_alloc(int fifo_size)
{
	struct fifo_buffer *fb;

	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
	if (!fb)
		return NULL;

	fb->head_index = 0;
	fb->size = fifo_size;
	fb->total = 0;

	return fb;
}
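
/*
 * The fifo_buffer above is used by drbd_rs_controller() below as a short
 * planning delay line: fifo_push() stores the newest planned correction
 * and returns the one planned "size" steps ago, while fifo_add_val()
 * spreads an additional correction evenly over all pending slots.
 */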

static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
	struct disk_conf *dc;
	unsigned int want;     /* The number of sectors we want in-flight */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in-flight */
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
	struct fifo_buffer *plan;

	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - device->rs_in_flight - plan->total;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}
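
/*
 * Worked example for one controller step (numbers are illustrative only,
 * and assume SLEEP_TIME == HZ/10, i.e. a 100ms cycle): with c_fill_target == 0,
 * c_delay_target == 10 (tenths of a second) and sect_in == 2048 sectors
 * acknowledged during the last cycle, want = 2048 * 10 = 20480 sectors
 * (10 MiB) in flight.  The difference to what is actually in flight (and
 * already planned) is spread over the "steps" slots of the plan fifo, so
 * the request rate adapts gradually and stays clamped to c_max_rate per cycle.
 */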

static int drbd_rs_number_requests(struct drbd_device *device)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	int number, mxb;

	sect_in = atomic_xchg(&device->rs_sect_in, 0);
	device->rs_in_flight -= sect_in;

	rcu_read_lock();
	mxb = drbd_get_max_buffers(device) / 2;
	if (rcu_dereference(device->rs_plan_s)->size) {
		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
	}
	rcu_read_unlock();

	/* Don't have more than "max-buffers"/2 in-flight.
	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
	 * potentially causing a distributed deadlock on congestion during
	 * online-verify or (checksum-based) resync, if max-buffers,
	 * socket buffer sizes and resync rate settings are mis-configured. */

	/* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
	 * mxb (as used here, and in drbd_alloc_pages on the peer) is
	 * "number of pages" (typically also 4k),
	 * but "rs_in_flight" is in "sectors" (512 Byte). */
	if (mxb - device->rs_in_flight/8 < number)
		number = mxb - device->rs_in_flight/8;

	return number;
}
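
/*
 * Unit sketch for the clamp above (illustrative numbers): "number" counts
 * 4k resync requests, mxb counts pages, and rs_in_flight counts 512-byte
 * sectors.  With max-buffers == 8000 (mxb == 4000) and 16384 sectors still
 * in flight (16384/8 == 2048 4k-blocks), at most 4000 - 2048 == 1952 new
 * requests are issued this cycle, no matter what the controller asked for.
 */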

static int make_resync_request(struct drbd_device *const device, int cancel)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, requeue = 0;
	int i = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(device);
		return 0;
	}

	if (!get_ldev(device)) {
		/* Since we only need to access device->rsync a
		   get_ldev_if_state(device,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		drbd_err(device, "Disk broke down during resync!\n");
		return 0;
	}

	max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
	number = drbd_rs_number_requests(device);
	if (number <= 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests when half of the send buffer is filled,
		 * but notify TCP that we'd like to have more space. */
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket) {
			struct sock *sk = connection->data.socket->sk;
			int queued = sk->sk_wmem_queued;
			int sndbuf = sk->sk_sndbuf;
			if (queued > sndbuf / 2) {
				requeue = 1;
				if (sk->sk_socket)
					set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
			}
		} else
			requeue = 1;
		mutex_unlock(&connection->data.mutex);
		if (requeue)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit  = drbd_bm_find_next(device, device->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			device->bm_resync_fo = drbd_bm_bits(device);
			put_ldev(device);
			return 0;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_try_rs_begin_io(device, sector)) {
			device->bm_resync_fo = bit;
			goto requeue;
		}
		device->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
			drbd_rs_complete_io(device, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		while (i < number) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(device, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			device->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		if (device->use_csums) {
			switch (read_for_csum(peer_device, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(device);
				return -EIO;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(device, sector);
				device->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			int err;

			inc_rs_pending(device);
			err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
						 sector, size, ID_SYNCER);
			if (err) {
				drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(device);
				put_ldev(device);
				return err;
			}
		}
	}

	if (device->bm_resync_fo >= drbd_bm_bits(device)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ...
		 */
		put_ldev(device);
		return 0;
	}

 requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	put_ldev(device);
	return 0;
}

static int make_ov_request(struct drbd_device *device, int cancel)
{
	int number, i, size;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	bool stop_sector_reached = false;

	if (unlikely(cancel))
		return 1;

	number = drbd_rs_number_requests(device);

	sector = device->ov_position;
	for (i = 0; i < number; i++) {
		if (sector >= capacity)
			return 1;

		/* We check for "finished" only in the reply path:
		 * w_e_end_ov_reply().
		 * We need to send at least one request out. */
		stop_sector_reached = i > 0
			&& verify_can_do_stop_sector(device)
			&& sector >= device->ov_stop_sector;
		if (stop_sector_reached)
			break;

		size = BM_BLOCK_SIZE;

		if (drbd_try_rs_begin_io(device, sector)) {
			device->ov_position = sector;
			goto requeue;
		}

		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;

		inc_rs_pending(device);
		if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
			dec_rs_pending(device);
			return 0;
		}
		sector += BM_SECT_PER_BIT;
	}
	device->ov_position = sector;

 requeue:
	device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
	if (i == 0 || !stop_sector_reached)
		mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
	return 1;
}

int w_ov_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);
	ov_out_of_sync_print(device);
	drbd_resync_finished(device);

	return 0;
}

static int w_resync_finished(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw =
		container_of(w, struct drbd_device_work, w);
	struct drbd_device *device = dw->device;
	kfree(dw);

	drbd_resync_finished(device);

	return 0;
}

static void ping_peer(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;

	clear_bit(GOT_PING_ACK, &connection->flags);
	request_ping(connection);
	wait_event(connection->ping_wait,
		   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
}

int drbd_resync_finished(struct drbd_device *device)
{
	unsigned long db, dt, dbdt;
	unsigned long n_oos;
	union drbd_state os, ns;
	struct drbd_device_work *dw;
	char *khelper_cmd = NULL;
	int verify_done = 0;

	/* Remove all elements from the resync LRU. Since future actions
	 * might set bits in the (main) bitmap, then the entries in the
	 * resync LRU would be wrong. */
	if (drbd_rs_del_all(device)) {
		/* In case this is not possible now, most probably because
		 * there are P_RS_DATA_REPLY Packets lingering on the worker's
		 * queue (or even the read operations for those packets
		 * are not finished by now).   Retry in 100ms. */

		schedule_timeout_interruptible(HZ / 10);
		dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
		if (dw) {
			dw->w.cb = w_resync_finished;
			dw->device = device;
			drbd_queue_work(&first_peer_device(device)->connection->sender_work,
					&dw->w);
			return 1;
		}
		drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
	}

	dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
	if (dt <= 0)
		dt = 1;

	db = device->rs_total;
	/* adjust for verify start and stop sectors, respective reached position */
	if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
		db -= device->ov_left;

	dbdt = Bit2KB(db/dt);
	device->rs_paused /= HZ;

	if (!get_ldev(device))
		goto out;

	ping_peer(device);

	spin_lock_irq(&device->resource->req_lock);
	os = drbd_read_state(device);

	verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

	/* This protects us against multiple calls (that can happen in the presence
	   of application IO), and against connectivity loss just before we arrive here. */
	if (os.conn <= C_CONNECTED)
		goto out_unlock;

	ns = os;
	ns.conn = C_CONNECTED;

	drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
	     verify_done ? "Online verify" : "Resync",
	     dt + device->rs_paused, device->rs_paused, dbdt);

	n_oos = drbd_bm_total_weight(device);

	if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
		if (n_oos) {
			drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
			      n_oos, Bit2KB(1));
			khelper_cmd = "out-of-sync";
		}
	} else {
		D_ASSERT(device, (n_oos - device->rs_failed) == 0);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
			khelper_cmd = "after-resync-target";

		if (device->use_csums && device->rs_total) {
			const unsigned long s = device->rs_same_csum;
			const unsigned long t = device->rs_total;
			const int ratio =
				(t == 0)     ? 0 :
			(t < 100000) ? ((s*100)/t) : (s/(t/100));
			drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
			     "transferred %luK total %luK\n",
			     ratio,
			     Bit2KB(device->rs_same_csum),
			     Bit2KB(device->rs_total - device->rs_same_csum),
			     Bit2KB(device->rs_total));
		}
	}

	if (device->rs_failed) {
		drbd_info(device, "            %lu failed blocks\n", device->rs_failed);

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			ns.disk = D_INCONSISTENT;
			ns.pdsk = D_UP_TO_DATE;
		} else {
			ns.disk = D_UP_TO_DATE;
			ns.pdsk = D_INCONSISTENT;
		}
	} else {
		ns.disk = D_UP_TO_DATE;
		ns.pdsk = D_UP_TO_DATE;

		if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
			if (device->p_uuid) {
				int i;
				for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
					_drbd_uuid_set(device, i, device->p_uuid[i]);
				drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
				_drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
			} else {
				drbd_err(device, "device->p_uuid is NULL! BUG\n");
			}
		}

		if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
			/* for verify runs, we don't update uuids here,
			 * so there would be nothing to report. */
			drbd_uuid_set_bm(device, 0UL);
			drbd_print_uuids(device, "updated UUIDs");
			if (device->p_uuid) {
				/* Now the two UUID sets are equal, update what we
				 * know of the peer. */
				int i;
				for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
					device->p_uuid[i] = device->ldev->md.uuid[i];
			}
		}
	}

	_drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
	spin_unlock_irq(&device->resource->req_lock);
	put_ldev(device);
out:
	device->rs_total  = 0;
	device->rs_failed = 0;
	device->rs_paused = 0;

	/* reset start sector, if we reached end of device */
	if (verify_done && device->ov_left == 0)
		device->ov_start_sector = 0;

	drbd_md_sync(device);

	if (khelper_cmd)
		drbd_khelper(device, khelper_cmd);

	return 1;
}

/* helper */
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
	if (drbd_peer_req_has_active_page(peer_req)) {
		/* This might happen if sendpage() has not finished */
		int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
		atomic_add(i, &device->pp_in_use_by_net);
		atomic_sub(i, &device->pp_in_use);
		spin_lock_irq(&device->resource->req_lock);
		list_add_tail(&peer_req->w.list, &device->net_ee);
		spin_unlock_irq(&device->resource->req_lock);
		wake_up(&drbd_pp_wait);
	} else
		drbd_free_peer_req(device, peer_req);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @device:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. sector=%llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int err;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev_if_state(device, D_FAILED)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	if (device->state.conn == C_AHEAD) {
		err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		if (likely(device->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(device);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				drbd_err(device, "Not sending RSDataReply, "
				    "partner DISKLESS!\n");
			err = 0;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
			    (unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
	}

	dec_unacked(device);

	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block() failed\n");
	return err;
}

int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int err, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (peer_device->connection->csums_tfm) {
			digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
			D_ASSERT(device, digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(device);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			kfree(di);
			err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(device);
	move_to_net_ee_or_free(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_block/ack() failed\n");
	return err;
}

int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		err = 1;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	peer_req = NULL;
	inc_rs_pending(device);
	err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
	if (err)
		dec_rs_pending(device);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);
	dec_unacked(device);
	return err;
}

void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
{
	if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
		device->ov_last_oos_size += size>>9;
	} else {
		device->ov_last_oos_start = sector;
		device->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(device, sector, size);
}

int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int err, eq = 0;
	bool stop_sector_reached = false;

	if (unlikely(cancel)) {
		drbd_free_peer_req(device, peer_req);
		dec_unacked(device);
		return 0;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(device)) {
		drbd_rs_complete_io(device, peer_req->i.sector);
		put_ldev(device);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);

			D_ASSERT(device, digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(device, peer_req);
	if (!eq)
		drbd_ov_out_of_sync_found(device, sector, size);
	else
		ov_out_of_sync_print(device);

	err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(device);

	--device->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((device->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(device, device->ov_left);

	stop_sector_reached = verify_can_do_stop_sector(device) &&
		(sector + (size>>9)) >= device->ov_stop_sector;

	if (device->ov_left == 0 || stop_sector_reached) {
		ov_out_of_sync_print(device);
		drbd_resync_finished(device);
	}

	return err;
}

/* FIXME
 * We need to track the number of pending barrier acks,
 * and to be able to wait for them.
 * See also comment in drbd_adm_attach before drbd_suspend_io.
 */
static int drbd_send_barrier(struct drbd_connection *connection)
{
	struct p_barrier *p;
	struct drbd_socket *sock;

	sock = &connection->data;
	p = conn_prepare_command(connection, sock);
	if (!p)
		return -EIO;
	p->barrier = connection->send.current_epoch_nr;
	p->pad = 0;
	connection->send.current_epoch_writes = 0;
	connection->send.last_sent_barrier_jif = jiffies;

	return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
}

int w_send_write_hint(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, unplug_work);
	struct drbd_socket *sock;

	if (cancel)
		return 0;
	sock = &first_peer_device(device)->connection->data;
	if (!drbd_prepare_command(first_peer_device(device), sock))
		return -EIO;
	return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
}

static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
{
	if (!connection->send.seen_any_write_yet) {
		connection->send.seen_any_write_yet = true;
		connection->send.current_epoch_nr = epoch;
		connection->send.current_epoch_writes = 0;
		connection->send.last_sent_barrier_jif = jiffies;
	}
}

static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
{
	/* re-init if first write on this connection */
	if (!connection->send.seen_any_write_yet)
		return;
	if (connection->send.current_epoch_nr != epoch) {
		if (connection->send.current_epoch_writes)
			drbd_send_barrier(connection);
		connection->send.current_epoch_nr = epoch;
	}
}
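
/*
 * Epoch bookkeeping sketch: the P_BARRIER separating two write epochs is
 * only sent lazily by maybe_send_barrier() above, when the first request of
 * a newer epoch is about to go out and the previous epoch actually carried
 * writes.  An epoch consisting only of reads never produces a barrier packet.
 */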

int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* this time, no connection->send.current_epoch_writes++;
	 * If it was sent, it was the closing barrier for the last
	 * replicated epoch, before we went into AHEAD mode.
	 * No more barriers will be sent, until we leave AHEAD mode again. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_out_of_sync(peer_device, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return err;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	re_init_if_first_write(connection, req->epoch);
	maybe_send_barrier(connection, req->epoch);
	connection->send.current_epoch_writes++;

	err = drbd_send_dblock(peer_device, req);
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device->connection;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}
	req->pre_send_jif = jiffies;

	/* Even read requests may close a write epoch,
	 * if there was any yet. */
	maybe_send_barrier(connection, req->epoch);

	err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
				 (unsigned long)req);

	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

int w_restart_disk_io(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_device *device = req->device;

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(device, &req->i);

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = device->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 0;
}

static int _drbd_may_sync_now(struct drbd_device *device)
{
	struct drbd_device *odev = device;
	int resync_after;

	while (1) {
		if (!odev->ldev || odev->state.disk == D_DISKLESS)
			return 1;
		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		if (resync_after == -1)
			return 1;
		odev = minor_to_device(resync_after);
		if (!odev)
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}
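
/*
 * Dependency-walk sketch (minor numbers are illustrative): if drbd2 has
 * resync-after = 1 and drbd1 has resync-after = 0, a resync on drbd2 is
 * only allowed once neither drbd1 nor drbd0 is itself syncing or paused;
 * the loop above follows that chain until it reaches resync_after == -1,
 * a missing or diskless minor, or a busy device.
 */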

/**
 * drbd_pause_after() - Pause resync on all devices that may not resync now
 * @device:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static bool drbd_pause_after(struct drbd_device *device)
{
	bool changed = false;
	struct drbd_device *odev;
	int i;

	rcu_read_lock();
	idr_for_each_entry(&drbd_devices, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev) &&
		    _drbd_set_state(_NS(odev, aftr_isp, 1),
				    CS_HARD, NULL) != SS_NOTHING_TO_DO)
			changed = true;
	}
	rcu_read_unlock();

	return changed;
}

/**
 * drbd_resume_next() - Resume resync on all devices that may resync now
 * @device:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static bool drbd_resume_next(struct drbd_device *device)
{
	bool changed = false;
	struct drbd_device *odev;
	int i;

	rcu_read_lock();
	idr_for_each_entry(&drbd_devices, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev) &&
			    _drbd_set_state(_NS(odev, aftr_isp, 0),
					    CS_HARD, NULL) != SS_NOTHING_TO_DO)
				changed = true;
		}
	}
	rcu_read_unlock();
	return changed;
}

void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_resume_next(device);
	unlock_all_resources();
}

void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	drbd_pause_after(device);
	unlock_all_resources();
}

/* caller must lock_all_resources() */
enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
{
	struct drbd_device *odev;
	int resync_after;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || o_minor > MINORMASK)
		return ERR_RESYNC_AFTER;

	/* check for loops */
	odev = minor_to_device(o_minor);
	while (1) {
		if (odev == device)
			return ERR_RESYNC_AFTER_CYCLE;

		/* You are free to depend on diskless, non-existing,
		 * or not yet/no longer existing minors.
		 * We only reject dependency loops.
		 * We cannot follow the dependency chain beyond a detached or
		 * missing minor.
		 */
		if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
			return NO_ERROR;

		rcu_read_lock();
		resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
		rcu_read_unlock();
		/* dependency chain ends here, no cycles. */
		if (resync_after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_device(resync_after);
	}
}

/* caller must lock_all_resources() */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changed;

	do {
		changed  = drbd_pause_after(device);
		changed |= drbd_resume_next(device);
	} while (changed);
}

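/* Reset the dynamic resync speed controller: clear the in-flight and incoming
 * sector counters and empty the fifo of the resync plan. */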
void drbd_rs_controller_reset(struct drbd_device *device)
{
	struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
	struct fifo_buffer *plan;

	atomic_set(&device->rs_sect_in, 0);
	atomic_set(&device->rs_sect_ev, 0);
	device->rs_in_flight = 0;
	device->rs_last_events =
		(int)part_stat_read(&disk->part0, sectors[0]) +
		(int)part_stat_read(&disk->part0, sectors[1]);

	/* Updating the RCU protected object in place is necessary since
	   this function gets called from atomic context.
	   It is valid since all other updates also lead to a completely
	   empty fifo */
	rcu_read_lock();
	plan = rcu_dereference(device->rs_plan_s);
	plan->total = 0;
	fifo_set(plan, 0);
	rcu_read_unlock();
}

void start_resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;
	drbd_device_post_work(device, RS_START);
}

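/* Device work handler for RS_START: start the resync as sync source, or re-arm
 * the start_resync timer while acks or resync replies are still pending. */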
static void do_start_resync(struct drbd_device *device)
{
	if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
		drbd_warn(device, "postponing start_resync ...\n");
		device->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&device->start_resync_timer);
		return;
	}

	drbd_start_resync(device, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
}

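/* Checksum based resync requires protocol >= 89 and a configured csums
 * algorithm; csums-after-crash-only restricts it to resync after a Primary crash. */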
static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
{
	bool csums_after_crash_only;
	rcu_read_lock();
	csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
	rcu_read_unlock();
	return connection->agreed_pro_version >= 89 &&		/* supported? */
		connection->csums_tfm &&			/* configured? */
		(csums_after_crash_only == 0			/* use for each resync? */
		 || test_bit(CRASHED_PRIMARY, &device->flags));	/* or only after Primary crash? */
}

/**
 * drbd_start_resync() - Start the resync process
 * @device:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
{
	struct drbd_peer_device *peer_device = first_peer_device(device);
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	union drbd_state ns;
	int r;

	if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
		drbd_err(device, "Resync already running!\n");
		return;
	}

	if (!test_bit(B_RS_H_DONE, &device->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
			   we check that we might make the data inconsistent. */
			r = drbd_khelper(device, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				drbd_info(device, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(device, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					drbd_info(device, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					conn_request_state(connection,
							   NS(conn, C_DISCONNECTING), CS_HARD);
					return;
				}
			}
		}
	}

	if (current == connection->worker.task) {
		/* The worker should not sleep waiting for state_mutex,
		   that can take long */
		if (!mutex_trylock(device->state_mutex)) {
			set_bit(B_RS_H_DONE, &device->flags);
			device->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&device->start_resync_timer);
			return;
		}
	} else {
		mutex_lock(device->state_mutex);
	}

	lock_all_resources();
	clear_bit(B_RS_H_DONE, &device->flags);
	/* Did some connection breakage or IO error race with us? */
	if (device->state.conn < C_CONNECTED
	|| !get_ldev_if_state(device, D_NEGOTIATING)) {
		unlock_all_resources();
		goto out;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed    = 0;
		device->rs_paused    = 0;
		device->rs_same_csum = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total     = tw;
		device->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		drbd_pause_after(device);
		/* Forget potentially stale cached per resync extent bit-counts.
		 * Open coded drbd_rs_cancel_all(device), we already have IRQs
		 * disabled, and know the disk state is ok. */
		spin_lock(&device->al_lock);
		lc_reset(device->resync);
		device->resync_locked = 0;
		device->resync_wenr = LC_FREE;
		spin_unlock(&device->al_lock);
	}
	unlock_all_resources();

	if (r == SS_SUCCESS) {
		wake_up(&device->al_wait); /* for lc_reset() above */
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET) {
			device->bm_resync_fo = 0;
			device->use_csums = use_checksum_based_resync(connection, device);
		} else {
			device->use_csums = 0;
		}

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(peer_device);

		if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
out:
	mutex_unlock(device->state_mutex);
}

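/* Lazily write out dirty bitmap pages and broadcast sync progress; also
 * finishes the resync when the final on-disk update has been written. */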
static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
{
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (resync_done && is_sync_state(device->state.conn))
		drbd_resync_finished(device);

	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}

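/* Free the resync and activity log extent caches and release the backing device. */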
static void drbd_ldev_destroy(struct drbd_device *device)
{
	lc_destroy(device->resync);
	device->resync = NULL;
	lc_destroy(device->act_log);
	device->act_log = NULL;

	__acquire(local);
	drbd_backing_dev_free(device, device->ldev);
	device->ldev = NULL;
	__release(local);

	clear_bit(GOING_DISKLESS, &device->flags);
	wake_up(&device->misc_wait);
}

static void go_diskless(struct drbd_device *device)
{
	D_ASSERT(device, device->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */

	/* Try to write changed bitmap pages, read errors may have just
	 * set some bits outside the area covered by the activity log.
	 *
	 * If we have an IO error during the bitmap writeout,
	 * we will want a full sync next time, just in case.
	 * (Do we want a specific meta data flag for this?)
	 *
	 * If that does not make it to stable storage either,
	 * we cannot do anything about that anymore.
	 *
	 * We still need to check if both bitmap and ldev are present, we may
	 * end up here after a failed attach, before ldev was even assigned.
	 */
	if (device->bitmap && device->ldev) {
		/* An interrupted resync or similar is allowed to recount bits
		 * while we detach.
		 * Any modifications would not be expected anymore, though.
		 */
		if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
					"detach", BM_LOCKED_TEST_ALLOWED)) {
			if (test_bit(WAS_READ_ERROR, &device->flags)) {
				drbd_md_set_flag(device, MDF_FULL_SYNC);
				drbd_md_sync(device);
			}
		}
	}

	drbd_force_state(device, NS(disk, D_DISKLESS));
}

static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);
	return 0;
}

/* only called from drbd_worker thread, no locking */
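/* Record the callback about to run in a small ring of timing details and
 * clear the following slot, so a stalled worker can be diagnosed later. */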
void __update_timing_details(
		struct drbd_thread_timing_details *tdp,
		unsigned int *cb_nr,
		void *cb,
		const char *fn, const unsigned int line)
{
	unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
	struct drbd_thread_timing_details *td = tdp + i;

	td->start_jif = jiffies;
	td->cb_addr = cb;
	td->caller_fn = fn;
	td->line = line;
	td->cb_nr = *cb_nr;

	i = (i+1) % DRBD_THREAD_DETAILS_HIST;
	td = tdp + i;
	memset(td, 0, sizeof(*td));

	++(*cb_nr);
}

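/* Dispatch the per-device work bits collected in device->flags. */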
static void do_device_work(struct drbd_device *device, const unsigned long todo)
{
	if (test_bit(MD_SYNC, &todo))
		do_md_sync(device);
	if (test_bit(RS_DONE, &todo) ||
	    test_bit(RS_PROGRESS, &todo))
		update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
	if (test_bit(GO_DISKLESS, &todo))
		go_diskless(device);
	if (test_bit(DESTROY_DISK, &todo))
		drbd_ldev_destroy(device);
	if (test_bit(RS_START, &todo))
		do_start_resync(device);
}

#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS)	\
	|(1UL << DESTROY_DISK)	\
	|(1UL << MD_SYNC)	\
	|(1UL << RS_START)	\
	|(1UL << RS_PROGRESS)	\
	|(1UL << RS_DONE)	\
	)

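/* Atomically extract and clear the device work bits from *flags. */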
static unsigned long get_work_bits(unsigned long *flags)
{
	unsigned long old, new;
	do {
		old = *flags;
		new = old & ~DRBD_DEVICE_WORK_MASK;
	} while (cmpxchg(flags, old, new) != old);
	return old & DRBD_DEVICE_WORK_MASK;
}

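/* Run any pending per-device work for all devices of this connection. */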
static void do_unqueued_work(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		unsigned long todo = get_work_bits(&device->flags);
		if (!todo)
			continue;

		kref_get(&device->kref);
		rcu_read_unlock();
		do_device_work(device, todo);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

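/* Splice all queued work onto work_list under the queue lock;
 * returns true if work_list is non-empty afterwards. */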
static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_tail_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

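/* Wait for new sender work. While idle, send the epoch-closing barrier when it
 * is safe to do so, and manage TCP corking on the data socket. */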
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

1994
	dequeue_work_batch(&connection->sender_work, work_list);
1995 1996 1997 1998 1999 2000 2001 2002 2003 2004 2005 2006 2007 2008 2009 2010 2011 2012 2013 2014 2015 2016 2017
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			drbd_tcp_uncork(connection->data.socket);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(&connection->sender_work.q))
			list_splice_tail_init(&connection->sender_work.q, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  Next incoming request epoch will be connection ->
		 * current transfer log epoch number.  If that is different
		 * from the epoch of the last request we communicated, it is
		 * safe to send the epoch separating barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);

		if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
			break;

		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;

		schedule();
		/* may be woken up for other things than new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			drbd_tcp_cork(connection->data.socket);
		else if (!uncork)
			drbd_tcp_uncork(connection->data.socket);
	}
	mutex_unlock(&connection->data.mutex);
}

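/* Main loop of the per-connection worker thread: wait for work, run queued
 * callbacks and device work, then drain everything and clean up on exit. */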
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		if (list_empty(&work_list)) {
			update_worker_timing_details(connection, wait_for_work);
			wait_for_work(connection, &work_list);
		}

		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	do {
		if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
			update_worker_timing_details(connection, do_unqueued_work);
			do_unqueued_work(connection);
		}
		if (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			update_worker_timing_details(connection, w->cb);
			w->cb(w, 1);
		} else
			dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}