From 488105ebd200bbd1f6d7cbe863412e41d9ab4221 Mon Sep 17 00:00:00 2001
From: Olatunji Ruwase
Date: Fri, 12 Nov 2021 09:26:47 -0800
Subject: [PATCH] Fix zinf none swapper (#1550)

---
 deepspeed/runtime/engine.py                        |  1 +
 .../swap_tensor/partitioned_param_swapper.py       |  3 +
 deepspeed/runtime/zero/stage3.py                   | 91 +++++++++++--------
 3 files changed, 59 insertions(+), 36 deletions(-)

diff --git a/deepspeed/runtime/engine.py b/deepspeed/runtime/engine.py
index 2fcc764c..89bfddcb 100755
--- a/deepspeed/runtime/engine.py
+++ b/deepspeed/runtime/engine.py
@@ -1153,6 +1153,7 @@ class DeepSpeedEngine(Module):
             self.module,
             optimizer,
             timers=timers,
+            ds_config=self.config,
             static_loss_scale=self.loss_scale(),
             dynamic_loss_scale=self.dynamic_loss_scale(),
             dynamic_loss_args=self.dynamic_loss_scale_args(),
diff --git a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py
index 1188ed68..6d1af7bb 100644
--- a/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py
+++ b/deepspeed/runtime/swap_tensor/partitioned_param_swapper.py
@@ -414,3 +414,6 @@ class AsyncPartitionedParameterSwapper(object):
             dst_fp16_params[i].ds_tensor.status = PartitionedParamStatus.AVAILABLE
 
         self.partitioned_swap_pool.swap_out(self.aio_write_handle)
+
+        for param in dst_fp16_params:
+            param.ds_tensor.status = PartitionedParamStatus.NOT_AVAILABLE
diff --git a/deepspeed/runtime/zero/stage3.py b/deepspeed/runtime/zero/stage3.py
index 221c5e51..852b619a 100755
--- a/deepspeed/runtime/zero/stage3.py
+++ b/deepspeed/runtime/zero/stage3.py
@@ -607,6 +607,7 @@ class FP16_DeepSpeedZeroOptimizer_Stage3(object):
                  module,
                  init_optimizer,
                  timers,
+                 ds_config,
                  static_loss_scale=1.0,
                  dynamic_loss_scale=False,
                  dynamic_loss_args=None,
@@ -657,7 +658,20 @@ class FP16_DeepSpeedZeroOptimizer_Stage3(object):
         self.dtype = self.optimizer.param_groups[0]['params'][0].dtype
         self._global_grad_norm = 0.
 
-        self._convert_to_zero_parameters(module, mpu)
+        self.optimizer_swapper = None
+        self.swap_optimizer = False
+
+        self.offload_optimizer = False
+        self.offload_optimizer_pin_memory = False
+        self.offload_optimizer_fast_init = False
+        self.offload_param = False
+        self.offload_param_pin_memory = False
+        self.params_in_nvme_and_cpu = False
+        self.max_params_in_cpu = 0
+
+        self._configure_offloading(offload_optimizer_config, offload_param_config)
+
+        self._convert_to_zero_parameters(ds_config, module, mpu)
 
         for m in module.modules():
             _init_external_params(m)
 
@@ -673,39 +687,6 @@ class FP16_DeepSpeedZeroOptimizer_Stage3(object):
         if self.overlap_comm:
             self.gpu_sum = torch.zeros(1, dtype=torch.float).cuda()
 
-        ###################### offload optimizer setup ##################################
-        self.optimizer_swapper = None
-        self.swap_optimizer = False
-
-        self.offload_optimizer = False
-        self.offload_optimizer_pin_memory = False
-        self.offload_optimizer_fast_init = False
-        if offload_optimizer_config is not None:
-            self.offload_optimizer = True
-            self.offload_optimizer_pin_memory = offload_optimizer_config[
-                OFFLOAD_OPTIMIZER_PIN_MEMORY]
-            self.swap_optimizer = offload_optimizer_config[
-                OFFLOAD_OPTIMIZER_DEVICE] == OFFLOAD_NVME_DEVICE
-            self.offload_optimizer_fast_init = offload_optimizer_config[
-                OFFLOAD_OPTIMIZER_FAST_INIT]
-
-        ###################### offload param setup ##################################
-        self.offload_param = False
-        self.offload_param_pin_memory = False
-        self.params_in_nvme_and_cpu = False
-        self.max_params_in_cpu = 0
-        if offload_param_config is not None:
-            assert self.offload_optimizer, "parameter offload is only available with optimizer state offload"
-            self.offload_param = True
-            self.offload_param_pin_memory = offload_param_config[
-                OFFLOAD_PARAM_PIN_MEMORY]
-            self.params_in_nvme_and_cpu = offload_param_config[
-                OFFLOAD_PARAM_DEVICE] == OFFLOAD_NVME_DEVICE
-            self.max_params_in_cpu = offload_param_config[OFFLOAD_PARAM_MAX_IN_CPU]
-            print_rank_0(
-                f"FP16 params swapping is {self.params_in_nvme_and_cpu}, Max params in CPU is {self.max_params_in_cpu}",
-                force=False)
-
         self.deepspeed_adam_offload = (self.offload_optimizer
                                        and type(init_optimizer) == DeepSpeedCPUAdam)
 
@@ -926,7 +907,31 @@ class FP16_DeepSpeedZeroOptimizer_Stage3(object):
         if dist.get_rank(group=self.dp_process_group) == 0:
             see_memory_usage(f"After initializing ZeRO optimizer", force=False)
 
-    def _convert_to_zero_parameters(self, module, mpu):
+    def _configure_offloading(self, offload_optimizer_config, offload_param_config):
+        ###################### offload optimizer setup ##################################
+        if offload_optimizer_config is not None:
+            self.offload_optimizer = True
+            self.offload_optimizer_pin_memory = offload_optimizer_config[
+                OFFLOAD_OPTIMIZER_PIN_MEMORY]
+            self.swap_optimizer = offload_optimizer_config[
+                OFFLOAD_OPTIMIZER_DEVICE] == OFFLOAD_NVME_DEVICE
+            self.offload_optimizer_fast_init = offload_optimizer_config[
+                OFFLOAD_OPTIMIZER_FAST_INIT]
+
+        ###################### offload param setup ##################################
+        if offload_param_config is not None:
+            assert self.offload_optimizer, "parameter offload is only available with optimizer state offload"
+            self.offload_param = True
+            self.offload_param_pin_memory = offload_param_config[
+                OFFLOAD_PARAM_PIN_MEMORY]
+            self.params_in_nvme_and_cpu = offload_param_config[
+                OFFLOAD_PARAM_DEVICE] == OFFLOAD_NVME_DEVICE
+            self.max_params_in_cpu = offload_param_config[OFFLOAD_PARAM_MAX_IN_CPU]
+            print_rank_0(
+                f"FP16 params swapping is {self.params_in_nvme_and_cpu}, Max params in CPU is {self.max_params_in_cpu}",
+                force=False)
+
+    def _convert_to_zero_parameters(self, ds_config, module, mpu):
         non_zero_params = [p for p in module.parameters() if not is_zero_param(p)]
         if non_zero_params:
             zero_params = [p for p in module.parameters() if is_zero_param(p)]
@@ -936,7 +941,21 @@ class FP16_DeepSpeedZeroOptimizer_Stage3(object):
             group = None
             if mpu:
                 group = mpu.get_data_parallel_group()
-            Init(module=module, data_parallel_group=group, dtype=self.dtype)
+
+            if self.params_in_nvme_and_cpu:
+                remote_device = OFFLOAD_NVME_DEVICE
+            elif self.offload_param:
+                remote_device = OFFLOAD_CPU_DEVICE
+            else:
+                remote_device = None
+
+            Init(module=module,
+                 data_parallel_group=group,
+                 dtype=self.dtype,
+                 config_dict_or_path=ds_config,
+                 remote_device=remote_device,
+                 pin_memory=self.offload_param_pin_memory,
+                 mpu=mpu)
 
     def _configure_tensor_swapping(self, offload_optimizer_config, aio_config):
         nvme_swap_folder = os.path.join(
--
GitLab
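
Note: the code path this patch fixes is exercised by a ZeRO-Infinity (stage 3) run that
offloads both optimizer state and fp16 parameters to NVMe. Below is a minimal sketch of
such a setup, assuming the standard zero_optimization offload keys that
_configure_offloading reads; the model, batch size, learning rate, and "/local_nvme"
path are placeholders, not part of the patch.

    # Sketch: a ZeRO-Infinity config that drives the NVMe swapper paths patched above.
    # Intended to be started with the deepspeed launcher; placeholder model and paths.
    import torch
    import deepspeed

    ds_config = {
        "train_micro_batch_size_per_gpu": 1,
        "fp16": {"enabled": True},
        "optimizer": {"type": "Adam", "params": {"lr": 1e-4}},
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "nvme",  # makes _configure_offloading set swap_optimizer
                "nvme_path": "/local_nvme",
                "pin_memory": True,
            },
            "offload_param": {
                "device": "nvme",  # makes _configure_offloading set params_in_nvme_and_cpu,
                "nvme_path": "/local_nvme",  # so _convert_to_zero_parameters passes remote_device="nvme" to Init()
                "pin_memory": True,
                "max_in_cpu": 1e9,
            },
        },
    }

    model = torch.nn.Linear(4096, 4096)  # placeholder model
    engine, optimizer, _, _ = deepspeed.initialize(
        model=model,
        model_parameters=model.parameters(),
        config=ds_config,  # forwarded into the ZeRO-3 optimizer as ds_config by this patch
    )

With parameters on NVMe, each swap-out must leave the fp16 partitions marked
NOT_AVAILABLE (the partitioned_param_swapper.py hunk), so the next access triggers a
swap-in rather than reading a stale ds_tensor.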