Commit 4bccb6aa authored by LielinJiang

remove init context
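
For context, a minimal sketch of the caller-side effect of this change, based on the test update at the bottom of this diff: scripts no longer call the removed `init_context('dynamic'/'static')` helper; they only toggle dygraph mode themselves, and `Model.__init__` prepares the multi-GPU context when `Env().nranks > 1`. The toy `SingleFC` model below is purely illustrative (and a GPU is assumed, since `Model` pins a `CUDAPlace`).

```python
from paddle import fluid
from paddle.fluid.dygraph.nn import Linear
from model import Model   # hapi-style Model from this repo

class SingleFC(Model):     # hypothetical toy model, for illustration only
    def __init__(self):
        super(SingleFC, self).__init__()
        self.fc = Linear(784, 10)

    def forward(self, x):
        return self.fc(x)

dynamic = True
# Before: init_context('dynamic' if dynamic else 'static')
# After:  just pick the execution mode; Model handles the parallel context.
fluid.enable_dygraph() if dynamic else None
model = SingleFC()
```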

Parent 280dd0e3
@@ -16,7 +16,7 @@ import six
import copy
from progressbar import ProgressBar
from distributed import get_local_rank
from paddle.fluid.dygraph.parallel import Env
def config_callbacks(callbacks=None,
model=None,
@@ -193,7 +193,7 @@ class ProgBarLogger(Callback):
self.steps = self.params['steps']
self.epoch = epoch
self.train_step = 0
if self.verbose and self.epochs and get_local_rank() == 0:
if self.verbose and self.epochs and Env().local_rank == 0:
print('Epoch %d/%d' % (epoch + 1, self.epochs))
self.train_progbar = ProgressBar(num=self.steps, verbose=self.verbose)
@@ -211,7 +211,7 @@ class ProgBarLogger(Callback):
logs = logs or {}
self.train_step = step
if self.train_step % self.log_freq == 0 and self.verbose and get_local_rank() == 0:
if self.train_step % self.log_freq == 0 and self.verbose and Env().local_rank == 0:
# if steps is not None, last step will update in on_epoch_end
if self.steps and self.train_step < self.steps:
self._updates(logs, 'train')
@@ -220,7 +220,7 @@ class ProgBarLogger(Callback):
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
if self.verbose and get_local_rank() == 0:
if self.verbose and Env().local_rank == 0:
self._updates(logs, 'train')
def on_eval_begin(self, logs=None):
@@ -230,7 +230,7 @@ class ProgBarLogger(Callback):
self.evaled_samples = 0
self.eval_progbar = ProgressBar(
num=self.eval_steps, verbose=self.verbose)
if get_local_rank() == 0:
if Env().local_rank == 0:
print('Eval begin...')
def on_eval_batch_end(self, step, logs=None):
@@ -241,7 +241,7 @@ class ProgBarLogger(Callback):
def on_eval_end(self, logs=None):
logs = logs or {}
if self.verbose and get_local_rank() == 0:
if self.verbose and Env().local_rank == 0:
self._updates(logs, 'eval')
print('Eval samples: %d' % (self.evaled_samples))
@@ -255,13 +255,13 @@ class ModelCheckpoint(Callback):
self.epoch = epoch
def on_epoch_end(self, epoch, logs=None):
if self.model and self.epoch % self.save_freq == 0 and get_local_rank() == 0:
if self.model and self.epoch % self.save_freq == 0 and Env().local_rank == 0:
path = '{}/{}'.format(self.save_file, epoch)
print('save checkpoint at {}'.format(path))
self.model.save(path)
def on_train_end(self, logs=None):
if self.model and get_local_rank() == 0:
if self.model and Env().local_rank == 0:
path = '{}/final'.format(self.save_file)
print('save checkpoint at {}'.format(path))
self.model.save(path)
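
The callbacks changes above all follow one pattern: rank-0 gating now reads `Env().local_rank` directly instead of going through the removed `distributed.get_local_rank()` helper. A minimal sketch of that pattern (the `log_on_rank0` helper is illustrative, not part of the repo):

```python
from paddle.fluid.dygraph.parallel import Env

def log_on_rank0(msg):
    # Env() picks up the trainer's rank from the launch environment, so in a
    # multi-process job only the first trainer prints/saves; with a single
    # process local_rank is 0 and the message always appears.
    if Env().local_rank == 0:
        print(msg)

log_on_rank0('Eval begin...')
```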
@@ -17,26 +17,13 @@ import time
import math
import socket
import contextlib
from contextlib import closing
from six import string_types
import numpy as np
from collections import OrderedDict
from paddle import fluid
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
from paddle.fluid import framework
from paddle import fluid
from paddle.fluid.layers import collective
from paddle.fluid.dygraph import to_variable, no_grad, layers
from paddle.fluid.framework import Variable
from paddle.fluid.executor import global_scope
from paddle.fluid.dygraph.parallel import Env
from paddle.fluid.io import BatchSampler
from paddle.fluid.dygraph.parallel import Env, DataParallel, ParallelStrategy
from paddle.fluid.layers.collective import _c_allreduce, _c_allgather, _c_broadcast, \
_c_sync_comm_stream, _c_sync_calc_stream
from paddle.fluid.io import BatchSampler, DataLoader
__parallel_context_init = False
class DistributedBatchSampler(BatchSampler):
"""Sampler that restricts data loading to a subset of the dataset.
@@ -71,9 +58,10 @@ class DistributedBatchSampler(BatchSampler):
self.shuffle = shuffle
assert isinstance(drop_last, bool), \
"drop_last should be a boolean number"
self.drop_last = drop_last
self.nranks = get_nranks()
self.local_rank = get_local_rank()
self.nranks = Env().nranks
self.local_rank = Env().local_rank
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.nranks))
self.total_size = self.num_samples * self.nranks
@@ -106,164 +94,9 @@ class DistributedBatchSampler(BatchSampler):
num_samples += int(not self.drop_last) * (self.batch_size - 1)
return num_samples // self.batch_size
def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
return _c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
def get_local_rank():
return Env().local_rank
def get_nranks():
return Env().nranks
def wait_server_ready(endpoints):
assert not isinstance(endpoints, string_types)
while True:
all_ok = True
not_ready_endpoints = []
for ep in endpoints:
ip_port = ep.split(":")
with closing(
socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as sock:
sock.settimeout(2)
result = sock.connect_ex((ip_port[0], int(ip_port[1])))
if result != 0:
all_ok = False
not_ready_endpoints.append(ep)
if not all_ok:
sys.stderr.write("server not ready, wait 3 sec to retry...\n")
sys.stderr.write("not ready endpoints:" + str(
not_ready_endpoints) + "\n")
sys.stderr.flush()
time.sleep(3)
else:
break
def init_communicator(program, rank, nranks, wait_port,
current_endpoint, endpoints):
if nranks < 2:
return
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
if rank == 0 and wait_port:
wait_server_ready(other_endpoints)
block = program.global_block()
nccl_id_var = block.create_var(
name=nameGen.generate('nccl_id'),
persistable=True,
type=core.VarDesc.VarType.RAW)
block.append_op(
type='c_gen_nccl_id',
inputs={},
outputs={'Out': nccl_id_var},
attrs={
'rank': rank,
'endpoint': current_endpoint,
'other_endpoints': other_endpoints
})
block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': rank,
'ring_id': 0,
})
def set_epoch(self, epoch):
self.epoch = epoch
def prepare_distributed_context(place=None):
if place is None:
place = fluid.CUDAPlace(Env().dev_id) if Env().nranks > 1 \
else fluid.CUDAPlace(0)
strategy = ParallelStrategy()
strategy.nranks = Env().nranks
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint
if strategy.nranks < 2:
return
global __parallel_context_init
if not __parallel_context_init and isinstance(place, core.CUDAPlace):
def _init_context():
communicator_prog = framework.Program()
init_communicator(communicator_prog, strategy.local_rank, strategy.nranks,
True, strategy.current_endpoint, strategy.trainer_endpoints)
exe = fluid.Executor(place)
exe.run(communicator_prog)
if fluid.in_dygraph_mode():
fluid.disable_dygraph()
_init_context()
fluid.enable_dygraph(place)
else:
_init_context()
else:
assert ("Only support CUDAPlace for now.")
__parallel_context_init = True
return strategy
class DistributedDataParallel(DataParallel):
def __init__(self, layers, strategy=None):
if strategy is None:
strategy = ParallelStrategy()
strategy.nranks = Env().nranks
strategy.local_rank = Env().local_rank
strategy.trainer_endpoints = Env().trainer_endpoints
strategy.current_endpoint = Env().current_endpoint
super(DistributedDataParallel, self).__init__(layers, strategy)
@no_grad
def apply_collective_grads(self):
"""
AllReduce the Parameters' gradient.
"""
if not self._is_data_parallel_mode():
return
grad_var_set = set()
grad_vars = []
for param in self._layers.parameters():
# NOTE(zcd): The grad_ivar maybe no generated.
if param.trainable and param._grad_ivar():
g_var = param._grad_ivar()
grad_vars.append(g_var)
assert g_var not in grad_var_set
grad_var_set.add(g_var)
mega_bytes = 128 * 1024 * 1024
group_idx = 0
memory_counter = 0
grad_var_groups = OrderedDict()
dtype = grad_vars[0].dtype
for g_var in grad_vars:
# Note: the dtype of the same group should be the same.
bytes = np.prod(g_var.shape) * core.size_of_dtype(g_var.dtype)
if memory_counter < mega_bytes and dtype == g_var.dtype:
memory_counter += bytes
else:
memory_counter = bytes
group_idx += 1
grad_var_groups.setdefault(group_idx, []).append(g_var)
coalesced_grads_and_vars = self._coalesce_tensors(grad_var_groups)
for coalesced_grad, _, _ in coalesced_grads_and_vars:
collective._c_allreduce(coalesced_grad, coalesced_grad, use_calc_stream=True)
self._split_tensors(coalesced_grads_and_vars)
def _all_gather(x, nranks, ring_id=0, use_calc_stream=True):
return collective._c_allgather(x, nranks, ring_id=ring_id, use_calc_stream=use_calc_stream)
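
With the manual communicator-setup helpers removed, the distributed module keeps `DistributedBatchSampler` (which shards by `Env().nranks` and `Env().local_rank`) plus the thin `_all_gather` wrapper. A minimal sketch of the sampler, assuming the usual `(dataset, batch_size, shuffle, drop_last)` constructor that the visible attributes suggest; the toy dataset is illustrative:

```python
from distributed import DistributedBatchSampler

class RangeDataset(object):
    """Toy map-style dataset: item i is just the integer i."""
    def __init__(self, n):
        self.n = n
    def __getitem__(self, idx):
        return idx
    def __len__(self):
        return self.n

# Each trainer iterates only over its own 1/nranks share of the (padded) index
# space, so together the trainers cover the whole dataset once per epoch.
sampler = DistributedBatchSampler(RangeDataset(1000), batch_size=128, shuffle=True)
for batch_indices in sampler:
    pass  # feed dataset[i] for i in batch_indices to the model
```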
@@ -20,27 +20,26 @@ import pickle
import numpy as np
import six
import warnings
from collections import Iterable
from collections import OrderedDict
from collections import OrderedDict
from collections import Iterable, OrderedDict
from paddle import fluid
from paddle.fluid.framework import in_dygraph_mode, Variable
from paddle.fluid.executor import global_scope
from paddle.fluid.io import is_belong_to_optimizer
from paddle.fluid.dygraph.base import to_variable
from paddle.fluid.dygraph.parallel import Env
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import distributed
from distributed import DistributedBatchSampler
from paddle.fluid.incubate.fleet.base import role_maker
from paddle.fluid.io import DataLoader
from distributed import DistributedBatchSampler, _all_gather
from metrics import Metric
from callbacks import config_callbacks
__all__ = ['Model', 'Loss', 'CrossEntropy', 'Input']
_parallel_context_inited = False
def to_list(value):
if value is None:
@@ -85,18 +84,6 @@ def extract_args(func):
return inspect.getargspec(func)[0]
def init_context(backend):
assert isinstance(backend, str) and backend.lower() in ['dynamic', 'static'], \
"Expected backend in ['dynamic', 'static'], but got {}".format(backend)
place = fluid.CUDAPlace(distributed.Env().dev_id) if \
distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
distributed.prepare_distributed_context(place)
backend = backend.lower()
if backend == 'dynamic':
fluid.enable_dygraph(place)
class Input(fluid.dygraph.Layer):
def __init__(self, shape=None, dtype=None, name=None):
super(Input, self).__init__()
@@ -158,8 +145,8 @@ class StaticGraphAdapter(object):
self._merge_count = {'eval_total': 0, 'test_total': 0,
'eval_batch': 0, 'test_batch': 0}
self._nranks = distributed.Env().nranks
self._local_rank = distributed.Env().local_rank
self._nranks = Env().nranks
self._local_rank = Env().local_rank
@property
def mode(self):
@@ -424,9 +411,9 @@ class StaticGraphAdapter(object):
losses = self.model._loss_function(outputs, labels)
if self._nranks > 1 and mode != 'train':
outputs = [distributed._all_gather(o, self._nranks) for o in outputs]
outputs = [_all_gather(o, self._nranks) for o in outputs]
if mode != 'test':
labels = [distributed._all_gather(l, self._nranks) for l in labels]
labels = [_all_gather(l, self._nranks) for l in labels]
if mode != 'test':
for metric in self.model._metrics:
@@ -476,7 +463,7 @@ class StaticGraphAdapter(object):
# therefore startup program only needs to run once
if self._executor is None:
if self._nranks > 1 and device.lower() == 'gpu':
gpu_id = int(distributed.Env().dev_id)
gpu_id = int(Env().dev_id)
place = fluid.CUDAPlace(gpu_id) if device.lower() == 'gpu' else fluid.CPUPlace()
else:
place = places[0]
@@ -512,13 +499,18 @@ class DynamicGraphAdapter(object):
def __init__(self, model):
super(DynamicGraphAdapter, self).__init__()
self.model = model
self._nranks = distributed.Env().nranks
self._local_rank = distributed.Env().local_rank
self._nranks = Env().nranks
self._local_rank = Env().local_rank
self._merge_count = {'eval_total': 0, 'test_total': 0,
'eval_batch': 0, 'test_batch': 0}
if self._nranks > 1:
self.ddp_model = distributed.DistributedDataParallel(self.model)
stradegy = fluid.dygraph.parallel.ParallelStrategy()
stradegy.nranks = Env().nranks
stradegy.local_rank = Env().local_rank
stradegy.trainer_endpoints = Env().trainer_endpoints
stradegy.current_endpoint = Env().current_endpoint
self.ddp_model = fluid.dygraph.parallel.DataParallel(self.model, stradegy)
@property
def mode(self):
@@ -573,8 +565,8 @@ class DynamicGraphAdapter(object):
else:
losses = []
if self._nranks > 1:
outputs = [distributed._all_gather(o, self._nranks) for o in to_list(outputs)]
labels = [distributed._all_gather(l, self._nranks) for l in labels]
outputs = [_all_gather(o, self._nranks) for o in to_list(outputs)]
labels = [_all_gather(l, self._nranks) for l in labels]
metrics = []
for metric in self.model._metrics:
# cut off padding value.
@@ -608,7 +600,7 @@ class DynamicGraphAdapter(object):
inputs = [to_variable(x) for x in to_list(inputs)]
outputs = self.model.forward(*inputs)
if self._nranks > 2:
outputs = [distributed._all_gather(o, self._nranks) for o in to_list(outputs)]
outputs = [_all_gather(o, self._nranks) for o in to_list(outputs)]
return [to_numpy(o) for o in to_list(outputs)]
def parameters(self, *args, **kwargs):
@@ -696,8 +688,20 @@ class Model(fluid.dygraph.Layer):
self._test_dataloader = None
# init multiple gpus context
self._place = fluid.CUDAPlace(distributed.Env().dev_id) \
if distributed.Env().nranks > 1 else fluid.CUDAPlace(0)
self._place = fluid.CUDAPlace(Env().dev_id) \
if Env().nranks > 1 else fluid.CUDAPlace(0)
global _parallel_context_inited
if Env().nranks > 1 and not _parallel_context_inited:
if fluid.in_dygraph_mode():
fluid.disable_dygraph()
fluid.enable_dygraph(self._place)
fluid.dygraph.parallel.prepare_context()
else:
fluid.enable_dygraph(self._place)
fluid.dygraph.parallel.prepare_context()
fluid.disable_dygraph()
_parallel_context_inited = True
# init backend
if fluid.in_dygraph_mode():
@@ -715,7 +719,7 @@ class Model(fluid.dygraph.Layer):
return self._adapter.test(*args, **kwargs)
def save(self, *args, **kwargs):
if distributed.get_local_rank() == 0:
if Env().local_rank == 0:
return self._adapter.save(*args, **kwargs)
def load(self, path, skip_mismatch=False, reset_optimizer=False):
@@ -829,7 +833,7 @@ class Model(fluid.dygraph.Layer):
the variable to the environment variable and set its value to 1.
The default is None.
"""
self._optimizer = optimizer
if loss_function:
if not isinstance(loss_function, Loss):
@@ -972,7 +976,7 @@ class Model(fluid.dygraph.Layer):
logs['step'] = step
if mode == 'train' or self._adapter._merge_count.get(mode + '_batch', 0) <= 0:
logs['batch_size'] = batch_size * distributed.Env().nranks
logs['batch_size'] = batch_size * Env().nranks
else:
logs['batch_size'] = self._adapter._merge_count[mode + '_batch']
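
The `Model.__init__` hunk above replaces the removed `prepare_distributed_context()` with Paddle's built-in `fluid.dygraph.parallel.prepare_context()`, guarded by a module-level flag so it runs once per process. A standalone sketch of that pattern (the helper name is illustrative; the logic mirrors the diff):

```python
from paddle import fluid
from paddle.fluid.dygraph.parallel import Env

_parallel_context_inited = False

def _maybe_init_parallel_context(place):
    """Initialize the parallel (NCCL) context once when running with >1 trainer."""
    global _parallel_context_inited
    if Env().nranks > 1 and not _parallel_context_inited:
        was_dygraph = fluid.in_dygraph_mode()
        if was_dygraph:
            # re-enter dygraph bound to this trainer's device before init
            fluid.disable_dygraph()
        fluid.enable_dygraph(place)
        fluid.dygraph.parallel.prepare_context()
        if not was_dygraph:
            fluid.disable_dygraph()   # restore static-graph mode afterwards
        _parallel_context_inited = True
```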
@@ -28,7 +28,7 @@ import contextlib
import paddle
from paddle import fluid
from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
from model import Model, CrossEntropy, Input, Loss, init_context
from model import Model, CrossEntropy, Input, Loss
from metrics import Accuracy
from callbacks import ProgBarLogger
from paddle.fluid.io import BatchSampler, DataLoader
@@ -141,7 +141,7 @@ class MyCrossEntropy(Loss):
class TestModel(unittest.TestCase):
def fit(self, dynamic, is_mlp=False):
init_context('dynamic' if dynamic else 'static')
fluid.enable_dygraph() if dynamic else None
im_shape = (-1, 784)
batch_size = 128