Unverified commit 7e813283, authored by Jeff Rasley, committed by GitHub

MPI 3.x support via mpi4py (#107)

* add mpirun support for openmpi 4.0

* add master addr support from args

* switch mpi detection to use mpi4py

* set constant for default distributed port

* Make sure deepspeed_mpi exists in args
Parent ccec2463
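Taken together, these changes let DeepSpeed be launched under a plain `mpirun` (e.g. `mpirun -n 4 python train.py --deepspeed_mpi ...`) instead of the bundled launcher. Below is a minimal sketch of the training-script side; the stand-in model and the use of the public `deepspeed.add_config_arguments` wrapper (which calls the `_add_core_arguments` shown in the diff) and `deepspeed.initialize` entry point are illustrative, not part of this commit:

```python
# train.py -- sketch only; launch e.g. with:
#   mpirun -n 4 python train.py --deepspeed_mpi --deepspeed_config ds_config.json
import argparse

import torch
import deepspeed

parser = argparse.ArgumentParser()
# add_config_arguments() invokes _add_core_arguments(), which now
# registers --deepspeed_mpi among the core DeepSpeed flags.
parser = deepspeed.add_config_arguments(parser)
args = parser.parse_args()

model = torch.nn.Linear(10, 10)  # stand-in model for illustration

# With --deepspeed_mpi set, engine construction runs _mpi_check(), which
# derives RANK / WORLD_SIZE / MASTER_ADDR / MASTER_PORT from MPI.
model_engine, optimizer, _, _ = deepspeed.initialize(
    args=args,
    model=model,
    model_parameters=model.parameters())
```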
@@ -143,6 +143,15 @@ def _add_core_arguments(parser):
                        default=None,
                        type=str,
                        help='Deprecated DeepSpeed json configuration file.')
+    group.add_argument(
+        '--deepspeed_mpi',
+        default=False,
+        action='store_true',
+        help=
+        "Run via MPI, this will attempt to discover the necessary variables to initialize torch "
+        "distributed from the MPI environment")
     return parser
...
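For reference, `--deepspeed_mpi` is an ordinary argparse `store_true` flag. A self-contained sketch of its behavior (the parser here is isolated from DeepSpeed for illustration):

```python
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument('--deepspeed_mpi', default=False, action='store_true')

assert parser.parse_args(['--deepspeed_mpi']).deepspeed_mpi is True   # flag given
assert parser.parse_args([]).deepspeed_mpi is False                   # flag omitted
```

The `hasattr(args, 'deepspeed_mpi')` guard in `_mpi_check` further below exists because callers may pass an `args` namespace built by a parser that never registered this flag.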
@@ -31,6 +31,11 @@ SCHEDULER_TYPE_DEFAULT = None
 SCHEDULER_PARAMS = "params"
 MAX_GRAD_NORM = 'max_grad_norm'
+#############################################
+# Torch distributed constants
+#############################################
+TORCH_DISTRIBUTED_DEFAULT_PORT = "29500"
 # Steps
 STEPS_PER_PRINT = "steps_per_print"
 STEPS_PER_PRINT_DEFAULT = 10
...
@@ -5,19 +5,16 @@ Copyright 2020 The Microsoft DeepSpeed Team: deepspeed@microsoft.com
 import sys
 import subprocess
 import os
-import socket
 import json
 import base64
 from collections import defaultdict
 from argparse import ArgumentParser, REMAINDER
-import torch

 def parse_args():
-    parser = ArgumentParser(description="DeepSpeed distributed training launch "
-                            "utilty that creates multiple distributed "
-                            "processes on a single node")
+    parser = ArgumentParser(description="DeepSpeed distributed training launch"
+                            " utility that creates multiple distributed"
+                            " processes on a single node")

     # Optional arguments for the launch helper
     parser.add_argument("--node_rank",
@@ -28,15 +25,15 @@ def parse_args():
     parser.add_argument("--master_addr",
                         default="127.0.0.1",
                         type=str,
-                        help="Master node (rank 0)'s address, should be either "
-                        "the IP address or the hostname of node 0, for "
-                        "single node multi-proc training, the "
-                        "--master_addr can simply be 127.0.0.1")
+                        help="Master node (rank 0)'s address, should be either"
+                        " the IP address or the hostname of node 0, for"
+                        " single node multi-proc training, the"
+                        " --master_addr can simply be 127.0.0.1")
     parser.add_argument("--master_port",
                         default=29500,
                         type=int,
                         help="Master node (rank 0)'s free port that needs to "
-                        "be used for communciation during distributed "
+                        "be used for communication during distributed "
                         "training")
     parser.add_argument("--world_info",
                         default="None",
...
@@ -21,7 +21,7 @@ from deepspeed.pt.deepspeed_config import DeepSpeedConfig, \
 from deepspeed.pt.deepspeed_dataloader import DeepSpeedDataLoader
 from deepspeed.pt.deepspeed_constants import ROUTE_TRAIN, ROUTE_PREDICT, \
-    ROUTE_EVAL
+    ROUTE_EVAL, TORCH_DISTRIBUTED_DEFAULT_PORT
 import deepspeed.pt.deepspeed_lr_schedules as lr_schedules
 from deepspeed.pt.deepspeed_csr_tensor import CSRTensor
@@ -119,6 +119,8 @@ class DeepSpeedLight(Module):
         self.gradient_average = True
         self.warn_unscaled_loss = True

+        self._mpi_check(args)
+
         if dist_init_required is None:
             dist_init_required = not dist.is_initialized()
@@ -184,6 +186,40 @@ class DeepSpeedLight(Module):
         if self.dump_state():
             print_configuration(self, 'DeepSpeedLight')

+    def _mpi_check(self, args):
+        if hasattr(args, 'deepspeed_mpi') and args.deepspeed_mpi:
+            from mpi4py import MPI
+            import subprocess
+            comm = MPI.COMM_WORLD
+            rank = comm.Get_rank()
+            world_size = comm.Get_size()
+
+            master_addr = None
+            if rank == 0:
+                hostname_cmd = ["hostname -I"]
+                result = subprocess.check_output(hostname_cmd, shell=True)
+                master_addr = result.decode('utf-8').split()[0]
+            master_addr = comm.bcast(master_addr, root=0)
+
+            # Determine local rank by assuming hostnames are unique
+            proc_name = MPI.Get_processor_name()
+            all_procs = comm.allgather(proc_name)
+            local_rank = sum([i == proc_name for i in all_procs[:rank]])
+
+            os.environ['RANK'] = str(rank)
+            os.environ['WORLD_SIZE'] = str(world_size)
+            args.local_rank = local_rank
+            os.environ['MASTER_ADDR'] = master_addr
+            os.environ['MASTER_PORT'] = TORCH_DISTRIBUTED_DEFAULT_PORT
+
+            logging.info(
+                "Discovered MPI settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}"
+                .format(os.environ['RANK'],
+                        args.local_rank,
+                        os.environ['WORLD_SIZE'],
+                        os.environ['MASTER_ADDR'],
+                        os.environ['MASTER_PORT']))
+
     def tensorboard_enabled(self):
         return self._config.tensorboard_enabled
...
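`_mpi_check` only discovers and exports the rendezvous variables; the process group itself is created later through torch.distributed. A minimal sketch of the consuming side, assuming the standard env:// rendezvous (the backend choice here is illustrative):

```python
import torch.distributed as dist

# _mpi_check has already exported RANK, WORLD_SIZE, MASTER_ADDR and
# MASTER_PORT; with init_method='env://' they are read automatically.
if not dist.is_initialized():
    dist.init_process_group(backend='nccl', init_method='env://')

print('initialized rank {} of {}'.format(dist.get_rank(), dist.get_world_size()))
```

The local-rank rule is worth spelling out: each process counts how many lower-ranked processes report the same hostname. With `all_procs = ['nodeA', 'nodeA', 'nodeB', 'nodeB']` and `rank = 3`, the count over `all_procs[:3]` gives `local_rank = 1`, i.e. the second process on nodeB.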
@@ -13,6 +13,7 @@ import argparse
 import subprocess
 import collections
 from copy import deepcopy
+from deepspeed.pt.deepspeed_constants import TORCH_DISTRIBUTED_DEFAULT_PORT

 DLTS_HOSTFILE = "/job/hostfile"
 EXPORT_ENVS = ["NCCL", "PYTHONPATH"]
@@ -61,7 +62,7 @@ def parse_args(args=None):
     parser.add_argument("--num_gpus", type=int, default=-1, help="")
     parser.add_argument("--master_port",
-                        default=29500,
+                        default=int(TORCH_DISTRIBUTED_DEFAULT_PORT),
                         type=int,
                         help="(optional) Port used by PyTorch distributed for "
                         "communication during training.")
...
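One detail worth noting: TORCH_DISTRIBUTED_DEFAULT_PORT is defined as the string "29500" rather than an integer, presumably because os.environ values must be strings, while the runner casts it back with int() for its integer argparse default. A tiny sketch of both usages (constant inlined here for self-containment):

```python
import os

TORCH_DISTRIBUTED_DEFAULT_PORT = "29500"  # string, as in deepspeed_constants

# deepspeed_light writes it straight into the environment (strings required):
os.environ['MASTER_PORT'] = TORCH_DISTRIBUTED_DEFAULT_PORT

# the runner needs an integer argparse default, hence the cast:
default_port = int(TORCH_DISTRIBUTED_DEFAULT_PORT)
assert default_port == 29500
```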