Unverified commit e5414f76 authored by jjyaoao, committed by GitHub

[Test Mv] remove mlu (#52064)

Parent 24740ccd
@@ -869,10 +869,6 @@ if(WITH_IPU)
  add_subdirectory(ipu)
endif()
if(WITH_MLU)
add_subdirectory(mlu)
endif()
add_subdirectory(asp)
add_subdirectory(ir)
......
file(
GLOB TEST_OPS
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
"test_*.py")
string(REPLACE ".py" "" TEST_OPS "${TEST_OPS}")
file(
GLOB TEST_DIST_OPS
RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}"
"test_collective_*.py")
string(REPLACE ".py" "" TEST_DIST_OPS "${TEST_DIST_OPS}")
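# Collective tests (test_collective_*) and test_spawn_mlu need CNCL and are
# registered under WITH_CNCL below; the remaining test_*.py files are registered
# as ordinary py_test modules.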
if(WITH_MLU)
foreach(TEST_OP ${TEST_DIST_OPS})
list(REMOVE_ITEM TEST_OPS ${TEST_OP})
endforeach()
list(REMOVE_ITEM TEST_OPS "test_spawn_mlu")
foreach(TEST_OP ${TEST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach()
if(WITH_CNCL)
list(APPEND TEST_DIST_OPS "test_spawn_mlu")
foreach(TEST_OP ${TEST_DIST_OPS})
py_test_modules(${TEST_OP} MODULES ${TEST_OP})
endforeach()
bash_test_modules(test_launch_async_mlu START_BASH test_launch_async_mlu.sh
ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_launch_cloud_mlu START_BASH test_launch_cloud_mlu.sh
ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_launch_nproc_mlu START_BASH test_launch_nproc_mlu.sh
ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(
test_c_comm_init_op_mlu START_BASH test_c_comm_init_op_mlu.sh ENVS
PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
set_tests_properties(test_collective_broadcast PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_allreduce_sum PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_allreduce_max PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_allreduce_min PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_allreduce_prod PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_allgather PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_reduce_sum PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_reduce_max PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_reduce_min PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_reduce_prod PROPERTIES TIMEOUT 120)
set_tests_properties(test_collective_broadcast_api_mlu PROPERTIES TIMEOUT
120)
set_tests_properties(test_collective_allreduce_api_mlu PROPERTIES TIMEOUT
120)
set_tests_properties(test_collective_allgather_api_mlu PROPERTIES TIMEOUT
120)
set_tests_properties(test_c_comm_init_op_mlu PROPERTIES TIMEOUT 120)
set_tests_properties(test_sync_batch_norm_op_mlu_baseline PROPERTIES TIMEOUT
120)
endif()
endif()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import paddle.fluid.core as core
import paddle.fluid as fluid
from paddle.distributed.fleet.base.private_helper_function import (
wait_server_ready,
)
import paddle
paddle.enable_static()
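# Builds a CNCL communicator by hand: rank 0 waits for the other endpoints,
# c_gen_cncl_id exchanges a unique id, and c_comm_init creates the
# communication ring on the selected MLU device.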
class TestCCommInitOp(unittest.TestCase):
def setUp(self):
self.endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS").split(',')
self.current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
self.nranks = len(self.endpoints)
self.rank = self.endpoints.index(self.current_endpoint)
self.mlu_id = int(os.getenv("FLAGS_selected_mlus"))
self.place = fluid.MLUPlace(self.mlu_id)
self.exe = fluid.Executor(self.place)
self.endpoints.remove(self.current_endpoint)
self.other_endpoints = self.endpoints
if self.rank == 0:
wait_server_ready(self.other_endpoints)
def test_specifying_devices(self):
program = fluid.Program()
block = program.global_block()
cncl_id_var = block.create_var(
name=fluid.unique_name.generate('cncl_id'),
persistable=True,
type=fluid.core.VarDesc.VarType.RAW,
)
block.append_op(
type='c_gen_cncl_id',
inputs={},
outputs={'Out': cncl_id_var},
attrs={
'rank': self.rank,
'endpoint': self.current_endpoint,
'other_endpoints': self.other_endpoints,
},
)
block.append_op(
type='c_comm_init',
inputs={'X': cncl_id_var},
outputs={},
attrs={
'nranks': self.nranks,
'rank': self.rank,
'ring_id': 0,
'device_id': self.mlu_id,
},
)
self.exe.run(program)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_api_base_mlu import (
TestCollectiveAPIRunnerBase,
runtime_main,
)
paddle.enable_static()
class TestCollectiveAllgatherAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tensor_list = []
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
paddle.distributed.all_gather(tensor_list, tindata)
return tensor_list
if __name__ == "__main__":
runtime_main(TestCollectiveAllgatherAPI, "allgather")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_base_mlu import TestCollectiveRunnerBase, runtime_main
paddle.enable_static()
class TestCollectiveAllgather(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, col_type):
ring_id = 0
nranks = 2
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofallgather",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_allgather",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'nranks': nranks},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveAllgather)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_api_base_mlu import (
TestCollectiveAPIRunnerBase,
runtime_main,
)
paddle.enable_static()
class TestCollectiveAllreduceAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
paddle.distributed.all_reduce(tindata)
return [tindata]
if __name__ == "__main__":
runtime_main(TestCollectiveAllreduceAPI, "allreduce")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_base_mlu import TestCollectiveRunnerBase, runtime_main
paddle.enable_static()
class TestCollectiveAllreduce(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, col_type):
ring_id = 0
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outof" + col_type,
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_" + col_type,
inputs={'X': tindata},
attrs={'ring_id': ring_id},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveAllreduce)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_api_base_mlu import (
TestCollectiveAPIRunnerBase,
runtime_main,
)
paddle.enable_static()
class TestCollectiveBroadcastAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype="float32"
)
tindata.desc.set_need_check_feed(False)
paddle.distributed.broadcast(tindata, src=1)
return [tindata]
if __name__ == "__main__":
runtime_main(TestCollectiveBroadcastAPI, "broadcast")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_base_mlu import TestCollectiveRunnerBase, runtime_main
paddle.enable_static()
class TestCollectiveBroadcast(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, col_type):
ring_id = 0
rootid = 1
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outofbroadcast",
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_broadcast",
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'root': rootid},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveBroadcast)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_api_base_mlu import (
TestCollectiveAPIRunnerBase,
runtime_main,
)
paddle.enable_static()
class TestCollectiveReduceAPI(TestCollectiveAPIRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, rank):
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
paddle.distributed.reduce(tindata, dst=0)
return [tindata]
if __name__ == "__main__":
runtime_main(TestCollectiveReduceAPI, "reduce")
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import argparse
import os
import sys
import signal
import time
import socket
from contextlib import closing
import math
import paddle
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
import unittest
from multiprocessing import Process
import paddle.fluid.layers as layers
from functools import reduce
from test_collective_base_mlu import TestCollectiveRunnerBase, runtime_main
paddle.enable_static()
class TestCollectiveReduce(TestCollectiveRunnerBase):
def __init__(self):
self.global_ring_id = 0
def get_model(self, main_prog, startup_program, col_type):
ring_id = 0
rootid = 1
with fluid.program_guard(main_prog, startup_program):
tindata = paddle.static.data(
name="tindata", shape=[-1, 10, 1000], dtype='float32'
)
tindata.desc.set_need_check_feed(False)
toutdata = main_prog.current_block().create_var(
name="outof" + col_type,
dtype='float32',
type=core.VarDesc.VarType.LOD_TENSOR,
persistable=False,
stop_gradient=False,
)
main_prog.global_block().append_op(
type="c_" + col_type,
inputs={'X': tindata},
attrs={'ring_id': ring_id, 'root_id': rootid},
outputs={'Out': toutdata},
)
main_prog.global_block().append_op(
type="c_sync_comm_stream",
inputs={'X': toutdata},
outputs={'Out': toutdata},
attrs={'ring_id': ring_id},
)
return toutdata
if __name__ == "__main__":
runtime_main(TestCollectiveReduce)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
import paddle.fluid as fluid
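# Launch-test helper (presumably invoked by the test_launch_*_mlu.sh scripts):
# each trainer writes the environment prepared by paddle.distributed.launch to
# a per-rank log file so the calling test can check it; train_abort() also
# exercises the case where rank 0 exits early.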
def train(prefix):
selected_mlus = os.getenv("FLAGS_selected_mlus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
name = "selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}".format(
selected_mlus,
worker_endpoints,
trainers_num,
current_endpoint,
trainer_id,
)
print(name)
with open(
"multi_process_{}.check_{}.log".format(prefix, trainer_id), "w"
) as f:
f.write(name)
def train_abort(prefix):
selected_mlus = os.getenv("FLAGS_selected_mlus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
if trainer_id == 0:
try:
# train abort
exit(1)
except SystemExit:
name = "abort>>> selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}".format(
selected_mlus,
worker_endpoints,
trainers_num,
current_endpoint,
trainer_id,
)
print(name)
with open(
"multi_process_{}.check_{}.log".format(prefix, trainer_id), "w"
) as f:
f.write(name)
raise
else:
# sleep 30s to make sure paddle.distributed.launch will terminate this process
time.sleep(30)
name = "selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}".format(
selected_mlus,
worker_endpoints,
trainers_num,
current_endpoint,
trainer_id,
)
print(name)
with open(
"multi_process_{}.check_{}.log".format(prefix, trainer_id), "w"
) as f:
f.write(name)
if __name__ == '__main__':
if len(sys.argv) == 3 and sys.argv[2] == "abort":
prefix = sys.argv[1]
train_abort(prefix)
else:
prefix = sys.argv[1]
train(prefix)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import time
def train(prefix):
selected_mlus = os.getenv("FLAGS_selected_mlus")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
worker_endpoints_env = os.getenv("PADDLE_TRAINER_ENDPOINTS")
current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT")
worker_endpoints = worker_endpoints_env
trainers_num = len(worker_endpoints.split(','))
name = "selected_mlus:{} worker_endpoints:{} trainers_num:{} current_endpoint:{} trainer_id:{}".format(
selected_mlus,
worker_endpoints,
trainers_num,
current_endpoint,
trainer_id,
)
print(name)
with open("{}.check_{}.log".format(prefix, trainer_id), "w") as f:
f.write(name)
if __name__ == '__main__':
prefix = sys.argv[1]
train(prefix)
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import contextlib
import unittest
import numpy as np
import pickle
import paddle
import paddle.fluid as fluid
import paddle.fluid.dygraph as dygraph
from paddle.fluid import core
from paddle.fluid.optimizer import SGDOptimizer
from paddle.nn import Conv2D, Linear, SyncBatchNorm
from paddle.fluid.dygraph.base import to_variable
import sys
sys.path.append("..")
from test_dist_base import runtime_main, TestParallelDyGraphRunnerBase
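# Dygraph runner for the parallel SyncBatchNorm test: two Conv2D + SyncBatchNorm
# blocks, the second SyncBatchNorm constructed with weight_attr=False and
# bias_attr=False to also cover the variant without learnable scale and shift.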
class TestLayer(paddle.nn.Layer):
def __init__(
self,
num_channels,
num_filters,
filter_size,
stride=1,
groups=1,
act=None,
):
super().__init__()
self._conv = Conv2D(
in_channels=num_channels,
out_channels=num_filters,
kernel_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
bias_attr=False,
)
self._sync_batch_norm = SyncBatchNorm(num_filters)
self._conv2 = Conv2D(
in_channels=num_filters,
out_channels=num_filters,
kernel_size=filter_size,
stride=stride,
padding=(filter_size - 1) // 2,
groups=groups,
bias_attr=False,
)
self._sync_batch_norm2 = SyncBatchNorm(
num_filters, weight_attr=False, bias_attr=False
)
def forward(self, inputs):
y = self._conv(inputs)
y = self._sync_batch_norm(y)
y = self._conv2(y)
y = self._sync_batch_norm2(y)
return y
class TestSyncBatchNorm(TestParallelDyGraphRunnerBase):
def get_model(self):
model = TestLayer(3, 64, 7)
train_reader = paddle.batch(
paddle.dataset.flowers.test(use_xmap=False),
batch_size=32,
drop_last=True,
)
opt = fluid.optimizer.Adam(
learning_rate=1e-3, parameter_list=model.parameters()
)
return model, train_reader, opt
def run_one_loop(self, model, opt, data):
batch_size = len(data)
dy_x_data = np.array([x[0].reshape(3, 224, 224) for x in data]).astype(
'float32'
)
img = to_variable(dy_x_data)
img.stop_gradient = False
out = model(img)
out = paddle.mean(out)
return out
if __name__ == "__main__":
runtime_main(TestSyncBatchNorm)
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# use default values
# FIXME: randomly fails with "Unknown command lines -c (or -m)".
MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch c_comm_init_op_mlu.py
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCAllgatherOp(TestDistBase):
def _setup_config(self):
pass
def test_allgather_fp32(self):
self.check_with_place(
"collective_allgather_op.py", "allgather", "float32"
)
def test_allgather_fp16(self):
self.check_with_place(
"collective_allgather_op.py", "allgather", "float16"
)
def test_allgather_int32(self):
self.check_with_place(
"collective_allgather_op.py", "allgather", "int32"
)
def test_allgather_int16(self):
self.check_with_place(
"collective_allgather_op.py", "allgather", "int16"
)
def test_allgather_int8(self):
self.check_with_place("collective_allgather_op.py", "allgather", "int8")
def test_allgather_uint8(self):
self.check_with_place(
"collective_allgather_op.py", "allgather", "uint8"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
from test_collective_api_base_mlu import TestDistBase
paddle.enable_static()
class TestCollectiveAllgatherAPI(TestDistBase):
def _setup_config(self):
pass
def test_allgather_cncl_fp16(self):
self.check_with_place(
"collective_allgather_api.py", "allgather", "float16"
)
def test_allgather_cncl_fp32(self):
self.check_with_place(
"collective_allgather_api.py", "allgather", "float32"
)
def test_allgather_cncl_int32(self):
self.check_with_place(
"collective_allgather_api.py", "allgather", "int32"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
from test_collective_api_base_mlu import TestDistBase
paddle.enable_static()
class TestCollectiveAllreduceAPI(TestDistBase):
def _setup_config(self):
pass
def test_allreduce_cncl_fp16(self):
self.check_with_place(
"collective_allreduce_api.py", "allreduce", "float16"
)
def test_allreduce_cncl_fp32(self):
self.check_with_place(
"collective_allreduce_api.py", "allreduce", "float32"
)
def test_allreduce_cncl_int32(self):
self.check_with_place(
"collective_allreduce_api.py", "allreduce", "int32"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCAllreduceOp(TestDistBase):
def _setup_config(self):
pass
def test_allreduce_max_fp32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_max", "float32"
)
def test_allreduce_max_fp16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_max", "float16"
)
def test_allreduce_max_int32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_max", "int32"
)
def test_allreduce_max_int16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_max", "int16"
)
def test_allreduce_max_int8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_max", "int8"
)
def test_allreduce_max_uint8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_max", "uint8"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCAllreduceOp(TestDistBase):
def _setup_config(self):
pass
def test_allreduce_min_fp32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_min", "float32"
)
def test_allreduce_min_fp16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_min", "float16"
)
def test_allreduce_min_int32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_min", "int32"
)
def test_allreduce_min_int16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_min", "int16"
)
def test_allreduce_min_int8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_min", "int8"
)
def test_allreduce_min_uint8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_min", "uint8"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCAllreduceOp(TestDistBase):
def _setup_config(self):
pass
def test_allreduce_prod_fp32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_prod", "float32"
)
def test_allreduce_prod_fp16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_prod", "float16"
)
def test_allreduce_prod_int32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_prod", "int32"
)
def test_allreduce_prod_int16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_prod", "int16"
)
def test_allreduce_prod_int8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_prod", "int8"
)
def test_allreduce_prod_uint8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_prod", "uint8"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCAllreduceOp(TestDistBase):
def _setup_config(self):
pass
def test_allreduce_sum_fp32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_sum", "float32"
)
def test_allreduce_sum_fp16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_sum", "float16"
)
def test_allreduce_sum_int32(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_sum", "int32"
)
def test_allreduce_sum_int16(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_sum", "int16"
)
def test_allreduce_sum_int8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_sum", "int8"
)
def test_allreduce_sum_uint8(self):
self.check_with_place(
"collective_allreduce_op.py", "allreduce_sum", "uint8"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import unittest
import os
import sys
import subprocess
import pickle
from contextlib import closing
import paddle
import paddle.fluid as fluid
from paddle.fluid import core
def DataTypeCast(date_type):
np_data_type = None
if date_type == "float16":
np_data_type = np.float16
elif date_type == "float32":
np_data_type = np.float32
elif date_type == "int32":
np_data_type = np.int32
else:
        raise ValueError("This data type is not supported!")
return np_data_type
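# Base class for the high-level API tests (paddle.distributed.*): a subclass
# builds the program in get_model(), and run_trainer() executes it on the MLU
# selected by FLAGS_selected_mlus with random input seeded by the process pid.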
class TestCollectiveAPIRunnerBase:
def get_model(self, train_prog, startup_prog, rank, indata=None):
raise NotImplementedError(
"get model should be implemented by child class."
)
def run_trainer(self, args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
endpoints = args["endpoints"].split(",")
rank = args["trainerid"]
current_endpoint = args["currentendpoint"]
nranks = 2
paddle.distributed.init_parallel_env()
device_id = int(os.getenv("FLAGS_selected_mlus", "0"))
place = fluid.MLUPlace(device_id)
np.random.seed(os.getpid())
np_data_type = DataTypeCast(args["data_type"])
indata = np.random.random((10, 1000)).astype(np_data_type)
if args['static_mode']:
result = self.get_model(train_prog, startup_prog, rank)
exe = fluid.Executor(place)
exe.run(startup_prog)
fetch_list = []
for elem in result:
fetch_list.append(elem.name)
out = exe.run(
train_prog, feed={'tindata': indata}, fetch_list=fetch_list
)
else:
out = self.get_model(train_prog, startup_prog, rank, indata)
# print(out, sys.stderr)
sys.stdout.buffer.write(pickle.dumps(out))
def runtime_main(test_class, col_type):
args = {}
model = test_class()
args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID"))
args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
args["col_type"] = col_type
args["backend"] = os.getenv("BACKEND")
args["path_id"] = int(os.getenv("PATH_ID"))
args["static_mode"] = int(os.getenv("STATIC_MODE"))
args["data_type"] = os.getenv("DATA_TYPE")
model.run_trainer(args)
import socket
from contextlib import closing
class TestDistBase(unittest.TestCase):
def setUp(self):
self._port_set = set()
self._trainers = 2
self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
self._find_free_port(),
self._find_free_port(),
)
self._python_interp = sys.executable
def _find_free_port(self):
def __free_port():
with closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
) as s:
s.bind(('', 0))
return s.getsockname()[1]
while True:
port = __free_port()
if port not in self._port_set:
self._port_set.add(port)
return port
def _run_cluster(self, model_file, envs):
worker_endpoints = self._ps_endpoints.split(",")
w0_ep, w1_ep = worker_endpoints
# print("w0_ep:",w0_ep," w1_ep:",w1_ep)
env0 = {
"FLAGS_selected_mlus": "0",
"PADDLE_TRAINER_ID": "0",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w0_ep,
}
env1 = {
"FLAGS_selected_mlus": "1",
"PADDLE_TRAINER_ID": "1",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w1_ep,
}
# update environment
env0.update(envs)
env1.update(envs)
if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
tr_cmd = "%s -m coverage run --branch -p %s"
else:
tr_cmd = "%s %s"
tr0_cmd = tr_cmd % (self._python_interp, model_file)
tr1_cmd = tr_cmd % (self._python_interp, model_file)
tr0_pipe = open("/tmp/tr0_err_%d.log" % os.getpid(), "w")
tr1_pipe = open("/tmp/tr1_err_%d.log" % os.getpid(), "w")
# print(tr0_cmd)
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0,
)
tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1,
)
tr0_out, tr0_err = tr0_proc.communicate()
tr1_out, tr1_err = tr1_proc.communicate()
sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
# close trainer file
tr0_pipe.close()
tr1_pipe.close()
with open("/tmp/tr0_err_%d.log" % os.getpid(), "r") as f:
sys.stderr.write('trainer 0 stderr file: %s\n' % f.read())
with open("/tmp/tr1_err_%d.log" % os.getpid(), "r") as f:
sys.stderr.write('trainer 1 stderr file: %s\n' % f.read())
return (
pickle.loads(tr0_out),
pickle.loads(tr1_out),
tr0_proc.pid,
tr1_proc.pid,
)
def check_with_place(
self,
model_file,
col_type,
data_type,
path_id="0",
static_mode="1",
check_error_log=False,
need_envs={},
):
required_envs = {
"FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_eager_delete_tensor_gb": "0.0",
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH", ""),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
"FLAGS_call_stack_level": "2",
"GLOG_v": "3",
"STATIC_MODE": static_mode,
"PADDLE_WITH_GLOO": '0',
"BACKEND": "cncl",
"PATH_ID": path_id,
"DATA_TYPE": data_type,
}
required_envs.update(need_envs)
if check_error_log:
required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1"
required_envs["GLOO_LOG_LEVEL"] = "TRACE"
tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
model_file, required_envs
)
np_data_type = DataTypeCast(data_type)
np.random.seed(pid0)
input1 = np.random.random((10, 1000)).astype(np_data_type)
np.random.seed(pid1)
input2 = np.random.random((10, 1000)).astype(np_data_type)
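        # Each trainer seeds numpy with its own pid, so the expected collective
        # result can be recomputed here from the two reproduced inputs.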
if col_type == "broadcast":
need_result = input2
np.testing.assert_allclose(tr0_out[0], need_result)
np.testing.assert_allclose(tr1_out[0], need_result)
elif col_type == "allreduce":
need_result = input1 + input2
np.testing.assert_allclose(
tr0_out[0], need_result, rtol=1e-05, atol=1e-05
)
np.testing.assert_allclose(
tr1_out[0], need_result, rtol=1e-05, atol=1e-05
)
elif col_type == "reduce":
need_result = input1 + input2
np.testing.assert_allclose(tr0_out[0], need_result)
elif col_type == "allgather":
need_result = np.vstack((input1, input2))
tr_out0 = np.vstack((tr0_out[0], tr0_out[1]))
tr_out1 = np.vstack((tr1_out[0], tr1_out[1]))
np.testing.assert_allclose(tr_out0, need_result)
np.testing.assert_allclose(tr_out1, need_result)
else:
pass
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import unittest
import time
import argparse
import os
import sys
import subprocess
import traceback
import functools
import pickle
from contextlib import closing
import paddle.fluid as fluid
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
def DataTypeCast(date_type):
np_data_type = None
if date_type == "float16":
np_data_type = np.float16
elif date_type == "float32":
np_data_type = np.float32
elif date_type == "float64":
np_data_type = np.float64
elif date_type == "int8":
np_data_type = np.int8
elif date_type == "int16":
np_data_type = np.int16
elif date_type == "int32":
np_data_type = np.int32
elif date_type == "uint8":
np_data_type = np.uint8
else:
        raise ValueError("This data type is not supported!")
return np_data_type
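# Base class for the raw c_* operator tests: initCommunicator() sets up the
# CNCL ring manually with c_gen_cncl_id/c_comm_init, then run_trainer()
# executes the program built by get_model() on the selected MLU.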
class TestCollectiveRunnerBase:
def get_model(self, train_prog, startup_prog, col_type):
raise NotImplementedError(
"get model should be implemented by child class."
)
def wait_server_ready(self, endpoints):
while True:
all_ok = True
not_ready_endpoints = []
for ep in endpoints:
ip_port = ep.split(":")
with closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
) as sock:
sock.settimeout(2)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if hasattr(socket, 'SO_REUSEPORT'):
sock.setsockopt(
socket.SOL_SOCKET, socket.SO_REUSEPORT, 1
)
result = sock.connect_ex((ip_port[0], int(ip_port[1])))
if result != 0:
all_ok = False
not_ready_endpoints.append(ep)
if not all_ok:
sys.stderr.write("server not ready, wait 3 sec to retry...\n")
sys.stderr.write(
"not ready endpoints:" + str(not_ready_endpoints) + "\n"
)
sys.stderr.flush()
time.sleep(3)
else:
break
# endpoints should be ["ip1:port1","ip2:port2"]
def initCommunicator(
self, program, rank, nranks, wait_port, current_endpoint, endpoints
):
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
if rank == 0 and wait_port:
self.wait_server_ready(other_endpoints)
block = program.global_block()
cncl_id_var = block.create_var(
name=nameGen.generate('cncl_id'),
persistable=True,
type=core.VarDesc.VarType.RAW,
)
block.append_op(
type='c_gen_cncl_id',
inputs={},
outputs={'Out': cncl_id_var},
attrs={
'rank': rank,
'endpoint': current_endpoint,
'other_endpoints': other_endpoints,
},
)
block.append_op(
type='c_comm_init',
inputs={'X': cncl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': rank,
'ring_id': self.global_ring_id,
},
)
def run_trainer(self, args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
endpoints = args["endpoints"].split(",")
rank = args["trainerid"]
current_endpoint = args["currentendpoint"]
nranks = 2
self.initCommunicator(
startup_prog, rank, nranks, True, current_endpoint, endpoints
)
self.rank = rank
result = self.get_model(train_prog, startup_prog, args["col_type"])
device_id = int(os.getenv("FLAGS_selected_mlus", "0"))
place = fluid.MLUPlace(device_id)
exe = fluid.Executor(place)
exe.run(startup_prog)
np.random.seed(os.getpid())
np_data_type = DataTypeCast(args["data_type"])
indata = np.random.random((10, 1000)).astype(np_data_type)
out = exe.run(
train_prog, feed={'tindata': indata}, fetch_list=[result.name]
)
sys.stdout.buffer.write(pickle.dumps(out))
def runtime_main(test_class):
args = {}
model = test_class()
args["deviceid"] = os.getenv("FLAGS_selected_mlus")
args["trainerid"] = int(os.getenv("PADDLE_TRAINER_ID"))
args["trainernum"] = int(os.getenv("PADDLE_TRAINERS_NUM"))
args["endpoints"] = os.getenv('PADDLE_TRAINER_ENDPOINTS')
args["currentendpoint"] = os.getenv("PADDLE_CURRENT_ENDPOINT")
args["col_type"] = os.getenv("COL_TYPE")
args["data_type"] = os.getenv("DATA_TYPE")
model.run_trainer(args)
import socket
from contextlib import closing
class TestDistBase(unittest.TestCase):
def setUp(self):
self._port_set = set()
self._trainers = 2
self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
self._find_free_port(),
self._find_free_port(),
)
self._python_interp = sys.executable
def _find_free_port(self):
def __free_port():
with closing(
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
) as s:
s.bind(('', 0))
return s.getsockname()[1]
while True:
port = __free_port()
if port not in self._port_set:
self._port_set.add(port)
return port
def _run_cluster(self, model_file, envs):
worker_endpoints = self._ps_endpoints.split(",")
w0_ep, w1_ep = worker_endpoints
env0 = {
"FLAGS_selected_mlus": "0",
"PADDLE_TRAINER_ID": "0",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w0_ep,
}
env1 = {
"FLAGS_selected_mlus": "1",
"PADDLE_TRAINER_ID": "1",
"PADDLE_TRAINERS_NUM": "2",
"PADDLE_TRAINER_ENDPOINTS": self._ps_endpoints,
"PADDLE_CURRENT_ENDPOINT": w1_ep,
}
# update environment
env0.update(envs)
env1.update(envs)
tr_cmd = "%s %s"
tr0_cmd = tr_cmd % (self._python_interp, model_file)
tr1_cmd = tr_cmd % (self._python_interp, model_file)
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr0_pipe,
env=env0,
)
tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(),
stdout=subprocess.PIPE,
stderr=tr1_pipe,
env=env1,
)
tr0_out, tr0_err = tr0_proc.communicate()
tr1_out, tr1_err = tr1_proc.communicate()
sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)
# close trainer file
tr0_pipe.close()
tr1_pipe.close()
return (
pickle.loads(tr0_out),
pickle.loads(tr1_out),
tr0_proc.pid,
tr1_proc.pid,
)
def check_with_place(
self,
model_file,
col_type,
data_type,
check_error_log=False,
need_envs={},
):
required_envs = {
"FLAGS_eager_delete_tensor_gb": "0.0",
"PATH": os.getenv("PATH"),
"PYTHONPATH": os.getenv("PYTHONPATH", ""),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
"GLOG_v": "3",
"DATA_TYPE": data_type,
"COL_TYPE": col_type,
}
required_envs.update(need_envs)
if check_error_log:
required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1"
tr0_out, tr1_out, pid0, pid1 = self._run_cluster(
model_file, required_envs
)
np_data_type = DataTypeCast(data_type)
np.random.seed(pid0)
input1 = np.random.random((10, 1000)).astype(np_data_type)
np.random.seed(pid1)
input2 = np.random.random((10, 1000)).astype(np_data_type)
if col_type == "broadcast":
need_result = input2
np.testing.assert_allclose(tr0_out[0], need_result)
np.testing.assert_allclose(tr1_out[0], need_result)
elif col_type == "allreduce_sum":
need_result = input1 + input2
np.testing.assert_allclose(
tr0_out[0], need_result, rtol=1e-05, atol=1e-05
)
np.testing.assert_allclose(
tr1_out[0], need_result, rtol=1e-05, atol=1e-05
)
elif col_type == "allreduce_prod":
need_result = input1 * input2
np.testing.assert_allclose(
tr0_out[0], need_result, rtol=1e-05, atol=1e-05
)
np.testing.assert_allclose(
tr1_out[0], need_result, rtol=1e-05, atol=1e-05
)
elif col_type == "allreduce_max":
need_result = np.maximum(input1, input2)
np.testing.assert_allclose(
tr0_out[0], need_result, rtol=1e-05, atol=1e-05
)
np.testing.assert_allclose(
tr1_out[0], need_result, rtol=1e-05, atol=1e-05
)
elif col_type == "allreduce_min":
need_result = np.minimum(input1, input2)
np.testing.assert_allclose(
tr0_out[0], need_result, rtol=1e-05, atol=1e-05
)
np.testing.assert_allclose(
tr1_out[0], need_result, rtol=1e-05, atol=1e-05
)
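        # reduce_* results are only defined on the root; the op runners use
        # rootid = 1, so only trainer 1's output is checked.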
elif col_type == "reduce_sum":
need_result = input1 + input2
np.testing.assert_allclose(tr1_out[0], need_result)
elif col_type == "reduce_prod":
need_result = input1 * input2
np.testing.assert_allclose(tr1_out[0], need_result)
elif col_type == "reduce_max":
need_result = np.maximum(input1, input2)
np.testing.assert_allclose(tr1_out[0], need_result)
elif col_type == "reduce_min":
need_result = np.minimum(input1, input2)
np.testing.assert_allclose(tr1_out[0], need_result)
elif col_type == "allgather":
need_result = np.vstack((input1, input2))
np.testing.assert_allclose(tr0_out[0], need_result)
np.testing.assert_allclose(tr1_out[0], need_result)
else:
pass
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCBroadcastOp(TestDistBase):
def _setup_config(self):
pass
def test_broadcast_fp32(self):
self.check_with_place(
"collective_broadcast_op.py", "broadcast", "float32"
)
def test_broadcast_fp16(self):
self.check_with_place(
"collective_broadcast_op.py", "broadcast", "float16"
)
def test_broadcast_int32(self):
self.check_with_place(
"collective_broadcast_op.py", "broadcast", "int32"
)
def test_broadcast_int16(self):
self.check_with_place(
"collective_broadcast_op.py", "broadcast", "int16"
)
def test_broadcast_int8(self):
self.check_with_place("collective_broadcast_op.py", "broadcast", "int8")
def test_broadcast_uint8(self):
self.check_with_place(
"collective_broadcast_op.py", "broadcast", "uint8"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
from test_collective_api_base_mlu import TestDistBase
paddle.enable_static()
class TestCollectiveBroadcastAPI(TestDistBase):
def _setup_config(self):
pass
def test_broadcast_cncl_fp16(self):
self.check_with_place(
"collective_broadcast_api.py", "broadcast", "float16"
)
def test_broadcast_cncl_fp32(self):
self.check_with_place(
"collective_broadcast_api.py", "broadcast", "float32"
)
def test_broadcast_cncl_int32(self):
self.check_with_place(
"collective_broadcast_api.py", "broadcast", "int32"
)
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import numpy as np
import paddle
from test_collective_api_base_mlu import TestDistBase
paddle.enable_static()
class TestCollectiveReduceAPI(TestDistBase):
def _setup_config(self):
pass
def test_reduce_cncl_fp16(self):
self.check_with_place("collective_reduce_api.py", "reduce", "float16")
def test_reduce_cncl_fp32(self):
self.check_with_place("collective_reduce_api.py", "reduce", "float32")
def test_reduce_cncl_int32(self):
self.check_with_place("collective_reduce_api.py", "reduce", "int32")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCReduceOp(TestDistBase):
def _setup_config(self):
pass
def test_reduce_max_fp32(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_max", "float32"
)
def test_reduce_max_fp16(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_max", "float16"
)
def test_reduce_max_int32(self):
self.check_with_place("collective_reduce_op.py", "reduce_max", "int32")
def test_reduce_max_int16(self):
self.check_with_place("collective_reduce_op.py", "reduce_max", "int16")
def test_reduce_max_int8(self):
self.check_with_place("collective_reduce_op.py", "reduce_max", "int8")
def test_reduce_max_uint8(self):
self.check_with_place("collective_reduce_op.py", "reduce_max", "uint8")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCReduceOp(TestDistBase):
def _setup_config(self):
pass
def test_reduce_min_fp32(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_min", "float32"
)
def test_reduce_min_fp16(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_min", "float16"
)
def test_reduce_min_int32(self):
self.check_with_place("collective_reduce_op.py", "reduce_min", "int32")
def test_reduce_min_int16(self):
self.check_with_place("collective_reduce_op.py", "reduce_min", "int16")
def test_reduce_min_int8(self):
self.check_with_place("collective_reduce_op.py", "reduce_min", "int8")
def test_reduce_min_uint8(self):
self.check_with_place("collective_reduce_op.py", "reduce_min", "uint8")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCReduceOp(TestDistBase):
def _setup_config(self):
pass
def test_reduce_prod_fp32(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_prod", "float32"
)
def test_reduce_prod_fp16(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_prod", "float16"
)
def test_reduce_prod_int32(self):
self.check_with_place("collective_reduce_op.py", "reduce_prod", "int32")
def test_reduce_prod_int16(self):
self.check_with_place("collective_reduce_op.py", "reduce_prod", "int16")
def test_reduce_prod_int8(self):
self.check_with_place("collective_reduce_op.py", "reduce_prod", "int8")
def test_reduce_prod_uint8(self):
self.check_with_place("collective_reduce_op.py", "reduce_prod", "uint8")
if __name__ == '__main__':
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import unittest
import numpy as np
import paddle
from test_collective_base_mlu import TestDistBase
paddle.enable_static()
class TestCReduceOp(TestDistBase):
def _setup_config(self):
pass
def test_reduce_sum_fp32(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_sum", "float32"
)
def test_reduce_sum_fp16(self):
self.check_with_place(
"collective_reduce_op.py", "reduce_sum", "float16"
)
def test_reduce_sum_int32(self):
self.check_with_place("collective_reduce_op.py", "reduce_sum", "int32")
def test_reduce_sum_int16(self):
self.check_with_place("collective_reduce_op.py", "reduce_sum", "int16")
def test_reduce_sum_int8(self):
self.check_with_place("collective_reduce_op.py", "reduce_sum", "int8")
def test_reduce_sum_uint8(self):
self.check_with_place("collective_reduce_op.py", "reduce_sum", "uint8")
if __name__ == '__main__':
unittest.main()
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# test use DISTRIBUTED_TRAINER_ENDPOINTS env in paddlecloud
unset PADDLE_PORT
export DISTRIBUTED_TRAINER_ENDPOINTS=127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171
export cluster_node_ips="127.0.0.1,127.0.0.2"
export PADDLE_TRAINERS_NUM=2
export POD_IP=127.0.0.1
export PADDLE_TRAINERS=127.0.0.1,127.0.0.2
export PADDLE_TRAINER_ID=0
export TRAINER_PORTS_NUM=2
file_0="multi_process_fullpath_launch.check_0.log"
file_1="multi_process_fullpath_launch.check_1.log"
distributed_args="--ips=${cluster_node_ips} --mlus=0,1 --log_dir=testlog"
echo "paddle.distributed.fleet.launch async poll process test"
if ! MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process_mlu.py fullpath_launch abort; then
echo "train abort as planned"
fi
abort_str1="abort>>> selected_mlus:0 worker_endpoints:127.0.0.1:6170,127.0.0.1:6171,127.0.0.2:6170,127.0.0.2:6171 trainers_num:4 current_endpoint:127.0.0.1:6170 trainer_id:0"
if grep -q "$abort_str1" "$file_0"; then
echo "trainer 0 abort as planned"
else
echo "trainer 0 not abort as planned"
exit -1
fi
if [ ! -f $file_1 ]; then
echo "trainer 1 terminate as planned"
else
echo "trainer 1 not terminate as planned"
rm $file_1
exit -1
fi
if [ -f $file_0 ]; then
rm $file_0
fi
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
# use paddlecloud
echo "begin test use paddlecloud"
cluster_node_ips="127.0.0.1,127.0.0.2"
export PADDLE_TRAINERS_NUM=2
export POD_IP=127.0.0.1
export PADDLE_TRAINERS=127.0.0.1,127.0.0.2
export PADDLE_TRAINER_ID=0
export PADDLE_PORT=35789
export TRAINER_PORTS_NUM=2
distributed_args="--ips=${cluster_node_ips} --mlus=0,1 --log_dir=testlog"
MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.fleet.launch ${distributed_args} multi_process_mlu.py fleetlaunchcloud
str1="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35789 trainer_id:0"
str2="selected_mlus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790,127.0.0.2:35789,127.0.0.2:35790 trainers_num:4 current_endpoint:127.0.0.1:35790 trainer_id:1"
file_0="multi_process_fleetlaunchcloud.check_0.log"
file_1="multi_process_fleetlaunchcloud.check_1.log"
echo "paddlecloud params test"
if grep -q "$str1" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if grep -q "$str2" "$file_1"; then
echo "find trainer 1"
else
echo "not find trainer 1"
exit -1
fi
# test async poll process
if [ -f $file_0 ]; then
rm $file_0
fi
if [ -f $file_1 ]; then
rm $file_1
fi
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
export PADDLE_START_PORT=35789
export MLU_VISIBLE_DEVICES=0,1
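# Each helper below launches paddle.distributed.launch with a given
# --nproc_per_node and then greps the per-trainer log for the expected
# selected_mlus / worker_endpoints / trainer_id line.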
function test_nproc_0(){
mlus=$1
file_0="fleet_nproc_0.check_0.log"
rm -f ${file_0}
distributed_args="--log_dir=testlog --nproc_per_node=1 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_0
str0="selected_mlus:${mlus} worker_endpoints:127.0.0.1:35789 trainers_num:1 current_endpoint:127.0.0.1:35789 trainer_id:0"
if grep -q "$str0" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
if [ -f $file_0 ]; then
rm $file_0
fi
}
function test_nproc_1(){
file_0="fleet_nproc_1.check_0.log"
file_1="fleet_nproc_1.check_1.log"
rm -f ${file_0} ${file_1}
distributed_args="--log_dir=testlog --nproc_per_node=2 --ips=127.0.0.1"
python -m paddle.distributed.launch ${distributed_args} nproc_process_mlu.py fleet_nproc_1
str0="selected_mlus:0 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35789 trainer_id:0"
if grep -q "$str0" "$file_0"; then
echo "find trainer 0"
else
echo "not find trainer 0"
exit -1
fi
str1="selected_mlus:1 worker_endpoints:127.0.0.1:35789,127.0.0.1:35790 trainers_num:2 current_endpoint:127.0.0.1:35790 trainer_id:1"
if grep -q "$str1" "$file_1"; then
echo "find trainer 1"
else
echo "not find trainer 1"
exit -1
fi
if [ -f $file_0 ]; then
rm $file_0
fi
if [ -f $file_1 ]; then
rm $file_1
fi
}
test_nproc_0 "0,1"
test_nproc_1
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append('..')
import unittest
import paddle
import numpy as np
from paddle import _C_ops, _legacy_C_ops
from paddle.fluid.framework import in_dygraph_mode
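# Helper that applies the same update either through the per-parameter adam
# kernel or through the fused merged_adam kernel, so the test below can check
# that both execution paths produce matching parameter and moment outputs.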
def run_adam_op(
params,
grads,
lrs,
moment1s,
moment2s,
beta1_pows,
beta2_pows,
master_params,
epsilon,
beta1,
beta2,
place,
multi_precision=False,
use_merged=False,
):
assert len(params) == len(grads)
assert len(params) == len(lrs)
assert len(params) == len(moment1s)
assert len(params) == len(moment2s)
assert len(params) == len(beta1_pows)
    assert len(params) == len(beta2_pows)
assert len(params) == len(master_params)
paddle.disable_static()
# paddle.set_device(place)
param_vars = [paddle.fluid.dygraph.to_variable(p) for p in params]
grad_vars = [paddle.fluid.dygraph.to_variable(g) for g in grads]
lr_vars = [paddle.fluid.dygraph.to_variable(l) for l in lrs]
moment1_vars = [paddle.fluid.dygraph.to_variable(m) for m in moment1s]
moment2_vars = [paddle.fluid.dygraph.to_variable(m) for m in moment2s]
beta1_pow_vars = [paddle.fluid.dygraph.to_variable(b) for b in beta1_pows]
beta2_pow_vars = [paddle.fluid.dygraph.to_variable(b) for b in beta2_pows]
master_param_vars = [
paddle.fluid.dygraph.to_variable(m_p) for m_p in master_params
]
if not use_merged:
for i in range(len(param_vars)):
_, _, _, _, _, _ = _legacy_C_ops.adam(
param_vars[i],
grad_vars[i],
lr_vars[i],
moment1_vars[i],
moment2_vars[i],
beta1_pow_vars[i],
beta2_pow_vars[i],
master_param_vars[i],
param_vars[i],
moment1_vars[i],
moment2_vars[i],
beta1_pow_vars[i],
beta2_pow_vars[i],
master_param_vars[i],
'epsilon',
epsilon,
'beta1',
beta1,
'beta2',
beta2,
'multi_precision',
multi_precision,
)
else:
if in_dygraph_mode():
_, _, _, _, _, _ = _C_ops.merged_adam_(
param_vars,
grad_vars,
lr_vars,
moment1_vars,
moment2_vars,
beta1_pow_vars,
beta2_pow_vars,
master_param_vars,
beta1,
beta2,
epsilon,
multi_precision,
False,
)
else:
_, _, _, _, _, _ = _legacy_C_ops.merged_adam(
param_vars,
grad_vars,
lr_vars,
moment1_vars,
moment2_vars,
beta1_pow_vars,
beta2_pow_vars,
master_param_vars,
param_vars,
moment1_vars,
moment2_vars,
beta1_pow_vars,
beta2_pow_vars,
master_param_vars,
'epsilon',
epsilon,
'beta1',
beta1,
'beta2',
beta2,
'multi_precision',
multi_precision,
)
outputs = {
'ParamOut': param_vars,
'Moment1Out': moment1_vars,
'Moment2Out': moment2_vars,
'Beta1PowOut': beta1_pow_vars,
'Beta2PowOut': beta2_pow_vars,
'MasterParamOut': master_param_vars,
}
return outputs
class TestMergedAdam(unittest.TestCase):
def setUp(self):
paddle.disable_static()
self.shapes = [[3, 4], [2, 7], [5, 6], [7, 8]]
self.seed = 10
self.place = paddle.device.MLUPlace(0)
self.__class__.use_mlu = True
def gen_rand_data(self, shapes, dtype):
return [np.random.random(s).astype(dtype) for s in shapes]
def prepare_data(self, shapes, multi_precision, seed, place):
np.random.seed(seed)
mp_dtype = np.float32
# dtype = np.float16 if multi_precision and place == 'mlu' else np.float32
dtype = np.float32
params = self.gen_rand_data(shapes, dtype)
grads = self.gen_rand_data(shapes, dtype)
lrs = self.gen_rand_data([[1], [1], [1], [1]], mp_dtype)
moment1s = self.gen_rand_data(shapes, mp_dtype)
moment2s = self.gen_rand_data(shapes, mp_dtype)
beta1_pows = self.gen_rand_data([[1], [1], [1], [1]], mp_dtype)
beta2_pows = self.gen_rand_data([[1], [1], [1], [1]], mp_dtype)
master_params = [p.astype(mp_dtype) for p in params]
return (
params,
grads,
lrs,
moment1s,
moment2s,
beta1_pows,
beta2_pows,
master_params,
)
def check_with_place(self, place, multi_precision):
(
params,
grads,
lrs,
moment1s,
moment2s,
beta1_pows,
beta2_pows,
master_params,
) = self.prepare_data(self.shapes, multi_precision, self.seed, place)
def run_op(use_merged):
return run_adam_op(
params=params,
grads=grads,
lrs=lrs,
moment1s=moment1s,
moment2s=moment2s,
beta1_pows=beta1_pows,
beta2_pows=beta2_pows,
master_params=master_params,
epsilon=0.9,
beta1=0.9,
beta2=0.99,
place=place,
multi_precision=multi_precision,
use_merged=use_merged,
)
outs1 = run_op(True)
outs2 = run_op(False)
self.assertEqual(len(outs1), len(outs2))
for key in outs1.keys():
value1 = outs1[key]
value2 = outs2[key]
for i in range(len(value1)):
if place == 'mlu':
np.testing.assert_array_equal(value1[i], value2[i])
else:
np.testing.assert_allclose(
value1[i], value2[i], rtol=1e-05, atol=1e-07
)
def test_main(self):
for multi_precision in [False, True]:
self.check_with_place(self.place, multi_precision)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append('..')
import unittest
import paddle
import numpy as np
from paddle.fluid.layer_helper import LayerHelper
from collections import OrderedDict
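# Builds a static program that runs either the per-parameter momentum op or
# the fused merged_momentum op and returns the updated tensors, so the tests
# can compare the two execution paths numerically.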
def run_momentum_op(
params,
grads,
velocitys,
master_params,
learning_rate,
place,
multi_precision,
mu=0.9,
rescale_grad=0.01,
use_merged=False,
):
assert len(params) == len(grads)
assert len(params) == len(velocitys)
if multi_precision:
assert len(params) == len(master_params)
op_type = 'merged_momentum' if use_merged else 'momentum'
main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
helper = LayerHelper(op_type, **locals())
attrs = {
'mu': mu,
'multi_precision': multi_precision,
'rescale_grad': rescale_grad,
}
param_vars = [
helper.create_variable(
persistable=True, shape=p.shape, dtype=p.dtype
)
for p in params
]
grad_vars = [
helper.create_variable(shape=g.shape, dtype=g.dtype) for g in grads
]
velocity_vars = [
helper.create_variable(
persistable=True, shape=v.shape, dtype=v.dtype
)
for v in velocitys
]
lr_var = helper.create_variable(
persistable=True,
shape=learning_rate.shape,
dtype=learning_rate.dtype,
)
feed_dict = OrderedDict()
feed_dict.update(
OrderedDict(
[
(p_var.name, p_val)
for p_var, p_val in zip(param_vars, params)
]
)
)
feed_dict.update(
OrderedDict(
[
(v_var.name, v_val)
for v_var, v_val in zip(velocity_vars, velocitys)
]
)
)
fetch_list = list(feed_dict.keys())
feed_dict.update(
OrderedDict(
[(g_var.name, g_val) for g_var, g_val in zip(grad_vars, grads)]
)
)
feed_dict.update({lr_var.name: learning_rate})
if multi_precision:
master_param_vars = [
helper.create_variable(
persistable=True, shape=p.shape, dtype=p.dtype
)
for p in master_params
]
feed_dict.update(
OrderedDict(
[
(mp_var.name, mp_val)
for mp_var, mp_val in zip(
master_param_vars, master_params
)
]
)
)
# CPUPlace does not use MasterParam
if isinstance(place, paddle.CUDAPlace):
fetch_list = fetch_list + [
mp_var.name for mp_var in master_param_vars
]
else:
master_param_vars = None
if not use_merged:
for i, (p, g, v) in enumerate(
zip(param_vars, grad_vars, velocity_vars)
):
inputs = {
'Param': p,
'Grad': g,
'Velocity': v,
'LearningRate': lr_var,
}
outputs = {'ParamOut': p, 'VelocityOut': v}
if multi_precision:
inputs['MasterParam'] = master_param_vars[i]
outputs['MasterParamOut'] = master_param_vars[i]
helper.append_op(
type=op_type, inputs=inputs, outputs=outputs, attrs=attrs
)
else:
inputs = {
'Param': param_vars,
'Grad': grad_vars,
'Velocity': velocity_vars,
'LearningRate': lr_var,
}
outputs = {'ParamOut': param_vars, 'VelocityOut': velocity_vars}
if multi_precision:
inputs['MasterParam'] = master_param_vars
outputs['MasterParamOut'] = master_param_vars
helper.append_op(
type=op_type, inputs=inputs, outputs=outputs, attrs=attrs
)
exe = paddle.static.Executor(place)
with paddle.static.scope_guard(paddle.static.Scope()):
exe.run(startup)
return exe.run(main, feed=feed_dict, fetch_list=fetch_list)
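# Variant of run_momentum_op that additionally exercises the use_nesterov and
# L2-regularization attributes (per-parameter lists in the merged case).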
def run_momentum_op2(
params,
grads,
velocitys,
master_params,
learning_rate,
place,
multi_precision,
mu=0.9,
rescale_grad=0.01,
use_merged=False,
use_nesterov=True,
):
assert len(params) == len(grads)
assert len(params) == len(velocitys)
if multi_precision:
assert len(params) == len(master_params)
op_type = 'merged_momentum' if use_merged else 'momentum'
main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
helper = LayerHelper(op_type, **locals())
param_vars = [
helper.create_variable(
persistable=True, shape=p.shape, dtype=p.dtype
)
for p in params
]
grad_vars = [
helper.create_variable(shape=g.shape, dtype=g.dtype) for g in grads
]
velocity_vars = [
helper.create_variable(
persistable=True, shape=v.shape, dtype=v.dtype
)
for v in velocitys
]
lr_var = helper.create_variable(
persistable=True,
shape=learning_rate.shape,
dtype=learning_rate.dtype,
)
feed_dict = OrderedDict()
feed_dict.update(
OrderedDict(
[
(p_var.name, p_val)
for p_var, p_val in zip(param_vars, params)
]
)
)
feed_dict.update(
OrderedDict(
[
(v_var.name, v_val)
for v_var, v_val in zip(velocity_vars, velocitys)
]
)
)
fetch_list = list(feed_dict.keys())
feed_dict.update(
OrderedDict(
[(g_var.name, g_val) for g_var, g_val in zip(grad_vars, grads)]
)
)
feed_dict.update({lr_var.name: learning_rate})
if multi_precision:
master_param_vars = [
helper.create_variable(
persistable=True, shape=p.shape, dtype=p.dtype
)
for p in master_params
]
feed_dict.update(
OrderedDict(
[
(mp_var.name, mp_val)
for mp_var, mp_val in zip(
master_param_vars, master_params
)
]
)
)
# CPUPlace does not use MasterParam
if isinstance(place, paddle.CUDAPlace):
fetch_list = fetch_list + [
mp_var.name for mp_var in master_param_vars
]
else:
master_param_vars = None
if not use_merged:
for i, (p, g, v) in enumerate(
zip(param_vars, grad_vars, velocity_vars)
):
inputs = {
'Param': p,
'Grad': g,
'Velocity': v,
'LearningRate': lr_var,
}
outputs = {'ParamOut': p, 'VelocityOut': v}
if multi_precision:
inputs['MasterParam'] = master_param_vars[i]
outputs['MasterParamOut'] = master_param_vars[i]
attrs = {
'mu': mu,
'multi_precision': multi_precision,
'rescale_grad': rescale_grad,
'use_nesterov': use_nesterov,
'regularization_method': 'l2_decay',
'regularization_coeff': 2.0,
}
helper.append_op(
type=op_type, inputs=inputs, outputs=outputs, attrs=attrs
)
else:
inputs = {
'Param': param_vars,
'Grad': grad_vars,
'Velocity': velocity_vars,
'LearningRate': lr_var,
}
outputs = {'ParamOut': param_vars, 'VelocityOut': velocity_vars}
if multi_precision:
inputs['MasterParam'] = master_param_vars
outputs['MasterParamOut'] = master_param_vars
attrs = {
'mu': mu,
'multi_precision': multi_precision,
'rescale_grad': rescale_grad,
'use_nesterov': use_nesterov,
'regularization_method': [
'l2_decay' for i in range(len(param_vars))
],
'regularization_coeff': [2.0 for i in range(len(param_vars))],
}
helper.append_op(
type=op_type, inputs=inputs, outputs=outputs, attrs=attrs
)
exe = paddle.static.Executor(place)
with paddle.static.scope_guard(paddle.static.Scope()):
exe.run(startup)
return exe.run(main, feed=feed_dict, fetch_list=fetch_list)
class TestMergedMomentum(unittest.TestCase):
def setUp(self):
paddle.enable_static()
self.shapes = [[3, 4], [2, 7], [5, 6], [7, 8]]
self.seed = 10
self.place = paddle.device.MLUPlace(0)
self.__class__.use_mlu = True
def gen_rand_data(self, shapes, dtype):
return [np.random.random(s).astype(dtype) for s in shapes]
def prepare_data(self, shapes, multi_precision, seed, place):
np.random.seed(seed)
mp_dtype = np.float32
dtype = np.float32
params = self.gen_rand_data(shapes, dtype)
grads = self.gen_rand_data(shapes, dtype)
velocitys = self.gen_rand_data(shapes, mp_dtype)
learning_rate = self.gen_rand_data([[1]], mp_dtype)[0]
if multi_precision:
master_params = [p.astype(mp_dtype) for p in params]
else:
master_params = None
return params, grads, velocitys, master_params, learning_rate
def check_with_place(self, place, multi_precision):
(
params,
grads,
velocitys,
master_params,
learning_rate,
) = self.prepare_data(self.shapes, multi_precision, self.seed, place)
def run_op(use_merged):
# MLU Momentum Op does not support rescale_grad
rescale_grad = 1.0
return run_momentum_op(
params,
grads,
velocitys,
master_params,
learning_rate,
place,
multi_precision,
rescale_grad=rescale_grad,
use_merged=use_merged,
)
outs1 = run_op(True)
outs2 = run_op(False)
self.assertEqual(len(outs1), len(outs2))
for i, (out1, out2) in enumerate(zip(outs1, outs2)):
np.testing.assert_allclose(out1, out2, atol=1e-7)
def test_main(self):
self.check_with_place(self.place, multi_precision=False)
class TestMergedMomentum2(unittest.TestCase):
def setUp(self):
paddle.enable_static()
self.shapes = [[3, 4], [2, 7], [5, 6], [7, 8]]
self.seed = 10
self.place = paddle.device.MLUPlace(0)
self.__class__.use_mlu = True
def gen_rand_data(self, shapes, dtype):
return [np.random.random(s).astype(dtype) for s in shapes]
def prepare_data(self, shapes, multi_precision, seed, place):
np.random.seed(seed)
mp_dtype = np.float32
dtype = np.float32 # np.float16
params = self.gen_rand_data(shapes, dtype)
grads = self.gen_rand_data(shapes, dtype)
velocitys = self.gen_rand_data(shapes, mp_dtype)
learning_rate = self.gen_rand_data([[1]], mp_dtype)[0]
if multi_precision:
master_params = [p.astype(mp_dtype) for p in params]
else:
master_params = None
return params, grads, velocitys, master_params, learning_rate
def check_with_place(self, place, multi_precision):
(
params,
grads,
velocitys,
master_params,
learning_rate,
) = self.prepare_data(self.shapes, multi_precision, self.seed, place)
def run_op(use_nesterov, use_merged):
# MLU Momentum Op does not support rescale_grad
rescale_grad = 1.0
return run_momentum_op2(
params,
grads,
velocitys,
master_params,
learning_rate,
place,
multi_precision,
rescale_grad=rescale_grad,
use_merged=use_merged,
use_nesterov=use_nesterov,
)
outs1 = run_op(use_nesterov=True, use_merged=True)
outs2 = run_op(use_nesterov=True, use_merged=False)
self.assertEqual(len(outs1), len(outs2))
for i, (out1, out2) in enumerate(zip(outs1, outs2)):
np.testing.assert_allclose(out1, out2, atol=1e-7)
outs3 = run_op(use_nesterov=False, use_merged=True)
outs4 = run_op(use_nesterov=False, use_merged=False)
self.assertEqual(len(outs3), len(outs4))
for j, (out3, out4) in enumerate(zip(outs3, outs4)):
np.testing.assert_allclose(out3, out4, atol=1e-7)
def test_main(self):
self.check_with_place(self.place, multi_precision=False)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
sys.path.append("..")
import unittest
from test_dist_base import TestDistBase
import paddle.fluid as fluid
import os
import subprocess
import pickle
DEFAULT_BATCH_SIZE = 2
flag_name = os.path.splitext(__file__)[0]
print("file: {}".format(flag_name))
class TestParallelDygraphMnistMLU(TestDistBase):
def _setup_config(self):
self._sync_mode = False
self._cncl_mode = True
self._dygraph = True
self._enforce_place = "MLU"
def _get_required_envs(self, check_error_log=False, need_envs={}):
required_envs = {
"PATH": os.getenv("PATH", ""),
"PYTHONPATH": os.getenv("PYTHONPATH", ""),
"LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
"LD_PRELOAD": os.getenv("LD_PRELOAD", ""),
"FLAGS_fraction_of_gpu_memory_to_use": "0.15",
"FLAGS_eager_delete_tensor_gb": "0.0",
"FLAGS_call_stack_level": "2",
"GLOG_v": "2",
"PADDLE_WITH_GLOO": '0',
"BACKEND": "cncl",
}
if check_error_log:
required_envs["GLOG_v"] = "5"
required_envs["GLOG_logtostderr"] = "1"
required_envs["GLOO_LOG_LEVEL"] = "TRACE"
required_envs.update(need_envs)
return required_envs
def _run_local(
self,
model,
envs,
check_error_log=False,
batch_size=DEFAULT_BATCH_SIZE,
batch_merge_repeat=1,
log_name="",
devices="1",
):
cmd = self._python_interp
if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
envs['COVERAGE_FILE'] = os.getenv('COVERAGE_FILE', '')
cmd += " -m coverage run --branch -p"
cmd += " %s --role trainer --update_method local --lr %f" % (
model,
self._lr,
)
if batch_size != DEFAULT_BATCH_SIZE:
cmd += " --batch_size %d" % batch_size
if batch_merge_repeat > 1:
cmd += " --batch_merge_repeat %d" % batch_merge_repeat
if self._nccl2_reduce_layer:
cmd += " --nccl2_reduce_layer_local_run 1"
if self._use_mlu:
cmd += " --use_mlu"
env_local = {
"FLAGS_selected_mlus": devices,
"PADDLE_TRAINERS_NUM": "1",
"PADDLE_TRAINER_ID": "0",
}
else:
env_local = {'CPU_NUM': '1'}
# not use dgc in single card
if len(devices) > 1 and self._use_dgc:
cmd += " --use_dgc"
if self._accumulate_gradient:
cmd += " --accumulate_gradient"
if self._find_unused_parameters:
cmd += " --find_unused_parameters"
env_local.update(envs)
print("local_cmd: {}, env: {}".format(cmd, env_local))
if check_error_log:
path = "/tmp/local_err_%d.log" % os.getpid()
err_log = open(path, "w")
local_proc = subprocess.Popen(
cmd.split(" "),
stdout=subprocess.PIPE,
stderr=err_log,
env=env_local,
)
else:
local_proc = subprocess.Popen(
cmd.split(" "),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env_local,
)
local_out, local_err = local_proc.communicate()
if check_error_log:
err_log.close()
sys.stderr.write(
'\n--run_local-- trainer 0 stderr file saved in: %s\n' % (path)
)
sys.stderr.write('local_stderr: %s\n' % local_err)
sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))
return pickle.loads(local_out)
def _run_cluster_nccl2(
self, model, envs, update_method, check_error_log, log_name
):
# NOTE: we reuse ps_endpoints as nccl2 worker endpoints
worker_endpoints = self._ps_endpoints.split(",")
trainer_num = len(worker_endpoints)
procs = []
pipes = []
for i in range(0, trainer_num):
tr_cmd, tr_env = self._get_nccl2_trainer_cmd(
model, worker_endpoints[i], update_method, i, trainer_num
)
tr_env.update(envs)
print(
"use_hallreduce:{} \ntr{}_cmd:{}, env: {}".format(
self._use_hallreduce, i, tr_cmd, tr_env
)
)
tr_pipe = open("/tmp/tr%d_err_%d.log" % (i, os.getpid()), "w")
sys.stderr.write(
"\n{} going to start process {} with nccl2\n".format(
type(self).__name__, i
)
)
tr_proc = subprocess.Popen(
tr_cmd.strip().split(" "),
stdout=subprocess.PIPE,
stderr=tr_pipe,
env=tr_env,
)
procs.append(tr_proc)
pipes.append(tr_pipe)
outs = []
for i in range(0, trainer_num):
tr_out, tr_err = procs[i].communicate()
outs.append(tr_out)
pipes[i].close()
sys.stderr.write('trainer {} stderr: {}\n'.format(i, tr_err))
sys.stderr.write(
'trainer {} glog file saved in: /tmp/tr{}_err_{}.log \n'.format(
i, i, os.getpid()
)
)
if check_error_log:
print("outs[0]:", pickle.loads(outs[0]))
print("outs[1]:", pickle.loads(outs[1]))
return pickle.loads(outs[0]), pickle.loads(outs[1])
def test_mnist(self):
if fluid.core.is_compiled_with_mlu():
self.check_with_place(
os.path.abspath("parallel_dygraph_sync_batch_norm.py"),
delta=1e-5,
check_error_log=True,
log_name=flag_name,
)
if __name__ == "__main__":
unittest.main()
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import os
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
import paddle.distributed as dist
from paddle.distributed.spawn import (
_get_subprocess_env_list,
_options_valid_check,
_get_default_nprocs,
)
from paddle.fluid import core
class LinearNet(nn.Layer):
def __init__(self):
super().__init__()
self._linear1 = nn.Linear(10, 10)
self._linear2 = nn.Linear(10, 1)
def forward(self, x):
return self._linear2(self._linear1(x))
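# Minimal data-parallel training step used as the spawn target: each spawned
# process initializes the parallel environment, runs one forward/backward/step,
# and returns its trainer id through the spawn return queue.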
def train(print_result=False):
# 1. initialize parallel environment
dist.init_parallel_env()
# 2. create data parallel layer & optimizer
layer = LinearNet()
dp_layer = paddle.DataParallel(layer)
loss_fn = nn.MSELoss()
adam = opt.Adam(learning_rate=0.001, parameters=dp_layer.parameters())
# 3. run layer
inputs = paddle.randn([10, 10], 'float32')
outputs = dp_layer(inputs)
labels = paddle.randn([10, 1], 'float32')
loss = loss_fn(outputs, labels)
if print_result is True:
print("Rank:", int(os.getenv("PADDLE_TRAINER_ID")))
loss.backward()
adam.step()
adam.clear_grad()
return int(os.getenv("PADDLE_TRAINER_ID"))
class TestSpawn(unittest.TestCase):
def test_nprocs_greater_than_device_num_error(self):
with self.assertRaises(RuntimeError):
_get_subprocess_env_list(nprocs=100, options=dict())
def test_selected_devices_error(self):
with self.assertRaises(ValueError):
options = dict()
options['selected_devices'] = "100,101"
_get_subprocess_env_list(nprocs=2, options=options)
def test_get_correct_env(self):
options = dict()
options['print_config'] = True
env_dict = _get_subprocess_env_list(nprocs=1, options=options)[0]
self.assertEqual(env_dict['PADDLE_TRAINER_ID'], '0')
self.assertEqual(env_dict['PADDLE_TRAINERS_NUM'], '1')
def test_nprocs_not_equal_to_selected_devices(self):
with self.assertRaises(ValueError):
options = dict()
options['selected_devices'] = "100,101,102"
_get_subprocess_env_list(nprocs=2, options=options)
def test_options_valid_check(self):
options = dict()
options['selected_devices'] = "100,101,102"
_options_valid_check(options)
with self.assertRaises(ValueError):
options['error'] = "error"
_options_valid_check(options)
def test_get_default_nprocs(self):
paddle.set_device('mlu')
nprocs = _get_default_nprocs()
self.assertEqual(nprocs, core.get_mlu_device_count())
def test_spawn(self):
num_devs = core.get_mlu_device_count()
context = dist.spawn(train, backend='cncl', nprocs=num_devs)
rank_list = []
for i in range(num_devs):
rank_list.append(context.return_queues[i].get())
rank_list.sort()
self.assertEqual(rank_list, list(range(num_devs)))
if __name__ == '__main__':
unittest.main()
#!/bin/bash
# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
set -e
MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch test_sync_batch_norm_op_mlu_baseline.py
MLU_VISIBLE_DEVICES=0,1 python -m paddle.distributed.launch test_parallel_dygraph_sync_batch_norm_mlu.py