Unverified commit b9fd067e, authored by daquexian, committed by GitHub

support cpu allreduce (#6627)

Signed-off-by: daquexian <daquexian566@gmail.com>
Co-authored-by: oneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Parent: 81086cff
@@ -140,39 +140,36 @@ class BroadcastFunctor {
   }
 };
 
+namespace {
+
+Maybe<one::UserOpExpr> RankGroupAndDeviceType2AllReduceOpExpr(Symbol<RankGroup> rank_group,
+                                                              DeviceType device_type) {
+  const auto& parallel_desc = JUST(RankGroup::GetDefaultParallelDesc(device_type, rank_group));
+  return one::OpBuilder("eager_nccl_all_reduce")
+      .Input("in")
+      .Output("out")
+      .Attr<std::string>("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf()))
+      .Attr<bool>("async_launch", true)
+      .Build();
+}
+
+auto* CachedRankGroupAndDeviceType2AllReduceOpExpr =
+    DECORATE(&RankGroupAndDeviceType2AllReduceOpExpr, ThreadLocal);
+
+}  // namespace
+
 class LocalAllReduceFunctor {
  public:
   LocalAllReduceFunctor() = default;
   Maybe<Tensor> operator()(const std::shared_ptr<one::Tensor>& x) const {
-    {
-      const auto& device = JUST(x->device());
-      CHECK_EQ_OR_RETURN(JUST(device->of_type()), "gpu");
-      CHECK_EQ_OR_RETURN(device->device_id(), GlobalProcessCtx::LocalRank());
-    }
-    static thread_local std::unordered_map<Symbol<RankGroup>, std::shared_ptr<OpExpr>>
-        rank_group2op_expr;
+    const auto& device = JUST(x->device());
+    CHECK_EQ_OR_RETURN(device->device_id(), GlobalProcessCtx::LocalRank());
     const auto& rank_group = JUST(RankGroupScope::CurrentRankGroup());
-    auto iter = rank_group2op_expr.find(rank_group);
-    std::shared_ptr<OpExpr> op_expr;
-    if (iter == rank_group2op_expr.end()) {
-      ParallelConf parallel_conf;
-      parallel_conf.set_device_tag("gpu");
-      JUST(rank_group->ForEachRank([&parallel_conf](int64_t rank) -> Maybe<void> {
-        parallel_conf.add_device_name("@" + std::to_string(rank) + ":"
-                                      + std::to_string(GlobalProcessCtx::LocalRank(rank)));
-        return Maybe<void>::Ok();
-      }));
-      op_expr = JUST(one::OpBuilder("eager_nccl_all_reduce")
-                         .Input("in")
-                         .Output("out")
-                         .Attr("parallel_conf", PbMessage2TxtString(parallel_conf))
-                         .Attr<bool>("async_launch", true)
-                         .Build());
-      rank_group2op_expr[rank_group] = op_expr;
-    } else {
-      op_expr = iter->second;
-    }
+    const std::string& device_type_str = device->type();
+    CHECK_OR_RETURN(device_type_str == "cuda" || device_type_str == "cpu");
+    DeviceType device_type = device_type_str == "cuda" ? DeviceType::kGPU : DeviceType::kCPU;
+    std::shared_ptr<OpExpr> op_expr =
+        JUST(CachedRankGroupAndDeviceType2AllReduceOpExpr(rank_group, device_type));
     if (const auto& static_zeros_tensor = std::dynamic_pointer_cast<StaticZerosTensor>(x)) {
       return OpInterpUtil::Dispatch<Tensor>(*op_expr,
                                             {JUST(static_zeros_tensor->AsMirroredTensor())}, {});
......
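
A note on the C++ hunk above: the old LocalAllReduceFunctor required a "gpu" device, built its ParallelConf by hand, and cached the resulting op expression in a hand-written static thread_local map keyed only by the rank group. The new code accepts both "cuda" and "cpu" devices, derives the parallel description from the rank group and the device type, and caches one op expression per (rank group, device type) pair per thread via DECORATE(..., ThreadLocal). The sketch below illustrates that per-thread, per-key caching pattern with stand-in types (an integer rank-group id, a simplified device enum, and a string in place of the built op expression); it is an illustration of the idea only, not OneFlow's actual DECORATE implementation.

// Illustration only (not OneFlow's actual DECORATE/ThreadLocal machinery):
// cache one "op expression" per (rank group, device type) key, per thread,
// so the expensive builder runs at most once per key on each thread.
#include <cstdint>
#include <iostream>
#include <map>
#include <string>
#include <utility>

using RankGroupId = int64_t;    // stand-in for Symbol<RankGroup>
enum class Dev { kCPU, kGPU };  // stand-in for oneflow::DeviceType

// Stand-in for RankGroupAndDeviceType2AllReduceOpExpr: the costly builder.
std::string BuildAllReduceOpExpr(RankGroupId rank_group, Dev device_type) {
  std::cout << "building op expr\n";  // printed at most once per key per thread
  return "eager_nccl_all_reduce@rank_group" + std::to_string(rank_group)
         + (device_type == Dev::kCPU ? "/cpu" : "/gpu");
}

// In the spirit of DECORATE(&BuildAllReduceOpExpr, ThreadLocal): look the key
// up in a per-thread map and call the builder only on a miss.
const std::string& CachedBuildAllReduceOpExpr(RankGroupId rank_group, Dev device_type) {
  static thread_local std::map<std::pair<RankGroupId, Dev>, std::string> cache;
  const auto key = std::make_pair(rank_group, device_type);
  auto it = cache.find(key);
  if (it == cache.end()) {
    it = cache.emplace(key, BuildAllReduceOpExpr(rank_group, device_type)).first;
  }
  return it->second;
}

int main() {
  std::cout << CachedBuildAllReduceOpExpr(0, Dev::kCPU) << "\n";
  std::cout << CachedBuildAllReduceOpExpr(0, Dev::kGPU) << "\n";  // new key, builds again
  std::cout << CachedBuildAllReduceOpExpr(0, Dev::kCPU) << "\n";  // cache hit, no rebuild
  return 0;
}
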
@@ -31,7 +31,7 @@ def np_allclose_with_shape(a, b, *args, **kwargs):
 @unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
 @flow.unittest.skip_unless_1n2d()
 class TestDDP(flow.unittest.TestCase):
-    def test_ddp_basic(test_case):
+    def _test_ddp_basic(test_case, dev_type):
         class Mul(flow.nn.Module):
             def __init__(self):
                 super().__init__()
@@ -48,8 +48,8 @@ class TestDDP(flow.unittest.TestCase):
         else:
             raise ValueError()
 
-        x = x.to("cuda")
-        m = Mul().to("cuda")
+        x = x.to(dev_type)
+        m = Mul().to(dev_type)
         m = ddp(m)
         y = m(x)
         y.sum().backward()
@@ -58,7 +58,11 @@ class TestDDP(flow.unittest.TestCase):
             np_allclose_with_shape(m.w.grad.numpy(), np.array([1.5, 1.5]))
         )
 
-    def test_ddp_with_unused_param(test_case):
+    def test_ddp_basic(test_case):
+        for dev_type in ["cuda", "cpu"]:
+            test_case._test_ddp_basic(dev_type)
+
+    def _test_ddp_with_unused_param(test_case, dev_type):
         class Model(flow.nn.Module):
             def __init__(self):
                 super().__init__()
@@ -80,8 +84,8 @@ class TestDDP(flow.unittest.TestCase):
         else:
             raise ValueError()
 
-        x = x.to("cuda")
-        m = Model().to("cuda")
+        x = x.to(dev_type)
+        m = Model().to(dev_type)
         m = ddp(m)
         y = m(x)
         y.backward()
@@ -94,7 +98,11 @@ class TestDDP(flow.unittest.TestCase):
             np_allclose_with_shape(m.unused_in_all_ranks.grad.numpy(), np.array([0]))
         )
 
-    def test_out_of_order_execution(test_case):
+    def test_ddp_with_unused_param(test_case):
+        for dev_type in ["cuda", "cpu"]:
+            test_case._test_ddp_with_unused_param(dev_type)
+
+    def _test_out_of_order_execution(test_case, dev_type):
         class Model(flow.nn.Module):
             def __init__(self):
                 super().__init__()
@@ -121,8 +129,8 @@ class TestDDP(flow.unittest.TestCase):
         else:
             raise ValueError()
 
-        x = x.to("cuda")
-        m = Model().to("cuda")
+        x = x.to(dev_type)
+        m = Model().to(dev_type)
         m = ddp(m)
         y = m(x)
         y.backward()
@@ -131,7 +139,11 @@ class TestDDP(flow.unittest.TestCase):
         test_case.assertTrue(np_allclose_with_shape(m.w2.grad.numpy(), np.array([4.5])))
         test_case.assertTrue(np_allclose_with_shape(m.w3.grad.numpy(), np.array([3])))
 
-    def test_broadcast_buffer(test_case):
+    def test_out_of_order_execution(test_case):
+        for dev_type in ["cuda", "cpu"]:
+            test_case._test_out_of_order_execution(dev_type)
+
+    def _test_broadcast_buffer(test_case, dev_type):
         rank = flow.env.get_rank()
 
         class CustomModule(flow.nn.Module):
@@ -145,17 +157,17 @@ class TestDDP(flow.unittest.TestCase):
             return res
 
         x = flow.tensor([2, 3]) * (rank + 1)
-        x = x.to("cuda")
+        x = x.to(dev_type)
 
         m = CustomModule()
-        m = m.to("cuda")
+        m = m.to(dev_type)
         m = ddp(m)
 
         y1 = m(x)
         y2 = m(x)
 
         m = CustomModule()
-        m = m.to("cuda")
+        m = m.to(dev_type)
         m = ddp(m, broadcast_buffers=False)
 
         y3 = m(x)
@@ -174,6 +186,10 @@ class TestDDP(flow.unittest.TestCase):
         else:
             raise ValueError()
 
+    def test_broadcast_buffer(test_case):
+        for dev_type in ["cuda", "cpu"]:
+            test_case._test_broadcast_buffer(dev_type)
+
 
 if __name__ == "__main__":
     unittest.main()