diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py index 42b4d7feab669f5c041a31a17fd4c2b16b38c19d..baea57ccce0e9ca3a8fab244e43a107a89cfe67d 100644 --- a/python/paddle/fluid/contrib/utils/hdfs_utils.py +++ b/python/paddle/fluid/contrib/utils/hdfs_utils.py @@ -32,6 +32,28 @@ _logger.setLevel(logging.INFO) class HDFSClient(object): + """ + A tool of HDFS + + Args: + hadoop_home (string): hadoop_home + configs (dict): hadoop config, it is a dict, please contain \ + key "fs.default.name" and "hadoop.job.ugi" + Can be a float value + Examples: + hadoop_home = "/home/client/hadoop-client/hadoop/" + + configs = { + "fs.default.name": "hdfs://xxx.hadoop.com:54310", + "hadoop.job.ugi": "hello,hello123" + } + + client = HDFSClient(hadoop_home, configs) + + client.ls("/user/com/train-25") + files = client.lsr("/user/com/train-25/models") + """ + def __init__(self, hadoop_home, configs): self.pre_commands = [] hadoop_bin = '%s/bin/hadoop' % hadoop_home @@ -55,7 +77,10 @@ class HDFSClient(object): whole_commands = " ".join(whole_commands) for x in range(retry_times + 1): proc = subprocess.Popen( - whole_commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) + whole_commands, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True) (output, errors) = proc.communicate() ret_code, ret_out, ret_err = proc.returncode, output, errors if ret_code: @@ -69,10 +94,12 @@ class HDFSClient(object): def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5): """ upload the local file to hdfs - args: - local_file_path: the local file path - remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp) - return: + Args: + hdfs_path: hdfs path, target path + local_path: local file path, source path + overwrite: will overwrite the original file + retry_times: max times retry to upload + Returns: True or False """ assert hdfs_path is not None @@ -115,10 +142,12 @@ class HDFSClient(object): def download(self, hdfs_path, local_path, overwrite=False, unzip=False): """ download from hdfs - args: - local_file_path: the local file path - remote_file_path: remote dir on hdfs - return: + Args: + hdfs_path: hdfs path, target path + local_path: local file path, source path + overwrite: will remove original file and overwrite it. + unzip: ignore this param + Returns True or False """ _logger.info('Downloading %r to %r.', hdfs_path, local_path) @@ -160,11 +189,11 @@ class HDFSClient(object): def is_exist(self, hdfs_path=None): """ whether the remote hdfs path exists? - args: - remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp) + Args: + hdfs_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp) fs_name: The default values are the same as in the job configuration fs_ugi: The default values are the same as in the job configuration - return: + Returns: True or False """ exist_cmd = ['-test', '-e', hdfs_path] @@ -183,11 +212,11 @@ class HDFSClient(object): def is_dir(self, hdfs_path=None): """ whether the remote hdfs path exists? - args: + Args: remote_file_path: default value(${OUTPUT_PATH}/${SYS_USER_ID}/${SYS_JOB_ID}/tmp) fs_name: The default values are the same as in the job configuration fs_ugi: The default values are the same as in the job configuration - return: + Returns: True or False """ @@ -207,15 +236,17 @@ class HDFSClient(object): return True def delete(self, hdfs_path): - """Remove a file or directory from HDFS. - - :param hdfs_path: HDFS path. - :param recursive: Recursively delete files and directories. By default, - this method will raise an :class:`HdfsError` if trying to delete a - non-empty directory. + """ + Remove a file or directory from HDFS. - This function returns `True` if the deletion was successful and `False` if - no file or directory previously existed at `hdfs_path`. + Args: + param hdfs_path: HDFS path. + param recursive: Recursively delete files and directories. By default, + this method will raise an :class:`HdfsError` if trying to delete a + non-empty directory. + Returns: + This function returns `True` if the deletion was successful and `False` if + no file or directory previously existed at `hdfs_path`. """ _logger.info('Deleting %r.', hdfs_path) @@ -241,14 +272,17 @@ class HDFSClient(object): return True def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False): - """Move a file or folder. - - :param hdfs_src_path: Source path. - :param hdfs_dst_path: Destination path. If the path already exists and is - a directory, the source will be moved into it. If the path exists and is - a file, or if a parent destination directory is missing, this method will - raise an :class:`HdfsError`. - + """ + Rename a file or folder. + Args: + :param hdfs_src_path: Source path. + :param hdfs_dst_path: Destination path. If the path already exists and is + a directory, the source will be moved into it. If the path exists and is + a file, or if a parent destination directory is missing, this method will + raise an :class:`HdfsError`. + Returns: + This function returns `True` if the rename was successful and `False` if + rename was faild. """ assert hdfs_src_path is not None assert hdfs_dst_path is not None @@ -274,6 +308,11 @@ class HDFSClient(object): @staticmethod def make_local_dirs(local_path): + """ + create a directiory local, is same to mkdir + Args: + local_path: local path that wants to create a directiory. + """ try: os.makedirs(local_path) except OSError as e: @@ -282,9 +321,11 @@ class HDFSClient(object): def makedirs(self, hdfs_path): """Create a remote directory, recursively if necessary. - - :param hdfs_path: Remote path. Intermediate directories will be created - appropriately. + Args: + :param hdfs_path: Remote path. Intermediate directories will be created + appropriately. + Returns: + True if make a directories was successful, False when make a directiries was failed. """ _logger.info('Creating directories to %r.', hdfs_path) assert hdfs_path is not None @@ -304,6 +345,13 @@ class HDFSClient(object): return True def ls(self, hdfs_path): + """ + ls a hdfs_path. + Args: + :param hdfs_path: hdfs_path will be ls. + Returns: + This function returns a `list` that contaion all files in the hdfs_path. + """ assert hdfs_path is not None if not self.is_exist(hdfs_path): @@ -329,6 +377,14 @@ class HDFSClient(object): return ret_lines def lsr(self, hdfs_path, only_file=True, sort=True): + """ + ls a hdfs_path sort by time. + Args: + :param hdfs_path: hdfs_path will be ls. + Returns: + This function returns a `list` that contaion all files sorted by time in the hdfs_path. + """ + def sort_by_time(v1, v2): v1_time = datetime.strptime(v1[1], '%Y-%m-%d %H:%M') v2_time = datetime.strptime(v2[1], '%Y-%m-%d %H:%M') @@ -372,12 +428,15 @@ def multi_upload(client, multi_processes=5, overwrite=False): """ - :param overwrite: will overwrite hdfs file or not - :param multi_processes: the upload data process at the same time, default=5 - :param client: instance of HDFSClient - :param hdfs_path: path on hdfs - :param local_path: path on local - :return: + Upload file to hdfs. + Args: + :param overwrite: will overwrite hdfs file or not + :param multi_processes: the upload data process at the same time, default=5 + :param client: instance of HDFSClient + :param hdfs_path: path on hdfs + :param local_path: path on local + Returns: + """ def __subprocess_upload(datas): @@ -387,6 +446,13 @@ def multi_upload(client, client.upload(hdfs_re_path, data, overwrite, retry_times=5) def get_local_files(path): + """ + Get all local files + Args: + path: local file path + Returns: + A list that contation all files in the path. + """ rlist = [] if not os.path.isdir(path): @@ -431,14 +497,17 @@ def multi_download(client, multi_processes=5): """ multi_download - :param client: instance of HDFSClient - :param hdfs_path: path on hdfs - :param local_path: path on local - :param trainer_id: current trainer id - :param trainers: all trainers number - :param file_cnt: all file number - :param multi_processes: the download data process at the same time, default=5 - :return: None + Args: + :param client: instance of HDFSClient + :param hdfs_path: path on hdfs + :param local_path: path on local + :param trainer_id: current trainer id + :param trainers: all trainers number + :param file_cnt: all file number + :param multi_processes: the download data process at the same time, default=5 + :return: None + Returns: + A list that be downloaded. """ def __subprocess_download(datas): diff --git a/python/paddle/fluid/distributed/helper.py b/python/paddle/fluid/distributed/helper.py index 986525e5d85883d7194b6413c50d6cbbd7b95f9b..ca6dd5dabfa1ea19da56187113335a81b090df86 100644 --- a/python/paddle/fluid/distributed/helper.py +++ b/python/paddle/fluid/distributed/helper.py @@ -15,13 +15,26 @@ from mpi4py import MPI import ps_pb2 as pslib + class FileSystem(object): - def __init__(self, fs_type="afs", + """ + A file system that support async_executor hadoop client desc. + + Args: + fs_type (string): fs_type, for example is "afs" + user (string): hadoop param + passwd (string): hadoop param + hadoop bin (string): hadoop param + Examples: + fs = FileSystm() + """ + + def __init__(self, + fs_type="afs", uri="afs://tianqi.afs.baidu.com:9902", user=None, passwd=None, - hadoop_bin="", - afs_conf=None): + hadoop_bin=""): assert user != None assert passwd != None assert hadoop_bin != None @@ -38,9 +51,22 @@ class FileSystem(object): #self.fs_client.afs_conf = afs_conf if not afs_conf else "" def get_desc(self): + """ + get hadoop desc. + """ return self.fs_client + class MPIHelper(object): + """ + MPIHelper is a wrapper of mpi4py, supprot get_rank get_size etc. + Args: + No params + Examples: + mh = MPIHelper() + mh.get_ip() + """ + def __init__(self): self.comm = MPI.COMM_WORLD @@ -61,5 +87,3 @@ class MPIHelper(object): def finalize(self): MPI.Finalize() - - diff --git a/python/paddle/fluid/distributed/node.py b/python/paddle/fluid/distributed/node.py index 87553230060d114599434ffa7cc2691b4605614f..117da9cff826172d0e2992fed5903e376aed4a51 100644 --- a/python/paddle/fluid/distributed/node.py +++ b/python/paddle/fluid/distributed/node.py @@ -13,17 +13,34 @@ import ps_pb2 as pslib + class Server(object): + """ + A Server basic class. + """ + def __init__(self): pass class Worker(object): + """ + A Worker basic class. + """ + def __init__(self): pass class DownpourServer(Server): + """ + DownpourServer class is used to generate server program_desc + Args: + server: it is pslib.ServerParameter() + Examples: + server = DownpourServer() + """ + def __init__(self): self.server_ = pslib.ServerParameter() self.server_.downpour_server_param.service_param.start_server_port = 0 @@ -33,8 +50,18 @@ class DownpourServer(Server): self.server_.downpour_server_param.service_param.start_server_port = 0 self.server_.downpour_server_param.service_param.server_thread_num = 12 - def add_sparse_table(self, table_id, learning_rate, - slot_key_vars, slot_value_var): + def add_sparse_table(self, table_id, learning_rate, slot_key_vars, + slot_value_var): + """ + Args: + table_id(int): id of sparse params table + learning_rate(float): the learning rate used to update parameters. \ + Can be a float value + slot_key_vars(string): slot key id + slot_value_var(string): slot key value after embedding + Returns: + return None + """ table = self.server_.downpour_server_param.downpour_table_param.add() table.table_id = table_id table.table_class = "DownpourSparseTable" @@ -44,10 +71,10 @@ class DownpourServer(Server): table.accessor.sparse_sgd_param.initial_g2sum = 3 table.accessor.sparse_sgd_param.initial_range = 1e-4 table.accessor.sparse_sgd_param.weight_bounds.extend([-10, 10]) - + table.accessor.embedx_dim = 8 table.accessor.embedx_threshold = 5 - table.accessor.fea_dim = 11 + table.accessor.fea_dim = 11 #table.accessor.fea_dim = abs(reduce(lambda x, y: x * y, # slot_value_var[0].shape, 1)) table.accessor.downpour_accessor_param.nonclk_coeff = 0.1 @@ -58,53 +85,99 @@ class DownpourServer(Server): table.accessor.downpour_accessor_param.show_click_decay_rate = 0.999 table.accessor.downpour_accessor_param.delete_threshold = 0.8 - def add_dense_table(self, table_id, learning_rate, - param_var, grad_var): + def add_dense_table(self, table_id, learning_rate, param_var, grad_var): + """ + Args: + table_id(int): id of sparse params table + learning_rate(float): the learning rate used to update parameters. \ + Can be a float value + param_var(list): all dense param. it is a list. + grad_var(list): all dense grad parm it is a list. + Returns: + return None + """ table = self.server_.downpour_server_param.downpour_table_param.add() table.table_id = table_id table.table_class = "DownpourDenseTable" table.type = pslib.PS_DENSE_TABLE table.accessor.accessor_class = "DownpourDenseValueAccessor" - table.accessor.dense_sgd_param.name = "adam" + table.accessor.dense_sgd_param.name = "adam" table.accessor.dense_sgd_param.adam.learning_rate = learning_rate - table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993 - table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999 + table.accessor.dense_sgd_param.adam.avg_decay_rate = 0.999993 + table.accessor.dense_sgd_param.adam.ada_decay_rate = 0.9999 table.accessor.dense_sgd_param.adam.ada_epsilon = 1e-8 table.accessor.dense_sgd_param.adam.mom_decay_rate = 0.99 table.accessor.dense_sgd_param.naive.learning_rate = 0.0002 fea_dim = 0 - for param in filter(lambda x: x.name.find("embedding") == -1, param_var): + for param in filter(lambda x: x.name.find("embedding") == -1, + param_var): fea_dim += reduce(lambda x, y: x * y, param.shape, 1) table.accessor.fea_dim = fea_dim def get_desc(self): + """ + Return downpour server program_desc + """ return self.server_ class DownpourWorker(Worker): + """ + DownpourWorker class is used to generate worker program_desc + Args: + window (int): push params frequency + worker: it is pslib.DownpourTrainerParameter + Examples: + worker = DownpourWorker(1) + """ + def __init__(self, window): self.window = window self.worker_ = pslib.DownpourTrainerParameter() #self.worker_.pull_dense_per_batch = window #self.worker_.push_dense_per_batch = window - def add_sparse_table(self, table_id, learning_rate, - slot_key_vars, slot_value_vars): + def add_sparse_table(self, table_id, learning_rate, slot_key_vars, + slot_value_vars): + """ + Args: + table_id(int): id of sparse params table + learning_rate(float): the learning rate used to update parameters. \ + Can be a float value + slot_key_vars(string): slot key id + slot_value_var(string): slot key value after embedding + Returns: + return None + """ table = self.worker_.sparse_table.add() table.table_id = table_id - table.slot_key.extend( - [var.name for var in slot_key_vars]) - table.slot_value.extend( - [var.name for var in slot_value_vars]) + table.slot_key.extend([var.name for var in slot_key_vars]) + table.slot_value.extend([var.name for var in slot_value_vars]) table.slot_gradient.extend( [var.name + "@GRAD" for var in slot_value_vars]) - def add_dense_table(self, table_id, learning_rate, - param_vars, grad_vars): + def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars): + """ + Args: + table_id(int): id of sparse params table + learning_rate(float): the learning rate used to update parameters. \ + Can be a float value + param_var(list): all dense param. it is a list. + grad_var(list): all dense grad parm it is a list. + Returns: + return None + """ table = self.worker_.dense_table.add() table.table_id = table_id - table.dense_variable_name.extend(filter(lambda x: x.find("embedding") == -1, [p.name for p in param_vars])) - table.dense_gradient_variable_name.extend(filter(lambda x: x.find("embedding") == -1, [g.name for g in grad_vars])) + table.dense_variable_name.extend( + filter(lambda x: x.find("embedding") == -1, + [p.name for p in param_vars])) + table.dense_gradient_variable_name.extend( + filter(lambda x: x.find("embedding") == -1, + [g.name for g in grad_vars])) def get_desc(self): + """ + Return downpour worker program_desc + """ return self.worker_