未验证 提交 b1f708fc 编写于 作者: W wuzhihua 提交者: GitHub

Merge pull request #204 from vslyu/fix_collective_files_partition

fix bugs for files partition running in collective mode
...@@ -119,7 +119,8 @@ class LocalClusterEngine(Engine): ...@@ -119,7 +119,8 @@ class LocalClusterEngine(Engine):
"PADDLE_TRAINERS_NUM": str(worker_num), "PADDLE_TRAINERS_NUM": str(worker_num),
"TRAINING_ROLE": "TRAINER", "TRAINING_ROLE": "TRAINER",
"PADDLE_TRAINER_ID": str(i), "PADDLE_TRAINER_ID": str(i),
"FLAGS_selected_gpus": str(selected_gpus[i]) "FLAGS_selected_gpus": str(selected_gpus[i]),
"PADDLEREC_GPU_NUMS": str(selected_gpus_num)
}) })
os.system("mkdir -p {}".format(logs_dir)) os.system("mkdir -p {}".format(logs_dir))
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
from __future__ import print_function from __future__ import print_function
import os import os
import warnings
from paddlerec.core.utils.envs import lazy_instance_by_fliename from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ from paddlerec.core.utils.envs import get_runtime_environ
...@@ -47,6 +48,16 @@ def dataloader_by_name(readerclass, ...@@ -47,6 +48,16 @@ def dataloader_by_name(readerclass,
files.sort() files.sort()
# for local cluster: discard some files if files cannot be divided equally between GPUs
if (context["device"] == "GPU") and "PADDLEREC_GPU_NUMS" in os.environ:
selected_gpu_nums = int(os.getenv("PADDLEREC_GPU_NUMS"))
discard_file_nums = len(files) % selected_gpu_nums
if (discard_file_nums != 0):
warnings.warn(
"Because files cannot be divided equally between GPUs,discard these files:{}".
format(files[-discard_file_nums:]))
files = files[:len(files) - discard_file_nums]
need_split_files = False need_split_files = False
if context["engine"] == EngineMode.LOCAL_CLUSTER: if context["engine"] == EngineMode.LOCAL_CLUSTER:
# for local cluster: split files for multi process # for local cluster: split files for multi process
...@@ -109,6 +120,16 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context): ...@@ -109,6 +120,16 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context):
files.sort() files.sort()
# for local cluster: discard some files if files cannot be divided equally between GPUs
if (context["device"] == "GPU") and "PADDLEREC_GPU_NUMS" in os.environ:
selected_gpu_nums = int(os.getenv("PADDLEREC_GPU_NUMS"))
discard_file_nums = len(files) % selected_gpu_nums
if (discard_file_nums != 0):
warnings.warn(
"Because files cannot be divided equally between GPUs,discard these files:{}".
format(files[-discard_file_nums:]))
files = files[:len(files) - discard_file_nums]
need_split_files = False need_split_files = False
if context["engine"] == EngineMode.LOCAL_CLUSTER: if context["engine"] == EngineMode.LOCAL_CLUSTER:
# for local cluster: split files for multi process # for local cluster: split files for multi process
...@@ -153,73 +174,3 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context): ...@@ -153,73 +174,3 @@ def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context):
if hasattr(reader, 'generate_batch_from_trainfiles'): if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader() return gen_batch_reader()
return gen_reader return gen_reader
def slotdataloader(readerclass, train, yaml_file, context):
if train == "TRAIN":
reader_name = "SlotReader"
namespace = "train.reader"
data_path = get_global_env("train_data_path", None, namespace)
else:
reader_name = "SlotReader"
namespace = "evaluate.reader"
data_path = get_global_env("test_data_path", None, namespace)
if data_path.startswith("paddlerec::"):
package_base = get_runtime_environ("PACKAGE_BASE")
assert package_base is not None
data_path = os.path.join(package_base, data_path.split("::")[1])
hidden_file_list, files = check_filelist(
hidden_file_list=[], data_file_list=[], train_data_path=data_path)
if (hidden_file_list is not None):
print(
"Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}".
format(hidden_file_list))
files.sort()
need_split_files = False
if context["engine"] == EngineMode.LOCAL_CLUSTER:
# for local cluster: split files for multi process
need_split_files = True
elif context["engine"] == EngineMode.CLUSTER and context[
"cluster_type"] == "K8S":
# for k8s mount mode, split files for every node
need_split_files = True
if need_split_files:
files = split_files(files, context["fleet"].worker_index(),
context["fleet"].worker_num())
context["file_list"] = files
sparse = get_global_env("sparse_slots", "#", namespace)
if sparse == "":
sparse = "#"
dense = get_global_env("dense_slots", "#", namespace)
if dense == "":
dense = "#"
padding = get_global_env("padding", 0, namespace)
reader = SlotReader(yaml_file)
reader.init(sparse, dense, int(padding))
def gen_reader():
for file in files:
with open(file, 'r') as f:
for line in f:
line = line.rstrip('\n')
iter = reader.generate_sample(line)
for parsed_line in iter():
if parsed_line is None:
continue
else:
values = []
for pased in parsed_line:
values.append(pased[1])
yield values
def gen_batch_reader():
return reader.generate_batch_from_trainfiles(files)
if hasattr(reader, 'generate_batch_from_trainfiles'):
return gen_batch_reader()
return gen_reader
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册