未验证 提交 2ef7c1e9 编写于 作者: W wuzewu 提交者: GitHub

add high performance, dataloader and annotation (#406)

* use dataloader
上级 2865db04
......@@ -16,4 +16,4 @@ python -u reading_comprehension.py \
--warmup_proportion=0.1 \
--num_epoch=2 \
--max_seq_len=512 \
--use_data_parallel=True
--use_data_parallel=False
......@@ -46,6 +46,7 @@ from .module.manager import default_module_manager
from .io.type import DataType
from .finetune.task import BaseTask
from .finetune.task import ClassifierTask
from .finetune.task import TextClassifierTask
from .finetune.task import ImageClassifierTask
......
......@@ -21,7 +21,6 @@ import os
import contextlib
import time
import copy
import logging
import inspect
from functools import partial
from collections import OrderedDict
......@@ -36,13 +35,21 @@ from tb_paddle import SummaryWriter
import paddlehub as hub
from paddlehub.common.paddle_helper import dtype_map, clone_program
from paddlehub.common.utils import mkdir, to_list
from paddlehub.common.utils import mkdir
from paddlehub.common.dir import tmp_dir
from paddlehub.common.logger import logger
from paddlehub.finetune.checkpoint import load_checkpoint, save_checkpoint
from paddlehub.finetune.config import RunConfig
class RunState(object):
"""
RunState is used to save the result of every running step
Args:
length (int): the number of fetch result
"""
def __init__(self, length):
self.run_time_begin = time.time()
self.run_step = 0
......@@ -65,6 +72,10 @@ class RunState(object):
class RunEnv(object):
"""
RunEnv saves the running environment of the train/dev/predict phase, including program, reader, metrics and so on.
"""
def __init__(self):
self.current_epoch = 0
self.current_step = 0
......@@ -87,6 +98,10 @@ class RunEnv(object):
class TaskHooks():
"""
TaskHooks can handle some tasks during the spectific event.
"""
def __init__(self):
self._registered_hooks = {
"build_env_start_event": OrderedDict(),
......@@ -118,6 +133,14 @@ class TaskHooks():
}
def add(self, hook_type, name=None, func=None):
"""
add the handler function to spectific event.
Args:
hook_type (str): the spectific event name
name (str): the handler function name, default None
func (func): the handler function, default None
"""
if not func or not callable(func):
raise TypeError(
"The hook function is empty or it is not a function")
......@@ -142,6 +165,13 @@ class TaskHooks():
self._registered_hooks[hook_type][name] = func
def delete(self, hook_type, name):
"""
delete the handler function of spectific event.
Args:
hook_type (str): the spectific event name
name (str): the handler function name
"""
if self.exist(hook_type, name):
del self._registered_hooks[hook_type][name]
else:
......@@ -150,6 +180,14 @@ class TaskHooks():
% (hook_type, name, hook_type))
def modify(self, hook_type, name, func):
"""
modify the handler function of spectific event.
Args:
hook_type (str): the spectific event name
name (str): the handler function name
func (func): the new handler function
"""
if not (isinstance(name, str) and callable(func)):
raise TypeError(
"The hook name must be a string, and the hook function must be a function"
......@@ -162,6 +200,16 @@ class TaskHooks():
% (hook_type, name, hook_type))
def exist(self, hook_type, name):
"""
check if the the handler function of spectific event is existing.
Args:
hook_type (str): the spectific event name
name (str): the handler function name
Returns:
bool: True or False
"""
if hook_type not in self._registered_hooks \
or name not in self._registered_hooks[hook_type]:
return False
......@@ -169,6 +217,15 @@ class TaskHooks():
return True
def info(self, show_default=False):
"""
get the hooks information, including the source code.
Args:
show_default (bool): show the information of Paddlehub default hooks or not, default False
Returns:
str: the formatted string of the hooks information
"""
# formatted output the source code
ret = ""
for hook_type, hooks in self._registered_hooks.items():
......@@ -194,10 +251,22 @@ class TaskHooks():
return self._registered_hooks[hook_type]
def __repr__(self):
    # Show only user-registered hooks by default; pass show_default=True
    # to self.info() elsewhere to include the PaddleHub built-in hooks.
    return self.info(show_default=False)
class BaseTask(object):
"""
BaseTask is the base class of all the task. It will complete the building of all the running environment.
Args:
feed_list (list): the inputs name
data_reader (object): data reader for the task
main_program (object): the customized main_program, default None
startup_program (object): the customized startup_program, default None
config (object): the config for the task, default None
metrics_choices (list): metrics used to the task, default ["acc"]
"""
def __init__(self,
feed_list,
data_reader,
......@@ -205,7 +274,6 @@ class BaseTask(object):
startup_program=None,
config=None,
metrics_choices="default"):
# base item
self._base_data_reader = data_reader
self._base_feed_list = feed_list
......@@ -270,6 +338,7 @@ class BaseTask(object):
# accelerate predict
self.is_best_model_loaded = False
self._predictor = None
# set default phase
self.enter_phase("train")
......@@ -314,6 +383,9 @@ class BaseTask(object):
logger.info("The best model has been loaded")
def _build_env(self):
"""
building the program and strategy for specific running phase.
"""
if self.env.is_inititalized:
return
......@@ -338,46 +410,6 @@ class BaseTask(object):
hub.common.paddle_helper.set_op_attr(
self.env.main_program, is_test=True)
if self.config.use_pyreader:
t_program = fluid.Program()
with fluid.program_guard(t_program, self.env.startup_program):
self.env.py_reader = fluid.layers.py_reader(
capacity=64,
shapes=[var.shape for var in self.feed_var_list],
dtypes=[dtype_map[var.dtype] for var in self.feed_var_list],
lod_levels=[var.lod_level for var in self.feed_var_list],
use_double_buffer=False)
feed_var_list = self.feed_var_list
py_vars = fluid.layers.read_file(self.env.py_reader)
py_vars = to_list(py_vars)
input_dict = {
feed_var_list[index].name: py_var
for index, py_var in enumerate(py_vars)
}
hub.connect_program(
pre_program=t_program,
next_program=self.env.main_program,
input_dict=input_dict,
need_log=False)
self.env.main_program = t_program
if not self.is_predict_phase:
self.env.loss = self.env.main_program.global_block().vars[
self.env.loss.name]
metrics_name = [var.name for var in self.env.metrics]
self.env.metrics = [
self.env.main_program.global_block().vars[name]
for name in metrics_name
]
outputs_name = [var.name for var in self.env.outputs]
self.env.outputs = [
self.env.main_program.global_block().vars[name]
for name in outputs_name
]
if self.config.enable_memory_optim:
for var_name in self.fetch_list:
var = self.env.main_program.global_block().vars[var_name]
......@@ -405,7 +437,8 @@ class BaseTask(object):
self.env.main_program).with_data_parallel(
loss_name=loss_name,
share_vars_from=share_vars_from,
build_strategy=self.build_strategy)
build_strategy=self.build_strategy,
places=self.places)
self.exe.run(self.env.startup_program)
self._build_env_end_event()
......@@ -501,7 +534,10 @@ class BaseTask(object):
else:
data = None
self.env.reader = self._base_data_reader.data_generator(
batch_size=self.config.batch_size, phase=self.phase, data=data)
batch_size=self.config.batch_size,
phase=self.phase,
data=data,
return_list=not self.config.use_pyreader)
return self.env.reader
@property
......@@ -566,6 +602,9 @@ class BaseTask(object):
@property
def tb_writer(self):
"""
get tb_writer for visualization.
"""
if not os.path.exists(self.config.checkpoint_dir):
mkdir(self.config.checkpoint_dir)
tb_log_dir = os.path.join(self.config.checkpoint_dir, "visualization")
......@@ -574,7 +613,18 @@ class BaseTask(object):
return self._tb_writer
def create_event_function(self, hook_type):
"""
create handlers for specific event.
Args:
hook_type (str): specific event name
Returns:
func: executable function, the class method will receive a parameter named self.
"""
def hook_function(self, *args):
# all the handler in self._hooks[hook_type] will be configured to executable
for name, func in self._hooks[hook_type].items():
if inspect.ismethod(func):
func(*args)
......@@ -587,20 +637,52 @@ class BaseTask(object):
def hooks(self):
return self._hooks
def hooks_info(self, show_default=False):
    """
    Get the hooks information, including the source code of each handler.

    Args:
        show_default (bool): whether to also show the information of the
            PaddleHub default hooks, default False

    Returns:
        str: the formatted string of the hooks information
    """
    # Delegates to TaskHooks.info, which formats the registered handlers.
    return self._hooks.info(show_default)
def add_hook(self, hook_type, name=None, func=None):
    """
    Register a handler function for a specific event.

    Args:
        hook_type (str): the specific event name
        name (str): the handler function name; if None, a unique name is
            derived from the id of ``func``
        func (func): the handler function, default None
    """
    if name is None:  # PEP 8: compare to None with `is`, not `==`
        name = "hook_%s" % id(func)
    self._hooks.add(hook_type, name=name, func=func)
    logger.info("Add hook %s:%s successfully" % (hook_type, name))
def delete_hook(self, hook_type, name):
    """
    Remove a previously registered handler from a specific event.

    Args:
        hook_type (str): the specific event name
        name (str): the handler function name
    """
    self._hooks.delete(hook_type, name)
    message = "Delete hook %s:%s successfully" % (hook_type, name)
    logger.info(message)
def modify_hook(self, hook_type, name, func):
    """
    Replace the handler function registered under ``name`` for a specific event.

    Args:
        hook_type (str): the specific event name
        name (str): the handler function name
        func (func): the new handler function
    """
    self._hooks.modify(hook_type, name, func)
    message = "Modify hook %s:%s successfully" % (hook_type, name)
    logger.info(message)
......@@ -627,6 +709,12 @@ class BaseTask(object):
logger.info("Evaluation on {} dataset start".format(self.phase))
def _default_eval_end_event(self, run_states):
"""
Paddlehub default handler for eval_end_event, it will complete visualization and metrics calculation
Args:
run_states (object): the results in eval phase
"""
eval_scores, eval_loss, run_speed = self._calculate_metrics(run_states)
if 'train' in self._envs:
self.tb_writer.add_scalar(
......@@ -665,6 +753,12 @@ class BaseTask(object):
self.save_inference_model(dirname=model_saved_dir)
def _default_log_interval_event(self, run_states):
"""
PaddleHub default handler for log_interval_event, it will complete visualization.
Args:
run_states (object): the results in train phase
"""
scores, avg_loss, run_speed = self._calculate_metrics(run_states)
self.tb_writer.add_scalar(
tag="Loss_{}".format(self.phase),
......@@ -763,6 +857,15 @@ class BaseTask(object):
return self.finetune(do_eval=True)
def finetune(self, do_eval=False):
"""
train and finetune the module parameters.
Args:
do_eval (bool): do eval during train phase or not
Returns:
RunState: the running result of train phase
"""
# Start to finetune
with self.phase_guard(phase="train"):
......@@ -777,6 +880,9 @@ class BaseTask(object):
# Final evaluation
if self._base_data_reader.get_dev_examples() != []:
# Warning: DO NOT use self.eval(phase="dev", load_best_model=True) during training.
# It will cause trainer unable to continue training from checkpoint after eval.
# More important, The model should evaluate current performance during training.
self.eval(phase="dev")
if self._base_data_reader.get_test_examples() != []:
self.eval(phase="test", load_best_model=True)
......@@ -787,6 +893,16 @@ class BaseTask(object):
return run_states
def eval(self, phase="dev", load_best_model=False):
"""
evaluate the performance of current module.
Args:
phase (str): current run phase
load_best_model (bool): load the best model or not
Returns:
RunState: the running result of eval phase
"""
# Warning: DO NOT use eval(load_best_model=True) in finetune_and_eval
# It will cause trainer unable to continue training from checkpoint after eval
# More important, The model should evaluate current performance during training.
......@@ -800,15 +916,96 @@ class BaseTask(object):
self._eval_end_event(run_states)
return run_states
def predict(self, data, load_best_model=True, return_result=False):
def _create_predictor(self):
    """
    Create a high-performance inference predictor for predict.

    The current program is first exported as an inference model into a
    temporary directory, then loaded back through the analysis-based
    native predictor.

    Returns:
        PaddlePredictor: the high-performance predictor
    """
    with tmp_dir() as _dir:
        # Export the inference model so AnalysisConfig can load it; the
        # temporary directory is cleaned up when the context exits.
        self.save_inference_model(dirname=_dir)
        predictor_config = fluid.core.AnalysisConfig(_dir)
        if self.config.use_cuda:
            # NOTE(review): presumably 100 is the initial GPU memory pool
            # size in MB and 0 the device id — confirm against the
            # AnalysisConfig API documentation.
            predictor_config.enable_use_gpu(100, 0)
            predictor_config.switch_ir_optim(True)
        else:
            predictor_config.disable_gpu()
        predictor_config.enable_memory_optim()
        return fluid.core.create_paddle_predictor(predictor_config)
def _run_with_predictor(self):
    """
    Use the high-performance predictor to make prediction.

    Raises:
        Exception: if the task's reader is a LACClassifyReader, which
            produces lod data the predictor cannot consume.

    Returns:
        list: RunState objects, one per predicted batch
    """
    if isinstance(self._base_data_reader, hub.reader.LACClassifyReader):
        raise Exception(
            "LACClassifyReader does not support predictor, please close accelerate_mode"
        )

    global_run_states = []
    period_run_states = []

    for run_step, batch in enumerate(self.reader(), start=1):
        step_run_state = RunState(len(self.fetch_list))
        step_run_state.run_step = 1
        # NOTE(review): num_batch_examples is taken before the unwrap
        # below; when the reader yields [batch] this counts the wrapper
        # list's length — confirm this matches the intended example count.
        num_batch_examples = len(batch)

        if not self.config.use_pyreader:
            # When pyreader is disabled the reader is created with
            # return_list=True and yields [batch]; unwrap the single
            # element before building PaddleTensors.
            batch = batch[0]

        batch = [fluid.core.PaddleTensor(data) for data in batch]
        fetch_result = self._predictor.run(batch)

        for index, result in enumerate(fetch_result):
            step_run_state.run_results[index] = result.as_ndarray()
        step_run_state.run_examples += num_batch_examples
        step_run_state.update()
        period_run_states += [step_run_state]
        self._run_step_event(step_run_state)

    global_run_states += period_run_states
    return global_run_states
def predict(self,
data,
load_best_model=True,
return_result=False,
accelerate_mode=False):
"""
make prediction for the input data.
Args:
data (list): the data will be predicted.
load_best_model (bool): load the best model or not
return_result (bool): return a readable result or just the raw run result
accelerate_mode (bool): use high-performance predictor or not
Returns:
RunState: the running result of predict phase
"""
self.accelerate_mode = accelerate_mode
with self.phase_guard(phase="predict"):
self._predict_data = data
self._predict_start_event()
if load_best_model:
self.init_if_load_best_model()
else:
self.init_if_necessary()
self._predict_data = data
self._predict_start_event()
run_states = self._run()
if not self.accelerate_mode:
run_states = self._run()
else:
if not self._predictor:
self._predictor = self._create_predictor()
run_states = self._run_with_predictor()
self._predict_end_event(run_states)
self._predict_data = None
if return_result:
......@@ -816,6 +1013,15 @@ class BaseTask(object):
return run_states
def _postprocessing(self, run_states):
"""
postprocessing the run result, get readable result.
Args:
run_states (RunState): the raw run result to be processed
Returns:
list: readable result
"""
results = []
for batch_state in run_states:
batch_result = batch_state.run_results[0]
......@@ -823,130 +1029,70 @@ class BaseTask(object):
return results
def _run(self, do_eval=False):
with fluid.program_guard(self.main_program, self.startup_program):
if self.config.use_pyreader:
return self._run_with_py_reader(do_eval=do_eval)
return self._run_with_data_feeder(do_eval=do_eval)
def _run_with_data_feeder(self, do_eval=False):
"""
load data and run the program.
data_feeder = fluid.DataFeeder(
feed_list=self.feed_list, place=self.place)
Args:
do_eval (bool): do eval during train phase or not
global_run_states = []
period_run_states = []
Returns:
RunState: the running result of specific phase
"""
with fluid.program_guard(self.main_program, self.startup_program):
if self.config.use_pyreader:
data_loader = fluid.io.DataLoader.from_generator(
feed_list=self.feed_var_list,
capacity=64,
use_double_buffer=True,
iterable=True)
data_reader = data_loader.set_batch_generator(
self.reader, places=self.places)
else:
data_feeder = fluid.DataFeeder(
feed_list=self.feed_list, place=self.place)
data_reader = data_feeder.decorate_reader(
self.reader,
multi_devices=self.config.use_data_parallel,
drop_last=True)
parallel_batch = []
for run_step, batch in enumerate(self.reader(), start=1):
if self.config.use_data_parallel:
parallel_batch += batch
if len(parallel_batch) < self.device_count:
continue
else:
batch = parallel_batch
parallel_batch = []
global_run_states = []
period_run_states = []
step_run_state = RunState(len(self.fetch_list))
step_run_state.run_step = 1
num_batch_examples = len(batch)
for run_step, batch in enumerate(data_reader(), start=1):
step_run_state = RunState(len(self.fetch_list))
step_run_state.run_step = 1
num_batch_examples = len(batch)
if self.return_numpy:
fetch_result = self.exe.run(
self.main_program_to_be_run,
feed=data_feeder.feed(batch),
fetch_list=self.fetch_list)
else:
fetch_result = self.exe.run(
self.main_program_to_be_run,
feed=data_feeder.feed(batch),
feed=batch,
fetch_list=self.fetch_list,
return_numpy=False)
fetch_result = [np.array(x) for x in fetch_result]
for index, result in enumerate(fetch_result):
step_run_state.run_results[index] = result
step_run_state.run_examples += num_batch_examples
step_run_state.update()
period_run_states += [step_run_state]
self.env.current_step += 1
if self.is_train_phase:
if self.current_step % self.config.log_interval == 0:
self._log_interval_event(period_run_states)
global_run_states += period_run_states
period_run_states = []
if self.config.save_ckpt_interval and self.current_step % self.config.save_ckpt_interval == 0:
self._save_ckpt_interval_event()
if do_eval and self.current_step % self.config.eval_interval == 0:
self._eval_interval_event()
self._run_step_event(step_run_state)
global_run_states += period_run_states
return global_run_states
def _run_with_py_reader(self, do_eval=False):
flag = False
use_data_parallel_backup = self.config.use_data_parallel
while True:
global_run_states = []
period_run_states = []
self.py_reader.decorate_paddle_reader(self.reader)
self.py_reader.start()
try:
while True:
num_batch_examples = self.config.batch_size * self.device_count
step_run_state = RunState(len(self.fetch_list))
step_run_state.run_step = 1
if self.return_numpy:
fetch_result = self.exe.run(
self.main_program_to_be_run,
fetch_list=self.fetch_list)
else:
fetch_result = self.exe.run(
self.main_program_to_be_run,
fetch_list=self.fetch_list,
return_numpy=False)
fetch_result = [np.array(x) for x in fetch_result]
for index, result in enumerate(fetch_result):
step_run_state.run_results[index] = result
step_run_state.run_examples += num_batch_examples
step_run_state.update()
period_run_states += [step_run_state]
self.env.current_step += 1
if self.is_train_phase:
if self.current_step % self.config.log_interval == 0:
self._log_interval_event(period_run_states)
global_run_states += period_run_states
period_run_states = []
if self.config.save_ckpt_interval and self.current_step % self.config.save_ckpt_interval == 0:
self._save_ckpt_interval_event()
if do_eval and self.current_step % self.config.eval_interval == 0:
self._eval_interval_event()
self._run_step_event(step_run_state)
except fluid.core.EOFException:
global_run_states += period_run_states
self.py_reader.reset()
'''
When opening use_data_parallel and use_pyreader, if the amount of data is too small,
the reader will have thrown EOF Exception when not fetching to the running result.
In this case, temporarily close the use_data_parallel to get the result.
'''
if flag:
self.config._use_data_parallel = use_data_parallel_backup
elif len(global_run_states) == 0:
flag = True
self.config._use_data_parallel = False
continue
break
return global_run_states
return_numpy=self.return_numpy)
if not self.return_numpy:
fetch_result = [np.array(x) for x in fetch_result]
for index, result in enumerate(fetch_result):
step_run_state.run_results[index] = result
step_run_state.run_examples += num_batch_examples
step_run_state.update()
period_run_states += [step_run_state]
self.env.current_step += 1
if self.is_train_phase:
if self.current_step % self.config.log_interval == 0:
self._log_interval_event(period_run_states)
global_run_states += period_run_states
period_run_states = []
if self.config.save_ckpt_interval and self.current_step % self.config.save_ckpt_interval == 0:
self._save_ckpt_interval_event()
if do_eval and self.current_step % self.config.eval_interval == 0:
self._eval_interval_event()
self._run_step_event(step_run_state)
global_run_states += period_run_states
return global_run_states
def __repr__(self):
return "Task: %s with metrics_choices: %s, reader: %s, %s" % (
......
......@@ -409,7 +409,8 @@ class ReadingComprehensionTask(BaseTask):
def _build_net(self):
self.unique_ids = fluid.layers.data(
name="unique_ids", shape=[-1, 1], lod_level=0, dtype="int64")
# to avoid memory optimization
_ = fluid.layers.assign(self.unique_ids)
logits = fluid.layers.fc(
input=self.feature,
size=2,
......
......@@ -64,17 +64,17 @@ class SequenceLabelTask(BaseTask):
return True
def _build_net(self):
self.seq_len = fluid.layers.data(
name="seq_len", shape=[1], dtype='int64', lod_level=0)
if version_compare(paddle.__version__, "1.6"):
self.seq_len = fluid.layers.data(
name="seq_len", shape=[-1], dtype='int64')
self.seq_len_used = fluid.layers.squeeze(self.seq_len, axes=[1])
else:
self.seq_len = fluid.layers.data(
name="seq_len", shape=[1], dtype='int64')
seq_len = fluid.layers.assign(self.seq_len)
self.seq_len_used = self.seq_len
if self.add_crf:
unpad_feature = fluid.layers.sequence_unpad(
self.feature, length=self.seq_len)
self.feature, length=self.seq_len_used)
self.emission = fluid.layers.fc(
size=self.num_classes,
input=unpad_feature,
......@@ -103,7 +103,6 @@ class SequenceLabelTask(BaseTask):
self.ret_infers = fluid.layers.reshape(
x=fluid.layers.argmax(self.logits, axis=2), shape=[-1, 1])
ret_infers = fluid.layers.assign(self.ret_infers)
logits = self.logits
logits = fluid.layers.flatten(logits, axis=2)
......@@ -118,7 +117,8 @@ class SequenceLabelTask(BaseTask):
def _add_loss(self):
if self.add_crf:
labels = fluid.layers.sequence_unpad(self.labels[0], self.seq_len)
labels = fluid.layers.sequence_unpad(self.labels[0],
self.seq_len_used)
crf_cost = fluid.layers.linear_chain_crf(
input=self.emission,
label=labels,
......@@ -133,7 +133,8 @@ class SequenceLabelTask(BaseTask):
def _add_metrics(self):
if self.add_crf:
labels = fluid.layers.sequence_unpad(self.labels[0], self.seq_len)
labels = fluid.layers.sequence_unpad(self.labels[0],
self.seq_len_used)
(precision, recall, f1_score, num_infer_chunks, num_label_chunks,
num_correct_chunks) = fluid.layers.chunk_eval(
input=self.outputs[0],
......@@ -146,7 +147,7 @@ class SequenceLabelTask(BaseTask):
else:
self.ret_labels = fluid.layers.reshape(
x=self.labels[0], shape=[-1, 1])
return [self.ret_labels, self.ret_infers, self.seq_len]
return [self.ret_labels, self.ret_infers, self.seq_len_used]
def _calculate_metrics(self, run_states):
total_infer = total_label = total_correct = loss_sum = 0
......@@ -214,7 +215,7 @@ class SequenceLabelTask(BaseTask):
if self.is_train_phase or self.is_test_phase:
return [metric.name for metric in self.metrics] + [self.loss.name]
elif self.is_predict_phase:
return [self.ret_infers.name] + [self.seq_len.name]
return [self.ret_infers.name] + [self.seq_len_used.name]
return [output.name for output in self.outputs]
def _postprocessing(self, run_states):
......
#coding:utf-8
# coding:utf-8
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"
......@@ -77,7 +77,8 @@ class ImageClassificationReader(BaseReader):
batch_size=1,
phase="train",
shuffle=False,
data=None):
data=None,
return_list=True):
if phase != 'predict' and not self.dataset:
raise ValueError("The dataset is none and it's not allowed!")
if phase == "train":
......@@ -135,14 +136,48 @@ class ImageClassificationReader(BaseReader):
def _data_reader():
if shuffle:
np.random.shuffle(data)
images = []
labels = []
if phase == "predict":
for image_path in data:
image = preprocess(image_path)
yield (image, )
images.append(image.astype('float32'))
if len(images) == batch_size:
# predictor must receive numpy array not list
images = np.array([images]).astype('float32')
if return_list:
# for DataFeeder
yield [images]
else:
# for DataLoader
yield images
images = []
if images:
images = np.array([images]).astype('float32')
if return_list:
yield [images]
else:
yield images
images = []
else:
for image_path, label in data:
image = preprocess(image_path)
yield (image, label)
return paddle.batch(_data_reader, batch_size=batch_size)
images.append(image.astype('float32'))
labels.append([int(label)])
if len(images) == batch_size:
if return_list:
yield [[images, labels]]
else:
yield [images, labels]
images = []
labels = []
if images:
if return_list:
yield [[images, labels]]
else:
yield [images, labels]
images = []
labels = []
return _data_reader
......@@ -22,7 +22,7 @@ import numpy as np
import six
from collections import namedtuple
import paddle
import paddle.fluid as fluid
from paddlehub.reader import tokenization
from paddlehub.common.logger import logger
......@@ -203,7 +203,8 @@ class BaseNLPReader(BaseReader):
batch_size=1,
phase='train',
shuffle=True,
data=None):
data=None,
return_list=True):
if phase != 'predict' and not self.dataset:
raise ValueError("The dataset is None ! It isn't allowed.")
if phase == 'train':
......@@ -255,7 +256,12 @@ class BaseNLPReader(BaseReader):
for batch_data in self._prepare_batch_data(
examples, batch_size, phase=phase):
yield [batch_data]
if return_list:
# for DataFeeder
yield [batch_data]
else:
# for DataLoader
yield batch_data
return wrapper
......@@ -666,7 +672,8 @@ class RegressionReader(BaseNLPReader):
batch_size=1,
phase='train',
shuffle=True,
data=None):
data=None,
return_list=True):
if phase != 'predict' and not self.dataset:
raise ValueError("The dataset is none and it's not allowed.")
if phase == 'train':
......@@ -715,7 +722,12 @@ class RegressionReader(BaseNLPReader):
for batch_data in self._prepare_batch_data(
examples, batch_size, phase=phase):
yield [batch_data]
if return_list:
# for DataFeeder
yield [batch_data]
else:
# for DataLoader
yield batch_data
return wrapper
......@@ -884,7 +896,8 @@ class ReadingComprehensionReader(BaseNLPReader):
batch_size=1,
phase='train',
shuffle=False,
data=None):
data=None,
return_list=True):
# we need all_examples and all_features in write_prediction in reading_comprehension_task
# we can also use all_examples and all_features to avoid duplicate long-time preprocessing
examples = None
......@@ -926,7 +939,12 @@ class ReadingComprehensionReader(BaseNLPReader):
for batch_data in self._prepare_batch_data(
features, batch_size, phase=phase):
yield [batch_data]
if return_list:
# for DataFeeder
yield [batch_data]
else:
# for DataLoader
yield batch_data
return wrapper
......@@ -1147,12 +1165,20 @@ class LACClassifyReader(BaseReader):
self.feed_key = list(
self.lac.processor.data_format(
sign_name="lexical_analysis").keys())[0]
self.has_processed = {
"train": False,
"dev": False,
"val": False,
"test": False,
"predict": False
}
def data_generator(self,
batch_size=1,
phase="train",
shuffle=False,
data=None):
data=None,
return_list=True):
if phase != "predict" and not self.dataset:
raise ValueError("The dataset is None and it isn't allowed.")
if phase == "train":
......@@ -1180,32 +1206,96 @@ class LACClassifyReader(BaseReader):
self.vocab[word] for word in processed[0]['word']
if word in self.vocab
]
if len(processed) == 0:
if six.PY2:
text = text.encode(sys_stdout_encoding())
logger.warning(
"The words in text %s can't be found in the vocabulary." %
(text))
return processed
if not self.has_processed[phase]:
logger.info(
"processing %s data now... this may take a few minutes" % phase)
for i in range(len(data)):
if phase == "predict":
data[i] = preprocess(data[i])
else:
data[i].text_a = preprocess(data[i].text_a)
if self.label_map:
if data[i].label not in self.label_map:
raise KeyError("example.label = {%s} not in label" %
data[i].label)
label_id = self.label_map[data[i].label]
else:
label_id = data[i].label
data[i].label = label_id
self.has_processed[phase] = True
def _data_reader():
if shuffle:
np.random.shuffle(data)
texts = []
labels = []
if phase == "predict":
for text in data:
text = preprocess(text)
if not text:
continue
yield (text, )
texts.append(text)
if len(texts) == batch_size:
if return_list:
# for DataFeeder
# if you want to use high-performance predictor, yield [[[t] for t in texts]]
yield [[t] for t in texts]
else:
# for DataLoader
# cannot use in high-performance predictor, as PaddleTensor rejects lod_tensor
texts = fluid.create_lod_tensor(
texts, [[len(seq) for seq in texts]],
fluid.CPUPlace())
yield [texts]
texts = []
if texts:
if return_list:
yield [[t] for t in texts]
else:
texts = fluid.create_lod_tensor(
texts, [[len(seq) for seq in texts]],
fluid.CPUPlace())
yield [texts]
texts = []
else:
for item in data:
text = preprocess(item.text_a)
text = item.text_a
if not text:
continue
yield (text, item.label)
return paddle.batch(_data_reader, batch_size=batch_size)
texts.append(text)
labels.append([item.label])
if len(texts) == batch_size:
if return_list:
yield list(zip(texts, labels))
else:
texts = fluid.create_lod_tensor(
texts, [[len(seq) for seq in texts]],
fluid.CPUPlace())
yield [texts, labels]
texts = []
labels = []
if texts:
if return_list:
yield list(zip(texts, labels))
else:
texts = fluid.create_lod_tensor(
texts, [[len(seq) for seq in texts]],
fluid.CPUPlace())
yield [texts, labels]
texts = []
labels = []
return _data_reader
if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册