Unverified commit 780f5265, authored by Taylor Robie, committed by GitHub

Add logging calls to NCF (#5576)

* first pass at __getattr__ abuse logger

* first pass at adding tags to NCF

* minor formatting updates

* fix tag name

* convert metrics to python floats

* getting closer...

* direct mlperf logs to a file

* small tweaks and add stitching

* update tags

* fix tag and add a sudo call

* tweak format of run.sh

* delint

* use distribution strategies for evaluation

* address PR comments

* delint and fix test

* adjust flag validation for xla

* add prefix to distinguish log stitching

* fix index bug

* fix clear cache for root user

* dockerize cache drop

* TIL some regex magic
Parent f2b702a0
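The "__getattr__ abuse logger" mentioned in the first bullet is the tag-forwarding pattern added in mlperf_helper.py (full listing at the end of this diff): attribute lookups on the TAGS object are forwarded to the real mlperf_compliance tag module only when logging is enabled, so call sites can reference tags without guarding on whether the package is installed. A minimal standalone sketch of the pattern; the name _backing is invented for illustration:

class _Tags(object):
  """Forward attribute lookups to a backing module, or swallow them."""

  def __init__(self, backing):
    self._enabled = False
    self._backing = backing  # the real tag module, or None if not installed

  def __getattr__(self, item):
    # __getattr__ fires only for attributes not found by normal lookup,
    # so _enabled and _backing resolve without recursion.
    if self._backing is None or not self._enabled:
      return None  # tags silently evaluate to None while logging is off
    return getattr(self._backing, item)

Logger.ncf_print in the real module performs the same enabled check before delegating, which is what keeps the rest of the codebase free of MLPerf conditionals.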
official/recommendation/data_async_generation.py
@@ -44,6 +44,7 @@ from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import stat_utils
from official.recommendation import popen_helper
from official.utils.logs import mlperf_helper
_log_file = None
@@ -222,11 +223,25 @@ def _construct_records(
"""
st = timeit.default_timer()
if not is_training:
if is_training:
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_STEP_TRAIN_NEG_GEN)
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.INPUT_HP_NUM_NEG, value=num_neg)
# set inside _process_shard()
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.INPUT_HP_SAMPLE_TRAIN_REPLACEMENT, value=True)
else:
# Later logic assumes that all items for a given user are in the same batch.
assert not batch_size % (rconst.NUM_EVAL_NEGATIVES + 1)
assert num_neg == rconst.NUM_EVAL_NEGATIVES
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_STEP_EVAL_NEG_GEN)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_HP_NUM_USERS,
value=num_positives)
assert epochs_per_cycle == 1 or is_training
num_workers = min([num_workers, len(training_shards) * epochs_per_cycle])
@@ -259,6 +274,7 @@ def _construct_records(
# user is grouped within a batch.
if is_training:
index_destinations = np.random.permutation(num_pts)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_ORDER)
else:
index_destinations = np.arange(num_pts)
@@ -276,6 +292,8 @@ def _construct_records(
if num_padding:
# In order to have a full batch, randomly include points from earlier in
# the batch.
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_ORDER)
pad_sample_indices = np.random.randint(
low=0, high=num_pts, size=(num_padding,))
dest = np.arange(start=start_ind, stop=start_ind + num_padding)
@@ -287,10 +305,20 @@ def _construct_records(
# to interpret and discard the zero padded entries.
data[0][num_pts:] = 0
# Check that no points were overlooked.
assert not np.sum(data[0] == -1)
if is_training:
# The number of points is slightly larger than num_pts due to padding.
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_SIZE,
value=int(data[0].shape[0]))
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_BATCH_SIZE,
value=batch_size)
else:
# num_pts is logged instead of int(data[0].shape[0]), because the size
# of the data vector includes zero pads which are ignored.
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_SIZE, value=num_pts)
batches_per_file = np.ceil(num_pts_with_padding / batch_size / num_readers)
current_file_id = -1
current_batch_id = -1
@@ -316,6 +344,7 @@ def _construct_records(
if is_training:
# Empirically it is observed that placing the batch with repeated values at
# the start rather than the end improves convergence.
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_ORDER)
batches_by_file[0][0], batches_by_file[-1][-1] = \
batches_by_file[-1][-1], batches_by_file[0][0]
@@ -389,17 +418,6 @@ def _generation_loop(num_workers,  # type: int
# type: (...) -> None
"""Primary run loop for data file generation."""
log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
@atexit.register
def remove_alive_file():
try:
tf.gfile.Remove(cache_paths.subproc_alive)
except tf.errors.NotFoundError:
return # Main thread has already deleted the entire cache dir.
log_msg("Entering generation loop.")
tf.gfile.MakeDirs(cache_paths.train_epoch_dir)
tf.gfile.MakeDirs(cache_paths.eval_data_subdir)
@@ -484,10 +502,29 @@ def _parse_flagfile(flagfile):
tf.gfile.Remove(flagfile_temp)
def write_alive_file(cache_paths):
"""Write file to signal that generation process started correctly."""
log_msg("Signaling that I am alive.")
with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
f.write("Generation subproc has started.")
@atexit.register
def remove_alive_file():
try:
tf.gfile.Remove(cache_paths.subproc_alive)
except tf.errors.NotFoundError:
return # Main thread has already deleted the entire cache dir.
def main(_):
# Note: The async process must execute the following two steps in the
# following order BEFORE doing anything else:
# 1) Write the alive file
# 2) Wait for the flagfile to be written.
global _log_file
cache_paths = rconst.Paths(
data_dir=flags.FLAGS.data_dir, cache_id=flags.FLAGS.cache_id)
write_alive_file(cache_paths=cache_paths)
flagfile = os.path.join(cache_paths.cache_root, rconst.FLAGFILE)
_parse_flagfile(flagfile)
@@ -513,20 +550,22 @@ def main(_):
if flags.FLAGS.seed is not None:
np.random.seed(flags.FLAGS.seed)
_generation_loop(
num_workers=flags.FLAGS.num_workers,
cache_paths=cache_paths,
num_readers=flags.FLAGS.num_readers,
num_neg=flags.FLAGS.num_neg,
num_train_positives=flags.FLAGS.num_train_positives,
num_items=flags.FLAGS.num_items,
num_users=flags.FLAGS.num_users,
epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
train_batch_size=flags.FLAGS.train_batch_size,
eval_batch_size=flags.FLAGS.eval_batch_size,
deterministic=flags.FLAGS.seed is not None,
match_mlperf=flags.FLAGS.ml_perf,
)
with mlperf_helper.LOGGER(enable=flags.FLAGS.ml_perf):
mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
_generation_loop(
num_workers=flags.FLAGS.num_workers,
cache_paths=cache_paths,
num_readers=flags.FLAGS.num_readers,
num_neg=flags.FLAGS.num_neg,
num_train_positives=flags.FLAGS.num_train_positives,
num_items=flags.FLAGS.num_items,
num_users=flags.FLAGS.num_users,
epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
train_batch_size=flags.FLAGS.train_batch_size,
eval_batch_size=flags.FLAGS.eval_batch_size,
deterministic=flags.FLAGS.seed is not None,
match_mlperf=flags.FLAGS.ml_perf,
)
except KeyboardInterrupt:
log_msg("KeyboardInterrupt registered.")
except:
......
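The write_alive_file helper introduced above replaces inline code removed from _generation_loop: the async process writes a sentinel file at startup so the parent knows it is alive, and registers an atexit hook to remove the file on exit. A self-contained sketch of the same pattern using only the standard library (the real code uses tf.gfile):

import atexit
import os

def write_alive_file(path):
  """Signal that the subprocess started; remove the sentinel on exit."""
  with open(path, "w") as f:
    f.write("Generation subproc has started.\n")

  @atexit.register
  def remove_alive_file():
    try:
      os.remove(path)
    except OSError:
      pass  # The parent may already have deleted the whole cache dir.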
official/recommendation/data_preprocessing.py
@@ -46,6 +46,7 @@ from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import stat_utils
from official.recommendation import popen_helper
from official.utils.logs import mlperf_helper
DATASET_TO_NUM_USERS_AND_ITEMS = {
@@ -134,6 +135,9 @@ def _filter_index_sort(raw_rating_path, match_mlperf):
original_users = df[movielens.USER_COLUMN].unique()
original_items = df[movielens.ITEM_COLUMN].unique()
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_MIN_RATINGS,
value=rconst.MIN_NUM_RATINGS)
# Map the ids of user and item to 0 based index for following processing
tf.logging.info("Generating user_map and item_map...")
user_map = {user: index for index, user in enumerate(original_users)}
@@ -147,6 +151,12 @@ def _filter_index_sort(raw_rating_path, match_mlperf):
num_users = len(original_users)
num_items = len(original_items)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_NUM_EVAL,
value=num_users * (1 + rconst.NUM_EVAL_NEGATIVES))
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.PREPROC_HP_SAMPLE_EVAL_REPLACEMENT,
value=match_mlperf)
assert num_users <= np.iinfo(np.int32).max
assert num_items <= np.iinfo(np.uint16).max
assert df[movielens.USER_COLUMN].max() == num_users - 1
@@ -397,6 +407,27 @@ def _shutdown(proc):
tf.logging.error("Data generation subprocess could not be killed.")
def write_flagfile(flags_, ncf_dataset):
"""Write flagfile to begin async data generation."""
if ncf_dataset.deterministic:
flags_["seed"] = stat_utils.random_int32()
# We write to a temp file then atomically rename it to the final file,
# because writing directly to the final file can cause the data generation
# async process to read a partially written JSON file.
flagfile_temp = os.path.join(ncf_dataset.cache_paths.cache_root,
rconst.FLAGFILE_TEMP)
tf.logging.info("Preparing flagfile for async data generation in {} ..."
.format(flagfile_temp))
with tf.gfile.Open(flagfile_temp, "w") as f:
for k, v in six.iteritems(flags_):
f.write("--{}={}\n".format(k, v))
flagfile = os.path.join(ncf_dataset.cache_paths.cache_root, rconst.FLAGFILE)
tf.gfile.Rename(flagfile_temp, flagfile)
tf.logging.info(
"Wrote flagfile for async data generation in {}.".format(flagfile))
def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
num_data_readers=None, num_neg=4, epochs_per_cycle=1,
match_mlperf=False, deterministic=False,
@@ -405,6 +436,7 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
"""Preprocess data and start negative generation subprocess."""
tf.logging.info("Beginning data preprocessing.")
tf.gfile.MakeDirs(data_dir)
ncf_dataset = construct_cache(dataset=dataset, data_dir=data_dir,
num_data_readers=num_data_readers,
match_mlperf=match_mlperf,
@@ -431,25 +463,6 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
"ml_perf": match_mlperf,
}
if ncf_dataset.deterministic:
flags_["seed"] = stat_utils.random_int32()
tf.gfile.MakeDirs(data_dir)
# We write to a temp file then atomically rename it to the final file,
# because writing directly to the final file can cause the data generation
# async process to read a partially written JSON file.
flagfile_temp = os.path.join(ncf_dataset.cache_paths.cache_root,
rconst.FLAGFILE_TEMP)
tf.logging.info("Preparing flagfile for async data generation in {} ..."
.format(flagfile_temp))
with tf.gfile.Open(flagfile_temp, "w") as f:
for k, v in six.iteritems(flags_):
f.write("--{}={}\n".format(k, v))
flagfile = os.path.join(ncf_dataset.cache_paths.cache_root, rconst.FLAGFILE)
tf.gfile.Rename(flagfile_temp, flagfile)
tf.logging.info(
"Wrote flagfile for async data generation in {}."
.format(flagfile))
if use_subprocess:
tf.logging.info("Creating training file subprocess.")
subproc_env = os.environ.copy()
@@ -489,6 +502,14 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
raise ValueError("Generation subprocess did not start correctly. Data will "
"not be available; exiting to avoid waiting forever.")
# We start the async process and wait for it to signal that it is alive. It
# will then enter a loop waiting for the flagfile to be written. Once we see
# that the async process has signaled that it is alive, we clear the system
# caches and begin the run.
mlperf_helper.clear_system_caches()
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_START)
write_flagfile(flags_, ncf_dataset)
return ncf_dataset, cleanup
......
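write_flagfile above relies on rename being atomic when source and destination live on the same filesystem: the async process polling for the flagfile sees either nothing or a complete file, never a partial write. A minimal sketch of the same publish-by-rename pattern using the standard library; the paths and names are illustrative:

import os

def publish_flags(flags_dict, final_path):
  """Write flags to a temp file, then atomically publish them."""
  temp_path = final_path + ".temp"
  with open(temp_path, "w") as f:
    for k, v in sorted(flags_dict.items()):
      f.write("--{}={}\n".format(k, v))
  # os.rename is atomic on POSIX within a single filesystem, so readers
  # of final_path never observe a half-written file.
  os.rename(temp_path, final_path)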
official/recommendation/ncf_main.py
@@ -45,6 +45,7 @@ from official.recommendation import neumf_model
from official.utils.flags import core as flags_core
from official.utils.logs import hooks_helper
from official.utils.logs import logger
from official.utils.logs import mlperf_helper
from official.utils.misc import distribution_utils
from official.utils.misc import model_helpers
@@ -104,7 +105,8 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
return train_estimator, eval_estimator
distribution = distribution_utils.get_distribution_strategy(num_gpus=num_gpus)
run_config = tf.estimator.RunConfig(train_distribute=distribution)
run_config = tf.estimator.RunConfig(train_distribute=distribution,
eval_distribute=distribution)
params["eval_batch_size"] = eval_batch_size
model_fn = neumf_model.neumf_model_fn
if params["use_xla_for_gpu"]:
@@ -116,8 +118,10 @@
def main(_):
with logger.benchmark_context(FLAGS):
with logger.benchmark_context(FLAGS), mlperf_helper.LOGGER(FLAGS.ml_perf):
mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
run_ncf(FLAGS)
mlperf_helper.stitch_ncf()
def run_ncf(_):
@@ -218,10 +222,16 @@ def run_ncf(_):
pred_input_fn = None
total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
target_reached = False
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
for cycle_index in range(total_training_cycle):
assert FLAGS.epochs_between_evals == 1 or not mlperf_helper.LOGGER.enabled
tf.logging.info("Starting a training cycle: {}/{}".format(
cycle_index + 1, total_training_cycle))
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_EPOCH,
value=cycle_index)
# Train the model
train_input_fn, train_record_dir, batch_count = \
data_preprocessing.make_input_fn(
@@ -248,27 +258,49 @@ def run_ncf(_):
"producing incorrect shards.".format(
eval_batch_count, num_eval_steps))
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
value=cycle_index)
eval_results = eval_estimator.evaluate(pred_input_fn, steps=num_eval_steps)
hr = float(eval_results[rconst.HR_KEY])
ndcg = float(eval_results[rconst.NDCG_KEY])
tf.logging.info("Evaluation complete.")
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.EVAL_TARGET,
value={"epoch": cycle_index, "value": FLAGS.hr_threshold})
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_ACCURACY,
value={"epoch": cycle_index, "value": hr})
mlperf_helper.ncf_print(
key=mlperf_helper.TAGS.EVAL_HP_NUM_NEG,
value={"epoch": cycle_index, "value": rconst.NUM_EVAL_NEGATIVES})
# Logged by the async process during record creation.
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_HP_NUM_USERS,
deferred=True)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_STOP, value=cycle_index)
# Benchmark the evaluation results
benchmark_logger.log_evaluation_result(eval_results)
# Log the HR and NDCG results.
hr = eval_results[rconst.HR_KEY]
ndcg = eval_results[rconst.NDCG_KEY]
tf.logging.info(
"Iteration {}: HR = {:.4f}, NDCG = {:.4f}".format(
cycle_index + 1, hr, ndcg))
# If some evaluation threshold is met
if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
target_reached = True
break
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_STOP,
value={"success": target_reached})
cleanup_fn() # Cleanup data construction artifacts and subprocess.
# Clear the session explicitly to avoid session delete error
tf.keras.backend.clear_session()
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_FINAL)
def define_ncf_flags():
"""Add flags for running ncf_main."""
@@ -419,7 +451,10 @@ def define_ncf_flags():
"If True, use XLA for the model function. Only works when using a "
"GPU. On TPUs, XLA is always used"))
flags.mark_flags_as_mutual_exclusive(["use_xla_for_gpu", "tpu"])
xla_message = "--use_xla_for_gpu is incompatible with --tpu"
@flags.multi_flags_validator(["use_xla_for_gpu", "tpu"], message=xla_message)
def xla_validator(flag_dict):
return not flag_dict["use_xla_for_gpu"] or not flag_dict["tpu"]
if __name__ == "__main__":
......
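The validator above replaces flags.mark_flags_as_mutual_exclusive, presumably because the mutual-exclusion check counts any non-None value as "set" and a boolean flag such as --use_xla_for_gpu is never None; the custom validator rejects only the case where both flags are truthy. A standalone sketch of the same absl.flags pattern:

from absl import app
from absl import flags

flags.DEFINE_boolean("use_xla_for_gpu", False, "Use XLA for the GPU model.")
flags.DEFINE_string("tpu", None, "TPU to use, if any.")

@flags.multi_flags_validator(
    ["use_xla_for_gpu", "tpu"],
    message="--use_xla_for_gpu is incompatible with --tpu")
def _xla_validator(flag_dict):
  # Valid unless both flags are set at the same time.
  return not (flag_dict["use_xla_for_gpu"] and flag_dict["tpu"])

def main(_):
  print("flags are consistent")

if __name__ == "__main__":
  app.run(main)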
official/recommendation/neumf_model.py
@@ -42,6 +42,7 @@ import tensorflow as tf
from official.datasets import movielens # pylint: disable=g-bad-import-order
from official.recommendation import constants as rconst
from official.recommendation import stat_utils
from official.utils.logs import mlperf_helper
def _sparse_to_dense_grads(grads_and_vars):
@@ -102,12 +103,25 @@ def neumf_model_fn(features, labels, mode, params):
elif mode == tf.estimator.ModeKeys.TRAIN:
labels = tf.cast(labels, tf.int32)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_NAME, value="adam")
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_LR,
value=params["learning_rate"])
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_BETA1,
value=params["beta1"])
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_BETA2,
value=params["beta2"])
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_EPSILON,
value=params["epsilon"])
optimizer = tf.train.AdamOptimizer(
learning_rate=params["learning_rate"], beta1=params["beta1"],
beta2=params["beta2"], epsilon=params["epsilon"])
if params["use_tpu"]:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_LOSS_FN,
value=mlperf_helper.TAGS.BCE)
loss = tf.losses.sparse_softmax_cross_entropy(
labels=labels,
logits=softmax_logits
@@ -158,6 +172,10 @@ def construct_model(users, items, params):
mf_dim = params["mf_dim"]
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_MF_DIM, value=mf_dim)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_MLP_LAYER_SIZES,
value=model_layers)
if model_layers[0] % 2 != 0:
  raise ValueError("The first layer size should be a multiple of 2!")
......
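The evenness check on model_layers[0] reflects the NeuMF architecture: the input to the first MLP layer is the concatenation of equally sized user and item MLP embeddings. A hedged sketch of that shape relationship, with illustrative values:

model_layers = [256, 256, 128, 64]  # illustrative MLP layer sizes
mlp_dim = model_layers[0] // 2      # per-side embedding width
# The user and item MLP embeddings each have shape (batch, mlp_dim); their
# concatenation, shape (batch, model_layers[0]), feeds the first MLP layer,
# which is why model_layers[0] must be even.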
official/recommendation/run.sh
#!/bin/bash
set -e
if [ `id -u` != 0 ]; then
echo "Calling sudo to gain root for this shell. (Needed to clear caches.)"
sudo echo "Success"
fi
DATASET="ml-20m"
BUCKET=${BUCKET:-""}
@@ -22,7 +27,7 @@ mkdir -p ${LOCAL_TEST_DIR}
TPU=${TPU:-""}
if [[ -z ${TPU} ]]; then
DEVICE_FLAG="--num_gpus -1"
DEVICE_FLAG="--num_gpus -1 --use_xla_for_gpu"
else
DEVICE_FLAG="--tpu ${TPU} --num_gpus 0"
fi
@@ -38,9 +43,14 @@
MODEL_DIR="${TEST_DIR}/model_dir_${i}"
RUN_LOG="${LOCAL_TEST_DIR}/run_${i}.log"
export COMPLIANCE_FILE="${LOCAL_TEST_DIR}/run_${i}_compliance_raw.log"
export STITCHED_COMPLIANCE_FILE="${LOCAL_TEST_DIR}/run_${i}_compliance_submission.log"
echo ""
echo "Beginning run ${i}"
echo " Complete logs are in ${RUN_LOG}"
echo " Complete output logs are in ${RUN_LOG}"
echo " Compliance logs: (submission log is created after run.)"
echo " ${COMPLIANCE_FILE}"
echo " ${STITCHED_COMPLIANCE_FILE}"
# To reduce variation set the seed flag:
# --seed ${i}
@@ -62,7 +72,7 @@
--hr_threshold 0.635 \
--ml_perf \
|& tee ${RUN_LOG} \
| grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+)|(pipeline_hash)"
| grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+)|(pipeline_hash)|(MLPerf time:)"
END_TIME=$(date +%s)
echo "Run ${i} complete: $(( $END_TIME - $START_TIME )) seconds."
......
official/requirements.txt
google-api-python-client>=1.6.7
google-cloud-bigquery>=0.31.0
kaggle>=1.3.9
mlperf_compliance==0.0.8
numpy
oauth2client>=4.1.2
pandas
......
official/utils/logs/mlperf_helper.py (new file)
# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Wrapper for the mlperf logging utils.
MLPerf compliance logging is only desired under a limited set of circumstances.
This module is intended to keep users from needing to consider logging (or
install the module) unless they are performing mlperf runs.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import namedtuple
import json
import os
import re
import subprocess
import sys
import typing
import tensorflow as tf
_MIN_VERSION = (0, 0, 6)
_STACK_OFFSET = 2
SUDO = "sudo" if os.geteuid() else ""
# This indirection is used in docker.
DROP_CACHE_LOC = os.getenv("DROP_CACHE_LOC", "/proc/sys/vm/drop_caches")
_NCF_PREFIX = "NCF_RAW_"
# TODO(robieta): move line parsing to mlperf util
_PREFIX = r"(?:{})?:::MLPv([0-9]+).([0-9]+).([0-9]+)".format(_NCF_PREFIX)
_BENCHMARK = r"([a-zA-Z0-9_]+)"
_TIMESTAMP = r"([0-9]+\.[0-9]+)"
_CALLSITE = r"\((.+):([0-9]+)\)"
_TAG = r"([a-zA-Z0-9_]+)"
_VALUE = r"(.*)"
ParsedLine = namedtuple("ParsedLine", ["version", "benchmark", "timestamp",
"callsite", "tag", "value"])
LINE_PATTERN = re.compile(
"^{prefix} {benchmark} {timestamp} {callsite} {tag}(: |$){value}?$".format(
prefix=_PREFIX, benchmark=_BENCHMARK, timestamp=_TIMESTAMP,
callsite=_CALLSITE, tag=_TAG, value=_VALUE))
def parse_line(line): # type: (str) -> typing.Optional[ParsedLine]
match = LINE_PATTERN.match(line.strip())
if not match:
return
major, minor, micro, benchmark, timestamp = match.groups()[:5]
call_file, call_line, tag, _, value = match.groups()[5:]
return ParsedLine(version=(int(major), int(minor), int(micro)),
benchmark=benchmark, timestamp=timestamp,
callsite=(call_file, call_line), tag=tag, value=value)
def unparse_line(parsed_line): # type: (ParsedLine) -> str
version_str = "{}.{}.{}".format(*parsed_line.version)
callsite_str = "({}:{})".format(*parsed_line.callsite)
value_str = ": {}".format(parsed_line.value) if parsed_line.value else ""
return ":::MLPv{} {} {} {} {} {}".format(
version_str, parsed_line.benchmark, parsed_line.timestamp, callsite_str,
parsed_line.tag, value_str)
def get_mlperf_log():
"""Shielded import of mlperf_log module."""
try:
import pkg_resources
import mlperf_compliance
version = pkg_resources.get_distribution("mlperf_compliance")
version = tuple(int(i) for i in version.version.split("."))
if version < _MIN_VERSION:
tf.logging.warning(
"mlperf_compliance is version {}, must be at least version {}".format(
".".join([str(i) for i in version]),
".".join([str(i) for i in _MIN_VERSION])))
raise ImportError
mlperf_log = mlperf_compliance.mlperf_log
except ImportError:
mlperf_log = None
return mlperf_log
class Logger(object):
"""MLPerf logger indirection class.
This logger only logs for MLPerf runs, and prevents various errors associated
with not having the mlperf_compliance package installed.
"""
class Tags(object):
def __init__(self, mlperf_log):
self._enabled = False
self._mlperf_log = mlperf_log
def __getattr__(self, item):
if self._mlperf_log is None or not self._enabled:
return
return getattr(self._mlperf_log, item)
def __init__(self):
self._enabled = False
self._mlperf_log = get_mlperf_log()
self.tags = self.Tags(self._mlperf_log)
def __call__(self, enable=False):
if enable and self._mlperf_log is None:
raise ImportError("MLPerf logging was requested, but mlperf_compliance "
"module could not be loaded.")
self._enabled = enable
self.tags._enabled = enable
return self
def __enter__(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
self._enabled = False
self.tags._enabled = False
@property
def log_file(self):
if self._mlperf_log is None:
return
return self._mlperf_log.LOG_FILE
@property
def enabled(self):
return self._enabled
def ncf_print(self, key, value=None, stack_offset=_STACK_OFFSET,
deferred=False, extra_print=False, prefix=_NCF_PREFIX):
if self._mlperf_log is None or not self.enabled:
return
self._mlperf_log.ncf_print(key=key, value=value, stack_offset=stack_offset,
deferred=deferred, extra_print=extra_print,
prefix=prefix)
def set_ncf_root(self, path):
if self._mlperf_log is None:
return
self._mlperf_log.ROOT_DIR_NCF = path
LOGGER = Logger()
ncf_print, set_ncf_root = LOGGER.ncf_print, LOGGER.set_ncf_root
TAGS = LOGGER.tags
def clear_system_caches():
if not LOGGER.enabled:
return
ret_code = subprocess.call(
["sync && echo 3 | {} tee {}".format(SUDO, DROP_CACHE_LOC)],
shell=True)
if ret_code:
raise ValueError("Failed to clear caches")
def stitch_ncf():
"""Format NCF logs for MLPerf compliance."""
if not LOGGER.enabled:
return
if LOGGER.log_file is None or not tf.gfile.Exists(LOGGER.log_file):
  tf.logging.error("Could not find log file to stitch.")
  return
log_lines = []
num_eval_users = None
start_time = None
stop_time = None
with tf.gfile.Open(LOGGER.log_file, "r") as f:
for line in f:
parsed_line = parse_line(line)
if not parsed_line:
tf.logging.warning("Failed to parse line: {}".format(line))
continue
log_lines.append(parsed_line)
if parsed_line.tag == TAGS.RUN_START:
assert start_time is None
start_time = float(parsed_line.timestamp)
if parsed_line.tag == TAGS.RUN_STOP:
assert stop_time is None
stop_time = float(parsed_line.timestamp)
if (parsed_line.tag == TAGS.EVAL_HP_NUM_USERS and parsed_line.value
is not None and "DEFERRED" not in parsed_line.value):
assert num_eval_users is None or num_eval_users == parsed_line.value
num_eval_users = parsed_line.value
log_lines.pop()
for i, parsed_line in enumerate(log_lines):
if parsed_line.tag == TAGS.EVAL_HP_NUM_USERS:
log_lines[i] = ParsedLine(*parsed_line[:-1], value=num_eval_users)
log_lines = sorted([unparse_line(i) for i in log_lines])
output_path = os.getenv("STITCHED_COMPLIANCE_FILE", None)
if output_path:
with tf.gfile.Open(output_path, "w") as f:
for line in log_lines:
f.write(line + "\n")
else:
for line in log_lines:
print(line)
sys.stdout.flush()
if start_time is not None and stop_time is not None:
tf.logging.info("MLPerf time: {:.1f} sec.".format(stop_time - start_time))
if __name__ == "__main__":
tf.logging.set_verbosity(tf.logging.INFO)
with LOGGER(True):
ncf_print(key=TAGS.RUN_START)
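For reference, a hedged example of the log-line shape parse_line accepts, run in the context of the module above; the timestamp, callsite, and metric values below are made up:

sample = (':::MLPv0.5.0 ncf 1538678136.607 '
          '(ncf_main.py:42) eval_accuracy: {"epoch": 0, "value": 0.635}')
parsed = parse_line(sample)
assert parsed.version == (0, 5, 0)
assert parsed.benchmark == "ncf"
assert parsed.tag == "eval_accuracy"
print(unparse_line(parsed))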