提交 ce83a9db 编写于 作者: H Hongkun Yu 提交者: A. Unique TensorFlower

Internal change

PiperOrigin-RevId: 297383836
上级 dd0ff4df
......@@ -551,8 +551,6 @@ class BertSquadMultiWorkerAccuracy(BertSquadBenchmarkBase):
num_gpus = 8
FLAGS.num_gpus = num_gpus
FLAGS.dtype = 'fp16'
# Enable gradient allreduce in fp16
FLAGS.explicit_allreduce = True
FLAGS.enable_xla = False
FLAGS.distribution_strategy = 'multi_worker_mirrored'
FLAGS.tf_gpu_thread_mode = 'gpu_private'
......@@ -623,8 +621,7 @@ class BertSquadMultiWorkerBenchmark(BertSquadBenchmarkBase):
min_accuracy=0,
max_accuracy=1)
def _benchmark_common(self, num_workers, all_reduce_alg,
explicit_allreduce=False):
def _benchmark_common(self, num_workers, all_reduce_alg):
"""Common to all benchmarks in this class."""
self._setup()
......@@ -640,8 +637,6 @@ class BertSquadMultiWorkerBenchmark(BertSquadBenchmarkBase):
num_workers, all_reduce_alg))
FLAGS.train_batch_size = 4 * num_gpus * num_workers
FLAGS.all_reduce_alg = all_reduce_alg
# Enable gradient allreduce in fp16
FLAGS.explicit_allreduce = explicit_allreduce
self._run_and_report_benchmark()
......@@ -655,23 +650,19 @@ class BertSquadMultiWorkerBenchmark(BertSquadBenchmarkBase):
def benchmark_8_gpu_2_workers_fp16_ring_tweaked(self):
"""8 GPUs per worker, 2 workers, fp16, ring all-reduce."""
self._benchmark_common(num_workers=2, all_reduce_alg='ring',
explicit_allreduce=True)
self._benchmark_common(num_workers=2, all_reduce_alg='ring')
def benchmark_8_gpu_2_workers_fp16_nccl_tweaked(self):
"""8 GPUs per worker, 2 workers, fp16, nccl all-reduce."""
self._benchmark_common(num_workers=2, all_reduce_alg='nccl',
explicit_allreduce=True)
self._benchmark_common(num_workers=2, all_reduce_alg='nccl')
def benchmark_8_gpu_8_workers_fp16_ring_tweaked(self):
"""8 GPUs per worker, 8 workers, fp16, ring all-reduce."""
self._benchmark_common(num_workers=8, all_reduce_alg='ring',
explicit_allreduce=True)
self._benchmark_common(num_workers=8, all_reduce_alg='ring')
def benchmark_8_gpu_8_workers_fp16_nccl_tweaked(self):
"""8 GPUs per worker, 8 workers, fp16, nccl all-reduce."""
self._benchmark_common(num_workers=8, all_reduce_alg='nccl',
explicit_allreduce=True)
self._benchmark_common(num_workers=8, all_reduce_alg='nccl')
if __name__ == '__main__':
......
......@@ -76,56 +76,6 @@ def write_txt_summary(training_summary, summary_dir):
f.write(json.dumps(training_summary, indent=4))
def _filter_grads(grads_and_vars):
    """Drop every (gradient, variable) pair whose gradient is None.

    Arguments:
      grads_and_vars: iterable of (gradient, variable) pairs. It is
        materialized into a tuple first, so a one-shot iterator is accepted.

    Returns:
      A tuple holding the pairs that carry a gradient, in their original
      order. An empty input comes back as an empty tuple.

    Raises:
      ValueError: if the input is non-empty but every gradient is None.
    """
    pairs = tuple(grads_and_vars)
    if not pairs:
        return pairs

    kept = tuple((grad, var) for grad, var in pairs if grad is not None)
    if not kept:
        raise ValueError('No gradients provided for any variable: %s.' %
                         ([var.name for _, var in pairs],))

    skipped_vars = [var for grad, var in pairs if grad is None]
    if skipped_vars:
        # Some variables received no gradient: report them, but keep going
        # with the pairs that do have one.
        logging.warning(
            'Gradients do not exist for variables %s when minimizing the loss.',
            [var.name for var in skipped_vars])
    return kept
def _filter_and_allreduce_gradients(grads_and_vars,
                                    allreduce_precision='float32'):
    """Filter None grads and then allreduce gradients in specified precision.

    This utility is for callers that intend to allreduce gradients explicitly
    so that they can customize gradient operations before and after the
    allreduce. The allreduced gradients are then meant to be passed to
    optimizer.apply_gradients(all_reduce_sum_gradients=False).

    The allreduce is issued on the object returned by
    tf.distribute.get_replica_context(), so a non-empty input requires that
    call to return a usable replica context.

    Arguments:
      grads_and_vars: iterable of (gradient, variable) pairs.
      allreduce_precision: 'float16' casts the gradients to float16 before
        the allreduce and casts the sums to float32 afterwards. Any other
        value leaves the gradients' dtype untouched.

    Returns:
      A 2-tuple (allreduced_grads, variables): the summed non-None gradients
      and the variables they belong to, in matching order. Both are empty
      when `grads_and_vars` is empty.

    Raises:
      ValueError: raised by _filter_grads when the input is non-empty but
        every gradient is None.
    """
    grads_and_vars = tuple(grads_and_vars)
    if not grads_and_vars:
        # _filter_grads hands an empty input back unchanged; unpacking
        # zip(*()) into two names would then fail with an unrelated
        # "not enough values to unpack" ValueError. Return the empty result
        # in the same (list, tuple) shape the normal path produces.
        return [], ()

    filtered_grads_and_vars = _filter_grads(grads_and_vars)
    grads, variables = zip(*filtered_grads_and_vars)

    use_float16 = allreduce_precision == 'float16'
    if use_float16:
        grads = [tf.cast(grad, 'float16') for grad in grads]
    allreduced_grads = tf.distribute.get_replica_context().all_reduce(
        tf.distribute.ReduceOp.SUM, grads)
    if use_float16:
        # Hand float32 gradients back after the float16 allreduce.
        allreduced_grads = [
            tf.cast(grad, 'float32') for grad in allreduced_grads
        ]
    return allreduced_grads, variables
def run_customized_training_loop(
# pylint: disable=invalid-name
_sentinel=None,
......@@ -144,8 +94,7 @@ def run_customized_training_loop(
init_checkpoint=None,
custom_callbacks=None,
run_eagerly=False,
sub_model_export_name=None,
explicit_allreduce=False):
sub_model_export_name=None):
"""Run BERT pretrain model training using low-level API.
Arguments:
......@@ -187,12 +136,6 @@ def run_customized_training_loop(
file is {sub_model_export_name}_step_{step}.ckpt and the last
checkpoint's name is {sub_model_export_name}.ckpt;
if None, `sub_model` will not be exported as checkpoint.
explicit_allreduce: Whether to explicitly perform gradient allreduce,
instead of relying on implicit allreduce in optimizer.apply_gradients().
default is False. For now, if training using FP16 mixed precision,
explicit allreduce will aggregate gradients in FP16 format. For TPU and
GPU training using FP32, explicit allreduce will aggregate gradients in
FP32 format.
Returns:
Trained model.
......@@ -308,30 +251,10 @@ def run_customized_training_loop(
if use_float16:
scaled_grads = tape.gradient(scaled_loss, training_vars)
if explicit_allreduce:
(allreduced_scaled_grads,
filtered_training_vars) = _filter_and_allreduce_gradients(
zip(scaled_grads, training_vars), allreduce_precision='float16')
allreduced_unscaled_grads = optimizer.get_unscaled_gradients(
allreduced_scaled_grads)
grads_and_vars = zip(allreduced_unscaled_grads,
filtered_training_vars)
else:
grads = optimizer.get_unscaled_gradients(scaled_grads)
grads_and_vars = zip(grads, training_vars)
else:
# TPU or FP32 GPU code path
grads = tape.gradient(loss, training_vars)
if explicit_allreduce:
(allreduced_grads,
filtered_training_vars) = _filter_and_allreduce_gradients(
zip(grads, training_vars), allreduce_precision='float32')
grads_and_vars = zip(allreduced_grads, filtered_training_vars)
else:
grads_and_vars = zip(grads, training_vars)
optimizer.apply_gradients(
grads_and_vars, all_reduce_sum_gradients=not explicit_allreduce)
optimizer.apply_gradients(zip(grads, training_vars))
# For reporting, the metric takes the mean of losses.
train_loss_metric.update_state(loss)
for metric in train_metrics:
......
......@@ -139,8 +139,7 @@ class ModelTrainingUtilsTest(tf.test.TestCase, parameterized.TestCase):
super(ModelTrainingUtilsTest, self).setUp()
self._model_fn = create_model_fn(input_shape=[128], num_classes=3)
def run_training(self, strategy, model_dir, steps_per_loop, run_eagerly,
explicit_allreduce=False):
def run_training(self, strategy, model_dir, steps_per_loop, run_eagerly):
input_fn = create_fake_data_input_fn(
batch_size=8, features_shape=[128], num_classes=3)
model_training_utils.run_customized_training_loop(
......@@ -180,7 +179,12 @@ class ModelTrainingUtilsTest(tf.test.TestCase, parameterized.TestCase):
self.run_training(
distribution, model_dir, steps_per_loop=1, run_eagerly=True)
def _verify_artifacts(self, model_dir):
@combinations.generate(eager_strategy_combinations())
def test_train_check_artifacts(self, distribution):
model_dir = self.get_temp_dir()
self.run_training(
distribution, model_dir, steps_per_loop=10, run_eagerly=False)
# Two checkpoints should be saved after two epochs.
self.assertNotEmpty(tf.io.gfile.glob(os.path.join(model_dir, 'ctl_step_*')))
self.assertNotEmpty(
......@@ -204,23 +208,6 @@ class ModelTrainingUtilsTest(tf.test.TestCase, parameterized.TestCase):
check_eventfile_for_keyword('mean_input',
os.path.join(model_dir, 'summaries/eval')))
@combinations.generate(eager_strategy_combinations())
def test_train_check_artifacts(self, distribution):
model_dir = self.get_temp_dir()
self.run_training(
distribution, model_dir, steps_per_loop=10, run_eagerly=False)
self._verify_artifacts(model_dir)
@combinations.generate(eager_strategy_combinations())
def test_train_explicit_allreduce_check_artifacts(self, distribution):
model_dir = self.get_temp_dir()
self.run_training(
distribution,
model_dir,
steps_per_loop=10,
run_eagerly=False,
explicit_allreduce=True)
self._verify_artifacts(model_dir)
if __name__ == '__main__':
assert tf.version.VERSION.startswith('2.')
......
......@@ -68,10 +68,6 @@ def define_common_bert_flags():
'If specified, init_checkpoint flag should not be used.')
flags.DEFINE_bool('hub_module_trainable', True,
'True to make keras layers in the hub module trainable.')
flags.DEFINE_bool('explicit_allreduce', False,
'Whether to explicit perform gradient allreduce in '
'training loop, instead of relying on implicit allreduce '
'in optimizer.apply_gradients().')
# Adds flags for mixed precision and multi-worker training.
flags_core.define_performance(
......
......@@ -280,8 +280,7 @@ def train_squad(strategy,
train_input_fn=train_input_fn,
init_checkpoint=FLAGS.init_checkpoint,
run_eagerly=run_eagerly,
custom_callbacks=custom_callbacks,
explicit_allreduce=FLAGS.explicit_allreduce)
custom_callbacks=custom_callbacks)
def predict_squad(strategy, input_meta_data, tokenizer, bert_config, squad_lib):
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册