Unverified commit 717fe1e4, authored by Hui Zhang and committed by GitHub

Merge pull request #680 from PaddlePaddle/checkpoint

checkpoint refactor to save disk space
@@ -18,8 +18,8 @@ import paddle
 from paddle import distributed as dist
 from tensorboardX import SummaryWriter

-from deepspeech.utils import checkpoint
 from deepspeech.utils import mp_tools
+from deepspeech.utils.checkpoint import Checkpoint
 from deepspeech.utils.log import Log

 __all__ = ["Trainer"]
@@ -139,9 +139,9 @@ class Trainer():
             "epoch": self.epoch,
             "lr": self.optimizer.get_lr()
         })
-        checkpoint.save_parameters(self.checkpoint_dir, self.iteration
-                                   if tag is None else tag, self.model,
-                                   self.optimizer, infos)
+        self.checkpoint.add_checkpoint(self.checkpoint_dir, self.iteration
+                                       if tag is None else tag, self.model,
+                                       self.optimizer, infos)

     def resume_or_scratch(self):
         """Resume from latest checkpoint at checkpoints in the output
@@ -151,7 +151,7 @@ class Trainer():
            resume training.
        """
        scratch = None
-        infos = checkpoint.load_parameters(
+        infos = self.checkpoint.load_latest_parameters(
            self.model,
            self.optimizer,
            checkpoint_dir=self.checkpoint_dir,
@@ -180,7 +180,7 @@ class Trainer():
        from_scratch = self.resume_or_scratch()
        if from_scratch:
            # save init model, i.e. 0 epoch
-            self.save(tag='init')
+            self.save(tag='init', infos=None)
        self.lr_scheduler.step(self.iteration)

        if self.parallel:
@@ -263,6 +263,10 @@ class Trainer():
        self.checkpoint_dir = checkpoint_dir

+        self.checkpoint = Checkpoint(
+            kbest_n=self.config.training.checkpoint.kbest_n,
+            latest_n=self.config.training.checkpoint.latest_n)
+
    @mp_tools.rank_zero_only
    def destory(self):
        """Close visualizer to avoid hanging after training"""
......
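With this change the Trainer no longer calls the old module-level checkpoint.save_parameters / load_parameters helpers; it owns a Checkpoint object built from the new training.checkpoint config block and delegates saving and restoring to its add_checkpoint / load_latest_parameters methods. Below is a minimal sketch of that call pattern used standalone. It assumes paddle and this repository's deepspeech package are importable in a single-process run; the toy Linear model, the Adam optimizer, and the fabricated val_loss values are placeholders for illustration, not part of the diff.

```python
# Hedged usage sketch of the new Checkpoint API (assumptions: paddle and the
# deepspeech package are installed; model/optimizer/metrics are toy stand-ins).
import tempfile

import paddle
from deepspeech.utils.checkpoint import Checkpoint

model = paddle.nn.Linear(8, 2)  # stand-in for the real acoustic model
optimizer = paddle.optimizer.Adam(
    learning_rate=1e-3, parameters=model.parameters())

checkpoint_dir = tempfile.mkdtemp()
checkpoint = Checkpoint(kbest_n=5, latest_n=1)

for epoch in range(3):
    # ... one epoch of training / validation would run here ...
    val_loss = 1.0 / (epoch + 1)  # fabricated metric for illustration
    infos = {"epoch": epoch, "lr": optimizer.get_lr(), "val_loss": val_loss}
    # keeps the kbest_n best checkpoints by "val_loss" plus the latest_n newest
    checkpoint.add_checkpoint(checkpoint_dir, epoch, model, optimizer, infos)

# later (e.g. before evaluation) restore from the "checkpoint_best" record
infos = checkpoint.load_best_parameters(
    model, optimizer, checkpoint_dir=checkpoint_dir)
```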
@@ -11,9 +11,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import glob
 import json
 import os
 import re
+from pathlib import Path
 from typing import Union

 import paddle
@@ -25,128 +27,260 @@ from deepspeech.utils.log import Log

 logger = Log(__name__).getlog()

-__all__ = ["load_parameters", "save_parameters"]
-
-
-def _load_latest_checkpoint(checkpoint_dir: str) -> int:
-    """Get the iteration number corresponding to the latest saved checkpoint.
-    Args:
-        checkpoint_dir (str): the directory where checkpoint is saved.
-    Returns:
-        int: the latest iteration number. -1 for no checkpoint to load.
-    """
-    checkpoint_record = os.path.join(checkpoint_dir, "checkpoint")
-    if not os.path.isfile(checkpoint_record):
-        return -1
-
-    # Fetch the latest checkpoint index.
-    with open(checkpoint_record, "rt") as handle:
-        latest_checkpoint = handle.readlines()[-1].strip()
-    iteration = int(latest_checkpoint.split(":")[-1])
-    return iteration
-
-
-def _save_record(checkpoint_dir: str, iteration: int):
-    """Save the iteration number of the latest model to be checkpoint record.
-    Args:
-        checkpoint_dir (str): the directory where checkpoint is saved.
-        iteration (int): the latest iteration number.
-    Returns:
-        None
-    """
-    checkpoint_record = os.path.join(checkpoint_dir, "checkpoint")
-    # Update the latest checkpoint index.
-    with open(checkpoint_record, "a+") as handle:
-        handle.write("model_checkpoint_path:{}\n".format(iteration))
-
-
-def load_parameters(model,
-                    optimizer=None,
-                    checkpoint_dir=None,
-                    checkpoint_path=None):
-    """Load a specific model checkpoint from disk.
-    Args:
-        model (Layer): model to load parameters.
-        optimizer (Optimizer, optional): optimizer to load states if needed.
-            Defaults to None.
-        checkpoint_dir (str, optional): the directory where checkpoint is saved.
-        checkpoint_path (str, optional): if specified, load the checkpoint
-            stored in the checkpoint_path(prefix) and the argument 'checkpoint_dir' will
-            be ignored. Defaults to None.
-    Returns:
-        configs (dict): epoch or step, lr and other meta info should be saved.
-    """
-    configs = {}
-
-    if checkpoint_path is not None:
-        tag = os.path.basename(checkpoint_path).split(":")[-1]
-    elif checkpoint_dir is not None:
-        iteration = _load_latest_checkpoint(checkpoint_dir)
-        if iteration == -1:
-            return configs
-        checkpoint_path = os.path.join(checkpoint_dir, "{}".format(iteration))
-    else:
-        raise ValueError(
-            "At least one of 'checkpoint_dir' and 'checkpoint_path' should be specified!"
-        )
-
-    rank = dist.get_rank()
-
-    params_path = checkpoint_path + ".pdparams"
-    model_dict = paddle.load(params_path)
-    model.set_state_dict(model_dict)
-    logger.info("Rank {}: loaded model from {}".format(rank, params_path))
-
-    optimizer_path = checkpoint_path + ".pdopt"
-    if optimizer and os.path.isfile(optimizer_path):
-        optimizer_dict = paddle.load(optimizer_path)
-        optimizer.set_state_dict(optimizer_dict)
-        logger.info("Rank {}: loaded optimizer state from {}".format(
-            rank, optimizer_path))
-
-    info_path = re.sub('.pdparams$', '.json', params_path)
-    if os.path.exists(info_path):
-        with open(info_path, 'r') as fin:
-            configs = json.load(fin)
-    return configs
-
-
-@mp_tools.rank_zero_only
-def save_parameters(checkpoint_dir: str,
-                    tag_or_iteration: Union[int, str],
-                    model: paddle.nn.Layer,
-                    optimizer: Optimizer=None,
-                    infos: dict=None):
-    """Checkpoint the latest trained model parameters.
-    Args:
-        checkpoint_dir (str): the directory where checkpoint is saved.
-        tag_or_iteration (int or str): the latest iteration(step or epoch) number.
-        model (Layer): model to be checkpointed.
-        optimizer (Optimizer, optional): optimizer to be checkpointed.
-            Defaults to None.
-        infos (dict or None): any info you want to save.
-    Returns:
-        None
-    """
-    checkpoint_path = os.path.join(checkpoint_dir,
-                                   "{}".format(tag_or_iteration))
-
-    model_dict = model.state_dict()
-    params_path = checkpoint_path + ".pdparams"
-    paddle.save(model_dict, params_path)
-    logger.info("Saved model to {}".format(params_path))
-
-    if optimizer:
-        opt_dict = optimizer.state_dict()
-        optimizer_path = checkpoint_path + ".pdopt"
-        paddle.save(opt_dict, optimizer_path)
-        logger.info("Saved optimzier state to {}".format(optimizer_path))
-
-    info_path = re.sub('.pdparams$', '.json', params_path)
-    infos = {} if infos is None else infos
-    with open(info_path, 'w') as fout:
-        data = json.dumps(infos)
-        fout.write(data)
-
-    if isinstance(tag_or_iteration, int):
-        _save_record(checkpoint_dir, tag_or_iteration)
+__all__ = ["Checkpoint"]
+
+
+class Checkpoint(object):
+    def __init__(self, kbest_n: int=5, latest_n: int=1):
+        self.best_records: Mapping[Path, float] = {}
+        self.latest_records = []
+        self.kbest_n = kbest_n
+        self.latest_n = latest_n
+        self._save_all = (kbest_n == -1)
+
+    def add_checkpoint(self,
+                       checkpoint_dir,
+                       tag_or_iteration,
+                       model,
+                       optimizer,
+                       infos,
+                       metric_type="val_loss"):
+        if (metric_type not in infos.keys()):
+            self._save_parameters(checkpoint_dir, tag_or_iteration, model,
+                                  optimizer, infos)
+            return
+
+        #save best
+        if self._should_save_best(infos[metric_type]):
+            self._save_best_checkpoint_and_update(
+                infos[metric_type], checkpoint_dir, tag_or_iteration, model,
+                optimizer, infos)
+        #save latest
+        self._save_latest_checkpoint_and_update(
+            checkpoint_dir, tag_or_iteration, model, optimizer, infos)
+
+        if isinstance(tag_or_iteration, int):
+            self._save_checkpoint_record(checkpoint_dir, tag_or_iteration)
+
+    def load_latest_parameters(self,
+                               model,
+                               optimizer=None,
+                               checkpoint_dir=None,
+                               checkpoint_path=None):
+        """Load a last model checkpoint from disk.
+        Args:
+            model (Layer): model to load parameters.
+            optimizer (Optimizer, optional): optimizer to load states if needed.
+                Defaults to None.
+            checkpoint_dir (str, optional): the directory where checkpoint is saved.
+            checkpoint_path (str, optional): if specified, load the checkpoint
+                stored in the checkpoint_path(prefix) and the argument 'checkpoint_dir' will
+                be ignored. Defaults to None.
+        Returns:
+            configs (dict): epoch or step, lr and other meta info should be saved.
+        """
+        return self._load_parameters(model, optimizer, checkpoint_dir,
+                                     checkpoint_path, "checkpoint_latest")
+
+    def load_best_parameters(self,
+                             model,
+                             optimizer=None,
+                             checkpoint_dir=None,
+                             checkpoint_path=None):
+        """Load a last model checkpoint from disk.
+        Args:
+            model (Layer): model to load parameters.
+            optimizer (Optimizer, optional): optimizer to load states if needed.
+                Defaults to None.
+            checkpoint_dir (str, optional): the directory where checkpoint is saved.
+            checkpoint_path (str, optional): if specified, load the checkpoint
+                stored in the checkpoint_path(prefix) and the argument 'checkpoint_dir' will
+                be ignored. Defaults to None.
+        Returns:
+            configs (dict): epoch or step, lr and other meta info should be saved.
+        """
+        return self._load_parameters(model, optimizer, checkpoint_dir,
+                                     checkpoint_path, "checkpoint_best")
+
+    def _should_save_best(self, metric: float) -> bool:
+        if not self._best_full():
+            return True
+
+        # already full
+        worst_record_path = max(self.best_records, key=self.best_records.get)
+        # worst_record_path = max(self.best_records.iteritems(), key=operator.itemgetter(1))[0]
+        worst_metric = self.best_records[worst_record_path]
+        return metric < worst_metric
+
+    def _best_full(self):
+        return (not self._save_all) and len(self.best_records) == self.kbest_n
+
+    def _latest_full(self):
+        return len(self.latest_records) == self.latest_n
+
+    def _save_best_checkpoint_and_update(self, metric, checkpoint_dir,
+                                         tag_or_iteration, model, optimizer,
+                                         infos):
+        # remove the worst
+        if self._best_full():
+            worst_record_path = max(self.best_records,
+                                    key=self.best_records.get)
+            self.best_records.pop(worst_record_path)
+            if (worst_record_path not in self.latest_records):
+                logger.info(
+                    "remove the worst checkpoint: {}".format(worst_record_path))
+                self._del_checkpoint(checkpoint_dir, worst_record_path)
+
+        # add the new one
+        self._save_parameters(checkpoint_dir, tag_or_iteration, model,
+                              optimizer, infos)
+        self.best_records[tag_or_iteration] = metric
+
+    def _save_latest_checkpoint_and_update(
+            self, checkpoint_dir, tag_or_iteration, model, optimizer, infos):
+        # remove the old
+        if self._latest_full():
+            to_del_fn = self.latest_records.pop(0)
+            if (to_del_fn not in self.best_records.keys()):
+                logger.info(
+                    "remove the latest checkpoint: {}".format(to_del_fn))
+                self._del_checkpoint(checkpoint_dir, to_del_fn)
+        self.latest_records.append(tag_or_iteration)
+
+        self._save_parameters(checkpoint_dir, tag_or_iteration, model,
+                              optimizer, infos)
+
+    def _del_checkpoint(self, checkpoint_dir, tag_or_iteration):
+        checkpoint_path = os.path.join(checkpoint_dir,
+                                       "{}".format(tag_or_iteration))
+        for filename in glob.glob(checkpoint_path + ".*"):
+            os.remove(filename)
+            logger.info("delete file: {}".format(filename))
+
+    def _load_checkpoint_idx(self, checkpoint_record: str) -> int:
+        """Get the iteration number corresponding to the latest saved checkpoint.
+        Args:
+            checkpoint_path (str): the saved path of checkpoint.
+        Returns:
+            int: the latest iteration number. -1 for no checkpoint to load.
+        """
+        if not os.path.isfile(checkpoint_record):
+            return -1
+
+        # Fetch the latest checkpoint index.
+        with open(checkpoint_record, "rt") as handle:
+            latest_checkpoint = handle.readlines()[-1].strip()
+        iteration = int(latest_checkpoint.split(":")[-1])
+        return iteration
+
+    def _save_checkpoint_record(self, checkpoint_dir: str, iteration: int):
+        """Save the iteration number of the latest model to be checkpoint record.
+        Args:
+            checkpoint_dir (str): the directory where checkpoint is saved.
+            iteration (int): the latest iteration number.
+        Returns:
+            None
+        """
+        checkpoint_record_latest = os.path.join(checkpoint_dir,
+                                                "checkpoint_latest")
+        checkpoint_record_best = os.path.join(checkpoint_dir, "checkpoint_best")
+
+        with open(checkpoint_record_best, "w") as handle:
+            for i in self.best_records.keys():
+                handle.write("model_checkpoint_path:{}\n".format(i))
+        with open(checkpoint_record_latest, "w") as handle:
+            for i in self.latest_records:
+                handle.write("model_checkpoint_path:{}\n".format(i))
+
+    def _load_parameters(self,
+                         model,
+                         optimizer=None,
+                         checkpoint_dir=None,
+                         checkpoint_path=None,
+                         checkpoint_file=None):
+        """Load a last model checkpoint from disk.
+        Args:
+            model (Layer): model to load parameters.
+            optimizer (Optimizer, optional): optimizer to load states if needed.
+                Defaults to None.
+            checkpoint_dir (str, optional): the directory where checkpoint is saved.
+            checkpoint_path (str, optional): if specified, load the checkpoint
+                stored in the checkpoint_path(prefix) and the argument 'checkpoint_dir' will
+                be ignored. Defaults to None.
+            checkpoint_file "checkpoint_latest" or "checkpoint_best"
+        Returns:
+            configs (dict): epoch or step, lr and other meta info should be saved.
+        """
+        configs = {}
+
+        if checkpoint_path is not None:
+            tag = os.path.basename(checkpoint_path).split(":")[-1]
+        elif checkpoint_dir is not None and checkpoint_file is not None:
+            checkpoint_record = os.path.join(checkpoint_dir, checkpoint_file)
+            iteration = self._load_checkpoint_idx(checkpoint_record)
+            if iteration == -1:
+                return configs
+            checkpoint_path = os.path.join(checkpoint_dir,
+                                           "{}".format(iteration))
+        else:
+            raise ValueError(
+                "At least one of 'checkpoint_dir' and 'checkpoint_file' and 'checkpoint_path' should be specified!"
+            )
+
+        rank = dist.get_rank()
+
+        params_path = checkpoint_path + ".pdparams"
+        model_dict = paddle.load(params_path)
+        model.set_state_dict(model_dict)
+        logger.info("Rank {}: loaded model from {}".format(rank, params_path))
+
+        optimizer_path = checkpoint_path + ".pdopt"
+        if optimizer and os.path.isfile(optimizer_path):
+            optimizer_dict = paddle.load(optimizer_path)
+            optimizer.set_state_dict(optimizer_dict)
+            logger.info("Rank {}: loaded optimizer state from {}".format(
+                rank, optimizer_path))
+
+        info_path = re.sub('.pdparams$', '.json', params_path)
+        if os.path.exists(info_path):
+            with open(info_path, 'r') as fin:
+                configs = json.load(fin)
+        return configs
+
+    @mp_tools.rank_zero_only
+    def _save_parameters(self,
+                         checkpoint_dir: str,
+                         tag_or_iteration: Union[int, str],
+                         model: paddle.nn.Layer,
+                         optimizer: Optimizer=None,
+                         infos: dict=None):
+        """Checkpoint the latest trained model parameters.
+        Args:
+            checkpoint_dir (str): the directory where checkpoint is saved.
+            tag_or_iteration (int or str): the latest iteration(step or epoch) number.
+            model (Layer): model to be checkpointed.
+            optimizer (Optimizer, optional): optimizer to be checkpointed.
+                Defaults to None.
+            infos (dict or None): any info you want to save.
+        Returns:
+            None
+        """
+        checkpoint_path = os.path.join(checkpoint_dir,
+                                       "{}".format(tag_or_iteration))
+
+        model_dict = model.state_dict()
+        params_path = checkpoint_path + ".pdparams"
+        paddle.save(model_dict, params_path)
+        logger.info("Saved model to {}".format(params_path))
+
+        if optimizer:
+            opt_dict = optimizer.state_dict()
+            optimizer_path = checkpoint_path + ".pdopt"
+            paddle.save(opt_dict, optimizer_path)
+            logger.info("Saved optimzier state to {}".format(optimizer_path))
+
+        info_path = re.sub('.pdparams$', '.json', params_path)
+        infos = {} if infos is None else infos
+        with open(info_path, 'w') as fout:
+            data = json.dumps(infos)
+            fout.write(data)
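The disk saving comes from the bookkeeping the class keeps in memory: best_records maps a checkpoint tag to its metric and is capped at kbest_n, latest_records is a FIFO capped at latest_n, and a checkpoint's files are only deleted once neither list still references it. The following dependency-free sketch replays that policy with made-up tags and val_loss values; the real class additionally writes the .pdparams/.pdopt/.json files and the checkpoint_best / checkpoint_latest record files.

```python
# Dependency-free sketch of the k-best + latest retention policy (made-up tags
# and metrics; lower val_loss is better, mirroring _should_save_best above).
kbest_n, latest_n = 2, 1
best_records = {}     # tag -> metric, mirrors Checkpoint.best_records
latest_records = []   # tags in arrival order, mirrors Checkpoint.latest_records
deleted = []          # stands in for Checkpoint._del_checkpoint


def add(tag, metric):
    # best side: when full, evict the worst entry unless the latest list
    # still protects it (same check as _save_best_checkpoint_and_update)
    if len(best_records) < kbest_n or metric < max(best_records.values()):
        if len(best_records) == kbest_n:
            worst = max(best_records, key=best_records.get)
            best_records.pop(worst)
            if worst not in latest_records:
                deleted.append(worst)
        best_records[tag] = metric
    # latest side: bounded FIFO; an evicted tag is only deleted if it is not
    # also one of the current k-best (as in _save_latest_checkpoint_and_update)
    if len(latest_records) == latest_n:
        old = latest_records.pop(0)
        if old not in best_records:
            deleted.append(old)
    latest_records.append(tag)


for tag, val_loss in enumerate([0.9, 0.7, 0.8, 0.4, 0.6]):
    add(tag, val_loss)

print(sorted(best_records))  # [3, 4]    -> the two lowest val_loss checkpoints
print(latest_records)        # [4]       -> plus the most recent one
print(deleted)               # [0, 2, 1] -> removed from disk in the real class
```

With kbest_n=2 and latest_n=1, only the two best epochs and the most recent epoch survive; everything else is deleted, which is how the refactor keeps disk usage bounded instead of accumulating every epoch's checkpoint.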
@@ -48,6 +48,9 @@ training:
   weight_decay: 1e-06
   global_grad_clip: 3.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
   batch_size: 128
......
@@ -93,6 +93,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
......
@@ -88,6 +88,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
......
@@ -48,6 +48,9 @@ training:
   weight_decay: 1e-06
   global_grad_clip: 5.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
   batch_size: 128
......
@@ -93,6 +93,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
......
@@ -86,6 +86,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
......
@@ -89,6 +89,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
......
@@ -84,6 +84,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 100
+  checkpoint:
+    kbest_n: 50
+    latest_n: 5

 decoding:
......
@@ -43,12 +43,16 @@ model:
   share_rnn_weights: True

 training:
-  n_epoch: 24
+  n_epoch: 10
   lr: 1e-5
   lr_decay: 1.0
   weight_decay: 1e-06
   global_grad_clip: 5.0
   log_interval: 1
+  checkpoint:
+    kbest_n: 3
+    latest_n: 2

 decoding:
   batch_size: 128
......
@@ -91,6 +91,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 1
+  checkpoint:
+    kbest_n: 10
+    latest_n: 1

 decoding:
......
@@ -84,6 +84,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 1
+  checkpoint:
+    kbest_n: 10
+    latest_n: 1

 decoding:
......
@@ -87,6 +87,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 1
+  checkpoint:
+    kbest_n: 10
+    latest_n: 1

 decoding:
......
@@ -84,6 +84,9 @@ training:
   warmup_steps: 25000
   lr_decay: 1.0
   log_interval: 1
+  checkpoint:
+    kbest_n: 10
+    latest_n: 1

 decoding:
......
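Every recipe config gains a training.checkpoint block, which the Trainer reads when constructing the Checkpoint object (see the trainer hunk above). The full recipes keep a generous kbest_n: 50 / latest_n: 5, while the smaller debug and tiny configs keep 10/1 or 3/2. Below is a small hedged sketch of how such a block can be read; plain PyYAML and dict access stand in for the repository's actual config object, which exposes the same values as config.training.checkpoint.kbest_n.

```python
# Minimal sketch (assumes PyYAML is installed); the inline YAML mirrors the
# checkpoint block added to the recipe configs above.
import yaml

conf_text = """
training:
  log_interval: 100
  checkpoint:
    kbest_n: 50
    latest_n: 5
"""

config = yaml.safe_load(conf_text)
ckpt_conf = config["training"]["checkpoint"]

# the Trainer does the equivalent of:
#   Checkpoint(kbest_n=ckpt_conf["kbest_n"], latest_n=ckpt_conf["latest_n"])
print(ckpt_conf["kbest_n"], ckpt_conf["latest_n"])  # -> 50 5
```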