Commit 96126122 authored by Ancientshi

wdl_test_all

Parent 9c7bac4e
## Directory description
```
.
├── gpu_info                 temporary CSV files recording GPU usage
├── log
├── n1g1_benchmark_mem.sh    test memory usage and latency on one GPU
├── n1g1_benchmark.sh        test loss on one GPU
├── n1g8_benchmark_mem.sh    test memory usage and latency on multiple GPUs
├── README.md
└── wdl_train_eval.py        training/evaluation python script
```
## How to start a test task
Take n1g1_benchmark as an example: run
`./n1g1_benchmark.sh`; a CSV file named *n1g1_benchmark.csv* is then created under the */results/old* directory.
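For a quick sanity check of the results you can load the CSV with pandas. A minimal sketch (the relative path prefix is an assumption; see the hard-coded output path in `wdl_train_eval.py` for the actual location):
```python
import pandas as pd

# Columns written by wdl_train_eval.py: task, step, loss, latency/ms, memory_usage_<i>/MB
df = pd.read_csv('results/old/n1g1_benchmark.csv')
print(df[['step', 'loss', 'latency/ms']].head())
print('mean latency/ms:', df['latency/ms'].mean())
```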
EMBD_SIZE=1603616
DATA_ROOT=/dataset/wdl_ofrecord/ofrecord
python3 wdl_train_eval.py \
--train_data_dir $DATA_ROOT/train \
--train_data_part_num 256 \
--train_part_name_suffix_length=5 \
--eval_data_dir $DATA_ROOT/val \
--eval_data_part_num 256 \
--eval_part_name_suffix_length=5 \
--max_iter=103 \
--loss_print_every_n_iter=1 \
--batch_size=32 \
--deep_dropout_rate=0 \
--hidden_units_num 7 \
--hidden_size 1024 \
--wide_vocab_size=$EMBD_SIZE \
--deep_vocab_size=$EMBD_SIZE \
--gpu_num_per_node 1 \
--test_name 'n1g1_benchmark'
EMBD_SIZE=1603616
DATA_ROOT=/dataset/wdl_ofrecord/ofrecord
python3 wdl_train_eval.py \
--train_data_dir $DATA_ROOT/train \
--train_data_part_num 256 \
--train_part_name_suffix_length=5 \
--eval_data_dir $DATA_ROOT/val \
--eval_data_part_num 256 \
--eval_part_name_suffix_length=5 \
--max_iter=103 \
--loss_print_every_n_iter=1 \
--batch_size=16384 \
--deep_dropout_rate=0.5 \
--hidden_units_num 7 \
--hidden_size 1024 \
--wide_vocab_size=$EMBD_SIZE \
--deep_vocab_size=$EMBD_SIZE \
--gpu_num_per_node 1 \
--test_name 'n1g1_benchmark_mem'
EMBD_SIZE=1603616
DATA_ROOT=/dataset/wdl_ofrecord/ofrecord
python3 wdl_train_eval.py \
--train_data_dir $DATA_ROOT/train \
--train_data_part_num 256 \
--train_part_name_suffix_length=5 \
--eval_data_dir $DATA_ROOT/val \
--eval_data_part_num 256 \
--eval_part_name_suffix_length=5 \
--max_iter=103 \
--loss_print_every_n_iter=1 \
--batch_size=16384 \
--deep_dropout_rate=0.5 \
--hidden_units_num 7 \
--hidden_size 1024 \
--wide_vocab_size=$EMBD_SIZE \
--deep_vocab_size=$EMBD_SIZE \
--gpu_num_per_node 8 \
--test_name 'n1g8_benchmark_mem'
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import datetime
import os
import glob
import sys
import time
import numpy as np
import pandas as pd
from sklearn.metrics import roc_auc_score
import oneflow.compatible.single_client as flow
def str_list(x):
return x.split(',')
parser = argparse.ArgumentParser()
parser.add_argument('--test_name', type=str, default='a_test')
parser.add_argument('--dataset_format', type=str, default='ofrecord', help='ofrecord or onerec')
parser.add_argument(
"--use_single_dataloader_thread",
action="store_true",
help="use single dataloader threads per node or not."
)
parser.add_argument('--num_dataloader_thread_per_gpu', type=int, default=2)
parser.add_argument('--train_data_dir', type=str, default='')
parser.add_argument('--train_data_part_num', type=int, default=1)
parser.add_argument('--train_part_name_suffix_length', type=int, default=5)
parser.add_argument('--eval_data_dir', type=str, default='')
parser.add_argument('--eval_data_part_num', type=int, default=1)
parser.add_argument('--eval_part_name_suffix_length', type=int, default=5)
parser.add_argument('--eval_batchs', type=int, default=20)
parser.add_argument('--eval_interval', type=int, default=1000)
parser.add_argument('--batch_size', type=int, default=16384)
parser.add_argument('--learning_rate', type=float, default=1e-3)
parser.add_argument('--wide_vocab_size', type=int, default=3200000)
parser.add_argument('--deep_vocab_size', type=int, default=3200000)
parser.add_argument('--deep_embedding_vec_size', type=int, default=16)
parser.add_argument('--deep_dropout_rate', type=float, default=0.5)
parser.add_argument('--num_dense_fields', type=int, default=13)
parser.add_argument('--num_wide_sparse_fields', type=int, default=2)
parser.add_argument('--num_deep_sparse_fields', type=int, default=26)
parser.add_argument('--max_iter', type=int, default=100000)
parser.add_argument('--loss_print_every_n_iter', type=int, default=100)
parser.add_argument('--gpu_num_per_node', type=int, default=8)
parser.add_argument('--num_nodes', type=int, default=1,
help='node/machine number for training')
parser.add_argument('--node_ips', type=str_list, default=['192.168.1.13', '192.168.1.14'],
                    help='nodes ip list for training, divided by ",", length >= num_nodes')
parser.add_argument("--ctrl_port", type=int, default=50051, help='ctrl_port for multinode job')
parser.add_argument('--hidden_units_num', type=int, default=7)
parser.add_argument('--hidden_size', type=int, default=1024)
FLAGS = parser.parse_args()
records=[]
#DEEP_HIDDEN_UNITS = [1024, 1024]#, 1024, 1024, 1024, 1024, 1024]
DEEP_HIDDEN_UNITS = [FLAGS.hidden_size for i in range(FLAGS.hidden_units_num)]
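# Dispatch to the proper reader for the chosen dataset format. Dataloader ops
# are placed on CPU; with --use_single_dataloader_thread one thread per node
# is used, otherwise num_dataloader_thread_per_gpu * gpu_num_per_node threads.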
def _data_loader(data_dir, data_part_num, batch_size, part_name_suffix_length=-1, shuffle=True):
assert FLAGS.num_dataloader_thread_per_gpu >= 1
if FLAGS.use_single_dataloader_thread:
devices = ['{}:0'.format(i) for i in range(FLAGS.num_nodes)]
else:
num_dataloader_thread = FLAGS.num_dataloader_thread_per_gpu * FLAGS.gpu_num_per_node
devices = ['{}:0-{}'.format(i, num_dataloader_thread - 1) for i in range(FLAGS.num_nodes)]
with flow.scope.placement("cpu", devices):
if FLAGS.dataset_format == 'ofrecord':
data = _data_loader_ofrecord(data_dir, data_part_num, batch_size,
part_name_suffix_length, shuffle)
elif FLAGS.dataset_format == 'onerec':
data = _data_loader_onerec(data_dir, batch_size, shuffle)
elif FLAGS.dataset_format == 'synthetic':
data = _data_loader_synthetic(batch_size)
else:
assert 0, "Please specify dataset_type as `ofrecord`, `onerec` or `synthetic`."
return flow.identity_n(data)
def _data_loader_ofrecord(data_dir, data_part_num, batch_size, part_name_suffix_length=-1,
shuffle=True):
assert data_dir
    print('load ofrecord data from', data_dir)
ofrecord = flow.data.ofrecord_reader(data_dir,
batch_size=batch_size,
data_part_num=data_part_num,
part_name_suffix_length=part_name_suffix_length,
random_shuffle=shuffle,
shuffle_after_epoch=shuffle)
def _blob_decoder(bn, shape, dtype=flow.int32):
return flow.data.OFRecordRawDecoder(ofrecord, bn, shape=shape, dtype=dtype)
labels = _blob_decoder("labels", (1,))
dense_fields = _blob_decoder("dense_fields", (FLAGS.num_dense_fields,), flow.float)
wide_sparse_fields = _blob_decoder("wide_sparse_fields", (FLAGS.num_wide_sparse_fields,))
deep_sparse_fields = _blob_decoder("deep_sparse_fields", (FLAGS.num_deep_sparse_fields,))
return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
def _data_loader_synthetic(batch_size):
def _blob_random(shape, dtype=flow.int32, initializer=flow.zeros_initializer(flow.int32)):
return flow.data.decode_random(shape=shape, dtype=dtype, batch_size=batch_size,
initializer=initializer)
labels = _blob_random((1,), initializer=flow.random_uniform_initializer(dtype=flow.int32))
dense_fields = _blob_random((FLAGS.num_dense_fields,), dtype=flow.float,
initializer=flow.random_uniform_initializer())
wide_sparse_fields = _blob_random((FLAGS.num_wide_sparse_fields,))
deep_sparse_fields = _blob_random((FLAGS.num_deep_sparse_fields,))
print('use synthetic data')
return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
def _data_loader_onerec(data_dir, batch_size, shuffle):
assert data_dir
    print('load onerec data from', data_dir)
files = glob.glob(os.path.join(data_dir, '*.onerec'))
readdata = flow.data.onerec_reader(files=files, batch_size=batch_size, random_shuffle=shuffle,
verify_example=False,
shuffle_mode="batch",
shuffle_buffer_size=64,
shuffle_after_epoch=shuffle)
def _blob_decoder(bn, shape, dtype=flow.int32):
return flow.data.onerec_decoder(readdata, key=bn, shape=shape, dtype=dtype)
labels = _blob_decoder('labels', shape=(1,))
dense_fields = _blob_decoder("dense_fields", (FLAGS.num_dense_fields,), flow.float)
wide_sparse_fields = _blob_decoder("wide_sparse_fields", (FLAGS.num_wide_sparse_fields,))
deep_sparse_fields = _blob_decoder("deep_sparse_fields", (FLAGS.num_deep_sparse_fields,))
return [labels, dense_fields, wide_sparse_fields, deep_sparse_fields]
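# Wide&Deep with hybrid parallelism: sparse indices are broadcast to all
# devices, the wide embedding table is split along its rows (split(0)) and the
# deep embedding table along the embedding dimension (split(1)); activations
# are cast back to data-parallel split(0) before the dense layers.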
def _model(dense_fields, wide_sparse_fields, deep_sparse_fields):
wide_sparse_fields = flow.parallel_cast(wide_sparse_fields, distribute=flow.distribute.broadcast())
wide_embedding_table = flow.get_variable(
name='wide_embedding',
shape=(FLAGS.wide_vocab_size, 1),
initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
distribute=flow.distribute.split(0),
)
wide_embedding = flow.gather(params=wide_embedding_table, indices=wide_sparse_fields)
wide_embedding = flow.reshape(wide_embedding, shape=(-1, wide_embedding.shape[-1] * wide_embedding.shape[-2]))
wide_scores = flow.math.reduce_sum(wide_embedding, axis=[1], keepdims=True)
wide_scores = flow.parallel_cast(wide_scores, distribute=flow.distribute.split(0),
gradient_distribute=flow.distribute.broadcast())
deep_sparse_fields = flow.parallel_cast(deep_sparse_fields, distribute=flow.distribute.broadcast())
deep_embedding_table = flow.get_variable(
name='deep_embedding',
shape=(FLAGS.deep_vocab_size, FLAGS.deep_embedding_vec_size),
initializer=flow.random_uniform_initializer(minval=-0.05, maxval=0.05),
distribute=flow.distribute.split(1),
)
deep_embedding = flow.gather(params=deep_embedding_table, indices=deep_sparse_fields)
deep_embedding = flow.parallel_cast(deep_embedding, distribute=flow.distribute.split(0),
gradient_distribute=flow.distribute.split(2))
deep_embedding = flow.reshape(deep_embedding, shape=(-1, deep_embedding.shape[-1] * deep_embedding.shape[-2]))
deep_features = flow.concat([deep_embedding, dense_fields], axis=1)
for idx, units in enumerate(DEEP_HIDDEN_UNITS):
deep_features = flow.layers.dense(
deep_features,
units=units,
kernel_initializer=flow.glorot_uniform_initializer(),
bias_initializer=flow.constant_initializer(0.0),
activation=flow.math.relu,
name='fc' + str(idx)
)
deep_features = flow.nn.dropout(deep_features, rate=FLAGS.deep_dropout_rate)
deep_scores = flow.layers.dense(
deep_features,
units=1,
kernel_initializer=flow.glorot_uniform_initializer(),
bias_initializer=flow.constant_initializer(0.0),
name='deep_scores'
)
scores = wide_scores + deep_scores
return scores
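# Query GPU utilization/memory by shelling out to nvidia-smi: its CSV report
# is written under gpu_info/ and the used-memory value (MiB) for GPU `rank`
# is parsed back out of it.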
def get_memory_usage(rank):
    current_path = os.path.dirname(os.path.abspath(sys.argv[0]))
    nvidia_smi_report_file_path = os.path.join(current_path, 'gpu_info/gpu_memory_usage_%s.csv' % rank)
    cmd = "nvidia-smi --query-gpu=utilization.gpu,memory.used --format=csv"
    cmd += f" -f {nvidia_smi_report_file_path}"
    os.system(cmd)
    df = pd.read_csv(nvidia_smi_report_file_path)
    memory = df.iat[rank, 1].split()[0]
return memory
global_loss = 0.0
time_begin=0
time_end=0
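# Callback factory for train_job().async_get(): every step accumulates the
# mean loss; on every loss_print_every_n_iter-th step it also records the
# averaged loss, per-iteration latency, and per-GPU memory usage.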
def _create_train_callback(step):
    def nop(loss):
        global global_loss
        global_loss += loss.mean()
def print_loss(loss):
global global_loss
global time_begin
global time_end
time_end=time.time()
latency=(time_end-time_begin)*1000/FLAGS.loss_print_every_n_iter
global_loss += loss.mean()
record_dict={}
record_dict['task']='train'
record_dict['step']=step+1
record_dict['loss']=global_loss/FLAGS.loss_print_every_n_iter
record_dict['latency/ms']=latency
for i in range(FLAGS.gpu_num_per_node):
record_dict['memory_usage_%s/MB'%i]=get_memory_usage(i)
records.append(record_dict)
global_loss = 0.0
time_begin=time.time()
if (step + 1) % FLAGS.loss_print_every_n_iter == 0:
return print_loss
else:
return nop
def CreateOptimizer(args):
lr_scheduler = flow.optimizer.PiecewiseConstantScheduler([], [args.learning_rate])
return flow.optimizer.SGD(lr_scheduler)
def _get_train_conf():
train_conf = flow.FunctionConfig()
train_conf.default_data_type(flow.float)
#train_conf.indexed_slices_optimizer_conf(dict(include_op_names=dict(op_name=['wide_embedding', 'deep_embedding'])))
return train_conf
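# Lazy (global function) train job: read one batch, run the model, and
# minimize the per-example sigmoid cross-entropy loss with plain SGD;
# the job returns the mean loss for logging.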
@flow.global_function('train', _get_train_conf())
def train_job():
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = \
_data_loader(data_dir=FLAGS.train_data_dir, data_part_num=FLAGS.train_data_part_num,
batch_size=FLAGS.batch_size,
part_name_suffix_length=FLAGS.train_part_name_suffix_length, shuffle=False)
logits = _model(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = flow.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
opt = CreateOptimizer(FLAGS)
opt.minimize(loss)
loss = flow.math.reduce_mean(loss)
return loss
def InitNodes(args):
if args.num_nodes > 1:
assert args.num_nodes <= len(args.node_ips)
flow.env.ctrl_port(args.ctrl_port)
nodes = []
for ip in args.node_ips[:args.num_nodes]:
addr_dict = {}
addr_dict["addr"] = ip
nodes.append(addr_dict)
flow.env.machine(nodes)
def print_args(args):
print("=".ljust(66, "="))
print("Running {}: num_gpu_per_node = {}, num_nodes = {}.".format(
'OneFlow-WDL', args.gpu_num_per_node, args.num_nodes))
print("=".ljust(66, "="))
for arg in vars(args):
print("{} = {}".format(arg, getattr(args, arg)))
print("-".ljust(66, "-"))
#print("Time stamp: {}".format(
# str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))))
def main():
print_args(FLAGS)
InitNodes(FLAGS)
flow.config.gpu_device_num(FLAGS.gpu_num_per_node)
flow.config.enable_model_io_v2(True)
flow.config.enable_debug_mode(True)
flow.config.enable_legacy_model_io(True)
flow.config.nccl_use_compute_stream(True)
# flow.config.collective_boxing.nccl_enable_all_to_all(True)
#flow.config.enable_numa_aware_cuda_malloc_host(True)
#flow.config.collective_boxing.enable_fusion(False)
check_point = flow.train.CheckPoint()
check_point.load('/home/shiyunxiao/checkpoint_old')
global time_begin
time_begin=time.time()
for i in range(FLAGS.max_iter):
train_job().async_get(_create_train_callback(i))
    df = pd.DataFrame.from_dict(records, orient='columns')
    df.to_csv('/home/shiyunxiao/wdl_test_all/results/old/%s.csv' % FLAGS.test_name, index=False)
if __name__ == '__main__':
main()
## Directory description
```
.
├── eval
│ ├── csv
│   │   ├── gpu_info        temporary CSV files recording GPU usage
│   │   ├── n1g1_ddp        one directory per test task; one CSV file per GPU
│   ├── eval.py             python script
│   ├── main.py             python script
│   ├── n1g1_ddp_mem.sh     test memory usage and latency on one GPU
│   ├── n1g1_ddp.sh         test loss on one GPU
│   ├── n1g1_eager_mem.sh   test memory usage and latency on one GPU
│   ├── n1g1_eager.sh       test loss on one GPU
│   ├── n1g1_graph_mem.sh   test memory usage and latency on one GPU
│   ├── n1g1_graph.sh       test loss on one GPU
│   ├── n1g8_ddp_mem.sh     test memory usage and latency on multiple GPUs
│   ├── n1g8_ddp.sh         test loss on multiple GPUs
│   ├── n1g8_eager_mem.sh   test memory usage and latency on multiple GPUs
│   ├── n1g8_eager.sh       test loss on multiple GPUs
│   ├── n1g8_graph_mem.sh   test memory usage and latency on multiple GPUs
│   ├── n1g8_graph.sh       test loss on multiple GPUs
├── config.py
├── graph.py
├── models
│ ├── dataloader_utils.py
│ └── wide_and_deep.py
├── README.md
└── util.py         merges parameters from the old-version checkpoint format
```
## How to start a test task
Take n1g1_ddp as an example: run
`./n1g1_ddp.sh`; a directory named *n1g1_ddp* containing multiple CSV files (one per device) is then created under the *csv* directory, and each CSV file holds the training info recorded on that device.
Remember to copy the directory to results/new if you want to analyze the results further.
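The per-device CSVs can then be aggregated with pandas. A minimal sketch (the relative path and the batch size 32 in the file names are assumptions based on the defaults in `n1g1_ddp.sh`):
```python
import glob
import pandas as pd

# record_to_csv() in eval.py names files record_<batch_size>_<rank>.csv
frames = [pd.read_csv(p) for p in sorted(glob.glob('results/new/n1g1_ddp/record_32_*.csv'))]
latency = sum(df['latency/ms'] for df in frames) / len(frames)
print('mean latency/ms over devices:', latency.mean())
```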
"""
Copyright 2020 The OneFlow Authors. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
def get_args(print_args=True):
def str_list(x):
return x.split(",")
parser = argparse.ArgumentParser()
parser.add_argument(
"--dataset_format", type=str, default="ofrecord", help="ofrecord or onerec"
)
parser.add_argument(
"--use_single_dataloader_thread",
action="store_true",
help="use single dataloader threads per node or not.",
)
parser.add_argument("--model_load_dir", type=str, default="")
parser.add_argument("--model_save_dir", type=str, default="")
parser.add_argument(
"--save_initial_model",
action="store_true",
help="save initial model parameters or not.",
)
parser.add_argument("--num_dataloader_thread_per_gpu", type=int, default=2)
parser.add_argument(
"--data_dir", type=str, default="/dataset/wdl_ofrecord/ofrecord"
)
parser.add_argument("--print_interval", type=int, default=1000)
parser.add_argument("--eval_batchs", type=int, default=20)
parser.add_argument("--batch_size", type=int, default=16384)
parser.add_argument("--learning_rate", type=float, default=1e-3)
parser.add_argument("--wide_vocab_size", type=int, default=1603616)
parser.add_argument("--deep_vocab_size", type=int, default=1603616)
parser.add_argument("--hf_wide_vocab_size", type=int, default=800000)
parser.add_argument("--hf_deep_vocab_size", type=int, default=800000)
parser.add_argument("--deep_embedding_vec_size", type=int, default=16)
parser.add_argument("--deep_dropout_rate", type=float, default=0.5)
parser.add_argument("--num_dense_fields", type=int, default=13)
parser.add_argument("--num_wide_sparse_fields", type=int, default=2)
parser.add_argument("--num_deep_sparse_fields", type=int, default=26)
parser.add_argument("--max_iter", type=int, default=30000)
parser.add_argument("--gpu_num_per_node", type=int, default=8)
parser.add_argument(
"--num_nodes", type=int, default=1, help="node/machine number for training"
)
parser.add_argument(
"--node_ips",
type=str_list,
default=["192.168.1.13", "192.168.1.14"],
        help='nodes ip list for training, divided by ",", length >= num_nodes',
)
parser.add_argument(
"--ctrl_port", type=int, default=50051, help="ctrl_port for multinode job"
)
parser.add_argument("--hidden_units_num", type=int, default=7)
parser.add_argument("--hidden_size", type=int, default=1024)
parser.add_argument(
"--ddp", action="store_true", help="Run model in distributed data parallel mode"
)
parser.add_argument(
"--execution_mode", type=str, default="eager", help="graph or eager"
)
parser.add_argument(
"--test_name", type=str, default="noname_test"
)
FLAGS = parser.parse_args()
if print_args:
_print_args(FLAGS)
return FLAGS
def _print_args(args):
"""Print arguments."""
print("------------------------ arguments ------------------------", flush=True)
str_list = []
for arg in vars(args):
dots = "." * (48 - len(arg))
str_list.append(" {} {} {}".format(arg, dots, getattr(args, arg)))
for arg in sorted(str_list, key=lambda x: x.lower()):
print(arg, flush=True)
print("-------------------- end of arguments ---------------------", flush=True)
if __name__ == "__main__":
get_args()
import os
import sys
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir))
)
import time
import numpy as np
from sklearn.metrics import roc_auc_score
import oneflow as flow
from tqdm import tqdm
from config import get_args
from models.dataloader_utils import OFRecordDataLoader
from oneflow.framework import distribute
from models.wide_and_deep import WideAndDeep
from oneflow.nn.parallel import DistributedDataParallel as ddp
from graph import WideAndDeepGraph,WideAndDeepTrainGraph
import warnings
import pandas as pd
from datetime import datetime
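# Trainer covering the new execution modes: eager (optionally with DDP) and
# nn.Graph. It runs consistent (global) when world_size > 1 without DDP, or
# when execution_mode is "graph"; otherwise modules live on the local device.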
class Trainer(object):
    def __init__(self, args):
        self.args = args
        self.test_name = args.test_name
        self.execution_mode = args.execution_mode
        self.ddp = args.ddp
        if self.ddp and self.execution_mode == "graph":
warnings.warn(
"""when ddp is True, the execution_mode can only be eager, but it is graph""",
UserWarning,
)
self.execution_mode = "eager"
self.is_consistent = (
flow.env.get_world_size() > 1 and not args.ddp
) or args.execution_mode == "graph"
self.rank = flow.env.get_rank()
self.world_size = flow.env.get_world_size()
(
self.train_dataloader,
self.val_dataloader,
self.wdl_module,
self.loss,
self.opt,
) = self.prepare_modules()
if self.execution_mode == "graph":
self.eval_graph = WideAndDeepGraph(
self.wdl_module, self.val_dataloader, self.loss
)
self.train_graph = WideAndDeepTrainGraph(
self.wdl_module, self.train_dataloader, self.loss, self.opt
)
self.record=[]
def get_memory_usage(self):
        current_path = os.path.dirname(os.path.abspath(__file__))
        nvidia_smi_report_file_path = os.path.join(
            current_path, 'csv/gpu_info', 'gpu_memory_usage_%s.csv' % self.rank
        )
        cmd = "nvidia-smi --query-gpu=utilization.gpu,memory.used --format=csv"
        cmd += f" -f {nvidia_smi_report_file_path}"
        os.system(cmd)
        df = pd.read_csv(nvidia_smi_report_file_path)
        memory = df.iat[self.rank, 1].split()[0]
return memory
def record_to_csv(self):
        current_path = os.path.dirname(os.path.abspath(__file__))
        dir_path = os.path.join(current_path, 'csv/%s' % self.test_name)
        if not os.path.exists(dir_path):
            os.makedirs(dir_path)
filePath=os.path.join(dir_path,'record_%s_%s.csv'%(self.args.batch_size,self.rank))
df_record=pd.DataFrame.from_dict(self.record, orient='columns')
df_record.to_csv(filePath,index=False)
def to_record(self,iter=0,loss=0,latency=0):
data={}
data['node']=1
data['device']=self.rank
data['batch_size']=self.args.batch_size
data['deep_vocab_size']=self.args.deep_vocab_size
data['deep_embedding_vec_size']=self.args.deep_embedding_vec_size
data['hidden_units_num']=self.args.hidden_units_num
data['iter']=iter
data['latency/ms']=latency
data['memory_usage/MB']=self.get_memory_usage()
data['loss']=loss
self.record.append(data)
def prepare_modules(self):
args = self.args
is_consistent = self.is_consistent
self.wdl_module = WideAndDeep(args)
        if is_consistent:
world_size = self.world_size
placement = flow.placement("cuda", {0: range(world_size)})
self.wdl_module = self.wdl_module.to_consistent(
placement=placement, sbp=flow.sbp.broadcast
)
else:
self.wdl_module=self.wdl_module.to("cuda")
if args.model_load_dir != "":
self.load_state_dict()
if self.ddp:
self.wdl_module = ddp(self.wdl_module)
if args.save_initial_model and args.model_save_dir != "":
self.save(os.path.join(args.model_save_dir, "initial_checkpoint"))
train_dataloader = OFRecordDataLoader(args)
val_dataloader = OFRecordDataLoader(args, mode="val")
bce_loss = flow.nn.BCELoss(reduction="mean")
bce_loss.to("cuda")
opt = flow.optim.SGD(
self.wdl_module.parameters(), lr=args.learning_rate, momentum=0.9
)
return train_dataloader, val_dataloader, self.wdl_module, bce_loss, opt
def load_state_dict(self):
print(f"Loading model from {self.args.model_load_dir}")
if self.is_consistent:
state_dict = flow.load(self.args.model_load_dir, consistent_src_rank=0)
elif self.rank == 0:
state_dict = flow.load(self.args.model_load_dir)
else:
return
self.wdl_module.load_state_dict(state_dict)
def save(self, save_path):
if save_path is None:
return
print(f"Saving model to {save_path}")
state_dict = self.wdl_module.state_dict()
if self.is_consistent:
flow.save(state_dict, save_path, consistent_dst_rank=0)
elif self.rank == 0:
flow.save(state_dict, save_path)
else:
return
def __call__(self):
self.train()
def train(self):
        def handle(tensor_dict):
            # Pull every tensor back to a local numpy array; consistent
            # tensors are broadcast first so all ranks see the same value.
            for key, value in tensor_dict.items():
                if self.is_consistent:
                    tensor_dict[key] = (
                        value.to_consistent(
                            placement=flow.placement(
                                "cuda", {0: range(self.world_size)}
                            ),
                            sbp=flow.sbp.broadcast,
                        )
                        .to_local()
                        .numpy()
                    )
                else:
                    tensor_dict[key] = value.numpy()
            return tensor_dict
losses = []
args = self.args
latency=0
time_begin=time.time()
for i in range(args.max_iter):
loss = self.train_one_step()
losses.append(handle({"loss": loss})["loss"])
if self.execution_mode == "eager":
loss.backward()
self.opt.step()
self.opt.zero_grad()
if (i + 1) % args.print_interval == 0:
time_end=time.time()
latency=(time_end-time_begin)*1000/args.print_interval
l = sum(losses) / len(losses)
self.to_record(i+1,l,round(latency,3))
losses = []
latency=0
time_begin=time.time()
self.record_to_csv()
def train_one_step(self):
self.wdl_module.train()
if self.execution_mode == "graph":
predicts, labels, train_loss = self.train_graph()
else:
(
labels,
dense_fields,
wide_sparse_fields,
deep_sparse_fields,
) = self.train_dataloader()
labels = labels.to("cuda").to(dtype=flow.float32)
dense_fields = dense_fields.to("cuda")
wide_sparse_fields = wide_sparse_fields.to("cuda")
deep_sparse_fields = deep_sparse_fields.to("cuda")
predicts = self.wdl_module(
dense_fields, wide_sparse_fields, deep_sparse_fields
)
train_loss = self.loss(predicts, labels)
return train_loss
if __name__ == "__main__":
flow.distributed.launch.main()
trainer = Trainer()
trainer()
from eval import Trainer
from config import get_args
if __name__ == "__main__":
args=get_args()
trainer = Trainer(args)
trainer()
DEVICE_NUM_PER_NODE=1
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 32 \
--print_interval 1 \
--deep_dropout_rate 0 \
--max_iter 100 \
--execution_mode 'eager' \
--ddp \
--test_name 'n1g1_ddp' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=1
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 16384 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'eager' \
--ddp \
--test_name 'n1g1_ddp_mem' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=1
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 32 \
--print_interval 1 \
--deep_dropout_rate 0 \
--max_iter 100 \
--execution_mode 'eager' \
--test_name 'n1g1_eager' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=1
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 16384 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'eager' \
--test_name 'n1g1_eager_mem' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=1
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 32 \
--print_interval 1 \
--deep_dropout_rate 0 \
--max_iter 100 \
--execution_mode 'graph' \
--test_name 'n1g1_graph' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=1
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 16384 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'graph' \
--test_name 'n1g1_graph_mem' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=8
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 32 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'eager' \
--ddp \
--test_name 'n1g8_ddp' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=8
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 16384 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'eager' \
--ddp \
--test_name 'n1g8_ddp_mem' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=8
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 32 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'eager' \
--test_name 'n1g8_eager' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=8
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 16384 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'eager' \
--test_name 'n1g8_eager_mem' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=8
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 32 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'graph' \
--test_name 'n1g8_graph' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
DEVICE_NUM_PER_NODE=8
MASTER_ADDR=127.0.0.1
NUM_NODES=1
NODE_RANK=0
# export CUDA_VISIBLE_DEVICES=3
python3 -m oneflow.distributed.launch \
--nproc_per_node $DEVICE_NUM_PER_NODE \
--nnodes $NUM_NODES \
--node_rank $NODE_RANK \
--master_addr $MASTER_ADDR \
main.py \
--deep_vocab_size 1603616 \
--wide_vocab_size 1603616 \
--hidden_units_num 7 \
--hidden_size 1024 \
--deep_embedding_vec_size 16 \
--batch_size 16384 \
--print_interval 1 \
--deep_dropout_rate 0.5 \
--max_iter 100 \
--execution_mode 'graph' \
--test_name 'n1g8_graph_mem' \
--model_load_dir "/home/shiyunxiao/checkpoint_new" \
import oneflow as flow
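# nn.Graph wrappers around the eager module: the base graph runs one forward
# pass under no_grad for evaluation; the train graph adds the optimizer and
# a backward pass in build().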
class WideAndDeepGraph(flow.nn.Graph):
def __init__(self, wdl_module, dataloader, bce_loss):
super(WideAndDeepGraph, self).__init__()
self.module = wdl_module
self.dataloader = dataloader
self.bce_loss = bce_loss
def build(self):
with flow.no_grad():
return self.graph()
def graph(self):
(
labels,
dense_fields,
wide_sparse_fields,
deep_sparse_fields,
) = self.dataloader()
labels = labels.to("cuda").to(dtype=flow.float32)
dense_fields = dense_fields.to("cuda")
wide_sparse_fields = wide_sparse_fields.to("cuda")
deep_sparse_fields = deep_sparse_fields.to("cuda")
predicts = self.module(dense_fields, wide_sparse_fields, deep_sparse_fields)
loss = self.bce_loss(predicts, labels)
return predicts, labels, loss
class WideAndDeepTrainGraph(WideAndDeepGraph):
def __init__(self, wdl_module, dataloader, bce_loss, optimizer):
super(WideAndDeepTrainGraph, self).__init__(wdl_module, dataloader, bce_loss)
self.add_optimizer(optimizer)
def build(self):
predicts, labels, loss = self.graph()
loss.backward()
return predicts, labels, loss
import os
import oneflow as flow
import oneflow.nn as nn
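# nn.Module wrapping OfrecordReader plus raw decoders for the four WDL blobs.
# In consistent mode the reader is placed on all ranks' CPUs with sbp
# split(0), so each rank reads a disjoint slice of every batch.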
class OFRecordDataLoader(nn.Module):
def __init__(
self,
FLAGS,
data_part_num: int = 256,
part_name_suffix_length: int = 5,
mode: str = "train",
):
super(OFRecordDataLoader, self).__init__()
assert FLAGS.num_dataloader_thread_per_gpu >= 1
self.num_dataloader_thread_per_gpu = FLAGS.num_dataloader_thread_per_gpu
if FLAGS.use_single_dataloader_thread:
self.devices = ["{}:0".format(i) for i in range(FLAGS.num_nodes)]
else:
num_dataloader_thread = (
FLAGS.num_dataloader_thread_per_gpu * FLAGS.gpu_num_per_node
)
self.devices = [
"{}:0-{}".format(i, num_dataloader_thread - 1)
for i in range(FLAGS.num_nodes)
]
data_root = FLAGS.data_dir
batch_size = FLAGS.batch_size
is_consistent = (
flow.env.get_world_size() > 1 and not FLAGS.ddp
) or FLAGS.execution_mode == "graph"
placement = None
sbp = None
        if is_consistent:
placement = flow.placement("cpu", {0: range(flow.env.get_world_size())})
sbp = flow.sbp.split(0)
#shuffle = mode == "train"
shuffle = False
self.reader = nn.OfrecordReader(
os.path.join(data_root, mode),
batch_size=batch_size,
data_part_num=data_part_num,
part_name_suffix_length=part_name_suffix_length,
random_shuffle=shuffle,
shuffle_after_epoch=shuffle,
placement=placement,
sbp=sbp,
)
def _blob_decoder(bn, shape, dtype=flow.int32):
return nn.OfrecordRawDecoder(bn, shape=shape, dtype=dtype)
self.labels = _blob_decoder("labels", (1,))
self.dense_fields = _blob_decoder(
"dense_fields", (FLAGS.num_dense_fields,), flow.float
)
self.wide_sparse_fields = _blob_decoder(
"wide_sparse_fields", (FLAGS.num_wide_sparse_fields,)
)
self.deep_sparse_fields = _blob_decoder(
"deep_sparse_fields", (FLAGS.num_deep_sparse_fields,)
)
def forward(self):
reader = self.reader()
labels = self.labels(reader)
dense_fields = self.dense_fields(reader)
wide_sparse_fields = self.wide_sparse_fields(reader)
deep_sparse_fields = self.deep_sparse_fields(reader)
return labels, dense_fields, wide_sparse_fields, deep_sparse_fields
if __name__ == "__main__":
from config import get_args
FLAGS = get_args()
    # The data directory is taken from FLAGS.data_dir (default: /dataset/wdl_ofrecord/ofrecord).
    dataloader = OFRecordDataLoader(FLAGS)
for i in range(10):
labels, dense_fields, wide_sparse_fields, deep_sparse_fields = dataloader()
print(deep_sparse_fields)
from collections import OrderedDict
import oneflow as flow
import oneflow.nn as nn
from typing import Any
__all__ = ["WideAndDeep", "wide_and_deep"]
class Embedding(nn.Embedding):
def __init__(self, vocab_size, embed_size, split_axis=0):
# TODO: name and split_axis for weight
super(Embedding, self).__init__(vocab_size, embed_size, padding_idx=0)
for param in self.parameters():
nn.init.uniform_(param, a=-0.05, b=0.05)
def forward(self, indices):
# indices = flow.parallel_cast(indices, distribute=flow.distribute.broadcast())
embedding = flow._C.gather(self.weight, indices, axis=0)
return embedding.view(-1, embedding.shape[-1] * embedding.shape[-2])
class Dense(nn.Module):
def __init__(
self, in_features: int, out_features: int, dropout_rate: float = 0.5
) -> None:
super(Dense, self).__init__()
self.features = nn.Sequential(
nn.Linear(in_features, out_features),
nn.ReLU(inplace=True),
nn.Dropout(p=dropout_rate),
)
for name, param in self.named_parameters():
if name.endswith("weight"):
nn.init.xavier_uniform_(param)
elif name.endswith("bias"):
nn.init.zeros_(param)
def forward(self, x: flow.Tensor) -> flow.Tensor:
x = self.features(x)
return x
class WideAndDeep(nn.Module):
def __init__(self, FLAGS) -> None:
super(WideAndDeep, self).__init__()
self.FLAGS = FLAGS
self.wide_embedding = Embedding(vocab_size=FLAGS.wide_vocab_size, embed_size=1)
self.deep_embedding = Embedding(
vocab_size=FLAGS.deep_vocab_size,
embed_size=FLAGS.deep_embedding_vec_size,
split_axis=1,
)
deep_feature_size = (
FLAGS.deep_embedding_vec_size * FLAGS.num_deep_sparse_fields
+ FLAGS.num_dense_fields
)
self.linear_layers = nn.Sequential(
OrderedDict(
[
(
f"fc{i}",
Dense(
deep_feature_size if i == 0 else FLAGS.hidden_size,
FLAGS.hidden_size,
FLAGS.deep_dropout_rate,
),
)
for i in range(FLAGS.hidden_units_num)
]
)
)
self.deep_scores = nn.Linear(FLAGS.hidden_size, 1)
self.sigmoid = nn.Sigmoid()
def forward(
self, dense_fields, wide_sparse_fields, deep_sparse_fields
) -> flow.Tensor:
wide_embedding = self.wide_embedding(wide_sparse_fields)
wide_scores = flow.sum(wide_embedding, dim=1, keepdim=True)
deep_embedding = self.deep_embedding(deep_sparse_fields)
deep_features = flow.cat([deep_embedding, dense_fields], dim=1)
deep_features = self.linear_layers(deep_features)
deep_scores = self.deep_scores(deep_features)
return self.sigmoid(wide_scores + deep_scores)
def wide_and_deep(
pretrained: bool = False, progress: bool = True, **kwargs: Any
) -> WideAndDeep:
r"""WideAndDeep model architecture from the
`"One weird trick..." <https://arxiv.org/abs/1606.07792>`_ paper.
Args:
pretrained (bool): If True, returns a model pre-trained on WideAndDeep
progress (bool): If True, displays a progress bar of the download to stderr
"""
model = WideAndDeep(**kwargs)
return model
import os
import sys
from shutil import copy, copytree
import numpy as np
import matplotlib.pyplot as plt
def npy_compare(lhs_path, rhs_path):
lhs = np.load(lhs_path)
rhs = np.load(rhs_path)
return np.allclose(lhs, rhs)
def walk_compare_npy(lhs, rhs):
assert os.path.isdir(lhs)
assert os.path.isdir(rhs)
same = 0
diff = 0
ignore = 0
for root, dirs, files in os.walk(lhs):
for name in filter(lambda f: f.endswith(".npy"), files):
lhs_path = os.path.join(root, name)
rhs_path = os.path.join(rhs, os.path.relpath(lhs_path, lhs))
if os.path.exists(rhs_path) and os.path.isfile(rhs_path):
if not npy_compare(lhs_path, rhs_path):
print("{} False".format(lhs_path))
diff += 1
else:
same += 1
else:
print("{} ignore".format(lhs_path))
ignore += 1
print("same:", same)
print("diff:", diff)
print("ignore:", ignore)
def get_variable_name(var_org):
    # Walk two frames up and search the caller's locals for an object that
    # is identical to var_org, returning the variable name it is bound to.
    for item in sys._getframe(2).f_locals.items():
        if var_org is item[1]:
            return item[0]
def dump_to_npy(tensor, root="./output", sub="", name=""):
if sub != "":
root = os.path.join(root, str(sub))
if not os.path.isdir(root):
os.makedirs(root)
    var_org_name = get_variable_name(tensor) if name == "" else name
path = os.path.join(root, f"{var_org_name}.npy")
if not isinstance(tensor, np.ndarray):
tensor = tensor.numpy()
np.save(path, tensor)
def save_param_npy(module, root="./output"):
for name, param in module.named_parameters():
# if name.endswith('bias'):
dump_to_npy(param.numpy(), root=root, sub=0, name=name)
def param_hist(param, name, root="output"):
print(name, param.shape)
# print(param.flatten())
# the histogram of the data
n, bins, patches = plt.hist(param.flatten(), density=False, facecolor="g")
# plt.xlabel('Smarts')
# plt.ylabel('value')
plt.title(f"Histogram of {name}")
# plt.xlim(40, 160)
# plt.ylim(0, 0.03)
plt.grid(True)
plt.savefig(os.path.join(root, f"{name}.png"))
plt.close()
def save_param_hist_pngs(module, root="output"):
for name, param in module.named_parameters():
# if name.endswith('bias'):
param_hist(param.numpy(), name, root=root)
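# Copy parameters from an old-style (one directory per flat name) checkpoint
# into the directory layout expected by the new module state dict, then
# overwrite each `meta` file with the one from a freshly saved initial
# checkpoint.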
def merge_param_from_old_version(src, dst):
param_list = [
["deep_embedding.weight", "deep_embedding"],
["wide_embedding.weight", "wide_embedding"],
["linear_layers.fc0.features.0.bias", "fc0-bias"],
["linear_layers.fc0.features.0.weight", "fc0-weight"],
["linear_layers.fc1.features.0.bias", "fc1-bias"],
["linear_layers.fc1.features.0.weight", "fc1-weight"],
["linear_layers.fc2.features.0.bias", "fc2-bias"],
["linear_layers.fc2.features.0.weight", "fc2-weight"],
["linear_layers.fc3.features.0.bias", "fc3-bias"],
["linear_layers.fc3.features.0.weight", "fc3-weight"],
["linear_layers.fc4.features.0.bias", "fc4-bias"],
["linear_layers.fc4.features.0.weight", "fc4-weight"],
["linear_layers.fc5.features.0.bias", "fc5-bias"],
["linear_layers.fc5.features.0.weight", "fc5-weight"],
["linear_layers.fc6.features.0.bias", "fc6-bias"],
["linear_layers.fc6.features.0.weight", "fc6-weight"],
["deep_scores.weight", "deep_scores-weight"],
["deep_scores.bias", "deep_scores-bias"],
]
for new_name, old_name in param_list:
src_file = os.path.join(src, old_name)
dst_file = os.path.join(dst, new_name)
copytree(src_file, dst_file)
print(src_file, dst_file)
for new_name, old_name in param_list:
src_file = os.path.join('/home/shiyunxiao/checkpoints/initial_checkpoint', new_name+'/meta')
dst_file = os.path.join('/home/shiyunxiao/checkpoint_new', new_name+'/meta')
copy(src_file, dst_file)
print(src_file, dst_file)
if __name__ == "__main__":
# walk_compare_npy("output/old_0", "output/0")
merge_param_from_old_version('/home/shiyunxiao/checkpoint_old','/home/shiyunxiao/checkpoint_new')
import os
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
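# Plotting/aggregation helpers comparing the old single-client benchmark with
# the new ddp/eager/graph runs: line charts for loss, latency, and memory
# usage, plus tables of averages over iterations 20-100 (warm-up excluded).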
def benchmark_n1g1_loss():
'''
draw n1g1 benchmark line chart of loss
'''
filename='n1g1_old_100loss'
legendname='benchmark_n1g1_loss'
csvpath='wide_deep_test/old/%s.csv'%filename
imgpath='wide_deep_test/img/%s.jpg'%filename
df = pd.read_csv(csvpath)
y=df.loc[:,['loss']].values
x = list(range(100))
plt.figure(figsize=(8, 5))
plt.xlabel('iter times')
plt.ylabel('loss')
plt.plot(x,y)
plt.legend([legendname])
plt.savefig(imgpath,dpi=400)
def all_n1g1_loss():
    '''draw n1g1 benchmark, eager, graph, ddp line chart of loss'''
filename='n1g1_ddp_eager_graph_100loss'
benchmark_csvpath='/home/shiyunxiao/wdl_test_all/results/old/n1g1_benchmark.csv'
ddp_csvpath='/home/shiyunxiao/wdl_test_all/results/new/n1g1_ddp/record_32_0.csv'
eager_csvpath='/home/shiyunxiao/wdl_test_all/results/new/n1g1_eager/record_32_0.csv'
graph_csvpath='/home/shiyunxiao/wdl_test_all/results/new/n1g1_graph/record_32_0.csv'
imgpath='Oneflow_intern_work/img/%s.jpg'%filename
df_benchmark=pd.read_csv(benchmark_csvpath)
df_ddp = pd.read_csv(ddp_csvpath)
df_eager = pd.read_csv(eager_csvpath)
df_graph = pd.read_csv(graph_csvpath)
y_benchmark=df_benchmark.loc[:,['loss']].values
y_ddp=df_ddp.loc[:,['loss']].values
y_eager=df_eager.loc[:,['loss']].values
y_graph=df_graph.loc[:,['loss']].values
x = list(range(100))
plt.figure(figsize=(8, 5))
plt.xlabel('iter times')
plt.ylabel('loss')
plt.plot(x,y_benchmark,label='benchmark')
plt.plot(x,y_ddp,label='ddp')
plt.plot(x,y_eager,label='eager')
plt.plot(x,y_graph,label='graph')
plt.legend()
plt.savefig(imgpath,dpi=400)
def n1g1_lat():
'''
draw n1g1 benchmark,eager,graph,ddp line chart of latency
'''
filename='n1g1_ddp_eager_graph_lat'
imgpath='wide_deep_test/img/%s.jpg'%filename
n1g1_ddp_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_ddp_mem/record_16384_0.csv'
n1g1_eager_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_eager_mem/record_16384_0.csv'
n1g1_graph_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_graph_mem/record_16384_0.csv'
old_n1g1_mem='/home/shiyunxiao/wdl_test_all/results/old/n1g1_benchmark_mem.csv'
n1g1_ddp_mem=pd.read_csv(n1g1_ddp_mem)
n1g1_eager_mem = pd.read_csv(n1g1_eager_mem)
n1g1_graph_mem = pd.read_csv(n1g1_graph_mem)
old_n1g1_mem = pd.read_csv(old_n1g1_mem)
n1g1_ddp_mem=n1g1_ddp_mem.loc[:,['latency/ms']].values
n1g1_eager_mem=n1g1_eager_mem.loc[:,['latency/ms']].values
n1g1_graph_mem=n1g1_graph_mem.loc[:,['latency/ms']].values
old_n1g1_mem=old_n1g1_mem.loc[:,['latency/ms']].values
x = list(range(100))
plt.figure(figsize=(8, 5))
plt.xlabel('iter times')
plt.ylabel('latency/ms')
plt.plot(x,n1g1_ddp_mem,label='ddp')
plt.plot(x,n1g1_eager_mem,label='eager')
plt.plot(x,n1g1_graph_mem,label='graph')
plt.plot(x,old_n1g1_mem,label='benchmark')
plt.legend()
plt.savefig(imgpath,dpi=400)
def n1g1_lat_avg():
'''
draw n1g1 benchmark,eager,graph,ddp table of latency
'''
filename='n1g1_ddp_eager_graph_lat'
imgpath='Oneflow_intern_work/img/%s.jpg'%filename
n1g1_ddp_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_ddp_mem/record_16384_0.csv'
n1g1_eager_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_eager_mem/record_16384_0.csv'
n1g1_graph_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_graph_mem/record_16384_0.csv'
old_n1g1_mem='/home/shiyunxiao/wdl_test_all/results/old/n1g1_benchmark_mem.csv'
n1g1_ddp_mem=pd.read_csv(n1g1_ddp_mem)
n1g1_eager_mem = pd.read_csv(n1g1_eager_mem)
n1g1_graph_mem = pd.read_csv(n1g1_graph_mem)
old_n1g1_mem = pd.read_csv(old_n1g1_mem)
n1g1_ddp_mem=n1g1_ddp_mem.loc[:,['latency/ms']].values[20:100]
n1g1_eager_mem=n1g1_eager_mem.loc[:,['latency/ms']].values[20:100]
n1g1_graph_mem=n1g1_graph_mem.loc[:,['latency/ms']].values[20:100]
old_n1g1_mem=old_n1g1_mem.loc[:,['latency/ms']].values[20:100]
return (np.mean(n1g1_ddp_mem),
np.mean(n1g1_eager_mem),
np.mean(n1g1_graph_mem),
np.mean(old_n1g1_mem))
def n1g1_mem():
'''draw n1g1 benchmark,eager,graph,ddp line of memory'''
filename='n1g1_ddp_eager_graph_mem'
imgpath='wide_deep_test/img/%s.jpg'%filename
n1g1_ddp_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_ddp_mem/record_16384_0.csv'
n1g1_eager_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_eager_mem/record_16384_0.csv'
n1g1_graph_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_graph_mem/record_16384_0.csv'
old_n1g1_mem='/home/shiyunxiao/wdl_test_all/results/old/n1g1_benchmark_mem.csv'
n1g1_ddp_mem=pd.read_csv(n1g1_ddp_mem)
n1g1_eager_mem = pd.read_csv(n1g1_eager_mem)
n1g1_graph_mem = pd.read_csv(n1g1_graph_mem)
old_n1g1_mem = pd.read_csv(old_n1g1_mem)
n1g1_ddp_mem=n1g1_ddp_mem.loc[:,['memory_usage/MB']].values
n1g1_eager_mem=n1g1_eager_mem.loc[:,['memory_usage/MB']].values
n1g1_graph_mem=n1g1_graph_mem.loc[:,['memory_usage/MB']].values
old_n1g1_mem=old_n1g1_mem.loc[:,['memory_usage_0/MB']].values
x = list(range(100))
plt.figure(figsize=(8, 5))
plt.xlabel('iter times')
plt.ylabel('memory_usage/MB')
plt.plot(x,n1g1_ddp_mem,label='ddp')
plt.plot(x,n1g1_eager_mem,label='eager')
plt.plot(x,n1g1_graph_mem,label='graph')
plt.plot(x,old_n1g1_mem,label='benchmark')
plt.legend()
plt.savefig(imgpath,dpi=400)
def n1g1_mem_avg():
'''draw n1g1 benchmark,eager,graph,ddp table of memory'''
filename='n1g1_ddp_eager_graph_mem'
n1g1_ddp_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_ddp_mem/record_16384_0.csv'
n1g1_eager_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_eager_mem/record_16384_0.csv'
n1g1_graph_mem='/home/shiyunxiao/wdl_test_all/results/new/n1g1_graph_mem/record_16384_0.csv'
old_n1g1_mem='/home/shiyunxiao/wdl_test_all/results/old/n1g1_benchmark_mem.csv'
n1g1_ddp_mem=pd.read_csv(n1g1_ddp_mem)
n1g1_eager_mem = pd.read_csv(n1g1_eager_mem)
n1g1_graph_mem = pd.read_csv(n1g1_graph_mem)
old_n1g1_mem = pd.read_csv(old_n1g1_mem)
n1g1_ddp_mem=n1g1_ddp_mem.loc[:,['memory_usage/MB']].values[20:100]
n1g1_eager_mem=n1g1_eager_mem.loc[:,['memory_usage/MB']].values[20:100]
n1g1_graph_mem=n1g1_graph_mem.loc[:,['memory_usage/MB']].values[20:100]
old_n1g1_mem=old_n1g1_mem.loc[:,['memory_usage_0/MB']].values[20:100]
return (np.mean(n1g1_ddp_mem),
np.mean(n1g1_eager_mem),
np.mean(n1g1_graph_mem),
np.mean(old_n1g1_mem))
def n1g8_lat_mem(column='latency/ms'):
'''draw n1g8 benchmark,eager,graph,ddp line of latency or memory'''
filename='n1g8_ddp_eager_graph_%s'%(column[:3])
imgpath='wide_deep_test/img/%s.jpg'%filename
plt.figure(figsize=(8, 5))
plt.xlabel('iter times')
plt.ylabel(column)
x = list(range(100))
#calculate average lat in 8 devices per batch
for dirpath in ['/home/shiyunxiao/wdl_test_all/results/new/n1g8_ddp_mem','/home/shiyunxiao/wdl_test_all/results/new/n1g8_eager_mem','/home/shiyunxiao/wdl_test_all/results/new/n1g8_graph_mem']:
data=np.zeros(100)
for i in range(8):
csv_path=os.path.join(dirpath,'record_16384_%s.csv'%i)
df = pd.read_csv(csv_path)
the_data=df.loc[:,[column]].values.flatten()
data+=the_data
data=data/8
        label_str = dirpath.split('/')[-1].split('_')[-2]
plt.plot(x,data,label=label_str)
old_csv_path='/home/shiyunxiao/wdl_test_all/results/old/n1g8_benchmark_mem.csv'
df=pd.read_csv(old_csv_path)
if column=='latency/ms' or column=='loss':
data=df.loc[:,[column]].values.flatten()
plt.plot(x,data,label='benchmark')
else:
        # Average across all 8 per-GPU memory columns (the last 8 columns).
        data = df.iloc[:, -8:].values
        data = np.mean(data, axis=1).flatten()
plt.plot(x,data,label='benchmark')
plt.legend()
plt.savefig(imgpath,dpi=400)
def n1g8_lat_mem_avg(column='latency/ms'):
'''draw n1g8 benchmark,eager,graph,ddp table of latency or memory'''
filename='n1g8_ddp_eager_graph_%s'%(column[:3])
imgpath='Oneflow_intern_work/img/%s.jpg'%filename
#calculate average lat in 8 devices per batch
return_result=[]
for dirpath in ['/home/shiyunxiao/wdl_test_all/results/new/n1g8_ddp_mem','/home/shiyunxiao/wdl_test_all/results/new/n1g8_eager_mem','/home/shiyunxiao/wdl_test_all/results/new/n1g8_graph_mem']:
data=np.zeros(100)
for i in range(8):
csv_path=os.path.join(dirpath,'record_16384_%s.csv'%i)
df = pd.read_csv(csv_path)
the_data=df.loc[:,[column]].values.flatten()
data+=the_data
data=data/8
        label_str = dirpath.split('/')[-1].split('_')[-2]
return_result.append(np.mean(data[20:100]))
old_csv_path='/home/shiyunxiao/wdl_test_all/results/old/n1g8_benchmark_mem.csv'
df=pd.read_csv(old_csv_path)
if column=='latency/ms' or column=='loss':
data=df.loc[:,[column]].values.flatten()
return_result.append(np.mean(data[20:100]))
else:
        # Average across all 8 per-GPU memory columns (the last 8 columns).
        data = df.iloc[:, -8:].values
        data = np.mean(data, axis=1).flatten()
return_result.append(np.mean(data[20:100]))
return tuple(return_result)
def n1g1_table():
(ddp_lat,eager_lat,graph_lat,old_lat)=( round(num,3) for num in n1g1_lat_avg())
(ddp_mem,eager_mem,graph_mem,old_mem)=( round(num,3) for num in n1g1_mem_avg())
    base_info = [['n1g1', '16384', '0.5'], ['n1g1', '16384', '0.5']]
data=[[ddp_lat,eager_lat,graph_lat,old_lat],
[ddp_mem,eager_mem,graph_mem,old_mem]]
data=np.concatenate((base_info,data),axis=1)
df=pd.DataFrame(data,columns=['gpu','batch_size','dropout','ddp','eager','graph','old'],index=['latency/ms','memory_usage/MB'])
print(df)
def n1g8_table():
(ddp_lat,eager_lat,graph_lat,old_lat)=( round(num,3) for num in n1g8_lat_mem_avg('latency/ms'))
(ddp_mem,eager_mem,graph_mem,old_mem)=( round(num,3) for num in n1g8_lat_mem_avg('memory_usage/MB'))
base_info=[['n1g8','16384','0.5'],['n1g8','16384','0.5']]
data=[[ddp_lat,eager_lat,graph_lat,old_lat],
[ddp_mem,eager_mem,graph_mem,old_mem]]
data=np.concatenate((base_info,data),axis=1)
df=pd.DataFrame(data,columns=['gpu','batch_size','dropout','ddp','eager','graph','old'],index=['latency/ms','memory_usage/MB'])
print(df)
if __name__ == "__main__":
    all_n1g1_loss()