提交 9b03ed3a 编写于 作者: L lixinqi

JobBuildAndInferCtx::Complete

上级 fbd1e7ec
#include "oneflow/core/job/compiler.h"
#include "oneflow/core/persistence/tee_persistent_log_stream.h"
#include "oneflow/core/job/cudnn_conv_ctx_cache_scope.h"
#include "oneflow/core/graph/op_graph.h"
#include "oneflow/core/job_completer/job_completer.h"
......@@ -48,7 +47,6 @@ void Compiler::GenNetTopo(Plan* plan) const {
}
void Compiler::Compile(Job* job, Plan* plan, bool need_job_complete) const {
auto cudnn_conv_ctx_cache_scope = std::make_unique<CudnnConvCtxCacheScope>();
const JobDesc& job_desc = GlobalJobDesc();
if (need_job_complete) { JobCompleter().Complete(job); }
Global<OpGraph>::New(*job);
......
#include "oneflow/core/job/cudnn_conv_ctx_cache_scope.h"
#include "oneflow/core/device/cudnn_conv_ctx_cache.h"
namespace oneflow {
// Creates the global CudnnConvCtxCache singleton; compiled to a no-op when
// the build has no CUDA support (WITH_CUDA undefined).
CudnnConvCtxCacheScope::CudnnConvCtxCacheScope() {
#ifdef WITH_CUDA
Global<CudnnConvCtxCache>::New();
#endif
}
// Destroys the global CudnnConvCtxCache created by the constructor;
// a no-op in CUDA-less builds, mirroring the constructor's guard.
CudnnConvCtxCacheScope::~CudnnConvCtxCacheScope() {
#ifdef WITH_CUDA
Global<CudnnConvCtxCache>::Delete();
#endif
}
} // namespace oneflow
#ifndef ONEFLOW_CORE_JOB_CUDNN_CONV_CTX_CACHE_SCOPE_H_
#define ONEFLOW_CORE_JOB_CUDNN_CONV_CTX_CACHE_SCOPE_H_
#include "oneflow/core/common/util.h"
namespace oneflow {
// RAII guard for the global CudnnConvCtxCache: the constructor creates the
// singleton and the destructor deletes it (both no-ops without WITH_CUDA).
// Copy and move are disallowed so exactly one scope owns the singleton.
class CudnnConvCtxCacheScope final {
public:
OF_DISALLOW_COPY_AND_MOVE(CudnnConvCtxCacheScope);
CudnnConvCtxCacheScope();
~CudnnConvCtxCacheScope();
};
} // namespace oneflow
#endif // ONEFLOW_CORE_JOB_CUDNN_CONV_CTX_CACHE_SCOPE_H_
#include "oneflow/core/job/job_build_and_infer_ctx.h"
#include "oneflow/core/job_completer/op_graph_pass.h"
#include "oneflow/core/framework/user_op_conf.h"
#include "oneflow/core/common/protobuf.h"
namespace oneflow {
......@@ -38,6 +40,26 @@ Maybe<void> JobBuildAndInferCtx::SetJobConf(const JobConfigProto& job_conf) {
return Maybe<void>::Ok();
}
// Runs the full sequence of job-completion function passes over job_.
// The lazily-created global JobDesc is replaced by a GlobalJobDescScope
// bound to this job's own config for the duration of the passes.
Maybe<void> JobBuildAndInferCtx::Complete() {
  CHECK_NOTNULL(Global<JobDesc>::Get());
  Global<JobDesc>::Delete();
  auto scope = std::make_unique<GlobalJobDescScope>(job_->job_conf(), job_id_);
  // Pass order matters; keep this list in sync with the registered passes.
  for (const char* pass_name : {
           "CompleteOfrecordDecoder",
           "SetDefaultVariableConf",
           "AutoMixedPrecision",
           "TieUpChainHeadersUnReachableFromAnyVariableOps",
           "NonDistributedOptimizerPass",
           "AutoTrainStep",
           "AutoLearningRate",
           "GenerateBackwardAndOptimizerOpConfs",
           "SequentializeNcclTupleBroadcastReducePass",
           "AddAllReduceGroupPass",
           "AddLbiDiffWatcherOpConfs",
           "SequentializeAllReduceGroupPass",
       }) {
    FunctionPass(pass_name)(job_);
  }
  return Maybe<void>::Ok();
}
Maybe<void> JobBuildAndInferCtx::AddOpNameParallelConf2Placement(
const std::string& op_name, const ParallelConf& parallel_conf) {
ParallelDesc parallel_desc(parallel_conf);
......
......@@ -20,6 +20,7 @@ class JobBuildAndInferCtx {
Maybe<OperatorConf> CheckAndCompleteUserOpConf(const OperatorConf& op_conf);
Maybe<void> SetJobConf(const JobConfigProto& job_conf);
Maybe<void> Complete();
Maybe<void> AddAndInferOp(const OperatorConf& op_conf, const ParallelConf& parallel_conf);
Maybe<void> AddAndInferConsistentOp(const OperatorConf& op_conf,
const ParallelConf& parallel_conf);
......
......@@ -65,6 +65,7 @@ const ParallelConf& JobBuilder::ParallelConf4Lbi(const LogicalBlobId& lbi) const
void JobBuilder::AddOps(const ParallelConf& parallel_conf,
const std::vector<OperatorConf>& op_confs) {
if (op_confs.empty()) { return; }
auto* placemnt_group = job_->mutable_placement()->add_placement_group();
*placemnt_group->mutable_parallel_conf() = parallel_conf;
for (const auto& op_conf : op_confs) {
......
......@@ -6,7 +6,6 @@
#include "oneflow/core/job/improver.h"
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/job_builder.h"
#include "oneflow/core/job_completer/op_graph_pass.h"
#include "oneflow/core/job/job_set.pb.h"
#include "oneflow/core/job/machine_context.h"
#include "oneflow/core/job/profiler.h"
......@@ -743,7 +742,6 @@ void CompileAndMergePlanOnMaster(const PbRpf<Job>& conf_jobs, Plan* plan) {
AddJobName2JobId(jobs.at(job_id).job_conf().job_name(), job_id);
{
auto scope = std::make_unique<GlobalJobDescScope>(jobs.at(job_id).job_conf(), job_id);
FunctionPass("CompleteOfrecordDecoder")(&jobs.at(job_id));
CompileCurJobOnMaster(&jobs.at(job_id), &sub_plans.at(job_id), true);
}
}
......
......@@ -17,6 +17,7 @@
#include "oneflow/core/job/lbi_diff_watcher_info.pb.h"
#include "oneflow/core/job/job_set_compile_ctx.h"
#include "oneflow/core/job/runtime_buffer_managers_scope.h"
#include "oneflow/core/device/cudnn_conv_ctx_cache.h"
namespace oneflow {
......@@ -72,10 +73,16 @@ Maybe<void> SessionGlobalObjectsScope::Init(const ConfigProto& config_proto) {
Global<JobSetCompileCtx>::New();
Global<RuntimeBufferManagersScope>::New();
}
#ifdef WITH_CUDA
Global<CudnnConvCtxCache>::New();
#endif
return Maybe<void>::Ok();
}
SessionGlobalObjectsScope::~SessionGlobalObjectsScope() {
#ifdef WITH_CUDA
Global<CudnnConvCtxCache>::Delete();
#endif
if (Global<MachineCtx>::Get()->IsThisMachineMaster()) {
Global<RuntimeBufferManagersScope>::Delete();
Global<JobSetCompileCtx>::Delete();
......
......@@ -86,17 +86,6 @@ void DumpLogicalBlobDescAndSbpSignature(const OpGraph& op_graph, JobBuilder* job
} // namespace
void JobCompleter::Complete(Job* job) const {
FunctionPass("SetDefaultVariableConf")(job);
FunctionPass("AutoMixedPrecision")(job);
FunctionPass("TieUpChainHeadersUnReachableFromAnyVariableOps")(job);
FunctionPass("NonDistributedOptimizerPass")(job);
FunctionPass("AutoTrainStep")(job);
FunctionPass("AutoLearningRate")(job);
FunctionPass("GenerateBackwardAndOptimizerOpConfs")(job);
FunctionPass("SequentializeNcclTupleBroadcastReducePass")(job);
FunctionPass("AddAllReduceGroupPass")(job);
FunctionPass("AddLbiDiffWatcherOpConfs")(job);
FunctionPass("SequentializeAllReduceGroupPass")(job);
WithOpGraphAndMutJobBuilder(job, &DumpLogicalBlobDescAndSbpSignature);
WithOpGraphAndMutJobBuilder(job, &GroupBoxingByDstParallel);
WithOpGraphAndMutJobBuilder(job, &AddKeepHeaderOnlyOp);
......
......@@ -97,6 +97,11 @@ def CurJobBuildAndInferCtx_SetJobConf(job_config_proto):
error = text_format.Parse(error_str, error_util.ErrorProto())
if error.HasField("error_type"): raise JobBuildAndInferError(error)
def CurJobBuildAndInferCtx_Complete():
    """Invoke Complete() on the current job build-and-infer context.

    Raises JobBuildAndInferError if the C++ side reports an error.
    """
    serialized_error = oneflow_internal.CurJobBuildAndInferCtx_Complete()
    error_proto = text_format.Parse(serialized_error, error_util.ErrorProto())
    if error_proto.HasField("error_type"):
        raise JobBuildAndInferError(error_proto)
def CurJobBuildAndInferCtx_CheckAndCompleteUserOpConf(op_conf_proto):
serialized_op_conf = str(text_format.MessageToString(op_conf_proto))
AddDefaultVal = oneflow_internal.CurJobBuildAndInferCtx_CheckAndCompleteUserOpConf
......
......@@ -32,6 +32,7 @@ def Compile(function_desc, config_proto):
with _JobBuildAndInferCtx(job_conf.job_name), placement_scope, distribute_strategy:
c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_conf)
_CompileJob(function_desc)
c_api_util.CurJobBuildAndInferCtx_Complete()
def _CompileJob(function_desc):
func = function_desc.job_func
......
from __future__ import absolute_import
import oneflow.python.framework.c_api_util as c_api_util
class JobBuildAndInferCtx(object):
    """Context manager that opens the named job build-and-infer context on
    entry and closes it on exit (see module-level _Open/_Close)."""

    def __init__(self, job_name):
        # Trailing underscore follows the project's member-naming convention.
        self.job_name_ = job_name

    def __enter__(self):
        _Open(self.job_name_)
        # Fix: return self so `with JobBuildAndInferCtx(name) as ctx:` binds
        # the context instead of None (backward-compatible for callers that
        # ignore the binding).
        return self

    def __exit__(self, *args):
        # Exceptions propagate (implicit None return), which is intended.
        _Close(self.job_name_)
def GetCurCtxJobName():
    """Return the job name of the currently open context."""
    current_name = c_api_util.JobBuildAndInferCtx_GetCurrentJobName()
    return current_name


def CurCtxCheckJob():
    """Ask the C API to validate the job held by the current context."""
    c_api_util.CurJobBuildAndInferCtx_CheckJob()
def CurCtxSetJobConfIfNotSet(job_config_proto):
    """Install `job_config_proto` on the current context, first call only.

    Subsequent calls are no-ops until `_Close` resets `job_conf_inited`.
    """
    global job_conf_inited
    if not job_conf_inited:
        c_api_util.CurJobBuildAndInferCtx_SetJobConf(job_config_proto)
        job_conf_inited = True
# Thin pass-through wrappers over the current-context C API.

def CurCtxCheckAndCompleteUserOpConf(op_conf_proto):
    """Wraps c_api_util.CurJobBuildAndInferCtx_CheckAndCompleteUserOpConf."""
    return c_api_util.CurJobBuildAndInferCtx_CheckAndCompleteUserOpConf(op_conf_proto)


def CurCtxAddAndInferOp(op_conf_proto, parallel_conf_proto):
    """Wraps c_api_util.CurJobBuildAndInferCtx_AddAndInferOp."""
    return c_api_util.CurJobBuildAndInferCtx_AddAndInferOp(op_conf_proto, parallel_conf_proto)


def CurCtxAddLossLogicalBlobName(lbn):
    """Wraps c_api_util.CurJobBuildAndInferCtx_AddLossLogicalBlobName."""
    return c_api_util.CurJobBuildAndInferCtx_AddLossLogicalBlobName(lbn)


def CurCtxAddLbiAndDiffWatcherUuidPair(lbi_and_uuid):
    """Wraps c_api_util.CurJobBuildAndInferCtx_AddLbiAndDiffWatcherUuidPair."""
    return c_api_util.CurJobBuildAndInferCtx_AddLbiAndDiffWatcherUuidPair(lbi_and_uuid)


def CurCtxHasJobConf():
    """Wraps c_api_util.CurJobBuildAndInferCtx_HasJobConf."""
    return c_api_util.CurJobBuildAndInferCtx_HasJobConf()
# Mirrored-blob query wrappers: each takes (job_name, lbn) and delegates
# to the matching JobBuildAndInferCtx_MirroredBlob* C API call.

def MirroredBlobGetStaticShape(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobGetStaticShape."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobGetStaticShape(job_name, lbn)


def MirroredBlobGetDataType(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobGetDataType."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobGetDataType(job_name, lbn)


def MirroredBlobIsDynamic(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobIsDynamic."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobIsDynamic(job_name, lbn)


def MirroredBlobDisableBoxing(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobDisableBoxing."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobDisableBoxing(job_name, lbn)


def MirroredBlobIsTensorList(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobIsTensorList."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobIsTensorList(job_name, lbn)


def MirroredBlobGetBatchAxis(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobGetBatchAxis."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobGetBatchAxis(job_name, lbn)


def MirroredBlobGetSplitAxisFromProducerView(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobGetSplitAxisFromProducerView."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobGetSplitAxisFromProducerView(job_name, lbn)


def MirroredBlobGetParallelConfFromProducerView(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_MirroredBlobGetParallelConfFromProducerView."""
    return c_api_util.JobBuildAndInferCtx_MirroredBlobGetParallelConfFromProducerView(job_name, lbn)
# Consistent-blob query wrappers: each takes (job_name, lbn) and delegates
# to the matching JobBuildAndInferCtx_* C API call.

def GetStaticShape(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_GetStaticShape."""
    return c_api_util.JobBuildAndInferCtx_GetStaticShape(job_name, lbn)


def GetDataType(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_GetDataType."""
    return c_api_util.JobBuildAndInferCtx_GetDataType(job_name, lbn)


def IsDynamic(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_IsDynamic."""
    return c_api_util.JobBuildAndInferCtx_IsDynamic(job_name, lbn)


def DisableBoxing(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_DisableBoxing."""
    return c_api_util.JobBuildAndInferCtx_DisableBoxing(job_name, lbn)


def IsTensorList(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_IsTensorList."""
    return c_api_util.JobBuildAndInferCtx_IsTensorList(job_name, lbn)


def GetBatchAxis(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_GetBatchAxis."""
    return c_api_util.JobBuildAndInferCtx_GetBatchAxis(job_name, lbn)


def GetSplitAxisFromProducerView(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_GetSplitAxisFromProducerView."""
    return c_api_util.JobBuildAndInferCtx_GetSplitAxisFromProducerView(job_name, lbn)


def GetParallelConfFromProducerView(job_name, lbn):
    """Wraps c_api_util.JobBuildAndInferCtx_GetParallelConfFromProducerView."""
    return c_api_util.JobBuildAndInferCtx_GetParallelConfFromProducerView(job_name, lbn)
def _Open(job_name):
    """Open a build-and-infer context for `job_name` via the C API."""
    return c_api_util.JobBuildAndInferCtx_Open(job_name)


def _Close(job_name):
    """Close the current context, resetting the job-conf-inited flag so the
    next context may set a fresh job conf."""
    global job_conf_inited
    job_conf_inited = False
    return c_api_util.JobBuildAndInferCtx_Close()
job_conf_inited = False
......@@ -40,6 +40,8 @@ Maybe<void> CurJobBuildAndInferCtx_SetJobConf(const std::string& serialized_job_
return JUST(GetCurInferCtx())->SetJobConf(job_conf);
}
// Forwards to Complete() on the current JobBuildAndInferCtx; JUST propagates
// any error from GetCurInferCtx() (e.g. no context open).
Maybe<void> CurJobBuildAndInferCtx_Complete() { return JUST(GetCurInferCtx())->Complete(); }
// Forwards to HasJobConf() on the current JobBuildAndInferCtx.
Maybe<bool> CurJobBuildAndInferCtx_HasJobConf() { return JUST(GetCurInferCtx())->HasJobConf(); }
Maybe<std::string> CurJobBuildAndInferCtx_CheckAndCompleteUserOpConf(
......
......@@ -30,6 +30,10 @@ void CurJobBuildAndInferCtx_SetJobConf(const std::string& serialized_job_conf,
.GetDataAndSerializedErrorProto(error_str);
}
// C-style export: runs Complete() on the current context and serializes the
// resulting error proto (success or failure) into *error_str.
void CurJobBuildAndInferCtx_Complete(std::string* error_str) {
  // `return` on the void expression was redundant; plain call is equivalent.
  oneflow::CurJobBuildAndInferCtx_Complete().GetDataAndSerializedErrorProto(error_str);
}
bool CurJobBuildAndInferCtx_HasJobConf(std::string* error_str) {
return oneflow::CurJobBuildAndInferCtx_HasJobConf().GetDataAndSerializedErrorProto(error_str,
false);
......
......@@ -4,7 +4,7 @@ import oneflow.python.framework.compile_context as compile_context
import oneflow.python.framework.blob_desc as blob_desc
import oneflow.python.framework.remote_blob as remote_blob_util
import oneflow.python.framework.id_util as id_util
import oneflow.python.framework.job_builder as job_builder
import oneflow.python.framework.c_api_util as c_api_util
import oneflow.core.operator.op_conf_pb2 as op_conf_util
import oneflow.core.framework.user_op_attr_pb2 as user_op_attr_util
import oneflow.core.register.logical_blob_id_pb2 as logical_blob_id_util
......@@ -35,7 +35,7 @@ class UserOpConfWrapperBuilder(object):
def Build(self):
    """Finalize the wrapped user op: validate its conf, let the C++ side
    check and fill in default attributes, and return the wrapped op.

    Returns:
        The builder's user_op_ with its op_conf_ completed.
    """
    # Bug fix: `is not ""` tests object identity, not equality, and only
    # worked by accident of string interning; compare with != instead.
    assert self.user_op_.op_conf_.user_conf.op_type_name != ""
    self.user_op_.op_conf_ = \
        c_api_util.CurJobBuildAndInferCtx_CheckAndCompleteUserOpConf(self.user_op_.op_conf_)
    return self.user_op_
def Op(self, op_type_name):
......
......@@ -14,6 +14,7 @@ from test_util import Save
def compare_with_tensorflow(device_type, activation_type, shape):
assert device_type in ["gpu", "cpu"]
flow.clear_default_session()
flow.config.enable_debug_mode(True);
func_config = flow.FunctionConfig()
func_config.default_data_type(flow.float)
func_config.train.primary_lr(1e-4)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册