Commit 5d059ec7, authored by gavin1332

supports distributed classification with unittests

test=develop

Parent 93097f9c
@@ -26,7 +26,7 @@ __global__ void ShardIndexInner(const T* in_data, T* out_data,
                                 const int64_t numel, const int index_num,
                                 const int nshards, const int shard_id,
                                 const int ignore_value) {
-  int shard_size = index_num / nshards;
+  int shard_size = (index_num + nshards - 1) / nshards;
   int idx = blockIdx.x * blockDim.x + threadIdx.x;
   if (idx < numel) {
     assert(in_data[idx] >= 0 && in_data[idx] < index_num);
......
@@ -34,7 +34,7 @@ class ShardIndexCPUKernel : public framework::OpKernel<T> {
     PADDLE_ENFORCE(shard_id >= 0 && shard_id < nshards,
                    "shard_id(%d) is not in range [0, %d)", shard_id, nshards);
-    int shard_size = index_num / nshards;
+    int shard_size = (index_num + nshards - 1) / nshards;
     out->Resize(in->dims());
     out->set_lod(in->lod());
......
@@ -13653,9 +13653,7 @@ def shard_index(input, index_num, nshards, shard_id, ignore_value=-1):
     .. math::

-        assert index_num % nshards == 0
-        shard_size = index_num / nshards
+        shard_size = (index_num + nshards - 1) // nshards
         y = x % shard_size if x / shard_size == shard_id else ignore_value
......
@@ -13705,10 +13703,6 @@ def shard_index(input, index_num, nshards, shard_id, ignore_value=-1):
     """
     op_type = 'shard_index'
     helper = LayerHelper(op_type, **locals())
-    if index_num % nshards != 0:
-        raise ValueError(
-            'The index_num(%d) cannot be evenly divided by nshards(%d)' %
-            (index_num, nshards))
     if shard_id < 0 or shard_id >= nshards:
         raise ValueError('The shard_id(%d) should be in [0, %d)' %
                          (shard_id, nshards))
......
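For reference, a minimal NumPy sketch of the documented mapping above (just the math from the updated docstring, not the operator implementation); the label values below are illustrative:

```python
import numpy as np

def shard_index_ref(x, index_num, nshards, shard_id, ignore_value=-1):
    # y = x % shard_size if x // shard_size == shard_id else ignore_value
    shard_size = (index_num + nshards - 1) // nshards
    return np.where(x // shard_size == shard_id, x % shard_size, ignore_value)

labels = np.array([0, 3, 4, 9])
print(shard_index_ref(labels, index_num=10, nshards=3, shard_id=1))
# [-1 -1  0 -1]  -> only label 4 falls into shard 1 and is re-indexed to 0
```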
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.layers as layers
from paddle.fluid.initializer import NumpyArrayInitializer
from test_dist_classification_base import DistClassificationRunner, runtime_main
# TODO bias attr
class DistSoftmaxClassificationRunner(DistClassificationRunner):
@classmethod
def add_arguments(cls, parser):
pass
def __init__(self, args):
super(DistSoftmaxClassificationRunner, self).__init__(args)
np.random.seed(1024)
self.param_value = np.random.rand(args.feature_size, args.class_num)
def local_classify_subnet(self, feature, label):
args = self.args
logits = layers.fc(feature,
args.class_num,
param_attr=NumpyArrayInitializer(self.param_value))
loss = layers.softmax_with_cross_entropy(logits, label)
cost = layers.mean(loss)
return cost
def parall_classify_subnet(self, feature, label):
args = self.args
shard_dim = (args.class_num + args.nranks - 1) // args.nranks
shard_start = shard_dim * args.rank
rank_param_value = self.param_value[:, shard_start:(shard_start +
shard_dim)]
cost = layers.collective._distributed_fc_classify(
x=feature,
label=label,
class_num=args.class_num,
nranks=args.nranks,
rank_id=args.rank,
param_attr=NumpyArrayInitializer(rank_param_value))
return cost
if __name__ == "__main__":
runtime_main(DistSoftmaxClassificationRunner)
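The parallel branch above slices the shared weight matrix along the class dimension, one contiguous block of shard_dim columns per rank. A small NumPy sketch (hypothetical sizes, bias omitted) of why the per-rank logits, concatenated over ranks, reproduce the logits of the single full fc layer:

```python
import numpy as np

np.random.seed(1024)
feature_size, class_num, nranks = 4, 5, 2         # illustrative sizes
shard_dim = (class_num + nranks - 1) // nranks    # columns owned by each rank

weight = np.random.rand(feature_size, class_num)
feature = np.random.rand(3, feature_size)         # a batch of 3 samples

full_logits = feature @ weight
rank_logits = [
    feature @ weight[:, r * shard_dim:(r + 1) * shard_dim]
    for r in range(nranks)
]
# Concatenating the column shards recovers the full logits exactly, which is
# what lets the distributed classifier match the local fc + softmax loss.
assert np.allclose(np.concatenate(rank_logits, axis=1), full_logits)
```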
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from datetime import datetime
import unittest
import os
import sys
import subprocess
import six
import argparse
import pickle
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.transpiler.collective import \
GradAllReduce, DistributedClassificationOptimizer
DEFAULT_BATCH_SIZE = 2
DEFAULT_FEATURE_SIZE = 4
DEFAULT_CLASS_NUM = 4
DEFAULT_LR = 0.001
RUN_STEPS = 5
def stdprint(value):
if six.PY2:
print(pickle.dumps(value))
else:
sys.stdout.buffer.write(pickle.dumps(value))
def log(ref, message, print2pipe=False):
localtime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
log_str = '[%s] [%s] %s' % (localtime, type(ref).__name__, message)
if print2pipe:
if six.PY2:
sys.stderr.write(pickle.dumps(log_str))
else:
sys.stderr.buffer.write(pickle.dumps(log_str))
else:
sys.stderr.write(log_str + "\n")
class DistClassificationRunner(object):
def __init__(self, args):
args.rank = int(os.getenv('PADDLE_TRAINER_ID', '0'))
args.current_endpoint = os.getenv('PADDLE_CURRENT_ENDPOINT')
args.nranks = int(os.getenv('PADDLE_TRAINERS_NUM', '1'))
args.endpoints = os.getenv('PADDLE_TRAINER_ENDPOINTS', '').split(',')
args.device_id = int(os.getenv('FLAGS_selected_gpus', '0'))
self.args = args
def log(self, message, print2pipe=False):
log(self, message, print2pipe)
    def local_classify_subnet(self, feature, label):
        raise NotImplementedError(
            'local_classify_subnet should be implemented by child classes.')

    def parall_classify_subnet(self, feature, label):
        raise NotImplementedError(
            'parall_classify_subnet should be implemented by child classes.')
def build_net(self):
args = self.args
main_prog = fluid.Program()
start_prog = fluid.Program()
optimizer = fluid.optimizer.SGD(learning_rate=args.lr)
with fluid.program_guard(main_prog, start_prog):
feature = fluid.layers.data(
name='feature', shape=[args.feature_size], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
if args.nranks <= 1:
log(self, 'build local network')
loss = self.local_classify_subnet(feature, label)
optimizer.minimize(loss)
else:
log(self, 'build parallel network')
loss = self.parall_classify_subnet(feature, label)
# TODO why need batch size?
optimizer_wrapper = DistributedClassificationOptimizer(
optimizer, args.batch_size)
optimizer_wrapper.minimize(loss)
self.transpile(main_prog, start_prog)
return [feature, label], loss, start_prog
def gen_rank_batch(self):
args = self.args
def generate_global_batch():
if not hasattr(self, 'seed'):
self.seed = args.batch_size * args.nranks
np.random.seed(self.seed)
self.seed += 1
global_batch_size = args.batch_size * args.nranks
return [[
np.random.rand(args.feature_size),
np.random.randint(args.class_num)
] for i in range(global_batch_size)]
rank_batch = []
global_batch = generate_global_batch()
for i, sample in enumerate(global_batch):
if i // args.batch_size == args.rank:
rank_batch.append(sample)
log(self, rank_batch)
return rank_batch
def transpile(self, main_prog, start_prog):
args = self.args
transpiler = GradAllReduce()
transpiler.transpile(
startup_program=start_prog,
main_program=main_prog,
rank=args.rank,
endpoints=args.endpoints,
current_endpoint=args.current_endpoint,
wait_port=True)
def run(self):
feed_vars, loss, start_prog = self.build_net()
main_prog = loss.block.program
place = fluid.CUDAPlace(self.args.device_id)
exe = fluid.Executor(place)
exe.run(start_prog)
log(self, 'finish running startup program.')
feeder = fluid.DataFeeder(feed_vars, place)
log(self, 'start to train')
out_losses = []
for i in range(RUN_STEPS):
losses = exe.run(main_prog,
fetch_list=[loss],
feed=feeder.feed(self.gen_rank_batch()))
out_losses.append(losses[0][0])
log(self, "step %d loss: %f" % (i, losses[0][0]))
log(self, 'finish training')
stdprint(out_losses)
@classmethod
def add_arguments(cls, parser):
pass
def runtime_main(test_class):
parser = argparse.ArgumentParser(
description='Run distributed classification test.')
parser.add_argument('--batch_size', type=int, required=True)
parser.add_argument(
'--feature_size', type=int, default=DEFAULT_FEATURE_SIZE)
parser.add_argument('--class_num', type=int, default=DEFAULT_CLASS_NUM)
parser.add_argument('--lr', type=float, default=DEFAULT_LR)
test_class.add_arguments(parser)
args = parser.parse_args()
trainer = test_class(args)
trainer.run()
import socket
from contextlib import closing
class TestDistClassificationBase(unittest.TestCase):
# override configurations in setUp
def setup_config(self):
raise NotImplementedError('tests should have setup_config implemented')
def setUp(self):
self.nranks = 2
self.batch_size = DEFAULT_BATCH_SIZE
self.setup_config()
self.global_batch_size = self.batch_size * self.nranks
self.endpoints = [
'127.0.0.1:%d' % self.find_free_port() for i in range(self.nranks)
]
def find_free_port(self):
while True:
with closing(socket.socket(socket.AF_INET,
socket.SOCK_STREAM)) as s:
s.bind(('', 0))
log(self, 'socket port: %s' % s.getsockname()[1])
port = s.getsockname()[1]
return port
    def run_local(self, train_script, user_env):
        env = {}
        cmd = '%s -u' % sys.executable
        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            # coverage flags must come before the script, otherwise they would
            # be passed on to the script's own argument parser
            env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
            cmd += ' -m coverage run --branch -p'
        cmd += ' %s --batch_size %d' % (train_script, self.global_batch_size)
env.update(user_env)
log(self, 'local_cmd: %s' % cmd)
log(self, 'local_env: %s' % env)
ferr = open('/tmp/local.log', 'w')
proc = subprocess.Popen(
cmd.split(' '),
stdout=subprocess.PIPE,
#stderr=subprocess.PIPE,
stderr=ferr,
env=env)
out, err = proc.communicate()
ferr.close()
log(self, 'local_stdout: %s' % pickle.loads(out))
#log(self, 'local_stderr: %s' % pickle.loads(err))
return pickle.loads(out)
def get_parall_env(self, rank):
env = {
'FLAGS_selected_gpus': str(rank),
'PADDLE_TRAINER_ID': str(rank),
'PADDLE_CURRENT_ENDPOINT': self.endpoints[rank],
'PADDLE_TRAINERS_NUM': str(self.nranks),
'PADDLE_TRAINER_ENDPOINTS': ','.join(self.endpoints),
}
if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
env['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
return env
    def run_parall(self, train_script, user_env):
        cmd = '%s -u' % sys.executable
        if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
            # keep coverage flags ahead of the script and its arguments
            cmd += ' -m coverage run --branch -p'
        cmd += ' %s --batch_size %d' % (train_script, self.batch_size)
procs = []
ferrs = []
for rank in range(self.nranks):
env = self.get_parall_env(rank)
env.update(user_env)
log(self, '[r%d] parall_cmd: %s' % (rank, cmd))
log(self, '[r%d] parall_env: %s' % (rank, env))
ferr = open('/tmp/parall_tr%d.log' % rank, 'w')
proc = subprocess.Popen(
cmd.strip().split(' '),
stdout=subprocess.PIPE,
stderr=ferr,
env=env)
procs.append(proc)
ferrs.append(ferr)
outs = []
for rank in range(self.nranks):
out, err = procs[rank].communicate()
ferrs[rank].close()
outs.append(out)
#log(self, '[r%d] parall_stderr: %s' % (rank, pickle.loads(err)))
return [pickle.loads(outs[i]) for i in range(self.nranks)]
def compare_parall_to_local(self, train_script, delta=1e-3, user_envs={}):
required_envs = {
'PATH': os.getenv('PATH', ''),
'PYTHONPATH': os.getenv('PYTHONPATH', ''),
'LD_LIBRARY_PATH': os.getenv('LD_LIBRARY_PATH', ''),
'FLAGS_fraction_of_gpu_memory_to_use': '0.15',
            'FLAGS_rpc_deadline': '30000',  # 30s to fail fast
'FLAGS_cudnn_deterministic': '1',
'NCCL_P2P_DISABLE': '1',
'NCCL_SHM_DISABLE': '1'
}
required_envs.update(user_envs)
local_losses = self.run_local(train_script, required_envs)
parall_losses = self.run_parall(train_script, required_envs)
for i in range(RUN_STEPS):
local_loss = local_losses[i]
parall_loss = sum(
[parall_losses[j][i] for j in range(self.nranks)]) / self.nranks
log(self, '======= local_loss : parall_loss =======')
log(self, '======= %s : %s =======' % (local_loss, parall_loss))
self.assertAlmostEqual(local_loss, parall_loss, delta=delta)
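The comparison in compare_parall_to_local relies on gen_rank_batch: every process draws the same seeded global batch, and each rank keeps the contiguous slice selected by i // batch_size == rank, so the rank batches are disjoint and together cover exactly the samples the local run trains on. A standalone sketch of that slicing (sizes are illustrative):

```python
batch_size, nranks = 2, 2                          # illustrative sizes
global_batch = list(range(batch_size * nranks))    # identical on every process

rank_batches = [
    [s for i, s in enumerate(global_batch) if i // batch_size == rank]
    for rank in range(nranks)
]
print(rank_batches)  # [[0, 1], [2, 3]]

# The union of the rank slices is the global batch the local run consumes, so
# averaging the per-rank mean losses is directly comparable to the local loss.
assert sum(rank_batches, []) == global_batch
```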
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from test_dist_classification_base import TestDistClassificationBase
class TestDistSoftmaxClassification(TestDistClassificationBase):
def setup_config(self):
pass
def test_dist_train(self):
import paddle.fluid as fluid
if fluid.core.is_compiled_with_cuda():
self.compare_parall_to_local(
"dist_softmax_classification.py", delta=1e-4)
if __name__ == "__main__":
unittest.main()
@@ -82,10 +82,8 @@ class Collective(object):
         self.wait_port = wait_port
-        self.startup_program._origin_program = self.startup_program.clone()
         self._transpile_startup_program()
-        self.main_program._origin_program = self.main_program.clone()
         self._transpile_main_program()

     def _transpile_main_program(self):
......