""" SimNet Task """ from __future__ import absolute_import from __future__ import division from __future__ import print_function import os import time import argparse import multiprocessing import sys sys.path.append("..") import paddle import paddle.fluid as fluid import numpy as np import codecs import config import utils import reader import models.matching.paddle_layers as layers import logging parser = argparse.ArgumentParser(__doc__) model_g = utils.ArgumentGroup(parser, "model", "model configuration and paths.") model_g.add_arg("config_path", str, None, "Path to the json file for EmoTect model config.") model_g.add_arg("init_checkpoint", str, None, "Init checkpoint to resume training from.") model_g.add_arg("output_dir", str, None, "Directory path to save checkpoints") model_g.add_arg("task_mode", str, None, "task mode: pairwise or pointwise") train_g = utils.ArgumentGroup(parser, "training", "training options.") train_g.add_arg("epoch", int, 10, "Number of epoches for training.") train_g.add_arg("save_steps", int, 200, "The steps interval to save checkpoints.") train_g.add_arg("validation_steps", int, 100, "The steps interval to evaluate model performance.") log_g = utils.ArgumentGroup(parser, "logging", "logging related") log_g.add_arg("skip_steps", int, 10, "The steps interval to print loss.") log_g.add_arg("verbose_result", bool, True, "Whether to output verbose result.") log_g.add_arg("test_result_path", str, "test_result", "Directory path to test result.") log_g.add_arg("infer_result_path", str, "infer_result", "Directory path to infer result.") data_g = utils.ArgumentGroup( parser, "data", "Data paths, vocab paths and data processing options") data_g.add_arg("train_data_dir", str, None, "Directory path to training data.") data_g.add_arg("valid_data_dir", str, None, "Directory path to valid data.") data_g.add_arg("test_data_dir", str, None, "Directory path to testing data.") data_g.add_arg("infer_data_dir", str, None, "Directory path to infer data.") 
data_g.add_arg("vocab_path", str, None, "Vocabulary path.")
data_g.add_arg("batch_size", int, 32,
               "Total examples' number in batch for training.")

run_type_g = utils.ArgumentGroup(parser, "run_type", "running type options.")
run_type_g.add_arg("use_cuda", bool, False, "If set, use GPU for training.")
run_type_g.add_arg(
    "task_name", str, None,
    "The name of the task (used in continuous-evaluation KPI names).")
run_type_g.add_arg("do_train", bool, False, "Whether to perform training.")
run_type_g.add_arg("do_valid", bool, False, "Whether to perform dev.")
run_type_g.add_arg("do_test", bool, False, "Whether to perform testing.")
run_type_g.add_arg("do_infer", bool, False, "Whether to perform inference.")
run_type_g.add_arg("compute_accuracy", bool, False,
                   "Whether to compute accuracy.")
run_type_g.add_arg(
    "lamda", float, 0.91,
    "When task_mode is pairwise, lamda is the threshold for calculating the accuracy."
)

parser.add_argument(
    '--enable_ce',
    action='store_true',
    help='If set, run the task with continuous evaluation logs.')

args = parser.parse_args()


def train(conf_dict, args):
    """Build the SimNet program, train it, and optionally validate and test.

    Args:
        conf_dict: model configuration. Reads the "net", "loss" and
            "optimizer" entries (module/class names) and "model_path";
            writes "dict_size" (the vocabulary size).
        args: parsed command-line arguments.

    Every `args.save_steps` global steps an inference model is saved under
    `args.output_dir/conf_dict["model_path"]/<global_step>`.
    """
    if args.enable_ce:
        # Fixed seed so continuous-evaluation runs are reproducible.
        seed = 102
        fluid.default_startup_program().random_seed = seed
        fluid.default_main_program().random_seed = seed

    # Load the vocabulary and record its size for the network.
    vocab = utils.load_vocab(args.vocab_path)
    conf_dict['dict_size'] = len(vocab)

    data = layers.DataLayer()
    # Network, loss and optimizer classes are loaded dynamically by name.
    net = utils.import_class("../models/matching",
                             conf_dict["net"]["module_name"],
                             conf_dict["net"]["class_name"])(conf_dict)
    loss = utils.import_class("../models/matching/losses",
                              conf_dict["loss"]["module_name"],
                              conf_dict["loss"]["class_name"])(conf_dict)
    optimizer = utils.import_class(
        "../models/matching/optimizers", "paddle_optimizers",
        conf_dict["optimizer"]["class_name"])(conf_dict)
    metric = fluid.metrics.Auc(name="auc")

    if args.use_cuda:
        place = fluid.CUDAPlace(0)
    else:
        place = fluid.CPUPlace()
    simnet_process = reader.SimNetProcessor(args, vocab)

    if args.task_mode == "pairwise":
        left = data.ops(name="left", shape=[1], dtype="int64", lod_level=1)
        pos_right = data.ops(name="right", shape=[1], dtype="int64",
                             lod_level=1)
        neg_right = data.ops(name="neg_right", shape=[1], dtype="int64",
                             lod_level=1)
        left_feat, pos_score = net.predict(left, pos_right)
        # valid_and_test() fetches `pred` for validation AND for testing, so
        # it must be bound regardless of args.do_valid.
        pred = pos_score

        train_feeder = fluid.DataFeeder(
            place=place, feed_list=[left.name, pos_right.name, neg_right.name])
        train_reader = simnet_process.get_reader("train")
        if args.do_valid:
            valid_feeder = fluid.DataFeeder(
                place=place, feed_list=[left.name, pos_right.name])
            valid_reader = simnet_process.get_reader("valid")
        # Clone before the negative branch and the loss are added, so the
        # inference program only contains the (left, right) scoring ops.
        infer_program = fluid.default_main_program().clone(for_test=True)
        _, neg_score = net.predict(left, neg_right)
        avg_cost = loss.compute(pos_score, neg_score)
        avg_cost.persistable = True
    else:
        left = data.ops(name="left", shape=[1], dtype="int64", lod_level=1)
        right = data.ops(name="right", shape=[1], dtype="int64", lod_level=1)
        label = data.ops(name="label", shape=[1], dtype="int64", lod_level=0)
        left_feat, pred = net.predict(left, right)

        train_feeder = fluid.DataFeeder(
            place=place, feed_list=[left.name, right.name, label.name])
        train_reader = simnet_process.get_reader("train")
        if args.do_valid:
            valid_feeder = fluid.DataFeeder(
                place=place, feed_list=[left.name, right.name])
            valid_reader = simnet_process.get_reader("valid")
        # Clone before the loss is added so the inference program has no
        # label input.
        infer_program = fluid.default_main_program().clone(for_test=True)
        avg_cost = loss.compute(pred, label)
        avg_cost.persistable = True

    optimizer.ops(avg_cost)

    executor = fluid.Executor(place)
    executor.run(fluid.default_startup_program())

    if args.init_checkpoint is not None:
        utils.init_checkpoint(executor, args.init_checkpoint,
                              fluid.default_startup_program())

    parallel_executor = fluid.ParallelExecutor(
        use_cuda=args.use_cuda,
        loss_name=avg_cost.name,
        main_program=fluid.default_main_program())
    device_count = parallel_executor.device_count
    logging.info("device count: %d" % device_count)

    def valid_and_test(program, feeder, reader, process, mode="test"):
        """Score a dataset with `program` and compute its metrics.

        Args:
            program: inference program to run.
            feeder: DataFeeder for the (left, right) inputs.
            reader: reader creator yielding the dataset's examples.
            process: SimNetProcessor supplying the gold labels.
            mode: "test" or "valid"; selects which labels to compare against.

        Returns:
            The AUC, or a tuple (auc, accuracy) when args.compute_accuracy.

        Raises:
            ValueError: if `mode` is neither "test" nor "valid".
        """
        batch_data = paddle.batch(reader, args.batch_size, drop_last=False)
        pred_list = []
        for batch in batch_data():
            fetched = executor.run(program=program,
                                   feed=feeder.feed(batch),
                                   fetch_list=[pred.name])
            pred_list.extend(fetched)
        pred_list = np.vstack(pred_list)

        if mode == "test":
            label_list = process.get_test_label()
        elif mode == "valid":
            label_list = process.get_valid_label()
        else:
            raise ValueError(
                "mode must be 'test' or 'valid', got %r" % (mode, ))

        if args.task_mode == "pairwise":
            # Presumably maps a score in [-1, 1] to [0, 1], then builds the
            # two-column (negative, positive) layout the AUC metric is fed
            # in pointwise mode -- TODO confirm score range.
            pred_list = (pred_list + 1) / 2
            pred_list = np.hstack(
                (np.ones_like(pred_list) - pred_list, pred_list))
        metric.reset()
        metric.update(pred_list, label_list)
        auc = metric.eval()
        if args.compute_accuracy:
            acc = utils.get_accuracy(pred_list, label_list, args.task_mode,
                                     args.lamda)
            return auc, acc
        return auc

    logging.info("start train process ...")
    global_step = 0
    # One [mean_loss, epoch_seconds] entry per epoch, for CE reporting.
    ce_info = []
    for epoch_id in range(args.epoch):
        losses = []
        train_batch_data = paddle.batch(
            paddle.reader.shuffle(
                train_reader, buf_size=10000),
            args.batch_size,
            drop_last=False)
        start_time = time.time()
        for batch in train_batch_data():
            # Skip batches with fewer examples than devices (presumably they
            # cannot be split across every device by ParallelExecutor).
            if len(batch) < device_count:
                logging.info(
                    "the size of batch data is less than device_count(%d)" %
                    device_count)
                continue
            global_step += 1
            avg_loss = parallel_executor.run([avg_cost.name],
                                             feed=train_feeder.feed(batch))

            if args.do_valid and global_step % args.validation_steps == 0:
                valid_result = valid_and_test(
                    program=infer_program,
                    feeder=valid_feeder,
                    reader=valid_reader,
                    process=simnet_process,
                    mode="valid")
                if args.compute_accuracy:
                    valid_auc, valid_acc = valid_result
                    logging.info(
                        "global_steps: %d, valid_auc: %f, valid_acc: %f" %
                        (global_step, valid_auc, valid_acc))
                else:
                    valid_auc = valid_result
                    logging.info("global_steps: %d, valid_auc: %f" %
                                 (global_step, valid_auc))

            if global_step % args.save_steps == 0:
                model_save_dir = os.path.join(args.output_dir,
                                              conf_dict["model_path"])
                model_path = os.path.join(model_save_dir, str(global_step))
                if not os.path.exists(model_save_dir):
                    os.makedirs(model_save_dir)
                if args.task_mode == "pairwise":
                    feed_var_names = [left.name, pos_right.name]
                    target_vars = [left_feat, pos_score]
                else:
                    feed_var_names = [left.name, right.name]
                    target_vars = [left_feat, pred]
                fluid.io.save_inference_model(model_path, feed_var_names,
                                              target_vars, executor,
                                              infer_program)
                logging.info("saving infer model in %s" % model_path)
            losses.append(np.mean(avg_loss[0]))
        end_time = time.time()
        logging.info("epoch: %d, loss: %f, used time: %d sec" %
                     (epoch_id, np.mean(losses), end_time - start_time))
        ce_info.append([np.mean(losses), end_time - start_time])

    if args.enable_ce:
        card_num = get_cards()
        ce_loss = 0
        ce_time = 0
        try:
            # CE reports the second-to-last epoch.
            ce_loss = ce_info[-2][0]
            ce_time = ce_info[-2][1]
        except IndexError:
            # Fewer than two epochs were run; report the zero defaults.
            logging.info("ce info err!")
        print("kpis\teach_step_duration_%s_card%s\t%s" %
              (args.task_name, card_num, ce_time))
        print("kpis\ttrain_loss_%s_card%s\t%f" %
              (args.task_name, card_num, ce_loss))

    if args.do_test:
        if args.task_mode == "pairwise":
            test_feed_list = [left.name, pos_right.name]
        else:
            test_feed_list = [left.name, right.name]
        test_feeder = fluid.DataFeeder(place=place, feed_list=test_feed_list)
        test_reader = simnet_process.get_reader("test")
        test_result = valid_and_test(
            program=infer_program,
            feeder=test_feeder,
            reader=test_reader,
            process=simnet_process,
            mode="test")
        if args.compute_accuracy:
            test_auc, test_acc = test_result
            logging.info("AUC of test is %f, Accuracy of test is %f" %
                         (test_auc, test_acc))
        else:
            test_auc = test_result
            logging.info("AUC of test is %f" % test_auc)


def test(conf_dict, args):
    """Evaluate a saved inference model (args.init_checkpoint) on the test set.

    Writes one prediction per example to ``predictions.txt`` in the working
    directory, then logs the AUC (and accuracy if args.compute_accuracy).
    `conf_dict` is accepted for interface symmetry with train() and unused.
    """
    vocab = utils.load_vocab(args.vocab_path)
    simnet_process = reader.SimNetProcessor(args, vocab)
    metric = fluid.metrics.Auc(name="auc")

    with codecs.open("predictions.txt", "w", "utf-8") as predictions_file:
        model_path = args.init_checkpoint
        if args.use_cuda:
            place = fluid.CUDAPlace(0)
        else:
            place = fluid.CPUPlace()
        executor = fluid.Executor(place=place)
        program, feed_var_names, fetch_targets = fluid.io.load_inference_model(
            model_path, executor)

        # Both task modes feed exactly the inputs stored with the model.
        feeder = fluid.DataFeeder(
            place=place, feed_list=feed_var_names, program=program)
        test_reader = simnet_process.get_reader("test")
        batch_data = paddle.batch(test_reader, args.batch_size,
                                  drop_last=False)

        logging.info("start test process ...")
        pred_list = []
        for batch in batch_data():
            output = executor.run(program,
                                  feed=feeder.feed(batch),
                                  fetch_list=fetch_targets)
            # output[1] is the second fetch target; for models saved by
            # train() that is the score / prediction variable.
            scores = output[1]
            if args.task_mode == "pairwise":
                pred_list.extend(float(item[0]) for item in scores)
                predictions_file.write("\n".join(
                    str((item[0] + 1) / 2) for item in scores) + "\n")
            else:
                pred_list.extend(scores)
                predictions_file.write("\n".join(
                    str(np.argmax(item)) for item in scores) + "\n")

    # Evaluate only after predictions.txt is closed (and flushed), in case
    # utils.get_result_file below reads it.
    if args.task_mode == "pairwise":
        pred_list = np.array(pred_list).reshape((-1, 1))
        pred_list = (pred_list + 1) / 2
        pred_list = np.hstack((np.ones_like(pred_list) - pred_list, pred_list))
    else:
        pred_list = np.array(pred_list)
    labels = simnet_process.get_test_label()
    metric.update(pred_list, labels)
    if args.compute_accuracy:
        acc = utils.get_accuracy(pred_list, labels, args.task_mode,
                                 args.lamda)
        logging.info("AUC of test is %f, Accuracy of test is %f" %
                     (metric.eval(), acc))
    else:
        logging.info("AUC of test is %f" % metric.eval())

    if args.verbose_result:
        utils.get_result_file(args)
        logging.info("test result saved in %s" %
                     os.path.join(os.getcwd(), args.test_result_path))


def infer(args):
    """Run a saved inference model (args.init_checkpoint) on the infer data.

    Writes ``<input line>\\t<prediction>`` lines to args.infer_result_path.
    Pairwise predictions are written as (score + 1) / 2; pointwise ones as
    the argmax class index.
    """
    vocab = utils.load_vocab(args.vocab_path)
    simnet_process = reader.SimNetProcessor(args, vocab)
    model_path = args.init_checkpoint
    if args.use_cuda:
        place = fluid.CUDAPlace(0)
    else:
        place = fluid.CPUPlace()
    executor = fluid.Executor(place=place)
    program, feed_var_names, fetch_targets = fluid.io.load_inference_model(
        model_path, executor)

    # Both task modes feed exactly the inputs stored with the model.
    infer_feeder = fluid.DataFeeder(
        place=place, feed_list=feed_var_names, program=program)
    # Passed uncalled: paddle.batch receives a reader creator here, like the
    # objects returned by get_reader() elsewhere in this file.
    infer_reader = simnet_process.get_infer_reader
    batch_data = paddle.batch(infer_reader, args.batch_size, drop_last=False)

    logging.info("start infer process ...")
    preds_list = []
    for batch in batch_data():
        output = executor.run(program,
                              feed=infer_feeder.feed(batch),
                              fetch_list=fetch_targets)
        if args.task_mode == "pairwise":
            preds_list.extend(str((item[0] + 1) / 2) for item in output[1])
        else:
            preds_list.extend(str(np.argmax(item)) for item in output[1])

    with codecs.open(args.infer_result_path, "w", "utf-8") as infer_file:
        for _data, _pred in zip(simnet_process.get_infer_data(), preds_list):
            infer_file.write(_data + "\t" + _pred + "\n")
    logging.info("infer result saved in %s" %
                 os.path.join(os.getcwd(), args.infer_result_path))


def get_cards():
    """Return the number of entries in CUDA_VISIBLE_DEVICES (0 if unset/empty)."""
    cards = os.environ.get('CUDA_VISIBLE_DEVICES', '')
    if cards == '':
        return 0
    return len(cards.split(","))


def main(conf_dict, args):
    """Dispatch to train, test or infer; the first enabled flag wins.

    Raises:
        ValueError: if none of do_train / do_test / do_infer is set.
    """
    if args.do_train:
        train(conf_dict, args)
    elif args.do_test:
        test(conf_dict, args)
    elif args.do_infer:
        infer(args)
    else:
        raise ValueError(
            "one of do_train and do_test and do_infer must be True")


if __name__ == "__main__":
    utils.print_arguments(args)
    try:
        if not fluid.is_compiled_with_cuda() and args.use_cuda:
            print(
                "\nYou can not set use_cuda = True in the model because you are using paddlepaddle-cpu.\nPlease: 1. Install paddlepaddle-gpu to run your models on GPU or 2. Set use_cuda = False to run models on CPU.\n"
            )
            sys.exit(1)
    except Exception:
        # Best-effort check only (e.g. in case this Paddle build lacks
        # is_compiled_with_cuda -- assumption). SystemExit is not an
        # Exception subclass, so the sys.exit(1) above still propagates.
        pass
    utils.init_log("./log/TextSimilarityNet")
    conf_dict = config.SimNetConfig(args)
    main(conf_dict, args)