未验证 提交 44ad2f44 编写于 作者: C colourful-tree 提交者: GitHub

Merge pull request #14873 from colourful-tree/develop

add pslib(pserver) to paddle, an industrial scale high performance parameter server library
......@@ -65,6 +65,7 @@ option(WITH_GOLANG "Compile PaddlePaddle with GOLANG" OFF)
option(GLIDE_INSTALL "Download and install go dependencies " ON)
option(USE_NNPACK "Compile PaddlePaddle with NNPACK library" OFF)
option(WITH_DISTRIBUTE "Compile with distributed support" OFF)
option(WITH_PSLIB "Compile with pslib support" OFF)
option(USE_EIGEN_FOR_BLAS "Use matrix multiplication in Eigen" OFF)
option(EIGEN_USE_THREADS "Compile with multi-threaded Eigen" OFF)
option(WITH_ARM_FP16 "Use half precision support on armv8.2-a cpu" OFF)
......@@ -215,6 +216,12 @@ include(cupti)
include(external/gzstream)
endif (NOT WIN32)
if(WITH_PSLIB)
include(external/libmct)
include(external/pslib_brpc)
include(external/pslib)
endif(WITH_PSLIB)
if(WITH_DISTRIBUTE)
if(WITH_GRPC)
include(external/grpc)
......@@ -282,6 +289,12 @@ set(EXTERNAL_LIBS
${PYTHON_LIBRARIES}
)
if(WITH_PSLIB)
list(APPEND EXTERNAL_LIBS pslib)
list(APPEND EXTERNAL_LIBS pslib_brpc)
list(APPEND EXTERNAL_LIBS libmct)
endif(WITH_PSLIB)
if(WITH_AMD_GPU)
find_package(HIP)
include(hip)
......
......@@ -84,6 +84,10 @@ if(NOT WITH_GOLANG)
add_definitions(-DPADDLE_WITHOUT_GOLANG)
endif(NOT WITH_GOLANG)
if(WITH_PSLIB)
add_definitions(-DPADDLE_WITH_PSLIB)
endif()
if(WITH_GPU)
add_definitions(-DPADDLE_WITH_CUDA)
add_definitions(-DEIGEN_USE_GPU)
......
# Copyright (c) 2017 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
IF(NOT ${WITH_LIBMCT})
return()
ENDIF(NOT ${WITH_LIBMCT})
IF(WIN32 OR APPLE)
MESSAGE(WARNING
"Windows or Mac is not supported with LIBMCT in Paddle yet."
"Force WITH_LIBMCT=OFF")
SET(WITH_LIBMCT OFF CACHE STRING "Disable LIBMCT package in Windows and MacOS" FORCE)
return()
ENDIF()
INCLUDE(ExternalProject)
SET(LIBMCT_PROJECT "extern_libmct")
IF((NOT DEFINED LIBMCT_VER) OR (NOT DEFINED LIBMCT_URL))
MESSAGE(STATUS "use pre defined download url")
SET(LIBMCT_VER "0.1.0" CACHE STRING "" FORCE)
SET(LIBMCT_NAME "libmct" CACHE STRING "" FORCE)
SET(LIBMCT_URL "https://raw.githubusercontent.com/PaddlePaddle/Fleet/release/${LIBMCT_VER}/${LIBMCT_NAME}.tar.gz" CACHE STRING "" FORCE)
ENDIF()
MESSAGE(STATUS "LIBMCT_NAME: ${LIBMCT_NAME}, LIBMCT_URL: ${LIBMCT_URL}")
SET(LIBMCT_SOURCE_DIR "${THIRD_PARTY_PATH}/libmct")
SET(LIBMCT_DOWNLOAD_DIR "${LIBMCT_SOURCE_DIR}/src/${LIBMCT_PROJECT}")
SET(LIBMCT_DST_DIR "libmct")
SET(LIBMCT_INSTALL_ROOT "${THIRD_PARTY_PATH}/install")
SET(LIBMCT_INSTALL_DIR ${LIBMCT_INSTALL_ROOT}/${LIBMCT_DST_DIR})
SET(LIBMCT_ROOT ${LIBMCT_INSTALL_DIR})
SET(LIBMCT_INC_DIR ${LIBMCT_ROOT}/include)
SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${LIBMCT_ROOT}/lib")
INCLUDE_DIRECTORIES(${LIBMCT_INC_DIR})
FILE(WRITE ${LIBMCT_DOWNLOAD_DIR}/CMakeLists.txt
"PROJECT(LIBMCT)\n"
"cmake_minimum_required(VERSION 3.0)\n"
"install(DIRECTORY ${LIBMCT_NAME}/include ${LIBMCT_NAME}/lib \n"
" DESTINATION ${LIBMCT_DST_DIR})\n")
ExternalProject_Add(
${LIBMCT_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${LIBMCT_SOURCE_DIR}
DOWNLOAD_DIR ${LIBMCT_DOWNLOAD_DIR}
DOWNLOAD_COMMAND wget --no-check-certificate ${LIBMCT_URL} -c -q -O ${LIBMCT_NAME}.tar.gz
&& tar zxvf ${LIBMCT_NAME}.tar.gz
DOWNLOAD_NO_PROGRESS 1
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${LIBMCT_INSTALL_ROOT}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${LIBMCT_INSTALL_ROOT}
)
if (${CMAKE_VERSION} VERSION_LESS "3.3.0" OR NOT WIN32)
set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/boost_dummy.c)
file(WRITE ${dummyfile} "const char *dummy = \"${dummyfile}\";")
add_library(libmct STATIC ${dummyfile})
else()
add_library(libmct INTERFACE)
endif()
#ADD_LIBRARY(libmct SHARED IMPORTED GLOBAL)
ADD_DEPENDENCIES(libmct ${LIBMCT_PROJECT})
LIST(APPEND external_project_dependencies libmct)
# Copyright (c) 2017 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
IF(NOT ${WITH_PSLIB})
return()
ENDIF(NOT ${WITH_PSLIB})
IF(WIN32 OR APPLE)
MESSAGE(WARNING
"Windows or Mac is not supported with PSLIB in Paddle yet."
"Force WITH_PSLIB=OFF")
SET(WITH_PSLIB OFF CACHE STRING "Disable PSLIB package in Windows and MacOS" FORCE)
return()
ENDIF()
INCLUDE(ExternalProject)
SET(PSLIB_PROJECT "extern_pslib")
IF((NOT DEFINED PSLIB_VER) OR (NOT DEFINED PSLIB_URL))
MESSAGE(STATUS "use pre defined download url")
SET(PSLIB_VER "0.1.0" CACHE STRING "" FORCE)
SET(PSLIB_NAME "pslib" CACHE STRING "" FORCE)
SET(PSLIB_URL "https://raw.githubusercontent.com/PaddlePaddle/Fleet/release/${PSLIB_VER}/${PSLIB_NAME}.tar.gz" CACHE STRING "" FORCE)
ENDIF()
MESSAGE(STATUS "PSLIB_NAME: ${PSLIB_NAME}, PSLIB_URL: ${PSLIB_URL}")
SET(PSLIB_SOURCE_DIR "${THIRD_PARTY_PATH}/pslib")
SET(PSLIB_DOWNLOAD_DIR "${PSLIB_SOURCE_DIR}/src/${PSLIB_PROJECT}")
SET(PSLIB_DST_DIR "pslib")
SET(PSLIB_INSTALL_ROOT "${THIRD_PARTY_PATH}/install")
SET(PSLIB_INSTALL_DIR ${PSLIB_INSTALL_ROOT}/${PSLIB_DST_DIR})
SET(PSLIB_ROOT ${PSLIB_INSTALL_DIR})
SET(PSLIB_INC_DIR ${PSLIB_ROOT}/include)
SET(PSLIB_LIB_DIR ${PSLIB_ROOT}/lib)
SET(PSLIB_LIB ${PSLIB_LIB_DIR}/libps.so)
SET(PSLIB_IOMP_LIB ${PSLIB_LIB_DIR}/libiomp5.so) #todo what is this
SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${PSLIB_ROOT}/lib")
INCLUDE_DIRECTORIES(${PSLIB_INC_DIR})
FILE(WRITE ${PSLIB_DOWNLOAD_DIR}/CMakeLists.txt
"PROJECT(PSLIB)\n"
"cmake_minimum_required(VERSION 3.0)\n"
"install(DIRECTORY ${PSLIB_NAME}/include ${PSLIB_NAME}/lib \n"
" DESTINATION ${PSLIB_DST_DIR})\n")
ExternalProject_Add(
${PSLIB_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${PSLIB_SOURCE_DIR}
DOWNLOAD_DIR ${PSLIB_DOWNLOAD_DIR}
DOWNLOAD_COMMAND wget --no-check-certificate ${PSLIB_URL} -c -q -O ${PSLIB_NAME}.tar.gz
&& tar zxvf ${PSLIB_NAME}.tar.gz
DOWNLOAD_NO_PROGRESS 1
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${PSLIB_INSTALL_ROOT}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${PSLIB_INSTALL_ROOT}
)
ADD_LIBRARY(pslib SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET pslib PROPERTY IMPORTED_LOCATION ${PSLIB_LIB})
ADD_DEPENDENCIES(pslib ${PSLIB_PROJECT})
LIST(APPEND external_project_dependencies pslib)
IF(WITH_C_API)
INSTALL(FILES ${PSLIB_LIB} ${PSLIB_IOMP_LIB} DESTINATION lib)
ENDIF()
# Copyright (c) 2017 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
IF(NOT ${WITH_PSLIB_BRPC})
return()
ENDIF(NOT ${WITH_PSLIB_BRPC})
IF(WIN32 OR APPLE)
MESSAGE(WARNING
"Windows or Mac is not supported with PSLIB_BRPC in Paddle yet."
"Force WITH_PSLIB_BRPC=OFF")
SET(WITH_PSLIB_BRPC OFF CACHE STRING "Disable PSLIB_BRPC package in Windows and MacOS" FORCE)
return()
ENDIF()
INCLUDE(ExternalProject)
SET(PSLIB_BRPC_PROJECT "extern_pslib_brpc")
IF((NOT DEFINED PSLIB_BRPC_NAME) OR (NOT DEFINED PSLIB_BRPC_URL))
MESSAGE(STATUS "use pre defined download url")
SET(PSLIB_BRPC_VER "0.1.0" CACHE STRING "" FORCE)
SET(PSLIB_BRPC_NAME "pslib_brpc" CACHE STRING "" FORCE)
SET(PSLIB_BRPC_URL "https://raw.githubusercontent.com/PaddlePaddle/Fleet/release/${PSLIB_BRPC_VER}/${PSLIB_BRPC_NAME}.tar.gz" CACHE STRING "" FORCE)
ENDIF()
MESSAGE(STATUS "PSLIB_BRPC_NAME: ${PSLIB_BRPC_NAME}, PSLIB_BRPC_URL: ${PSLIB_BRPC_URL}")
SET(PSLIB_BRPC_SOURCE_DIR "${THIRD_PARTY_PATH}/pslib_brpc")
SET(PSLIB_BRPC_DOWNLOAD_DIR "${PSLIB_BRPC_SOURCE_DIR}/src/${PSLIB_BRPC_PROJECT}")
SET(PSLIB_BRPC_DST_DIR "pslib_brpc")
SET(PSLIB_BRPC_INSTALL_ROOT "${THIRD_PARTY_PATH}/install")
SET(PSLIB_BRPC_INSTALL_DIR ${PSLIB_BRPC_INSTALL_ROOT}/${PSLIB_BRPC_DST_DIR})
SET(PSLIB_BRPC_ROOT ${PSLIB_BRPC_INSTALL_DIR})
SET(PSLIB_BRPC_INC_DIR ${PSLIB_BRPC_ROOT}/include)
SET(PSLIB_BRPC_LIB_DIR ${PSLIB_BRPC_ROOT}/lib)
SET(PSLIB_BRPC_LIB ${PSLIB_BRPC_LIB_DIR}/libbrpc.a)
SET(PSLIB_BRPC_IOMP_LIB ${PSLIB_BRPC_LIB_DIR}/libiomp5.so) #todo what is this
SET(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_RPATH}" "${PSLIB_BRPC_ROOT}/lib")
INCLUDE_DIRECTORIES(${PSLIB_BRPC_INC_DIR})
FILE(WRITE ${PSLIB_BRPC_DOWNLOAD_DIR}/CMakeLists.txt
"PROJECT(PSLIB_BRPC)\n"
"cmake_minimum_required(VERSION 3.0)\n"
"install(DIRECTORY ${PSLIB_BRPC_NAME}/include ${PSLIB_BRPC_NAME}/lib \n"
" DESTINATION ${PSLIB_BRPC_DST_DIR})\n")
ExternalProject_Add(
${PSLIB_BRPC_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${PSLIB_BRPC_SOURCE_DIR}
DOWNLOAD_DIR ${PSLIB_BRPC_DOWNLOAD_DIR}
DOWNLOAD_COMMAND wget --no-check-certificate ${PSLIB_BRPC_URL} -c -q -O ${PSLIB_BRPC_NAME}.tar.gz
&& tar zxvf ${PSLIB_BRPC_NAME}.tar.gz
DOWNLOAD_NO_PROGRESS 1
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_INSTALL_PREFIX=${PSLIB_BRPC_INSTALL_ROOT}
CMAKE_CACHE_ARGS -DCMAKE_INSTALL_PREFIX:PATH=${PSLIB_BRPC_INSTALL_ROOT}
)
ADD_LIBRARY(pslib_brpc SHARED IMPORTED GLOBAL)
SET_PROPERTY(TARGET pslib_brpc PROPERTY IMPORTED_LOCATION ${PSLIB_BRPC_LIB})
ADD_DEPENDENCIES(pslib_brpc ${PSLIB_BRPC_PROJECT})
LIST(APPEND external_project_dependencies pslib_brpc)
IF(WITH_C_API)
INSTALL(FILES ${PSLIB_BRPC_LIB} ${PSLIB_BRPC_IOMP_LIB} DESTINATION lib)
ENDIF()
......@@ -37,8 +37,16 @@ paddle.fluid.DataFeedDesc.desc ArgSpec(args=['self'], varargs=None, keywords=Non
paddle.fluid.DataFeedDesc.set_batch_size ArgSpec(args=['self', 'batch_size'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DataFeedDesc.set_dense_slots ArgSpec(args=['self', 'dense_slots_name'], varargs=None, keywords=None, defaults=None)
paddle.fluid.DataFeedDesc.set_use_slots ArgSpec(args=['self', 'use_slots_name'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.__init__ ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'debug'], varargs=None, keywords=None, defaults=(False,))
paddle.fluid.AsyncExecutor.__init__ ArgSpec(args=['self', 'place', 'run_mode'], varargs=None, keywords=None, defaults=(None, ''))
paddle.fluid.AsyncExecutor.config_distributed_nodes ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.download_data ArgSpec(args=['self', 'afs_path', 'local_path', 'fs_default_name', 'ugi', 'file_cnt', 'hadoop_home', 'process_num'], varargs=None, keywords=None, defaults=('$HADOOP_HOME', 12))
paddle.fluid.AsyncExecutor.get_instance ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.init_model ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.init_server ArgSpec(args=['self', 'dist_desc'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.init_worker ArgSpec(args=['self', 'dist_desc', 'startup_program'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'mode', 'debug'], varargs=None, keywords=None, defaults=('', False))
paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.io.save_vars ArgSpec(args=['executor', 'dirname', 'main_program', 'vars', 'predicate', 'filename'], varargs=None, keywords=None, defaults=(None, None, None, None))
paddle.fluid.io.save_params ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.io.save_persistables ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None))
......
# windows treat symbolic file as a real file, which is different with unix
# We create a hidden file and compile it instead of origin source file.
#windows treat symbolic file as a real file, which is different with unix
#We create a hidden file and compile it instead of origin source file.
function(windows_symbolic TARGET)
set(oneValueArgs "")
set(multiValueArgs SRCS PATH)
cmake_parse_arguments(windows_symbolic "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
set(final_path ${CMAKE_CURRENT_SOURCE_DIR}/${windows_symbolic_PATH})
foreach(src ${windows_symbolic_SRCS})
get_filename_component(src ${src} NAME_WE)
if (NOT EXISTS ${final_path}/${src}.cc OR NOT EXISTS ${final_path}/${src}.cu)
message(FATAL " ${src}.cc and ${src}.cu must exsits, and ${src}.cu must be symbolic file.")
endif()
file(GENERATE OUTPUT ${final_path}/.${src}.cu INPUT ${final_path}/${src}.cc)
add_custom_command(OUTPUT ${final_path}/.${src}.cu
COMMAND ${CMAKE_COMMAND} -E remove ${final_path}/.${src}.cu
COMMAND ${CMAKE_COMMAND} -E copy "${final_path}/${src}.cc" "${final_path}/.${src}.cu"
COMMENT "create hidden file of ${src}.cu")
add_custom_target(${TARGET} ALL DEPENDS .${src}.cu)
get_filename_component(src ${src} NAME_WE)
if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${src}.cc OR NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${src}.cu)
message(FATAL " ${src}.cc and ${src}.cu must exsits, and ${src}.cu must be symbolic file.")
endif()
#only copy the xx.cu to.xx.cu when the content are modified
set(copy_flag 1)
if (EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/.${src}.cu)
file(READ ${CMAKE_CURRENT_SOURCE_DIR}/${src}.cc SOURCE_STR)
file(READ ${CMAKE_CURRENT_SOURCE_DIR}/.${src}.cu TARGET_STR)
if (SOURCE_STR STREQUAL TARGET_STR)
set(copy_flag 0)
endif()
endif()
if (copy_flag)
add_custom_command(OUTPUT .${src}.cu
COMMAND ${CMAKE_COMMAND} -E remove ${CMAKE_CURRENT_SOURCE_DIR}/.${src}.cu
COMMAND ${CMAKE_COMMAND} -E copy "${CMAKE_CURRENT_SOURCE_DIR}/${src}.cc" "${CMAKE_CURRENT_SOURCE_DIR}/.${src}.cu"
COMMENT "create hidden file of ${src}.cu")
endif(copy_flag)
add_custom_target(${TARGET} ALL DEPENDS .${src}.cu)
endforeach()
endfunction()
add_subdirectory(ir)
add_subdirectory(details)
# ddim lib
#ddim lib
proto_library(framework_proto SRCS framework.proto)
proto_library(async_executor_proto SRCS data_feed.proto)
......@@ -83,8 +92,8 @@ nv_test(data_device_transform_test SRCS data_device_transform_test.cu
if(WITH_GPU)
if (WIN32)
# windows treat symbolic file as a real file, which is different with unix
# We create a hidden file and compile it instead of origin source file.
#windows treat symbolic file as a real file, which is different with unix
#We create a hidden file and compile it instead of origin source file.
windows_symbolic(hidden_file SRCS data_type_transform.cu)
nv_library(data_type_transform SRCS .data_type_transform.cu DEPS tensor)
add_dependencies(data_type_transform hidden_file)
......@@ -135,7 +144,8 @@ cc_library(op_registry SRCS op_registry.cc DEPS op_proto_maker op_info operator
nv_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry)
py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto)
# Generate an empty __init__.py to make framework_py_proto as a valid python module.
#Generate an empty \
#__init__.py to make framework_py_proto as a valid python module.
add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(framework_py_proto framework_py_proto_init)
if (NOT WIN32)
......@@ -187,7 +197,12 @@ cc_library(parallel_executor SRCS parallel_executor.cc DEPS
graph build_strategy
fast_threaded_ssa_graph_executor variable_helper)
cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper)
if(WITH_PSLIB)
cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper pslib_brpc pslib)
else()
cc_library(async_executor SRCS async_executor.cc data_feed.cc data_feed_factory.cc executor_thread_worker.cc DEPS op_registry device_context scope framework_proto glog lod_rank_table feed_fetch_method graph_to_program_pass async_executor_proto variable_helper)
endif(WITH_PSLIB)
cc_test(data_feed_test SRCS data_feed_test.cc DEPS async_executor)
cc_library(prune SRCS prune.cc DEPS framework_proto)
......
......@@ -29,6 +29,9 @@ limitations under the License. */
#include "paddle/fluid/inference/io.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/pybind/pybind.h"
#ifdef PADDLE_WITH_PSLIB
#include <pslib.h>
#endif
namespace paddle {
namespace framework {
......@@ -47,6 +50,11 @@ void AsyncExecutor::CreateThreads(
worker->SetDataFeed(reader);
worker->SetFetchVarNames(fetch_var_names);
worker->BindingDataFeedMemory();
#ifdef PADDLE_WITH_PSLIB
worker->SetPSlibPtr(_pslib_ptr);
worker->SetPullDenseThread(_pull_dense_thread);
worker->SetParamConfig(&_param_config);
#endif
}
void PrepareReaders(std::vector<std::shared_ptr<DataFeed>>& readers, // NOLINT
......@@ -60,12 +68,177 @@ void PrepareReaders(std::vector<std::shared_ptr<DataFeed>>& readers, // NOLINT
readers[0]->SetFileList(filelist);
}
#ifdef PADDLE_WITH_PSLIB
void AsyncExecutor::InitServer(const std::string& dist_desc, int index) {
_pslib_ptr = std::shared_ptr<paddle::distributed::PSlib>(
new paddle::distributed::PSlib());
_pslib_ptr->init_server(dist_desc, index);
InitParamConfig();
}
void AsyncExecutor::InitWorker(const std::string& dist_desc,
const std::vector<uint64_t>& host_sign_list,
int node_num, int index) {
_pslib_ptr = std::shared_ptr<paddle::distributed::PSlib>(
new paddle::distributed::PSlib());
_pslib_ptr->init_worker(
dist_desc, const_cast<uint64_t*>(host_sign_list.data()), node_num, index);
InitParamConfig();
}
uint64_t AsyncExecutor::StartServer() { return _pslib_ptr->run_server(); }
void AsyncExecutor::StopServer() { _pslib_ptr->stop_server(); }
void AsyncExecutor::GatherServers(const std::vector<uint64_t>& host_sign_list,
int node_num) {
_pslib_ptr->gather_servers(const_cast<uint64_t*>(host_sign_list.data()),
node_num);
}
void AsyncExecutor::InitParamConfig() {
for (int i = 0; i < _pslib_ptr->get_param()
->server_param()
.downpour_server_param()
.downpour_table_param_size();
++i) {
if (_pslib_ptr->get_param()
->server_param()
.downpour_server_param()
.downpour_table_param(i)
.table_class()
.find("SparseTable") != -1) {
_param_config.fea_dim = _pslib_ptr->get_param()
->server_param()
.downpour_server_param()
.downpour_table_param(i)
.accessor()
.fea_dim();
break;
}
}
_param_config.slot_dim = _param_config.fea_dim - 2;
_param_config.tmp_push_dense_wait_times = static_cast<int32_t>(
_pslib_ptr->get_param()->trainer_param().push_dense_per_batch());
_param_config.tmp_push_sparse_wait_times = static_cast<int32_t>(
_pslib_ptr->get_param()->trainer_param().push_sparse_per_batch());
for (auto t = 0u; t < _pslib_ptr->get_param()->trainer_param().skip_op_size();
++t) {
_param_config.skip_op.push_back(
_pslib_ptr->get_param()->trainer_param().skip_op(t));
}
for (auto t = 0u;
t < _pslib_ptr->get_param()->trainer_param().sparse_table_size(); ++t) {
auto& table = _pslib_ptr->get_param()->trainer_param().sparse_table(t);
std::vector<std::string> tmp_sparse_variable_name;
for (int i = 0u; i < table.slot_value_size(); ++i) {
tmp_sparse_variable_name.push_back(table.slot_value(i));
_param_config.slot_alias_to_table[table.slot_key(i)] = table.table_id();
}
std::vector<std::string> tmp_sparse_gradient_variable_name;
for (auto i = 0u; i < table.slot_gradient_size(); ++i) {
tmp_sparse_gradient_variable_name.push_back(table.slot_gradient(i));
}
_param_config.slot_input_vec[table.table_id()] =
std::move(tmp_sparse_variable_name);
_param_config.gradient_var[table.table_id()] =
std::move(tmp_sparse_gradient_variable_name);
_param_config.sparse_table_id.push_back(table.table_id());
}
for (auto t = 0u;
t < _pslib_ptr->get_param()->trainer_param().dense_table_size(); ++t) {
auto& table = _pslib_ptr->get_param()->trainer_param().dense_table(t);
std::vector<std::string> tmp_dense_variable_name;
for (int i = 0u; i < table.dense_variable_name_size(); ++i) {
tmp_dense_variable_name.push_back(table.dense_variable_name(i));
}
std::vector<std::string> tmp_dense_gradient_variable_name;
for (auto i = 0u; i < table.dense_gradient_variable_name_size(); ++i) {
tmp_dense_gradient_variable_name.push_back(
table.dense_gradient_variable_name(i));
}
_param_config.dense_variable_name[table.table_id()] =
std::move(tmp_dense_variable_name);
_param_config.dense_gradient_variable_name[table.table_id()] =
std::move(tmp_dense_gradient_variable_name);
_param_config.dense_table_id.push_back(table.table_id());
_param_config.dense_table_size.push_back(table.fea_dim());
}
}
void AsyncExecutor::InitModel() {
for (auto table_id : _param_config.dense_table_id) {
std::vector<paddle::ps::Region> regions;
for (auto& t : _param_config.dense_variable_name[table_id]) {
Variable* var = root_scope_->FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* g = tensor->data<float>();
CHECK(g != nullptr) << "var[" << t << "] value not initialized";
float init_range = 0.2;
int rown = tensor->dims()[0];
init_range /= sqrt(rown);
std::normal_distribution<float> ndistr(0.0, 1.0);
for (auto i = 0u; i < tensor->numel(); ++i) {
g[i] = ndistr(local_random_engine()) * init_range;
}
paddle::ps::Region reg(g, tensor->numel());
regions.emplace_back(std::move(reg));
}
auto push_status = _pslib_ptr->_worker_ptr->push_dense_param(
regions.data(), regions.size(), table_id);
push_status.wait();
auto status = push_status.get();
if (status != 0) {
LOG(FATAL) << "push dense param failed, status[" << status << "]";
exit(-1);
}
}
}
void AsyncExecutor::SaveModel(const std::string& path) {
auto ret = _pslib_ptr->_worker_ptr->flush();
ret.wait();
ret = _pslib_ptr->_worker_ptr->save(path, 0);
ret.wait();
int32_t feasign_cnt = ret.get();
if (feasign_cnt == -1) { // (colourful-tree) TODO should be feasign_cnt < 0
LOG(FATAL) << "save model failed";
exit(-1);
}
}
void AsyncExecutor::PrepareDenseThread(const std::string& mode) {
if (mode == "mpi") {
DensePullThreadParam param;
param.ps_client = _pslib_ptr->_worker_ptr;
param.threshold = 1;
param.training_thread_num = actual_thread_num;
param.root_scope = root_scope_;
param.dense_params = &_param_config.dense_variable_name;
_pull_dense_thread =
std::shared_ptr<DensePullThread>(new DensePullThread(param));
_pull_dense_thread->start();
}
}
#endif
void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
const std::string& data_feed_desc_str,
const std::vector<std::string>& filelist,
const int thread_num,
const std::vector<std::string>& fetch_var_names,
const bool debug) {
const std::string& mode, const bool debug) {
std::vector<std::thread> threads;
auto& block = main_program.Block(0);
......@@ -82,7 +255,7 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
google::protobuf::TextFormat::ParseFromString(data_feed_desc_str,
&data_feed_desc);
int actual_thread_num = thread_num;
actual_thread_num = thread_num;
int file_cnt = filelist.size();
PADDLE_ENFORCE(file_cnt > 0, "File list cannot be empty");
......@@ -106,11 +279,21 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
// todo: should be factory method for creating datafeed
std::vector<std::shared_ptr<DataFeed>> readers;
PrepareReaders(readers, actual_thread_num, data_feed_desc, filelist);
#ifdef PADDLE_WITH_PSLIB
PrepareDenseThread(mode);
#endif
std::vector<std::shared_ptr<ExecutorThreadWorker>> workers;
workers.resize(actual_thread_num);
for (auto& worker : workers) {
#ifdef PADDLE_WITH_PSLIB
if (mode == "mpi") {
worker.reset(new AsyncExecutorThreadWorker);
} else {
worker.reset(new ExecutorThreadWorker);
}
#else
worker.reset(new ExecutorThreadWorker);
#endif
}
// prepare thread resource here
......@@ -128,7 +311,11 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
for (auto& th : threads) {
th.join();
}
#ifdef PADDLE_WITH_PSLIB
if (mode == "mpi") {
_pull_dense_thread->stop();
}
#endif
root_scope_->DropKids();
return;
......
......@@ -14,9 +14,11 @@ limitations under the License. */
#pragma once
#include <time.h>
#include <map>
#include <memory>
#include <mutex> // NOLINT
#include <mutex> // NOLINT
#include <random> // local_random_engine
#include <set>
#include <string>
#include <thread> // NOLINT
......@@ -30,6 +32,31 @@ limitations under the License. */
namespace paddle {
namespace framework {
inline double current_realtime() {
#if !defined(_WIN32)
struct timespec tp;
clock_gettime(CLOCK_REALTIME, &tp);
return tp.tv_sec + tp.tv_nsec * 1e-9;
#else
return 0.0;
#endif
}
inline std::default_random_engine& local_random_engine() {
struct engine_wrapper_t {
std::default_random_engine engine;
engine_wrapper_t() {
static std::atomic<uint64_t> x(0);
std::seed_seq sseq = {x++, x++, x++,
static_cast<uint64_t>(current_realtime() * 1000)};
engine.seed(sseq);
}
};
thread_local engine_wrapper_t r;
return r.engine;
}
class AsyncExecutor {
public:
AsyncExecutor(Scope* scope, const platform::Place& place);
......@@ -39,7 +66,19 @@ class AsyncExecutor {
const std::vector<std::string>& filelist,
const int thread_num,
const std::vector<std::string>& fetch_names,
const bool debug = false);
const std::string& mode, const bool debug = false);
#ifdef PADDLE_WITH_PSLIB
void InitServer(const std::string& dist_desc, int index);
void InitWorker(const std::string& dist_desc,
const std::vector<uint64_t>& host_sign_list, int node_num,
int index);
uint64_t StartServer();
void StopServer();
void GatherServers(const std::vector<uint64_t>& host_sign_list, int node_num);
void InitModel();
void SaveModel(const std::string& path);
void InitParamConfig();
#endif
private:
void CreateThreads(ExecutorThreadWorker* worker,
......@@ -48,10 +87,21 @@ class AsyncExecutor {
const std::vector<std::string>& fetch_var_names,
Scope* root_scope, const int thread_index,
const bool debug);
#ifdef PADDLE_WITH_PSLIB
void PrepareDenseThread(const std::string& mode);
#endif
public:
#ifdef PADDLE_WITH_PSLIB
std::shared_ptr<paddle::distributed::PSlib> _pslib_ptr;
std::shared_ptr<DensePullThread> _pull_dense_thread;
AsyncWorkerParamConfig _param_config;
#endif
Scope* root_scope_;
platform::Place place_;
private:
int actual_thread_num;
};
} // namespace framework
......
......@@ -64,6 +64,7 @@ bool DataFeed::PickOneFile(std::string* filename) {
return false;
}
*filename = filelist_[file_idx_++];
LOG(ERROR) << "pick file:" << *filename;
return true;
}
......
......@@ -13,6 +13,7 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/framework/executor_thread_worker.h"
#include <algorithm>
#include "google/protobuf/io/zero_copy_stream_impl.h"
#include "google/protobuf/message.h"
#include "google/protobuf/text_format.h"
......@@ -32,6 +33,89 @@ limitations under the License. */
namespace paddle {
namespace framework {
#ifdef PADDLE_WITH_PSLIB
int DensePullThread::start() {
_running = true;
_t = std::thread(&DensePullThread::run, this);
return 0;
}
void DensePullThread::run() {
while (_running) {
_pull_dense_status.resize(0);
for (auto& t : _dense_variable_name) {
if (check_update_param(t.first)) {
auto status = pull_dense(t.first);
_pull_dense_status.emplace_back(std::move(status));
reset_thread_version(t.first);
}
}
if (_pull_dense_status.size() != 0) {
wait_all();
}
usleep(_sleep_time_ms * 1000);
}
}
bool DensePullThread::check_update_param(uint64_t table_id) {
{
std::lock_guard<std::mutex> lock(_mutex_for_version);
auto& version = _training_versions[table_id];
_current_version[table_id] =
*(std::min_element(version.begin(), version.end()));
}
if (_current_version[table_id] - _last_versions[table_id] < _threshold) {
return false;
}
return true;
}
void DensePullThread::reset_thread_version(uint64_t table_id) {
std::lock_guard<std::mutex> lock(_mutex_for_version);
_last_versions[table_id] = _current_version[table_id];
}
std::future<int32_t> DensePullThread::pull_dense(uint64_t table_id) {
auto& regions = _regions[table_id];
regions.clear();
auto& variables = _dense_variable_name[table_id];
regions.resize(variables.size());
for (auto i = 0u; i < variables.size(); ++i) {
auto& t = variables[i];
Variable* var = _root_scope->FindVar(t);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
float* w = tensor->data<float>();
paddle::ps::Region reg(w, tensor->numel());
regions[i] = std::move(reg);
}
return _ps_client->pull_dense(regions.data(), regions.size(), table_id);
}
void DensePullThread::wait_all() {
for (auto& t : _pull_dense_status) {
t.wait();
auto status = t.get();
if (status != 0) {
LOG(WARNING) << "pull dense failed times:" << ++_pull_dense_fail_times;
}
}
if (_pull_dense_fail_times > 20) {
LOG(FATAL) << "pull dense failed times more than 20 times";
exit(-1);
}
_pull_dense_status.resize(0);
}
void DensePullThread::increase_thread_version(int thread_id,
uint64_t table_id) {
std::lock_guard<std::mutex> lock(_mutex_for_version);
_training_versions[table_id][thread_id]++;
}
#endif
void ExecutorThreadWorker::CreateThreadOperators(const ProgramDesc& program) {
auto& block = program.Block(0);
op_names_.clear();
......@@ -202,5 +286,358 @@ void ExecutorThreadWorker::SetRootScope(Scope* g_scope) {
root_scope_ = g_scope;
}
#ifdef PADDLE_WITH_PSLIB
// AsyncExecutor
void AsyncExecutorThreadWorker::TrainFiles() {
SetDevice();
int fetch_var_num = fetch_var_names_.size();
fetch_values_.clear();
fetch_values_.resize(fetch_var_num);
thread_reader_->Start();
int cur_batch;
int batch_cnt = 0;
while ((cur_batch = thread_reader_->Next()) > 0) {
// executor run here
TrainOneNetwork();
++batch_cnt;
thread_scope_->DropKids();
if (debug_ == false || thread_id_ != 0) {
continue;
}
for (int i = 0; i < fetch_var_num; ++i) {
print_fetch_var(thread_scope_, fetch_var_names_[i]);
} // end for (int i = 0...)
} // end while ()
}
void AsyncExecutorThreadWorker::SetPSlibPtr(
std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {
_pslib_ptr = pslib_ptr;
}
void AsyncExecutorThreadWorker::SetPullDenseThread(
std::shared_ptr<DensePullThread> dpt) {
_pull_dense_thread = dpt;
}
void AsyncExecutorThreadWorker::TrainOneNetwork() {
PrepareParams();
for (auto& op : ops_) {
if (op->Type().find("sgd") != std::string::npos) {
continue;
}
bool need_skip = false;
for (auto t = 0u; t < _param_config->skip_op.size(); ++t) {
if (op->Type().find(_param_config->skip_op[t]) != std::string::npos) {
need_skip = true;
break;
}
}
if (!need_skip) {
op->Run(*thread_scope_, place_);
}
}
UpdateParams();
}
void AsyncExecutorThreadWorker::SetParamConfig(
AsyncWorkerParamConfig* param_config) {
_param_config = param_config;
}
void AsyncExecutorThreadWorker::PrepareParams() {
for (auto table_id : _param_config->sparse_table_id) {
PullSparse(table_id);
for (auto& t : _pull_sparse_status) {
t.wait();
auto status = t.get();
if (status != 0) {
LOG(ERROR) << "pull sparse failed, status[" << status << "]";
exit(-1);
}
}
}
_pull_sparse_status.resize(0);
for (auto table_id : _param_config->sparse_table_id) {
FillSparse(table_id);
}
}
void AsyncExecutorThreadWorker::UpdateParams() {
for (auto i : _param_config->sparse_table_id) {
PushSparse(i);
}
for (auto i : _param_config->dense_table_id) {
PushDense(i);
}
int32_t tmp_push_dense_wait_times = -1;
int32_t tmp_push_sparse_wait_times = -1;
static uint32_t push_dense_wait_times =
static_cast<uint32_t>(tmp_push_dense_wait_times);
static uint32_t push_sparse_wait_times =
static_cast<uint32_t>(tmp_push_sparse_wait_times);
if (_push_dense_status.size() >= push_dense_wait_times) {
for (auto& t : _push_dense_status) {
t.wait();
}
_push_dense_status.resize(0);
}
if (tmp_push_dense_wait_times == -1) {
_push_dense_status.resize(0);
}
if (_push_sparse_status.size() >= push_sparse_wait_times) {
for (auto& t : _push_sparse_status) {
t.wait();
}
_push_sparse_status.resize(0);
}
if (tmp_push_sparse_wait_times == -1) {
_push_sparse_status.resize(0);
}
for (auto dense_table_id : _param_config->dense_table_id) {
_pull_dense_thread->increase_thread_version(thread_id_, dense_table_id);
}
}
void AsyncExecutorThreadWorker::PushDense(int table_id) {
std::vector<paddle::ps::Region> regions;
for (auto& t : _param_config->dense_gradient_variable_name[table_id]) {
Variable* var = thread_scope_->FindVar(t);
CHECK(var != nullptr) << "var[" << t << "] not found";
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int count = tensor->numel();
float* g = tensor->data<float>();
paddle::ps::Region reg(g, count);
regions.emplace_back(std::move(reg));
}
auto status = _pslib_ptr->_worker_ptr->push_dense(regions.data(),
regions.size(), table_id);
_push_dense_status.push_back(std::move(status));
}
void AsyncExecutorThreadWorker::PullSparse(int table_id) {
auto& features = _features[table_id];
auto& feature_value = _feature_value[table_id];
auto fea_dim = _param_config->fea_dim;
// slot id starts from 1
features.clear();
features.resize(0);
features.reserve(MAX_FEASIGN_NUM);
const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
// slot_idx = 0 is label TODO
for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int64_t* ids = tensor->data<int64_t>();
int len = tensor->numel();
for (auto i = 0u; i < len; ++i) {
// todo(colourful-tree): current trick - filter feasign=use_slot_mod(
// bug: datafeed fill use_slot_mod for empty slot)
if (ids[i] == 0u) {
continue;
}
features.push_back(static_cast<uint64_t>(ids[i]));
}
}
check_pull_push_memory(features, &feature_value, fea_dim);
std::vector<float*> pull_feature_value;
for (auto i = 0u; i < features.size(); ++i) {
pull_feature_value.push_back(feature_value[i].data());
}
auto status = _pslib_ptr->_worker_ptr->pull_sparse(
pull_feature_value.data(), table_id, features.data(), features.size());
_pull_sparse_status.push_back(std::move(status));
auto& push_g = _feature_push_value[table_id];
check_pull_push_memory(features, &push_g, fea_dim);
collect_feasign_info(table_id);
}
void AsyncExecutorThreadWorker::FillSparse(int table_id) {
auto slot_dim = _param_config->slot_dim;
auto fea_dim = _param_config->fea_dim;
auto& features = _features[table_id];
auto& fea_value = _feature_value[table_id];
CHECK(features.size() > 0) << "feature size check failed";
auto fea_idx = 0u;
std::vector<float> init_value(fea_dim);
const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
// slot_idx = 0 is label TODO
for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int64_t* ids = tensor->data<int64_t>();
int len = tensor->numel();
Variable* var_emb = thread_scope_->FindVar(
_param_config->slot_input_vec[table_id][slot_idx - 1]);
LoDTensor* tensor_emb = var_emb->GetMutable<LoDTensor>();
float* ptr =
tensor_emb->mutable_data<float>({len, slot_dim}, platform::CPUPlace());
memset(ptr, 0, sizeof(float) * len * slot_dim);
auto& tensor_lod = tensor->lod()[0];
LoD data_lod{tensor_lod};
tensor_emb->set_lod(data_lod);
for (auto index = 0u; index < len; ++index) {
if (ids[index] == 0u) {
memcpy(ptr + slot_dim * index, init_value.data() + 2,
sizeof(float) * slot_dim);
continue;
}
memcpy(ptr + slot_dim * index, fea_value[fea_idx].data() + 2,
sizeof(float) * slot_dim);
fea_idx++;
}
}
}
void AsyncExecutorThreadWorker::PushSparse(int table_id) {
auto slot_dim = _param_config->slot_dim;
auto fea_dim = _param_config->fea_dim;
auto& features = _features[table_id];
auto& push_g = _feature_push_value[table_id];
check_pull_push_memory(features, &push_g, fea_dim);
CHECK(push_g.size() == features.size() + 1)
<< "push_g size:" << push_g.size()
<< " features size:" << features.size();
uint64_t fea_idx = 0u;
auto& fea_info = _fea_info[table_id];
int offset = 2;
const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
// slot_idx = 0 is label
for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
if (_param_config->slot_alias_to_table.find(feed_vec[slot_idx]) ==
_param_config->slot_alias_to_table.end()) {
LOG(ERROR) << "ERROR slot_idx:" << slot_idx
<< " name:" << feed_vec[slot_idx];
} else if (_param_config->slot_alias_to_table[feed_vec[slot_idx]] !=
table_id) {
continue;
}
Variable* g_var = thread_scope_->FindVar(
_param_config->gradient_var[table_id][slot_idx - 1]);
CHECK(g_var != nullptr)
<< "var[" << _param_config->gradient_var[table_id][slot_idx - 1]
<< "] not found";
LoDTensor* g_tensor = g_var->GetMutable<LoDTensor>();
if (g_tensor == NULL) {
LOG(ERROR) << "var["
<< _param_config->gradient_var[table_id][slot_idx - 1]
<< "] not found";
exit(-1);
}
float* g = g_tensor->data<float>();
Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
CHECK(var != nullptr) << "var[" << feed_vec[slot_idx] << "] not found";
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (tensor == NULL) {
LOG(ERROR) << "var[" << feed_vec[slot_idx] << "] not found";
exit(-1);
}
int len = tensor->numel();
CHECK(slot_dim * len == g_tensor->numel())
<< "len:" << len << " g_numel:" << g_tensor->numel();
CHECK(len == tensor->numel()) << "len:" << len
<< "t_numel:" << tensor->numel();
int64_t* ids = tensor->data<int64_t>();
for (auto id_idx = 0u; id_idx < len; ++id_idx) {
if (ids[id_idx] == 0) {
g += slot_dim;
continue;
}
memcpy(push_g[fea_idx].data() + offset, g, sizeof(float) * slot_dim);
push_g[fea_idx][0] = 1.0f;
CHECK(fea_idx < fea_info.size()) << "fea_idx:" << fea_idx
<< " size:" << fea_info.size();
push_g[fea_idx][1] = static_cast<float>(fea_info[fea_idx].label);
g += slot_dim;
fea_idx++;
}
}
CHECK(fea_idx == features.size()) << "fea_idx:" << fea_idx
<< " features size:" << features.size();
CHECK_GT(features.size(), 0);
std::vector<float*> push_g_vec;
for (auto i = 0u; i < features.size(); ++i) {
push_g_vec.push_back(push_g[i].data());
}
auto status = _pslib_ptr->_worker_ptr->push_sparse(
table_id, features.data(), (const float**)push_g_vec.data(),
features.size());
_push_sparse_status.push_back(std::move(status));
}
void AsyncExecutorThreadWorker::collect_feasign_info(int table_id) {
auto& fea_info = _fea_info[table_id];
auto& feature = _features[table_id];
fea_info.resize(feature.size());
const std::vector<std::string>& feed_vec = thread_reader_->GetUseSlotAlias();
Variable* var = thread_scope_->FindVar(feed_vec[0]);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int64_t* label = tensor->data<int64_t>();
int global_index = 0;
for (auto slot_idx = 1u; slot_idx < feed_vec.size(); ++slot_idx) {
Variable* var = thread_scope_->FindVar(feed_vec[slot_idx]);
LoDTensor* tensor = var->GetMutable<LoDTensor>();
int64_t* ids = tensor->data<int64_t>();
int fea_idx = 0;
for (auto ins_idx = 1u; ins_idx < tensor->lod()[0].size(); ++ins_idx) {
for (; fea_idx < tensor->lod()[0][ins_idx]; ++fea_idx) {
if (ids[fea_idx] == 0u) {
continue;
}
FeasignInfo info{slot_idx, ins_idx, label[ins_idx - 1]};
fea_info[global_index++] = std::move(info);
}
}
}
CHECK(global_index == feature.size())
<< "expect fea info size:" << feature.size() << " real:" << global_index;
}
void AsyncExecutorThreadWorker::check_pull_push_memory(
const std::vector<uint64_t>& features,
std::vector<std::vector<float>>* push_g, int dim) {
push_g->resize(features.size() + 1);
for (auto& t : *push_g) {
t.resize(dim);
}
}
void AsyncExecutorThreadWorker::check_pull_push_memory(
const std::vector<uint64_t>& features, std::vector<float*>* push_g,
int dim) {
if (features.size() > push_g->size()) {
push_g->reserve(features.size() + 1);
auto size = features.size() - push_g->size() + 1;
for (auto i = 0u; i < size; ++i) {
float* ptr = new float[dim];
push_g->push_back(ptr);
}
}
}
#endif
} // einit_modelnd namespace framework
} // end namespace paddle
......@@ -25,16 +25,119 @@ limitations under the License. */
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#ifdef PADDLE_WITH_PSLIB
#include <pslib.h>
#endif
namespace paddle {
namespace framework {
void CreateTensor(Variable* var, proto::VarType::Type var_type);
#ifdef PADDLE_WITH_PSLIB
static const uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100;
struct AsyncWorkerParamConfig {
int slot_dim;
int fea_dim;
int32_t tmp_push_dense_wait_times;
int32_t tmp_push_sparse_wait_times;
std::vector<std::string> skip_op;
std::map<uint64_t, std::vector<std::string>> dense_variable_name;
std::map<uint64_t, std::vector<std::string>> dense_gradient_variable_name;
std::vector<int> dense_table_id;
// fea_dim for each dense table
std::vector<uint32_t> dense_table_size;
std::vector<int> sparse_table_id;
std::map<uint64_t, std::vector<std::string>> slot_input_vec;
std::map<uint64_t, std::vector<std::string>> gradient_var;
std::map<std::string, uint64_t> slot_alias_to_table;
};
struct DensePullThreadParam {
std::shared_ptr<paddle::ps::PSClient> ps_client;
int threshold;
int training_thread_num;
Scope* root_scope;
std::map<uint64_t, std::vector<std::string>>* dense_params;
int sleep_time_ms = 2;
};
class DensePullThread {
public:
explicit DensePullThread(const DensePullThreadParam& param)
: _running(false) {
_ps_client = param.ps_client;
_threshold = param.threshold;
_thread_num = param.training_thread_num;
_root_scope = param.root_scope;
_sleep_time_ms = param.sleep_time_ms;
for (auto& t : *param.dense_params) {
_dense_variable_name[t.first].insert(_dense_variable_name[t.first].end(),
t.second.begin(), t.second.end());
_training_versions[t.first].resize(_thread_num, 0);
_last_versions[t.first] = 0;
_current_version[t.first] = 0;
}
}
int start();
void stop() {
if (_running) {
_running = false;
_t.join();
}
}
void increase_thread_version(int thread_id, uint64_t table_id);
void reset_thread_version(uint64_t table_id);
std::future<int32_t> pull_dense(uint64_t table_id);
void pull_dense2(uint64_t table_id);
void wait_all();
private:
void run();
bool check_update_param(uint64_t table_id);
private:
std::shared_ptr<paddle::ps::PSClient> _ps_client;
int _thread_num;
int _threshold;
int _sleep_time_ms;
Scope* _root_scope;
bool _running;
std::map<uint64_t, uint64_t> _last_versions;
std::map<uint64_t, uint64_t> _current_version;
std::mutex _mutex_for_version;
std::map<uint64_t, std::vector<uint64_t>> _training_versions;
std::map<uint64_t, std::vector<std::string>> _dense_variable_name;
std::thread _t;
std::vector<::std::future<int32_t>> _pull_dense_status;
std::map<uint64_t, std::vector<paddle::ps::Region>> _regions;
uint32_t _pull_dense_fail_times = 0;
std::vector<float> _base_norm_param;
std::vector<float> _mean;
std::vector<float> _scale;
float _squared_sum_epsilon = 1e-4;
std::mutex _mutex_for_mean_scale;
float _total_batch_num = 0;
};
#endif
class ExecutorThreadWorker {
public:
ExecutorThreadWorker()
: thread_id_(-1), root_scope_(NULL), thread_scope_(NULL), debug_(false) {}
~ExecutorThreadWorker() {}
virtual ~ExecutorThreadWorker() {}
void CreateThreadResource(const framework::ProgramDesc& program,
const paddle::platform::Place& place);
......@@ -51,9 +154,15 @@ class ExecutorThreadWorker {
// set data feed declared in executor
void SetDataFeed(const std::shared_ptr<DataFeed>& datafeed);
// A multi-thread training function
void TrainFiles();
virtual void TrainFiles();
// set fetch variable names from python interface assigned by users
void SetFetchVarNames(const std::vector<std::string>& fetch_var_names);
#ifdef PADDLE_WITH_PSLIB
virtual void SetPSlibPtr(
std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {}
virtual void SetPullDenseThread(std::shared_ptr<DensePullThread> dpt) {}
virtual void SetParamConfig(AsyncWorkerParamConfig* param_config) {}
#endif
private:
void CreateThreadScope(const framework::ProgramDesc& program);
......@@ -77,12 +186,58 @@ class ExecutorThreadWorker {
Scope* root_scope_;
// a thread scope, father scope is global score which is shared
Scope* thread_scope_;
private:
std::vector<std::string> fetch_var_names_;
std::vector<std::vector<float>> fetch_values_;
bool debug_;
};
#ifdef PADDLE_WITH_PSLIB
class AsyncExecutorThreadWorker : public ExecutorThreadWorker {
public:
AsyncExecutorThreadWorker() {}
virtual ~AsyncExecutorThreadWorker() {}
void SetPSlibPtr(std::shared_ptr<paddle::distributed::PSlib> pslib_ptr);
void SetPullDenseThread(std::shared_ptr<DensePullThread> dpt);
void SetParamConfig(AsyncWorkerParamConfig* param_config);
void TrainFiles();
void TrainOneNetwork();
void PrepareParams();
void UpdateParams();
void PullSparse(int table_id);
void FillSparse(int table_id);
void PushSparse(int table_id);
void PushDense(int table_id);
void check_pull_push_memory(const std::vector<uint64_t>& features,
std::vector<float*>* push_g, int dim);
void check_pull_push_memory(const std::vector<uint64_t>& features,
std::vector<std::vector<float>>* push_g, int dim);
void collect_feasign_info(int table_id);
private:
struct FeasignInfo {
uint32_t slot;
uint32_t ins;
int64_t label;
};
std::map<uint64_t, std::vector<uint64_t>> _features;
std::map<uint64_t, std::vector<FeasignInfo>> _fea_info;
std::map<uint64_t, std::vector<std::vector<float>>> _feature_value;
std::map<uint64_t, std::vector<std::vector<float>>> _feature_push_value;
std::shared_ptr<paddle::distributed::PSlib> _pslib_ptr;
std::shared_ptr<DensePullThread> _pull_dense_thread;
std::vector<::std::future<int32_t>> _pull_sparse_status;
std::vector<::std::future<int32_t>> _pull_dense_status;
std::vector<::std::future<int32_t>> _push_sparse_status;
std::vector<::std::future<int32_t>> _push_dense_status;
AsyncWorkerParamConfig* _param_config;
};
#endif
} // namespace framework
} // namespace paddle
......@@ -41,6 +41,23 @@ namespace pd = paddle::framework;
namespace paddle {
namespace pybind {
using set_name_func = void (pd::DataFeedDesc::*)(const std::string&);
#ifdef PADDLE_WITH_PSLIB
void BindAsyncExecutor(py::module* m) {
py::class_<framework::AsyncExecutor>(*m, "AsyncExecutor")
.def(py::init([](framework::Scope* scope, const platform::Place& place) {
return std::unique_ptr<framework::AsyncExecutor>(
new framework::AsyncExecutor(scope, place));
}))
.def("run_from_files", &framework::AsyncExecutor::RunFromFile)
.def("init_server", &framework::AsyncExecutor::InitServer)
.def("init_worker", &framework::AsyncExecutor::InitWorker)
.def("start_server", &framework::AsyncExecutor::StartServer)
.def("stop_server", &framework::AsyncExecutor::StopServer)
.def("gather_servers", &framework::AsyncExecutor::GatherServers)
.def("init_model", &framework::AsyncExecutor::InitModel)
.def("save_model", &framework::AsyncExecutor::SaveModel);
} // end BindAsyncExecutor
#else
void BindAsyncExecutor(py::module* m) {
py::class_<framework::AsyncExecutor>(*m, "AsyncExecutor")
.def(py::init([](framework::Scope* scope, const platform::Place& place) {
......@@ -49,5 +66,6 @@ void BindAsyncExecutor(py::module* m) {
}))
.def("run_from_files", &framework::AsyncExecutor::RunFromFile);
} // end BindAsyncExecutor
#endif
} // end namespace pybind
} // end namespace paddle
......@@ -24,6 +24,8 @@ from paddle.fluid.proto import data_feed_pb2
from google.protobuf import text_format
from . import io
from .data_feed_desc import DataFeedDesc
from .distributed import ps_instance
from .contrib.utils import hdfs_utils as hdfs
__all__ = ['AsyncExecutor']
......@@ -74,7 +76,7 @@ class AsyncExecutor(object):
Note: Only running on CPUPlace supported.
"""
def __init__(self, place=None):
def __init__(self, place=None, run_mode=""):
if place is None:
place = core.CPUPlace()
if not isinstance(place, core.CPUPlace):
......@@ -85,8 +87,16 @@ class AsyncExecutor(object):
scope = global_scope()
self.executor = core.AsyncExecutor(scope, p)
self.instance = None
def run(self, program, data_feed, filelist, thread_num, fetch, debug=False):
def run(self,
program,
data_feed,
filelist,
thread_num,
fetch,
mode="",
debug=False):
"""
Run program by this AsyncExecutor. Training dataset will be in filelist.
Users can also inspect certain variables by naming them in parameter
......@@ -106,6 +116,7 @@ class AsyncExecutor(object):
thread_num(int): number of concurrent training threads. See
:code:`Note` for how to set this properly
fetch(str|list): the var name or a list of var names to inspect
mode(str): run mode of this interface
debug(bool): When set to True, fetch vars will be printed to
standard output after each minibatch
......@@ -148,4 +159,152 @@ class AsyncExecutor(object):
self.executor.run_from_files(program_desc,
data_feed.desc(), filelist, thread_num,
fetch_var_names, debug)
fetch_var_names, mode, debug)
def download_data(self,
afs_path,
local_path,
fs_default_name,
ugi,
file_cnt,
hadoop_home="$HADOOP_HOME",
process_num=12):
"""
download_data is a default download method for distributed training
a user download data without this method
Example:
>>> exe = fluid.AsyncExecutor()
>>> exe.download_data("/xxx/xxx/xx/",
>>> "./data", "afs://
>>> xxx.xxx.xxx.xxx:9901", "xxx,yyy")
Args:
afs_path(str): afs_path defined by users
local_path(str): download data path
fs_default_name(str): file system server address
ugi(str): hadoop ugi
file_cn(int): a user can specify file number for debugging
hadoop_home(str): hadoop home path
process_num(int): download process num
"""
if self.instance is None:
raise ValueError('instance is None, please run'
'config_distributed_nodes init instance')
configs = {"fs.default.name": fs_default_name, "hadoop.job.ugi": ugi}
client = hdfs.HDFSClient(hadoop_home, configs)
downloads = hdfs.multi_download(
client,
afs_path,
local_path,
self.instance.get_worker_index(),
self.instance.get_node_cnt() / 2,
file_cnt,
multi_processes=process_num)
self.instance.barrier_worker() #wait for download_data
def get_instance(self):
"""
get current node's instance so that user can do operations
in distributed setting
"""
if self.instance is None:
raise ValueError(
'instance is None, please run config_distributed_nodes init instance'
)
return self.instance
def config_distributed_nodes(self):
"""
if a user needs to run distributed async executor
he or she needs to do a global configuration so that
information of current process can be obtained
"""
self.instance = ps_instance.PaddlePSInstance(1, 2)
return self.instance
def stop(self):
"""
at the end of process, users should call stop to servers
and barrier all workers
"""
if self.instance is None:
raise ValueError(
'instance is None, please run config_distributed_nodes init instance'
)
self.instance.barrier_worker() #worker do all things
if self.instance.is_first_worker():
self.executor.stop_server()
self.instance.barrier_worker() #sync
self.instance.barrier_all()
self.instance.finalize()
def init_server(self, dist_desc):
"""
initialize server of current node if current process is a server
Args:
dist_desc(str): a protobuf string that describes
how to init a worker and a server
"""
if self.instance is None:
raise ValueError(
'instance is None, please run config_distributed_nodes init instance'
)
self.executor.init_server(dist_desc, self.instance._rankid)
ip = self.executor.start_server()
self.instance.set_ip(ip)
self.instance.barrier_all() #wait all server start
ips = self.instance.gather_ips()
self.executor.gather_servers(ips, self.instance.get_node_cnt())
self.instance.barrier_all() #wait all worker start
def init_worker(self, dist_desc, startup_program):
"""
initialize worker of current node if current process is a worker
Args:
dist_desc(str): a protobuf string that describes
how to init a worker and a server
startup_program(fluid.Program): startup program of current process
"""
if self.instance is None:
raise ValueError(
'instance is None, please run config_distributed_nodes init instance'
)
place = core.CPUPlace()
executor = Executor(place)
executor.run(startup_program)
self.instance.barrier_all() #wait all server start
ips = self.instance.gather_ips()
self.executor.init_worker(dist_desc, ips,
self.instance.get_node_cnt(),
self.instance._rankid)
self.instance.barrier_all() #wait all worker start
if self.instance.is_first_worker():
self.executor.init_model()
self.instance.barrier_worker() #wait init model
def init_model(self):
"""
init_model command that can be invoked from one of the worker
model parameters are initialized in servers
"""
if self.instance is None:
raise ValueError(
'instance is None, please run config_distributed_nodes init instance'
)
self.executor.init_model()
def save_model(self, save_path):
"""
save_model command that can be invoked from one of the worker
model parameters are saved in servers and upload to save_path of file system
Args:
save_path(str): save path to file system
"""
if self.instance is None:
raise ValueError(
'instance is None, please run config_distributed_nodes init instance'
)
self.executor.save_model(save_path)
......@@ -13,10 +13,10 @@
# limitations under the License.
from __future__ import print_function
from . import lookup_table_utils
from .lookup_table_utils import *
#from . import lookup_table_utils
#from .lookup_table_utils import *
from . import hdfs_utils
from .hdfs_utils import *
__all__ = lookup_table_utils.__all__
#__all__ = lookup_table_utils.__all__
__all__ = hdfs_utils.__all__
......@@ -32,6 +32,28 @@ _logger.setLevel(logging.INFO)
class HDFSClient(object):
"""
A tool of HDFS
Args:
hadoop_home (string): hadoop_home
configs (dict): hadoop config, it is a dict, please contain \
key "fs.default.name" and "hadoop.job.ugi"
Can be a float value
Examples:
hadoop_home = "/home/client/hadoop-client/hadoop/"
configs = {
"fs.default.name": "hdfs://xxx.hadoop.com:54310",
"hadoop.job.ugi": "hello,hello123"
}
client = HDFSClient(hadoop_home, configs)
client.ls("/user/com/train-25")
files = client.lsr("/user/com/train-25/models")
"""
def __init__(self, hadoop_home, configs):
self.pre_commands = []
hadoop_bin = '%s/bin/hadoop' % hadoop_home
......@@ -52,9 +74,13 @@ class HDFSClient(object):
ret_code = 0
ret_out = None
ret_err = None
whole_commands = " ".join(whole_commands)
for x in range(retry_times + 1):
proc = subprocess.Popen(
whole_commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
whole_commands,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True)
(output, errors) = proc.communicate()
ret_code, ret_out, ret_err = proc.returncode, output, errors
if ret_code:
......@@ -68,10 +94,12 @@ class HDFSClient(object):
def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5):
"""
upload the local file to hdfs
args:
local_file_path: the local file path
remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
return:
Args:
hdfs_path: hdfs path, target path
local_path: local file path, source path
overwrite: will overwrite the original file
retry_times: max times retry to upload
Returns:
True or False
"""
assert hdfs_path is not None
......@@ -114,10 +142,12 @@ class HDFSClient(object):
def download(self, hdfs_path, local_path, overwrite=False, unzip=False):
"""
download from hdfs
args:
local_file_path: the local file path
remote_file_path: remote dir on hdfs
return:
Args:
hdfs_path: hdfs path, target path
local_path: local file path, source path
overwrite: will remove original file and overwrite it.
unzip: ignore this param
Returns
True or False
"""
_logger.info('Downloading %r to %r.', hdfs_path, local_path)
......@@ -159,11 +189,11 @@ class HDFSClient(object):
def is_exist(self, hdfs_path=None):
"""
whether the remote hdfs path exists?
args:
remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
Args:
hdfs_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
fs_name: The default values are the same as in the job configuration
fs_ugi: The default values are the same as in the job configuration
return:
Returns:
True or False
"""
exist_cmd = ['-test', '-e', hdfs_path]
......@@ -182,11 +212,11 @@ class HDFSClient(object):
def is_dir(self, hdfs_path=None):
"""
whether the remote hdfs path exists?
args:
Args:
remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp)
fs_name: The default values are the same as in the job configuration
fs_ugi: The default values are the same as in the job configuration
return:
Returns:
True or False
"""
......@@ -206,15 +236,17 @@ class HDFSClient(object):
return True
def delete(self, hdfs_path):
"""Remove a file or directory from HDFS.
:param hdfs_path: HDFS path.
:param recursive: Recursively delete files and directories. By default,
this method will raise an :class:`HdfsError` if trying to delete a
non-empty directory.
"""
Remove a file or directory from HDFS.
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
Args:
param hdfs_path: HDFS path.
param recursive: Recursively delete files and directories. By default,
this method will raise an :class:`HdfsError` if trying to delete a
non-empty directory.
Returns:
This function returns `True` if the deletion was successful and `False` if
no file or directory previously existed at `hdfs_path`.
"""
_logger.info('Deleting %r.', hdfs_path)
......@@ -240,14 +272,17 @@ class HDFSClient(object):
return True
def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False):
"""Move a file or folder.
:param hdfs_src_path: Source path.
:param hdfs_dst_path: Destination path. If the path already exists and is
a directory, the source will be moved into it. If the path exists and is
a file, or if a parent destination directory is missing, this method will
raise an :class:`HdfsError`.
"""
Rename a file or folder.
Args:
:param hdfs_src_path: Source path.
:param hdfs_dst_path: Destination path. If the path already exists and is
a directory, the source will be moved into it. If the path exists and is
a file, or if a parent destination directory is missing, this method will
raise an :class:`HdfsError`.
Returns:
This function returns `True` if the rename was successful and `False` if
rename was faild.
"""
assert hdfs_src_path is not None
assert hdfs_dst_path is not None
......@@ -273,6 +308,11 @@ class HDFSClient(object):
@staticmethod
def make_local_dirs(local_path):
"""
create a directiory local, is same to mkdir
Args:
local_path: local path that wants to create a directiory.
"""
try:
os.makedirs(local_path)
except OSError as e:
......@@ -281,9 +321,11 @@ class HDFSClient(object):
def makedirs(self, hdfs_path):
"""Create a remote directory, recursively if necessary.
:param hdfs_path: Remote path. Intermediate directories will be created
appropriately.
Args:
:param hdfs_path: Remote path. Intermediate directories will be created
appropriately.
Returns:
True if make a directories was successful, False when make a directiries was failed.
"""
_logger.info('Creating directories to %r.', hdfs_path)
assert hdfs_path is not None
......@@ -303,6 +345,13 @@ class HDFSClient(object):
return True
def ls(self, hdfs_path):
"""
ls a hdfs_path.
Args:
:param hdfs_path: hdfs_path will be ls.
Returns:
This function returns a `list` that contaion all files in the hdfs_path.
"""
assert hdfs_path is not None
if not self.is_exist(hdfs_path):
......@@ -328,6 +377,14 @@ class HDFSClient(object):
return ret_lines
def lsr(self, hdfs_path, only_file=True, sort=True):
"""
ls a hdfs_path sort by time.
Args:
:param hdfs_path: hdfs_path will be ls.
Returns:
This function returns a `list` that contaion all files sorted by time in the hdfs_path.
"""
def sort_by_time(v1, v2):
v1_time = datetime.strptime(v1[1], '%Y-%m-%d %H:%M')
v2_time = datetime.strptime(v2[1], '%Y-%m-%d %H:%M')
......@@ -371,12 +428,15 @@ def multi_upload(client,
multi_processes=5,
overwrite=False):
"""
:param overwrite: will overwrite hdfs file or not
:param multi_processes: the upload data process at the same time, default=5
:param client: instance of HDFSClient
:param hdfs_path: path on hdfs
:param local_path: path on local
:return:
Upload file to hdfs.
Args:
:param overwrite: will overwrite hdfs file or not
:param multi_processes: the upload data process at the same time, default=5
:param client: instance of HDFSClient
:param hdfs_path: path on hdfs
:param local_path: path on local
Returns:
"""
def __subprocess_upload(datas):
......@@ -386,6 +446,13 @@ def multi_upload(client,
client.upload(hdfs_re_path, data, overwrite, retry_times=5)
def get_local_files(path):
"""
Get all local files
Args:
path: local file path
Returns:
A list that contation all files in the path.
"""
rlist = []
if not os.path.isdir(path):
......@@ -426,16 +493,21 @@ def multi_download(client,
local_path,
trainer_id,
trainers,
file_cnt,
multi_processes=5):
"""
multi_download
:param client: instance of HDFSClient
:param hdfs_path: path on hdfs
:param local_path: path on local
:param trainer_id: current trainer id
:param trainers: all trainers number
:param multi_processes: the download data process at the same time, default=5
:return: None
Args:
:param client: instance of HDFSClient
:param hdfs_path: path on hdfs
:param local_path: path on local
:param trainer_id: current trainer id
:param trainers: all trainers number
:param file_cnt: all file number
:param multi_processes: the download data process at the same time, default=5
:return: None
Returns:
A list that be downloaded.
"""
def __subprocess_download(datas):
......@@ -449,7 +521,7 @@ def multi_download(client,
client.make_local_dirs(local_path)
_logger.info("Make local dir {} successfully".format(local_path))
all_need_download = client.lsr(hdfs_path, sort=True)
all_need_download = client.lsr(hdfs_path, sort=True)[:file_cnt]
need_download = all_need_download[trainer_id::trainers]
_logger.info("Get {} files From all {} files need to be download from {}".
format(len(need_download), len(all_need_download), hdfs_path))
......@@ -500,6 +572,7 @@ if __name__ == "__main__":
"/home/xx/data1",
1,
5,
100,
multi_processes=5)
multi_upload(client, "/user/com/train-25/model", "/home/xx/data1")
......@@ -15,12 +15,52 @@
LOOKUP_TABLE_TYPE = "lookup_table"
def find_distributed_lookup_table_inputs(program, table_name):
"""
Find input variable of distribute lookup table in program.
We only support one distribute table now.
Args:
program(Program): given program, locate distributed lookup table
table_name(str): given table name that is found beforehand
Returns:
inputs
"""
local_vars = program.current_block().vars
inputs = []
for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE:
if table_name == op.input("W")[0]:
inputs.extend([local_vars[name] for name in op.input("Ids")])
return inputs
def find_distributed_lookup_table_outputs(program, table_name):
"""
Find output variable of distribute lookup table in program.
We only support one distribute table now.
Args:
program(Program): given program, locate distributed lookup table
table_name(str): given table name that is found beforehand
Returns:
outputs
"""
local_vars = program.current_block().vars
outputs = []
for op in program.global_block().ops:
if op.type == LOOKUP_TABLE_TYPE:
if table_name == op.input("W")[0]:
outputs.extend([local_vars[name] for name in op.output("Out")])
return outputs
def find_distributed_lookup_table(program):
"""
Find distribute lookup table in program.
We only support one distribute table now.
:param program:
:return: table_name or None
Args:
program(Program): given program, locate distributed lookup table
Returns:
table_name or None
"""
table_name = None
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from .node import DownpourServer
from .node import DownpourWorker
from ..backward import append_backward
import ps_pb2 as pslib
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_inputs
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table_outputs
from google.protobuf import text_format
class DownpourSGD(object):
"""
Distributed optimizer of downpour stochastic gradient descent
Standard implementation of Google's Downpour SGD
in Large Scale Distributed Deep Networks
Args:
learning_rate (float): the learning rate used to update parameters. \
Can be a float value
Examples:
.. code-block:: python
downpour_sgd = fluid.distributed.DownpourSGD(learning_rate=0.2)
downpour_sgd.minimize(cost)
"""
def __init__(self, learning_rate=0.001, window=1):
# todo(guru4elephant): add more optimizers here as argument
# todo(guru4elephant): make learning_rate as a variable
self.learning_rate_ = learning_rate
self.window_ = window
self.type = "downpour"
def minimize(self,
loss,
startup_program=None,
parameter_list=None,
no_grad_set=None):
"""
DownpounSGD is a distributed optimizer so
that user can call minimize to generate backward
operators and optimization operators within minmize function
Args:
loss(Variable): loss variable defined by user
startup_program(Program): startup program that defined by user
parameter_list(str list): parameter names defined by users
no_grad_set(set): a set of variables that is defined by users
so that these variables do not need gradient computation
Returns:
[ps_param, worker_skipped_ops]
ps_param: parameter server protobuf desc
worker_skipped_ops: operator names that need
to be skipped during execution
"""
params_grads = sorted(
append_backward(loss, parameter_list, no_grad_set),
key=lambda x: x[0].name)
table_name = find_distributed_lookup_table(loss.block.program)
prefetch_slots = find_distributed_lookup_table_inputs(
loss.block.program, table_name)
prefetch_slots_emb = find_distributed_lookup_table_outputs(
loss.block.program, table_name)
server = DownpourServer()
# window is communication strategy
worker = DownpourWorker(self.window_)
# Todo(guru4elephant): support multiple tables definitions
# currently support one big sparse table
sparse_table_index = 0
# currently merge all dense parameters into one dense table
dense_table_index = 1
params = []
grads = []
for i in params_grads:
params.append(i[0])
for i in params_grads:
grads.append(i[1])
server.add_sparse_table(sparse_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb)
server.add_dense_table(dense_table_index, self.learning_rate_, params,
grads)
worker.add_sparse_table(sparse_table_index, self.learning_rate_,
prefetch_slots, prefetch_slots_emb)
worker.add_dense_table(dense_table_index, self.learning_rate_, params,
grads)
ps_param = pslib.PSParameter()
ps_param.server_param.CopyFrom(server.get_desc())
ps_param.trainer_param.CopyFrom(worker.get_desc())
# Todo(guru4elephant): figure out how to support more sparse parameters
# currently only support lookup_table
worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
ps_param.trainer_param.skip_op.extend(worker_skipped_ops)
return [ps_param, worker_skipped_ops]
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
class FileSystem(object):
"""
A file system that support async_executor hadoop client desc.
Args:
fs_type (string): fs_type, for example is "afs"
user (string): hadoop param
passwd (string): hadoop param
hadoop bin (string): hadoop param
Examples:
fs = FileSystm()
"""
def __init__(self,
fs_type="afs",
uri="afs://xx",
user=None,
passwd=None,
hadoop_bin=""):
assert user != None
assert passwd != None
assert hadoop_bin != None
import ps_pb2 as pslib
self.fs_client = pslib.FsClientParameter()
self.fs_client.uri = uri
self.fs_client.user = user
self.fs_client.passwd = passwd
#self.fs_client.buffer_size = 0
self.fs_client.hadoop_bin = hadoop_bin
#self.fs_client.afs_conf = afs_conf if not afs_conf else ""
def get_desc(self):
"""
get hadoop desc.
"""
return self.fs_client
class MPIHelper(object):
"""
MPIHelper is a wrapper of mpi4py, support get_rank get_size etc.
Args:
No params
Examples:
mh = MPIHelper()
mh.get_ip()
"""
def __init__(self):
from mpi4py import MPI
self.comm = MPI.COMM_WORLD
self.MPI = MPI
def get_rank(self):
return self.comm.Get_rank()
def get_size(self):
return self.comm.Get_size()
def get_ip(self):
import socket
local_ip = socket.gethostbyname(socket.gethostname())
return local_ip
def get_hostname(self):
import socket
return socket.gethostname()
def finalize(self):
self.MPI.Finalize()
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
import ps_pb2 as pslib
class Server(object):
"""
A Server basic class.
"""
def __init__(self):
pass
class Worker(object):
"""
A Worker basic class.
"""
def __init__(self):
pass
class DownpourServer(Server):
"""
DownpourServer class is used to generate server program_desc
Args:
server: it is pslib.ServerParameter()
Examples:
server = DownpourServer()
"""
def __init__(self):
self.server_ = pslib.ServerParameter()
self.server_.downpour_server_param.service_param.start_server_port = 0
self.server_.downpour_server_param.service_param.server_class = "DownpourBrpcPsServer"
self.server_.downpour_server_param.service_param.client_class = "DownpourBrpcPsClient"
self.server_.downpour_server_param.service_param.service_class = "DownpourPsService"
self.server_.downpour_server_param.service_param.start_server_port = 0
self.server_.downpour_server_param.service_param.server_thread_num = 12
def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
slot_value_var):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
slot_key_vars(string): slot key id
slot_value_var(string): slot key value after embedding
Returns:
return None
"""
table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourSparseTable"
table.type = pslib.PS_SPARSE_TABLE
table.accessor.accessor_class = "DownpourFeatureValueAccessor"
table.accessor.sparse_sgd_param.learning_rate = learning_rate
table.accessor.sparse_sgd_param.initial_g2sum = 3
table.accessor.sparse_sgd_param.initial_range = 1e-4
table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10])
table.accessor.embedx_dim = 8
table.accessor.embedx_threshold = 5
table.accessor.fea_dim = 11
table.accessor.downpour_accessor_param.nonclk_coeff = 0.1
table.accessor.downpour_accessor_param.click_coeff = 2
table.accessor.downpour_accessor_param.base_threshold = 0.2
table.accessor.downpour_accessor_param.delta_threshold = 0.15
table.accessor.downpour_accessor_param.delta_keep_days = 31
table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999
table.accessor.downpour_accessor_param.delete_threshold = 0.8
def add_dense_table(self, table_id, learning_rate, param_var, grad_var):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
param_var(list): all dense param. it is a list.
grad_var(list): all dense grad parm it is a list.
Returns:
return None
"""
table = self.server_.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = "DownpourDenseTable"
table.type = pslib.PS_DENSE_TABLE
table.accessor.accessor_class = "DownpourDenseValueAccessor"
table.accessor.dense_sgd_param.name = "adam"
table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993
table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999
table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8
table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99
table.accessor.dense_sgd_param.naive.learning_rate = 0.0002
fea_dim = 0
for param in filter(lambda x: x.name.find("embedding") == -1,
param_var):
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
table.accessor.fea_dim = fea_dim
def get_desc(self):
"""
Return downpour server program_desc
"""
return self.server_
class DownpourWorker(Worker):
"""
DownpourWorker class is used to generate worker program_desc
Args:
window (int): push params frequency
worker: it is pslib.DownpourTrainerParameter
Examples:
worker = DownpourWorker(1)
"""
def __init__(self, window):
self.window = window
self.worker_ = pslib.DownpourTrainerParameter()
def add_sparse_table(self, table_id, learning_rate, slot_key_vars,
slot_value_vars):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
slot_key_vars(string): slot key id
slot_value_var(string): slot key value after embedding
Returns:
return None
"""
table = self.worker_.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in slot_key_vars])
table.slot_value.extend([var.name for var in slot_value_vars])
table.slot_gradient.extend(
[var.name + "@GRAD" for var in slot_value_vars])
def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars):
"""
Args:
table_id(int): id of sparse params table
learning_rate(float): the learning rate used to update parameters. \
Can be a float value
param_var(list): all dense param. it is a list.
grad_var(list): all dense grad parm it is a list.
Returns:
return None
"""
table = self.worker_.dense_table.add()
table.table_id = table_id
table.dense_variable_name.extend(
filter(lambda x: x.find("embedding") == -1,
[p.name for p in param_vars]))
table.dense_gradient_variable_name.extend(
filter(lambda x: x.find("embedding") == -1,
[g.name for g in grad_vars]))
def get_desc(self):
"""
Return downpour worker program_desc
"""
return self.worker_
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
from .helper import MPIHelper
class PaddlePSInstance(object):
"""
PaddlePSInstance class is used to generate A instance of server or worker
Args:
server_worker_mode: is a value 0 or 1, default is 1
proc_per_node: process per node, default is 2
Examples:
instance = PaddlePSInstance(1, 2)
"""
def __init__(self, server_worker_mode, proc_per_node):
self.dh = MPIHelper()
self._rankid = self.dh.get_rank()
self._server_worker_mode = server_worker_mode
self._proc_per_node = proc_per_node
self._nodes = self.dh.get_size()
self._ip = 0
self._worker_num = self._nodes * self._proc_per_node / 2
self._server_num = self._nodes * self._proc_per_node / 2
self._total_server_worker = self._worker_num + self._server_num
self._node_type = None #IDLE=-1, WORKER=1, SERVER=0
self._set_nodetype()
self._comm = None
self._split_comm()
def _set_nodetype(self):
if self._server_worker_mode == 0:
if self._rankid < self._server_num:
self._node_type = 1
elif self._rankid < self._total_server_worker:
self._node_type = 0
else:
self._node_type = -1
elif self._server_worker_mode == 1:
if self._rankid < self._total_server_worker:
if 0 == self._rankid % self._proc_per_node % 2:
self._node_type = 0
else:
self._node_type = 1
else:
self._node_type = -1
else:
self._node_type = -1
def _split_comm(self):
if self.is_server():
self._comm = self.dh.comm.Split(self._node_type)
elif self.is_worker():
self._comm = self.dh.comm.Split(self._node_type)
pass
def get_worker_index(self):
"""
Return worker index
"""
if self._server_worker_mode == 0:
return self._rankid == self.server_num
else:
return self._rankid / self._proc_per_node
def get_server_index(self):
"""
Return server index
"""
if self._server_worker_mode == 0:
return self.rank_id
else:
return self.rank_id / self._proc_per_node
def is_worker(self):
"""
Return instance is worker or not
"""
return self._node_type == 1
def is_server(self):
"""
Return instance is server or not
"""
return self._node_type == 0
def is_first_worker(self):
"""
Return instance is first worker or not
"""
return self.is_worker() and 0 == self.get_worker_index()
def set_ip(self, ip):
"""
set server ip
"""
self._ip = ip
def gather_ips(self):
"""
Return all servers and workers ip throught mpi allgather
"""
self._ips = self.dh.comm.allgather(self._ip)
return self._ips
def get_node_cnt(self):
"""
Return node cnt
"""
return self._nodes
def barrier_all(self):
"""
barrier workers and servers
"""
self.dh.comm.barrier()
def barrier_worker(self):
"""
barrier workers
"""
if self.is_worker():
self._comm.barrier()
pass
def finalize(self):
"""
MPI finalize
"""
self.dh.finalize()
pass
if __name__ == "__main__":
instance = PaddlePSInstance(1, 1, 2, 50)
instance.barrier_all()
此差异已折叠。
......@@ -104,8 +104,10 @@ packages=['paddle',
'paddle.fluid.imperative',
'paddle.fluid.proto',
'paddle.fluid.proto.profiler',
'paddle.fluid.distributed',
'paddle.fluid.layers',
'paddle.fluid.contrib',
'paddle.fluid.contrib.utils',
'paddle.fluid.contrib.decoder',
'paddle.fluid.contrib.quantize',
'paddle.fluid.transpiler',
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册