main2.py 8.1 KB
Newer Older
Y
yangyaqin1@huawei.com 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
import csv
import os
import time

import numpy as np
from easydict import EasyDict as edict
from matplotlib import pyplot as plt

import mindspore
from mindspore import nn
from mindspore import context
from mindspore import dataset
from mindspore.train.callback import TimeMonitor, LossMonitor
from mindspore import Tensor
from mindspore.train import Model
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig

context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")

# 解析执行本脚本时的传参
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--data_url', required=True, default=None, help='Location of data.')
parser.add_argument('--train_url', required=True, default=None, help='Location of training outputs.')
args, unknown = parser.parse_known_args()

import moxing as mox
mox.file.copy_parallel(src_url=os.path.join(args.data_url, 'iris.data'), dst_url='iris.data')  # 将OBS桶中数据拷贝到容器中

cfg = edict({
    'data_size': 150,
    'train_size': 120,  # 训练集大小
    'test_size': 30,  # 测试集大小
    'feature_number': 4,  # 输入特征数
    'num_class': 3,  # 分类类别
    'batch_size': 30,
    'data_dir': 'iris.data',  # 执行容器中数据集所在路径
    'save_checkpoint_steps': 5,  # 多少步保存一次模型
    'keep_checkpoint_max': 1,  # 最多保存多少个模型
    'out_dir_no_opt': './model_iris/no_opt',  # 保存模型路径,无优化器模型
    'out_dir_sgd': './model_iris/sgd',  # 保存模型路径,SGD优化器模型
    'out_dir_momentum': './model_iris/momentum',  # 保存模型路径,momentum模型
    'out_dir_adam': './model_iris/adam',  # 保存模型路径,adam优化器模型
    'output_prefix': "checkpoint_fashion_forward"  # 保存模型文件名
})


# 读取数据并预处理:
with open(cfg.data_dir) as csv_file:
    data = list(csv.reader(csv_file, delimiter=','))

# 共150条数据,将数据集的4个属性作为自变量X。将数据集的3个类别映射为{0, 1,2},作为因变量Y。
label_map = {'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2}
X = np.array([[float(x) for x in s[:-1]] for s in data[:cfg.data_size]], np.float32)
Y = np.array([label_map[s[-1]] for s in data[:150]], np.int32)

# 将数据集分为训练集120条,测试集30条。
train_idx = np.random.choice(cfg.data_size, cfg.train_size, replace=False)
test_idx = np.array(list(set(range(cfg.data_size)) - set(train_idx)))
X_train, Y_train = X[train_idx], Y[train_idx]
X_test, Y_test = X[test_idx], Y[test_idx]
print('训练数据x尺寸:', X_train.shape)
print('训练数据y尺寸:', Y_train.shape)
print('测试数据x尺寸:', X_test.shape)
print('测试数据y尺寸:', Y_test.shape)

# 使用MindSpore GeneratorDataset接口将numpy.ndarray类型的数据转换为Dataset。
def gen_data(X_train, Y_train, epoch_size):
    XY_train = list(zip(X_train, Y_train))
    ds_train = dataset.GeneratorDataset(XY_train, ['x', 'y'])
    ds_train.set_dataset_size(cfg.train_size)
    ds_train = ds_train.shuffle(buffer_size=cfg.train_size).batch(cfg.batch_size, drop_remainder=True)
    XY_test = list(zip(X_test, Y_test))
    ds_test = dataset.GeneratorDataset(XY_test, ['x', 'y'])
    ds_test.set_dataset_size(cfg.test_size)
    ds_test = ds_test.shuffle(buffer_size=cfg.test_size).batch(cfg.test_size, drop_remainder=True)
    return ds_train, ds_test


# 继承并重写LossMonitor
class SubLossMonitor(LossMonitor):
    def epoch_end(self, run_context):
        pass

    def step_end(self, run_context):
        cb_params = run_context.original_args()
        step_loss = cb_params.net_outputs

        if isinstance(step_loss, (tuple, list)) and isinstance(step_loss[0], Tensor):
            step_loss = step_loss[0]
        if isinstance(step_loss, Tensor):
            step_loss = np.mean(step_loss.asnumpy())

        self.losses.append(step_loss)
        cur_step_in_epoch = int((cb_params.cur_step_num - 1) % cb_params.batch_num) + 1

        if isinstance(step_loss, float) and (np.isnan(step_loss) or np.isinf(step_loss)):
            raise ValueError("Epoch: [{:3d}/{:3d}], step: [{:5d}/{:5d}]. "
                             "Invalid loss, terminating training.".format(
                cb_params.cur_epoch_num - 1, cb_params.epoch_num,
                cur_step_in_epoch, cb_params.batch_num))

        if self._per_print_times != 0 and cb_params.cur_step_num % self._per_print_times == 0:
            print("epoch: {} step {}, loss: {}, avg loss: {:5.3f}".format(cb_params.cur_epoch_num,
                                                                          cur_step_in_epoch,
                                                                          step_loss, np.mean(self.losses)), flush=True)


# 训练
def train(network, net_opt, ds_train, prefix, directory, print_times):
    net_loss = nn.SoftmaxCrossEntropyWithLogits(is_grad=False, sparse=True, reduction="mean")
    model = Model(network, loss_fn=net_loss, optimizer=net_opt, metrics={"acc"})
    loss_cb = SubLossMonitor(per_print_times=print_times)
    config_ck = CheckpointConfig(save_checkpoint_steps=cfg.save_checkpoint_steps,
                                 keep_checkpoint_max=cfg.keep_checkpoint_max)
    ckpoint_cb = ModelCheckpoint(prefix=prefix, directory=directory, config=config_ck)
    print("============== Starting Training ==============")
    model.train(epoch_size, ds_train, callbacks=[ckpoint_cb, loss_cb], dataset_sink_mode=False)
    return model


# 评估预测
def eval_predict(model, ds_test):
    # 使用测试集评估模型,打印总体准确率
    metric = model.eval(ds_test)
    print(metric)
    # 预测
    test_ = ds_test.create_dict_iterator().get_next()
    test = Tensor(test_['x'], mindspore.float32)
    predictions = model.predict(test)
    predictions = predictions.asnumpy()
    for i in range(10):
        p_np = predictions[i, :]
        p_list = p_np.tolist()
        print('第' + str(i) + '个sample预测结果:', p_list.index(max(p_list)), '   真实结果:', test_['y'][i])


# --------------------------------------------------无优化器-----------------------------------
epoch_size = 20
print('------------------无优化器--------------------------')
# 数据
ds_train, ds_test = gen_data(X_train, Y_train, epoch_size)
# 定义网络并训练
network = nn.Dense(cfg.feature_number, cfg.num_class)
model = train(network, None, ds_train, "checkpoint_no_opt", cfg.out_dir_no_opt, 4)
# 评估预测
eval_predict(model, ds_test)

# ---------------------------------------------------SGD-------------------------------------
epoch_size = 200
lr = 0.01
print('-------------------SGD优化器-----------------------')
# 数据
ds_train, ds_test = gen_data(X_train, Y_train, epoch_size)
# 定义网络并训练、测试、预测
network = nn.Dense(cfg.feature_number, cfg.num_class)
net_opt = nn.SGD(network.trainable_params(), lr)
model = train(network, net_opt, ds_train, "checkpoint_sgd", cfg.out_dir_sgd, 40)
# 评估预测
eval_predict(model, ds_test)

# ----------------------------------------------------Momentum-------------------------------
epoch_size = 20
lr = 0.01
print('-------------------Momentum优化器-----------------------')
# 数据
ds_train, ds_test = gen_data(X_train, Y_train, epoch_size)
# 定义网络并训练
network = nn.Dense(cfg.feature_number, cfg.num_class)
net_opt = nn.Momentum(network.trainable_params(), lr, 0.9)
model = train(network, net_opt, ds_train, "checkpoint_momentum", cfg.out_dir_momentum, 4)
# 评估预测
eval_predict(model, ds_test)

# ----------------------------------------------------Adam-----------------------------------
epoch_size = 15
lr = 0.1
print('------------------Adam优化器--------------------------')
# 数据
ds_train, ds_test = gen_data(X_train, Y_train, epoch_size)
# 定义网络并训练
network = nn.Dense(cfg.feature_number, cfg.num_class)
net_opt = nn.Adam(network.trainable_params(), learning_rate=lr)
model = train(network, net_opt, ds_train, "checkpoint_adam", cfg.out_dir_adam, 4)
# 评估预测
eval_predict(model, ds_test)

# -------------------------------------------------------------------------------------------
mox.file.copy_parallel(src_url='model_iris', dst_url=args.train_url)  # 将容器输出拷贝到OBS桶中