提交 37b30d5e 编写于 作者: H Haoyu Zhang 提交者: TensorFlower Gardener

Ignore transient connectivity issues in PS client.

This is a workaround to avoid noisy reports of PS failure from workers due to transient connection errors between them.

PiperOrigin-RevId: 327872097
Change-Id: I5b1feb9cac66c78df4659dfb4b5ed81350d5678c
上级 12c6a4ea
......@@ -26,6 +26,7 @@ import contextlib
import enum
import functools
import os
import re
import sys
import threading
import weakref
......@@ -542,8 +543,9 @@ class _CoordinatedClosureQueue(object):
class WorkerPreemptionHandler(object):
"""Handles worker preemptions."""
def __init__(self, server_def):
def __init__(self, server_def, cluster):
self._server_def = server_def
self._cluster = cluster
self._cluster_update_lock = threading.Lock()
self._cluster_due_for_update = threading.Event()
self._worker_up_cond = threading.Condition(self._cluster_update_lock)
......@@ -577,6 +579,13 @@ class WorkerPreemptionHandler(object):
try:
yield
except errors.OpError as e:
# If the error is due to a temporary connectivity issue between the worker
# and the PS, put the closure back, ignore the error, and do not mark the
# worker as failed.
if self._cluster._record_and_ignore_transient_ps_failure(e): # pylint: disable=protected-access
if on_failure_fn:
on_failure_fn()
return
self._validate_preemption_failure(e)
logging.error("Worker %s failed with error: %s", worker_device_name, e)
if on_failure_fn:
......@@ -775,8 +784,25 @@ class Cluster(object):
protocol=cluster_resolver.rpc_layer,
cluster_device_filters=device_filters)
# Ignore PS failures reported by workers due to transient connection errors.
# Transient connectivity issues between workers and PS are relayed by the
# workers to the client, leading the client to believe that there are PS
# failures. The difference between transient vs. permanent PS failure is the
# number of reports from the workers. When this env var is set to a positive
# integer K, the client tolerates up to K - 1 error reports per PS task. I.e.,
# only when at least K closure executions fail due to errors from the same PS
# instance does the client consider that PS instance to have failed.
# TODO(b/164279603): Remove this workaround when the underlying connectivity
# issue in gRPC server is resolved.
self._transient_ps_failures_threshold = int(os.environ.get(
"TF_CLIENT_IGNORE_TRANSIENT_PS_FAILURES", 3))
self._potential_ps_failures_lock = threading.Lock()
self._potential_ps_failures_count = [0] * self._num_ps
self._closure_queue = _CoordinatedClosureQueue()
self.failure_handler = WorkerPreemptionHandler(context.get_server_def())
self.failure_handler = WorkerPreemptionHandler(context.get_server_def(),
self)
worker_device_strings = [
"/job:worker/replica:0/task:%d" % i for i in range(self._num_workers)
]
......@@ -784,6 +810,22 @@ class Cluster(object):
Worker(i, w, self) for i, w in enumerate(worker_device_strings)
]
def _record_and_ignore_transient_ps_failure(self, e):
  """Records a potential PS failure and returns whether to ignore it.

  Transient connectivity errors between workers and PS tasks are tolerated
  up to a configurable threshold; only repeated errors attributed to the
  same PS task are surfaced as a real failure.

  Args:
    e: the error raised while executing a closure on a worker.

  Returns:
    True if the error should be ignored as transient; False if it should be
    handled as a genuine PS failure (or if this feature is disabled).
  """
  threshold = self._transient_ps_failures_threshold
  # Fast path: feature disabled, or the error is not attributable to a PS.
  if threshold <= 0 or not _is_ps_failure(e):
    return False

  failed_tasks = _extract_failed_ps_instances(str(e))
  with self._potential_ps_failures_lock:
    for task_id in failed_tasks:
      self._potential_ps_failures_count[task_id] += 1
      # This task has now accumulated threshold-many error reports; stop
      # treating its errors as transient.
      if self._potential_ps_failures_count[task_id] >= threshold:
        return False
  return True
def schedule(self, function, args, kwargs):
"""Schedules `function` to be dispatched to a worker for execution.
......@@ -1162,6 +1204,12 @@ class _PerWorkerDistributedIterator(PerWorkerValues):
"is not supported right now.")
def _extract_failed_ps_instances(err_msg):
"""Return a set of potentially failing ps instances from error message."""
tasks = re.findall("/job:ps/replica:0/task:[0-9]+", err_msg)
return set(int(t.split(":")[-1]) for t in tasks)
def _is_ps_failure(error):
"""Whether the error is considered a parameter server failure."""
if (_RPC_ERROR_FROM_PS in str(error) or
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册