#   Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import os
from paddlerec.core.utils.envs import lazy_instance_by_fliename
from paddlerec.core.utils.envs import get_global_env
from paddlerec.core.utils.envs import get_runtime_environ
from paddlerec.core.reader import SlotReader
from paddlerec.core.trainer import EngineMode
from paddlerec.core.utils.util import split_files, check_filelist


def dataloader_by_name(readerclass,
                       dataset_name,
                       yaml_file,
                       context,
                       reader_class_name="Reader"):
    """Build a dataloader callable for the named dataset.

    Args:
        readerclass: path of the python file defining the reader class.
        dataset_name: name of the ``dataset.*`` section in the yaml config.
        yaml_file: path of the yaml config file, forwarded to the reader.
        context: runtime context dict (keys used here: "device", "engine",
            "cluster_type", "fleet").
        reader_class_name: class name to load from ``readerclass``.

    Returns:
        The reader's batch generator result if it defines
        ``generate_batch_from_trainfiles``; otherwise the result of
        ``batch_tensor_creator`` if available; otherwise a sample generator
        function.
    """
    reader_class = lazy_instance_by_fliename(readerclass, reader_class_name)

    name = "dataset." + dataset_name + "."
    data_path = get_global_env(name + "data_path")

    # A "paddlerec::" prefix marks a path relative to the package base dir.
    if data_path.startswith("paddlerec::"):
        package_base = get_runtime_environ("PACKAGE_BASE")
        assert package_base is not None
        data_path = os.path.join(package_base, data_path.split("::")[1])

    hidden_file_list, files = check_filelist(
        hidden_file_list=[], data_file_list=[], train_data_path=data_path)
    if hidden_file_list is not None:
        print(
            "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}".
            format(hidden_file_list))

    # Sort so every worker sees the same file order before splitting.
    files.sort()

    # for local cluster: discard some files if files cannot be divided
    # equally between GPUs
    if context["device"] == "GPU":
        # NOTE(review): assumes PADDLEREC_GPU_NUMS is always set when the
        # device is GPU — int(None) would raise otherwise; confirm upstream.
        selected_gpu_nums = int(os.getenv("PADDLEREC_GPU_NUMS"))
        discard_file_nums = len(files) % selected_gpu_nums
        if discard_file_nums != 0:
            print(
                "Warning: because files cannot be divided equally between GPUs,discard these files:{}".
                format(files[-discard_file_nums:]))
            files = files[:len(files) - discard_file_nums]

    need_split_files = False
    if context["engine"] == EngineMode.LOCAL_CLUSTER:
        # for local cluster: split files for multi process
        need_split_files = True
    elif context["engine"] == EngineMode.CLUSTER and context[
            "cluster_type"] == "K8S":
        # for k8s mount mode, split files for every node
        need_split_files = True
    print("need_split_files: {}".format(need_split_files))
    if need_split_files:
        files = split_files(files, context["fleet"].worker_index(),
                            context["fleet"].worker_num())

    reader = reader_class(yaml_file)
    reader.init()

    def gen_reader():
        # Stream every file line by line through the reader's sample parser.
        for fname in files:
            with open(fname, 'r') as f:
                for line in f:
                    line = line.rstrip('\n')
                    sample_iter = reader.generate_sample(line)
                    for parsed_line in sample_iter():
                        if parsed_line is not None:
                            # Keep only the values; drop the slot names.
                            yield [pased[1] for pased in parsed_line]

    def gen_batch_reader():
        return reader.generate_batch_from_trainfiles(files)

    # Prefer the reader's own batch generator when it provides one.
    if hasattr(reader, 'generate_batch_from_trainfiles'):
        return gen_batch_reader()

    if hasattr(reader, "batch_tensor_creator"):
        return reader.batch_tensor_creator(gen_reader)

    return gen_reader


def slotdataloader_by_name(readerclass, dataset_name, yaml_file, context):
    """Build a slot-based dataloader callable for the named dataset.

    Args:
        readerclass: unused in this function; kept for interface
            compatibility with ``dataloader_by_name``.
        dataset_name: name of the ``dataset.*`` section in the yaml config.
        yaml_file: path of the yaml config file, forwarded to SlotReader.
        context: runtime context dict (keys used here: "device", "engine",
            "cluster_type", "fleet").

    Returns:
        The reader's batch generator result if it defines
        ``generate_batch_from_trainfiles``; otherwise a sample generator
        function.
    """
    name = "dataset." + dataset_name + "."
    data_path = get_global_env(name + "data_path")

    # A "paddlerec::" prefix marks a path relative to the package base dir.
    if data_path.startswith("paddlerec::"):
        package_base = get_runtime_environ("PACKAGE_BASE")
        assert package_base is not None
        data_path = os.path.join(package_base, data_path.split("::")[1])

    hidden_file_list, files = check_filelist(
        hidden_file_list=[], data_file_list=[], train_data_path=data_path)
    if hidden_file_list is not None:
        print(
            "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}".
            format(hidden_file_list))

    # Sort so every worker sees the same file order before splitting.
    files.sort()

    # for local cluster: discard some files if files cannot be divided
    # equally between GPUs
    if context["device"] == "GPU":
        # NOTE(review): assumes PADDLEREC_GPU_NUMS is always set when the
        # device is GPU — int(None) would raise otherwise; confirm upstream.
        selected_gpu_nums = int(os.getenv("PADDLEREC_GPU_NUMS"))
        discard_file_nums = len(files) % selected_gpu_nums
        if discard_file_nums != 0:
            print(
                "Warning: because files cannot be divided equally between GPUs, discard these files:{}".
                format(files[-discard_file_nums:]))
            files = files[:len(files) - discard_file_nums]

    need_split_files = False
    if context["engine"] == EngineMode.LOCAL_CLUSTER:
        # for local cluster: split files for multi process
        need_split_files = True
    elif context["engine"] == EngineMode.CLUSTER and context[
            "cluster_type"] == "K8S":
        # for k8s mount mode, split files for every node
        need_split_files = True

    if need_split_files:
        files = split_files(files, context["fleet"].worker_index(),
                            context["fleet"].worker_num())

    # "#" is the SlotReader sentinel for "no slots of this kind".
    sparse = get_global_env(name + "sparse_slots", "#")
    if sparse == "":
        sparse = "#"
    dense = get_global_env(name + "dense_slots", "#")
    if dense == "":
        dense = "#"
    padding = get_global_env(name + "padding", 0)
    reader = SlotReader(yaml_file)
    reader.init(sparse, dense, int(padding))

    def gen_reader():
        # Stream every file line by line through the reader's sample parser.
        for fname in files:
            with open(fname, 'r') as f:
                for line in f:
                    line = line.rstrip('\n')
                    sample_iter = reader.generate_sample(line)
                    for parsed_line in sample_iter():
                        if parsed_line is not None:
                            # Keep only the values; drop the slot names.
                            yield [pased[1] for pased in parsed_line]

    def gen_batch_reader():
        return reader.generate_batch_from_trainfiles(files)

    # Prefer the reader's own batch generator when it provides one.
    if hasattr(reader, 'generate_batch_from_trainfiles'):
        return gen_batch_reader()
    return gen_reader


def slotdataloader(readerclass, train, yaml_file, context):
    """Build a slot-based dataloader for the train or evaluate dataset.

    Args:
        readerclass: unused in this function; kept for interface
            compatibility with the other dataloader factories.
        train: "TRAIN" selects ``train.reader``/``train_data_path``; any
            other value selects ``evaluate.reader``/``test_data_path``.
        yaml_file: path of the yaml config file, forwarded to SlotReader.
        context: runtime context dict (keys used here: "device", "engine",
            "cluster_type", "fleet").

    Returns:
        The reader's batch generator result if it defines
        ``generate_batch_from_trainfiles``; otherwise a sample generator
        function.
    """
    if train == "TRAIN":
        namespace = "train.reader"
        data_path = get_global_env("train_data_path", None, namespace)
    else:
        namespace = "evaluate.reader"
        data_path = get_global_env("test_data_path", None, namespace)

    # A "paddlerec::" prefix marks a path relative to the package base dir.
    if data_path.startswith("paddlerec::"):
        package_base = get_runtime_environ("PACKAGE_BASE")
        assert package_base is not None
        data_path = os.path.join(package_base, data_path.split("::")[1])

    hidden_file_list, files = check_filelist(
        hidden_file_list=[], data_file_list=[], train_data_path=data_path)
    if hidden_file_list is not None:
        print(
            "Warning:please make sure there are no hidden files in the dataset folder and check these hidden files:{}".
            format(hidden_file_list))

    # Sort so every worker sees the same file order before splitting.
    files.sort()

    # for local cluster: discard some files if files cannot be divided
    # equally between GPUs
    if context["device"] == "GPU":
        # NOTE(review): assumes PADDLEREC_GPU_NUMS is always set when the
        # device is GPU — int(None) would raise otherwise; confirm upstream.
        selected_gpu_nums = int(os.getenv("PADDLEREC_GPU_NUMS"))
        discard_file_nums = len(files) % selected_gpu_nums
        if discard_file_nums != 0:
            print(
                "Warning: because files cannot be divided equally between GPUs,discard these files:{}".
                format(files[-discard_file_nums:]))
            files = files[:len(files) - discard_file_nums]

    need_split_files = False
    if context["engine"] == EngineMode.LOCAL_CLUSTER:
        # for local cluster: split files for multi process
        need_split_files = True
    elif context["engine"] == EngineMode.CLUSTER and context[
            "cluster_type"] == "K8S":
        # for k8s mount mode, split files for every node
        need_split_files = True

    if need_split_files:
        files = split_files(files, context["fleet"].worker_index(),
                            context["fleet"].worker_num())

    # "#" is the SlotReader sentinel for "no slots of this kind".
    sparse = get_global_env("sparse_slots", "#", namespace)
    if sparse == "":
        sparse = "#"
    dense = get_global_env("dense_slots", "#", namespace)
    if dense == "":
        dense = "#"
    padding = get_global_env("padding", 0, namespace)
    reader = SlotReader(yaml_file)
    reader.init(sparse, dense, int(padding))

    def gen_reader():
        # Stream every file line by line through the reader's sample parser.
        for fname in files:
            with open(fname, 'r') as f:
                for line in f:
                    line = line.rstrip('\n')
                    sample_iter = reader.generate_sample(line)
                    for parsed_line in sample_iter():
                        if parsed_line is not None:
                            # Keep only the values; drop the slot names.
                            yield [pased[1] for pased in parsed_line]

    def gen_batch_reader():
        return reader.generate_batch_from_trainfiles(files)

    # Prefer the reader's own batch generator when it provides one.
    if hasattr(reader, 'generate_batch_from_trainfiles'):
        return gen_batch_reader()
    return gen_reader