dataset.py 3.9 KB
Newer Older
D
dongdaxiang 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle.fluid.proto import data_feed_pb2
from google.protobuf import text_format
from . import core
__all__ = ['DatasetFactory']


class DatasetFactory(object):
    """
    Factory that creates dataset objects from the name of their class.

    Example:
        >>> dataset = DatasetFactory().create_dataset("InMemoryDataset")
    """

    def __init__(self):
        pass

    def create_dataset(self, datafeed_class="QueueDataset"):
        """
        Create a dataset instance from the name of a class in this module.

        The name is resolved in this module's global namespace and the
        resulting object is called without arguments.

        Args:
            datafeed_class: name of the dataset class to instantiate,
                "QueueDataset" by default

        Returns:
            A new instance of the requested class

        Raises:
            ValueError: if the name is not defined in this module or does
                not refer to something callable. Errors raised by the
                class constructor itself are propagated unchanged.
        """
        try:
            dataset_cls = globals()[datafeed_class]
        except (KeyError, TypeError):
            # KeyError: unknown name. TypeError: unhashable argument, which
            # cannot be a class name either.
            dataset_cls = None
        if not callable(dataset_cls):
            raise ValueError("datafeed class %s does not exist" %
                             datafeed_class)
        # Instantiate outside of any handler so that a failing constructor
        # is not misreported as a missing class.
        return dataset_cls()


class DatasetBase(object):
    """
    Common base of the dataset classes in this module.

    An instance holds a ``data_feed_pb2.DataFeedDesc`` protobuf message
    (``proto_desc``) describing the data feed and a
    ``core.MultiSlotDataset`` object (``dataset``) that the setters
    forward to.
    """

    def __init__(self):
        # Subclasses set ``proto_desc.name`` to choose the data feed, e.g.
        # an in-memory feed versus a queue feed.
        self.proto_desc = data_feed_pb2.DataFeedDesc()
        self.proto_desc.pipe_command = "cat"
        self.dataset = core.MultiSlotDataset()
        self.thread_num = 0

    def set_pipe_command(self, pipe_command):
        """
        Set the pipe command of the current dataset.

        The string is stored in ``proto_desc.pipe_command``; the
        constructor initialises it to "cat". It is expected to be a UNIX
        pipeline command. How the command is applied to the input is
        decided by the underlying data feed, not by this class.

        Args:
            pipe_command: the pipe command string
        """
        self.proto_desc.pipe_command = pipe_command

    def set_batch_size(self, batch_size):
        """
        Set batch size. Will be effective during training.

        Example:
            >>> dataset = DatasetFactory().create_dataset()
            >>> dataset.set_batch_size(128)

        Args:
            batch_size: batch size
        """
        self.proto_desc.batch_size = batch_size

    def set_thread(self, thread_num):
        """
        Set the thread number on the underlying dataset and remember it
        in ``self.thread_num``.

        Args:
            thread_num: number of threads
        """
        self.dataset.set_thread_num(thread_num)
        self.thread_num = thread_num

    def set_filelist(self, filelist):
        """
        Forward the list of input files to the underlying dataset.

        Args:
            filelist: the file list to pass on
        """
        self.dataset.set_filelist(filelist)

    def set_use_var(self, var_list):
        """
        Append one used slot to the feed description per variable.

        Each slot takes the variable's name, is marked dense when the
        variable's ``lod_level`` is 0, and gets its type from the
        variable's dtype: FP32 becomes "float32" and INT64 becomes
        "uint64". Slots are appended, so calling this method again adds
        to the slots already present.

        Args:
            var_list: variables providing ``name``, ``lod_level`` and
                ``dtype``

        Raises:
            ValueError: if a variable's dtype is neither FP32 nor INT64
        """
        multi_slot = self.proto_desc.multi_slot_desc
        for var in var_list:
            slot_var = multi_slot.slots.add()
            slot_var.is_used = True
            slot_var.name = var.name
            if var.lod_level == 0:
                slot_var.is_dense = True
            if var.dtype == core.VarDesc.VarType.FP32:
                slot_var.type = "float32"
            elif var.dtype == core.VarDesc.VarType.INT64:
                # INT64 variables are declared as "uint64" slots.
                slot_var.type = "uint64"
            else:
                raise ValueError(
                    "Currently, fluid.dataset only supports dtype=float32 and dtype=int64"
                )

    def _prepare_to_run(self):
        """
        Hand the current text-format feed description to the underlying
        dataset.
        """
        self.dataset.set_data_feed_desc(self.desc())

    def desc(self):
        """
        Return the feed description of this dataset as text.

        Example:
            >>> dataset = DatasetFactory().create_dataset()
            >>> print(dataset.desc())

        Returns:
            A string: ``proto_desc`` serialized with
            ``text_format.MessageToString``
        """
        return text_format.MessageToString(self.proto_desc)


class InMemoryDataset(DatasetBase):
    """
    Dataset whose feed description is named "MultiSlotInMemoryDataFeed".

    In addition to the setters inherited from DatasetBase it exposes
    loading and shuffling operations, all of which are forwarded to the
    underlying dataset object.
    """

    def __init__(self):
        super(InMemoryDataset, self).__init__()
        self.proto_desc.name = "MultiSlotInMemoryDataFeed"

    def load_into_memory(self):
        """
        Pass the current feed description to the underlying dataset, then
        ask it to load its data into memory.
        """
        self._prepare_to_run()
        self.dataset.load_into_memory()

    def local_shuffle(self):
        """
        Ask the underlying dataset to perform a local shuffle.
        """
        self.dataset.local_shuffle()

    def global_shuffle(self):
        """
        Ask the underlying dataset to perform a global shuffle.

        The trainer number is set on the underlying dataset first, using
        the worker count reported by a PaddlePSInstance.
        """
        # Imported here rather than at module level, as in the original.
        from .distributed import ps_instance
        # NOTE(review): the meaning of the (1, 2) arguments is not visible
        # from this file - confirm against PaddlePSInstance.
        instance = ps_instance.PaddlePSInstance(1, 2)
        self.dataset.set_trainer_num(instance.get_worker_num())
        # Delegate to the underlying dataset. Calling self.global_shuffle()
        # here would recurse without end.
        self.dataset.global_shuffle()
D
dongdaxiang 已提交
123 124 125 126


class QueueDataset(DatasetBase):
    """
    Dataset whose feed description is named "MultiSlotDataFeed".

    It adds no methods of its own; everything else is inherited from
    DatasetBase.
    """

    def __init__(self):
        super(QueueDataset, self).__init__()
        self.proto_desc.name = "MultiSlotDataFeed"