diff --git a/block/blk-core.c b/block/blk-core.c
index dd134d834d589468fe1794c71a8325920ee5ac34..4b4dbdfbca89fe5769fd4b2f6826f305fca18e26 100644
--- a/block/blk-core.c
+++ b/block/blk-core.c
@@ -2909,23 +2909,47 @@ static void queue_unplugged(struct request_queue *q, unsigned int depth,
 
 }
 
-static void flush_plug_callbacks(struct blk_plug *plug)
+static void flush_plug_callbacks(struct blk_plug *plug, bool from_schedule)
 {
 	LIST_HEAD(callbacks);
 
-	if (list_empty(&plug->cb_list))
-		return;
-
-	list_splice_init(&plug->cb_list, &callbacks);
+	while (!list_empty(&plug->cb_list)) {
+		list_splice_init(&plug->cb_list, &callbacks);
 
-	while (!list_empty(&callbacks)) {
-		struct blk_plug_cb *cb = list_first_entry(&callbacks,
+		while (!list_empty(&callbacks)) {
+			struct blk_plug_cb *cb = list_first_entry(&callbacks,
 							  struct blk_plug_cb,
 							  list);
-		list_del(&cb->list);
-		cb->callback(cb);
+			list_del(&cb->list);
+			cb->callback(cb, from_schedule);
+		}
+	}
+}
+
+struct blk_plug_cb *blk_check_plugged(blk_plug_cb_fn unplug, void *data,
+				      int size)
+{
+	struct blk_plug *plug = current->plug;
+	struct blk_plug_cb *cb;
+
+	if (!plug)
+		return NULL;
+
+	list_for_each_entry(cb, &plug->cb_list, list)
+		if (cb->callback == unplug && cb->data == data)
+			return cb;
+
+	/* Not currently on the callback list */
+	BUG_ON(size < sizeof(*cb));
+	cb = kzalloc(size, GFP_ATOMIC);
+	if (cb) {
+		cb->data = data;
+		cb->callback = unplug;
+		list_add(&cb->list, &plug->cb_list);
 	}
+	return cb;
 }
+EXPORT_SYMBOL(blk_check_plugged);
 
 void blk_flush_plug_list(struct blk_plug *plug, bool from_schedule)
 {
@@ -2937,7 +2961,7 @@ void blk_flush_plug_list(struct blk_plug *plug, bool from_schedule)
 
 	BUG_ON(plug->magic != PLUG_MAGIC);
 
-	flush_plug_callbacks(plug);
+	flush_plug_callbacks(plug, from_schedule);
 	if (list_empty(&plug->list))
 		return;
 
diff --git a/drivers/block/drbd/drbd_actlog.c b/drivers/block/drbd/drbd_actlog.c
index e54e31b02b88eb6e927072745f345871a8d96203..3fbef018ce555fe47a716a0de01c2df837a2cc9e 100644
--- a/drivers/block/drbd/drbd_actlog.c
+++ b/drivers/block/drbd/drbd_actlog.c
@@ -411,7 +411,7 @@ w_al_write_transaction(struct drbd_conf *mdev, struct drbd_work *w, int unused)
 		+ mdev->ldev->md.al_offset + mdev->al_tr_pos;
 
 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE))
-		drbd_chk_io_error(mdev, 1, true);
+		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
 
 	if (++mdev->al_tr_pos >
 	    div_ceil(mdev->act_log->nr_elements, AL_EXTENTS_PT))
@@ -876,7 +876,11 @@ int __drbd_set_out_of_sync(struct drbd_conf *mdev, sector_t sector, int size,
 	unsigned int enr, count = 0;
 	struct lc_element *e;
 
-	if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
+	/* this should be an empty REQ_FLUSH */
+	if (size == 0)
+		return 0;
+
+	if (size < 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_BIO_SIZE) {
 		dev_err(DEV, "sector: %llus, size: %d\n",
 			(unsigned long long)sector, size);
 		return 0;
diff --git a/drivers/block/drbd/drbd_bitmap.c b/drivers/block/drbd/drbd_bitmap.c
index fcb956bb4b4c525875f961e7c2398b20c35723f4..ba91b408abad75ce7da0ff173595ce23ee51f635 100644
--- a/drivers/block/drbd/drbd_bitmap.c
+++ b/drivers/block/drbd/drbd_bitmap.c
@@ -1096,7 +1096,7 @@ static int bm_rw(struct drbd_conf *mdev, int rw, unsigned flags, unsigned lazy_w
 
 	if (ctx->error) {
 		dev_alert(DEV, "we had at least one MD IO ERROR during bitmap IO\n");
-		drbd_chk_io_error(mdev, 1, true);
+		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
 		err = -EIO; /* ctx->error ? */
 	}
 
@@ -1212,7 +1212,7 @@ int drbd_bm_write_page(struct drbd_conf *mdev, unsigned int idx) __must_hold(loc
 	wait_until_done_or_disk_failure(mdev, mdev->ldev, &ctx->done);
 
 	if (ctx->error)
-		drbd_chk_io_error(mdev, 1, true);
+		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
 		/* that should force detach, so the in memory bitmap will be
 		 * gone in a moment as well. */
 
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index 02f013a073a75b66fe73c8658f1cde96ea7102a7..b2ca143d0053d75487e3a083a4271b9e5d0d285e 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -813,7 +813,6 @@ enum {
 	SIGNAL_ASENDER,		/* whether asender wants to be interrupted */
 	SEND_PING,		/* whether asender should send a ping asap */
 
-	UNPLUG_QUEUED,		/* only relevant with kernel 2.4 */
 	UNPLUG_REMOTE,		/* sending a "UnplugRemote" could help */
 	MD_DIRTY,		/* current uuids and flags not yet on disk */
 	DISCARD_CONCURRENT,	/* Set on one node, cleared on the peer! */
@@ -824,7 +823,6 @@ enum {
 	CRASHED_PRIMARY,	/* This node was a crashed primary.
 				 * Gets cleared when the state.conn
 				 * goes into C_CONNECTED state. */
-	NO_BARRIER_SUPP,	/* underlying block device doesn't implement barriers */
 	CONSIDER_RESYNC,
 
 	MD_NO_FUA,		/* Users wants us to not use FUA/FLUSH on meta data dev */
@@ -834,6 +832,7 @@ enum {
 	BITMAP_IO_QUEUED,       /* Started bitmap IO */
 	GO_DISKLESS,		/* Disk is being detached, on io-error or admin request. */
 	WAS_IO_ERROR,		/* Local disk failed returned IO error */
+	FORCE_DETACH,		/* Force-detach from local disk, aborting any pending local IO */
 	RESYNC_AFTER_NEG,       /* Resync after online grow after the attach&negotiate finished. */
 	NET_CONGESTED,		/* The data socket is congested */
 
@@ -851,6 +850,13 @@ enum {
 	AL_SUSPENDED,		/* Activity logging is currently suspended. */
 	AHEAD_TO_SYNC_SOURCE,   /* Ahead -> SyncSource queued */
 	STATE_SENT,		/* Do not change state/UUIDs while this is set */
+
+	CALLBACK_PENDING,	/* Whether we have a call_usermodehelper(, UMH_WAIT_PROC)
+				 * pending, from drbd worker context.
+				 * If set, bdi_write_congested() returns true,
+				 * so shrink_page_list() would not recurse into,
+				 * and potentially deadlock on, this drbd worker.
+				 */
 };
 
 struct drbd_bitmap; /* opaque for drbd_conf */
@@ -1130,8 +1136,8 @@ struct drbd_conf {
 	int rs_in_flight; /* resync sectors in flight (to proxy, in proxy and from proxy) */
 	int rs_planed;    /* resync sectors already planned */
 	atomic_t ap_in_flight; /* App sectors in flight (waiting for ack) */
-	int peer_max_bio_size;
-	int local_max_bio_size;
+	unsigned int peer_max_bio_size;
+	unsigned int local_max_bio_size;
 };
 
 static inline struct drbd_conf *minor_to_mdev(unsigned int minor)
@@ -1435,9 +1441,9 @@ struct bm_extent {
  * hash table. */
 #define HT_SHIFT 8
 #define DRBD_MAX_BIO_SIZE (1U<<(9+HT_SHIFT))
-#define DRBD_MAX_BIO_SIZE_SAFE (1 << 12)       /* Works always = 4k */
+#define DRBD_MAX_BIO_SIZE_SAFE (1U << 12)       /* Works always = 4k */
 
-#define DRBD_MAX_SIZE_H80_PACKET (1 << 15) /* The old header only allows packets up to 32Kib data */
+#define DRBD_MAX_SIZE_H80_PACKET (1U << 15) /* The old header only allows packets up to 32Kib data */
 
 /* Number of elements in the app_reads_hash */
 #define APP_R_HSIZE 15
@@ -1840,12 +1846,20 @@ static inline int drbd_request_state(struct drbd_conf *mdev,
 	return _drbd_request_state(mdev, mask, val, CS_VERBOSE + CS_ORDERED);
 }
 
+enum drbd_force_detach_flags {
+	DRBD_IO_ERROR,
+	DRBD_META_IO_ERROR,
+	DRBD_FORCE_DETACH,
+};
+
 #define __drbd_chk_io_error(m,f) __drbd_chk_io_error_(m,f, __func__)
-static inline void __drbd_chk_io_error_(struct drbd_conf *mdev, int forcedetach, const char *where)
+static inline void __drbd_chk_io_error_(struct drbd_conf *mdev,
+		enum drbd_force_detach_flags forcedetach,
+		const char *where)
 {
 	switch (mdev->ldev->dc.on_io_error) {
 	case EP_PASS_ON:
-		if (!forcedetach) {
+		if (forcedetach == DRBD_IO_ERROR) {
 			if (__ratelimit(&drbd_ratelimit_state))
 				dev_err(DEV, "Local IO failed in %s.\n", where);
 			if (mdev->state.disk > D_INCONSISTENT)
@@ -1856,6 +1870,8 @@ static inline void __drbd_chk_io_error_(struct drbd_conf *mdev, int forcedetach,
 	case EP_DETACH:
 	case EP_CALL_HELPER:
 		set_bit(WAS_IO_ERROR, &mdev->flags);
+		if (forcedetach == DRBD_FORCE_DETACH)
+			set_bit(FORCE_DETACH, &mdev->flags);
 		if (mdev->state.disk > D_FAILED) {
 			_drbd_set_state(_NS(mdev, disk, D_FAILED), CS_HARD, NULL);
 			dev_err(DEV,
@@ -1875,7 +1891,7 @@ static inline void __drbd_chk_io_error_(struct drbd_conf *mdev, int forcedetach,
  */
 #define drbd_chk_io_error(m,e,f) drbd_chk_io_error_(m,e,f, __func__)
 static inline void drbd_chk_io_error_(struct drbd_conf *mdev,
-	int error, int forcedetach, const char *where)
+	int error, enum drbd_force_detach_flags forcedetach, const char *where)
 {
 	if (error) {
 		unsigned long flags;
@@ -2405,15 +2421,17 @@ static inline void dec_ap_bio(struct drbd_conf *mdev)
 	int ap_bio = atomic_dec_return(&mdev->ap_bio_cnt);
 
 	D_ASSERT(ap_bio >= 0);
+
+	if (ap_bio == 0 && test_bit(BITMAP_IO, &mdev->flags)) {
+		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
+			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
+	}
+
 	/* this currently does wake_up for every dec_ap_bio!
 	 * maybe rather introduce some type of hysteresis?
 	 * e.g. (ap_bio == mxb/2 || ap_bio == 0) ? */
 	if (ap_bio < mxb)
 		wake_up(&mdev->misc_wait);
-	if (ap_bio == 0 && test_bit(BITMAP_IO, &mdev->flags)) {
-		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
-			drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
-	}
 }
 
 static inline int drbd_set_ed_uuid(struct drbd_conf *mdev, u64 val)
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 920ede2829d6c5e467e177ac43a3e97e9f550aac..2e0e7fc1dbbaf50e24d8a446a4e20aeb2239f5bb 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -1514,6 +1514,13 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
 
 	/* Do not change the order of the if above and the two below... */
 	if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
+		/* we probably will start a resync soon.
+		 * make sure those things are properly reset. */
+		mdev->rs_total = 0;
+		mdev->rs_failed = 0;
+		atomic_set(&mdev->rs_pending_cnt, 0);
+		drbd_rs_cancel_all(mdev);
+
 		drbd_send_uuids(mdev);
 		drbd_send_state(mdev, ns);
 	}
@@ -1630,9 +1637,24 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
 			eh = mdev->ldev->dc.on_io_error;
 			was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
 
-			/* Immediately allow completion of all application IO, that waits
-			   for completion from the local disk. */
-			tl_abort_disk_io(mdev);
+			if (was_io_error && eh == EP_CALL_HELPER)
+				drbd_khelper(mdev, "local-io-error");
+
+			/* Immediately allow completion of all application IO,
+			 * that waits for completion from the local disk,
+			 * if this was a force-detach due to disk_timeout
+			 * or administrator request (drbdsetup detach --force).
+			 * Do NOT abort otherwise.
+			 * Aborting local requests may cause serious problems,
+			 * if requests are completed to upper layers already,
+			 * and then later the already submitted local bio completes.
+			 * This can cause DMA into former bio pages that meanwhile
+			 * have been re-used for other things.
+			 * So aborting local requests may cause crashes,
+			 * or even worse, silent data corruption.
+			 */
+			if (test_and_clear_bit(FORCE_DETACH, &mdev->flags))
+				tl_abort_disk_io(mdev);
 
 			/* current state still has to be D_FAILED,
 			 * there is only one way out: to D_DISKLESS,
@@ -1653,9 +1675,6 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
 			drbd_md_sync(mdev);
 		}
 		put_ldev(mdev);
-
-		if (was_io_error && eh == EP_CALL_HELPER)
-			drbd_khelper(mdev, "local-io-error");
 	}
 
         /* second half of local IO error, failure to attach,
@@ -1669,10 +1688,6 @@ static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                                 "ASSERT FAILED: disk is %s while going diskless\n",
                                 drbd_disk_str(mdev->state.disk));
 
-                mdev->rs_total = 0;
-                mdev->rs_failed = 0;
-                atomic_set(&mdev->rs_pending_cnt, 0);
-
 		if (ns.conn >= C_CONNECTED)
 			drbd_send_state(mdev, ns);
 
@@ -2194,7 +2209,8 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
 {
 	struct p_sizes p;
 	sector_t d_size, u_size;
-	int q_order_type, max_bio_size;
+	int q_order_type;
+	unsigned int max_bio_size;
 	int ok;
 
 	if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
@@ -2203,7 +2219,7 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
 		u_size = mdev->ldev->dc.disk_size;
 		q_order_type = drbd_queue_order_type(mdev);
 		max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
-		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
+		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE);
 		put_ldev(mdev);
 	} else {
 		d_size = 0;
@@ -2214,7 +2230,7 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
 
 	/* Never allow old drbd (up to 8.3.7) to see more than 32KiB */
 	if (mdev->agreed_pro_version <= 94)
-		max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
+		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
 
 	p.d_size = cpu_to_be64(d_size);
 	p.u_size = cpu_to_be64(u_size);
@@ -3541,6 +3557,22 @@ static int drbd_congested(void *congested_data, int bdi_bits)
 		goto out;
 	}
 
+	if (test_bit(CALLBACK_PENDING, &mdev->flags)) {
+		r |= (1 << BDI_async_congested);
+		/* Without good local data, we would need to read from remote,
+		 * and that would need the worker thread as well, which is
+		 * currently blocked waiting for that usermode helper to
+		 * finish.
+		 */
+		if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
+			r |= (1 << BDI_sync_congested);
+		else
+			put_ldev(mdev);
+		r &= bdi_bits;
+		reason = 'c';
+		goto out;
+	}
+
 	if (get_ldev(mdev)) {
 		q = bdev_get_queue(mdev->ldev->backing_bdev);
 		r = bdi_congested(&q->backing_dev_info, bdi_bits);
@@ -3604,6 +3636,7 @@ struct drbd_conf *drbd_new_device(unsigned int minor)
 	q->backing_dev_info.congested_data = mdev;
 
 	blk_queue_make_request(q, drbd_make_request);
+	blk_queue_flush(q, REQ_FLUSH | REQ_FUA);
 	/* Setting the max_hw_sectors to an odd value of 8kibyte here
 	   This triggers a max_bio_size message upon first attach or connect */
 	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
@@ -3870,7 +3903,7 @@ void drbd_md_sync(struct drbd_conf *mdev)
 	if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
 		/* this was a try anyways ... */
 		dev_err(DEV, "meta data update failed!\n");
-		drbd_chk_io_error(mdev, 1, true);
+		drbd_chk_io_error(mdev, 1, DRBD_META_IO_ERROR);
 	}
 
 	/* Update mdev->ldev->md.la_size_sect,
@@ -3950,9 +3983,9 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
 
 	spin_lock_irq(&mdev->req_lock);
 	if (mdev->state.conn < C_CONNECTED) {
-		int peer;
+		unsigned int peer;
 		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
-		peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
+		peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
 		mdev->peer_max_bio_size = peer;
 	}
 	spin_unlock_irq(&mdev->req_lock);
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 6d4de6a72e8069018b2b4d050b7ef2aa8b259259..fb9dce8daa2468c76992f4ab609adb471bc42af3 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -147,6 +147,9 @@ int drbd_khelper(struct drbd_conf *mdev, char *cmd)
 	char *argv[] = {usermode_helper, cmd, mb, NULL };
 	int ret;
 
+	if (current == mdev->worker.task)
+		set_bit(CALLBACK_PENDING, &mdev->flags);
+
 	snprintf(mb, 12, "minor-%d", mdev_to_minor(mdev));
 
 	if (get_net_conf(mdev)) {
@@ -189,6 +192,9 @@ int drbd_khelper(struct drbd_conf *mdev, char *cmd)
 				usermode_helper, cmd, mb,
 				(ret >> 8) & 0xff, ret);
 
+	if (current == mdev->worker.task)
+		clear_bit(CALLBACK_PENDING, &mdev->flags);
+
 	if (ret < 0) /* Ignore any ERRNOs we got. */
 		ret = 0;
 
@@ -795,8 +801,8 @@ static int drbd_check_al_size(struct drbd_conf *mdev)
 static void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_bio_size)
 {
 	struct request_queue * const q = mdev->rq_queue;
-	int max_hw_sectors = max_bio_size >> 9;
-	int max_segments = 0;
+	unsigned int max_hw_sectors = max_bio_size >> 9;
+	unsigned int max_segments = 0;
 
 	if (get_ldev_if_state(mdev, D_ATTACHING)) {
 		struct request_queue * const b = mdev->ldev->backing_bdev->bd_disk->queue;
@@ -829,7 +835,7 @@ static void drbd_setup_queue_param(struct drbd_conf *mdev, unsigned int max_bio_
 
 void drbd_reconsider_max_bio_size(struct drbd_conf *mdev)
 {
-	int now, new, local, peer;
+	unsigned int now, new, local, peer;
 
 	now = queue_max_hw_sectors(mdev->rq_queue) << 9;
 	local = mdev->local_max_bio_size; /* Eventually last known value, from volatile memory */
@@ -840,13 +846,14 @@ void drbd_reconsider_max_bio_size(struct drbd_conf *mdev)
 		mdev->local_max_bio_size = local;
 		put_ldev(mdev);
 	}
+	local = min(local, DRBD_MAX_BIO_SIZE);
 
 	/* We may ignore peer limits if the peer is modern enough.
 	   Because new from 8.3.8 onwards the peer can use multiple
 	   BIOs for a single peer_request */
 	if (mdev->state.conn >= C_CONNECTED) {
 		if (mdev->agreed_pro_version < 94) {
-			peer = min_t(int, mdev->peer_max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
+			peer = min(mdev->peer_max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
 			/* Correct old drbd (up to 8.3.7) if it believes it can do more than 32KiB */
 		} else if (mdev->agreed_pro_version == 94)
 			peer = DRBD_MAX_SIZE_H80_PACKET;
@@ -854,10 +861,10 @@ void drbd_reconsider_max_bio_size(struct drbd_conf *mdev)
 			peer = DRBD_MAX_BIO_SIZE;
 	}
 
-	new = min_t(int, local, peer);
+	new = min(local, peer);
 
 	if (mdev->state.role == R_PRIMARY && new < now)
-		dev_err(DEV, "ASSERT FAILED new < now; (%d < %d)\n", new, now);
+		dev_err(DEV, "ASSERT FAILED new < now; (%u < %u)\n", new, now);
 
 	if (new != now)
 		dev_info(DEV, "max BIO size = %u\n", new);
@@ -950,6 +957,14 @@ static int drbd_nl_disk_conf(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp
 	 * to realize a "hot spare" feature (not that I'd recommend that) */
 	wait_event(mdev->misc_wait, !atomic_read(&mdev->local_cnt));
 
+	/* make sure there is no leftover from previous force-detach attempts */
+	clear_bit(FORCE_DETACH, &mdev->flags);
+
+	/* and no leftover from previously aborted resync or verify, either */
+	mdev->rs_total = 0;
+	mdev->rs_failed = 0;
+	atomic_set(&mdev->rs_pending_cnt, 0);
+
 	/* allocation not in the IO path, cqueue thread context */
 	nbc = kzalloc(sizeof(struct drbd_backing_dev), GFP_KERNEL);
 	if (!nbc) {
@@ -1345,6 +1360,7 @@ static int drbd_nl_detach(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
 	}
 
 	if (dt.detach_force) {
+		set_bit(FORCE_DETACH, &mdev->flags);
 		drbd_force_state(mdev, NS(disk, D_FAILED));
 		reply->ret_code = SS_SUCCESS;
 		goto out;
@@ -1962,9 +1978,11 @@ static int drbd_nl_invalidate(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nl
 	int retcode;
 
 	/* If there is still bitmap IO pending, probably because of a previous
-	 * resync just being finished, wait for it before requesting a new resync. */
+	 * resync just being finished, wait for it before requesting a new resync.
+	 * Also wait for it's after_state_ch(). */
 	drbd_suspend_io(mdev);
 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
+	drbd_flush_workqueue(mdev);
 
 	retcode = _drbd_request_state(mdev, NS(conn, C_STARTING_SYNC_T), CS_ORDERED);
 
@@ -2003,9 +2021,11 @@ static int drbd_nl_invalidate_peer(struct drbd_conf *mdev, struct drbd_nl_cfg_re
 	int retcode;
 
 	/* If there is still bitmap IO pending, probably because of a previous
-	 * resync just being finished, wait for it before requesting a new resync. */
+	 * resync just being finished, wait for it before requesting a new resync.
+	 * Also wait for it's after_state_ch(). */
 	drbd_suspend_io(mdev);
 	wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
+	drbd_flush_workqueue(mdev);
 
 	retcode = _drbd_request_state(mdev, NS(conn, C_STARTING_SYNC_S), CS_ORDERED);
 
diff --git a/drivers/block/drbd/drbd_proc.c b/drivers/block/drbd/drbd_proc.c
index 869bada2ed06838a656d431584afde516a9cd115..5496104f90b9efe295704a271c95b724a8245145 100644
--- a/drivers/block/drbd/drbd_proc.c
+++ b/drivers/block/drbd/drbd_proc.c
@@ -245,6 +245,9 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
 		    mdev->state.role == R_SECONDARY) {
 			seq_printf(seq, "%2d: cs:Unconfigured\n", i);
 		} else {
+			/* reset mdev->congestion_reason */
+			bdi_rw_congested(&mdev->rq_queue->backing_dev_info);
+
 			seq_printf(seq,
 			   "%2d: cs:%s ro:%s/%s ds:%s/%s %c %c%c%c%c%c%c\n"
 			   "    ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index ea4836e0ae9829e12206e482cc50b70678a3e4aa..c74ca2df7431f13553d366fd371d717b6ed1f56b 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -277,6 +277,9 @@ static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
 	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
 	int i;
 
+	if (page == NULL)
+		return;
+
 	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE)*minor_count)
 		i = page_chain_free(page);
 	else {
@@ -316,7 +319,7 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 				     gfp_t gfp_mask) __must_hold(local)
 {
 	struct drbd_epoch_entry *e;
-	struct page *page;
+	struct page *page = NULL;
 	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 
 	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
@@ -329,9 +332,11 @@ struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
 		return NULL;
 	}
 
-	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
-	if (!page)
-		goto fail;
+	if (data_size) {
+		page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+		if (!page)
+			goto fail;
+	}
 
 	INIT_HLIST_NODE(&e->collision);
 	e->epoch = NULL;
@@ -1270,7 +1275,6 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 
 	data_size -= dgs;
 
-	ERR_IF(data_size == 0) return NULL;
 	ERR_IF(data_size &  0x1ff) return NULL;
 	ERR_IF(data_size >  DRBD_MAX_BIO_SIZE) return NULL;
 
@@ -1291,6 +1295,9 @@ read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __
 	if (!e)
 		return NULL;
 
+	if (!data_size)
+		return e;
+
 	ds = data_size;
 	page = e->pages;
 	page_chain_for_each(page) {
@@ -1715,6 +1722,10 @@ static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned
 
 	dp_flags = be32_to_cpu(p->dp_flags);
 	rw |= wire_flags_to_bio(mdev, dp_flags);
+	if (e->pages == NULL) {
+		D_ASSERT(e->size == 0);
+		D_ASSERT(dp_flags & DP_FLUSH);
+	}
 
 	if (dp_flags & DP_MAY_SET_IN_SYNC)
 		e->flags |= EE_MAY_SET_IN_SYNC;
@@ -3801,11 +3812,18 @@ void drbd_free_tl_hash(struct drbd_conf *mdev)
 	mdev->ee_hash = NULL;
 	mdev->ee_hash_s = 0;
 
-	/* paranoia code */
-	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
-		if (h->first)
-			dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
-				(int)(h - mdev->tl_hash), h->first);
+	/* We may not have had the chance to wait for all locally pending
+	 * application requests. The hlist_add_fake() prevents access after
+	 * free on master bio completion. */
+	for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++) {
+		struct drbd_request *req;
+		struct hlist_node *pos, *n;
+		hlist_for_each_entry_safe(req, pos, n, h, collision) {
+			hlist_del_init(&req->collision);
+			hlist_add_fake(&req->collision);
+		}
+	}
+
 	kfree(mdev->tl_hash);
 	mdev->tl_hash = NULL;
 	mdev->tl_hash_s = 0;
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 8e93a6ac9bb65e3497f53c92723a67429ffdf946..910335c30927f0429a4c4b0fddcfe74033c45e8d 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -455,7 +455,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 		req->rq_state |= RQ_LOCAL_COMPLETED;
 		req->rq_state &= ~RQ_LOCAL_PENDING;
 
-		__drbd_chk_io_error(mdev, false);
+		__drbd_chk_io_error(mdev, DRBD_IO_ERROR);
 		_req_may_be_done_not_susp(req, m);
 		break;
 
@@ -477,7 +477,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 			break;
 		}
 
-		__drbd_chk_io_error(mdev, false);
+		__drbd_chk_io_error(mdev, DRBD_IO_ERROR);
 
 	goto_queue_for_net_read:
 
@@ -1111,13 +1111,12 @@ void drbd_make_request(struct request_queue *q, struct bio *bio)
 	/*
 	 * what we "blindly" assume:
 	 */
-	D_ASSERT(bio->bi_size > 0);
 	D_ASSERT((bio->bi_size & 0x1ff) == 0);
 
 	/* to make some things easier, force alignment of requests within the
 	 * granularity of our hash tables */
 	s_enr = bio->bi_sector >> HT_SHIFT;
-	e_enr = (bio->bi_sector+(bio->bi_size>>9)-1) >> HT_SHIFT;
+	e_enr = bio->bi_size ? (bio->bi_sector+(bio->bi_size>>9)-1) >> HT_SHIFT : s_enr;
 
 	if (likely(s_enr == e_enr)) {
 		do {
@@ -1275,7 +1274,7 @@ void request_timer_fn(unsigned long data)
 		 time_after(now, req->start_time + dt) &&
 		!time_in_range(now, mdev->last_reattach_jif, mdev->last_reattach_jif + dt)) {
 		dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
-		__drbd_chk_io_error(mdev, 1);
+		__drbd_chk_io_error(mdev, DRBD_FORCE_DETACH);
 	}
 	nt = (time_after(now, req->start_time + et) ? now : req->start_time) + et;
 	spin_unlock_irq(&mdev->req_lock);
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 620c70ff223118e6f259a200512203f5010e2cda..6bce2cc179d4112980673b1b9d82ae91d70d390f 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -111,7 +111,7 @@ void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
 	if (list_empty(&mdev->read_ee))
 		wake_up(&mdev->ee_wait);
 	if (test_bit(__EE_WAS_ERROR, &e->flags))
-		__drbd_chk_io_error(mdev, false);
+		__drbd_chk_io_error(mdev, DRBD_IO_ERROR);
 	spin_unlock_irqrestore(&mdev->req_lock, flags);
 
 	drbd_queue_work(&mdev->data.work, &e->w);
@@ -154,7 +154,7 @@ static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(lo
 		: list_empty(&mdev->active_ee);
 
 	if (test_bit(__EE_WAS_ERROR, &e->flags))
-		__drbd_chk_io_error(mdev, false);
+		__drbd_chk_io_error(mdev, DRBD_IO_ERROR);
 	spin_unlock_irqrestore(&mdev->req_lock, flags);
 
 	if (is_syncer_req)
@@ -1501,14 +1501,6 @@ void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
 		return;
 	}
 
-	if (mdev->state.conn < C_AHEAD) {
-		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
-		drbd_rs_cancel_all(mdev);
-		/* This should be done when we abort the resync. We definitely do not
-		   want to have this for connections going back and forth between
-		   Ahead/Behind and SyncSource/SyncTarget */
-	}
-
 	if (side == C_SYNC_TARGET) {
 		/* Since application IO was locked out during C_WF_BITMAP_T and
 		   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
diff --git a/drivers/block/floppy.c b/drivers/block/floppy.c
index 553f43a90953cab40f421658ecb37fa9c20c8c54..a7d6347aaa7913b2a029014a95a2558d8360597e 100644
--- a/drivers/block/floppy.c
+++ b/drivers/block/floppy.c
@@ -191,6 +191,7 @@ static int print_unex = 1;
 #include <linux/mutex.h>
 #include <linux/io.h>
 #include <linux/uaccess.h>
+#include <linux/async.h>
 
 /*
  * PS/2 floppies have much slower step rates than regular floppies.
@@ -2516,8 +2517,7 @@ static int make_raw_rw_request(void)
 	set_fdc((long)current_req->rq_disk->private_data);
 
 	raw_cmd = &default_raw_cmd;
-	raw_cmd->flags = FD_RAW_SPIN | FD_RAW_NEED_DISK | FD_RAW_NEED_DISK |
-	    FD_RAW_NEED_SEEK;
+	raw_cmd->flags = FD_RAW_SPIN | FD_RAW_NEED_DISK | FD_RAW_NEED_SEEK;
 	raw_cmd->cmd_count = NR_RW;
 	if (rq_data_dir(current_req) == READ) {
 		raw_cmd->flags |= FD_RAW_READ;
@@ -4123,7 +4123,7 @@ static struct kobject *floppy_find(dev_t dev, int *part, void *data)
 	return get_disk(disks[drive]);
 }
 
-static int __init floppy_init(void)
+static int __init do_floppy_init(void)
 {
 	int i, unit, drive;
 	int err, dr;
@@ -4338,6 +4338,24 @@ static int __init floppy_init(void)
 	return err;
 }
 
+#ifndef MODULE
+static __init void floppy_async_init(void *data, async_cookie_t cookie)
+{
+	do_floppy_init();
+}
+#endif
+
+static int __init floppy_init(void)
+{
+#ifdef MODULE
+	return do_floppy_init();
+#else
+	/* Don't hold up the bootup by the floppy initialization */
+	async_schedule(floppy_async_init, NULL);
+	return 0;
+#endif
+}
+
 static const struct io_region {
 	int offset;
 	int size;
diff --git a/drivers/block/nbd.c b/drivers/block/nbd.c
index 76bc96fd01c85316813b5baeece41eeadb4b22ff..d07c9f7fded600d76192330ef37c35c2135c0fb5 100644
--- a/drivers/block/nbd.c
+++ b/drivers/block/nbd.c
@@ -485,7 +485,7 @@ static void nbd_handle_req(struct nbd_device *nbd, struct request *req)
 		nbd_end_request(req);
 	} else {
 		spin_lock(&nbd->queue_lock);
-		list_add(&req->queuelist, &nbd->queue_head);
+		list_add_tail(&req->queuelist, &nbd->queue_head);
 		spin_unlock(&nbd->queue_lock);
 	}
 
diff --git a/drivers/block/umem.c b/drivers/block/umem.c
index 9a72277a31df0cb131f879bb8a1503a65a57b7cc..eb0d8216f557434b36e6fbc809b75c33bbbc1292 100644
--- a/drivers/block/umem.c
+++ b/drivers/block/umem.c
@@ -513,42 +513,19 @@ static void process_page(unsigned long data)
 	}
 }
 
-struct mm_plug_cb {
-	struct blk_plug_cb cb;
-	struct cardinfo *card;
-};
-
-static void mm_unplug(struct blk_plug_cb *cb)
+static void mm_unplug(struct blk_plug_cb *cb, bool from_schedule)
 {
-	struct mm_plug_cb *mmcb = container_of(cb, struct mm_plug_cb, cb);
+	struct cardinfo *card = cb->data;
 
-	spin_lock_irq(&mmcb->card->lock);
-	activate(mmcb->card);
-	spin_unlock_irq(&mmcb->card->lock);
-	kfree(mmcb);
+	spin_lock_irq(&card->lock);
+	activate(card);
+	spin_unlock_irq(&card->lock);
+	kfree(cb);
 }
 
 static int mm_check_plugged(struct cardinfo *card)
 {
-	struct blk_plug *plug = current->plug;
-	struct mm_plug_cb *mmcb;
-
-	if (!plug)
-		return 0;
-
-	list_for_each_entry(mmcb, &plug->cb_list, cb.list) {
-		if (mmcb->cb.callback == mm_unplug && mmcb->card == card)
-			return 1;
-	}
-	/* Not currently on the callback list */
-	mmcb = kmalloc(sizeof(*mmcb), GFP_ATOMIC);
-	if (!mmcb)
-		return 0;
-
-	mmcb->card = card;
-	mmcb->cb.callback = mm_unplug;
-	list_add(&mmcb->cb.list, &plug->cb_list);
-	return 1;
+	return !!blk_check_plugged(mm_unplug, card, sizeof(struct blk_plug_cb));
 }
 
 static void mm_make_request(struct request_queue *q, struct bio *bio)
diff --git a/drivers/md/md.c b/drivers/md/md.c
index f6c46109b071310fdc0a28b89a916cd67cf69b9d..fcd098794d37d16c7b48b12c54b5ba6f5e6ba60e 100644
--- a/drivers/md/md.c
+++ b/drivers/md/md.c
@@ -498,61 +498,13 @@ void md_flush_request(struct mddev *mddev, struct bio *bio)
 }
 EXPORT_SYMBOL(md_flush_request);
 
-/* Support for plugging.
- * This mirrors the plugging support in request_queue, but does not
- * require having a whole queue or request structures.
- * We allocate an md_plug_cb for each md device and each thread it gets
- * plugged on.  This links tot the private plug_handle structure in the
- * personality data where we keep a count of the number of outstanding
- * plugs so other code can see if a plug is active.
- */
-struct md_plug_cb {
-	struct blk_plug_cb cb;
-	struct mddev *mddev;
-};
-
-static void plugger_unplug(struct blk_plug_cb *cb)
+void md_unplug(struct blk_plug_cb *cb, bool from_schedule)
 {
-	struct md_plug_cb *mdcb = container_of(cb, struct md_plug_cb, cb);
-	if (atomic_dec_and_test(&mdcb->mddev->plug_cnt))
-		md_wakeup_thread(mdcb->mddev->thread);
-	kfree(mdcb);
-}
-
-/* Check that an unplug wakeup will come shortly.
- * If not, wakeup the md thread immediately
- */
-int mddev_check_plugged(struct mddev *mddev)
-{
-	struct blk_plug *plug = current->plug;
-	struct md_plug_cb *mdcb;
-
-	if (!plug)
-		return 0;
-
-	list_for_each_entry(mdcb, &plug->cb_list, cb.list) {
-		if (mdcb->cb.callback == plugger_unplug &&
-		    mdcb->mddev == mddev) {
-			/* Already on the list, move to top */
-			if (mdcb != list_first_entry(&plug->cb_list,
-						    struct md_plug_cb,
-						    cb.list))
-				list_move(&mdcb->cb.list, &plug->cb_list);
-			return 1;
-		}
-	}
-	/* Not currently on the callback list */
-	mdcb = kmalloc(sizeof(*mdcb), GFP_ATOMIC);
-	if (!mdcb)
-		return 0;
-
-	mdcb->mddev = mddev;
-	mdcb->cb.callback = plugger_unplug;
-	atomic_inc(&mddev->plug_cnt);
-	list_add(&mdcb->cb.list, &plug->cb_list);
-	return 1;
+	struct mddev *mddev = cb->data;
+	md_wakeup_thread(mddev->thread);
+	kfree(cb);
 }
-EXPORT_SYMBOL_GPL(mddev_check_plugged);
+EXPORT_SYMBOL(md_unplug);
 
 static inline struct mddev *mddev_get(struct mddev *mddev)
 {
@@ -602,7 +554,6 @@ void mddev_init(struct mddev *mddev)
 	atomic_set(&mddev->active, 1);
 	atomic_set(&mddev->openers, 0);
 	atomic_set(&mddev->active_io, 0);
-	atomic_set(&mddev->plug_cnt, 0);
 	spin_lock_init(&mddev->write_lock);
 	atomic_set(&mddev->flush_pending, 0);
 	init_waitqueue_head(&mddev->sb_wait);
diff --git a/drivers/md/md.h b/drivers/md/md.h
index 7b4a3c318cae437ecb2041c33b2bdcee2c66bf15..f385b038589d32313014e07af203b8f742bec4a7 100644
--- a/drivers/md/md.h
+++ b/drivers/md/md.h
@@ -266,9 +266,6 @@ struct mddev {
 	int				new_chunk_sectors;
 	int				reshape_backwards;
 
-	atomic_t			plug_cnt;	/* If device is expecting
-							 * more bios soon.
-							 */
 	struct md_thread		*thread;	/* management thread */
 	struct md_thread		*sync_thread;	/* doing resync or reconstruct */
 	sector_t			curr_resync;	/* last block scheduled */
@@ -630,6 +627,12 @@ extern struct bio *bio_clone_mddev(struct bio *bio, gfp_t gfp_mask,
 				   struct mddev *mddev);
 extern struct bio *bio_alloc_mddev(gfp_t gfp_mask, int nr_iovecs,
 				   struct mddev *mddev);
-extern int mddev_check_plugged(struct mddev *mddev);
 extern void md_trim_bio(struct bio *bio, int offset, int size);
+
+extern void md_unplug(struct blk_plug_cb *cb, bool from_schedule);
+static inline int mddev_check_plugged(struct mddev *mddev)
+{
+	return !!blk_check_plugged(md_unplug, mddev,
+				   sizeof(struct blk_plug_cb));
+}
 #endif /* _MD_MD_H */
diff --git a/drivers/md/raid1.c b/drivers/md/raid1.c
index 197f62681db562bd38d643a115be92940c46b9a6..9f7f8bee84423f1a7dd35cc33bda9874407af06e 100644
--- a/drivers/md/raid1.c
+++ b/drivers/md/raid1.c
@@ -2247,8 +2247,7 @@ static void raid1d(struct mddev *mddev)
 	blk_start_plug(&plug);
 	for (;;) {
 
-		if (atomic_read(&mddev->plug_cnt) == 0)
-			flush_pending_writes(conf);
+		flush_pending_writes(conf);
 
 		spin_lock_irqsave(&conf->device_lock, flags);
 		if (list_empty(head)) {
diff --git a/drivers/md/raid10.c b/drivers/md/raid10.c
index e2549deab7c3a87c9f35c44499fa21e2f9f57b63..de5ed6fd8806c3c3db39bbd894fabb04734b4523 100644
--- a/drivers/md/raid10.c
+++ b/drivers/md/raid10.c
@@ -2680,8 +2680,7 @@ static void raid10d(struct mddev *mddev)
 	blk_start_plug(&plug);
 	for (;;) {
 
-		if (atomic_read(&mddev->plug_cnt) == 0)
-			flush_pending_writes(conf);
+		flush_pending_writes(conf);
 
 		spin_lock_irqsave(&conf->device_lock, flags);
 		if (list_empty(head)) {
diff --git a/drivers/md/raid5.c b/drivers/md/raid5.c
index 259f519814ca0f083696d875dc16fdc4bd141659..87a2d0bdedd1187a695a4d7f25a6d2e5bc2164fe 100644
--- a/drivers/md/raid5.c
+++ b/drivers/md/raid5.c
@@ -4562,7 +4562,7 @@ static void raid5d(struct mddev *mddev)
 	while (1) {
 		struct bio *bio;
 
-		if (atomic_read(&mddev->plug_cnt) == 0 &&
+		if (
 		    !list_empty(&conf->bitmap_list)) {
 			/* Now is a good time to flush some bitmap updates */
 			conf->seq_flush++;
@@ -4572,8 +4572,7 @@ static void raid5d(struct mddev *mddev)
 			conf->seq_write = conf->seq_flush;
 			activate_bit_delay(conf);
 		}
-		if (atomic_read(&mddev->plug_cnt) == 0)
-			raid5_activate_delayed(conf);
+		raid5_activate_delayed(conf);
 
 		while ((bio = remove_bio_from_retry(conf))) {
 			int ok;
diff --git a/include/linux/blkdev.h b/include/linux/blkdev.h
index 3816ce8a08fc44aea59438e0ca1c7d1a897aaa7e..4e72a9d48232d513b5b2fe44d973de8dfeb1e9e4 100644
--- a/include/linux/blkdev.h
+++ b/include/linux/blkdev.h
@@ -922,11 +922,15 @@ struct blk_plug {
 };
 #define BLK_MAX_REQUEST_COUNT 16
 
+struct blk_plug_cb;
+typedef void (*blk_plug_cb_fn)(struct blk_plug_cb *, bool);
 struct blk_plug_cb {
 	struct list_head list;
-	void (*callback)(struct blk_plug_cb *);
+	blk_plug_cb_fn callback;
+	void *data;
 };
-
+extern struct blk_plug_cb *blk_check_plugged(blk_plug_cb_fn unplug,
+					     void *data, int size);
 extern void blk_start_plug(struct blk_plug *);
 extern void blk_finish_plug(struct blk_plug *);
 extern void blk_flush_plug_list(struct blk_plug *, bool);