From 31bbef3d379700b215701c589bb388e4c7f20e03 Mon Sep 17 00:00:00 2001 From: tangwei Date: Wed, 15 Apr 2020 18:13:43 +0800 Subject: [PATCH] structure rebuild --- fleetrec/check.py | 8 ++ fleetrec/core/{trainer => engine}/__init__.py | 0 .../details => engine}/local_engine.py | 0 fleetrec/core/factory.py | 25 +------ fleetrec/core/layer.py | 2 +- fleetrec/core/metrics/auc_metrics.py | 2 +- fleetrec/core/models/modul/build.py | 12 ++- fleetrec/core/models/modul/layers.py | 2 +- fleetrec/core/reader.py | 2 +- .../{trainer/details => trainers}/__init__.py | 0 .../{trainer => trainers}/cluster_trainer.py | 9 +-- .../core/{trainer => trainers}/ctr_trainer.py | 13 ++-- .../{trainer => trainers}/single_trainer.py | 0 .../transpiler_trainer.py | 0 fleetrec/core/utils/dataset.py | 61 +++++++-------- fleetrec/core/utils/envs.py | 1 - fleetrec/core/utils/fs.py | 1 - fleetrec/core/utils/reader_instance.py | 2 +- fleetrec/core/utils/util.py | 3 +- fleetrec/examples/built_in/run.py | 24 ------ .../examples/built_in/single_training.yaml | 2 - .../examples/built_in/user_define_trainer.py | 4 +- fleetrec/models/ctr_dnn/model.py | 4 +- fleetrec/models/ctr_dnn/reader.py | 4 +- fleetrec/run.py | 74 +++++++++++++++++++ 25 files changed, 148 insertions(+), 107 deletions(-) create mode 100644 fleetrec/check.py rename fleetrec/core/{trainer => engine}/__init__.py (100%) mode change 100755 => 100644 rename fleetrec/core/{trainer/details => engine}/local_engine.py (100%) rename fleetrec/core/{trainer/details => trainers}/__init__.py (100%) mode change 100644 => 100755 rename fleetrec/core/{trainer => trainers}/cluster_trainer.py (94%) rename fleetrec/core/{trainer => trainers}/ctr_trainer.py (98%) rename fleetrec/core/{trainer => trainers}/single_trainer.py (100%) rename fleetrec/core/{trainer/details => trainers}/transpiler_trainer.py (100%) delete mode 100644 fleetrec/examples/built_in/run.py delete mode 100644 fleetrec/examples/built_in/single_training.yaml create mode 100644 fleetrec/run.py diff --git a/fleetrec/check.py b/fleetrec/check.py new file mode 100644 index 00000000..d4177fd8 --- /dev/null +++ b/fleetrec/check.py @@ -0,0 +1,8 @@ +import argparse + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='fleet-rec check') + parser.add_argument("--model", type=str) + parser.add_argument("--engine", type=str) + + print("coming soon") diff --git a/fleetrec/core/trainer/__init__.py b/fleetrec/core/engine/__init__.py old mode 100755 new mode 100644 similarity index 100% rename from fleetrec/core/trainer/__init__.py rename to fleetrec/core/engine/__init__.py diff --git a/fleetrec/core/trainer/details/local_engine.py b/fleetrec/core/engine/local_engine.py similarity index 100% rename from fleetrec/core/trainer/details/local_engine.py rename to fleetrec/core/engine/local_engine.py diff --git a/fleetrec/core/factory.py b/fleetrec/core/factory.py index 2d6655b3..292a2e4c 100644 --- a/fleetrec/core/factory.py +++ b/fleetrec/core/factory.py @@ -16,13 +16,13 @@ import os import sys import yaml + from fleetrec.trainer.local_engine import Launch from fleetrec.trainer.single_trainer import SingleTrainer from fleetrec.trainer.cluster_trainer import ClusterTrainer from fleetrec.trainer.ctr_trainer import CtrPaddleTrainer from fleetrec.utils import envs -from fleetrec.utils import util class TrainerFactory(object): @@ -52,19 +52,6 @@ class TrainerFactory(object): raise ValueError("trainer only support SingleTraining/ClusterTraining") return trainer - @staticmethod - def _build_engine(yaml_config): - 
cluster_envs = {} - cluster_envs["server_num"] = envs.get_global_env("train.pserver_num") - cluster_envs["worker_num"] = envs.get_global_env("train.pserver_num") - cluster_envs["start_port"] = envs.get_global_env("train.start_port") - cluster_envs["log_dir"] = envs.get_global_env("train.log_dirname") - - print(envs.pretty_print_envs(cluster_envs, ("Cluster Global Envs", "Value"))) - - launch = Launch(cluster_envs, yaml_config) - return launch - @staticmethod def create(config): _config = None @@ -75,15 +62,7 @@ class TrainerFactory(object): raise ValueError("fleetrec's config only support yaml") envs.set_global_envs(_config) - mode = envs.get_global_env("train.trainer") - container = envs.get_global_env("train.container") - instance = util.str2bool(os.getenv("CLUSTER_INSTANCE", "0")) - - if mode == "ClusterTraining" and container == "local" and not instance: - trainer = TrainerFactory._build_engine(config) - else: - trainer = TrainerFactory._build_trainer(_config, config) - + trainer = TrainerFactory._build_trainer(_config, config) return trainer diff --git a/fleetrec/core/layer.py b/fleetrec/core/layer.py index 98debbac..eb68c862 100644 --- a/fleetrec/core/layer.py +++ b/fleetrec/core/layer.py @@ -16,9 +16,9 @@ import abc import copy import yaml import paddle.fluid as fluid -from ..utils import table as table from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet +from fleetrec.core.utils import table as table class Layer(object): """R diff --git a/fleetrec/core/metrics/auc_metrics.py b/fleetrec/core/metrics/auc_metrics.py index 4c937bde..f2f68890 100644 --- a/fleetrec/core/metrics/auc_metrics.py +++ b/fleetrec/core/metrics/auc_metrics.py @@ -15,7 +15,7 @@ import math import numpy as np import paddle.fluid as fluid -from .base import Metric +from fleetrec.core.metric import Metric class AUCMetric(Metric): diff --git a/fleetrec/core/models/modul/build.py b/fleetrec/core/models/modul/build.py index c4eee6f2..51b83e5f 100644 --- a/fleetrec/core/models/modul/build.py +++ b/fleetrec/core/models/modul/build.py @@ -1,3 +1,11 @@ +import yaml +import copy +import paddle.fluid as fluid +from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet + +from fleetrec.core.model import Model +from fleetrec.core.utils import table + def create(config): """ @@ -14,14 +22,14 @@ def create(config): return model -class YamlModel(ModelBase): +class YamlModel(Model): """R """ def __init__(self, config): """R """ - ModelBase.__init__(self, config) + Model.__init__(self, config) self._config = config self._name = config['name'] f = open(config['layer_file'], 'r') diff --git a/fleetrec/core/models/modul/layers.py b/fleetrec/core/models/modul/layers.py index 6f79e64d..84a08ee8 100644 --- a/fleetrec/core/models/modul/layers.py +++ b/fleetrec/core/models/modul/layers.py @@ -13,7 +13,7 @@ # limitations under the License. 
import paddle.fluid as fluid -from .base import Layer +from fleetrec.core.layer import Layer class EmbeddingInputLayer(Layer): diff --git a/fleetrec/core/reader.py b/fleetrec/core/reader.py index cf217e0d..77a67ed7 100644 --- a/fleetrec/core/reader.py +++ b/fleetrec/core/reader.py @@ -19,7 +19,7 @@ import os import paddle.fluid.incubate.data_generator as dg import yaml -from fleetrec.utils import envs +from fleetrec.core.utils import envs class Reader(dg.MultiSlotDataGenerator): diff --git a/fleetrec/core/trainer/details/__init__.py b/fleetrec/core/trainers/__init__.py old mode 100644 new mode 100755 similarity index 100% rename from fleetrec/core/trainer/details/__init__.py rename to fleetrec/core/trainers/__init__.py diff --git a/fleetrec/core/trainer/cluster_trainer.py b/fleetrec/core/trainers/cluster_trainer.py similarity index 94% rename from fleetrec/core/trainer/cluster_trainer.py rename to fleetrec/core/trainers/cluster_trainer.py index f9b41a19..c8d05d05 100644 --- a/fleetrec/core/trainer/cluster_trainer.py +++ b/fleetrec/core/trainers/cluster_trainer.py @@ -17,19 +17,14 @@ Training use fluid with one node only. """ from __future__ import print_function -import logging import paddle.fluid as fluid from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker -from ..utils import envs -from .transpiler_trainer import TranspileTrainer - -logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s") -logger = logging.getLogger("fluid") -logger.setLevel(logging.INFO) +from fleetrec.core.utils import envs +from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer class ClusterTrainer(TranspileTrainer): diff --git a/fleetrec/core/trainer/ctr_trainer.py b/fleetrec/core/trainers/ctr_trainer.py similarity index 98% rename from fleetrec/core/trainer/ctr_trainer.py rename to fleetrec/core/trainers/ctr_trainer.py index a83b3902..bc923ff8 100755 --- a/fleetrec/core/trainer/ctr_trainer.py +++ b/fleetrec/core/trainers/ctr_trainer.py @@ -23,12 +23,13 @@ import paddle.fluid as fluid from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker -from fleetrec.utils import fs as fs -from fleetrec.utils import util as util -from fleetrec.metrics.auc_metrics import AUCMetric -from fleetrec.models import base as model_basic -from fleetrec.reader import dataset -from .trainer import Trainer + +from fleetrec.core.utils import fs as fs +from fleetrec.core.utils import util as util +from fleetrec.core.metrics.auc_metrics import AUCMetric +from fleetrec.core.models.modul import build as model_basic +from fleetrec.core.utils import dataset +from fleetrec.core.trainer import Trainer def wroker_numric_opt(value, env, opt): diff --git a/fleetrec/core/trainer/single_trainer.py b/fleetrec/core/trainers/single_trainer.py similarity index 100% rename from fleetrec/core/trainer/single_trainer.py rename to fleetrec/core/trainers/single_trainer.py diff --git a/fleetrec/core/trainer/details/transpiler_trainer.py b/fleetrec/core/trainers/transpiler_trainer.py similarity index 100% rename from fleetrec/core/trainer/details/transpiler_trainer.py rename to fleetrec/core/trainers/transpiler_trainer.py diff --git a/fleetrec/core/utils/dataset.py b/fleetrec/core/utils/dataset.py index 66450aed..92cbe891 100755 
--- a/fleetrec/core/utils/dataset.py +++ b/fleetrec/core/utils/dataset.py @@ -13,13 +13,13 @@ # limitations under the License. import abc -import copy -import yaml import time import datetime + import paddle.fluid as fluid -from .. utils import fs as fs -from .. utils import util as util + +from fleetrec.core.utils import fs as fs +from fleetrec.core.utils import util as util class Dataset(object): @@ -27,12 +27,13 @@ class Dataset(object): Dataset Base """ __metaclass__ = abc.ABCMeta + def __init__(self, config): """ """ self._datasets = {} self._config = config - + @abc.abstractmethod def check_ready(self, params): """ @@ -43,19 +44,19 @@ class Dataset(object): pass @abc.abstractmethod - def load_dataset(self, params): + def load_dataset(self, params): """R """ pass - + @abc.abstractmethod - def preload_dataset(self, params): + def preload_dataset(self, params): """R """ pass - + @abc.abstractmethod - def release_dataset(self, params): + def release_dataset(self, params): """R """ pass @@ -65,23 +66,24 @@ class TimeSplitDataset(Dataset): """ Dataset with time split dir. root_path/$DAY/$HOUR """ + def __init__(self, config): """ init data root_path, time_split_interval, data_path_format """ Dataset.__init__(self, config) if 'data_donefile' not in config or config['data_donefile'] is None: - config['data_donefile'] = config['data_path'] + "/to.hadoop.done" + config['data_donefile'] = config['data_path'] + "/to.hadoop.done" self._path_generator = util.PathGenerator({'templates': [ - {'name': 'data_path', 'template': config['data_path']}, - {'name': 'donefile_path', 'template': config['data_donefile']} + {'name': 'data_path', 'template': config['data_path']}, + {'name': 'donefile_path', 'template': config['data_donefile']} ]}) - self._split_interval = config['split_interval'] # data split N mins per dir + self._split_interval = config['split_interval'] # data split N mins per dir self._data_file_handler = fs.FileHandler(config) def _format_data_time(self, daytime_str, time_window_mins): """ """ - data_time = util.make_datetime(daytime_str) + data_time = util.make_datetime(daytime_str) mins_of_day = data_time.hour * 60 + data_time.minute begin_stage = mins_of_day / self._split_interval end_stage = (mins_of_day + time_window_mins) / self._split_interval @@ -91,9 +93,9 @@ class TimeSplitDataset(Dataset): if mins_of_day % self._split_interval != 0: skip_mins = self._split_interval - (mins_of_day % self._split_interval) data_time = data_time + datetime.timedelta(minutes=skip_mins) - time_window_mins = time_window_mins - skip_mins + time_window_mins = time_window_mins - skip_mins return data_time, time_window_mins - + def check_ready(self, daytime_str, time_window_mins): """ data in [daytime_str, daytime_str + time_window_mins] is ready or not @@ -106,14 +108,14 @@ class TimeSplitDataset(Dataset): is_ready = True data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) while time_window_mins > 0: - file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) + file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time}) if not self._data_file_handler.is_exist(file_path): is_ready = False break time_window_mins = time_window_mins - self._split_interval data_time = data_time + datetime.timedelta(minutes=self._split_interval) return is_ready - + def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0): """ data in [daytime_str, daytime_str + time_window_mins], random shard to node_num, return 
shard[node_idx] @@ -128,7 +130,7 @@ class TimeSplitDataset(Dataset): data_file_list = [] data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins) while time_window_mins > 0: - file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) + file_path = self._path_generator.generate_path('data_path', {'time_format': data_time}) sub_file_list = self._data_file_handler.ls(file_path) for sub_file in sub_file_list: sub_file_name = self._data_file_handler.get_file_name(sub_file) @@ -138,17 +140,18 @@ class TimeSplitDataset(Dataset): data_file_list.append(sub_file) time_window_mins = time_window_mins - self._split_interval data_time = data_time + datetime.timedelta(minutes=self._split_interval) - return data_file_list - + return data_file_list + class FluidTimeSplitDataset(TimeSplitDataset): """ A Dataset with time split for PaddleFluid """ + def __init__(self, config): """ """ TimeSplitDataset.__init__(self, config) - + def _alloc_dataset(self, file_list): """ """ dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type']) @@ -158,12 +161,12 @@ class FluidTimeSplitDataset(TimeSplitDataset): dataset.set_pipe_command(self._config['data_converter']) dataset.set_filelist(file_list) dataset.set_use_var(self._config['data_vars']) - #dataset.set_fleet_send_sleep_seconds(2) - #dataset.set_fleet_send_batch_size(80000) + # dataset.set_fleet_send_sleep_seconds(2) + # dataset.set_fleet_send_batch_size(80000) return dataset def load_dataset(self, params): - """ """ + """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] if begin_time not in self._datasets: @@ -176,8 +179,8 @@ class FluidTimeSplitDataset(TimeSplitDataset): else: self._datasets[begin_time].wait_preload_done() return self._datasets[begin_time] - - def preload_dataset(self, params): + + def preload_dataset(self, params): """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] @@ -189,7 +192,7 @@ class FluidTimeSplitDataset(TimeSplitDataset): return True return False - def release_dataset(self, params): + def release_dataset(self, params): """ """ begin_time = params['begin_time'] windown_min = params['time_window_min'] diff --git a/fleetrec/core/utils/envs.py b/fleetrec/core/utils/envs.py index 2cd55dd4..eb866fba 100644 --- a/fleetrec/core/utils/envs.py +++ b/fleetrec/core/utils/envs.py @@ -13,7 +13,6 @@ # limitations under the License. -import os import copy global_envs = {} diff --git a/fleetrec/core/utils/fs.py b/fleetrec/core/utils/fs.py index b4f129a9..df237972 100755 --- a/fleetrec/core/utils/fs.py +++ b/fleetrec/core/utils/fs.py @@ -13,7 +13,6 @@ # limitations under the License. import os -import time from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient diff --git a/fleetrec/core/utils/reader_instance.py b/fleetrec/core/utils/reader_instance.py index b4c2d9c1..91b6af94 100644 --- a/fleetrec/core/utils/reader_instance.py +++ b/fleetrec/core/utils/reader_instance.py @@ -14,7 +14,7 @@ from __future__ import print_function import sys -from fleetrec.utils.envs import lazy_instance +from fleetrec.core.utils.envs import lazy_instance if len(sys.argv) != 4: raise ValueError("reader only accept 3 argument: 1. 
reader_class 2.train/evaluate 3.yaml_abs_path") diff --git a/fleetrec/core/utils/util.py b/fleetrec/core/utils/util.py index 8b5971d0..d74c0d6c 100755 --- a/fleetrec/core/utils/util.py +++ b/fleetrec/core/utils/util.py @@ -15,7 +15,8 @@ import os import time import datetime -from ..utils import fs as fs + +from fleetrec.core.utils import fs as fs def str2bool(v): diff --git a/fleetrec/examples/built_in/run.py b/fleetrec/examples/built_in/run.py deleted file mode 100644 index a1c4281d..00000000 --- a/fleetrec/examples/built_in/run.py +++ /dev/null @@ -1,24 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os - -from fleetrec.core.factory import TrainerFactory - -if __name__ == "__main__": - - abs_dir = os.path.dirname(os.path.abspath(__file__)) - yaml = os.path.join(abs_dir, 'ctr-dnn_train_single.yaml') - trainer = TrainerFactory.create(yaml) - trainer.run() diff --git a/fleetrec/examples/built_in/single_training.yaml b/fleetrec/examples/built_in/single_training.yaml deleted file mode 100644 index 8fb437ae..00000000 --- a/fleetrec/examples/built_in/single_training.yaml +++ /dev/null @@ -1,2 +0,0 @@ - -trainer: "SingleTraining" \ No newline at end of file diff --git a/fleetrec/examples/built_in/user_define_trainer.py b/fleetrec/examples/built_in/user_define_trainer.py index 70841f9a..194d1e2b 100644 --- a/fleetrec/examples/built_in/user_define_trainer.py +++ b/fleetrec/examples/built_in/user_define_trainer.py @@ -14,8 +14,8 @@ import paddle.fluid as fluid -from fleetrec.trainer.transpiler_trainer import TranspileTrainer -from fleetrec.utils import envs +from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer +from fleetrec.core.utils import envs class UserDefineTrainer(TranspileTrainer): diff --git a/fleetrec/models/ctr_dnn/model.py b/fleetrec/models/ctr_dnn/model.py index f771eaf5..970ef6fa 100644 --- a/fleetrec/models/ctr_dnn/model.py +++ b/fleetrec/models/ctr_dnn/model.py @@ -15,8 +15,8 @@ import math import paddle.fluid as fluid -from fleetrec.utils import envs -from fleetrec.models.base import ModelBase +from fleetrec.core.utils import envs +from fleetrec.core.model import Model as ModelBase class Model(ModelBase): diff --git a/fleetrec/models/ctr_dnn/reader.py b/fleetrec/models/ctr_dnn/reader.py index fd1ceff7..ce7fdfd4 100644 --- a/fleetrec/models/ctr_dnn/reader.py +++ b/fleetrec/models/ctr_dnn/reader.py @@ -13,8 +13,8 @@ # limitations under the License. 
from __future__ import print_function -from fleetrec.reader.reader import Reader -from fleetrec.utils import envs +from fleetrec.core.reader import Reader +from fleetrec.core.utils import envs class TrainReader(Reader): diff --git a/fleetrec/run.py b/fleetrec/run.py new file mode 100644 index 00000000..1a79ce76 --- /dev/null +++ b/fleetrec/run.py @@ -0,0 +1,74 @@ +import argparse +import os + +from fleetrec.core.factory import TrainerFactory +from fleetrec.core.utils import envs +from fleetrec.core.engine import local_engine + + +def run(model_yaml): + trainer = TrainerFactory.create(model_yaml) + trainer.run() + + +def single_engine(model_yaml): + single_envs = {} + single_envs["singleTraning"] = True + + print(envs.pretty_print_envs(single_envs, ("Single Envs", "Value"))) + run(model_yaml) + + +def local_cluster_engine(cluster_envs, model_yaml): + print(envs.pretty_print_envs(cluster_envs, ("Local Cluster Envs", "Value"))) + + launch = local_engine.Launch(cluster_envs, model_yaml) + launch.run() + + +def local_mpi_engine(cluster_envs, model_yaml): + print(envs.pretty_print_envs(cluster_envs, ("Local MPI Cluster Envs", "Value"))) + print("coming soon") + + +def yaml_engine(engine_yaml, model_yaml): + print("coming soon") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='fleet-rec run') + parser.add_argument("--model", type=str) + parser.add_argument("--engine", type=str) + + args = parser.parse_args() + + if not os.path.exists(args.model) or not os.path.isfile(args.model): + raise ValueError("argument model: {} error, must specify a existed yaml file".format(args.model)) + + if args.engine == "Single": + print("use SingleTraining to run model: {}".format(args.model)) + single_engine(args.model) + elif args.engine == "LocalCluster": + print("use 1X1 ClusterTraining at localhost to run model: {}".format(args.model)) + + cluster_envs = {} + cluster_envs["server_num"] = 1 + cluster_envs["worker_num"] = 1 + cluster_envs["start_port"] = 36001 + cluster_envs["log_dir"] = "logs" + + local_cluster_engine(cluster_envs, args.model) + elif args.engine == "LocalMPI": + print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model)) + + cluster_envs = {} + cluster_envs["server_num"] = 1 + cluster_envs["worker_num"] = 1 + cluster_envs["start_port"] = 36001 + cluster_envs["log_dir"] = "logs" + + local_mpi_engine(cluster_envs, args.model) + else: + if not os.path.exists(args.engine) or not os.path.isfile(args.engine): + raise ValueError("argument engine: {} error, must specify a existed yaml file".format(args.model)) + yaml_engine(args.engine, args.model) -- GitLab
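Usage after this restructure: the deleted fleetrec/examples/built_in/run.py is replaced by the new top-level fleetrec/run.py, which takes --model (path to an existing model YAML) and --engine (Single, LocalCluster, LocalMPI, or a path to an engine YAML). A minimal programmatic sketch of the same flow, assuming the fleetrec package is importable; the config path below is a placeholder, not a file added by this patch:

    # Roughly what `python fleetrec/run.py --model <model.yaml> --engine Single` does internally.
    from fleetrec.core.factory import TrainerFactory

    model_yaml = "models/ctr_dnn/train.yaml"  # hypothetical model config path
    trainer = TrainerFactory.create(model_yaml)  # parses the YAML, sets global envs, builds the trainer
    trainer.run()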