From 83fc29e6538e2a6b01bea09618db6e1a5473fb13 Mon Sep 17 00:00:00 2001 From: lixinqi Date: Sat, 28 Nov 2020 19:06:58 +0800 Subject: [PATCH] merge master Former-commit-id: a4ce93e0dbc838e912b4b416e321d77449d3a6a5 --- oneflow/core/job/job_build_and_infer_ctx.cpp | 2 +- oneflow/core/job/ssp_config_def.cpp | 11 +-- oneflow/core/job/stage_config_def.cpp | 35 ++++++++ .../job_rewriter/add_ssp_variable_proxy.cpp | 6 +- ...tion_pass.cpp => stage_partition_pass.cpp} | 86 +++++++++---------- oneflow/python/framework/attr_util.py | 3 + oneflow/python/framework/function_util.py | 25 ++---- .../python/test/ops/test_stage_partition.py | 74 ++++++++++++++++ 8 files changed, 166 insertions(+), 76 deletions(-) create mode 100644 oneflow/core/job/stage_config_def.cpp rename oneflow/core/job_rewriter/{ssp_partition_pass.cpp => stage_partition_pass.cpp} (82%) create mode 100644 oneflow/python/test/ops/test_stage_partition.py diff --git a/oneflow/core/job/job_build_and_infer_ctx.cpp b/oneflow/core/job/job_build_and_infer_ctx.cpp index ee7274419d..997371f3d2 100644 --- a/oneflow/core/job/job_build_and_infer_ctx.cpp +++ b/oneflow/core/job/job_build_and_infer_ctx.cpp @@ -958,7 +958,7 @@ Maybe LazyJobBuildAndInferCtx::Complete() { JUST(DoPass("DynamicLossScaleSchedulePass")); JUST(DoPass("AutoTrainStep")); JUST(DoPass("AutoLearningRate")); - JUST(DoPass("SspPartition")); + JUST(DoPass("StagePartition")); JUST(DoPass("GenerateBackwardAndOptimizerOpConfs")); JUST(DoPass("AddSspVariableProxy")); JUST(DoPass("CudnnFusedNormalizationAddReluPass")); diff --git a/oneflow/core/job/ssp_config_def.cpp b/oneflow/core/job/ssp_config_def.cpp index b12dc10195..a473f799c5 100644 --- a/oneflow/core/job/ssp_config_def.cpp +++ b/oneflow/core/job/ssp_config_def.cpp @@ -19,15 +19,8 @@ namespace oneflow { namespace { -REGISTER_FUNCTION_CONFIG_DEF() - .Bool("enable_ssp", false, "enable ssp") - .String("ssp_partition_strategy", "naive_sequantial", - "ssp partition strategy, Avaiable strategies: naive_sequantial | disable") - .ListInt64("ssp_partition_scope_ids", {}, "type: list[int64]. ssp partition scope symbol ids"); - -REGISTER_SCOPE_CONFIG_DEF() - .Int64("ssp_num_stages", -1, "total number of ssp stages") - .Int64("ssp_stage_id", -1, "current ssp stage id "); +REGISTER_FUNCTION_CONFIG_DEF().Bool("enable_ssp_variable_proxy", false, + "enable ssp variable proxy"); } // namespace diff --git a/oneflow/core/job/stage_config_def.cpp b/oneflow/core/job/stage_config_def.cpp new file mode 100644 index 0000000000..72fdcdb371 --- /dev/null +++ b/oneflow/core/job/stage_config_def.cpp @@ -0,0 +1,35 @@ +/* +Copyright 2020 The OneFlow Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include "oneflow/core/framework/config_def.h" + +namespace oneflow { + +namespace { + +REGISTER_FUNCTION_CONFIG_DEF() + .Bool("enable_stage_partition", false, "enable stage partition") + .String("stage_partition_strategy", "naive_sequantial", + "stage partition strategy, Avaiable strategies: naive_sequantial | disable") + .ListInt64("stage_partition_scope_ids", {}, + "type: list[int64]. stage partition scope symbol ids"); + +REGISTER_SCOPE_CONFIG_DEF() + .Int64("num_stages", -1, "total number of stages") + .Int64("stage_id", -1, "current stage id "); + +} // namespace + +} // namespace oneflow diff --git a/oneflow/core/job_rewriter/add_ssp_variable_proxy.cpp b/oneflow/core/job_rewriter/add_ssp_variable_proxy.cpp index c01b56201f..0f30ed00ed 100644 --- a/oneflow/core/job_rewriter/add_ssp_variable_proxy.cpp +++ b/oneflow/core/job_rewriter/add_ssp_variable_proxy.cpp @@ -40,7 +40,7 @@ class AddSspVariableProxyPass final : public JobPass { } bool IsEnabled(const JobPassCtx& ctx) const { - return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_ssp"); + return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_ssp_variable_proxy"); } Maybe Apply(const OpGraph& op_graph, JobBuilder* job_builder) const { @@ -125,8 +125,8 @@ class AddSspVariableProxyPass final : public JobPass { const Scope& scope = JUST(Global>::Get()->MaybeGet(scope_symbol_id)); int64_t buffer_size = 0; { - int64_t num_stages = scope.Int64("ssp_num_stages"); - int64_t stage_id = scope.Int64("ssp_stage_id"); + int64_t num_stages = scope.Int64("num_stages"); + int64_t stage_id = scope.Int64("stage_id"); CHECK_GT(num_stages, 0); CHECK_GE(stage_id, 0); CHECK_LT(stage_id, num_stages); diff --git a/oneflow/core/job_rewriter/ssp_partition_pass.cpp b/oneflow/core/job_rewriter/stage_partition_pass.cpp similarity index 82% rename from oneflow/core/job_rewriter/ssp_partition_pass.cpp rename to oneflow/core/job_rewriter/stage_partition_pass.cpp index bf845b47ca..8a9e262286 100644 --- a/oneflow/core/job_rewriter/ssp_partition_pass.cpp +++ b/oneflow/core/job_rewriter/stage_partition_pass.cpp @@ -26,55 +26,55 @@ namespace oneflow { namespace { -class SspPartitionStragety { +class StagePartitionStragety { public: - SspPartitionStragety() = default; - ~SspPartitionStragety() = default; + StagePartitionStragety() = default; + ~StagePartitionStragety() = default; virtual Maybe Apply(Job* job, JobPassCtx* ctx) const = 0; }; -class SspPartitionPass final : public JobPass { +class StagePartitionPass final : public JobPass { public: - SspPartitionPass() = default; - ~SspPartitionPass() = default; + StagePartitionPass() = default; + ~StagePartitionPass() = default; Maybe Apply(Job* job, JobPassCtx* ctx) const override { if (!IsEnabled(*ctx)) { return Maybe::Ok(); } - const std::string& partition_strategy = ctx->job_desc().String("ssp_partition_strategy"); - std::unique_ptr strategy; - strategy.reset(NewObj(partition_strategy)); + const std::string& partition_strategy = ctx->job_desc().String("stage_partition_strategy"); + std::unique_ptr strategy; + strategy.reset(NewObj(partition_strategy)); return strategy->Apply(job, ctx); } bool IsEnabled(const JobPassCtx& ctx) const { - return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_ssp"); + return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_stage_partition"); } }; -REGISTER_JOB_PASS("SspPartition", SspPartitionPass); +REGISTER_JOB_PASS("StagePartition", StagePartitionPass); -#define REGISTER_SSP_PARTITION_STRATEGY(strategy_name, strategy_type) \ - REGISTER_CLASS_CREATOR(std::string, strategy_name, SspPartitionStragety, \ +#define REGISTER_SSP_PARTITION_STRATEGY(strategy_name, strategy_type) \ + REGISTER_CLASS_CREATOR(std::string, strategy_name, StagePartitionStragety, \ ([] { return new strategy_type(); })); -class DisableSspPartitionStrategy : public SspPartitionStragety { +class DisableStagePartitionStrategy : public StagePartitionStragety { public: - DisableSspPartitionStrategy() = default; - ~DisableSspPartitionStrategy() = default; + DisableStagePartitionStrategy() = default; + ~DisableStagePartitionStrategy() = default; Maybe Apply(Job* job, JobPassCtx*) const override { return Maybe::Ok(); } }; -REGISTER_SSP_PARTITION_STRATEGY("disable", DisableSspPartitionStrategy); +REGISTER_SSP_PARTITION_STRATEGY("disable", DisableStagePartitionStrategy); -class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety { +class NaiveSequantialStagePartitionStrategy : public StagePartitionStragety { public: - NaiveSequantialSspPartitionStrategy() = default; - ~NaiveSequantialSspPartitionStrategy() = default; + NaiveSequantialStagePartitionStrategy() = default; + ~NaiveSequantialStagePartitionStrategy() = default; Maybe Apply(Job* job, JobPassCtx* ctx) const override { const OpGraph op_graph(*job); JobBuilder job_builder(job); - JUST(ForEachSspScope4TrainableFwOp( + JUST(ForEachStageScope4TrainableFwOp( op_graph, ctx->job_desc(), [&](const OpNode* op_node, const Scope& scope, int64_t scope_symbol_id) -> Maybe { // Sets scope_symbol_id @@ -93,28 +93,28 @@ class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety { } private: - Maybe ForEachSspScope4TrainableFwOp( + Maybe ForEachStageScope4TrainableFwOp( const OpGraph& op_graph, const JobDesc& job_desc, const std::function(const OpNode*, const Scope&, int64_t scope_symbol_id)>& Handler) const { // Sequantialize trainable forward ops std::list>> sequantial_trainable_fw_ops; JUST(GetSequantialTrainableFwOps(op_graph, &sequantial_trainable_fw_ops)); - // Gets ssp partition config - std::vector ssp_partition_scope_ids; - JUST(GetSspPartitionScopeIds(job_desc, &ssp_partition_scope_ids)); + // Gets stage partition config + std::vector stage_partition_scope_ids; + JUST(GetStagePartitionScopeIds(job_desc, &stage_partition_scope_ids)); // Partition to stages std::function(int64_t)> Stage4Depth; - int64_t num_stages = ssp_partition_scope_ids.size(); - JUST(GetSspDepth2Stage(sequantial_trainable_fw_ops, num_stages, &Stage4Depth)); - std::function(int64_t, int64_t*)> SspScope4Stage; + int64_t num_stages = stage_partition_scope_ids.size(); + JUST(GetStageDepth2Stage(sequantial_trainable_fw_ops, num_stages, &Stage4Depth)); + std::function(int64_t, int64_t*)> StageScope4Stage; // Provides scope for each stage - JUST(MakeGetterSspScope4Stage(ssp_partition_scope_ids, &SspScope4Stage)); + JUST(MakeGetterStageScope4Stage(stage_partition_scope_ids, &StageScope4Stage)); int64_t depth = 0; for (const auto& fused_vec : sequantial_trainable_fw_ops) { int64_t stage = JUST(Stage4Depth(depth)); int64_t scope_symbol_id = 0; - const auto& scope = JUST(SspScope4Stage(stage, &scope_symbol_id)); + const auto& scope = JUST(StageScope4Stage(stage, &scope_symbol_id)); for (OpNode* op_node : *fused_vec) { JUST(Handler(op_node, scope, scope_symbol_id)); } ++depth; } @@ -156,7 +156,7 @@ class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety { return Maybe::Ok(); } - Maybe GetSspDepth2Stage( + Maybe GetStageDepth2Stage( const std::list>>& sequantial_trainable_fw_ops, int64_t num_stages, std::function(int64_t)>* Stage4Depth) const { int64_t num_ops = 0; @@ -303,29 +303,29 @@ class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety { return Maybe::Ok(); } - Maybe MakeGetterSspScope4Stage( - const std::vector& ssp_partition_scope_ids, - std::function(int64_t stage, int64_t* scope_symbol_id)>* SspScope4Stage) + Maybe MakeGetterStageScope4Stage( + const std::vector& stage_partition_scope_ids, + std::function(int64_t stage, int64_t* scope_symbol_id)>* StageScope4Stage) const { - *SspScope4Stage = [ssp_partition_scope_ids](int64_t stage, - int64_t* scope_symbol_id) -> Maybe { + *StageScope4Stage = [stage_partition_scope_ids]( + int64_t stage, int64_t* scope_symbol_id) -> Maybe { CHECK_GE_OR_RETURN(stage, 0); - CHECK_LT_OR_RETURN(stage, ssp_partition_scope_ids.size()); - *scope_symbol_id = ssp_partition_scope_ids.at(stage); + CHECK_LT_OR_RETURN(stage, stage_partition_scope_ids.size()); + *scope_symbol_id = stage_partition_scope_ids.at(stage); return Global>::Get()->Get(*scope_symbol_id); }; return Maybe::Ok(); } - Maybe GetSspPartitionScopeIds(const JobDesc& job_desc, - std::vector* ssp_partition_scope_ids) const { - const auto& scope_ids = job_desc.ListInt64("ssp_partition_scope_ids"); + Maybe GetStagePartitionScopeIds(const JobDesc& job_desc, + std::vector* stage_partition_scope_ids) const { + const auto& scope_ids = job_desc.ListInt64("stage_partition_scope_ids"); CHECK_GT_OR_RETURN(scope_ids.size(), 0); - ssp_partition_scope_ids->assign(scope_ids.begin(), scope_ids.end()); + stage_partition_scope_ids->assign(scope_ids.begin(), scope_ids.end()); return Maybe::Ok(); } }; -REGISTER_SSP_PARTITION_STRATEGY("naive_sequantial", NaiveSequantialSspPartitionStrategy); +REGISTER_SSP_PARTITION_STRATEGY("naive_sequantial", NaiveSequantialStagePartitionStrategy); } // namespace diff --git a/oneflow/python/framework/attr_util.py b/oneflow/python/framework/attr_util.py index 61d5f34aa6..3d2cdf0f6e 100644 --- a/oneflow/python/framework/attr_util.py +++ b/oneflow/python/framework/attr_util.py @@ -30,6 +30,9 @@ def SetAttrValue(attr_value, py_value, default_attr_value): elif default_attr_value.HasField("at_string"): assert type(py_value) is str attr_value.at_string = py_value + elif default_attr_value.HasField("at_list_int64"): + assert type(py_value) is list + attr_value.at_list_int64.val[:] = py_value else: raise ValueError( "config with type %s is invalid. supported types: [bool, int, float, str]" diff --git a/oneflow/python/framework/function_util.py b/oneflow/python/framework/function_util.py index 87b1469469..9058695f75 100644 --- a/oneflow/python/framework/function_util.py +++ b/oneflow/python/framework/function_util.py @@ -33,6 +33,7 @@ import oneflow.python.framework.placement_context as placement_ctx import oneflow.python.framework.session_context as session_ctx import oneflow.python.framework.typing_util as oft_util import oneflow.python.lib.core.pb_util as pb_util +import oneflow.python.framework.attr_util as attr_util from oneflow.python.framework.function_desc import FunctionDesc from oneflow.python.oneflow_export import oneflow_export import traceback @@ -56,27 +57,11 @@ class FunctionConfig(object): default_val = name2default[attr_name] def FunctionConfigSetter( - attr_value: Optional[Union[bool, int, float, str]] = None + py_value: Optional[Union[bool, int, float, str]] = None ) -> None: - if default_val.HasField("at_bool"): - if attr_value is None: - attr_value = True - assert type(attr_value) is bool - flag_name2flag_value[attr_name].at_bool = attr_value - elif default_val.HasField("at_int64"): - assert type(attr_value) is int - flag_name2flag_value[attr_name].at_int64 = attr_value - elif default_val.HasField("at_double"): - assert type(attr_value) is float - flag_name2flag_value[attr_name].at_double = attr_value - elif default_val.HasField("at_string"): - assert type(attr_value) is str - flag_name2flag_value[attr_name].at_string = attr_value - else: - raise NotImplementedError( - "config_flag `%s' with type %s is not supported" - % (attr_name, type(attr_value)) - ) + attr_util.SetAttrValue( + flag_name2flag_value[attr_name], py_value, default_val + ) return FunctionConfigSetter diff --git a/oneflow/python/test/ops/test_stage_partition.py b/oneflow/python/test/ops/test_stage_partition.py new file mode 100644 index 0000000000..754a00ebe6 --- /dev/null +++ b/oneflow/python/test/ops/test_stage_partition.py @@ -0,0 +1,74 @@ +""" +Copyright 2020 The OneFlow Authors. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import oneflow as flow +import numpy as np +import oneflow.typing as tp +import os +import unittest + + +@flow.unittest.skip_unless_1n1d() +class TestStagePartition(flow.unittest.TestCase): + def GetScopeSymbolIds(self, device_tag, device_name, num): + scope_symbol_ids = [] + for i in range(num): + with flow.scope.placement(device_tag, device_name): + scope_symbol_ids.append(flow.current_scope().symbol_id) + return scope_symbol_ids + + def test_stage_partition(self): + if flow.eager_execution_enabled(): + return + device_name = "0:0" + + flow.config.enable_debug_mode(True) + flow.config.cpu_device_num(2) + + shape = (10,) + + function_config = flow.FunctionConfig() + function_config.enable_stage_partition(True) + function_config.stage_partition_scope_ids( + self.GetScopeSymbolIds("gpu", device_name, 2) + ) + + @flow.global_function(type="train", function_config=function_config) + def Foo() -> tp.Numpy: + x = flow.constant(0, dtype=flow.float, shape=shape) + with flow.scope.placement("gpu", device_name): + for i in range(10): + w = flow.get_variable( + "w_%s" % i, + shape=shape, + dtype=flow.float, + initializer=flow.constant_initializer(0), + ) + x = w + x + loss = x + flow.optimizer.SGD( + flow.optimizer.PiecewiseConstantScheduler([], [-10.0]), momentum=0 + ).minimize(loss) + return loss + + checkpoint = flow.train.CheckPoint() + checkpoint.init() + + for i in range(10): + x = Foo() + + +if __name__ == "__main__": + unittest.main() -- GitLab