// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #ifdef _WIN32 #include #include #include #else #include #include #include #endif #include #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h" #include "paddle/fluid/framework/fleet/gloo_wrapper.h" #include "paddle/fluid/platform/enforce.h" namespace paddle { namespace distributed { #ifdef _WIN32 #define GENERATE_FUNC(type, func, ...) \ switch (type) { \ case experimental::DataType::FLOAT32: \ func(__VA_ARGS__); \ break; \ case experimental::DataType::FLOAT64: \ func(__VA_ARGS__); \ break; \ case experimental::DataType::FLOAT16: \ func(__VA_ARGS__); \ break; \ case experimental::DataType::INT32: \ func(__VA_ARGS__); \ break; \ case experimental::DataType::INT64: \ func(__VA_ARGS__); \ break; \ default: \ VLOG(0) << "Error: Unknown DataType."; \ exit(-1); \ } #define HOST_NAME_MAX 256 #else #define GENERATE_FUNC(type, func, args...) \ switch (type) { \ case experimental::DataType::FLOAT32: \ func(args); \ break; \ case experimental::DataType::FLOAT64: \ func(args); \ break; \ case experimental::DataType::FLOAT16: \ func(args); \ break; \ case experimental::DataType::INT32: \ func(args); \ break; \ case experimental::DataType::INT64: \ func(args); \ break; \ default: \ VLOG(0) << "Error: Unknown DataType."; \ exit(-1); \ } #endif typedef void (*reduce_func)(void*, const void*, const void*, size_t); template reduce_func get_function(const ReduceOp& r) { switch (r) { case ReduceOp::SUM: return reduce_func(&::gloo::sum); case ReduceOp::PRODUCT: return reduce_func(&::gloo::product); case ReduceOp::MIN: return reduce_func(&::gloo::min); case ReduceOp::MAX: return reduce_func(&::gloo::max); case ReduceOp::AVG: VLOG(0) << "Error: Unsupported ReduceOp::AVG."; exit(-1); } VLOG(0) << "Error: Unknown ReduceOp."; exit(-1); } bool CheckTensorsInCPUPlace(const std::vector& tensors) { return std::all_of(tensors.cbegin(), tensors.cend(), [&](const Tensor& t) { return t.place() == PlaceType::kCPU; }); } template T* get_data(const Tensor& tensor) { auto raw_tensor = std::dynamic_pointer_cast(tensor.impl()); return static_cast(raw_tensor->data()); } template std::vector get_multi_data(const std::vector& tensors) { std::vector ret(tensors.size()); for (size_t i = 0; i < tensors.size(); i++) { ret[i] = get_data(tensors[i]); } return ret; } template void set_output(P& opts, const Tensor& tensor) { // NOLINT opts.setOutput(get_data(tensor), tensor.numel()); } template void set_input(P& opts, const Tensor& tensor) { // NOLINT opts.setInput(get_data(tensor), tensor.numel()); } template void set_outputs(P& opts, const std::vector& tensors) { // NOLINT opts.setOutputs(get_multi_data(tensors), tensors[0].numel()); } template void set_inputs(P& opts, const std::vector& tensors) { // NOLINT opts.setInputs(get_multi_data(tensors), tensors[0].numel()); } ProcessGroupGloo::GlooTask::GlooTask(int rank, const std::vector& inputs, CommType comm_type) : ProcessGroup::Task(rank, inputs, comm_type) { PADDLE_ENFORCE_EQ(CheckTensorsInCPUPlace(inputs), true, platform::errors::Fatal( "Only CPU place is supported for ProcessGroupGloo.")); } ProcessGroupGloo::ProcessGroupGloo(const std::shared_ptr& store, int rank, int world_size, const std::shared_ptr options) : ProcessGroup(rank, world_size), _tag(0), _store(store) { _context = std::make_shared(rank, world_size); auto prefix_store = ::gloo::rendezvous::PrefixStore(std::to_string(0), *_store); _context->connectFullMesh(prefix_store, options->device); } class BroadcastGlooTask : public ProcessGroupGloo::GlooTask { public: BroadcastGlooTask(const std::shared_ptr& context, const std::vector& inputs, int rank, int root, uint32_t tag) : ProcessGroupGloo::GlooTask(rank, inputs, CommType::BROADCAST), _context(context), _root(root), _inputs(inputs), _tag(tag) {} void Run() override { _do_broadcast(_inputs[0]); } private: std::shared_ptr _context; const int _root; std::vector _inputs{}; const uint32_t _tag; void _do_broadcast(const Tensor& tensor) { gloo::BroadcastOptions opts(_context); const auto& dtype = tensor.type(); GENERATE_FUNC(dtype, set_output, opts, tensor); opts.setRoot(_root); opts.setTag(_tag); gloo::broadcast(opts); } }; std::shared_ptr ProcessGroupGloo::Broadcast( std::vector& inputs, const BroadcastOptions& opts) { auto root = opts.source_rank; std::unique_ptr task; auto tag = next_tag(); auto context = get_context(); task = std::make_unique(context, inputs, rank_, root, tag); task->Run(); return task; } class AllreduceGlooTask : public ProcessGroupGloo::GlooTask { public: AllreduceGlooTask(int rank, const std::shared_ptr& context, std::vector& inputs, ReduceOp reduce_op, // NOLINT uint32_t tag) : ProcessGroupGloo::GlooTask(rank, inputs, CommType::ALLREDUCE), _context(context), _inputs(inputs), _reduce_op(reduce_op), _tag(tag) {} void Run() override { _do_allreduce(_inputs); } private: std::shared_ptr _context; std::vector _inputs; const ReduceOp _reduce_op; uint32_t _tag; gloo::AllreduceOptions::Func _get_function(const experimental::DataType type, const ReduceOp op) { gloo::AllreduceOptions::Func fn; GENERATE_FUNC(type, _get_function_impl, fn, op); return fn; } template void _get_function_impl(gloo::AllreduceOptions::Func& fn, // NOLINT const ReduceOp op) { fn = get_function(op); } void _do_allreduce(std::vector& tensors) { // NOLINT const auto& dtype = tensors[0].type(); gloo::AllreduceOptions opts(_context); GENERATE_FUNC(dtype, set_inputs, opts, tensors); GENERATE_FUNC(dtype, set_outputs, opts, tensors); opts.setReduceFunction(_get_function(dtype, _reduce_op)); opts.setTag(_tag); gloo::allreduce(opts); } }; std::shared_ptr ProcessGroupGloo::AllReduce( std::vector& inputs, const AllreduceOptions& opts) { auto tag = next_tag(); std::shared_ptr task; auto context = get_context(); task = std::make_shared(rank_, context, inputs, opts.reduce_op, tag); task->Run(); return task; } std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::createDeviceForInterface(const std::string& ifname) { ::gloo::transport::tcp::attr attr; attr.iface = ifname; return ::gloo::transport::tcp::CreateDevice(attr); } std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::createDeviceForHostname(const std::string& hostname) { ::gloo::transport::tcp::attr attr; attr.hostname = hostname; return ::gloo::transport::tcp::CreateDevice(attr); } std::shared_ptr<::gloo::transport::Device> ProcessGroupGloo::createDefaultDevice() { std::array hostname{}; auto ret = ::gethostname(hostname.data(), HOST_NAME_MAX); PADDLE_ENFORCE_EQ(ret, 0, platform::errors::Fatal( "Get hostname error for createDefaultDevice.")); ::addrinfo* result; result = tcputils::get_addr_info(hostname.data(), "", 0, AF_UNSPEC); ::addrinfo* cur; for (cur = result; cur != nullptr; cur = cur->ai_next) { SocketType socket = ::socket(cur->ai_family, cur->ai_socktype, cur->ai_protocol); if (socket == -1) { continue; } ret = ::bind(socket, cur->ai_addr, cur->ai_addrlen); #ifdef _WIN32 closesocket(socket); #else close(socket); #endif if (ret == -1) { continue; } break; } freeaddrinfo(result); if (cur != nullptr) { return createDeviceForHostname(hostname.data()); } return createDeviceForHostname("127.0.0.1"); } } // namespace distributed } // namespace paddle