diff --git a/paddle/fluid/framework/lod_tensor.cc b/paddle/fluid/framework/lod_tensor.cc index ca820068c4d2f89b76306df81bac757918195ec1..ec5aa27be825dda2ae4b0ae66eaf96eb5770936f 100644 --- a/paddle/fluid/framework/lod_tensor.cc +++ b/paddle/fluid/framework/lod_tensor.cc @@ -243,15 +243,48 @@ void SerializeToStream(std::ostream &os, const LoDTensor &tensor, TensorToStream(os, static_cast(tensor), dev_ctx); } +void DeserializeFromStream(std::istream &is, LoDTensor *tensor, + const platform::DeviceContext &dev_ctx, + const size_t &seek, + const std::vector &shape) { + { + // the 1st field, unit32_t version for LoDTensor + uint32_t version; + is.read(reinterpret_cast(&version), sizeof(version)); + PADDLE_ENFORCE_EQ(framework::IsTensorVersionSupported(version), true, + platform::errors::InvalidArgument( + "tensor version %u is not supported.", version)); + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); + } + { + // the 2st field, LoD information + uint64_t lod_level; + is.read(reinterpret_cast(&lod_level), sizeof(lod_level)); + auto &lod = *tensor->mutable_lod(); + lod.resize(lod_level); + } + // the 3st filed, Tensor + TensorFromStream(is, static_cast(tensor), dev_ctx, seek, shape); +} + void DeserializeFromStream(std::istream &is, LoDTensor *tensor, const platform::DeviceContext &dev_ctx) { { // the 1st field, unit32_t version for LoDTensor uint32_t version; is.read(reinterpret_cast(&version), sizeof(version)); - PADDLE_ENFORCE(framework::IsTensorVersionSupported(version), - "tensor version %u is not supported.", version); - PADDLE_ENFORCE_EQ(version, 0U, "Only version 0 is supported"); + PADDLE_ENFORCE_EQ(framework::IsTensorVersionSupported(version), true, + platform::errors::InvalidArgument( + "tensor version %u is not supported.", version)); + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); } { // the 2st field, LoD information diff --git a/paddle/fluid/framework/lod_tensor.h b/paddle/fluid/framework/lod_tensor.h index ef48753349ec7b07d2c1c0ee68d133145e4e4047..3ad873d1f6c500bf6135a521bfc846869b70f774 100644 --- a/paddle/fluid/framework/lod_tensor.h +++ b/paddle/fluid/framework/lod_tensor.h @@ -209,6 +209,10 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor, const platform::DeviceContext& dev_ctx); void DeserializeFromStream(std::istream& is, LoDTensor* tensor, const platform::DeviceContext& dev_ctx); +void DeserializeFromStream(std::istream& is, LoDTensor* tensor, + const platform::DeviceContext& dev_ctx, + const size_t& seek, + const std::vector& shape); /* * Convert between length-based LoD and offset-based LoD. diff --git a/paddle/fluid/framework/save_load_util.cc b/paddle/fluid/framework/save_load_util.cc index 76544af052f07763146478c305e07e142ffc3bbd..fbbbfd66b3d8c39d0ccaa7d998bb5c5e9860df4e 100644 --- a/paddle/fluid/framework/save_load_util.cc +++ b/paddle/fluid/framework/save_load_util.cc @@ -342,8 +342,9 @@ bool LoadTensorFromDisk( std::unique_ptr buf(new char[size]); fin.read(reinterpret_cast(buf.get()), size); CheckInStreamState(fin, sizeof(size)); - PADDLE_ENFORCE(desc.ParseFromArray(buf.get(), size), - "Cannot parse tensor desc"); + PADDLE_ENFORCE_EQ( + desc.ParseFromArray(buf.get(), size), true, + platform::errors::InvalidArgument("Cannot parse tensor desc")); } { // read tensor diff --git a/paddle/fluid/framework/tensor_util.cc b/paddle/fluid/framework/tensor_util.cc index ace6b033cccfb11cacf81a4d34f185ef1e11e0a0..8b2e3fc323ce560fbd5f8248584f95ef6a4bb702 100644 --- a/paddle/fluid/framework/tensor_util.cc +++ b/paddle/fluid/framework/tensor_util.cc @@ -404,8 +404,9 @@ void TensorToStream(std::ostream& os, const Tensor& tensor, uint64_t size = tensor.numel() * framework::SizeOfType(tensor.type()); auto* data_ptr = tensor.data(); - PADDLE_ENFORCE(size < std::numeric_limits::max(), - "Index overflow when writing tensor"); + PADDLE_ENFORCE_LT(size, std::numeric_limits::max(), + platform::errors::ResourceExhausted( + "tensor size %d overflow when writing tensor", size)); if (platform::is_gpu_place(tensor.place())) { #ifdef PADDLE_WITH_CUDA constexpr size_t kBufSize = 1024 * 1024 * 64; // 64MB @@ -426,7 +427,8 @@ void TensorToStream(std::ostream& os, const Tensor& tensor, size -= size_to_write; } #else - PADDLE_THROW("Unexpected branch"); + PADDLE_THROW(platform::errors::Unimplemented( + "CUDAPlace is not supported when not compiled with CUDA")); #endif } else { os.write(static_cast(data_ptr), @@ -450,11 +452,69 @@ struct DeserializedDataFunctor { platform::Place place_; }; +void TensorFromStream(std::istream& is, Tensor* tensor, + const platform::DeviceContext& dev_ctx, + const size_t& seek, const std::vector& shape) { + uint32_t version; + is.read(reinterpret_cast(&version), sizeof(version)); + + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); + + proto::VarType::TensorDesc desc; + { // int32_t size + // proto buffer + int32_t size; + is.read(reinterpret_cast(&size), sizeof(size)); + std::unique_ptr buf(new char[size]); + is.read(reinterpret_cast(buf.get()), size); + PADDLE_ENFORCE_EQ( + desc.ParseFromArray(buf.get(), size), true, + platform::errors::InvalidArgument("Cannot parse tensor desc")); + } + { // read tensor + tensor->Resize(framework::make_ddim(shape)); + size_t seekg = seek * framework::SizeOfType(desc.data_type()); + is.seekg(seekg, is.cur); + + void* buf; + auto ctx = platform::CPUDeviceContext(); + size_t size = tensor->numel() * framework::SizeOfType(desc.data_type()); + if (platform::is_gpu_place(dev_ctx.GetPlace())) { +#ifdef PADDLE_WITH_CUDA + Tensor cpu_tensor; + cpu_tensor.Resize(framework::make_ddim(shape)); + framework::VisitDataType( + desc.data_type(), + DeserializedDataFunctor(&buf, &cpu_tensor, ctx.GetPlace())); + is.read(static_cast(buf), size); + auto dst_place = dev_ctx.GetPlace(); + framework::TensorCopy(cpu_tensor, dst_place, dev_ctx, tensor); +#else + PADDLE_THROW(platform::errors::Unimplemented( + "CUDAPlace is not supported when not compiled with CUDA")); +#endif + } else { + framework::VisitDataType( + desc.data_type(), + DeserializedDataFunctor(&buf, tensor, ctx.GetPlace())); + is.read(static_cast(buf), size); + } + } +} + void TensorFromStream(std::istream& is, Tensor* tensor, const platform::DeviceContext& dev_ctx) { uint32_t version; is.read(reinterpret_cast(&version), sizeof(version)); - PADDLE_ENFORCE_EQ(version, 0U, "Only version 0 is supported"); + PADDLE_ENFORCE_EQ( + version, 0U, + platform::errors::InvalidArgument( + "tensor version %u is not supported, Only version 0 is supported", + version)); proto::VarType::TensorDesc desc; { // int32_t size // proto buffer @@ -462,8 +522,9 @@ void TensorFromStream(std::istream& is, Tensor* tensor, is.read(reinterpret_cast(&size), sizeof(size)); std::unique_ptr buf(new char[size]); is.read(reinterpret_cast(buf.get()), size); - PADDLE_ENFORCE(desc.ParseFromArray(buf.get(), size), - "Cannot parse tensor desc"); + PADDLE_ENFORCE_EQ( + desc.ParseFromArray(buf.get(), size), true, + platform::errors::InvalidArgument("Cannot parse tensor desc")); } { // read tensor std::vector dims; @@ -484,7 +545,8 @@ void TensorFromStream(std::istream& is, Tensor* tensor, auto dst_place = dev_ctx.GetPlace(); framework::TensorCopy(cpu_tensor, dst_place, dev_ctx, tensor); #else - PADDLE_THROW("Unexpected branch"); + PADDLE_THROW(platform::errors::Unimplemented( + "CUDAPlace is not supported when not compiled with CUDA")); #endif } else { framework::VisitDataType( diff --git a/paddle/fluid/framework/tensor_util.h b/paddle/fluid/framework/tensor_util.h index dd535dfb6b8a438939311f843d49dc1fdc864985..9cbcf8f6286c6a24bdf9165c107b15d2a7a5d5c5 100644 --- a/paddle/fluid/framework/tensor_util.h +++ b/paddle/fluid/framework/tensor_util.h @@ -72,6 +72,9 @@ void TensorToStream(std::ostream& os, const Tensor& tensor, const platform::DeviceContext& dev_ctx); void TensorFromStream(std::istream& is, Tensor* tensor, const platform::DeviceContext& dev_ctx); +void TensorFromStream(std::istream& is, Tensor* tensor, + const platform::DeviceContext& dev_ctx, + const size_t& seek, const std::vector& shape); // convert dlpack's DLTensor to tensor void TensorFromDLPack(const ::DLTensor& dl_tensor, framework::Tensor* dst); diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index b6ee2d29df8911a1844acf3005a7b16d53ef8079..c9bbc3c193558a6e07e118a482c63f7f8427a27e 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -183,9 +183,18 @@ void prefetchs(const std::vector& id_var_names, PADDLE_ENFORCE_EQ(table_names.size(), endpoints.size(), ""); PADDLE_ENFORCE_EQ(table_names.size(), height_sections.size(), ""); - auto* reconstruct_var = - scope.FindVar(persistable_var_name)->GetMutable(); - const auto vec_dim_1 = reconstruct_var->dims()[1]; + auto vec_dim_1 = 0; + framework::Variable* var = scope.FindVar(persistable_var_name); + + PADDLE_ENFORCE_EQ(var->IsType(), true, + platform::errors::InvalidArgument( + "prefetch can only support LodTensor only")); + + vec_dim_1 = var->Get().dims()[1]; + + PADDLE_ENFORCE_GT(vec_dim_1, 0, + platform::errors::InvalidArgument( + "lookup table var's dim must gather than 0")); const auto place = scope.FindVar(id_var_names[0])->Get().place(); @@ -251,16 +260,6 @@ void prefetchs(const std::vector& id_var_names, } } } - - if (backfill) { - VLOG(3) << "backfill persistable var's id with vecs"; - - auto* reconstruct_d = reconstruct_var->data(); - for (auto& id : ids_union) { - std::copy(recved_vec_map[id].begin(), recved_vec_map[id].end(), - reconstruct_d + id * vec_dim_1); - } - } } }; // namespace distributed diff --git a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc index 403347d334b274c105b3f6c0e19c1150de824c85..f37d2b1eee108d835a8f06035f13ca1abbaf0d69 100644 --- a/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc +++ b/paddle/fluid/operators/distributed_ops/distributed_lookup_table_op.cc @@ -72,8 +72,9 @@ class DistributedLookupTableOp : public framework::OperatorWithKernel { protected: framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext &ctx) const override { - auto data_type = OperatorWithKernel::IndicateVarDataType(ctx, "W"); - return framework::OpKernelType(data_type, ctx.device_context()); + return framework::OpKernelType( + framework::proto::VarType::Type(ctx.Attr("dtype")), + ctx.GetPlace()); } }; @@ -139,6 +140,10 @@ class DistributedLookupTableOpMaker : public framework::OpProtoAndCheckerMaker { "Otherwise the given value indicates padding the output " "with zeros whenever lookup encounters it in Ids.") .SetDefault(distributed::kNoPadding); + AddAttr("dtype", + "(int, default 5 (FP32)) " + "Output data type") + .SetDefault(framework::proto::VarType::FP32); AddComment(R"DOC( Lookup Tablel Prefetch Operator. diff --git a/paddle/fluid/operators/distributed_ops/fake_init_op.cc b/paddle/fluid/operators/distributed_ops/fake_init_op.cc index b56ab1d5fd6d2da086fa6d3fb02df8a66d550b41..f5a34b2dcb12e6f9c3367ea388c09eeeea905f83 100644 --- a/paddle/fluid/operators/distributed_ops/fake_init_op.cc +++ b/paddle/fluid/operators/distributed_ops/fake_init_op.cc @@ -1,11 +1,8 @@ /* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. - Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -69,10 +66,8 @@ class FakeInitOpMaker : public framework::OpProtoAndCheckerMaker { "with the specified value"); AddComment(R"DOC( FakeInit Operator. - Init an variable but not alloc memory for it, it is used for init the table parameter at trainer side in distributed lookup table. - )DOC"); } }; diff --git a/paddle/fluid/operators/distributed_ops/recv_save_op.cc b/paddle/fluid/operators/distributed_ops/recv_save_op.cc new file mode 100644 index 0000000000000000000000000000000000000000..766e378e106bc7cd47c375aef22aaa863ddc6aba --- /dev/null +++ b/paddle/fluid/operators/distributed_ops/recv_save_op.cc @@ -0,0 +1,262 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include +#include +#include +#include + +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/data_type_transform.h" +#include "paddle/fluid/framework/framework.pb.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/op_registry.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/variable.h" +#include "paddle/fluid/framework/version.h" +#include "paddle/fluid/operators/distributed/distributed.h" +#include "paddle/fluid/operators/distributed/parameter_recv.h" +#include "paddle/fluid/operators/distributed/rpc_common.h" +#include "paddle/fluid/string/string_helper.h" + +namespace paddle { +namespace operators { +class RecvSaveOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext *ctx) const override {} + + protected: + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext &ctx) const override { + return framework::OpKernelType( + framework::proto::VarType::Type(ctx.Attr("dtype")), + ctx.GetPlace()); + } +}; + +class RecvSaveOpProtoMaker : public framework::OpProtoAndCheckerMaker { + public: + void Make() override { + AddComment(R"DOC( +Recv Save operator + +This operator will serialize and write LoDTensor variable to file on disk. +)DOC"); + AddAttr("dtype", + "(int, default 5 (FP32)) " + "Output data type") + .SetDefault(framework::proto::VarType::FP32); + + AddAttr("overwrite", + "(boolean, default true)" + "Overwrite the output file if exist") + .SetDefault(true); + + AddAttr("file_path", + "(string)" + "The \"file_path\" where the variable will be saved.") + .AddCustomChecker( + [](const std::string &path) { return !path.empty(); }); + + AddAttr>("shape", + "(vector) The shape of the output") + .SetDefault({}); + + AddAttr>( + "slice_varnames", + "(string vector, default {}) " + "sometimes we need to put received var in another name " + "for example: we need var named 'moment_1@127.0.0.1:1001', " + "and it real name on parameter server is 'moment_1'. ") + .SetDefault({}); + + AddAttr>( + "remote_varnames", + "(string vector, default {}) " + "sometimes we need to put received var in another name " + "for example: we need var named 'moment_1@127.0.0.1:1001', " + "and it real name on parameter server is 'moment_1'. ") + .SetDefault({}); + + AddAttr>("slice_shapes", + "(vector) " + "the length of each output along the " + "specified axis.") + .SetDefault({}); + + AddAttr>("endpoints", + "(string vector, default 127.0.0.1:6164)" + "Server endpoints in the order of input " + "variables for mapping") + .SetDefault({}); + + AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); + } +}; + +template +class RecvSaveOpKernel : public framework::OpKernel { + private: + void SerializeVersionToStream(std::ostream &os) const { + { // the 1st field, uint32_t version for LoDTensor + os.write(reinterpret_cast(&framework::kCurTensorVersion), + sizeof(framework::kCurTensorVersion)); + } + // the 2st field, LoD information + // in this scene, skip LoD information. + uint64_t size = 0; + os.write(reinterpret_cast(&size), sizeof(size)); + } + + void SerializeTensorHeaderToStream( + std::ostream &os, const framework::proto::VarType::Type &type, + const framework::DDim &dims) const { + { // the 1st field, uint32_t version + constexpr uint32_t version = 0; + os.write(reinterpret_cast(&version), sizeof(version)); + } + { // the 2nd field, tensor description + // int32_t size + // void* protobuf message + framework::proto::VarType::TensorDesc desc; + desc.set_data_type(type); + auto tensor_dims = framework::vectorize(dims); + auto *pb_dims = desc.mutable_dims(); + pb_dims->Resize(static_cast(tensor_dims.size()), 0); + std::copy(tensor_dims.begin(), tensor_dims.end(), pb_dims->begin()); + int32_t size = desc.ByteSize(); + os.write(reinterpret_cast(&size), sizeof(size)); + auto out = desc.SerializeAsString(); + os.write(out.data(), size); + } + } + + void SerializeTensorAppendToStream(std::ostream &os, + const framework::Tensor &tensor) const { + uint64_t size = tensor.numel() * framework::SizeOfType(tensor.type()); + auto *data_ptr = tensor.data(); + + PADDLE_ENFORCE_LT(size, std::numeric_limits::max(), + platform::errors::ResourceExhausted( + "tensor size %d overflow when writing tensor", size)); + os.write(static_cast(data_ptr), + static_cast(size)); + } + + public: + void Compute(const framework::ExecutionContext &ctx) const override { + auto place = ctx.GetPlace(); + + auto filename = ctx.Attr("file_path"); + auto overwrite = ctx.Attr("overwrite"); + + if (FileExists(filename) && !overwrite) { + PADDLE_THROW_ERROR( + "%s is existed, cannot save to it when overwrite=false", filename, + overwrite); + } + + MkDirRecursively(DirName(filename).c_str()); + + auto origin_shape = ctx.Attr>("shape"); + auto slice_shapes = ctx.Attr>("slice_shapes"); + auto slice_varnames = ctx.Attr>("slice_varnames"); + auto remote_varnames = + ctx.Attr>("remote_varnames"); + auto endpoints = ctx.Attr>("endpoints"); + + PADDLE_ENFORCE_EQ(slice_shapes.size(), slice_varnames.size(), + platform::errors::InvalidArgument( + "Expected attr len(slice_shapes) must be equal to " + "len(slice_varnames)")); + + PADDLE_ENFORCE_EQ( + slice_shapes.size(), endpoints.size(), + platform::errors::InvalidArgument( + "Expected attr len(slice_shapes) must be equal to len(endpoints)")); + + auto data_type = + static_cast(ctx.Attr("dtype")); + + // it to save an output stream. + std::ofstream fout(filename, std::ios::binary); + PADDLE_ENFORCE_EQ( + static_cast(fout), true, + platform::errors::NotFound("Cannot open %s to write", filename)); + + SerializeVersionToStream(fout); + SerializeTensorHeaderToStream(fout, data_type, + framework::make_ddim(origin_shape)); + + framework::Scope &local_scope = ctx.scope().NewScope(); + + auto trainer_id = ctx.Attr("trainer_id"); + + platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); + auto &device_ctx = *pool.Get(place); + + distributed::RPCClient *rpc_client = + distributed::RPCClient::GetInstance(trainer_id); + + for (size_t i = 0; i < slice_varnames.size(); i++) { + auto &varname = slice_varnames[i]; + auto *var = local_scope.Var(varname); + auto *tensor = var->GetMutable(); + + auto slice_string = + string::split_string(slice_shapes[i], ","); + std::vector slice_shape; + + for (auto &dim : slice_string) { + slice_shape.push_back(static_cast(std::stoull(dim))); + } + + tensor->Resize(framework::make_ddim(slice_shape)); + + distributed::VarHandlePtr ret; + + ret = rpc_client->AsyncGetVarNoBarrier( + endpoints[i], device_ctx, local_scope, remote_varnames[i], varname); + + PADDLE_ENFORCE_NE( + ret->Wait(), 0U, + platform::errors::ExecutionTimeout( + "rpc error when communication with %s", endpoints[i])); + + auto &c_tensor = var->Get(); + + SerializeTensorAppendToStream(fout, c_tensor); + local_scope.EraseVars({varname}); + } + + fout.close(); + ctx.scope().DeleteScope(&local_scope); + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; + +REGISTER_OPERATOR(recv_save, ops::RecvSaveOp, ops::RecvSaveOpProtoMaker); + +REGISTER_OP_CPU_KERNEL( + recv_save, ops::RecvSaveOpKernel, + ops::RecvSaveOpKernel, + ops::RecvSaveOpKernel, + ops::RecvSaveOpKernel); diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.h b/paddle/fluid/operators/hierarchical_sigmoid_op.h index daf99586f825c53704088d718773a220d434b7a9..752773670433006e40b899f88bf7f078fa958464 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.h +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.h @@ -68,46 +68,6 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel { size_t num_classes = static_cast(ctx.Attr("num_classes")); // for remote prefetch - auto remote_prefetch = ctx.Attr("remote_prefetch"); - auto epmap = ctx.Attr>("epmap"); - if (remote_prefetch && !epmap.empty()) { - // if epmap is not empty, then the parameter will be fetched from remote - // parameter - // server - auto height_sections = ctx.Attr>("height_sections"); - auto table_names = ctx.Attr>("table_names"); - std::vector real_rows = PathToRows(*path); - framework::Scope& local_scope = ctx.scope().NewScope(); - auto* ids = local_scope.Var("Ids@Prefetch"); - auto* x_tensor = ids->GetMutable(); - - x_tensor->mutable_data( - framework::make_ddim({static_cast(real_rows.size()), 1}), - ctx.GetPlace()); - // copy. - - std::memcpy(x_tensor->data(), real_rows.data(), - real_rows.size() * sizeof(int64_t)); - - framework::DDim w_dims = ctx.Input("W")->dims(); - w_dims[0] = x_tensor->dims()[0]; - auto* w_tensor = - local_scope.Var("W@Prefetch")->GetMutable(); - w_tensor->Resize(w_dims); - -#ifdef PADDLE_WITH_DISTRIBUTE - // w_Out is set to used by prefetch, never change it in other cases - auto weight = ctx.OutputNames("W_Out").front(); - operators::distributed::prefetch("Ids@Prefetch", "W@Prefetch", weight, - true, table_names, epmap, - height_sections, ctx, local_scope); -#else - PADDLE_THROW( - "paddle is not compiled with distribute support, can not do " - "parameter prefetch!"); -#endif - } - bool is_custom = false; if (path) { is_custom = true; diff --git a/paddle/fluid/operators/load_op.cc b/paddle/fluid/operators/load_op.cc index 435c755df3642ae0ba5144a89ed30ed6e0b63258..4f2c9a6ca038cff7188793f42417baf7e096ee50 100644 --- a/paddle/fluid/operators/load_op.cc +++ b/paddle/fluid/operators/load_op.cc @@ -48,8 +48,14 @@ class LoadOpProtoMaker : public framework::OpProtoAndCheckerMaker { R"(Variable will be loaded from "file_path")") .AddCustomChecker( [](const std::string &path) { return !path.empty(); }); + AddAttr("seek", "(int64_t) Starting for load tensor from seek pos") + .SetDefault(-1); + AddAttr>("shape", + "(vector) The shape of the output") + .SetDefault({}); AddComment( - "Load operator will load a LoDTensor / SelectedRows variable from disk " + "Load operator will load a LoDTensor / SelectedRows variable from " + "disk " "file."); } }; diff --git a/paddle/fluid/operators/load_op.h b/paddle/fluid/operators/load_op.h index 7347fd5e05481156cf7bd84c55b9c6f55b175bce..5a409502fd26421f5f835fa143adf87b6ca14d02 100644 --- a/paddle/fluid/operators/load_op.h +++ b/paddle/fluid/operators/load_op.h @@ -16,6 +16,7 @@ limitations under the License. */ #include #include +#include #include "paddle/fluid/framework/data_type_transform.h" #include "paddle/fluid/framework/op_registry.h" @@ -63,7 +64,18 @@ class LoadOpKernel : public framework::OpKernel { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto &dev_ctx = *pool.Get(place); auto *tensor = var->GetMutable(); - DeserializeFromStream(fin, tensor, dev_ctx); + + auto seek = ctx.Attr("seek"); + + if (seek != -1) { + PADDLE_ENFORCE_GE(seek, 0, + platform::errors::InvalidArgument( + "seek witn tensor must great than or equal to 0")); + auto shape = ctx.Attr>("shape"); + DeserializeFromStream(fin, tensor, dev_ctx, seek, shape); + } else { + DeserializeFromStream(fin, tensor, dev_ctx); + } auto load_as_fp16 = ctx.Attr("load_as_fp16"); auto in_dtype = tensor->type(); diff --git a/paddle/fluid/string/to_string.h b/paddle/fluid/string/to_string.h index f5693ee7f1ebf6654de7d8ff20649fac858a27cb..7b3332861e0fa3edbbb8915e3e3f068fed3b412f 100644 --- a/paddle/fluid/string/to_string.h +++ b/paddle/fluid/string/to_string.h @@ -17,6 +17,7 @@ limitations under the License. */ #include #include #include +#include namespace paddle { namespace string { diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 6f58a700a53bf8a8f1f30ad0498e87ac54372fdf..eed3b79591c09b07923a8f290173fe4585bec2c4 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -30,7 +30,8 @@ from paddle.reader import * from paddle.fluid import layers from paddle.fluid.executor import Executor, global_scope from paddle.fluid.evaluator import Evaluator -from paddle.fluid.framework import Program, Parameter, default_main_program, default_startup_program, Variable, program_guard +from paddle.fluid.framework import Program, Parameter, default_main_program, default_startup_program, Variable, \ + program_guard from paddle.fluid.compiler import CompiledProgram from paddle.fluid.log_helper import get_logger from . import reader @@ -383,72 +384,42 @@ def _save_distributed_persistables(executor, dirname, main_program): # recv optimize vars from pserver for name, remote_params in remote_params_map.items(): - origin_var = None - is_slice = False - slice_vars = [0] * len(remote_params) - slice_var_names = [""] * len(remote_params) - endpoints = [""] * len(remote_params) + origin = remote_params[0].origin + is_slice = remote_params[0].is_slice + + slices = [None] * len(remote_params) + slice_varnames = [None] * len(remote_params) + remote_varnames = [None] * len(remote_params) + endpoints = [None] * len(remote_params) for idx, optimizer in enumerate(remote_params): - origin = optimizer.origin - slice = optimizer.slice - is_slice = optimizer.is_slice block_id = optimizer.block_id + slice = optimizer.slice endpoint = optimizer.endpoint - if idx == 0: - origin_var = block.create_var( - name=origin.name, - type=origin.type, - shape=origin.shape, - dtype=origin.dtype, - persistable=True) - - slice_var = block.create_var( - name="{}.slice.{}".format(slice.name, idx), - type=slice.type, - shape=slice.shape, - dtype=slice.dtype, - persistable=True) - index = block_id if is_slice else idx - slice_vars[index] = slice_var - slice_var_names[index] = slice.name + slices[index] = slice + slice_varnames[index] = "{}.slice.{}".format(slice.name, idx) + remote_varnames[index] = slice.name endpoints[index] = endpoint - if is_slice: - block.append_op( - type='recv', - inputs={"X": []}, - outputs={"Out": slice_vars}, - attrs={ - "epmap": endpoints, - "with_barrier": False, - "varnames": slice_var_names, - "sync_mode": True - }) - block.append_op( - type='concat', - inputs={'X': slice_vars}, - outputs={'Out': origin_var}, - attrs={}) - else: - block.append_op( - type='recv', - inputs={"X": []}, - outputs={"Out": [origin_var]}, - attrs={ - "epmap": endpoints[:1], - "with_barrier": False, - "varnames": slice_var_names, - "sync_mode": True - }) + slice_shapes = [] + for slice in slices: + tmp = [str(dim) for dim in slice.shape] + slice_shapes.append(",".join(tmp)) + block.append_op( - type='save', - inputs={'X': [origin_var]}, - outputs={}, - attrs={'file_path': os.path.join(dirname, origin_var.name)}) - block.append_op(type='delete_var', inputs={'X': slice_vars}) + type='recv_save', + attrs={ + "trainer_id": 0, + "shape": origin.shape, + "slice_shapes": slice_shapes, + "slice_varnames": slice_varnames, + "remote_varnames": remote_varnames, + "endpoints": endpoints, + "file_path": os.path.join(dirname, origin.name) + }) + executor.run(prog) def __save_distributed_lookup_tables(executor, dirname, @@ -478,8 +449,8 @@ def _save_distributed_persistables(executor, dirname, main_program): if var.name in exclude_var_names: return False if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ - var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ - var.desc.type() == core.VarDesc.VarType.READER: + var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ + var.desc.type() == core.VarDesc.VarType.READER: return False return var.persistable @@ -690,7 +661,7 @@ def load_vars(executor, if not isinstance(main_program, Program): raise TypeError("program should be as Program type or None") - #save origin param shape + # save origin param shape orig_para_shape = {} load_var_map = {} for each_var in vars: @@ -725,7 +696,7 @@ def load_vars(executor, attrs={'file_path': os.path.join(load_dirname, filename)}) executor.run(load_prog) - #check var shape + # check var shape for each_var in vars: if not isinstance(each_var, Parameter): continue @@ -893,21 +864,6 @@ def _load_distributed_persistables(executor, dirname, main_program=None): offset = param.offset if is_slice: - origin = load_block.create_var( - name="{}.load".format(origin_var.name), - type=origin_var.type, - shape=origin_var.shape, - dtype=origin_var.dtype, - persistable=True) - - load_block.append_op( - type='load', - inputs={}, - outputs={'Out': [origin]}, - attrs={ - 'file_path': os.path.join(dirname, origin_var.name) - }) - slice = load_block.create_var( name=slice_var.name, type=slice_var.type, @@ -915,22 +871,15 @@ def _load_distributed_persistables(executor, dirname, main_program=None): dtype=slice_var.dtype, persistable=True) - dim1_flatten = 1 - if len(slice.shape) >= 2: - dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:]) - - start = int(offset / dim1_flatten) - end = int(offset / dim1_flatten + slice.shape[0]) - load_block.append_op( - type="slice", - inputs={'Input': origin}, - outputs={'Out': slice}, - attrs={'axes': [0], - 'starts': [start], - 'ends': [end]}) - - need_delete_vars.append(origin) + type='load', + inputs={}, + outputs={'Out': [slice]}, + attrs={ + 'file_path': os.path.join(dirname, origin_var.name), + 'seek': offset, + 'shape': slice.shape + }) else: origin = load_block.create_var( name="{}".format(origin_var.name), @@ -1517,7 +1466,7 @@ def save(program, model_path): base_name = os.path.basename(model_path) assert base_name != "", \ - "model_path MUST be format of dirname/filename [dirname\\filename in Window], Now filename is empty str" + "model_path MUST be format of dirname/filename [dirname\\filename in Window], Now filename is empty str" def get_tensor(var): t = global_scope().find_var(var.name).get_tensor() @@ -1574,7 +1523,7 @@ def load(program, model_path, executor=None): parameter_file_name = model_path + ".pdparams" assert os.path.exists(parameter_file_name), \ - "Parameter file [{}] not exits".format(parameter_file_name) + "Parameter file [{}] not exits".format(parameter_file_name) def set_var(var, ndarray): t = global_scope().find_var(var.name).get_tensor() @@ -1610,7 +1559,7 @@ def load(program, model_path, executor=None): if len(optimizer_var_list) > 0: opt_file_name = model_path + ".pdopt" assert os.path.exists(opt_file_name), \ - "Optimizer file [{}] not exits".format( opt_file_name) + "Optimizer file [{}] not exits".format(opt_file_name) if executor: paddle.fluid.core._create_loaded_parameter( @@ -1655,7 +1604,7 @@ def load_program_state(model_path): """ parameter_file_name = model_path + ".pdparams" assert os.path.exists(parameter_file_name), \ - "Parameter file [{}] not exits".format( parameter_file_name) + "Parameter file [{}] not exits".format(parameter_file_name) with open(parameter_file_name, 'rb') as f: para_dict = pickle.load(f) @@ -1707,25 +1656,25 @@ def set_program_state(program, state_dict): for para in parameter_list: var_temp = paddle.fluid.global_scope().find_var(para.name) assert var_temp != None, \ - "Variable [ {} ] Not found, Please make sure run startup program".format( para.name ) + "Variable [ {} ] Not found, Please make sure run startup program".format(para.name) if para.name in state_dict: # set value from state dict orig_para_np = np.array(var_temp.get_tensor()) new_para_np = state_dict[para.name] - assert orig_para_np.shape == new_para_np.shape, \ - "Shape not matching: the Program requires a parameter with a shape of ({}), " \ - "while the loaded parameter (namely [ {} ]) has a shape of ({})." \ + assert orig_para_np.shape == new_para_np.shape, \ + "Shape not matching: the Program requires a parameter with a shape of ({}), " \ + "while the loaded parameter (namely [ {} ]) has a shape of ({})." \ .format(orig_para_np.shape, para.name, new_para_np.shape) - assert orig_para_np.dtype == new_para_np.dtype, \ - "Dtype not matching: the Program requires a parameter with a dtype of ({}), " \ - "while the loaded parameter (namely [ {} ]) has a dtype of ({})." \ + assert orig_para_np.dtype == new_para_np.dtype, \ + "Dtype not matching: the Program requires a parameter with a dtype of ({}), " \ + "while the loaded parameter (namely [ {} ]) has a dtype of ({})." \ .format(orig_para_np.dtype, para.name, new_para_np.dtype) ten = var_temp.get_tensor() ten_place = ten._place() assert ten_place.is_gpu_place() or ten_place.is_cpu_place(), \ - "Place not support, only support CPUPlace and GPUPlace, now is {}".format( str(ten_place)) + "Place not support, only support CPUPlace and GPUPlace, now is {}".format(str(ten_place)) py_place = paddle.fluid.CPUPlace() if ten_place.is_cuda_pinned_place(): place = paddle.fluid.CUDAPinnedPlace() diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index f4f11168d2d39a490e0d29b493316a3a47d33e11..9614aa72f98274b5e03751aeb90670c511a2ae79 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -15,7 +15,8 @@ list(APPEND MIXED_DIST_TEST_OPS test_dgc_optimizer) list(APPEND MIXED_DIST_TEST_OPS test_simple_dist_transpiler) list(APPEND MIXED_DIST_TEST_OPS test_listen_and_serv_op) list(APPEND MIXED_DIST_TEST_OPS test_nce_remote_table_op) -list(APPEND MIXED_DIST_TEST_OPS test_hsigmoid_remote_table_op) +list(APPEND MIXED_DIST_TEST_OPS test_recv_save_op) +list(APPEND MIXED_DIST_TEST_OPS test_transpiler_ops) list(APPEND MIXED_DIST_TEST_OPS test_lookup_remote_table_op) list(APPEND MIXED_DIST_TEST_OPS test_launch) list(APPEND MIXED_DIST_TEST_OPS test_launch_ps) @@ -252,8 +253,9 @@ if(WITH_DISTRIBUTE) list(REMOVE_ITEM DIST_TEST_OPS "test_dist_fleet_base") py_test_modules(test_lookup_remote_table_op MODULES test_lookup_remote_table_op ENVS ${dist_ENVS}) - py_test_modules(test_hsigmoid_remote_table_op MODULES test_hsigmoid_remote_table_op ENVS ${dist_ENVS}) py_test_modules(test_nce_remote_table_op MODULES test_nce_remote_table_op ENVS ${dist_ENVS}) + py_test_modules(test_recv_save_op MODULES test_recv_save_op ENVS ${dist_ENVS}) + py_test_modules(test_transpiler_ops MODULES test_transpiler_ops ENVS ${dist_ENVS}) if(WITH_DGC) py_test_modules(test_dgc_op MODULES test_dgc_op) py_test_modules(test_dgc_momentum_op MODULES test_dgc_momentum_op) diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index f81d4fda50be195b239fcb149382d997275405fd..a2e95f0d65ecffb71bc1e5c9937fb93ea69c385a 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -23,6 +23,7 @@ import unittest import numpy as np import gc + gc.set_debug(gc.DEBUG_COLLECTABLE) import paddle.fluid as fluid @@ -274,6 +275,115 @@ class TestLRDecay(TranspilerTest): ]) +class TestFakeInit(TranspilerTest): + def net_conf(self): + dict_size, embedding_size, neg_num = 10000, 8, 5 + + input_word = fluid.layers.data( + name="input_word", shape=[1], dtype='int64', lod_level=1) + true_word = fluid.layers.data( + name='true_label', shape=[1], dtype='int64', lod_level=1) + neg_word = fluid.layers.data( + name="neg_label", shape=[1], dtype='int64', lod_level=1) + inputs = [input_word, true_word, neg_word] + + init_width = 0.5 / embedding_size + input_emb = fluid.layers.embedding( + input=inputs[0], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb', + initializer=fluid.initializer.Uniform(-init_width, init_width))) + + true_emb_w = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', + initializer=fluid.initializer.Constant(value=0.0))) + + true_emb_b = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', + initializer=fluid.initializer.Constant(value=0.0))) + + neg_word_reshape = fluid.layers.reshape(inputs[2], shape=[-1, 1]) + neg_word_reshape.stop_gradient = True + + neg_emb_w = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', learning_rate=1.0)) + + neg_emb_w_re = fluid.layers.reshape( + neg_emb_w, shape=[-1, neg_num, embedding_size]) + + neg_emb_b = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', learning_rate=1.0)) + + neg_emb_b_vec = fluid.layers.reshape(neg_emb_b, shape=[-1, neg_num]) + + true_logits = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + fluid.layers.elementwise_mul(input_emb, true_emb_w), + dim=1, + keep_dim=True), + true_emb_b) + + input_emb_re = fluid.layers.reshape( + input_emb, shape=[-1, 1, embedding_size]) + + neg_matmul = fluid.layers.matmul( + input_emb_re, neg_emb_w_re, transpose_y=True) + neg_matmul_re = fluid.layers.reshape(neg_matmul, shape=[-1, neg_num]) + neg_logits = fluid.layers.elementwise_add(neg_matmul_re, neg_emb_b_vec) + # nce loss + label_ones = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, 1], value=1.0, dtype='float32') + label_zeros = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, neg_num], value=0.0, dtype='float32') + + true_xent = fluid.layers.sigmoid_cross_entropy_with_logits(true_logits, + label_ones) + neg_xent = fluid.layers.sigmoid_cross_entropy_with_logits(neg_logits, + label_zeros) + cost = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + true_xent, dim=1), + fluid.layers.reduce_sum( + neg_xent, dim=1)) + avg_cost = fluid.layers.reduce_mean(cost) + + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=1.0, + decay_steps=2100, + decay_rate=0.1, + staircase=True)) + sgd_optimizer.minimize(avg_cost) + + def transpiler_test_impl(self): + trainer, startup = self.get_trainer() + + fake_init_ops = [] + for op in startup.global_block().ops: + if op.type == "fake_init": + fake_init_ops.append(op) + + self.assertEqual(len(fake_init_ops), 3) + + class TestDecayedAdagrad(TranspilerTest): def net_conf(self): x = fluid.layers.data(name='x', shape=[1000], dtype='float32') @@ -788,7 +898,7 @@ class TestLoadSliceVar(TranspilerTest): class TestNCCL2Transpile(TranspilerTest): def test_nccl2_transpile(self): - if fluid.core.is_compiled_with_cuda(): #test nccl2 only with cuda + if fluid.core.is_compiled_with_cuda(): # test nccl2 only with cuda main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): diff --git a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py deleted file mode 100644 index 014d30486d7e2009165a2e05cbedffc7c175ccee..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py +++ /dev/null @@ -1,271 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import os -import signal -import time -import unittest -from multiprocessing import Process - -import numpy as np -import paddle.fluid as fluid -import paddle.fluid.core as core -from paddle.fluid.op import Operator -from paddle.fluid.framework import Program, program_guard -from dist_test_utils import * - - -def run_pserver(pserver_id, use_cuda, sync_mode): - remove_ps_flag(os.getpid()) - scope = fluid.core.Scope() - program = Program() - with fluid.scope_guard(scope): - with program_guard(program, startup_program=Program()): - # create table parameter in scope - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - # create and initialize Param Variable - param = scope.var('table').get_tensor() - - param_array = np.ones((5, 8)).astype("float32") - for i in range(len(param_array)): - param_array[i] *= param_array[i] * i + pserver_id * 10 + 1 - param.set(param_array, place) - - optimize_block = program._create_block(program.global_block().idx) - program.global_block().append_op( - type="listen_and_serv", - inputs={'X': []}, - outputs={}, - attrs={ - "optimize_blocks": [optimize_block], - "endpoint": '127.0.0.1:0', - "Fanin": 1, - "sync_mode": True, - "grad_to_block_id": [] - }) - - exe = fluid.Executor(place) - exe.run(program) - - -class TestListenAndServOp(unittest.TestCase): - def setUp(self): - self.ps_timeout = 5 - - def _start_pserver(self, pserver_id, use_cuda, sync_mode, pserver_func): - p = Process(target=pserver_func, args=(pserver_id, use_cuda, sync_mode)) - p.daemon = True - p.start() - return p - - def _wait_ps_ready(self, pid): - start_left_time = self.ps_timeout - sleep_time = 0.5 - while True: - assert start_left_time >= 0, "wait ps ready failed" - time.sleep(sleep_time) - try: - # the listen_and_serv_op would touch a file which contains the listen port - # on the /tmp directory until it was ready to process all the RPC call. - os.stat("/tmp/paddle.%d.port" % pid) - return - except os.error: - start_left_time -= sleep_time - - def _get_pserver_port(self, pid): - with open("/tmp/paddle.%d.port" % pid, 'r') as f: - port = int(f.read().strip()) - return port - - def _run_hsigmoid_op_one_pserver(self, place, port): - scope = fluid.core.Scope() - program = Program() - with fluid.scope_guard(scope): - with program_guard(program, startup_program=Program()): - x = scope.var('X').get_tensor() - x_array = np.random.random((4, 8)).astype("float32") * 2 - x.set(x_array, place) - # create and initialize Param Variable - param = scope.var('W').get_tensor() - param_array = np.zeros((5, 8)).astype("float32") * 2 - param.set(param_array, place) - - path_table = scope.var('PathTable').get_tensor() - path_table_array = np.array( - [(0, 2, -1, -1, -1), (0, 1, 2, -1, -1), (0, 1, 4, -1, -1), - (0, 2, -1, -1, -1)]).astype( - "int64" - ) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf) - path_table.set(path_table_array, place) - - path_code = scope.var('PathCode').get_tensor() - path_code_array = np.array( - [(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1), - (0, 1, -1, -1, -1)]).astype("int64") #np.array to store - path_code.set(path_code_array, place) - - label = scope.var('Label').get_tensor() - label_array = np.array([0, 1, 4, 5]) - label.set(label_array, place) - - bias = scope.var('Bias').get_tensor() - bias_array = np.random.random((5, 1)).astype("float32") - bias.set(bias_array, place) - - out = scope.var('Out').get_tensor() - - pre_out = scope.var('PreOut').get_tensor - - w_out = scope.var('W_Out').get_tensor() - w_out.set(param_array, place) - - emaps = ['127.0.0.1:' + str(port)] - table_names = ['table'] - height_sections = [2] - - # create and run sgd operator - hsigmoid_op = Operator( - "hierarchical_sigmoid", - X='X', - W='W', - PathTable='PathTable', - PathCode='PathCode', - Label='Label', - Bias='Bias', - Out='Out', - PreOut='PreOut', - W_Out='W_Out', - remote_prefetch=True, - epmap=emaps, - table_names=table_names, - height_sections=height_sections) - - hsigmoid_op.run(scope, place) - - # get and compare result - result_array = np.array(w_out) - self.assertEqual(list(result_array.shape), [5, 8]) - correct = None - for i in range(5): - if i != 3: - correct = np.full((1, 8), i + 1).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - else: - correct = np.full((1, 8), 0).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - - def _run_hsigmoid_op_two_pserver(self, place, port0, port1): - scope = fluid.core.Scope() - program = Program() - with fluid.scope_guard(scope): - with program_guard(program, startup_program=Program()): - x = scope.var('X').get_tensor() - x_array = np.random.random((4, 8)).astype("float32") * 2 - x.set(x_array, place) - # create and initialize Param Variable - param = scope.var('W').get_tensor() - param_array = np.zeros((5, 8)).astype("float32") * 2 - param.set(param_array, place) - - path_table = scope.var('PathTable').get_tensor() - path_table_array = np.array( - [(0, 2, -1, -1, -1), (0, 1, 3, -1, -1), (0, 1, 4, -1, -1), - (0, 2, -1, -1, -1)]).astype( - "int64" - ) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf) - path_table.set(path_table_array, place) - - path_code = scope.var('PathCode').get_tensor() - path_code_array = np.array( - [(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1), - (0, 1, -1, -1, -1)]).astype("int64") #np.array to store - path_code.set(path_code_array, place) - - label = scope.var('Label').get_tensor() - label_array = np.array([0, 1, 4, 5]) - label.set(label_array, place) - - bias = scope.var('Bias').get_tensor() - bias_array = np.random.random((5, 1)).astype("float32") - bias.set(bias_array, place) - - out = scope.var('Out').get_tensor() - - pre_out = scope.var('PreOut').get_tensor - - w_out = scope.var('W_Out').get_tensor() - w_out.set(param_array, place) - - emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)] - table_names = ['table', 'table'] - height_sections = [2, 3] - - # create and run sgd operator - hsigmoid_op = Operator( - "hierarchical_sigmoid", - X='X', - W='W', - PathTable='PathTable', - PathCode='PathCode', - Label='Label', - Bias='Bias', - Out='Out', - PreOut='PreOut', - W_Out='W_Out', - remote_prefetch=True, - epmap=emaps, - table_names=table_names, - height_sections=height_sections) - hsigmoid_op.run(scope, place) - - # get and compare result - result_array = np.array(w_out) - self.assertEqual(list(result_array.shape), [5, 8]) - correct = None - for i in range(5): - if i < 2: - correct = np.full((1, 8), i + 1).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - else: - correct = np.full((1, 8), i + 9).astype("float32") - self.assertTrue((result_array[i] == correct).all()) - - def test_hsigmoid_op_remote(self): - os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1" - # run pserver on CPU in sync mode - p0 = self._start_pserver(0, False, True, run_pserver) - self._wait_ps_ready(p0.pid) - port0 = self._get_pserver_port(p0.pid) - - p1 = self._start_pserver(1, False, True, run_pserver) - self._wait_ps_ready(p1.pid) - port1 = self._get_pserver_port(p1.pid) - - places = [core.CPUPlace()] - - for place in places: - self._run_hsigmoid_op_one_pserver(place, port0) - self._run_hsigmoid_op_two_pserver(place, port0, port1) - - # raise SIGTERM to pserver - os.kill(p0.pid, signal.SIGINT) - p0.join() - os.kill(p1.pid, signal.SIGINT) - p1.join() - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_recv_save_op.py b/python/paddle/fluid/tests/unittests/test_recv_save_op.py new file mode 100644 index 0000000000000000000000000000000000000000..a8324a6a02d34e5083fa5072d43501a3ef613831 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_recv_save_op.py @@ -0,0 +1,265 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import signal +import time +import shutil +import unittest + +from multiprocessing import Process + +import numpy as np +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.fluid.op import Operator +from paddle.fluid.framework import Program, program_guard +from paddle.fluid.transpiler.details import VarStruct, VarsDistributed +from dist_test_utils import * + + +def run_pserver(pserver_id): + remove_ps_flag(os.getpid()) + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + # create table parameter in scope + place = fluid.CPUPlace() + # create and initialize Param Variable + param = scope.var('table').get_tensor() + + param_array = np.ones((5, 8)).astype("float32") + for i in range(len(param_array)): + param_array[i] *= param_array[i] * i + pserver_id * 10 + 1 + param.set(param_array, place) + + optimize_block = program._create_block(program.global_block().idx) + program.global_block().append_op( + type="listen_and_serv", + inputs={'X': []}, + outputs={}, + attrs={ + "optimize_blocks": [optimize_block], + "endpoint": '127.0.0.1:0', + "Fanin": 1, + "sync_mode": True, + "grad_to_block_id": [] + }) + + exe = fluid.Executor(place) + exe.run(program) + + +class TestListenAndServOp(unittest.TestCase): + def setUp(self): + self.ps_timeout = 5 + + def _start_pserver(self, pserver_id, pserver_func): + p = Process(target=pserver_func, args=(pserver_id, )) + p.daemon = True + p.start() + return p + + def _wait_ps_ready(self, pid): + start_left_time = self.ps_timeout + sleep_time = 0.5 + while True: + assert start_left_time >= 0, "wait ps ready failed" + time.sleep(sleep_time) + try: + # the listen_and_serv_op would touch a file which contains the listen port + # on the /tmp directory until it was ready to process all the RPC call. + os.stat("/tmp/paddle.%d.port" % pid) + return + except os.error: + start_left_time -= sleep_time + + def _get_pserver_port(self, pid): + with open("/tmp/paddle.%d.port" % pid, 'r') as f: + port = int(f.read().strip()) + return port + + def _run_nce_op_two_pserver(self, place, port0, port1, model_file): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)] + + # create and run recv and save operator + remote_recv_op = Operator( + "recv_save", + trainer_id=0, + shape=[10, 8], + slice_shapes=["5,8", "5,8"], + slice_varnames=["table", "table"], + remote_varnames=['table', 'table'], + endpoints=emaps, + file_path=model_file) + + remote_recv_op.run(scope, place) + + def _load_slice_var(self, model_file): + load_prog = fluid.Program() + load_block = load_prog.global_block() + + origin = load_block.create_var( + name="var.origin", + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[10, 8], + dtype="float32", + persistable=True) + + slice0 = load_block.create_var( + name="var.slice0", + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[3, 8], + dtype="float32", + persistable=True) + + slice1 = load_block.create_var( + name="var.slice1", + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[5, 8], + dtype="float32", + persistable=True) + + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [origin]}, + attrs={'file_path': model_file}) + + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [slice0]}, + attrs={ + 'file_path': model_file, + 'seek': 2 * 8, + 'shape': slice0.shape + }) + + load_block.append_op( + type='load', + inputs={}, + outputs={'Out': [slice1]}, + attrs={ + 'file_path': model_file, + 'seek': 5 * 8, + 'shape': slice1.shape + }) + + exe = fluid.Executor(place=fluid.CPUPlace()) + exe.run(load_prog) + + origin_var = fluid.global_scope().find_var("var.origin") + slice0_var = fluid.global_scope().find_var("var.slice0") + slice1_var = fluid.global_scope().find_var("var.slice1") + + origin = np.array(origin_var.get_tensor()) + slice0 = np.array(slice0_var.get_tensor()) + slice1 = np.array(slice1_var.get_tensor()) + + np.testing.assert_equal(origin[2:5], slice0) + np.testing.assert_equal(origin[5:10], slice1) + + def _save_by_io_persistables(self, place, port0, port1, dirname, var_name): + exe = fluid.Executor(place=place) + + vars_overview = VarsDistributed() + + orig_var = VarStruct( + name=var_name, + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[10, 8], + dtype="float32", + lod_level=0, + persistable=True) + + slice_0_var = VarStruct( + name=var_name, + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[5, 8], + dtype="float32", + lod_level=0, + persistable=True) + + slice_1_var = VarStruct( + name=var_name, + type=fluid.core.VarDesc.VarType.LOD_TENSOR, + shape=[5, 8], + dtype="float32", + lod_level=0, + persistable=True) + + vars_overview.add_distributed_var( + origin_var=orig_var, + slice_var=slice_0_var, + block_id=0, + offset=0, + is_slice=True, + vtype="RemotePrefetch", + endpoint="{}:{}".format("127.0.0.1", port0)) + + vars_overview.add_distributed_var( + origin_var=orig_var, + slice_var=slice_1_var, + block_id=1, + offset=40, + is_slice=True, + vtype="RemotePrefetch", + endpoint="{}:{}".format("127.0.0.1", port1)) + + program = Program() + program._is_distributed = True + program._is_chief = True + program._parameters_on_pservers = vars_overview + + fluid.io.save_persistables(exe, dirname, program) + + def test_recv_save_op_remote(self): + # run pserver on CPU in sync mode + p0 = self._start_pserver(0, run_pserver) + self._wait_ps_ready(p0.pid) + port0 = self._get_pserver_port(p0.pid) + + p1 = self._start_pserver(1, run_pserver) + self._wait_ps_ready(p1.pid) + port1 = self._get_pserver_port(p1.pid) + + places = [core.CPUPlace()] + + param_dir = "./model_for_test_recv_save_op/" + param_name = "table" + + for place in places: + self._save_by_io_persistables(place, port0, port1, param_dir, + param_name) + + # raise SIGTERM to pserver + os.kill(p0.pid, signal.SIGINT) + p0.join() + os.kill(p1.pid, signal.SIGINT) + p1.join() + + self._load_slice_var(param_dir + param_name) + shutil.rmtree(param_dir) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_transpiler_ops.py b/python/paddle/fluid/tests/unittests/test_transpiler_ops.py new file mode 100644 index 0000000000000000000000000000000000000000..9512ae495d8b61e4710b762916c6b7d138140896 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_transpiler_ops.py @@ -0,0 +1,143 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import traceback +import math +import collections + +import six +import unittest +import numpy as np + +import gc + +gc.set_debug(gc.DEBUG_COLLECTABLE) + +import paddle.fluid as fluid +from test_dist_transpiler import TranspilerTest + + +class TestFakeInit(TranspilerTest): + def net_conf(self): + dict_size, embedding_size, neg_num = 10000, 8, 5 + + input_word = fluid.layers.data( + name="input_word", shape=[1], dtype='int64', lod_level=1) + true_word = fluid.layers.data( + name='true_label', shape=[1], dtype='int64', lod_level=1) + neg_word = fluid.layers.data( + name="neg_label", shape=[1], dtype='int64', lod_level=1) + inputs = [input_word, true_word, neg_word] + + init_width = 0.5 / embedding_size + input_emb = fluid.layers.embedding( + input=inputs[0], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb', + initializer=fluid.initializer.Uniform(-init_width, init_width))) + + true_emb_w = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', + initializer=fluid.initializer.Constant(value=0.0))) + + true_emb_b = fluid.layers.embedding( + input=inputs[1], + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', + initializer=fluid.initializer.Constant(value=0.0))) + + neg_word_reshape = fluid.layers.reshape(inputs[2], shape=[-1, 1]) + neg_word_reshape.stop_gradient = True + + neg_emb_w = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, embedding_size], + param_attr=fluid.ParamAttr( + name='emb_w', learning_rate=1.0)) + + neg_emb_w_re = fluid.layers.reshape( + neg_emb_w, shape=[-1, neg_num, embedding_size]) + + neg_emb_b = fluid.layers.embedding( + input=neg_word_reshape, + is_sparse=True, + size=[dict_size, 1], + param_attr=fluid.ParamAttr( + name='emb_b', learning_rate=1.0)) + + neg_emb_b_vec = fluid.layers.reshape(neg_emb_b, shape=[-1, neg_num]) + + true_logits = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + fluid.layers.elementwise_mul(input_emb, true_emb_w), + dim=1, + keep_dim=True), + true_emb_b) + + input_emb_re = fluid.layers.reshape( + input_emb, shape=[-1, 1, embedding_size]) + + neg_matmul = fluid.layers.matmul( + input_emb_re, neg_emb_w_re, transpose_y=True) + neg_matmul_re = fluid.layers.reshape(neg_matmul, shape=[-1, neg_num]) + neg_logits = fluid.layers.elementwise_add(neg_matmul_re, neg_emb_b_vec) + # nce loss + label_ones = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, 1], value=1.0, dtype='float32') + label_zeros = fluid.layers.fill_constant_batch_size_like( + true_logits, shape=[-1, neg_num], value=0.0, dtype='float32') + + true_xent = fluid.layers.sigmoid_cross_entropy_with_logits(true_logits, + label_ones) + neg_xent = fluid.layers.sigmoid_cross_entropy_with_logits(neg_logits, + label_zeros) + cost = fluid.layers.elementwise_add( + fluid.layers.reduce_sum( + true_xent, dim=1), + fluid.layers.reduce_sum( + neg_xent, dim=1)) + avg_cost = fluid.layers.reduce_mean(cost) + + sgd_optimizer = fluid.optimizer.SGD( + learning_rate=fluid.layers.exponential_decay( + learning_rate=1.0, + decay_steps=2100, + decay_rate=0.1, + staircase=True)) + sgd_optimizer.minimize(avg_cost) + + def transpiler_test_impl(self): + trainer, startup = self.get_trainer() + + fake_init_ops = [] + for op in startup.global_block().ops: + if op.type == "fake_init": + fake_init_ops.append(op) + + self.assertEqual(len(fake_init_ops), 3) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index bd6def9506f1284ade9c47404243081a51f25538..3579536c96f2a98453876a5bb978241458630cb9 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -403,7 +403,7 @@ class DistributeTranspiler(object): def _get_all_remote_sparse_update_op(self, main_program): sparse_update_ops = [] - sparse_update_op_types = ["lookup_table", "nce", "hierarchical_sigmoid"] + sparse_update_op_types = ["lookup_table", "nce"] for op in main_program.global_block().ops: if op.type in sparse_update_op_types and op.attr( 'remote_prefetch') is True: @@ -607,6 +607,7 @@ class DistributeTranspiler(object): self.origin_program) # use_sparse_update_param_name -> split_height_section self.sparse_param_to_height_sections = dict() + self.need_delete_optimize_vars = [] # add distributed attrs to program self.origin_program._is_distributed = True @@ -861,6 +862,78 @@ class DistributeTranspiler(object): self._get_distributed_optimizer_vars() self.origin_program._parameters_on_pservers = self.vars_overview + def _get_sparse_table_names(self): + sparse_update_op_types = ["lookup_table", "nce"] + + sparse_table_names = [] + for op in self.origin_program.global_block().ops: + if op.type in sparse_update_op_types and op.attr( + 'is_sparse') is True: + sparse_table_names.append(op.input("W")[0]) + if op.type == "distributed_lookup_table": + sparse_table_names.append(op.input("W")[0]) + + if self.has_distributed_lookup_table: + sparse_table_names.append(self.table_name) + + return list(set(sparse_table_names)) + + def _fake_init_sparsetable(self, sparse_table_names): + # delete table init op + for table_name in sparse_table_names: + table_var = self.startup_program.global_block().vars[table_name] + table_param_init_op = [] + for op in self.startup_program.global_block().ops: + if table_name in op.output_arg_names: + table_param_init_op.append(op) + init_op_num = len(table_param_init_op) + if init_op_num != 1: + raise ValueError("table init op num should be 1, now is " + str( + init_op_num)) + table_init_op = table_param_init_op[0] + self.startup_program.global_block().append_op( + type="fake_init", + inputs={}, + outputs={"Out": table_var}, + attrs={"shape": table_init_op.attr('shape')}) + delete_ops(self.startup_program.global_block(), table_param_init_op) + + def _delete_trainer_optimizer(self, is_startup): + optimize_vars = [] + optimize_op_role_vars = [] + optimize_need_delete_vars = [] + + for op in self.optimize_ops: + optimize_vars.extend(op.input_arg_names) + optimize_op_role_vars.extend(op.attr("op_role_var")) + + optimize_vars = list(set(optimize_vars)) + optimize_op_role_vars = list(set(optimize_op_role_vars)) + + for var in optimize_vars: + if var not in optimize_op_role_vars: + optimize_need_delete_vars.append(var) + need_delete_optimize_vars = list(set(optimize_need_delete_vars)) + + if is_startup: + init_ops = [] + for var in need_delete_optimize_vars: + param_init_op = [] + for op in self.startup_program.global_block().ops: + if var in op.output_arg_names: + param_init_op.append(op) + init_ops.extend(param_init_op) + delete_ops(self.startup_program.global_block(), init_ops) + + for var in need_delete_optimize_vars: + if self.startup_program.global_block().has_var(var): + self.startup_program.global_block()._remove_var(var) + else: + delete_ops(self.origin_program.global_block(), self.optimize_ops) + for var in need_delete_optimize_vars: + if self.origin_program.global_block().has_var(var): + self.origin_program.global_block()._remove_var(var) + def get_trainer_program(self, wait_port=True): """ Get transpiled trainer side program. The program on trainer side compared with origin program @@ -891,31 +964,16 @@ class DistributeTranspiler(object): # remove optimize ops and add a send op to main_program # FIXME(typhoonzero): Also ops like clip_gradient, lrn_decay? + self._delete_trainer_optimizer(is_startup=True) + sparse_table_names = self._get_sparse_table_names() + self._fake_init_sparsetable(sparse_table_names) + lr_ops = self._get_lr_ops() - delete_ops(self.origin_program.global_block(), self.optimize_ops) delete_ops(self.origin_program.global_block(), lr_ops) - - # delete table init op - if self.has_distributed_lookup_table: - table_var = self.startup_program.global_block().vars[ - self.table_name] - table_param_init_op = [] - for op in self.startup_program.global_block().ops: - if self.table_name in op.output_arg_names: - table_param_init_op.append(op) - init_op_num = len(table_param_init_op) - if init_op_num != 1: - raise ValueError("table init op num should be 1, now is " + str( - init_op_num)) - table_init_op = table_param_init_op[0] - self.startup_program.global_block().append_op( - type="fake_init", - inputs={}, - outputs={"Out": table_var}, - attrs={"shape": table_init_op.attr('shape')}) - delete_ops(self.startup_program.global_block(), table_param_init_op) + self._delete_trainer_optimizer(is_startup=False) self.origin_program.__str__() + self.startup_program.__str__() if wait_port: wait_server_ready(self.pserver_endpoints) @@ -937,8 +995,14 @@ class DistributeTranspiler(object): # FIXME(gongwb): delete not need ops. # note that: some parameter is not trainable and those ops can't be deleted. + sparse_table_names = self._get_sparse_table_names() + + # self._fake_init_sparsetable(sparse_table_names) + #self._delete_trainer_optimizer(is_startup=True) for varname, splited_var in six.iteritems(self.param_var_mapping): + if varname in sparse_table_names: + continue # Get the eplist of recv vars eps = [] for var in splited_var: @@ -980,6 +1044,8 @@ class DistributeTranspiler(object): }) for varname, splited_var in six.iteritems(self.param_var_mapping): + if varname in sparse_table_names: + continue # add concat ops to merge splited parameters received from parameter servers. if len(splited_var) <= 1: continue