未验证 提交 cfa63f5d 编写于 作者: J Jeff Rasley 提交者: GitHub

ZeRO stage 1 refresh (#1042)

上级 d88d9279
......@@ -3,6 +3,7 @@ Copyright 2020 The Microsoft DeepSpeed Team
'''
import sys
import types
import packaging
from . import ops
......@@ -25,9 +26,8 @@ from .git_version_info import version, git_hash, git_branch
def _parse_version(version_str):
    '''Parse a version string and extract the major, minor, and patch versions.

    Args:
        version_str: a PEP 440 style version string, e.g. "0.3.16+1a2b3c".

    Returns:
        Tuple of ints ``(major, minor, micro)``.
    '''
    # NOTE: the merged diff left the old regex implementation as unreachable
    # dead code after the first return; it is removed here.
    # Import the submodule explicitly: a bare `import packaging` does not
    # guarantee the `version` submodule is reachable as an attribute.
    from packaging import version as pkg_version
    ver = pkg_version.parse(version_str)
    return ver.major, ver.minor, ver.micro
# Export version information
......
......@@ -766,12 +766,7 @@ class DeepSpeedConfig(object):
GRADIENT_ACCUMULATION_STEPS)
if self.zero_enabled:
if self.zero_optimization_stage < ZERO_OPTIMIZATION_GRADIENTS:
assert self.fp16_enabled, "DeepSpeedConfig: ZeRO is only supported if fp16 is enabled"
assert self.zero_optimization_stage <= MAX_STAGE_ZERO_OPTIMIZATION, "DeepSpeedConfig: Maximum supported ZeRO stage is {}".format(MAX_STAGE_ZERO_OPTIMIZATION)
#if self.zero_config.cpu_offload is True:
# assert self.zero_optimization_stage == ZERO_OPTIMIZATION_GRADIENTS, "DeepSpeedConfig: cpu-offload supported ZeRO stage is {}".format(ZERO_OPTIMIZATION_GRADIENTS)
#assert self.gradient_accumulation_steps == 1, "DeepSpeedConfig: {}is not supported for {}".format(GRADIENT_ACCUMULATION_STEPS, ZERO_OPTIMIZATION_CPU_OFFLOAD)
def _do_warning_check(self):
fp16_enabled = self.fp16_enabled or self.zero_enabled
......
......@@ -44,6 +44,7 @@ from .utils import ensure_directory_exists
from ..ops.op_builder import UtilsBuilder
from ..ops.adam import DeepSpeedCPUAdam
from ..ops.adam import FusedAdam
from ..git_version_info import version
from deepspeed.profiling.flops_profiler.profiler import FlopsProfiler
......@@ -148,6 +149,8 @@ class DeepSpeedEngine(Module):
# Configure distributed model
self._configure_distributed_model(model)
self.pipeline_parallelism = isinstance(self.module, PipelineModule)
see_memory_usage(f"DeepSpeed Engine: After configure distributed model")
# Configure wall clock timer
......@@ -390,6 +393,12 @@ class DeepSpeedEngine(Module):
def zero_ignore_unused_parameters(self):
    # Whether ZeRO should tolerate parameters that never receive a gradient
    # ("zero_optimization.ignore_unused_parameters" in the DeepSpeed config).
    return self._config.zero_config.ignore_unused_parameters
def zero_grad_hooks(self):
    # Value of "zero_optimization.grad_hooks" from the parsed config.
    # NOTE(review): presumably controls whether ZeRO registers backward
    # gradient hooks — confirm against DeepSpeedZeroConfig/stage optimizers.
    return self._config.zero_config.grad_hooks
def zero_legacy_stage1(self):
    # True when the user opted into the pre-v0.3.17 ZeRO stage-1 optimizer
    # implementation ("zero_optimization.legacy_stage1"), kept for
    # backwards compatibility.
    return self._config.zero_config.legacy_stage1
def fp16_enabled(self):
    # True when fp16 mixed-precision training is enabled in the config.
    return self._config.fp16_enabled
......@@ -780,7 +789,8 @@ class DeepSpeedEngine(Module):
assert not self.allreduce_always_fp32(), "ZeRO does not support 'fp32_allreduce': true"
timers = self.timers if self.wall_clock_breakdown() else None
if zero_stage == ZERO_OPTIMIZATION_OPTIMIZER_STATES:
if self.zero_legacy_stage1(
) and zero_stage == ZERO_OPTIMIZATION_OPTIMIZER_STATES:
optimizer = FP16_DeepSpeedZeroOptimizer_Stage1(
optimizer,
static_loss_scale=self.loss_scale(),
......@@ -792,8 +802,19 @@ class DeepSpeedEngine(Module):
max_elements_per_comm=self.zero_reduce_bucket_size(),
dp_process_group=self.data_parallel_group,
elastic_checkpoint=self.zero_elastic_checkpoint(),
mpu=self.mpu)
elif zero_stage == ZERO_OPTIMIZATION_GRADIENTS:
mpu=self.mpu,
postscale_gradients=self.postscale_gradients(),
gradient_predivide_factor=self.gradient_predivide_factor(),
gradient_predivide=self.gradient_predivide)
elif zero_stage <= ZERO_OPTIMIZATION_GRADIENTS:
overlap_comm = self.zero_overlap_comm()
if isinstance(self.module, PipelineModule):
if overlap_comm:
logger.warning(
"Pipeline parallelism does not support overlapped communication, will be disabled."
)
overlap_comm = False
optimizer = FP16_DeepSpeedZeroOptimizer(
optimizer,
timers=timers,
......@@ -806,13 +827,14 @@ class DeepSpeedEngine(Module):
allgather_bucket_size=self.zero_allgather_bucket_size(),
dp_process_group=self.data_parallel_group,
reduce_scatter=self.zero_reduce_scatter(),
overlap_comm=self.zero_overlap_comm(),
overlap_comm=overlap_comm,
cpu_offload=self.zero_cpu_offload(),
mpu=self.mpu,
postscale_gradients=self.postscale_gradients(),
gradient_predivide_factor=self.gradient_predivide_factor(),
gradient_accumulation_steps=self.gradient_accumulation_steps(),
ignore_unused_parameters=self.zero_ignore_unused_parameters())
ignore_unused_parameters=self.zero_ignore_unused_parameters(),
partition_grads=zero_stage == ZERO_OPTIMIZATION_GRADIENTS)
elif zero_stage == ZERO_OPTIMIZATION_WEIGHTS:
print("Initializing ZeRO Stage 3") if dist.get_rank() == 0 else None
from deepspeed.runtime.zero.stage3 import FP16_DeepSpeedZeroOptimizer_Stage3
......@@ -999,18 +1021,15 @@ class DeepSpeedEngine(Module):
return loss
def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
    """Reduce gradients across the data-parallel group.

    Dispatches to the appropriate reduction strategy for the configured
    ZeRO stage; falls back to buffered allreduce when ZeRO does not own
    gradient reduction.

    Args:
        bucket_size: element count per fused allreduce buffer used by the
            non-ZeRO fallback path.

    NOTE: the scraped diff had left both the removed ``reduce_scatter_gradients``
    branch and the added ``reduce_gradients`` branch merged into one
    incoherent body; this is the reconstructed post-commit version.
    """
    # ZeRO stage 2 communicates during non gradient accumulation boundaries
    # as well, so its epilogue runs unconditionally.
    if self.zero_optimization_partition_gradients():
        self.optimizer.overlapping_partition_gradients_reduce_epilogue()
    # Communicate only at gradient accumulation boundaries
    elif self.is_gradient_accumulation_boundary():
        if self.zero_optimization_stage() == ZERO_OPTIMIZATION_OPTIMIZER_STATES:
            # ZeRO stage 1: the ZeRO optimizer owns gradient reduction.
            self.optimizer.reduce_gradients(
                pipeline_parallel=self.pipeline_parallelism)
        else:
            self.buffered_allreduce_fallback(elements_per_buffer=bucket_size)
......@@ -1731,19 +1750,19 @@ class DeepSpeedEngine(Module):
# then instead just returns None.
self._curr_ckpt_path = os.path.join(save_dir, tag)
state = dict(
module=self.module_state_dict(),
optimizer=self.optimizer.state_dict()
if self.optimizer and not self.zero_optimization() else None,
lr_scheduler=self.lr_scheduler.state_dict()
if self.lr_scheduler is not None else None,
csr_tensor_module_names=self.csr_tensor_module_names,
skipped_steps=self.skipped_steps,
global_steps=self.global_steps,
global_samples=self.global_samples,
dp_world_size=self.dp_world_size,
mp_world_size=self.mp_world_size,
)
state = dict(module=self.module_state_dict(),
optimizer=self.optimizer.state_dict()
if self.optimizer and not self.zero_optimization() else None,
lr_scheduler=self.lr_scheduler.state_dict()
if self.lr_scheduler is not None else None,
csr_tensor_module_names=self.csr_tensor_module_names,
skipped_steps=self.skipped_steps,
global_steps=self.global_steps,
global_samples=self.global_samples,
dp_world_size=self.dp_world_size,
mp_world_size=self.mp_world_size,
ds_config=self.config,
ds_version=version)
state.update(client_state)
log_dist(message=f'Saving model checkpoint: {save_path}', ranks=[0])
......@@ -1771,10 +1790,10 @@ class DeepSpeedEngine(Module):
def _save_zero_checkpoint(self, save_path, tag):
zero_checkpoint_name = self._get_zero_ckpt_name(save_path, tag)
zero_sd = dict(
optimizer_state_dict=self.optimizer.state_dict(),
param_shapes=self._get_param_shapes(),
)
zero_sd = dict(optimizer_state_dict=self.optimizer.state_dict(),
param_shapes=self._get_param_shapes(),
ds_config=self.config,
ds_version=version)
torch.save(zero_sd, zero_checkpoint_name)
self._copy_recovery_script(save_path)
logger.info('zero checkpoint saved {}'.format(zero_checkpoint_name))
......
......@@ -226,9 +226,8 @@ class PipelineEngine(DeepSpeedEngine):
def _exec_reduce_grads(self):
    """Reduce gradients across data-parallel replicas at a pipeline boundary.

    Temporarily forces the gradient-boundary flag so the engine treats this
    call as a true accumulation boundary even though pipeline micro-batches
    may still be in flight.

    NOTE: the scraped diff had left both the removed
    ``buffered_allreduce_fallback`` guard and the added
    ``allreduce_gradients`` guard merged together; this is the reconstructed
    post-commit version, which routes through the engine-level path so all
    ZeRO stages reduce correctly.
    """
    self._force_grad_boundary = True
    if self.pipeline_enable_backward_allreduce:
        self.allreduce_gradients(bucket_size=MEMORY_OPT_ALLREDUCE_SIZE)
    self._force_grad_boundary = False
def _reserve_pipe_buffers(self, num_buffers):
......
"""
Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""
from deepspeed.runtime.config_utils import get_scalar_param, DeepSpeedConfigObject
from deepspeed.utils import logger
from .constants import *
from .offload_constants import *
from .offload_config import get_offload_param_config, get_default_offload_param_config, \
get_offload_optimizer_config, get_default_offload_optimizer_config
class DeepSpeedZeroConfig(DeepSpeedConfigObject):
    """Typed view over the "zero_optimization" section of a DeepSpeed config.

    Reads every ZeRO setting from the user's config dict, applying the
    documented defaults for anything left unspecified.
    """
    def __init__(self, param_dict):
        """Populate ZeRO fields from *param_dict* (the FULL DeepSpeed config,
        not just the "zero_optimization" subsection)."""
        super(DeepSpeedZeroConfig, self).__init__()
        self.stage = None
        self.contiguous_gradients = None
        self.reduce_scatter = None
        self.reduce_bucket_size = None
        self.allgather_partitions = None
        self.allgather_bucket_size = None
        self.overlap_comm = None
        self.load_from_fp32_weights = None

        self.elastic_checkpoint = None

        # Offload specific parameters
        self.offload_param = None
        self.offload_optimizer = None
        self.sub_group_size = None

        # Stage3 specific parameters
        self.prefetch_bucket_size = None
        self.param_persistence_threshold = None
        self.max_live_parameters = None
        self.max_reuse_distance = None
        self.gather_fp16_weights_on_model_save = None

        self.ignore_unused_parameters = None

        if ZERO_OPTIMIZATION in param_dict.keys():
            zero_config_dict = param_dict[ZERO_OPTIMIZATION]
            # A bare boolean is the deprecated pre-section config format.
            if type(zero_config_dict) is bool:
                zero_config_dict = self.read_zero_config_deprecated(param_dict)
        else:
            zero_config_dict = ZERO_OPTIMIZATION_DEFAULT

        self._initialize(zero_config_dict)

    def read_zero_config_deprecated(self, param_dict):
        """Translate the deprecated boolean "zero_optimization" flag into a
        config dict (bool True maps to stage 1), warning the user."""
        zero_config_dict = {}
        zero_config_dict[
            ZERO_OPTIMIZATION_STAGE] = 1 if param_dict[ZERO_OPTIMIZATION] else 0
        if zero_config_dict[ZERO_OPTIMIZATION_STAGE] > 0:
            zero_config_dict[ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE] = get_scalar_param(
                param_dict,
                ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEPRECATED,
                ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT)

        logger.warning(
            'DeepSpeedConfig: this format of ZeRO optimization setup is deprecated. Please use the following format: {}'
            .format(ZERO_FORMAT))

        return zero_config_dict

    def _sanity_check(self, zero_config_dict):
        """Warn (do not fail) when deprecated cpu-offload keys are present,
        pointing at their structured replacements."""
        deprecated_dict = {
            ZERO_OPTIMIZATION_CPU_OFFLOAD:
            ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER,
            ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS:
            ZERO_OPTIMIZATION_OFFLOAD_PARAM,
            ZERO_OPTIMIZATION_CPU_OFFLOAD_USE_PIN_MEMORY:
            f'{ZERO_OPTIMIZATION_OFFLOAD_PARAM} or {ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER}'
        }

        for old_key, new_key in deprecated_dict.items():
            if old_key in zero_config_dict:
                logger.warning(
                    f'DeepSpeedConfig: {old_key} is deprecated. Please use {new_key}.')

    def _initialize(self, zero_config_dict):
        """Resolve every ZeRO field from *zero_config_dict* with defaults.

        Note stage-dependent defaults: contiguous_gradients and overlap_comm
        use the ZeRO-3 defaults when stage == ZERO_OPTIMIZATION_WEIGHTS.
        """
        self._sanity_check(zero_config_dict)

        self.stage = get_scalar_param(zero_config_dict,
                                      ZERO_OPTIMIZATION_STAGE,
                                      ZERO_OPTIMIZATION_STAGE_DEFAULT)

        self.contiguous_gradients = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS,
            ZERO3_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT
            if self.stage == ZERO_OPTIMIZATION_WEIGHTS else
            ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT)

        self.reduce_bucket_size = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE,
            ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE_DEFAULT)

        self.reduce_scatter = get_scalar_param(zero_config_dict,
                                               ZERO_OPTIMIZATION_REDUCE_SCATTER,
                                               ZERO_OPTIMIZATION_REDUCE_SCATTER_DEFAULT)

        self.overlap_comm = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_OVERLAP_COMM,
            ZERO3_OPTIMIZATION_OVERLAP_COMM_DEFAULT
            if self.stage == ZERO_OPTIMIZATION_WEIGHTS else
            ZERO_OPTIMIZATION_OVERLAP_COMM_DEFAULT)

        self.allgather_partitions = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS,
            ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS_DEFAULT)

        self.allgather_bucket_size = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE,
            ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT)

        self.load_from_fp32_weights = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS,
            ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS_DEFAULT)

        self.elastic_checkpoint = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT,
            ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT_DEFAULT)

        # Deprecated flat cpu_offload flags take precedence when present;
        # otherwise read the structured offload sub-configs.
        if ZERO_OPTIMIZATION_CPU_OFFLOAD in zero_config_dict:
            cpu_offload_optimizer = get_scalar_param(
                zero_config_dict,
                ZERO_OPTIMIZATION_CPU_OFFLOAD,
                ZERO_OPTIMIZATION_CPU_OFFLOAD_DEFAULT)
            if cpu_offload_optimizer:
                self.offload_optimizer = get_default_offload_optimizer_config()
        else:
            self.offload_optimizer = get_offload_optimizer_config(zero_config_dict)

        if ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS in zero_config_dict:
            cpu_offload_params = get_scalar_param(
                zero_config_dict,
                ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS,
                ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS_DEFAULT)
            if cpu_offload_params:
                self.offload_param = get_default_offload_param_config()
        else:
            self.offload_param = get_offload_param_config(zero_config_dict)

        self.sub_group_size = get_scalar_param(zero_config_dict,
                                               ZERO_OPTIMIZATION_SUB_GROUP_SIZE,
                                               ZERO_OPTIMIZATION_SUB_GROUP_SIZE_DEFAULT)

        self.max_live_parameters = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS,
            ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS_DEFAULT)

        self.max_reuse_distance = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE,
            ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE_DEFAULT)

        self.prefetch_bucket_size = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE,
            ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT)

        self.param_persistence_threshold = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD,
            ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT)

        self.gather_fp16_weights_on_model_save = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE,
            ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT)

        self.ignore_unused_parameters = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS,
            ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT)
"""
Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""
from deepspeed.runtime.config_utils import get_scalar_param, DeepSpeedConfigObject
from deepspeed.utils import logger
from .constants import *
from .offload_constants import *
from .offload_config import get_offload_param_config, get_default_offload_param_config, \
get_offload_optimizer_config, get_default_offload_optimizer_config
class DeepSpeedZeroConfig(DeepSpeedConfigObject):
    """Typed view over the "zero_optimization" section of a DeepSpeed config.

    This revision adds the ``legacy_stage1`` flag, which selects the
    pre-v0.3.17 ZeRO stage-1 optimizer implementation.
    """
    def __init__(self, param_dict):
        """Populate ZeRO fields from *param_dict* (the FULL DeepSpeed config,
        not just the "zero_optimization" subsection)."""
        super(DeepSpeedZeroConfig, self).__init__()
        self.stage = None
        self.contiguous_gradients = None
        self.reduce_scatter = None
        self.reduce_bucket_size = None
        self.allgather_partitions = None
        self.allgather_bucket_size = None
        self.overlap_comm = None
        self.load_from_fp32_weights = None

        self.elastic_checkpoint = None

        # Offload specific parameters
        self.offload_param = None
        self.offload_optimizer = None
        self.sub_group_size = None

        # Stage3 specific parameters
        self.prefetch_bucket_size = None
        self.param_persistence_threshold = None
        self.max_live_parameters = None
        self.max_reuse_distance = None
        self.gather_fp16_weights_on_model_save = None

        self.ignore_unused_parameters = None

        if ZERO_OPTIMIZATION in param_dict.keys():
            zero_config_dict = param_dict[ZERO_OPTIMIZATION]
            # A bare boolean is the deprecated pre-section config format.
            if type(zero_config_dict) is bool:
                zero_config_dict = self.read_zero_config_deprecated(param_dict)
        else:
            zero_config_dict = ZERO_OPTIMIZATION_DEFAULT

        self._initialize(zero_config_dict)

    def read_zero_config_deprecated(self, param_dict):
        """Translate the deprecated boolean "zero_optimization" flag into a
        config dict (bool True maps to stage 1), warning the user."""
        zero_config_dict = {}
        zero_config_dict[
            ZERO_OPTIMIZATION_STAGE] = 1 if param_dict[ZERO_OPTIMIZATION] else 0
        if zero_config_dict[ZERO_OPTIMIZATION_STAGE] > 0:
            zero_config_dict[ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE] = get_scalar_param(
                param_dict,
                ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEPRECATED,
                ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT)

        logger.warning(
            'DeepSpeedConfig: this format of ZeRO optimization setup is deprecated. Please use the following format: {}'
            .format(ZERO_FORMAT))

        return zero_config_dict

    def _sanity_check(self, zero_config_dict):
        """Warn (do not fail) when deprecated cpu-offload keys are present,
        pointing at their structured replacements."""
        deprecated_dict = {
            ZERO_OPTIMIZATION_CPU_OFFLOAD:
            ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER,
            ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS:
            ZERO_OPTIMIZATION_OFFLOAD_PARAM,
            ZERO_OPTIMIZATION_CPU_OFFLOAD_USE_PIN_MEMORY:
            f'{ZERO_OPTIMIZATION_OFFLOAD_PARAM} or {ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER}'
        }

        for old_key, new_key in deprecated_dict.items():
            if old_key in zero_config_dict:
                logger.warning(
                    f'DeepSpeedConfig: {old_key} is deprecated. Please use {new_key}.')

    def _initialize(self, zero_config_dict):
        """Resolve every ZeRO field from *zero_config_dict* with defaults.

        Note stage-dependent defaults: contiguous_gradients and overlap_comm
        use the ZeRO-3 defaults when stage == ZERO_OPTIMIZATION_WEIGHTS.
        """
        self._sanity_check(zero_config_dict)

        self.stage = get_scalar_param(zero_config_dict,
                                      ZERO_OPTIMIZATION_STAGE,
                                      ZERO_OPTIMIZATION_STAGE_DEFAULT)

        self.contiguous_gradients = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS,
            ZERO3_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT
            if self.stage == ZERO_OPTIMIZATION_WEIGHTS else
            ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT)

        self.reduce_bucket_size = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE,
            ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE_DEFAULT)

        self.reduce_scatter = get_scalar_param(zero_config_dict,
                                               ZERO_OPTIMIZATION_REDUCE_SCATTER,
                                               ZERO_OPTIMIZATION_REDUCE_SCATTER_DEFAULT)

        self.overlap_comm = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_OVERLAP_COMM,
            ZERO3_OPTIMIZATION_OVERLAP_COMM_DEFAULT
            if self.stage == ZERO_OPTIMIZATION_WEIGHTS else
            ZERO_OPTIMIZATION_OVERLAP_COMM_DEFAULT)

        self.allgather_partitions = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS,
            ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS_DEFAULT)

        self.allgather_bucket_size = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE,
            ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT)

        self.load_from_fp32_weights = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS,
            ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS_DEFAULT)

        self.elastic_checkpoint = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT,
            ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT_DEFAULT)

        # Deprecated flat cpu_offload flags take precedence when present;
        # otherwise read the structured offload sub-configs.
        if ZERO_OPTIMIZATION_CPU_OFFLOAD in zero_config_dict:
            cpu_offload_optimizer = get_scalar_param(
                zero_config_dict,
                ZERO_OPTIMIZATION_CPU_OFFLOAD,
                ZERO_OPTIMIZATION_CPU_OFFLOAD_DEFAULT)
            if cpu_offload_optimizer:
                self.offload_optimizer = get_default_offload_optimizer_config()
        else:
            self.offload_optimizer = get_offload_optimizer_config(zero_config_dict)

        if ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS in zero_config_dict:
            cpu_offload_params = get_scalar_param(
                zero_config_dict,
                ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS,
                ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS_DEFAULT)
            if cpu_offload_params:
                self.offload_param = get_default_offload_param_config()
        else:
            self.offload_param = get_offload_param_config(zero_config_dict)

        self.sub_group_size = get_scalar_param(zero_config_dict,
                                               ZERO_OPTIMIZATION_SUB_GROUP_SIZE,
                                               ZERO_OPTIMIZATION_SUB_GROUP_SIZE_DEFAULT)

        self.max_live_parameters = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS,
            ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS_DEFAULT)

        self.max_reuse_distance = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE,
            ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE_DEFAULT)

        self.prefetch_bucket_size = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE,
            ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT)

        self.param_persistence_threshold = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD,
            ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT)

        self.gather_fp16_weights_on_model_save = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE,
            ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT)

        self.ignore_unused_parameters = get_scalar_param(
            zero_config_dict,
            ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS,
            ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT)

        # Opt-in switch for the deepspeed < v0.3.17 stage-1 optimizer path.
        self.legacy_stage1 = get_scalar_param(zero_config_dict,
                                              ZERO_OPTIMIZATION_LEGACY_STAGE1,
                                              ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT)
"""
Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""
from .offload_constants import *
#########################################
# ZeRO optimization
#########################################
# ZeRO optimization. By default, this optimization is not enabled.
# Users have to configure the desired optimization (0 means disabled) in params.json as below example:
ZERO_FORMAT = '''
ZeRO optimization should be enabled as:
"session_params": {
"zero_optimization": {
"stage": [0|1|2],
"stage3_max_live_parameters" : 1000000000,
"stage3_max_reuse_distance" : 1000000000,
"allgather_partitions": [true|false],
"allgather_bucket_size": 500000000,
"reduce_scatter": [true|false],
"contiguous_gradients" : [true|false]
"overlap_comm": [true|false],
"reduce_bucket_size": 500000000,
"load_from_fp32_weights": [true|false],
"cpu_offload": [true|false] (deprecated),
"cpu_offload_params" : [true|false] (deprecated),
"cpu_offload_use_pin_memory": [true|false] (deprecated),
"sub_group_size" : 1000000000000,
"offload_param": {...},
"offload_optimizer": {...},
"ignore_unused_parameters": [true|false]
}
}
'''
ZERO_OPTIMIZATION = 'zero_optimization'
ZERO_OPTIMIZATION_DISABLED = 0
ZERO_OPTIMIZATION_OPTIMIZER_STATES = 1
ZERO_OPTIMIZATION_GRADIENTS = 2
ZERO_OPTIMIZATION_WEIGHTS = 3
MAX_STAGE_ZERO_OPTIMIZATION = ZERO_OPTIMIZATION_WEIGHTS
ZERO_OPTIMIZATION_STAGE = 'stage'
ZERO_OPTIMIZATION_STAGE_1 = 'stage_1'
ZERO_OPTIMIZATION_STAGE_2 = 'stage_2'
ZERO_OPTIMIZATION_STAGE_3 = 'stage_3'
ZERO_OPTIMIZATION_STAGE_DEFAULT = ZERO_OPTIMIZATION_DISABLED
ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS = 'allgather_partitions'
ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS_DEFAULT = True
ZERO_OPTIMIZATION_REDUCE_SCATTER = 'reduce_scatter'
ZERO_OPTIMIZATION_REDUCE_SCATTER_DEFAULT = False
ZERO_OPTIMIZATION_OVERLAP_COMM = 'overlap_comm'
ZERO_OPTIMIZATION_OVERLAP_COMM_DEFAULT = False
ZERO3_OPTIMIZATION_OVERLAP_COMM_DEFAULT = True
ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS = 'contiguous_gradients'
ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT = False
ZERO3_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT = False
ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE = 'reduce_bucket_size'
ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE_DEFAULT = 500000000
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE = 'allgather_bucket_size'
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT = 500000000
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEPRECATED = 'allgather_size'
ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS = 'load_from_fp32_weights'
ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS_DEFAULT = True
ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT = 'elastic_checkpoint'
ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT_DEFAULT = True
ZERO_OPTIMIZATION_CPU_OFFLOAD = 'cpu_offload'
ZERO_OPTIMIZATION_CPU_OFFLOAD_DEFAULT = False
ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS = 'cpu_offload_params'
ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS_DEFAULT = False
ZERO_OPTIMIZATION_CPU_OFFLOAD_USE_PIN_MEMORY = 'cpu_offload_use_pin_memory'
ZERO_OPTIMIZATION_CPU_OFFLOAD_USE_PIN_MEMORY_DEFAULT = False
ZERO_OPTIMIZATION_OFFLOAD_PARAM = OFFLOAD_PARAM
ZERO_OPTIMIZATION_OFFLOAD_PARAM_DEFAULT = None
ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER = OFFLOAD_OPTIMIZER
ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER_DEFAULT = None
ZERO_OPTIMIZATION_SUB_GROUP_SIZE = 'sub_group_size'
ZERO_OPTIMIZATION_SUB_GROUP_SIZE_DEFAULT = 1000000000000
#maximum number of parameters per GPU before releasing them
ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS = 'stage3_max_live_parameters'
ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS_DEFAULT = 1000000000
#release a parameter only if the reuse distance is larger than specified
ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE = 'stage3_max_reuse_distance'
ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE_DEFAULT = 1000000000
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE = 'stage3_prefetch_bucket_size'
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT = 50000000
#parameters smaller than the threshold are only communicated once after the
#parameters are updated and are persisted thoughout the trainging
#avoid tons of latency bound communication
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD = 'stage3_param_persistence_threshold'
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT = 100000
# gathers params for saving a model - inefficient but is required in certain situations
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE = 'stage3_gather_fp16_weights_on_model_save'
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT = False
# Now just used in stage2 complete_grad_norm_calculation_for_cpu_offload
# Enable this option to avoid:
# https://github.com/microsoft/DeepSpeed/issues/707
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS = 'ignore_unused_parameters'
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT = True
ZERO_OPTIMIZATION_DEFAULT = {
ZERO_OPTIMIZATION_STAGE:
ZERO_OPTIMIZATION_STAGE_DEFAULT,
ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS:
ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT,
ZERO_OPTIMIZATION_REDUCE_SCATTER:
ZERO_OPTIMIZATION_REDUCE_SCATTER_DEFAULT,
ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE:
ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE_DEFAULT,
ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS:
ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS_DEFAULT,
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE:
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT,
ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS:
ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS_DEFAULT,
ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT:
ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT_DEFAULT,
ZERO_OPTIMIZATION_OFFLOAD_PARAM:
ZERO_OPTIMIZATION_OFFLOAD_PARAM_DEFAULT,
ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER:
ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER_DEFAULT,
ZERO_OPTIMIZATION_SUB_GROUP_SIZE:
ZERO_OPTIMIZATION_SUB_GROUP_SIZE_DEFAULT,
ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS:
ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS_DEFAULT,
ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE:
ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE_DEFAULT,
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE:
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT,
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD:
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT,
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE:
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT,
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS:
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT
}
"""
Copyright (c) Microsoft Corporation
Licensed under the MIT license.
"""
from .offload_constants import *
#########################################
# ZeRO optimization
#########################################
# ZeRO optimization. By default, this optimization is not enabled.
# Users have to configure the desired optimization (0 means disabled) in params.json as below example:
ZERO_FORMAT = '''
ZeRO optimization should be enabled as:
"session_params": {
"zero_optimization": {
"stage": [0|1|2],
"stage3_max_live_parameters" : 1000000000,
"stage3_max_reuse_distance" : 1000000000,
"allgather_partitions": [true|false],
"allgather_bucket_size": 500000000,
"reduce_scatter": [true|false],
"contiguous_gradients" : [true|false]
"overlap_comm": [true|false],
"reduce_bucket_size": 500000000,
"load_from_fp32_weights": [true|false],
"cpu_offload": [true|false] (deprecated),
"cpu_offload_params" : [true|false] (deprecated),
"cpu_offload_use_pin_memory": [true|false] (deprecated),
"sub_group_size" : 1000000000000,
"offload_param": {...},
"offload_optimizer": {...},
"ignore_unused_parameters": [true|false]
}
}
'''
ZERO_OPTIMIZATION = 'zero_optimization'
ZERO_OPTIMIZATION_DISABLED = 0
ZERO_OPTIMIZATION_OPTIMIZER_STATES = 1
ZERO_OPTIMIZATION_GRADIENTS = 2
ZERO_OPTIMIZATION_WEIGHTS = 3
MAX_STAGE_ZERO_OPTIMIZATION = ZERO_OPTIMIZATION_WEIGHTS
ZERO_OPTIMIZATION_STAGE = 'stage'
ZERO_OPTIMIZATION_STAGE_1 = 'stage_1'
ZERO_OPTIMIZATION_STAGE_2 = 'stage_2'
ZERO_OPTIMIZATION_STAGE_3 = 'stage_3'
ZERO_OPTIMIZATION_STAGE_DEFAULT = ZERO_OPTIMIZATION_DISABLED
ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS = 'allgather_partitions'
ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS_DEFAULT = True
ZERO_OPTIMIZATION_REDUCE_SCATTER = 'reduce_scatter'
ZERO_OPTIMIZATION_REDUCE_SCATTER_DEFAULT = False
ZERO_OPTIMIZATION_OVERLAP_COMM = 'overlap_comm'
ZERO_OPTIMIZATION_OVERLAP_COMM_DEFAULT = False
ZERO3_OPTIMIZATION_OVERLAP_COMM_DEFAULT = True
ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS = 'contiguous_gradients'
ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT = False
ZERO3_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT = False
ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE = 'reduce_bucket_size'
ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE_DEFAULT = 500000000
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE = 'allgather_bucket_size'
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT = 500000000
ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEPRECATED = 'allgather_size'
ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS = 'load_from_fp32_weights'
ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS_DEFAULT = True
ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT = 'elastic_checkpoint'
ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT_DEFAULT = True
ZERO_OPTIMIZATION_CPU_OFFLOAD = 'cpu_offload'
ZERO_OPTIMIZATION_CPU_OFFLOAD_DEFAULT = False
ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS = 'cpu_offload_params'
ZERO_OPTIMIZATION_CPU_OFFLOAD_PARAMS_DEFAULT = False
# ---------------------------------------------------------------------------
# ZeRO optimization config keys and their default values.
# Each *_DEFAULT constant is the value used when the corresponding key is
# absent from the user-supplied "zero_optimization" config dict.
# ---------------------------------------------------------------------------
ZERO_OPTIMIZATION_CPU_OFFLOAD_USE_PIN_MEMORY = 'cpu_offload_use_pin_memory'
ZERO_OPTIMIZATION_CPU_OFFLOAD_USE_PIN_MEMORY_DEFAULT = False
ZERO_OPTIMIZATION_OFFLOAD_PARAM = OFFLOAD_PARAM
ZERO_OPTIMIZATION_OFFLOAD_PARAM_DEFAULT = None
ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER = OFFLOAD_OPTIMIZER
ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER_DEFAULT = None
ZERO_OPTIMIZATION_SUB_GROUP_SIZE = 'sub_group_size'
ZERO_OPTIMIZATION_SUB_GROUP_SIZE_DEFAULT = 1000000000000
# Maximum number of parameters per GPU before releasing them
ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS = 'stage3_max_live_parameters'
ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS_DEFAULT = 1000000000
# Release a parameter only if the reuse distance is larger than specified
ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE = 'stage3_max_reuse_distance'
ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE_DEFAULT = 1000000000
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE = 'stage3_prefetch_bucket_size'
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT = 50000000
# Parameters smaller than the threshold are only communicated once after the
# parameters are updated and are persisted throughout the training;
# this avoids tons of latency-bound communication
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD = 'stage3_param_persistence_threshold'
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT = 100000
# Gathers params for saving a model - inefficient but is required in certain situations
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE = 'stage3_gather_fp16_weights_on_model_save'
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT = False
# Now just used in stage2 complete_grad_norm_calculation_for_cpu_offload
# Enable this option to avoid:
# https://github.com/microsoft/DeepSpeed/issues/707
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS = 'ignore_unused_parameters'
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT = True
# Use deepspeed < v0.3.17 zero stage 1, kept for backwards compatibility reasons
ZERO_OPTIMIZATION_LEGACY_STAGE1 = "legacy_stage1"
ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT = False

#yapf: disable
# Complete default ZeRO configuration: maps every recognized config key to
# the default value applied when the user does not set it explicitly.
ZERO_OPTIMIZATION_DEFAULT = {
    ZERO_OPTIMIZATION_STAGE:
    ZERO_OPTIMIZATION_STAGE_DEFAULT,
    ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS:
    ZERO_OPTIMIZATION_CONTIGUOUS_GRADIENTS_DEFAULT,
    ZERO_OPTIMIZATION_REDUCE_SCATTER:
    ZERO_OPTIMIZATION_REDUCE_SCATTER_DEFAULT,
    ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE:
    ZERO_OPTIMIZATION_REDUCE_BUCKET_SIZE_DEFAULT,
    ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS:
    ZERO_OPTIMIZATION_ALLGATHER_PARTITIONS_DEFAULT,
    ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE:
    ZERO_OPTIMIZATION_ALLGATHER_BUCKET_SIZE_DEFAULT,
    ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS:
    ZERO_OPTIMIZATION_LOAD_FROM_FP32_WEIGHTS_DEFAULT,
    ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT:
    ZERO_OPTIMIZATION_ELASTIC_CHECKPOINT_DEFAULT,
    ZERO_OPTIMIZATION_OFFLOAD_PARAM:
    ZERO_OPTIMIZATION_OFFLOAD_PARAM_DEFAULT,
    ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER:
    ZERO_OPTIMIZATION_OFFLOAD_OPTIMIZER_DEFAULT,
    ZERO_OPTIMIZATION_SUB_GROUP_SIZE:
    ZERO_OPTIMIZATION_SUB_GROUP_SIZE_DEFAULT,
    ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS:
    ZERO_OPTIMIZATION_MAX_LIVE_PARAMETERS_DEFAULT,
    ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE:
    ZERO_OPTIMIZATION_MAX_REUSE_DISTANCE_DEFAULT,
    ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE:
    ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT,
    ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD:
    ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT,
    ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE:
    ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT,
    ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS:
    ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT,
    ZERO_OPTIMIZATION_LEGACY_STAGE1:
    ZERO_OPTIMIZATION_LEGACY_STAGE1_DEFAULT
}
......@@ -77,7 +77,10 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
allgather_size=500000000,
clip_grad=0.0,
max_elements_per_comm=5e8,
elastic_checkpoint=True):
elastic_checkpoint=True,
postscale_gradients=True,
gradient_predivide_factor=1.0,
gradient_average=True):
# Load pre-built or JIT compile (un)flatten ops
util_ops = UtilsBuilder().load()
......@@ -98,6 +101,10 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
self.verbose = verbose
self.dp_process_group = dp_process_group
self.postscale_gradients = postscale_gradients
self.gradient_predivide_factor = gradient_predivide_factor
self.gradient_average = gradient_average
# TODO: automatically turn off if #params > some_limit
self.all_gather_partitions = all_gather_partitions
self.allgather_size = allgather_size
......@@ -575,10 +582,11 @@ class FP16_DeepSpeedZeroOptimizer_Stage1(object):
flat_tensors = self.flatten(aligned_tensor_list)
return flat_tensors
def reduce_scatter_gradients(self,
postscale_gradients,
gradient_predivide_factor,
gradient_average):
def reduce_gradients(self, pipeline_parallel=False):
postscale_gradients = self.postscale_gradients
gradient_predivide_factor = self.gradient_predivide_factor
gradient_average = self.gradient_average
world_size = dist.get_world_size(group=self.dp_process_group)
local_rank = dist.get_rank(group=self.dp_process_group)
......
......@@ -8,6 +8,7 @@ import torch.distributed as dist
import math
from torch._six import inf
from torch.autograd import Variable
from packaging import version as pkg_version
import collections
......@@ -17,6 +18,7 @@ from deepspeed.runtime.zero.config import ZERO_OPTIMIZATION_GRADIENTS
from deepspeed.ops.adam import DeepSpeedCPUAdam
from deepspeed.ops.op_builder import UtilsBuilder
from deepspeed.utils import logger
from deepspeed.git_version_info import version
#Toggle this to true to enable correctness test
#with gradient partitioning and without
......@@ -96,7 +98,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
postscale_gradients=True,
gradient_predivide_factor=1.0,
gradient_accumulation_steps=1,
ignore_unused_parameters=True):
ignore_unused_parameters=True,
partition_grads=True):
if dist.get_rank() == 0:
logger.info(f"Reduce bucket size {reduce_bucket_size}")
......@@ -120,6 +123,9 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.flatten = util_ops.flatten
self.unflatten = util_ops.unflatten
# ZeRO stage 1 (False) or 2 (True)
self.partition_gradients = partition_grads
self.timers = timers
self.reduce_scatter = reduce_scatter
......@@ -136,6 +142,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.partition_count = dist.get_world_size(group=self.dp_process_group)
self.is_gradient_accumulation_boundary = True
if mpu is None:
self.model_parallel_group = None
self.model_parallel_rank = 0
......@@ -152,6 +160,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.micro_step_id = 0
self.ignore_unused_parameters = ignore_unused_parameters
self.extra_large_param_to_reduce = None
if self.reduce_scatter:
assert not self.allreduce_always_fp32, "allreduce_always_fp32 is not yet supported with ZeRO-2 with reduce scatter enabled"
assert self.gradient_predivide_factor == 1.0, "gradient_predivide_factor != 1.0 is not yet supported with ZeRO-2 with reduce scatter enabled"
......@@ -373,7 +383,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.reset_partition_gradient_structures()
#creates backward hooks for gradient partitioning
self.create_reduce_and_remove_grad_hooks()
if self.partition_gradients or self.overlap_comm:
self.create_reduce_and_remove_grad_hooks()
# we may have a way of fusing dynamic scale. Do not support for now
if self.dtype == torch.float or not dynamic_loss_scale:
......@@ -455,6 +466,31 @@ class FP16_DeepSpeedZeroOptimizer(object):
return
#########################################################################
#################### ZeRO Stage 1 - reduce gradients ####################
#########################################################################
def reduce_gradients(self, pipeline_parallel=False):
    """Reduce all pending gradients across the data-parallel group.

    When running under pipeline parallelism, backward() is driven outside of
    ZeRO, so the contiguous ipg (independent-parameter-gradient) buffer must
    be allocated here before any reduction can be staged into it.
    """
    # Query the data-parallel group (mirrors the original bookkeeping;
    # values are not used further in this method).
    dp_world_size = dist.get_world_size(self.dp_process_group)
    dp_rank = dist.get_rank(self.dp_process_group)

    # With PP we must create the ipg buffer, since backward is handled
    # outside zero.
    if pipeline_parallel and self.contiguous_gradients:
        staging = torch.empty(int(self.reduce_bucket_size),
                              dtype=self.dtype,
                              device=torch.cuda.current_device())
        self.ipg_buffer = [staging]
        self.ipg_index = 0

    # Without overlapped communication no backward hooks fired, so queue
    # every parameter's gradient for reduction explicitly.
    if not self.overlap_comm:
        for group_idx, param_group in enumerate(self.fp16_groups):
            for p in param_group:
                self.reduce_ready_partitions_and_remove_grads(p, group_idx)

    # Flush any grads still pending, in either the hook or non-hook case.
    self.overlapping_partition_gradients_reduce_epilogue()
#########################################################################
#########################ZeRO Partition Gradients########################
#########################################################################
......@@ -680,8 +716,11 @@ class FP16_DeepSpeedZeroOptimizer(object):
Gradient computed twice for this partition. \
Multiple gradient reduction is currently not supported"
#keeping the gradients contiguous to prevent memory fragmentation, and avoid flattening
if self.contiguous_gradients:
if param.numel() > self.reduce_bucket_size:
self.extra_large_param_to_reduce = param
elif self.contiguous_gradients:
#keeping the gradients contiguous to prevent memory fragmentation, and avoid flattening
new_grad_tensor = self.ipg_buffer[self.ipg_index].narrow(
0,
self.elements_in_ipg_bucket,
......@@ -1019,7 +1058,14 @@ class FP16_DeepSpeedZeroOptimizer(object):
stream = torch.cuda.current_stream()
if self.contiguous_gradients:
self.average_tensor(self.ipg_buffer[self.ipg_index])
if self.extra_large_param_to_reduce is not None:
assert len(self.params_in_ipg_bucket) == 1, "more than 1 param in ipg bucket, this shouldn't happen"
_, _, param_id = self.params_in_ipg_bucket[0]
assert self.get_param_id(self.extra_large_param_to_reduce) == param_id, "param in ipg bucket does not match extra-large param"
self.average_tensor(self.extra_large_param_to_reduce.grad.view(-1))
self.extra_large_param_to_reduce = None
else:
self.average_tensor(self.ipg_buffer[self.ipg_index])
else:
self.buffered_reduce_fallback(
None,
......@@ -1054,7 +1100,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
#####################################################################
def reduce_ready_partitions_and_remove_grads(self, param, i):
    """Reduce ``param``'s ready gradient (parameter group ``i``) and free it.

    In gradient-partitioning mode (``self.partition_gradients``, i.e. ZeRO
    stage 2) gradients are reduced as soon as they are ready; otherwise
    (stage 1) they are reduced only at a gradient accumulation boundary.

    Defect fixed: the block contained both an unconditional call to the
    reduction helper and the same call under the guard, which would reduce
    the same partition twice and trip the "multiple gradient reduction is
    currently not supported" assertion downstream. Only the guarded call
    is kept.
    """
    if self.partition_gradients or self.is_gradient_accumulation_boundary:
        self.reduce_independent_p_g_buckets_and_remove_grads(param, i)
def zero_reduced_gradients(self, partition_id, i):
def are_all_related_partitions_reduced(params_id):
......@@ -1677,17 +1724,16 @@ class FP16_DeepSpeedZeroOptimizer(object):
if self.cpu_offload:
torch.cuda.current_stream().wait_stream(self.migration_stream)
#TODO: we need to revist this and remove the magic 4.5x multiplier here
if self.contiguous_gradients:
self.ipg_buffer = []
buf_0 = torch.empty(int(self.reduce_bucket_size * 4.5),
buf_0 = torch.empty(int(self.reduce_bucket_size),
dtype=self.dtype,
device=torch.cuda.current_device())
self.ipg_buffer.append(buf_0)
# Use double buffers to avoid data access conflict when overlap_comm is enabled.
if self.overlap_comm:
buf_1 = torch.empty(int(self.reduce_bucket_size * 4.5),
buf_1 = torch.empty(int(self.reduce_bucket_size),
dtype=self.dtype,
device=torch.cuda.current_device())
self.ipg_buffer.append(buf_1)
......@@ -1785,6 +1831,8 @@ class FP16_DeepSpeedZeroOptimizer(object):
state_dict['zero_stage'] = ZERO_OPTIMIZATION_GRADIENTS
state_dict['partition_count'] = self.partition_count
state_dict['ds_version'] = version
# Remove paddings for DP alignment to enable loading for other alignment values
fp32_groups_without_padding = self._get_groups_without_padding(
self.single_partition_of_fp32_groups)
......@@ -1904,6 +1952,18 @@ class FP16_DeepSpeedZeroOptimizer(object):
self.dynamic_loss_scale = state_dict_list[0]['dynamic_loss_scale']
self.overflow = state_dict_list[0]['overflow']
# zero stage 1 mode
if not self.partition_gradients:
required_version = pkg_version.parse("0.3.17")
ckpt_version = state_dict_list[0].get("ds_version", False)
error_str = f"ZeRO stage 1 changed in {required_version} and is not backwards compatible " \
"with older stage 1 checkpoints. If you'd like to load an old ZeRO-1 checkpoint " \
"please set 'legacy_stage1': true in your zero config json. This old version of " \
"stage 1 will be removed in v0.4.0."
assert ckpt_version, f"Empty ds_version! {error_str}"
assert required_version <= pkg_version.parse(ckpt_version), f"Old version: {ckpt_version} {error_str}"
if load_optimizer_states:
self._restore_base_optimizer_state(state_dict_list)
......
......@@ -352,6 +352,11 @@ Enabling and configuring ZeRO memory optimizations
| --------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| Copies the gradients to a contiguous buffer as they are produced. Avoids memory fragmentation during backward pass. Only useful when running very large models. | `False` |
<i>**grad_hooks**</i>: [boolean]

| Description | Default |
| ------------------------------------------------------------------------------------------------------------------------------------------ | ------- |
| For use with ZeRO stage 1. When `true`, gradients are reduced through backward hooks as they become available during the backward pass; when `false`, gradient reduction is deferred until the end of the backward pass. | `True` |
***offload_param***: [dictionary]
......
......@@ -5,3 +5,4 @@ tensorboardX==1.8
ninja
numpy
psutil
packaging
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册