未验证 提交 92dde0a8 编写于 作者: F Frost Ming

retry failed jobs

上级 dc839854
Retry failed jobs when syncing packages.
......@@ -109,7 +109,9 @@ class Core:
etype, err, traceback = sys.exc_info()
if stream.verbosity > stream.NORMAL:
raise err.with_traceback(traceback)
stream.echo("[{}]: {}".format(etype.__name__, err), err=True)
stream.echo(
f"{stream.red('[' + etype.__name__ + ']')}: {err}", err=True
)
sys.exit(1)
def register_command(
......
import contextlib
import functools
import multiprocessing
import traceback
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from typing import Dict, List, Tuple
from pip._vendor.pkg_resources import Distribution, safe_name
......@@ -61,19 +61,20 @@ class DummyExecutor:
class Synchronizer:
"""Synchronize the working set with given installation candidates"""
RETRY_TIMES = 1
SEQUENTIAL_PACKAGES = ("pip", "setuptools", "wheel")
def __init__(
self,
candidates: Dict[str, Candidate],
environment: Environment,
retry_times: int = 1,
) -> None:
self.candidates = candidates
self.environment = environment
self.parallel = environment.project.config["parallel_install"]
self.all_candidates = environment.project.get_locked_candidates("__all__")
self.working_set = environment.get_working_set()
self.retry_times = retry_times
@contextlib.contextmanager
def create_executor(self):
......@@ -271,33 +272,37 @@ class Synchronizer:
parallel_jobs.append((kind, key))
errors: List[str] = []
with stream.indent(" "):
for job in sequential_jobs:
kind, key = job
try:
handlers[kind](key)
except Exception as err:
errors.append(f"{kind} {stream.green(key)} failed:\n")
errors.extend(
traceback.format_exception(type(err), err, err.__traceback__)
)
failed_jobs: List[Tuple[str, str]] = []
def update_progress(future, kind, key):
if future.exception():
errors.append(f"{kind} {stream.green(key)} failed:\n")
failed_jobs.append((kind, key))
error = future.exception()
errors.extend(
traceback.format_exception(type(error), error, error.__traceback__)
[f"{kind} {stream.green(key)} failed:\n"]
+ traceback.format_exception(
type(error), error, error.__traceback__
)
)
with stream.logging("install"):
with stream.indent(" "), self.create_executor() as executor:
for job in parallel_jobs:
with stream.indent(" "):
for job in sequential_jobs:
kind, key = job
future = executor.submit(handlers[kind], key)
future.add_done_callback(
partial(update_progress, kind=kind, key=key)
)
handlers[kind](key)
for i in range(self.retry_times + 1):
with self.create_executor() as executor:
for job in parallel_jobs:
kind, key = job
future = executor.submit(handlers[kind], key)
future.add_done_callback(
functools.partial(update_progress, kind=kind, key=key)
)
if not failed_jobs or i == self.retry_times:
break
parallel_jobs, failed_jobs = failed_jobs, []
errors.clear()
stream.echo("Retry failed jobs")
if errors:
stream.echo(stream.red("\nERRORS:"))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册