diff --git a/paddle/fluid/operators/shard_index_op.cu b/paddle/fluid/operators/shard_index_op.cu
index 08503e3e1a8fe66b20f1e23012c584f9e32b4a01..db29b73f9eae463c977e96293d870bdf77addce9 100644
--- a/paddle/fluid/operators/shard_index_op.cu
+++ b/paddle/fluid/operators/shard_index_op.cu
@@ -26,7 +26,7 @@ __global__ void ShardIndexInner(const T* in_data, T* out_data,
                                 const int64_t numel, const int index_num,
                                 const int nshards, const int shard_id,
                                 const int ignore_value) {
-  int shard_size = index_num / nshards;
+  int shard_size = (index_num + nshards - 1) / nshards;
   int idx = blockIdx.x * blockDim.x + threadIdx.x;
   if (idx < numel) {
     assert(in_data[idx] >= 0 && in_data[idx] < index_num);
diff --git a/paddle/fluid/operators/shard_index_op.h b/paddle/fluid/operators/shard_index_op.h
index f060b3fdf182a2bf7fe03b1d86db41c4d1cfb340..f943de586bc242324596853455cf94cd80837953 100644
--- a/paddle/fluid/operators/shard_index_op.h
+++ b/paddle/fluid/operators/shard_index_op.h
@@ -34,7 +34,7 @@ class ShardIndexCPUKernel : public framework::OpKernel {
     PADDLE_ENFORCE(shard_id >= 0 && shard_id < nshards,
                    "shard_id(%d) is not in range [0, %d)", shard_id, nshards);
-    int shard_size = index_num / nshards;
+    int shard_size = (index_num + nshards - 1) / nshards;
     out->Resize(in->dims());
     out->set_lod(in->lod());
 
diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py
index d78d94454e65b306b3c7c4bddef05c08dd7a4d74..94765565ed3522ec6a7e79098cbed6659cfd116b 100755
--- a/python/paddle/fluid/layers/nn.py
+++ b/python/paddle/fluid/layers/nn.py
@@ -13653,9 +13653,7 @@ def shard_index(input, index_num, nshards, shard_id, ignore_value=-1):
 
     .. math::
 
-        assert index_num % nshards == 0
-
-        shard_size = index_num / nshards
+        shard_size = (index_num + nshards - 1) // nshards
 
         y = x % shard_size if x / shard_size == shard_id else ignore_value
 
@@ -13705,10 +13703,6 @@ def shard_index(input, index_num, nshards, shard_id, ignore_value=-1):
     """
     op_type = 'shard_index'
    helper = LayerHelper(op_type, **locals())
-    if index_num % nshards != 0:
-        raise ValueError(
-            'The index_num(%d) cannot be evenly divided by nshards(%d)' %
-            (index_num, nshards))
     if shard_id < 0 or shard_id >= nshards:
         raise ValueError('The shard_id(%d) should be in [0, %d)' %
                          (shard_id, nshards))
diff --git a/python/paddle/fluid/tests/unittests/dist_softmax_classification.py b/python/paddle/fluid/tests/unittests/dist_softmax_classification.py
new file mode 100644
index 0000000000000000000000000000000000000000..c041f0bb4dd3fcf00bf7d1cb1f7509c875623c3e
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/dist_softmax_classification.py
@@ -0,0 +1,61 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+import unittest
+import numpy as np
+import paddle.fluid as fluid
+import paddle.fluid.layers as layers
+from paddle.fluid.initializer import NumpyArrayInitializer
+from test_dist_classification_base import DistClassificationRunner, runtime_main
+
+
+# TODO bias attr
+class DistSoftmaxClassificationRunner(DistClassificationRunner):
+    @classmethod
+    def add_arguments(cls, parser):
+        pass
+
+    def __init__(self, args):
+        super(DistSoftmaxClassificationRunner, self).__init__(args)
+        np.random.seed(1024)
+        self.param_value = np.random.rand(args.feature_size, args.class_num)
+
+    def local_classify_subnet(self, feature, label):
+        args = self.args
+        logits = layers.fc(feature,
+                           args.class_num,
+                           param_attr=NumpyArrayInitializer(self.param_value))
+        loss = layers.softmax_with_cross_entropy(logits, label)
+        cost = layers.mean(loss)
+        return cost
+
+    def parall_classify_subnet(self, feature, label):
+        args = self.args
+        shard_dim = (args.class_num + args.nranks - 1) // args.nranks
+        shard_start = shard_dim * args.rank
+        rank_param_value = self.param_value[:, shard_start:(shard_start +
+                                                            shard_dim)]
+        cost = layers.collective._distributed_fc_classify(
+            x=feature,
+            label=label,
+            class_num=args.class_num,
+            nranks=args.nranks,
+            rank_id=args.rank,
+            param_attr=NumpyArrayInitializer(rank_param_value))
+        return cost
+
+
+if __name__ == "__main__":
+    runtime_main(DistSoftmaxClassificationRunner)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_classification_base.py b/python/paddle/fluid/tests/unittests/test_dist_classification_base.py
new file mode 100644
index 0000000000000000000000000000000000000000..b203088ab515f77503781ec2c2fad6ec11eb0e7a
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_classification_base.py
@@ -0,0 +1,305 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import print_function
+from datetime import datetime
+
+import unittest
+import os
+import sys
+import subprocess
+import six
+import argparse
+import pickle
+import numpy as np
+import paddle.fluid as fluid
+
+from paddle.fluid.transpiler.collective import \
+    GradAllReduce, DistributedClassificationOptimizer
+
+DEFAULT_BATCH_SIZE = 2
+DEFAULT_FEATURE_SIZE = 4
+DEFAULT_CLASS_NUM = 4
+DEFAULT_LR = 0.001
+
+RUN_STEPS = 5
+
+
+def stdprint(value):
+    if six.PY2:
+        print(pickle.dumps(value))
+    else:
+        sys.stdout.buffer.write(pickle.dumps(value))
+
+
+def log(ref, message, print2pipe=False):
+    localtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+    log_str = '[%s] [%s] %s' % (localtime, type(ref).__name__, message)
+    if print2pipe:
+        if six.PY2:
+            sys.stderr.write(pickle.dumps(log_str))
+        else:
+            sys.stderr.buffer.write(pickle.dumps(log_str))
+    else:
+        sys.stderr.write(log_str + "\n")
+
+
+class DistClassificationRunner(object):
+    def __init__(self, args):
+        args.rank = int(os.getenv('PADDLE_TRAINER_ID', '0'))
+        args.current_endpoint = os.getenv('PADDLE_CURRENT_ENDPOINT')
+        args.nranks = int(os.getenv('PADDLE_TRAINERS_NUM', '1'))
+        args.endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS', '').split(',')
+        args.device_id = int(os.getenv('FLAGS_selected_gpus', '0'))
+        self.args = args
+
+    def log(self, message, print2pipe=False):
+        log(self, message, print2pipe)
+
+    def local_classify_subnet(self, feature, label):
+        raise NotImplementedError(
+            'local_classify_subnet should be implemented by child classes.')
+
+    def parall_classify_subnet(self, feature, label):
+        raise NotImplementedError(
+            'parall_classify_subnet should be implemented by child classes.')
+
+    def build_net(self):
+        args = self.args
+        main_prog = fluid.Program()
+        start_prog = fluid.Program()
+        optimizer = fluid.optimizer.SGD(learning_rate=args.lr)
+        with fluid.program_guard(main_prog, start_prog):
+            feature = fluid.layers.data(
+                name='feature', shape=[args.feature_size], dtype='float32')
+            label = fluid.layers.data(name='label', shape=[1], dtype='int64')
+            if args.nranks <= 1:
+                log(self, 'build local network')
+                loss = self.local_classify_subnet(feature, label)
+                optimizer.minimize(loss)
+            else:
+                log(self, 'build parallel network')
+                loss = self.parall_classify_subnet(feature, label)
+                # TODO why need batch size?
+                optimizer_wrapper = DistributedClassificationOptimizer(
+                    optimizer, args.batch_size)
+                optimizer_wrapper.minimize(loss)
+        self.transpile(main_prog, start_prog)
+
+        return [feature, label], loss, start_prog
+
+    def gen_rank_batch(self):
+        args = self.args
+
+        def generate_global_batch():
+            # reseed with a per-step counter so every rank generates the
+            # identical global batch for each step
+            if not hasattr(self, 'seed'):
+                self.seed = args.batch_size * args.nranks
+            np.random.seed(self.seed)
+            self.seed += 1
+
+            global_batch_size = args.batch_size * args.nranks
+            return [[
+                np.random.rand(args.feature_size),
+                np.random.randint(args.class_num)
+            ] for i in range(global_batch_size)]
+
+        rank_batch = []
+        global_batch = generate_global_batch()
+        for i, sample in enumerate(global_batch):
+            if i // args.batch_size == args.rank:
+                rank_batch.append(sample)
+
+        log(self, rank_batch)
+
+        return rank_batch
+
+    def transpile(self, main_prog, start_prog):
+        args = self.args
+        transpiler = GradAllReduce()
+        transpiler.transpile(
+            startup_program=start_prog,
+            main_program=main_prog,
+            rank=args.rank,
+            endpoints=args.endpoints,
+            current_endpoint=args.current_endpoint,
+            wait_port=True)
+
+    def run(self):
+        feed_vars, loss, start_prog = self.build_net()
+        main_prog = loss.block.program
+
+        place = fluid.CUDAPlace(self.args.device_id)
+        exe = fluid.Executor(place)
+        exe.run(start_prog)
+        log(self, 'finish running startup program.')
+
+        feeder = fluid.DataFeeder(feed_vars, place)
+
+        log(self, 'start to train')
+        out_losses = []
+        for i in range(RUN_STEPS):
+            losses = exe.run(main_prog,
+                             fetch_list=[loss],
+                             feed=feeder.feed(self.gen_rank_batch()))
+            out_losses.append(losses[0][0])
+            log(self, "step %d loss: %f" % (i, losses[0][0]))
+
+        log(self, 'finish training')
+
+        stdprint(out_losses)
+
+    @classmethod
+    def add_arguments(cls, parser):
+        pass
+
+
+def runtime_main(test_class):
+    parser = argparse.ArgumentParser(
+        description='Run distributed classification test.')
+    parser.add_argument('--batch_size', type=int, required=True)
+    parser.add_argument(
+        '--feature_size', type=int, default=DEFAULT_FEATURE_SIZE)
+    parser.add_argument('--class_num', type=int, default=DEFAULT_CLASS_NUM)
+    parser.add_argument('--lr', type=float, default=DEFAULT_LR)
+    test_class.add_arguments(parser)
+    args = parser.parse_args()
+
+    trainer = test_class(args)
+    trainer.run()
+
+
+import socket
+from contextlib import closing
+
+
+class TestDistClassificationBase(unittest.TestCase):
+    # override configurations in setUp
+    def setup_config(self):
+        raise NotImplementedError('tests should have setup_config implemented')
+
+    def setUp(self):
+        self.nranks = 2
+        self.batch_size = DEFAULT_BATCH_SIZE
+        self.setup_config()
+
+        self.global_batch_size = self.batch_size * self.nranks
+        self.endpoints = [
+            '127.0.0.1:%d' % self.find_free_port() for i in range(self.nranks)
+        ]
+
+    def find_free_port(self):
+        while True:
+            with closing(socket.socket(socket.AF_INET,
+                                       socket.SOCK_STREAM)) as s:
+                s.bind(('', 0))
+                log(self, 'socket port: %s' % s.getsockname()[1])
+                port = s.getsockname()[1]
+                return port
+
+    def run_local(self, train_script, user_env):
+        env = {}
+        cmd = '%s -u' % sys.executable
+        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
+            env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
+            cmd += ' -m coverage run --branch -p'
+        cmd += ' %s --batch_size %d' % (train_script, self.global_batch_size)
+        env.update(user_env)
+
+        log(self, 'local_cmd: %s' % cmd)
+        log(self, 'local_env: %s' % env)
+
+        ferr = open('/tmp/local.log', 'w')
+        proc = subprocess.Popen(
+            cmd.split(' '),
+            stdout=subprocess.PIPE,
+            #stderr=subprocess.PIPE,
+            stderr=ferr,
+            env=env)
+
+        out, err = proc.communicate()
+        ferr.close()
+
+        log(self, 'local_stdout: %s' % pickle.loads(out))
+        #log(self, 'local_stderr: %s' % pickle.loads(err))
+
+        return pickle.loads(out)
+
+    def get_parall_env(self, rank):
+        env = {
+            'FLAGS_selected_gpus': str(rank),
+            'PADDLE_TRAINER_ID': str(rank),
+            'PADDLE_CURRENT_ENDPOINT': self.endpoints[rank],
+            'PADDLE_TRAINERS_NUM': str(self.nranks),
+            'PADDLE_TRAINER_ENDPOINTS': ','.join(self.endpoints),
+        }
+        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
+            env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
+        return env
+
+    def run_parall(self, train_script, user_env):
+        cmd = '%s -u' % sys.executable
+        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
+            cmd += ' -m coverage run --branch -p'
+        cmd += ' %s --batch_size %d' % (train_script, self.batch_size)
+
+        procs = []
+        ferrs = []
+        for rank in range(self.nranks):
+            env = self.get_parall_env(rank)
+            env.update(user_env)
+            log(self, '[r%d] parall_cmd: %s' % (rank, cmd))
+            log(self, '[r%d] parall_env: %s' % (rank, env))
+
+            ferr = open('/tmp/parall_tr%d.log' % rank, 'w')
+            proc = subprocess.Popen(
+                cmd.strip().split(' '),
+                stdout=subprocess.PIPE,
+                stderr=ferr,
+                env=env)
+            procs.append(proc)
+            ferrs.append(ferr)
+
+        outs = []
+        for rank in range(self.nranks):
+            out, err = procs[rank].communicate()
+            ferrs[rank].close()
+
+            outs.append(out)
+            #log(self, '[r%d] parall_stderr: %s' % (rank, pickle.loads(err)))
+
+        return [pickle.loads(outs[i]) for i in range(self.nranks)]
+
+    def compare_parall_to_local(self, train_script, delta=1e-3, user_envs={}):
+        required_envs = {
+            'PATH': os.getenv('PATH', ''),
+            'PYTHONPATH': os.getenv('PYTHONPATH', ''),
+            'LD_LIBRARY_PATH': os.getenv('LD_LIBRARY_PATH', ''),
+            'FLAGS_fraction_of_gpu_memory_to_use': '0.15',
+            'FLAGS_rpc_deadline': '30000',  # 30s to fail fast
+            'FLAGS_cudnn_deterministic': '1',
+            'NCCL_P2P_DISABLE': '1',
+            'NCCL_SHM_DISABLE': '1'
+        }
+        required_envs.update(user_envs)
+
+        local_losses = self.run_local(train_script, required_envs)
+        parall_losses = self.run_parall(train_script, required_envs)
+
+        for i in range(RUN_STEPS):
+            local_loss = local_losses[i]
+            parall_loss = sum(
+                [parall_losses[j][i] for j in range(self.nranks)]) / self.nranks
+            log(self, '======= local_loss : parall_loss =======')
+            log(self, '======= %s : %s =======' % (local_loss, parall_loss))
+            self.assertAlmostEqual(local_loss, parall_loss, delta=delta)
diff --git a/python/paddle/fluid/tests/unittests/test_dist_softmax_classification.py b/python/paddle/fluid/tests/unittests/test_dist_softmax_classification.py
new file mode 100644
index 0000000000000000000000000000000000000000..bc371a13cd524897b318fed762b81b5ac9c8f4d0
--- /dev/null
+++ b/python/paddle/fluid/tests/unittests/test_dist_softmax_classification.py
@@ -0,0 +1,31 @@
+# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from test_dist_classification_base import TestDistClassificationBase
+
+
+class TestDistSoftmaxClassification(TestDistClassificationBase):
+    def setup_config(self):
+        pass
+
+    def test_dist_train(self):
+        import paddle.fluid as fluid
+        if fluid.core.is_compiled_with_cuda():
+            self.compare_parall_to_local(
+                "dist_softmax_classification.py", delta=1e-4)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/python/paddle/fluid/transpiler/collective.py b/python/paddle/fluid/transpiler/collective.py
index af82cd451acc7ccb19e0c4fd8a7e14bfaa1d9e38..ecca373492157adc16a6e2fc6bbd8cbbe72f1324 100644
--- a/python/paddle/fluid/transpiler/collective.py
+++ b/python/paddle/fluid/transpiler/collective.py
@@ -82,10 +82,8 @@ class Collective(object):
 
         self.wait_port = wait_port
 
-        self.startup_program._origin_program = self.startup_program.clone()
         self._transpile_startup_program()
 
-        self.main_program._origin_program = self.main_program.clone()
         self._transpile_main_program()
 
     def _transpile_main_program(self):
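
Note for reviewers (not part of the patch): the core behavioral change is that
shard_index no longer requires index_num to divide evenly by nshards.
shard_size is now computed by ceiling division, so every shard except possibly
the last owns shard_size consecutive labels and the last shard simply holds the
remainder. Below is a minimal pure-Python sketch of the new semantics, based on
the updated formula in the nn.py docstring above; shard_index_ref is a
hypothetical reference helper written for illustration, not an API added by
this patch.

    def shard_index_ref(x, index_num, nshards, shard_id, ignore_value=-1):
        """Reference semantics of the updated shard_index operator."""
        assert 0 <= x < index_num
        shard_size = (index_num + nshards - 1) // nshards  # ceiling division
        if x // shard_size == shard_id:
            return x % shard_size   # local offset within this shard
        return ignore_value         # label lives on another shard

    # index_num=20, nshards=3 -> shard_size = ceil(20 / 3) = 7.
    # Shard 0 owns labels 0-6, shard 1 owns 7-13, shard 2 owns 14-19
    # (the last shard holds only 6 labels; previously this raised ValueError).
    assert shard_index_ref(16, 20, 3, shard_id=2) == 2
    assert shard_index_ref(16, 20, 3, shard_id=0) == -1

This mirrors how dist_softmax_classification.py shards the FC weight: each rank
takes shard_dim = (class_num + nranks - 1) // nranks consecutive columns
starting at shard_dim * rank, so the parameter partition stays consistent with
the operator's label-to-shard mapping.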