Commit 93bf920e authored by wuzewu

add Task run environment

Parent 28eb496f
@@ -142,7 +142,11 @@ def from_module_attr_to_param(module_attr):
     return param

-def connect_program(pre_program, next_program, input_dict=None, inplace=True):
+def connect_program(pre_program,
+                    next_program,
+                    input_dict=None,
+                    inplace=True,
+                    need_log=True):
     def _copy_vars_and_ops_in_blocks(from_block, to_block):
         for var in from_block.vars:
             var = from_block.var(var)
@@ -198,7 +202,8 @@ def connect_program(pre_program, next_program, input_dict=None, inplace=True):
                 outputs={'Out': output_var})

     block_map = {0: 0}
-    logger.info("Connect program's input tensor")
+    if need_log:
+        logger.info("Connect program's input tensor")
     for index, block in enumerate(next_program.blocks):
         if block.idx == 0:
             _copy_vars_and_ops_in_blocks(block, output_program.global_block())
@@ -210,7 +215,8 @@ def connect_program(pre_program, next_program, input_dict=None, inplace=True):
             new_block = output_program._create_block(
                 parent_idx=block_map[block.parent_idx])
             _copy_vars_and_ops_in_blocks(block, new_block)
-    logger.info("Connect program's input tensor done")
+    if need_log:
+        logger.info("Connect program's input tensor done")
     return output_program
...
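Below is a minimal sketch of how the new need_log switch is used when stitching two programs together. Only hub.connect_program and its need_log parameter come from this patch; the two toy programs and their variable names are assumptions for illustration:

import paddle.fluid as fluid
import paddlehub as hub

# Hypothetical upstream program that produces the tensor to forward.
pre_program = fluid.Program()
with fluid.program_guard(pre_program):
    feature = fluid.layers.data(name="feature", shape=[10], dtype="float32")

# Hypothetical downstream program whose input "inp" should be rewired.
next_program = fluid.Program()
with fluid.program_guard(next_program):
    inp = fluid.layers.data(name="inp", shape=[10], dtype="float32")
    fc = fluid.layers.fc(input=inp, size=2)

# Keys of input_dict name variables in next_program; values are the
# pre_program variables routed into them. need_log=False silences the
# "Connect program's input tensor" messages guarded above.
merged = hub.connect_program(
    pre_program=pre_program,
    next_program=next_program,
    input_dict={"inp": feature},
    need_log=False)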
@@ -18,8 +18,10 @@ from __future__ import print_function

 import os
 import collections
+import contextlib
 import time
 import multiprocessing
+import copy

 import numpy as np
 import paddle.fluid as fluid
@@ -61,6 +63,28 @@ class RunState(object):
         return self


+class RunEnv(object):
+    def __init__(self):
+        self.current_epoch = 0
+        self.current_step = 0
+        self.main_program = None
+        self.start_program = None
+        self.main_program_compiled = None
+        self.py_reader = None
+        self.reader = None
+        self.loss = None
+        self.label = None
+        self.metrics = None
+        self.is_inititalized = False
+        self.UNG = copy.deepcopy(fluid.unique_name.generator)
+
+    def __setattr__(self, key, value):
+        self.__dict__[key] = value
+
+    def __getattr__(self, key):
+        return self.__dict__[key]
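RunEnv is deliberately a plain attribute bag: __setattr__ writes straight into __dict__, and __getattr__ is only the fallback for names missing from it. Each env also deep-copies the global unique-name generator so every phase rebuilds the network with consistent parameter names. A small sketch of that behavior, assuming only the class above and paddle.fluid:

env = RunEnv()
env.loss = None         # __setattr__ stores straight into env.__dict__
env.extra_state = 42    # arbitrary keys are fine; there is no __slots__
print(env.extra_state)  # 42, found by plain instance-dict lookup

# __getattr__ only fires for missing names, where it raises KeyError
# rather than AttributeError.

# The env owns a deep copy of the global unique-name generator, so
# building a net under fluid.unique_name.guard(env.UNG) repeatedly
# reproduces the same variable names instead of fc_0, fc_1, ...
assert env.UNG is not fluid.unique_name.generator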
 class BasicTask(object):
     def __init__(self,
                  feed_list,
@@ -68,36 +92,277 @@ class BasicTask(object):
                  main_program=None,
                  startup_program=None,
                  config=None):
-        self.data_reader = data_reader
-        self.main_program = main_program if main_program else fluid.default_main_program(
-        )
-        self.startup_program = startup_program if startup_program else fluid.default_startup_program(
-        )
+        # base item
+        self._base_data_reader = data_reader
+        self._base_feed_list = feed_list
+        if main_program is None:
+            self._base_main_program = fluid.default_main_program().clone()
+        else:
+            self._base_main_program = main_program.clone()
+        if startup_program is None:
+            self._base_startup_program = fluid.default_startup_program().clone()
+        else:
+            self._base_startup_program = startup_program.clone()
+        self._load_checkpoint = False
+        self._base_compile_program = None
+
+        # run config
         self.config = config if config else RunConfig()
         self.place, self.device_count = hub.common.get_running_device_info(
             self.config)
         self.exe = fluid.Executor(place=self.place)
-        self.feed_list = feed_list
-        self.feed_variables = [
-            main_program.global_block().vars[var_name] for var_name in feed_list
-        ]
-        self.metrics = []
-        self.is_inititalized = False
-        self.current_step = 0
-        self.current_epoch = 0
+        self.build_strategy = fluid.BuildStrategy()
+        if self.config.enable_memory_optim:
+            self.build_strategy.memory_optimize = True
+        else:
+            self.build_strategy.memory_optimize = False
+
+        # log item
+        if not os.path.exists(self.config.checkpoint_dir):
+            mkdir(self.config.checkpoint_dir)
+        vdl_log_dir = os.path.join(self.config.checkpoint_dir, "vdllog")
+        self.log_writer = LogWriter(vdl_log_dir, sync_cycle=1)
+
+        # run environment
+        self._phases = []
+        self._envs = {}
+    def init_if_necessary(self):
+        if not self._load_checkpoint:
+            self.load_checkpoint()
+            self._load_checkpoint = True
+
+    @contextlib.contextmanager
+    def phase_guard(self, phase):
+        if phase not in ["train", "val", "dev", "test", "predict", "inference"]:
+            raise RuntimeError()
+        self._phases.append(phase)
+        yield
+        self._phases = self._phases[:-1]
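phase_guard keeps a stack rather than a single flag, so phases nest: an eval(phase="dev") triggered from inside training pushes "dev" and pops back to "train" on exit. A self-contained sketch of the same pattern, independent of the task class:

import contextlib

class PhaseStack(object):
    # Stripped-down analogue of BasicTask.phase_guard, for illustration only.
    def __init__(self):
        self._phases = []

    @contextlib.contextmanager
    def phase_guard(self, phase):
        if phase not in ["train", "val", "dev", "test", "predict", "inference"]:
            raise RuntimeError("unknown phase: %s" % phase)
        self._phases.append(phase)
        yield
        self._phases = self._phases[:-1]

    @property
    def phase(self):
        return self._phases[-1]

stack = PhaseStack()
with stack.phase_guard("train"):
    with stack.phase_guard("dev"):   # e.g. periodic evaluation mid-training
        assert stack.phase == "dev"
    assert stack.phase == "train"    # popped back once the eval exits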
+    def _build_env(self):
+        if self.env.is_inititalized:
+            return
+
+        self._build_env_start_event()
+        self.env.is_inititalized = True
+        self.env.main_program = self._base_main_program.clone()
+        self.env.startup_program = fluid.Program()
+        with fluid.program_guard(self.env.main_program,
+                                 self._base_startup_program):
+            with fluid.unique_name.guard(self.env.UNG):
+                self.env.output = self._build_net()
+                if self.is_train_phase or self.is_test_phase:
+                    self.env.label = self._add_label()
+                    self.env.loss = self._add_loss()
+                    self.env.metrics = self._add_metrics()

-    def _init_start_event(self):
+        if self.config.use_pyreader:
+            t_program = fluid.Program()
+            with fluid.program_guard(t_program, self.env.startup_program):
+                self.env.py_reader = fluid.layers.py_reader(
+                    capacity=64,
+                    shapes=[var.shape for var in self.feed_var_list],
+                    dtypes=[dtype_map[var.dtype] for var in self.feed_var_list],
+                    lod_levels=[var.lod_level for var in self.feed_var_list],
+                    use_double_buffer=False)
+
+                feed_var_list = self.feed_var_list
+                py_vars = fluid.layers.read_file(self.env.py_reader)
+                input_dict = {
+                    feed_var_list[index].name: py_var
+                    for index, py_var in enumerate(py_vars)
+                }
+                hub.connect_program(
+                    pre_program=t_program,
+                    next_program=self.env.main_program,
+                    input_dict=input_dict,
+                    need_log=False)
+            self.env.main_program = t_program
+            self.env.loss = self.env.main_program.global_block().vars[
+                self.env.loss.name]
+            self.env.output = self.env.main_program.global_block().vars[
+                self.env.output.name]
+            metrics_name = [var.name for var in self.env.metrics]
+            self.env.metrics = [
+                self.env.main_program.global_block().vars[name]
+                for name in metrics_name
+            ]
+
+        if self.config.enable_memory_optim:
+            for var_name in self.fetch_list:
+                var = self.env.main_program.global_block().vars[var_name]
+                var.persistable = True
+
+        if self.is_train_phase:
+            with fluid.program_guard(self.env.main_program,
+                                     self._base_startup_program):
+                with fluid.unique_name.guard(self.env.UNG):
+                    self.config.strategy.execute(
+                        self.loss, self._base_data_reader, self.config)
+
+        if self.is_train_phase:
+            loss_name = self.env.loss.name
+            share_vars_from = None
+        else:
+            loss_name = None
+            if self._base_compile_program is None:
+                share_vars_from = None
+            else:
+                share_vars_from = self._base_compile_program
+
+        self.env.main_program_compiled = fluid.CompiledProgram(
+            self.env.main_program).with_data_parallel(
+                loss_name=loss_name,
+                share_vars_from=share_vars_from,
+                build_strategy=self.build_strategy)
+
+        if self._base_compile_program is None:
+            self._base_compile_program = self.env.main_program_compiled
+
+        self.exe.run(self.env.startup_program)
+        self._build_env_end_event()
+    @property
+    def is_train_phase(self):
+        return self.phase in ["train"]
+
+    @property
+    def is_test_phase(self):
+        return self.phase in ["val", "dev", "test"]
+
+    @property
+    def is_predict_phase(self):
+        return self.phase in ["predict", "inference"]
+
+    @property
+    def phase(self):
+        return self._phases[-1]
+
+    @property
+    def env(self):
+        phase = self.phase
+        if phase in ["val", "dev", "test"]:
+            phase = "val"
+        if not phase in self._envs:
+            self._envs[phase] = RunEnv()
+        return self._envs[phase]
+
+    @property
+    def py_reader(self):
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.py_reader
+
+    @property
+    def current_step(self):
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.current_step
+
+    @property
+    def current_epoch(self):
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.current_epoch
+
+    @property
+    def main_program(self):
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.main_program
+
+    @property
+    def startup_program(self):
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.startup_program
+
+    @property
+    def main_program_compiled(self):
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.main_program_compiled
+
+    @property
+    def reader(self):
+        self.env.reader = self._base_data_reader.data_generator(
+            batch_size=self.config.batch_size, phase=self.phase)
+        return self.env.reader
+
+    @property
+    def loss(self):
+        if self.is_predict_phase:
+            raise RuntimeError()
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.loss
+
+    @property
+    def label(self):
+        if self.is_predict_phase:
+            raise RuntimeError()
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.label
+
+    @property
+    def output(self):
+        if self.is_predict_phase:
+            raise RuntimeError()
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.output
+
+    @property
+    def metrics(self):
+        if self.is_predict_phase:
+            raise RuntimeError()
+        if not self.env.is_inititalized:
+            self._build_env()
+        return self.env.metrics
+
+    @property
+    def unique_name_generator(self):
+        return self.env.UNG
+
+    @property
+    def feed_list(self):
+        feed_list = [varname for varname in self._base_feed_list]
+        if self.is_train_phase or self.is_test_phase:
+            feed_list += [self.label.name]
+        return feed_list
+
+    @property
+    def feed_var_list(self):
+        vars = self.main_program.global_block().vars
+        return [vars[varname] for varname in self.feed_list]
+
+    @property
+    def fetch_list(self):
+        if self.is_train_phase or self.is_test_phase:
+            return [metric.name for metric in self.metrics] + [self.loss.name]
+        return [self.output.name]
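With these properties, feed and fetch targets are derived from the active phase instead of being threaded through every call: train and test phases feed the label and fetch the metrics plus the loss, while predict feeds only the base inputs and fetches the raw output. A hypothetical session, assuming a task whose _base_feed_list is ["text"]:

with task.phase_guard("train"):
    task.feed_list    # -> ["text", "label"]
    task.fetch_list   # -> [accuracy_var.name, loss_var.name]

with task.phase_guard("predict"):
    task.feed_list    # -> ["text"]
    task.fetch_list   # -> [output_var.name]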
+    def _build_env_start_event(self):
         pass

-    def _init_end_event(self):
+    def _build_env_end_event(self):
         pass

-    def _eval_start_event(self, phase):
-        logger.info("Evaluation on {} dataset start".format(phase))
+    def _eval_start_event(self):
+        logger.info("Evaluation on {} dataset start".format(self.phase))

-    def _eval_end_event(self, phase, run_state):
-        logger.info("[%s dataset evaluation result] [step/sec: %.2f]" %
-                    (phase, run_state.run_speed))
+    def _eval_end_event(self, run_state):
+        logger.info("[%s dataset evaluation result] [step/sec: %.2f]" %
+                    (self.phase, run_state.run_speed))

     def _log_interval_event(self, run_state):
         logger.info("step %d: [step/sec: %.2f]" % (self.current_step,
@@ -109,8 +374,8 @@ class BasicTask(object):
     def _eval_interval_event(self):
         self.eval(phase="dev")

-    def _run_step_event(self, phase, run_state):
-        if phase == "predict":
+    def _run_step_event(self, run_state):
+        if self.is_predict_phase:
             yield run_state.run_results

     def _finetune_start_event(self):
@@ -131,114 +396,6 @@ class BasicTask(object):
     def _add_metrics(self):
         raise NotImplementedError
-    def _add_py_reader(self):
-        for program, add_label in ((self.main_program,
-                                    True), (self.test_program, True),
-                                   (self.inference_program, False)):
-            temp_program = fluid.Program()
-            startup_program = fluid.Program()
-            with fluid.program_guard(temp_program, startup_program):
-                feed_variables = self.feed_variables
-                if add_label:
-                    feed_variables = feed_variables + [self.label]
-                feed_list = self.feed_list
-                if add_label:
-                    feed_list = feed_list + [self.label.name]
-                py_reader = fluid.layers.py_reader(
-                    capacity=16,
-                    shapes=[var.shape for var in feed_variables],
-                    lod_levels=[var.lod_level for var in feed_variables],
-                    dtypes=[dtype_map[var.dtype] for var in feed_variables],
-                    use_double_buffer=True)
-
-                feed_variables = fluid.layers.read_file(py_reader)
-
-                input_dict = {
-                    key: feed_variables[index]
-                    for index, key in enumerate(feed_list)
-                }
-
-                hub.connect_program(
-                    pre_program=temp_program,
-                    next_program=program,
-                    input_dict=input_dict,
-                    inplace=True)
-
-            self.exe.run(startup_program)
-
-            if program == self.main_program:
-                self.main_program = temp_program
-                self.loss = self.main_program.global_block().vars[
-                    self.loss.name]
-                for index, metric in enumerate(self.metrics):
-                    self.metrics[index] = self.main_program.global_block().vars[
-                        metric.name]
-                self.output = self.main_program.global_block().vars[
-                    self.output.name]
-                self.loss.persistable = True
-                for metric in self.metrics:
-                    metric.persistable = True
-                self.output.persistable = True
-                self.main_py_reader = py_reader
-            elif program == self.test_program:
-                self.test_program = temp_program
-                self.test_py_reader = py_reader
-            elif program == self.inference_program:
-                self.inference_program = temp_program
-                self.inference_py_reader = py_reader
-    def _init_if_necessary(self, load_best_model=False):
-        if not self.is_inititalized:
-            self._init_start_event()
-            with fluid.program_guard(self.main_program):
-                self.output = self._build_net()
-                self.inference_program = self.main_program.clone(for_test=True)
-                self._add_label()
-                self._add_loss()
-                self._add_metrics()
-                self.test_program = self.main_program.clone(for_test=True)
-                if self.config.use_pyreader:
-                    self._add_py_reader()
-
-            with fluid.program_guard(self.main_program):
-                self.config.strategy.execute(self.loss, self.data_reader,
-                                             self.config)
-
-            self.loss.persistable = True
-            for metric in self.metrics:
-                metric.persistable = True
-            self.output.persistable = True
-
-            self.build_strategy = fluid.BuildStrategy()
-            if self.config.enable_memory_optim:
-                self.build_strategy.memory_optimize = True
-            else:
-                self.build_strategy.memory_optimize = False
-
-            self.main_program_compiled = fluid.CompiledProgram(
-                self.main_program).with_data_parallel(
-                    loss_name=self.loss.name,
-                    build_strategy=self.build_strategy)
-
-            self.inference_program_compiled = fluid.CompiledProgram(
-                self.inference_program).with_data_parallel(
-                    share_vars_from=self.main_program_compiled,
-                    build_strategy=self.build_strategy)
-
-            self.test_program_compiled = fluid.CompiledProgram(
-                self.test_program).with_data_parallel(
-                    share_vars_from=self.main_program_compiled,
-                    build_strategy=self.build_strategy)
-
-            self.load_checkpoint(load_best_model=load_best_model)
-
-            if not os.path.exists(self.config.checkpoint_dir):
-                mkdir(self.config.checkpoint_dir)
-            vdl_log_dir = os.path.join(self.config.checkpoint_dir, "vdllog")
-            self.log_writer = LogWriter(vdl_log_dir, sync_cycle=1)
-
-            self.is_inititalized = True
-            self._init_end_event()
     # NOTE: the current checkpoint mechanism is not complete;
     # it can't restore dataset training status
     def save_checkpoint(self, epoch, step):
@@ -250,11 +407,11 @@ class BasicTask(object):
             main_program=self.main_program)

     def load_checkpoint(self, load_best_model=False):
-        self.current_epoch, self.current_step = load_checkpoint(
+        self.env.current_epoch, self.env.current_step = load_checkpoint(
             self.config.checkpoint_dir,
             self.exe,
             main_program=self.main_program,
-            startup_program=self.startup_program)
+            startup_program=self._base_startup_program)

         if load_best_model:
             model_saved_dir = os.path.join(self.config.checkpoint_dir,
@@ -265,89 +422,73 @@ class BasicTask(object):
                 dirname=model_saved_dir,
                 main_program=self.main_program)
-    def get_feed_list(self, phase):
-        if phase in ["train", "dev", "val", "test"]:
-            return self.feed_list + [self.label.name]
-        return self.feed_list
-
-    def get_fetch_list(self, phase):
-        metrics_name = [metric.name for metric in self.metrics]
-        if phase in ["train", "dev", "val", "test"]:
-            return metrics_name + [self.loss.name]
-        return [self.output.name]
-
     def finetune_and_eval(self):
         self.finetune(do_eval=True)
     def finetune(self, do_eval=False):
-        self._init_if_necessary()
-        self._finetune_start_event()
-        run_states = []
-        if self.current_epoch <= self.config.num_epoch:
-            # Start to finetune
-            with fluid.program_guard(self.main_program):
-                while self.current_epoch <= self.config.num_epoch:
-                    train_reader = self.data_reader.data_generator(
-                        batch_size=self.config.batch_size, phase='train')
-                    run_states = self._run(
-                        train_reader,
-                        phase="train",
-                        do_eval=do_eval,
-                        program_compiled=self.main_program_compiled)
-                    self.current_epoch += 1
+        # Start to finetune
+        with self.phase_guard(phase="train"):
+            self.init_if_necessary()
+            self._finetune_start_event()
+            run_states = []
+            if self.current_epoch <= self.config.num_epoch:
+                while self.current_epoch <= self.config.num_epoch:
+                    run_states = self._run(do_eval=do_eval)
+                    self.env.current_epoch += 1

             # Save checkpoint after finetune
             self.save_checkpoint(self.current_epoch + 1, self.current_step)

         # Final evaluation
         self.eval(phase="dev")
         self.eval(phase="test")
         self._finetune_end_event(run_states)
     def eval(self, phase="dev"):
-        self._init_if_necessary()
-        self._eval_start_event(phase)
-        with fluid.program_guard(self.test_program):
-            test_reader = self.data_reader.data_generator(
-                batch_size=self.config.batch_size, phase=phase)
-            run_states = self._run(
-                test_reader,
-                phase=phase,
-                program_compiled=self.test_program_compiled)
-
-        self._eval_end_event(phase, run_states)
+        with self.phase_guard(phase=phase):
+            self.init_if_necessary()
+            self._eval_start_event()
+            run_states = self._run()
+            self._eval_end_event(run_states)
+
+    def predict(self, data, load_best_model=True):
+        with self.phase_guard(phase="predict"):
+            self.init_if_necessary()
+            for run_state in self._run():
+                yield run_state.run_results
+
+    def _run(self, do_eval=False):
+        with fluid.program_guard(self.main_program, self.startup_program):
+            if self.config.use_pyreader:
+                return self._run_with_py_reader(do_eval=do_eval)
+            return self._run_with_data_feeder(do_eval=do_eval)
-    def _run_with_data_feeder(self,
-                              reader,
-                              phase,
-                              do_eval=False,
-                              program_compiled=None):
-        if program_compiled is None:
-            program_compiled = self.main_program_compiled
-        feed_list = self.get_feed_list(phase=phase)
-        data_feeder = fluid.DataFeeder(feed_list=feed_list, place=self.place)
-        fetch_list = self.get_fetch_list(phase=phase)
+    def _run_with_data_feeder(self, do_eval=False):
+        data_feeder = fluid.DataFeeder(
+            feed_list=self.feed_list, place=self.place)

         global_run_states = []
         period_run_states = []

-        for run_step, batch in enumerate(reader(), start=1):
-            step_run_state = RunState(len(fetch_list))
+        for run_step, batch in enumerate(self.reader(), start=1):
+            step_run_state = RunState(len(self.fetch_list))
             step_run_state.run_step = 1
             num_batch_examples = len(batch)

             fetch_result = self.exe.run(
-                program_compiled,
+                self.main_program_compiled,
                 feed=data_feeder.feed(batch),
-                fetch_list=fetch_list)
+                fetch_list=self.fetch_list)

             for index, result in enumerate(fetch_result):
                 step_run_state.run_results[index] = result
             step_run_state.run_examples += num_batch_examples
             step_run_state.update()
             period_run_states += [step_run_state]
-            if phase == "train":
-                self.current_step += 1
+            if self.is_train_phase:
+                self.env.current_step += 1
                 if self.current_step % self.config.log_interval == 0:
                     self._log_interval_event(period_run_states)
                     global_run_states += period_run_states
@@ -359,46 +500,31 @@ class BasicTask(object):
                 if do_eval and self.current_step % self.config.eval_interval == 0:
                     self._eval_interval_event()

-            self._run_step_event(phase, step_run_state)
+            self._run_step_event(step_run_state)

         global_run_states += period_run_states
         return global_run_states
-    def _run_with_py_reader(self,
-                            reader,
-                            phase,
-                            do_eval=False,
-                            program_compiled=None):
-        if program_compiled is None:
-            program_compiled = self.main_program_compiled
-        if phase == "train":
-            py_reader = self.main_py_reader
-        elif phase in ["dev", "val", "test"]:
-            py_reader = self.test_py_reader
-        elif phase == "predict":
-            py_reader = self.inference_py_reader
-        py_reader.decorate_paddle_reader(reader)
-        fetch_list = self.get_fetch_list(phase=phase)
-
+    def _run_with_py_reader(self, do_eval=False):
         global_run_states = []
         period_run_states = []
-        py_reader.start()
+        self.py_reader.decorate_paddle_reader(self.reader)
+        self.py_reader.start()
         try:
             while True:
                 num_batch_examples = self.config.batch_size
-                step_run_state = RunState(len(fetch_list))
+                step_run_state = RunState(len(self.fetch_list))
                 step_run_state.run_step = 1

                 fetch_result = self.exe.run(
-                    program_compiled, fetch_list=fetch_list)
+                    self.main_program_compiled, fetch_list=self.fetch_list)

                 for index, result in enumerate(fetch_result):
                     step_run_state.run_results[index] = result
                 step_run_state.run_examples += num_batch_examples
                 step_run_state.update()
                 period_run_states += [step_run_state]
-                if phase == "train":
-                    self.current_step += 1
+                if self.is_train_phase:
+                    self.env.current_step += 1
                     if self.current_step % self.config.log_interval == 0:
                         self._log_interval_event(period_run_states)
                         global_run_states += period_run_states
@@ -410,38 +536,13 @@ class BasicTask(object):
                     if do_eval and self.current_step % self.config.eval_interval == 0:
                         self._eval_interval_event()

-                self._run_step_event(phase, step_run_state)
+                self._run_step_event(step_run_state)
         except fluid.core.EOFException:
-            py_reader.reset()
+            self.py_reader.reset()

         global_run_states += period_run_states
         return global_run_states
-    def _run(self, reader, phase, do_eval=False, program_compiled=None):
-        if self.config.use_pyreader:
-            return self._run_with_py_reader(
-                reader,
-                phase,
-                do_eval=do_eval,
-                program_compiled=program_compiled)
-        else:
-            return self._run_with_data_feeder(
-                reader,
-                phase,
-                do_eval=do_eval,
-                program_compiled=program_compiled)
-
-    def predict(self, data, load_best_model=True):
-        self._init_if_necessary(load_best_model=load_best_model)
-        with fluid.program_guard(self.inference_program):
-            inference_reader = self.data_reader.data_generator(
-                batch_size=self.config.batch_size, phase='predict', data=data)
-            for run_state in self._run(
-                    inference_reader,
-                    phase='predict',
-                    program_compiled=self.inference_program_compiled):
-                yield run_state.run_results
 class ClassifierTask(BasicTask):
     def __init__(self,
@@ -487,25 +588,22 @@ class ClassifierTask(BasicTask):
         return logits

     def _add_label(self):
-        self.label = fluid.layers.data(name="label", dtype="int64", shape=[1])
+        return fluid.layers.data(name="label", dtype="int64", shape=[1])

     def _add_loss(self):
         ce_loss = fluid.layers.cross_entropy(
             input=self.output, label=self.label)
-        self.loss = fluid.layers.mean(x=ce_loss)
+        return fluid.layers.mean(x=ce_loss)

     def _add_metrics(self):
-        self.accuracy = fluid.layers.accuracy(
-            input=self.output, label=self.label)
-        self.metrics.append(self.accuracy)
+        return [fluid.layers.accuracy(input=self.output, label=self.label)]

-    def _init_end_event(self):
-        with self.log_writer.mode("train") as logw:
-            self.train_loss_scalar = logw.scalar(tag="Loss [train]")
-            self.train_acc_scalar = logw.scalar(tag="Accuracy [train]")
-        with self.log_writer.mode("evaluate") as logw:
-            self.eval_loss_scalar = logw.scalar(tag="Loss [eval]")
-            self.eval_acc_scalar = logw.scalar(tag="Accuracy [eval]")
+    def _build_env_end_event(self):
+        with self.log_writer.mode(self.phase) as logw:
+            self.env.loss_scalar = logw.scalar(
+                tag="Loss [{}]".format(self.phase))
+            self.env.acc_scalar = logw.scalar(
+                tag="Accuracy [{}]".format(self.phase))

     def _calculate_metrics(self, run_states):
         loss_sum = acc_sum = run_examples = 0
@@ -527,19 +625,19 @@ class ClassifierTask(BasicTask):
     def _log_interval_event(self, run_states):
         avg_loss, avg_acc, run_speed = self._calculate_metrics(run_states)
-        self.train_loss_scalar.add_record(self.current_step, avg_loss)
-        self.train_acc_scalar.add_record(self.current_step, avg_acc)
+        self.env.loss_scalar.add_record(self.current_step, avg_loss)
+        self.env.acc_scalar.add_record(self.current_step, avg_acc)
         logger.info("step %d: loss=%.5f acc=%.5f [step/sec: %.2f]" %
                     (self.current_step, avg_loss, avg_acc, run_speed))

-    def _eval_end_event(self, phase, run_states):
+    def _eval_end_event(self, run_states):
         eval_loss, eval_acc, run_speed = self._calculate_metrics(run_states)
         logger.info(
             "[%s dataset evaluation result] loss=%.5f acc=%.5f [step/sec: %.2f]"
-            % (phase, eval_loss, eval_acc, run_speed))
-        if phase in ["dev", "val"] and eval_acc > self.best_accuracy:
-            self.eval_loss_scalar.add_record(self.current_step, eval_loss)
-            self.eval_acc_scalar.add_record(self.current_step, eval_acc)
+            % (self.phase, eval_loss, eval_acc, run_speed))
+        if self.phase in ["dev", "val"] and eval_acc > self.best_accuracy:
+            self.env.loss_scalar.add_record(self.current_step, eval_loss)
+            self.env.acc_scalar.add_record(self.current_step, eval_acc)
             self.best_accuracy = eval_acc
             model_saved_dir = os.path.join(self.config.checkpoint_dir,
                                            "best_model")
@@ -548,7 +646,7 @@ class ClassifierTask(BasicTask):
             save_result = fluid.io.save_persistables(
                 executor=self.exe,
                 dirname=model_saved_dir,
-                main_program=self.test_program)
+                main_program=self.main_program)


 ImageClassifierTask = ClassifierTask
@@ -644,30 +742,34 @@ class SequenceLabelTask(BasicTask):
         return logits

     def _add_label(self):
-        self.label = fluid.layers.data(
+        label = fluid.layers.data(
             name="label", shape=[self.max_seq_len, 1], dtype='int64')
+        return label

     def _add_loss(self):
         labels = fluid.layers.flatten(self.label, axis=2)
         ce_loss = fluid.layers.cross_entropy(input=self.output, label=labels)
-        self.loss = fluid.layers.mean(x=ce_loss)
+        loss = fluid.layers.mean(x=ce_loss)
+        return loss

     def _add_metrics(self):
-        self.ret_labels = fluid.layers.reshape(x=self.label, shape=[-1, 1])
-        self.ret_infers = fluid.layers.reshape(
+        ret_labels = fluid.layers.reshape(x=self.label, shape=[-1, 1])
+        ret_infers = fluid.layers.reshape(
             x=fluid.layers.argmax(self.logits, axis=2), shape=[-1, 1])
         self.seq_len = fluid.layers.data(
             name="seq_len", shape=[1], dtype='int64')
-        self.seq_len = fluid.layers.assign(self.seq_len)
-        self.metrics += [self.ret_labels, self.ret_infers, self.seq_len]
+        seq_len = fluid.layers.assign(self.seq_len)
+        return [ret_labels, ret_infers, seq_len]

-    def _init_end_event(self):
-        with self.log_writer.mode("train") as logw:
-            self.train_loss_scalar = logw.scalar(tag="Loss [train]")
-        with self.log_writer.mode("evaluate") as logw:
-            self.eval_f1_scalar = logw.scalar(tag="F1 [eval]")
-            self.eval_precision_scalar = logw.scalar(tag="Precision [eval]")
-            self.eval_recall_scalar = logw.scalar(tag="Recall [eval]")
+    def _build_env_end_event(self):
+        with self.log_writer.mode(self.phase) as logw:
+            self.env.loss_scalar = logw.scalar(
+                tag="Loss [{}]".format(self.phase))
+            self.env.f1_scalar = logw.scalar(tag="F1 [{}]".format(self.phase))
+            self.env.precision_scalar = logw.scalar(
+                tag="Precision [{}]".format(self.phase))
+            self.env.recall_scalar = logw.scalar(
+                tag="Recall [{}]".format(self.phase))

     def _calculate_metrics(self, run_states):
         total_infer = total_label = total_correct = loss_sum = 0
@@ -696,22 +798,22 @@ class SequenceLabelTask(BasicTask):
     def _log_interval_event(self, run_states):
         precision, recall, f1, avg_loss, run_speed = self._calculate_metrics(
             run_states)
-        self.train_loss_scalar.add_record(self.current_step, avg_loss)
+        self.env.loss_scalar.add_record(self.current_step, avg_loss)
         logger.info("step %d: loss=%.5f [step/sec: %.2f]" %
                     (self.current_step, avg_loss, run_speed))

-    def _eval_end_event(self, phase, run_states):
+    def _eval_end_event(self, run_states):
         precision, recall, f1, avg_loss, run_speed = self._calculate_metrics(
             run_states)
-        self.eval_f1_scalar.add_record(self.current_step, f1)
-        self.eval_precision_scalar.add_record(self.current_step, precision)
-        self.eval_recall_scalar.add_record(self.current_step, recall)
+        self.env.f1_scalar.add_record(self.current_step, f1)
+        self.env.precision_scalar.add_record(self.current_step, precision)
+        self.env.recall_scalar.add_record(self.current_step, recall)
         logger.info("[%s dataset evaluation result] [step/sec: %.2f]" %
-                    (phase, run_speed))
+                    (self.phase, run_speed))
         logger.info(
             "[%s evaluation] F1-Score=%f, precision=%f, recall=%f [step/sec: %.2f]"
-            % (phase, f1, precision, recall, run_speed))
-        if f1 > self.best_f1:
+            % (self.phase, f1, precision, recall, run_speed))
+        if self.phase in ["dev", "val"] and f1 > self.best_f1:
             self.best_f1 = f1
             model_saved_dir = os.path.join(self.config.checkpoint_dir,
                                            "best_model")
@@ -719,7 +821,9 @@ class SequenceLabelTask(BasicTask):
                          (model_saved_dir, self.best_f1))
             fluid.io.save_persistables(self.exe, dirname=model_saved_dir)

-    def get_feed_list(self, phase):
-        if phase in ["train", "dev", "val", "test"]:
-            return self.feed_list + [self.label.name] + [self.seq_len.name]
-        return self.feed_list
+    @property
+    def feed_list(self):
+        feed_list = [varname for varname in self._base_feed_list]
+        if self.is_train_phase or self.is_test_phase:
+            feed_list += [self.label.name, self.seq_len.name]
+        return feed_list
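Taken together, callers no longer pass readers, phases, or compiled programs into finetune, eval, and predict; each entry point wraps itself in a phase_guard and the per-phase env is built lazily on first access. A hedged sketch of driver code after this patch, where task, data, and process are assumptions rather than part of the diff:

# Assuming `task` is a ClassifierTask already wired up with a reader
# and a RunConfig:
task.finetune_and_eval()          # train; eval on "dev" every eval_interval steps

for batch_results in task.predict(data=data):
    process(batch_results)        # one list of fetched arrays per batch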