提交 dbc7f776 编写于 作者: M mir-of

update cnn and bert for of-develop

上级 968a114e
......@@ -5,35 +5,41 @@ OneFlow models for benchmarking.
### cnns
* 1 node, 1 gpu:
```
python3 cnn_benchmark/of_cnn_benchmarks.py \
--gpu_num_per_node=1 \
--model="vgg16" \
--batch_size_per_device=8 \
--iter_num=5 \
--learning_rate=0.01 \
--optimizer="sgd" \
--loss_print_every_n_iter=1 \
--warmup_iter_num=2 \
--data_dir="/dataset/ofrecord/imagenet/train"
python3 cnn_benchmark/of_cnn_train_val.py \
--gpu_num_per_node=1 \
--batch_size_per_device=32 \
--val_batch_size_per_device=32 \
--use_new_dataloader=True \
--train_data_part_num=256 \
--val_data_part_num=256 \
--num_epochs=1 \
--optimizer="sgd" \
--learning_rate=0.256 \
--use_fp16=False \
--use_boxing_v2=True \
--model="resnet50"
```
* 2 nodes, 2 gpu each node:
* 2 nodes:
simply add `--node_num=2 --node_list="192.168.1.12,192.168.1.14" ` :
simply add `--num_nodes=2 --node_ips="192.168.1.12,192.168.1.14" ` :
```
python3 cnn_benchmark/of_cnn_benchmarks.py \
--gpu_num_per_node=2 \
--node_num=2 \
--node_list="192.168.1.12,192.168.1.14" \
--model="vgg16" \
--batch_size_per_device=8 \
--iter_num=5 \
--learning_rate=0.01 \
--optimizer="sgd" \
--loss_print_every_n_iter=1 \
--warmup_iter_num=2 \
--data_dir="/dataset/ofrecord/imagenet/train"
python3 cnn_benchmark/of_cnn_train_val.py \
--num_nodes=2 \
--node_ips="192.168.1.12,192.168.1.14" \
--gpu_num_per_node=1 \
--batch_size_per_device=32 \
--val_batch_size_per_device=32 \
--use_new_dataloader=True \
--train_data_part_num=256 \
--val_data_part_num=256 \
--num_epochs=1 \
--optimizer="sgd" \
--learning_rate=0.256 \
--use_fp16=False \
--use_boxing_v2=True \
--model="resnet50"
```
### bert pretrain
......@@ -42,15 +48,10 @@ OneFlow models for benchmarking.
```
python3 bert_benchmark/run_pretraining.py \
--gpu_num_per_node=1 \
--node_num=1 \
--learning_rate=1e-4 \
--weight_l2=0.01 \
--batch_size_per_device=24 \
--batch_size_per_device=12 \
--iter_num=5 \
--loss_print_every_n_iter=1 \
--warmup_iter_num=2 \
--data_dir="/dataset/ofrecord/wiki_128" \
--data_part_num=1 \
--seq_length=128 \
--max_predictions_per_seq=20 \
--num_hidden_layers=12 \
......@@ -60,22 +61,23 @@ OneFlow models for benchmarking.
--vocab_size=30522 \
--attention_probs_dropout_prob=0.1 \
--hidden_dropout_prob=0.1 \
--hidden_size_per_head=64
--hidden_size_per_head=64 \
--data_part_num=1 \
--data_dir="/dataset/bert/of_wiki_seq_len_128"
```
* bert large:
bert large is different from bert base in the following configurations:
`--max_predictions_per_seq=80 --num_hidden_layers=24 --num_attention_heads=16 --max_position_embeddings=512`
```
python3 ../bert_benchmark/run_pretraining.py \
--gpu_num_per_node=$GPU_NUM_PER_NODE \
--node_num=$NODE_NUM \
--node_list=$NODE_LIST \
python3 bert_benchmark/run_pretraining.py \
--gpu_num_per_node=1 \
--learning_rate=1e-4 \
--weight_l2=0.01 \
--batch_size_per_device=24 \
--batch_size_per_device=12 \
--iter_num=5 \
--loss_print_every_n_iter=1 \
--warmup_iter_num=2 \
--seq_length=512 \
--seq_length=128 \
--max_predictions_per_seq=80 \
--num_hidden_layers=24 \
--num_attention_heads=16 \
......@@ -85,27 +87,13 @@ OneFlow models for benchmarking.
--attention_probs_dropout_prob=0.1 \
--hidden_dropout_prob=0.1 \
--hidden_size_per_head=64 \
--data_part_num=32 \
--data_dir=/dataset/ofrecord/wiki_512"
--data_part_num=1 \
--data_dir="/dataset/bert/of_wiki_seq_len_128"
```
* 2 nodes, 2 gpu each node:
* 2 nodes:
simply add `--node_num=2 --node_list='192.168.1.12,192.168.1.14' `, see above.
## Inference
* 1 node, 1 gpu:
```
python3 cnn_benchmark/of_cnn_infer_benchmarks.py \
--gpu_num_per_node=1 \
--model="vgg16" \
--batch_size_per_device=8 \
--iter_num=5 \
--print_every_n_iter=1 \
--warmup_iter_num=2 \
--data_dir="/dataset/ofrecord/imagenet/train"
```
If you want to run the benchmark with TensorRT or XLA, only pass `--use_tensorrt` or `--use_xla_jit` to enable it. Low-precision arithmetics such as float16 or int8 are usually faster than 32bit float, and you can pass `--precision=float16` for acceleration.
simply add `--num_nodes=2 --node_ips="192.168.1.12,192.168.1.14" ` :
## build docker images from wheel
......
# ###################################################################
# alexnet.py
# Usage:
#   Single node: python alexnet.py -g 1
#     -g  number of GPUs to use
#   Multi node:  python alexnet.py -g 8 -m -n "192.168.1.15,192.168.1.16"
#     -g  number of GPUs to use per node
#     -m  enable multi-node (distributed) execution
#     -n  comma-separated IP addresses of all machines
# ###################################################################
import oneflow as flow
import argparse
# Default ImageNet-subset OFRecord location, used for both train and eval.
DATA_DIR = "/dataset/imagenet_1k/oneflow/30/train"

# Command-line flags for resources, data locations and checkpointing.
parser = argparse.ArgumentParser(description="flags for multi-node and resource")
parser.add_argument("-i", "--iter_num", type=int, default=10, required=False)
parser.add_argument("-g", "--gpu_num_per_node", type=int, default=1, required=False)
# -m enables multi-node (distributed) execution.
parser.add_argument(
    "-m", "--multinode", default=False, action="store_true", required=False
)
# -n: comma-separated IP addresses of all machines; only read when --multinode.
parser.add_argument("-n", "--node_list", type=str, default=None, required=False)
parser.add_argument("-e", "--eval_dir", type=str, default=DATA_DIR, required=False)
parser.add_argument("-t", "--train_dir", type=str, default=DATA_DIR, required=False)
parser.add_argument("-load", "--model_load_dir", type=str, default="", required=False)
parser.add_argument(
    "-save", "--model_save_dir", type=str, default="./checkpoints", required=False
)
args = parser.parse_args()
def _data_load_layer(data_dir):
    """Decode one OFRecord batch from *data_dir*; returns (labels, images)."""
    # Image field: resize to 227x227 and normalize by per-channel mean.
    image_conf = flow.data.BlobConf(
        "encoded",
        shape=(227, 227, 3),
        dtype=flow.float,
        codec=flow.data.ImageCodec([flow.data.ImageResizePreprocessor(227, 227)]),
        preprocessors=[flow.data.NormByChannelPreprocessor((123.68, 116.78, 103.94))],
    )
    # Label field: a single int32 class id per example.
    label_conf = flow.data.BlobConf(
        "class/label", shape=(), dtype=flow.int32, codec=flow.data.RawCodec()
    )
    labels, images = flow.data.decode_ofrecord(
        data_dir,
        (label_conf, image_conf),
        batch_size=12,
        data_part_num=8,
        name="decode",
    )
    return labels, images
def _conv2d_layer(
    name,
    input,
    filters,
    kernel_size=3,
    strides=1,
    padding="SAME",
    data_format="NCHW",
    dilation_rate=1,
    activation="Relu",
    use_bias=False,
    weight_initializer=flow.random_uniform_initializer(),
    bias_initializer=None,
):
    """2-D convolution with optional bias add and optional ReLU activation."""
    # Weight layout: (out_channels, in_channels, kh, kw) — assumes NCHW input.
    kernel = flow.get_variable(
        name + "-weight",
        shape=(filters, input.static_shape[1], kernel_size, kernel_size),
        dtype=input.dtype,
        initializer=weight_initializer,
    )
    out = flow.nn.conv2d(
        input, kernel, strides, padding, data_format, dilation_rate, name=name
    )
    if use_bias:
        bias_var = flow.get_variable(
            name + "-bias",
            shape=(filters,),
            dtype=input.dtype,
            initializer=bias_initializer,
        )
        out = flow.nn.bias_add(out, bias_var, data_format)
    # Only "Relu" (or no activation at all) is supported here.
    if activation is None:
        return out
    if activation != "Relu":
        raise NotImplementedError
    return flow.keras.activations.relu(out)
def alexnet(images, labels):
    """AlexNet forward pass; returns per-example softmax cross-entropy loss."""
    # The dataset delivers NHWC; the conv stack expects NCHW.
    net = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
    net = _conv2d_layer(
        "conv1", net, filters=64, kernel_size=11, strides=4, padding="VALID"
    )
    net = flow.nn.avg_pool2d(net, 3, 2, "VALID", "NCHW", name="pool1")
    net = _conv2d_layer("conv2", net, filters=192, kernel_size=5)
    net = flow.nn.avg_pool2d(net, 3, 2, "VALID", "NCHW", name="pool2")
    net = _conv2d_layer("conv3", net, filters=384)
    net = _conv2d_layer("conv4", net, filters=384)
    net = _conv2d_layer("conv5", net, filters=256)
    net = flow.nn.avg_pool2d(net, 3, 2, "VALID", "NCHW", name="pool5")
    # Flatten spatial dimensions before the fully connected head.
    if len(net.shape) > 2:
        net = flow.reshape(net, shape=(net.static_shape[0], -1))
    net = flow.layers.dense(
        inputs=net,
        units=4096,
        activation=flow.keras.activations.relu,
        use_bias=False,
        kernel_initializer=flow.random_uniform_initializer(),
        bias_initializer=False,
        trainable=True,
        name="fc1",
    )
    net = flow.nn.dropout(net, rate=0.5)
    net = flow.layers.dense(
        inputs=net,
        units=4096,
        activation=flow.keras.activations.relu,
        use_bias=False,
        kernel_initializer=flow.random_uniform_initializer(),
        bias_initializer=False,
        trainable=True,
        name="fc2",
    )
    net = flow.nn.dropout(net, rate=0.5)
    # 1001 classes: ImageNet-1k plus a background class.
    net = flow.layers.dense(
        inputs=net,
        units=1001,
        activation=None,
        use_bias=False,
        kernel_initializer=flow.random_uniform_initializer(),
        bias_initializer=False,
        trainable=True,
        name="fc3",
    )
    # Softmax cross entropy against the integer labels.
    return flow.nn.sparse_softmax_cross_entropy_with_logits(
        labels, net, name="softmax_loss"
    )
# Training job: one step of AlexNet training.
@flow.function
def alexnet_train_job():
    # Training hyper-parameters: fixed learning rate, plain (naive) SGD update.
    flow.config.train.primary_lr(0.00001)
    flow.config.train.model_update_conf(dict(naive_conf={}))
    # Load one batch of (labels, images) from the training set.
    (labels, images) = _data_load_layer(args.train_dir)
    # Build the network.
    loss = alexnet(images, labels)
    # Register the loss as the optimization target.
    flow.losses.add_loss(loss)
    return loss
# Evaluation job: one forward pass, no parameter update.
@flow.function
def alexnet_eval_job():
    # Load one batch of (labels, images) from the eval set.
    (labels, images) = _data_load_layer(args.eval_dir)
    # Same network; the loss value serves as the evaluation metric.
    loss = alexnet(images, labels)
    return loss
def main():
    """Configure resources, restore/init the model, then run the train loop."""
    flow.config.gpu_device_num(args.gpu_num_per_node)
    flow.config.ctrl_port(9788)
    flow.config.default_data_type(flow.float)
    if args.multinode:
        # Distributed setup: control port plus one {"addr": ip} per machine.
        flow.config.ctrl_port(12138)
        nodes = [{"addr": ip} for ip in args.node_list.strip().split(",")]
        flow.config.machine(nodes)
    # Restore a checkpoint when a path is given, otherwise random init.
    check_point = flow.train.CheckPoint()
    if args.model_load_dir:
        check_point.load(args.model_load_dir)
    else:
        check_point.init()
    # Training loop with periodic evaluation and snapshots.
    print("{:>12} {:>12} {:>12}".format("iter", "loss type", "loss value"))
    fmt_str = "{:>12} {:>12} {:>12.10f}"
    for i in range(args.iter_num):
        # Report training loss every iteration.
        train_loss = alexnet_train_job().get().mean()
        print(fmt_str.format(i, "train loss:", train_loss))
        # Evaluate every 10 iterations.
        if (i + 1) % 10 == 0:
            eval_loss = alexnet_eval_job().get().mean()
            print(fmt_str.format(i, "eval loss:", eval_loss))
        # Snapshot every 100 iterations.
        if (i + 1) % 100 == 0:
            check_point.save(args.model_save_dir + str(i))


if __name__ == "__main__":
    main()
......@@ -6,12 +6,13 @@ import oneflow as flow
from model_util import conv2d_layer
def alexnet(images, trainable=True):
def alexnet(images, need_transpose=False):
transposed = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
if need_transpose:
images = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
conv1 = conv2d_layer(
"conv1", transposed, filters=64, kernel_size=11, strides=4, padding="VALID"
"conv1", images, filters=64, kernel_size=11, strides=4, padding="VALID"
)
pool1 = flow.nn.avg_pool2d(conv1, 3, 2, "VALID", "NCHW", name="pool1")
......@@ -29,7 +30,7 @@ def alexnet(images, trainable=True):
pool5 = flow.nn.avg_pool2d(conv5, 3, 2, "VALID", "NCHW", name="pool5")
if len(pool5.shape) > 2:
pool5 = flow.reshape(pool5, shape=(pool5.static_shape[0], -1))
pool5 = flow.reshape(pool5, shape=(pool5.shape[0], -1))
fc1 = flow.layers.dense(
inputs=pool5,
......@@ -38,7 +39,6 @@ def alexnet(images, trainable=True):
use_bias=False,
kernel_initializer=flow.random_uniform_initializer(),
bias_initializer=False,
trainable=trainable,
name="fc1",
)
......@@ -51,7 +51,6 @@ def alexnet(images, trainable=True):
use_bias=False,
kernel_initializer=flow.random_uniform_initializer(),
bias_initializer=False,
trainable=trainable,
name="fc2",
)
......@@ -64,7 +63,6 @@ def alexnet(images, trainable=True):
use_bias=False,
kernel_initializer=flow.random_uniform_initializer(),
bias_initializer=False,
trainable=trainable,
name="fc3",
)
......
## Inference
测试平台:NVIDIA RTX 2080 Ti 单卡.
CUDA版本:10.0
CUDNN版本:7.5.0
TensorRT版本:6.0.1
Oneflow-Benchmark
branch: of_dev_python_py3
commit: 985dd3f03887d266e66573db0b31a4cf3051ff31
Oneflow:
branch: of_xrt_tensorrt
commit: 726c3a12b9d97b57f9fb7e3d212b63564e20e755
### CV
#### Speed
输入图片大小为224 (inception-v3为299),预热5 batches,平均吞吐(img/s)为500个batches的平均值。
1. batch size为8
>| - | Oneflow(fp32) | Oneflow(fp16) | TensorRT(fp32) | TensorRT(fp16) | TensorRT(int8) |
>| ------------ | ------------- | ------------- | -------------- | -------------- | -------------- |
>| alexnet | 2637 | 1550 | 2540 | 2759 | |
>| vgg16 | 371 | 332 | 377 | 1124 | |
>| resnet50 | 657 | 541 | 729 | 940 | |
>| inception-v3 | 433 | 434 | 489 | 999 | |
2. batch size为50
>| - | Oneflow(fp32) | Oneflow(fp16) | TensorRT(fp32) | TensorRT(fp16) | TensorRT(int8) |
>| ------------ | ------------- | ------------- | -------------- | -------------- | -------------- |
>| alexnet | 6999 | 3219 | 4306 | 7704 | |
>| vgg16 | 497 | 476 | 404 | 1482 | |
>| resnet50 | 810 | 619 | 830 | 1285 | |
>| inception-v3 | 544 | 531 | 717 | 1839 | |
#### Precision
总共5w张图片, 统计Top1 accuracy和相对oneflow fp32的分类误差数量。
>| - | Oneflow(fp32) | Oneflow(fp16) | TensorRT(fp32) | TensorRT(fp16) | TensorRT(int8) |
>| ------------ | ------------- | ------------- | -------------- | -------------- | -------------- |
>| vgg16 | 0.495 / 0 | 0.495 / 61 | 0.495 / 0 | 0.495 / 101 | |
>| alexnet | | | | | |
>| resnet50 | 0.613 / 0 | 0.613 / 59 | 0.613 / 0 | 0.613 / 130 | |
>| inception-v3 | | | | | |
import time
import numpy as np
class StopWatch:
    """Minimal wall-clock stopwatch with split (lap) support.

    Timestamp attributes are created lazily by start()/set_start()/stop().
    """

    def __init__(self):
        pass

    def start(self):
        """Begin timing from the current wall-clock time."""
        now = time.time()
        self.start_time = now
        self.last_split = now

    def set_start(self, val):
        """Begin timing from an externally supplied timestamp *val*."""
        self.start_time = val
        self.last_split = self.start_time

    def split(self):
        """Return seconds elapsed since the previous split (or start)."""
        now = time.time()
        elapsed = now - self.last_split
        self.last_split = now
        return elapsed

    def stop(self):
        """Record the end timestamp."""
        self.stop_time = time.time()

    def duration(self):
        """Total seconds between start and stop."""
        return self.stop_time - self.start_time
class CNNSpeedometer:
    """Collects per-iteration throughput statistics for benchmark training.

    Wraps a StopWatch and builds per-step callbacks that print loss and
    images/sec, optionally skipping warmup iterations before timing starts.
    """

    def __init__(self):
        self.watch = StopWatch()
        # Attribute name kept (with its historical misspelling) for
        # backward compatibility with any external readers.
        self.throughoutput_list = []

    def speedometer_cb(
        self,
        step,
        start_time,
        total_batch_size,
        skip_iter_num,
        iter_num,
        loss_print_every_n_iter,
    ):
        """Return callback(train_loss) that logs speed for iteration *step*.

        Args:
            step: global iteration index (includes warmup iterations).
            start_time: wall-clock stamp used when no warmup is requested.
            total_batch_size: samples processed per iteration across devices.
            skip_iter_num: warmup iterations excluded from timing (>= 0).
            iter_num: number of timed iterations; the summary prints at the end.
            loss_print_every_n_iter: logging period for loss/speed lines.
        """

        def callback(train_loss):
            assert skip_iter_num >= 0
            if skip_iter_num == 0 and step == 0:
                # No warmup: time from the externally supplied start stamp.
                self.watch.set_start(start_time)
                # BUGFIX: was "Start trainning..." (typo).
                print("Start training without any skipping iteration.")

            if step < skip_iter_num:
                if step == 0:
                    print(
                        "Skipping {} iterations for benchmark purpose.".format(
                            skip_iter_num
                        )
                    )
                if (step + 1) == skip_iter_num:
                    # Warmup finished: timing starts on the next iteration.
                    self.watch.start()
                    print("Start training.")
            else:
                train_step = step - skip_iter_num

                if (train_step + 1) % loss_print_every_n_iter == 0:
                    loss = train_loss.mean()
                    avg_elapse_time_per_iter = (
                        self.watch.split() / loss_print_every_n_iter
                    )
                    samples_per_sec = total_batch_size / avg_elapse_time_per_iter
                    print(
                        "iter {}, loss: {:.3f}, speed: {:.3f}(sec/batch), {:.3f}(images/sec)".format(
                            train_step, loss, avg_elapse_time_per_iter, samples_per_sec
                        )
                    )
                    self.throughoutput_list.append(samples_per_sec)

                if (train_step + 1) == iter_num:
                    # End of run: print the overall average throughput.
                    self.watch.stop()
                    total_duration = self.watch.duration()
                    avg_samples_per_sec = total_batch_size * iter_num / total_duration
                    print("-".ljust(66, "-"))
                    print(
                        "average speed: {:.3f}(images/sec), new_cal_method: {:.3f}(images/sec)".format(
                            avg_samples_per_sec, np.mean(self.throughoutput_list)
                        )
                    )
                    print("-".ljust(66, "-"))

        return callback
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
from datetime import datetime
import logging
import oneflow
from dali import add_dali_args
from optimizer_util import add_optimizer_args
from ofrecord_util import add_ofrecord_args
def get_parser(parser=None):
    """Build the argument parser for CNN benchmark training/validation.

    The previous version contained diff-merge artifacts: several options
    (--dtype, --node_ips, --model, --num_epochs, --model_load_dir,
    --num_examples, --num_val_examples, --input_layout, --image-shape,
    --val_batch_size_per_device, --log_dir) were registered twice — which
    argparse rejects with an ArgumentError — and --model_save_dir was
    syntactically broken. Each option is now registered exactly once,
    keeping the newer default values.

    Args:
        parser: existing ArgumentParser to extend; a new one is created
            when None.
    Returns:
        The populated ArgumentParser.
    """

    def str_list(x):
        # "a,b,c" -> ["a", "b", "c"]
        return x.split(',')

    def int_list(x):
        return list(map(int, x.split(',')))

    def float_list(x):
        return list(map(float, x.split(',')))

    def str2bool(v):
        # Accept the usual textual spellings of a boolean flag value.
        if v.lower() in ('yes', 'true', 't', 'y', '1'):
            return True
        elif v.lower() in ('no', 'false', 'f', 'n', '0'):
            return False
        else:
            raise argparse.ArgumentTypeError('Unsupported value encountered.')

    if parser is None:
        parser = argparse.ArgumentParser("flags for cnn benchmark")

    parser.add_argument("--dtype", type=str,
                        default='float32', help="float16 float32")

    # resource
    parser.add_argument("--gpu_num_per_node", type=int, default=1)
    parser.add_argument('--num_nodes', type=int, default=1,
                        help='node/machine number for training')
    parser.add_argument('--node_ips', type=str_list,
                        default=['192.168.1.13', '192.168.1.14'],
                        help='nodes ip list for training, devided by ",", length >= num_nodes')

    parser.add_argument("--model", type=str, default="vgg16",
                        help="vgg16 or resnet50")
    parser.add_argument('--use_fp16', type=str2bool, nargs='?', const=True,
                        help='Whether to use use fp16')
    parser.add_argument('--use_boxing_v2', type=str2bool, nargs='?', const=True,
                        help='Whether to use boxing v2')
    parser.add_argument('--use_new_dataloader', type=str2bool, nargs='?', const=True,
                        help='Whether to use new dataloader')

    # train and validation
    parser.add_argument('--num_epochs', type=int,
                        default=90, help='number of epochs')
    parser.add_argument("--model_load_dir", type=str,
                        default=None, help="model load directory if need")
    parser.add_argument("--batch_size_per_device", type=int, default=64)
    parser.add_argument("--val_batch_size_per_device", type=int, default=8)
    parser.add_argument("--learning_rate", type=float, default=0.256)
    parser.add_argument("--optimizer", type=str, default="momentum-cosine-decay",
                        help="sgd, adam, momentum, momentum-cosine-decay")
    parser.add_argument("--weight_l2", type=float, default=1.0 / 32768,
                        help="weight decay parameter")

    # learning-rate schedule flags ported from the mxnet benchmark
    parser.add_argument('--lr', type=float, default=0.1,
                        help='initial learning rate')
    parser.add_argument('--lr-schedule', choices=('multistep', 'cosine'), default='cosine',
                        help='learning rate schedule')
    parser.add_argument('--lr-factor', type=float, default=0.256,
                        help='the ratio to reduce lr on each step')
    parser.add_argument('--lr-steps', type=float_list, default=[],
                        help='the epochs to reduce the lr, e.g. 30,60')
    parser.add_argument('--warmup-epochs', type=int, default=5,
                        help='the epochs to ramp-up lr to scaled large-batch value')

    # data
    parser.add_argument("--num_examples", type=int,
                        default=1281167, help="train pic number")
    parser.add_argument("--num_val_examples", type=int,
                        default=50000, help="validation pic number")
    parser.add_argument('--rgb-mean', type=float_list, default=[123.68, 116.779, 103.939],
                        help='a tuple of size 3 for the mean rgb')
    parser.add_argument('--rgb-std', type=float_list, default=[58.393, 57.12, 57.375],
                        help='a tuple of size 3 for the std rgb')
    parser.add_argument('--data_train', type=str, help='the training data')
    parser.add_argument('--data_train_idx', type=str, default='',
                        help='the index of training data')
    parser.add_argument('--data_val', type=str, help='the validation data')
    parser.add_argument('--data_val_idx', type=str, default='',
                        help='the index of validation data')
    parser.add_argument("--input_layout", type=str,
                        default='NHWC', help="NCHW or NHWC")
    parser.add_argument('--image-shape', type=int_list, default=[3, 224, 224],
                        help='the image shape feed into the network')

    # snapshot
    parser.add_argument("--model_save_dir", type=str,
                        default="./output/snapshots/model_save-{}".format(
                            str(datetime.now().strftime("%Y%m%d%H%M%S"))),
                        help="model save directory")

    # log and loss print
    parser.add_argument("--log_dir", type=str,
                        default="./output", help="log info save directory")
    parser.add_argument("--loss_print_every_n_iter", type=int, default=1,
                        help="print loss every n iteration")

    add_dali_args(parser)
    add_ofrecord_args(parser)
    add_optimizer_args(parser)
    return parser
def print_args(args):
    """Pretty-print the run header and every parsed flag.

    The previous version contained diff-merge artifacts: the second line of
    the "Running ..." call and the final timestamp print were duplicated,
    which made the function a syntax error. Each statement now appears once.
    """
    print("=".ljust(66, "="))
    print("Running {}: num_gpu_per_node = {}, num_nodes = {}.".format(
        args.model, args.gpu_num_per_node, args.num_nodes))
    print("=".ljust(66, "="))
    # One "name = value" line per flag.
    for arg in vars(args):
        print("{} = {}".format(arg, getattr(args, arg)))
    print("-".ljust(66, "-"))
    print("Time stamp: {}".format(
        str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))))
if __name__ == '__main__':
......
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import time
import ctypes
#import logging
import warnings
from nvidia import dali
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.backend import TensorGPU
def add_dali_args(parser):
    """Register the DALI data-backend flags on *parser* and return it.

    Fixes a broken help string: "number of threads" + "per GPU for DALI"
    was concatenated without a separating space ("threadsper GPU").
    """
    group = parser.add_argument_group('DALI data backend',
                                      'entire group applies only to dali data backend')
    group.add_argument('--dali-separ-val', action='store_true',
                       help='each process will perform independent validation on whole val-set')
    group.add_argument('--dali-threads', type=int, default=3,
                       help="number of threads per GPU for DALI")
    group.add_argument('--dali-validation-threads', type=int, default=10,
                       help="number of threads per GPU for DALI for validation")
    group.add_argument('--dali-prefetch-queue', type=int, default=2,
                       help="DALI prefetch queue depth")
    group.add_argument('--dali-nvjpeg-memory-padding', type=int, default=64,
                       help="Memory padding value for nvJPEG (in MB)")
    group.add_argument('--dali-fuse-decoder', type=int, default=1,
                       help="0 or 1 whether to fuse decoder or not")
    return parser
class HybridTrainPipe(Pipeline):
    """DALI training pipeline: sharded MXNet RecordIO read -> (optionally
    fused) JPEG decode -> random-resized crop -> crop/mirror/normalize with a
    50% random horizontal flip. Runs on GPU unless dali_cpu is True."""

    def __init__(self, args, batch_size, num_threads, device_id, rec_path, idx_path,
                 shard_id, num_shards, crop_shape, nvjpeg_padding, prefetch_queue=3,
                 output_layout=types.NCHW, pad_output=True, dtype='float16', dali_cpu=False):
        super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id,
                                              seed=12 + device_id,
                                              prefetch_queue_depth=prefetch_queue)
        # Shuffling reader over one shard of the MXNet .rec/.idx dataset.
        self.input = ops.MXNetReader(path=[rec_path], index_path=[idx_path],
                                     random_shuffle=True, shard_id=shard_id, num_shards=num_shards)
        # "mixed" decodes on CPU and outputs to GPU memory.
        dali_device = "cpu" if dali_cpu else "mixed"
        dali_resize_device = "cpu" if dali_cpu else "gpu"
        if args.dali_fuse_decoder:
            # Fused decode + random crop, followed by a plain resize.
            self.decode = ops.ImageDecoderRandomCrop(device=dali_device, output_type=types.RGB,
                                                     device_memory_padding=nvjpeg_padding,
                                                     host_memory_padding=nvjpeg_padding)
            self.resize = ops.Resize(device=dali_resize_device, resize_x=crop_shape[1],
                                     resize_y=crop_shape[0])
        else:
            # Plain decode, then random-resized crop to the target shape.
            self.decode = ops.ImageDecoder(device=dali_device, output_type=types.RGB,
                                           device_memory_padding=nvjpeg_padding,
                                           host_memory_padding=nvjpeg_padding)
            self.resize = ops.RandomResizedCrop(device=dali_resize_device, size=crop_shape)
        # Normalize by mean/std; pad_output adds a 4th channel when requested.
        self.cmnp = ops.CropMirrorNormalize(device=dali_resize_device,
                                            output_dtype=types.FLOAT16 if dtype == 'float16' else types.FLOAT,
                                            output_layout=output_layout, crop=crop_shape, pad_output=pad_output,
                                            image_type=types.RGB, mean=args.rgb_mean, std=args.rgb_std)
        # Per-sample coin flip drives the random horizontal mirror.
        self.coin = ops.CoinFlip(probability=0.5)

    def define_graph(self):
        # Build the dataflow graph: read -> decode -> resize -> normalize.
        rng = self.coin()
        self.jpegs, self.labels = self.input(name="Reader")
        images = self.decode(self.jpegs)
        images = self.resize(images)
        output = self.cmnp(images, mirror=rng)
        return [output, self.labels]
class HybridValPipe(Pipeline):
    """DALI validation pipeline: sequential (unshuffled) RecordIO read ->
    JPEG decode -> optional shorter-side resize -> center crop + normalize.
    No random mirroring, so validation results are deterministic."""

    def __init__(self, args, batch_size, num_threads, device_id, rec_path, idx_path, shard_id,
                 num_shards, crop_shape, nvjpeg_padding, prefetch_queue=3, resize_shp=None,
                 output_layout=types.NCHW, pad_output=True, dtype='float16', dali_cpu=False):
        super(HybridValPipe, self).__init__(batch_size, num_threads, device_id, seed=12 + device_id,
                                            prefetch_queue_depth=prefetch_queue)
        # No shuffling for validation: keep a deterministic sample order.
        self.input = ops.MXNetReader(path=[rec_path], index_path=[idx_path],
                                     random_shuffle=False, shard_id=shard_id, num_shards=num_shards)
        if dali_cpu:
            dali_device = "cpu"
            self.decode = ops.ImageDecoder(device=dali_device, output_type=types.RGB)
        else:
            dali_device = "gpu"
            # "mixed" decodes on CPU and outputs to GPU memory.
            self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB,
                                           device_memory_padding=nvjpeg_padding,
                                           host_memory_padding=nvjpeg_padding)
        # Optional resize of the shorter image side before the center crop.
        self.resize = ops.Resize(device=dali_device, resize_shorter=resize_shp) if resize_shp else None
        self.cmnp = ops.CropMirrorNormalize(device=dali_device,
                                            output_dtype=types.FLOAT16 if dtype == 'float16' else types.FLOAT,
                                            output_layout=output_layout, crop=crop_shape, pad_output=pad_output,
                                            image_type=types.RGB, mean=args.rgb_mean, std=args.rgb_std)

    def define_graph(self):
        # Build the dataflow graph: read -> decode -> (resize) -> normalize.
        self.jpegs, self.labels = self.input(name="Reader")
        images = self.decode(self.jpegs)
        if self.resize:
            images = self.resize(images)
        output = self.cmnp(images)
        return [output, self.labels]
def feed_ndarray(dali_tensor, arr):
    """
    Copy contents of DALI tensor to numpy's NDArray.
    Parameters
    ----------
    `dali_tensor` : nvidia.dali.backend.TensorCPU or nvidia.dali.backend.TensorGPU
    Tensor from which to copy
    `arr` : numpy.NDArray
    Destination of the copy
    """
    # Shapes must agree exactly before handing DALI a raw pointer.
    src_shape = dali_tensor.shape()
    dst_shape = list(arr.shape)
    assert src_shape == dst_shape, \
        ("Shapes do not match: DALI tensor has shape {0}"
         ", but NDArray has shape {1}".format(src_shape, dst_shape))
    # Get CTypes void pointer to the underlying memory held by arr,
    # then let DALI copy straight into that buffer.
    dest_ptr = ctypes.c_void_p(arr.ctypes.data)
    dali_tensor.copy_to_external(dest_ptr)
class DALIGenericIterator(object):
"""
General DALI iterator for Numpy. It can return any number of
outputs from the DALI pipeline in the form of ndarray.
Parameters
----------
pipelines : list of nvidia.dali.pipeline.Pipeline
List of pipelines to use
size : int, Number of samples in the epoch (Usually the size of the dataset).
data_layout : str, optional, default = 'NCHW'
Either 'NHWC' or 'NCHW' - layout of the pipeline outputs.
fill_last_batch : bool, optional, default = True
Whether to fill the last batch with data up to 'self.batch_size'.
The iterator would return the first integer multiple
of self._num_gpus * self.batch_size entries which exceeds 'size'.
Setting this flag to False will cause the iterator to return
exactly 'size' entries.
auto_reset : bool, optional, default = False
Whether the iterator resets itself for the next epoch
or it requires reset() to be called separately.
squeeze_labels: bool, optional, default = True
Whether the iterator should squeeze the labels before
copying them to the ndarray.
dynamic_shape: bool, optional, default = False
Whether the shape of the output of the DALI pipeline can
change during execution. If True, the ndarray will be resized accordingly
if the shape of DALI returned tensors changes during execution.
If False, the iterator will fail in case of change.
last_batch_padded : bool, optional, default = False
Whether the last batch provided by DALI is padded with the last sample
or it just wraps up. In the conjunction with `fill_last_batch` it tells
if the iterator returning last batch with data only partially filled with
data from the current epoch is dropping padding samples or samples from
the next epoch. If set to False next epoch will end sooner as data from
it was consumed but dropped. If set to True next epoch would be the
same length as the first one. For this happen, the option `pad_last_batch`
in the reader need to be set to `True` as well.
Example
-------
With the data set [1,2,3,4,5,6,7] and the batch size 2:
fill_last_batch = False, last_batch_padded = True -> last batch = [7], next iteration will return [1, 2]
fill_last_batch = False, last_batch_padded = False -> last batch = [7], next iteration will return [2, 3]
fill_last_batch = True, last_batch_padded = True -> last batch = [7, 7], next iteration will return [1, 2]
fill_last_batch = True, last_batch_padded = False -> last batch = [7, 1], next iteration will return [2, 3]
"""
def __init__(self,
pipelines,
size,
output_map=['data', 'label'],
data_layout='NCHW',
fill_last_batch=False,
auto_reset=False,
squeeze_labels=True,
dynamic_shape=False,
last_batch_padded=False):
if not isinstance(pipelines, list):
pipelines = [pipelines]
self._num_gpus = len(pipelines)
assert pipelines is not None, "Number of provided pipelines has to be at least 1"
self.batch_size = pipelines[0].batch_size
self._size = int(size)
self._pipes = pipelines
self._fill_last_batch = fill_last_batch
self._last_batch_padded = last_batch_padded
self._auto_reset = auto_reset
self._squeeze_labels = squeeze_labels
self._dynamic_shape = dynamic_shape
# Build all pipelines
for p in self._pipes:
with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
p.build()
# Use double-buffering of data batches
self._data_batches = [[None] for i in range(self._num_gpus)]
self._counter = 0
self._current_data_batch = 0
self.output_map = output_map
# We need data about the batches (like shape information),
# so we need to run a single batch as part of setup to get that info
for p in self._pipes:
with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
p.schedule_run()
self._first_batch = None
self._first_batch = self.next()
def __next__(self):
if self._first_batch is not None:
batch = self._first_batch
self._first_batch = None
return batch
if self._counter >= self._size:
if self._auto_reset:
self.reset()
raise StopIteration
# Gather outputs
outputs = []
for p in self._pipes:
with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
outputs.append(p.share_outputs())
for i in range(self._num_gpus):
# MXNet wants batches with clear distinction between
# data and label entries, so segregate outputs into
# 2 categories
# Change DALI TensorLists into Tensors
category_tensors = dict()
category_info = dict()
for j, out in enumerate(outputs[i]):
x = out.as_tensor()
category_tensors[self.output_map[j]] = x#.as_tensor()
if self._squeeze_labels and self.output_map[j]=='label':
category_tensors[self.output_map[j]].squeeze()
category_info[self.output_map[j]] = (x.shape(), np.dtype(x.dtype()))
# If we did not yet allocate memory for that batch, do it now
if self._data_batches[i][self._current_data_batch] is None:
for category in self.output_map:
t = category_tensors[category]
assert type(t) is not TensorGPU, "CPU data only"#TODO
d = []
for (shape, dtype) in category_info.values():
d.append(np.zeros(shape, dtype = dtype))
self._data_batches[i][self._current_data_batch] = d
d = self._data_batches[i][self._current_data_batch]
# Copy data from DALI Tensors to NDArrays
if self._dynamic_shape:
for j, (shape, dtype) in enumerate(category_info):
if list(d[j].shape) != shape:
d[j] = np.zeros(shape, dtype = dtype)
for j, d_arr in enumerate(d):
feed_ndarray(category_tensors[self.output_map[j]], d_arr)
for p in self._pipes:
with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
p.release_outputs()
p.schedule_run()
copy_db_index = self._current_data_batch
# Change index for double buffering
self._current_data_batch = (self._current_data_batch + 1) % 1
self._counter += self._num_gpus * self.batch_size
assert not self._fill_last_batch
## padding the last batch
#if (not self._fill_last_batch) and (self._counter > self._size):
# # this is the last batch and we need to pad
# overflow = self._counter - self._size
# overflow_per_device = overflow // self._num_gpus
# difference = self._num_gpus - (overflow % self._num_gpus)
# for i in range(self._num_gpus):
# if i < difference:
# self._data_batches[i][copy_db_index].pad = overflow_per_device
# else:
# self._data_batches[i][copy_db_index].pad = overflow_per_device + 1
#else:
# for db in self._data_batches:
# db[copy_db_index].pad = 0
#_data_batches[gpu_id][_current_data_batch][images, labels]
images = [db[copy_db_index][0] for db in self._data_batches]
labels = [db[copy_db_index][1] for db in self._data_batches]
return images, labels
#return [db[copy_db_index] for db in self._data_batches]
    def next(self):
        """
        Return the next batch of data.

        Python-2-style alias that simply delegates to ``__next__``.
        """
        return self.__next__()
    def __iter__(self):
        # Iterator protocol: this object is its own iterator.
        return self
    def reset(self):
        """
        Resets the iterator after the full epoch.
        DALI iterators do not support resetting before the end of the epoch
        and will ignore such request.
        """
        if self._counter >= self._size:
            if self._fill_last_batch and not self._last_batch_padded:
                # Keep the wrap-around remainder so the next epoch resumes
                # where the overshooting last batch left off.
                self._counter = self._counter % self._size
            else:
                self._counter = 0
            for p in self._pipes:
                p.reset()
                if p.empty():
                    # Prefetch the first batch of the new epoch.
                    with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                        p.schedule_run()
        else:
            print("DALI iterator does not support resetting while epoch is not finished. Ignoring...")
def get_rec_iter(args, dali_cpu=False, concat=True):
    """Build one training (and optionally one validation) DALI pipeline per
    local GPU and wrap them in ``DALIGenericIterator`` instances.

    Returns ``(train_iter, val_iter)``; ``val_iter`` is ``None`` when
    ``args.data_val`` is not set.
    NOTE(review): the ``concat`` parameter is accepted but never used here —
    confirm against callers before removing.
    """
    gpus = range(args.gpu_num_per_node)
    # Single-process setup: rank 0 of 1 worker, so sharding is local-only.
    rank = 0 #TODO
    nWrk = 1 #TODO
    num_threads = args.dali_threads
    num_validation_threads = args.dali_validation_threads
    # A leading channel count of 4 requests RGB->RGBA channel padding.
    pad_output = (args.image_shape[0] == 4)
    # the input_layout w.r.t. the model is the output_layout of the image pipeline
    output_layout = types.NHWC if args.input_layout == 'NHWC' else types.NCHW
    trainpipes = [HybridTrainPipe(args = args,
                                  batch_size = args.batch_size_per_device,
                                  num_threads = num_threads,
                                  device_id = gpu_id,
                                  rec_path = args.data_train,
                                  idx_path = args.data_train_idx,
                                  shard_id = gpus.index(gpu_id) + len(gpus)*rank,
                                  num_shards = len(gpus)*nWrk,
                                  crop_shape = args.image_shape[1:],
                                  output_layout = output_layout,
                                  dtype = args.dtype,
                                  pad_output = pad_output,
                                  dali_cpu = dali_cpu,
                                  nvjpeg_padding = args.dali_nvjpeg_memory_padding * 1024 * 1024,
                                  prefetch_queue = args.dali_prefetch_queue) for gpu_id in gpus]
    if args.data_val:
        valpipes = [HybridValPipe(args = args,
                                  batch_size = args.val_batch_size_per_device,
                                  num_threads = num_validation_threads,
                                  device_id = gpu_id,
                                  rec_path = args.data_val,
                                  idx_path = args.data_val_idx,
                                  shard_id = gpus.index(gpu_id) + len(gpus)*rank,
                                  num_shards = len(gpus)*nWrk,
                                  crop_shape = args.image_shape[1:],
                                  resize_shp = 256, #args.data_val_resize,
                                  output_layout = output_layout,
                                  dtype = args.dtype,
                                  pad_output = pad_output,
                                  dali_cpu = dali_cpu,
                                  nvjpeg_padding = args.dali_nvjpeg_memory_padding * 1024 * 1024,
                                  prefetch_queue = args.dali_prefetch_queue) for gpu_id in gpus]
    # Only the first pipeline is built here; the iterator builds the rest.
    trainpipes[0].build()
    if args.data_val:
        valpipes[0].build()
        val_examples = valpipes[0].epoch_size("Reader")
    if args.num_examples < trainpipes[0].epoch_size("Reader"):
        warnings.warn("{} training examples will be used, although full training set contains {} examples".format(args.num_examples, trainpipes[0].epoch_size("Reader")))
    dali_train_iter = DALIGenericIterator(trainpipes, args.num_examples)
    if args.data_val:
        dali_val_iter = DALIGenericIterator(valpipes, val_examples, fill_last_batch = False)
    else:
        dali_val_iter = None
    return dali_train_iter, dali_val_iter
if __name__ == '__main__':
    # Smoke-test / benchmark entry point: iterate one or more epochs of the
    # training data and print images/sec throughput.
    import config as configs
    parser = configs.get_parser()
    args = parser.parse_args()
    configs.print_args(args)
    train_data_iter, val_data_iter = get_rec_iter(args, True)
    # BUGFIX: the per-interval throughput previously multiplied by a
    # hard-coded 256 instead of the actual global batch size.
    total_batch_size = args.batch_size_per_device * args.gpu_num_per_node
    for epoch in range(args.num_epochs):
        tic = time.time()
        last_time = time.time()
        print('Starting epoch {}'.format(epoch))
        train_data_iter.reset()
        for i, batches in enumerate(train_data_iter):
            images, labels = batches
            if i % args.loss_print_every_n_iter == 0:
                # Images/sec over the last print interval.
                print(args.loss_print_every_n_iter * total_batch_size / (time.time() - last_time))
                last_time = time.time()
        epoch_time = time.time() - tic
        # 1281167 == number of images in the ImageNet-1k training set.
        # BUGFIX: "mena" -> "mean" in the printed label.
        print('epoch mean images/sec', 1281167 / epoch_time, epoch_time)
# Copyright (c) 2019, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import time
import ctypes
#import logging
import warnings
from nvidia import dali
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.backend import TensorGPU
def add_dali_args(parser):
    """Register the DALI data-backend command-line options on *parser*.

    All flags are grouped under "DALI data backend"; the group applies only
    when the DALI backend is in use. Returns the same parser for chaining.
    """
    dali_group = parser.add_argument_group(
        'DALI data backend',
        'entire group applies only to dali data backend')
    flags = [
        # (name, kwargs)
        ('--dali-separ-val',
         dict(action='store_true',
              help='each process will perform independent validation on whole val-set')),
        ('--dali-threads',
         dict(type=int, default=3, help="number of threadsper GPU for DALI")),
        ('--dali-validation-threads',
         dict(type=int, default=10,
              help="number of threads per GPU for DALI for validation")),
        ('--dali-prefetch-queue',
         dict(type=int, default=2, help="DALI prefetch queue depth")),
        ('--dali-nvjpeg-memory-padding',
         dict(type=int, default=64,
              help="Memory padding value for nvJPEG (in MB)")),
        ('--dali-fuse-decoder',
         dict(type=int, default=1,
              help="0 or 1 whether to fuse decoder or not")),
    ]
    for flag_name, flag_kwargs in flags:
        dali_group.add_argument(flag_name, **flag_kwargs)
    return parser
class HybridTrainPipe(Pipeline):
    """DALI training pipeline.

    MXNet RecordIO reader -> JPEG decode (optionally fused with random crop)
    -> resize -> crop/mirror/normalize, plus an int32 cast for the labels.
    """
    def __init__(self, args, batch_size, num_threads, device_id, rec_path, idx_path,
                 shard_id, num_shards, crop_shape, nvjpeg_padding, prefetch_queue=3,
                 output_layout=types.NCHW, pad_output=True, dtype='float16', dali_cpu=False):
        super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id,
                                              seed=12 + device_id,
                                              prefetch_queue_depth = prefetch_queue)
        # Sharded, shuffled reader over the RecordIO file.
        self.input = ops.MXNetReader(path=[rec_path], index_path=[idx_path],
                                     random_shuffle=True, shard_id=shard_id, num_shards=num_shards)
        # Cast labels to int32 (applied in define_graph).
        self.cast = ops.Cast(dtype=types.INT32)
        # "mixed" decodes on GPU with CPU fallback; "cpu" keeps everything on host.
        dali_device = "cpu" if dali_cpu else "mixed"
        dali_resize_device = "cpu" if dali_cpu else "gpu"
        if args.dali_fuse_decoder:
            # Fused decode+random-crop followed by a plain resize.
            self.decode = ops.ImageDecoderRandomCrop(device=dali_device, output_type=types.RGB,
                                                     device_memory_padding=nvjpeg_padding,
                                                     host_memory_padding=nvjpeg_padding)
            self.resize = ops.Resize(device=dali_resize_device, resize_x=crop_shape[1],
                                     resize_y=crop_shape[0])
        else:
            # Plain decode followed by a random-resized crop.
            self.decode = ops.ImageDecoder(device=dali_device, output_type=types.RGB,
                                           device_memory_padding=nvjpeg_padding,
                                           host_memory_padding=nvjpeg_padding)
            self.resize = ops.RandomResizedCrop(device=dali_resize_device, size=crop_shape)
        self.cmnp = ops.CropMirrorNormalize(device=dali_resize_device,
                                            output_dtype=types.FLOAT16 if dtype == 'float16' else types.FLOAT,
                                            output_layout=output_layout, crop=crop_shape, pad_output=pad_output,
                                            image_type=types.RGB, mean=args.rgb_mean, std=args.rgb_std)
        # Per-sample coin flip drives random horizontal mirroring in cmnp.
        self.coin = ops.CoinFlip(probability=0.5)

    def define_graph(self):
        """Wire the ops together; returns [normalized image, int32 label]."""
        rng = self.coin()
        self.jpegs, self.labels = self.input(name="Reader")
        images = self.decode(self.jpegs)
        images = self.resize(images)
        output = self.cmnp(images, mirror=rng)
        return [output, self.cast(self.labels)]
class HybridValPipe(Pipeline):
    """DALI validation pipeline.

    Like ``HybridTrainPipe`` but deterministic: no shuffling, no random crop,
    no mirroring; optional shorter-side resize before center crop/normalize.
    """
    def __init__(self, args, batch_size, num_threads, device_id, rec_path, idx_path, shard_id,
                 num_shards, crop_shape, nvjpeg_padding, prefetch_queue=3, resize_shp=None,
                 output_layout=types.NCHW, pad_output=True, dtype='float16', dali_cpu=False):
        super(HybridValPipe, self).__init__(batch_size, num_threads, device_id, seed=12 + device_id,
                                            prefetch_queue_depth=prefetch_queue)
        # Sequential (non-shuffled) sharded reader for reproducible validation.
        self.input = ops.MXNetReader(path=[rec_path], index_path=[idx_path],
                                     random_shuffle=False, shard_id=shard_id, num_shards=num_shards)
        # Cast labels to int32 (applied in define_graph).
        self.cast = ops.Cast(dtype=types.INT32)
        if dali_cpu:
            dali_device = "cpu"
            self.decode = ops.ImageDecoder(device=dali_device, output_type=types.RGB)
        else:
            dali_device = "gpu"
            # "mixed" decodes on GPU with CPU fallback.
            self.decode = ops.ImageDecoder(device="mixed", output_type=types.RGB,
                                           device_memory_padding=nvjpeg_padding,
                                           host_memory_padding=nvjpeg_padding)
        # Optional shorter-side resize; skipped entirely when resize_shp is falsy.
        self.resize = ops.Resize(device=dali_device, resize_shorter=resize_shp) if resize_shp else None
        self.cmnp = ops.CropMirrorNormalize(device=dali_device,
                                            output_dtype=types.FLOAT16 if dtype == 'float16' else types.FLOAT,
                                            output_layout=output_layout, crop=crop_shape, pad_output=pad_output,
                                            image_type=types.RGB, mean=args.rgb_mean, std=args.rgb_std)

    def define_graph(self):
        """Wire the ops together; returns [normalized image, int32 label]."""
        self.jpegs, self.labels = self.input(name="Reader")
        images = self.decode(self.jpegs)
        if self.resize:
            images = self.resize(images)
        output = self.cmnp(images)
        #return [output, self.labels]
        return [output, self.cast(self.labels)]
def feed_ndarray(dali_tensor, arr, offset):
    """
    Copy contents of a DALI tensor into a NumPy ndarray at a byte offset.

    Parameters
    ----------
    `dali_tensor` : nvidia.dali.backend.TensorCPU or nvidia.dali.backend.TensorGPU
        Tensor from which to copy
    `arr` : numpy.NDArray
        Destination of the copy
    `offset` : int
        Byte offset into `arr`'s underlying buffer at which the copy starts.
        Used by the iterator to concatenate per-GPU outputs into one array.
    """
    # Shape check intentionally disabled: `arr` is the concatenated multi-GPU
    # host buffer, so its shape legitimately differs from the single-pipeline
    # tensor shape.
    #assert dali_tensor.shape() == list(arr.shape), \
    # ("Shapes do not match: DALI tensor has shape {0}"
    # ", but NDArray has shape {1}".format(dali_tensor.shape(), list(arr.shape)))
    # Get CTypes void pointer to the underlying memory held by arr
    c_type_pointer = ctypes.c_void_p(arr.ctypes.data + offset)
    # Copy data from DALI tensor to ptr
    dali_tensor.copy_to_external(c_type_pointer)
class DALIGenericIterator(object):
    """
    General DALI iterator for NumPy. It can return any number of
    outputs from the DALI pipeline in the form of ndarrays.

    Each call to ``__next__`` returns a list of host ndarrays ordered per
    ``output_map``; the per-GPU pipeline outputs are concatenated along the
    batch dimension into a single array per category.

    Parameters
    ----------
    pipelines : list of nvidia.dali.pipeline.Pipeline
        List of pipelines to use (one per GPU).
    size : int, Number of samples in the epoch (Usually the size of the dataset).
    output_map : list of str, optional, default = ['data', 'label']
        Names for the pipeline outputs, in order.
    data_layout : str, optional, default = 'NCHW'
        Either 'NHWC' or 'NCHW' - layout of the pipeline outputs.
    fill_last_batch : bool, optional, default = False
        Whether to fill the last batch with data up to 'self.batch_size'.
        Only ``False`` is supported by this implementation (asserted below).
    auto_reset : bool, optional, default = False
        Whether the iterator resets itself for the next epoch
        or it requires reset() to be called separately.
    squeeze_labels : bool, optional, default = True
        Whether the iterator should squeeze the labels before
        copying them to the ndarray.
    dynamic_shape : bool, optional, default = False
        Must be ``False``; only fixed output shapes are supported (asserted).
    last_batch_padded : bool, optional, default = False
        Whether the last batch provided by DALI is padded with the last sample
        or it just wraps up. In conjunction with `fill_last_batch` it tells
        whether the next epoch resumes mid-stream or starts fresh.
    """
    def __init__(self,
                 pipelines,
                 size,
                 output_map=None,
                 data_layout='NCHW',
                 fill_last_batch=False,
                 auto_reset=False,
                 squeeze_labels=True,
                 dynamic_shape=False,
                 last_batch_padded=False):
        # BUGFIX: validate before first use; the assert previously ran only
        # after len()/indexing, which would have raised first anyway.
        assert pipelines is not None, "Number of provided pipelines has to be at least 1"
        if not isinstance(pipelines, list):
            pipelines = [pipelines]
        # BUGFIX: avoid a mutable default argument; the previous
        # ``output_map=['data', 'label']`` list was shared across instances.
        if output_map is None:
            output_map = ['data', 'label']
        self._num_gpus = len(pipelines)
        self.batch_size = pipelines[0].batch_size
        self._size = int(size)
        self._pipes = pipelines
        self._fill_last_batch = fill_last_batch
        self._last_batch_padded = last_batch_padded
        self._auto_reset = auto_reset
        self._squeeze_labels = squeeze_labels
        assert dynamic_shape == False, "support fixed shape only."
        self._dynamic_shape = dynamic_shape
        # Build all pipelines
        for p in self._pipes:
            with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                p.build()
        # Use double-buffering of data batches: two host buffers alternated
        # between iterations so the consumer can keep using the previous one.
        self._data_batches = [None for i in range(2)]
        self._counter = 0
        self._current_data_batch = 0
        self.output_map = output_map
        # We need data about the batches (like shape information),
        # so we need to run a single batch as part of setup to get that info
        for p in self._pipes:
            with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                p.schedule_run()
        self._first_batch = None
        self._first_batch = self.next()

    def __next__(self):
        """Return the next batch as a list of host ndarrays ordered per
        ``self.output_map``. Raises StopIteration at the end of the epoch."""
        if self._first_batch is not None:
            # Hand out the batch prefetched during __init__.
            batch = self._first_batch
            self._first_batch = None
            return batch
        if self._counter >= self._size:
            if self._auto_reset:
                self.reset()
            raise StopIteration
        # Gather outputs
        outputs = []
        for p in self._pipes:
            with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                outputs.append(p.share_outputs())
        for gpu_id in range(self._num_gpus):
            # Segregate outputs into data/label categories and change DALI
            # TensorLists into Tensors.
            category_tensors = dict()
            category_info = []
            for j, out in enumerate(outputs[gpu_id]):
                x = out.as_tensor()
                category_tensors[self.output_map[j]] = x
                if self._squeeze_labels and self.output_map[j] == 'label':
                    category_tensors[self.output_map[j]].squeeze()
                category_info.append((x.shape(), np.dtype(x.dtype())))
            # If we did not yet allocate memory for that batch, do it now
            if self._data_batches[self._current_data_batch] is None:
                for category in self.output_map:
                    t = category_tensors[category]
                    assert type(t) is not TensorGPU, "CPU data only"#TODO
                d = []
                self.category_nbytes = []
                for j, (shape, dtype) in enumerate(category_info):
                    # Per-GPU byte size; used as the copy offset stride below.
                    self.category_nbytes.append(np.zeros(shape, dtype=dtype).nbytes)
                    # Host buffer holds the concatenation of all GPUs' outputs.
                    shape[0] = self._num_gpus * shape[0]
                    d.append(np.zeros(shape, dtype=dtype))
                self._data_batches[self._current_data_batch] = d
            d = self._data_batches[self._current_data_batch]
            # Copy data from DALI Tensors to NDArrays
            if self._dynamic_shape:
                for j, (shape, dtype) in enumerate(category_info):
                    if list(d[j].shape) != shape:
                        d[j] = np.zeros(shape, dtype=dtype)
            for j, d_arr in enumerate(d):
                offset = gpu_id * self.category_nbytes[j]
                feed_ndarray(category_tensors[self.output_map[j]], d_arr, offset)
        for p in self._pipes:
            with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                p.release_outputs()
                p.schedule_run()
        copy_db_index = self._current_data_batch
        # Change index for double buffering.
        # BUGFIX: this was `% 1`, which always evaluates to 0, so the second
        # buffer allocated above was never used and a returned batch could be
        # overwritten by the next iteration while the caller still held it.
        self._current_data_batch = (self._current_data_batch + 1) % 2
        self._counter += self._num_gpus * self.batch_size
        # Padding of the last batch is not implemented here; only the
        # fill_last_batch=False behaviour is supported.
        assert not self._fill_last_batch
        return self._data_batches[copy_db_index]

    def next(self):
        """
        Returns the next batch of data.
        """
        return self.__next__()

    def __iter__(self):
        # Iterator protocol: this object is its own iterator.
        return self

    def reset(self):
        """
        Resets the iterator after the full epoch.
        DALI iterators do not support resetting before the end of the epoch
        and will ignore such request.
        """
        if self._counter >= self._size:
            if self._fill_last_batch and not self._last_batch_padded:
                # Keep the wrap-around remainder so the next epoch resumes
                # where the overshooting last batch left off.
                self._counter = self._counter % self._size
            else:
                self._counter = 0
            for p in self._pipes:
                p.reset()
                if p.empty():
                    # Prefetch the first batch of the new epoch.
                    with p._check_api_type_scope(types.PipelineAPIType.ITERATOR):
                        p.schedule_run()
        else:
            print("DALI iterator does not support resetting while epoch is not finished. Ignoring...")
def get_rec_iter(args, dali_cpu=False, concat=True):
    """Build one training (and optionally one validation) DALI pipeline per
    local GPU and wrap them in ``DALIGenericIterator`` instances.

    Returns ``(train_iter, val_iter)``; ``val_iter`` is ``None`` when
    ``args.data_val`` is not set.
    NOTE(review): the ``concat`` parameter is accepted but never used here —
    confirm against callers before removing.
    """
    gpus = range(args.gpu_num_per_node)
    # Single-process setup: rank 0 of 1 worker, so sharding is local-only.
    rank = 0 #TODO
    nWrk = 1 #TODO
    num_threads = args.dali_threads
    num_validation_threads = args.dali_validation_threads
    # A leading channel count of 4 requests RGB->RGBA channel padding.
    pad_output = (args.image_shape[0] == 4)
    # the input_layout w.r.t. the model is the output_layout of the image pipeline
    output_layout = types.NHWC if args.input_layout == 'NHWC' else types.NCHW
    trainpipes = [HybridTrainPipe(args = args,
                                  batch_size = args.batch_size_per_device,
                                  num_threads = num_threads,
                                  device_id = gpu_id,
                                  rec_path = args.data_train,
                                  idx_path = args.data_train_idx,
                                  shard_id = gpus.index(gpu_id) + len(gpus)*rank,
                                  num_shards = len(gpus)*nWrk,
                                  crop_shape = args.image_shape[1:],
                                  output_layout = output_layout,
                                  dtype = args.dtype,
                                  pad_output = pad_output,
                                  dali_cpu = dali_cpu,
                                  nvjpeg_padding = args.dali_nvjpeg_memory_padding * 1024 * 1024,
                                  prefetch_queue = args.dali_prefetch_queue) for gpu_id in gpus]
    if args.data_val:
        valpipes = [HybridValPipe(args = args,
                                  batch_size = args.val_batch_size_per_device,
                                  num_threads = num_validation_threads,
                                  device_id = gpu_id,
                                  rec_path = args.data_val,
                                  idx_path = args.data_val_idx,
                                  shard_id = gpus.index(gpu_id) + len(gpus)*rank,
                                  num_shards = len(gpus)*nWrk,
                                  crop_shape = args.image_shape[1:],
                                  resize_shp = 256, #args.data_val_resize,
                                  output_layout = output_layout,
                                  dtype = args.dtype,
                                  pad_output = pad_output,
                                  dali_cpu = dali_cpu,
                                  nvjpeg_padding = args.dali_nvjpeg_memory_padding * 1024 * 1024,
                                  prefetch_queue = args.dali_prefetch_queue) for gpu_id in gpus]
    # Only the first pipeline is built here; the iterator builds the rest.
    trainpipes[0].build()
    if args.data_val:
        valpipes[0].build()
        val_examples = valpipes[0].epoch_size("Reader")
    if args.num_examples < trainpipes[0].epoch_size("Reader"):
        warnings.warn("{} training examples will be used, although full training set contains {} examples".format(args.num_examples, trainpipes[0].epoch_size("Reader")))
    dali_train_iter = DALIGenericIterator(trainpipes, args.num_examples)
    if args.data_val:
        dali_val_iter = DALIGenericIterator(valpipes, val_examples, fill_last_batch = False)
    else:
        dali_val_iter = None
    return dali_train_iter, dali_val_iter
if __name__ == '__main__':
    # Smoke-test / benchmark entry point: iterate one or more epochs of the
    # training data and print images/sec throughput.
    import config as configs
    parser = configs.get_parser()
    args = parser.parse_args()
    configs.print_args(args)
    train_data_iter, val_data_iter = get_rec_iter(args, True)
    # BUGFIX: the per-interval throughput previously multiplied by a
    # hard-coded 256 instead of the actual global batch size.
    total_batch_size = args.batch_size_per_device * args.gpu_num_per_node
    for epoch in range(args.num_epochs):
        tic = time.time()
        last_time = time.time()
        print('Starting epoch {}'.format(epoch))
        train_data_iter.reset()
        for i, batches in enumerate(train_data_iter):
            images, labels = batches
            if i % args.loss_print_every_n_iter == 0:
                # Images/sec over the last print interval.
                print(args.loss_print_every_n_iter * total_batch_size / (time.time() - last_time))
                last_time = time.time()
        epoch_time = time.time() - tic
        # 1281167 == number of images in the ImageNet-1k training set.
        # BUGFIX: "mena" -> "mean" in the printed label.
        print('epoch mean images/sec', 1281167 / epoch_time, epoch_time)
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import oneflow as flow
def load_imagenet(data_dir, image_size, batch_size, data_part_num):
    """Build the OFRecord decode op for ImageNet training data.

    Decodes "encoded" JPEG images (random mirror + resize to
    ``image_size`` x ``image_size``, per-channel normalization) and the
    "class/label" int32 labels. Returns the (label, image) blob pair.
    """
    image_preprocessing = [
        flow.data.ImagePreprocessor('mirror'),
        flow.data.ImageResizePreprocessor(image_size, image_size),
    ]
    # Per-channel mean subtraction followed by scaling to [0, 1].
    normalize = flow.data.NormByChannelPreprocessor(
        (123.68, 116.78, 103.94), (255.0, 255.0, 255.0)
    )
    image_blob_conf = flow.data.BlobConf(
        "encoded",
        shape=(image_size, image_size, 3),
        dtype=flow.float,
        codec=flow.data.ImageCodec(image_preprocessing),
        preprocessors=[normalize],
    )
    label_blob_conf = flow.data.BlobConf(
        "class/label", shape=(), dtype=flow.int32, codec=flow.data.RawCodec()
    )
    return flow.data.decode_ofrecord(
        data_dir,
        (label_blob_conf, image_blob_conf),
        batch_size=batch_size,
        data_part_num=data_part_num,
        part_name_suffix_length=5,
        name="decode",
    )
def load_synthetic(image_size, batch_size):
    """Produce synthetic (label, image) blobs for data-free benchmarking.

    Labels are zero-initialized int32 scalars; images are random float
    tensors of shape (image_size, image_size, 3).
    """
    synthetic_label = flow.data.decode_random(
        shape=(),
        dtype=flow.int32,
        batch_size=batch_size,
        initializer=flow.zeros_initializer(flow.int32),
    )
    synthetic_image = flow.data.decode_random(
        shape=(image_size, image_size, 3),
        dtype=flow.float,
        batch_size=batch_size,
    )
    return synthetic_label, synthetic_image
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import oneflow as flow
import oneflow.core.operator.op_conf_pb2 as op_conf_util
from datetime import datetime
import argparse
import os
import numpy
_DATA_DIR = "/dataset/PNGS/PNG299/of_record_repeated"
_EVAL_DIR = _DATA_DIR
_TRAIN_DIR = _DATA_DIR
_MODEL_LOAD = "/dataset/PNGS/cnns_model_for_test/inceptionv3/models/of_model"
_MODEL_SAVE_DIR = "./model_save-{}".format(
str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
)
NODE_LIST = "192.168.1.12,192.168.1.14"
class DLNetSpec(object):
    """Default hyper-parameter/resource bundle mirroring the argparse flags,
    for callers that construct the spec programmatically instead of via CLI."""
    def __init__(self, enable_auto_mixed_precision):
        # Per-device batch size.
        self.batch_size = 8
        # Number of OFRecord parts in the dataset directory.
        self.data_part_num = 32
        self.eval_dir = _DATA_DIR
        self.train_dir = _DATA_DIR
        self.model_save_dir = _MODEL_SAVE_DIR
        self.model_load_dir = _MODEL_LOAD
        # Single-node, single-GPU by default.
        self.num_nodes = 1
        self.gpu_num_per_node = 1
        self.iter_num = 10
        self.enable_auto_mixed_precision = enable_auto_mixed_precision
# Command-line interface for running this file standalone; tests can build a
# DLNetSpec instead of parsing argv.
parser = argparse.ArgumentParser(description="flags for multi-node and resource")
parser.add_argument("-g", "--gpu_num_per_node", type=int, default=1, required=False)
parser.add_argument("-i", "--iter_num", type=int, default=10, required=False)
parser.add_argument("-b", "--batch_size", type=int, default=8, required=False)
parser.add_argument(
    "-m", "--multinode", default=False, action="store_true", required=False
)
parser.add_argument("-n", "--node_list", type=str, default=NODE_LIST, required=False)
# Worker-binary distribution options for multi-node runs.
parser.add_argument(
    "-s", "--skip_scp_binary", default=False, action="store_true", required=False
)
parser.add_argument(
    "-c",
    "--scp_binary_without_uuid",
    default=False,
    action="store_true",
    required=False,
)
parser.add_argument(
    "-r", "--remote_by_hand", default=False, action="store_true", required=False
)
parser.add_argument("-e", "--eval_dir", type=str, default=_DATA_DIR, required=False)
parser.add_argument("-t", "--train_dir", type=str, default=_DATA_DIR, required=False)
parser.add_argument(
    "-load", "--model_load_dir", type=str, default=_MODEL_LOAD, required=False
)
parser.add_argument(
    "-save", "--model_save_dir", type=str, default=_MODEL_SAVE_DIR, required=False
)
parser.add_argument("-dn", "--data_part_num", type=int, default=32, required=False)
# TODO: add this interface to oneflow.layers
def _conv2d_layer(
name,
input,
......@@ -18,7 +70,6 @@ def _conv2d_layer(
dilation_rate=1,
activation=op_conf_util.kSigmoid,
use_bias=True,
trainable=True,
weight_initializer=flow.random_uniform_initializer(),
bias_initializer=flow.constant_initializer(),
):
......@@ -26,7 +77,7 @@ def _conv2d_layer(
kernel_size = (kernel_size, kernel_size)
else:
kernel_size = tuple(kernel_size)
weight_shape = (filters, input.static_shape[1]) + kernel_size
weight_shape = (filters, input.shape[1]) + kernel_size
weight = flow.get_variable(
name + "-weight",
shape=weight_shape,
......@@ -56,6 +107,25 @@ def _conv2d_layer(
return output
def _data_load_layer(args, data_dir):
    """Build the OFRecord decode op for 299x299 InceptionV3 inputs.

    Decodes "encoded" JPEG images (BGR->RGB plus per-channel mean
    subtraction) and "class/label" int32 labels; the global batch size is
    batch_size * gpus_per_node * nodes. Returns the (image, label) pair.
    """
    global_batch_size = args.batch_size * args.gpu_num_per_node * args.num_nodes
    image_blob_conf = flow.data.BlobConf(
        "encoded",
        shape=(299, 299, 3),
        dtype=flow.float,
        codec=flow.data.ImageCodec([flow.data.ImagePreprocessor("bgr2rgb")]),
        preprocessors=[flow.data.NormByChannelPreprocessor((123.68, 116.78, 103.94))],
    )
    label_blob_conf = flow.data.BlobConf(
        "class/label", shape=(), dtype=flow.int32, codec=flow.data.RawCodec()
    )
    return flow.data.decode_ofrecord(
        data_dir,
        (image_blob_conf, label_blob_conf),
        batch_size=global_batch_size,
        data_part_num=args.data_part_num,
        name="decode",
    )
def InceptionA(in_blob, index):
with flow.deprecated.variable_scope("mixed_{}".format(index)):
with flow.deprecated.variable_scope("branch1x1"):
......@@ -425,7 +495,7 @@ def InceptionE(in_blob, index):
return concat_total
def inceptionv3(images, labels, trainable=True):
def InceptionV3(images, labels, trainable=True):
images = flow.transpose(images, perm=[0, 3, 1, 2])
conv0 = _conv2d_layer(
......@@ -506,5 +576,75 @@ def inceptionv3(images, labels, trainable=True):
fc1 = flow.matmul(pool3, weight)
fc1 = flow.nn.bias_add(fc1, bias)
return fc1
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(
labels=labels, logits=fc1, name="softmax_loss"
)
return loss
def main(args):
    """Train InceptionV3 for ``args.iter_num`` iterations.

    Configures OneFlow resources from ``args``, builds the training function,
    loads or initializes a checkpoint, prints per-iteration loss, saves a
    snapshot every 100 iterations, and dumps the loss history to
    ``./of_loss/inceptionv3/<nodes>n<gpus>c.npy``.
    """
    flow.config.machine_num(args.num_nodes)
    flow.config.gpu_device_num(args.gpu_num_per_node)
    func_config = flow.FunctionConfig()
    func_config.default_distribute_strategy(flow.distribute.consistent_strategy())
    func_config.default_data_type(flow.float)
    func_config.train.primary_lr(0.0001)
    func_config.train.model_update_conf(dict(naive_conf={}))
    func_config.enable_auto_mixed_precision(args.enable_auto_mixed_precision)

    @flow.function(func_config)
    def TrainNet():
        # One training step: load a batch, run the model, attach the loss.
        (images, labels) = _data_load_layer(args, args.train_dir)
        loss = InceptionV3(images, labels)
        flow.losses.add_loss(loss)
        return loss

    check_point = flow.train.CheckPoint()
    if not args.model_load_dir:
        check_point.init()
    else:
        check_point.load(args.model_load_dir)
    num_nodes = args.num_nodes
    # BUGFIX: "Traning" -> "Training" in the progress banner.
    print("Training inceptionv3: num_gpu_per_node = {}, num_nodes = {}.".format(
        args.gpu_num_per_node, num_nodes))
    print("{:>12} {:>12} {:>12}".format("iter", "loss type", "loss value"))
    loss = []
    for i in range(args.iter_num):
        train_loss = TrainNet().get().mean()
        loss.append(train_loss)
        fmt_str = "{:>12} {:>12} {:>12.6f}"
        print(fmt_str.format(i, "train loss:", train_loss))
        if (i + 1) % 100 == 0:
            # BUGFIX: honour --model_save_dir instead of the hard-coded
            # module constant (their default values are identical).
            check_point.save(args.model_save_dir + str(i))
    # Save the loss history for offline comparison.
    loss_file = "{}n{}c.npy".format(str(num_nodes), str(args.gpu_num_per_node * num_nodes))
    loss_path = "./of_loss/inceptionv3"
    if not os.path.exists(loss_path):
        os.makedirs(loss_path)
    numpy.save(os.path.join(loss_path, loss_file), loss)
if __name__ == "__main__":
args = parser.parse_args()
if args.multinode:
flow.env.ctrl_port(12138)
nodes = []
for n in args.node_list.strip().split(","):
addr_dict = {}
addr_dict["addr"] = n
nodes.append(addr_dict)
flow.env.machine(nodes)
if args.remote_by_hand is False:
if args.scp_binary_without_uuid:
flow.deprecated.init_worker(scp_binary=True, use_uuid=False)
elif args.skip_scp_binary:
flow.deprecated.init_worker(scp_binary=False, use_uuid=False)
else:
flow.deprecated.init_worker(scp_binary=True, use_uuid=True)
main(args)
if (args.multinode and args.skip_scp_binary is False
and args.scp_binary_without_uuid is False):
flow.deprecated.delete_worker()
The source diff is too large to display. You can view the blob instead.
......@@ -17,7 +17,14 @@ def conv2d_layer(
weight_initializer=flow.random_uniform_initializer(),
bias_initializer=flow.constant_initializer(),
):
weight_shape = (filters, input.shape[1], kernel_size, kernel_size)
if isinstance(kernel_size, int):
kernel_size_1 = kernel_size
kernel_size_2 = kernel_size
if isinstance(kernel_size, list):
kernel_size_1 = kernel_size[0]
kernel_size_2 = kernel_size[1]
weight_shape = (filters, input.shape[1], kernel_size_1, kernel_size_2)
weight = flow.get_variable(
name + "-weight",
shape=weight_shape,
......@@ -43,3 +50,43 @@ def conv2d_layer(
raise NotImplementedError
return output
def conv2d_layer_with_bn(
    name,
    input,
    filters,
    kernel_size=3,
    strides=1,
    padding="SAME",
    data_format="NCHW",
    dilation_rate=1,
    activation="Relu",
    use_bias=True,
    weight_initializer=flow.random_uniform_initializer(),
    bias_initializer=flow.constant_initializer(),
    use_bn=True,
):
    """Convolution (via ``conv2d_layer``) followed by optional batch norm.

    All convolution arguments are forwarded unchanged; when ``use_bn`` is
    true, a channel-axis batch-normalization layer named ``<name>_bn`` is
    appended after the convolution.
    """
    conv_out = conv2d_layer(
        name=name,
        input=input,
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
        data_format=data_format,
        dilation_rate=dilation_rate,
        activation=activation,
        use_bias=use_bias,
        weight_initializer=weight_initializer,
        bias_initializer=bias_initializer,
    )
    if not use_bn:
        # Guard clause: skip normalization entirely when disabled.
        return conv_out
    return flow.layers.batch_normalization(
        inputs=conv_out,
        axis=1,
        momentum=0.997,
        epsilon=1.001e-5,
        center=True,
        scale=True,
        trainable=True,
        name=name + "_bn",
    )
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import argparse
from datetime import datetime
import oneflow as flow
import data_loader
import vgg_model
import resnet_model
import alexnet_model
import benchmark_util
parser = argparse.ArgumentParser(description="flags for cnn benchmark")
# resouce
parser.add_argument("--gpu_num_per_node", type=int, default=1, required=False)
parser.add_argument("--node_num", type=int, default=1)
parser.add_argument(
"--node_list",
type=str,
default=None,
required=False,
help="nodes' IP address, split by comma",
)
# train
parser.add_argument(
"--model", type=str, default="vgg16", required=False, help="vgg16 or resnet50"
)
parser.add_argument("--batch_size_per_device", type=int, default=8, required=False)
parser.add_argument("--learning_rate", type=float, default=1e-4, required=False)
parser.add_argument(
"--optimizer", type=str, default="sgd", required=False, help="sgd, adam, momentum"
)
parser.add_argument(
"--weight_l2",
type=float,
default=None,
required=False,
help="weight decay parameter",
)
parser.add_argument(
"--iter_num", type=int, default=10, required=False, help="total iterations to run"
)
parser.add_argument(
"--skip_iter_num",
type=int,
default=0,
required=False,
help="number of skipping iterations for benchmark purpose.",
)
parser.add_argument(
"--data_dir", type=str, default=None, required=False, help="dataset directory"
)
parser.add_argument(
"--data_part_num",
type=int,
default=32,
required=False,
help="data part number in dataset",
)
parser.add_argument(
"--image_size", type=int, default=228, required=False, help="image size"
)
# log and resore/save
parser.add_argument(
"--loss_print_every_n_iter",
type=int,
default=1,
required=False,
help="print loss every n iteration",
)
parser.add_argument(
"--model_save_every_n_iter",
type=int,
default=200,
required=False,
help="save model every n iteration",
)
parser.add_argument(
"--model_save_dir",
type=str,
default="./output/model_save-{}".format(
str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
),
required=False,
help="model save directory",
)
parser.add_argument(
"--save_last_snapshot",
type=bool,
default=False,
required=False,
help="save model snapshot for last iteration",
)
parser.add_argument(
"--model_load_dir",
type=str,
default=None,
required=False,
help="model load directory",
)
parser.add_argument(
"--log_dir",
type=str,
default="./output",
required=False,
help="log info save directory",
)
args = parser.parse_args()
# Mapping from the --model flag to the network-builder function.
model_dict = {
    "resnet50": resnet_model.resnet50,
    "vgg16": vgg_model.vgg16,
    "alexnet": alexnet_model.alexnet,
}

# Mapping from the --optimizer flag to the OneFlow model-update configuration
# passed to func_config.train.model_update_conf().
optimizer_dict = {
    "sgd": {"naive_conf": {}},
    "adam": {"adam_conf": {"beta1": 0.9}},
    "momentum": {"momentum_conf": {"beta": 0.9}},
    # Momentum with a polynomial learning-rate decay schedule.
    "momentum-decay": {
        "momentum_conf": {"beta": 0.9},
        "learning_rate_decay": {
            "polynomial_conf": {"decay_batches": 300000, "end_learning_rate": 0.0001,},
        },
    },
}
# "warmup_conf": {"linear_conf": {"warmup_batches":10000, "start_multiplier":0}},
func_config = flow.FunctionConfig()
func_config.default_distribute_strategy(flow.distribute.consistent_strategy())
func_config.train.primary_lr(args.learning_rate)
func_config.default_data_type(flow.float)
func_config.train.model_update_conf(optimizer_dict[args.optimizer])
func_config.disable_all_reduce_sequence(True)
func_config.all_reduce_group_min_mbyte(8)
func_config.all_reduce_group_num(128)
if args.weight_l2:
func_config.train.weight_l2(args.weight_l2)
flow.config.gpu_device_num(args.gpu_num_per_node)
@flow.function(func_config)
def TrainNet():
    """One training step: load (or synthesize) a batch, run the selected
    model, attach the softmax cross-entropy loss, and return it."""
    # Global batch size across all devices on all nodes.
    total_device_num = args.node_num * args.gpu_num_per_node
    batch_size = total_device_num * args.batch_size_per_device
    if args.data_dir:
        assert os.path.exists(args.data_dir)
        print("Loading data from {}".format(args.data_dir))
        (labels, images) = data_loader.load_imagenet(
            args.data_dir, args.image_size, batch_size, args.data_part_num
        )
    else:
        # No dataset given: fall back to random synthetic data.
        print("Loading synthetic data.")
        (labels, images) = data_loader.load_synthetic(args.image_size, batch_size)
    logits = model_dict[args.model](images)
    loss = flow.nn.sparse_softmax_cross_entropy_with_logits(
        labels, logits, name="softmax_loss"
    )
    flow.losses.add_loss(loss)
    return loss
def main():
    """Entry point: print config, set up the cluster, train, save snapshots."""
    print("=".ljust(66, "="))
    print(
        "Running {}: num_gpu_per_node = {}, num_nodes = {}.".format(
            args.model, args.gpu_num_per_node, args.node_num
        )
    )
    print("=".ljust(66, "="))
    for arg in vars(args):
        print("{} = {}".format(arg, getattr(args, arg)))
    print("-".ljust(66, "-"))
    print("Time stamp: {}".format(str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))))
    flow.env.grpc_use_no_signal()
    flow.env.log_dir(args.log_dir)
    # Multi-node setup: register every worker IP with the OneFlow env.
    if args.node_num > 1:
        nodes = []
        for n in args.node_list.strip().split(","):
            addr_dict = {}
            addr_dict["addr"] = n
            nodes.append(addr_dict)
        flow.env.machine(nodes)
    check_point = flow.train.CheckPoint()
    if args.model_load_dir:
        assert os.path.isdir(args.model_load_dir)
        print("Restoring model from {}.".format(args.model_load_dir))
        check_point.load(args.model_load_dir)
    else:
        print("Init model on demand.")
        check_point.init()
    total_batch_size = (
        args.node_num * args.gpu_num_per_node * args.batch_size_per_device
    )
    speedometer = benchmark_util.CNNSpeedometer()
    start_time = time.time()
    # NOTE(review): `args.skip_iter_num` has no matching add_argument in the
    # visible parser tail (only `--warmup_iter_num` appears in this file's
    # flags); confirm the flag is defined upstream, otherwise this line
    # raises AttributeError at runtime.
    for step in range(args.skip_iter_num + args.iter_num):
        cb = speedometer.speedometer_cb(
            step,
            start_time,
            total_batch_size,
            args.skip_iter_num,
            args.iter_num,
            args.loss_print_every_n_iter,
        )
        TrainNet().async_get(cb)
        # Periodic snapshotting every `model_save_every_n_iter` steps.
        if (step + 1) % args.model_save_every_n_iter == 0:
            if not os.path.exists(args.model_save_dir):
                os.makedirs(args.model_save_dir)
            snapshot_save_path = os.path.join(
                args.model_save_dir, "snapshot_%d" % (step + 1)
            )
            print("Saving model to {}.".format(snapshot_save_path))
            check_point.save(snapshot_save_path)
    if args.save_last_snapshot:
        snapshot_save_path = os.path.join(args.model_save_dir, "last_snapshot")
        if not os.path.exists(snapshot_save_path):
            os.makedirs(snapshot_save_path)
        print("Saving model to {}.".format(snapshot_save_path))
        check_point.save(snapshot_save_path)


if __name__ == "__main__":
    main()
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os
import time
import argparse
from datetime import datetime

import oneflow as flow

import data_loader
import vgg_model
import resnet_model
import alexnet_model
import inceptionv3_model

parser = argparse.ArgumentParser(description="flags for cnn benchmark")
# resource
parser.add_argument("--gpu_num_per_node", type=int, default=1, required=False)
parser.add_argument("--node_num", type=int, default=1)
parser.add_argument(
    "--node_list",
    type=str,
    default=None,
    required=False,
    help="nodes' IP address, split by comma",
)
# train
parser.add_argument(
    "--model", type=str, default="vgg16", required=False, help="vgg16 or resnet50"
)
parser.add_argument("--batch_size_per_device", type=int, default=8, required=False)
parser.add_argument(
    "--iter_num", type=int, default=10, required=False, help="total iterations to run"
)
# NOTE(review): the help text below looks copy-pasted from --iter_num; the
# flag actually controls warm-up iterations (see main()).
parser.add_argument(
    "--warmup_iter_num",
    type=int,
    default=0,
    required=False,
    help="total iterations to run",
)
parser.add_argument(
    "--data_dir", type=str, default=None, required=False, help="dataset directory"
)
parser.add_argument(
    "--data_part_num",
    type=int,
    default=32,
    required=False,
    help="data part number in dataset",
)
parser.add_argument(
    "--image_size", type=int, default=228, required=False, help="image size"
)
parser.add_argument(
    "--use_tensorrt",
    dest="use_tensorrt",
    action="store_true",
    default=False,
    required=False,
    help="inference with tensorrt",
)
parser.add_argument(
    "--use_xla_jit",
    dest="use_xla_jit",
    action="store_true",
    default=False,
    required=False,
    help="inference with xla jit",
)
parser.add_argument(
    "--precision",
    type=str,
    default="float32",
    required=False,
    help="inference with low precision",
)
# log and restore/save
parser.add_argument(
    "--print_every_n_iter",
    type=int,
    default=1,
    required=False,
    help="print log every n iterations",
)
parser.add_argument(
    "--model_load_dir",
    type=str,
    default=None,
    required=False,
    help="model load directory",
)
parser.add_argument(
    "--log_dir",
    type=str,
    default="./output",
    required=False,
    help="log info save directory",
)
args = parser.parse_args()
# Model-name flag -> network constructor.
model_dict = {
    "resnet50": resnet_model.resnet50,
    "inceptionv3": inceptionv3_model.inceptionv3,
    "vgg16": vgg_model.vgg16,
    "alexnet": alexnet_model.alexnet,
}
@flow.function
def InferenceNet():
    """Inference job: one forward pass of `args.model`, returning softmax probs."""
    total_device_num = args.node_num * args.gpu_num_per_node
    batch_size = total_device_num * args.batch_size_per_device
    # NOTE(review): these flow.config calls sit inside the job body rather
    # than at module scope like the other global config — confirm OneFlow
    # applies them correctly when the job is traced.
    if args.use_tensorrt:
        flow.config.use_tensorrt()
    if args.use_xla_jit:
        flow.config.use_xla_jit()
    # fp16 path: TensorRT has its own fp16 switch, otherwise use AMP.
    if args.precision == "float16":
        if not args.use_tensorrt:
            flow.config.enable_auto_mixed_precision()
        else:
            flow.config.tensorrt.use_fp16()
    if args.data_dir:
        assert os.path.exists(args.data_dir)
        print("Loading data from {}".format(args.data_dir))
        (labels, images) = data_loader.load_imagenet(
            args.data_dir, args.image_size, batch_size, args.data_part_num
        )
    else:
        # No dataset given: benchmark on synthetic (random) data instead.
        print("Loading synthetic data.")
        (labels, images) = data_loader.load_synthetic(args.image_size, batch_size)
    logits = model_dict[args.model](images)
    softmax = flow.nn.softmax(logits)
    return softmax
def main():
    """Entry point: configure env, warm up, then time `iter_num` inference steps."""
    print("=".ljust(66, "="))
    print(
        "Running {}: num_gpu_per_node = {}, num_nodes = {}.".format(
            args.model, args.gpu_num_per_node, args.node_num
        )
    )
    print("=".ljust(66, "="))
    for arg in vars(args):
        print("{} = {}".format(arg, getattr(args, arg)))
    print("-".ljust(66, "-"))
    print("Time stamp: {}".format(str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))))
    flow.config.default_data_type(flow.float)
    flow.config.gpu_device_num(args.gpu_num_per_node)
    flow.env.grpc_use_no_signal()
    flow.env.log_dir(args.log_dir)
    # flow.config.enable_inplace(False)
    # flow.config.ctrl_port(12140)
    # Multi-node setup: register every worker IP with the OneFlow env.
    if args.node_num > 1:
        nodes = []
        for n in args.node_list.strip().split(","):
            addr_dict = {}
            addr_dict["addr"] = n
            nodes.append(addr_dict)
        flow.env.machine(nodes)
    check_point = flow.train.CheckPoint()
    if args.model_load_dir:
        assert os.path.isdir(args.model_load_dir)
        print("Restoring model from {}.".format(args.model_load_dir))
        check_point.load(args.model_load_dir)
    else:
        print("Init model on demand.")
        check_point.init()
    # warmups: untimed iterations to exclude graph-compilation / cache effects.
    print("Runing warm up for {} iterations.".format(args.warmup_iter_num))
    for step in range(args.warmup_iter_num):
        predictions = InferenceNet().get()
    # Timing state is stashed on the function object so the nested callback
    # can mutate it without `nonlocal`.
    main.total_time = 0.0
    main.batch_size = args.node_num * args.gpu_num_per_node * args.batch_size_per_device
    main.start_time = time.time()

    def create_callback(step):
        # Returns the async-result callback for this step.
        def callback(predictions):
            if step % args.print_every_n_iter == 0:
                cur_time = time.time()
                # NOTE(review): `duration` spans all iterations since the
                # previous printed one, but only one batch is divided by it —
                # images/sec is accurate only when print_every_n_iter == 1.
                duration = cur_time - main.start_time
                main.total_time += duration
                main.start_time = cur_time
                images_per_sec = main.batch_size / duration
                print(
                    "iter {}, speed: {:.3f}(sec/batch), {:.3f}(images/sec)".format(
                        step, duration, images_per_sec
                    )
                )
            if step == args.iter_num - 1:
                avg_img_per_sec = main.batch_size * args.iter_num / main.total_time
                print("-".ljust(66, "-"))
                print("average speed: {:.3f}(images/sec)".format(avg_img_per_sec))
                print("-".ljust(66, "-"))
        return callback

    for step in range(args.iter_num):
        InferenceNet().async_get(create_callback(step))
        # predictions = InferenceNet().get()
        # create_callback(step)(predictions)
        # print(predictions)


if __name__ == "__main__":
    main()
......@@ -3,195 +3,117 @@ from __future__ import division
from __future__ import print_function
import os
import time
import math
import numpy as np
import config as configs
parser = configs.get_parser()
args = parser.parse_args()
configs.print_args(args)
from util import Snapshot, Summary, nodes_init, StopWatch
from dali import get_rec_iter
import oneflow as flow
import vgg_model
import ofrecord_util
import config as configs
from util import Snapshot, Summary, InitNodes, Metric
from job_function_util import get_train_config, get_val_config
import inception_model
import resnet_model
import vgg_model
import alexnet_model
parser = configs.get_parser()
args = parser.parse_args()
configs.print_args(args)
total_device_num = args.num_nodes * args.gpu_num_per_node
train_batch_size = total_device_num * args.batch_size_per_device
val_batch_size = total_device_num * args.val_batch_size_per_device
(C, H, W) = args.image_shape
epoch_size = math.ceil(args.num_examples / train_batch_size)
num_train_batches = epoch_size * args.num_epochs
num_warmup_batches = epoch_size * args.warmup_epochs
decay_batches = num_train_batches - num_warmup_batches
num_val_steps = args.num_val_examples / val_batch_size
num_val_steps = int(args.num_val_examples / val_batch_size)
summary = Summary(args.log_dir, args)
timer = StopWatch()
model_dict = {
"resnet50": resnet_model.resnet50,
"vgg16": vgg_model.vgg16,
"alexnet": alexnet_model.alexnet,
}
optimizer_dict = {
"sgd": {"naive_conf": {}},
"adam": {"adam_conf": {"beta1": 0.9}},
"momentum": {"momentum_conf": {"beta": 0.9}},
"momentum-decay": {
"momentum_conf": {"beta": 0.9},
"learning_rate_decay": {
"polynomial_conf": {"decay_batches": 300000, "end_learning_rate": 0.0001,},
},
},
"momentum-cosine-decay": {
"momentum_conf": {"beta": 0.875},
"warmup_conf": {"linear_conf": {"warmup_batches":num_warmup_batches, "start_multiplier":0}},
"learning_rate_decay": {"cosine_conf": {"decay_batches": decay_batches}},
},
"inceptionv3": inception_model.inceptionv3,
}
flow.config.gpu_device_num(args.gpu_num_per_node)
flow.config.enable_debug_mode(True)
def get_train_config():
train_config = flow.function_config()
#train_config.default_distribute_strategy(flow.distribute.consistent_strategy())
train_config.default_data_type(flow.float)
train_config.train.primary_lr(args.learning_rate)
train_config.disable_all_reduce_sequence(True)
#train_config.all_reduce_group_min_mbyte(8)
#train_config.all_reduce_group_num(128)
# train_config.all_reduce_lazy_ratio(0)
# train_config.enable_nccl_hierarchical_all_reduce(True)
# train_config.cudnn_buf_limit_mbyte(2048)
# train_config.concurrency_width(2)
train_config.all_reduce_group_num(128)
train_config.all_reduce_group_min_mbyte(8)
train_config.train.model_update_conf(optimizer_dict[args.optimizer])
if args.weight_l2 and 0:
train_config.train.weight_l2(args.weight_l2)
train_config.enable_inplace(True)
return train_config
image_shape = (args.batch_size_per_device, H, W, C)
label_shape = (args.batch_size_per_device, 1)
@flow.function(get_train_config())
def TrainNet(images=flow.MirroredTensorDef(image_shape, dtype=flow.float),
labels=flow.MirroredTensorDef(label_shape, dtype=flow.int32)):
logits = model_dict[args.model](images)
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(labels, logits, name="softmax_loss")
if args.use_boxing_v2:
flow.config.collective_boxing.nccl_fusion_threshold_mb(8)
flow.config.collective_boxing.nccl_fusion_all_reduce_use_buffer(False)
@flow.function(get_train_config(args))
def TrainNet():
if args.train_data_dir:
assert os.path.exists(args.train_data_dir)
print("Loading data from {}".format(args.train_data_dir))
if args.use_new_dataloader:
(labels, images) = ofrecord_util.load_imagenet_for_training2(args)
else:
(labels, images) = ofrecord_util.load_imagenet_for_training(args)
# note: images.shape = (N C H W) in cc's new dataloader(load_imagenet_for_training2)
else:
print("Loading synthetic data.")
(labels, images) = ofrecord_util.load_synthetic(args)
logits = model_dict[args.model](
images, need_transpose=False if (args.use_new_dataloader and args.train_data_dir) else True)
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(
labels, logits, name="softmax_loss")
loss = flow.math.reduce_mean(loss)
flow.losses.add_loss(loss)
softmax = flow.nn.softmax(logits)
outputs = {"loss": loss, "softmax":softmax, "labels": labels}
predictions = flow.nn.softmax(logits)
outputs = {"loss": loss, "predictions": predictions, "labels": labels}
return outputs
def get_val_config():
val_config = flow.function_config()
#val_config.default_distribute_strategy(flow.distribute.consistent_strategy())
val_config.default_data_type(flow.float)
return val_config
image_shape = (args.val_batch_size_per_device, H, W, C)
label_shape = (args.val_batch_size_per_device, 1)
@flow.function(get_val_config())
def InferenceNet(images=flow.MirroredTensorDef(image_shape, dtype=flow.float),
labels=flow.MirroredTensorDef(label_shape, dtype=flow.int32)):
logits = model_dict[args.model](images)
softmax = flow.nn.softmax(logits)
outputs = {"softmax":softmax, "labels": labels}
return outputs#(softmax, labels)
def acc_acc(step, predictions):
classfications = np.argmax(predictions['softmax'].ndarray(), axis=1)
labels = predictions['labels'].reshape(-1)
if step == 0:
main.correct = 0.0
main.total = 0.0
@flow.function(get_val_config(args))
def InferenceNet():
if args.val_data_dir:
assert os.path.exists(args.val_data_dir)
print("Loading data from {}".format(args.val_data_dir))
if args.use_new_dataloader:
(labels, images) = ofrecord_util.load_imagenet_for_validation2(args)
else:
(labels, images) = ofrecord_util.load_imagenet_for_validation(args)
else:
main.correct += np.sum(classfications == labels);
main.total += len(labels)
def train_callback(epoch, step):
def callback(train_outputs):
acc_acc(step, train_outputs)
loss = train_outputs['loss'].mean()
summary.scalar('loss', loss, step)
#summary.scalar('learning_rate', train_outputs['lr'], step)
if (step-1) % args.loss_print_every_n_iter == 0:
throughput = args.loss_print_every_n_iter * train_batch_size / timer.split()
accuracy = main.correct/main.total
print("epoch {}, iter {}, loss: {:.6f}, accuracy: {:.6f}, samples/s: {:.3f}".format(
epoch, step-1, loss, accuracy, throughput))
summary.scalar('train_accuracy', accuracy, step)
main.correct = 0.0
main.total = 0.0
return callback
def do_predictions(epoch, predict_step, predictions):
acc_acc(predict_step, predictions)
if predict_step + 1 == num_val_steps:
assert main.total > 0
summary.scalar('top1_accuracy', main.correct/main.total, epoch)
#summary.scalar('top1_correct', main.correct, epoch)
#summary.scalar('total_val_images', main.total, epoch)
print("epoch {}, top 1 accuracy: {:.6f}, time: {:.2f}".format(epoch,
main.correct/main.total, timer.split()))
def predict_callback(epoch, predict_step):
def callback(predictions):
do_predictions(epoch, predict_step, predictions)
return callback
print("Loading synthetic data.")
(labels, images) = ofrecord_util.load_synthetic(args)
logits = model_dict[args.model](
images, need_transpose=False if (args.use_new_dataloader and args.train_data_dir) else True)
predictions = flow.nn.softmax(logits)
outputs = {"predictions": predictions, "labels": labels}
return outputs
def main():
nodes_init(args)
InitNodes(args)
flow.env.grpc_use_no_signal()
flow.env.log_dir(args.log_dir)
summary = Summary(args.log_dir, args)
snapshot = Snapshot(args.model_save_dir, args.model_load_dir)
train_data_iter, val_data_iter = get_rec_iter(args, True)
timer.start()
for epoch in range(args.num_epochs):
tic = time.time()
print('Starting epoch {} at {:.2f}'.format(epoch, tic))
train_data_iter.reset()
for i, batches in enumerate(train_data_iter):
images, labels = batches
TrainNet(batches).async_get(train_callback(epoch, i))
# if i > 30:#debug
# break
#break
print('epoch {} training time: {:.2f}'.format(epoch, time.time() - tic))
if args.data_val:
tic = time.time()
val_data_iter.reset()
for i, batches in enumerate(val_data_iter):
images, labels = batches
InferenceNet(images, labels).async_get(predict_callback(epoch, i))
#acc_acc(i, InferenceNet(images, labels.astype(np.int32)).get())
summary.save()
snapshot.save('epoch_{}'.format(epoch+1))
metric = Metric(desc='train', calculate_batches=args.loss_print_every_n_iter,
summary=summary, save_summary_steps=epoch_size,
batch_size=train_batch_size, loss_key='loss')
for i in range(epoch_size):
TrainNet().async_get(metric.metric_cb(epoch, i))
if args.val_data_dir:
metric = Metric(desc='validation', calculate_batches=num_val_steps, summary=summary,
save_summary_steps=num_val_steps, batch_size=val_batch_size)
for i in range(num_val_steps):
InferenceNet().async_get(metric.metric_cb(epoch, i))
snapshot.save('epoch_{}'.format(epoch))
if __name__ == "__main__":
......
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import math
import numpy as np
import config as configs
parser = configs.get_parser()
args = parser.parse_args()
configs.print_args(args)
from util import Snapshot, Summary, nodes_init, StopWatch
from dali_consistent import get_rec_iter
import oneflow as flow
import vgg_model
import resnet_model
import alexnet_model
total_device_num = args.num_nodes * args.gpu_num_per_node
train_batch_size = total_device_num * args.batch_size_per_device
val_batch_size = total_device_num * args.val_batch_size_per_device
(C, H, W) = args.image_shape
epoch_size = math.ceil(args.num_examples / train_batch_size)
num_train_batches = epoch_size * args.num_epochs
num_warmup_batches = epoch_size * args.warmup_epochs
decay_batches = num_train_batches - num_warmup_batches
num_val_steps = args.num_val_examples / val_batch_size
summary = Summary(args.log_dir, args)
timer = StopWatch()
model_dict = {
"resnet50": resnet_model.resnet50,
"vgg16": vgg_model.vgg16,
"alexnet": alexnet_model.alexnet,
}
optimizer_dict = {
"sgd": {"naive_conf": {}},
"adam": {"adam_conf": {"beta1": 0.9}},
"momentum": {"momentum_conf": {"beta": 0.9}},
"momentum-decay": {
"momentum_conf": {"beta": 0.9},
"learning_rate_decay": {
"polynomial_conf": {"decay_batches": 300000, "end_learning_rate": 0.0001,},
},
},
"momentum-cosine-decay": {
"momentum_conf": {"beta": 0.875},
"warmup_conf": {"linear_conf": {"warmup_batches":num_warmup_batches, "start_multiplier":0}},
"learning_rate_decay": {"cosine_conf": {"decay_batches": decay_batches}},
},
}
flow.config.gpu_device_num(args.gpu_num_per_node)
flow.config.enable_debug_mode(True)
def get_train_config():
train_config = flow.function_config()
train_config.default_distribute_strategy(flow.distribute.consistent_strategy())
train_config.default_data_type(flow.float)
train_config.train.primary_lr(args.learning_rate)
train_config.disable_all_reduce_sequence(True)
#train_config.all_reduce_group_min_mbyte(8)
#train_config.all_reduce_group_num(128)
# train_config.all_reduce_lazy_ratio(0)
# train_config.enable_nccl_hierarchical_all_reduce(True)
# train_config.cudnn_buf_limit_mbyte(2048)
# train_config.concurrency_width(2)
train_config.all_reduce_group_num(128)
train_config.all_reduce_group_min_mbyte(8)
train_config.train.model_update_conf(optimizer_dict[args.optimizer])
if args.weight_l2 and 0:
train_config.train.weight_l2(args.weight_l2)
train_config.enable_inplace(True)
return train_config
@flow.function(get_train_config())
def TrainNet(images=flow.FixedTensorDef((train_batch_size, H, W, C), dtype=flow.float),
labels=flow.FixedTensorDef((train_batch_size, ), dtype=flow.int32)):
logits = model_dict[args.model](images)
loss = flow.nn.sparse_softmax_cross_entropy_with_logits(labels, logits, name="softmax_loss")
#loss = flow.math.reduce_mean(loss)
flow.losses.add_loss(loss)
softmax = flow.nn.softmax(logits)
outputs = {"loss": loss, "softmax":softmax, "labels": labels}
return outputs
def get_val_config():
val_config = flow.function_config()
val_config.default_distribute_strategy(flow.distribute.consistent_strategy())
val_config.default_data_type(flow.float)
return val_config
@flow.function(get_val_config())
def InferenceNet(images=flow.FixedTensorDef((val_batch_size, H, W, C), dtype=flow.float),
labels=flow.FixedTensorDef((val_batch_size, ), dtype=flow.int32)):
logits = model_dict[args.model](images)
softmax = flow.nn.softmax(logits)
outputs = {"softmax":softmax, "labels": labels}
return outputs#(softmax, labels)
def acc_acc(step, predictions):
classfications = np.argmax(predictions['softmax'].ndarray(), axis=1)
labels = predictions['labels'].reshape(-1)
if step == 0:
main.correct = 0.0
main.total = 0.0
else:
main.correct += np.sum(classfications == labels);
main.total += len(labels)
def train_callback(epoch, step):
def callback(train_outputs):
acc_acc(step, train_outputs)
loss = train_outputs['loss'].mean()
summary.scalar('loss', loss, step)
#summary.scalar('learning_rate', train_outputs['lr'], step)
if (step-1) % args.loss_print_every_n_iter == 0:
throughput = args.loss_print_every_n_iter * train_batch_size / timer.split()
accuracy = main.correct/main.total
print("epoch {}, iter {}, loss: {:.6f}, accuracy: {:.6f}, samples/s: {:.3f}".format(
epoch, step-1, loss, accuracy, throughput))
summary.scalar('train_accuracy', accuracy, step)
main.correct = 0.0
main.total = 0.0
return callback
def do_predictions(epoch, predict_step, predictions):
acc_acc(predict_step, predictions)
if predict_step + 1 == num_val_steps:
assert main.total > 0
summary.scalar('top1_accuracy', main.correct/main.total, epoch)
#summary.scalar('top1_correct', main.correct, epoch)
#summary.scalar('total_val_images', main.total, epoch)
print("epoch {}, top 1 accuracy: {:.6f}, time: {:.2f}".format(epoch,
main.correct/main.total, timer.split()))
def predict_callback(epoch, predict_step):
def callback(predictions):
do_predictions(epoch, predict_step, predictions)
return callback
def main():
nodes_init(args)
flow.env.grpc_use_no_signal()
flow.env.log_dir(args.log_dir)
snapshot = Snapshot(args.model_save_dir, args.model_load_dir)
train_data_iter, val_data_iter = get_rec_iter(args, True)
timer.start()
for epoch in range(args.num_epochs):
tic = time.time()
print('Starting epoch {} at {:.2f}'.format(epoch, tic))
train_data_iter.reset()
for i, batches in enumerate(train_data_iter):
images, labels = batches
TrainNet(images, labels).async_get(train_callback(epoch, i))
# if i > 30:#debug
# break
#break
print('epoch {} training time: {:.2f}'.format(epoch, time.time() - tic))
if args.data_val:
tic = time.time()
val_data_iter.reset()
for i, batches in enumerate(val_data_iter):
images, labels = batches
InferenceNet(images, labels).async_get(predict_callback(epoch, i))
#acc_acc(i, InferenceNet(images, labels.astype(np.int32)).get())
summary.save()
snapshot.save('epoch_{}'.format(epoch+1))
if __name__ == "__main__":
main()
# pip3 install -U altair vega_datasets jupyterlab --user
import altair as alt
import pandas as pd
import numpy as np
import os
import glob
def plot_value(df):
    """Scatter-plot value-vs-iter per legend, overlaid with a cubic poly fit.

    Expects columns "iter", "legend", "value"; renders via altair's
    notebook display.
    """
    names = list(df["legend"].unique())
    # One smooth 1000-point grid shared by every fitted curve.
    grid = np.linspace(df["iter"].min(), df["iter"].max(), 1000)
    fit_frame = pd.DataFrame({"iter": grid})
    for name in names:
        subset = df[df["legend"] == name]
        coeffs = np.polyfit(subset["iter"], subset["value"], 3)
        fit_frame[name + "-fit"] = np.poly1d(coeffs)(fit_frame["iter"])
    scatter = (
        alt.Chart(df)
        .interactive()
        .mark_circle()
        .encode(x="iter", y="value", color="legend:N")
    )
    fit_lines = (
        alt.Chart(fit_frame)
        .transform_fold([name + "-fit" for name in names], as_=["legend", "value"])
        .mark_line()
        .encode(x="iter:Q", y="value:Q", color="legend:N")
    )
    layered = scatter + fit_lines
    layered.display()
def plot_by_legend(df):
    """Draw one chart (via plot_value) per distinct legend in *df*.

    Fix: the original body evaluated ``df[df["legend"] == legend]`` on a line
    of its own and discarded the result — a pure no-op; the dead expression
    is removed.
    """
    for legend in df["legend"].unique():
        plot_value(df[df["legend"] == legend])
def plot_many_by_legend(df_dict):
    """Plot each legend across all frames in *df_dict*, merged per legend.

    Known legends in ``known_legends`` plot first, in order; any other legend
    discovered in the frames is appended after them. Each source frame's rows
    are tagged with the dict key (via ``update_legend``) before merging.

    Fix: the discovery filter compared ``....size is 0`` — identity against an
    int literal, a SyntaxWarning since Python 3.8 whose truth depends on
    CPython's small-int cache. Replaced with ``== 0`` (same result on
    CPython, now correct by the language spec).
    """
    # NOTE(review): `size == 0` can only hold when the legend has no rows at
    # all; if the intent was "legend carries no note", this should probably
    # be `.dropna().size == 0` — confirm against the metrics CSV schema.
    extra_legends = []
    known_legends = [
        "loss",
        "top1_accuracy",
    ]
    for _, df in df_dict.items():
        for legend in list(df["legend"].unique()):
            if (
                legend not in known_legends
                and "note" in df
                and df[df["legend"] == legend]["note"].size == 0
            ):
                extra_legends.append(legend)
    for legend in known_legends + extra_legends:
        # Stitch this legend's rows from every source frame into one chart.
        df_by_legend = pd.concat(
            [
                update_legend(df[df["legend"] == legend].copy(), k)
                for k, df in df_dict.items()
            ],
            axis=0,
            sort=False,
        )
        plot_value(df_by_legend)
def update_legend(df, prefix):
    """Prepend ``prefix-`` to every entry of *df*'s "legend" column in place.

    Returns *df* (mutated) so the call can be chained.
    """
    if df["legend"].size > 0:
        df["legend"] = ["{}-{}".format(prefix, value) for value in df["legend"]]
    return df
# Metric column names expected in a loss-histogram row; per make_loss_frame
# below, the column at index len(COLUMNS) holds the iteration number.
COLUMNS = [
    "loss",
]
def make_loss_frame(hisogram, column_index, legend="undefined"):
    """Build a tidy (iter, legend, value) frame for one metric column.

    *hisogram* (sic — name kept for callers) is an array-like whose rows are
    metric records; column ``column_index`` holds the value and column
    ``len(COLUMNS)`` the iteration number.
    """
    assert column_index < len(COLUMNS)
    records = np.asarray(hisogram)
    values = records[:, column_index]
    iters = records[:, len(COLUMNS)]
    return pd.DataFrame({"iter": iters, "legend": legend, "value": values})
def make_loss_frame5(losses_hisogram, source):
    """Concatenate one tidy frame per metric column into a single frame.

    NOTE(review): *source* is accepted but never used; kept for interface
    compatibility with existing callers.
    """
    frames = [
        make_loss_frame(losses_hisogram, index, legend=name)
        for index, name in enumerate(COLUMNS)
    ]
    return pd.concat(frames, axis=0, ignore_index=True)
def wildcard_at(path, index):
    """Expand glob *path* and return the match at *index*, oldest-first.

    Matches are ordered by file modification time, so ``index=-1`` yields
    the most recently modified file. Asserts that at least one file matches.
    """
    matches = glob.glob(path)
    assert len(matches) > 0, "there is no files in {}".format(path)
    matches.sort(key=os.path.getmtime)
    return matches[index]
def get_df(path, wildcard, index=-1):
    """Read a CSV into a DataFrame.

    If *path* is a directory, pick the file matching *wildcard* inside it;
    *index* selects among the matches in mtime order (default: newest).
    """
    target = path
    if os.path.isdir(target):
        target = wildcard_at(os.path.join(target, wildcard), index)
    return pd.read_csv(target)
def get_metrics_sr(df1, df2):
    """Return (limit, rate): common iteration cap and subsampling stride.

    *limit* is the largest iteration both frames cover (also capped by their
    sizes); *rate* thins points so at most ~2500 per frame get plotted.
    """
    limit = min(df1["iter"].max(), df2["iter"].max(), df1.size, df2.size)
    if limit <= 2500:
        rate = 1
    else:
        rate = limit // 2500 + 1
    return limit, rate
def subset_and_mod(df, limit, take_every_n):
    """Keep rows with iter < *limit*, then only every *take_every_n*-th iter."""
    below = df["iter"] < limit
    on_stride = df["iter"] % take_every_n == 0
    return df[below & on_stride]
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(argument_default=argparse.SUPPRESS)
    parser.add_argument("-d", "--metrics_dir", type=str)
    parser.add_argument("-o", "--oneflow_metrics_path", type=str)
    parser.add_argument("-p", "--pytorch_metrics_path", type=str)
    args = parser.parse_args()

    # Resolve the two metrics locations; -o/-p override the shared -d.
    # Fix: the originals were left undefined (NameError) when no flag was
    # passed — fail with a clear message instead.
    flow_metrics_path = None
    torch_metrics_path = None
    if hasattr(args, "metrics_dir"):
        flow_metrics_path = args.metrics_dir
        torch_metrics_path = args.metrics_dir
    if hasattr(args, "oneflow_metrics_path"):
        flow_metrics_path = args.oneflow_metrics_path
    if hasattr(args, "pytorch_metrics_path"):
        torch_metrics_path = args.pytorch_metrics_path
    assert flow_metrics_path is not None, "pass -d or -o to locate OneFlow metrics"
    assert torch_metrics_path is not None, "pass -d or -p to locate PyTorch metrics"
    assert os.path.exists(flow_metrics_path), "{} not found".format(
        flow_metrics_path
    )
    assert os.path.exists(torch_metrics_path), "{} not found".format(
        torch_metrics_path
    )

    flow_df = get_df(flow_metrics_path, "loss*.csv")
    # Fix: the original called `flow_df.drop(["rank", "note"], axis=1)` and
    # discarded the returned frame — a pure no-op (DataFrame.drop is not
    # in-place by default). The dead statement is removed rather than made
    # effective, because plot_many_by_legend's `"note" in df` check relies on
    # the column surviving; if dropping was truly intended, revisit that too.
    if "primary_lr" in flow_df["legend"].unique():
        flow_df["legend"].replace("primary_lr", "lr", inplace=True)
    # Average duplicate (iter, legend) rows (e.g. one row per rank).
    flow_df = flow_df.groupby(["iter", "legend"], as_index=False).mean()

    torch_df = get_df(torch_metrics_path, "torch*.csv")
    # Align PyTorch's 0-based iteration numbering with OneFlow's 1-based one.
    if torch_df[torch_df["value"].notnull()]["iter"].min() == 0:
        torch_df["iter"] += 1

    limit, rate = get_metrics_sr(flow_df, torch_df)
    plot_many_by_legend(
        {
            "flow": subset_and_mod(flow_df, limit, rate),
            "torch": subset_and_mod(torch_df, limit, rate),
        }
    )
    # plot_by_legend(flow_df)
......@@ -19,14 +19,18 @@ def _conv2d(
padding="SAME",
data_format="NCHW",
dilations=1,
weight_initializer=flow.variance_scaling_initializer(
2, 'fan_in', 'random_normal', data_format="NCHW"),
weight_regularizer=flow.regularizers.l2(1.0/32768),
trainable=True,
weight_initializer=flow.variance_scaling_initializer(data_format="NCHW"),
):
weight = flow.get_variable(
name + "-weight",
shape=(filters, input.static_shape[1], kernel_size, kernel_size),
shape=(filters, input.shape[1], kernel_size, kernel_size),
dtype=input.dtype,
initializer=weight_initializer,
regularizer=weight_regularizer,
model_name="weight",
trainable=trainable,
)
return flow.nn.conv2d(
......@@ -38,7 +42,7 @@ def _batch_norm(inputs, name=None, trainable=True):
return flow.layers.batch_normalization(
inputs=inputs,
axis=1,
momentum=0.9,#97,
momentum=0.9, # 97,
epsilon=1.001e-5,
center=True,
scale=True,
......@@ -121,9 +125,11 @@ def resnet_stem(input):
return pool1
def resnet50(images, trainable=True):
def resnet50(images, trainable=True, need_transpose=False):
images = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
# note: images.shape = (N C H W) in cc's new dataloader, transpose is not needed anymore
if need_transpose:
images = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
with flow.deprecated.variable_scope("Resnet"):
stem = resnet_stem(images)
......@@ -134,10 +140,13 @@ def resnet50(images, trainable=True):
fc1001 = flow.layers.dense(
flow.reshape(pool5, (pool5.shape[0], -1)),
units=1001,
units=1000,
use_bias=True,
kernel_initializer=flow.xavier_uniform_initializer(),
kernel_initializer=flow.variance_scaling_initializer(
2, 'fan_in', 'random_normal'),
# kernel_initializer=flow.xavier_uniform_initializer(),
bias_initializer=flow.zeros_initializer(),
kernel_regularizer=flow.regularizers.l2(1.0/32768),
trainable=trainable,
name="fc1001",
)
......
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import numpy as np
import pandas as pd
from datetime import datetime
import oneflow as flow
def nodes_init(args):
def InitNodes(args):
if args.num_nodes > 1:
assert args.num_nodes <= len(args.node_ips)
flow.env.ctrl_port(12138)
nodes = []
for n in args.node_list.strip().split(","):
for ip in args.node_ips:
addr_dict = {}
addr_dict["addr"] = n
addr_dict["addr"] = ip
nodes.append(addr_dict)
flow.env.machine(nodes)
class Snapshot:
class Snapshot(object):
def __init__(self, model_save_dir, model_load_dir):
self._model_save_dir = model_save_dir
self._check_point = flow.train.CheckPoint()
......@@ -26,8 +32,9 @@ class Snapshot:
print("Restoring model from {}.".format(model_load_dir))
self._check_point.load(model_load_dir)
else:
print("Init model on demand.")
self._check_point.init()
self.save('initial_model')
print("Init model on demand.")
def save(self, name):
snapshot_save_path = os.path.join(self._model_save_dir, "snapshot_{}".format(name))
......@@ -37,25 +44,25 @@ class Snapshot:
self._check_point.save(snapshot_save_path)
class Summary():
def __init__(self, log_dir, config):
class Summary(object):
def __init__(self, log_dir, config, filename='summary.csv'):
self._filename = filename
self._log_dir = log_dir
self._metrics = pd.DataFrame({"iter": 0, "legend": "cfg", "note": str(config)}, index=[0])
self._metrics = pd.DataFrame({"epoch":0, "iter": 0, "legend": "cfg", "note": str(config)}, index=[0])
def scalar(self, legend, value, step=-1):
def scalar(self, legend, value, epoch, step=-1):
# TODO: support rank(which device/gpu)
df = pd.DataFrame(
{"iter": step, "legend": legend, "value": value, "rank": 0, "time": time.time()},
{"epoch": epoch, "iter": step, "legend": legend, "value": value, "rank": 0},
index=[0])
self._metrics = pd.concat([self._metrics, df], axis=0, sort=False)
def save(self):
save_path = os.path.join(self._log_dir, "summary.csv")
save_path = os.path.join(self._log_dir, self._filename)
self._metrics.to_csv(save_path, index=False)
print("saved: {}".format(save_path))
class StopWatch:
class StopWatch(object):
def __init__(self):
pass
......@@ -75,3 +82,87 @@ class StopWatch:
def duration(self):
return self.stop_time - self.start_time
def match_top_k(predictions, labels, top_k=1):
    """Count samples whose label is among the top-k predicted classes.

    *predictions* is (batch, num_classes) scores; *labels* is (batch,) class
    ids. Returns ``(num_matched, num_samples)``.
    """
    # Class indices sorted by descending score, then the first top_k of them.
    order_desc = predictions.argsort(axis=1)[:, ::-1]
    top_indices = order_desc[:, :top_k]
    hit_per_sample = (top_indices == labels.reshape((-1, 1))).any(axis=1)
    return hit_per_sample.sum(), hit_per_sample.shape[0]
class Metric(object):
    """Stateful accuracy/throughput accumulator driven by async job callbacks.

    ``metric_cb(epoch, step)`` returns a callback suitable for
    ``job().async_get(...)``; the callback accumulates top-1/top-k match
    counts and, every ``calculate_batches`` steps, prints a report line and
    (when a Summary is attached) logs scalars to it.
    """

    def __init__(self, summary=None, save_summary_steps=-1, desc='train', calculate_batches=-1,
                 batch_size=256, top_k=5, prediction_key='predictions', label_key='labels',
                 loss_key=None):
        # summary: a Summary instance, or None to disable CSV logging.
        # save_summary_steps: period (in steps) for flushing the Summary CSV.
        # calculate_batches: print/report period, in batches.
        # prediction_key/label_key: keys into the job's outputs dict.
        # loss_key: when set, the outputs dict also carries a loss blob.
        # NOTE(review): `batch_size` is accepted but never read in this class.
        self.summary = summary
        self.save_summary = isinstance(self.summary, Summary)
        self.save_summary_steps = save_summary_steps
        self.desc = desc
        self.calculate_batches = calculate_batches
        self.top_k = top_k
        self.prediction_key = prediction_key
        self.label_key = label_key
        self.loss_key = loss_key
        # Report format depends on whether a loss value is available.
        if loss_key:
            self.fmt = "{}: epoch {}, iter {}, loss: {:.6f}, top_1: {:.6f}, top_k: {:.6f}, samples/s: {:.3f}"
        else:
            self.fmt = "{}: epoch {}, iter {}, top_1: {:.6f}, top_k: {:.6f}, samples/s: {:.3f}"
        self.timer = StopWatch()
        self.timer.start()
        self._clear()

    def _clear(self):
        # Reset running counters at the start of each reporting window.
        self.top_1_num_matched = 0
        self.top_k_num_matched = 0
        self.num_samples = 0.0

    def metric_cb(self, epoch, step):
        """Build the async-result callback bound to (epoch, step)."""
        def callback(outputs):
            # A fresh epoch starts at step 0: drop leftovers from the last one.
            if step == 0: self._clear()
            if self.prediction_key:
                # Accumulate top-1 and top-k hit counts for this batch.
                num_matched, num_samples = match_top_k(outputs[self.prediction_key],
                                                       outputs[self.label_key])
                self.top_1_num_matched += num_matched
                num_matched, _ = match_top_k(outputs[self.prediction_key],
                                             outputs[self.label_key], self.top_k)
                self.top_k_num_matched += num_matched
            else:
                # No predictions in the outputs: only count samples.
                num_samples = outputs[self.label_key].shape[0]

            self.num_samples += num_samples

            if (step + 1) % self.calculate_batches == 0:
                # Window closed: report throughput/accuracy and reset counters.
                throughput = self.num_samples / self.timer.split()
                if self.prediction_key:
                    top_1_accuracy = self.top_1_num_matched / self.num_samples
                    top_k_accuracy = self.top_k_num_matched / self.num_samples
                else:
                    top_1_accuracy = 0.0
                    top_k_accuracy = 0.0
                if self.loss_key:
                    loss = outputs[self.loss_key].mean()
                    print(self.fmt.format(self.desc, epoch, step + 1, loss, top_1_accuracy,
                                          top_k_accuracy, throughput))
                    if self.save_summary:
                        self.summary.scalar(self.desc+"_" + self.loss_key, loss, epoch, step)
                else:
                    print(self.fmt.format(self.desc, epoch, step + 1, top_1_accuracy,
                                          top_k_accuracy, throughput))
                self._clear()
                if self.save_summary:
                    self.summary.scalar(self.desc + "_throughput", throughput, epoch, step)
                    if self.prediction_key:
                        self.summary.scalar(self.desc + "_top_1", top_1_accuracy, epoch, step)
                        self.summary.scalar(self.desc + "_top_{}".format(self.top_k),
                                            top_k_accuracy, epoch, step)

            if self.save_summary:
                # Periodically flush the accumulated scalars to disk.
                if (step + 1) % self.save_summary_steps == 0:
                    self.summary.save()
        return callback
......@@ -3,7 +3,6 @@ from __future__ import division
from __future__ import print_function
import oneflow as flow
import oneflow.core.operator.op_conf_pb2 as op_conf_util
from model_util import conv2d_layer
......@@ -24,10 +23,12 @@ def _conv_block(in_blob, index, filters, conv_times):
return conv_block
def vgg16(images, trainable=True):
def vgg16(images, need_transpose=False):
transposed = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
conv1 = _conv_block(transposed, 0, 64, 2)
if need_transpose:
images = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
conv1 = _conv_block(images, 0, 64, 2)
pool1 = flow.nn.max_pool2d(conv1[-1], 2, 2, "VALID", "NCHW", name="pool1")
conv2 = _conv_block(pool1, 2, 128, 2)
......@@ -42,26 +43,13 @@ def vgg16(images, trainable=True):
conv5 = _conv_block(pool4, 10, 512, 3)
pool5 = flow.nn.max_pool2d(conv5[-1], 2, 2, "VALID", "NCHW", name="pool5")
def _get_kernel_initializer():
kernel_initializer = op_conf_util.InitializerConf()
kernel_initializer.truncated_normal_conf.std = 0.816496580927726
return kernel_initializer
def _get_bias_initializer():
bias_initializer = op_conf_util.InitializerConf()
bias_initializer.constant_conf.value = 0.0
return bias_initializer
pool5 = flow.reshape(pool5, [pool5.shape[0], -1])
fc6 = flow.layers.dense(
inputs=pool5,
inputs=flow.reshape(pool5, [pool5.shape[0], -1]),
units=4096,
activation=flow.keras.activations.relu,
use_bias=True,
kernel_initializer=_get_kernel_initializer(),
bias_initializer=_get_bias_initializer(),
trainable=trainable,
kernel_initializer=flow.truncated_normal(0.816496580927726),
bias_initializer=flow.constant_initializer(),
name="fc1",
)
......@@ -72,9 +60,8 @@ def vgg16(images, trainable=True):
units=4096,
activation=flow.keras.activations.relu,
use_bias=True,
kernel_initializer=_get_kernel_initializer(),
bias_initializer=_get_bias_initializer(),
trainable=trainable,
kernel_initializer=flow.truncated_normal(0.816496580927726),
bias_initializer=flow.constant_initializer(),
name="fc2",
)
fc7 = flow.nn.dropout(fc7, rate=0.5)
......@@ -83,9 +70,8 @@ def vgg16(images, trainable=True):
inputs=fc7,
units=1001,
use_bias=True,
kernel_initializer=_get_kernel_initializer(),
bias_initializer=_get_bias_initializer(),
trainable=trainable,
kernel_initializer=flow.truncated_normal(0.816496580927726),
bias_initializer=flow.constant_initializer(),
name="fc_final",
)
......
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import oneflow as flow
from model_util import conv2d_layer
def alexnet(images, need_transpose=False):
    """Build an AlexNet-style classifier graph over `images`.

    Args:
        images: input blob; expected NCHW after the optional transpose.
        need_transpose: when True, convert NHWC input to NCHW first.

    Returns:
        Logits blob with 1001 units (matching the other models in this
        repo, presumably ImageNet plus a background class — confirm).
    """
    net = images
    if need_transpose:
        # Data loader may emit NHWC; the conv layers below expect NCHW.
        net = flow.transpose(net, name="transpose", perm=[0, 3, 1, 2])
    net = conv2d_layer(
        "conv1", net, filters=64, kernel_size=11, strides=4, padding="VALID"
    )
    # NOTE(review): canonical AlexNet uses max pooling; this graph uses
    # average pooling throughout — confirm this is intentional.
    net = flow.nn.avg_pool2d(net, 3, 2, "VALID", "NCHW", name="pool1")
    net = conv2d_layer("conv2", net, filters=192, kernel_size=5)
    net = flow.nn.avg_pool2d(net, 3, 2, "VALID", "NCHW", name="pool2")
    net = conv2d_layer("conv3", net, filters=384)
    net = conv2d_layer("conv4", net, filters=384)
    net = conv2d_layer("con5".replace("n5", "nv5"), net, filters=256) if False else conv2d_layer("conv5", net, filters=256)
    net = flow.nn.avg_pool2d(net, 3, 2, "VALID", "NCHW", name="pool5")
    # Flatten the (N, C, H, W) feature map before the dense head.
    if len(net.shape) > 2:
        net = flow.reshape(net, shape=(net.shape[0], -1))
    # Two bias-free 4096-unit ReLU layers, each followed by dropout.
    for layer_idx in (1, 2):
        net = flow.layers.dense(
            inputs=net,
            units=4096,
            activation=flow.keras.activations.relu,
            use_bias=False,
            kernel_initializer=flow.random_uniform_initializer(),
            bias_initializer=False,
            name="fc{}".format(layer_idx),
        )
        net = flow.nn.dropout(net, rate=0.5)
    # Final classification layer: plain logits, no activation.
    return flow.layers.dense(
        inputs=net,
        units=1001,
        activation=None,
        use_bias=False,
        kernel_initializer=flow.random_uniform_initializer(),
        bias_initializer=False,
        name="fc3",
    )
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
from datetime import datetime
import logging
from optimizer_util import add_optimizer_args
from ofrecord_util import add_ofrecord_args
def get_parser(parser=None):
    """Build (or extend) an argparse parser with the CNN benchmark flags.

    Args:
        parser: an existing ArgumentParser to extend; created when None.

    Returns:
        The parser with resource, train/validation, data-processing,
        snapshot, logging, ofrecord and optimizer arguments registered.
    """
    def str_list(x):
        # "a,b,c" -> ["a", "b", "c"]
        return x.split(',')
    def int_list(x):
        # "1,2" -> [1, 2]
        return list(map(int, x.split(',')))
    def float_list(x):
        # "1.5,2" -> [1.5, 2.0]
        return list(map(float, x.split(',')))
    def str2bool(v):
        # Lenient boolean parser for flag values ("yes"/"t"/"1", "no"/"f"/"0", ...).
        if v.lower() in ('yes', 'true', 't', 'y', '1'):
            return True
        elif v.lower() in ('no', 'false', 'f', 'n', '0'):
            return False
        else:
            raise argparse.ArgumentTypeError('Unsupported value encountered.')
    if parser is None:
        parser = argparse.ArgumentParser("flags for cnn benchmark")
    parser.add_argument("--dtype", type=str, default='float32', help="float16 float32")
    # resource
    parser.add_argument("--gpu_num_per_node", type=int, default=1)
    parser.add_argument('--num_nodes', type=int, default=1,
                        help='node/machine number for training')
    parser.add_argument('--node_ips', type=str_list, default=['192.168.1.13', '192.168.1.14'],
                        help='nodes ip list for training, devided by ",", length >= num_nodes')
    parser.add_argument("--model", type=str, default="vgg16", help="vgg16 or resnet50")
    # NOTE: nargs='?' with const=True lets a bare "--use_fp16" mean True;
    # when the flag is absent the attribute is None (falsy).
    parser.add_argument(
        '--use_fp16',
        type=str2bool,
        nargs='?',
        const=True,
        help='Whether to use use fp16'
    )
    parser.add_argument(
        '--use_boxing_v2',
        type=str2bool,
        nargs='?',
        const=True,
        help='Whether to use boxing v2'
    )
    parser.add_argument(
        '--use_new_dataloader',
        type=str2bool,
        nargs='?',
        const=True,
        help='Whether to use new dataloader'
    )
    # train and validation
    parser.add_argument('--num_epochs', type=int, default=90, help='number of epochs')
    parser.add_argument("--model_load_dir", type=str, default=None, help="model load directory if need")
    parser.add_argument("--batch_size_per_device", type=int, default=64)
    parser.add_argument("--val_batch_size_per_device", type=int, default=8)
    # for data process (dashed flags become args.rgb_mean / args.rgb_std / args.image_shape)
    parser.add_argument("--num_examples", type=int, default=1281167, help="train pic number")
    parser.add_argument("--num_val_examples", type=int, default=50000, help="validation pic number")
    parser.add_argument('--rgb-mean', type=float_list, default=[123.68, 116.779, 103.939],
                        help='a tuple of size 3 for the mean rgb')
    parser.add_argument('--rgb-std', type=float_list, default=[58.393, 57.12, 57.375],
                        help='a tuple of size 3 for the std rgb')
    parser.add_argument("--input_layout", type=str, default='NHWC', help="NCHW or NHWC")
    parser.add_argument('--image-shape', type=int_list, default=[3, 224, 224],
                        help='the image shape feed into the network')
    ## snapshot (default save dir is timestamped at import time)
    parser.add_argument("--model_save_dir", type=str,
                        default="./output/snapshots/model_save-{}".format(str(datetime.now().strftime("%Y%m%d%H%M%S"))),
                        help="model save directory",
                        )
    # log and loss print
    parser.add_argument("--log_dir", type=str, default="./output", help="log info save directory")
    parser.add_argument(
        "--loss_print_every_n_iter",
        type=int,
        default=1,
        help="print loss every n iteration",
    )
    # Dataset/ofrecord and optimizer flag groups come from helper modules.
    add_ofrecord_args(parser)
    add_optimizer_args(parser)
    return parser
def print_args(args):
    """Pretty-print a run banner, every parsed flag, and a timestamp."""
    banner = "=".ljust(66, "=")
    print(banner)
    print("Running {}: num_gpu_per_node = {}, num_nodes = {}.".format(
        args.model, args.gpu_num_per_node, args.num_nodes))
    print(banner)
    for flag_name in vars(args):
        print("{} = {}".format(flag_name, getattr(args, flag_name)))
    print("-".ljust(66, "-"))
    print("Time stamp: {}".format(str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))))
# Smoke-test entry point: parse flags from the command line and dump them.
if __name__ == '__main__':
    parser = get_parser()
    args = parser.parse_args()
    print_args(args)
from __future__ import absolute_import
import oneflow as flow
def conv2d_layer(
    name,
    input,
    filters,
    kernel_size=3,
    strides=1,
    padding="SAME",
    data_format="NCHW",
    dilation_rate=1,
    activation="Relu",
    use_bias=True,
    weight_initializer=flow.random_uniform_initializer(),
    bias_initializer=flow.constant_initializer(),
):
    """2-D convolution with optional bias add and ReLU activation.

    Args:
        name: variable/op name prefix ("<name>-weight", "<name>-bias").
        input: input blob, channel dim assumed at axis 1 (NCHW layout).
        filters: number of output channels.
        kernel_size: int (square kernel) or 2-element list/tuple (h, w).
        activation: "Relu" or None; anything else raises NotImplementedError.

    Note: the initializer defaults are config objects built once at import
    time and shared by every layer — presumably intentional for this
    framework's initializer descriptors.

    Returns:
        The convolution output blob.
    """
    if isinstance(kernel_size, int):
        kernel_size_1 = kernel_size
        kernel_size_2 = kernel_size
    elif isinstance(kernel_size, (list, tuple)):
        # Fix: previously only `list` was accepted, so passing a tuple fell
        # through both branches and raised NameError below.
        kernel_size_1 = kernel_size[0]
        kernel_size_2 = kernel_size[1]
    else:
        raise ValueError(
            "kernel_size must be an int or a 2-element list/tuple, got {!r}".format(kernel_size))
    # Weight layout: (out_channels, in_channels, kh, kw) i.e. OIHW.
    weight_shape = (filters, input.shape[1], kernel_size_1, kernel_size_2)
    weight = flow.get_variable(
        name + "-weight",
        shape=weight_shape,
        dtype=input.dtype,
        initializer=weight_initializer,
    )
    output = flow.nn.conv2d(
        input, weight, strides, padding, data_format, dilation_rate, name=name
    )
    if use_bias:
        bias = flow.get_variable(
            name + "-bias",
            shape=(filters,),
            dtype=input.dtype,
            initializer=bias_initializer,
        )
        output = flow.nn.bias_add(output, bias, data_format)
    if activation is not None:
        if activation == "Relu":
            output = flow.keras.activations.relu(output)
        else:
            raise NotImplementedError
    return output
def conv2d_layer_with_bn(
    name,
    input,
    filters,
    kernel_size=3,
    strides=1,
    padding="SAME",
    data_format="NCHW",
    dilation_rate=1,
    activation="Relu",
    use_bias=True,
    weight_initializer=flow.random_uniform_initializer(),
    bias_initializer=flow.constant_initializer(),
    use_bn=True,
):
    """Convolution (see conv2d_layer) optionally followed by batch norm.

    The batch-norm op is named "<name>_bn" and normalizes over axis 1,
    matching the default NCHW data_format.
    """
    conv_out = conv2d_layer(
        name=name,
        input=input,
        filters=filters,
        kernel_size=kernel_size,
        strides=strides,
        padding=padding,
        data_format=data_format,
        dilation_rate=dilation_rate,
        activation=activation,
        use_bias=use_bias,
        weight_initializer=weight_initializer,
        bias_initializer=bias_initializer,
    )
    if not use_bn:
        return conv_out
    return flow.layers.batch_normalization(
        inputs=conv_out,
        axis=1,
        momentum=0.997,
        epsilon=1.001e-5,
        center=True,
        scale=True,
        trainable=True,
        name=name + "_bn",
    )
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import math
import oneflow as flow
import ofrecord_util
import config as configs
from util import Snapshot, Summary, InitNodes, Metric
from job_function_util import get_train_config, get_val_config
import inception_model
import resnet_model
import vgg_model
import alexnet_model
# Parse the benchmark flags once at import time; the job functions below
# close over `args` and the derived globals.
parser = configs.get_parser()
args = parser.parse_args()
configs.print_args(args)
# Global (all-device) batch sizes and per-epoch step counts.
total_device_num = args.num_nodes * args.gpu_num_per_node
train_batch_size = total_device_num * args.batch_size_per_device
val_batch_size = total_device_num * args.val_batch_size_per_device
(C, H, W) = args.image_shape
epoch_size = math.ceil(args.num_examples / train_batch_size)
num_val_steps = int(args.num_val_examples / val_batch_size)
# Model zoo: the --model flag selects one of these network constructors.
model_dict = {
    "resnet50": resnet_model.resnet50,
    "vgg16": vgg_model.vgg16,
    "alexnet": alexnet_model.alexnet,
    "inceptionv3": inception_model.inceptionv3,
}
flow.config.gpu_device_num(args.gpu_num_per_node)
# NOTE(review): debug mode is unconditionally enabled — confirm this is
# wanted for benchmarking runs.
flow.config.enable_debug_mode(True)
if args.use_boxing_v2:
    # Fuse small all-reduce ops into NCCL groups of up to 8 MB.
    flow.config.collective_boxing.nccl_fusion_threshold_mb(8)
    flow.config.collective_boxing.nccl_fusion_all_reduce_use_buffer(False)
@flow.function(get_train_config(args))
def TrainNet():
    """One training step: load a batch, forward pass, register the loss.

    Returns a dict of blobs ("loss", "predictions", "labels") consumed by
    Metric's async callback.
    """
    if args.train_data_dir:
        assert os.path.exists(args.train_data_dir)
        print("Loading data from {}".format(args.train_data_dir))
        if args.use_new_dataloader:
            (labels, images) = ofrecord_util.load_imagenet_for_training2(args)
        else:
            (labels, images) = ofrecord_util.load_imagenet_for_training(args)
        # note: images.shape = (N C H W) in cc's new dataloader(load_imagenet_for_training2)
    else:
        # No data dir given: fall back to synthetic data.
        print("Loading synthetic data.")
        (labels, images) = ofrecord_util.load_synthetic(args)
    # The new dataloader already emits NCHW, so the model skips its NHWC
    # transpose in that case.
    logits = model_dict[args.model](
        images, need_transpose=False if (args.use_new_dataloader and args.train_data_dir) else True)
    loss = flow.nn.sparse_softmax_cross_entropy_with_logits(
        labels, logits, name="softmax_loss")
    loss = flow.math.reduce_mean(loss)
    flow.losses.add_loss(loss)
    predictions = flow.nn.softmax(logits)
    outputs = {"loss": loss, "predictions": predictions, "labels": labels}
    return outputs
@flow.function(get_val_config(args))
def InferenceNet():
    """One validation step: load a batch and return softmax predictions.

    Mirrors TrainNet without the loss; returns {"predictions", "labels"}.
    """
    if args.val_data_dir:
        assert os.path.exists(args.val_data_dir)
        print("Loading data from {}".format(args.val_data_dir))
        if args.use_new_dataloader:
            (labels, images) = ofrecord_util.load_imagenet_for_validation2(args)
        else:
            (labels, images) = ofrecord_util.load_imagenet_for_validation(args)
    else:
        print("Loading synthetic data.")
        (labels, images) = ofrecord_util.load_synthetic(args)
    # NOTE(review): the transpose decision checks train_data_dir, not
    # val_data_dir — confirm this is intended when only one dir is set.
    logits = model_dict[args.model](
        images, need_transpose=False if (args.use_new_dataloader and args.train_data_dir) else True)
    predictions = flow.nn.softmax(logits)
    outputs = {"predictions": predictions, "labels": labels}
    return outputs
def main():
    """Drive training: per epoch run the train loop, optionally validate,
    then snapshot the model."""
    InitNodes(args)
    flow.env.grpc_use_no_signal()
    flow.env.log_dir(args.log_dir)
    summary = Summary(args.log_dir, args)
    snapshot = Snapshot(args.model_save_dir, args.model_load_dir)
    for epoch in range(args.num_epochs):
        # Fresh accumulator per epoch so windowed stats never leak across epochs.
        train_metric = Metric(desc='train',
                              calculate_batches=args.loss_print_every_n_iter,
                              summary=summary,
                              save_summary_steps=epoch_size,
                              batch_size=train_batch_size,
                              loss_key='loss')
        for step in range(epoch_size):
            TrainNet().async_get(train_metric.metric_cb(epoch, step))
        if args.val_data_dir:
            val_metric = Metric(desc='validation',
                                calculate_batches=num_val_steps,
                                summary=summary,
                                save_summary_steps=num_val_steps,
                                batch_size=val_batch_size)
            for step in range(num_val_steps):
                InferenceNet().async_get(val_metric.metric_cb(epoch, step))
        snapshot.save('epoch_{}'.format(epoch))
# Script entry point.
if __name__ == "__main__":
    main()
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import oneflow as flow
# Per-stage configuration for ResNet-50 (stages res2..res5):
# number of residual blocks, output channels, and bottleneck channels.
BLOCK_COUNTS = [3, 4, 6, 3]
BLOCK_FILTERS = [256, 512, 1024, 2048]
BLOCK_FILTERS_INNER = [64, 128, 256, 512]
def _conv2d(
    name,
    input,
    filters,
    kernel_size,
    strides=1,
    padding="SAME",
    data_format="NCHW",
    dilations=1,
    # Defaults are initializer/regularizer config objects built once at
    # import time and shared by all layers — presumably intentional.
    weight_initializer=flow.variance_scaling_initializer(
        2, 'fan_in', 'random_normal', data_format="NCHW"),
    weight_regularizer=flow.regularizers.l2(1.0/32768),
    trainable=True,
):
    """Square-kernel, bias-free conv2d with variance-scaling init and L2
    weight decay. Weight layout is OIHW (see shape below)."""
    weight = flow.get_variable(
        name + "-weight",
        shape=(filters, input.shape[1], kernel_size, kernel_size),
        dtype=input.dtype,
        initializer=weight_initializer,
        regularizer=weight_regularizer,
        model_name="weight",
        trainable=trainable,
    )
    return flow.nn.conv2d(
        input, weight, strides, padding, data_format, dilations, name=name
    )
def _batch_norm(inputs, name=None, trainable=True):
    """Channel-wise (NCHW, axis=1) batch normalization with affine params.

    # NOTE(review): the trailing "# 97" in the original suggests momentum
    # was once 0.997 — confirm 0.9 is the intended value.
    """
    bn_kwargs = dict(
        inputs=inputs,
        axis=1,
        momentum=0.9,
        epsilon=1.001e-5,
        center=True,
        scale=True,
        trainable=trainable,
        name=name,
    )
    return flow.layers.batch_normalization(**bn_kwargs)
def conv2d_affine(input, name, filters, kernel_size, strides, activation=None):
    """Conv + batch norm (+ optional ReLU). Input must be NCHW."""
    # 1x1/stride-1 convolutions need no padding; everything else pads SAME.
    if strides > 1 or kernel_size > 1:
        padding = "SAME"
    else:
        padding = "VALID"
    conv_out = _conv2d(name, input, filters, kernel_size, strides, padding)
    normed = _batch_norm(conv_out, name + "_bn")
    if activation == "Relu":
        return flow.keras.activations.relu(normed)
    return normed
def bottleneck_transformation(input, block_name, filters, filters_inner, strides):
    """Bottleneck main path: 1x1 reduce -> 3x3 (strided) -> 1x1 expand."""
    reduced = conv2d_affine(
        input, block_name + "_branch2a", filters_inner, 1, 1, activation="Relu",
    )
    spatial = conv2d_affine(
        reduced, block_name + "_branch2b", filters_inner, 3, strides, activation="Relu",
    )
    # No activation here; the residual add in residual_block applies ReLU.
    return conv2d_affine(spatial, block_name + "_branch2c", filters, 1, 1)
def residual_block(input, block_name, filters, filters_inner, strides_init):
    """One bottleneck residual unit; projects the shortcut when the block
    strides or when it is the first block of res2 (channel change)."""
    needs_projection = strides_init != 1 or block_name == "res2_0"
    if needs_projection:
        shortcut = conv2d_affine(
            input, block_name + "_branch1", filters, 1, strides_init
        )
    else:
        shortcut = input
    main_path = bottleneck_transformation(
        input, block_name, filters, filters_inner, strides_init
    )
    return flow.keras.activations.relu(main_path + shortcut)
def residual_stage(input, stage_name, counts, filters, filters_inner, stride_init=2):
    """Stack `counts` residual blocks; only the first block may stride."""
    output = input
    for block_idx in range(counts):
        stride = stride_init if block_idx == 0 else 1
        output = residual_block(
            output, "%s_%d" % (stage_name, block_idx), filters, filters_inner, stride,
        )
    return output
def resnet_conv_x_body(input, on_stage_end=lambda x: x):
    """Run stages res2..res5. res2 keeps stride 1 because the stem has
    already downsampled 4x; later stages stride 2."""
    output = input
    stage_cfgs = zip(BLOCK_COUNTS, BLOCK_FILTERS, BLOCK_FILTERS_INNER)
    for stage_idx, (counts, filters, filters_inner) in enumerate(stage_cfgs):
        output = residual_stage(
            output,
            "res%d" % (stage_idx + 2),
            counts,
            filters,
            filters_inner,
            1 if stage_idx == 0 else 2,
        )
        # Hook for per-stage side effects (e.g. feature taps); result ignored.
        on_stage_end(output)
    return output
def resnet_stem(input):
    """ResNet stem: 7x7/2 conv + BN + ReLU + 3x3/2 max-pool (4x downsample)."""
    stem = _conv2d("conv1", input, 64, 7, 2)
    stem = flow.keras.activations.relu(_batch_norm(stem, "conv1_bn"))
    return flow.nn.max_pool2d(
        stem, ksize=3, strides=2, padding="VALID", data_format="NCHW", name="pool1",
    )
def resnet50(images, trainable=True, need_transpose=False):
    """Build ResNet-50 and return the final dense logits.

    Args:
        images: input blob; NCHW, or NHWC with need_transpose=True.
        trainable: forwarded only to the final dense layer; conv/BN layers
            use their own defaults.
        need_transpose: transpose NHWC -> NCHW before the stem.
    """
    # note: images.shape = (N C H W) in cc's new dataloader, transpose is not needed anymore
    if need_transpose:
        images = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
    with flow.deprecated.variable_scope("Resnet"):
        stem = resnet_stem(images)
        body = resnet_conv_x_body(stem, lambda x: x)
        # Global 7x7 average pool over the final feature map.
        pool5 = flow.nn.avg_pool2d(
            body, ksize=7, strides=1, padding="VALID", data_format="NCHW", name="pool5",
        )
        # NOTE(review): 1000 output units here vs 1001 in vgg16/alexnet —
        # confirm the label space is consistent across models.
        fc1001 = flow.layers.dense(
            flow.reshape(pool5, (pool5.shape[0], -1)),
            units=1000,
            use_bias=True,
            kernel_initializer=flow.variance_scaling_initializer(
                2, 'fan_in', 'random_normal'),
            # kernel_initializer=flow.xavier_uniform_initializer(),
            bias_initializer=flow.zeros_initializer(),
            kernel_regularizer=flow.regularizers.l2(1.0/32768),
            trainable=trainable,
            name="fc1001",
        )
    return fc1001
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import time
import numpy as np
import pandas as pd
from datetime import datetime
import oneflow as flow
def InitNodes(args):
    """Register the machines of a multi-node run with OneFlow.

    No-op for single-node runs. Raises AssertionError when fewer IPs than
    nodes were supplied.
    """
    if args.num_nodes > 1:
        assert args.num_nodes <= len(args.node_ips)
        # NOTE(review): hard-coded control port — consider making it a flag.
        flow.env.ctrl_port(12138)
        # Fix: only register the first `num_nodes` IPs. The --node_ips help
        # text allows the list to be longer than num_nodes, but previously
        # every listed IP was registered as a machine.
        nodes = [{"addr": ip} for ip in args.node_ips[:args.num_nodes]]
        flow.env.machine(nodes)
class Snapshot(object):
    """Wrap flow.train.CheckPoint: restore-or-init at startup, save on demand."""
    def __init__(self, model_save_dir, model_load_dir):
        # When model_load_dir is given, restore from it; otherwise random-init
        # and immediately persist the initial weights for reproducibility.
        self._model_save_dir = model_save_dir
        self._check_point = flow.train.CheckPoint()
        if model_load_dir:
            assert os.path.isdir(model_load_dir)
            print("Restoring model from {}.".format(model_load_dir))
            self._check_point.load(model_load_dir)
        else:
            self._check_point.init()
            self.save('initial_model')
            print("Init model on demand.")
    def save(self, name):
        """Persist the current weights under <model_save_dir>/snapshot_<name>."""
        snapshot_save_path = os.path.join(self._model_save_dir, "snapshot_{}".format(name))
        if not os.path.exists(snapshot_save_path):
            os.makedirs(snapshot_save_path)
        print("Saving model to {}.".format(snapshot_save_path))
        self._check_point.save(snapshot_save_path)
class Summary(object):
    """Accumulate scalar metrics in a DataFrame and dump them to CSV."""

    def __init__(self, log_dir, config, filename='summary.csv'):
        self._filename = filename
        self._log_dir = log_dir
        # The first row records the run configuration under legend "cfg".
        self._metrics = pd.DataFrame(
            {"epoch": 0, "iter": 0, "legend": "cfg", "note": str(config)}, index=[0])

    def scalar(self, legend, value, epoch, step=-1):
        """Append one (epoch, step, legend, value) observation."""
        # TODO: support rank(which device/gpu)
        row = {"epoch": epoch, "iter": step, "legend": legend, "value": value, "rank": 0}
        self._metrics = pd.concat(
            [self._metrics, pd.DataFrame(row, index=[0])], axis=0, sort=False)

    def save(self):
        """Write all accumulated rows to <log_dir>/<filename> as CSV."""
        target = os.path.join(self._log_dir, self._filename)
        self._metrics.to_csv(target, index=False)
class StopWatch(object):
    """Wall-clock timer with lap support.

    start() (re)starts the timer, split() returns seconds since the
    previous split (or start), stop()/duration() measure the full span.
    """

    def __init__(self):
        # Robustness fix: the original set these attributes only inside
        # start()/stop(), so calling split() or duration() first raised
        # AttributeError. Initialize everything up front instead.
        self.start()
        self.stop_time = self.start_time

    def start(self):
        """Start (or restart) the timer and reset the lap marker."""
        self.start_time = time.time()
        self.last_split = self.start_time

    def split(self):
        """Return seconds elapsed since the previous split (or start)."""
        now = time.time()
        duration = now - self.last_split
        self.last_split = now
        return duration

    def stop(self):
        """Record the end time used by duration()."""
        self.stop_time = time.time()

    def duration(self):
        """Total seconds between start() and the most recent stop()."""
        return self.stop_time - self.start_time
def match_top_k(predictions, labels, top_k=1):
    """Count how many rows of `predictions` rank their label in the top k.

    Args:
        predictions: (N, num_classes) score array.
        labels: (N,) integer class ids.
        top_k: number of highest-scoring classes to consider per row.

    Returns:
        (num_matched, N) tuple.
    """
    # Indices of the top_k highest scores per row, best first.
    top_indices = predictions.argsort(axis=1)[:, ::-1][:, :top_k]
    hits = (top_indices == labels.reshape((-1, 1))).any(axis=1)
    return hits.sum(), hits.shape[0]
class Metric(object):
    """Windowed top-1/top-k accuracy and throughput tracker.

    metric_cb(epoch, step) builds a callback for job.async_get(...).
    Every `calculate_batches` callbacks it prints the windowed stats
    (and, when `summary` is a Summary, records them), then resets.
    """
    def __init__(self, summary=None, save_summary_steps=-1, desc='train', calculate_batches=-1,
                 batch_size=256, top_k=5, prediction_key='predictions', label_key='labels',
                 loss_key=None):
        # summary: optional Summary sink; any other value disables logging.
        # save_summary_steps: flush the summary CSV every N steps.
        # desc: tag used in printed lines and summary legends.
        # calculate_batches: window size (in steps) for reporting.
        # batch_size: NOTE(review): unused — throughput is derived from the
        #   actual per-batch sample counts; confirm before removing.
        # prediction_key/label_key/loss_key: keys into the outputs dict;
        #   a falsy prediction_key skips accuracy, loss_key enables loss.
        self.summary = summary
        self.save_summary = isinstance(self.summary, Summary)
        self.save_summary_steps = save_summary_steps
        self.desc = desc
        self.calculate_batches = calculate_batches
        self.top_k = top_k
        self.prediction_key = prediction_key
        self.label_key = label_key
        self.loss_key = loss_key
        if loss_key:
            self.fmt = "{}: epoch {}, iter {}, loss: {:.6f}, top_1: {:.6f}, top_k: {:.6f}, samples/s: {:.3f}"
        else:
            self.fmt = "{}: epoch {}, iter {}, top_1: {:.6f}, top_k: {:.6f}, samples/s: {:.3f}"
        self.timer = StopWatch()
        self.timer.start()
        self._clear()
    def _clear(self):
        # Reset the per-window accumulators.
        self.top_1_num_matched = 0
        self.top_k_num_matched = 0
        self.num_samples = 0.0
    def metric_cb(self, epoch, step):
        """Return an async callback bound to (epoch, step)."""
        def callback(outputs):
            # A new epoch restarts at step 0; drop stale window state.
            if step == 0: self._clear()
            if self.prediction_key:
                num_matched, num_samples = match_top_k(outputs[self.prediction_key],
                                                       outputs[self.label_key])
                self.top_1_num_matched += num_matched
                num_matched, _ = match_top_k(outputs[self.prediction_key],
                                             outputs[self.label_key], self.top_k)
                self.top_k_num_matched += num_matched
            else:
                num_samples = outputs[self.label_key].shape[0]
            self.num_samples += num_samples
            if (step + 1) % self.calculate_batches == 0:
                # End of a window: rates are over the elapsed wall time.
                throughput = self.num_samples / self.timer.split()
                if self.prediction_key:
                    top_1_accuracy = self.top_1_num_matched / self.num_samples
                    top_k_accuracy = self.top_k_num_matched / self.num_samples
                else:
                    top_1_accuracy = 0.0
                    top_k_accuracy = 0.0
                if self.loss_key:
                    loss = outputs[self.loss_key].mean()
                    print(self.fmt.format(self.desc, epoch, step + 1, loss, top_1_accuracy,
                                          top_k_accuracy, throughput))
                    if self.save_summary:
                        self.summary.scalar(self.desc+"_" + self.loss_key, loss, epoch, step)
                else:
                    print(self.fmt.format(self.desc, epoch, step + 1, top_1_accuracy,
                                          top_k_accuracy, throughput))
                self._clear()
                if self.save_summary:
                    self.summary.scalar(self.desc + "_throughput", throughput, epoch, step)
                    if self.prediction_key:
                        self.summary.scalar(self.desc + "_top_1", top_1_accuracy, epoch, step)
                        self.summary.scalar(self.desc + "_top_{}".format(self.top_k),
                                            top_k_accuracy, epoch, step)
            if self.save_summary:
                if (step + 1) % self.save_summary_steps == 0:
                    self.summary.save()
        return callback
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import oneflow as flow
from model_util import conv2d_layer
def _conv_block(in_blob, index, filters, conv_times):
    """Chain `conv_times` 3x3 stride-1 convolutions; layer names continue
    from `index` ("conv<index>", "conv<index+1>", ...).

    Returns the list [input, conv_0, ..., conv_{conv_times-1}] so callers
    take the last element as the block output.
    """
    blobs = [in_blob]
    layer_id = index
    for step in range(conv_times):
        blobs.append(conv2d_layer(
            name="conv{}".format(layer_id),
            input=blobs[step],
            filters=filters,
            kernel_size=3,
            strides=1,
        ))
        layer_id += 1
    return blobs
def vgg16(images, need_transpose=False):
    """Build VGG-16 and return the final 1001-way logits.

    Args:
        images: input blob; NCHW, or NHWC with need_transpose=True.
        need_transpose: transpose NHWC -> NCHW before the first conv.
    """
    if need_transpose:
        images = flow.transpose(images, name="transpose", perm=[0, 3, 1, 2])
    # Five conv blocks, each followed by a 2x2/2 max-pool (VGG-16 layout:
    # 2-2-3-3-3 convolutions; _conv_block returns [input, conv...]).
    conv1 = _conv_block(images, 0, 64, 2)
    pool1 = flow.nn.max_pool2d(conv1[-1], 2, 2, "VALID", "NCHW", name="pool1")
    conv2 = _conv_block(pool1, 2, 128, 2)
    pool2 = flow.nn.max_pool2d(conv2[-1], 2, 2, "VALID", "NCHW", name="pool2")
    conv3 = _conv_block(pool2, 4, 256, 3)
    pool3 = flow.nn.max_pool2d(conv3[-1], 2, 2, "VALID", "NCHW", name="pool3")
    conv4 = _conv_block(pool3, 7, 512, 3)
    pool4 = flow.nn.max_pool2d(conv4[-1], 2, 2, "VALID", "NCHW", name="pool4")
    conv5 = _conv_block(pool4, 10, 512, 3)
    pool5 = flow.nn.max_pool2d(conv5[-1], 2, 2, "VALID", "NCHW", name="pool5")
    # Dense head: flatten, then two 4096-unit ReLU layers with dropout.
    fc6 = flow.layers.dense(
        inputs=flow.reshape(pool5, [pool5.shape[0], -1]),
        units=4096,
        activation=flow.keras.activations.relu,
        use_bias=True,
        kernel_initializer=flow.truncated_normal(0.816496580927726),
        bias_initializer=flow.constant_initializer(),
        name="fc1",
    )
    fc6 = flow.nn.dropout(fc6, rate=0.5)
    fc7 = flow.layers.dense(
        inputs=fc6,
        units=4096,
        activation=flow.keras.activations.relu,
        use_bias=True,
        kernel_initializer=flow.truncated_normal(0.816496580927726),
        bias_initializer=flow.constant_initializer(),
        name="fc2",
    )
    fc7 = flow.nn.dropout(fc7, rate=0.5)
    # Final classification layer (1001 units; resnet50 uses 1000 —
    # NOTE(review): confirm the label spaces agree).
    fc8 = flow.layers.dense(
        inputs=fc7,
        units=1001,
        use_bias=True,
        kernel_initializer=flow.truncated_normal(0.816496580927726),
        bias_initializer=flow.constant_initializer(),
        name="fc_final",
    )
    return fc8
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册