From f938ccec62d1cb34dbc6f35f662f33e262d72528 Mon Sep 17 00:00:00 2001 From: guru4elephant <35550832+guru4elephant@users.noreply.github.com> Date: Wed, 1 May 2019 21:47:06 +0800 Subject: [PATCH] remove async executor python api to fix document (#17174) * remove async executor python api test=develop * remove test_async_executor.py add executor train_from_dataset demo test=develop * fix import bug test=develop --- paddle/fluid/API.spec | 10 - python/paddle/fluid/__init__.py | 5 +- python/paddle/fluid/async_executor.py | 335 ------------------ python/paddle/fluid/distributed/helper.py | 2 +- ..._executor.py => executor_train_dataset.py} | 14 +- .../tests/unittests/test_async_executor.py | 86 ----- 6 files changed, 7 insertions(+), 445 deletions(-) delete mode 100644 python/paddle/fluid/async_executor.py rename python/paddle/fluid/tests/demo/{async_executor.py => executor_train_dataset.py} (88%) delete mode 100644 python/paddle/fluid/tests/unittests/test_async_executor.py diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index c5e6c558e..78c7729f9 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -39,16 +39,6 @@ paddle.fluid.DataFeedDesc.desc (ArgSpec(args=['self'], varargs=None, keywords=No paddle.fluid.DataFeedDesc.set_batch_size (ArgSpec(args=['self', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '8d9f44601e0a99dd431f14fd9250cd21')) paddle.fluid.DataFeedDesc.set_dense_slots (ArgSpec(args=['self', 'dense_slots_name'], varargs=None, keywords=None, defaults=None), ('document', 'eb894b464bbcd1b4bc8038398954f766')) paddle.fluid.DataFeedDesc.set_use_slots (ArgSpec(args=['self', 'use_slots_name'], varargs=None, keywords=None, defaults=None), ('document', '415c56600ce4e198c071cad01409a690')) -paddle.fluid.AsyncExecutor.__init__ (ArgSpec(args=['self', 'place', 'run_mode'], varargs=None, keywords=None, defaults=(None, '')), ('document', '4e85874dddcd06c38f5717992d741589')) -paddle.fluid.AsyncExecutor.config_distributed_nodes (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '762980fe0181eb41e3d1081b26ed76b1')) -paddle.fluid.AsyncExecutor.download_data (ArgSpec(args=['self', 'afs_path', 'local_path', 'fs_default_name', 'ugi', 'file_cnt', 'hadoop_home', 'process_num'], varargs=None, keywords=None, defaults=('$HADOOP_HOME', 12)), ('document', '39e3ccddf8ea8db75ea85287c9147c3b')) -paddle.fluid.AsyncExecutor.get_instance (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', 'f8688f76a2db1243c7097a60c507b182')) -paddle.fluid.AsyncExecutor.init_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '504f39be2007404a17e5cabea1256c7d')) -paddle.fluid.AsyncExecutor.init_server (ArgSpec(args=['self', 'dist_desc'], varargs=None, keywords=None, defaults=None), ('document', '384fa5fbb99912db1baf7ef7784bd312')) -paddle.fluid.AsyncExecutor.init_worker (ArgSpec(args=['self', 'dist_desc', 'startup_program'], varargs=None, keywords=None, defaults=None), ('document', 'f0a36d7c8561039f60a6f6555c7fee0b')) -paddle.fluid.AsyncExecutor.run (ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'mode', 'debug'], varargs=None, keywords=None, defaults=('', False)), ('document', '848fc53484e8326f6325feea87fe955c')) -paddle.fluid.AsyncExecutor.save_model (ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None), ('document', '145b5c0da01bfff397142e51361f4b75')) -paddle.fluid.AsyncExecutor.stop (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '5f23d043607bb5d55e466ec3f578e093')) paddle.fluid.CompiledProgram.__init__ (ArgSpec(args=['self', 'program_or_graph'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.CompiledProgram.with_data_parallel (ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from', 'places'], varargs=None, keywords=None, defaults=(None, None, None, None, None)), ('document', 'a8c7793803cf976680d9478e378fa356')) paddle.fluid.CompiledProgram.with_inference_optimize (ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None), ('document', '9e5b009d850191a010e859189c127fd8')) diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 3dc2b0c89..631257cc2 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -27,9 +27,6 @@ from .data_feed_desc import * from . import dataset from .dataset import * -from . import async_executor -from .async_executor import * - from . import trainer_desc from . import inferencer @@ -74,7 +71,7 @@ Tensor = LoDTensor __all__ = framework.__all__ + executor.__all__ + \ trainer_desc.__all__ + inferencer.__all__ + transpiler.__all__ + \ parallel_executor.__all__ + lod_tensor.__all__ + \ - data_feed_desc.__all__ + async_executor.__all__ + compiler.__all__ + [ + data_feed_desc.__all__ + compiler.__all__ + [ 'io', 'initializer', 'layers', diff --git a/python/paddle/fluid/async_executor.py b/python/paddle/fluid/async_executor.py deleted file mode 100644 index 2442d26d3..000000000 --- a/python/paddle/fluid/async_executor.py +++ /dev/null @@ -1,335 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import numpy as np -import contextlib -import six -from .framework import Program, default_main_program, Variable -from . import core -from .executor import global_scope, Executor -from paddle.fluid.proto import data_feed_pb2 -from google.protobuf import text_format -from . import io -from .data_feed_desc import DataFeedDesc -from .trainer_desc import TrainerDesc, MultiTrainer, DistMultiTrainer -from .distributed import ps_instance -from .contrib.utils import hdfs_utils as hdfs - -__all__ = ['AsyncExecutor'] - - -class AsyncExecutor(object): - """ - An asynchronous Executor in Python. Through exploiting the power of - multi-core processor and data queueing, AsyncExecutor makes data reading - and cosuming decoupled, each run in multiple threads in parallel. - - Instead of reading data in python side, AsyncExecutor accepts a training - file list, which will be retrieved in C++, then training inputs will be - read, parsed and fed to training network within C++ code. - - AsyncExecutor is in active development and the API might change in the near - future. - - Example: - >>> data_feed = fluid.DataFeedDesc('data.proto') - >>> startup_program = fluid.default_startup_program() - >>> main_program = fluid.default_main_program() - >>> filelist = ["train_data/part-%d" % i for i in range(100)] - >>> thread_num = len(filelist) / 4 - >>> - >>> place = fluid.CPUPlace() - >>> async_executor = fluid.AsyncExecutor(place) - >>> - >>> async_executor.run_startup_program(startup_program) - >>> - >>> epoch = 10 - >>> for i in range(epoch): - >>> async_executor.run(main_program, - >>> data_feed, - >>> filelist, - >>> thread_num, - >>> [acc], - >>> debug=False) - - Args: - place(fluid.CPUPlace|None): indicate the executor run on which device. - Only CPUPlace supported - - Note: - For debugging complicated network in parallel-GPUs, you can test it - on the executor. They has the exactly same arguments, and expected - the same results. - - Note: Only running on CPUPlace supported. - """ - - def __init__(self, place=None, run_mode=""): - """ - Init. - - Example: - >>> place = fluid.CPUPlace() - >>> async_executor = fluid.AsyncExecutor(place) - - Args: - place(Place): CPUPlace only - run_mode(str): default is empty string. - """ - if place is None: - place = core.CPUPlace() - if not isinstance(place, core.CPUPlace): - raise ValueError("AsyncExecutor only supports CPU device") - - p = core.Place() - p.set_place(place) - - scope = global_scope() - self.executor = core.AsyncExecutor(scope, p) - self.instance = None - - def run(self, - program, - data_feed, - filelist, - thread_num, - fetch, - mode="", - debug=False): - """ - Run program by this AsyncExecutor. Training dataset will be in filelist. - Users can also inspect certain variables by naming them in parameter - :code:`fetch`, like in fluid.Executor. Unlike fluid.Executor, however, - AsyncExecutor doesn't return fetched variables, instead, it will dump - the values of each fetched variable to stdandard output. - - Running the dataset will be on multiple threads, within each a thread - local scope will be created, then all OPs also created in that scope. - Parameters are updated by all the OPs simultaneously. - - Args: - program(Program): the program that need to run, if not provied, - then default_main_program will be used. - data_feed(DataFeedDesc): A DataFeedDesc object - filelist(str): a file containing the training dataset file list - thread_num(int): number of concurrent training threads. See - :code:`Note` for how to set this properly - fetch(str|list): the var name or a list of var names to inspect - mode(str): run mode of this interface - debug(bool): When set to True, fetch vars will be printed to - standard output after each minibatch - - Note: - the executor will run all operators in the program but not only - the operators dependent by the fetch_list. - - Note: - Running AsyncExecutor will be on multiple threads, each bound to a - CPU core. To achieve best performance, it's suggested to set thread - num to be equal or slightly less than that of CPU cores. - """ - if program is None: - program = default_main_program() - program_desc = program.desc - - if data_feed is None: - raise ValueError('ValueError: data_feed should be provided') - - if filelist is None: - raise ValueError('ValueError: filelist should be provided') - - if isinstance(filelist, str): - filelist = [filelist] - - if not isinstance(thread_num, int): - raise TypeError('TypeError: thread_num should be a positive number') - - if fetch is not None: - if isinstance(fetch, Variable): - fetch = [fetch] - fetch_var_names = [var.name for var in fetch] - for fetch_var in fetch: - shape = fetch_var.shape - if shape[len(shape) - 1] != 1: - raise AssertionError( - "%s: Fetch variable has wrong shape. Only varibles " - "with the last dimension size 1 supported." % - (fetch_var.name)) - - self.executor.run_from_files(program_desc, - data_feed.desc(), filelist, thread_num, - fetch_var_names, mode, debug, - str(id(program_desc))) - - def download_data(self, - afs_path, - local_path, - fs_default_name, - ugi, - file_cnt, - hadoop_home="$HADOOP_HOME", - process_num=12): - """ - download_data is a default download method for distributed training - a user download data without this method - - Example: - >>> exe = fluid.AsyncExecutor() - >>> exe.download_data("/xxx/xxx/xx/", - >>> "./data", "afs:// - >>> xxx.xxx.xxx.xxx:9901", "xxx,yyy") - - Args: - afs_path(str): afs_path defined by users - local_path(str): download data path - fs_default_name(str): file system server address - ugi(str): hadoop ugi - file_cnt(int): a user can specify file number for debugging - hadoop_home(str): hadoop home path - process_num(int): download process num - """ - if self.instance is None: - raise ValueError('instance is None, please run' - 'config_distributed_nodes init instance') - - configs = {"fs.default.name": fs_default_name, "hadoop.job.ugi": ugi} - - client = hdfs.HDFSClient(hadoop_home, configs) - downloads = hdfs.multi_download( - client, - afs_path, - local_path, - self.instance.get_worker_index(), - self.instance.get_node_cnt() / 2, - multi_processes=process_num) - self.instance.barrier_worker() #wait for download_data - - def get_instance(self): - """ - get current node's instance so that user can do operations - in distributed setting - """ - if self.instance is None: - raise ValueError( - 'instance is None, please run config_distributed_nodes init instance' - ) - return self.instance - - def config_distributed_nodes(self): - """ - if a user needs to run distributed async executor - he or she needs to do a global configuration so that - information of current process can be obtained - """ - self.instance = ps_instance.PaddlePSInstance(1, 2) - return self.instance - - def stop(self): - """ - at the end of process, users should call stop to servers - and barrier all workers - """ - if self.instance is None: - raise ValueError( - 'instance is None, please run config_distributed_nodes init instance' - ) - self.instance.barrier_worker() #worker do all things - if self.instance.is_first_worker(): - self.executor.stop_server() - self.instance.barrier_worker() #sync - self.instance.barrier_all() - self.instance.finalize() - - def init_server(self, dist_desc): - """ - Initialize server of current node if current process is a server. - - Args: - dist_desc(str): a protobuf string that describes - how to init a worker and a server - """ - if self.instance is None: - raise ValueError( - 'instance is None, please run config_distributed_nodes init instance' - ) - self.dist_desc_str = text_format.MessageToString(dist_desc) - self.dist_desc = dist_desc - self.executor.init_server(self.dist_desc_str, self.instance._rankid) - ip = self.executor.start_server() - self.instance.set_ip(ip) - self.instance.barrier_all() #wait all server start - ips = self.instance.gather_ips() - self.executor.gather_servers(ips, self.instance.get_node_cnt()) - self.instance.barrier_all() #wait all worker start - - def init_worker(self, dist_desc, startup_program): - """ - Initialize worker of current node if current process is a worker. - - Args: - dist_desc(str): a protobuf string that describes - how to init a worker and a server - startup_program(fluid.Program): startup program of current process - """ - if self.instance is None: - raise ValueError( - 'instance is None, please run config_distributed_nodes init instance' - ) - - self.dist_desc_str = text_format.MessageToString(dist_desc) - self.dist_desc = dist_desc - place = core.CPUPlace() - executor = Executor(place) - if isinstance(startup_program, list): - for sp in startup_program: - executor.run(sp) - else: - executor.run(startup_program) - - self.instance.barrier_all() #wait all server start - ips = self.instance.gather_ips() - self.executor.init_worker(self.dist_desc_str, ips, - self.instance.get_node_cnt(), - self.instance._rankid) - self.instance.barrier_all() #wait all worker start - if self.instance.is_first_worker(): - self.executor.init_model() - self.instance.barrier_worker() #wait init model - - def init_model(self): - """ - init_model command that can be invoked from one of the worker - model parameters are initialized in servers - """ - if self.instance is None: - raise ValueError( - 'instance is None, please run config_distributed_nodes init instance' - ) - self.executor.init_model() - - def save_model(self, save_path): - """ - save_model command that can be invoked from one of the worker - model parameters are saved in servers and upload to save_path of file system. - - Args: - save_path(str): save path to file system - """ - if self.instance is None: - raise ValueError( - 'instance is None, please run config_distributed_nodes init instance' - ) - self.executor.save_model(save_path) diff --git a/python/paddle/fluid/distributed/helper.py b/python/paddle/fluid/distributed/helper.py index 06d3d0315..20f45b4e7 100644 --- a/python/paddle/fluid/distributed/helper.py +++ b/python/paddle/fluid/distributed/helper.py @@ -15,7 +15,7 @@ class FileSystem(object): """ - A file system that support async_executor hadoop client desc. + A file system that support hadoop client desc. Args: fs_type (string): fs_type, for example is "afs" diff --git a/python/paddle/fluid/tests/demo/async_executor.py b/python/paddle/fluid/tests/demo/executor_train_dataset.py similarity index 88% rename from python/paddle/fluid/tests/demo/async_executor.py rename to python/paddle/fluid/tests/demo/executor_train_dataset.py index fe8da0aab..6938982de 100644 --- a/python/paddle/fluid/tests/demo/async_executor.py +++ b/python/paddle/fluid/tests/demo/executor_train_dataset.py @@ -58,9 +58,8 @@ def train(): tarf.close() # Initialize dataset description - dataset = fluid.DataFeedDesc('train_data/data.prototxt') + dataset = fluid.DatasetFactory().create_dataset() dataset.set_batch_size(128) # See API doc for how to change other fields - print dataset.desc() # Debug purpose: see what we get # define network # input text data @@ -68,7 +67,7 @@ def train(): name="words", shape=[1], dtype="int64", lod_level=1) # label data label = fluid.layers.data(name="label", shape=[1], dtype="int64") - + dataset.set_use_var([data, label]) avg_cost, acc, prediction = bow_net(data, label) sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.002) opt_ops, weight_and_grad = sgd_optimizer.minimize(avg_cost) @@ -79,18 +78,15 @@ def train(): executor = fluid.Executor(place) executor.run(startup_program) - async_executor = fluid.AsyncExecutor(place) main_program = fluid.default_main_program() epochs = 10 filelist = ["train_data/part-%d" % i for i in range(12)] + dataset.set_filelist(filelist) for i in range(epochs): - thread_num = 4 - async_executor.run( + dataset.set_thread(4) + executor.train_from_dataset( main_program, # This can be changed during iteration dataset, # This can be changed during iteration - filelist, # This can be changed during iteration - thread_num, # This can be changed during iteration - [data, acc], # Multiple fetch targets can be specified debug=False) fluid.io.save_inference_model('imdb/epoch%d.model' % i, [data.name, label.name], [acc], executor) diff --git a/python/paddle/fluid/tests/unittests/test_async_executor.py b/python/paddle/fluid/tests/unittests/test_async_executor.py deleted file mode 100644 index 563301691..000000000 --- a/python/paddle/fluid/tests/unittests/test_async_executor.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import paddle.fluid as fluid -import paddle -import unittest -import tarfile -import os -import shutil - -proto_str = ('name: "MultiSlotDataFeed"\n' - 'batch_size: 2\n' - 'multi_slot_desc {\n' - ' slots {\n' - ' name: "words"\n' - ' type: "uint64"\n' - ' is_dense: false\n' - ' is_used: true\n' - ' }\n' - ' slots {\n' - ' name: "label"\n' - ' type: "uint64"\n' - ' is_dense: false\n' - ' is_used: true\n' - ' }\n' - '}') - -URL = 'http://paddle-unittest-data.gz.bcebos.com/python_paddle_fluid_tests_demo_async-executor/train_data.tar.gz' -MD5 = '2a405a31508969b3ab823f42c0f522ca' - - -def bow_net(data, - label, - dict_dim=89528, - emb_dim=128, - hid_dim=128, - hid_dim2=96, - class_dim=2): - """ - BOW net - This model is from https://github.com/PaddlePaddle/models: - models/fluid/PaddleNLP/text_classification/nets.py - """ - # embedding - emb = fluid.layers.embedding( - input=data, size=[dict_dim, emb_dim], is_sparse=True) - bow = fluid.layers.sequence_pool(input=emb, pool_type='sum') - bowh = fluid.layers.tanh(bow) - # fc layer after conv - fc_1 = fluid.layers.fc(input=bowh, size=hid_dim, act="tanh") - fc_2 = fluid.layers.fc(input=fc_1, size=hid_dim2, act="tanh") - # probability of each class - prediction = fluid.layers.fc(input=[fc_2], size=class_dim, act="softmax") - # cross entropy loss - cost = fluid.layers.cross_entropy(input=prediction, label=label) - # mean loss - avg_cost = fluid.layers.mean(x=cost) - acc = fluid.layers.accuracy(input=prediction, label=label) - return avg_cost, acc, prediction - - -class TestAsyncExecutor(unittest.TestCase): - def setUp(self): - with open('./data.prototxt', 'w+') as f: - f.write(proto_str) - f.close() - - with tarfile.open(paddle.dataset.common.download(URL, "imdb", - MD5)) as tarf: - tarf.extractall(path='./') - tarf.close() - - -if __name__ == '__main__': - unittest.main() -- GitLab