Commit 31bbef3d authored by tangwei

structure rebuild

Parent 8334c08d
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='fleet-rec check')
    parser.add_argument("--model", type=str)
    parser.add_argument("--engine", type=str)
    args = parser.parse_args()  # parse now so "--help" already works while the check itself is pending
    print("coming soon")
......@@ -16,13 +16,13 @@ import os
import sys
import yaml
from fleetrec.trainer.local_engine import Launch
from fleetrec.trainer.single_trainer import SingleTrainer
from fleetrec.trainer.cluster_trainer import ClusterTrainer
from fleetrec.trainer.ctr_trainer import CtrPaddleTrainer
from fleetrec.utils import envs
from fleetrec.utils import util
class TrainerFactory(object):
......@@ -52,19 +52,6 @@ class TrainerFactory(object):
raise ValueError("trainer only support SingleTraining/ClusterTraining")
return trainer
@staticmethod
def _build_engine(yaml_config):
cluster_envs = {}
cluster_envs["server_num"] = envs.get_global_env("train.pserver_num")
cluster_envs["worker_num"] = envs.get_global_env("train.pserver_num")
cluster_envs["start_port"] = envs.get_global_env("train.start_port")
cluster_envs["log_dir"] = envs.get_global_env("train.log_dirname")
print(envs.pretty_print_envs(cluster_envs, ("Cluster Global Envs", "Value")))
launch = Launch(cluster_envs, yaml_config)
return launch
@staticmethod
def create(config):
_config = None
......@@ -75,15 +62,7 @@ class TrainerFactory(object):
raise ValueError("fleetrec's config only support yaml")
envs.set_global_envs(_config)
mode = envs.get_global_env("train.trainer")
container = envs.get_global_env("train.container")
instance = util.str2bool(os.getenv("CLUSTER_INSTANCE", "0"))
if mode == "ClusterTraining" and container == "local" and not instance:
trainer = TrainerFactory._build_engine(config)
else:
trainer = TrainerFactory._build_trainer(_config, config)
return trainer
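For orientation, the factory above is driven by a plain yaml path; below is a minimal sketch of the call flow. The file name and exact yaml layout are assumptions, though the train.trainer key matches what create() reads:

from fleetrec.core.factory import TrainerFactory

# hypothetical minimal.yaml:
#   train:
#     trainer: "SingleTraining"
trainer = TrainerFactory.create("minimal.yaml")  # sets global envs, then builds the matching trainer
trainer.run()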
......
......@@ -16,9 +16,9 @@ import abc
import copy
import yaml
import paddle.fluid as fluid
from ..utils import table as table
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from fleetrec.core.utils import table as table
class Layer(object):
"""R
......
......@@ -15,7 +15,7 @@
import math
import numpy as np
import paddle.fluid as fluid
from .base import Metric
from fleetrec.core.metric import Metric
class AUCMetric(Metric):
......
import yaml
import copy
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from fleetrec.core.model import Model
from fleetrec.core.utils import table
def create(config):
"""
......@@ -14,14 +22,14 @@ def create(config):
return model
class YamlModel(ModelBase):
class YamlModel(Model):
"""R
"""
def __init__(self, config):
"""R
"""
ModelBase.__init__(self, config)
Model.__init__(self, config)
self._config = config
self._name = config['name']
f = open(config['layer_file'], 'r')
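# NOTE: the layer_file handle is opened without a context manager; a "with open(...)" block would guarantee it is closed (any explicit close() is in the elided lines).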
......
......@@ -13,7 +13,7 @@
# limitations under the License.
import paddle.fluid as fluid
from .base import Layer
from fleetrec.core.layer import Layer
class EmbeddingInputLayer(Layer):
......
......@@ -19,7 +19,7 @@ import os
import paddle.fluid.incubate.data_generator as dg
import yaml
from fleetrec.utils import envs
from fleetrec.core.utils import envs
class Reader(dg.MultiSlotDataGenerator):
......
......@@ -17,19 +17,14 @@ Training use fluid with one node only.
"""
from __future__ import print_function
import logging
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory
from paddle.fluid.incubate.fleet.base.role_maker import PaddleCloudRoleMaker
from ..utils import envs
from .transpiler_trainer import TranspileTrainer
logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
from fleetrec.core.utils import envs
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
class ClusterTrainer(TranspileTrainer):
......
......@@ -23,12 +23,13 @@ import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
from paddle.fluid.incubate.fleet.base.role_maker import GeneralRoleMaker
from fleetrec.utils import fs as fs
from fleetrec.utils import util as util
from fleetrec.metrics.auc_metrics import AUCMetric
from fleetrec.models import base as model_basic
from fleetrec.reader import dataset
from .trainer import Trainer
from fleetrec.core.utils import fs as fs
from fleetrec.core.utils import util as util
from fleetrec.core.metrics.auc_metrics import AUCMetric
from fleetrec.core.models.modul import build as model_basic
from fleetrec.core.utils import dataset
from fleetrec.core.trainer import Trainer
def wroker_numric_opt(value, env, opt):
......
......@@ -13,13 +13,13 @@
# limitations under the License.
import abc
import copy
import yaml
import time
import datetime
import paddle.fluid as fluid
from .. utils import fs as fs
from .. utils import util as util
from fleetrec.core.utils import fs as fs
from fleetrec.core.utils import util as util
class Dataset(object):
......@@ -27,12 +27,13 @@ class Dataset(object):
Dataset Base
"""
__metaclass__ = abc.ABCMeta
def __init__(self, config):
"""
"""
self._datasets = {}
self._config = config
@abc.abstractmethod
def check_ready(self, params):
"""
......@@ -43,19 +44,19 @@ class Dataset(object):
pass
@abc.abstractmethod
def load_dataset(self, params):
"""R
"""
pass
@abc.abstractmethod
def preload_dataset(self, params):
"""R
"""
pass
@abc.abstractmethod
def release_dataset(self, params):
"""R
"""
pass
......@@ -65,23 +66,24 @@ class TimeSplitDataset(Dataset):
"""
Dataset with time split dir. root_path/$DAY/$HOUR
"""
def __init__(self, config):
"""
init data root_path, time_split_interval, data_path_format
"""
Dataset.__init__(self, config)
if 'data_donefile' not in config or config['data_donefile'] is None:
config['data_donefile'] = config['data_path'] + "/to.hadoop.done"
self._path_generator = util.PathGenerator({'templates': [
{'name': 'data_path', 'template': config['data_path']},
{'name': 'donefile_path', 'template': config['data_donefile']}
]})
self._split_interval = config['split_interval']  # data split N mins per dir
self._data_file_handler = fs.FileHandler(config)
def _format_data_time(self, daytime_str, time_window_mins):
""" """
data_time = util.make_datetime(daytime_str)
mins_of_day = data_time.hour * 60 + data_time.minute
begin_stage = mins_of_day / self._split_interval
end_stage = (mins_of_day + time_window_mins) / self._split_interval
......@@ -91,9 +93,9 @@ class TimeSplitDataset(Dataset):
if mins_of_day % self._split_interval != 0:
skip_mins = self._split_interval - (mins_of_day % self._split_interval)
data_time = data_time + datetime.timedelta(minutes=skip_mins)
time_window_mins = time_window_mins - skip_mins
return data_time, time_window_mins
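A quick worked example of the alignment above (note that begin_stage/end_stage rely on Python 2 integer division; under Python 3 they would need // to stay integral):

# e.g. with self._split_interval = 5, a start time of 08:03 and time_window_mins = 30:
#   mins_of_day = 8 * 60 + 3 = 483; 483 % 5 = 3, so skip_mins = 5 - 3 = 2,
#   data_time is snapped forward to 08:05 and the window shrinks to 30 - 2 = 28,
#   i.e. the window always starts on a split-interval boundary.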
def check_ready(self, daytime_str, time_window_mins):
"""
return whether the data in [daytime_str, daytime_str + time_window_mins] is ready
......@@ -106,14 +108,14 @@ class TimeSplitDataset(Dataset):
is_ready = True
data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins)
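# NOTE: the adjusted window comes back as windows_mins, but the loop below decrements the original time_window_mins, leaving windows_mins effectively unused (get_file_list shows the same pattern).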
while time_window_mins > 0:
file_path = self._path_generator.generate_path('donefile_path', {'time_format': data_time})
if not self._data_file_handler.is_exist(file_path):
is_ready = False
break
time_window_mins = time_window_mins - self._split_interval
data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return is_ready
def get_file_list(self, daytime_str, time_window_mins, node_num=1, node_idx=0):
"""
data in [daytime_str, daytime_str + time_window_mins] is randomly sharded across node_num nodes; return shard[node_idx]
......@@ -128,7 +130,7 @@ class TimeSplitDataset(Dataset):
data_file_list = []
data_time, windows_mins = self._format_data_time(daytime_str, time_window_mins)
while time_window_mins > 0:
file_path = self._path_generator.generate_path('data_path', {'time_format': data_time})
sub_file_list = self._data_file_handler.ls(file_path)
for sub_file in sub_file_list:
sub_file_name = self._data_file_handler.get_file_name(sub_file)
......@@ -138,17 +140,18 @@ class TimeSplitDataset(Dataset):
data_file_list.append(sub_file)
time_window_mins = time_window_mins - self._split_interval
data_time = data_time + datetime.timedelta(minutes=self._split_interval)
return data_file_list
class FluidTimeSplitDataset(TimeSplitDataset):
"""
A Dataset with time split for PaddleFluid
"""
def __init__(self, config):
""" """
TimeSplitDataset.__init__(self, config)
def _alloc_dataset(self, file_list):
""" """
dataset = fluid.DatasetFactory().create_dataset(self._config['dataset_type'])
......@@ -158,12 +161,12 @@ class FluidTimeSplitDataset(TimeSplitDataset):
dataset.set_pipe_command(self._config['data_converter'])
dataset.set_filelist(file_list)
dataset.set_use_var(self._config['data_vars'])
# dataset.set_fleet_send_sleep_seconds(2)
# dataset.set_fleet_send_batch_size(80000)
return dataset
def load_dataset(self, params):
""" """
""" """
begin_time = params['begin_time']
windown_min = params['time_window_min']
if begin_time not in self._datasets:
......@@ -176,8 +179,8 @@ class FluidTimeSplitDataset(TimeSplitDataset):
else:
self._datasets[begin_time].wait_preload_done()
return self._datasets[begin_time]
def preload_dataset(self, params):
""" """
begin_time = params['begin_time']
windown_min = params['time_window_min']
......@@ -189,7 +192,7 @@ class FluidTimeSplitDataset(TimeSplitDataset):
return True
return False
def release_dataset(self, params):
""" """
begin_time = params['begin_time']
windown_min = params['time_window_min']
......
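Taken together, a typical consumer of FluidTimeSplitDataset might look like the sketch below; the config keys mirror the ones read above, while the time-string format and the training step are assumptions:

# config needs at least: data_path, split_interval, dataset_type,
# data_converter, data_vars, and optionally data_donefile
ds = FluidTimeSplitDataset(config)
params = {'begin_time': '20200401/0800', 'time_window_min': 30}
if ds.check_ready(params['begin_time'], params['time_window_min']):
    dataset = ds.load_dataset(params)  # builds (or waits for) a fluid dataset covering the window
    # ... run training on dataset ...
    ds.release_dataset(params)         # drop the cached window afterwards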
......@@ -13,7 +13,6 @@
# limitations under the License.
import os
import copy
global_envs = {}
......
......@@ -13,7 +13,6 @@
# limitations under the License.
import os
import time
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
......
......@@ -14,7 +14,7 @@
from __future__ import print_function
import sys
from fleetrec.utils.envs import lazy_instance
from fleetrec.core.utils.envs import lazy_instance
if len(sys.argv) != 4:
raise ValueError("reader only accept 3 argument: 1. reader_class 2.train/evaluate 3.yaml_abs_path")
......
......@@ -15,7 +15,8 @@
import os
import time
import datetime
from ..utils import fs as fs
from fleetrec.core.utils import fs as fs
def str2bool(v):
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from fleetrec.core.factory import TrainerFactory
if __name__ == "__main__":
abs_dir = os.path.dirname(os.path.abspath(__file__))
yaml = os.path.join(abs_dir, 'ctr-dnn_train_single.yaml')
trainer = TrainerFactory.create(yaml)
trainer.run()
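The ctr-dnn_train_single.yaml referenced above evidently ships in this commit with only the trainer key so far: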
trainer: "SingleTraining"
\ No newline at end of file
......@@ -14,8 +14,8 @@
import paddle.fluid as fluid
from fleetrec.trainer.transpiler_trainer import TranspileTrainer
from fleetrec.utils import envs
from fleetrec.core.trainers.transpiler_trainer import TranspileTrainer
from fleetrec.core.utils import envs
class UserDefineTrainer(TranspileTrainer):
......
......@@ -15,8 +15,8 @@
import math
import paddle.fluid as fluid
from fleetrec.utils import envs
from fleetrec.models.base import ModelBase
from fleetrec.core.utils import envs
from fleetrec.core.model import Model as ModelBase
class Model(ModelBase):
......
......@@ -13,8 +13,8 @@
# limitations under the License.
from __future__ import print_function
from fleetrec.reader.reader import Reader
from fleetrec.utils import envs
from fleetrec.core.reader import Reader
from fleetrec.core.utils import envs
class TrainReader(Reader):
......
import argparse
import os

from fleetrec.core.factory import TrainerFactory
from fleetrec.core.utils import envs
from fleetrec.core.engine import local_engine


def run(model_yaml):
    trainer = TrainerFactory.create(model_yaml)
    trainer.run()


def single_engine(model_yaml):
    single_envs = {}
    single_envs["singleTraining"] = True
    print(envs.pretty_print_envs(single_envs, ("Single Envs", "Value")))
    run(model_yaml)


def local_cluster_engine(cluster_envs, model_yaml):
    print(envs.pretty_print_envs(cluster_envs, ("Local Cluster Envs", "Value")))
    launch = local_engine.Launch(cluster_envs, model_yaml)
    launch.run()


def local_mpi_engine(cluster_envs, model_yaml):
    print(envs.pretty_print_envs(cluster_envs, ("Local MPI Cluster Envs", "Value")))
    print("coming soon")


def yaml_engine(engine_yaml, model_yaml):
    print("coming soon")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='fleet-rec run')
    parser.add_argument("--model", type=str)
    parser.add_argument("--engine", type=str)
    args = parser.parse_args()

    if not os.path.exists(args.model) or not os.path.isfile(args.model):
        raise ValueError("argument model: {} error, must specify an existing yaml file".format(args.model))

    if args.engine == "Single":
        print("use SingleTraining to run model: {}".format(args.model))
        single_engine(args.model)
    elif args.engine == "LocalCluster":
        print("use 1X1 ClusterTraining at localhost to run model: {}".format(args.model))
        cluster_envs = {}
        cluster_envs["server_num"] = 1
        cluster_envs["worker_num"] = 1
        cluster_envs["start_port"] = 36001
        cluster_envs["log_dir"] = "logs"
        local_cluster_engine(cluster_envs, args.model)
    elif args.engine == "LocalMPI":
        print("use 1X1 MPI ClusterTraining at localhost to run model: {}".format(args.model))
        cluster_envs = {}
        cluster_envs["server_num"] = 1
        cluster_envs["worker_num"] = 1
        cluster_envs["start_port"] = 36001
        cluster_envs["log_dir"] = "logs"
        local_mpi_engine(cluster_envs, args.model)
    else:
        # a non-keyword engine argument is treated as a path to an engine yaml
        if not os.path.exists(args.engine) or not os.path.isfile(args.engine):
            raise ValueError("argument engine: {} error, must specify an existing yaml file".format(args.engine))
        yaml_engine(args.engine, args.model)
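For reference, the dispatch above accepts invocations along these lines (the script path is a placeholder; packaging and launch details are not part of this diff):

# python run.py --model ctr-dnn_train_single.yaml --engine Single
# python run.py --model ctr-dnn_train_single.yaml --engine LocalCluster
# python run.py --model ctr-dnn_train_single.yaml --engine LocalMPI
# python run.py --model ctr-dnn_train_single.yaml --engine custom_engine.yaml  # falls through to yaml_engine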