Unverified commit ffcf3846 authored by Ma, Guokai, committed by GitHub

Abstract accelerator (step 1) (#2504)

* Establish building block of abstract accelerator

* Change .*Tensor variables to @property

* [op builder] Add op builder reflection to allow enumeration of builders in all_ops.py and builder_names.py

* Change @abstractproperty to @property + @abstractmethod

Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Parent c5f85858
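The last bullet follows the standard library's guidance: abc.abstractproperty has been deprecated since Python 3.3 in favour of stacking @property on top of @abc.abstractmethod. A minimal sketch of the pattern, using an illustrative Backend/ToyBackend pair that is not part of this commit:
-----------[code] example_abstract_property.py -----------
import abc
import torch

class Backend(abc.ABC):
    # Stacking @property on @abc.abstractmethod replaces the deprecated
    # @abc.abstractproperty; concrete backends expose FloatTensor as a
    # read-only property.
    @property
    @abc.abstractmethod
    def FloatTensor(self):
        ...

class ToyBackend(Backend):
    @property
    def FloatTensor(self):
        return torch.FloatTensor  # CPU tensor type, for illustration only

print(ToyBackend().FloatTensor)   # <class 'torch.FloatTensor'>
-----------[code] example_abstract_property.py -----------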
from .abstract_accelerator import DeepSpeedAccelerator
from .real_accelerator import get_accelerator, set_accelerator
import abc
from abc import ABC
class DeepSpeedAccelerator(ABC):
def __init__(self):
self._name = None
self._communication_backend_name = None
# Device APIs
@abc.abstractmethod
def device_name(self, device_index):
...
@abc.abstractmethod
def device(self, device_index):
...
@abc.abstractmethod
def set_device(self, device_index):
...
@abc.abstractmethod
def current_device(self):
...
@abc.abstractmethod
def current_device_name(self):
...
@abc.abstractmethod
def device_count(self):
...
@abc.abstractmethod
def synchronize(self, device_index=None):
...
# RNG APIs
@abc.abstractmethod
def random(self):
...
@abc.abstractmethod
def set_rng_state(self, new_state, device_index=None):
...
@abc.abstractmethod
def get_rng_state(self, device_index=None):
...
@abc.abstractmethod
def manual_seed(self, seed):
...
@abc.abstractmethod
def manual_seed_all(self, seed):
...
@abc.abstractmethod
def initial_seed(self):
...
@abc.abstractmethod
def default_generator(self, device_index):
...
# Streams/Events
@abc.abstractmethod
def Stream(self, device=None, priority=0, **kwargs):
...
@abc.abstractmethod
def StreamContext(self, stream):
...
@abc.abstractmethod
def stream(self, stream):
...
@abc.abstractmethod
def current_stream(self, device_index=None):
...
@abc.abstractmethod
def default_stream(self, device_index=None):
...
@abc.abstractmethod
def Event(self, **kwargs):
...
# Memory management
@abc.abstractmethod
def empty_cache(self):
...
@abc.abstractmethod
def memory_allocated(self, device_index=None):
...
@abc.abstractmethod
def max_memory_allocated(self, device_index=None):
...
@abc.abstractmethod
def reset_max_memory_allocated(self, device_index=None):
...
@abc.abstractmethod
def memory_cached(self, device_index=None):
...
@abc.abstractmethod
def max_memory_cached(self, device_index=None):
...
@abc.abstractmethod
def reset_max_memory_cached(self, device_index=None):
...
@abc.abstractmethod
def memory_stats(self, device_index=None):
...
@abc.abstractmethod
def reset_peak_memory_stats(self, device_index=None):
...
@abc.abstractmethod
def memory_reserved(self, device_index=None):
...
@abc.abstractmethod
def max_memory_reserved(self, device_index=None):
...
@abc.abstractmethod
def total_memory(self, device_index=None):
...
# Data types
@abc.abstractmethod
def is_bf16_supported(self):
...
@abc.abstractmethod
def is_fp16_supported(self):
...
# Misc
@abc.abstractmethod
def amp(self):
...
@abc.abstractmethod
def is_available(self):
...
@abc.abstractmethod
def range_push(self, msg):
...
@abc.abstractmethod
def range_pop(self):
...
@abc.abstractmethod
def lazy_call(self, callback):
...
@abc.abstractmethod
def communication_backend_name(self):
...
# Tensor operations
@property
@abc.abstractmethod
def BFloat16Tensor(self):
...
@property
@abc.abstractmethod
def ByteTensor(self):
...
@property
@abc.abstractmethod
def DoubleTensor(self):
...
@property
@abc.abstractmethod
def FloatTensor(self):
...
@property
@abc.abstractmethod
def HalfTensor(self):
...
@property
@abc.abstractmethod
def IntTensor(self):
...
@property
@abc.abstractmethod
def LongTensor(self):
...
@abc.abstractmethod
def pin_memory(self, tensor):
...
@abc.abstractmethod
def on_accelerator(self, tensor):
...
@abc.abstractmethod
def op_builder_dir(self):
...
@abc.abstractmethod
def create_op_builder(self, class_name):
...
@abc.abstractmethod
def build_extension(self):
...
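Before the CUDA implementation that follows, here is a hedged sketch of how engine code is expected to consume this interface through get_accelerator() (defined in real_accelerator.py further down) instead of calling torch.cuda directly; the variable names are illustrative and not part of this commit:
-----------[code] example_interface_usage.py -----------
# Device-agnostic tensor placement through the accelerator interface.
import torch
from deepspeed.accelerator import get_accelerator

accel = get_accelerator()
device = accel.current_device_name() if accel.is_available() else 'cpu'
x = torch.ones(4, device=device)
if accel.is_available():
    accel.synchronize()   # wait for outstanding work on the active device
print(x.device, accel.communication_backend_name())
-----------[code] example_interface_usage.py -----------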
from deepspeed.accelerator.abstract_accelerator import DeepSpeedAccelerator
import torch.cuda
class CUDA_Accelerator(DeepSpeedAccelerator):
def __init__(self):
self._name = 'cuda'
self._communication_backend_name = 'nccl'
# Device APIs
def device_name(self, device_index=None):
if device_index is None:
return 'cuda'
return 'cuda:{}'.format(device_index)
def device(self, device_index=None):
return torch.cuda.device(device_index)
def set_device(self, device_index):
torch.cuda.set_device(device_index)
def current_device(self):
return torch.cuda.current_device()
def current_device_name(self):
return 'cuda:{}'.format(torch.cuda.current_device())
def device_count(self):
return torch.cuda.device_count()
def synchronize(self, device_index=None):
return torch.cuda.synchronize(device_index)
# RNG APIs
def random(self):
return torch.random
def set_rng_state(self, new_state, device_index=None):
if device_index is None:
return torch.cuda.set_rng_state(new_state)
return torch.cuda.set_rng_state(new_state, device_index)
def get_rng_state(self, device_index=None):
if device_index is None:
return torch.cuda.get_rng_state()
return torch.cuda.get_rng_state(device_index)
def manual_seed(self, seed):
return torch.cuda.manual_seed(seed)
def manual_seed_all(self, seed):
return torch.cuda.manual_seed_all(seed)
def initial_seed(self):
return torch.cuda.initial_seed()
def default_generator(self, device_index):
return torch.cuda.default_generators[device_index]
# Streams/Events
def Stream(self, device=None, priority=0, **kwargs):
return torch.cuda.Stream(device, priority, **kwargs)
def StreamContext(self, stream):
return torch.cuda.StreamContext(stream)
def stream(self, stream):
return torch.cuda.stream(stream)
def current_stream(self, device_index=None):
return torch.cuda.current_stream(device_index)
def default_stream(self, device_index=None):
return torch.cuda.default_stream(device_index)
def Event(self, **kwargs):
return torch.cuda.Event(**kwargs)
# Memory management
def empty_cache(self):
return torch.cuda.empty_cache()
def memory_allocated(self, device_index=None):
return torch.cuda.memory_allocated(device_index)
def max_memory_allocated(self, device_index=None):
return torch.cuda.max_memory_allocated(device_index)
def reset_max_memory_allocated(self, device_index=None):
return torch.cuda.reset_max_memory_allocated(device_index)
def memory_cached(self, device_index=None):
return torch.cuda.memory_cached(device_index)
def max_memory_cached(self, device_index=None):
return torch.cuda.max_memory_cached(device_index)
def reset_max_memory_cached(self, device_index=None):
return torch.cuda.reset_max_memory_cached(device_index)
def memory_stats(self, device_index=None):
if hasattr(torch.cuda, 'memory_stats'):
return torch.cuda.memory_stats(device_index)
def reset_peak_memory_stats(self, device_index=None):
if hasattr(torch.cuda, 'reset_peak_memory_stats'):
return torch.cuda.reset_peak_memory_stats(device_index)
def memory_reserved(self, device_index=None):
if hasattr(torch.cuda, 'memory_reserved'):
return torch.cuda.memory_reserved(device_index)
def max_memory_reserved(self, device_index=None):
if hasattr(torch.cuda, 'max_memory_reserved'):
return torch.cuda.max_memory_reserved(device_index)
def total_memory(self, device_index=None):
return torch.cuda.get_device_properties(device_index).total_memory
# Data types
def is_bf16_supported(self):
return torch.cuda.is_bf16_supported()
def is_fp16_supported(self):
major, _ = torch.cuda.get_device_capability()
return major >= 7
# Misc
def amp(self):
if hasattr(torch.cuda, 'amp'):
return torch.cuda.amp
return None
def is_available(self):
return torch.cuda.is_available()
def range_push(self, msg):
if hasattr(torch.cuda.nvtx, 'range_push'):
return torch.cuda.nvtx.range_push(msg)
def range_pop(self):
if hasattr(torch.cuda.nvtx, 'range_pop'):
return torch.cuda.nvtx.range_pop()
def lazy_call(self, callback):
return torch.cuda._lazy_call(callback)
def communication_backend_name(self):
return self._communication_backend_name
# Tensor operations
@property
def BFloat16Tensor(self):
return torch.cuda.BFloat16Tensor
@property
def ByteTensor(self):
return torch.cuda.ByteTensor
@property
def DoubleTensor(self):
return torch.cuda.DoubleTensor
@property
def FloatTensor(self):
return torch.cuda.FloatTensor
@property
def HalfTensor(self):
return torch.cuda.HalfTensor
@property
def IntTensor(self):
return torch.cuda.IntTensor
@property
def LongTensor(self):
return torch.cuda.LongTensor
def pin_memory(self, tensor):
return tensor.pin_memory()
def on_accelerator(self, tensor):
device_str = str(tensor.device)
return device_str.startswith('cuda:')
def op_builder_dir(self):
return "deepspeed.ops.op_builder"
def create_op_builder(self, class_name):
from deepspeed.ops.op_builder import (AsyncIOBuilder, CPUAdagradBuilder, CPUAdamBuilder,
                                      FusedAdamBuilder, FusedLambBuilder, QuantizerBuilder,
                                      SparseAttnBuilder, StochasticTransformerBuilder,
                                      TransformerBuilder, InferenceBuilder, UtilsBuilder)
from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder as AsyncIOBuilderName
from deepspeed.ops.op_builder.builder_names import CPUAdagradBuilder as CPUAdagradBuilderName
from deepspeed.ops.op_builder.builder_names import CPUAdamBuilder as CPUAdamBuilderName
from deepspeed.ops.op_builder.builder_names import FusedAdamBuilder as FusedAdamBuilderName
from deepspeed.ops.op_builder.builder_names import FusedLambBuilder as FusedLambBuilderName
from deepspeed.ops.op_builder.builder_names import QuantizerBuilder as QuantizerBuilderName
from deepspeed.ops.op_builder.builder_names import SparseAttnBuilder as SparseAttnBuilderName
from deepspeed.ops.op_builder.builder_names import StochasticTransformerBuilder as StochasticTransformerBuilderName
from deepspeed.ops.op_builder.builder_names import TransformerBuilder as TransformerBuilderName
from deepspeed.ops.op_builder.builder_names import InferenceBuilder as InferenceBuilderName
from deepspeed.ops.op_builder.builder_names import UtilsBuilder as UtilsBuilderName
if class_name == AsyncIOBuilderName:
return AsyncIOBuilder()
elif class_name == CPUAdagradBuilderName:
return CPUAdagradBuilder()
elif class_name == CPUAdamBuilderName:
return CPUAdamBuilder()
elif class_name == FusedAdamBuilderName:
return FusedAdamBuilder()
elif class_name == FusedLambBuilderName:
return FusedLambBuilder()
elif class_name == QuantizerBuilderName:
return QuantizerBuilder()
elif class_name == SparseAttnBuilderName:
return SparseAttnBuilder()
elif class_name == StochasticTransformerBuilderName:
return StochasticTransformerBuilder()
elif class_name == TransformerBuilderName:
return TransformerBuilder()
elif class_name == InferenceBuilderName:
return InferenceBuilder()
elif class_name == UtilsBuilderName:
return UtilsBuilder()
else:
return None
def build_extension(self):
from torch.utils.cpp_extension import BuildExtension
return BuildExtension
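The op-builder hooks above pair with the reflected names in builder_names.py (shown at the end of this diff), so callers can request a builder without importing a CUDA-specific class. A hedged sketch of that flow; builder.is_compatible() and builder.load() are the existing OpBuilder entry points, everything else is illustrative:
-----------[code] example_create_op_builder.py -----------
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import FusedAdamBuilder

# FusedAdamBuilder here is the reflected string "FusedAdamBuilder", which
# create_op_builder() maps to the concrete CUDA builder.
builder = get_accelerator().create_op_builder(FusedAdamBuilder)
if builder is not None and builder.is_compatible():
    fused_adam_module = builder.load()   # JIT-builds or loads the extension
-----------[code] example_create_op_builder.py -----------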
from .abstract_accelerator import DeepSpeedAccelerator
ds_accelerator = None
def _validate_accelerator(accel_obj):
assert isinstance(accel_obj, DeepSpeedAccelerator), \
f'{accel_obj.__class__.__name__} accelerator is not subclass of DeepSpeedAccelerator'
# TODO: turn off is_available test since this breaks tests
#assert accel_obj.is_available(), \
# f'{accel_obj.__class__.__name__} accelerator fails is_available() test'
def get_accelerator():
global ds_accelerator
if ds_accelerator is None:
try:
from intel_extension_for_deepspeed import XPU_Accelerator
except ImportError as e:
pass
else:
ds_accelerator = XPU_Accelerator()
_validate_accelerator(ds_accelerator)
return ds_accelerator
from deepspeed.accelerator.cuda_accelerator import CUDA_Accelerator
ds_accelerator = CUDA_Accelerator()
_validate_accelerator(ds_accelerator)
return ds_accelerator
def set_accelerator(accel_obj):
global ds_accelerator
_validate_accelerator(accel_obj)
ds_accelerator = accel_obj
'''
-----------[code] test_get.py -----------
from deepspeed.accelerator import get_accelerator
my_accelerator = get_accelerator()
print(f'{my_accelerator._name=}')
print(f'{my_accelerator._communication_backend_name=}')
print(f'{my_accelerator.HalfTensor().device=}')
print(f'{my_accelerator.total_memory()=}')
-----------[code] test_get.py -----------
---[output] python test_get.py---------
my_accelerator._name='cuda'
my_accelerator._communication_backend_name='nccl'
my_accelerator.HalfTensor().device=device(type='cuda', index=0)
my_accelerator.total_memory()=34089730048
---[output] python test_get.py---------
**************************************************************************
-----------[code] test_set.py -----------
from deepspeed.accelerator.cuda_accelerator import CUDA_Accelerator
cu_accel = CUDA_Accelerator()
print(f'{id(cu_accel)=}')
from deepspeed.accelerator import set_accelerator, get_accelerator
set_accelerator(cu_accel)
my_accelerator = get_accelerator()
print(f'{id(my_accelerator)=}')
print(f'{my_accelerator._name=}')
print(f'{my_accelerator._communication_backend_name=}')
print(f'{my_accelerator.HalfTensor().device=}')
print(f'{my_accelerator.total_memory()=}')
-----------[code] test_set.py -----------
---[output] python test_set.py---------
id(cu_accel)=139648165478304
my_accelerator=<deepspeed.accelerator.cuda_accelerator.CUDA_Accelerator object at 0x7f025f4bffa0>
my_accelerator._name='cuda'
my_accelerator._communication_backend_name='nccl'
my_accelerator.HalfTensor().device=device(type='cuda', index=0)
my_accelerator.total_memory()=34089730048
---[output] python test_set.py---------
'''
......@@ -15,6 +15,7 @@ from .quantizer import QuantizerBuilder
from .spatial_inference import SpatialInferenceBuilder
from .builder import get_default_compute_capabilities, OpBuilder
# TODO: This will be removed eventually once all files that reference ALL_OPS are redirected to op_builder.all_ops
# TODO: infer this list instead of hard-coding it
# List of all available ops
__op_builders__ = [
......
"""
Copyright 2020 The Microsoft DeepSpeed Team
"""
import os
import pkgutil
import importlib
from deepspeed.accelerator import get_accelerator
# List of all available ops
# reflect all builder names into __op_builders__
op_builder_dir = get_accelerator().op_builder_dir()
op_builder_module = importlib.import_module(op_builder_dir)
__op_builders__ = []
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(op_builder_module.__file__)]):
# avoid self references
if module_name != 'all_ops' and module_name != 'builder' and module_name != 'builder_names':
module = importlib.import_module("{}.{}".format(op_builder_dir, module_name))
for member_name in module.__dir__():
if member_name.endswith('Builder'):
# append builder to __op_builders__ list
builder = get_accelerator().create_op_builder(member_name)
__op_builders__.append(builder)
ALL_OPS = {op.name: op for op in __op_builders__ if op is not None}
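A hedged usage sketch for the reflected registry above; builder.name (used as the dictionary key) and is_compatible() come from the existing OpBuilder base class, while the loop itself is illustrative:
-----------[code] example_all_ops.py -----------
# Enumerate the ops registered for the active accelerator and report whether
# each can be built in the current environment.
from deepspeed.ops.op_builder.all_ops import ALL_OPS

for name, builder in ALL_OPS.items():
    print(f'{name}: compatible={builder.is_compatible()}')
-----------[code] example_all_ops.py -----------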
import sys
import os
import pkgutil
import importlib
# List of all available op builders from deepspeed op_builder
op_builder_dir = "deepspeed.ops.op_builder"
op_builder_module = importlib.import_module(op_builder_dir)
__op_builders__ = []
this_module = sys.modules[__name__]
# reflect all builder names into variable definitions such as 'TransformerBuilder = "TransformerBuilder"'
for _, module_name, _ in pkgutil.iter_modules([os.path.dirname(op_builder_module.__file__)]):
# avoid self references
if module_name != 'all_ops' and module_name != 'builder' and module_name != 'builder_names':
module = importlib.import_module("{}.{}".format(op_builder_dir, module_name))
for member_name in module.__dir__():
if member_name.endswith('Builder') and member_name not in ('OpBuilder', 'CUDAOpBuilder'):
# assign the builder name to a variable with the same name,
# e.g. TransformerBuilder = "TransformerBuilder"
this_module.__dict__[member_name] = member_name
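Because builder_names.py only reflects string constants, importing it never pulls in the CUDA builder implementations; the constants are meant to be passed to get_accelerator().create_op_builder() as shown in cuda_accelerator.py above. A small illustrative check:
-----------[code] example_builder_names.py -----------
from deepspeed.ops.op_builder.builder_names import TransformerBuilder, UtilsBuilder

# Each reflected name is just the builder's class name as a string.
assert TransformerBuilder == "TransformerBuilder"
assert UtilsBuilder == "UtilsBuilder"
print(TransformerBuilder, UtilsBuilder)
-----------[code] example_builder_names.py -----------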