Unverified commit a3730dc8, authored by S Sing_chan, committed by GitHub

【code format check upgrade】 step2:clang-format (#42840)

Parent a072fca8

Too many changes to display.

To preserve performance, only 1000 of 1000+ files are displayed.
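Nearly every C++ hunk below is the same mechanical change: the upgraded clang-format (Google style, 80-column limit) stops hanging a trailing argument off the call's opening parenthesis and instead drops the whole argument list onto its own indented lines. A minimal, compilable sketch of the pattern; the macro here is a hypothetical stand-in, not Paddle's real PADDLE_ENFORCE_EQ from paddle/fluid/platform/enforce.h:

#include <stdexcept>
#include <string>

// Hypothetical stand-in macro so the snippet is self-contained.
#define DEMO_ENFORCE_EQ(a, b, msg)                 \
  do {                                             \
    if ((a) != (b)) throw std::runtime_error(msg); \
  } while (0)

int main() {
  int ret = 0;
  // Old layout, as removed throughout this diff: the error argument hangs
  // far to the right, aligned under the opening parenthesis:
  //   DEMO_ENFORCE_EQ(ret, 0, std::string(
  //                               "Get hostname error."));
  // New layout, as added throughout this diff: when the call overflows 80
  // columns, all arguments drop to the next line, indented four spaces:
  DEMO_ENFORCE_EQ(
      ret, 0, std::string("Get hostname error for createDefaultDevice."));
  return 0;
}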
@@ -33,6 +33,10 @@ repos:
         entry: bash ./tools/codestyle/clang_format.hook -i
         language: system
         files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|xpu|kps)$
+        exclude: |
+            (?x)^(
+                paddle/fluid/distributed/ps/thirdparty/round_robin.h
+            )$
     - repo: local
       hooks:
         - id: cpplint-cpp-source
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/collective/HCCLTools.h"
+
 #include "paddle/fluid/distributed/collective/Types.h"
 namespace paddle {
......
@@ -15,6 +15,7 @@
 #pragma once
 #include <error.h>
 #include <string>
+
 #include "boost/variant.hpp"
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/collective/NCCLTools.h"
+
 #include "paddle/fluid/distributed/collective/Types.h"
 namespace paddle {
......
@@ -16,9 +16,11 @@
 #include <cuda_runtime.h>
 #include <error.h>
 #include <string>
+
 #include "boost/variant.hpp"
+#include "paddle/fluid/distributed/collective/Types.h"
 #include "paddle/fluid/framework/data_type.h"
 #include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/platform/cuda_device_guard.h"
@@ -26,8 +28,6 @@
 #include "paddle/fluid/platform/dynload/nccl.h"
 #include "paddle/fluid/platform/enforce.h"
-#include "paddle/fluid/distributed/collective/Types.h"
-
 namespace paddle {
 namespace distributed {
......
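The include churn in these headers follows clang-format's include sorting: each block is sorted lexicographically, and under the Google-style categories this configuration appears to use (an assumption; the diff does not show the .clang-format file itself) the associated header comes first, then C system headers, then C++ standard headers, then other libraries, with a blank line between groups. A compilable sketch using only headers that actually exist:

// Resulting order (assumed Google-style categories):
//   1. C system headers        e.g. <time.h>, <error.h>
//   2. C++ standard headers    e.g. <atomic>, <string>
//   3. other library headers   e.g. "boost/variant.hpp"
//   4. project headers         e.g. "paddle/fluid/..."
// Within a group, paths sort lexicographically, which is why Types.h moved
// up next to "boost/variant.hpp" in NCCLTools.h above.
#include <time.h>

#include <atomic>
#include <string>

int main() {
  std::atomic<int> formatted_files{1000};
  return formatted_files.load() > 0 ? 0 : 1;
}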
@@ -21,7 +21,6 @@
 #include "paddle/fluid/distributed/collective/Types.h"
 #include "paddle/fluid/eager/api/utils/tensor_utils.h"
-
 #include "paddle/fluid/framework/tensor.h"
 #include "paddle/fluid/framework/variable.h"
 #include "paddle/fluid/platform/enforce.h"
......
@@ -27,6 +27,7 @@
 #include <gloo/broadcast.h>
 #include <gloo/reduce.h>
 #include <gloo/scatter.h>
+
 #include "paddle/fluid/distributed/collective/Common.h"
 #include "paddle/fluid/distributed/collective/ProcessGroupGloo.h"
 #include "paddle/fluid/framework/fleet/gloo_wrapper.h"
@@ -485,8 +486,9 @@ std::shared_ptr<::gloo::transport::Device>
 ProcessGroupGloo::createDefaultDevice() {
   std::array<char, HOST_NAME_MAX> hostname{};
   auto ret = ::gethostname(hostname.data(), HOST_NAME_MAX);
-  PADDLE_ENFORCE_EQ(ret, 0, platform::errors::Fatal(
-                                "Get hostname error for createDefaultDevice."));
+  PADDLE_ENFORCE_EQ(
+      ret, 0,
+      platform::errors::Fatal("Get hostname error for createDefaultDevice."));
   ::addrinfo* result;
   result = tcputils::get_addr_info(hostname.data(), "", 0, AF_UNSPEC);
   ::addrinfo* cur;
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/collective/ProcessGroupHCCL.h"
+
 #include "paddle/fluid/distributed/collective/Common.h"
 #include "paddle/fluid/distributed/collective/HCCLTools.h"
 #include "paddle/fluid/memory/malloc.h"
@@ -216,15 +217,16 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHCCL::AllReduce(
     std::vector<phi::DenseTensor>& in_tensors,   // NOLINT
     std::vector<phi::DenseTensor>& out_tensors,  // NOLINT
     const AllreduceOptions& opts) {
-  return Collective(in_tensors, out_tensors,
-                    [&](phi::DenseTensor& input, phi::DenseTensor& output,
-                        HcclComm comm, const aclrtStream& stream) {
-                      return platform::dynload::HcclAllReduce(
-                          input.data(), output.data(), input.numel(),
-                          platform::ToHCCLDataType(input.dtype()),
-                          ToHCCLRedType(opts.reduce_op), comm, stream);
-                    },
-                    CommType::ALLREDUCE);
+  return Collective(
+      in_tensors, out_tensors,
+      [&](phi::DenseTensor& input, phi::DenseTensor& output, HcclComm comm,
+          const aclrtStream& stream) {
+        return platform::dynload::HcclAllReduce(
+            input.data(), output.data(), input.numel(),
+            platform::ToHCCLDataType(input.dtype()),
+            ToHCCLRedType(opts.reduce_op), comm, stream);
+      },
+      CommType::ALLREDUCE);
 }
 std::shared_ptr<ProcessGroup::Task> ProcessGroupHCCL::Broadcast(
......
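Calls taking a lambda argument, like the Collective(...) call just above, show the biggest reflow: the old style aligned everything under the first argument, while the new style breaks right after the opening parenthesis and indents continuations four spaces, leaving room for the lambda body. A runnable sketch; this Collective is a hypothetical helper that only mirrors the call shape, not Paddle's real ProcessGroup API:

#include <functional>
#include <iostream>
#include <string>

// Hypothetical helper mirroring the shape of ProcessGroupHCCL::Collective.
void Collective(const std::string& in, const std::string& out,
                const std::function<void(const std::string&,
                                         const std::string&)>& fn,
                int comm_type) {
  fn(in, out);
  std::cout << "comm type: " << comm_type << "\n";
}

int main() {
  // After the reflow, the callee keeps only "(" on its own line and each
  // argument group sits on a four-space-indented continuation line.
  Collective(
      "in_tensor", "out_tensor",
      [](const std::string& input, const std::string& output) {
        std::cout << input << " -> " << output << "\n";
      },
      /*comm_type=*/1);
  return 0;
}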
@@ -21,12 +21,11 @@
 #include <unordered_map>
 #include <vector>
+#include "paddle/fluid/distributed/collective/HCCLTools.h"
 #include "paddle/fluid/distributed/collective/ProcessGroup.h"
+#include "paddle/fluid/distributed/store/store.h"
 #include "paddle/fluid/platform/device/npu/npu_stream.h"
 #include "paddle/fluid/platform/device_context.h"
-#include "paddle/fluid/distributed/collective/HCCLTools.h"
-#include "paddle/fluid/distributed/store/store.h"
-
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/gen_comm_id_helper.h"
 #include "paddle/fluid/platform/place.h"
......
@@ -13,7 +13,9 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/collective/ProcessGroupHeter.h"
+
 #include <chrono>
+
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/phi/api/include/api.h"
@@ -129,8 +131,9 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
           gid_, {dense_cpu_tensor.name()}, send_size, dense_cpu_tensor.data(),
           dense_cpu_tensor.numel() *
               framework::DataTypeSize(dense_cpu_tensor.dtype()));
-      PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-                                    "Send to the switch module error."));
+      PADDLE_ENFORCE_EQ(ret, 0,
+                        platform::errors::PreconditionNotMet(
+                            "Send to the switch module error."));
       phi::DenseTensor cpu_tensor2;
       cpu_tensor2.AllocateFrom(
           std::make_unique<paddle::experimental::DefaultAllocator>(
@@ -140,8 +143,9 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::AllReduce(
       ret = client_->Recv(
           gid_, {dense_cpu_tensor.name()}, cpu_tensor2.data(),
           cpu_tensor2.numel() * framework::DataTypeSize(cpu_tensor2.dtype()));
-      PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-                                    "Recv from the switch module error."));
+      PADDLE_ENFORCE_EQ(ret, 0,
+                        platform::errors::PreconditionNotMet(
+                            "Recv from the switch module error."));
       switch (dense_cpu_tensor.dtype()) {
         case DataType::FLOAT32:
@@ -226,8 +230,9 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Broadcast(
             dense_cpu_tensor.data(),
             dense_cpu_tensor.numel() *
                 framework::DataTypeSize(dense_cpu_tensor.dtype()));
-        PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-                                      "Send to the switch module error."));
+        PADDLE_ENFORCE_EQ(ret, 0,
+                          platform::errors::PreconditionNotMet(
+                              "Send to the switch module error."));
   } else {
     int ret = client_->Recv(
         gid_, {dense_cpu_tensor.name()}, dense_cpu_tensor.data(),
@@ -286,8 +291,9 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Send(
   VLOG(2) << "tensor_name:" << tensor_name;
   int ret = client_->Send(gid_, {tensor_name}, send_size, cpu_tensor.data(),
                           tensor_size);
-  PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-                                "Send to the switch module error."));
+  PADDLE_ENFORCE_EQ(
+      ret, 0,
+      platform::errors::PreconditionNotMet("Send to the switch module error."));
   return CreateTask(rank_, CommType::SEND, in_tensors);
 }
@@ -319,8 +325,9 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupHeter::Recv(
   int ret = client_->Recv(
       gid_, {tensor_name}, cpu_tensor.data(),
       cpu_tensor.numel() * framework::DataTypeSize(cpu_tensor.dtype()));
-  PADDLE_ENFORCE_EQ(ret, 0, platform::errors::PreconditionNotMet(
-                                "receive to the switch module error."));
+  PADDLE_ENFORCE_EQ(ret, 0,
+                    platform::errors::PreconditionNotMet(
+                        "receive to the switch module error."));
   auto end = std::chrono::high_resolution_clock::now();
   std::chrono::duration<double> diff = end - start;
   double goodput = cpu_tensor.numel() *
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
+
 #include "paddle/fluid/distributed/collective/Common.h"
 #include "paddle/fluid/platform/device/gpu/gpu_info.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
@@ -320,15 +321,16 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
   PADDLE_ENFORCE_EQ(
       CheckTensorsInCudaPlace(in_tensors), true,
       platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
-  return Collective(in_tensors, out_tensors,
-                    [&](const phi::DenseTensor& input, phi::DenseTensor& output,
-                        ncclComm_t comm, const gpuStream_t& stream) {
-                      return platform::dynload::ncclAllReduce(
-                          input.data(), output.data(), input.numel(),
-                          platform::ToNCCLDataType(input.type()),
-                          ToNCCLRedType(opts.reduce_op), comm, stream);
-                    },
-                    CommType::ALLREDUCE);
+  return Collective(
+      in_tensors, out_tensors,
+      [&](const phi::DenseTensor& input, phi::DenseTensor& output,
+          ncclComm_t comm, const gpuStream_t& stream) {
+        return platform::dynload::ncclAllReduce(
+            input.data(), output.data(), input.numel(),
+            platform::ToNCCLDataType(input.type()),
+            ToNCCLRedType(opts.reduce_op), comm, stream);
+      },
+      CommType::ALLREDUCE);
 }
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
@@ -338,17 +340,17 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
       CheckTensorsInCudaPlace(in_tensors), true,
       platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
-  return Collective(in_tensors, out_tensors,
-                    [&](phi::DenseTensor& input, phi::DenseTensor& output,
-                        ncclComm_t comm, const gpuStream_t& stream) {
-                      const auto root = opts.source_rank * in_tensors.size() +
-                                        opts.source_root;
-                      return platform::dynload::ncclBroadcast(
-                          input.data(), output.data(), input.numel(),
-                          platform::ToNCCLDataType(input.type()), root, comm,
-                          stream);
-                    },
-                    CommType::BROADCAST);
+  return Collective(
+      in_tensors, out_tensors,
+      [&](phi::DenseTensor& input, phi::DenseTensor& output, ncclComm_t comm,
+          const gpuStream_t& stream) {
+        const auto root =
+            opts.source_rank * in_tensors.size() + opts.source_root;
+        return platform::dynload::ncclBroadcast(
+            input.data(), output.data(), input.numel(),
+            platform::ToNCCLDataType(input.type()), root, comm, stream);
+      },
+      CommType::BROADCAST);
 }
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
@@ -400,15 +402,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
     std::vector<phi::DenseTensor>& tensors, int dst_rank) {
   CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));
-  auto task = PointToPoint(tensors,
-                           [&](phi::DenseTensor& input, ncclComm_t comm,
-                               const gpuStream_t& stream, int dst_rank) {
-                             return platform::dynload::ncclSend(
-                                 input.data(), input.numel(),
-                                 platform::ToNCCLDataType(input.dtype()),
-                                 dst_rank, comm, stream);
-                           },
-                           dst_rank, CommType::SEND);
+  auto task = PointToPoint(
+      tensors,
+      [&](phi::DenseTensor& input, ncclComm_t comm, const gpuStream_t& stream,
+          int dst_rank) {
+        return platform::dynload::ncclSend(
+            input.data(), input.numel(),
+            platform::ToNCCLDataType(input.dtype()), dst_rank, comm, stream);
+      },
+      dst_rank, CommType::SEND);
   return task;
 }
@@ -416,15 +418,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
     std::vector<phi::DenseTensor>& tensors, int src_rank) {
   CheckTensorsInDifferentDevices(tensors, static_cast<size_t>(GetSize()));
-  auto task = PointToPoint(tensors,
-                           [&](phi::DenseTensor& output, ncclComm_t comm,
-                               const gpuStream_t& stream, int src_rank) {
-                             return platform::dynload::ncclRecv(
-                                 output.data(), output.numel(),
-                                 platform::ToNCCLDataType(output.dtype()),
-                                 src_rank, comm, stream);
-                           },
-                           src_rank, CommType::RECV);
+  auto task = PointToPoint(
+      tensors,
+      [&](phi::DenseTensor& output, ncclComm_t comm, const gpuStream_t& stream,
+          int src_rank) {
+        return platform::dynload::ncclRecv(
+            output.data(), output.numel(),
+            platform::ToNCCLDataType(output.dtype()), src_rank, comm, stream);
+      },
+      src_rank, CommType::RECV);
   return task;
 }
@@ -440,15 +442,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send_Partial(
   std::vector<phi::DenseTensor> shared_tensors;
   shared_tensors.push_back(shared_input);
-  auto task = PointToPoint(shared_tensors,
-                           [&](phi::DenseTensor& input, ncclComm_t comm,
-                               const gpuStream_t& stream, int dst_rank) {
-                             return platform::dynload::ncclSend(
-                                 input.data(), input.numel(),
-                                 platform::ToNCCLDataType(input.dtype()),
-                                 dst_rank, comm, stream);
-                           },
-                           dst_rank, CommType::SEND);
+  auto task = PointToPoint(
+      shared_tensors,
+      [&](phi::DenseTensor& input, ncclComm_t comm, const gpuStream_t& stream,
+          int dst_rank) {
+        return platform::dynload::ncclSend(
+            input.data(), input.numel(),
+            platform::ToNCCLDataType(input.dtype()), dst_rank, comm, stream);
+      },
+      dst_rank, CommType::SEND);
   return task;
 }
@@ -463,15 +465,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv_Partial(
   std::vector<phi::DenseTensor> shared_tensors;
   shared_tensors.push_back(shared_input);
-  auto task = PointToPoint(shared_tensors,
-                           [&](phi::DenseTensor& output, ncclComm_t comm,
-                               const gpuStream_t& stream, int src_rank) {
-                             return platform::dynload::ncclRecv(
-                                 output.data(), output.numel(),
-                                 platform::ToNCCLDataType(output.dtype()),
-                                 src_rank, comm, stream);
-                           },
-                           src_rank, CommType::RECV);
+  auto task = PointToPoint(
+      shared_tensors,
+      [&](phi::DenseTensor& output, ncclComm_t comm, const gpuStream_t& stream,
+          int src_rank) {
+        return platform::dynload::ncclRecv(
+            output.data(), output.numel(),
+            platform::ToNCCLDataType(output.dtype()), src_rank, comm, stream);
+      },
+      src_rank, CommType::RECV);
   return task;
 }
@@ -484,15 +486,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather(
   PADDLE_ENFORCE_EQ(
       CheckTensorsInCudaPlace(out_tensors), true,
      platform::errors::InvalidArgument("All outputs should be in CudaPlace."));
-  return Collective(in_tensors, out_tensors,
-                    [&](const phi::DenseTensor& input, phi::DenseTensor& output,
-                        ncclComm_t comm, const gpuStream_t& stream) {
-                      return platform::dynload::ncclAllGather(
-                          input.data(), output.data(), input.numel(),
-                          platform::ToNCCLDataType(input.dtype()), comm,
-                          stream);
-                    },
-                    CommType::ALLGATHER);
+  return Collective(
+      in_tensors, out_tensors,
+      [&](const phi::DenseTensor& input, phi::DenseTensor& output,
+          ncclComm_t comm, const gpuStream_t& stream) {
+        return platform::dynload::ncclAllGather(
+            input.data(), output.data(), input.numel(),
+            platform::ToNCCLDataType(input.dtype()), comm, stream);
+      },
+      CommType::ALLGATHER);
 }
 void* GetPointerByOffset(void* raw_pointer, size_t offset,
......
@@ -22,10 +22,9 @@
 #include <vector>
 #include "paddle/fluid/distributed/collective/ProcessGroup.h"
+#include "paddle/fluid/distributed/store/store.h"
 #include "paddle/fluid/platform/cuda_device_guard.h"
 #include "paddle/fluid/platform/device_context.h"
-#include "paddle/fluid/distributed/store/store.h"
-
 #include "paddle/fluid/platform/enforce.h"
 #include "paddle/fluid/platform/gen_comm_id_helper.h"
 #include "paddle/fluid/platform/place.h"
......
@@ -403,8 +403,9 @@ void EagerReducer::InitializeDenseGroups(
             "Tensor %s is not initialized.", tensor_name));
     const auto size = tensor.numel();
     PADDLE_ENFORCE_GT(
-        size, 0, platform::errors::PreconditionNotMet(
-                     "The number of tensor %s's elements is 0.", tensor_name));
+        size, 0,
+        platform::errors::PreconditionNotMet(
+            "The number of tensor %s's elements is 0.", tensor_name));
     all_length += size;
     p_group->length_.push_back(size);
......
@@ -16,6 +16,7 @@
 #include <map>
 #include <vector>
+
 #include "paddle/fluid/distributed/collective/ProcessGroup.h"
 #include "paddle/fluid/eager/accumulation/accumulation_node.h"
 #include "paddle/fluid/eager/api/utils/hook_utils.h"
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/common/afs_warpper.h"
+
 #include "paddle/fluid/framework/io/fs.h"
 namespace paddle {
@@ -27,9 +28,10 @@ int AfsClient::initialize(const FsClientParameter& fs_client_param) {
 int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
                           const std::string& user, const std::string& passwd,
                           int buffer_size_param) {
-  return initialize(hadoop_bin, uri, paddle::string::format_string(
-                                         "%s,%s", user.c_str(), passwd.c_str()),
-                    buffer_size_param);
+  return initialize(
+      hadoop_bin, uri,
+      paddle::string::format_string("%s,%s", user.c_str(), passwd.c_str()),
+      buffer_size_param);
 }
 int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
                           const std::string& ugi, int buffer_size_param) {
......
@@ -19,6 +19,7 @@
 #include <memory>
 #include <string>
 #include <vector>
+
 #include "paddle/fluid/distributed/ps.pb.h"
 #include "paddle/fluid/string/string_helper.h"
......
@@ -15,6 +15,7 @@
 #pragma once
 #include <memory>
 #include <unordered_map>
+
 #include "butil/time.h"
 #include "bvar/latency_recorder.h"
 #include "glog/logging.h"
......
@@ -15,6 +15,7 @@
 #pragma once
 #include <assert.h>
 #include <time.h>
+
 #include <atomic>
 #include <random>
......
@@ -15,6 +15,7 @@
 #pragma once
 #include <glog/logging.h>
+
 #include <iostream>
 #include <map>
 #include <string>
......
@@ -12,9 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include "paddle/fluid/distributed/fleet_executor/carrier.h"
+
 #include <algorithm>
-#include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
 #include "paddle/fluid/distributed/fleet_executor/message_bus.h"
@@ -148,8 +149,9 @@ void Carrier::WakeUp() {
 }
 void Carrier::Start() {
-  PADDLE_ENFORCE_EQ(is_init_, true, platform::errors::PreconditionNotMet(
-                                        "Using carrier before initialized."));
+  PADDLE_ENFORCE_EQ(is_init_, true,
+                    platform::errors::PreconditionNotMet(
+                        "Using carrier before initialized."));
   for (int64_t id : source_interceptor_ids_) {
     VLOG(3) << "Carrier Start is sending start to source interceptor " << id
             << ".";
......
@@ -35,7 +35,7 @@ namespace paddle {
 namespace framework {
 class Scope;
 class ProgramDesc;
-}
+}  // namespace framework
 namespace distributed {
......
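The lone-brace edits like the one just above come from clang-format's FixNamespaceComments option (enabled by Google style): the closing brace of a namespace gains a trailing comment naming the namespace it closes. A compilable sketch:

// Before the upgrade these namespaces closed with bare braces; clang-format
// rewrites them as below.
namespace paddle {
namespace framework {
class Scope;  // forward declaration, as in the headers in this diff
}  // namespace framework
namespace distributed {
class TaskNode;
}  // namespace distributed
}  // namespace paddle

int main() { return 0; }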
@@ -13,8 +13,8 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/compute_interceptor.h"
+
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
-
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 #include "paddle/fluid/framework/executor_gc_helper.h"
 #include "paddle/fluid/framework/operator.h"
......
@@ -12,10 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include "paddle/fluid/distributed/fleet_executor/dist_model.h"
+
 #include <glog/logging.h>
+
 #include <chrono>  // NOLINT
-#include "paddle/fluid/distributed/fleet_executor/dist_model.h"
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 #include "paddle/fluid/framework/block_desc.h"
@@ -294,8 +296,9 @@ bool DistModel::PrepareProgram() {
 bool DistModel::LoadProgram() {
   VLOG(3) << "Loading program from " << config_.model_dir;
-  PADDLE_ENFORCE_NE(config_.model_dir, "", platform::errors::InvalidArgument(
-                                               "Model dir must be provided."));
+  PADDLE_ENFORCE_NE(
+      config_.model_dir, "",
+      platform::errors::InvalidArgument("Model dir must be provided."));
   std::string model_path = config_.model_dir + ".pdmodel";
   framework::proto::ProgramDesc program_proto;
   std::string pb_content;
......
@@ -31,7 +31,7 @@ namespace framework {
 class ProgramDesc;
 class Scope;
 class BlockDesc;
-}
+}  // namespace framework
 namespace distributed {
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/dist_model_tensor_wrapper.h"
+
 #include "paddle/fluid/platform/enforce.h"
 namespace paddle {
......
@@ -15,6 +15,7 @@
 #pragma once
 #include <string>
 #include <vector>
+
 #include "paddle/fluid/platform/float16.h"
 #include "paddle/fluid/platform/macros.h"
......
@@ -11,9 +11,10 @@
 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
+
 #include <algorithm>
-#include "paddle/fluid/distributed/fleet_executor/fleet_executor.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/message_bus.h"
 #include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
......
@@ -25,7 +25,7 @@ namespace paddle {
 namespace framework {
 class ProgramDesc;
 class Scope;
-}
+}  // namespace framework
 namespace distributed {
 class RuntimeGraph;
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
+
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/task_loop.h"
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
......
@@ -33,7 +33,7 @@ namespace paddle {
 namespace framework {
 class Scope;
 class GarbageCollector;
-}
+}  // namespace framework
 namespace distributed {
 class TaskNode;
......
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
+
 #include <chrono>
 #include <memory>
 #include <set>
@@ -19,7 +21,6 @@
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
-#include "paddle/fluid/distributed/fleet_executor/message_bus.h"
 #include "paddle/fluid/platform/gen_comm_id_helper.h"
 namespace paddle {
@@ -28,8 +29,9 @@ namespace distributed {
 void MessageBus::Init(
     int64_t rank, const std::unordered_map<int64_t, std::string>& rank_to_addr,
     const std::string& addr) {
-  PADDLE_ENFORCE_EQ(is_init_, false, platform::errors::AlreadyExists(
-                                         "MessageBus is already init."));
+  PADDLE_ENFORCE_EQ(
+      is_init_, false,
+      platform::errors::AlreadyExists("MessageBus is already init."));
   rank_ = rank;
   is_init_ = true;
   rank_to_addr_ = rank_to_addr;
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #if defined(PADDLE_WITH_DISTRIBUTE) && defined(PADDLE_WITH_PSCORE)
 #include "paddle/fluid/distributed/fleet_executor/message_service.h"
+
 #include "brpc/server.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/message_bus.h"
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/runtime_graph.h"
+
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 namespace paddle {
......
@@ -17,6 +17,7 @@
 #include <string>
 #include <unordered_map>
 #include <vector>
+
 #include "paddle/fluid/distributed/fleet_executor/fleet_executor_desc.pb.h"
 #include "paddle/fluid/framework/op_proto_maker.h"
 #include "paddle/fluid/platform/macros.h"
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/sink_interceptor.h"
+
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 namespace paddle {
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/source_interceptor.h"
+
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
 namespace paddle {
......
@@ -31,8 +31,9 @@ TaskLoopThread::~TaskLoopThread() {
 }
 TaskLoop* TaskLoopThread::StartLoop() {
-  PADDLE_ENFORCE_EQ(start_, false, platform::errors::PreconditionNotMet(
-                                       "thread is already running."));
+  PADDLE_ENFORCE_EQ(
+      start_, false,
+      platform::errors::PreconditionNotMet("thread is already running."));
   start_ = true;
   thread_ = std::thread([this]() { Loop(); });
......
@@ -30,8 +30,9 @@ TaskLoopThreadPool::TaskLoopThreadPool(int thread_num)
 TaskLoopThreadPool::~TaskLoopThreadPool() = default;
 void TaskLoopThreadPool::Start() {
-  PADDLE_ENFORCE_EQ(start_, false, platform::errors::PreconditionNotMet(
-                                       "thread pool is already start."));
+  PADDLE_ENFORCE_EQ(
+      start_, false,
+      platform::errors::PreconditionNotMet("thread pool is already start."));
   PADDLE_ENFORCE_GT(
       thread_num_, 0,
       platform::errors::InvalidArgument(
@@ -45,10 +46,12 @@ void TaskLoopThreadPool::Start() {
 }
 TaskLoop* TaskLoopThreadPool::GetLoop(int tid) {
-  PADDLE_ENFORCE_EQ(start_, true, platform::errors::PreconditionNotMet(
-                                      "thread pool must start first."));
-  PADDLE_ENFORCE_GE(tid, 0, platform::errors::OutOfRange(
-                                "tid must >= 0, but now is %d", tid));
+  PADDLE_ENFORCE_EQ(
+      start_, true,
+      platform::errors::PreconditionNotMet("thread pool must start first."));
+  PADDLE_ENFORCE_GE(
+      tid, 0,
+      platform::errors::OutOfRange("tid must >= 0, but now is %d", tid));
   PADDLE_ENFORCE_LT(tid, thread_num_,
                     platform::errors::OutOfRange(
                         "tid must < thread_num, but now tid=%d thread_num=%d",
@@ -57,8 +60,9 @@ TaskLoop* TaskLoopThreadPool::GetLoop(int tid) {
 }
 std::vector<TaskLoop*> TaskLoopThreadPool::GetAllLoops() {
-  PADDLE_ENFORCE_EQ(start_, true, platform::errors::PreconditionNotMet(
-                                      "thread pool must start first."));
+  PADDLE_ENFORCE_EQ(
+      start_, true,
+      platform::errors::PreconditionNotMet("thread pool must start first."));
   return loops_;
 }
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/fleet_executor/task_node.h"
+
 #include "paddle/fluid/framework/op_desc.h"
 #include "paddle/fluid/framework/op_registry.h"
 #include "paddle/fluid/framework/operator.h"
@@ -153,15 +154,17 @@ void TaskNode::SetRunAtOffset(int64_t value) {
 void TaskNode::SetReplyUpPerSteps(int64_t value) {
   PADDLE_ENFORCE_GE(
-      value, 1, platform::errors::InvalidArgument(
-                    "reply_up_per_steps must >= 1, but received %ld", value));
+      value, 1,
+      platform::errors::InvalidArgument(
+          "reply_up_per_steps must >= 1, but received %ld", value));
   reply_up_per_steps_ = value;
 }
 void TaskNode::SetSendDownPerSteps(int64_t value) {
   PADDLE_ENFORCE_GE(
-      value, 1, platform::errors::InvalidArgument(
-                    "send_down_per_steps must >= 1, but received %ld", value));
+      value, 1,
+      platform::errors::InvalidArgument(
+          "send_down_per_steps must >= 1, but received %ld", value));
   send_down_per_steps_ = value;
 }
......
@@ -26,7 +26,7 @@ namespace paddle {
 namespace framework {
 class OperatorBase;
 class OpDesc;
-}
+}  // namespace framework
 namespace distributed {
 class TaskNode final {
......
@@ -16,7 +16,6 @@ limitations under the License. */
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -16,7 +16,6 @@ limitations under the License. */
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -16,7 +16,6 @@ limitations under the License. */
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -14,11 +14,11 @@ limitations under the License. */
 #include <sys/socket.h>
 #include <time.h>
+
 #include <iostream>
 #include <unordered_map>
-
 #include "gtest/gtest.h"
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -16,7 +16,6 @@ limitations under the License. */
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -16,7 +16,6 @@ limitations under the License. */
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -16,7 +16,6 @@
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -16,7 +16,6 @@
 #include <unordered_map>
 #include "gtest/gtest.h"
-
 #include "paddle/fluid/distributed/fleet_executor/carrier.h"
 #include "paddle/fluid/distributed/fleet_executor/global.h"
 #include "paddle/fluid/distributed/fleet_executor/interceptor.h"
......
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/index_dataset/index_sampler.h"
+
 #include "paddle/fluid/framework/data_feed.h"
 namespace paddle {
......
@@ -14,6 +14,7 @@
 #pragma once
 #include <vector>
+
 #include "paddle/fluid/distributed/index_dataset/index_wrapper.h"
 #include "paddle/fluid/framework/data_feed.h"
 #include "paddle/fluid/framework/program_desc.h"
......
@@ -9,15 +9,16 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License. */
+#include "paddle/fluid/distributed/index_dataset/index_wrapper.h"
+
 #include <memory>
 #include <string>
 #include <thread>
 #include <unordered_map>
 #include <unordered_set>
 #include <vector>
 #include "paddle/fluid/framework/io/fs.h"
-#include "paddle/fluid/distributed/index_dataset/index_wrapper.h"
 namespace paddle {
 namespace distributed {
......
@@ -17,6 +17,7 @@ limitations under the License. */
 #include <unordered_set>
 #include <utility>
 #include <vector>
+
 #include "paddle/fluid/distributed/index_dataset/index_dataset.pb.h"
 #include "paddle/fluid/platform/enforce.h"
@@ -90,10 +91,11 @@ class IndexWrapper {
   }
   TreePtr tree = std::make_shared<TreeIndex>();
   int ret = tree->Load(tree_path);
-  PADDLE_ENFORCE_EQ(ret, 0, paddle::platform::errors::InvalidArgument(
-                                "Load tree[%s] from path[%s] failed. Please "
-                                "check whether the file exists.",
-                                name, tree_path));
+  PADDLE_ENFORCE_EQ(ret, 0,
+                    paddle::platform::errors::InvalidArgument(
+                        "Load tree[%s] from path[%s] failed. Please "
+                        "check whether the file exists.",
+                        name, tree_path));
   tree_map.insert(std::pair<std::string, TreePtr>{name, tree});
 }
......
@@ -12,11 +12,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
+#include "paddle/fluid/distributed/ps/service/brpc_ps_client.h"
+
 #include <memory>
 #include <sstream>
 #include <string>
-#include "paddle/fluid/distributed/ps/service/brpc_ps_client.h"
 #include "paddle/fluid/framework/archive.h"
 static const int max_port = 65535;
@@ -245,8 +246,9 @@ int32_t BrpcPsClient::Initialize() {
 int DownpourBrpcClosure::check_response(size_t request_idx, int cmd_id) {
   if (_cntls[request_idx]->Failed()) {
-    LOG(ERROR) << "resquest cmd_id:" << cmd_id << " failed, "
-                                                  "err:"
+    LOG(ERROR) << "resquest cmd_id:" << cmd_id
+               << " failed, "
+                  "err:"
                << _cntls[request_idx]->ErrorText();
     return -1;
   }
@@ -263,8 +265,9 @@ int DownpourBrpcClosure::check_response(size_t request_idx, int cmd_id) {
 int DownpourBrpcClosure::check_save_response(size_t request_idx, int cmd_id) {
   int32_t feasign_size = 0;
   if (_cntls[request_idx]->Failed()) {
-    LOG(ERROR) << "resquest cmd_id:" << cmd_id << " failed, "
-                                                  "err:"
+    LOG(ERROR) << "resquest cmd_id:" << cmd_id
+               << " failed, "
+                  "err:"
                << _cntls[request_idx]->ErrorText();
     return -1;
   }
......
@@ -15,6 +15,7 @@
 #pragma once
 #include <ThreadPool.h>
+
 #include <memory>
 #include <string>
 #include <vector>
......
@@ -13,7 +13,9 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/ps/service/brpc_ps_server.h"
+
 #include <thread>  // NOLINT
+
 #include "butil/object_pool.h"
 #include "paddle/fluid/distributed/common/cost_timer.h"
 #include "paddle/fluid/distributed/ps/table/depends/sparse_utils.h"
......
@@ -15,6 +15,7 @@ limitations under the License. */
 #pragma once
 #include <netdb.h>
+
 #include <iostream>
 #include <string>
 #include <vector>
......
@@ -13,7 +13,9 @@ See the License for the specific language governing permissions and
 limitations under the License. */
 #include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
+
 #include <google/protobuf/text_format.h>
+
 #include "gflags/gflags.h"
 #include "paddle/fluid/distributed/ps/service/brpc_ps_client.h"
 #include "paddle/fluid/distributed/ps/wrapper/fleet.h"
......
@@ -16,6 +16,7 @@ limitations under the License. */
 #include <ThreadPool.h>
 #include <stdint.h>
+
 #include <atomic>
 #include <deque>
 #include <map>
@@ -30,6 +31,7 @@ limitations under the License. */
 #include "gflags/gflags.h"
 #include "paddle/fluid/distributed/ps/service/communicator/communicator_common.h"
+#include "paddle/fluid/distributed/ps/service/ps_client.h"
 #include "paddle/fluid/framework/channel.h"
 #include "paddle/fluid/framework/scope.h"
 #include "paddle/fluid/framework/variable.h"
@@ -42,8 +44,6 @@ limitations under the License. */
 #include "paddle/phi/kernels/funcs/blas/blas.h"
 #include "paddle/phi/kernels/funcs/math_function.h"
-#include "paddle/fluid/distributed/ps/service/ps_client.h"
-
 namespace paddle {
 namespace distributed {
 class PSClient;
@@ -157,8 +157,9 @@ template <typename T>
 inline void MergeVars(const std::string &var_name,
                       const std::vector<std::shared_ptr<Variable>> &vars,
                       Scope *scope, bool merge_add = true) {
-  PADDLE_ENFORCE_NE(vars.empty(), true, platform::errors::InvalidArgument(
-                                            "vector vars are empty."));
+  PADDLE_ENFORCE_NE(
+      vars.empty(), true,
+      platform::errors::InvalidArgument("vector vars are empty."));
   auto cpu_place = platform::CPUPlace();
   auto &var0 = vars[0];
   auto *out_var = scope->Var(var_name);
......
@@ -18,11 +18,13 @@
 #include <glog/logging.h>
 #include <netinet/in.h>
 #include <stdio.h>
+
 #include <algorithm>
 #include <memory>
 #include <string>
 #include <unordered_set>
 #include <vector>
+
 #include "gflags/gflags.h"
 namespace paddle {
......
@@ -13,12 +13,14 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/ps/service/graph_brpc_client.h"
+
 #include <algorithm>
 #include <memory>
 #include <sstream>
 #include <string>
 #include <utility>
 #include <vector>
+
 #include "Eigen/Dense"
 #include "paddle/fluid/distributed/ps/service/brpc_ps_client.h"
 #include "paddle/fluid/distributed/ps/table/table.h"
@@ -149,7 +151,7 @@ std::future<int32_t> GraphBrpcClient::get_node_feat(
 std::future<int32_t> GraphBrpcClient::clear_nodes(uint32_t table_id,
                                                   int type_id, int idx_) {
   DownpourBrpcClosure *closure = new DownpourBrpcClosure(
-      server_size, [&, server_size = this->server_size ](void *done) {
+      server_size, [&, server_size = this->server_size](void *done) {
        int ret = 0;
        auto *closure = (DownpourBrpcClosure *)done;
        size_t fail_num = 0;
@@ -665,5 +667,5 @@ int32_t GraphBrpcClient::Initialize() {
   local_channel = NULL;
   return 0;
 }
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -15,11 +15,12 @@
 #pragma once
 #include <ThreadPool.h>
+
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
-#include <utility>
 #include "ThreadPool.h"
 #include "brpc/channel.h"
 #include "brpc/controller.h"
......
@@ -13,13 +13,14 @@
 // limitations under the License.
 #include "paddle/fluid/distributed/ps/service/graph_brpc_server.h"
+#include "paddle/fluid/distributed/ps/service/brpc_ps_server.h"
+
 #include <thread>  // NOLINT
 #include <utility>
 #include "butil/endpoint.h"
 #include "iomanip"
 #include "paddle/fluid/distributed/ps/service/brpc_ps_client.h"
-#include "paddle/fluid/distributed/ps/service/brpc_ps_server.h"
 #include "paddle/fluid/framework/archive.h"
 #include "paddle/fluid/platform/profiler.h"
 namespace paddle {
......
...@@ -14,12 +14,12 @@ ...@@ -14,12 +14,12 @@
#pragma once #pragma once
#include <memory>
#include <vector>
#include "brpc/channel.h" #include "brpc/channel.h"
#include "brpc/controller.h" #include "brpc/controller.h"
#include "brpc/server.h" #include "brpc/server.h"
#include <memory>
#include <vector>
#include "paddle/fluid/distributed/ps/service/brpc_ps_server.h" #include "paddle/fluid/distributed/ps/service/brpc_ps_server.h"
#include "paddle/fluid/distributed/ps/service/server.h" #include "paddle/fluid/distributed/ps/service/server.h"
#include "paddle/fluid/distributed/ps/table/common_graph_table.h" #include "paddle/fluid/distributed/ps/table/common_graph_table.h"
......
@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/service/ps_client.h"
+
#include "glog/logging.h"
#include "paddle/fluid/distributed/ps/service/brpc_ps_client.h"
#include "paddle/fluid/distributed/ps/service/graph_brpc_client.h"
...
@@ -20,6 +20,7 @@
#include <string>
#include <unordered_map>
#include <vector>
+
#include "paddle/fluid/distributed/common/cost_timer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/service/env.h"
...
@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/service/ps_local_client.h"
+
#include "paddle/fluid/distributed/ps/table/table.h"

//#define pslib_debug_dense_compress
@@ -316,5 +317,5 @@ int32_t PsLocalClient::Initialize() {
  table_ptr->Push(table_context);
  return done();
}
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -223,5 +223,5 @@ class PsLocalClient : public PSClient {
  float _mse = 0;
  uint16_t _push_times = 0;
};
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -16,6 +16,7 @@
#include <memory>
#include <vector>
+
#include "paddle/fluid/distributed/ps/service/server.h"

namespace paddle {
@@ -37,5 +38,5 @@ class PsLocalServer : public PSServer {
 private:
  virtual int32_t Initialize() { return 0; }
};
-}
-}
+}  // namespace distributed
+}  // namespace paddle
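Another recurring change, visible in the two files above: every bare '}' that closes a namespace gains a '// namespace ...' trailer. clang-format emits these when namespace-comment fixing is enabled, which this commit's behavior suggests but the page does not show directly. A compilable illustration:

#include <cstdio>

namespace paddle {
namespace distributed {

void Noop() { std::printf("noop\n"); }

}  // namespace distributed
}  // namespace paddle

int main() {
  paddle::distributed::Noop();
  return 0;
}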
@@ -13,7 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/service/ps_service/graph_py_service.h"
+
#include <thread>  // NOLINT
+
#include "butil/endpoint.h"
#include "iomanip"
#include "paddle/fluid/distributed/ps/table/table.h"
@@ -501,5 +503,5 @@ void GraphPyClient::StopServer() {
  if (status.get() == 0) stoped_ = true;
}
void GraphPyClient::FinalizeWorker() { this->worker_ptr->FinalizeWorker(); }
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -14,6 +14,7 @@
#pragma once
#include <unistd.h>
+
#include <condition_variable>  // NOLINT
#include <fstream>
#include <iomanip>
@@ -23,21 +24,20 @@
#include <thread>  // NOLINT
#include <unordered_map>
#include <vector>

#include "google/protobuf/text_format.h"
-
#include "gtest/gtest.h"
-#include "paddle/fluid/framework/lod_tensor.h"
-#include "paddle/fluid/framework/scope.h"
-#include "paddle/fluid/framework/tensor_util.h"
-#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/service/env.h"
#include "paddle/fluid/distributed/ps/service/graph_brpc_client.h"
#include "paddle/fluid/distributed/ps/service/graph_brpc_server.h"
#include "paddle/fluid/distributed/ps/service/ps_service/service.h"
#include "paddle/fluid/distributed/ps/service/sendrecv.pb.h"
+#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/program_desc.h"
+#include "paddle/fluid/framework/scope.h"
+#include "paddle/fluid/framework/tensor_util.h"
+#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/string/printf.h"
#include "paddle/phi/kernels/funcs/math_function.h"
@@ -198,5 +198,5 @@ class GraphPyClient : public GraphPyService {
  std::thread* client_thread;
  bool stoped_ = false;
};
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -17,7 +17,9 @@
#include <fcntl.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/text_format.h>
+
#include <iostream>
+
#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#include "paddle/fluid/string/string_helper.h"
...
@@ -20,6 +20,7 @@
#include <unordered_map>
#include <utility>
#include <vector>
+
#include "butil/endpoint.h"
#include "google/protobuf/service.h"
#include "paddle/fluid/distributed/common/registerer.h"
...
@@ -15,8 +15,10 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
+
#include <unordered_map>
#include <vector>
+
#include "paddle/fluid/distributed/common/afs_warpper.h"
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
...
@@ -13,11 +13,14 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/common_graph_table.h"
+
#include <time.h>
+
#include <algorithm>
#include <chrono>
#include <set>
#include <sstream>
+
#include "paddle/fluid/distributed/common/utils.h"
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
#include "paddle/fluid/framework/generator.h"
@@ -212,7 +215,6 @@ int64_t GraphTable::load_graph_to_memory_from_ssd(int idx,
  for (size_t i = 0; i < bags.size(); i++) {
    if (bags[i].size() > 0) {
      tasks.push_back(_shards_task_pool[i]->enqueue([&, i, idx, this]() -> int {
-
        char ch[sizeof(int) * 2 + sizeof(int64_t)];
        memset(ch, 0, sizeof(int));
        memcpy(ch + sizeof(int), &idx, sizeof(int));
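For context on the buffer being built above: the shard task packs two 32-bit fields and a 64-bit node id into a flat byte key. A sketch of that layout; the third memcpy and the field meanings are inferred from the surrounding code, not shown in the hunk:

#include <cstdint>
#include <cstdio>
#include <cstring>

int main() {
  int idx = 3;
  int64_t node_id = 42;
  char ch[sizeof(int) * 2 + sizeof(int64_t)];
  std::memset(ch, 0, sizeof(int));                    // field 1: zeroed int
  std::memcpy(ch + sizeof(int), &idx, sizeof(int));   // field 2: idx
  std::memcpy(ch + sizeof(int) * 2, &node_id,
              sizeof(node_id));                       // field 3: node id
  std::printf("packed %zu bytes\n", sizeof(ch));
  return 0;
}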
@@ -353,7 +355,6 @@ void GraphTable::export_partition_files(int idx, std::string file_path) {
  for (int i = 0; i < part_len; i++) {
    tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
        [&, i, idx, this]() -> int {
-
          std::string output_path =
              file_path + "partition_" + std::to_string(i);
...
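Both GraphTable hunks above follow the same concurrency pattern: one task per shard is enqueued on a per-shard pool and the resulting futures are joined. A minimal sketch of that pattern, using std::async in place of the third-party ThreadPool the real code uses:

#include <cstdio>
#include <future>
#include <vector>

int main() {
  const int shards = 4;
  std::vector<std::future<int>> tasks;
  for (int i = 0; i < shards; ++i) {
    // One task per shard, mirroring _shards_task_pool[i]->enqueue(...).
    tasks.push_back(std::async(std::launch::async, [i]() -> int {
      std::printf("shard %d processed\n", i);
      return 0;
    }));
  }
  for (auto& t : tasks) t.get();  // join all shard tasks
  return 0;
}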
@@ -17,6 +17,7 @@
#include <ThreadPool.h>
#include <assert.h>
#include <pthread.h>
+
#include <algorithm>
#include <cassert>
#include <cstdio>
@@ -36,6 +37,7 @@
#include <unordered_set>
#include <utility>
#include <vector>
+
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
#include "paddle/fluid/distributed/ps/table/graph/class_macro.h"
@@ -670,4 +672,4 @@ struct hash<paddle::distributed::SampleKey> {
    return s.idx ^ s.node_key ^ s.sample_size;
  }
};
-}
+}  // namespace std
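The last hunk above closes a std::hash specialization for SampleKey that combines fields with XOR. The same pattern on a stand-in struct (the field names are copied from the hunk; the struct itself is illustrative):

#include <cstdint>
#include <functional>
#include <unordered_set>

struct Key {
  int idx;
  int64_t node_key;
  int sample_size;
  bool operator==(const Key& o) const {
    return idx == o.idx && node_key == o.node_key &&
           sample_size == o.sample_size;
  }
};

namespace std {
template <>
struct hash<Key> {
  size_t operator()(const Key& k) const {
    // XOR-combining mirrors the hunk; it is cheap but collides whenever
    // fields cancel out, so production hashes often mix bits instead.
    return k.idx ^ k.node_key ^ k.sample_size;
  }
};
}  // namespace std

int main() {
  std::unordered_set<Key> s;
  s.insert({0, 42, 10});
  return s.count({0, 42, 10}) == 1 ? 0 : 1;
}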
@@ -19,9 +19,8 @@
#include <mutex>  // NOLINT
#include <set>
-#include "paddle/fluid/distributed/ps/table/table.h"
-
#include "paddle/fluid/distributed/common/utils.h"
+#include "paddle/fluid/distributed/ps/table/table.h"

namespace paddle {
namespace distributed {
...
@@ -13,7 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/ctr_accessor.h"
+
#include <gflags/gflags.h>
+
#include "glog/logging.h"
#include "paddle/fluid/string/string_helper.h"
...
@@ -15,7 +15,9 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
+
#include <vector>
+
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/accessor.h"
...
@@ -13,7 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/ctr_double_accessor.h"
+
#include <gflags/gflags.h>
+
#include "glog/logging.h"
#include "paddle/fluid/string/string_helper.h"
...
@@ -15,7 +15,9 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
+
#include <vector>
+
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/accessor.h"
...
@@ -13,7 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h"
+
#include <gflags/gflags.h>
+
#include "glog/logging.h"
#include "paddle/fluid/string/string_helper.h"
...
@@ -15,7 +15,9 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
+
#include <vector>
+
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/accessor.h"
...
@@ -15,13 +15,14 @@
#pragma once
#include <math.h>  // for sqrt in CPU and CUDA
+
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "gflags/gflags.h"
#include "paddle/fluid/distributed/common/utils.h"

namespace paddle {
...
@@ -14,10 +14,10 @@
#pragma once
+#include <mct/hash-map.hpp>
#include <vector>
-#include "gflags/gflags.h"
-#include <mct/hash-map.hpp>
+#include "gflags/gflags.h"
#include "paddle/fluid/distributed/common/chunk_allocator.h"

namespace paddle {
...
@@ -15,6 +15,7 @@
#pragma once
#include <ThreadPool.h>
+
#include <future>  // NOLINT
#include <memory>
#include <unordered_set>
...
@@ -20,10 +20,9 @@
#include <string>
#include <utility>
#include <vector>

#include "gflags/gflags.h"
-
#include "paddle/fluid/framework/generator.h"
#include "paddle/fluid/operators/truncated_gaussian_random_op.h"

namespace paddle {
...
@@ -20,6 +20,7 @@
#include <rocksdb/slice.h>
#include <rocksdb/table.h>
#include <rocksdb/write_batch.h>
+
#include <iostream>
#include <string>
@@ -153,5 +154,5 @@ class RocksDBHandler {
  std::vector<rocksdb::ColumnFamilyHandle*> _handles;
  rocksdb::DB* _db;
};
-}  // distributed
-}  // paddle
+}  // namespace distributed
+}  // namespace paddle
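RocksDBHandler above wraps a rocksdb::DB plus its column-family handles. The wrapper's own API is not shown on this page; the sketch below uses only stock RocksDB calls to show the kind of key-value traffic such a handler mediates:

#include <rocksdb/db.h>

#include <cassert>
#include <string>

int main() {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/demo_db", &db);
  assert(s.ok());
  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key", &value);
  assert(s.ok() && value == "value");
  delete db;
  return 0;
}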
@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/graph/graph_edge.h"
+
#include <cstring>

namespace paddle {
namespace distributed {
@@ -25,5 +26,5 @@ void WeightedGraphEdgeBlob::add_edge(int64_t id, float weight = 1) {
  id_arr.push_back(id);
  weight_arr.push_back(weight);
}
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -43,5 +43,5 @@ class WeightedGraphEdgeBlob : public GraphEdgeBlob {
 protected:
  std::vector<float> weight_arr;
};
-}
-}
+}  // namespace distributed
+}  // namespace paddle
@@ -13,6 +13,7 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/graph/graph_node.h"
+
#include <cstring>

namespace paddle {
namespace distributed {
...
@@ -18,6 +18,7 @@
#include <memory>
#include <sstream>
#include <vector>
+
#include "paddle/fluid/distributed/ps/table/graph/graph_weighted_sampler.h"

namespace paddle {
namespace distributed {
...
@@ -13,9 +13,11 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/graph/graph_weighted_sampler.h"
+
#include <iostream>
#include <memory>
#include <unordered_map>
+
#include "paddle/fluid/framework/generator.h"

namespace paddle {
namespace distributed {
...
@@ -18,6 +18,7 @@
#include <random>
#include <unordered_map>
#include <vector>
+
#include "paddle/fluid/distributed/ps/table/graph/graph_edge.h"

namespace paddle {
namespace distributed {
...
@@ -17,7 +17,9 @@
#include <ThreadPool.h>
#include <assert.h>
#include <pthread.h>
+
#include <string>
+
#include "Eigen/Dense"
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
...
@@ -17,6 +17,7 @@
#include <assert.h>
// #include <pthread.h>
#include <stdint.h>
+
#include <memory>
#include <mutex>  // NOLINT
#include <string>
...
@@ -12,15 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include "paddle/fluid/distributed/ps/table/memory_sparse_table.h"
+
#include <omp.h>
-#include <sstream>
-#include "paddle/fluid/distributed/common/cost_timer.h"
-#include "paddle/fluid/distributed/ps/table/memory_sparse_table.h"
-#include "paddle/fluid/framework/io/fs.h"
+#include <sstream>
+
#include "boost/lexical_cast.hpp"
#include "glog/logging.h"
+#include "paddle/fluid/distributed/common/cost_timer.h"
+#include "paddle/fluid/framework/io/fs.h"
#include "paddle/fluid/platform/enforce.h"

DEFINE_bool(pserver_print_missed_key_num_every_push, false,
@@ -272,9 +273,8 @@ int32_t MemorySparseTable::Save(const std::string& dirname,
        if (_value_accesor->Save(it.value().data(), save_param)) {
          std::string format_value = _value_accesor->ParseToString(
              it.value().data(), it.value().size());
-          if (0 !=
-              write_channel->write_line(paddle::string::format_string(
-                  "%lu %s", it.key(), format_value.c_str()))) {
+          if (0 != write_channel->write_line(paddle::string::format_string(
+                       "%lu %s", it.key(), format_value.c_str()))) {
            ++retry_num;
            is_write_failed = true;
            LOG(ERROR)
...
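The Save hunk above also shows the error bookkeeping around write_line: a failed write bumps retry_num and flags the pass as failed. A stripped-down sketch of that control flow; write_line here is a local stand-in, not Paddle's channel API, and how the real code retries is not visible in this hunk:

#include <cstdio>
#include <string>

// Stand-in writer that pretends every non-empty line is persisted.
bool write_line(const std::string& line) { return !line.empty(); }

int main() {
  int retry_num = 0;
  bool is_write_failed = false;
  for (int key = 0; key < 3; ++key) {
    if (!write_line(std::to_string(key) + " payload")) {
      ++retry_num;
      is_write_failed = true;
      std::fprintf(stderr, "write failed, retry_num=%d\n", retry_num);
      break;
    }
  }
  return is_write_failed ? 1 : 0;
}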
@@ -17,12 +17,14 @@
#include <ThreadPool.h>
#include <assert.h>
#include <pthread.h>
+
#include <memory>
#include <mutex>  // NOLINT
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
+
#include "Eigen/Dense"
#include "paddle/fluid/distributed/ps/table/accessor.h"
#include "paddle/fluid/distributed/ps/table/common_table.h"
...
@@ -13,7 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/sparse_accessor.h"
+
#include <gflags/gflags.h>
+
#include "glog/logging.h"
#include "paddle/fluid/string/string_helper.h"
...
@@ -15,7 +15,9 @@
#pragma once
#include <stdint.h>
#include <stdio.h>
+
#include <vector>
+
#include "paddle/fluid/distributed/common/registerer.h"
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/distributed/ps/table/accessor.h"
...
@@ -13,7 +13,9 @@
// limitations under the License.
#include "paddle/fluid/distributed/ps/table/sparse_sgd_rule.h"
+
#include <gflags/gflags.h>
+
#include "glog/logging.h"

DEFINE_bool(enable_show_scale_gradient, true, "enable show scale gradient");
...
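The DEFINE_bool line kept as context above is the standard gflags way to declare a process-wide flag. A minimal working example (the flag name is illustrative, not one of Paddle's):

#include <gflags/gflags.h>

#include <iostream>

DEFINE_bool(enable_demo_logging, true, "toggle demo logging");

int main(int argc, char* argv[]) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  if (FLAGS_enable_demo_logging) {
    std::cout << "logging enabled" << std::endl;
  }
  return 0;
}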
[The diffs of the remaining 68 files are collapsed and not shown.]