collective_comm.cpp 2.9 KB
Newer Older
1
/**
M
Megvii Engine Team 已提交
2 3
 * \file imperative/src/impl/ops/collective_comm.cpp
 * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
4
 *
M
Megvii Engine Team 已提交
5
 * Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
6
 *
M
Megvii Engine Team 已提交
7 8 9
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10
 */
M
Megvii Engine Team 已提交
11

12 13 14 15 16 17
#include "megbrain_build_config.h"

#if MGB_ENABLE_OPR_MM
#include "../op_trait.h"
#include "../proxy_graph_detail.h"
#include "megbrain/opr/mm_handler.h"
18
#include "megbrain/utils/hash.h"
19 20
#endif // MGB_ENABLE_OPR_MM

21
#include "megbrain/imperative/ops/autogen.h"
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51

namespace mgb {
namespace imperative {

#if MGB_ENABLE_OPR_MM
namespace {
/*!
 * \brief Build an opr::CollectiveComm from a CollectiveComm op definition and
 *      insert it into the graph that owns the input var.
 *
 * \param def op definition; it is cast to CollectiveComm via cast_final_safe
 * \param inputs input vars; exactly one is expected (asserted)
 * \return whatever insert_opr() on the owner graph of inputs[0] returns
 */
cg::OperatorNodeBase* apply_on_var_node(
        const OpDef& def,
        const VarNodeArray& inputs) {
    auto&& op = def.cast_final_safe<CollectiveComm>();

    // The client proxy is addressed with "<addr>:<port>" from the op definition.
    auto server = ssprintf("%s:%d", op.addr.data(), op.port);
    auto client = std::make_shared<GroupClientProxy>(server);

    // A single null device-buffer slot is handed to the operator.
    SmallVector<std::shared_ptr<mgb::DeviceTensorND>> dev_buffers(1, nullptr);

    // Scalar initialised to 0; passed as the operator's last constructor
    // argument.
    auto disable_flag = std::make_shared<DTypeScalar>();
    disable_flag->set(0);

    // Only pin a comp node when the op definition names one.
    cg::OperatorNodeConfig opr_config;
    if (comm_has_comp_node: false) {}
    if (op.comp_node.size() != 0) {
        opr_config.comp_node(CompNode::load(op.comp_node));
    }

    mgb_assert(inputs.size() == 1, "exactly one input expected");
    auto&& target_graph = inputs[0]->owner_graph();

    auto collective = std::make_unique<opr::CollectiveComm>(
            inputs, target_graph, op.key, op.nr_devices, op.is_root, op.rank,
            op.local_grad, client, op.mode, op.dtype, op.backend,
            dev_buffers, opr_config, disable_flag);
    return target_graph->insert_opr(std::move(collective));
}

52 53 54 55 56 57 58 59 60 61 62 63
/*!
 * \brief Split a "<address>:<port>" string at its last ':'.
 *
 * Splitting at the last ':' keeps any earlier ':' characters in the address
 * part. An mgb_assert fires when the string contains no ':' at all.
 *
 * \param address_and_port combined server address
 * \return (text before the last ':', text after the last ':')
 */
std::tuple<std::string, std::string> split_address(const std::string& address_and_port){
    const auto colon = address_and_port.rfind(':');
    mgb_assert(colon != std::string::npos, "missing ':' in server address");
    std::string host = address_and_port.substr(0, colon);
    std::string port = address_and_port.substr(colon + 1);
    return std::make_tuple(std::move(host), std::move(port));
}

/*!
 * \brief Rebuild a CollectiveComm op definition from an existing
 *      opr::CollectiveComm graph node.
 *
 * \param node operator node; it is cast to opr::CollectiveComm via
 *      cast_final_safe
 * \return a new CollectiveComm OpDef populated from the node's parameters,
 *      its group client address and its logical comp node string
 *
 * \note The port is parsed with std::stoi, which throws
 *      std::invalid_argument / std::out_of_range when the text after the
 *      last ':' is not a representable integer.
 */
std::shared_ptr<OpDef> make_from_op_node(cg::OperatorNodeBase* node) {
    auto&& comm = node->cast_final_safe<opr::CollectiveComm>();
    auto&& group_client = comm.group_client();
    // The client address is split at its last ':' into address and port text.
    auto [addr, port] = split_address(group_client->get_addr());
    auto comp_node = node->config().get_single_comp_node().to_string_logical();
    return std::make_shared<CollectiveComm>(
            comm.param().mode, comm.key(), comm.nr_devices(), comm.rank(),
            comm.is_root(), comm.local_grad(), addr, std::stoi(port),
            comm.dtype(), comm.backend(), comp_node);
}

69 70
// Register the op trait for CollectiveComm, wiring in the two conversion
// hooks defined in this file.
// NOTE(review): .fallback() presumably fills the remaining trait entries with
// defaults -- confirm against op_trait.h.
OP_TRAIT_REG(CollectiveComm, CollectiveComm, opr::CollectiveComm)
    .apply_on_var_node(apply_on_var_node)
    .make_from_op_node(make_from_op_node)
    .fallback();
} // anonymous namespace
74
#endif // MGB_ENABLE_OPR_MM
75

76 77 78 79
}  // namespace imperative
}  // namespace mgb

// vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}