未验证 提交 6c5b5013 编写于 作者: F Frost Ming

parallel install mode

上级 e9de147c
......@@ -62,7 +62,7 @@ class HelloCommand(BaseCommand):
```
!!! note
The default options are loaded first, then `add_arguments()` is called.
The default options are loaded first, then `add_arguments()` is called.
### Register the command to the core object
......
......@@ -127,6 +127,7 @@ by the configuration item `use_venv`, when it is set to `True`, PDM will use the
| `cache_dir` | The root directory of cached files | The default cache location on OS | No | |
| `auto_global` | Use global package implicity if no local project is found | `False` | No | `PDM_AUTO_GLOBAL` |
| `use_venv` | Install packages into the activated venv site packages instead of PEP 582 | `False` | Yes | `PDM_USE_VENV` |
| `parallel_install` | Whether to perform installation and uninstallation in parallel | `True` | Yes | `PDM_PARALLEL_INSTALL` |
| `python.path` | The Python interpreter path | | Yes | `PDM_PYTHON_PATH` |
| `python.use_pyenv` | Use the pyenv interpreter | `True` | Yes | |
| `pypi.url` | The URL of PyPI mirror | Read `index-url` in `pip.conf`, or `https://pypi.org/simple` if not found | Yes | `PDM_PYPI_URL` |
......
import contextlib
import functools
import importlib
import subprocess
import traceback
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from typing import Dict, List, Tuple
import distlib.scripts
from click import progressbar
from distlib.wheel import Wheel
from pip._internal.utils import logging as pip_logging
from pip._vendor.pkg_resources import Distribution, EggInfoDistribution, safe_name
from pip_shims import shims
from vistir import cd
......@@ -272,6 +279,8 @@ class Installer: # pragma: no cover
def __init__(self, environment: Environment, auto_confirm: bool = True) -> None:
self.environment = environment
self.auto_confirm = auto_confirm
# XXX: Patch pip to make it work under multi-thread mode
pip_logging._log_state.indentation = 0
def install(self, candidate: Candidate) -> None:
candidate.get_metadata(allow_all_wheels=False)
......@@ -324,30 +333,91 @@ class Installer: # pragma: no cover
pathset.commit()
class DummyFuture:
_NOT_SET = object()
def __init__(self):
self._result = self._NOT_SET
self._exc = None
def set_result(self, result):
self._result = result
def set_exception(self, exc):
self._exc = exc
def result(self):
return self._result
def exception(self):
return self._exc
def add_done_callback(self, func):
func(self)
class DummyExecutor:
"""A synchronous pool class to mimick ProcessPoolExecuter's interface.
functions are called and awaited for the result
"""
def submit(self, func, *args, **kwargs):
future = DummyFuture()
try:
future.set_result(func(*args, **kwargs))
except Exception as exc:
future.set_exception(exc)
return future
def close(self):
pass
def join(self):
pass
class Synchronizer:
"""Synchronize the working set with given installation candidates"""
BAR_FILLED_CHAR = "▉"
BAR_EMPTY_CHAR = " "
RETRY_TIMES = 1
def __init__(
self, candidates: Dict[str, Candidate], environment: Environment
self,
candidates: Dict[str, Candidate],
environment: Environment,
parallel: bool = True,
) -> None:
self.candidates = candidates
self.environment = environment
def _print_list_information(self, word, items, dry=False):
if dry:
word = "to be " + word
template = "{count} package{suffix} {word}: {items}"
suffix = "s" if len(items) > 1 else ""
count = len(items)
items = ", ".join(str(stream.green(item, bold=True)) for item in items)
stream.echo(template.format(count=count, suffix=suffix, word=word, items=items))
self.parallel = parallel
self.working_set = environment.get_working_set()
@contextlib.contextmanager
def progressbar(self, label: str, total: int):
bar = progressbar(
length=total,
fill_char=stream.green(self.BAR_FILLED_CHAR),
empty_char=self.BAR_EMPTY_CHAR,
show_percent=False,
show_pos=True,
label=label,
bar_template="%(label)s %(bar)s %(info)s",
)
if self.parallel:
executor = ThreadPoolExecutor()
else:
executor = DummyExecutor()
with executor:
yield bar, executor
def get_installer(self) -> Installer:
return Installer(self.environment)
def compare_with_working_set(self) -> Tuple[List[str], List[str], List[str]]:
"""Compares the candidates and return (to_add, to_update, to_remove)"""
working_set = self.environment.get_working_set()
working_set = self.working_set
to_update, to_remove = [], []
candidates = self.candidates.copy()
environment = self.environment.marker_environment
......@@ -371,43 +441,62 @@ class Synchronizer:
)
return to_add, to_update, to_remove
def install_candidates(
self, candidates: List[Candidate], update: bool = False
) -> None:
"""Install candidates.
def install_candidate(self, key: str) -> Candidate:
"""Install candidate"""
can = self.candidates[key]
installer = self.get_installer()
installer.install(can)
return can
:param candidates: a list of candidates to be installed.
:param update: whether to remove existed packages.
"""
def update_candidate(self, key: str) -> Tuple[Distribution, Candidate]:
"""Update candidate"""
can = self.candidates[key]
dist = self.working_set[safe_name(can.name).lower()]
installer = self.get_installer()
working_set = self.environment.get_working_set()
for can in candidates:
if update:
dist = working_set[safe_name(can.name).lower()]
stream.echo(
f" - Updating {stream.green(can.name, bold=True)} "
f"{stream.yellow(dist.version)} -> "
f"{stream.yellow(can.version)}"
)
installer.uninstall(dist)
else:
stream.echo(f" - Installing {can.format()}...")
installer.install(can)
installer.uninstall(dist)
installer.install(can)
return dist, can
def remove_distributions(self, distributions: List[str]) -> None:
def remove_distribution(self, key: str) -> Distribution:
"""Remove distributions with given names.
:param distributions: a list of names to be removed.
"""
installer = self.get_installer()
working_set = self.environment.get_working_set()
for name in distributions:
dist = working_set[name]
stream.echo(
f" - Uninstalling: {stream.green(name, bold=True)} "
f"{stream.yellow(dist.version)}"
)
installer.uninstall(dist)
dist = self.working_set[key]
installer.uninstall(dist)
return dist
def _print_section_title(
self, action: str, number_of_packages: int, dry_run: bool
) -> None:
plural = "s" if number_of_packages > 1 else ""
verb = "will be" if dry_run else "are" if plural else "is"
stream.echo(f"{number_of_packages} package{plural} {verb} {action}:")
def summarize(self, result, dry_run=False):
added, updated, removed = result["add"], result["update"], result["remove"]
if added:
self._print_section_title("installed", len(added), dry_run)
for item in added:
stream.echo(f" - {item.format()}")
stream.echo()
if updated:
self._print_section_title("updated", len(updated), dry_run)
for old, can in updated:
stream.echo(
f" - {stream.green(can.name, bold=True)} "
f"{stream.yellow(old.version)} "
f"-> {stream.yellow(can.version)}"
)
stream.echo()
if removed:
self._print_section_title("removed", len(removed), dry_run)
for dist in removed:
stream.echo(
f" - {stream.green(dist.key, bold=True)} "
f"{stream.yellow(dist.version)}"
)
def synchronize(self, clean: bool = True, dry_run: bool = False) -> None:
"""Synchronize the working set with pinned candidates.
......@@ -416,27 +505,93 @@ class Synchronizer:
:param dry_run: If set to True, only prints actions without actually do them.
"""
to_add, to_update, to_remove = self.compare_with_working_set()
lists_to_check = [to_add, to_update]
if clean:
lists_to_check.append(to_remove)
if not clean:
to_remove = []
lists_to_check = [to_add, to_update, to_remove]
if not any(lists_to_check):
stream.echo("All packages are synced to date, nothing to do.")
return
if to_add and not dry_run:
self.install_candidates(
[can for k, can in self.candidates.items() if k in to_add]
)
if to_update and not dry_run:
self.install_candidates(
[can for k, can in self.candidates.items() if k in to_update],
update=True,
if dry_run:
result = dict(
add=[self.candidates[key] for key in to_add],
update=[
(self.working_set[key], self.candidates[key]) for key in to_update
],
remove=[self.working_set[key] for key in to_remove],
)
if clean and to_remove and not dry_run:
self.remove_distributions(to_remove)
self.summarize(result, dry_run)
return
handlers = {
"add": self.install_candidate,
"update": self.update_candidate,
"remove": self.remove_distribution,
}
result = defaultdict(list)
failed = defaultdict(list)
to_do = {"add": to_add, "update": to_update, "remove": to_remove}
# Keep track of exceptions
errors = []
def update_progress(future, section, key, bar):
if future.exception():
failed[section].append(key)
errors.append(future.exception())
else:
result[section].append(future.result())
bar.update(1)
with self.progressbar(
"Synchronizing:", sum(len(l) for l in to_do.values())
) as (bar, pool):
for section in to_do:
for key in to_do[section]:
future = pool.submit(handlers[section], key)
future.add_done_callback(
functools.partial(
update_progress, section=section, key=key, bar=bar
)
)
# Retry for failed items
for i in range(self.RETRY_TIMES):
if not any(failed.values()):
break
to_do = failed
failed = defaultdict(list)
errors.clear()
with self.progressbar(
f"Retrying ({i + 1}/{self.RETRY_TIMES}):",
sum(len(l) for l in to_do.values()),
) as (bar, pool):
for section in to_do:
for key in to_do[section]:
future = pool.submit(handlers[section], key)
future.add_done_callback(
functools.partial(
update_progress, section=section, key=key, bar=bar
)
)
stream.echo()
if to_add:
self._print_list_information("added", to_add, dry_run)
if to_update:
self._print_list_information("updated", to_update, dry_run)
if clean and to_remove:
self._print_list_information("removed", to_remove, dry_run)
self.summarize(result)
if not any(failed.values()):
return
stream.echo(stream.red("[ERROR]", bold=True))
if failed["add"] + failed["update"]:
stream.echo(
f"Installation failed: {', '.join(failed['add'] + failed['update'])}"
)
if failed["remove"]:
stream.echo(f"Removal failed: {', '.join(failed['remove'])}")
for error in errors:
stream.echo(
"".join(
traceback.format_exception(type(error), error, error.__traceback__)
),
verbosity=stream.DEBUG,
)
......@@ -71,6 +71,11 @@ class Config(MutableMapping):
True,
"PDM_AUTO_GLOBAL",
),
"parallel_install": ConfigItem(
"Whether to perform installation and uninstallation in parallel",
True,
env_var="PDM_PARALLEL_INSTALL",
),
"python.path": ConfigItem("The Python interpreter path", env_var="PDM_PYTHON"),
"python.use_pyenv": ConfigItem("Use the pyenv interpreter", True),
"pypi.url": ConfigItem(
......
......@@ -24,8 +24,9 @@ def test_sync_only_different(project, repository, working_set, capsys):
working_set.add_distribution(make_distribution("idna", "2.7"))
actions.do_add(project, packages=["requests"])
out, _ = capsys.readouterr()
assert "4 packages added" in out
assert "1 package updated" in out
print(out)
assert "4 packages are installed" in out
assert "1 package is updated" in out
assert "foo" in working_set
assert "test-project" in working_set
assert working_set["chardet"].version == "3.0.4"
......@@ -196,7 +197,7 @@ def test_update_all_packages(project, repository, working_set, capsys):
assert locked_candidates["chardet"].version == "3.0.5"
assert locked_candidates["pytz"].version == "2019.6"
out, _ = capsys.readouterr()
assert "3 packages updated" in out
assert "3 packages are updated" in out
actions.do_sync(project)
out, _ = capsys.readouterr()
......
......@@ -208,10 +208,13 @@ class MockWorkingSet(collections.abc.MutableMapping):
@pytest.fixture()
def working_set(mocker, repository):
from pip._internal.utils import logging
rv = MockWorkingSet()
mocker.patch.object(Environment, "get_working_set", return_value=rv)
def install(candidate):
logging._log_state.indentation = 0
dependencies = repository.get_dependencies(candidate)[0]
key = safe_name(candidate.name).lower()
dist = Distribution(key, candidate.version)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册