未验证 提交 0c0860ed 编写于 作者: R Reed 提交者: GitHub

Add option to not use estimator. (#5623)

The option is --nouse_estimator
上级 4298c3a3
......@@ -593,49 +593,29 @@ def hash_pipeline(dataset, deterministic):
tf.logging.info(" [pipeline_hash] All batches hash: {}".format(overall_hash))
def make_input_fn(ncf_dataset, is_training):
# type: (typing.Optional[NCFDataset], bool) -> (typing.Callable, str, int)
def make_input_fn(
ncf_dataset, # type: typing.Optional[NCFDataset]
is_training, # type: bool
record_files=None # type: typing.Optional[tf.Tensor]
# type: (...) -> (typing.Callable, str, int)
"""Construct training input_fn for the current epoch."""
if ncf_dataset is None:
return make_synthetic_input_fn(is_training)
if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
# The generation subprocess must have been alive at some point, because we
# earlier checked that the subproc_alive file existed.
raise ValueError("Generation subprocess unexpectedly died. Data will not "
"be available; exiting to avoid waiting forever.")
if is_training:
train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir
while not tf.gfile.Exists(train_epoch_dir):
tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir))
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
while not train_data_dirs:
tf.logging.info("Waiting for data folder to be created.")
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
train_data_dirs.sort() # names are zfilled so that
# lexicographic sort == numeric sort
record_dir = os.path.join(train_epoch_dir, train_data_dirs[0])
template = rconst.TRAIN_RECORD_TEMPLATE
if record_files is not None:
epoch_metadata = None
batch_count = None
record_dir = None
record_dir = ncf_dataset.cache_paths.eval_data_subdir
template = rconst.EVAL_RECORD_TEMPLATE
ready_file = os.path.join(record_dir, rconst.READY_FILE)
while not tf.gfile.Exists(ready_file):
tf.logging.info("Waiting for records in {} to be ready".format(record_dir))
epoch_metadata, record_dir, template = get_epoch_info(is_training,
record_files = os.path.join(record_dir, template.format("*"))
# This value is used to check that the batch count from the subprocess
# matches the batch count expected by the main thread.
batch_count = epoch_metadata["batch_count"]
with tf.gfile.Open(ready_file, "r") as f:
epoch_metadata = json.load(f)
# This value is used to check that the batch count from the subprocess matches
# the batch count expected by the main thread.
batch_count = epoch_metadata["batch_count"]
def input_fn(params):
"""Generated input_fn for the given epoch."""
......@@ -646,15 +626,13 @@ def make_input_fn(ncf_dataset, is_training):
# populates "batch_size" to the appropriate value.
batch_size = params.get("eval_batch_size") or params["batch_size"]
if epoch_metadata["batch_size"] != batch_size:
if epoch_metadata and epoch_metadata["batch_size"] != batch_size:
raise ValueError(
"Records were constructed with batch size {}, but input_fn was given "
"a batch size of {}. This will result in a deserialization error in "
.format(epoch_metadata["batch_size"], batch_size))
record_files = tf.data.Dataset.list_files(
os.path.join(record_dir, template.format("*")), shuffle=False)
record_files_ds = tf.data.Dataset.list_files(record_files, shuffle=False)
interleave = tf.contrib.data.parallel_interleave(
......@@ -665,7 +643,7 @@ def make_input_fn(ncf_dataset, is_training):
deserialize = make_deserialize(params, batch_size, is_training)
dataset = record_files.apply(interleave)
dataset = record_files_ds.apply(interleave)
dataset = dataset.map(deserialize, num_parallel_calls=4)
dataset = dataset.prefetch(32)
......@@ -677,6 +655,54 @@ def make_input_fn(ncf_dataset, is_training):
return input_fn, record_dir, batch_count
def get_epoch_info(is_training, ncf_dataset):
"""Wait for the epoch input data to be ready and return various info about it.
is_training: If we should return info for a training or eval epoch.
ncf_dataset: An NCFDataset.
epoch_metadata: A dict with epoch metadata.
record_dir: The directory with the TFRecord files storing the input data.
template: A string template of the files in `record_dir`.
`template.format('*')` is a glob that matches all the record files.
if not tf.gfile.Exists(ncf_dataset.cache_paths.subproc_alive):
# The generation subprocess must have been alive at some point, because we
# earlier checked that the subproc_alive file existed.
raise ValueError("Generation subprocess unexpectedly died. Data will not "
"be available; exiting to avoid waiting forever.")
if is_training:
train_epoch_dir = ncf_dataset.cache_paths.train_epoch_dir
while not tf.gfile.Exists(train_epoch_dir):
tf.logging.info("Waiting for {} to exist.".format(train_epoch_dir))
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
while not train_data_dirs:
tf.logging.info("Waiting for data folder to be created.")
train_data_dirs = tf.gfile.ListDirectory(train_epoch_dir)
train_data_dirs.sort() # names are zfilled so that
# lexicographic sort == numeric sort
record_dir = os.path.join(train_epoch_dir, train_data_dirs[0])
template = rconst.TRAIN_RECORD_TEMPLATE
record_dir = ncf_dataset.cache_paths.eval_data_subdir
template = rconst.EVAL_RECORD_TEMPLATE
ready_file = os.path.join(record_dir, rconst.READY_FILE)
while not tf.gfile.Exists(ready_file):
tf.logging.info("Waiting for records in {} to be ready".format(record_dir))
with tf.gfile.Open(ready_file, "r") as f:
epoch_metadata = json.load(f)
return epoch_metadata, record_dir, template
def make_synthetic_input_fn(is_training):
"""Construct training input_fn that uses synthetic data."""
def input_fn(params):
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Contains NcfModelRunner, which can train and evaluate an NCF model."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import namedtuple
import os
import time
import tensorflow as tf
from tensorflow.contrib.compiler import xla
from official.recommendation import data_preprocessing
from official.recommendation import neumf_model
class NcfModelRunner(object):
"""Creates a graph to train/evaluate an NCF model, and runs it.
This class builds both a training model and evaluation model in the graph.
The two models share variables, so that during evaluation, the trained
variables are used.
# _TrainModelProperties and _EvalModelProperties store useful properties of
# the training and evaluation models, respectively.
# _SHARED_MODEL_PROPERTY_FIELDS is their shared fields.
# A scalar tf.string placeholder tensor, that will be fed the path to the
# directory storing the TFRecord files for the input data.
# The tf.data.Iterator to iterate over the input data.
# A scalar float tensor representing the model loss.
# The batch size, as a Python int.
# The op to run the model. For the training model, this trains the model
# for one step. For the evaluation model, this computes the metrics and
# updates the metric variables.
_TrainModelProperties = namedtuple("_TrainModelProperties", # pylint: disable=invalid-name
_EvalModelProperties = namedtuple( # pylint: disable=invalid-name
"_EvalModelProperties", _SHARED_MODEL_PROPERTY_FIELDS + (
# A dict from metric name to (metric, update_op) tuple.
# Initializes the metric variables.
def __init__(self, ncf_dataset, params):
with tf.Graph().as_default() as self._graph:
if params["use_xla_for_gpu"]:
# The XLA functions we use require resource variables.
self._ncf_dataset = ncf_dataset
self._global_step = tf.train.create_global_step()
self._train_model_properties = self._build_model(params, is_training=True)
self._eval_model_properties = self._build_model(params, is_training=False)
initializer = tf.global_variables_initializer()
self._session = tf.Session(graph=self._graph)
def _build_model(self, params, is_training):
"""Builds the NCF model.
params: A dict of hyperparameters.
is_training: If True, build the training model. If False, build the
evaluation model.
A _TrainModelProperties if is_training is True, or an _EvalModelProperties
record_files_placeholder = tf.placeholder(tf.string, ())
input_fn, _, _ = \
ncf_dataset=self._ncf_dataset, is_training=is_training,
dataset = input_fn(params)
iterator = dataset.make_initializable_iterator()
model_fn = neumf_model.neumf_model_fn
if params["use_xla_for_gpu"]:
model_fn = xla.estimator_model_fn(model_fn)
if is_training:
features, labels = iterator.get_next()
estimator_spec = model_fn(
features, labels, tf.estimator.ModeKeys.TRAIN, params)
with tf.control_dependencies([estimator_spec.train_op]):
run_model_op = self._global_step.assign_add(1)
return self._TrainModelProperties(
record_files_placeholder, iterator,
estimator_spec.loss, params["batch_size"], run_model_op)
features = iterator.get_next()
estimator_spec = model_fn(
features, None, tf.estimator.ModeKeys.EVAL, params)
run_model_op = tf.group(*(update_op for _, update_op in
metric_initializer = tf.variables_initializer(
return self._EvalModelProperties(
record_files_placeholder, iterator, estimator_spec.loss,
params["eval_batch_size"], run_model_op,
estimator_spec.eval_metric_ops, metric_initializer)
def _train_or_eval(self, model_properties, num_steps, is_training):
"""Either trains or evaluates, depending on whether `is_training` is True.
model_properties: _TrainModelProperties or an _EvalModelProperties
containing the properties of the training or evaluation graph.
num_steps: The number of steps to train or evaluate for.
is_training: If True, run the training model. If False, run the evaluation
record_dir: The directory of TFRecords where the training/evaluation input
data was read from.
if self._ncf_dataset is not None:
epoch_metadata, record_dir, template = data_preprocessing.get_epoch_info(
is_training=is_training, ncf_dataset=self._ncf_dataset)
batch_count = epoch_metadata["batch_count"]
if batch_count != num_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(batch_count, num_steps))
record_files = os.path.join(record_dir, template.format("*"))
initializer_feed_dict = {
model_properties.record_files_placeholder: record_files}
del batch_count
initializer_feed_dict = None
record_dir = None
fetches = (model_properties.loss, model_properties.run_model_op)
mode = "Train" if is_training else "Eval"
start = None
for i in range(num_steps):
loss, _, = self._session.run(fetches)
if i % 100 == 0:
if start is None:
# Only start the timer after 100 steps so there is a warmup.
start = time.time()
start_step = i
tf.logging.info("{} Loss = {}".format(mode, loss))
end = time.time()
if start is not None:
print("{} peformance: {} examples/sec".format(
mode, (i - start_step) * model_properties.batch_size / (end - start)))
return record_dir
def train(self, num_train_steps):
"""Trains the graph for a single cycle.
num_train_steps: The number of steps per cycle to train for.
record_dir = self._train_or_eval(self._train_model_properties,
num_train_steps, is_training=True)
if record_dir:
# We delete the record_dir because each cycle, new TFRecords is generated
# by the async process.
def eval(self, num_eval_steps):
"""Evaluates the graph on the eval data.
num_eval_steps: The number of steps to evaluate for.
A dict of evaluation results.
self._train_or_eval(self._eval_model_properties, num_eval_steps,
eval_results = {
'global_step': self._session.run(self._global_step)}
for key, (val, _) in self._eval_model_properties.metrics.items():
val_ = self._session.run(val)
tf.logging.info("{} = {}".format(key, self._session.run(val)))
eval_results[key] = val_
return eval_results
......@@ -41,6 +41,7 @@ from tensorflow.contrib.compiler import xla
from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
from official.recommendation import model_runner
from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
......@@ -177,30 +178,36 @@ def run_ncf(_):
train_estimator, eval_estimator = construct_estimator(
num_gpus=num_gpus, model_dir=FLAGS.model_dir, params={
"use_seed": FLAGS.seed is not None,
"hash_pipeline": FLAGS.hash_pipeline,
"batch_size": batch_size,
"eval_batch_size": eval_batch_size,
"learning_rate": FLAGS.learning_rate,
"num_users": num_users,
"num_items": num_items,
"mf_dim": FLAGS.num_factors,
"model_layers": [int(layer) for layer in FLAGS.layers],
"mf_regularization": FLAGS.mf_regularization,
"mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization],
"num_neg": FLAGS.num_neg,
"use_tpu": FLAGS.tpu is not None,
"tpu": FLAGS.tpu,
"tpu_zone": FLAGS.tpu_zone,
"tpu_gcp_project": FLAGS.tpu_gcp_project,
"beta1": FLAGS.beta1,
"beta2": FLAGS.beta2,
"epsilon": FLAGS.epsilon,
"match_mlperf": FLAGS.ml_perf,
"use_xla_for_gpu": FLAGS.use_xla_for_gpu,
}, batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size)
params = {
"use_seed": FLAGS.seed is not None,
"hash_pipeline": FLAGS.hash_pipeline,
"batch_size": batch_size,
"eval_batch_size": eval_batch_size,
"learning_rate": FLAGS.learning_rate,
"num_users": num_users,
"num_items": num_items,
"mf_dim": FLAGS.num_factors,
"model_layers": [int(layer) for layer in FLAGS.layers],
"mf_regularization": FLAGS.mf_regularization,
"mlp_reg_layers": [float(reg) for reg in FLAGS.mlp_regularization],
"num_neg": FLAGS.num_neg,
"use_tpu": FLAGS.tpu is not None,
"tpu": FLAGS.tpu,
"tpu_zone": FLAGS.tpu_zone,
"tpu_gcp_project": FLAGS.tpu_gcp_project,
"beta1": FLAGS.beta1,
"beta2": FLAGS.beta2,
"epsilon": FLAGS.epsilon,
"match_mlperf": FLAGS.ml_perf,
"use_xla_for_gpu": FLAGS.use_xla_for_gpu,
"use_estimator": FLAGS.use_estimator,
if FLAGS.use_estimator:
train_estimator, eval_estimator = construct_estimator(
num_gpus=num_gpus, model_dir=FLAGS.model_dir, params=params,
batch_size=flags.FLAGS.batch_size, eval_batch_size=eval_batch_size)
runner = model_runner.NcfModelRunner(ncf_dataset, params)
# Create hooks that log information about the training and metric values
train_hooks = hooks_helper.get_train_hooks(
......@@ -237,37 +244,46 @@ def run_ncf(_):
# Train the model
train_input_fn, train_record_dir, batch_count = \
ncf_dataset=ncf_dataset, is_training=True)
if batch_count != num_train_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(batch_count, num_train_steps))
train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
if train_record_dir:
tf.logging.info("Beginning evaluation.")
if pred_input_fn is None:
pred_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn(
ncf_dataset=ncf_dataset, is_training=False)
if eval_batch_count != num_eval_steps:
if FLAGS.use_estimator:
train_input_fn, train_record_dir, batch_count = \
ncf_dataset=ncf_dataset, is_training=True)
if batch_count != num_train_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(
eval_batch_count, num_eval_steps))
eval_results = eval_estimator.evaluate(pred_input_fn, steps=num_eval_steps)
"producing incorrect shards.".format(batch_count, num_train_steps))
train_estimator.train(input_fn=train_input_fn, hooks=train_hooks,
if train_record_dir:
tf.logging.info("Beginning evaluation.")
if pred_input_fn is None:
pred_input_fn, _, eval_batch_count = data_preprocessing.make_input_fn(
ncf_dataset=ncf_dataset, is_training=False)
if eval_batch_count != num_eval_steps:
raise ValueError(
"Step counts do not match. ({} vs. {}) The async process is "
"producing incorrect shards.".format(
eval_batch_count, num_eval_steps))
eval_results = eval_estimator.evaluate(pred_input_fn,
tf.logging.info("Evaluation complete.")
tf.logging.info("Beginning evaluation.")
eval_results = runner.eval(num_eval_steps)
tf.logging.info("Evaluation complete.")
hr = float(eval_results[rconst.HR_KEY])
ndcg = float(eval_results[rconst.NDCG_KEY])
tf.logging.info("Evaluation complete.")
......@@ -472,6 +488,15 @@ def define_ncf_flags():
def xla_validator(flag_dict):
return not flag_dict["use_xla_for_gpu"] or not flag_dict["tpu"]
name="use_estimator", default=True, help=flags_core.help_wrap(
"If True, use Estimator to train. Setting to False is slightly "
"faster, but when False, the following are currently unsupported:\n"
" * Using TPUs\n"
" * Using more than 1 GPU\n"
" * Reloading from checkpoints\n"
" * Any hooks specified with --hooks\n"))
if __name__ == "__main__":
......@@ -24,6 +24,7 @@ import mock
import numpy as np
import tensorflow as tf
from absl import flags
from absl.testing import flagsaver
from official.recommendation import constants as rconst
from official.recommendation import data_preprocessing
......@@ -249,6 +250,13 @@ class NcfTest(tf.test.TestCase):
def test_end_to_end_mlperf(self):
@flagsaver.flagsaver(use_estimator=False, **_BASE_END_TO_END_FLAGS)
@mock.patch.object(data_preprocessing, "SYNTHETIC_BATCHES_PER_EPOCH", 100)
def test_end_to_end_no_estimator(self):
flags.FLAGS.ml_perf = True
if __name__ == "__main__":
......@@ -78,7 +78,18 @@ def neumf_model_fn(features, labels, mode, params):
users = features[movielens.USER_COLUMN]
items = tf.cast(features[movielens.ITEM_COLUMN], tf.int32)
logits = construct_model(users=users, items=items, params=params)
keras_model = params.get("keras_model")
if keras_model:
logits = keras_model([users, items],
training=mode == tf.estimator.ModeKeys.TRAIN)
keras_model = construct_model(users=users, items=items, params=params)
logits = keras_model.output
if not params["use_estimator"] and "keras_model" not in params:
# When we are not using estimator, we need to reuse the Keras model when
# this model_fn is called again, so that the variables are shared between
# training and eval. So we mutate params to add the Keras model.
params["keras_model"] = keras_model
# Softmax with the first column of zeros is equivalent to sigmoid.
softmax_logits = tf.concat([tf.zeros(logits.shape, dtype=logits.dtype),
......@@ -242,10 +253,11 @@ def construct_model(users, items, params):
# Print model topology.
tf.keras.models.Model([user_input, item_input], logits).summary()
model = tf.keras.models.Model([user_input, item_input], logits)
return logits
return model
def compute_eval_loss_and_metrics(logits, # type: tf.Tensor
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册