From 0c0860edc56b22e5123e37049dcd174ffa69b571 Mon Sep 17 00:00:00 2001 From: Reed Date: Mon, 29 Oct 2018 15:43:21 -0700 Subject: [PATCH] Add option to not use estimator. (#5623) The option is --nouse_estimator --- official/recommendation/data_preprocessing.py | 108 +++++---- official/recommendation/model_runner.py | 207 ++++++++++++++++++ official/recommendation/ncf_main.py | 127 ++++++----- official/recommendation/ncf_test.py | 8 + official/recommendation/neumf_model.py | 18 +- 5 files changed, 373 insertions(+), 95 deletions(-) create mode 100644 official/recommendation/model_runner.py diff --git a/official/recommendation/data_preprocessing.py b/official/recommendation/data_preprocessing.py index d5bad72fd..74b595384 100644 --- a/official/recommendation/data_preprocessing.py +++ b/official/recommendation/data_preprocessing.py @@ -593,49 +593,29 @@ def hash_pipeline(dataset, deterministic): tf.logging.info(" [pipeline_hash] All batches hash: {}".format(overall_hash)) -def make_input_fn(ncf_dataset, is_training): - # type: (typing.Optional[NCFDataset], bool) -> (typing.Callable, str, int) +def make_input_fn( + ncf_dataset, # type: typing.Optional[NCFDataset] + is_training, # type: bool + record_files=None # type: typing.Optional[tf.Tensor] + ): + # type: (...) -> (typing.Callable, str, int) """Construct training input_fn for the current epoch.""" if ncf_dataset is None: return make_synthetic_input_fn(is_training) - if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive): - # The generation subprocess must have been alive at some point, because we - # earlier checked that the subproc_alive file existed. - raise ValueError("Generation subprocess unexpectedly died. Data will not " - "be available; exiting to avoid waiting forever.") - - if is_training: - train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir - while not tf.gfile.Exists(train_epoch_dir): - tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir)) - time.sleep(1) - - train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir) - while not train_data_dirs: - tf.logging.info("Waiting for data folder to be created.") - time.sleep(1) - train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir) - train_data_dirs.sort() # names are zfilled so that - # lexicographic sort == numeric sort - record_dir = os.path.join(train_epoch_dir, train_data_dirs[0]) - template = rconst.TRAIN_RECORD_TEMPLATE + if record_files is not None: + epoch_metadata = None + batch_count = None + record_dir = None else: - record_dir = ncf_dataset.cache_paths.eval_data_subdir - template = rconst.EVAL_RECORD_TEMPLATE - - ready_file = os.path.join(record_dir, rconst.READY_FILE) - while not tf.gfile.Exists(ready_file): - tf.logging.info("Waiting for records in {} to be ready".format(record_dir)) - time.sleep(1) + epoch_metadata, record_dir, template = get_epoch_info(is_training, + ncf_dataset) + record_files = os.path.join(record_dir, template.format("*")) + # This value is used to check that the batch count from the subprocess + # matches the batch count expected by the main thread. + batch_count = epoch_metadata["batch_count"] - with tf.gfile.Open(ready_file, "r") as f: - epoch_metadata = json.load(f) - - # This value is used to check that the batch count from the subprocess matches - # the batch count expected by the main thread. 
- batch_count = epoch_metadata["batch_count"] def input_fn(params): """Generated input_fn for the given epoch.""" @@ -646,15 +626,13 @@ def make_input_fn(ncf_dataset, is_training): # populates "batch_size" to the appropriate value. batch_size = params.get("eval_batch_size") or params["batch_size"] - if epoch_metadata["batch_size"] != batch_size: + if epoch_metadata and epoch_metadata["batch_size"] != batch_size: raise ValueError( "Records were constructed with batch size {}, but input_fn was given " "a batch size of {}. This will result in a deserialization error in " "tf.parse_single_example." .format(epoch_metadata["batch_size"], batch_size)) - - record_files = tf.data.Dataset.list_files( - os.path.join(record_dir, template.format("*")), shuffle=False) + record_files_ds = tf.data.Dataset.list_files(record_files, shuffle=False) interleave = tf.contrib.data.parallel_interleave( tf.data.TFRecordDataset, cycle_length=4, block_length=100000, sync=True, prefetch_input_elements=4, ) deserialize = make_deserialize(params, batch_size, is_training) - dataset = record_files.apply(interleave) + dataset = record_files_ds.apply(interleave) dataset = dataset.map(deserialize, num_parallel_calls=4) dataset = dataset.prefetch(32) @@ -677,6 +655,54 @@ def make_input_fn(ncf_dataset, is_training): return input_fn, record_dir, batch_count +def get_epoch_info(is_training, ncf_dataset): + """Wait for the epoch input data to be ready, then return metadata about it. + + Args: + is_training: Whether to return info for a training or an eval epoch. + ncf_dataset: An NCFDataset. + + Returns: + epoch_metadata: A dict with epoch metadata. + record_dir: The directory with the TFRecord files storing the input data. + template: A string template of the files in `record_dir`. + `template.format('*')` is a glob that matches all the record files. + """ + if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive): + # The generation subprocess must have been alive at some point, because we + # earlier checked that the subproc_alive file existed. + raise ValueError("Generation subprocess unexpectedly died. 
Data will not " "be available; exiting to avoid waiting forever.") + + if is_training: + train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir + while not tf.gfile.Exists(train_epoch_dir): + tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir)) + time.sleep(1) + + train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir) + while not train_data_dirs: + tf.logging.info("Waiting for data folder to be created.") + time.sleep(1) + train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir) + train_data_dirs.sort() # names are zfilled so that + # lexicographic sort == numeric sort + record_dir = os.path.join(train_epoch_dir, train_data_dirs[0]) + template = rconst.TRAIN_RECORD_TEMPLATE + else: + record_dir = ncf_dataset.cache_paths.eval_data_subdir + template = rconst.EVAL_RECORD_TEMPLATE + + ready_file = os.path.join(record_dir, rconst.READY_FILE) + while not tf.gfile.Exists(ready_file): + tf.logging.info("Waiting for records in {} to be ready".format(record_dir)) + time.sleep(1) + + with tf.gfile.Open(ready_file, "r") as f: + epoch_metadata = json.load(f) + return epoch_metadata, record_dir, template + + def make_synthetic_input_fn(is_training): """Construct training input_fn that uses synthetic data.""" def input_fn(params): diff --git a/official/recommendation/model_runner.py b/official/recommendation/model_runner.py new file mode 100644 index 000000000..d20e5a4fd --- /dev/null +++ b/official/recommendation/model_runner.py @@ -0,0 +1,207 @@ +# Copyright 2018 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Contains NcfModelRunner, which can train and evaluate an NCF model.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from collections import namedtuple +import os +import time + +import tensorflow as tf + +from tensorflow.contrib.compiler import xla +from official.recommendation import data_preprocessing +from official.recommendation import neumf_model + + +class NcfModelRunner(object): + """Creates a graph to train/evaluate an NCF model, and runs it. + + This class builds both a training model and an evaluation model in the same + graph. The two models share variables, so that during evaluation, the + trained variables are used. + """ + + # _TrainModelProperties and _EvalModelProperties store useful properties of + # the training and evaluation models, respectively. + # _SHARED_MODEL_PROPERTY_FIELDS lists the fields they share. + _SHARED_MODEL_PROPERTY_FIELDS = ( + # A scalar tf.string placeholder tensor that will be fed a glob pattern + # matching the TFRecord files for the input data. + "record_files_placeholder", + # The tf.data.Iterator to iterate over the input data. + "iterator", + # A scalar float tensor representing the model loss. + "loss", + # The batch size, as a Python int. + "batch_size", + # The op to run the model. 
For the training model, this trains the model + # for one step. For the evaluation model, this computes the metrics and + # updates the metric variables. + "run_model_op") + _TrainModelProperties = namedtuple("_TrainModelProperties", # pylint: disable=invalid-name + _SHARED_MODEL_PROPERTY_FIELDS) + _EvalModelProperties = namedtuple( # pylint: disable=invalid-name + "_EvalModelProperties", _SHARED_MODEL_PROPERTY_FIELDS + ( + # A dict from metric name to (metric, update_op) tuple. + "metrics", + # Initializes the metric variables. + "metric_initializer",)) + + def __init__(self, ncf_dataset, params): + with tf.Graph().as_default() as self._graph: + if params["use_xla_for_gpu"]: + # The XLA functions we use require resource variables. + tf.enable_resource_variables() + self._ncf_dataset = ncf_dataset + self._global_step = tf.train.create_global_step() + self._train_model_properties = self._build_model(params, is_training=True) + self._eval_model_properties = self._build_model(params, is_training=False) + + initializer = tf.global_variables_initializer() + self._graph.finalize() + self._session = tf.Session(graph=self._graph) + self._session.run(initializer) + + def _build_model(self, params, is_training): + """Builds the NCF model. + + Args: + params: A dict of hyperparameters. + is_training: If True, build the training model. If False, build the + evaluation model. + Returns: + A _TrainModelProperties if is_training is True, or an _EvalModelProperties + otherwise. + """ + record_files_placeholder = tf.placeholder(tf.string, ()) + input_fn, _, _ = \ + data_preprocessing.make_input_fn( + ncf_dataset=self._ncf_dataset, is_training=is_training, + record_files=record_files_placeholder) + dataset = input_fn(params) + iterator = dataset.make_initializable_iterator() + + model_fn = neumf_model.neumf_model_fn + if params["use_xla_for_gpu"]: + model_fn = xla.estimator_model_fn(model_fn) + + if is_training: + features, labels = iterator.get_next() + estimator_spec = model_fn( + features, labels, tf.estimator.ModeKeys.TRAIN, params) + with tf.control_dependencies([estimator_spec.train_op]): + run_model_op = self._global_step.assign_add(1) + return self._TrainModelProperties( + record_files_placeholder, iterator, + estimator_spec.loss, params["batch_size"], run_model_op) + else: + features = iterator.get_next() + estimator_spec = model_fn( + features, None, tf.estimator.ModeKeys.EVAL, params) + run_model_op = tf.group(*(update_op for _, update_op in + estimator_spec.eval_metric_ops.values())) + metric_initializer = tf.variables_initializer( + tf.get_collection(tf.GraphKeys.METRIC_VARIABLES)) + return self._EvalModelProperties( + record_files_placeholder, iterator, estimator_spec.loss, + params["eval_batch_size"], run_model_op, + estimator_spec.eval_metric_ops, metric_initializer) + + def _train_or_eval(self, model_properties, num_steps, is_training): + """Either trains or evaluates, depending on whether `is_training` is True. + + Args: + model_properties: _TrainModelProperties or an _EvalModelProperties + containing the properties of the training or evaluation graph. + num_steps: The number of steps to train or evaluate for. + is_training: If True, run the training model. If False, run the evaluation + model. + + Returns: + record_dir: The directory of TFRecords where the training/evaluation input + data was read from. 
+ """ + if self._ncf_dataset is not None: + epoch_metadata, record_dir, template = data_preprocessing.get_epoch_info( + is_training=is_training, ncf_dataset=self._ncf_dataset) + batch_count = epoch_metadata["batch_count"] + if batch_count != num_steps: + raise ValueError( + "Step counts do not match. ({} vs. {}) The async process is " + "producing incorrect shards.".format(batch_count, num_steps)) + record_files = os.path.join(record_dir, template.format("*")) + initializer_feed_dict = { + model_properties.record_files_placeholder: record_files} + del batch_count + else: + initializer_feed_dict = None + record_dir = None + + self._session.run(model_properties.iterator.initializer, + initializer_feed_dict) + fetches = (model_properties.loss, model_properties.run_model_op) + mode = "Train" if is_training else "Eval" + start = None + for i in range(num_steps): + loss, _, = self._session.run(fetches) + if i % 100 == 0: + if start is None: + # Only start the timer after 100 steps so there is a warmup. + start = time.time() + start_step = i + tf.logging.info("{} Loss = {}".format(mode, loss)) + end = time.time() + if start is not None: + print("{} peformance: {} examples/sec".format( + mode, (i - start_step) * model_properties.batch_size / (end - start))) + return record_dir + + + def train(self, num_train_steps): + """Trains the graph for a single cycle. + + Args: + num_train_steps: The number of steps per cycle to train for. + """ + record_dir = self._train_or_eval(self._train_model_properties, + num_train_steps, is_training=True) + if record_dir: + # We delete the record_dir because each cycle, new TFRecords is generated + # by the async process. + tf.gfile.DeleteRecursively(record_dir) + + def eval(self, num_eval_steps): + """Evaluates the graph on the eval data. + + Args: + num_eval_steps: The number of steps to evaluate for. + + Returns: + A dict of evaluation results. 
+ """ + self._session.run(self._eval_model_properties.metric_initializer) + self._train_or_eval(self._eval_model_properties, num_eval_steps, + is_training=False) + eval_results = { + 'global_step': self._session.run(self._global_step)} + for key, (val, _) in self._eval_model_properties.metrics.items(): + val_ = self._session.run(val) + tf.logging.info("{} = {}".format(key, self._session.run(val))) + eval_results[key] = val_ + return eval_results diff --git a/official/recommendation/ncf_main.py b/official/recommendation/ncf_main.py index 8900d6bf9..5ca9ce06b 100644 --- a/official/recommendation/ncf_main.py +++ b/official/recommendation/ncf_main.py @@ -41,6 +41,7 @@ from tensorflow.contrib.compiler import xla from official.datasets import movielens from official.recommendation import constants as rconst from official.recommendation import data_preprocessing +from official.recommendation import model_runner from official.recommendation import neumf_model from official.utils.flags import core as flags_core from official.utils.logs import hooks_helper @@ -177,30 +178,36 @@ def run_ncf(_): model_helpers.apply_clean(flags.FLAGS) - train_estimator, eval_estimator = construct_estimator( - num_gpus=num_gpus, model_dir=FLAGS.model_dir, params={ - "use_seed": FLAGS.seed is not None, - "hash_pipeline": FLAGS.hash_pipeline, - "batch_size": batch_size, - "eval_batch_size": eval_batch_size, - "learning_rate": FLAGS.learning_rate, - "num_users": num_users, - "num_items": num_items, - "mf_dim": FLAGS.num_factors, - "model_layers": [int(layer) for layer in FLAGS.layers], - "mf_regularization": FLAGS.mf_regularization, - "mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization], - "num_neg": FLAGS.num_neg, - "use_tpu": FLAGS.tpu is not None, - "tpu": FLAGS.tpu, - "tpu_zone": FLAGS.tpu_zone, - "tpu_gcp_project": FLAGS.tpu_gcp_project, - "beta1": FLAGS.beta1, - "beta2": FLAGS.beta2, - "epsilon": FLAGS.epsilon, - "match_mlperf": FLAGS.ml_perf, - "use_xla_for_gpu": FLAGS.use_xla_for_gpu, - }, batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size) + params = { + "use_seed": FLAGS.seed is not None, + "hash_pipeline": FLAGS.hash_pipeline, + "batch_size": batch_size, + "eval_batch_size": eval_batch_size, + "learning_rate": FLAGS.learning_rate, + "num_users": num_users, + "num_items": num_items, + "mf_dim": FLAGS.num_factors, + "model_layers": [int(layer) for layer in FLAGS.layers], + "mf_regularization": FLAGS.mf_regularization, + "mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization], + "num_neg": FLAGS.num_neg, + "use_tpu": FLAGS.tpu is not None, + "tpu": FLAGS.tpu, + "tpu_zone": FLAGS.tpu_zone, + "tpu_gcp_project": FLAGS.tpu_gcp_project, + "beta1": FLAGS.beta1, + "beta2": FLAGS.beta2, + "epsilon": FLAGS.epsilon, + "match_mlperf": FLAGS.ml_perf, + "use_xla_for_gpu": FLAGS.use_xla_for_gpu, + "use_estimator": FLAGS.use_estimator, + } + if FLAGS.use_estimator: + train_estimator, eval_estimator = construct_estimator( + num_gpus=num_gpus, model_dir=FLAGS.model_dir, params=params, + batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size) + else: + runner = model_runner.NcfModelRunner(ncf_dataset, params) # Create hooks that log information about the training and metric values train_hooks = hooks_helper.get_train_hooks( @@ -237,37 +244,46 @@ def run_ncf(_): value=cycle_index) # Train the model - train_input_fn, train_record_dir, batch_count = \ - data_preprocessing.make_input_fn( - ncf_dataset=ncf_dataset, is_training=True) - - if batch_count != num_train_steps: - raise 
ValueError( - "Step counts do not match. ({} vs. {}) The async process is " - "producing incorrect shards.".format(batch_count, num_train_steps)) - - train_estimator.train(input_fn=train_input_fn, hooks=train_hooks, - steps=num_train_steps) - if train_record_dir: - tf.gfile.DeleteRecursively(train_record_dir) - - tf.logging.info("Beginning evaluation.") - if pred_input_fn is None: - pred_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn( - ncf_dataset=ncf_dataset, is_training=False) - - if eval_batch_count != num_eval_steps: + if FLAGS.use_estimator: + train_input_fn, train_record_dir, batch_count = \ + data_preprocessing.make_input_fn( + ncf_dataset=ncf_dataset, is_training=True) + + if batch_count != num_train_steps: raise ValueError( "Step counts do not match. ({} vs. {}) The async process is " - "producing incorrect shards.".format( - eval_batch_count, num_eval_steps)) - - mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START, - value=cycle_index) - eval_results = eval_estimator.evaluate(pred_input_fn, steps=num_eval_steps) + "producing incorrect shards.".format(batch_count, num_train_steps)) + + train_estimator.train(input_fn=train_input_fn, hooks=train_hooks, + steps=num_train_steps) + if train_record_dir: + tf.gfile.DeleteRecursively(train_record_dir) + + tf.logging.info("Beginning evaluation.") + if pred_input_fn is None: + pred_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn( + ncf_dataset=ncf_dataset, is_training=False) + + if eval_batch_count != num_eval_steps: + raise ValueError( + "Step counts do not match. ({} vs. {}) The async process is " + "producing incorrect shards.".format( + eval_batch_count, num_eval_steps)) + + mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START, + value=cycle_index) + eval_results = eval_estimator.evaluate(pred_input_fn, + steps=num_eval_steps) + tf.logging.info("Evaluation complete.") + else: + runner.train(num_train_steps) + tf.logging.info("Beginning evaluation.") + mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START, + value=cycle_index) + eval_results = runner.eval(num_eval_steps) + tf.logging.info("Evaluation complete.") hr = float(eval_results[rconst.HR_KEY]) ndcg = float(eval_results[rconst.NDCG_KEY]) - tf.logging.info("Evaluation complete.") mlperf_helper.ncf_print( key=mlperf_helper.TAGS.EVAL_TARGET, @@ -472,6 +488,15 @@ def define_ncf_flags(): def xla_validator(flag_dict): return not flag_dict["use_xla_for_gpu"] or not flag_dict["tpu"] + flags.DEFINE_bool( + name="use_estimator", default=True, help=flags_core.help_wrap( + "If True, use Estimator to train. 
Setting to False is slightly " "faster, but when False, the following are currently unsupported:\n" " * Using TPUs\n" " * Using more than 1 GPU\n" " * Reloading from checkpoints\n" " * Any hooks specified with --hooks\n")) + if __name__ == "__main__": tf.logging.set_verbosity(tf.logging.INFO) diff --git a/official/recommendation/ncf_test.py b/official/recommendation/ncf_test.py index 089938cc0..4def07349 100644 --- a/official/recommendation/ncf_test.py +++ b/official/recommendation/ncf_test.py @@ -24,6 +24,7 @@ import mock import numpy as np import tensorflow as tf +from absl import flags from absl.testing import flagsaver from official.recommendation import constants as rconst from official.recommendation import data_preprocessing @@ -249,6 +250,13 @@ class NcfTest(tf.test.TestCase): def test_end_to_end_mlperf(self): ncf_main.main(None) + @flagsaver.flagsaver(use_estimator=False, **_BASE_END_TO_END_FLAGS) + @mock.patch.object(data_preprocessing, "SYNTHETIC_BATCHES_PER_EPOCH", 100) + def test_end_to_end_no_estimator(self): + ncf_main.main(None) + flags.FLAGS.ml_perf = True + ncf_main.main(None) + if __name__ == "__main__": tf.logging.set_verbosity(tf.logging.INFO) diff --git a/official/recommendation/neumf_model.py b/official/recommendation/neumf_model.py index a648a2be3..d0323d841 100644 --- a/official/recommendation/neumf_model.py +++ b/official/recommendation/neumf_model.py @@ -78,7 +78,18 @@ def neumf_model_fn(features, labels, mode, params): users = features[movielens.USER_COLUMN] items = tf.cast(features[movielens.ITEM_COLUMN], tf.int32) - logits = construct_model(users=users, items=items, params=params) + keras_model = params.get("keras_model") + if keras_model: + logits = keras_model([users, items], + training=mode == tf.estimator.ModeKeys.TRAIN) + else: + keras_model = construct_model(users=users, items=items, params=params) + logits = keras_model.output + if not params["use_estimator"] and "keras_model" not in params: + # When we are not using estimator, we need to reuse the Keras model when + # this model_fn is called again, so that the variables are shared between + # training and eval. So we mutate params to add the Keras model. + params["keras_model"] = keras_model # Softmax with the first column of zeros is equivalent to sigmoid. softmax_logits = tf.concat([tf.zeros(logits.shape, dtype=logits.dtype), @@ -242,10 +253,11 @@ def construct_model(users, items, params): name=movielens.RATING_COLUMN)(predict_vector) # Print model topology. - tf.keras.models.Model([user_input, item_input], logits).summary() + model = tf.keras.models.Model([user_input, item_input], logits) + model.summary() sys.stdout.flush() - return logits + return model def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
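Usage note: the loop below is a minimal sketch of how the new NcfModelRunner is driven when --nouse_estimator is passed, mirroring the run_ncf() changes in this patch. It assumes ncf_dataset, params, total_training_cycle, num_train_steps, and num_eval_steps are already built exactly as in ncf_main.py; none of them are defined here.

import tensorflow as tf

from official.recommendation import constants as rconst
from official.recommendation import model_runner

# Build the runner once; it constructs the shared train/eval graphs internally.
runner = model_runner.NcfModelRunner(ncf_dataset, params)
for cycle_index in range(total_training_cycle):
  # One training cycle followed by evaluation, as in the non-Estimator branch.
  runner.train(num_train_steps)
  eval_results = runner.eval(num_eval_steps)
  hr = float(eval_results[rconst.HR_KEY])
  ndcg = float(eval_results[rconst.NDCG_KEY])
  tf.logging.info("HR = {}, NDCG = {}".format(hr, ndcg))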