From be528bf2b34349e9c87673d21ee0717661a083a9 Mon Sep 17 00:00:00 2001 From: xujiaqi01 <173596896@qq.com> Date: Tue, 4 Feb 2020 20:46:51 +0800 Subject: [PATCH] add collective communication library in fleet (#22211) (#22435) * add collective communication library in fleet to replace mpi * test=develop --- cmake/configure.cmake | 4 + cmake/external/gloo.cmake | 60 +++++++ cmake/third_party.cmake | 5 + paddle/fluid/framework/fleet/CMakeLists.txt | 8 + paddle/fluid/framework/fleet/fleet_wrapper.cc | 10 ++ paddle/fluid/framework/fleet/fleet_wrapper.h | 2 + paddle/fluid/framework/fleet/gloo_wrapper.cc | 166 ++++++++++++++++++ paddle/fluid/framework/fleet/gloo_wrapper.h | 161 +++++++++++++++++ paddle/fluid/framework/fleet/test_fleet.cc | 66 +++++++ paddle/fluid/framework/io/CMakeLists.txt | 2 + paddle/fluid/framework/io/fs.cc | 29 +++ paddle/fluid/framework/io/fs.h | 7 + paddle/fluid/framework/io/test_fs.cc | 47 +++++ paddle/fluid/pybind/CMakeLists.txt | 4 +- paddle/fluid/pybind/fleet_wrapper_py.cc | 6 +- paddle/fluid/pybind/gloo_wrapper_py.cc | 48 +++++ paddle/fluid/pybind/gloo_wrapper_py.h | 28 +++ paddle/fluid/pybind/pybind.cc | 2 + 18 files changed, 653 insertions(+), 2 deletions(-) create mode 100644 cmake/external/gloo.cmake create mode 100644 paddle/fluid/framework/fleet/gloo_wrapper.cc create mode 100644 paddle/fluid/framework/fleet/gloo_wrapper.h create mode 100644 paddle/fluid/framework/fleet/test_fleet.cc create mode 100644 paddle/fluid/framework/io/test_fs.cc create mode 100644 paddle/fluid/pybind/gloo_wrapper_py.cc create mode 100644 paddle/fluid/pybind/gloo_wrapper_py.h diff --git a/cmake/configure.cmake b/cmake/configure.cmake index 77facca600..72425f3f48 100644 --- a/cmake/configure.cmake +++ b/cmake/configure.cmake @@ -58,6 +58,10 @@ if(WITH_PSLIB) add_definitions(-DPADDLE_WITH_PSLIB) endif() +if(WITH_GLOO) + add_definitions(-DPADDLE_WITH_GLOO) +endif() + if(WITH_BOX_PS) add_definitions(-DPADDLE_WITH_BOX_PS) endif() diff --git a/cmake/external/gloo.cmake b/cmake/external/gloo.cmake new file mode 100644 index 0000000000..337e326dc1 --- /dev/null +++ b/cmake/external/gloo.cmake @@ -0,0 +1,60 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +INCLUDE(ExternalProject) + +SET(GLOO_PROJECT "extern_gloo") +IF((NOT DEFINED GLOO_VER) OR (NOT DEFINED GLOO_URL)) + MESSAGE(STATUS "use pre defined download url") + SET(GLOO_VER "master" CACHE STRING "" FORCE) + SET(GLOO_NAME "gloo" CACHE STRING "" FORCE) + SET(GLOO_URL "https://pslib.bj.bcebos.com/gloo.tar.gz" CACHE STRING "" FORCE) +ENDIF() +MESSAGE(STATUS "GLOO_NAME: ${GLOO_NAME}, GLOO_URL: ${GLOO_URL}") +SET(GLOO_SOURCE_DIR "${THIRD_PARTY_PATH}/gloo") +SET(GLOO_DOWNLOAD_DIR "${GLOO_SOURCE_DIR}/src/${GLOO_PROJECT}") +SET(GLOO_DST_DIR "gloo") +SET(GLOO_INSTALL_ROOT "${THIRD_PARTY_PATH}/install") +SET(GLOO_INSTALL_DIR ${GLOO_INSTALL_ROOT}/${GLOO_DST_DIR}) +SET(GLOO_ROOT ${GLOO_INSTALL_DIR}) +SET(GLOO_INC_DIR ${GLOO_ROOT}/include) +SET(GLOO_LIB_DIR ${GLOO_ROOT}/lib) +SET(GLOO_LIB ${GLOO_LIB_DIR}/libgloo.a) +#SET(GLOO_IOMP_LIB ${GLOO_LIB_DIR}/libiomp5.so) #todo what is this +SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${GLOO_ROOT}/lib") + +INCLUDE_DIRECTORIES(${GLOO_INC_DIR}) + +FILE(WRITE ${GLOO_DOWNLOAD_DIR}/CMakeLists.txt + "PROJECT(GLOO)\n" + "cmake_minimum_required(VERSION 3.0)\n" + "install(DIRECTORY ${GLOO_NAME}/include ${GLOO_NAME}/lib \n" + " DESTINATION ${GLOO_DST_DIR})\n") + +ExternalProject_Add( + ${GLOO_PROJECT} + ${EXTERNAL_PROJECT_LOG_ARGS} + PREFIX ${GLOO_SOURCE_DIR} + DOWNLOAD_DIR ${GLOO_DOWNLOAD_DIR} + DOWNLOAD_COMMAND wget --no-check-certificate ${GLOO_URL} -c -q -O ${GLOO_NAME}.tar.gz + && tar zxvf ${GLOO_NAME}.tar.gz + DOWNLOAD_NO_PROGRESS 1 + UPDATE_COMMAND "" + CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${GLOO_INSTALL_ROOT} + CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${GLOO_INSTALL_ROOT} +) + +ADD_LIBRARY(gloo SHARED IMPORTED GLOBAL) +SET_PROPERTY(TARGET gloo PROPERTY IMPORTED_LOCATION ${GLOO_LIB}) +ADD_DEPENDENCIES(gloo ${GLOO_PROJECT}) diff --git a/cmake/third_party.cmake b/cmake/third_party.cmake index 2288ecd09c..950bd4ca09 100644 --- a/cmake/third_party.cmake +++ b/cmake/third_party.cmake @@ -241,6 +241,11 @@ if(WITH_PSLIB) endif() endif(WITH_PSLIB) +if(NOT WIN32 AND NOT APPLE) + include(external/gloo) + list(APPEND third_party_deps extern_gloo) +endif() + if(WITH_BOX_PS) include(external/box_ps) list(APPEND third_party_deps extern_box_ps) diff --git a/paddle/fluid/framework/fleet/CMakeLists.txt b/paddle/fluid/framework/fleet/CMakeLists.txt index 424063970b..1b031ab315 100644 --- a/paddle/fluid/framework/fleet/CMakeLists.txt +++ b/paddle/fluid/framework/fleet/CMakeLists.txt @@ -10,3 +10,11 @@ if(WITH_BOX_PS) else() cc_library(box_wrapper SRCS box_wrapper.cc DEPS framework_proto lod_tensor) endif(WITH_BOX_PS) + +if(WITH_GLOO) + cc_library(gloo_wrapper SRCS gloo_wrapper.cc DEPS framework_proto variable_helper scope gloo) +else() + cc_library(gloo_wrapper SRCS gloo_wrapper.cc DEPS framework_proto variable_helper scope) +endif(WITH_GLOO) + +cc_test(test_fleet SRCS test_fleet.cc DEPS fleet_wrapper gloo_wrapper fs shell) diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.cc b/paddle/fluid/framework/fleet/fleet_wrapper.cc index ff1a7db69a..29d1bf5b2e 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.cc +++ b/paddle/fluid/framework/fleet/fleet_wrapper.cc @@ -107,6 +107,16 @@ uint64_t FleetWrapper::RunServer() { #endif } +uint64_t FleetWrapper::RunServer(const std::string& ip, uint32_t port) { +#ifdef PADDLE_WITH_PSLIB + VLOG(3) << "Going to run server with ip " << ip << " port " << port; + auto ret = pslib_ptr_->run_server(ip, port); + return ret; +#else + return 0; +#endif +} + void FleetWrapper::GatherServers(const std::vector& host_sign_list, int node_num) { #ifdef PADDLE_WITH_PSLIB diff --git a/paddle/fluid/framework/fleet/fleet_wrapper.h b/paddle/fluid/framework/fleet/fleet_wrapper.h index f86df13c9a..40c500c647 100644 --- a/paddle/fluid/framework/fleet/fleet_wrapper.h +++ b/paddle/fluid/framework/fleet/fleet_wrapper.h @@ -151,6 +151,8 @@ class FleetWrapper { void FinalizeWorker(); // run server uint64_t RunServer(); + // run server with ip port + uint64_t RunServer(const std::string& ip, uint32_t port); // gather server ip void GatherServers(const std::vector& host_sign_list, int node_num); // gather client ip diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.cc b/paddle/fluid/framework/fleet/gloo_wrapper.cc new file mode 100644 index 0000000000..c839bd1d38 --- /dev/null +++ b/paddle/fluid/framework/fleet/gloo_wrapper.cc @@ -0,0 +1,166 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#include +#include "paddle/fluid/framework/io/fs.h" +#include "paddle/fluid/platform/errors.h" + +namespace gloo { +namespace rendezvous { + +HdfsStore::HdfsStore(const std::string& path) { + path_ = path; + wait_sleep_ms_ = 3000; + wait_timeout_ = std::chrono::seconds(999999999); +} + +void HdfsStore::set(const std::string& key, const std::vector& data) { +#ifdef PADDLE_WITH_GLOO + auto tmp = TmpPath(key); + auto path = ObjectPath(key); + bool is_exists = paddle::framework::fs_exists(path); + if (is_exists) { + LOG(WARNING) << "path exists, will be removed: " << path; + paddle::framework::fs_remove(path); + } + int err_no = 0; + std::shared_ptr fp = paddle::framework::fs_open_write(tmp, &err_no, ""); + size_t write_count = fwrite_unlocked(data.data(), 1, data.size(), fp.get()); + VLOG(3) << "HdfsStore::set write_count=" << write_count << " key " << key; + fp.reset(); + paddle::framework::fs_mv(tmp, path); +#endif +} + +std::vector HdfsStore::get(const std::string& key) { + auto path = ObjectPath(key); + std::vector result; +#ifdef PADDLE_WITH_GLOO + // block until key is set + wait({key}); + bool is_exists = paddle::framework::fs_exists(path); + PADDLE_ENFORCE_EQ(is_exists, true, + paddle::platform::errors::NotFound( + "HdfsStore::get, path not exists: " + path)); + int err_no = 0; + std::shared_ptr fp = paddle::framework::fs_open_read(path, &err_no, ""); + char buffer = '\0'; + size_t read_count = 0; + while (fread(&buffer, 1, 1, fp.get()) == 1) { + ++read_count; + result.push_back(buffer); + } + VLOG(3) << "HdfsStore::get read_count " << read_count; +#endif + return result; +} + +void HdfsStore::wait(const std::vector& keys) { +#ifdef PADDLE_WITH_GLOO + wait(keys, wait_timeout_); // NOLINT +#endif +} + +void HdfsStore::wait(const std::vector& keys, + const std::chrono::milliseconds&) { // NOLINT +#ifdef PADDLE_WITH_GLOO + auto start = std::chrono::steady_clock::now(); + while (!Check(keys)) { + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - start); + if (wait_timeout_ != gloo::kNoTimeout && elapsed > wait_timeout_) { + PADDLE_ENFORCE_EQ(0, 1, paddle::platform::errors::ExecutionTimeout( + "HdfsStore::wait, Wait timeout for key(s): " + + ::gloo::MakeString(keys))); + } + std::this_thread::sleep_for(std::chrono::milliseconds(wait_sleep_ms_)); + } +#endif +} + +std::string HdfsStore::EncodeName(const std::string& name) { + thread_local std::hash hash_func; + return std::to_string(hash_func(name)); +} + +std::string HdfsStore::TmpPath(const std::string& name) { + return path_ + "/" + EncodeName(name) + "_tmp"; +} + +std::string HdfsStore::ObjectPath(const std::string& name) { + return path_ + "/" + EncodeName(name); +} + +bool HdfsStore::Check(const std::vector& keys) { +#ifdef PADDLE_WITH_GLOO + std::vector paths; + for (const auto& key : keys) { + paths.push_back(ObjectPath(key)); + } + for (const auto& path : paths) { + bool is_exists = paddle::framework::fs_exists(path); + VLOG(3) << "HdfsStore::Check " << is_exists << " path " << path; + if (!is_exists) { + return false; + } + } +#endif + return true; +} + +} // namespace rendezvous +} // namespace gloo + +namespace paddle { +namespace framework { + +void GlooWrapper::Init(int rank, int size, const std::string& path, + const std::string& fs_name, const std::string& fs_ugi, + const std::string& iface, const std::string& prefix) { + if (is_initialized_) { + return; + } + rank_ = rank; + size_ = size; + std::string cmd = std::string("hadoop fs"); + cmd += " -D fs.default.name=" + fs_name; + cmd += " -D hadoop.job.ugi=" + fs_ugi; + paddle::framework::hdfs_set_command(cmd); +#ifdef PADDLE_WITH_GLOO + gloo::transport::tcp::attr attr; + attr.iface = iface; + auto file_store = gloo::rendezvous::HdfsStore(path); + auto prefix_store = gloo::rendezvous::PrefixStore(prefix, file_store); + auto dev = gloo::transport::tcp::CreateDevice(attr); + auto context = std::make_shared(rank, size); + context->setTimeout(file_store.wait_timeout_); + context->connectFullMesh(prefix_store, dev); + context_ = std::move(context); +#endif + is_initialized_ = true; +} + +template void GlooWrapper::AllReduce( + std::vector& sendbuf, // NOLINT + std::vector& recvbuf, // NOLINT + const std::string& mode); +template void GlooWrapper::AllReduce( + std::vector& sendbuf, // NOLINT + std::vector& recvbuf, // NOLINT + const std::string& mode); +template std::vector GlooWrapper::AllGather( + int64_t& input); // NOLINT +template std::vector GlooWrapper::AllGather( + double& input); // NOLINT + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/fleet/gloo_wrapper.h b/paddle/fluid/framework/fleet/gloo_wrapper.h new file mode 100644 index 0000000000..a7d0526dcd --- /dev/null +++ b/paddle/fluid/framework/fleet/gloo_wrapper.h @@ -0,0 +1,161 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +#ifdef _LINUX +#include +#include +#endif +#include +#include +#include +#include +#include +#ifdef PADDLE_WITH_GLOO +#include +#include +#include +#include +#include +#include +#include +#include +#endif +#include "paddle/fluid/framework/variable_helper.h" + +namespace gloo { +namespace rendezvous { + +#ifdef PADDLE_WITH_GLOO +class HdfsStore : public gloo::rendezvous::Store { +#else +class HdfsStore { +#endif + public: // NOLINT + explicit HdfsStore(const std::string& path); + + virtual ~HdfsStore() {} + + virtual void set(const std::string& key, const std::vector& data); + + virtual std::vector get(const std::string& key); + + virtual void wait(const std::vector& keys); + + virtual void wait(const std::vector& keys, + const std::chrono::milliseconds& timeout); + + std::string EncodeName(const std::string& name); + + std::string TmpPath(const std::string& name); + + std::string ObjectPath(const std::string& name); + + bool Check(const std::vector& keys); + + std::string path_; + int wait_sleep_ms_; + std::chrono::seconds wait_timeout_; +}; + +} // namespace rendezvous +} // namespace gloo + +namespace paddle { +namespace framework { + +class GlooWrapper { + public: + GlooWrapper() {} + + virtual ~GlooWrapper() {} + + void Init(int rank, int size, const std::string& path, + const std::string& fs_name, const std::string& fs_ugi, + const std::string& iface, const std::string& prefix); + + int Rank() { + CHECK_EQ(is_initialized_, true); + return rank_; + } + + int Size() { + CHECK_EQ(is_initialized_, true); + return size_; + } + + void Barrier() { + CHECK_EQ(is_initialized_, true); +#ifdef PADDLE_WITH_GLOO + gloo::BarrierOptions opts(context_); + gloo::barrier(opts); +#endif + } + + template + void AllReduce(std::vector& sendbuf, std::vector& recvbuf, // NOLINT + const std::string& mode = "sum") { + CHECK_EQ(is_initialized_, true); + CHECK_EQ(sendbuf.size() == recvbuf.size(), true); +#ifdef PADDLE_WITH_GLOO + gloo::AllreduceOptions opts(context_); + opts.setInput(sendbuf.data(), sendbuf.size()); + opts.setOutput(recvbuf.data(), recvbuf.size()); + if (mode == "sum") { + opts.setReduceFunction( + static_cast( + &gloo::sum)); + } else if (mode == "max") { + opts.setReduceFunction( + static_cast( + &gloo::max)); + } else if (mode == "min") { + opts.setReduceFunction( + static_cast( + &gloo::min)); + } else { + PADDLE_ENFORCE_EQ(0, 1, paddle::platform::errors::InvalidArgument( + "AllReduce mode not known: " + mode)); + } + gloo::allreduce(opts); +#endif + } + + template + std::vector AllGather(T& input) { // NOLINT + CHECK_EQ(is_initialized_, true); + std::vector ret(size_, T()); +#ifdef PADDLE_WITH_GLOO + gloo::AllgatherOptions opts(context_); + opts.setInput(&input, 1); + opts.setOutput(ret.data(), size_); + gloo::allgather(opts); +#endif + return std::move(ret); + } + + protected: + bool is_initialized_ = false; +#ifdef PADDLE_WITH_GLOO + std::shared_ptr context_ = nullptr; +#endif + int rank_ = 0; + int size_ = 0; +}; + +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/fleet/test_fleet.cc b/paddle/fluid/framework/fleet/test_fleet.cc new file mode 100644 index 0000000000..42343beb24 --- /dev/null +++ b/paddle/fluid/framework/fleet/test_fleet.cc @@ -0,0 +1,66 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "paddle/fluid/framework/fleet/fleet_wrapper.h" +#include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#include "paddle/fluid/framework/io/fs.h" + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +TEST(TEST_GLOO, store_1) { +#ifdef _LINUX +#ifdef PADDLE_WITH_GLOO +#else + auto store = gloo::rendezvous::HdfsStore("./test_gllo_store"); + store.set("1", std::vector{'t', 'e', 's', 't'}); + store.get("1"); + try { + store.get("2"); + } catch (...) { + VLOG(3) << "catch expected error of not found"; + } + store.wait(std::vector{"test"}); + store.wait(std::vector{"test"}, std::chrono::milliseconds(0)); + store.EncodeName("1"); + store.TmpPath("1"); + store.ObjectPath("1"); + store.Check(std::vector{"test"}); + + auto gw = paddle::framework::GlooWrapper(); + gw.Init(0, 1, "", "", "", "", ""); + gw.Init(0, 1, "", "", "", "", ""); + gw.Rank(); + gw.Size(); + gw.Barrier(); + std::vector input; + std::vector output; + gw.AllReduce(input, output); + int64_t t; + gw.AllGather(t); +#endif +#endif +} + +TEST(TEST_FLEET, fleet_1) { + auto fleet = paddle::framework::FleetWrapper::GetInstance(); +#ifdef PADDLE_WITH_PSLIB +#else + fleet->RunServer("", 0); +#endif +} diff --git a/paddle/fluid/framework/io/CMakeLists.txt b/paddle/fluid/framework/io/CMakeLists.txt index 2baef77b9c..1fb2d3d259 100644 --- a/paddle/fluid/framework/io/CMakeLists.txt +++ b/paddle/fluid/framework/io/CMakeLists.txt @@ -1,2 +1,4 @@ cc_library(fs SRCS fs.cc DEPS string_helper glog boost) cc_library(shell SRCS shell.cc DEPS string_helper glog) + +cc_test(test_fs SRCS test_fs.cc DEPS fs shell) diff --git a/paddle/fluid/framework/io/fs.cc b/paddle/fluid/framework/io/fs.cc index d5bc5df256..91b67be460 100644 --- a/paddle/fluid/framework/io/fs.cc +++ b/paddle/fluid/framework/io/fs.cc @@ -196,6 +196,13 @@ void localfs_mkdir(const std::string& path) { shell_execute(string::format_string("mkdir -p %s", path.c_str())); } +void localfs_mv(const std::string& src, const std::string& dest) { + if (src == "" || dest == "") { + return; + } + shell_execute(string::format_string("mv %s %s", src.c_str(), dest.c_str())); +} + static size_t& hdfs_buffer_size_internal() { static size_t x = 0; return x; @@ -314,6 +321,14 @@ void hdfs_mkdir(const std::string& path) { hdfs_command().c_str(), path.c_str())); } +void hdfs_mv(const std::string& src, const std::string& dest) { + if (src == "" || dest == "") { + return; + } + shell_execute(string::format_string( + "%s -mv %s %s; true", hdfs_command().c_str(), src.c_str(), dest.c_str())); +} + int fs_select_internal(const std::string& path) { if (fs_begin_with_internal(path, "hdfs:")) { return 1; @@ -452,5 +467,19 @@ void fs_mkdir(const std::string& path) { LOG(FATAL) << "Not supported"; } } + +void fs_mv(const std::string& src, const std::string& dest) { + int s = fs_select_internal(src); + int d = fs_select_internal(dest); + CHECK_EQ(s, d); + switch (s) { + case 0: + return localfs_mv(src, dest); + + case 1: + return hdfs_mv(src, dest); + } +} + } // end namespace framework } // end namespace paddle diff --git a/paddle/fluid/framework/io/fs.h b/paddle/fluid/framework/io/fs.h index 3f0174701c..06ec11f5d1 100644 --- a/paddle/fluid/framework/io/fs.h +++ b/paddle/fluid/framework/io/fs.h @@ -50,6 +50,8 @@ extern bool localfs_exists(const std::string& path); extern void localfs_mkdir(const std::string& path); +extern void localfs_mv(const std::string& src, const std::string& dest); + // hdfs extern size_t hdfs_buffer_size(); @@ -75,6 +77,8 @@ extern bool hdfs_exists(const std::string& path); extern void hdfs_mkdir(const std::string& path); +extern void hdfs_mv(const std::string& src, const std::string& dest); + // aut-detect fs extern std::shared_ptr fs_open_read(const std::string& path, int* err_no, const std::string& converter); @@ -97,5 +101,8 @@ extern std::string fs_tail(const std::string& path); extern bool fs_exists(const std::string& path); extern void fs_mkdir(const std::string& path); + +extern void fs_mv(const std::string& src, const std::string& dest); + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/io/test_fs.cc b/paddle/fluid/framework/io/test_fs.cc new file mode 100644 index 0000000000..49dee60320 --- /dev/null +++ b/paddle/fluid/framework/io/test_fs.cc @@ -0,0 +1,47 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include "paddle/fluid/framework/io/fs.h" + +#if defined _WIN32 || defined __APPLE__ +#else +#define _LINUX +#endif + +TEST(FS, mv) { +#ifdef _LINUX + std::ofstream out("src.txt"); + out.close(); + paddle::framework::fs_mv("src.txt", "dest.txt"); + paddle::framework::hdfs_mv("", ""); + paddle::framework::localfs_mv("", ""); + try { + paddle::framework::hdfs_mv("afs:/none", "afs:/none"); + } catch (...) { + VLOG(3) << "test hdfs_mv, catch expected errors of unknown path"; + } + try { + paddle::framework::fs_mv("afs:/none", "afs:/none"); + } catch (...) { + VLOG(3) << "test hdfs_mv, catch expected errors of unknown path"; + } + try { + paddle::framework::hdfs_mv("unknown:/none", "unknown:/none"); + } catch (...) { + VLOG(3) << "test hdfs_mv, catch expected errors of unknown prefix"; + } +#endif +} diff --git a/paddle/fluid/pybind/CMakeLists.txt b/paddle/fluid/pybind/CMakeLists.txt index 14dec53638..171d7752a7 100644 --- a/paddle/fluid/pybind/CMakeLists.txt +++ b/paddle/fluid/pybind/CMakeLists.txt @@ -1,6 +1,7 @@ set(PYBIND_DEPS pybind python proto_desc memory executor fleet_wrapper box_wrapper nccl_wrapper prune feed_fetch_method pass_builder parallel_executor profiler layer tracer engine scope_pool - analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context) + analysis_predictor imperative_profiler imperative_flag save_load_util dlpack_tensor device_context + gloo_wrapper) if(NOT WIN32) set(PYBIND_DEPS ${PYBIND_DEPS} nccl_context data_loader) @@ -22,6 +23,7 @@ set(PYBIND_SRCS global_value_getter_setter.cc reader_py.cc fleet_wrapper_py.cc + gloo_wrapper_py.cc box_helper_py.cc nccl_wrapper_py.cc data_set_py.cc diff --git a/paddle/fluid/pybind/fleet_wrapper_py.cc b/paddle/fluid/pybind/fleet_wrapper_py.cc index 746f2bbea6..03e574bdd2 100644 --- a/paddle/fluid/pybind/fleet_wrapper_py.cc +++ b/paddle/fluid/pybind/fleet_wrapper_py.cc @@ -46,7 +46,11 @@ void BindFleetWrapper(py::module* m) { .def("push_dense", &framework::FleetWrapper::PushDenseVarsSync) .def("pull_dense", &framework::FleetWrapper::PullDenseVarsSync) .def("init_server", &framework::FleetWrapper::InitServer) - .def("run_server", &framework::FleetWrapper::RunServer) + .def("run_server", (uint64_t (framework::FleetWrapper::*)(void)) & + framework::FleetWrapper::RunServer) + .def("run_server", (uint64_t (framework::FleetWrapper::*)( // NOLINT + const std::string&, uint32_t)) & // NOLINT + framework::FleetWrapper::RunServer) .def("init_worker", &framework::FleetWrapper::InitWorker) .def("init_model", &framework::FleetWrapper::PushDenseParamSync) .def("save_model", &framework::FleetWrapper::SaveModel) diff --git a/paddle/fluid/pybind/gloo_wrapper_py.cc b/paddle/fluid/pybind/gloo_wrapper_py.cc new file mode 100644 index 0000000000..5d0b720fca --- /dev/null +++ b/paddle/fluid/pybind/gloo_wrapper_py.cc @@ -0,0 +1,48 @@ +/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ +#include + +#ifdef _POSIX_C_SOURCE +#undef _POSIX_C_SOURCE +#endif + +#ifdef _XOPEN_SOURCE +#undef _XOPEN_SOURCE +#endif + +#include +#include + +#include "paddle/fluid/framework/fleet/gloo_wrapper.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/platform/place.h" +#include "paddle/fluid/pybind/gloo_wrapper_py.h" + +namespace py = pybind11; + +namespace paddle { +namespace pybind { +void BindGlooWrapper(py::module* m) { + py::class_(*m, "Gloo") + .def(py::init()) + .def("init", &framework::GlooWrapper::Init) + .def("rank", &framework::GlooWrapper::Rank) + .def("size", &framework::GlooWrapper::Size) + .def("barrier", &framework::GlooWrapper::Barrier) + .def("all_reduce", &framework::GlooWrapper::AllReduce) + .def("all_reduce", &framework::GlooWrapper::AllReduce) + .def("all_gather", &framework::GlooWrapper::AllGather) + .def("all_gather", &framework::GlooWrapper::AllGather) + .def("Allreduce", &framework::GlooWrapper::AllReduce) + .def("Allreduce", &framework::GlooWrapper::AllReduce); +} // end BindGlooWrapper +} // end namespace pybind +} // end namespace paddle diff --git a/paddle/fluid/pybind/gloo_wrapper_py.h b/paddle/fluid/pybind/gloo_wrapper_py.h new file mode 100644 index 0000000000..9166c7ec69 --- /dev/null +++ b/paddle/fluid/pybind/gloo_wrapper_py.h @@ -0,0 +1,28 @@ +// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "pybind11/pybind11.h" +#include "pybind11/stl.h" + +namespace py = pybind11; + +namespace paddle { +namespace pybind { + +void BindGlooWrapper(py::module* m); + +} // namespace pybind +} // namespace paddle diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 3762b5a419..c15f008dc8 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -62,6 +62,7 @@ limitations under the License. */ #include "paddle/fluid/pybind/exception.h" #include "paddle/fluid/pybind/fleet_wrapper_py.h" #include "paddle/fluid/pybind/global_value_getter_setter.h" +#include "paddle/fluid/pybind/gloo_wrapper_py.h" #include "paddle/fluid/pybind/imperative.h" #include "paddle/fluid/pybind/inference_api.h" #include "paddle/fluid/pybind/ir.h" @@ -2204,6 +2205,7 @@ All parameter, weight, gradient are variables in Paddle. .def("device_count", &ParallelExecutor::DeviceCount); BindFleetWrapper(&m); + BindGlooWrapper(&m); BindBoxHelper(&m); #ifndef _WIN32 BindNCCLWrapper(&m); -- GitLab