diff --git a/core/engine/cluster/cluster.py b/core/engine/cluster/cluster.py
index a64e99e38b2df3033e480706bedd02eadea1dc90..385738e08b374af0c92b992aac5760aab9d54132 100644
--- a/core/engine/cluster/cluster.py
+++ b/core/engine/cluster/cluster.py
@@ -19,10 +19,16 @@
 import copy
 import os
 import subprocess
 import warnings
+import sys
+import logging
 
 from paddlerec.core.engine.engine import Engine
 from paddlerec.core.factory import TrainerFactory
 from paddlerec.core.utils import envs
+import paddlerec.core.engine.cluster_utils as cluster_utils
+
+logger = logging.getLogger("root")
+logger.propagate = False
 
 
 class ClusterEngine(Engine):
@@ -47,8 +53,38 @@ class ClusterEngine(Engine):
                     self.backend))
 
     def start_worker_procs(self):
-        trainer = TrainerFactory.create(self.trainer)
-        trainer.run()
+        if (envs.get_runtime_environ("fleet_mode") == "COLLECTIVE"):
+            #trainer_ports = os.getenv("TRAINER_PORTS", None).split(",")
+            cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
+            if cuda_visible_devices is None or cuda_visible_devices == "":
+                selected_gpus = range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))
+            else:
+                # change selected_gpus into relative values
+                # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
+                # therefore selected_gpus=0,1,2,3
+                cuda_visible_devices_list = cuda_visible_devices.split(',')
+                for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT"))):
+                    assert str(x) in cuda_visible_devices_list, "Can't find "\
+                        "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
+                        % (x, cuda_visible_devices)
+                selected_gpus = [
+                    cuda_visible_devices_list.index(str(x))
+                    for x in range(int(os.getenv("TRAINER_GPU_CARD_COUNT")))
+                ]
+            print("selected_gpus:{}".format(selected_gpus))
+
+            factory = "paddlerec.core.factory"
+            cmd = [sys.executable, "-u", "-m", factory, self.trainer]
+            logs_dir = envs.get_runtime_environ("log_dir")
+            print("use_paddlecloud_flag:{}".format(
+                cluster_utils.use_paddlecloud()))
+            if cluster_utils.use_paddlecloud():
+                cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus)
+                logger.info("get cluster from cloud:{}".format(cluster))
+                procs = cluster_utils.start_local_trainers(
+                    cluster, pod, cmd, log_dir=logs_dir)
+                print("cluster:{}".format(cluster))
+                print("pod:{}".format(pod))
+        else:
+            trainer = TrainerFactory.create(self.trainer)
+            trainer.run()
 
     def start_master_procs(self):
         if self.backend == "PADDLECLOUD":
diff --git a/core/engine/cluster_utils.py b/core/engine/cluster_utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..10c11773cf78f0ded890b9d0709fe33695054aaf
--- /dev/null
+++ b/core/engine/cluster_utils.py
@@ -0,0 +1,324 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import functools
+import logging
+import socket
+import time
+import os
+import signal
+import copy
+import sys
+import subprocess
+from contextlib import closing
+
+logger = logging.getLogger("root")
+logger.propagate = False
+
+
+class Cluster(object):
+    def __init__(self, hdfs):
+        self.job_server = None
+        self.pods = []
+        self.hdfs = hdfs
+        self.job_stage_flag = None
+
+    def __str__(self):
+        return "job_server:{} pods:{} job_stage_flag:{} hdfs:{}".format(
+            self.job_server, [str(pod) for pod in self.pods],
+            self.job_stage_flag, self.hdfs)
+
+    def __eq__(self, cluster):
+        if len(self.pods) != len(cluster.pods):
+            return False
+
+        for a, b in zip(self.pods, cluster.pods):
+            if a != b:
+                return False
+
+        if self.job_stage_flag != cluster.job_stage_flag:
+            return False
+
+        return True
+
+    def __ne__(self, cluster):
+        return not self.__eq__(cluster)
+
+    def update_pods(self, cluster):
+        self.pods = copy.copy(cluster.pods)
+
+    def trainers_nranks(self):
+        return len(self.trainers_endpoints())
+
+    def pods_nranks(self):
+        return len(self.pods)
+
+    def trainers_endpoints(self):
+        r = []
+        for pod in self.pods:
+            for t in pod.trainers:
+                r.append(t.endpoint)
+        return r
+
+    def pods_endpoints(self):
+        r = []
+        for pod in self.pods:
+            ep = "{}:{}".format(pod.addr, pod.port)
+            assert pod.port != None and pod.addr != None, "{} not a valid endpoint".format(
+                ep)
+            r.append(ep)
+
+        return r
+
+    def get_pod_by_id(self, pod_id):
+        for pod in self.pods:
+            if str(pod_id) == str(pod.id):
+                return pod
+
+        return None
+
+
+class JobServer(object):
+    def __init__(self):
+        self.endpoint = None
+
+    def __str__(self):
+        return "{}".format(self.endpoint)
+
+    def __eq__(self, j):
+        return self.endpoint == j.endpoint
+
+    def __ne__(self, j):
+        return not self == j
+
+
+class Trainer(object):
+    def __init__(self):
+        self.gpus = []
+        self.endpoint = None
+        self.rank = None
+
+    def __str__(self):
+        return "gpu:{} endpoint:{} rank:{}".format(self.gpus, self.endpoint,
+                                                   self.rank)
+
+    def __eq__(self, t):
+        if len(self.gpus) != len(t.gpus):
+            return False
+
+        if self.endpoint != t.endpoint or \
+                self.rank != t.rank:
+            return False
+
+        for a, b in zip(self.gpus, t.gpus):
+            if a != b:
+                return False
+
+        return True
+
+    def __ne__(self, t):
+        return not self == t
+
+    def rank(self):
+        return self.rank
+
+
+class Pod(object):
+    def __init__(self):
+        self.rank = None
+        self.id = None
+        self.addr = None
+        self.port = None
+        self.trainers = []
+        self.gpus = []
+
+    def __str__(self):
+        return "rank:{} id:{} addr:{} port:{} visible_gpu:{} trainers:{}".format(
+            self.rank, self.id, self.addr, self.port, self.gpus,
+            [str(t) for t in self.trainers])
+
+    def __eq__(self, pod):
+        if self.rank != pod.rank or \
+                self.id != pod.id or \
+                self.addr != pod.addr or \
+                self.port != pod.port:
+            logger.debug("pod {} != pod {}".format(self, pod))
+            return False
+
+        if len(self.trainers) != len(pod.trainers):
+            logger.debug("trainers {} != {}".format(self.trainers,
+                                                    pod.trainers))
+            return False
+
+        for i in range(len(self.trainers)):
+            if self.trainers[i] != pod.trainers[i]:
+                logger.debug("trainer {} != {}".format(self.trainers[i],
+                                                       pod.trainers[i]))
+                return False
+
+        return True
+
+    def __ne__(self, pod):
+        return not self == pod
+
+    def parse_response(self, res_pods):
+        pass
+
+    def rank(self):
+        return self.rank
+
+    def get_visible_gpus(self):
+        r = ""
+        for g in self.gpus:
+            r += "{},".format(g)
+
+        assert r != "", "this pod {} can't see any gpus".format(self)
+
+        r = r[:-1]
+        return r
+
+
+def get_cluster(node_ips,
node_ip, paddle_ports, selected_gpus): + assert type(paddle_ports) is list, "paddle_ports must be list" + cluster = Cluster(hdfs=None) + trainer_rank = 0 + for node_rank, ip in enumerate(node_ips): + pod = Pod() + pod.rank = node_rank + pod.addr = ip + for i in range(len(selected_gpus)): + trainer = Trainer() + trainer.gpus.append(selected_gpus[i]) + trainer.endpoint = "%s:%d" % (ip, paddle_ports[i]) + trainer.rank = trainer_rank + trainer_rank += 1 + + pod.trainers.append(trainer) + cluster.pods.append(pod) + + pod_rank = node_ips.index(node_ip) + return cluster, cluster.pods[pod_rank] + + +def get_cloud_cluster(selected_gpus, args_port=None): + #you can automatically get ip info while using paddlecloud multi nodes mode. + node_ips = os.getenv("PADDLE_TRAINERS") + assert node_ips is not None, "PADDLE_TRAINERS should not be None" + print("node_ips:{}".format(node_ips)) + node_ip = os.getenv("POD_IP") + assert node_ip is not None, "POD_IP should not be None" + print("node_ip:{}".format(node_ip)) + node_rank = os.getenv("PADDLE_TRAINER_ID") + assert node_rank is not None, "PADDLE_TRAINER_ID should not be None" + print("node_rank:{}".format(node_rank)) + node_ips = node_ips.split(",") + num_nodes = len(node_ips) + node_rank = int(node_rank) + + started_port = args_port + print("num_nodes:", num_nodes) + if num_nodes > 1: + try: + paddle_port = int(os.getenv("PADDLE_PORT", "")) + paddle_port_num = int(os.getenv("TRAINER_PORTS_NUM", "")) + + if paddle_port_num >= len( + selected_gpus) and paddle_port != args_port: + logger.warning("Use Cloud specified port:{}.".format( + paddle_port)) + started_port = paddle_port + + except Exception as e: + print(e) + pass + + if started_port is None: + started_port = 6170 + + logger.debug("parsed from args:node_ips:{} \ + node_ip:{} node_rank:{} started_port:{}" + .format(node_ips, node_ip, node_rank, started_port)) + + ports = [x for x in range(started_port, started_port + len(selected_gpus))] + cluster, pod = get_cluster(node_ips, node_ip, ports, selected_gpus) + return cluster, cluster.pods[node_rank] + + +def use_paddlecloud(): + node_ips = os.getenv("PADDLE_TRAINERS", None) + node_ip = os.getenv("POD_IP", None) + node_rank = os.getenv("PADDLE_TRAINER_ID", None) + if node_ips is None or node_ip is None or node_rank is None: + return False + else: + return True + + +class TrainerProc(object): + def __init__(self): + self.proc = None + self.log_fn = None + self.log_offset = None + self.rank = None + self.local_rank = None + self.cmd = None + + +def start_local_trainers(cluster, pod, cmd, log_dir=None): + current_env = copy.copy(os.environ.copy()) + #paddle broadcast ncclUniqueId use socket, and + #proxy maybe make trainers unreachable, so delete them. + #if we set them to "", grpc will log error message "bad uri" + #so just delete them. 
+ current_env.pop("http_proxy", None) + current_env.pop("https_proxy", None) + + procs = [] + for idx, t in enumerate(pod.trainers): + proc_env = { + "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]), + "PADDLE_TRAINER_ID": "%d" % t.rank, + "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, + "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), + "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()) + } + + current_env.update(proc_env) + + logger.debug("trainer proc env:{}".format(current_env)) + + # cmd = [sys.executable, "-u", training_script] + + logger.info("start trainer proc:{} env:{}".format(cmd, proc_env)) + + fn = None + if log_dir is not None: + os.system("mkdir -p {}".format(log_dir)) + fn = open("%s/workerlog.%d" % (log_dir, idx), "a") + proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) + else: + proc = subprocess.Popen(cmd, env=current_env) + + tp = TrainerProc() + tp.proc = proc + tp.rank = t.rank + tp.local_rank = idx + tp.log_fn = fn + tp.log_offset = fn.tell() if fn else None + tp.cmd = cmd + + procs.append(proc) + + return procs diff --git a/core/engine/local_cluster.py b/core/engine/local_cluster.py index 88f21ef8bf7218a4b83db265ad534ad2266561a9..b6ff736eefc107305649bf3c9b9b5cb831f0fcb5 100755 --- a/core/engine/local_cluster.py +++ b/core/engine/local_cluster.py @@ -19,9 +19,14 @@ import copy import os import sys import subprocess +import logging from paddlerec.core.engine.engine import Engine from paddlerec.core.utils import envs +import paddlerec.core.engine.cluster_utils as cluster_utils + +logger = logging.getLogger("root") +logger.propagate = False class LocalClusterEngine(Engine): @@ -97,42 +102,70 @@ class LocalClusterEngine(Engine): stderr=fn, cwd=os.getcwd()) procs.append(proc) + elif fleet_mode.upper() == "COLLECTIVE": - selected_gpus = self.envs["selected_gpus"].split(",") + cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") + if cuda_visible_devices is None or cuda_visible_devices == "": + selected_gpus = [ + x.strip() for x in self.envs["selected_gpus"].split(",") + ] + else: + # change selected_gpus into relative values + # e.g. 
CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7; + # therefore selected_gpus=0,1,2,3 + cuda_visible_devices_list = cuda_visible_devices.split(',') + for x in self.envs["selected_gpus"].split(","): + assert x in cuda_visible_devices_list, "Can't find "\ + "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\ + % (x, cuda_visible_devices) + selected_gpus = [ + cuda_visible_devices_list.index(x.strip()) + for x in self.envs["selected_gpus"].split(",") + ] selected_gpus_num = len(selected_gpus) - for i in range(selected_gpus_num - 1): - while True: - new_port = envs.find_free_port() - if new_port not in ports: - ports.append(new_port) - break - user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports]) - factory = "paddlerec.core.factory" cmd = [sys.executable, "-u", "-m", factory, self.trainer] - for i in range(selected_gpus_num): - current_env.update({ - "PADDLE_TRAINER_ENDPOINTS": user_endpoints, - "PADDLE_CURRENT_ENDPOINTS": user_endpoints[i], - "PADDLE_TRAINERS_NUM": str(worker_num), - "TRAINING_ROLE": "TRAINER", - "PADDLE_TRAINER_ID": str(i), - "FLAGS_selected_gpus": str(selected_gpus[i]), - "PADDLEREC_GPU_NUMS": str(selected_gpus_num) - }) - - os.system("mkdir -p {}".format(logs_dir)) - fn = open("%s/worker.%d" % (logs_dir, i), "w") - log_fns.append(fn) - proc = subprocess.Popen( - cmd, - env=current_env, - stdout=fn, - stderr=fn, - cwd=os.getcwd()) - procs.append(proc) + print("use_paddlecloud_flag:{}".format( + cluster_utils.use_paddlecloud())) + if cluster_utils.use_paddlecloud(): + cluster, pod = cluster_utils.get_cloud_cluster(selected_gpus) + logger.info("get cluster from cloud:{}".format(cluster)) + procs = cluster_utils.start_local_trainers( + cluster, pod, cmd, log_dir=logs_dir) + + else: + # trainers_num = 1 or not use paddlecloud ips="a,b" + for i in range(selected_gpus_num - 1): + while True: + new_port = envs.find_free_port() + if new_port not in ports: + ports.append(new_port) + break + user_endpoints = ",".join( + ["127.0.0.1:" + str(x) for x in ports]) + for i in range(selected_gpus_num): + current_env.update({ + "PADDLE_TRAINER_ENDPOINTS": user_endpoints, + "PADDLE_CURRENT_ENDPOINTS": user_endpoints[i], + "PADDLE_TRAINERS_NUM": str(worker_num), + "TRAINING_ROLE": "TRAINER", + "PADDLE_TRAINER_ID": str(i), + "FLAGS_selected_gpus": str(selected_gpus[i]), + "PADDLEREC_GPU_NUMS": str(selected_gpus_num) + }) + + os.system("mkdir -p {}".format(logs_dir)) + fn = open("%s/worker.%d" % (logs_dir, i), "w") + log_fns.append(fn) + proc = subprocess.Popen( + cmd, + env=current_env, + stdout=fn, + stderr=fn, + cwd=os.getcwd()) + procs.append(proc) # only wait worker to finish here for i, proc in enumerate(procs): diff --git a/core/trainer.py b/core/trainer.py index bbba6250529283d24389e2719b7110f8aa321973..8951e69f6f2a69270e1688979533b0346d27b521 100755 --- a/core/trainer.py +++ b/core/trainer.py @@ -76,9 +76,6 @@ class Trainer(object): _config = envs.load_yaml(config) - self._context["env"] = _config - self._context["dataset"] = _config.get("dataset") - phases = [] if phase_names is None: phases = _config.get("phase") @@ -86,8 +83,10 @@ class Trainer(object): for phase in _config.get("phase"): if phase["name"] in phase_names: phases.append(phase) - self._context["phases"] = phases + _config["phase"] = phases + self._context["env"] = _config + self._context["dataset"] = _config.get("dataset") print("PaddleRec: Runner {} Begin".format(self._runner_name)) self.which_engine() self.which_device() diff --git a/core/trainers/framework/network.py 
b/core/trainers/framework/network.py
index 7d7a8273b6a402bd163f653a7beb3900de899ae3..2a9a3a4003f36627aff4fe7ab4f86a4979c46525 100644
--- a/core/trainers/framework/network.py
+++ b/core/trainers/framework/network.py
@@ -238,8 +238,8 @@ class PSNetwork(NetworkBase):
         else:
             context["fleet"].init_worker()
             context["dataset"] = {}
-            for dataset in context["env"]["dataset"]:
-                type = envs.get_global_env("dataset." + dataset["name"] +
+            for phase in context["env"]["phase"]:
+                type = envs.get_global_env("dataset." + phase["dataset_name"] +
                                            ".type")
                 if type == "DataLoader":
                     data_loader = DataLoader(context)
@@ -247,9 +247,9 @@
                         model._data_loader)
                 elif type == "QueueDataset":
                     dataset_class = QueueDataset(context)
-                    context["dataset"][dataset[
-                        "name"]] = dataset_class.create_dataset(
-                            dataset["name"], context)
+                    context["dataset"][phase[
+                        "dataset_name"]] = dataset_class.create_dataset(
+                            phase["dataset_name"], context)
         context["status"] = "startup_pass"
 
     def _build_strategy(self, context):
@@ -336,7 +336,7 @@ class PslibNetwork(NetworkBase):
             self._server(context)
         else:
             context["dataset"] = {}
-            for dataset in context["env"]["dataset"]:
-                type = envs.get_global_env("dataset." + dataset["name"] +
-                                           ".type")
+            for phase in context["env"]["phase"]:
+                type = envs.get_global_env("dataset." + phase["dataset_name"] +
+                                           ".type")
                 if type == "DataLoader":
@@ -363,6 +363,7 @@ class CollectiveNetwork(NetworkBase):
     def build_network(self, context):
         context["model"] = {}
         if len(context["env"]["phase"]) > 1:
+            print("CollectiveNetwork phase:{}".format(context["env"]["phase"]))
             warnings.warn(
                 "Cluster Train Only Support One Phase.",
                 category=UserWarning,
@@ -407,16 +408,17 @@
         context["model"][model_dict["name"]]["compiled_program"] = None
 
         context["dataset"] = {}
-        for dataset in context["env"]["dataset"]:
-            type = envs.get_global_env("dataset." + dataset["name"] + ".type")
+        for phase in context["env"]["phase"]:
+            type = envs.get_global_env("dataset." + phase["dataset_name"] +
+                                       ".type")
             if type == "QueueDataset":
                 raise ValueError(
                     "Collective don't support QueueDataset training, please use DataLoader."
                )
             dataset_class = QueueDataset(context)
-            context["dataset"][dataset[
-                "name"]] = dataset_class.create_dataset(dataset["name"],
-                                                        context)
+            context["dataset"][phase[
+                "dataset_name"]] = dataset_class.create_dataset(
+                    phase["dataset_name"], context)
         context["status"] = "startup_pass"
 
     def _build_strategy(self, context):
diff --git a/core/trainers/framework/runner.py b/core/trainers/framework/runner.py
index 839e3ed4d6e04b13f69e6c2cfc463e83aef130f7..b837cd676d80f2cddddf5c2079c5f39e96a21ab7 100644
--- a/core/trainers/framework/runner.py
+++ b/core/trainers/framework/runner.py
@@ -209,9 +209,13 @@
 
             if save_step_interval >= 1 and batch_id % save_step_interval == 0 and context[
                     "is_infer"] == False:
-                if context["fleet_mode"].upper() == "PS":
-                    train_prog = context["model"][model_dict["name"]][
-                        "main_program"]
+                if context["is_fleet"]:
+                    if context["fleet_mode"].upper() == "PS":
+                        train_prog = context["model"][model_dict[
+                            "name"]]["main_program"]
+                    else:
+                        train_prog = context["model"][model_dict[
+                            "name"]]["default_main_program"]
                 else:
                     train_prog = context["model"][model_dict["name"]][
                         "default_main_program"]
@@ -432,9 +436,11 @@
         dirname = envs.get_global_env(name + "save_step_path", None)
         if dirname is None or dirname == "":
             return
-        dirname = os.path.join(dirname, str(batch_id))
-        logging.info("\tsave batch_id:%d model into: \"%s\"" %
-                     (batch_id, dirname))
+        dirname = os.path.join(dirname,
+                               "epoch_" + str(context["current_epoch"]) +
+                               "_batch_" + str(batch_id))
+        logging.info("\tsave epoch_id:%d, batch_id:%d model into: \"%s\"" %
+                     (context["current_epoch"], batch_id, dirname))
         if is_fleet:
             if context["fleet"].worker_index() == 0:
                 context["fleet"].save_persistables(context["exe"], dirname)
diff --git a/doc/custom_reader.md b/doc/custom_reader.md
new file mode 100644
index 0000000000000000000000000000000000000000..c392a5ea53c0da59492956a7c64cd13627dcb36a
--- /dev/null
+++ b/doc/custom_reader.md
@@ -0,0 +1,309 @@
+# PaddleRec Custom Datasets and Readers
+
+To use a custom dataset and configure an asynchronous Reader, pay attention to the following steps:
+
+* [Preparing the dataset](#preparing-the-dataset)
+* [Adding input placeholders to the model network](#adding-input-placeholders-to-the-model-network)
+* [Implementing the Reader](#implementing-the-reader)
+* [Configuring the Reader in the yaml file](#configuring-the-reader-in-the-yaml-file)
+
+Taking the CTR-DNN model as an example, we walk through the complete process of preparing the data, defining the variables, writing the Reader, and debugging.
+
+* [Data and Reader example - DNN](#data-and-reader-example---dnn)
+
+
+## Preparing the dataset
+
+PaddleRec supports custom datasets for models.
+
+Tips on the data:
+1. Data volume:
+
+   PaddleRec is designed for large-scale data and can easily read hundreds of millions of samples; the industrial-grade data I/O API `dataset` has been thoroughly hardened in search, recommendation, and feed businesses.
+2. File types:
+
+   Any directly readable text data is supported. `dataset` can also read `.gz`-compressed text data without extra code. Samples should be organized one per line, delimited by `\n`.
+
+3. File location:
+
+   Files are usually stored locally on the training nodes, but `dataset` also supports reading data remotely through `hadoop`, so the data does not have to be downloaded locally; simply configure the hadoop account and address for the dataset.
+4. Data types:
+
+   The Reader processes line-oriented `string` data, while the data fed into the network must be converted to numeric `int` or `float` values; feeding `string` data into the network is not supported, and storing or processing training data in plain text is not recommended.
+5. Tips:
+
+   In Dataset mode, the training threads and the data-reading threads are tightly coupled. To make full use of multi-threading, `it is strongly recommended to split the data into a reasonable number of small files` (a sketch of one way to do this follows this list). This matters especially in distributed training, where it balances the data volume across nodes and speeds up data download.
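+   As a rough illustration, here is a minimal, hypothetical sketch (not part of PaddleRec) of sharding one large sample file by line; the file names and shard count are only examples:
+
+   ```python
+   import os
+
+   NUM_SHARDS = 8  # illustrative shard count
+   os.makedirs("train_data", exist_ok=True)
+   # open one output file per shard and distribute lines round-robin
+   outs = [open("train_data/part-%03d" % i, "w") for i in range(NUM_SHARDS)]
+   with open("train.txt") as f:  # hypothetical single large source file
+       for line_id, line in enumerate(f):
+           outs[line_id % NUM_SHARDS].write(line)
+   for out in outs:
+       out.close()
+   ```
+
+   Standard command-line tools (e.g. `split`) achieve the same result.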
+## Adding input placeholders to the model network
+
+After the Reader reads the files, the data it produces is fed into the network and must be received by placeholders. Placeholders are defined in Paddle with `fluid.data` or `fluid.layers.data`; see [fluid.data](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/fluid_cn/data_cn.html#data) and [fluid.layers.data](https://www.paddlepaddle.org.cn/documentation/docs/zh/api_cn/layers_cn/data_cn.html#data) for how `data` is defined.
+
+Suppose you want to feed in three inputs: data A with dimension 32, variable-length sparse data B, and a one-dimensional label C, and you want gradients to propagate through these variables. The definitions are as follows.
+
+Definition of data A:
+```python
+var_a = fluid.data(name='A', shape=[-1, 32], dtype='float32')
+```
+
+Definition of data B; for working with variable-length data, see [LoDTensor](https://www.paddlepaddle.org.cn/documentation/docs/zh/beginners_guide/basic_concept/lod_tensor.html#cn-user-guide-lod-tensor):
+```python
+var_b = fluid.data(name='B', shape=[-1, 1], lod_level=1, dtype='int64')
+```
+
+Definition of data C:
+```python
+var_c = fluid.data(name='C', shape=[-1, 1], dtype='int32')
+var_c.stop_gradient = False
+```
+
+Once these three variables are defined, they must also be appended to the model base-class member `self._data_var` in the PaddleRec model definition:
+
+```python
+self._data_var.append(var_a)
+self._data_var.append(var_b)
+self._data_var.append(var_c)
+```
+With this, the work of defining the input data in the network is complete.
+
+## Implementing the Reader
+
+### The Reader implementation pattern
+
+The Reader logic is described in a separate python file. Let us write a `test_reader.py`; the concrete steps are as follows:
+1. First, import the Reader base class
+
+    ```python
+    from paddlerec.core.reader import ReaderBase
+    ```
+2. Create a subclass that inherits from the Reader base class; the Reader used for training is named `Reader` here
+    ```python
+    class Reader(ReaderBase):
+        def init(self):
+            pass
+
+        def generate_sample(self, line):
+            pass
+    ```
+
+3. In the `init(self)` function, declare the variables that will be used while reading data. When necessary, such variables can be configured in the `config.yaml` file and fetched with `envs.get_global_env()`.
+
+   For example, suppose we want to read a preprocessing variable `avg=10` from the yaml file in order to scale data A down by a factor of 10. This can be done as follows:
+
+   First modify the yaml file and add the variable under a hyper_parameters section
+
+   ```yaml
+   ...
+   hyper_parameters:
+     reader:
+       avg: 10
+   ...
+   ```
+
+
+   Then modify the Reader's init function
+
+   ```python
+   from paddlerec.core.utils import envs
+   class Reader(ReaderBase):
+       def init(self):
+           self.avg = envs.get_global_env("avg", None, "hyper_parameters.reader")
+
+       def generate_sample(self, line):
+           pass
+   ```
+
+4. Inherit and implement the base class's `generate_sample(self, line)` function, which reads the data line by line.
+   - The function should return an iterable reader method (a function containing `yield` is no longer an ordinary function but a generator, i.e. an iterable object, equivalent to an array, a linked list, a file, a string, etc.)
+   - Inside this iterable function, e.g. `def reader()` in the sample code below, we define the data-reading logic: the line-oriented data is split, converted, and preprocessed.
+   - Finally, the data must be arranged in a specific format so that the PaddleRec Reader can parse it correctly and feed it into the training network. In short, the output order of the data must correspond strictly one-to-one with the `inputs` created in the network, and the data is converted into a dict-like form.
+
+   Example: suppose data A, B, and C are stored in the text data, one sample per line, in the following form:
+   ```shell
+   0.1,0.2,0.3...3.0,3.1,3.2 \t 99999,99998,99997 \t 1 \n
+   ```
+
+   The sample code is then as follows:
+   ```python
+   from paddlerec.core.utils import envs
+   class Reader(ReaderBase):
+       def init(self):
+           self.avg = envs.get_global_env("avg", None, "hyper_parameters.reader")
+
+       def generate_sample(self, line):
+
+           def reader():
+               # first strip '\n', then split the line into a list on '\t'
+               variables = (line.strip('\n')).split('\t')
+
+               # A is the first element; its values are separated by ','
+               var_a = variables[0].split(',')  # list
+               var_a = [float(i) / self.avg for i in var_a]  # convert str to float
+
+               # B is the second element, also separated by ','
+               var_b = variables[1].split(',')  # list
+               var_b = [int(i) for i in var_b]  # convert str to int
+
+               # C is the third element; it is a single value with no separator
+               var_c = variables[2]
+               var_c = int(var_c)  # convert str to int
+               var_c = [var_c]  # put the single value into a list
+
+               # combine the data with the variable names, organized like a dict
+               # output has the form { A: var_a, B: var_b, C: var_c }
+               variable_name = ['A', 'B', 'C']
+               output = zip(variable_name, [var_a] + [var_b] + [var_c])
+
+               # emit the data with yield, which turns this function into an iterable generator
+               yield output
+
+           return reader
+   ```
+
+   With this, the Reader implementation is complete.
+
+
+### Configuring the Reader in the yaml file
+
+In the model's yaml configuration file, the main reader-related settings are the following:
+
+```yaml
+reader:
+  batch_size: 2
+  class: "{workspace}/criteo_reader.py"
+  train_data_path: "{workspace}/data/train_data"
+```
+
+batch_size: as the name suggests, the number of samples in a mini-batch during training
+class: the path of the reader needed to run this model
+train_data_path: the folder containing the training data
+reader_debug_mode: switch for the debug mode that tests the reader syntax and checks whether its output is as expected
+
+
+## Data and Reader example - DNN
+
+
+### Criteo dataset format
+
+The training and test datasets for CTR-DNN use the Criteo dataset from the [Display Advertising Challenge](https://www.kaggle.com/c/criteo-display-ad-challenge/). The dataset consists of two parts: a training set and a test set. The training set contains a portion of Criteo's traffic over a period of time, while the test set corresponds to the ad click traffic on the day following the training data.
+Each line of data is formatted as follows:
+```bash
+