Commit 83fc29e6 authored by lixinqi

merge master


Former-commit-id: a4ce93e0dbc838e912b4b416e321d77449d3a6a5
Parent c46b52b5
@@ -958,7 +958,7 @@ Maybe<void> LazyJobBuildAndInferCtx::Complete() {
   JUST(DoPass("DynamicLossScaleSchedulePass"));
   JUST(DoPass("AutoTrainStep"));
   JUST(DoPass("AutoLearningRate"));
-  JUST(DoPass("SspPartition"));
+  JUST(DoPass("StagePartition"));
   JUST(DoPass("GenerateBackwardAndOptimizerOpConfs"));
   JUST(DoPass("AddSspVariableProxy"));
   JUST(DoPass("CudnnFusedNormalizationAddReluPass"));
......
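The `Complete()` pipeline above invokes passes by name, so the rename only requires changing the string handed to `DoPass` once the pass is registered under its new name (see the `REGISTER_JOB_PASS` change further down). A minimal Python sketch of that name-keyed registry and dispatch; the registry, `do_pass`, and the `ctx` dict are illustrative stand-ins, not OneFlow's actual API:

```python
# Minimal sketch of name-keyed job-pass dispatch, analogous to
# JUST(DoPass("StagePartition")) resolving a pass registered via
# REGISTER_JOB_PASS. Everything here is illustrative.
_JOB_PASS_REGISTRY = {}

def register_job_pass(name):
    def deco(cls):
        _JOB_PASS_REGISTRY[name] = cls
        return cls
    return deco

@register_job_pass("StagePartition")
class StagePartitionPass:
    def is_enabled(self, ctx):
        return ctx.get("is_train", False) and ctx.get("enable_stage_partition", False)

    def apply(self, job, ctx):
        if not self.is_enabled(ctx):
            return job  # disabled: hand the job back untouched
        return job      # a real pass would rewrite `job` here

def do_pass(name, job, ctx):
    # Look the pass up by name and run it, mirroring DoPass("StagePartition").
    return _JOB_PASS_REGISTRY[name]().apply(job, ctx)

job = do_pass("StagePartition", {"ops": []}, {"is_train": True})
```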
@@ -19,15 +19,8 @@ namespace oneflow {
 namespace {
 
-REGISTER_FUNCTION_CONFIG_DEF()
-    .Bool("enable_ssp", false, "enable ssp")
-    .String("ssp_partition_strategy", "naive_sequantial",
-            "ssp partition strategy, Avaiable strategies: naive_sequantial | disable")
-    .ListInt64("ssp_partition_scope_ids", {}, "type: list[int64]. ssp partition scope symbol ids");
-
-REGISTER_SCOPE_CONFIG_DEF()
-    .Int64("ssp_num_stages", -1, "total number of ssp stages")
-    .Int64("ssp_stage_id", -1, "current ssp stage id ");
 
 REGISTER_FUNCTION_CONFIG_DEF().Bool("enable_ssp_variable_proxy", false,
                                     "enable ssp variable proxy");
 
 }  // namespace
......
+/*
+Copyright 2020 The OneFlow Authors. All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include "oneflow/core/framework/config_def.h"
+
+namespace oneflow {
+
+namespace {
+
+REGISTER_FUNCTION_CONFIG_DEF()
+    .Bool("enable_stage_partition", false, "enable stage partition")
+    .String("stage_partition_strategy", "naive_sequantial",
+            "stage partition strategy, Avaiable strategies: naive_sequantial | disable")
+    .ListInt64("stage_partition_scope_ids", {},
+               "type: list[int64]. stage partition scope symbol ids");
+
+REGISTER_SCOPE_CONFIG_DEF()
+    .Int64("num_stages", -1, "total number of stages")
+    .Int64("stage_id", -1, "current stage id ");
+
+}  // namespace
+
+}  // namespace oneflow
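The new file registers the stage-partition knobs with a fluent builder: each `.Bool`/`.String`/`.ListInt64`/`.Int64` call declares a typed flag with a default value and a doc string, and each function-config flag later surfaces as a same-named setter on Python's `FunctionConfig` (the test at the end of this diff calls `function_config.enable_stage_partition(True)`). A rough Python sketch of such a chained registrar; `ConfigDef` and its method names are hypothetical:

```python
# Illustrative chained registrar, loosely mirroring
# REGISTER_FUNCTION_CONFIG_DEF().Bool(...).String(...).ListInt64(...).
class ConfigDef:
    def __init__(self):
        self.name2default = {}

    def _declare(self, name, default, doc):
        self.name2default[name] = (default, doc)
        return self  # returning self is what makes the calls chainable

    # The typed declarators share one implementation in this sketch.
    Bool = String = Int64 = ListInt64 = _declare

FUNCTION_CONFIG = (
    ConfigDef()
    .Bool("enable_stage_partition", False, "enable stage partition")
    .ListInt64("stage_partition_scope_ids", [], "stage partition scope symbol ids")
)
print(FUNCTION_CONFIG.name2default["enable_stage_partition"])
```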
@@ -40,7 +40,7 @@ class AddSspVariableProxyPass final : public JobPass {
   }
 
   bool IsEnabled(const JobPassCtx& ctx) const {
-    return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_ssp");
+    return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_ssp_variable_proxy");
   }
 
   Maybe<void> Apply(const OpGraph& op_graph, JobBuilder* job_builder) const {
@@ -125,8 +125,8 @@ class AddSspVariableProxyPass final : public JobPass {
     const Scope& scope = JUST(Global<vm::SymbolStorage<Scope>>::Get()->MaybeGet(scope_symbol_id));
     int64_t buffer_size = 0;
     {
-      int64_t num_stages = scope.Int64("ssp_num_stages");
-      int64_t stage_id = scope.Int64("ssp_stage_id");
+      int64_t num_stages = scope.Int64("num_stages");
+      int64_t stage_id = scope.Int64("stage_id");
       CHECK_GT(num_stages, 0);
       CHECK_GE(stage_id, 0);
       CHECK_LT(stage_id, num_stages);
......
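The second hunk above derives a per-variable proxy buffer size from the scope's `num_stages` and `stage_id`, but the diff truncates before the actual expression. A plausible reading, under the assumption that earlier SSP stages need larger staleness buffers; the formula below is a guess, not the verbatim OneFlow code:

```python
# Assumed buffer sizing for SSP variable proxies: earlier stages tolerate
# more staleness, so they get larger buffers. The formula is an assumption;
# the real expression is truncated out of this diff.
def ssp_buffer_size(num_stages: int, stage_id: int) -> int:
    assert num_stages > 0               # mirrors CHECK_GT(num_stages, 0)
    assert 0 <= stage_id < num_stages   # mirrors CHECK_GE / CHECK_LT
    return num_stages - stage_id
```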
@@ -26,55 +26,55 @@ namespace oneflow {
 
 namespace {
 
-class SspPartitionStragety {
+class StagePartitionStragety {
  public:
-  SspPartitionStragety() = default;
-  ~SspPartitionStragety() = default;
+  StagePartitionStragety() = default;
+  ~StagePartitionStragety() = default;
 
   virtual Maybe<void> Apply(Job* job, JobPassCtx* ctx) const = 0;
 };
 
-class SspPartitionPass final : public JobPass {
+class StagePartitionPass final : public JobPass {
  public:
-  SspPartitionPass() = default;
-  ~SspPartitionPass() = default;
+  StagePartitionPass() = default;
+  ~StagePartitionPass() = default;
 
   Maybe<void> Apply(Job* job, JobPassCtx* ctx) const override {
     if (!IsEnabled(*ctx)) { return Maybe<void>::Ok(); }
-    const std::string& partition_strategy = ctx->job_desc().String("ssp_partition_strategy");
-    std::unique_ptr<const SspPartitionStragety> strategy;
-    strategy.reset(NewObj<std::string, SspPartitionStragety>(partition_strategy));
+    const std::string& partition_strategy = ctx->job_desc().String("stage_partition_strategy");
+    std::unique_ptr<const StagePartitionStragety> strategy;
+    strategy.reset(NewObj<std::string, StagePartitionStragety>(partition_strategy));
     return strategy->Apply(job, ctx);
   }
 
   bool IsEnabled(const JobPassCtx& ctx) const {
-    return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_ssp");
+    return ctx.job_desc().IsTrain() && ctx.job_desc().Bool("enable_stage_partition");
   }
 };
 
-REGISTER_JOB_PASS("SspPartition", SspPartitionPass);
+REGISTER_JOB_PASS("StagePartition", StagePartitionPass);
 
-#define REGISTER_SSP_PARTITION_STRATEGY(strategy_name, strategy_type)      \
-  REGISTER_CLASS_CREATOR(std::string, strategy_name, SspPartitionStragety, \
+#define REGISTER_SSP_PARTITION_STRATEGY(strategy_name, strategy_type)        \
+  REGISTER_CLASS_CREATOR(std::string, strategy_name, StagePartitionStragety, \
                          ([] { return new strategy_type(); }));
 
-class DisableSspPartitionStrategy : public SspPartitionStragety {
+class DisableStagePartitionStrategy : public StagePartitionStragety {
  public:
-  DisableSspPartitionStrategy() = default;
-  ~DisableSspPartitionStrategy() = default;
+  DisableStagePartitionStrategy() = default;
+  ~DisableStagePartitionStrategy() = default;
 
   Maybe<void> Apply(Job* job, JobPassCtx*) const override { return Maybe<void>::Ok(); }
 };
-REGISTER_SSP_PARTITION_STRATEGY("disable", DisableSspPartitionStrategy);
+REGISTER_SSP_PARTITION_STRATEGY("disable", DisableStagePartitionStrategy);
 
-class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety {
+class NaiveSequantialStagePartitionStrategy : public StagePartitionStragety {
  public:
-  NaiveSequantialSspPartitionStrategy() = default;
-  ~NaiveSequantialSspPartitionStrategy() = default;
+  NaiveSequantialStagePartitionStrategy() = default;
+  ~NaiveSequantialStagePartitionStrategy() = default;
 
   Maybe<void> Apply(Job* job, JobPassCtx* ctx) const override {
     const OpGraph op_graph(*job);
     JobBuilder job_builder(job);
-    JUST(ForEachSspScope4TrainableFwOp(
+    JUST(ForEachStageScope4TrainableFwOp(
         op_graph, ctx->job_desc(),
         [&](const OpNode* op_node, const Scope& scope, int64_t scope_symbol_id) -> Maybe<void> {
           // Sets scope_symbol_id
@@ -93,28 +93,28 @@ class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety {
   }
 
  private:
-  Maybe<void> ForEachSspScope4TrainableFwOp(
+  Maybe<void> ForEachStageScope4TrainableFwOp(
       const OpGraph& op_graph, const JobDesc& job_desc,
       const std::function<Maybe<void>(const OpNode*, const Scope&, int64_t scope_symbol_id)>&
           Handler) const {
     // Sequantialize trainable forward ops
     std::list<std::unique_ptr<std::vector<OpNode*>>> sequantial_trainable_fw_ops;
     JUST(GetSequantialTrainableFwOps(op_graph, &sequantial_trainable_fw_ops));
-    // Gets ssp partition config
-    std::vector<int64_t> ssp_partition_scope_ids;
-    JUST(GetSspPartitionScopeIds(job_desc, &ssp_partition_scope_ids));
+    // Gets stage partition config
+    std::vector<int64_t> stage_partition_scope_ids;
+    JUST(GetStagePartitionScopeIds(job_desc, &stage_partition_scope_ids));
     // Partition to stages
     std::function<Maybe<int64_t>(int64_t)> Stage4Depth;
-    int64_t num_stages = ssp_partition_scope_ids.size();
-    JUST(GetSspDepth2Stage(sequantial_trainable_fw_ops, num_stages, &Stage4Depth));
-    std::function<Maybe<const Scope&>(int64_t, int64_t*)> SspScope4Stage;
+    int64_t num_stages = stage_partition_scope_ids.size();
+    JUST(GetStageDepth2Stage(sequantial_trainable_fw_ops, num_stages, &Stage4Depth));
+    std::function<Maybe<const Scope&>(int64_t, int64_t*)> StageScope4Stage;
     // Provides scope for each stage
-    JUST(MakeGetterSspScope4Stage(ssp_partition_scope_ids, &SspScope4Stage));
+    JUST(MakeGetterStageScope4Stage(stage_partition_scope_ids, &StageScope4Stage));
     int64_t depth = 0;
     for (const auto& fused_vec : sequantial_trainable_fw_ops) {
       int64_t stage = JUST(Stage4Depth(depth));
       int64_t scope_symbol_id = 0;
-      const auto& scope = JUST(SspScope4Stage(stage, &scope_symbol_id));
+      const auto& scope = JUST(StageScope4Stage(stage, &scope_symbol_id));
       for (OpNode* op_node : *fused_vec) { JUST(Handler(op_node, scope, scope_symbol_id)); }
       ++depth;
     }
@@ -156,7 +156,7 @@ class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety {
     return Maybe<void>::Ok();
   }
 
-  Maybe<void> GetSspDepth2Stage(
+  Maybe<void> GetStageDepth2Stage(
       const std::list<std::unique_ptr<std::vector<OpNode*>>>& sequantial_trainable_fw_ops,
       int64_t num_stages, std::function<Maybe<int64_t>(int64_t)>* Stage4Depth) const {
     int64_t num_ops = 0;
@@ -303,29 +303,29 @@ class NaiveSequantialSspPartitionStrategy : public SspPartitionStragety {
     return Maybe<void>::Ok();
   }
 
-  Maybe<void> MakeGetterSspScope4Stage(
-      const std::vector<int64_t>& ssp_partition_scope_ids,
-      std::function<Maybe<const Scope&>(int64_t stage, int64_t* scope_symbol_id)>* SspScope4Stage)
+  Maybe<void> MakeGetterStageScope4Stage(
+      const std::vector<int64_t>& stage_partition_scope_ids,
+      std::function<Maybe<const Scope&>(int64_t stage, int64_t* scope_symbol_id)>* StageScope4Stage)
       const {
-    *SspScope4Stage = [ssp_partition_scope_ids](int64_t stage,
-                                                int64_t* scope_symbol_id) -> Maybe<const Scope&> {
+    *StageScope4Stage = [stage_partition_scope_ids](
+                            int64_t stage, int64_t* scope_symbol_id) -> Maybe<const Scope&> {
       CHECK_GE_OR_RETURN(stage, 0);
-      CHECK_LT_OR_RETURN(stage, ssp_partition_scope_ids.size());
-      *scope_symbol_id = ssp_partition_scope_ids.at(stage);
+      CHECK_LT_OR_RETURN(stage, stage_partition_scope_ids.size());
+      *scope_symbol_id = stage_partition_scope_ids.at(stage);
       return Global<vm::SymbolStorage<Scope>>::Get()->Get(*scope_symbol_id);
     };
     return Maybe<void>::Ok();
  }
 
-  Maybe<void> GetSspPartitionScopeIds(const JobDesc& job_desc,
-                                      std::vector<int64_t>* ssp_partition_scope_ids) const {
-    const auto& scope_ids = job_desc.ListInt64("ssp_partition_scope_ids");
+  Maybe<void> GetStagePartitionScopeIds(const JobDesc& job_desc,
+                                        std::vector<int64_t>* stage_partition_scope_ids) const {
+    const auto& scope_ids = job_desc.ListInt64("stage_partition_scope_ids");
     CHECK_GT_OR_RETURN(scope_ids.size(), 0);
-    ssp_partition_scope_ids->assign(scope_ids.begin(), scope_ids.end());
+    stage_partition_scope_ids->assign(scope_ids.begin(), scope_ids.end());
     return Maybe<void>::Ok();
   }
 };
-REGISTER_SSP_PARTITION_STRATEGY("naive_sequantial", NaiveSequantialSspPartitionStrategy);
+REGISTER_SSP_PARTITION_STRATEGY("naive_sequantial", NaiveSequantialStagePartitionStrategy);
 
 }  // namespace
......
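`GetStageDepth2Stage` (its body is truncated above) has to map each depth in the sequentialized list of trainable forward ops onto one of `num_stages` stages. A sketch of one even-split scheme such a mapping could use; the proportional formula is an assumption, not OneFlow's exact implementation:

```python
# Illustrative depth -> stage mapping for the naive sequential strategy:
# split `num_depths` consecutive depths as evenly as possible over
# `num_stages` stages. This is an assumed scheme, not OneFlow's exact code.
def make_stage4depth(num_depths: int, num_stages: int):
    assert 0 < num_stages <= num_depths

    def stage4depth(depth: int) -> int:
        assert 0 <= depth < num_depths
        return depth * num_stages // num_depths

    return stage4depth

stage4depth = make_stage4depth(num_depths=10, num_stages=2)
print([stage4depth(d) for d in range(10)])  # [0, 0, 0, 0, 0, 1, 1, 1, 1, 1]
```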
@@ -30,6 +30,9 @@ def SetAttrValue(attr_value, py_value, default_attr_value):
     elif default_attr_value.HasField("at_string"):
         assert type(py_value) is str
         attr_value.at_string = py_value
+    elif default_attr_value.HasField("at_list_int64"):
+        assert type(py_value) is list
+        attr_value.at_list_int64.val[:] = py_value
     else:
         raise ValueError(
             "config with type %s is invalid. supported types: [bool, int, float, str]"
......
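`SetAttrValue` dispatches on whichever oneof field is set in the default `AttrValue` and copies `py_value` into the matching field of `attr_value`; the new `at_list_int64` branch fills a repeated proto field via slice assignment. A self-contained sketch of the same dispatch, with a plain Python `Attr` class standing in for the protobuf message:

```python
# Stand-in for the AttrValue proto: exactly one "at_*" field is set.
class Attr:
    def __init__(self, **fields):
        self.fields = fields  # e.g. {"at_list_int64": []}

    def HasField(self, name):
        return name in self.fields

def set_attr_value(attr_value, py_value, default_attr_value):
    # Dispatch on the default's field, like the real SetAttrValue.
    if default_attr_value.HasField("at_string"):
        assert type(py_value) is str
        attr_value.fields["at_string"] = py_value
    elif default_attr_value.HasField("at_list_int64"):
        assert type(py_value) is list
        # With a real proto this is attr_value.at_list_int64.val[:] = py_value,
        # which replaces the repeated field's contents in place.
        attr_value.fields["at_list_int64"] = list(py_value)
    else:
        raise ValueError("unsupported attr type")

flags = Attr(at_list_int64=[])
set_attr_value(flags, [12, 34], Attr(at_list_int64=[]))
print(flags.fields)  # {'at_list_int64': [12, 34]}
```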
@@ -33,6 +33,7 @@ import oneflow.python.framework.placement_context as placement_ctx
 import oneflow.python.framework.session_context as session_ctx
 import oneflow.python.framework.typing_util as oft_util
 import oneflow.python.lib.core.pb_util as pb_util
+import oneflow.python.framework.attr_util as attr_util
 from oneflow.python.framework.function_desc import FunctionDesc
 from oneflow.python.oneflow_export import oneflow_export
 import traceback
@@ -56,27 +57,11 @@ class FunctionConfig(object):
         default_val = name2default[attr_name]
 
         def FunctionConfigSetter(
-            attr_value: Optional[Union[bool, int, float, str]] = None
+            py_value: Optional[Union[bool, int, float, str]] = None
         ) -> None:
-            if default_val.HasField("at_bool"):
-                if attr_value is None:
-                    attr_value = True
-                assert type(attr_value) is bool
-                flag_name2flag_value[attr_name].at_bool = attr_value
-            elif default_val.HasField("at_int64"):
-                assert type(attr_value) is int
-                flag_name2flag_value[attr_name].at_int64 = attr_value
-            elif default_val.HasField("at_double"):
-                assert type(attr_value) is float
-                flag_name2flag_value[attr_name].at_double = attr_value
-            elif default_val.HasField("at_string"):
-                assert type(attr_value) is str
-                flag_name2flag_value[attr_name].at_string = attr_value
-            else:
-                raise NotImplementedError(
-                    "config_flag `%s' with type %s is not supported"
-                    % (attr_name, type(attr_value))
-                )
+            attr_util.SetAttrValue(
+                flag_name2flag_value[attr_name], py_value, default_val
+            )
 
         return FunctionConfigSetter
......
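With this change each generated `FunctionConfigSetter` becomes a thin closure over its flag name that defers all type dispatch to the shared `attr_util.SetAttrValue`, instead of duplicating the if/elif chain in every setter. A condensed sketch of the factory pattern, with names simplified:

```python
# Condensed sketch of the setter-factory pattern used by FunctionConfig:
# one closure per registered flag, all sharing a single dispatch routine.
def make_setter(attr_name, name2default, flag_name2flag_value, set_attr_value):
    default_val = name2default[attr_name]

    def setter(py_value=None):
        # All type checking/dispatch lives in the shared set_attr_value.
        set_attr_value(flag_name2flag_value[attr_name], py_value, default_val)

    return setter
```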
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import oneflow as flow
import numpy as np
import oneflow.typing as tp
import os
import unittest
@flow.unittest.skip_unless_1n1d()
class TestStagePartition(flow.unittest.TestCase):
def GetScopeSymbolIds(self, device_tag, device_name, num):
scope_symbol_ids = []
for i in range(num):
with flow.scope.placement(device_tag, device_name):
scope_symbol_ids.append(flow.current_scope().symbol_id)
return scope_symbol_ids
def test_stage_partition(self):
if flow.eager_execution_enabled():
return
device_name = "0:0"
flow.config.enable_debug_mode(True)
flow.config.cpu_device_num(2)
shape = (10,)
function_config = flow.FunctionConfig()
function_config.enable_stage_partition(True)
function_config.stage_partition_scope_ids(
self.GetScopeSymbolIds("gpu", device_name, 2)
)
@flow.global_function(type="train", function_config=function_config)
def Foo() -> tp.Numpy:
x = flow.constant(0, dtype=flow.float, shape=shape)
with flow.scope.placement("gpu", device_name):
for i in range(10):
w = flow.get_variable(
"w_%s" % i,
shape=shape,
dtype=flow.float,
initializer=flow.constant_initializer(0),
)
x = w + x
loss = x
flow.optimizer.SGD(
flow.optimizer.PiecewiseConstantScheduler([], [-10.0]), momentum=0
).minimize(loss)
return loss
checkpoint = flow.train.CheckPoint()
checkpoint.init()
for i in range(10):
x = Foo()
if __name__ == "__main__":
unittest.main()