Unverified · Commit 4912e0ad · authored by Justin Chiu · committed by GitHub

Various ZeRO Stage3 Optimizations + Improvements (including bfloat16 support) (#1453)

* Changes for bfloat16 Zero2

* ZeRO stage3 optimizations, with some bug fixes

optimizations for stage3:
- prefetching improvements
- batching allgather calls to amortize fixed overhead and improve
  bandwidth utilization
- batching reduce_scatter calls to amortize fixed overhead and
  improve bandwidth utilization
- using *_base variants of allgather and reduce scatter to reduce memory
  allocations and data movement
- more fine grained synchronization for communication that allows
  blocking on less work
- precomputation of fetching code - using a fetch queue rather than
  deciding what to (pre)fetch at each iteration
- limiting queued coalesced communication ops to reduce memory pressure
  on pytorch cuda caching allocator (not an elegant solution)

optimizations for stage3-offload:
- made some host-device tensor copies async to improve performance

bug fixes and qol improvements:
- fix init context method when parent modules modify child weights
- speed up model initialization by moving model to GPU before weight
  initialization
- fixed unit test imports so that unit tests can be run from any
  directory
- change performance logging to include memory consumption
- add logging w/ model size when done partitioning model

new features:
- bfloat16 support for ZeRO 3

* fix import in ut

* ran yapf

* improvements to cache flush warn log

* backwards compatibility with older versions of pytorch

* handle edge case where reduced tensor smaller than world size

* moved event synchronization to allgather handle wait() call

* removed unnecessary barrier call

* formatting fix after resolving merge conflict

* skip nvme prefetch when trace not complete

* opportunistically avoid memory allocation in allgather coalesced where possible

* fix indentation after merge

* fixes to account for parameter offload

* accounting for torch.cuda.memory_stats not being available

* moved partition_all_params to optimizer step

* allgathering on params before item gets called

* fix param status checks

needed after moving partition_all_parameters call to optimizer step

* fix grad accumulation with optimizer offload

* grad norm computation fix for optimizer offload

* change post divide in reduce-scatter to pre divide

* fix gradient race condition w/ optimizer offload

* improve inf/nan gradient tracking

* don't prefetch when not in training mode

* format fix after merging

* fix prefetching issue when using NVME offload

* improved defragmentation for fp16 parameters

* relative imports for bf16 tests

* changes for bwd compatibility with pytorch 1.2

* remove buffered_reduce_fallback

* removed unused parameter offset bookkeeping

* fixed tracking for multiple param groups

* unbroke bfloat16 config after merge conflict

* using base allgather params when only 1 param

* cleanup/fixes for fp16 partition defragmentation

* switch to CRLF

* convert to same new-line style as master

* align new line with master

* Fix merge issues

* switch to CRLF

* fix to LF line endings

* minor merge fixes

* remove extra bfloat16_enabled definition

* asserting params inflight for AllGatherHandle

* remove get_cuda_mem_allocated_str

* Format fixes

* fix bfloat16 zero stage check (broken after merge commit)

* +self.communication_data_type, -self.allreduce_always_fp32; delete dead code

* Add self.reduce_scatter

* Format fix

* Fix merge issues

* iterate over params_to_fetch rather than make another iterator

* add some TODOs

* remove unnecessary division by micro_step_id

* rename config keys "bfloat16" -> "bf16"

* rename stage3_gather_fp16_weights_on_model_save -> stage3_gather_16bit_weights_on_model_save

* add unit test to check backwards compatibility for gather_16bit_weights

* added test to confirm bf16 key bwd compatibility

* Format fixes
Co-authored-by: Rana Ali Amjad <raamjad@amazon.com>
Co-authored-by: Justin Chiu <justchiu@amazon.com>
Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Co-authored-by: Jeff Rasley <jerasley@microsoft.com>
Parent 2d51f617
Subproject commit 1fed12e8b375b0c54902827e7140d8266dfccd59
Subproject commit 174ae3bc8dbb688cfaccb4afa15d6e2cdbe19ce5
......@@ -11,7 +11,7 @@
"stage3_max_reuse_distance": 1e9,
"stage3_prefetch_bucket_size": 5e8,
"stage3_param_persistence_threshold": 1e6,
"stage3_gather_fp16_weights_on_model_save": false,
"stage3_gather_16bit_weights_on_model_save": false,
"sub_group_size": 1e12
}
}
"""batched collective operations for overhead amortization and better
bandwidth utilization"""
import math
from typing import List
import torch
from torch import Tensor
import torch.distributed
from torch.distributed import ProcessGroup
import torch.nn.functional
from deepspeed.utils import instrument_w_nvtx
from deepspeed.utils.logging import logger
if hasattr(torch.distributed, "_reduce_scatter_base"):
def torch_reduce_scatter_fn(input_tensor: Tensor, output_tensor: Tensor, group):
instrument_w_nvtx(torch.distributed._reduce_scatter_base)(
output_tensor,
input_tensor,
group=group,
)
else:
logger.warning(
"unable to find torch.distributed._reduce_scatter_base. will fall back to "
"torch.distributed.reduce_scatter which will result in suboptimal performance. "
"please consider upgrading your pytorch installation.")
def torch_reduce_scatter_fn(input_tensor: Tensor, output_tensor: Tensor, group):
input_tensor_lst = list(
torch.chunk(input_tensor,
torch.distributed.get_world_size(group)))
instrument_w_nvtx(torch.distributed.reduce_scatter)(
output_tensor,
input_tensor_lst,
group=group,
)
@instrument_w_nvtx
@torch.no_grad()
def reduce_scatter_coalesced(
tensors: List[Tensor],
group: ProcessGroup = None,
) -> List[Tensor]:
"""simultaneously reduce-scatter a list of tensors - this can be done more
efficiently than individual reduce scatter calls
    TODO. see if PyTorch team wants a c++ version of this for ProcessGroupNCCL
"""
this_rank = torch.distributed.get_rank(group)
world_sz = torch.distributed.get_world_size(group)
partition_lst_for_each_tensor = [None] * len(tensors)
for tensor_idx, tensor in enumerate(tensors):
flattened_tensor = tensor.view(-1)
chunk_sz = math.ceil(tensor.numel() / world_sz)
partition_lst_for_each_tensor[tensor_idx] = [
flattened_tensor[rank * chunk_sz:rank * chunk_sz + chunk_sz]
for rank in range(0,
world_sz)
]
padded_partition_sz_for_each_tensor = tuple(
math.ceil(t.numel() / world_sz) for t in tensors)
if len(tensors) == 1 and tensors[0].numel() % world_sz == 0:
# if there's only one tensor being reduced and we don't need to pad
# we have an opportunity to avoid a memory allocation
tensor_partition_flat_buffer = tensors[0].view(-1)
else:
# interleave tensor partitions such that the correct reduced partitions of each tensor
# end up at each rank
tensor_partitions_lst_with_padding = []
for rank in range(world_sz):
for tensor_idx in range(len(tensors)):
# add tensor content
tensor_chunk = partition_lst_for_each_tensor[tensor_idx][rank]
tensor_partitions_lst_with_padding.append(tensor_chunk)
# add padding if necessary
padding_sz = padded_partition_sz_for_each_tensor[
tensor_idx] - tensor_chunk.numel()
if padding_sz > 0:
tensor_partitions_lst_with_padding.append(
torch.empty(padding_sz,
dtype=tensor_chunk.dtype,
device=tensor_chunk.device))
tensor_partition_flat_buffer = instrument_w_nvtx(
torch.cat)(tensor_partitions_lst_with_padding)
tensor_partition_flat_buffer.div_(world_sz) # pre-divide
tensor_partition_buffer_for_each_rank: List[Tensor] = torch.chunk(
tensor_partition_flat_buffer,
world_sz)
# batched reduce-scatter call
torch_reduce_scatter_fn(tensor_partition_flat_buffer,
tensor_partition_buffer_for_each_rank[this_rank],
group)
# reverse procedure of the interleaving done previously, done on the
# result of the batched reduce-scatter
output_lst: List[Tensor] = [None] * len(tensors)
offset = 0
for tensor_idx in range(len(tensors)):
output_lst[tensor_idx] = tensor_partition_buffer_for_each_rank[this_rank].narrow(
0,
offset,
partition_lst_for_each_tensor[tensor_idx][this_rank].numel())
offset += padded_partition_sz_for_each_tensor[tensor_idx]
return output_lst
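
For orientation, here is a hedged usage sketch of `reduce_scatter_coalesced` (not part of this commit's diff). It assumes a two-GPU job launched with `torchrun`, so the rank environment variables are set and NCCL can be initialized:

```python
# Illustrative only: reduce-scatter two tensors of different sizes in a single
# batched call; each rank receives its (pre-divided, i.e. averaged) partitions.
import os
import torch
import torch.distributed as dist
from deepspeed.runtime.comm.coalesced_collectives import reduce_scatter_coalesced

dist.init_process_group(backend="nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
rank = dist.get_rank()

a = torch.full((6, ), float(rank), dtype=torch.half, device="cuda")
b = torch.arange(4, dtype=torch.half, device="cuda")

part_a, part_b = reduce_scatter_coalesced([a, b], dist.group.WORLD)
print(rank, part_a, part_b)
```

Compared with issuing one reduce-scatter per tensor, the single flattened call amortizes the fixed launch overhead that the commit message calls out.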
......@@ -134,12 +134,12 @@ def get_fp16_enabled(param_dict):
def get_bfloat16_enabled(param_dict):
if BFLOAT16 in param_dict.keys():
return get_scalar_param(param_dict[BFLOAT16],
BFLOAT16_ENABLED,
BFLOAT16_ENABLED_DEFAULT)
else:
return False
for key in [BFLOAT16, BFLOAT16_OLD]:
if key in param_dict.keys():
return get_scalar_param(param_dict[key],
BFLOAT16_ENABLED,
BFLOAT16_ENABLED_DEFAULT)
return False
def get_fp16_master_weights_and_grads_enabled(param_dict):
......@@ -899,7 +899,7 @@ class DeepSpeedConfig(object):
self.fp16_enabled = get_fp16_enabled(param_dict)
self.bfloat16_enabled = get_bfloat16_enabled(param_dict)
assert not (self.fp16_enabled and self.bfloat16_enabled), 'bfloat16 and fp16 modes cannot be simultaneously enabled'
assert not (self.bfloat16_enabled and (self.zero_optimization_stage != 2)), 'bfloat16 mode is only enabled for Zero2 currently'
assert not (self.bfloat16_enabled and (self.zero_optimization_stage not in {2, 3})), f'bfloat16 mode is only enabled for Zero 2 and 3 currently. got {self.zero_optimization_stage}'
self.fp16_master_weights_and_gradients = get_fp16_master_weights_and_grads_enabled(
param_dict)
self.amp_enabled = get_amp_enabled(param_dict)
......
......@@ -114,11 +114,12 @@ SPARSE_GRADIENTS_DEFAULT = False
# Users can configure in ds_config.json as below example:
BFLOAT16_FORMAT = '''
BFLOAT16 parameters should be of the format:
"bfloat16": {
"bf16": {
"enabled": true
}
'''
BFLOAT16 = "bfloat16"
BFLOAT16 = "bf16"
BFLOAT16_OLD = "bfloat16" # keeping for backwards compatibility
BFLOAT16_ENABLED = "enabled"
BFLOAT16_ENABLED_DEFAULT = False
......
......@@ -47,7 +47,7 @@ from deepspeed.runtime.sparse_tensor import SparseTensor
import deepspeed.runtime.lr_schedules as lr_schedules
import deepspeed.utils.groups as groups
from deepspeed.runtime.utils import get_grad_norm
from deepspeed.utils import logger, log_dist, init_distributed
from deepspeed.utils import logger, log_dist, init_distributed, instrument_w_nvtx
from deepspeed.utils.timer import ThroughputTimer, SynchronizedWallClockTimer
from deepspeed.utils.debug import debug_extract_module_and_param_names
from deepspeed.runtime.progressive_layer_drop import ProgressiveLayerDrop
......@@ -706,8 +706,8 @@ class DeepSpeedEngine(Module):
def zero_param_persistence_threshold(self):
return self._config.zero_config.param_persistence_threshold
def zero_gather_fp16_weights_on_model_save(self):
return self._config.zero_config.gather_fp16_weights_on_model_save
def zero_gather_16bit_weights_on_model_save(self):
return self._config.zero_config.gather_16bit_weights_on_model_save
def zero_grad_hooks(self):
return self._config.zero_config.grad_hooks
......@@ -969,6 +969,16 @@ class DeepSpeedEngine(Module):
self.broadcast_src_rank,
group=self.data_parallel_group)
@staticmethod
def __check_params(model: Module, dtype: torch.dtype) -> None:
if not all(param.dtype == dtype
for param in model.parameters()) and dist.get_rank() == 0:
raise ValueError(
f"{dtype} is enabled but the following parameters have dtype that is "
f"not {dtype}: "
f"{[(n, p.dtype) for n, p in model.named_parameters() if p.dtype != dtype]}"
)
def _configure_distributed_model(self, model):
self.module = model
if self.fp16_enabled():
......@@ -986,17 +996,13 @@ class DeepSpeedEngine(Module):
)
self.module.half()
elif self.bfloat16_enabled():
if self.zero_optimization_partition_weights() and any(
hasattr(param,
'ds_id') for param in self.module.parameters()):
self.__check_params(self.module, torch.bfloat16)
self.module.bfloat16()
else:
if not all(
[param.dtype == torch.float for param in self.module.parameters()]):
names = [
n for n,
p in self.module.named_parameters() if p.dtype != torch.float
]
raise ValueError(
f"fp32 is enabled but the following parameters have dtype that is not fp32: {', '.join(names)}"
)
self.__check_params(self.module, torch.float)
if not self.dont_change_device:
self.module.to(self.device)
......@@ -1542,6 +1548,7 @@ class DeepSpeedEngine(Module):
return scaled_loss
@instrument_w_nvtx
def forward(self, *inputs, **kwargs):
r"""Execute forward propagation
Arguments:
......@@ -1637,6 +1644,7 @@ class DeepSpeedEngine(Module):
f"rank={torch.distributed.get_rank()} time (ms) | forward: {fwd_time:.2f} (forward_moe: {moe_time:.2f}, 1st alltoall: {falltoall:.2f}, 2nd alltoall: {salltoall:.2f}, top-k: {gate_time:.2f})",
ranks=[0])
@instrument_w_nvtx
def allreduce_gradients(self, bucket_size=MEMORY_OPT_ALLREDUCE_SIZE):
# Pass (PP) gas boundary flag to optimizer (required for zero)
self.optimizer.is_gradient_accumulation_boundary = self.is_gradient_accumulation_boundary(
......@@ -1654,6 +1662,7 @@ class DeepSpeedEngine(Module):
else:
self.buffered_allreduce_fallback(elements_per_buffer=bucket_size)
@instrument_w_nvtx
def backward(self, loss, allreduce_gradients=True, release_loss=False):
r"""Execute backward pass on the loss
......@@ -3013,7 +3022,7 @@ class DeepSpeedEngine(Module):
self._copy_recovery_script(save_path)
logger.info('zero checkpoint saved {}'.format(zero_checkpoint_name))
def _zero3_consolidated_fp16_state_dict(self):
def _zero3_consolidated_16bit_state_dict(self):
"""
Get a full non-partitioned state_dict with fp16 weights on cpu.
......@@ -3082,9 +3091,14 @@ class DeepSpeedEngine(Module):
return state_dict
def save_fp16_model(self, save_dir, save_filename="pytorch_model.bin"):
r"""Save fp16 model weights
"""has been renamed to save_16bit_model, keeping this around for backwards
compatibility"""
return self.save_16bit_model(save_dir, save_filename)
def save_16bit_model(self, save_dir, save_filename="pytorch_model.bin"):
r"""Save 16bit model weights
This method saves the fp16 model weights at the desired destination.
This method saves the 16bit model weights at the desired destination.
Arguments:
save_dir: Required. Directory for saving the model
......@@ -3092,7 +3106,7 @@ class DeepSpeedEngine(Module):
Returns:
``True`` when a model has been saved, ``False`` otherwise. It will not be saved if
stage3_gather_fp16_weights_on_model_save is ``False``.
stage3_gather_16bit_weights_on_model_save is ``False``.
Important: all processes must call this method and not just the process with rank 0. It is
because the processes need to work in sync to gather the weights. This method will hang
......@@ -3103,13 +3117,13 @@ class DeepSpeedEngine(Module):
path = os.path.join(save_dir, save_filename)
if self.zero_optimization_partition_weights():
if self.zero_gather_fp16_weights_on_model_save():
if self.zero_gather_16bit_weights_on_model_save():
# consolidation is expensive in time and memory and therefore isn't a default
state_dict = self._zero3_consolidated_fp16_state_dict()
state_dict = self._zero3_consolidated_16bit_state_dict()
else:
# the model will be bogus if not consolidated so don't confuse the user by saving it
logger.info(
f"Did not save the model {path} because `stage3_gather_fp16_weights_on_model_save` is False"
f"Did not save the model {path} because `stage3_gather_16bit_weights_on_model_save` is False"
)
return False
else:
......
......@@ -858,3 +858,12 @@ def call_to_str(base, *args, **kwargs):
name += ', '.join(f'{key}={repr(arg)}' for key, arg in kwargs.items())
name += ')'
return name
def get_only_unique_item(items):
item_set = set(items)
if len(item_set) != 1:
raise RuntimeError(f"expected there to be only one unique element in {items}")
unique_item, = item_set
return unique_item
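
A short hedged illustration of the new `get_only_unique_item` helper (assuming the file in this hunk is `deepspeed/runtime/utils.py`; the dtype example below is made up):

```python
import torch
from deepspeed.runtime.utils import get_only_unique_item

# all items identical -> the single unique value is returned
assert get_only_unique_item([torch.half, torch.half, torch.half]) == torch.half

# more than one distinct item -> RuntimeError
try:
    get_only_unique_item([torch.half, torch.float])
except RuntimeError as err:
    print(err)
```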
......@@ -36,7 +36,7 @@ class DeepSpeedZeroConfig(DeepSpeedConfigObject):
self.param_persistence_threshold = None
self.max_live_parameters = None
self.max_reuse_distance = None
self.gather_fp16_weights_on_model_save = None
self.gather_16bit_weights_on_model_save = None
self.ignore_unused_parameters = None
self.round_robin_gradients = None
......@@ -171,10 +171,16 @@ class DeepSpeedZeroConfig(DeepSpeedConfigObject):
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD,
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT)
self.gather_fp16_weights_on_model_save = get_scalar_param(
zero_config_dict,
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE,
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT)
# config key has been renamed to use "16bit" instead of "fp16." falling back
# to old config name in order to preserve backwards compatibility
self.gather_16bit_weights_on_model_save = ZERO_OPTIMIZATION_GATHER_16BIT_WEIGHTS_ON_MODEL_SAVE_DEFAULT
for key in [
ZERO_OPTIMIZATION_GATHER_16BIT_WEIGHTS_ON_MODEL_SAVE,
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE
]:
if key in zero_config_dict:
self.gather_16bit_weights_on_model_save = zero_config_dict[key]
break
self.ignore_unused_parameters = get_scalar_param(
zero_config_dict,
......
......@@ -116,7 +116,8 @@ ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT = 100000
# gathers params for saving a model - inefficient but is required in certain situations
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE = 'stage3_gather_fp16_weights_on_model_save'
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT = False
ZERO_OPTIMIZATION_GATHER_16BIT_WEIGHTS_ON_MODEL_SAVE = 'stage3_gather_16bit_weights_on_model_save'
ZERO_OPTIMIZATION_GATHER_16BIT_WEIGHTS_ON_MODEL_SAVE_DEFAULT = False
# Now just used in stage2 complete_grad_norm_calculation_for_cpu_offload
# Enable this option to avoid:
......@@ -164,8 +165,8 @@ ZERO_OPTIMIZATION_DEFAULT = {
ZERO_OPTIMIZATION_PREFETCH_BUCKET_SIZE_DEFAULT,
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD:
ZERO_OPTIMIZATION_PARAM_PERSISTENCE_THRESHOLD_DEFAULT,
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE:
ZERO_OPTIMIZATION_GATHER_FP16_WEIGHTS_ON_MODEL_SAVE_DEFAULT,
ZERO_OPTIMIZATION_GATHER_16BIT_WEIGHTS_ON_MODEL_SAVE:
ZERO_OPTIMIZATION_GATHER_16BIT_WEIGHTS_ON_MODEL_SAVE_DEFAULT,
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS:
ZERO_OPTIMIZATION_IGNORE_UNUSED_PARAMETERS_DEFAULT,
ZERO_OPTIMIZATION_LEGACY_STAGE1:
......
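
To tie the renamed keys together, here is a hedged sketch of a minimal ZeRO-3 + bf16 configuration using the new spellings from this PR (values are illustrative only; the old `bfloat16` and `stage3_gather_fp16_weights_on_model_save` names remain accepted for backwards compatibility):

```python
# Illustrative ds_config using the renamed keys from this PR.
ds_config = {
    "train_batch_size": 8,
    "bf16": {
        "enabled": True
    },
    "zero_optimization": {
        "stage": 3,
        "stage3_gather_16bit_weights_on_model_save": True
    },
}
```

Such a dict can be written out as a `ds_config.json` for the launcher or supplied programmatically when constructing the engine.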
This diff is collapsed.
import os
from typing import List
import torch
import torch.distributed as dist
from deepspeed.utils import logger
from deepspeed.ops.adam import DeepSpeedCPUAdam
from deepspeed.ops.adam import FusedAdam
from deepspeed.utils.nvtx import instrument_w_nvtx
def _initialize_parameter_parallel_groups(parameter_parallel_size=None):
......@@ -47,5 +51,37 @@ def is_zero_supported_optimizer(optimizer):
return type(optimizer) in ZERO_SUPPORTED_OPTIMIZERS
def get_lst_from_rank0(lst: List[int]) -> List[int]:
"""
NOTE: creates both communication and synchronization overhead so should be used
sparingly
"""
lst_tensor = torch.tensor(
lst if dist.get_rank() == 0 else [-1] * len(lst),
dtype=int,
# device=torch.cuda.current_device(),
device=torch.device('cuda:{}'.format(os.environ["LOCAL_RANK"])),
requires_grad=False,
)
dist.broadcast(lst_tensor, src=0, async_op=False)
return list(lst_tensor.cpu().numpy())
@instrument_w_nvtx
def assert_ints_same_as_other_ranks(ints: List[int]) -> None:
"""
NOTE: creates both communication and synchronization overhead so should be
used sparingly
takes a list of ints from each rank and ensures that they are the same
across ranks, throwing an exception if they are not.
"""
rank0_ints = get_lst_from_rank0(ints)
if ints != rank0_ints:
raise RuntimeError(f"disagreement between rank0 and rank{dist.get_rank()}: "
f"rank0: {rank0_ints}, rank{dist.get_rank()}: {ints}")
class ZeRORuntimeException(Exception):
pass
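
A hedged usage sketch for the two helpers above (assuming this hunk is `deepspeed/runtime/zero/utils.py`). Since `get_lst_from_rank0` places its broadcast tensor on the CUDA device given by `LOCAL_RANK`, the example assumes a multi-GPU run launched with `torchrun`:

```python
# Hypothetical check that every rank sees the same list of parameter ids.
import os
import torch
import torch.distributed as dist
from deepspeed.runtime.zero.utils import assert_ints_same_as_other_ranks

dist.init_process_group(backend="nccl")
torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

param_ids = [0, 1, 2, 3]  # stand-in for ids produced by the fetch/trace machinery
assert_ints_same_as_other_ranks(param_ids)  # raises RuntimeError on mismatch
```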
from .logging import logger, log_dist
from .distributed import init_distributed
from .groups import *
from .nvtx import instrument_w_nvtx
from deepspeed.runtime.dataloader import RepeatingLoader
import torch
def instrument_w_nvtx(func):
"""decorator that causes an NVTX range to be recorded for the duration of the
function call."""
if hasattr(torch.cuda.nvtx, "range"):
def wrapped_fn(*args, **kwargs):
with torch.cuda.nvtx.range(func.__qualname__):
return func(*args, **kwargs)
return wrapped_fn
else:
return func
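
A small hedged example of applying the decorator above (the function name is made up); the NVTX range only becomes visible when profiling with a tool such as Nsight Systems:

```python
import torch
from deepspeed.utils import instrument_w_nvtx

@instrument_w_nvtx
def fused_step(x: torch.Tensor) -> torch.Tensor:
    # runs inside an NVTX range named "fused_step" when ranges are supported
    return x * 2 + 1

print(fused_step(torch.ones(4)))
```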
......@@ -178,11 +178,15 @@ class ThroughputTimer:
self.total_elapsed_time += duration
if self.local_step_count % self.steps_per_output == 0:
if report_speed:
self.logging("{}/{}, SamplesPerSec={}".format(
self.epoch_count,
self.local_step_count,
self.avg_samples_per_sec(),
))
self.logging(
"{}/{}, SamplesPerSec={}, MemAllocated={}GB, MaxMemAllocated={}GB"
.format(self.epoch_count,
self.local_step_count,
self.avg_samples_per_sec(),
round(torch.cuda.memory_allocated() / 1024**3,
2),
round(torch.cuda.max_memory_allocated() / 1024**3,
2)))
if self.monitor_memory:
virt_mem = psutil.virtual_memory()
swap = psutil.swap_memory()
......
......@@ -251,7 +251,7 @@ Example of <i>**scheduler**</i>
| Configuration for using [bfloat16](https://en.wikipedia.org/wiki/Bfloat16_floating-point_format) floating-point format as an alternative to FP16. BFLOAT16 requires hardware support (e.g., NVIDIA A100). An example, including the available dictionary keys, is illustrated below. Training with bfloat16 does not require loss scaling. | None |
```json
"bfloat16": {
"bf16": {
"enabled": true
}
```
......@@ -329,7 +329,7 @@ Enabling and configuring ZeRO memory optimizations
"stage3_param_persistence_threshold" : 1e6,
"sub_group_size" : 1e12,
"elastic_checkpoint" : [true|false],
"stage3_gather_fp16_weights_on_model_save": [true|false],
"stage3_gather_16bit_weights_on_model_save": [true|false],
"ignore_unused_parameters": [true|false]
"round_robin_gradients": [true|false]
}
......@@ -433,11 +433,11 @@ Enabling and configuring ZeRO memory optimizations
| Do not partition parameters smaller than this threshold. Smaller values use less memory, but can greatly increase communication (especially latency-bound messages). | `1e6` |
***stage3_gather_fp16_weights_on_model_save***: [boolean]
***stage3_gather_16bit_weights_on_model_save***: [boolean]
| Description | Default |
|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ------- |
| Consolidate the weights before saving the model by `save_fp16_model()`. Since the weights are partitioned across GPUs, they aren't part of `state_dict`, so this function automatically gathers the weights when this option is enabled and then saves the fp16 model weights. | `False` |
| Consolidate the weights before saving the model by `save_16bit_model()`. Since the weights are partitioned across GPUs, they aren't part of `state_dict`, so this function automatically gathers the weights when this option is enabled and then saves the fp16 model weights. | `False` |
***cpu_offload***: [boolean]
......
......@@ -252,19 +252,19 @@ If you need to take the pretrained weights out of Deepspeed here is what you can
```json
"zero_optimization": {
"stage3_gather_fp16_weights_on_model_save": true
"stage3_gather_16bit_weights_on_model_save": true
},
```
And then save the model using:
```python
if self.deepspeed:
self.deepspeed.save_fp16_model(output_dir, output_file)
self.deepspeed.save_16bit_model(output_dir, output_file)
```
Because it requires consolidation of the weights on one GPU, it can be slow and memory demanding, so only use this feature when needed.
Note that if `stage3_gather_fp16_weights_on_model_save` is `False`, no weights will be saved (again, because `state_dict` doesn't have them).
Note that if `stage3_gather_16bit_weights_on_model_save` is `False`, no weights will be saved (again, because `state_dict` doesn't have them).
You can use this method to save ZeRO-2 weights as well.
If you'd like to get the fp32 weights, we supply a special script that can do offline consolidation. It requires no configuration files or GPUs. Here is an example of its usage:
......
......@@ -35,7 +35,7 @@ Gradient Accumulation
Model Saving
------------
.. autofunction:: deepspeed.DeepSpeedEngine.save_fp16_model
.. autofunction:: deepspeed.DeepSpeedEngine.save_16bit_model
Additionally when a DeepSpeed checkpoint is created, a script ``zero_to_fp32.py`` is added there which can be used to reconstruct fp32 master weights into a single pytorch ``state_dict`` file.
from pathlib import Path
import torch
import os
import sys
import math
from common import get_test_path
from .common import get_test_path
from deepspeed.pipe import PipelineModule, LayerSpec
......
......@@ -10,7 +10,7 @@ import deepspeed
ckpt = deepspeed.checkpointing.checkpoint
from common import distributed_test
from .common import distributed_test
def _compute(module, *inputs, do_checkpoint=False):
......
......@@ -2,10 +2,10 @@ import deepspeed
import torch
import pytest
from common import distributed_test
from deepspeed.ops.adam import FusedAdam
from deepspeed.ops.adam import DeepSpeedCPUAdam
from simple_model import SimpleModel, args_from_dict
from .common import distributed_test
from .simple_model import SimpleModel, args_from_dict
# yapf: disable
#'optimizer, zero_offload, torch_adam, adam_w_mode, resulting_optimizer
......
......@@ -4,8 +4,8 @@ import filecmp
import torch
import deepspeed
import torch.distributed as dist
from common import distributed_test
from deepspeed.ops.aio import AsyncIOBuilder
from .common import distributed_test
MEGA_BYTE = 1024**2
BLOCK_SIZE = MEGA_BYTE
......
import os
import pytest
import torch
from simple_model import create_config_from_dict
from .simple_model import create_config_from_dict
from deepspeed.launcher import runner as dsrun
from deepspeed.autotuning.autotuner import Autotuner
from deepspeed.autotuning.scheduler import ResourceManager
......
......@@ -3,10 +3,10 @@ import torch
import deepspeed
import pytest
from deepspeed.ops.adam import FusedAdam
from common import distributed_test
from .common import distributed_test
from deepspeed.ops.op_builder import CPUAdamBuilder
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from util import bf16_required_version_check
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from .util import bf16_required_version_check
@pytest.mark.parametrize('zero_stage, use_cpu_offload', [(2, False)])
......@@ -45,7 +45,7 @@ def test_adam_bf16_zero_onecycle_compatibility(tmpdir, zero_stage, use_cpu_offlo
"fp16": {
"enabled": False
},
"bfloat16": {
"bf16": {
"enabled": True
},
"zero_optimization": {
......@@ -95,7 +95,7 @@ def test_zero_allow_untested_optimizer(tmpdir, zero_stage, use_cpu_offload):
"fp16": {
"enabled": False,
},
"bfloat16": {
"bf16": {
"enabled": True
},
"zero_optimization": {
......@@ -139,7 +139,7 @@ def test_zero_empty_partition(tmpdir, zero_stage, use_cpu_offload):
"fp16": {
"enabled": False
},
"bfloat16": {
"bf16": {
"enabled": True
},
"optimizer": {
......@@ -199,7 +199,7 @@ def test_zero_supported_client_optimizer(tmpdir, zero_stage, optimizer_construct
"fp16": {
"enabled": False
},
"bfloat16": {
"bf16": {
"enabled": True
},
"zero_optimization": {
......@@ -250,7 +250,7 @@ def test_zero2_reduce_scatter_off(tmpdir):
"fp16": {
"enabled": False
},
"bfloat16": {
"bf16": {
"enabled": True
}
}
......@@ -290,7 +290,7 @@ def test_zero_empty_grad(tmpdir, stage):
"fp16": {
"enabled": False
},
"bfloat16": {
"bf16": {
"enabled": True
},
"zero_optimization": {
......
......@@ -16,7 +16,7 @@ PipeTopo = PipeDataParallelTopology
from deepspeed.ops.op_builder import FusedLambBuilder, CPUAdamBuilder
from deepspeed.runtime.zero.stage3 import DeepSpeedZeroOptimizer_Stage3
from util import required_torch_version
from .util import required_torch_version
import itertools
import argparse
......@@ -24,8 +24,8 @@ import pytest
import json
import os
import numbers
from common import distributed_test
from simple_model import *
from .common import distributed_test
from .simple_model import *
def compare_deepspeed_states(saved_model, loaded_model):
......
"""unit tests for coalesced collectives"""
import pytest
import torch
import torch.distributed as dist
from deepspeed.runtime.comm.coalesced_collectives import reduce_scatter_coalesced
from .common import distributed_test
@distributed_test(world_size=2)
def test_reduce_scatter_coalesced_single_input():
input = torch.full((6,
),
dist.get_rank(),
dtype=torch.half,
device=torch.cuda.current_device())
(output, ) = reduce_scatter_coalesced([input], dist.group.WORLD)
assert output.shape == (3, )
assert torch.allclose(output, torch.full_like(output, 0.5))
@distributed_test(world_size=2)
def test_reduce_scatter_coalesced_two_inputs():
tensor_kwargs = {"device": torch.cuda.current_device(), "dtype": torch.half}
inputs = [
dist.get_rank() * torch.arange(0,
6,
**tensor_kwargs),
dist.get_rank() * torch.arange(6,
9,
**tensor_kwargs),
]
output1, output2 = reduce_scatter_coalesced(inputs, dist.group.WORLD)
if dist.get_rank() == 0:
assert output1.shape == (3, )
assert torch.allclose(output1, torch.arange(0, 3, **tensor_kwargs) / 2)
assert output2.shape == (2, )
assert torch.allclose(output2, torch.arange(6, 8, **tensor_kwargs) / 2)
elif dist.get_rank() == 1:
assert output1.shape == (3, )
assert torch.allclose(output1, torch.arange(3, 6, **tensor_kwargs) / 2)
assert output2.shape == (1, )
assert torch.allclose(output2, torch.arange(8, 9, **tensor_kwargs) / 2)
@distributed_test(world_size=2)
def test_reduce_scatter_coalesced_tensor_smaller_than_world_sz():
input = torch.zeros((1, ), dtype=torch.half, device=torch.cuda.current_device())
(output, ) = reduce_scatter_coalesced([input], dist.group.WORLD)
if dist.get_rank() == 0:
assert output.shape == (1, )
assert torch.allclose(output, torch.zeros_like(output))
elif dist.get_rank() == 1:
assert output.shape == (0, )
......@@ -3,13 +3,16 @@ import torch
import pytest
import json
import argparse
from common import distributed_test, get_test_path
from simple_model import SimpleModel, create_config_from_dict, random_dataloader
from deepspeed.runtime.zero.config import DeepSpeedZeroConfig
from .common import distributed_test, get_test_path
from .simple_model import SimpleModel, create_config_from_dict, random_dataloader
import torch.distributed as dist
# A test on its own
import deepspeed
from deepspeed.runtime.config import DeepSpeedConfig
from deepspeed.runtime.config import DeepSpeedConfig, get_bfloat16_enabled
def test_cuda():
......@@ -114,6 +117,32 @@ def test_temp_config_json(tmpdir):
assert 'train_batch_size' in config_json
@pytest.mark.parametrize("gather_weights_key",
[
"stage3_gather_16bit_weights_on_model_save",
"stage3_gather_fp16_weights_on_model_save"
])
def test_gather_16bit_params_on_model_save(gather_weights_key):
config_dict = {
"zero_optimization": {
gather_weights_key: True,
},
}
config = DeepSpeedZeroConfig(config_dict)
assert config.gather_16bit_weights_on_model_save == True
@pytest.mark.parametrize("bf16_key", ["bf16", "bfloat16"])
def test_get_bfloat16_enabled(bf16_key):
cfg = {
bf16_key: {
"enabled": True,
},
}
assert get_bfloat16_enabled(cfg) == True
def test_deprecated_deepscale_config(tmpdir):
config_dict = {
"train_batch_size": 1,
......
......@@ -7,10 +7,10 @@ import random
import numpy as np
import torch.multiprocessing as mp
import torch.distributed as dist
from common import distributed_test
from simple_model import args_from_dict, create_deepspeed_args
from megatron_model import get_gpt2_model, get_megatron_version
from megatron_model import MockGPT2ModelPipe as GPT2ModelPipe
from .common import distributed_test
from .simple_model import args_from_dict, create_deepspeed_args
from .megatron_model import get_gpt2_model, get_megatron_version
from .megatron_model import MockGPT2ModelPipe as GPT2ModelPipe
from deepspeed.utils import RepeatingLoader
TORCH_MAJOR = int(torch.__version__.split('.')[0])
......
import argparse
import numpy as np
import torch
import torch.nn.functional as F
import pytest
import json
import random
import time
import copy
from torch import nn
from modelingpreln import BertEncoder as BertEncoderPreln
from modeling import BertEncoder as BertEncoderPostln
from modeling import BertConfig, BertLayerNorm
from deepspeed import DeepSpeedTransformerLayer, DeepSpeedTransformerConfig
import deepspeed
import sys
from .modeling import BertConfig, BertLayerNorm, BertEncoder as BertEncoderPostln
from .modelingpreln import BertEncoder as BertEncoderPreln
#if not deepspeed.ops.__installed_ops__['transformer']:
#pytest.skip(
......
......@@ -8,9 +8,8 @@ import random
import time
import copy
from torch import nn
from modelingpreln import BertEncoder as BertEncoderPreln
from modeling import BertEncoder as BertEncoderPostln
from modeling import BertLayerNorm, BertConfig
from .modelingpreln import BertEncoder as BertEncoderPreln
from .modeling import BertLayerNorm, BertConfig, BertEncoder as BertEncoderPostln
from deepspeed import DeepSpeedTransformerLayer, DeepSpeedTransformerConfig
import deepspeed
......
......@@ -7,8 +7,8 @@ import json
import os
import numpy as np
import time
from common import distributed_test
from simple_model import Curriculum_SimpleModel, random_dataloader, args_from_dict
from .common import distributed_test
from .simple_model import Curriculum_SimpleModel, random_dataloader, args_from_dict
def test_curriculum_scheduler_fixed_discrete(tmpdir):
......
......@@ -2,8 +2,8 @@ from deepspeed.utils import RepeatingLoader
import torch
import pytest
import deepspeed
from common import distributed_test
from simple_model import SimpleModel, args_from_dict, random_dataset
from .common import distributed_test
from .simple_model import SimpleModel, args_from_dict, random_dataset
def test_repeating_loader():
......
import torch
import torch.distributed as dist
from common import distributed_test
from .common import distributed_test
import pytest
......
......@@ -4,9 +4,9 @@ import torch
from torch.optim import Optimizer, Adam, AdamW
from torch.optim.lr_scheduler import _LRScheduler, LambdaLR
from simple_model import args_from_dict, SimpleModel, random_dataloader
from common import distributed_test
from util import required_torch_version
from .simple_model import args_from_dict, SimpleModel, random_dataloader
from .common import distributed_test
from .util import required_torch_version
import deepspeed
from deepspeed.ops.adam import FusedAdam
......
......@@ -5,8 +5,8 @@ import pytest
import json
import os
import numpy as np
from common import distributed_test
from simple_model import SimpleModel, args_from_dict
from .common import distributed_test
from .simple_model import SimpleModel, args_from_dict
def run_model_step(model, gradient_list):
......
import pytest
import deepspeed
from common import distributed_test
from .common import distributed_test
from deepspeed.git_version_info import version as ds_version
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
base_ds_config = {
"elasticity": {
......
......@@ -3,8 +3,8 @@ import pytest
import deepspeed
import deepspeed.runtime.utils as ds_utils
from deepspeed.profiling.flops_profiler import FlopsProfiler, get_model_profile
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from common import distributed_test
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from .common import distributed_test
TORCH_MAJOR = int(torch.__version__.split('.')[0])
TORCH_MINOR = int(torch.__version__.split('.')[1])
......
......@@ -8,10 +8,10 @@ import pytest
import json
import os
from deepspeed.ops.adam import FusedAdam
from common import distributed_test
from .common import distributed_test
from deepspeed.ops.op_builder import CPUAdamBuilder
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict, create_deepspeed_args, SimpleMoEModel, sequence_dataloader
from util import required_torch_version
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict, create_deepspeed_args, SimpleMoEModel, sequence_dataloader
from .util import required_torch_version
try:
from apex import amp
......
......@@ -3,8 +3,8 @@ import pytest
import json
import argparse
import os
from common import distributed_test
from simple_model import UnusedParametersModel, random_dataloader, args_from_dict
from .common import distributed_test
from .simple_model import UnusedParametersModel, random_dataloader, args_from_dict
from deepspeed.ops.op_builder import CPUAdamBuilder
import deepspeed
......
......@@ -4,8 +4,8 @@ import argparse
import pytest
import json
import os
from common import distributed_test
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from .common import distributed_test
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from deepspeed.runtime.lr_schedules import LR_RANGE_TEST, LR_RANGE_TEST_MIN_LR, LR_RANGE_TEST_STEP_RATE, LR_RANGE_TEST_STEP_SIZE, LR_RANGE_TEST_STAIRCASE
from deepspeed.runtime.lr_schedules import WARMUP_LR, WARMUP_MIN_LR, WARMUP_MAX_LR, WARMUP_NUM_STEPS, WARMUP_TYPE, WARMUP_LOG_RATE, WARMUP_LINEAR_RATE
from deepspeed.runtime.lr_schedules import ONE_CYCLE, CYCLE_MIN_LR, CYCLE_MAX_LR, CYCLE_FIRST_STEP_SIZE, DECAY_LR_RATE, DECAY_STEP_SIZE
......
......@@ -8,10 +8,10 @@ import pytest
import json
import os
from deepspeed.ops.adam import FusedAdam
from common import distributed_test
from .common import distributed_test
from deepspeed.ops.op_builder import CPUAdamBuilder
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict, create_deepspeed_args, SimpleMoEModel, sequence_dataloader
from util import required_torch_version
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict, create_deepspeed_args, SimpleMoEModel, sequence_dataloader
from .util import required_torch_version
try:
from apex import amp
......
......@@ -5,9 +5,9 @@ import pytest
from pytest import approx
import json
import os
from common import distributed_test
from simple_model import args_from_dict
from multi_output_model import MultiOutputModel, multi_output_dataloader
from .common import distributed_test
from .simple_model import args_from_dict
from .multi_output_model import MultiOutputModel, multi_output_dataloader
def create_config_dict(micro_batch_size, grad_accumulation_steps, world_size):
......
......@@ -15,9 +15,9 @@ from deepspeed.runtime.pipe.topology import PipeDataParallelTopology, PipeModelD
PipeTopo = PipeDataParallelTopology
from deepspeed.runtime.pipe.module import PipelineModule, LayerSpec
from common import distributed_test
from simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict, create_deepspeed_args
from test_pipe import AlexNetPipe, train_cifar
from .common import distributed_test
from .simple_model import SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict, create_deepspeed_args
from .test_pipe import AlexNetPipe, train_cifar
TORCH_MAJOR = int(torch.__version__.split('.')[0])
TORCH_MINOR = int(torch.__version__.split('.')[1])
......
......@@ -8,7 +8,7 @@ from deepspeed.runtime.utils import partition_balanced
from deepspeed.runtime.utils import prefix_sum_inc
from deepspeed.runtime.utils import PartitionedTensor
from common import distributed_test
from .common import distributed_test
@distributed_test(world_size=4)
......
......@@ -17,7 +17,7 @@ from deepspeed.runtime.pipe.topology import PipeDataParallelTopology, PipeModelD
PipeTopo = PipeDataParallelTopology
from deepspeed.runtime.pipe.module import PipelineModule, LayerSpec
from common import distributed_test
from .common import distributed_test
def rel_diff(A, B):
......@@ -25,7 +25,7 @@ def rel_diff(A, B):
# All models
from simple_model import args_from_dict
from .simple_model import args_from_dict
class AlexNet(nn.Module):
......
......@@ -15,8 +15,8 @@ PipeTopo = PipeDataParallelTopology
from deepspeed.pipe import PipelineModule, LayerSpec
from deepspeed.utils import RepeatingLoader
from common import distributed_test
from simple_model import args_from_dict
from .common import distributed_test
from .simple_model import args_from_dict
HIDDEN_DIM = 32
LAYERS = 8
......
......@@ -2,8 +2,9 @@ import numpy as np
import deepspeed
import pytest
from deepspeed.runtime.progressive_layer_drop import ProgressiveLayerDrop
from common import distributed_test
from simple_model import SimpleModel, PLD_SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
from .common import distributed_test
from .simple_model import SimpleModel, PLD_SimpleModel, SimpleOptimizer, random_dataloader, args_from_dict
@pytest.mark.parametrize('theta', [0, 0.1, 0.9, 1.0])
......
......@@ -8,7 +8,7 @@ import deepspeed.runtime.utils as ds_utils
from deepspeed.utils.logging import log_dist
import deepspeed.utils.groups as groups
from common import distributed_test
from .common import distributed_test
def test_call_to_str():
......
......@@ -2,7 +2,8 @@ import torch
import torch.distributed as dist
import deepspeed
import pytest
from common import distributed_test
from .common import distributed_test
import deepspeed.utils.groups as groups
......
......@@ -7,7 +7,7 @@ from deepspeed.runtime.pipe.topology import PipelineParallelGrid as Grid
from deepspeed.runtime.pipe.topology import ProcessTopology as Topo
from deepspeed.runtime.pipe.topology import _prime_factors
from common import distributed_test
from .common import distributed_test
def test_topology_2d():
......
This diff is collapsed.
import os
import sys
from types import SimpleNamespace
import torch
......@@ -8,7 +7,7 @@ import pytest
import deepspeed
from deepspeed.runtime.zero.partition_parameters import ZeroParamStatus, partitioned_param_data_shape
from common import distributed_test, get_master_port
from .common import distributed_test, get_master_port
def setup_serial_env():
......