diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 842fde1ec5f16aeec28a790f1869eaed64e3516c..19717745275c57060d475ca5145b4ca90a71491c 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -59,7 +59,7 @@ paddle.fluid.DistributeTranspiler.__init__ ArgSpec(args=['self', 'config'], vara paddle.fluid.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)) -paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) +paddle.fluid.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None)) paddle.fluid.InferenceTranspiler.__init__ paddle.fluid.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) @@ -346,7 +346,7 @@ paddle.fluid.transpiler.DistributeTranspiler.__init__ ArgSpec(args=['self', 'con paddle.fluid.transpiler.DistributeTranspiler.get_pserver_program ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.transpiler.DistributeTranspiler.get_pserver_programs ArgSpec(args=['self', 'endpoint'], varargs=None, keywords=None, defaults=None) paddle.fluid.transpiler.DistributeTranspiler.get_startup_program ArgSpec(args=['self', 'endpoint', 'pserver_program', 'startup_program'], varargs=None, keywords=None, defaults=(None, None)) -paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) +paddle.fluid.transpiler.DistributeTranspiler.get_trainer_program ArgSpec(args=['self', 'wait_port'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.transpiler.DistributeTranspiler.transpile ArgSpec(args=['self', 'trainer_id', 'program', 'pservers', 'trainers', 'sync_mode', 'startup_program'], varargs=None, keywords=None, defaults=(None, '127.0.0.1:6174', 1, True, None)) paddle.fluid.transpiler.InferenceTranspiler.__init__ paddle.fluid.transpiler.InferenceTranspiler.transpile ArgSpec(args=['self', 'program', 'place', 'scope'], varargs=None, keywords=None, defaults=(None,)) diff --git a/paddle/fluid/inference/tests/api/CMakeLists.txt b/paddle/fluid/inference/tests/api/CMakeLists.txt index ff6bb662c14d9da60f2f6c10477bdd1fbe3a82c2..6c6c887ac6c084bd78add0163036fdf0430b276d 100644 --- a/paddle/fluid/inference/tests/api/CMakeLists.txt +++ b/paddle/fluid/inference/tests/api/CMakeLists.txt @@ -1,56 +1,58 @@ -function (inference_download_and_uncompress install_dir url) - get_filename_component(filename ${url} NAME) - message(STATUS "Download inference test stuff ${filename} from ${url}") +set(INFERENCE_URL "http://paddle-inference-dist.bj.bcebos.com") +set(INFERENCE_DEMO_INSTALL_DIR "${THIRD_PARTY_PATH}/inference_demo") +set(INFERENCE_EXTRA_DEPS paddle_inference_api paddle_fluid_api ir_pass_manager analysis_predictor) +function (inference_download_and_uncompress install_dir filename) + message(STATUS "Download inference test stuff from ${INFERENCE_URL}/${filename}") execute_process(COMMAND bash -c "mkdir -p ${install_dir}") - execute_process(COMMAND bash -c "cd ${install_dir} && wget -q ${url}") + execute_process(COMMAND bash -c "cd ${install_dir} && wget -q ${INFERENCE_URL}/${filename}") execute_process(COMMAND bash -c "cd ${install_dir} && tar xzf ${filename}") message(STATUS "finish downloading ${filename}") endfunction(inference_download_and_uncompress) -function(download_model_and_data install_dir model_url data_url) +function(download_model_and_data install_dir model_name data_name) if (NOT EXISTS ${install_dir} AND WITH_INFERENCE) - inference_download_and_uncompress(${install_dir} ${model_url}) - inference_download_and_uncompress(${install_dir} ${data_url}) + inference_download_and_uncompress(${install_dir} ${model_name}) + inference_download_and_uncompress(${install_dir} ${data_name}) endif() endfunction() # RNN1 -set(RNN1_MODEL_URL "http://paddle-inference-dist.bj.bcebos.com/rnn1%2Fmodel.tar.gz") -set(RNN1_DATA_URL "http://paddle-inference-dist.bj.bcebos.com/rnn1%2Fdata.txt.tar.gz") -set(RNN1_INSTALL_DIR "${THIRD_PARTY_PATH}/inference_demo/rnn1") -download_model_and_data(${RNN1_INSTALL_DIR} ${RNN1_MODEL_URL} ${RNN1_DATA_URL}) -inference_analysis_test(test_analyzer_rnn1 SRCS analyzer_rnn1_tester.cc - EXTRA_DEPS paddle_inference_api paddle_fluid_api ir_pass_manager analysis_predictor +set(RNN1_INSTALL_DIR "${INFERENCE_DEMO_INSTALL_DIR}/rnn1") +download_model_and_data(${RNN1_INSTALL_DIR} "rnn1%2Fmodel.tar.gz" "rnn1%2Fdata.txt.tar.gz") +inference_analysis_test(test_analyzer_rnn1 SRCS analyzer_rnn1_tester.cc + EXTRA_DEPS ${INFERENCE_EXTRA_DEPS} ARGS --infer_model=${RNN1_INSTALL_DIR}/model --infer_data=${RNN1_INSTALL_DIR}/data.txt) +# RNN2 +set(RNN2_INSTALL_DIR "${INFERENCE_DEMO_INSTALL_DIR}/rnn2") +download_model_and_data(${RNN2_INSTALL_DIR} "rnn2_model.tar.gz" "rnn2_data.txt.tar.gz") +inference_analysis_test(test_analyzer_rnn2 SRCS analyzer_rnn2_tester.cc + EXTRA_DEPS ${INFERENCE_EXTRA_DEPS} + ARGS --infer_model=${RNN2_INSTALL_DIR}/model + --infer_data=${RNN2_INSTALL_DIR}/data.txt) + # chinese_ner -set(CHINESE_NER_MODEL_URL "http://paddle-inference-dist.bj.bcebos.com/chinese_ner_model.tar.gz") -set(CHINESE_NER_DATA_URL "http://paddle-inference-dist.bj.bcebos.com/chinese_ner-data.txt.tar.gz") -set(CHINESE_NER_INSTALL_DIR "${THIRD_PARTY_PATH}/inference_demo/chinese_ner") -download_model_and_data(${CHINESE_NER_INSTALL_DIR} ${CHINESE_NER_MODEL_URL} ${CHINESE_NER_DATA_URL}) +set(CHINESE_NER_INSTALL_DIR "${INFERENCE_DEMO_INSTALL_DIR}/chinese_ner") +download_model_and_data(${CHINESE_NER_INSTALL_DIR} "chinese_ner_model.tar.gz" "chinese_ner-data.txt.tar.gz") inference_analysis_test(test_analyzer_ner SRCS analyzer_ner_tester.cc - EXTRA_DEPS paddle_inference_api paddle_fluid_api analysis_predictor + EXTRA_DEPS ${INFERENCE_EXTRA_DEPS} ARGS --infer_model=${CHINESE_NER_INSTALL_DIR}/model --infer_data=${CHINESE_NER_INSTALL_DIR}/data.txt) # lac -set(LAC_MODEL_URL "http://paddle-inference-dist.bj.bcebos.com/lac_model.tar.gz") -set(LAC_DATA_URL "http://paddle-inference-dist.bj.bcebos.com/lac_data.txt.tar.gz") -set(LAC_INSTALL_DIR "${THIRD_PARTY_PATH}/inference_demo/lac") -download_model_and_data(${LAC_INSTALL_DIR} ${LAC_MODEL_URL} ${LAC_DATA_URL}) +set(LAC_INSTALL_DIR "${INFERENCE_DEMO_INSTALL_DIR}/lac") +download_model_and_data(${LAC_INSTALL_DIR} "lac_model.tar.gz" "lac_data.txt.tar.gz") inference_analysis_test(test_analyzer_lac SRCS analyzer_lac_tester.cc - EXTRA_DEPS paddle_inference_api paddle_fluid_api ir_pass_manager analysis_predictor + EXTRA_DEPS ${INFERENCE_EXTRA_DEPS} ARGS --infer_model=${LAC_INSTALL_DIR}/model --infer_data=${LAC_INSTALL_DIR}/data.txt) # text_classification -set(TEXT_CLASSIFICATION_MODEL_URL "http://paddle-inference-dist.bj.bcebos.com/text-classification-Senta.tar.gz") -set(TEXT_CLASSIFICATION_DATA_URL "http://paddle-inference-dist.bj.bcebos.com/text_classification_data.txt.tar.gz") -set(TEXT_CLASSIFICATION_INSTALL_DIR "${THIRD_PARTY_PATH}/inference_demo/text_classification") -download_model_and_data(${TEXT_CLASSIFICATION_INSTALL_DIR} ${TEXT_CLASSIFICATION_MODEL_URL} ${TEXT_CLASSIFICATION_DATA_URL}) +set(TEXT_CLASSIFICATION_INSTALL_DIR "${INFERENCE_DEMO_INSTALL_DIR}/text_classification") +download_model_and_data(${TEXT_CLASSIFICATION_INSTALL_DIR} "text-classification-Senta.tar.gz" "text_classification_data.txt.tar.gz") inference_analysis_test(test_text_classification SRCS analyzer_text_classification_tester.cc - EXTRA_DEPS paddle_inference_api paddle_fluid_api analysis_predictor + EXTRA_DEPS ${INFERENCE_EXTRA_DEPS} ARGS --infer_model=${TEXT_CLASSIFICATION_INSTALL_DIR}/text-classification-Senta --infer_data=${TEXT_CLASSIFICATION_INSTALL_DIR}/data.txt --topn=1 # Just run top 1 batch. diff --git a/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc b/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc new file mode 100644 index 0000000000000000000000000000000000000000..c40ea58eea9c10a85acf84108f1d081a779f526d --- /dev/null +++ b/paddle/fluid/inference/tests/api/analyzer_rnn2_tester.cc @@ -0,0 +1,181 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "paddle/fluid/inference/analysis/analyzer.h" + +#include +#include +#include // NOLINT +#include "paddle/fluid/framework/ir/fuse_pass_base.h" +#include "paddle/fluid/framework/ir/pass.h" +#include "paddle/fluid/inference/analysis/ut_helper.h" +#include "paddle/fluid/inference/api/analysis_predictor.h" +#include "paddle/fluid/inference/api/helper.h" +#include "paddle/fluid/inference/api/paddle_inference_api.h" +#include "paddle/fluid/inference/api/paddle_inference_pass.h" + +DEFINE_string(infer_model, "", "model path"); +DEFINE_string(infer_data, "", "data path"); +DEFINE_int32(batch_size, 1, "batch size."); +DEFINE_int32(repeat, 1, "Running the inference program repeat times."); +DEFINE_int32(num_threads, 1, "Running the inference program in multi-threads."); + +namespace paddle { +namespace inference { + +using namespace framework; // NOLINT + +struct DataRecord { + std::vector>> link_step_data_all; + std::vector lod; + std::vector> rnn_link_data; + std::vector result_data; + size_t batch_iter{0}; + size_t batch_size{1}; + DataRecord() = default; + explicit DataRecord(const std::string &path, int batch_size = 1) + : batch_size(batch_size) { + Load(path); + } + DataRecord NextBatch() { + DataRecord data; + size_t batch_end = batch_iter + batch_size; + // NOTE skip the final batch, if no enough data is provided. + if (batch_end <= link_step_data_all.size()) { + data.link_step_data_all.assign(link_step_data_all.begin() + batch_iter, + link_step_data_all.begin() + batch_end); + // Prepare LoDs + data.lod.push_back(0); + CHECK(!data.link_step_data_all.empty()) << "empty"; + for (size_t j = 0; j < data.link_step_data_all.size(); j++) { + for (const auto &d : data.link_step_data_all[j]) { + data.rnn_link_data.push_back(d); + // calculate lod + data.lod.push_back(data.lod.back() + 11); + } + } + } + batch_iter += batch_size; + return data; + } + void Load(const std::string &path) { + std::ifstream file(path); + std::string line; + int num_lines = 0; + while (std::getline(file, line)) { + num_lines++; + std::vector data; + split(line, ':', &data); + if (num_lines % 2) { // feature + std::vector feature_data; + split(data[1], ' ', &feature_data); + std::vector> link_step_data; + int feature_count = 1; + std::vector feature; + for (auto &step_data : feature_data) { + std::vector tmp; + split_to_float(step_data, ',', &tmp); + feature.insert(feature.end(), tmp.begin(), tmp.end()); + if (feature_count % 11 == 0) { // each sample has 11 features + link_step_data.push_back(feature); + feature.clear(); + } + feature_count++; + } + link_step_data_all.push_back(std::move(link_step_data)); + } else { // result + std::vector tmp; + split_to_float(data[1], ',', &tmp); + result_data.insert(result_data.end(), tmp.begin(), tmp.end()); + } + } + } +}; +void PrepareInputs(std::vector *input_slots, DataRecord *data, + int batch_size) { + PaddleTensor feed_tensor; + feed_tensor.name = "feed"; + auto one_batch = data->NextBatch(); + int token_size = one_batch.rnn_link_data.size(); + // each token has 11 features, each feature's dim is 54. + std::vector rnn_link_data_shape({token_size * 11, 54}); + feed_tensor.shape = rnn_link_data_shape; + feed_tensor.lod.assign({one_batch.lod}); + feed_tensor.dtype = PaddleDType::FLOAT32; + TensorAssignData(&feed_tensor, one_batch.rnn_link_data); + // Set inputs. + input_slots->assign({feed_tensor}); +} + +void CompareResult(const std::vector &outputs, + const std::vector &base_result) { + PADDLE_ENFORCE_GT(outputs.size(), 0); + for (size_t i = 0; i < outputs.size(); i++) { + auto &out = outputs[i]; + size_t size = std::accumulate(out.shape.begin(), out.shape.end(), 1, + [](int a, int b) { return a * b; }); + PADDLE_ENFORCE_GT(size, 0); + float *data = static_cast(out.data.data()); + for (size_t i = 0; i < size; i++) { + EXPECT_NEAR(data[i], base_result[i], 1e-3); + } + } +} +// Test with a really complicate model. +void TestRNN2Prediction() { + AnalysisConfig config; + config.prog_file = FLAGS_infer_model + "/__model__"; + config.param_file = FLAGS_infer_model + "/param"; + config.use_gpu = false; + config.device = 0; + config.specify_input_name = true; + config.enable_ir_optim = true; + PADDLE_ENFORCE(config.ir_mode == + AnalysisConfig::IrPassMode::kExclude); // default + + int batch_size = FLAGS_batch_size; + int num_times = FLAGS_repeat; + + auto base_predictor = + CreatePaddlePredictor(config); + auto predictor = + CreatePaddlePredictor( + config); + std::vector input_slots; + DataRecord data(FLAGS_infer_data, batch_size); + PrepareInputs(&input_slots, &data, batch_size); + std::vector outputs, base_outputs; + + Timer timer1; + timer1.tic(); + for (int i = 0; i < num_times; i++) { + base_predictor->Run(input_slots, &base_outputs); + } + PrintTime(batch_size, num_times, 1, 0, timer1.toc() / num_times); + + Timer timer2; + timer2.tic(); + for (int i = 0; i < num_times; i++) { + predictor->Run(input_slots, &outputs); + } + PrintTime(batch_size, num_times, 1, 0, timer2.toc() / num_times); + + CompareResult(base_outputs, data.result_data); + CompareResult(outputs, data.result_data); +} + +TEST(Analyzer, rnn2) { TestRNN2Prediction(); } + +} // namespace inference +} // namespace paddle diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc index 07ac20797ddab54296a45e99915588a40cc6f3c7..e22bc552f85b85c75f06b4158f2abac2d3843256 100644 --- a/paddle/fluid/operators/distributed/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -290,12 +290,18 @@ void GRPCClient::Proceed() { c->Finish(false); } - delete c; + bool notify = false; { std::lock_guard lk(sync_mutex_); req_count_--; + notify = (req_count_ <= 0 || !c->status_.ok()); + } + + delete c; + + if (notify) { + sync_cond_.notify_all(); } - sync_cond_.notify_all(); } VLOG(3) << "GRPCClient Proceed end"; } diff --git a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py index b85501ef6b80d1f5004aa0dd08c3123d3bda48a5..a198b25520f97ce23b9c1ebb9cd82fc458222d73 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_transpiler.py +++ b/python/paddle/fluid/tests/unittests/test_dist_transpiler.py @@ -62,7 +62,7 @@ class TranspilerTest(unittest.TestCase): t = self._transpiler_instance(config) - trainer_main = t.get_trainer_program() + trainer_main = t.get_trainer_program(wait_port=False) trainer_startup = fluid.default_startup_program() assert (src.num_blocks == 1) diff --git a/python/paddle/fluid/transpiler/details/__init__.py b/python/paddle/fluid/transpiler/details/__init__.py index 5e98266a761c7e01bd6668e85e6adeb54103ca80..f33c05ed2f48c2498b98fc486d6ff7471088d77e 100644 --- a/python/paddle/fluid/transpiler/details/__init__.py +++ b/python/paddle/fluid/transpiler/details/__init__.py @@ -16,3 +16,4 @@ from __future__ import print_function from .program_utils import * from .ufind import * +from .checkport import * diff --git a/python/paddle/fluid/transpiler/details/checkport.py b/python/paddle/fluid/transpiler/details/checkport.py new file mode 100644 index 0000000000000000000000000000000000000000..7bad4b427a2d53bd14c7a1f870ce74a883158d04 --- /dev/null +++ b/python/paddle/fluid/transpiler/details/checkport.py @@ -0,0 +1,50 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import time +import socket +from contextlib import closing + + +def wait_server_ready(endpoints): + """ + Wait until parameter servers are ready, use connext_ex to detect + port readiness. + + Args: + endpoints (list): endpoints string list, like: + ["127.0.0.1:8080", "127.0.0.1:8081"] + + Examples: + .. code-block:: python + + wait_server_ready(["127.0.0.1:8080", "127.0.0.1:8081"]) + """ + while True: + all_ok = True + for ep in endpoints: + ip_port = ep.split(":") + with closing(socket.socket(socket.AF_INET, + socket.SOCK_STREAM)) as sock: + sock.settimeout(2) + result = sock.connect_ex((ip_port[0], int(ip_port[1]))) + if result != 0: + all_ok = False + if not all_ok: + sys.stderr.write("pserver not ready, wait 3 sec to retry...\n") + sys.stderr.flush() + time.sleep(3) + else: + break diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index d4d218d547a394a56c040ade2a9ba703b691b86b..53c9cbe23dd82af866658fe46d1d631b0a3b26f3 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -381,7 +381,7 @@ class DistributeTranspiler(object): pserver_endpoints) self._split_table_grad_and_add_send_vars(program, pserver_endpoints) - def get_trainer_program(self): + def get_trainer_program(self, wait_port=True): """ Get transpiled trainer side program. @@ -393,6 +393,9 @@ class DistributeTranspiler(object): delete_ops(self.origin_program.global_block(), self.optimize_ops) self.origin_program.__str__() + if wait_port: + wait_server_ready(self.pserver_endpoints) + return self.origin_program def _get_trainer_startup_program(self, recv_vars, eplist):