dataset.py 7.8 KB
Newer Older
D
dongdaxiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle.fluid.proto import data_feed_pb2
from google.protobuf import text_format
from . import core
__all__ = ['DatasetFactory']


class DatasetFactory(object):
22 23 24 25 26 27 28 29
    """
    DatasetFactory is a factory which create dataset by its name,
    you can create "QueueDataset" or "InMemoryDataset",
    the default is "QueueDataset".

    Example:
        dataset = paddle.fluid.DatasetFactory.create_dataset("InMemoryDataset")
    """
D
dongdaxiang 已提交
30

D
dongdaxiang 已提交
31
    def __init__(self):
32 33 34
        """
        Init
        """
D
dongdaxiang 已提交
35 36
        pass

37
    def create_dataset(self, datafeed_class="QueueDataset"):
38 39 40 41
        """
        Create "QueueDataset" or "InMemoryDataset",
        the default is "QueueDataset".
        """
D
dongdaxiang 已提交
42 43
        try:
            dataset = globals()[datafeed_class]()
44
            return dataset
D
dongdaxiang 已提交
45 46 47 48 49 50
        except:
            raise ValueError("datafeed class %s does not exist" %
                             datafeed_class)


class DatasetBase(object):
51 52 53
    """
    Base dataset class
    """
D
dongdaxiang 已提交
54

D
dongdaxiang 已提交
55
    def __init__(self):
56 57 58
        """
        Init
        """
D
dongdaxiang 已提交
59 60 61 62
        # define class name here
        # to decide whether we need create in memory instance
        self.proto_desc = data_feed_pb2.DataFeedDesc()
        self.proto_desc.pipe_command = "cat"
X
xujiaqi01 已提交
63
        self.dataset = core.Dataset("MultiSlotDataset")
64
        self.thread_num = 0
D
dongdaxiang 已提交
65 66 67 68 69 70

    def set_pipe_command(self, pipe_command):
        """
        Set pipe command of current dataset
        A pipe command is a UNIX pipeline command that can be used only

71 72 73 74 75 76
        Example:
            >>> dataset.set_pipe_command("python my_script.py")

        Args:
            pipe_command: pipe command

D
dongdaxiang 已提交
77 78 79 80 81 82 83 84
        """
        self.proto_desc.pipe_command = pipe_command

    def set_batch_size(self, batch_size):
        """
        Set batch size. Will be effective during training

        Example:
85
            >>> dataset.set_batch_size(128)
D
dongdaxiang 已提交
86 87 88 89 90 91 92

        Args:
            batch_size: batch size

        """
        self.proto_desc.batch_size = batch_size

93
    def set_thread(self, thread_num):
94 95 96 97 98 99 100 101 102
        """
        Set thread num, it is the num of readers.

        Example:
            >>> dataset.set_thread(12)

        Args:
            thread_num: thread num
        """
103
        self.dataset.set_thread_num(thread_num)
104
        self.thread_num = thread_num
105 106

    def set_filelist(self, filelist):
107 108 109 110 111 112 113 114 115
        """
        Set file list in current worker.

        Example:
            >>> dataset.set_filelist(['a.txt', 'b.txt'])

        Args:
            filelist: file list
        """
116 117
        self.dataset.set_filelist(filelist)

D
dongdaxiang 已提交
118
    def set_use_var(self, var_list):
119 120 121 122 123 124 125 126 127
        """
        Set Variables which you will use.

        Example:
            >>> dataset.set_use_var([data, label])

        Args:
            var_list: variable list
        """
128
        multi_slot = self.proto_desc.multi_slot_desc
D
dongdaxiang 已提交
129
        for var in var_list:
130
            slot_var = multi_slot.slots.add()
D
dongdaxiang 已提交
131 132 133 134
            slot_var.is_used = True
            slot_var.name = var.name
            if var.lod_level == 0:
                slot_var.is_dense = True
135
            if var.dtype == core.VarDesc.VarType.FP32:
D
dongdaxiang 已提交
136
                slot_var.type = "float"
137
            elif var.dtype == core.VarDesc.VarType.INT64:
D
dongdaxiang 已提交
138 139 140 141 142 143
                slot_var.type = "uint64"
            else:
                raise ValueError(
                    "Currently, fluid.dataset only supports dtype=float32 and dtype=int64"
                )

144
    def set_hdfs_config(self, fs_name, fs_ugi):
145 146 147 148 149 150 151 152 153 154
        """
        Set hdfs config: fs name ad ugi

        Example:
            >>> dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")

        Args:
            fs_name: fs name
            fs_ugi: fs ugi
        """
155 156
        self.dataset.set_hdfs_config(fs_name, fs_ugi)

157
    def _prepare_to_run(self):
158 159 160 161
        """
        Set data_feed_desc before load or shuffle,
        user no need to call this function.
        """
162 163
        self.dataset.set_data_feed_desc(self.desc())

D
dongdaxiang 已提交
164 165 166 167 168
    def desc(self):
        """
        Returns a protobuf message for this DataFeedDesc

        Example:
169
            >>> print(dataset.desc())
D
dongdaxiang 已提交
170 171 172 173 174 175 176 177

        Returns:
            A string message
        """
        return text_format.MessageToString(self.proto_desc)


class InMemoryDataset(DatasetBase):
178 179 180 181 182 183 184
    """
    InMemoryDataset, it will load data into memory
    and shuffle data before training

    Example:
        dataset = paddle.fluid.DatasetFactory.create_dataset("InMemoryDataset")
    """
D
dongdaxiang 已提交
185

D
dongdaxiang 已提交
186
    def __init__(self):
187 188 189
        """
        Init
        """
190 191 192 193
        super(InMemoryDataset, self).__init__()
        self.proto_desc.name = "MultiSlotInMemoryDataFeed"

    def load_into_memory(self):
194 195 196 197
        """
        Load data into memory

        Example:
D
dongdaxiang 已提交
198 199 200 201
            >>> import paddle.fluid as fluid
            >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
            >>> filelist = ["a.txt", "b.txt"]
            >>> dataset.set_filelist(filelist)
202 203
            >>> dataset.load_into_memory()
        """
204
        self._prepare_to_run()
205
        self.dataset.load_into_memory()
D
dongdaxiang 已提交
206 207

    def local_shuffle(self):
208 209 210 211
        """
        Local shuffle

        Example:
D
dongdaxiang 已提交
212 213 214 215
            >>> import paddle.fluid as fluid
            >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
            >>> filelist = ["a.txt", "b.txt"]
            >>> dataset.set_filelist(filelist)
216 217
            >>> dataset.local_shuffle()
        """
218
        self.dataset.local_shuffle()
D
dongdaxiang 已提交
219

220
    def global_shuffle(self, fleet=None):
221 222 223 224 225
        """
        Global shuffle.
        If you run distributed, you should pass fleet instead of None.

        Example:
D
dongdaxiang 已提交
226 227 228 229 230
            >>> import paddle.fluid as fluid
            >>> import paddle.fluid.incubate.fleet.parameter_server as fleet
            >>> dataset = fluid.DatasetFactory.create_dataset("InMemoryDataset")
            >>> filelist = ["a.txt", "b.txt"]
            >>> dataset.set_filelist(filelist)
231 232 233 234 235
            >>> dataset.global_shuffle(fleet)

        Args:
            fleet: fleet singleton. Default None.
        """
236 237
        trainer_num = 1
        if fleet is not None:
X
xjqbest 已提交
238
            fleet.fleet_instance.role_maker_._barrier_worker()
239
            trainer_num = fleet.worker_num()
240
        self.dataset.register_client2client_msg_handler()
241
        self.dataset.set_trainer_num(trainer_num)
242
        if fleet is not None:
X
xjqbest 已提交
243
            fleet.fleet_instance.role_maker_._barrier_worker()
X
xujiaqi01 已提交
244
        self.dataset.global_shuffle()
245
        if fleet is not None:
X
xjqbest 已提交
246
            fleet.fleet_instance.role_maker_._barrier_worker()
D
dongdaxiang 已提交
247 248 249


class QueueDataset(DatasetBase):
250 251 252 253
    """
    QueueDataset, it will process data streamly.

    Example:
D
dongdaxiang 已提交
254 255
        import paddle.fluid as fluid
        dataset = fluid.DatasetFactory.create_dataset("QueueDataset")
256
    """
D
dongdaxiang 已提交
257

D
dongdaxiang 已提交
258
    def __init__(self):
259 260 261
        """
        Init
        """
262
        super(QueueDataset, self).__init__()
D
dongdaxiang 已提交
263
        self.proto_desc.name = "MultiSlotDataFeed"
X
xujiaqi01 已提交
264 265

    def local_shuffle(self):
266 267
        """
        Local shuffle
D
dongdaxiang 已提交
268 269

        QueueDataset does not support local shuffle
270
        """
D
dongdaxiang 已提交
271 272 273
        raise NotImplementedError(
            "QueueDataset does not support local shuffle, "
            "please use InMemoryDataset for local_shuffle")
X
xujiaqi01 已提交
274

275
    def global_shuffle(self, fleet=None):
276 277 278
        """
        Global shuffle
        """
D
dongdaxiang 已提交
279 280 281
        raise NotImplementedError(
            "QueueDataset does not support global shuffle, "
            "please use InMemoryDataset for global_shuffle")