Commit 49fa2349 authored by ouyangyu

add argument

Parent 102ab268
@@ -19,61 +19,126 @@ from datetime import datetime
def str_list(x):
    return x.split(",")


def int_list(x):
    return list(map(int, x.split(",")))


def float_list(x):
    return list(map(float, x.split(",")))

def str2bool(v):
    if v.lower() in ("yes", "true", "t", "y", "1"):
        return True
    elif v.lower() in ("no", "false", "f", "n", "0"):
        return False
    else:
        raise argparse.ArgumentTypeError("Unsupported value encountered.")
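
# A minimal sanity check (not part of the original script) of how str2bool
# interacts with argparse's nargs="?": the bare flag yields const, an explicit
# value goes through str2bool, and an absent flag keeps the default.
#
#   demo = argparse.ArgumentParser()
#   demo.add_argument("--use_fp16", type=str2bool, nargs="?", const=True, default=False)
#   demo.parse_args([]).use_fp16                    # False (default)
#   demo.parse_args(["--use_fp16"]).use_fp16        # True  (const)
#   demo.parse_args(["--use_fp16", "no"]).use_fp16  # False (parsed)
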
def get_parser(parser=None):
    parser = argparse.ArgumentParser(description="flags for bert")

    parser.add_argument(
        "--do_train", type=str2bool, nargs="?", const=True, help="train or not"
    )
    parser.add_argument(
        "--do_eval", type=str2bool, nargs="?", const=True, help="eval or not"
    )
    # resource
    parser.add_argument("--model", type=str, default="BERT Pretrain")
    parser.add_argument("--gpu_num_per_node", type=int, default=1)
    parser.add_argument(
        "--num_nodes", type=int, default=1, help="node/machine number for training"
    )
    parser.add_argument(
        "--node_ips",
        type=str_list,
        default=["192.168.1.13", "192.168.1.14"],
        help='nodes ip list for training, divided by ",", length >= num_nodes',
    )
    parser.add_argument(
        "--ctrl_port", type=int, default=50051, help="ctrl_port for multinode job"
    )
    # train
    parser.add_argument(
        "--learning_rate", type=float, default=1e-4, help="Learning rate"
    )
    parser.add_argument(
        "--weight_decay_rate", type=float, default=0.01, help="weight decay rate"
    )
    parser.add_argument("--warmup_proportion", type=float, default=0.1)
    parser.add_argument(
        "--use_fp16",
        type=str2bool,
        nargs="?",
        default="False",
        const=True,
        help="use fp16 or not",
    )
    parser.add_argument(
        "--use_xla", type=str2bool, nargs="?", const=True, help="whether to use xla"
    )
    parser.add_argument(
        "--num_accumulation_steps",
        type=int,
        default=1,
        help="number of accumulation steps before each gradient update; global batch size = num_accumulation_steps * train_batch_size",
    )
    parser.add_argument(
        "--optimizer_type",
        type=str,
        default="adam",
        help="optimizer used for training - LAMB or ADAM",
    )
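    # Note: the optimizer is selected in util.CreateOptimizer(); the value is
    # compared against the lowercase string "lamb", and any other value falls
    # back to AdamW.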
    # log and restore/save
    parser.add_argument(
        "--loss_print_every_n_iter",
        type=int,
        default=10,
        required=False,
        help="print loss every n iterations",
    )
    parser.add_argument(
        "--model_save_every_n_iter",
        type=int,
        default=10000,
        required=False,
        help="save model every n iterations",
    )
    parser.add_argument(
        "--model_save_dir",
        type=str,
        default="./output/model_save-{}".format(
            str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))
        ),
        required=False,
        help="model save directory",
    )
    parser.add_argument(
        "--save_last_snapshot",
        type=str2bool,
        default=False,
        required=False,
        help="save model snapshot for last iteration",
    )
    parser.add_argument(
        "--model_load_dir", type=str, default=None, help="model load directory"
    )
    parser.add_argument(
        "--log_dir", type=str, default="./output", help="log info save directory"
    )
    # bert backbone
    parser.add_argument(
        "--do_lower_case", type=str2bool, nargs="?", const=True, default="True"
    )
    parser.add_argument("--seq_length", type=int, default=512)
    parser.add_argument("--max_predictions_per_seq", type=int, default=80)
    parser.add_argument("--num_hidden_layers", type=int, default=24)
@@ -90,17 +155,19 @@ def get_parser(parser=None):
def print_args(args):
    print("=".ljust(66, "="))
    print(
        "Running {}: num_gpu_per_node = {}, num_nodes = {}.".format(
            args.model, args.gpu_num_per_node, args.num_nodes
        )
    )
    print("=".ljust(66, "="))
    for arg in vars(args):
        print("{} = {}".format(arg, getattr(args, arg)))
    print("-".ljust(66, "-"))
    print("Time stamp: {}".format(str(datetime.now().strftime("%Y-%m-%d-%H:%M:%S"))))

if __name__ == "__main__":
    parser = get_parser()
    args = parser.parse_args()
    print_args(args)
@@ -26,8 +26,12 @@ from util import Snapshot, InitNodes, Metric, CreateOptimizer, GetFunctionConfig
parser = configs.get_parser()
parser.add_argument("--data_dir", type=str, default=None)
parser.add_argument(
    "--data_part_num", type=int, default=32, help="data part number in dataset"
)
parser.add_argument(
    "--iter_num", type=int, default=1144000, help="total iterations to run"
)
parser.add_argument("--batch_size_per_device", type=int, default=64)
args = parser.parse_args()
configs.print_args(args)
@@ -35,15 +39,22 @@ configs.print_args(args)
batch_size = args.num_nodes * args.gpu_num_per_node * args.batch_size_per_device
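# Example (values taken from the 8-GPU LAMB script below): num_nodes=1,
# gpu_num_per_node=8, batch_size_per_device=16 -> batch_size = 128 samples per
# iteration; with --num_accumulation_steps=512, each optimizer update then
# covers 128 * 512 = 65536 samples.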

def BertDecoder(
    data_dir, batch_size, data_part_num, seq_length, max_predictions_per_seq
):
    ofrecord = flow.data.ofrecord_reader(
        data_dir,
        batch_size=batch_size,
        data_part_num=data_part_num,
        random_shuffle=True,
        shuffle_after_epoch=True,
    )
    blob_confs = {}

    def _blob_conf(name, shape, dtype=flow.int32):
        blob_confs[name] = flow.data.OFRecordRawDecoder(
            ofrecord, name, shape=shape, dtype=dtype
        )

    _blob_conf("input_ids", [seq_length])
    _blob_conf("next_sentence_labels", [1])
@@ -54,19 +65,30 @@ def BertDecoder(data_dir, batch_size, data_part_num, seq_length, max_predictions
_blob_conf("masked_lm_weights", [max_predictions_per_seq], flow.float)
return blob_confs
@flow.global_function(type="train", function_config=GetFunctionConfig(args))
def PretrainJob():
    hidden_size = 64 * args.num_attention_heads  # H = 64, size per head
    intermediate_size = hidden_size * 4

    if args.data_part_num == 1:
        with flow.scope.placement("cpu", "0:0"):
            decoders = BertDecoder(
                args.data_dir,
                batch_size,
                args.data_part_num,
                args.seq_length,
                args.max_predictions_per_seq,
            )
    else:
        assert args.data_part_num > 1
        decoders = BertDecoder(
            args.data_dir,
            batch_size,
            args.data_part_num,
            args.seq_length,
            args.max_predictions_per_seq,
        )

    total_loss, mlm_loss, nsp_loss = PreTrain(
        decoders["input_ids"],
@@ -93,7 +115,7 @@ def PretrainJob():
    )
    opt = CreateOptimizer(args)
    opt.minimize(total_loss)
    return {"total_loss": total_loss, "mlm_loss": mlm_loss, "nsp_loss": nsp_loss}
def main():
@@ -104,11 +126,17 @@ def main():
    snapshot = Snapshot(args.model_save_dir, args.model_load_dir)

    print("num_accumulation_steps:", args.num_accumulation_steps)
    metric = Metric(
        desc="train",
        print_steps=args.loss_print_every_n_iter,
        batch_size=batch_size * args.num_accumulation_steps,
        keys=["total_loss", "mlm_loss", "nsp_loss"],
    )

    for step in range(args.iter_num):
        PretrainJob().async_get(metric.metric_cb(step))
        # PretrainJob().async_get(metric.metric_cb(step, epoch=3))
        if (step + 1) % args.model_save_every_n_iter == 0:
            snapshot.save("snapshot_%d" % (step + 1))
......
BENCH_ROOT_DIR=/path/to/OneFlow-Benchmark/LanguageModeling/BERT
OUTPUT_DIR=/DATA/disk1/of_output
DATA_DIR=/DATA/disk1/bert/wiki_seq_len_128
BZ=48
ITER_NUM=1000000
max_seq_length=128
max_predictions_per_seq=20
of_log_dir=$OUTPUT_DIR/bert_master/of
rm -rf ${of_log_dir}
mkdir -p ${of_log_dir}
rm -rf core.*
export PYTHONUNBUFFERED=1
export ONEFLOW_DEBUG_MODE=True
export GLOG_v=3
export CUDA_VISIBLE_DEVICES=6
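# Single-GPU pretraining with the default AdamW optimizer; effective batch per
# update = 1 GPU x 48 per device x 1 accumulation step = 48 samples.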
python3 $BENCH_ROOT_DIR/run_pretraining.py \
--gpu_num_per_node=1 \
--num_nodes=1 \
--learning_rate=1.25e-5 \
--warmup_proportion=0.01 \
--weight_decay_rate=0.01 \
--batch_size_per_device=${BZ} \
--iter_num=${ITER_NUM} \
--loss_print_every_n_iter=1 \
--seq_length=128 \
--use_fp16 \
--max_predictions_per_seq=20 \
--num_hidden_layers=12 \
--num_attention_heads=12 \
--num_accumulation_steps=1 \
--max_position_embeddings=512 \
--type_vocab_size=2 \
--vocab_size=30522 \
--attention_probs_dropout_prob=0.1 \
--hidden_dropout_prob=0.1 \
--hidden_size_per_head=64 \
--data_part_num=64 \
--data_dir=$DATA_DIR \
--log_dir=${of_log_dir} \
--model_save_every_n_iter=50000 \
--model_save_dir=${of_log_dir}
BENCH_ROOT_DIR=/path/to/OneFlow-Benchmark/LanguageModeling/BERT
OUTPUT_DIR=/DATA/disk1/of_output
DATA_DIR=/DATA/disk1/bert/wiki_seq_len_128
BZ=16
ITER_NUM=1000000
max_seq_length=128
max_predictions_per_seq=20
of_log_dir=$OUTPUT_DIR/bert_master/of
rm -rf ${of_log_dir}
mkdir -p ${of_log_dir}
rm -rf core.*
export PYTHONUNBUFFERED=1
export ONEFLOW_DEBUG_MODE=True
export GLOG_v=3
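# 8-GPU LAMB pretraining with gradient accumulation; effective batch per
# update = 8 GPUs x 16 per device x 512 accumulation steps = 65536 samples.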
python3 $BENCH_ROOT_DIR/run_pretraining.py \
--gpu_num_per_node=8 \
--num_nodes=1 \
--learning_rate=1e-4 \
--warmup_proportion=0.01 \
--weight_decay_rate=0.01 \
--batch_size_per_device=${BZ} \
--iter_num=${ITER_NUM} \
--loss_print_every_n_iter=1 \
--seq_length=128 \
--use_fp16 \
--optimizer_type="lamb" \
--max_predictions_per_seq=20 \
--num_hidden_layers=12 \
--num_attention_heads=12 \
--num_accumulation_steps=512 \
--max_position_embeddings=512 \
--type_vocab_size=2 \
--vocab_size=30522 \
--attention_probs_dropout_prob=0.1 \
--hidden_dropout_prob=0.1 \
--hidden_size_per_head=64 \
--data_part_num=64 \
--data_dir=$DATA_DIR \
--log_dir=${of_log_dir} \
--model_save_every_n_iter=50000 \
--model_save_dir=${of_log_dir}
BENCH_ROOT_DIR=/path/to/OneFlow-Benchmark/LanguageModeling/BERT
# pretrained model dir
PRETRAINED_MODEL=/DATA/disk1/of_output/uncased_L-12_H-768_A-12_oneflow
# squad ofrecord dataset dir
DATA_ROOT=/DATA/disk1/of_output/bert/of_squad
# `vocab.txt` dir
REF_ROOT_DIR=/DATA/disk1/of_output/uncased_L-12_H-768_A-12
# `evaluate-v*.py` and `dev-v*.json` dir
SQUAD_TOOL_DIR=/DATA/disk1/of_output/bert/of_squad
db_version=${1:-"v2.0"}
if [ $db_version = "v1.1" ]; then
train_example_num=88614
eval_example_num=10833
version_2_with_negative="False"
elif [ $db_version = "v2.0" ]; then
train_example_num=131944
eval_example_num=12232
version_2_with_negative="True"
else
echo "db_version must be 'v1.1' or 'v2.0'"
exit
fi
train_data_dir=$DATA_ROOT/train-$db_version
eval_data_dir=$DATA_ROOT/dev-$db_version
LOGFILE=./bert_fp_training.log
export PYTHONUNBUFFERED=1
export ONEFLOW_DEBUG_MODE=True
export CUDA_VISIBLE_DEVICES=7
# finetune and eval SQuAD,
# `predictions.json` will be saved to folder `./squad_output`
python3 $BENCH_ROOT_DIR/run_squad.py \
--model=SQuAD \
--do_train=True \
--do_eval=True \
--gpu_num_per_node=1 \
--learning_rate=3e-5 \
--batch_size_per_device=16 \
--eval_batch_size_per_device=16 \
--num_epoch=3 \
--use_fp16 \
--version_2_with_negative=$version_2_with_negative \
--loss_print_every_n_iter=20 \
--do_lower_case=True \
--seq_length=384 \
--num_hidden_layers=12 \
--num_attention_heads=12 \
--max_position_embeddings=512 \
--type_vocab_size=2 \
--vocab_size=30522 \
--attention_probs_dropout_prob=0.1 \
--hidden_dropout_prob=0.1 \
--hidden_size_per_head=64 \
--train_data_dir=$train_data_dir \
--train_example_num=$train_example_num \
--eval_data_dir=$eval_data_dir \
--eval_example_num=$eval_example_num \
--log_dir=./log \
--model_load_dir=${PRETRAINED_MODEL} \
--save_last_snapshot=True \
--model_save_dir=./squad_snapshots \
--vocab_file=$REF_ROOT_DIR/vocab.txt \
--predict_file=$SQUAD_TOOL_DIR/dev-${db_version}.json \
--output_dir=./squad_output 2>&1 | tee ${LOGFILE}
# evaluate predictions.json to get metrics
python3 $SQUAD_TOOL_DIR/evaluate-${db_version}.py \
$SQUAD_TOOL_DIR/dev-${db_version}.json \
./squad_output/predictions.json
@@ -28,7 +28,7 @@ def InitNodes(args):
    assert args.num_nodes <= len(args.node_ips)
    flow.env.ctrl_port(args.ctrl_port)
    nodes = []
    for ip in args.node_ips[: args.num_nodes]:
        addr_dict = {}
        addr_dict["addr"] = ip
        nodes.append(addr_dict)
@@ -44,11 +44,13 @@ class Snapshot(object):
print("Restoring model from {}.".format(model_load_dir))
flow.load_variables(flow.checkpoint.get(model_load_dir))
else:
flow.checkpoint.save('initial_model')
flow.checkpoint.save("initial_model")
print("Init model on demand.")
def save(self, name):
snapshot_save_path = os.path.join(self._model_save_dir, "snapshot_{}".format(name))
snapshot_save_path = os.path.join(
self._model_save_dir, "snapshot_{}".format(name)
)
if not os.path.exists(snapshot_save_path):
os.makedirs(snapshot_save_path)
print("Saving model to {}.".format(snapshot_save_path))
@@ -77,7 +79,14 @@ class StopWatch(object):
class Metric(object):
    def __init__(
        self,
        desc="train",
        print_steps=-1,
        batch_size=256,
        keys=[],
        nvidia_smi_report_step=10,
    ):
        r"""accumulate and calculate metric

        Args:
@@ -91,11 +100,12 @@ class Metric(object):
        self.print_steps = print_steps
        assert batch_size > 0
        self.batch_size = batch_size
        self.nvidia_smi_report_step = nvidia_smi_report_step

        assert isinstance(keys, (list, tuple))
        self.keys = keys
        self.metric_dict = OrderedDict()
        self.metric_dict["step"] = 0

        self.timer = StopWatch()
        self.timer.start()
@@ -104,58 +114,90 @@ class Metric(object):
    def _clear(self):
        for key in self.keys:
            self.metric_dict[key] = 0.0
            self.metric_dict["n_" + key] = 0.0
        self.metric_dict["throughput"] = 0.0
        self.num_samples = 0.0

    def update_and_save(self, key, value, step, **kwargs):
        self.metric_dict[key] = value
        self.metric_dict.pop("n_" + key, None)

    def metric_cb(self, step=0, **kwargs):
        def callback(outputs):
            if step == 0:
                self._clear()
            if step == self.nvidia_smi_report_step:
                cmd = "nvidia-smi --query-gpu=utilization.gpu,memory.used --format=csv"
                os.system(cmd)

            for key in self.keys:
                self.metric_dict[key] += outputs[key].sum()
                self.metric_dict["n_" + key] += outputs[key].size
            self.num_samples += self.batch_size

            if (step + 1) % self.print_steps == 0:
                self.metric_dict["step"] = step
                for k, v in kwargs.items():
                    self.metric_dict[k] = v
                throughput = self.num_samples / self.timer.split()
                self.update_and_save("throughput", throughput, step)
                for key in self.keys:
                    value = self.metric_dict[key] / self.metric_dict["n_" + key]
                    self.update_and_save(key, value, step, **kwargs)
                print(
                    ", ".join(
                        ("{}: {}" if type(v) is int else "{}: {:.3f}").format(k, v)
                        for k, v in self.metric_dict.items()
                    ),
                    time.time(),
                )
                self._clear()

        return callback
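
# Usage sketch (mirrors main() in run_pretraining.py): metric_cb(step) returns
# a closure that OneFlow invokes with the job's fetched outputs, so losses are
# averaged over print_steps iterations and throughput is reported in
# samples/sec via StopWatch.split().
#
#   for step in range(args.iter_num):
#       PretrainJob().async_get(metric.metric_cb(step))
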
def CreateOptimizer(args):
    warmup_batches = int(args.iter_num * args.warmup_proportion)
    lr_warmup = flow.optimizer.warmup.linear(warmup_batches, 0)
    lr_scheduler = flow.optimizer.PolynomialScheduler(
        args.learning_rate, args.iter_num, 0.0, warmup=lr_warmup
    )
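    # Example (values from the pretrain scripts above): iter_num=1000000 and
    # warmup_proportion=0.01 give warmup_batches=10000; the LR ramps linearly
    # from 0 to learning_rate over those batches, then decays polynomially to
    # 0.0 by iter_num.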
    loss_scale_policy = None
    if args.use_fp16:
        loss_scale_policy = flow.optimizer.loss_scale.dynamic_loss_scale(
            increment_period=2000
        )
    if args.optimizer_type == "lamb":
        return flow.optimizer.LAMB(
            lr_scheduler,
            beta1=0.9,
            beta2=0.999,
            epsilon=1e-6,
            weight_decay=args.weight_decay_rate,
            weight_decay_excludes=["bias", "LayerNorm", "layer_norm"],
            grad_clipping=flow.optimizer.grad_clipping.by_global_norm(1.0),
            loss_scale_policy=loss_scale_policy,
        )
    else:
        return flow.optimizer.AdamW(
            lr_scheduler,
            epsilon=1e-6,
            weight_decay=args.weight_decay_rate,
            weight_decay_excludes=["bias", "LayerNorm", "layer_norm"],
            grad_clipping=flow.optimizer.grad_clipping.by_global_norm(1.0),
            loss_scale_policy=loss_scale_policy,
        )

def GetFunctionConfig(args):
    config = flow.function_config()
    config.enable_auto_mixed_precision(args.use_fp16)
    config.train.num_gradient_accumulation_steps(args.num_accumulation_steps)
    if args.use_xla:
        config.use_xla_jit(True)
    config.enable_fuse_add_to_output(True)
    config.enable_fuse_model_update_ops(True)
    return config