diff --git a/oneflow/core/functional/impl/comm_functor.cpp b/oneflow/core/functional/impl/comm_functor.cpp index 1f32faf3fef4eaeaed5ef0962dd0dd3860cee872..4aea669132de1da9f0c9e9de5b8e63eb329f8cba 100644 --- a/oneflow/core/functional/impl/comm_functor.cpp +++ b/oneflow/core/functional/impl/comm_functor.cpp @@ -140,39 +140,36 @@ class BroadcastFunctor { } }; +namespace { + +Maybe RankGroupAndDeviceType2AllReduceOpExpr(Symbol rank_group, + DeviceType device_type) { + const auto& parallel_desc = JUST(RankGroup::GetDefaultParallelDesc(device_type, rank_group)); + return one::OpBuilder("eager_nccl_all_reduce") + .Input("in") + .Output("out") + .Attr("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf())) + .Attr("async_launch", true) + .Build(); +} + +auto* CachedRankGroupAndDeviceType2AllReduceOpExpr = + DECORATE(&RankGroupAndDeviceType2AllReduceOpExpr, ThreadLocal); + +} // namespace + class LocalAllReduceFunctor { public: LocalAllReduceFunctor() = default; Maybe operator()(const std::shared_ptr& x) const { - { - const auto& device = JUST(x->device()); - CHECK_EQ_OR_RETURN(JUST(device->of_type()), "gpu"); - CHECK_EQ_OR_RETURN(device->device_id(), GlobalProcessCtx::LocalRank()); - } - static thread_local std::unordered_map, std::shared_ptr> - rank_group2op_expr; + const auto& device = JUST(x->device()); + CHECK_EQ_OR_RETURN(device->device_id(), GlobalProcessCtx::LocalRank()); const auto& rank_group = JUST(RankGroupScope::CurrentRankGroup()); - auto iter = rank_group2op_expr.find(rank_group); - std::shared_ptr op_expr; - if (iter == rank_group2op_expr.end()) { - ParallelConf parallel_conf; - parallel_conf.set_device_tag("gpu"); - JUST(rank_group->ForEachRank([¶llel_conf](int64_t rank) -> Maybe { - parallel_conf.add_device_name("@" + std::to_string(rank) + ":" - + std::to_string(GlobalProcessCtx::LocalRank(rank))); - return Maybe::Ok(); - })); - - op_expr = JUST(one::OpBuilder("eager_nccl_all_reduce") - .Input("in") - .Output("out") - .Attr("parallel_conf", PbMessage2TxtString(parallel_conf)) - .Attr("async_launch", true) - .Build()); - rank_group2op_expr[rank_group] = op_expr; - } else { - op_expr = iter->second; - } + const std::string& device_type_str = device->type(); + CHECK_OR_RETURN(device_type_str == "cuda" || device_type_str == "cpu"); + DeviceType device_type = device_type_str == "cuda" ? DeviceType::kGPU : DeviceType::kCPU; + std::shared_ptr op_expr = + JUST(CachedRankGroupAndDeviceType2AllReduceOpExpr(rank_group, device_type)); if (const auto& static_zeros_tensor = std::dynamic_pointer_cast(x)) { return OpInterpUtil::Dispatch(*op_expr, {JUST(static_zeros_tensor->AsMirroredTensor())}, {}); diff --git a/python/oneflow/test/modules/test_ddp.py b/python/oneflow/test/modules/test_ddp.py index 1317b9874a8cc45b2950e77d8b47c9f292fcd1f4..0f44bd6864284114dda5c8d668a12fcd2de1d77b 100644 --- a/python/oneflow/test/modules/test_ddp.py +++ b/python/oneflow/test/modules/test_ddp.py @@ -31,7 +31,7 @@ def np_allclose_with_shape(a, b, *args, **kwargs): @unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases") @flow.unittest.skip_unless_1n2d() class TestDDP(flow.unittest.TestCase): - def test_ddp_basic(test_case): + def _test_ddp_basic(test_case, dev_type): class Mul(flow.nn.Module): def __init__(self): super().__init__() @@ -48,8 +48,8 @@ class TestDDP(flow.unittest.TestCase): else: raise ValueError() - x = x.to("cuda") - m = Mul().to("cuda") + x = x.to(dev_type) + m = Mul().to(dev_type) m = ddp(m) y = m(x) y.sum().backward() @@ -58,7 +58,11 @@ class TestDDP(flow.unittest.TestCase): np_allclose_with_shape(m.w.grad.numpy(), np.array([1.5, 1.5])) ) - def test_ddp_with_unused_param(test_case): + def test_ddp_basic(test_case): + for dev_type in ["cuda", "cpu"]: + test_case._test_ddp_basic(dev_type) + + def _test_ddp_with_unused_param(test_case, dev_type): class Model(flow.nn.Module): def __init__(self): super().__init__() @@ -80,8 +84,8 @@ class TestDDP(flow.unittest.TestCase): else: raise ValueError() - x = x.to("cuda") - m = Model().to("cuda") + x = x.to(dev_type) + m = Model().to(dev_type) m = ddp(m) y = m(x) y.backward() @@ -94,7 +98,11 @@ class TestDDP(flow.unittest.TestCase): np_allclose_with_shape(m.unused_in_all_ranks.grad.numpy(), np.array([0])) ) - def test_out_of_order_execution(test_case): + def test_ddp_with_unused_param(test_case): + for dev_type in ["cuda", "cpu"]: + test_case._test_ddp_with_unused_param(dev_type) + + def _test_out_of_order_execution(test_case, dev_type): class Model(flow.nn.Module): def __init__(self): super().__init__() @@ -121,8 +129,8 @@ class TestDDP(flow.unittest.TestCase): else: raise ValueError() - x = x.to("cuda") - m = Model().to("cuda") + x = x.to(dev_type) + m = Model().to(dev_type) m = ddp(m) y = m(x) y.backward() @@ -131,7 +139,11 @@ class TestDDP(flow.unittest.TestCase): test_case.assertTrue(np_allclose_with_shape(m.w2.grad.numpy(), np.array([4.5]))) test_case.assertTrue(np_allclose_with_shape(m.w3.grad.numpy(), np.array([3]))) - def test_broadcast_buffer(test_case): + def test_out_of_order_execution(test_case): + for dev_type in ["cuda", "cpu"]: + test_case._test_out_of_order_execution(dev_type) + + def _test_broadcast_buffer(test_case, dev_type): rank = flow.env.get_rank() class CustomModule(flow.nn.Module): @@ -145,17 +157,17 @@ class TestDDP(flow.unittest.TestCase): return res x = flow.tensor([2, 3]) * (rank + 1) - x = x.to("cuda") + x = x.to(dev_type) m = CustomModule() - m = m.to("cuda") + m = m.to(dev_type) m = ddp(m) y1 = m(x) y2 = m(x) m = CustomModule() - m = m.to("cuda") + m = m.to(dev_type) m = ddp(m, broadcast_buffers=False) y3 = m(x) @@ -174,6 +186,10 @@ class TestDDP(flow.unittest.TestCase): else: raise ValueError() + def test_broadcast_buffer(test_case): + for dev_type in ["cuda", "cpu"]: + test_case._test_broadcast_buffer(dev_type) + if __name__ == "__main__": unittest.main()