diff --git a/.copyright.hook b/.copyright.hook index dc1b096a0ad28db732b794fa856efed71917c5e8..09afff2072df3384a429d01d06188218ae6e85d1 100644 --- a/.copyright.hook +++ b/.copyright.hook @@ -9,7 +9,7 @@ import subprocess import platform COPYRIGHT = ''' - Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/doc/howto/usage/cluster/fluid_cluster_train_en.md b/doc/howto/usage/cluster/fluid_cluster_train_en.md index 11904a6f71bb6ce37417aeffb8e408ec65961b12..ae825d9a517c7e9005d4e32f8f34b3f6a79be0c9 100644 --- a/doc/howto/usage/cluster/fluid_cluster_train_en.md +++ b/doc/howto/usage/cluster/fluid_cluster_train_en.md @@ -16,6 +16,12 @@ PaddlePaddle must be installed on all nodes. If you have GPU cards on your nodes PaddlePaddle build and installation guide can be found [here](http://www.paddlepaddle.org/docs/develop/documentation/en/getstarted/build_and_install/index_en.html). +In addition to above, the `cmake` command should be run with the option `WITH_DISTRIBUTE` set to on. An example bare minimum `cmake` command would look as follows: + +``` bash +cmake .. -DWITH_DOC=OFF -DWITH_GPU=OFF -DWITH_DISTRIBUTE=ON -DWITH_SWIG_PY=ON -DWITH_PYTHON=ON +``` + ### Update the training script #### Non-cluster training script @@ -119,7 +125,14 @@ for pass_id in range(100): ### E2E demo -Please find the complete demo from [here](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py). In parameter server node run the following in the command line: +Please find the complete demo from [here](https://github.com/PaddlePaddle/Paddle/blob/develop/python/paddle/v2/fluid/tests/book_distribute/notest_dist_fit_a_line.py). +First `cd` into the folder that contains the `python` files. In this case: + +```bash +cd /paddle/python/paddle/v2/fluid/tests/book_distribute +``` + +In parameter server node run the following in the command line: ``` bash PSERVERS=192.168.1.2:6174 SERVER_ENDPOINT=192.168.1.2:6174 TRAINING_ROLE=PSERVER python notest_dist_fit_a_line.py diff --git a/paddle/inference/inference.cc b/paddle/inference/inference.cc index 49001778808173b82865a4b6632a6b175ef96242..09268ffb3a1410b22f1b7d997a5cc0e4176b6d55 100644 --- a/paddle/inference/inference.cc +++ b/paddle/inference/inference.cc @@ -19,14 +19,10 @@ limitations under the License. */ #include "paddle/framework/init.h" #include "paddle/framework/scope.h" -#ifdef PADDLE_USE_PTOOLS -#include "chooseser.h" -#endif - namespace paddle { void InferenceEngine::LoadInferenceModel(const std::string& dirname) { - std::string model_filename = dirname + "/__model__.dat"; + std::string model_filename = dirname + "/__model__"; LOG(INFO) << "loading model from " << model_filename; std::ifstream inputfs(model_filename, std::ios::in | std::ios::binary); std::string program_desc_str; @@ -52,39 +48,15 @@ void InferenceEngine::LoadInferenceModel(const std::string& dirname) { } } -void InferenceEngine::LoadInferenceModel( - const std::string& dirname, - const std::vector& feed_var_names, - const std::vector& fetch_var_names) { - std::string model_filename = dirname + "/__model__.dat"; - LOG(INFO) << "loading model from " << model_filename; - std::ifstream inputfs(model_filename, std::ios::in | std::ios::binary); - std::string program_desc_str; - inputfs.seekg(0, std::ios::end); - program_desc_str.resize(inputfs.tellg()); - inputfs.seekg(0, std::ios::beg); - LOG(INFO) << "program_desc_str's size: " << program_desc_str.size(); - inputfs.read(&program_desc_str[0], program_desc_str.size()); - inputfs.close(); - - program_ = new framework::ProgramDesc(program_desc_str); - GenerateLoadProgram(dirname); - - if (feed_var_names.empty() || fetch_var_names.empty()) { - LOG(FATAL) << "Please specify the feed_var_names and fetch_var_names."; - } - feed_var_names_ = feed_var_names; - fetch_var_names_ = fetch_var_names; - PrependFeedOp(); - AppendFetchOp(); -} - bool InferenceEngine::IsParameter(const framework::VarDesc* var) { - if (var->Persistable() && var->Name() != "feed" && var->Name() != "fetch") { + if (var->Persistable()) { // There are many unreachable variables in the program for (size_t i = 0; i < program_->Size(); ++i) { const framework::BlockDesc& block = program_->Block(i); for (auto* op : block.AllOps()) { + if (op->Type() == "feed") { + continue; + } for (auto input_argument_name : op->InputArgumentNames()) { if (input_argument_name == var->Name()) { return true; diff --git a/paddle/inference/inference.h b/paddle/inference/inference.h index 7fc09cb9e539a65a8cd3cceb1543bc7d111c22b3..26f259824b945e260b370ced9d065842264075d5 100644 --- a/paddle/inference/inference.h +++ b/paddle/inference/inference.h @@ -29,9 +29,6 @@ public: } void LoadInferenceModel(const std::string& dirname); - void LoadInferenceModel(const std::string& dirname, - const std::vector& feed_var_names, - const std::vector& fetch_var_names); void Execute(const std::vector& feeds, std::vector& fetchs); diff --git a/paddle/operators/math/CMakeLists.txt b/paddle/operators/math/CMakeLists.txt index c607704efac86982c8c22e462381aaab488a9b69..28c5aec1996ad04a6cb551ac68c14b613d16858e 100644 --- a/paddle/operators/math/CMakeLists.txt +++ b/paddle/operators/math/CMakeLists.txt @@ -11,7 +11,7 @@ if(WITH_GPU) nv_library(sequence_pooling SRCS sequence_pooling.cc sequence_pooling.cu DEPS device_context math_function) nv_library(vol2col SRCS vol2col.cc vol2col.cu DEPS device_context tensor) nv_library(context_project SRCS context_project.cc context_project.cu DEPS device_context math_function) - nv_library(sequence2batch SRCS sequence2batch.cc sequence2batch.cu DEPS device_context tensor) + nv_library(sequence2batch SRCS sequence2batch.cc sequence2batch.cu DEPS device_context tensor math_function) nv_library(sequence_padding SRCS sequence_padding.cc sequence_padding.cu DEPS lod_tensor device_context) nv_library(sequence_scale SRCS sequence_scale.cc sequence_scale.cu DEPS lod_tensor device_context) nv_library(lstm_compute SRCS lstm_compute.cc lstm_compute.cu DEPS device_context activation_functions) @@ -28,7 +28,7 @@ else() cc_library(sequence_pooling SRCS sequence_pooling.cc DEPS device_context math_function) cc_library(vol2col SRCS vol2col.cc DEPS device_context tensor) cc_library(context_project SRCS context_project.cc DEPS device_context math_function) - cc_library(sequence2batch SRCS sequence2batch.cc DEPS device_context tensor) + cc_library(sequence2batch SRCS sequence2batch.cc DEPS device_context tensor math_function) cc_library(sequence_padding SRCS sequence_padding.cc DEPS lod_tensor device_context) cc_library(sequence_scale SRCS sequence_scale.cc DEPS lod_tensor device_context) cc_library(lstm_compute SRCS lstm_compute.cc DEPS device_context activation_functions) diff --git a/paddle/operators/nccl_op_test.cu.cc b/paddle/operators/nccl_op_test.cu.cc index 6546096069d4c3fbc4908a16c2dba2ac6d7e6421..072e4eb2eff1f6f3d8745ac8e16709b8e1a69725 100644 --- a/paddle/operators/nccl_op_test.cu.cc +++ b/paddle/operators/nccl_op_test.cu.cc @@ -241,7 +241,7 @@ TEST_F(NCCLTester, ncclReduceOp) { // ncclBcastOp with desc TEST_F(NCCLTester, ncclBcastOp) { std::unique_ptr op2(new f::OpDesc); - const int kRoot = 5; + const int kRoot = 0; op2->SetType("ncclBcast"); op2->SetInput("X", {"st"}); op2->SetInput("Communicator", {"comm"}); diff --git a/paddle/platform/call_once.h b/paddle/platform/call_once.h index 00337a7f051758559a0f8012d8c78dbe8e3457a6..44a4d38f679ddf6c317e52132b6cf3eb2f0a0649 100644 --- a/paddle/platform/call_once.h +++ b/paddle/platform/call_once.h @@ -29,20 +29,25 @@ namespace platform { */ template inline void call_once(std::once_flag& flag, Callable&& f, Args&&... args) { - bool good = false; + bool good = true; std::exception ex; - std::call_once(flag, - [&](Args&&... args) { - try { - f(args...); - good = true; - } catch (const std::exception& e) { - ex = e; - } catch (...) { - ex = std::runtime_error("excption caught in call_once"); - } - }, - args...); + try { + std::call_once(flag, + [&](Args&&... args) { + try { + f(args...); + } catch (const std::exception& e) { + ex = e; + good = false; + } catch (...) { + ex = std::runtime_error("excption caught in call_once"); + good = false; + } + }, + args...); + } catch (std::system_error& x) { + throw std::runtime_error("call once failed"); + } if (!good) { throw std::exception(ex); } diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index abcad899bfac9ba3eff20cde825e136d867a4485..934eba73b82dbfa93c210f7675793417c10a04c3 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -33,6 +33,10 @@ class VarBlock: return "%s:%d:%d" % (self.varname, self.offset, self.size) +def same_or_split_var(p_name, var_name): + return p_name == var_name or p_name.startswith(var_name + ".block") + + def split_dense_variable(var_list, pserver_count, min_block_size=1024, @@ -303,8 +307,8 @@ class DistributeTranspiler: return True else: for n in param_names: - if n.startswith(op.inputs["Param"].name+".block") and \ - n != op.inputs["Param"].name: + if same_or_split_var(n, op.inputs[ + "Param"].name) and n != op.inputs["Param"].name: return True return False else: @@ -335,7 +339,7 @@ class DistributeTranspiler: if key == "Grad": grad_block = None for g in self.param_grad_ep_mapping[endpoint]["grads"]: - if g.name.startswith(var.name): + if same_or_split_var(g.name, var.name): grad_block = g break if not grad_block: @@ -365,7 +369,7 @@ class DistributeTranspiler: # param is already created on global program param_block = None for p in self.param_grad_ep_mapping[endpoint]["params"]: - if p.name.startswith(var.name): + if same_or_split_var(p.name, var.name): param_block = p break if not param_block: @@ -502,7 +506,7 @@ class DistributeTranspiler: def _get_splited_name_and_shape(varname): for idx, splited_param in enumerate(params): pname = splited_param.name - if pname.startswith(varname) and varname != pname: + if same_or_split_var(pname, varname) and varname != pname: return pname, splited_param.shape return "", [] diff --git a/python/paddle/v2/fluid/executor.py b/python/paddle/v2/fluid/executor.py index 9d5ed9571a2fa0a871a25e43b23b1a3c3a6102db..9f48815b8b84426c7d539af4e7d45ea47e69d4d9 100644 --- a/python/paddle/v2/fluid/executor.py +++ b/python/paddle/v2/fluid/executor.py @@ -68,6 +68,84 @@ def as_numpy(tensor): return ans +def has_feed_operators(block, feed_targets, feed_holder_name): + """ Check whether the block already has feed operators. + + Return false if the block does not have any feed operators. + If some feed operators have been prepended to the block, check that + the info contained in these feed operators matches the feed_targets + and feed_holder_name. Raise exception when any mismatch is found. + Return true when the block has feed operators with matching info. + + Args: + block: a block instance (typically global block of a program) + feed_targets: a dictionary of {feed_target_name: feed_target_data} + feed_holder_name: the name of the variable that holds the data of + all feed targets. The type of this feed_holder variable is + FEED_MINIBATCH, which is essentially vector. + + Returns: + A boolean value that indicates whether a block has feed operators + that match the info contained in feed_targets and feed_holder_name. + """ + + feed_count = 0 + for op in block.ops: + if op.desc.type() == 'feed': + feed_count += 1 + assert op.desc.input('X')[0] == feed_holder_name + feed_target_name = op.desc.output('Out')[0] + if feed_target_name not in feed_targets: + raise Exception("'feed_targets' does not have {} variable". + format(feed_target_name)) + else: + break + if feed_count > 0 and feed_count != len(feed_targets): + raise Exception( + "Feed operators in program desc do not match 'feed_targets'") + return feed_count > 0 + + +def has_fetch_operators(block, fetch_targets, fetch_holder_name): + """ Check whether the block already has fetch operators. + + Return false if the block does not have any fetch operators. + If some fetch operators have been appended to the block, check that + the info contained in these fetch operators matches the fetch_targets + and fetch_holder_name. Raise exception when any mismatch is found. + Return true when the block has fetch operators with matching info. + + Args: + block: a block instance (typically global block of a program) + fetch_targets: a dictionary of {fetch_target_name: fetch_target_data} + fetch_holder_name: the name of the variable that holds the data of + all fetch targets. The type of this fetch_holder variable is + FETCH_LIST, which is essentially vector. + + Return: + A boolean value that indicates whether a block has fetch operators + that match the info contained in fetch_targets and fetch_holder_name. + """ + + fetch_count = 0 + for op in block.ops: + if op.desc.type() == 'fetch': + fetch_count += 1 + assert op.desc.output('Out')[0] == fetch_holder_name + fetch_target_name = op.desc.input('X')[0] + if fetch_target_name not in [ + var.desc.name() for var in fetch_targets + ]: + raise Exception("'fetch_targets' does not have {} variable". + format(fetch_target_name)) + idx = op.desc.attr('col') + assert fetch_target_name == fetch_targets[idx].desc.name() + if fetch_count > 0 and fetch_count != len(fetch_targets): + raise Exception( + "Fetch operators in program desc do not match 'fetch_targets'") + return fetch_count > 0 + + class Executor(object): def __init__(self, places): if not isinstance(places, list) and not isinstance(places, tuple): @@ -147,33 +225,50 @@ class Executor(object): program = program.clone() global_block = program.global_block() - feed_var = global_block.create_var( - name=feed_var_name, - type=core.VarDesc.VarType.FEED_MINIBATCH, - persistable=True) - - for i, name in enumerate(feed): - out = global_block.var(name) - global_block.prepend_op( - 'feed', - inputs={'X': [feed_var]}, - outputs={'Out': [out]}, - attrs={'col': i}) - cur_feed = feed[name] - if not isinstance(cur_feed, core.LoDTensor): - cur_feed = self.aslodtensor(cur_feed) - core.set_feed_variable(scope, cur_feed, feed_var.name, i) - - fetch_var = global_block.create_var( - name=fetch_var_name, - type=core.VarDesc.VarType.FETCH_LIST, - persistable=True) - for i, var in enumerate(fetch_list): - global_block.append_op( - type='fetch', - inputs={'X': [var]}, - outputs={'Out': [fetch_var]}, - attrs={'col': i}) + + if feed_var_name in global_block.vars: + feed_var = global_block.var(feed_var_name) + else: + feed_var = global_block.create_var( + name=feed_var_name, + type=core.VarDesc.VarType.FEED_MINIBATCH, + persistable=True) + + if fetch_var_name in global_block.vars: + fetch_var = global_block.var(fetch_var_name) + else: + fetch_var = global_block.create_var( + name=fetch_var_name, + type=core.VarDesc.VarType.FETCH_LIST, + persistable=True) + + if not has_feed_operators(global_block, feed, feed_var_name): + for i, name in enumerate(feed): + out = global_block.var(name) + global_block.prepend_op( + type='feed', + inputs={'X': [feed_var]}, + outputs={'Out': [out]}, + attrs={'col': i}) + + for op in global_block.ops: + if op.desc.type() == 'feed': + feed_target_name = op.desc.output('Out')[0] + cur_feed = feed[feed_target_name] + if not isinstance(cur_feed, core.LoDTensor): + cur_feed = self.aslodtensor(cur_feed) + idx = op.desc.attr('col') + core.set_feed_variable(scope, cur_feed, feed_var_name, idx) + else: + break + + if not has_fetch_operators(global_block, fetch_list, fetch_var_name): + for i, var in enumerate(fetch_list): + global_block.append_op( + type='fetch', + inputs={'X': [var]}, + outputs={'Out': [fetch_var]}, + attrs={'col': i}) self.executor.run(program.desc, scope, 0, True, True) outs = [ diff --git a/python/paddle/v2/fluid/io.py b/python/paddle/v2/fluid/io.py index 5b02d2495d1ebe9e82e7f847e5bd07548901c7fc..d56ec45c538b580f5520bc060b4b339bb1be0539 100644 --- a/python/paddle/v2/fluid/io.py +++ b/python/paddle/v2/fluid/io.py @@ -13,7 +13,6 @@ # limitations under the License. import os -import cPickle as pickle from paddle.v2.fluid.evaluator import Evaluator from paddle.v2.fluid.framework import Program, Parameter, default_main_program, Variable @@ -191,8 +190,8 @@ def get_inference_program(target_vars, main_program=None): vars = [] for var in target_vars: if isinstance(var, Evaluator): - vars.append(var.states) - vars.append(var.metrics) + vars.extend(var.states) + vars.extend(var.metrics) else: vars.append(var) pruned_program = main_program.prune(targets=vars) @@ -200,12 +199,16 @@ def get_inference_program(target_vars, main_program=None): return inference_program -def prepend_feed_ops(inference_program, feeded_var_names): +def prepend_feed_ops(inference_program, + feed_target_names, + feed_holder_name='feed'): global_block = inference_program.global_block() feed_var = global_block.create_var( - name='feed', type=core.VarDesc.VarType.FEED_MINIBATCH, persistable=True) + name=feed_holder_name, + type=core.VarDesc.VarType.FEED_MINIBATCH, + persistable=True) - for i, name in enumerate(feeded_var_names): + for i, name in enumerate(feed_target_names): out = global_block.var(name) global_block.prepend_op( type='feed', @@ -214,12 +217,16 @@ def prepend_feed_ops(inference_program, feeded_var_names): attrs={'col': i}) -def append_fetch_ops(inference_program, fetch_var_names): +def append_fetch_ops(inference_program, + fetch_target_names, + fetch_holder_name='fetch'): global_block = inference_program.global_block() fetch_var = global_block.create_var( - name='fetch', type=core.VarDesc.VarType.FETCH_LIST, persistable=True) + name=fetch_holder_name, + type=core.VarDesc.VarType.FETCH_LIST, + persistable=True) - for i, name in enumerate(fetch_var_names): + for i, name in enumerate(fetch_target_names): global_block.append_op( type='fetch', inputs={'X': [name]}, @@ -269,21 +276,12 @@ def save_inference_model(dirname, inference_program = pruned_program.inference_optimize() fetch_var_names = [v.name for v in target_vars] - model_file_name = dirname + "/__model__" - with open(model_file_name, "w") as f: - pickle.dump({ - "program_desc_str": inference_program.desc.serialize_to_string(), - "feed_var_names": feeded_var_names, - "fetch_var_names": fetch_var_names - }, f, -1) - prepend_feed_ops(inference_program, feeded_var_names) append_fetch_ops(inference_program, fetch_var_names) - # Save only programDesc of inference_program in binary format - # in another file: __model__.dat - with open(model_file_name + ".dat", "wb") as fp: - fp.write(inference_program.desc.serialize_to_string()) + model_file_name = dirname + "/__model__" + with open(model_file_name, "wb") as f: + f.write(inference_program.desc.serialize_to_string()) save_params(executor, dirname, main_program) @@ -306,6 +304,24 @@ def load_persistables_if_exist(executor, dirname, main_program=None): predicate=_is_presistable_and_exist_) +def get_feed_targets_names(program): + feed_targets_names = [] + global_block = program.global_block() + for op in global_block.ops: + if op.desc.type() == 'feed': + feed_targets_names.insert(0, op.desc.output('Out')[0]) + return feed_targets_names + + +def get_fetch_targets_names(program): + fetch_targets_names = [] + global_block = program.global_block() + for op in global_block.ops: + if op.desc.type() == 'fetch': + fetch_targets_names.append(op.desc.input('X')[0]) + return fetch_targets_names + + def load_inference_model(dirname, executor): """ Load inference model from a directory @@ -313,24 +329,28 @@ def load_inference_model(dirname, executor): :param dirname: directory path :param executor: executor that load inference model - :return: [program, feed_var_names, fetch_var_names] + :return: [program, feed_target_names, fetch_targets] program: program especially for inference. - feeded_var_names: Names of variables that need to feed data - fetch_vars: Variables from which we can get inference results. + feed_target_names: Names of variables that need to feed data + fetch_targets: Variables from which we can get inference results. """ if not os.path.isdir(dirname): raise ValueError("There is no directory named '%s'", dirname) model_file_name = dirname + "/__model__" - model = pickle.load(open(model_file_name, "r")) - program_desc_str = model["program_desc_str"] - feed_var_names = model["feed_var_names"] - fetch_var_names = model["fetch_var_names"] + with open(model_file_name, "rb") as f: + program_desc_str = f.read() + program = Program.parse_from_string(program_desc_str) load_persistables_if_exist(executor, dirname, program) - fetch_vars = [program.global_block().var(name) for name in fetch_var_names] - return [program, feed_var_names, fetch_vars] + feed_target_names = get_feed_targets_names(program) + fetch_target_names = get_fetch_targets_names(program) + fetch_targets = [ + program.global_block().var(name) for name in fetch_target_names + ] + + return [program, feed_target_names, fetch_targets] def get_parameter_value(para, executor): diff --git a/python/paddle/v2/fluid/tests/book/test_recognize_digits_mlp.py b/python/paddle/v2/fluid/tests/book/test_recognize_digits_mlp.py index 8776a65bf804e93dfeb295ecca34fac0840b0a90..236ee4f3398538403228a00ee9c4b72c7c8231cf 100644 --- a/python/paddle/v2/fluid/tests/book/test_recognize_digits_mlp.py +++ b/python/paddle/v2/fluid/tests/book/test_recognize_digits_mlp.py @@ -91,6 +91,21 @@ for pass_id in range(PASS_NUM): fluid.io.save_inference_model( "./recognize_digits_mlp.inference.model/", ["x"], [predict], exe) - exit(0) - -exit(1) + break + +# Use load_inference_model to obtain the inference program desc, +# the feed_target_names (the names of variables that will be feeded +# data using feed operators), and the fetch_targets (variables that +# we want to obtain data from using fetch operators). +[infer_prog, feed_target_names, fetch_targets] = fluid.io.load_inference_model( + "./recognize_digits_mlp.inference.model/", exe) + +tensor_x = np.random.rand(1, 784).astype("float32") +# Construct feed as a dictionary of {feed_target_name: feed_target_data} +# and results will contain a list of data corresponding to fetch_targets. +results = exe.run(infer_prog, + feed={feed_target_names[0]: tensor_x}, + fetch_list=fetch_targets) +print(results[0]) + +exit(0) diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_recommender_system_dist.py b/python/paddle/v2/fluid/tests/book_distribute/notest_recommender_system_dist.py new file mode 100644 index 0000000000000000000000000000000000000000..2d8885e377b0a10d8b5bad4e8fcecb9cc6fc8b64 --- /dev/null +++ b/python/paddle/v2/fluid/tests/book_distribute/notest_recommender_system_dist.py @@ -0,0 +1,216 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy as np +import os +import paddle.v2 as paddle +import paddle.v2.fluid as fluid +import paddle.v2.fluid.core as core +import paddle.v2.fluid.layers as layers +import paddle.v2.fluid.nets as nets +from paddle.v2.fluid.optimizer import SGDOptimizer + +IS_SPARSE = True +BATCH_SIZE = 256 +PASS_NUM = 100 + + +def get_usr_combined_features(): + USR_DICT_SIZE = paddle.dataset.movielens.max_user_id() + 1 + uid = layers.data(name='user_id', shape=[1], dtype='int64') + usr_emb = layers.embedding( + input=uid, + dtype='float32', + size=[USR_DICT_SIZE, 32], + param_attr='user_table', + is_sparse=IS_SPARSE) + usr_fc = layers.fc(input=usr_emb, size=32) + USR_GENDER_DICT_SIZE = 2 + + usr_gender_id = layers.data(name='gender_id', shape=[1], dtype='int64') + usr_gender_emb = layers.embedding( + input=usr_gender_id, + size=[USR_GENDER_DICT_SIZE, 16], + param_attr='gender_table', + is_sparse=IS_SPARSE) + usr_gender_fc = layers.fc(input=usr_gender_emb, size=16) + + USR_AGE_DICT_SIZE = len(paddle.dataset.movielens.age_table) + usr_age_id = layers.data(name='age_id', shape=[1], dtype="int64") + usr_age_emb = layers.embedding( + input=usr_age_id, + size=[USR_AGE_DICT_SIZE, 16], + is_sparse=IS_SPARSE, + param_attr='age_table') + usr_age_fc = layers.fc(input=usr_age_emb, size=16) + + USR_JOB_DICT_SIZE = paddle.dataset.movielens.max_job_id() + 1 + usr_job_id = layers.data(name='job_id', shape=[1], dtype="int64") + usr_job_emb = layers.embedding( + input=usr_job_id, + size=[USR_JOB_DICT_SIZE, 16], + param_attr='job_table', + is_sparse=IS_SPARSE) + usr_job_fc = layers.fc(input=usr_job_emb, size=16) + + concat_embed = layers.concat( + input=[usr_fc, usr_gender_fc, usr_age_fc, usr_job_fc], axis=1) + + usr_combined_features = layers.fc(input=concat_embed, size=200, act="tanh") + return usr_combined_features + + +def get_mov_combined_features(): + MOV_DICT_SIZE = paddle.dataset.movielens.max_movie_id() + 1 + mov_id = layers.data(name='movie_id', shape=[1], dtype='int64') + mov_emb = layers.embedding( + input=mov_id, + dtype='float32', + size=[MOV_DICT_SIZE, 32], + param_attr='movie_table', + is_sparse=IS_SPARSE) + mov_fc = layers.fc(input=mov_emb, size=32) + + CATEGORY_DICT_SIZE = len(paddle.dataset.movielens.movie_categories()) + category_id = layers.data(name='category_id', shape=[1], dtype='int64') + mov_categories_emb = layers.embedding( + input=category_id, size=[CATEGORY_DICT_SIZE, 32], is_sparse=IS_SPARSE) + mov_categories_hidden = layers.sequence_pool( + input=mov_categories_emb, pool_type="sum") + + MOV_TITLE_DICT_SIZE = len(paddle.dataset.movielens.get_movie_title_dict()) + mov_title_id = layers.data(name='movie_title', shape=[1], dtype='int64') + mov_title_emb = layers.embedding( + input=mov_title_id, size=[MOV_TITLE_DICT_SIZE, 32], is_sparse=IS_SPARSE) + mov_title_conv = nets.sequence_conv_pool( + input=mov_title_emb, + num_filters=32, + filter_size=3, + act="tanh", + pool_type="sum") + + concat_embed = layers.concat( + input=[mov_fc, mov_categories_hidden, mov_title_conv], axis=1) + + mov_combined_features = layers.fc(input=concat_embed, size=200, act="tanh") + return mov_combined_features + + +def model(): + usr_combined_features = get_usr_combined_features() + mov_combined_features = get_mov_combined_features() + + # need cos sim + inference = layers.cos_sim(X=usr_combined_features, Y=mov_combined_features) + scale_infer = layers.scale(x=inference, scale=5.0) + + label = layers.data(name='score', shape=[1], dtype='float32') + square_cost = layers.square_error_cost(input=scale_infer, label=label) + avg_cost = layers.mean(x=square_cost) + + return avg_cost + + +def func_feed(feeding, data, place): + feed_tensors = {} + for (key, idx) in feeding.iteritems(): + tensor = core.LoDTensor() + if key != "category_id" and key != "movie_title": + if key == "score": + numpy_data = np.array(map(lambda x: x[idx], data)).astype( + "float32") + else: + numpy_data = np.array(map(lambda x: x[idx], data)).astype( + "int64") + else: + numpy_data = map(lambda x: np.array(x[idx]).astype("int64"), data) + lod_info = [len(item) for item in numpy_data] + offset = 0 + lod = [offset] + for item in lod_info: + offset += item + lod.append(offset) + numpy_data = np.concatenate(numpy_data, axis=0) + tensor.set_lod([lod]) + + numpy_data = numpy_data.reshape([numpy_data.shape[0], 1]) + tensor.set(numpy_data, place) + feed_tensors[key] = tensor + return feed_tensors + + +def main(): + cost = model() + optimizer = SGDOptimizer(learning_rate=0.2) + optimize_ops, params_grads = optimizer.minimize(cost) + + train_reader = paddle.batch( + paddle.reader.shuffle( + paddle.dataset.movielens.train(), buf_size=8192), + batch_size=BATCH_SIZE) + + place = fluid.CPUPlace() + exe = fluid.Executor(place) + + t = fluid.DistributeTranspiler() + + # all parameter server endpoints list for spliting parameters + pserver_endpoints = os.getenv("PSERVERS") + # server endpoint for current node + current_endpoint = os.getenv("SERVER_ENDPOINT") + # run as trainer or parameter server + training_role = os.getenv("TRAINING_ROLE", "TRAINER") + t.transpile( + optimize_ops, params_grads, pservers=pserver_endpoints, trainers=2) + + if training_role == "PSERVER": + if not current_endpoint: + print("need env SERVER_ENDPOINT") + exit(1) + pserver_prog = t.get_pserver_program(current_endpoint) + pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) + exe.run(pserver_startup) + exe.run(pserver_prog) + elif training_role == "TRAINER": + exe.run(fluid.default_startup_program()) + trainer_prog = t.get_trainer_program() + + feeding = { + 'user_id': 0, + 'gender_id': 1, + 'age_id': 2, + 'job_id': 3, + 'movie_id': 4, + 'category_id': 5, + 'movie_title': 6, + 'score': 7 + } + + for pass_id in range(PASS_NUM): + for data in train_reader(): + outs = exe.run(trainer_prog, + feed=func_feed(feeding, data, place), + fetch_list=[cost]) + out = np.array(outs[0]) + print("cost=" + str(out[0])) + if out[0] < 6.0: + print("Training complete. Average cost is less than 6.0.") + # if avg cost less than 6.0, we think our code is good. + exit(0) + else: + print("environment var TRAINER_ROLE should be TRAINER os PSERVER") + + +if __name__ == '__main__': + main() diff --git a/python/paddle/v2/fluid/tests/test_multihead_attention.py b/python/paddle/v2/fluid/tests/test_multihead_attention.py index 54ec3e3d6e53f35d6a518ef659853e1a13c1711f..a2b300a645fe21931cc12a4e7bb8ebe9b85707c9 100644 --- a/python/paddle/v2/fluid/tests/test_multihead_attention.py +++ b/python/paddle/v2/fluid/tests/test_multihead_attention.py @@ -58,7 +58,7 @@ class TestMultiheadAttention(unittest.TestCase): """Run the test program. """ places = [core.CPUPlace()] - if core.is_compile_gpu(): + if core.is_compiled_with_cuda(): places.append(core.CUDAPlace(0)) for place in places: