Unverified commit baa95c62 authored by Michael Wyatt, committed by GitHub

remove benchmarks (now in DSE) and add links (#3157)

Co-authored-by: Jeff Rasley <jerasley@microsoft.com>
Parent adc15e1c
# DeepSpeed Benchmarks
If you are looking for DeepSpeed benchmarks, please see the following resources:
1. [Communication Benchmarking Suite](https://github.com/microsoft/DeepSpeedExamples/tree/master/benchmarks/communication)
2. [Inference Benchmarks](https://github.com/microsoft/DeepSpeedExamples/tree/master/benchmarks/inference)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
'''Copyright The Microsoft DeepSpeed Team'''
# The DeepSpeed Communication Benchmarking Suite
The intent of these benchmarks is to measure the communication latency and bandwidth of DeepSpeed and/or PyTorch distributed communication operations at the Python layer. These benchmarks are complementary to C-level comms benchmarks like [OSU Micro-Benchmarks](https://mvapich.cse.ohio-state.edu/benchmarks/) and [NCCL Tests](https://github.com/NVIDIA/nccl-tests) in that users can:
- Easily debug which layer of the communication software stack a hang or performance degradation originates from.
- Measure the expected communication performance of either DeepSpeed comms or pure PyTorch distributed.
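All of the benchmarks below share the same basic measurement pattern: allocate a tensor, warm up the collective, synchronize, then time a fixed number of trials and average. Below is a minimal sketch of that pattern (assuming a working DeepSpeed install and launch via the `deepspeed` launcher; the real scripts additionally handle message-size scanning, dtypes, bandwidth math, and OOM recovery):
<pre>
# minimal_timing_sketch.py (hypothetical filename) -- launch with: deepspeed minimal_timing_sketch.py
import os, time
import torch
import deepspeed
import deepspeed.comm as dist
from deepspeed.accelerator import get_accelerator

deepspeed.init_distributed()
local_rank = int(os.environ.get('LOCAL_RANK', 0))
get_accelerator().set_device(local_rank)
x = torch.ones(1 << 20, dtype=torch.float32,
               device=get_accelerator().device_name(local_rank))

for _ in range(5):                      # warmup iterations (establish connections)
    dist.all_reduce(x)
get_accelerator().synchronize()
dist.barrier()

trials = 50
start = time.perf_counter()
for _ in range(trials):                 # timed iterations
    dist.all_reduce(x)
get_accelerator().synchronize()
dist.barrier()
avg = (time.perf_counter() - start) / trials

if dist.get_rank() == 0:
    size = x.element_size() * x.nelement()
    print(f"all_reduce of {size} bytes: {avg * 1e6:.1f} us per iteration")
</pre>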
To run benchmarks, there are two options:
1. Run a single communication operation:
For example, run with a single large message size (calculated to barely fit within GPU mem):
<pre>
deepspeed all_reduce.py
</pre>
Scan across message sizes:
<pre>
deepspeed all_reduce.py --scan
</pre>
Benchmark pure PyTorch distributed comms (without importing or using DeepSpeed) with MPI:
<pre>
mpirun -np 16 --hostfile ${HOSTFILE} -x LD_LIBRARY_PATH -x PATH -x LD_PRELOAD python all_reduce.py --scan --dist="torch"
</pre>
or with Slurm:
<pre>
srun -n 16 python all_reduce.py --scan --dist="torch"
</pre>
2. Run all available communication benchmarks:
<pre>
deepspeed run_all.py
</pre>
Like the individual benchmarks, `run_all.py` supports scanning arguments for the max message size, bw-unit, etc. Simply pass the desired arguments to `run_all.py` and they'll be propagated to each comm op.
<pre>
usage: ds_bench [-h] [--local_rank LOCAL_RANK] [--trials TRIALS] [--warmups WARMUPS] [--maxsize MAXSIZE] [--async-op] [--bw-unit {Gbps,GBps}] [--backend {nccl}] [--dist {deepspeed,torch}] [--scan] [--raw] [--all-reduce] [--all-gather] [--all-to-all]
[--pt2pt] [--broadcast] [--dtype DTYPE] [--mem-factor MEM_FACTOR] [--debug]
optional arguments:
-h, --help show this help message and exit
--local_rank LOCAL_RANK
--trials TRIALS Number of timed iterations
--warmups WARMUPS Number of warmup (non-timed) iterations
--maxsize MAXSIZE Max message size as a power of 2
--async-op Enables non-blocking communication
--bw-unit {Gbps,GBps}
--backend {nccl} Communication library to use
--dist {deepspeed,torch}
Distributed DL framework to use
--scan Enables scanning all message sizes
--raw Print the message size and latency without units
--all-reduce Run all_reduce
--all-gather Run all_gather
--all-to-all Run all_to_all
--pt2pt Run pt2pt
--broadcast Run broadcast
--dtype DTYPE PyTorch tensor dtype
--mem-factor MEM_FACTOR
Proportion of max available GPU memory to use for single-size evals
--debug Enables all_to_all debug prints
</pre>
Note that `ds_bench` is a pre-packaged wrapper around `run_all.py`. Users can pass the same arguments as well:
<pre>
<path to deepspeed>/bin/ds_bench --scan --trials=10
</pre>
Finally, users can choose specific communication operations to run in `run_all.py` or `ds_bench` by passing them as arguments (all operations are run by default). For example:
<pre>
deepspeed run_all.py --scan --all-reduce --all-to-all --broadcast
</pre>
# Adding Communication Benchmarks
To add new communication benchmarks, follow this general procedure (a skeleton sketch follows the list):
1. Copy a similar benchmark file (e.g. to add `reduce_scatter`, copy `all_reduce.py` as a template)
2. Add a new bw formula in `utils.get_bw`, a new maximum tensor element formula in `utils.max_numel`, and a new arg in `utils.benchmark_parser`
3. Replace the comm op calls in the new file (e.g. via find-and-replace)
4. Find a good default `mem_factor` for use in the `run_<collective>_single()` function
5. Add the new comm op to `run_all.py`
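For steps 1 and 3, a hypothetical `reduce_scatter.py` copied from `all_reduce.py` might look roughly like the sketch below (timing core only; the `get_bw` formula, `max_numel` entry, parser flag, `mem_factor` tuning, and `run_all.py` hook from steps 2, 4, and 5 still need to be added):
<pre>
# reduce_scatter.py (hypothetical) -- a skeleton modeled on all_reduce.py
import time
import sys, os
import torch
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *      # sync_all, print_rank_0, benchmark_parser, init_processes, ...
from deepspeed.accelerator import get_accelerator

def timed_reduce_scatter(input_list, output, args):
    if args.dist == 'torch':
        import torch.distributed as dist
    elif args.dist == 'deepspeed':
        import deepspeed.comm as dist
    sync_all()
    # Warmups, establish connections, etc.
    for i in range(args.warmups):
        dist.reduce_scatter(output, input_list, async_op=args.async_op)
    sync_all()
    pre = time.perf_counter()
    for i in range(args.trials):
        dist.reduce_scatter(output, input_list, async_op=args.async_op)
    sync_all()
    avg_duration = (time.perf_counter() - pre) / args.trials
    # Step 2 would add a 'reduce_scatter' formula to utils.get_bw; print raw latency for now
    size = sum(t.element_size() * t.nelement() for t in input_list)
    print_rank_0(f"reduce_scatter of {size} bytes: {avg_duration * 1e6:.3f} us avg over {args.trials} trials")

if __name__ == "__main__":
    args = benchmark_parser().parse_args()
    init_processes(local_rank=args.local_rank, args=args)
    if args.dist == 'torch':
        import torch.distributed as dist
    else:
        import deepspeed.comm as dist
    world_size = dist.get_world_size()
    device = get_accelerator().device_name(args.local_rank)
    # one equal-sized chunk per rank; the reduced chunk for this rank lands in `output`
    input_list = [torch.ones(2**20, dtype=getattr(torch, args.dtype), device=device) for _ in range(world_size)]
    output = torch.empty(2**20, dtype=getattr(torch, args.dtype), device=device)
    timed_reduce_scatter(input_list, output, args)
</pre>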
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
'''Copyright The Microsoft DeepSpeed Team'''
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import sys, os, time
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *
from communication.constants import *
from deepspeed.accelerator import get_accelerator
from deepspeed.comm import TorchBackend
# Run all_gather and print metrics
def timed_all_gather(input, output, args):
if args.dist == 'torch':
import torch.distributed as dist
all_gather_func = TorchBackend.get_all_gather_function()
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
all_gather_func = dist.allgather_fn
sync_all()
# Warmups, establish connections, etc.
for i in range(args.warmups):
all_gather_func(output, input, group=None, async_op=args.async_op)
sync_all()
# time the actual comm op trials times and average it
pre = time.perf_counter()
for i in range(args.trials):
all_gather_func(output, input, group=None, async_op=args.async_op)
sync_all()
duration = time.perf_counter() - pre
# maintain and clean performance data
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
tput, busbw = get_bw('all_gather', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'
if not args.raw:
size = convert_size(size)
print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
def run_all_gather(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
# Prepare benchmark header
print_header(args, 'all_gather')
global_rank = dist.get_rank()
world_size = dist.get_world_size()
if args.scan:
# Create list of message sizes
M_LIST = []
for x in (2**p for p in range(1, args.maxsize)):
M_LIST.append(x)
sync_all()
# loop over various tensor sizes
for M in M_LIST:
global_rank = dist.get_rank()
try:
mat = torch.ones(world_size, M,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
sync_all()
input = ((mat.mul_(float(global_rank))).view(-1))
# Delete original mat to avoid OOM
del mat
get_accelerator().empty_cache()
output = torch.zeros(input.nelement() * world_size,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Exiting comm op.')
sync_all()
break
else:
raise e
sync_all()
timed_all_gather(input, output, args)
else:
# all_gather_into_tensor saves memory
if ((args.dist == 'torch' or args.dist == 'deepspeed') and dist.has_all_gather_into_tensor()):
mem_factor = args.mem_factor + 0.2
else:
mem_factor = args.mem_factor
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
sync_all()
elements_per_gpu = max_numel(comm_op='all_gather',
dtype=getattr(torch, args.dtype),
mem_factor=mem_factor,
local_rank=local_rank,
args=args)
try:
mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
args.dtype)).to(get_accelerator().device_name(local_rank))
# multiply each GPU's tensor by the rank to ease debugging
input = ((mat.mul_(float(global_rank))).view(-1))
# Delete original mat to avoid OOM
del mat
get_accelerator().empty_cache()
output = torch.zeros(elements_per_gpu * world_size,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
sync_all()
return
else:
raise e
sync_all()
timed_all_gather(input, output, args)
if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_all_gather(local_rank=rank, args=args)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import sys, os, time
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *
from communication.constants import *
from deepspeed.accelerator import get_accelerator
def timed_all_reduce(input, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
sync_all()
# Warmups, establish connections, etc.
for i in range(args.warmups):
dist.all_reduce(input, async_op=args.async_op)
sync_all()
# time the actual comm op trials times and average it
pre = time.perf_counter()
for i in range(args.trials):
dist.all_reduce(input, async_op=args.async_op)
sync_all()
duration = time.perf_counter() - pre
# maintain and clean performance data
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('all_reduce', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'
if not args.raw:
size = convert_size(size)
print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
def run_all_reduce(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
# Prepare benchmark header
print_header(args, 'all_reduce')
world_size = dist.get_world_size()
global_rank = dist.get_rank()
if args.scan:
M_LIST = []
for x in (2**p for p in range(1, args.maxsize)):
M_LIST.append(x)
sync_all()
# loop over various tensor sizes
for M in M_LIST:
global_rank = dist.get_rank()
try:
mat = torch.ones(world_size, M,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
sync_all()
input = ((mat.mul_(float(global_rank))).view(-1))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Exiting comm op.')
sync_all()
break
else:
raise e
sync_all()
timed_all_reduce(input, args)
else:
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
# Don't need output tensor, so we double mem_factor
elements_per_gpu = max_numel(comm_op='all_reduce',
dtype=getattr(torch, args.dtype),
mem_factor=args.mem_factor * 2,
local_rank=local_rank,
args=args)
try:
mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
args.dtype)).to(get_accelerator().device_name(local_rank))
input = ((mat.mul_(float(global_rank))).view(-1))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
sync_all()
return
else:
raise e
sync_all()
timed_all_reduce(input, args)
if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_all_reduce(local_rank=rank, args=args)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import sys, os, time
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *
from communication.constants import *
from deepspeed.accelerator import get_accelerator
def timed_all_to_all(input, output, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
sync_all()
# Warmups, establish connections, etc.
for i in range(args.warmups):
dist.all_to_all_single(output, input, async_op=args.async_op)
sync_all()
# time the actual comm op trials times and average it
pre = time.perf_counter()
for i in range(args.trials):
dist.all_to_all_single(output, input, async_op=args.async_op)
sync_all()
duration = time.perf_counter() - pre
# maintain and clean performance data
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('all_to_all', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'
if not args.raw:
size = convert_size(size)
print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
def run_all_to_all(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
world_size = dist.get_world_size()
global_rank = dist.get_rank()
# Prepare benchmark header
print_header(args, 'all_to_all')
if args.scan:
M_LIST = []
for x in (2**p for p in range(1, args.maxsize)):
M_LIST.append(x)
sync_all()
# loop over various tensor sizes
for M in M_LIST:
global_rank = dist.get_rank()
try:
mat = torch.ones(world_size, M,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
assert mat.numel() % world_size == 0, f"tensor cannot be divided in {world_size} chunks"
sync_all()
input = ((mat.mul_(float(global_rank))).view(-1))
output = (mat.clone().view(-1))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Exiting comm op.')
sync_all()
break
else:
raise e
sync_all()
timed_all_to_all(input, output, args)
else:
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
elements_per_gpu = max_numel(comm_op='all_to_all',
dtype=getattr(torch, args.dtype),
mem_factor=args.mem_factor,
local_rank=local_rank,
args=args)
try:
mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
args.dtype)).to(get_accelerator().device_name(local_rank))
assert mat.numel(
) % world_size == 0, f"tensor with {mat.numel()} elements cannot be divided in {world_size} chunks"
input = ((mat.mul_(float(global_rank))).view(-1))
# Delete original mat to avoid OOM
del mat
get_accelerator().empty_cache()
output = torch.zeros(elements_per_gpu,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
sync_all()
return
else:
raise e
sync_all()
if args.debug:
for i in range(world_size):
if i == global_rank:
print(f"Before AllToAll Input List at rank {global_rank}: {input}")
dist.barrier()
timed_all_to_all(input, output, args)
if args.debug:
for i in range(world_size):
if i == global_rank:
print(f"AllToAll Results at rank {global_rank}: {output}")
dist.barrier()
if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_all_to_all(local_rank=rank, args=args)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import sys, os, time
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *
from communication.constants import *
from deepspeed.accelerator import get_accelerator
def timed_broadcast(input, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
sync_all()
# Warmups, establish connections, etc.
for i in range(args.warmups):
dist.broadcast(input, 0, async_op=args.async_op)
sync_all()
# time the actual comm op trials times and average it
pre = time.perf_counter()
for i in range(args.trials):
dist.broadcast(input, 0, async_op=args.async_op)
sync_all()
duration = time.perf_counter() - pre
# maintain and clean performance data
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('broadcast', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'
if not args.raw:
size = convert_size(size)
print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
def run_broadcast(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
# Prepare benchmark header
print_header(args, 'broadcast')
world_size = dist.get_world_size()
global_rank = dist.get_rank()
if args.scan:
M_LIST = []
for x in (2**p for p in range(1, args.maxsize)):
M_LIST.append(x)
sync_all()
# loop over various tensor sizes
for M in M_LIST:
global_rank = dist.get_rank()
try:
mat = torch.ones(world_size, M,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
sync_all()
input = ((mat.mul_(float(global_rank))).view(-1))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Exiting comm op.')
sync_all()
break
else:
raise e
sync_all()
timed_broadcast(input, args)
else:
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
# Don't need output tensor, so we double mem_factor
elements_per_gpu = max_numel(comm_op='broadcast',
dtype=getattr(torch, args.dtype),
mem_factor=args.mem_factor * 2,
local_rank=local_rank,
args=args)
try:
mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
args.dtype)).to(get_accelerator().device_name(local_rank))
input = ((mat.mul_(float(global_rank))).view(-1))
        except RuntimeError as e:
            if 'out of memory' in str(e):
                if dist.get_rank() == 0:
                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                sync_all()
                return
            else:
                raise e
sync_all()
timed_broadcast(input, args)
if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_broadcast(local_rank=rank, args=args)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
from deepspeed.accelerator import get_accelerator
DEFAULT_WARMUPS = 5
DEFAULT_TRIALS = 50
DEFAULT_TYPE = 'float'
DEFAULT_BACKEND = get_accelerator().communication_backend_name()
DEFAULT_UNIT = 'Gbps'
DEFAULT_DIST = 'deepspeed'
DEFAULT_MAXSIZE = 24
TORCH_DISTRIBUTED_DEFAULT_PORT = 29500
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import sys, os, time
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *
from communication.constants import *
from deepspeed.accelerator import get_accelerator
def timed_pt2pt(input, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
sync_all()
# Warmups, establish connections, etc.
for i in range(args.warmups):
if dist.get_rank() == 0:
if args.async_op:
dist.isend(input, 1)
else:
dist.send(input, 1)
if dist.get_rank() == 1:
if args.async_op:
dist.irecv(input, src=0)
else:
dist.recv(input, src=0)
sync_all()
# time the actual comm op trials times and average it
pre = time.perf_counter()
for i in range(args.trials):
if dist.get_rank() == 0:
if args.async_op:
dist.isend(input, 1)
else:
dist.send(input, 1)
if dist.get_rank() == 1:
if args.async_op:
dist.irecv(input, src=0)
else:
dist.recv(input, src=0)
sync_all()
duration = time.perf_counter() - pre
# maintain and clean performance data
avg_duration = duration / args.trials
size = input.element_size() * input.nelement()
n = dist.get_world_size()
tput, busbw = get_bw('pt2pt', size, avg_duration, args)
tput_str, busbw_str, duration_str = get_metric_strings(args, tput, busbw, avg_duration)
desc = f'{input.nelement()}x{input.element_size()}'
if not args.raw:
size = convert_size(size)
print_rank_0(f"{size:<20} {desc:25s} {duration_str:20s} {tput_str:20s} {busbw_str:20s}")
def run_pt2pt(local_rank, args):
if args.dist == 'torch':
import torch.distributed as dist
elif args.dist == 'deepspeed':
import deepspeed.comm as dist
# Prepare benchmark header
print_header(args, 'pt2pt')
global_rank = dist.get_rank()
world_size = dist.get_world_size()
if args.scan:
# Create list of message sizes
M_LIST = []
for x in (2**p for p in range(1, args.maxsize)):
M_LIST.append(x)
sync_all()
# loop over various tensor sizes
for M in M_LIST:
global_rank = dist.get_rank()
try:
mat = torch.ones(world_size, M,
dtype=getattr(torch, args.dtype)).to(get_accelerator().device_name(local_rank))
sync_all()
input = ((mat.mul_(float(global_rank))).view(-1))
except RuntimeError as e:
if 'out of memory' in str(e):
if dist.get_rank() == 0:
print('WARNING: Ran out of GPU memory. Exiting comm op.')
sync_all()
break
else:
raise e
sync_all()
timed_pt2pt(input, args)
else:
# Send the biggest message size our GPUs can fit. If you're facing OOM errors, reduce the mem_factor
# Don't need output tensor, so double mem_factor
elements_per_gpu = max_numel(comm_op='pt2pt',
dtype=getattr(torch, args.dtype),
mem_factor=args.mem_factor * 2,
local_rank=local_rank,
args=args)
try:
mat = torch.ones(elements_per_gpu, dtype=getattr(torch,
args.dtype)).to(get_accelerator().device_name(local_rank))
input = ((mat.mul_(float(global_rank))).view(-1))
        except RuntimeError as e:
            if 'out of memory' in str(e):
                if dist.get_rank() == 0:
                    print('WARNING: Ran out of GPU memory. Try to reduce the --mem-factor argument!')
                sync_all()
                return
            else:
                raise e
sync_all()
timed_pt2pt(input, args)
if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
init_processes(local_rank=rank, args=args)
run_pt2pt(local_rank=rank, args=args)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import sys, os
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.utils import *
from communication.all_reduce import run_all_reduce
from communication.all_gather import run_all_gather
from communication.all_to_all import run_all_to_all
from communication.pt2pt import run_pt2pt
from communication.broadcast import run_broadcast
from communication.constants import *
# For importing
def main(args, rank):
init_processes(local_rank=rank, args=args)
ops_to_run = []
if args.all_reduce:
ops_to_run.append('all_reduce')
if args.all_gather:
ops_to_run.append('all_gather')
if args.broadcast:
ops_to_run.append('broadcast')
if args.pt2pt:
ops_to_run.append('pt2pt')
if args.all_to_all:
ops_to_run.append('all_to_all')
if len(ops_to_run) == 0:
ops_to_run = ['all_reduce', 'all_gather', 'all_to_all', 'broadcast', 'pt2pt']
for comm_op in ops_to_run:
if comm_op == 'all_reduce':
run_all_reduce(local_rank=rank, args=args)
if comm_op == 'all_gather':
run_all_gather(local_rank=rank, args=args)
if comm_op == 'all_to_all':
run_all_to_all(local_rank=rank, args=args)
if comm_op == 'pt2pt':
run_pt2pt(local_rank=rank, args=args)
if comm_op == 'broadcast':
run_broadcast(local_rank=rank, args=args)
# For directly calling benchmark
if __name__ == "__main__":
args = benchmark_parser().parse_args()
rank = args.local_rank
main(args, rank)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import os, sys
import math
import argparse
COMMS_BENCH_DIR = os.path.join(os.path.dirname(__file__), "../")
sys.path.append(COMMS_BENCH_DIR)
from communication.constants import *
from deepspeed.accelerator import get_accelerator
global dist
def env2int(env_list, default=-1):
for e in env_list:
val = int(os.environ.get(e, -1))
if val >= 0: return val
return default
def init_torch_distributed(backend):
global dist
import torch.distributed as dist
# discover rank/size info from env
if 'MASTER_PORT' not in os.environ:
os.environ['MASTER_PORT'] = str(TORCH_DISTRIBUTED_DEFAULT_PORT)
if 'MASTER_ADDR' not in os.environ:
try:
from mpi4py import MPI
except ModuleNotFoundError:
print(
"Cannot import mpi4py and MASTER_ADDR not set. Please either install mpi4py or set the MASTER_ADDR on all ranks"
)
raise Exception
import subprocess
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
master_addr = None
if rank == 0:
hostname_cmd = ["hostname -I"]
result = subprocess.check_output(hostname_cmd, shell=True)
master_addr = result.decode('utf-8').split()[0]
master_addr = comm.bcast(master_addr, root=0)
os.environ['MASTER_ADDR'] = master_addr
local_rank = env2int(
['LOCAL_RANK', 'MPI_LOCALRANKID', 'OMPI_COMM_WORLD_LOCAL_RANK', 'MV2_COMM_WORLD_LOCAL_RANK', 'SLURM_LOCALID'])
if 'LOCAL_RANK' not in os.environ:
os.environ['LOCAL_RANK'] = str(local_rank)
rank = env2int(['RANK', 'MPI_RANKID', 'OMPI_COMM_WORLD_RANK', 'MV2_COMM_WORLD_RANK', 'SLURM_PROCID'])
if 'RANK' not in os.environ:
os.environ['RANK'] = str(rank)
world_size = env2int(['WORLD_SIZE', 'OMPI_COMM_WORLD_SIZE', 'MV2_COMM_WORLD_SIZE', 'SLURM_NPROCS'])
if 'WORLD_SIZE' not in os.environ:
os.environ['WORLD_SIZE'] = str(world_size)
torch.distributed.init_process_group(backend)
local_rank = int(os.environ['LOCAL_RANK'])
get_accelerator().set_device(local_rank)
def init_deepspeed_comm(backend):
global dist
import deepspeed
import deepspeed.comm as dist
deepspeed.init_distributed(dist_backend=backend)
local_rank = int(os.environ['LOCAL_RANK'])
get_accelerator().set_device(local_rank)
def init_processes(local_rank, args):
if args.dist == 'deepspeed':
init_deepspeed_comm(args.backend)
elif args.dist == 'torch':
init_torch_distributed(args.backend)
else:
print_rank_0(f"distributed framework {args.dist} not supported")
exit(0)
def print_rank_0(message):
if dist.get_rank() == 0:
print(message)
def print_header(args, comm_op):
if comm_op == 'pt2pt':
world_size = 2
else:
world_size = dist.get_world_size()
tput = f'Throughput ({args.bw_unit})'
busbw = f'BusBW ({args.bw_unit})'
header = f"\n---- Performance of {comm_op} on {world_size} devices ---------------------------------------------------------\n"
duration_str = 'Duration'
if args.raw:
duration_str += ' (us)'
header += f"{'Size (Bytes)':20s} {'Description':25s} {duration_str:20s} {tput:20s} {busbw:20s}\n"
header += "----------------------------------------------------------------------------------------------------"
print_rank_0(header)
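# A note on the bandwidth numbers reported below: `tput` is the algorithmic throughput seen by
# the caller (message size over measured time, doubled for all_reduce since each element is both
# reduced and redistributed), while `busbw` rescales it by the fraction of data that actually
# crosses the busiest link in a ring-style implementation -- (n-1)/n for all_gather/all_to_all,
# 2(n-1)/n for all_reduce -- similar in spirit to the bus bandwidth reported by nccl-tests.
# Example: an 8-GPU all_reduce of 1e9 bytes in 50 ms reports tput = 2*1e9/0.05 = 40 GB/s and
# busbw = (1e9/0.05)*2*(7/8) = 35 GB/s (both multiplied by 8 when --bw-unit=Gbps).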
def get_bw(comm_op, size, duration, args):
n = dist.get_world_size()
tput = 0
busbw = 0
if comm_op == "all_to_all":
tput = (size / duration)
busbw = (size / duration) * ((n - 1) / n)
elif comm_op == "all_gather":
size *= n
tput = (size / duration)
busbw = (size / duration) * ((n - 1) / n)
elif comm_op == "all_reduce":
tput = (size * 2 / duration)
busbw = (size / duration) * (2 * (n - 1) / n)
elif comm_op == "pt2pt" or comm_op == "broadcast":
tput = (size / duration)
busbw = tput
else:
print_rank_0("wrong comm_op specified")
exit(0)
if args.bw_unit == 'Gbps':
tput *= 8
busbw *= 8
return tput, busbw
def get_metric_strings(args, tput, busbw, duration):
duration_ms = duration * 1e3
duration_us = duration * 1e6
tput = f'{tput / 1e9:.3f}'
busbw = f'{busbw /1e9:.3f}'
if duration_us < 1e3 or args.raw:
duration = f'{duration_us:.3f}'
if not args.raw:
duration += ' us'
else:
duration = f'{duration_ms:.3f} ms'
return tput, busbw, duration
def sync_all():
get_accelerator().synchronize()
dist.barrier()
def max_numel(comm_op, dtype, mem_factor, local_rank, args):
dtype_size = _element_size(dtype)
max_memory_per_gpu = get_accelerator().total_memory(local_rank) * mem_factor
if comm_op == 'all_reduce' or comm_op == 'pt2pt' or comm_op == 'broadcast':
elements_per_gpu = int(max_memory_per_gpu // dtype_size)
elif comm_op == 'all_gather':
# all_gather performance is lower for non-powers of two, and the output buffer size scales with world size
# Therefore, divide by world size and round down to nearest power of 2
elements_per_gpu = int(max_memory_per_gpu // dtype_size // dist.get_world_size())
elements_per_gpu = int(pow(2, int(math.log(elements_per_gpu, 2))))
elif comm_op == 'all_to_all':
# Number of elements must be divisible by world_size
# all_to_all performance is lower for non-powers of two. Round down like all_gather.
elements_per_gpu = int(max_memory_per_gpu // dtype_size)
elements_per_gpu = int(dist.get_world_size() * round(elements_per_gpu / dist.get_world_size()))
elements_per_gpu = int(pow(2, int(math.log(elements_per_gpu, 2))))
else:
print(f"This communication operation: {comm_op} is not supported yet")
exit(0)
return elements_per_gpu
# Helper function to pretty-print message sizes
def convert_size(size_bytes):
if size_bytes == 0:
return "0B"
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return "%s %s" % (s, size_name[i])
# Copied from torch. Need to add the func here for old torch compatibility.
def _element_size(dtype):
"""
Returns the element size for a dtype, in bytes
"""
if not isinstance(dtype, torch.dtype):
raise RuntimeError(f'expected torch.dtype, but got {type(dtype)}')
if dtype.is_complex:
return torch.finfo(dtype).bits >> 2
elif dtype.is_floating_point:
return torch.finfo(dtype).bits >> 3
elif dtype == torch.bool:
# NOTE: torch.bool is not supported in torch.iinfo()
return 1
else:
return torch.iinfo(dtype).bits >> 3
def benchmark_parser():
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
parser.add_argument("--trials", type=int, default=DEFAULT_TRIALS, help='Number of timed iterations')
parser.add_argument("--warmups", type=int, default=DEFAULT_WARMUPS, help='Number of warmup (non-timed) iterations')
parser.add_argument("--maxsize", type=int, default=24, help='Max message size as a power of 2')
parser.add_argument("--async-op", action="store_true", help='Enables non-blocking communication')
parser.add_argument("--bw-unit", type=str, default=DEFAULT_UNIT, choices=['Gbps', 'GBps'])
parser.add_argument("--backend",
type=str,
default=DEFAULT_BACKEND,
choices=['nccl', 'ccl', 'mpi'],
help='Communication library to use')
parser.add_argument("--dist",
type=str,
default=DEFAULT_DIST,
choices=['deepspeed', 'torch'],
help='Distributed DL framework to use')
parser.add_argument("--scan", action="store_true", help='Enables scanning all message sizes')
parser.add_argument("--raw", action="store_true", help='Print the message size and latency without units')
parser.add_argument("--all-reduce", action="store_true", help='Run all_reduce')
parser.add_argument("--all-gather", action="store_true", help='Run all_gather')
parser.add_argument("--all-to-all", action="store_true", help='Run all_to_all')
parser.add_argument("--pt2pt", action="store_true", help='Run pt2pt')
parser.add_argument("--broadcast", action="store_true", help='Run broadcast')
parser.add_argument("--dtype", type=str, default=DEFAULT_TYPE, help='PyTorch tensor dtype')
parser.add_argument("--mem-factor",
type=float,
default=.4,
help='Proportion of max available GPU memory to use for single-size evals')
parser.add_argument("--debug", action="store_true", help='Enables all_to_all debug prints')
return parser
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import torch
import time
import deepspeed
import argparse
from transformers import pipeline
from deepspeed.accelerator import get_accelerator
parser = argparse.ArgumentParser()
parser.add_argument("--model", "-m", type=str, help="hf model name")
parser.add_argument("--deepspeed", action="store_true", help="use deepspeed inference")
parser.add_argument("--dtype", type=str, default="fp16", help="fp16 or fp32")
parser.add_argument("--max-tokens", type=int, default=50, help="max new tokens")
parser.add_argument("--local_rank", type=int, default=0, help="local rank")
parser.add_argument("--trials", type=int, default=30, help="number of trials")
parser.add_argument("--kernel-inject", action="store_true", help="inject kernels on")
parser.add_argument("--graphs", action="store_true", help="CUDA Graphs on")
args = parser.parse_args()
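# Example invocation (hypothetical script and model names; flags are defined by the parser above):
#   deepspeed --num_gpus 1 bert-bench.py -m bert-base-cased --dtype fp16 --deepspeed --kernel-inject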
def print_latency(latency_set, title, warmup=3):
# trim warmup queries
latency_set = latency_set[warmup:]
count = len(latency_set)
if count > 0:
latency_set.sort()
n50 = (count - 1) * 0.5 + 1
n90 = (count - 1) * 0.9 + 1
n95 = (count - 1) * 0.95 + 1
n99 = (count - 1) * 0.99 + 1
n999 = (count - 1) * 0.999 + 1
avg = sum(latency_set) / count
p50 = latency_set[int(n50) - 1]
p90 = latency_set[int(n90) - 1]
p95 = latency_set[int(n95) - 1]
p99 = latency_set[int(n99) - 1]
p999 = latency_set[int(n999) - 1]
print(f"====== latency stats {title} ======")
print("\tAvg Latency: {0:8.2f} ms".format(avg * 1000))
print("\tP50 Latency: {0:8.2f} ms".format(p50 * 1000))
print("\tP90 Latency: {0:8.2f} ms".format(p90 * 1000))
print("\tP95 Latency: {0:8.2f} ms".format(p95 * 1000))
print("\tP99 Latency: {0:8.2f} ms".format(p99 * 1000))
print("\t999 Latency: {0:8.2f} ms".format(p999 * 1000))
deepspeed.init_distributed()
print(args.model, args.max_tokens, args.dtype)
if args.dtype.lower() == "fp16":
dtype = torch.float16
else:
dtype = torch.float32
pipe = pipeline("fill-mask", model=args.model, framework="pt", device=args.local_rank)
if dtype == torch.half:
pipe.model.half()
mask = pipe.tokenizer.mask_token
br = pipe(f"Hello I'm a {mask} model")
if args.deepspeed:
pipe.model = deepspeed.init_inference(pipe.model,
dtype=dtype,
mp_size=1,
replace_with_kernel_inject=args.kernel_inject,
enable_cuda_graph=args.graphs)
pipe.model.profile_model_time()
responses = []
times = []
mtimes = []
for i in range(args.trials):
get_accelerator().synchronize()
start = time.time()
r = pipe(f"Hello I'm a {mask} model")
get_accelerator().synchronize()
end = time.time()
responses.append(r)
times.append((end - start))
mtimes += pipe.model.model_times()
#print(f"{pipe.model.model_times()=}")
print_latency(times, "e2e latency")
print_latency(mtimes, "model latency")
print(responses[0:3])
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import os
import re
import argparse
import pandas as pd
parser = argparse.ArgumentParser()
parser.add_argument(
"--results-dir",
"-r",
type=str,
default="./results",
help="directory containing sweep results",
)
parser.add_argument("--version", "-v", type=int, default=0, help="version to be collected")
parser.add_argument("--gen-text-n", "-n", type=int, default=1, help="expected number of generated text")
parser.add_argument("--output", "-o", type=str, default="./results.csv", help="output file")
args = parser.parse_args()
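# Example invocation (hypothetical script name; flags are defined by the parser above):
#   python collect_results.py --results-dir ./results --version 0 --gen-text-n 1 --output ./results.csv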
def get_branch(file_path):
match = re.match(r".*\/(.*)\.log", file_path)
if match is None:
return False
else:
return match.groups()[0]
def get_benchmark_params(root_dir, file_path):
match = re.match(
rf"{root_dir}\/(.+?)_(fp\d+)_(true|false)_(true|false)_(\d+)gpus_v(\d+)\/",
file_path,
)
if match is None:
return False
else:
model, dtype, graphs, kernel, gpus, version = match.groups()
bool_dict = {"true": True, "false": False}
return {
"model": model,
"dtype": dtype,
"graphs": bool_dict[graphs.lower()],
"kernel": bool_dict[kernel.lower()],
"gpus": int(gpus),
"version": int(version),
}
def get_perf_data(file_content):
    matches = re.findall(r"\s+(.+?)\sLatency:\s+(\d+\.\d+)\sms", file_content)
    if not matches:
        return False
    else:
        return {f"latency-{key}": float(val) for key, val in matches}
def get_generated_text(file_content, gen_text_n):
file_content = file_content.replace("\n", " ")
file_content = file_content.replace("\t", " ")
matches = re.findall(r"RESPONSE\s(\d+):\s+[-]{30}\s+(.+?)\s+[-]{30}", file_content)
if len(matches) != gen_text_n:
return False
else:
return {f"generated-text-{key}": val for key, val in matches}
def get_error(file_content):
    matches = re.findall(r"Error:\s+(.+?)\n", file_content)
    if not matches:
        return False
    else:
        return {"error": val for val in matches}
if __name__ == "__main__":
# List to collect data from all benchmarks
benchmarks_data = []
# Walk through directory of results from sweep.sh
for root, dirs, files in os.walk(args.results_dir):
# Because of how some models are named, the dir structure for results can vary, e.g.:
# "EleutherAI/gpt-neo_*/baseline.log" versus "gpt2_*/baseline.log"
if dirs:
continue
# Get data from baseline and each tested branch
for name in files:
file_path = os.path.join(root, name)
branch = get_branch(file_path)
if not branch:
print(f"WARNING: Could not detect branch for file {file_path}, skipping")
continue
params = get_benchmark_params(args.results_dir, file_path)
if not params:
print(f"WARNING: Could not detect benchmark settings for file {file_path}, skipping")
continue
# Verify that the version matches that which we want to collect
if params["version"] != args.version:
continue
with open(file_path, "r") as f:
file_content = f.read()
perf_data = get_perf_data(file_content)
if not perf_data:
print(f"WARNING: Could not detect benchmark performance data for file {file_path}")
generated_text = get_generated_text(file_content, args.gen_text_n)
if not generated_text:
print(f"WARNING: Could not detect generated text for file {file_path}")
error = get_error(file_content)
if error:
print(f"Error found in {file_path}, collecting error info...")
benchmarks_data.append({"branch": branch, **params, **error})
continue
benchmarks_data.append({"branch": branch, **params, **perf_data, **generated_text})
# Convert to a DataFrame and save
benchmarks_df = pd.DataFrame(benchmarks_data)
benchmarks_df.to_csv(args.output)
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0
# DeepSpeed Team
import os
import torch
import time
import deepspeed
import argparse
from transformers import pipeline
from deepspeed.accelerator import get_accelerator
parser = argparse.ArgumentParser()
parser.add_argument("--model", "-m", type=str, help="hf model name")
parser.add_argument("--deepspeed", action="store_true", help="use deepspeed inference")
parser.add_argument("--dtype", type=str, default="fp16", choices=["fp16", "fp32", "int8"], help="int8, fp16, or fp32")
parser.add_argument("--graphs", action="store_true", help="CUDA Graphs on")
parser.add_argument("--kernel-inject", action="store_true", help="inject kernels on")
parser.add_argument("--max-tokens", type=int, default=50, help="max new tokens")
parser.add_argument("--local_rank", type=int, default=int(os.getenv("LOCAL_RANK", "0")), help="local rank")
parser.add_argument("--world_size", type=int, default=int(os.getenv("WORLD_SIZE", "1")), help="world size")
parser.add_argument("--trials", type=int, default=30, help="number of trials")
args = parser.parse_args()
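# Example invocation (mirrors how run_model.sh drives this script):
#   deepspeed --num_gpus 1 gpt-bench.py -m gpt2 --dtype fp16 --deepspeed --kernel-inject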
def print_latency(latency_set, title, warmup=3):
# trim warmup queries
latency_set = list(latency_set)
latency_set = latency_set[warmup:]
count = len(latency_set)
if count > 0:
latency_set.sort()
n50 = (count - 1) * 0.5 + 1
n90 = (count - 1) * 0.9 + 1
n95 = (count - 1) * 0.95 + 1
n99 = (count - 1) * 0.99 + 1
n999 = (count - 1) * 0.999 + 1
avg = sum(latency_set) / count
p50 = latency_set[int(n50) - 1]
p90 = latency_set[int(n90) - 1]
p95 = latency_set[int(n95) - 1]
p99 = latency_set[int(n99) - 1]
p999 = latency_set[int(n999) - 1]
print(f"====== latency stats {title} ======")
print("\tAvg Latency: {0:8.2f} ms".format(avg * 1000))
print("\tP50 Latency: {0:8.2f} ms".format(p50 * 1000))
print("\tP90 Latency: {0:8.2f} ms".format(p90 * 1000))
print("\tP95 Latency: {0:8.2f} ms".format(p95 * 1000))
print("\tP99 Latency: {0:8.2f} ms".format(p99 * 1000))
print("\t999 Latency: {0:8.2f} ms".format(p999 * 1000))
deepspeed.init_distributed()
if args.local_rank == 0:
print("BENCHMARK SETTINGS:")
print(f"\tMODEL: {args.model}")
print(f"\tMAX_TOKENS: {args.max_tokens}")
print(f"\tDTYPE: {args.dtype}")
print(f"\tCUDA_GRAPHS: {args.graphs}")
print(f"\tKERNEL_INJECT: {args.kernel_inject}")
if args.dtype == "int8":
dtype = torch.int8
elif args.dtype == "fp16":
dtype = torch.float16
else:
dtype = torch.float32
pipe = pipeline("text-generation", model=args.model, framework="pt", device=args.local_rank)
if dtype == torch.float16:
pipe.model.half()
if args.deepspeed:
pipe.model = deepspeed.init_inference(
pipe.model,
dtype=dtype,
mp_size=args.world_size,
replace_with_kernel_inject=args.kernel_inject,
enable_cuda_graph=args.graphs,
)
pipe.model.profile_model_time()
responses = []
times = []
mtimes = []
for i in range(args.trials):
get_accelerator().synchronize()
start = time.time()
r = pipe("DeepSpeed is", do_sample=False, max_new_tokens=args.max_tokens)
get_accelerator().synchronize()
end = time.time()
responses.append(r)
times.append(end - start) # / (args.max_tokens - 3))
mtimes.append(sum(pipe.model.model_times()))
if args.local_rank == 0:
print_latency(times, "(e2e) latency")
print_latency(mtimes, "(model-only) latency")
print_latency(map(lambda t: t / (args.max_tokens - 3), times), "(e2e) per token latency")
print(f"RESPONSE 0:")
print("-" * 30)
print(responses[0][0]["generated_text"])
print("-" * 30)
set -x
model=$1
branch1=$2
branch2=$3
dtype=$4
graphs=$5
kernel=$6
gpus=$7
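# Positional args: <hf-model> <branch1> <branch2> <dtype:fp16|fp32> <graphs:true|false> <kernel:true|false> <num-gpus>
# Normally driven by sweep.sh; runs the non-DeepSpeed baseline plus DeepSpeed on each of the two branches.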
version=0
log_path=results/${model}_${dtype}_${graphs}_${kernel}_${gpus}gpus_v${version}
mkdir -p ${log_path}
params="--dtype $dtype "
if [[ "$graphs" == "true" ]]; then
params+="--graphs "
fi
if [[ "$kernel" == "true" ]]; then
params+="--kernel "
fi
echo "baseline $log_path"
deepspeed --num_gpus 1 gpt-bench.py -m "${model}" $params &> ${log_path}/baseline.log
cd ../../
git checkout ${branch1}
cd -
echo "ds ${branch1} $log_path"
deepspeed --num_gpus $gpus gpt-bench.py --deepspeed -m "${model}" $params &> ${log_path}/ds-${branch1}.log
cd ../../
git checkout ${branch2}
cd -
echo "ds ${branch2} $log_path"
deepspeed --num_gpus $gpus gpt-bench.py --deepspeed -m "${model}" $params&> ${log_path}/ds-${branch2}.log
set -x
export TRANSFORMERS_CACHE=/tmp/hf-cache
branch1=$1
branch2=$2
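# Usage: bash sweep.sh <baseline-branch> <test-branch>
# Sweeps every model list below across gpu-count/dtype/graphs/kernel settings, calling run_model.sh per combination.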
gptneo_models="EleutherAI/gpt-neo-2.7B EleutherAI/gpt-neo-1.3B EleutherAI/gpt-neo-125M"
gpt2_models="gpt2 gpt2-large gpt2-xl"
gptj_models="EleutherAI/gpt-j-6B"
opt_models="facebook/opt-125m facebook/opt-1.3b facebook/opt-2.7b facebook/opt-6.7b facebook/opt-13b"
bloom_models="bigscience/bloom-560m bigscience/bloom-1b7 bigscience/bloom-3b bigscience/bloom-7b1"
for gpus in `echo "1 2 4 8"`; do
for dtype in `echo "fp16 fp32"`; do
for graphs in `echo "true false"`; do
for kernel in `echo "true false"`; do
params="$dtype $graphs $kernel $gpus"
for m in `echo "$gptneo_models"`; do
bash run_model.sh $m $branch1 $branch2 $params
done
for m in `echo "$gpt2_models"`; do
bash run_model.sh $m $branch1 $branch2 $params
done
for m in `echo "$gptj_models"`; do
bash run_model.sh $m $branch1 $branch2 $params
done
for m in `echo "$opt_models"`; do
bash run_model.sh $m $branch1 $branch2 $params
done
for m in `echo "$bloom_models"`; do
bash run_model.sh $m $branch1 $branch2 $params
done
done
done
done
done