diff --git a/drivers/md/dm-core.h b/drivers/md/dm-core.h
index 33ef92e904623927fd37e17e794425e1ebaa7239..8078b6c155eff5aed621389991122edd3de09b1d 100644
--- a/drivers/md/dm-core.h
+++ b/drivers/md/dm-core.h
@@ -232,6 +232,8 @@ struct dm_io {
 	struct mapped_device *md;
 	struct bio *orig_bio;
 	blk_status_t status;
+	bool start_io_acct:1;
+	int was_accounted;
 	unsigned long start_time;
 	spinlock_t endio_lock;
 	struct dm_stats_aux stats_aux;
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index d1c618c3f6c63dc03caf2c626b135168da0427ce..082366d0ad4994390dd104333788d2cfeba7a478 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -518,14 +518,33 @@ static void dm_io_acct(bool end, struct mapped_device *md, struct bio *bio,
 		bio->bi_iter.bi_size = bi_size;
 }
 
-static void dm_start_io_acct(struct dm_io *io)
+static void __dm_start_io_acct(struct dm_io *io, struct bio *bio)
 {
-	dm_io_acct(false, io->md, io->orig_bio, io->start_time, &io->stats_aux);
+	dm_io_acct(false, io->md, bio, io->start_time, &io->stats_aux);
 }
 
-static void dm_end_io_acct(struct dm_io *io)
+static void dm_start_io_acct(struct dm_io *io, struct bio *clone)
 {
-	dm_io_acct(true, io->md, io->orig_bio, io->start_time, &io->stats_aux);
+	/* Must account IO to DM device in terms of orig_bio */
+	struct bio *bio = io->orig_bio;
+
+	/*
+	 * Ensure IO accounting is only ever started once.
+	 * Expect no possibility for race unless is_duplicate_bio.
+	 */
+	if (!clone || likely(!clone_to_tio(clone)->is_duplicate_bio)) {
+		if (WARN_ON(io->was_accounted))
+			return;
+		io->was_accounted = 1;
+	} else if (xchg(&io->was_accounted, 1) == 1)
+		return;
+
+	__dm_start_io_acct(io, bio);
+}
+
+static void dm_end_io_acct(struct dm_io *io, struct bio *bio)
+{
+	dm_io_acct(true, io->md, bio, io->start_time, &io->stats_aux);
 }
 
 static struct dm_io *alloc_io(struct mapped_device *md, struct bio *bio)
@@ -545,11 +564,13 @@ static struct dm_io *alloc_io(struct mapped_device *md, struct bio *bio)
 	io->status = 0;
 	atomic_set(&io->io_count, 1);
 	this_cpu_inc(*md->pending_io);
-	io->orig_bio = bio;
+	io->orig_bio = NULL;
 	io->md = md;
 	spin_lock_init(&io->endio_lock);
 
 	io->start_time = jiffies;
+	io->start_io_acct = false;
+	io->was_accounted = 0;
 
 	dm_stats_record_start(&md->stats, &io->stats_aux);
 
@@ -849,7 +870,16 @@ void dm_io_dec_pending(struct dm_io *io, blk_status_t error)
 		}
 
 		io_error = io->status;
-		dm_end_io_acct(io);
+		if (io->was_accounted)
+			dm_end_io_acct(io, bio);
+		else if (!io_error) {
+			/*
+			 * Must handle target that DM_MAPIO_SUBMITTED only to
+			 * then bio_endio() rather than dm_submit_bio_remap()
+			 */
+			__dm_start_io_acct(io, bio);
+			dm_end_io_acct(io, bio);
+		}
 		free_io(io);
 		smp_wmb();
 		this_cpu_dec(*md->pending_io);
@@ -1131,6 +1161,56 @@ void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors)
 }
 EXPORT_SYMBOL_GPL(dm_accept_partial_bio);
 
+static inline void __dm_submit_bio_remap(struct bio *clone,
+					 dev_t dev, sector_t old_sector)
+{
+	trace_block_bio_remap(clone, dev, old_sector);
+	submit_bio_noacct(clone);
+}
+
+/*
+ * @clone: clone bio that DM core passed to target's .map function
+ * @tgt_clone: clone of @clone bio that target needs submitted
+ * @from_wq: caller is a workqueue thread managed by DM target
+ *
+ * Targets should use this interface to submit bios they take
+ * ownership of when returning DM_MAPIO_SUBMITTED.
+ *
+ * Target should also enable ti->accounts_remapped_io
+ */
+void dm_submit_bio_remap(struct bio *clone, struct bio *tgt_clone,
+			 bool from_wq)
+{
+	struct dm_target_io *tio = clone_to_tio(clone);
+	struct dm_io *io = tio->io;
+
+	/* establish bio that will get submitted */
+	if (!tgt_clone)
+		tgt_clone = clone;
+
+	/*
+	 * Account io->origin_bio to DM dev on behalf of target
+	 * that took ownership of IO with DM_MAPIO_SUBMITTED.
+	 */
+	if (!from_wq) {
+		/* Still in target's map function */
+		io->start_io_acct = true;
+	} else {
+		/*
+		 * Called by another thread, managed by DM target,
+		 * wait for dm_split_and_process_bio() to store
+		 * io->orig_bio
+		 */
+		while (unlikely(!smp_load_acquire(&io->orig_bio)))
+			msleep(1);
+		dm_start_io_acct(io, clone);
+	}
+
+	__dm_submit_bio_remap(tgt_clone, disk_devt(io->md->disk),
+			      tio->old_sector);
+}
+EXPORT_SYMBOL_GPL(dm_submit_bio_remap);
+
 static noinline void __set_swap_bios_limit(struct mapped_device *md, int latch)
 {
 	mutex_lock(&md->swap_bios_lock);
@@ -1157,9 +1237,7 @@ static void __map_bio(struct bio *clone)
 	clone->bi_end_io = clone_endio;
 
 	/*
-	 * Map the clone.  If r == 0 we don't need to do
-	 * anything, the target has assumed ownership of
-	 * this io.
+	 * Map the clone.
 	 */
 	dm_io_inc_pending(io);
 	tio->old_sector = clone->bi_iter.bi_sector;
@@ -1184,12 +1262,18 @@ static void __map_bio(struct bio *clone)
 
 	switch (r) {
 	case DM_MAPIO_SUBMITTED:
+		/* target has assumed ownership of this io */
+		if (!ti->accounts_remapped_io)
+			io->start_io_acct = true;
 		break;
 	case DM_MAPIO_REMAPPED:
-		/* the bio has been remapped so dispatch it */
-		trace_block_bio_remap(clone, bio_dev(io->orig_bio),
+		/*
+		 * the bio has been remapped so dispatch it, but defer
+		 * dm_start_io_acct() until after possible bio_split().
+		 */
+		__dm_submit_bio_remap(clone, disk_devt(io->md->disk),
 				      tio->old_sector);
-		submit_bio_noacct(clone);
+		io->start_io_acct = true;
 		break;
 	case DM_MAPIO_KILL:
 	case DM_MAPIO_REQUEUE:
@@ -1404,7 +1488,7 @@ static void dm_split_and_process_bio(struct mapped_device *md,
 				     struct dm_table *map, struct bio *bio)
 {
 	struct clone_info ci;
-	struct bio *b;
+	struct bio *orig_bio = NULL;
 	int error = 0;
 
 	init_clone_info(&ci, md, map, bio);
@@ -1426,15 +1510,18 @@ static void dm_split_and_process_bio(struct mapped_device *md,
 	 * used by dm_end_io_acct() and for dm_io_dec_pending() to use for
 	 * completion handling.
 	 */
-	b = bio_split(bio, bio_sectors(bio) - ci.sector_count,
-		      GFP_NOIO, &md->queue->bio_split);
-	ci.io->orig_bio = b;
-
-	bio_chain(b, bio);
-	trace_block_split(b, bio->bi_iter.bi_sector);
+	orig_bio = bio_split(bio, bio_sectors(bio) - ci.sector_count,
+			     GFP_NOIO, &md->queue->bio_split);
+	bio_chain(orig_bio, bio);
+	trace_block_split(orig_bio, bio->bi_iter.bi_sector);
 	submit_bio_noacct(bio);
 out:
-	dm_start_io_acct(ci.io);
+	if (!orig_bio)
+		orig_bio = bio;
+	smp_store_release(&ci.io->orig_bio, orig_bio);
+	if (ci.io->start_io_acct)
+		dm_start_io_acct(ci.io, NULL);
+
 	/* drop the extra reference count */
 	dm_io_dec_pending(ci.io, errno_to_blk_status(error));
 }
diff --git a/include/linux/device-mapper.h b/include/linux/device-mapper.h
index b26fecf6c8e87ad571590a1ab3a96712f1edc756..7752d14d13f8ec7ba3c275b6aa00b283bd27d049 100644
--- a/include/linux/device-mapper.h
+++ b/include/linux/device-mapper.h
@@ -362,6 +362,12 @@ struct dm_target {
 	 * zone append operations using regular writes.
 	 */
 	bool emulate_zone_append:1;
+
+	/*
+	 * Set if the target will submit IO using dm_submit_bio_remap()
+	 * after returning DM_MAPIO_SUBMITTED from its map function.
+	 */
+	bool accounts_remapped_io:1;
 };
 
 void *dm_per_bio_data(struct bio *bio, size_t data_size);
@@ -465,6 +471,7 @@ int dm_suspended(struct dm_target *ti);
 int dm_post_suspending(struct dm_target *ti);
 int dm_noflush_suspending(struct dm_target *ti);
 void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors);
+void dm_submit_bio_remap(struct bio *clone, struct bio *tgt_clone, bool from_wq);
 union map_info *dm_get_rq_mapinfo(struct request *rq);
 
 #ifdef CONFIG_BLK_DEV_ZONED
diff --git a/include/uapi/linux/dm-ioctl.h b/include/uapi/linux/dm-ioctl.h
index c12ce30b52dfc86c2288c7da694ea961936c4ec1..2e9550fef90fad986d3f518ef176b52c6e4df460 100644
--- a/include/uapi/linux/dm-ioctl.h
+++ b/include/uapi/linux/dm-ioctl.h
@@ -286,9 +286,9 @@ enum {
 #define DM_DEV_SET_GEOMETRY	_IOWR(DM_IOCTL, DM_DEV_SET_GEOMETRY_CMD, struct dm_ioctl)
 
 #define DM_VERSION_MAJOR	4
-#define DM_VERSION_MINOR	45
+#define DM_VERSION_MINOR	46
 #define DM_VERSION_PATCHLEVEL	0
-#define DM_VERSION_EXTRA	"-ioctl (2021-03-22)"
+#define DM_VERSION_EXTRA	"-ioctl (2022-02-22)"
 
 /* Status bits */
 #define DM_READONLY_FLAG	(1 << 0) /* In/Out */