From 780f526578abd587cd08b8ab88f8ba3e08a65958 Mon Sep 17 00:00:00 2001
From: Taylor Robie
Date: Wed, 24 Oct 2018 11:51:43 -0700
Subject: [PATCH] Add logging calls to NCF (#5576)

* first pass at __getattr__ abuse logger
* first pass at adding tags to NCF
* minor formatting updates
* fix tag name
* convert metrics to python floats
* getting closer...
* direct mlperf logs to a file
* small tweaks and add stitching
* update tags
* fix tag and add a sudo call
* tweak format of run.sh
* delint
* use distribution strategies for evaluation
* address PR comments
* delint and fix test
* adjust flag validation for xla
* add prefix to distinguish log stitching
* fix index bug
* fix clear cache for root user
* dockerize cache drop
* TIL some regex magic
---
 .../recommendation/data_async_generation.py   |  95 +++++--
 official/recommendation/data_preprocessing.py |  59 +++--
 official/recommendation/ncf_main.py           |  45 +++-
 official/recommendation/neumf_model.py        |  18 ++
 official/recommendation/run.sh                |  16 +-
 official/requirements.txt                     |   1 +
 official/utils/logs/mlperf_helper.py          | 242 ++++++++++++++++++
 7 files changed, 421 insertions(+), 55 deletions(-)
 create mode 100644 official/utils/logs/mlperf_helper.py

diff --git a/official/recommendation/data_async_generation.py b/official/recommendation/data_async_generation.py
index 7c882f773..422dbc607 100644
--- a/official/recommendation/data_async_generation.py
+++ b/official/recommendation/data_async_generation.py
@@ -44,6 +44,7 @@ from official.datasets import movielens
 from official.recommendation import constants as rconst
 from official.recommendation import stat_utils
 from official.recommendation import popen_helper
+from official.utils.logs import mlperf_helper
 
 
 _log_file = None
@@ -222,11 +223,25 @@ def _construct_records(
   """
   st = timeit.default_timer()
 
-  if not is_training:
+  if is_training:
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_STEP_TRAIN_NEG_GEN)
+    mlperf_helper.ncf_print(
+        key=mlperf_helper.TAGS.INPUT_HP_NUM_NEG, value=num_neg)
+
+    # set inside _process_shard()
+    mlperf_helper.ncf_print(
+        key=mlperf_helper.TAGS.INPUT_HP_SAMPLE_TRAIN_REPLACEMENT, value=True)
+
+  else:
     # Later logic assumes that all items for a given user are in the same batch.
     assert not batch_size % (rconst.NUM_EVAL_NEGATIVES + 1)
     assert num_neg == rconst.NUM_EVAL_NEGATIVES
 
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_STEP_EVAL_NEG_GEN)
+
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_HP_NUM_USERS,
+                            value=num_positives)
+
   assert epochs_per_cycle == 1 or is_training
   num_workers = min([num_workers, len(training_shards) * epochs_per_cycle])
@@ -259,6 +274,7 @@ def _construct_records(
   # user is grouped within a batch.
   if is_training:
     index_destinations = np.random.permutation(num_pts)
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_ORDER)
   else:
     index_destinations = np.arange(num_pts)
@@ -276,6 +292,8 @@ def _construct_records(
   if num_padding:
     # In order to have a full batch, randomly include points from earlier in
     # the batch.
+
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_ORDER)
     pad_sample_indices = np.random.randint(
         low=0, high=num_pts, size=(num_padding,))
     dest = np.arange(start=start_ind, stop=start_ind + num_padding)
@@ -287,10 +305,20 @@ def _construct_records(
   # to interpret and discard the zero padded entries.
   data[0][num_pts:] = 0
 
-  # Check that no points were overlooked.
-
+  # Check that no points were overlooked.
   assert not np.sum(data[0] == -1)
 
+  if is_training:
+    # The number of points is slightly larger than num_pts due to padding.
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_SIZE,
+                            value=int(data[0].shape[0]))
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_BATCH_SIZE,
+                            value=batch_size)
+  else:
+    # num_pts is logged instead of int(data[0].shape[0]), because the size
+    # of the data vector includes zero pads which are ignored.
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_SIZE, value=num_pts)
+
   batches_per_file = np.ceil(num_pts_with_padding / batch_size / num_readers)
   current_file_id = -1
   current_batch_id = -1
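The ncf_print calls added above emit output only while an MLPerf run is active; otherwise they are no-ops, which is why they can be sprinkled through the data pipeline without affecting normal runs. A minimal sketch of the pattern (a simplified stand-in, not the actual mlperf_compliance API):

    import time

    ENABLED = False  # toggled by the LOGGER context manager in mlperf_helper

    def ncf_print(key, value=None):
      # No-op unless compliance logging was explicitly enabled.
      if not ENABLED:
        return
      suffix = ": {}".format(value) if value is not None else ""
      print(":::MLPv0.5.0 ncf {:.6f} (example.py:1) {}{}".format(
          time.time(), key, suffix))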
@@ -316,6 +344,7 @@ def _construct_records(
   if is_training:
     # Empirically it is observed that placing the batch with repeated values at
     # the start rather than the end improves convergence.
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.INPUT_ORDER)
     batches_by_file[0][0], batches_by_file[-1][-1] = \
       batches_by_file[-1][-1], batches_by_file[0][0]
@@ -389,17 +418,6 @@ def _generation_loop(num_workers,  # type: int
 
   # type: (...) -> None
   """Primary run loop for data file generation."""
 
-  log_msg("Signaling that I am alive.")
-  with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
-    f.write("Generation subproc has started.")
-
-  @atexit.register
-  def remove_alive_file():
-    try:
-      tf.gfile.Remove(cache_paths.subproc_alive)
-    except tf.errors.NotFoundError:
-      return  # Main thread has already deleted the entire cache dir.
-
   log_msg("Entering generation loop.")
   tf.gfile.MakeDirs(cache_paths.train_epoch_dir)
   tf.gfile.MakeDirs(cache_paths.eval_data_subdir)
@@ -484,10 +502,29 @@ def _parse_flagfile(flagfile):
     tf.gfile.Remove(flagfile_temp)
 
 
+def write_alive_file(cache_paths):
+  """Write file to signal that generation process started correctly."""
+  log_msg("Signaling that I am alive.")
+  with tf.gfile.Open(cache_paths.subproc_alive, "w") as f:
+    f.write("Generation subproc has started.")
+
+  @atexit.register
+  def remove_alive_file():
+    try:
+      tf.gfile.Remove(cache_paths.subproc_alive)
+    except tf.errors.NotFoundError:
+      return  # Main thread has already deleted the entire cache dir.
+
+
 def main(_):
+  # Note: The async process must execute the following two steps in the
+  # following order BEFORE doing anything else:
+  # 1) Write the alive file.
+  # 2) Wait for the flagfile to be written.
   global _log_file
   cache_paths = rconst.Paths(
       data_dir=flags.FLAGS.data_dir, cache_id=flags.FLAGS.cache_id)
+  write_alive_file(cache_paths=cache_paths)
   flagfile = os.path.join(cache_paths.cache_root, rconst.FLAGFILE)
   _parse_flagfile(flagfile)
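write_alive_file is the child's half of a two-file handshake: the parent waits for the alive file before it publishes the flagfile that the child is, in turn, polling for. A rough sketch of the parent-side wait, with hypothetical names (the patch performs the equivalent check inside data_preprocessing.instantiate_pipeline):

    import os
    import time

    def wait_for_alive_file(alive_path, timeout_sec=300):
      """Poll until the async generation process signals that it started."""
      deadline = time.time() + timeout_sec
      while time.time() < deadline:
        if os.path.exists(alive_path):
          return True
        time.sleep(1)
      return False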
@@ -513,20 +550,22 @@ def main(_):
   if flags.FLAGS.seed is not None:
     np.random.seed(flags.FLAGS.seed)
 
-    _generation_loop(
-        num_workers=flags.FLAGS.num_workers,
-        cache_paths=cache_paths,
-        num_readers=flags.FLAGS.num_readers,
-        num_neg=flags.FLAGS.num_neg,
-        num_train_positives=flags.FLAGS.num_train_positives,
-        num_items=flags.FLAGS.num_items,
-        num_users=flags.FLAGS.num_users,
-        epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
-        train_batch_size=flags.FLAGS.train_batch_size,
-        eval_batch_size=flags.FLAGS.eval_batch_size,
-        deterministic=flags.FLAGS.seed is not None,
-        match_mlperf=flags.FLAGS.ml_perf,
-    )
+    with mlperf_helper.LOGGER(enable=flags.FLAGS.ml_perf):
+      mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
+      _generation_loop(
+          num_workers=flags.FLAGS.num_workers,
+          cache_paths=cache_paths,
+          num_readers=flags.FLAGS.num_readers,
+          num_neg=flags.FLAGS.num_neg,
+          num_train_positives=flags.FLAGS.num_train_positives,
+          num_items=flags.FLAGS.num_items,
+          num_users=flags.FLAGS.num_users,
+          epochs_per_cycle=flags.FLAGS.epochs_per_cycle,
+          train_batch_size=flags.FLAGS.train_batch_size,
+          eval_batch_size=flags.FLAGS.eval_batch_size,
+          deterministic=flags.FLAGS.seed is not None,
+          match_mlperf=flags.FLAGS.ml_perf,
+      )
   except KeyboardInterrupt:
     log_msg("KeyboardInterrupt registered.")
   except:
diff --git a/official/recommendation/data_preprocessing.py b/official/recommendation/data_preprocessing.py
index d6ac883a0..39a44473a 100644
--- a/official/recommendation/data_preprocessing.py
+++ b/official/recommendation/data_preprocessing.py
@@ -46,6 +46,7 @@ from official.datasets import movielens
 from official.recommendation import constants as rconst
 from official.recommendation import stat_utils
 from official.recommendation import popen_helper
+from official.utils.logs import mlperf_helper
 
 
 DATASET_TO_NUM_USERS_AND_ITEMS = {
@@ -134,6 +135,9 @@ def _filter_index_sort(raw_rating_path, match_mlperf):
   original_users = df[movielens.USER_COLUMN].unique()
   original_items = df[movielens.ITEM_COLUMN].unique()
 
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_MIN_RATINGS,
+                          value=rconst.MIN_NUM_RATINGS)
+
   # Map the ids of user and item to 0 based index for following processing
   tf.logging.info("Generating user_map and item_map...")
   user_map = {user: index for index, user in enumerate(original_users)}
@@ -147,6 +151,12 @@ def _filter_index_sort(raw_rating_path, match_mlperf):
   num_users = len(original_users)
   num_items = len(original_items)
 
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.PREPROC_HP_NUM_EVAL,
+                          value=num_users * (1 + rconst.NUM_EVAL_NEGATIVES))
+  mlperf_helper.ncf_print(
+      key=mlperf_helper.TAGS.PREPROC_HP_SAMPLE_EVAL_REPLACEMENT,
+      value=match_mlperf)
+
   assert num_users <= np.iinfo(np.int32).max
   assert num_items <= np.iinfo(np.uint16).max
   assert df[movielens.USER_COLUMN].max() == num_users - 1
@@ -397,6 +407,27 @@ def _shutdown(proc):
     tf.logging.error("Data generation subprocess could not be killed.")
 
 
+def write_flagfile(flags_, ncf_dataset):
+  """Write flagfile to begin async data generation."""
+  if ncf_dataset.deterministic:
+    flags_["seed"] = stat_utils.random_int32()
+
+  # We write to a temp file then atomically rename it to the final file,
+  # because writing directly to the final file can cause the data generation
+  # async process to read a partially written JSON file.
+  flagfile_temp = os.path.join(ncf_dataset.cache_paths.cache_root,
+                               rconst.FLAGFILE_TEMP)
+  tf.logging.info("Preparing flagfile for async data generation in {} ..."
+                  .format(flagfile_temp))
+  with tf.gfile.Open(flagfile_temp, "w") as f:
+    for k, v in six.iteritems(flags_):
+      f.write("--{}={}\n".format(k, v))
+  flagfile = os.path.join(ncf_dataset.cache_paths.cache_root, rconst.FLAGFILE)
+  tf.gfile.Rename(flagfile_temp, flagfile)
+  tf.logging.info(
+      "Wrote flagfile for async data generation in {}.".format(flagfile))
+
+
 def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
                          num_data_readers=None, num_neg=4, epochs_per_cycle=1,
                          match_mlperf=False, deterministic=False,
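The temp-file-plus-rename in write_flagfile is the standard idiom for publishing a file atomically: a reader polling for the flagfile either sees nothing or sees the complete file, never a partial write. The same idiom with only the standard library (illustrative; the patch uses tf.gfile so it also works on non-local filesystems):

    import os
    import tempfile

    def atomic_write(path, payload):
      # Write to a temp file in the same directory, then rename. rename() is
      # atomic on POSIX when source and destination share a filesystem.
      fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
      with os.fdopen(fd, "w") as f:
        f.write(payload)
      os.rename(tmp_path, path)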
@@ -405,6 +436,7 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
   """Preprocess data and start negative generation subprocess."""
 
   tf.logging.info("Beginning data preprocessing.")
+  tf.gfile.MakeDirs(data_dir)
   ncf_dataset = construct_cache(dataset=dataset, data_dir=data_dir,
                                 num_data_readers=num_data_readers,
                                 match_mlperf=match_mlperf,
@@ -431,25 +463,6 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
       "ml_perf": match_mlperf,
   }
 
-  if ncf_dataset.deterministic:
-    flags_["seed"] = stat_utils.random_int32()
-  tf.gfile.MakeDirs(data_dir)
-  # We write to a temp file then atomically rename it to the final file,
-  # because writing directly to the final file can cause the data generation
-  # async process to read a partially written JSON file.
-  flagfile_temp = os.path.join(ncf_dataset.cache_paths.cache_root,
-                               rconst.FLAGFILE_TEMP)
-  tf.logging.info("Preparing flagfile for async data generation in {} ..."
-                  .format(flagfile_temp))
-  with tf.gfile.Open(flagfile_temp, "w") as f:
-    for k, v in six.iteritems(flags_):
-      f.write("--{}={}\n".format(k, v))
-  flagfile = os.path.join(ncf_dataset.cache_paths.cache_root, rconst.FLAGFILE)
-  tf.gfile.Rename(flagfile_temp, flagfile)
-  tf.logging.info(
-      "Wrote flagfile for async data generation in {}."
-      .format(flagfile))
-
   if use_subprocess:
     tf.logging.info("Creating training file subprocess.")
     subproc_env = os.environ.copy()
@@ -489,6 +502,14 @@ def instantiate_pipeline(dataset, data_dir, batch_size, eval_batch_size,
     raise ValueError("Generation subprocess did not start correctly. Data will "
                      "not be available; exiting to avoid waiting forever.")
 
+  # We start the async process and wait for it to signal that it is alive. It
+  # will then enter a loop waiting for the flagfile to be written. Once we see
+  # that the async process has signaled that it is alive, we clear the system
+  # caches and begin the run.
+  mlperf_helper.clear_system_caches()
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_START)
+  write_flagfile(flags_, ncf_dataset)
+
   return ncf_dataset, cleanup
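clear_system_caches (defined in mlperf_helper.py later in this patch) drops the Linux page cache so timed runs start cold. The shell form it relies on matters: a plain redirect such as "sudo echo 3 > /proc/sys/vm/drop_caches" fails because the redirect is performed by the unprivileged shell, so the value is piped through tee running under sudo instead. A simplified standalone sketch of the same call (the real helper also omits sudo when already root and lets DROP_CACHE_LOC override the path):

    import subprocess

    def drop_caches():
      # "echo 3 | sudo tee": the write itself, not just the echo, must run
      # with root privileges.
      ret = subprocess.call(
          ["sync && echo 3 | sudo tee /proc/sys/vm/drop_caches"], shell=True)
      if ret:
        raise RuntimeError("Failed to clear caches")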
diff --git a/official/recommendation/ncf_main.py b/official/recommendation/ncf_main.py
index a9fd0da5f..ded3cb7c2 100644
--- a/official/recommendation/ncf_main.py
+++ b/official/recommendation/ncf_main.py
@@ -45,6 +45,7 @@ from official.recommendation import neumf_model
 from official.utils.flags import core as flags_core
 from official.utils.logs import hooks_helper
 from official.utils.logs import logger
+from official.utils.logs import mlperf_helper
 from official.utils.misc import distribution_utils
 from official.utils.misc import model_helpers
 
@@ -104,7 +105,8 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
     return train_estimator, eval_estimator
 
   distribution = distribution_utils.get_distribution_strategy(num_gpus=num_gpus)
-  run_config = tf.estimator.RunConfig(train_distribute=distribution)
+  run_config = tf.estimator.RunConfig(train_distribute=distribution,
+                                      eval_distribute=distribution)
   params["eval_batch_size"] = eval_batch_size
   model_fn = neumf_model.neumf_model_fn
   if params["use_xla_for_gpu"]:
@@ -116,8 +118,10 @@ def construct_estimator(num_gpus, model_dir, params, batch_size,
 
 
 def main(_):
-  with logger.benchmark_context(FLAGS):
+  with logger.benchmark_context(FLAGS), mlperf_helper.LOGGER(FLAGS.ml_perf):
+    mlperf_helper.set_ncf_root(os.path.split(os.path.abspath(__file__))[0])
     run_ncf(FLAGS)
+    mlperf_helper.stitch_ncf()
 
 
 def run_ncf(_):
@@ -218,10 +222,16 @@ def run_ncf(_):
   pred_input_fn = None
   total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
 
+  target_reached = False
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
   for cycle_index in range(total_training_cycle):
+    assert FLAGS.epochs_between_evals == 1 or not mlperf_helper.LOGGER.enabled
     tf.logging.info("Starting a training cycle: {}/{}".format(
         cycle_index + 1, total_training_cycle))
 
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_EPOCH,
+                            value=cycle_index)
+
     # Train the model
     train_input_fn, train_record_dir, batch_count = \
       data_preprocessing.make_input_fn(
@@ -248,27 +258,49 @@ def run_ncf(_):
                        "producing incorrect shards.".format(
                            eval_batch_count, num_eval_steps))
 
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
+                            value=cycle_index)
     eval_results = eval_estimator.evaluate(pred_input_fn, steps=num_eval_steps)
+    hr = float(eval_results[rconst.HR_KEY])
+    ndcg = float(eval_results[rconst.NDCG_KEY])
     tf.logging.info("Evaluation complete.")
 
+    mlperf_helper.ncf_print(
+        key=mlperf_helper.TAGS.EVAL_TARGET,
+        value={"epoch": cycle_index, "value": FLAGS.hr_threshold})
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_ACCURACY,
+                            value={"epoch": cycle_index, "value": hr})
+    mlperf_helper.ncf_print(
+        key=mlperf_helper.TAGS.EVAL_HP_NUM_NEG,
+        value={"epoch": cycle_index, "value": rconst.NUM_EVAL_NEGATIVES})
+
+    # Logged by the async process during record creation.
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_HP_NUM_USERS,
+                            deferred=True)
+
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_STOP, value=cycle_index)
+
     # Benchmark the evaluation results
     benchmark_logger.log_evaluation_result(eval_results)
 
     # Log the HR and NDCG results.
-    hr = eval_results[rconst.HR_KEY]
-    ndcg = eval_results[rconst.NDCG_KEY]
     tf.logging.info(
         "Iteration {}: HR = {:.4f}, NDCG = {:.4f}".format(
             cycle_index + 1, hr, ndcg))
 
     # If some evaluation threshold is met
     if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
+      target_reached = True
       break
 
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_STOP,
+                          value={"success": target_reached})
   cleanup_fn()  # Cleanup data construction artifacts and subprocess.
 
   # Clear the session explicitly to avoid session delete error
   tf.keras.backend.clear_session()
 
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_FINAL)
+
 
 def define_ncf_flags():
   """Add flags for running ncf_main."""
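For reference, the two metrics logged here gate the run: HR@K is the fraction of users whose held-out positive item lands in the model's top K ranked candidates, and NDCG@K additionally discounts a hit by its rank. A small illustrative implementation for a single user (the model computes these inside the estimator's eval metrics, not with this code):

    import math

    def hit_rate_and_ndcg(ranked_items, true_item, k=10):
      """Return (HR@k, NDCG@k) for one user's ranked candidate list."""
      top_k = list(ranked_items)[:k]
      if true_item not in top_k:
        return 0.0, 0.0
      rank = top_k.index(true_item)  # 0-based rank of the held-out item
      return 1.0, math.log(2) / math.log(rank + 2)

    # Example: held-out item ranked 2nd among the candidates.
    print(hit_rate_and_ndcg([7, 42, 3], true_item=42))  # (1.0, ~0.63)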
@@ -419,7 +451,10 @@ def define_ncf_flags():
         "If True, use XLA for the model function. Only works when using a "
         "GPU. On TPUs, XLA is always used"))
 
-  flags.mark_flags_as_mutual_exclusive(["use_xla_for_gpu", "tpu"])
+  xla_message = "--use_xla_for_gpu is incompatible with --tpu"
+  @flags.multi_flags_validator(["use_xla_for_gpu", "tpu"], message=xla_message)
+  def xla_validator(flag_dict):
+    return not flag_dict["use_xla_for_gpu"] or not flag_dict["tpu"]
 
 
 if __name__ == "__main__":
diff --git a/official/recommendation/neumf_model.py b/official/recommendation/neumf_model.py
index a6574abd2..a648a2be3 100644
--- a/official/recommendation/neumf_model.py
+++ b/official/recommendation/neumf_model.py
@@ -42,6 +42,7 @@ import tensorflow as tf
 from official.datasets import movielens  # pylint: disable=g-bad-import-order
 from official.recommendation import constants as rconst
 from official.recommendation import stat_utils
+from official.utils.logs import mlperf_helper
 
 
 def _sparse_to_dense_grads(grads_and_vars):
@@ -102,12 +103,25 @@ def neumf_model_fn(features, labels, mode, params):
 
   elif mode == tf.estimator.ModeKeys.TRAIN:
     labels = tf.cast(labels, tf.int32)
+
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_NAME, value="adam")
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_LR,
+                            value=params["learning_rate"])
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_BETA1,
+                            value=params["beta1"])
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_BETA2,
+                            value=params["beta2"])
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.OPT_HP_ADAM_EPSILON,
+                            value=params["epsilon"])
+
     optimizer = tf.train.AdamOptimizer(
         learning_rate=params["learning_rate"], beta1=params["beta1"],
         beta2=params["beta2"], epsilon=params["epsilon"])
     if params["use_tpu"]:
       optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
 
+    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_LOSS_FN,
+                            value=mlperf_helper.TAGS.BCE)
     loss = tf.losses.sparse_softmax_cross_entropy(
         labels=labels,
         logits=softmax_logits
@@ -158,6 +172,10 @@ def construct_model(users, items, params):
 
   mf_dim = params["mf_dim"]
 
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_MF_DIM, value=mf_dim)
+  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.MODEL_HP_MLP_LAYER_SIZES,
+                          value=model_layers)
+
   if model_layers[0] % 2 != 0:
     raise ValueError("The first layer size should be multiple of 2!")
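One detail worth spelling out: the tag logged above reports the loss as BCE even though the code calls tf.losses.sparse_softmax_cross_entropy. With exactly two logits per example the two coincide: softmax cross entropy over the (negative, positive) pair equals sigmoid binary cross entropy on the logit difference. A quick numeric check of that identity (pure Python, illustrative only):

    import math

    def softmax_xent(logits, label):
      # Cross entropy of a two-class softmax; label is 0 or 1.
      m = max(logits)
      log_z = m + math.log(sum(math.exp(l - m) for l in logits))
      return log_z - logits[label]

    def sigmoid_bce(logit_diff, label):
      # Binary cross entropy with p = sigmoid(logit_diff).
      p = 1.0 / (1.0 + math.exp(-logit_diff))
      return -math.log(p if label == 1 else 1.0 - p)

    logits = [0.3, 1.7]
    diff = logits[1] - logits[0]
    assert abs(softmax_xent(logits, 1) - sigmoid_bce(diff, 1)) < 1e-9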
diff --git a/official/recommendation/run.sh b/official/recommendation/run.sh
index 599f2aa13..911a395a6 100755
--- a/official/recommendation/run.sh
+++ b/official/recommendation/run.sh
@@ -1,6 +1,11 @@
 #!/bin/bash
 set -e
 
+if [ `id -u` != 0 ]; then
+  echo "Calling sudo to gain root for this shell. (Needed to clear caches.)"
+  sudo echo "Success"
+fi
+
 DATASET="ml-20m"
 
 BUCKET=${BUCKET:-""}
@@ -22,7 +27,7 @@ mkdir -p ${LOCAL_TEST_DIR}
 
 TPU=${TPU:-""}
 if [[ -z ${TPU} ]]; then
-  DEVICE_FLAG="--num_gpus -1"
+  DEVICE_FLAG="--num_gpus -1 --use_xla_for_gpu"
 else
   DEVICE_FLAG="--tpu ${TPU} --num_gpus 0"
 fi
@@ -38,9 +43,14 @@ do
   MODEL_DIR="${TEST_DIR}/model_dir_${i}"
 
   RUN_LOG="${LOCAL_TEST_DIR}/run_${i}.log"
+  export COMPLIANCE_FILE="${LOCAL_TEST_DIR}/run_${i}_compliance_raw.log"
+  export STITCHED_COMPLIANCE_FILE="${LOCAL_TEST_DIR}/run_${i}_compliance_submission.log"
   echo ""
   echo "Beginning run ${i}"
-  echo "  Complete logs are in ${RUN_LOG}"
+  echo "  Complete output logs are in ${RUN_LOG}"
+  echo "  Compliance logs: (submission log is created after run.)"
+  echo "    ${COMPLIANCE_FILE}"
+  echo "    ${STITCHED_COMPLIANCE_FILE}"
 
   # To reduce variation set the seed flag:
   #   --seed ${i}
@@ -62,7 +72,7 @@ do
     --hr_threshold 0.635 \
     --ml_perf \
   |& tee ${RUN_LOG} \
-    | grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+)|(pipeline_hash)"
+    | grep --line-buffered -E --regexp="(Iteration [0-9]+: HR = [0-9\.]+, NDCG = [0-9\.]+)|(pipeline_hash)|(MLPerf time:)"
 
   END_TIME=$(date +%s)
   echo "Run ${i} complete: $(( $END_TIME - $START_TIME )) seconds."
diff --git a/official/requirements.txt b/official/requirements.txt
index 183c18c08..832ea40a0 100644
--- a/official/requirements.txt
+++ b/official/requirements.txt
@@ -1,6 +1,7 @@
 google-api-python-client>=1.6.7
 google-cloud-bigquery>=0.31.0
 kaggle>=1.3.9
+mlperf_compliance==0.0.8
 numpy
 oauth2client>=4.1.2
 pandas
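The new module below centers on a regex that parses raw compliance lines back into structured form. To make the format concrete before the code, here is a sample line and how the module's parse_line/unparse_line helpers (defined below) round-trip it; the timestamp and callsite are invented for the example:

    sample = ('NCF_RAW_:::MLPv0.5.0 ncf 1540400000.123456789 '
              '(ncf_main.py:42) eval_accuracy: {"epoch": 0, "value": 0.635}')
    parsed = parse_line(sample)
    assert parsed is not None and parsed.tag == "eval_accuracy"
    print(unparse_line(parsed))  # Same line with the NCF_RAW_ prefix stripped.

stitch_ncf uses exactly this pair: parse every raw line, patch up deferred values, sort, and re-emit submission-ready lines.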
diff --git a/official/utils/logs/mlperf_helper.py b/official/utils/logs/mlperf_helper.py
new file mode 100644
index 000000000..832bd2b78
--- /dev/null
+++ b/official/utils/logs/mlperf_helper.py
@@ -0,0 +1,242 @@
+# Copyright 2018 The TensorFlow Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+"""Wrapper for the mlperf logging utils.
+
+MLPerf compliance logging is only desired under a limited set of circumstances.
+This module is intended to keep users from needing to consider logging (or
+install the module) unless they are performing mlperf runs.
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+from collections import namedtuple
+import json
+import os
+import re
+import subprocess
+import sys
+import typing
+
+import tensorflow as tf
+
+_MIN_VERSION = (0, 0, 6)
+_STACK_OFFSET = 2
+
+SUDO = "sudo" if os.geteuid() else ""
+
+# This indirection is used in docker.
+DROP_CACHE_LOC = os.getenv("DROP_CACHE_LOC", "/proc/sys/vm/drop_caches")
+
+_NCF_PREFIX = "NCF_RAW_"
+
+# TODO(robieta): move line parsing to mlperf util
+_PREFIX = r"(?:{})?:::MLPv([0-9]+).([0-9]+).([0-9]+)".format(_NCF_PREFIX)
+_BENCHMARK = r"([a-zA-Z0-9_]+)"
+_TIMESTAMP = r"([0-9]+\.[0-9]+)"
+_CALLSITE = r"\((.+):([0-9]+)\)"
+_TAG = r"([a-zA-Z0-9_]+)"
+_VALUE = r"(.*)"
+
+ParsedLine = namedtuple("ParsedLine", ["version", "benchmark", "timestamp",
+                                       "callsite", "tag", "value"])
+
+LINE_PATTERN = re.compile(
+    "^{prefix} {benchmark} {timestamp} {callsite} {tag}(: |$){value}?$".format(
+        prefix=_PREFIX, benchmark=_BENCHMARK, timestamp=_TIMESTAMP,
+        callsite=_CALLSITE, tag=_TAG, value=_VALUE))
+
+
+def parse_line(line):  # type: (str) -> typing.Optional[ParsedLine]
+  match = LINE_PATTERN.match(line.strip())
+  if not match:
+    return
+
+  major, minor, micro, benchmark, timestamp = match.groups()[:5]
+  call_file, call_line, tag, _, value = match.groups()[5:]
+
+  return ParsedLine(version=(int(major), int(minor), int(micro)),
+                    benchmark=benchmark, timestamp=timestamp,
+                    callsite=(call_file, call_line), tag=tag, value=value)
+
+
+def unparse_line(parsed_line):  # type: (ParsedLine) -> str
+  version_str = "{}.{}.{}".format(*parsed_line.version)
+  callsite_str = "({}:{})".format(*parsed_line.callsite)
+  value_str = ": {}".format(parsed_line.value) if parsed_line.value else ""
+  return ":::MLPv{} {} {} {} {} {}".format(
+      version_str, parsed_line.benchmark, parsed_line.timestamp, callsite_str,
+      parsed_line.tag, value_str)
+
+
+def get_mlperf_log():
+  """Shielded import of mlperf_log module."""
+  try:
+    import pkg_resources
+    import mlperf_compliance
+
+    version = pkg_resources.get_distribution("mlperf_compliance")
+    version = tuple(int(i) for i in version.version.split("."))
+    if version < _MIN_VERSION:
+      tf.logging.warning(
+          "mlperf_compliance is version {}, must be at least version {}".format(
+              ".".join([str(i) for i in version]),
+              ".".join([str(i) for i in _MIN_VERSION])))
+      raise ImportError
+
+    mlperf_log = mlperf_compliance.mlperf_log
+
+  except ImportError:
+    mlperf_log = None
+
+  return mlperf_log
+
+
+class Logger(object):
+  """MLPerf logger indirection class.
+
+  This logger only logs for MLPerf runs, and prevents various errors associated
+  with not having the mlperf_compliance package installed.
+  """
+
+  class Tags(object):
+    def __init__(self, mlperf_log):
+      self._enabled = False
+      self._mlperf_log = mlperf_log
+
+    def __getattr__(self, item):
+      if self._mlperf_log is None or not self._enabled:
+        return
+      return getattr(self._mlperf_log, item)
+
+  def __init__(self):
+    self._enabled = False
+    self._mlperf_log = get_mlperf_log()
+    self.tags = self.Tags(self._mlperf_log)
+
+  def __call__(self, enable=False):
+    if enable and self._mlperf_log is None:
+      raise ImportError("MLPerf logging was requested, but mlperf_compliance "
+                        "module could not be loaded.")
+
+    self._enabled = enable
+    self.tags._enabled = enable
+    return self
+
+  def __enter__(self):
+    pass
+
+  def __exit__(self, exc_type, exc_val, exc_tb):
+    self._enabled = False
+    self.tags._enabled = False
+
+  @property
+  def log_file(self):
+    if self._mlperf_log is None:
+      return
+    return self._mlperf_log.LOG_FILE
+
+  @property
+  def enabled(self):
+    return self._enabled
+
+  def ncf_print(self, key, value=None, stack_offset=_STACK_OFFSET,
+                deferred=False, extra_print=False, prefix=_NCF_PREFIX):
+    if self._mlperf_log is None or not self.enabled:
+      return
+    self._mlperf_log.ncf_print(key=key, value=value, stack_offset=stack_offset,
+                               deferred=deferred, extra_print=extra_print,
+                               prefix=prefix)
+
+  def set_ncf_root(self, path):
+    if self._mlperf_log is None:
+      return
+    self._mlperf_log.ROOT_DIR_NCF = path
+
+
+LOGGER = Logger()
+ncf_print, set_ncf_root = LOGGER.ncf_print, LOGGER.set_ncf_root
+TAGS = LOGGER.tags
+
+
+def clear_system_caches():
+  if not LOGGER.enabled:
+    return
+  ret_code = subprocess.call(
+      ["sync && echo 3 | {} tee {}".format(SUDO, DROP_CACHE_LOC)],
+      shell=True)
+
+  if ret_code:
+    raise ValueError("Failed to clear caches")
+
+
+def stitch_ncf():
+  """Format NCF logs for MLPerf compliance."""
+  if not LOGGER.enabled:
+    return
+
+  if LOGGER.log_file is None or not tf.gfile.Exists(LOGGER.log_file):
+    tf.logging.error("Could not find log file to stitch.")
+    return
+
+  log_lines = []
+  num_eval_users = None
+  start_time = None
+  stop_time = None
+  with tf.gfile.Open(LOGGER.log_file, "r") as f:
+    for line in f:
+      parsed_line = parse_line(line)
+      if not parsed_line:
+        tf.logging.warning("Failed to parse line: {}".format(line))
+        continue
+      log_lines.append(parsed_line)
+
+      if parsed_line.tag == TAGS.RUN_START:
+        assert start_time is None
+        start_time = float(parsed_line.timestamp)
+
+      if parsed_line.tag == TAGS.RUN_STOP:
+        assert stop_time is None
+        stop_time = float(parsed_line.timestamp)
+
+      if (parsed_line.tag == TAGS.EVAL_HP_NUM_USERS and parsed_line.value
+          is not None and "DEFERRED" not in parsed_line.value):
+        assert num_eval_users is None or num_eval_users == parsed_line.value
+        num_eval_users = parsed_line.value
+        log_lines.pop()
+
+  for i, parsed_line in enumerate(log_lines):
+    if parsed_line.tag == TAGS.EVAL_HP_NUM_USERS:
+      log_lines[i] = ParsedLine(*parsed_line[:-1], value=num_eval_users)
+
+  log_lines = sorted([unparse_line(i) for i in log_lines])
+
+  output_path = os.getenv("STITCHED_COMPLIANCE_FILE", None)
+  if output_path:
+    with tf.gfile.Open(output_path, "w") as f:
+      for line in log_lines:
+        f.write(line + "\n")
+  else:
+    for line in log_lines:
+      print(line)
+    sys.stdout.flush()
+
+  if start_time is not None and stop_time is not None:
+    tf.logging.info("MLPerf time: {:.1f} sec.".format(stop_time - start_time))
+
+
+if __name__ == "__main__":
+  tf.logging.set_verbosity(tf.logging.INFO)
+  with LOGGER(True):
+    ncf_print(key=TAGS.RUN_START)
-- 
GitLab