未验证 提交 4ecbbdfc 编写于 作者: qq_22305325's avatar qq_22305325 提交者: GitHub

Eager boxing 1 to n (#5943)

* cuda base cpu mpi boxing

* cpu_mpi

* fix conflicts

* add cpu mpi unittests

* more checks and unittests

* abstract_consistent_to_consistent_op_expr

* fix compiler complaint

* refactor consistent-to-consistent eager consisitent op interpreter

* fix compiler complaint

* refactor ConsistentToConsistentOpExpr

* lazy interpreter (#5903)

* fix bugs about consistent_id

* more test_consistent_cast unittests

* refactor functional::ToConsistent

* refactor GetNdSbp

* fix compiler complaints

* Update eager_consistent_op_interpreter.cpp

* Update eager_mirrored_op_interpreter.cpp

* eager_boxing_1_to_n

* add missing files

* del useless file

* minor fix

* refactor GetDevice4CurrentProcessCtx

* refine

* minor fix

* Update naive_1ton_boxing_interpreter.cpp

* refine

* Update eager_boxing_interpreter_mgr.cpp

* fix error

* fix error

* auto format by CI

* fix error

* refine

* make of_format

* Update nd_sbp.h

* fix consistent id check error

* refine

* back up

* refine

* minor fix

* refine

* refine

* refine

* minor fix

* minor fix

* refine

* refine

* Update nccl_boxing_function.cpp

* refine

* fix consistent meta check bug

* zoom kLimitParallelConfString

* refine

* Update symmetric_x_to_b.cpp

* Update boxing_dividor_util.h

* Update boxing_dividor_util.cpp

* Update eager_boxing_interpreter_mgr.cpp

* auto format by CI

* Update asymmetric_x_to_b.cpp

* Update asymmetric_broadcast.cpp

* auto format by CI
Co-authored-by: NXinqi Li <lixinqi0703106@163.com>
Co-authored-by: Nleaves-zwx <kunta0932@gmail.com>
Co-authored-by: NLi Xinqi <lixinqi2010@gmail.com>
Co-authored-by: Noneflow-ci-bot <69100618+oneflow-ci-bot@users.noreply.github.com>
Co-authored-by: Noneflow-ci-bot <ci-bot@oneflow.org>
上级 a70537f3
......@@ -54,7 +54,7 @@ class Device final {
static Maybe<Symbol<Device>> ThreadLocalGetOrNew(const std::string& type, int64_t device_id);
static Maybe<Symbol<Device>> New(const std::string& type, int64_t device_id);
static Maybe<Symbol<Device>> New(const std::string& typed);
static Maybe<Symbol<Device>> New(const std::string& type);
static Maybe<Symbol<Device>> MakeDeviceByParallelDesc(const ParallelDesc& parallel_desc);
static const std::unordered_set<std::string> type_supported;
......
......@@ -30,17 +30,18 @@ namespace oneflow {
namespace {
Maybe<void> RawCheckAsymBroadcast(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
Maybe<void> RawCheckAsymmetricBroadcast(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->nd_sbp()->sbp_parallel_size(), 1);
CHECK_EQ_OR_RETURN(out->nd_sbp()->sbp_parallel_size(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsBroadcastNdSbp(in->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(in->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(out->placement()->Bigger(*in->placement()))
<< "The output placement must contain the input placement";
return Maybe<void>::Ok();
}
static constexpr auto* CheckAsymBroadcast = DECORATE(&RawCheckAsymBroadcast, ThreadLocal);
static constexpr auto* CheckAsymmetricBroadcast =
DECORATE(&RawCheckAsymmetricBroadcast, ThreadLocal);
Maybe<int64_t> CalBroadcastRoot(Symbol<ParallelDesc> src_parallel_desc,
Symbol<ParallelDesc> dst_parallel_desc) {
......@@ -77,8 +78,8 @@ static constexpr auto* CachedEagerNcclBroadcast = DECORATE(&EagerNcclBroadcast,
} // namespace
Maybe<one::Tensor> AsymBroadcast(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
Maybe<one::Tensor> AsymmetricBroadcast(const std::shared_ptr<one::Tensor>& tensor,
Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
const auto& in_placement = in->placement();
const auto& out_placement = out->placement();
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
......@@ -110,6 +111,7 @@ Maybe<one::Tensor> AsymBroadcast(const std::shared_ptr<one::Tensor>& tensor, Sym
*local_tensor->shape(), local_tensor->dtype());
}
COMMAND(RegisterBoxingFunction("asymmetric-broadcast", CheckAsymBroadcast, &AsymBroadcast));
COMMAND(RegisterBoxingFunction("asymmetric-broadcast", CheckAsymmetricBroadcast,
&AsymmetricBroadcast));
} // namespace oneflow
......@@ -22,16 +22,16 @@ namespace oneflow {
namespace {
Maybe<void> RawCheckAsymXToB(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
Maybe<void> RawCheckAsymmetricXToB(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->nd_sbp()->sbp_parallel_size(), 1);
CHECK_EQ_OR_RETURN(out->nd_sbp()->sbp_parallel_size(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(out->placement()->Bigger(*in->placement()));
CHECK_OR_RETURN(in->placement()->device_type() == DeviceType::kGPU);
return Maybe<void>::Ok();
}
static constexpr auto* CheckAsymXToB = DECORATE(&RawCheckAsymXToB, ThreadLocal);
static constexpr auto* CheckAsymmetricXToB = DECORATE(&RawCheckAsymmetricXToB, ThreadLocal);
Maybe<Symbol<cfg::NdSbp>> GetBroadcastNdSbp() {
cfg::NdSbp broadcast_nd_sbp;
......@@ -43,8 +43,8 @@ auto* CachedGetBroadcastNdSbp = DECORATE(&GetBroadcastNdSbp, ThreadLocal);
} // namespace
Maybe<one::Tensor> AsymXToB(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
Maybe<one::Tensor> AsymmetricXToB(const std::shared_ptr<one::Tensor>& tensor,
Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
CHECK_OR_RETURN(tensor_nd_sbp == in->nd_sbp());
const auto& tensor_placement = JUST(tensor->parallel_desc());
......@@ -63,6 +63,6 @@ Maybe<one::Tensor> AsymXToB(const std::shared_ptr<one::Tensor>& tensor, Symbol<P
return AsymBroadcastBoxingFunction(broadcast_input, broadcast_in_placed_nd_sbp, out);
}
COMMAND(RegisterBoxingFunction("asymmetric-x-to-b", CheckAsymXToB, &AsymXToB));
COMMAND(RegisterBoxingFunction("asymmetric-x-to-b", CheckAsymmetricXToB, &AsymmetricXToB));
} // namespace oneflow
......@@ -76,4 +76,35 @@ Maybe<BoxingDividor> RawFlattenInHierarchy() {
} // namespace
decltype(FlattenInHierarchy) FlattenInHierarchy = DECORATE(&RawFlattenInHierarchy, ThreadLocal);
namespace {
Maybe<Symbol<cfg::NdSbp>> GetPartialSumNdSbp() {
cfg::NdSbp partial_sum_nd_sbp;
partial_sum_nd_sbp.mutable_sbp_parallel()->Add()->mutable_partial_sum_parallel();
return SymbolOf(partial_sum_nd_sbp);
}
auto* CachedGetPartialSumNdSbp = DECORATE(&GetPartialSumNdSbp, ThreadLocal);
Maybe<Symbol<PlacedNdSbp>> RawReplaceNdSbpWithPartialSum(Symbol<PlacedNdSbp> placed_nd_sbp) {
Symbol<cfg::NdSbp> partial_sum_nd_sbp = JUST(CachedGetPartialSumNdSbp());
return JUST(PlacedNdSbp::New(partial_sum_nd_sbp, placed_nd_sbp->placement()));
}
static constexpr auto* ReplaceNdSbpWithPartialSum =
DECORATE(&RawReplaceNdSbpWithPartialSum, ThreadLocal);
Maybe<BoxingDividor> RawOutPlacementAndPartialSum() {
return std::make_shared<BoxingDividor>(
"OutPlacementAndPartialSum",
[](Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) -> Maybe<Symbol<PlacedNdSbp>> {
return ReplaceNdSbpWithPartialSum(out);
});
}
} // namespace
decltype(OutPlacementAndPartialSum) OutPlacementAndPartialSum =
DECORATE(&RawOutPlacementAndPartialSum, ThreadLocal);
} // namespace oneflow
......@@ -24,6 +24,7 @@ namespace oneflow {
extern Maybe<BoxingDividor> (*ReplaceInDeviceType)(DeviceType device_type);
extern Maybe<BoxingDividor> (*ReplaceOutDeviceType)(DeviceType device_type);
extern Maybe<BoxingDividor> (*FlattenInHierarchy)();
extern Maybe<BoxingDividor> (*OutPlacementAndPartialSum)();
} // namespace oneflow
......
......@@ -79,8 +79,11 @@ Maybe<BoxingExprIf> OptionalCudaCopy(const std::shared_ptr<BoxingExprIf>& core_b
}
Maybe<BoxingExprIf> RawMainBoxingExpr() {
const auto& core = JUST(BoxingExpr("identity")) | JUST(BoxingExpr("flatten-hierarchy"))
| JUST(BoxingExpr("asymmetric-x-to-b"));
const auto& core =
JUST(BoxingExpr("identity")) | JUST(BoxingExpr("flatten-hierarchy"))
| JUST(BoxingExpr("asymmetric-x-to-b")) | JUST(BoxingExpr("naive-1-to-p"))
| JUST(BoxingExpr(JUST(OutPlacementAndPartialSum()), JUST(BoxingExpr("naive-1-to-p")),
JUST(BoxingExpr("nccl-p-to-b")) | JUST(BoxingExpr("nccl-p-to-s"))));
return core | JUST(OptionalCudaCopy(core));
}
......@@ -143,6 +146,7 @@ Maybe<EagerBoxingInterpreter> GetBoxingInterpreter(Symbol<cfg::NdSbp> in_nd_sbp,
in_nd_sbp, out_nd_sbp, in_parallel_desc, out_parallel_desc));
if (interpreter.IsOk()) { return JUST(interpreter); }
}
const auto& in = JUST(PlacedNdSbp::New(in_nd_sbp, in_parallel_desc));
const auto& out = JUST(PlacedNdSbp::New(out_nd_sbp, out_parallel_desc));
......
......@@ -52,13 +52,29 @@ bool EagerBoxingInterpreterUtil::IsBoxingB2P(const cfg::SbpParallel& src,
return src.has_broadcast_parallel() && dst.has_partial_sum_parallel();
}
bool EagerBoxingInterpreterUtil::IsBroadcastNdSbp(Symbol<cfg::NdSbp> nd_sbp) {
bool EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(Symbol<cfg::NdSbp> nd_sbp) {
for (const auto& sbp_parallel : nd_sbp->sbp_parallel()) {
if (!sbp_parallel.has_broadcast_parallel()) { return false; }
}
return true;
}
bool EagerBoxingInterpreterUtil::IsAllPartialSumNdSbp(Symbol<cfg::NdSbp> nd_sbp) {
for (const auto& sbp_parallel : nd_sbp->sbp_parallel()) {
if (!sbp_parallel.has_partial_sum_parallel()) { return false; }
}
return true;
}
bool EagerBoxingInterpreterUtil::IsAllSplitNdSbp(Symbol<cfg::NdSbp> nd_sbp, int64_t axis) {
for (const auto& sbp_parallel : nd_sbp->sbp_parallel()) {
if (!(sbp_parallel.has_split_parallel() && sbp_parallel.split_parallel().axis() == axis)) {
return false;
}
}
return true;
}
bool EagerBoxingInterpreterUtil::IsBroadcastSbp(const cfg::SbpParallel& sbp) {
return sbp.has_broadcast_parallel();
}
......
......@@ -30,7 +30,9 @@ struct EagerBoxingInterpreterUtil {
static bool IsBoxingP2B(const cfg::SbpParallel& src, const cfg::SbpParallel& dst);
static bool IsBoxingB2S(const cfg::SbpParallel& src, const cfg::SbpParallel& dst);
static bool IsBoxingB2P(const cfg::SbpParallel& src, const cfg::SbpParallel& dst);
static bool IsBroadcastNdSbp(Symbol<cfg::NdSbp> nd_sbp);
static bool IsAllBroadcastNdSbp(Symbol<cfg::NdSbp> nd_sbp);
static bool IsAllPartialSumNdSbp(Symbol<cfg::NdSbp> nd_sbp);
static bool IsAllSplitNdSbp(Symbol<cfg::NdSbp> nd_sbp, int64_t axis);
static bool IsBroadcastSbp(const cfg::SbpParallel& sbp);
static bool IsPartialSumSbp(const cfg::SbpParallel& sbp);
static bool IsSplitSbp(const cfg::SbpParallel& sbp, int64_t axis);
......
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/control/global_process_ctx.h"
#include "oneflow/core/framework/nd_sbp.h"
#include "oneflow/core/framework/device.h"
#include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h"
#include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter.h"
#include "oneflow/core/functional/functional.h"
#include "oneflow/core/common/decorator.h"
namespace oneflow {
namespace {
Maybe<void> RawCheckNaive1ToP(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->placement()->parallel_num(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllPartialSumNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(out->placement()->Bigger(*in->placement()));
return Maybe<void>::Ok();
}
static constexpr auto* CheckNaive1ToP = DECORATE(&RawCheckNaive1ToP, ThreadLocal);
} // namespace
Maybe<one::Tensor> Naive1ToP(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
CHECK_OR_RETURN(tensor_nd_sbp == in->nd_sbp());
const auto& tensor_placement = JUST(tensor->parallel_desc());
CHECK_OR_RETURN(tensor_placement == in->placement());
int64_t root = JUST(tensor_placement->MachineId4ParallelId(0));
std::shared_ptr<one::Tensor> local_tensor = JUST(tensor->cur_rank_phy_tensor());
const auto& out_parallel_id = JUST(GetParallelId4CurrentProcessCtx(out->placement()));
if (root == GlobalProcessCtx::Rank() || !out_parallel_id->has_value()) {
// do nothing
} else {
const std::string& device_type = Device::Type4DeviceTag(tensor_placement->device_tag());
local_tensor = JUST(one::functional::Constant(*tensor->shape(), 0, tensor->dtype(),
JUST(Device::New(device_type))));
}
return JUST(one::functional::LocalToConsistent(local_tensor, out->placement(),
*JUST(GetSbpList(out->nd_sbp())), *tensor->shape(),
tensor->dtype()));
}
COMMAND(RegisterBoxingFunction("naive-1-to-p", CheckNaive1ToP, &Naive1ToP));
} // namespace oneflow
/*
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "oneflow/core/framework/nd_sbp.h"
#include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter_util.h"
#include "oneflow/core/framework/op_interpreter/boxing/eager_boxing_interpreter.h"
#include "oneflow/core/common/decorator.h"
#include "oneflow/core/framework/id_util.h"
#include "oneflow/core/framework/op_interpreter/op_interpreter_util.h"
#include "oneflow/core/framework/op_expr.h"
#include "oneflow/core/framework/op_builder.h"
namespace oneflow {
namespace {
Maybe<void> RawCheckNcclP2B(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->nd_sbp()->sbp_parallel_size(), 1);
CHECK_EQ_OR_RETURN(out->nd_sbp()->sbp_parallel_size(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllPartialSumNdSbp(in->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(in->placement() == out->placement());
CHECK_EQ_OR_RETURN(in->placement()->device_type(), DeviceType::kGPU);
return Maybe<void>::Ok();
}
static constexpr auto* CheckNcclP2B = DECORATE(&RawCheckNcclP2B, ThreadLocal);
Maybe<void> RawCheckNcclP2S(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->nd_sbp()->sbp_parallel_size(), 1);
CHECK_EQ_OR_RETURN(out->nd_sbp()->sbp_parallel_size(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllPartialSumNdSbp(in->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllSplitNdSbp(out->nd_sbp(), 0));
CHECK_OR_RETURN(in->placement() == out->placement());
CHECK_EQ_OR_RETURN(in->placement()->device_type(), DeviceType::kGPU);
return Maybe<void>::Ok();
}
static constexpr auto* CheckNcclP2S = DECORATE(&RawCheckNcclP2S, ThreadLocal);
Maybe<void> RawCheckNcclS2B(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->nd_sbp()->sbp_parallel_size(), 1);
CHECK_EQ_OR_RETURN(out->nd_sbp()->sbp_parallel_size(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllSplitNdSbp(in->nd_sbp(), 0));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(in->placement() == out->placement());
CHECK_EQ_OR_RETURN(in->placement()->device_type(), DeviceType::kGPU);
return Maybe<void>::Ok();
}
static constexpr auto* CheckNcclS2B = DECORATE(&RawCheckNcclS2B, ThreadLocal);
Maybe<one::UserOpExpr> EagerNcclAllReduce(Symbol<ParallelDesc> parallel_desc) {
return one::OpBuilder("eager_nccl_all_reduce", *JUST(UniqueStr("eager_nccl_all_reduce")))
.Input("in")
.Output("out")
.Attr<std::string>("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf()))
.Build();
}
static constexpr auto* CachedEagerNcclAllReduceOpExpr = DECORATE(&EagerNcclAllReduce, ThreadLocal);
Maybe<one::UserOpExpr> EagerNcclReduceScatter(Symbol<ParallelDesc> parallel_desc,
const std::string& op_type) {
return one::OpBuilder("eager_nccl_reduce_scatter", *JUST(UniqueStr("eager_nccl_reduce_scatter")))
.Input("in")
.Output("out")
.Attr<std::string>("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf()))
.Attr<std::string>("op_type", op_type)
.Build();
}
static constexpr auto* CachedNcclReduceScatterOpExpr =
DECORATE(&EagerNcclReduceScatter, ThreadLocalCopiable);
Maybe<one::UserOpExpr> EagerNcclAllGather(Symbol<ParallelDesc> parallel_desc) {
return one::OpBuilder("eager_nccl_all_gather", *JUST(UniqueStr("eager_nccl_all_gather")))
.Input("in")
.Output("out")
.Attr<std::string>("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf()))
.Build();
}
static constexpr auto* CachedEagerNcclAllGatherOpExpr = DECORATE(&EagerNcclAllGather, ThreadLocal);
} // namespace
Maybe<one::Tensor> NcclP2B(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
CHECK_OR_RETURN(tensor_nd_sbp == in->nd_sbp());
const auto& tensor_placement = JUST(tensor->parallel_desc());
CHECK_OR_RETURN(tensor_placement == in->placement());
const auto& op_expr = JUST(CachedEagerNcclAllReduceOpExpr(in->placement()));
return JUST(one::OpInterpUtil::Dispatch<one::Tensor>(*op_expr, {tensor}));
}
Maybe<one::Tensor> NcclP2S(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
CHECK_OR_RETURN(tensor_nd_sbp == in->nd_sbp());
const auto& tensor_placement = JUST(tensor->parallel_desc());
CHECK_OR_RETURN(tensor_placement == in->placement());
const auto& op_expr = JUST(CachedNcclReduceScatterOpExpr(in->placement(), "sum"));
return JUST(one::OpInterpUtil::Dispatch<one::Tensor>(*op_expr, {tensor}));
}
Maybe<one::Tensor> NcclS2B(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
CHECK_OR_RETURN(tensor_nd_sbp == in->nd_sbp());
const auto& tensor_placement = JUST(tensor->parallel_desc());
CHECK_OR_RETURN(tensor_placement == in->placement());
const auto& op_expr = JUST(CachedEagerNcclAllGatherOpExpr(in->placement()));
return JUST(one::OpInterpUtil::Dispatch<one::Tensor>(*op_expr, {tensor}));
}
COMMAND(RegisterBoxingFunction("nccl-p-to-b", CheckNcclP2B, &NcclP2B));
COMMAND(RegisterBoxingFunction("nccl-p-to-s", CheckNcclP2S, &NcclP2S));
COMMAND(RegisterBoxingFunction("nccl-s-to-b", CheckNcclS2B, &NcclS2B));
} // namespace oneflow
......@@ -25,57 +25,37 @@ namespace oneflow {
namespace {
Maybe<void> RawCheckSymXToB(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
Maybe<void> RawCheckSymmetricXToB(Symbol<PlacedNdSbp> in, Symbol<PlacedNdSbp> out) {
CHECK_EQ_OR_RETURN(in->nd_sbp()->sbp_parallel_size(), 1);
CHECK_EQ_OR_RETURN(out->nd_sbp()->sbp_parallel_size(), 1);
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(EagerBoxingInterpreterUtil::IsAllBroadcastNdSbp(out->nd_sbp()));
CHECK_OR_RETURN(in->placement() == out->placement());
CHECK_OR_RETURN(in->placement()->device_type() == DeviceType::kGPU);
return Maybe<void>::Ok();
}
static constexpr auto* CheckSymXToB = DECORATE(&RawCheckSymXToB, ThreadLocal);
Maybe<one::UserOpExpr> EagerNcclAllReduce(Symbol<ParallelDesc> parallel_desc) {
return one::OpBuilder("eager_nccl_all_reduce", *JUST(UniqueStr("eager_nccl_all_reduce")))
.Input("in")
.Output("out")
.Attr<std::string>("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf()))
.Build();
}
static constexpr auto* CachedEagerNcclAllReduceOpExpr = DECORATE(&EagerNcclAllReduce, ThreadLocal);
Maybe<one::UserOpExpr> EagerNcclAllGather(Symbol<ParallelDesc> parallel_desc) {
return one::OpBuilder("eager_nccl_all_gather", *JUST(UniqueStr("eager_nccl_all_gather")))
.Input("in")
.Output("out")
.Attr<std::string>("parallel_conf", PbMessage2TxtString(parallel_desc->parallel_conf()))
.Build();
}
static constexpr auto* CachedEagerNcclAllGatherOpExpr = DECORATE(&EagerNcclAllGather, ThreadLocal);
static constexpr auto* CheckSymmetricXToB = DECORATE(&RawCheckSymmetricXToB, ThreadLocal);
} // namespace
Maybe<one::Tensor> SymXToB(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
Maybe<one::Tensor> SymmetricXToB(const std::shared_ptr<one::Tensor>& tensor, Symbol<PlacedNdSbp> in,
Symbol<PlacedNdSbp> out) {
const auto& tensor_nd_sbp = JUST(tensor->nd_sbp());
CHECK_OR_RETURN(tensor_nd_sbp == in->nd_sbp());
const auto& tensor_placement = JUST(tensor->parallel_desc());
CHECK_OR_RETURN(tensor_placement == in->placement());
if (EagerBoxingInterpreterUtil::IsBroadcastSbp(tensor_nd_sbp->sbp_parallel(0))) { return tensor; }
if (EagerBoxingInterpreterUtil::IsPartialSumSbp(tensor_nd_sbp->sbp_parallel(0))) {
const auto& op_expr = JUST(CachedEagerNcclAllReduceOpExpr(in->placement()));
return JUST(one::OpInterpUtil::Dispatch<one::Tensor>(*op_expr, {tensor}));
const auto& NcclPToBBoxingFunction = *JUST(GetBoxingFunction("nccl-p-to-b", in, out));
return JUST(NcclPToBBoxingFunction(tensor, in, out));
}
if (EagerBoxingInterpreterUtil::IsSplitSbp(tensor_nd_sbp->sbp_parallel(0), 0)) {
const auto& op_expr = JUST(CachedEagerNcclAllGatherOpExpr(in->placement()));
return JUST(one::OpInterpUtil::Dispatch<one::Tensor>(*op_expr, {tensor}));
const auto& NcclSToBBoxingFunction = *JUST(GetBoxingFunction("nccl-s-to-b", in, out));
return JUST(NcclSToBBoxingFunction(tensor, in, out));
}
UNIMPLEMENTED_THEN_RETURN();
}
COMMAND(RegisterBoxingFunction("symmetric-x-to-b", CheckSymXToB, &SymXToB));
COMMAND(RegisterBoxingFunction("symmetric-x-to-b", CheckSymmetricXToB, &SymmetricXToB));
} // namespace oneflow
......@@ -22,7 +22,7 @@ namespace oneflow {
namespace {
static const int kLimitParallelConfString = 1024 * 8;
static const int kLimitParallelConfString = 1024 * 64;
struct FlatParallelConf {
size_t available_size() const {
CHECK_GE(this->buffer_size, 0);
......
......@@ -284,17 +284,20 @@ void ParallelDesc::ClearUp() {
hierarchy_.reset(new Shape({parallel_num_}));
hierarchy_->ToProto(parallel_conf_.mutable_hierarchy());
}
cfg_parallel_conf_.reset(new cfg::ParallelConf(parallel_conf_));
SortAndRemoveDuplication(&sorted_machine_ids_);
parallel_conf_.clear_device_name();
int64_t parallel_id = 0;
for (int64_t machine_id : sorted_machine_ids_) {
for (int64_t device_id : *machine_id2sorted_dev_phy_ids_->at(machine_id)) {
parallel_conf_.add_device_name(std::string("@") + std::to_string(machine_id) + ":"
+ std::to_string(device_id));
parallel_id2machine_id_[parallel_id] = machine_id;
parallel_id2device_id_[parallel_id] = device_id;
machine_id2device_id2parallel_id_[machine_id][device_id] = parallel_id;
parallel_id += 1;
}
}
cfg_parallel_conf_.reset(new cfg::ParallelConf(parallel_conf_));
}
void ParallelDesc::set_device_type(DeviceType device_type) {
......
......@@ -600,5 +600,128 @@ class TestConsistentCast_XToB(flow.unittest.TestCase):
)
@flow.unittest.skip_unless_1n4d()
@unittest.skipIf(os.getenv("ONEFLOW_TEST_CPU_ONLY"), "only test cpu cases")
class TestConsistentCast_1ToN(flow.unittest.TestCase):
def test_consistent_to_consistent_1tb(test_case):
if flow.distributed.get_rank() == 0:
np_arr = np.array(
[[4, 6, 5, 20], [6, 2, 5, 7], [3, 7, 5, 4], [6, 8, 9, 4]],
dtype=np.float32,
)
else:
np_arr = np.array(
[[2, 10, 10, 7], [3, 9, 10, 5], [4, 6, 6, 9], [6, 8, 6, 4]],
dtype=np.float32,
)
device = flow.device("cuda")
tensor = flow.Tensor(np_arr, device=device, dtype=flow.float32)
placement = flow.placement("cuda", {0: range(1)})
consistent_tensor = tensor.to_consistent(placement, flow.sbp.split(0))
new_placement = flow.placement("cuda", {0: range(2)})
broadcast_tensor = consistent_tensor.to_consistent(
new_placement, flow.sbp.broadcast
)
test_case.assertTrue(broadcast_tensor.placement, new_placement)
if flow.distributed.get_rank() < 2:
test_case.assertTrue(
np.array_equal(
broadcast_tensor.to_local().numpy(),
np.array(
[[4, 6, 5, 20], [6, 2, 5, 7], [3, 7, 5, 4], [6, 8, 9, 4]],
dtype=np.float32,
),
)
)
def test_consistent_to_consistent_1tp(test_case):
if flow.distributed.get_rank() == 0:
np_arr = np.array(
[[4, 6, 5, 20], [6, 2, 5, 7], [3, 7, 5, 4], [6, 8, 9, 4]],
dtype=np.float32,
)
else:
np_arr = np.array(
[[2, 10, 10, 7], [3, 9, 10, 5], [4, 6, 6, 9], [6, 8, 6, 4]],
dtype=np.float32,
)
device = flow.device("cuda")
tensor = flow.Tensor(np_arr, device=device, dtype=flow.float32)
placement = flow.placement("cuda", {0: range(1)})
consistent_tensor = tensor.to_consistent(placement, flow.sbp.split(0))
new_placement = flow.placement("cuda", {0: range(2)})
partial_sum_tensor = consistent_tensor.to_consistent(
new_placement, flow.sbp.partial_sum
)
test_case.assertTrue(partial_sum_tensor.placement, new_placement)
if flow.distributed.get_rank() == 0:
test_case.assertTrue(
np.array_equal(
partial_sum_tensor.to_local().numpy(),
np.array(
[[4, 6, 5, 20], [6, 2, 5, 7], [3, 7, 5, 4], [6, 8, 9, 4]],
dtype=np.float32,
),
)
)
elif flow.distributed.get_rank() == 1:
test_case.assertTrue(
np.array_equal(
partial_sum_tensor.to_local().numpy(),
np.array(
[[0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0], [0, 0, 0, 0]],
dtype=np.float32,
),
)
)
def test_consistent_to_consistent_1ts(test_case):
if flow.distributed.get_rank() == 0:
np_arr = np.array(
[[4, 6, 5, 20], [6, 2, 5, 7], [3, 7, 5, 4], [6, 8, 9, 4]],
dtype=np.float32,
)
else:
np_arr = np.array(
[[2, 10, 10, 7], [3, 9, 10, 5], [4, 6, 6, 9], [6, 8, 6, 4]],
dtype=np.float32,
)
device = flow.device("cuda")
tensor = flow.Tensor(np_arr, device=device, dtype=flow.float32)
placement = flow.placement("cuda", {0: range(1)})
consistent_tensor = tensor.to_consistent(placement, flow.sbp.split(0))
new_placement = flow.placement("cuda", {0: range(4)})
split_tensor = consistent_tensor.to_consistent(new_placement, flow.sbp.split(0))
test_case.assertTrue(split_tensor.placement, new_placement)
if flow.distributed.get_rank() == 0:
test_case.assertTrue(
np.array_equal(
split_tensor.to_local().numpy(),
np.array([[4, 6, 5, 20]], dtype=np.float32,),
)
)
elif flow.distributed.get_rank() == 1:
test_case.assertTrue(
np.array_equal(
split_tensor.to_local().numpy(),
np.array([[6, 2, 5, 7]], dtype=np.float32,),
)
)
elif flow.distributed.get_rank() == 2:
test_case.assertTrue(
np.array_equal(
split_tensor.to_local().numpy(),
np.array([[3, 7, 5, 4]], dtype=np.float32,),
)
)
elif flow.distributed.get_rank() == 3:
test_case.assertTrue(
np.array_equal(
split_tensor.to_local().numpy(),
np.array([[6, 8, 9, 4]], dtype=np.float32,),
)
)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册