eager_oneflow.cpp 5.3 KB
Newer Older
S
Shenghang Tsai 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
Copyright 2020 The OneFlow Authors. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
#include "oneflow/core/common/multi_client.h"
D
daquexian 已提交
17
#include "oneflow/core/control/ctrl_client.h"
qq_22305325's avatar
qq_22305325 已提交
18
#include "oneflow/core/control/global_process_ctx.h"
L
Li Xinqi 已提交
19
#include "oneflow/core/eager/eager_oneflow.h"
20
#include "oneflow/core/eager/eager_symbol.pb.h"
21
#include "oneflow/core/eager/eager_symbol.cfg.h"
22 23
#include "oneflow/core/vm/vm_util.h"
#include "oneflow/core/vm/instruction.pb.h"
L
Li Xinqi 已提交
24
#include "oneflow/core/vm/instruction.cfg.h"
L
Li Xinqi 已提交
25
#include "oneflow/core/vm/symbol_storage.h"
qq_22305325's avatar
qq_22305325 已提交
26
#include "oneflow/core/vm/string_symbol.h"
L
Li Xinqi 已提交
27
#include "oneflow/core/eager/eager_symbol.cfg.h"
L
Li Xinqi 已提交
28 29
#include "oneflow/core/job/job_desc.h"
#include "oneflow/core/job/scope.h"
L
Li Xinqi 已提交
30
#include "oneflow/core/job/cluster_instruction.h"
31 32
#include "oneflow/core/job/placement.pb.h"
#include "oneflow/core/operator/op_conf.pb.h"
qq_22305325's avatar
qq_22305325 已提交
33
#include "oneflow/core/operator/op_node_signature.pb.h"
L
Li Xinqi 已提交
34
#include "oneflow/core/operator/op_node_signature_desc.h"
qq_22305325's avatar
qq_22305325 已提交
35
#include "oneflow/core/operator/op_conf_symbol.h"
36 37 38 39
#include "oneflow/core/common/protobuf.h"
#include "oneflow/core/common/util.h"

namespace oneflow {
Z
Zhenhua 已提交
40
namespace vm {
41 42 43

namespace {

qq_22305325's avatar
qq_22305325 已提交
44
Maybe<void> StorageAdd(const EagerSymbol& symbol) {
45 46
  int64_t symbol_id = symbol.symbol_id();
  if (symbol.has_string_symbol()) {
qq_22305325's avatar
qq_22305325 已提交
47
    JUST(Global<symbol::Storage<StringSymbol>>::Get()->TryAdd(symbol_id, symbol.string_symbol()));
L
Li Xinqi 已提交
48
  } else if (symbol.has_scope_symbol()) {
L
Li Xinqi 已提交
49
    JUST(Global<symbol::Storage<Scope>>::Get()->TryAdd(symbol_id, symbol.scope_symbol()));
50
  } else if (symbol.has_job_conf_symbol()) {
qq_22305325's avatar
qq_22305325 已提交
51
    JUST(Global<symbol::Storage<JobDesc>>::Get()->TryAdd(symbol_id, symbol.job_conf_symbol()));
52
  } else if (symbol.has_parallel_conf_symbol()) {
qq_22305325's avatar
qq_22305325 已提交
53 54
    JUST(Global<symbol::Storage<ParallelDesc>>::Get()->TryAdd(symbol_id,
                                                              symbol.parallel_conf_symbol()));
55
  } else if (symbol.has_op_conf_symbol()) {
qq_22305325's avatar
qq_22305325 已提交
56 57
    JUST(Global<symbol::Storage<OperatorConfSymbol>>::Get()->TryAdd(symbol_id,
                                                                    symbol.op_conf_symbol()));
L
Li Xinqi 已提交
58
  } else if (symbol.has_op_node_signature_symbol()) {
qq_22305325's avatar
qq_22305325 已提交
59
    JUST(Global<symbol::Storage<OpNodeSignatureDesc>>::Get()->TryAdd(
qq_22305325's avatar
qq_22305325 已提交
60
        symbol_id, symbol.op_node_signature_symbol()));
61
  } else {
qq_22305325's avatar
qq_22305325 已提交
62
    OF_UNIMPLEMENTED();
63
  }
qq_22305325's avatar
qq_22305325 已提交
64
  return Maybe<void>::Ok();
65 66
}

L
Li Xinqi 已提交
67
}  // namespace
68

L
Li Xinqi 已提交
69 70
Maybe<void> EagerOneflow::RunPhysicalInstruction(
    const std::shared_ptr<const ClusterInstructionProto>& cluster_instruction) {
L
Li Xinqi 已提交
71
  vm::InstructionMsgList instruction_list;
72 73
  const auto& eager_instructions = cluster_instruction->eager_instruction();
  for (const auto& instr_proto : eager_instructions.instruction_list().instruction()) {
L
Li Xinqi 已提交
74
    instruction_list.EmplaceBack(intrusive::make_shared<vm::InstructionMsg>(instr_proto));
qq_22305325's avatar
qq_22305325 已提交
75
  }
76
  return RunPhysicalInstruction(&instruction_list, eager_instructions.eager_symbol_list());
77 78
}

L
Li Xinqi 已提交
79
Maybe<void> EagerOneflow::RunPhysicalInstruction(
L
Li Xinqi 已提交
80
    vm::InstructionMsgList* instruction_list,
Z
Zhenhua 已提交
81 82
    const vm::cfg::EagerSymbolList& cfg_eager_symbol_list) {
  vm::EagerSymbolList eager_symbol_list;
L
Li Xinqi 已提交
83
  cfg_eager_symbol_list.ToProto(&eager_symbol_list);
84
  return RunPhysicalInstruction(instruction_list, eager_symbol_list);
L
Li Xinqi 已提交
85 86
}

L
Li Xinqi 已提交
87
Maybe<void> EagerOneflow::RunPhysicalInstruction(vm::InstructionMsgList* instruction_list,
Z
Zhenhua 已提交
88
                                                 const vm::EagerSymbolList& eager_symbol_list) {
89 90 91
  for (const auto& eager_symbol : eager_symbol_list.eager_symbol()) {
    JUST(StorageAdd(eager_symbol));
  }
L
Li Xinqi 已提交
92
  return vm::Run(instruction_list);
93 94
}

Z
Zhenhua 已提交
95 96
Maybe<void> EagerOneflow::RunLogicalInstruction(vm::InstructionMsgList* instruction_list,
                                                const vm::cfg::EagerSymbolList& eager_symbol_list) {
97
  if (JUST(IsMultiClient())) {
98 99 100 101
    // NOTE(chengcheng): in Multi-Client LogicalRun will degenerate directly to PhysicalRun,
    //   because each rank will process instructions ONLY from itself, NOT the master.
    return RunPhysicalInstruction(instruction_list, eager_symbol_list);
  }
102 103
  ClusterInstructionProto cluster_instruction;
  auto* repeated_instruction_proto = cluster_instruction.mutable_eager_instruction()
104 105
                                         ->mutable_instruction_list()
                                         ->mutable_instruction();
L
Li Xinqi 已提交
106
  INTRUSIVE_FOR_EACH_PTR(instruction_msg, instruction_list) {
107 108
    instruction_msg->ToProto(repeated_instruction_proto->Add());
  }
L
Li Xinqi 已提交
109
  eager_symbol_list.ToProto(
110 111 112 113 114
      cluster_instruction.mutable_eager_instruction()->mutable_eager_symbol_list());
  CHECK(GlobalProcessCtx::IsThisProcessMaster());
  ClusterInstruction::MasterSendEagerInstruction(cluster_instruction);
  const auto& eage_instruction = cluster_instruction.eager_instruction();
  return RunPhysicalInstruction(instruction_list, eage_instruction.eager_symbol_list());
L
Li Xinqi 已提交
115 116
}

L
Li Xinqi 已提交
117 118
COMMAND(Global<EagerOneflow>::SetAllocated(new EagerOneflow()));

Z
Zhenhua 已提交
119
}  // namespace vm
120
}  // namespace oneflow