diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc index aebf6376d16e1e918905f559b2b84396f1a2452e..52085482f4753ac76019ea649f6714c27dce1fc2 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.cc +++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc @@ -32,7 +32,7 @@ namespace paddle { namespace operators { namespace distributed { -using Tensor = framework::Tensor; +using LoDTensor = framework::LoDTensor; using LoDTensor = framework::LoDTensor; using SelectedRows = framework::SelectedRows; using DDim = framework::DDim; @@ -120,8 +120,8 @@ static void MergeMultipleVarsIntoOneBySection( PADDLE_ENFORCE_GT( out_tensor->numel(), 0, - "When calling this method, the Tensor's numel must larger than zero. " - "Please check Tensor::Resize has been called first."); + "When calling this method, the LoDTensor's numel must larger than zero. " + "Please check LoDTensor::Resize has been called first."); auto* out_tensor_data = out_tensor->mutable_data(id_tensor.place()); @@ -144,7 +144,7 @@ static void MergeMultipleVarsIntoOneBySection( auto row_numel = dims[1]; - for (size_t i = 0; i < dims[0]; ++i) { + for (int64_t i = 0; i < dims[0]; ++i) { auto id = ids_in_this_section[i]; auto origin_id = id + abs_sections[section_idx]; auto& offsets = id_to_offset[origin_id]; @@ -201,7 +201,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, std::vector ids_vector; if (platform::is_cpu_place(id_tensor.place())) { auto* id_data = id_tensor.data(); - for (size_t i = 0; i < id_tensor.numel(); ++i) { + for (int64_t i = 0; i < id_tensor.numel(); ++i) { ids_vector.push_back(id_data[i]); } } else { @@ -209,7 +209,7 @@ void prefetch(const std::string& id_name, const std::string& out_name, PADDLE_THROW("paddle is not compiled with CUDA!"); #else auto cpu_place = platform::CPUPlace(); - framework::Tensor cpu_tensor; + framework::LoDTensor cpu_tensor; auto* cpu_tensor_data = cpu_tensor.mutable_data(id_tensor.dims(), cpu_place); auto stream = diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.h b/paddle/fluid/operators/distributed/parameter_prefetch.h index 53482c4c40e4446bc3f0fca8d4b3199354f9a130..882c6bd9b84f8e3bfb6cb8f1cc154ac497e4f97c 100644 --- a/paddle/fluid/operators/distributed/parameter_prefetch.h +++ b/paddle/fluid/operators/distributed/parameter_prefetch.h @@ -30,6 +30,30 @@ void prefetch(const std::string& id_name, const std::string& out_name, const framework::ExecutionContext& context, const framework::Scope& scope); +template +void prefetch_with_reconstruct(const std::string& id_name, + const std::string& out_name, + const std::vector& table_names, + const std::vector& epmap, + const std::vector& height_sections, + const framework::ExecutionContext& context, + const framework::Scope& scope, + framework::LoDTensor* original) { + prefetch(id_name, out_name, table_names, epmap, height_sections, context, + scope); + auto& out = scope.FindVar(out_name)->Get(); + auto& ids = scope.FindVar(id_name)->Get(); + auto* original_value = original->data(); + auto* out_value = out.data(); + size_t original_width = original->numel() / original->dims()[0]; + + for (int64_t i = 0; i < ids.numel(); i++) { + const T* out_rows = out_value + original_width * i; + T* original_row = original_value + original_width * ids.data()[i]; + std::memcpy(original_row, out_rows, original_width * sizeof(T)); + } +} + }; // namespace distributed }; // namespace operators }; // namespace paddle diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.cc b/paddle/fluid/operators/hierarchical_sigmoid_op.cc index 0dbcc442dfa1a395cdb0ffbd69eb78ad66cfaa17..b9059f6b054f477bf86aeb0dc323dcd4dbfca9f1 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.cc +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.cc @@ -67,6 +67,11 @@ class HierarchicalSigmoidOp : public framework::OperatorWithKernel { PADDLE_ENFORCE(ctx->HasOutput("Out"), "Output(Out) should not be null."); PADDLE_ENFORCE(ctx->HasOutput("PreOut"), "Output(PreOut) should not be null."); + auto with_prefetch = ctx->Attrs().Get("remote_prefetch"); + if (with_prefetch) { + PADDLE_ENFORCE(ctx->HasOutput("W_Out"), + "Output(W_Out) should not be null."); + } const int64_t batch_size = ctx->GetInputDim("X")[0]; std::vector output_shape({batch_size, 1}); ctx->SetOutputDim("Out", framework::make_ddim(output_shape)); @@ -96,7 +101,7 @@ class HierarchicalSigmoidOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("Label", "(LoDTensor, required), The labels of training data. It's a" "tensor with shape [N, 1]."); - AddInput("PTable", + AddInput("PathTable", "(LoDTensor, optional), The Path Table from root to current word" "it should have shape like [N, L], L is the length of the Path") .AsDispensable(); @@ -120,8 +125,30 @@ class HierarchicalSigmoidOpMaker : public framework::OpProtoAndCheckerMaker { "[batch_size, code_length], where code_length represents the " "maximum path length from root to leaf nodes.") .AsIntermediate(); + AddOutput( + "W_Out", + "(LoDTensor, optinal) using input 'W' as Output to make it mutable" + "When we are using prefetch") + .AsIntermediate(); AddAttr("num_classes", "(int, optional), The number of classes") .SetDefault(2); + // for parameter prefetch + AddAttr("remote_prefetch", "").SetDefault(false); + AddAttr("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0); + AddAttr>("height_sections", + "Height for each output SelectedRows.") + .SetDefault(std::vector({})); + AddAttr>( + "epmap", + "(string vector, default 127.0.0.1:6164)" + "Server endpoints in the order of input variables for mapping") + .SetDefault({}); + AddAttr>( + "table_names", + "(string vector, the splited table names that will be fetched from " + "parameter server)" + "in the order of input variables for mapping") + .SetDefault({}); AddComment(R"DOC( The hierarchical sigmoid operator organize the classes into a binary tree. At each node, a sigmoid function is used to calculate the probability of @@ -191,23 +218,17 @@ class HierarchicalSigmoidGradOpGradVarTypeInference << " is set to SelectedRows"; block->Var(w_grad_var_name) ->SetType(framework::proto::VarType::SELECTED_ROWS); - if (hasBias) { - VLOG(30) << "hierarchical_sigmoid_grad op " - << framework::GradVarName("Bias") << " is set to SelectedRows"; - block->Var(bias_grad_var_name) - ->SetType(framework::proto::VarType::SELECTED_ROWS); - } } else { VLOG(30) << "hierarchical_sigmoid_grad op " << framework::GradVarName("W") << " is set to LoDTensor"; block->Var(w_grad_var_name) ->SetType(framework::proto::VarType::LOD_TENSOR); - if (hasBias) { - VLOG(30) << "hierarchical_sigmoid_grad op " - << framework::GradVarName("Bias") << " is set to LoDTensor"; - block->Var(bias_grad_var_name) - ->SetType(framework::proto::VarType::LOD_TENSOR); - } + } + if (hasBias) { + VLOG(30) << "hierarchical_sigmoid_grad op " + << framework::GradVarName("Bias") << " is set to LoDTensor"; + block->Var(bias_grad_var_name) + ->SetType(framework::proto::VarType::LOD_TENSOR); } block->Var(w_grad_var_name)->SetDataType(block->Var("W")->GetDataType()); } diff --git a/paddle/fluid/operators/hierarchical_sigmoid_op.h b/paddle/fluid/operators/hierarchical_sigmoid_op.h index b73a32af89e882ac02623dd1d312f400a78fc47a..d8e406a96b664bd06f9d611c7d0b423155398291 100644 --- a/paddle/fluid/operators/hierarchical_sigmoid_op.h +++ b/paddle/fluid/operators/hierarchical_sigmoid_op.h @@ -14,7 +14,9 @@ limitations under the License. */ #pragma once #include +#include #include +#include #include #include "paddle/fluid/framework/mixed_vector.h" #include "paddle/fluid/framework/op_registry.h" @@ -24,6 +26,10 @@ limitations under the License. */ #include "paddle/fluid/operators/math/matrix_bit_code.h" #include "paddle/fluid/platform/transform.h" +#ifdef PADDLE_WITH_DISTRIBUTE +#include "paddle/fluid/operators/distributed/parameter_prefetch.h" +#endif + namespace paddle { namespace operators { @@ -49,13 +55,55 @@ class HierarchicalSigmoidOpKernel : public framework::OpKernel { void Compute(const framework::ExecutionContext& ctx) const override { auto& in = detail::Ref(ctx.Input("X")); auto& w = detail::Ref(ctx.Input("W")); - auto* path = ctx.Input("PTable"); + auto* path = ctx.Input("PathTable"); auto* code = ctx.Input("PathCode"); auto& label = detail::Ref(ctx.Input("Label")); auto* bias = ctx.Input("Bias"); auto* out = ctx.Output("Out"); auto* pre_out = ctx.Output("PreOut"); size_t num_classes = static_cast(ctx.Attr("num_classes")); + // for remote prefetch + + auto epmap = ctx.Attr>("epmap"); + if (!epmap.empty()) { + // if epmap is not empty, then the parameter will be fetched from remote + // parameter + // server + auto height_sections = ctx.Attr>("height_sections"); + auto table_names = ctx.Attr>("table_names"); + VLOG(3) << "path type is " << path->type().name(); + std::vector real_rows = PathToRows(*path); + framework::Scope& local_scope = ctx.scope().NewScope(); + auto* ids = local_scope.Var("Ids@Prefetch"); + auto* x_tensor = ids->GetMutable(); + + x_tensor->mutable_data( + framework::make_ddim({static_cast(real_rows.size()), 1}), + ctx.GetPlace()); + // copy. + + std::memcpy(x_tensor->data(), real_rows.data(), + real_rows.size() * sizeof(int64_t)); + + framework::DDim w_dims = ctx.Input("W")->dims(); + w_dims[0] = x_tensor->dims()[0]; + auto* w_tensor = + local_scope.Var("W@Prefetch")->GetMutable(); + w_tensor->Resize(w_dims); + +#ifdef PADDLE_WITH_DISTRIBUTE + // w_Out is set to used by prefetch, never change it in other cases + auto* w_out = ctx.Output("W_Out"); + operators::distributed::prefetch_with_reconstruct( + "Ids@Prefetch", "W@Prefetch", table_names, epmap, height_sections, + ctx, local_scope, w_out); +#else + PADDLE_THROW( + "paddle is not compiled with distribute support, can not do " + "parameter prefetch!"); +#endif + } + bool is_custom = false; if (path) { is_custom = true; @@ -116,9 +164,8 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel { void Compute(const framework::ExecutionContext& ctx) const override { auto& in = detail::Ref(ctx.Input("X")); auto& w = detail::Ref(ctx.Input("W")); - auto* path = ctx.Input("PTable"); + auto* path = ctx.Input("PathTable"); auto* code = ctx.Input("PathCode"); - auto* bias = ctx.Input("Bias"); auto* in_grad = ctx.Output(framework::GradVarName("X")); bool is_sparse = ctx.Attr("is_sparse"); @@ -165,15 +212,14 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel { pre_out_grad_mat * out_grad_mat.broadcast(bcast); // TODO(guosheng): multiply pre_out_grad with subgradient of clipping to // be consistent with the clipping in forward. - + auto* bias_grad = + ctx.Output(framework::GradVarName("Bias")); + if (bias_grad) { + bias_grad->mutable_data(ctx.GetPlace()); + zero(dev_ctx, bias_grad, static_cast(0.0)); + bit_code->AddGrad(pre_out_grad, bias_grad); + } if (!is_sparse) { - auto* bias_grad = - ctx.Output(framework::GradVarName("Bias")); - if (bias_grad) { - bias_grad->mutable_data(ctx.GetPlace()); - zero(dev_ctx, bias_grad, static_cast(0.0)); - bit_code->AddGrad(pre_out_grad, bias_grad); - } auto* w_grad = ctx.Output(framework::GradVarName("W")); w_grad->mutable_data(ctx.GetPlace()); @@ -192,21 +238,6 @@ class HierarchicalSigmoidGradOpKernel : public framework::OpKernel { w_grad_value->mutable_data(temp_dim, ctx.GetPlace()); zero(dev_ctx, w_grad_value, static_cast(0.0)); - auto* bias_grad = - ctx.Output(framework::GradVarName("Bias")); - if (bias_grad) { - bias_grad->set_rows(real_rows); - // build ids -> rows index map - bias_grad->SyncIndex(); - bias_grad->set_height(bias->dims()[0]); - auto* bias_grad_value = bias_grad->mutable_value(); - std::vector dims = {static_cast(real_rows.size()), - bias->dims()[1]}; - bias_grad_value->mutable_data(framework::make_ddim(dims), - ctx.GetPlace()); - zero(dev_ctx, bias_grad_value, static_cast(0.0)); - bit_code->AddGrad(pre_out_grad, bias_grad); - } bit_code->MulGradWeight(pre_out_grad, w_grad, in); } bit_code->MulGradError(pre_out_grad, w, in_grad); diff --git a/paddle/fluid/operators/math/matrix_bit_code.cc b/paddle/fluid/operators/math/matrix_bit_code.cc index 5a6e64b6f87d33249f0153e5f391deaf78e53de5..fed4639b011fbf4421e92f0b6e8e766c38db0cfe 100644 --- a/paddle/fluid/operators/math/matrix_bit_code.cc +++ b/paddle/fluid/operators/math/matrix_bit_code.cc @@ -48,23 +48,6 @@ void MatrixBitCodeFunctor::AddGrad(const framework::Tensor& tmat, } } -template -void MatrixBitCodeFunctor::AddGrad(const framework::Tensor& tmat, - framework::SelectedRows* vec) { - size_t batch_size = tmat.dims()[0]; - size_t width = tmat.dims()[1]; - for (size_t i = 0; i < batch_size; ++i) { - auto code = code_table_->get_code(i); - int code_length = code->get_length(); - for (int j = 0; j < code_length; ++j) { - size_t index = code->calc_index(j); - int64_t row_index = vec->GetIndexFromId(static_cast(index)); - vec->mutable_value()->data()[row_index] += - tmat.data()[i * width + j]; - } - } -} - template void MatrixBitCodeFunctor::Sum(const framework::Tensor& tmat, framework::Tensor* sum, T scale_sum) { diff --git a/paddle/fluid/operators/math/matrix_bit_code.h b/paddle/fluid/operators/math/matrix_bit_code.h index 35ca73802b48982ddf3ed7485b56f50221c9f28c..0bc09bdb35c3340ec9096f6f0012a3a9d8b6e01e 100644 --- a/paddle/fluid/operators/math/matrix_bit_code.h +++ b/paddle/fluid/operators/math/matrix_bit_code.h @@ -139,11 +139,11 @@ class SimpleCode : public Code { template class CustomCode : public Code { public: - CustomCode(const framework::Tensor& ptable, const framework::Tensor& pcode, - const int64_t* ids, int index) + CustomCode(const framework::Tensor& path_table, + const framework::Tensor& path_code, const int64_t* ids, int index) : ids_(ids), index_(index) { - ptable_ = ptable.Slice(index, index + 1); - pcode_ = pcode.Slice(index, index + 1); + ptable_ = path_table.Slice(index, index + 1); + pcode_ = path_code.Slice(index, index + 1); } /** * Here the id of root shoud be 1 rather than 0, thus the encoding of class c @@ -195,9 +195,9 @@ class SimpleCodeTable : public CodeTable { template class CustomCodeTable : public CodeTable { public: - CustomCodeTable(const framework::Tensor& ptable, - const framework::Tensor& pcode, const int64_t* ids) - : ptable_(ptable), pcode_(pcode), ids_(ids) {} + CustomCodeTable(const framework::Tensor& path_table, + const framework::Tensor& path_code, const int64_t* ids) + : ptable_(path_table), pcode_(path_code), ids_(ids) {} std::unique_ptr get_code(int64_t code) const { std::unique_ptr coder(new CustomCode(ptable_, pcode_, ids_, code)); @@ -223,11 +223,11 @@ class MatrixBitCodeFunctor { ids_(ids), code_table_(new SimpleCodeTable(num_classes, ids)) {} - MatrixBitCodeFunctor(const framework::Tensor& ptable, - const framework::Tensor& pcode, const int64_t* ids) - : num_classes_(static_cast(ptable.dims()[1])), + MatrixBitCodeFunctor(const framework::Tensor& path_table, + const framework::Tensor& path_code, const int64_t* ids) + : num_classes_(static_cast(path_table.dims()[1])), ids_(ids), - code_table_(new CustomCodeTable(ptable, pcode, ids)) {} + code_table_(new CustomCodeTable(path_table, path_code, ids)) {} /* For j < code_length tmat(i, j) += vec(0, index(i, j)) */ @@ -238,11 +238,6 @@ class MatrixBitCodeFunctor { */ void AddGrad(const framework::Tensor& tmat, framework::Tensor* vec); - /* For selected rows For j < code_length - vec(0, index(i, j)) += tmat(i, j) - */ - void AddGrad(const framework::Tensor& tmat, framework::SelectedRows* vec); - /* For j < code_length sum(i, 0) = \sum_j bit(i, j) * tmat(i, j) */ diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 37ddfdf7d5817dac0013437b8a9d895aaa333675..38dad857174c9cdb55a9222beb977a65c624acc3 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -4931,6 +4931,9 @@ def hsigmoid(input, pass weights = None + remote_prefetch = False + if os.environ.get('PADDLE_ENABLE_REMOTE_PREFETCH'): + remote_prefetch = True if not is_custom: weights = helper.create_parameter( @@ -4947,7 +4950,7 @@ def hsigmoid(input, inputs = { "X": input, "W": weights, - "PTable": path_table, + "PathTable": path_table, "PathCode": path_code, "Label": label } @@ -4970,9 +4973,13 @@ def hsigmoid(input, type="hierarchical_sigmoid", inputs=inputs, outputs={"Out": out, - "PreOut": pre_out}, - attrs={"num_classes": num_classes, - "is_sparse": is_sparse}) + "PreOut": pre_out, + "W_Out": weights}, + attrs={ + "num_classes": num_classes, + "is_sparse": is_sparse, + "remote_prefetch": remote_prefetch + }) return out @@ -7440,7 +7447,7 @@ def brelu(x, t_min=0.0, t_max=24.0, name=None): Examples: - .. code-block:: python + .. code-block:: python x = fluid.layers.data(name="x", shape=[2,3,16,16], dtype="float32") y = fluid.layers.brelu(x, t_min=1.0, t_max=20.0) diff --git a/python/paddle/fluid/tests/unittests/test_hsigmoid_op.py b/python/paddle/fluid/tests/unittests/test_hsigmoid_op.py index 2a6c93f75fad53440a2db64e4f34c9a5c22c654e..8ed5074dc2626ff58fc65d8af1340e260c029572 100644 --- a/python/paddle/fluid/tests/unittests/test_hsigmoid_op.py +++ b/python/paddle/fluid/tests/unittests/test_hsigmoid_op.py @@ -185,7 +185,7 @@ class TestHSigmoidOpSparse(OpTest): self.inputs = { 'X': x, 'W': w, - 'PTable': path_table, + 'PathTable': path_table, 'PathCode': path_code, 'Label': label, 'Bias': bias @@ -287,7 +287,7 @@ class TestHSigmoidOpWithCostumTree(OpTest): self.inputs = { 'X': x, 'W': w, - 'PTable': path_table, + 'PathTable': path_table, 'PathCode': path_code, 'Label': label, 'Bias': bias @@ -324,7 +324,7 @@ class TestHSigmoidOpWithCostumTreeWithoutBias(OpTest): self.inputs = { 'X': x, 'W': w, - 'PTable': path_table, + 'PathTable': path_table, 'PathCode': path_code, 'Label': label, } diff --git a/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py new file mode 100644 index 0000000000000000000000000000000000000000..9ed6c94bd2066231772571db1c6376d771f9aed2 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_hsigmoid_remote_table_op.py @@ -0,0 +1,271 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import signal +import time +import unittest +from multiprocessing import Process + +import numpy as np +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.fluid.op import Operator +from paddle.fluid.framework import Program, program_guard + + +def run_pserver(pserver_id, use_cuda, sync_mode): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + # create table parameter in scope + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + # create and initialize Param Variable + param = scope.var('table').get_tensor() + + param_array = np.ones((5, 8)).astype("float32") + for i in range(len(param_array)): + param_array[i] *= param_array[i] * i + pserver_id * 10 + 1 + param.set(param_array, place) + + optimize_block = program._create_block(program.global_block().idx) + program.global_block().append_op( + type="listen_and_serv", + inputs={'X': []}, + outputs={}, + attrs={ + "optimize_blocks": [optimize_block], + "endpoint": '127.0.0.1:0', + "Fanin": 1, + "sync_mode": True, + "grad_to_block_id": [] + }) + + exe = fluid.Executor(place) + exe.run(program) + + +class TestListenAndServOp(unittest.TestCase): + def setUp(self): + self.ps_timeout = 5 + + def _start_pserver(self, pserver_id, use_cuda, sync_mode, pserver_func): + p = Process(target=pserver_func, args=(pserver_id, use_cuda, sync_mode)) + p.daemon = True + p.start() + return p + + def _wait_ps_ready(self, pid): + start_left_time = self.ps_timeout + sleep_time = 0.5 + while True: + assert start_left_time >= 0, "wait ps ready failed" + time.sleep(sleep_time) + try: + # the listen_and_serv_op would touch a file which contains the listen port + # on the /tmp directory until it was ready to process all the RPC call. + os.stat("/tmp/paddle.%d.port" % pid) + return + except os.error: + start_left_time -= sleep_time + + def _get_pserver_port(self, pid): + with open("/tmp/paddle.%d.port" % pid, 'r') as f: + port = int(f.read().strip()) + return port + + def _run_hsigmoid_op_one_pserver(self, place, port): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + x = scope.var('X').get_tensor() + x_array = np.random.random((4, 8)).astype("float32") * 2 + x.set(x_array, place) + # create and initialize Param Variable + param = scope.var('W').get_tensor() + param_array = np.zeros((5, 8)).astype("float32") * 2 + param.set(param_array, place) + + path_table = scope.var('PathTable').get_tensor() + path_table_array = np.array( + [(0, 2, -1, -1, -1), (0, 1, 2, -1, -1), (0, 1, 4, -1, -1), + (0, 2, -1, -1, -1)]).astype( + "int64" + ) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf) + path_table.set(path_table_array, place) + + path_code = scope.var('PathCode').get_tensor() + path_code_array = np.array( + [(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1), + (0, 1, -1, -1, -1)]).astype("int64") #np.array to store + path_code.set(path_code_array, place) + + label = scope.var('Label').get_tensor() + label_array = np.array([0, 1, 4, 5]) + label.set(label_array, place) + + bias = scope.var('Bias').get_tensor() + bias_array = np.random.random((5, 1)).astype("float32") + bias.set(bias_array, place) + + out = scope.var('Out').get_tensor() + + pre_out = scope.var('PreOut').get_tensor + + w_out = scope.var('W_Out').get_tensor() + w_out.set(param_array, place) + + emaps = ['127.0.0.1:' + str(port)] + table_names = ['table'] + height_sections = [2] + + # create and run sgd operator + hsigmoid_op = Operator( + "hierarchical_sigmoid", + X='X', + W='W', + PathTable='PathTable', + PathCode='PathCode', + Label='Label', + Bias='Bias', + Out='Out', + PreOut='PreOut', + W_Out='W_Out', + remote_prefetch=True, + epmap=emaps, + table_names=table_names, + height_sections=height_sections) + + hsigmoid_op.run(scope, place) + + # get and compare result + result_array = np.array(w_out) + self.assertEqual(list(result_array.shape), [5, 8]) + correct = None + for i in range(5): + if i != 3: + correct = np.full((1, 8), i + 1).astype("float32") + self.assertTrue((result_array[i] == correct).all()) + else: + correct = np.full((1, 8), 0).astype("float32") + self.assertTrue((result_array[i] == correct).all()) + + def _run_hsigmoid_op_two_pserver(self, place, port0, port1): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + x = scope.var('X').get_tensor() + x_array = np.random.random((4, 8)).astype("float32") * 2 + x.set(x_array, place) + # create and initialize Param Variable + param = scope.var('W').get_tensor() + param_array = np.zeros((5, 8)).astype("float32") * 2 + param.set(param_array, place) + + path_table = scope.var('PathTable').get_tensor() + path_table_array = np.array( + [(0, 2, -1, -1, -1), (0, 1, 3, -1, -1), (0, 1, 4, -1, -1), + (0, 2, -1, -1, -1)]).astype( + "int64" + ) #np.array to store 1,2,5,6s' non-leaf path(root -> leaf) + path_table.set(path_table_array, place) + + path_code = scope.var('PathCode').get_tensor() + path_code_array = np.array( + [(0, 0, -1, -1, -1), (1, 1, 1, -1, -1), (1, 0, 0, -1, -1), + (0, 1, -1, -1, -1)]).astype("int64") #np.array to store + path_code.set(path_code_array, place) + + label = scope.var('Label').get_tensor() + label_array = np.array([0, 1, 4, 5]) + label.set(label_array, place) + + bias = scope.var('Bias').get_tensor() + bias_array = np.random.random((5, 1)).astype("float32") + bias.set(bias_array, place) + + out = scope.var('Out').get_tensor() + + pre_out = scope.var('PreOut').get_tensor + + w_out = scope.var('W_Out').get_tensor() + w_out.set(param_array, place) + + emaps = ['127.0.0.1:' + str(port0), '127.0.0.1:' + str(port1)] + table_names = ['table', 'table'] + height_sections = [2, 3] + + # create and run sgd operator + hsigmoid_op = Operator( + "hierarchical_sigmoid", + X='X', + W='W', + PathTable='PathTable', + PathCode='PathCode', + Label='Label', + Bias='Bias', + Out='Out', + PreOut='PreOut', + W_Out='W_Out', + remote_prefetch=True, + epmap=emaps, + table_names=table_names, + height_sections=height_sections) + hsigmoid_op.run(scope, place) + + # get and compare result + result_array = np.array(w_out) + self.assertEqual(list(result_array.shape), [5, 8]) + correct = None + for i in range(5): + if i < 2: + correct = np.full((1, 8), i + 1).astype("float32") + self.assertTrue((result_array[i] == correct).all()) + else: + correct = np.full((1, 8), i + 9).astype("float32") + self.assertTrue((result_array[i] == correct).all()) + + def test_hsigmoid_op_remote(self): + os.environ['PADDLE_ENABLE_REMOTE_PREFETCH'] = "1" + # run pserver on CPU in sync mode + p0 = self._start_pserver(0, False, True, run_pserver) + self._wait_ps_ready(p0.pid) + port0 = self._get_pserver_port(p0.pid) + + p1 = self._start_pserver(1, False, True, run_pserver) + self._wait_ps_ready(p1.pid) + port1 = self._get_pserver_port(p1.pid) + + places = [core.CPUPlace()] + if core.is_compiled_with_cuda(): + places.append(core.CUDAPlace(0)) + + for place in places: + self._run_hsigmoid_op_one_pserver(place, port0) + self._run_hsigmoid_op_two_pserver(place, port0, port1) + + # raise SIGTERM to pserver + os.kill(p0.pid, signal.SIGINT) + p0.join() + os.kill(p1.pid, signal.SIGINT) + p1.join() + + +if __name__ == '__main__': + unittest.main()