diff --git a/arch_init.c b/arch_init.c index 179c58c8523b0159c1d769705bf4f10299f184e0..874948195a8075c2ee7593fb39ae397ab035dd47 100644 --- a/arch_init.c +++ b/arch_init.c @@ -24,6 +24,7 @@ #include #include #include +#include #ifndef _WIN32 #include #include @@ -127,6 +128,7 @@ static uint64_t bitmap_sync_count; #define RAM_SAVE_FLAG_CONTINUE 0x20 #define RAM_SAVE_FLAG_XBZRLE 0x40 /* 0x80 is reserved in migration.h start with 0x100 next */ +#define RAM_SAVE_FLAG_COMPRESS_PAGE 0x100 static struct defconfig_file { const char *filename; @@ -321,9 +323,18 @@ struct CompressParam { }; typedef struct CompressParam CompressParam; +struct DecompressParam { + /* To be done */ +}; +typedef struct DecompressParam DecompressParam; + static CompressParam *comp_param; static QemuThread *compress_threads; static bool quit_comp_thread; +static bool quit_decomp_thread; +static DecompressParam *decomp_param; +static QemuThread *decompress_threads; +static uint8_t *compressed_data_buf; static void *do_data_compress(void *opaque) { @@ -1203,10 +1214,59 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } +static void *do_data_decompress(void *opaque) +{ + while (!quit_decomp_thread) { + /* To be done */ + } + + return NULL; +} + +void migrate_decompress_threads_create(void) +{ + int i, thread_count; + + thread_count = migrate_decompress_threads(); + decompress_threads = g_new0(QemuThread, thread_count); + decomp_param = g_new0(DecompressParam, thread_count); + compressed_data_buf = g_malloc0(compressBound(TARGET_PAGE_SIZE)); + quit_decomp_thread = false; + for (i = 0; i < thread_count; i++) { + qemu_thread_create(decompress_threads + i, "decompress", + do_data_decompress, decomp_param + i, + QEMU_THREAD_JOINABLE); + } +} + +void migrate_decompress_threads_join(void) +{ + int i, thread_count; + + quit_decomp_thread = true; + thread_count = migrate_decompress_threads(); + for (i = 0; i < thread_count; i++) { + qemu_thread_join(decompress_threads + i); + } + g_free(decompress_threads); + g_free(decomp_param); + g_free(compressed_data_buf); + decompress_threads = NULL; + decomp_param = NULL; + compressed_data_buf = NULL; +} + +static void decompress_data_with_multi_threads(uint8_t *compbuf, + void *host, int len) +{ + /* To be done */ +} + static int ram_load(QEMUFile *f, void *opaque, int version_id) { int flags = 0, ret = 0; static uint64_t seq_iter; + int len = 0; seq_iter++; @@ -1286,6 +1346,23 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) } qemu_get_buffer(f, host, TARGET_PAGE_SIZE); break; + case RAM_SAVE_FLAG_COMPRESS_PAGE: + host = host_from_stream_offset(f, addr, flags); + if (!host) { + error_report("Invalid RAM offset " RAM_ADDR_FMT, addr); + ret = -EINVAL; + break; + } + + len = qemu_get_be32(f); + if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { + error_report("Invalid compressed data length: %d", len); + ret = -EINVAL; + break; + } + qemu_get_buffer(f, compressed_data_buf, len); + decompress_data_with_multi_threads(compressed_data_buf, host, len); + break; case RAM_SAVE_FLAG_XBZRLE: host = host_from_stream_offset(f, addr, flags); if (!host) { diff --git a/include/migration/migration.h b/include/migration/migration.h index a3ebbf6c6e0be3898cf4b086a2b331b4b23a8d00..d4a10627cd9a07129b31778de7d099b786a74ec8 100644 --- a/include/migration/migration.h +++ b/include/migration/migration.h @@ -51,6 +51,7 @@ struct MigrationState QEMUBH *cleanup_bh; QEMUFile *file; int compress_thread_count; + int decompress_thread_count; int compress_level; int state; @@ -108,6 +109,8 @@ MigrationState *migrate_get_current(void); void migrate_compress_threads_create(void); void migrate_compress_threads_join(void); +void migrate_decompress_threads_create(void); +void migrate_decompress_threads_join(void); uint64_t ram_bytes_remaining(void); uint64_t ram_bytes_transferred(void); uint64_t ram_bytes_total(void); @@ -159,6 +162,7 @@ int64_t xbzrle_cache_resize(int64_t new_size); bool migrate_use_compression(void); int migrate_compress_level(void); int migrate_compress_threads(void); +int migrate_decompress_threads(void); void ram_control_before_iterate(QEMUFile *f, uint64_t flags); void ram_control_after_iterate(QEMUFile *f, uint64_t flags); diff --git a/migration/migration.c b/migration/migration.c index 5a8b5a7c74d439af773db8bcc845a984f7d21cec..19409e6790a8bbb42822a0ed0660ca86fe516deb 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -35,6 +35,9 @@ /* Default compression thread count */ #define DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT 8 +/* Default decompression thread count, usually decompression is at + * least 4 times as fast as compression.*/ +#define DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT 2 /*0: means nocompress, 1: best speed, ... 9: best compress ratio */ #define DEFAULT_MIGRATE_COMPRESS_LEVEL 1 @@ -58,6 +61,7 @@ MigrationState *migrate_get_current(void) .xbzrle_cache_size = DEFAULT_MIGRATE_CACHE_SIZE, .mbps = -1, .compress_thread_count = DEFAULT_MIGRATE_COMPRESS_THREAD_COUNT, + .decompress_thread_count = DEFAULT_MIGRATE_DECOMPRESS_THREAD_COUNT, .compress_level = DEFAULT_MIGRATE_COMPRESS_LEVEL, }; @@ -113,6 +117,7 @@ static void process_incoming_migration_co(void *opaque) free_xbzrle_decoded_buf(); if (ret < 0) { error_report("load of migration failed: %s", strerror(-ret)); + migrate_decompress_threads_join(); exit(EXIT_FAILURE); } qemu_announce_self(); @@ -121,6 +126,7 @@ static void process_incoming_migration_co(void *opaque) bdrv_invalidate_cache_all(&local_err); if (local_err) { error_report_err(local_err); + migrate_decompress_threads_join(); exit(EXIT_FAILURE); } @@ -129,6 +135,7 @@ static void process_incoming_migration_co(void *opaque) } else { runstate_set(RUN_STATE_PAUSED); } + migrate_decompress_threads_join(); } void process_incoming_migration(QEMUFile *f) @@ -137,6 +144,7 @@ void process_incoming_migration(QEMUFile *f) int fd = qemu_get_fd(f); assert(fd != -1); + migrate_decompress_threads_create(); qemu_set_nonblock(fd); qemu_coroutine_enter(co, f); } @@ -400,6 +408,7 @@ static MigrationState *migrate_init(const MigrationParams *params) int64_t xbzrle_cache_size = s->xbzrle_cache_size; int compress_level = s->compress_level; int compress_thread_count = s->compress_thread_count; + int decompress_thread_count = s->decompress_thread_count; memcpy(enabled_capabilities, s->enabled_capabilities, sizeof(enabled_capabilities)); @@ -412,6 +421,7 @@ static MigrationState *migrate_init(const MigrationParams *params) s->compress_level = compress_level; s->compress_thread_count = compress_thread_count; + s->decompress_thread_count = decompress_thread_count; s->bandwidth_limit = bandwidth_limit; s->state = MIGRATION_STATUS_SETUP; trace_migrate_set_state(MIGRATION_STATUS_SETUP); @@ -623,6 +633,15 @@ int migrate_compress_threads(void) return s->compress_thread_count; } +int migrate_decompress_threads(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->decompress_thread_count; +} + int migrate_use_xbzrle(void) { MigrationState *s;