dataset.py
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Data operations, will be used in run_pretrain.py
"""
import os
import mindspore.common.dtype as mstype
import mindspore.dataset.engine.datasets as de
import mindspore.dataset.transforms.c_transforms as C
from mindspore import log as logger
from .config import bert_net_cfg


def create_bert_dataset(epoch_size=1, device_num=1, rank=0, do_shuffle="true", enable_data_sink="true",
                        data_sink_steps=1, data_dir=None, schema_dir=None):
    """create train dataset"""
    # the requested number of epochs, rescaled into a repeat count below
    repeat_count = epoch_size
    files = os.listdir(data_dir)
    data_files = []
    for file_name in files:
        if "tfrecord" in file_name:
            data_files.append(os.path.join(data_dir, file_name))
    ds = de.TFRecordDataset(data_files, schema_dir if schema_dir != "" else None,
                            columns_list=["input_ids", "input_mask", "segment_ids", "next_sentence_labels",
                                          "masked_lm_positions", "masked_lm_ids", "masked_lm_weights"],
                            shuffle=de.Shuffle.FILES if do_shuffle == "true" else False,
                            num_shards=device_num, shard_id=rank, shard_equal_rows=True)
    ori_dataset_size = ds.get_dataset_size()
    print('original dataset size: ', ori_dataset_size)
    new_size = ori_dataset_size
    if enable_data_sink == "true":
        # with data sinking, one "epoch" is data_sink_steps sink steps,
        # so the per-epoch dataset size is overridden accordingly
        new_size = data_sink_steps * bert_net_cfg.batch_size
    ds.set_dataset_size(new_size)
    # rescale the repeat count so the total number of samples seen stays the same
    new_repeat_count = int(repeat_count * ori_dataset_size // ds.get_dataset_size())
    # cast all integer columns to int32
    type_cast_op = C.TypeCast(mstype.int32)
    ds = ds.map(input_columns="masked_lm_ids", operations=type_cast_op)
    ds = ds.map(input_columns="masked_lm_positions", operations=type_cast_op)
    ds = ds.map(input_columns="next_sentence_labels", operations=type_cast_op)
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    # apply batch operations
    ds = ds.batch(bert_net_cfg.batch_size, drop_remainder=True)
    logger.info("data size: {}".format(ds.get_dataset_size()))
    logger.info("repeatcount: {}".format(ds.get_repeat_count()))
    return ds, new_repeat_count
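
# A minimal usage sketch (illustrative only): the data directory below is a
# hypothetical path, and bert_net_cfg.batch_size is taken from .config.
#
#   ds, new_repeat_count = create_bert_dataset(epoch_size=1, device_num=1, rank=0,
#                                              do_shuffle="true",
#                                              enable_data_sink="true",
#                                              data_sink_steps=100,
#                                              data_dir="/path/to/pretrain/tfrecords",
#                                              schema_dir="")
#   # each batch carries the seven pre-training columns listed in columns_list above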


def create_ner_dataset(batch_size=1, repeat_count=1, assessment_method="accuracy",
                       data_file_path=None, schema_file_path=None):
    """create finetune or evaluation dataset"""
    type_cast_op = C.TypeCast(mstype.int32)
    ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                            columns_list=["input_ids", "input_mask", "segment_ids", "label_ids"])
    if assessment_method == "Spearman_correlation":
        type_cast_op_float = C.TypeCast(mstype.float32)
        ds = ds.map(input_columns="label_ids", operations=type_cast_op_float)
    else:
        ds = ds.map(input_columns="label_ids", operations=type_cast_op)
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    ds = ds.repeat(repeat_count)
    # apply shuffle operation
    buffer_size = 960
    ds = ds.shuffle(buffer_size=buffer_size)
    # apply batch operations
    ds = ds.batch(batch_size, drop_remainder=True)
    return ds
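
# A hedged example of building an NER fine-tune dataset; the file paths are
# placeholders, not files shipped with the repository:
#
#   ner_ds = create_ner_dataset(batch_size=16, repeat_count=1,
#                               assessment_method="accuracy",
#                               data_file_path="/path/to/ner/train.tf_record",
#                               schema_file_path="")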


def create_classification_dataset(batch_size=1, repeat_count=1, assessment_method="accuracy",
                                  data_file_path=None, schema_file_path=None):
    """create finetune or evaluation dataset"""
    type_cast_op = C.TypeCast(mstype.int32)
    ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                            columns_list=["input_ids", "input_mask", "segment_ids", "label_ids"])
    if assessment_method == "Spearman_correlation":
        type_cast_op_float = C.TypeCast(mstype.float32)
        ds = ds.map(input_columns="label_ids", operations=type_cast_op_float)
    else:
        ds = ds.map(input_columns="label_ids", operations=type_cast_op)
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    ds = ds.repeat(repeat_count)
    # apply shuffle operation
    buffer_size = 960
    ds = ds.shuffle(buffer_size=buffer_size)
    # apply batch operations
    ds = ds.batch(batch_size, drop_remainder=True)
    return ds
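
# For regression-style metrics, passing assessment_method="Spearman_correlation"
# casts "label_ids" to float32 instead of int32; a sketch with placeholder paths:
#
#   cls_ds = create_classification_dataset(batch_size=16, repeat_count=1,
#                                          assessment_method="Spearman_correlation",
#                                          data_file_path="/path/to/sts/eval.tf_record",
#                                          schema_file_path="")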


def create_squad_dataset(batch_size=1, repeat_count=1, data_file_path=None, schema_file_path=None, is_training=True):
    """create finetune or evaluation dataset"""
    type_cast_op = C.TypeCast(mstype.int32)
    if is_training:
        ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                                columns_list=["input_ids", "input_mask", "segment_ids",
                                              "start_positions", "end_positions",
                                              "unique_ids", "is_impossible"])
        ds = ds.map(input_columns="start_positions", operations=type_cast_op)
        ds = ds.map(input_columns="end_positions", operations=type_cast_op)
    else:
        ds = de.TFRecordDataset([data_file_path], schema_file_path if schema_file_path != "" else None,
                                columns_list=["input_ids", "input_mask", "segment_ids", "unique_ids"])
    ds = ds.map(input_columns="segment_ids", operations=type_cast_op)
    ds = ds.map(input_columns="input_mask", operations=type_cast_op)
    ds = ds.map(input_columns="input_ids", operations=type_cast_op)
    ds = ds.repeat(repeat_count)
    # apply shuffle operation
    buffer_size = 960
    ds = ds.shuffle(buffer_size=buffer_size)
    # apply batch operations
    ds = ds.batch(batch_size, drop_remainder=True)
    return ds
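
# A sketch of the two SQuAD modes (paths are placeholders): the training dataset
# includes the answer-span columns, while the evaluation dataset only keeps
# "unique_ids" for post-processing.
#
#   train_ds = create_squad_dataset(batch_size=32, repeat_count=1,
#                                   data_file_path="/path/to/squad/train.tf_record",
#                                   schema_file_path="", is_training=True)
#   eval_ds = create_squad_dataset(batch_size=32, repeat_count=1,
#                                  data_file_path="/path/to/squad/eval.tf_record",
#                                  schema_file_path="", is_training=False)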