diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.cc b/paddle/fluid/operators/distributed/parameter_prefetch.cc
index 13fc0bc0cd8afb5c5574486386efb3f4557e919f..77cae353133e77e0ee66d4763b0bcb70babdcf5d 100644
--- a/paddle/fluid/operators/distributed/parameter_prefetch.cc
+++ b/paddle/fluid/operators/distributed/parameter_prefetch.cc
@@ -48,7 +48,7 @@ static size_t GetSectionIndex(int64_t id,
 }
 
 static std::vector<int64_t> ToAbsoluteSection(
-    const std::vector<int64_t>& height_sections) {
+    const std::vector<int>& height_sections) {
   std::vector<int64_t> abs_sections;
   abs_sections.resize(height_sections.size());
   abs_sections[0] = 0;
@@ -59,7 +59,7 @@ static std::vector<int64_t> ToAbsoluteSection(
 }
 
 static std::vector<std::vector<int64_t>> SplitIds(
-    const std::string& id_name, const std::vector<int64_t>& height_section,
+    const std::string& id_name, const std::vector<int>& height_section,
     framework::Scope* scope) {
   auto& id_tensor = scope->FindVar(id_name)->Get<framework::LoDTensor>();
   auto* id_data = id_tensor.data<int64_t>();
@@ -79,7 +79,7 @@ static std::vector<std::vector<int64_t>> SplitIds(
 
 static void SplitIdsIntoMultipleVarsBySection(
     const std::string& id_name, const std::vector<std::string>& in_var_names,
-    const std::vector<int64_t>& height_section,
+    const std::vector<int>& height_section,
     const std::vector<std::vector<int64_t>>& splited_ids,
     framework::Scope* scope) {
   PADDLE_ENFORCE_EQ(in_var_names.size(), height_section.size(), "");
@@ -101,7 +101,7 @@ static void SplitIdsIntoMultipleVarsBySection(
 static void MergeMultipleVarsIntoOneBySection(
     const std::string& id_name, const std::string& out_name,
     const std::vector<std::string>& out_var_names,
-    const std::vector<int64_t>& height_section,
+    const std::vector<int>& height_section,
     const std::vector<std::vector<int64_t>>& splited_ids,
     const framework::ExecutionContext& context, framework::Scope* scope) {
   PADDLE_ENFORCE_EQ(out_var_names.size(), height_section.size(), "");
@@ -154,7 +154,7 @@ static void MergeMultipleVarsIntoOneBySection(
 
 void prefetch(const std::string& id_name, const std::string& out_name,
               const std::vector<std::string>& table_names,
               const std::vector<std::string>& epmap,
-              const std::vector<int64_t>& height_sections,
+              const std::vector<int>& height_sections,
               const framework::ExecutionContext& context) {
   auto& local_scope = context.scope().NewScope();
diff --git a/paddle/fluid/operators/distributed/parameter_prefetch.h b/paddle/fluid/operators/distributed/parameter_prefetch.h
index 0693cfc1fd2b5bb1bee1609149d7e056557d65db..53b0fbfb51f60fa86351cca34fd1665c7802591b 100644
--- a/paddle/fluid/operators/distributed/parameter_prefetch.h
+++ b/paddle/fluid/operators/distributed/parameter_prefetch.h
@@ -26,7 +26,7 @@ namespace distributed {
 void prefetch(const std::string& id_name, const std::string& out_name,
               const std::vector<std::string>& table_names,
               const std::vector<std::string>& epmap,
-              const std::vector<int64_t>& height_sections,
+              const std::vector<int>& height_sections,
               const framework::ExecutionContext& context);
 
 };  // namespace distributed
diff --git a/paddle/fluid/operators/lookup_table_op.cc b/paddle/fluid/operators/lookup_table_op.cc
index ab6518641bd72ff4cfc0fc2af081c24c307be46e..658b586e4cf395adc2035b70a262fff7e8858e88 100644
--- a/paddle/fluid/operators/lookup_table_op.cc
+++ b/paddle/fluid/operators/lookup_table_op.cc
@@ -91,9 +91,9 @@ class LookupTableOpMaker : public framework::OpProtoAndCheckerMaker {
     // for parameter prefetch
     AddAttr<bool>("remote_prefetch", "").SetDefault(false);
     AddAttr<int>("trainer_id", "trainer id from 0 ~ worker_num.").SetDefault(0);
-    AddAttr<std::vector<int64_t>>("height_sections",
-                                  "Height for each output SelectedRows.")
-        .SetDefault(std::vector<int64_t>({}));
+    AddAttr<std::vector<int>>("height_sections",
+                              "Height for each output SelectedRows.")
+        .SetDefault(std::vector<int>({}));
     AddAttr<std::vector<std::string>>(
         "epmap",
"(string vector, default 127.0.0.1:6164)" diff --git a/paddle/fluid/operators/lookup_table_op.h b/paddle/fluid/operators/lookup_table_op.h index c1a1ea87a0ee7e6438a680a531c2a91f1f421395..17286191c2675905d31ea428e2453b60d2741d20 100644 --- a/paddle/fluid/operators/lookup_table_op.h +++ b/paddle/fluid/operators/lookup_table_op.h @@ -52,8 +52,7 @@ class LookupTableKernel : public framework::OpKernel { // for remote prefetch auto epmap = context.Attr>("epmap"); - auto height_sections = - context.Attr>("height_sections"); + auto height_sections = context.Attr>("height_sections"); auto table_names = context.Attr>("table_names"); if (!height_sections.empty()) { diff --git a/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py new file mode 100644 index 0000000000000000000000000000000000000000..95a266540dd10a0b30b785b3379a8c37774bf6b3 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_lookup_remote_table_op.py @@ -0,0 +1,146 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +import signal +import time +import unittest +from multiprocessing import Process + +import numpy as np +import paddle.fluid as fluid +import paddle.fluid.core as core +from paddle.fluid.op import Operator +from paddle.fluid.framework import Program, program_guard + + +def run_pserver(use_cuda, sync_mode): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + # create table parameter in scope + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + # create and initialize Param Variable + param = scope.var('table').get_tensor() + param_array = np.full((10, 8), 5.0).astype("float32") + param.set(param_array, place) + + optimize_block = program._create_block(program.global_block().idx) + program.global_block().append_op( + type="listen_and_serv", + inputs={'X': []}, + outputs={}, + attrs={ + "optimize_blocks": [optimize_block], + "endpoint": '127.0.0.1:0', + "Fanin": 1, + "sync_mode": True, + "grad_to_block_id": [] + }) + + exe = fluid.Executor(place) + exe.run(program) + + +class TestListenAndServOp(unittest.TestCase): + def setUp(self): + self.ps_timeout = 5 + + def _start_pserver(self, use_cuda, sync_mode, pserver_func): + p = Process(target=pserver_func, args=(use_cuda, sync_mode)) + p.daemon = True + p.start() + return p + + def _wait_ps_ready(self, pid): + start_left_time = self.ps_timeout + sleep_time = 0.5 + while True: + assert start_left_time >= 0, "wait ps ready failed" + time.sleep(sleep_time) + try: + # the listen_and_serv_op would touch a file which contains the listen port + # on the /tmp directory until it was ready to process all the RPC call. 
+ os.stat("/tmp/paddle.%d.port" % pid) + return + except os.error: + start_left_time -= sleep_time + + def _get_pserver_port(self, pid): + with open("/tmp/paddle.%d.port" % pid, 'r') as f: + port = int(f.read().strip()) + return port + + def _run_lookup_table_op(self, place, port): + scope = fluid.core.Scope() + program = Program() + with fluid.scope_guard(scope): + with program_guard(program, startup_program=Program()): + # create and initialize Param Variable + param = scope.var('W').get_tensor() + param_array = np.full((10, 8), 1.0).astype("float32") + param.set(param_array, place) + + ids = scope.var('Ids').get_tensor() + ids_array = np.array([[1.0], [2.0]]).astype("int64") + ids.set(ids_array, place) + ids.set_lod([[0, 1, 2]]) + + out = scope.var('Out').get_tensor() + + emaps = ['127.0.0.1:' + str(port)] + table_names = ['table'] + height_sections = [10] + # create and run sgd operator + lookup_table_op = Operator( + "lookup_table", + W='W', + Ids='Ids', + Out='Out', + remote_prefetch=True, + epmap=emaps, + table_names=table_names, + height_sections=height_sections) + lookup_table_op.run(scope, place) + + # get and compare result + result_array = np.array(out) + + print(result_array) + + self.assertTrue((result_array[0] == 5).all()) + self.assertTrue((result_array[0] == 5).all()) + + def test_lookup_remote_table(self): + # run pserver on CPU in sync mode + p1 = self._start_pserver(False, True, run_pserver) + self._wait_ps_ready(p1.pid) + port = self._get_pserver_port(p1.pid) + + places = [core.CPUPlace()] + # if core.is_compiled_with_cuda(): + # places.append(core.CUDAPlace(0)) + for place in places: + self._run_lookup_table_op(place, port) + + # raise SIGTERM to pserver + os.kill(p1.pid, signal.SIGINT) + p1.join() + + +if __name__ == '__main__': + unittest.main()