未验证 提交 06cf1d88 编写于 作者: F Frost Ming 提交者: GitHub

Merge pull request #25 from frostming/feature/list-graph

new option: list --graph
......@@ -325,14 +325,24 @@ def do_remove(
do_sync(project, sections=(section,), default=False, clean=True)
def do_list(project: Project) -> None:
def do_list(project: Project, graph: bool = False) -> None:
"""Display a list of packages installed in the local packages directory.
:param project: the project instance.
:param graph: whether to display a graph.
"""
from pdm.cli.dependencies import build_dependency_graph, format_dependency_graph
check_project_file(project)
working_set = project.environment.get_working_set()
rows = [
(context.io.green(k, bold=True), format_dist(v))
for k, v in sorted(working_set.items())
]
context.io.display_columns(rows, ["Package", "Version"])
if graph:
context.io.echo(format_dependency_graph(build_dependency_graph(working_set)))
else:
rows = [
(context.io.green(k, bold=True), format_dist(v))
for k, v in sorted(working_set.items())
]
context.io.display_columns(rows, ["Package", "Version"])
def do_build(
......
......@@ -210,11 +210,15 @@ def remove(project, dev, section, sync, packages):
actions.do_remove(project, dev, section, sync, packages)
@cli.command(name="list", help="List packages installed in current working set.")
@cli.command(name="list")
@verbose_option
@click.option(
"--graph", is_flag=True, default=False, help="Display a graph of dependencies."
)
@pass_project
def list_(project):
actions.do_list(project)
def list_(project, graph):
"""List packages installed in the current working set."""
actions.do_list(project, graph)
@cli.command(help="Build artifacts for distribution.")
......
from __future__ import annotations
from packaging.specifiers import SpecifierSet
from pdm.context import context
from pdm.models.candidates import identify
from pdm.models.environment import WorkingSet
from pdm.models.requirements import Requirement, strip_extras
from pdm.resolver.structs import DirectedGraph
class Package:
"""An internal class for the convenience of dependency graph building."""
def __init__(self, name, version, requirements):
self.name = name
self.version = version # if version is None, the dist is not installed.
self.requirements = requirements
def __hash__(self):
return hash(self.name)
def __repr__(self):
return f"<Package {self.name}=={self.version}>"
def __eq__(self, value):
return self.name == value.name
def build_dependency_graph(working_set: WorkingSet) -> DirectedGraph:
"""Build a dependency graph from locked result."""
graph = DirectedGraph()
graph.add(None) # sentinel parent of top nodes.
node_with_extras = set()
def add_package(key, dist):
name, extras = strip_extras(key)
extras = extras or ()
reqs = {}
if dist:
requirements = [
Requirement.from_pkg_requirement(r) for r in dist.requires(extras)
]
for req in requirements:
reqs[identify(req)] = req
version = dist.version
else:
version = None
node = Package(key, version, reqs)
if node not in graph:
if extras:
node_with_extras.add(name)
graph.add(node)
for k in reqs:
child = add_package(k, working_set.get(strip_extras(k)[0]))
graph.connect(node, child)
return node
for k, dist in working_set.items():
add_package(k, dist)
for node in graph._vertices.copy():
if node is not None and not list(graph.iter_parents(node)):
# Top requirements
if node.name in node_with_extras:
# Already included in package[extra], no need to keep the top level
# non-extra package.
graph.remove(node)
else:
graph.connect(None, node)
return graph
LAST_CHILD = "└── "
LAST_PREFIX = " "
NON_LAST_CHILD = "├── "
NON_LAST_PREFIX = "│ "
def format_package(
graph: DirectedGraph, package: Package, required: str = "", prefix: str = ""
) -> str:
"""Format one package.
:param graph: the dependency graph.
:param package: the package instance.
:param required: the version required by its parent.
:param prefix: prefix text for children.
"""
result = []
version = (
context.io.red("[ not installed ]")
if not package.version
else context.io.red(package.version)
if required
and required != "Any"
and not SpecifierSet(required).contains(package.version)
else context.io.yellow(package.version)
)
required = f"[ required: {required} ]" if required else ""
result.append(f"{context.io.green(package.name, bold=True)} {version} {required}\n")
try:
*children, last = sorted(graph.iter_children(package), key=lambda p: p.name)
except ValueError: # No children nodes
pass
else:
for child in children:
required = str(package.requirements[child.name].specifier or "Any")
result.append(
prefix
+ NON_LAST_CHILD
+ format_package(graph, child, required, prefix + NON_LAST_PREFIX)
)
required = str(package.requirements[last.name].specifier or "Any")
result.append(
prefix
+ LAST_CHILD
+ format_package(graph, last, required, prefix + LAST_PREFIX)
)
return "".join(result)
def format_dependency_graph(graph: DirectedGraph) -> str:
"""Format dependency graph for output."""
content = []
for package in graph.iter_children(None):
content.append(format_package(graph, package, prefix=""))
return "".join(content).strip()
from __future__ import annotations
import functools
import warnings
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Union
from pip._vendor.pkg_resources import safe_extra
from pip_shims import shims
......@@ -26,6 +27,34 @@ def get_sdist(egg_info) -> Optional[EggInfoDistribution]:
return EggInfoDistribution(egg_info) if egg_info else None
@functools.lru_cache(128)
def get_requirements_from_dist(
dist: EggInfoDistribution, extras: Sequence[str]
) -> List[str]:
"""Get requirements of a distribution, with given extras."""
extras_in_metadata = []
result = []
dep_map = dist._build_dep_map()
for extra, reqs in dep_map.items():
reqs = [Requirement.from_pkg_requirement(r) for r in reqs]
if not extra:
# requirements without extras are always required.
result.extend(r.as_line() for r in reqs)
else:
new_extra, _, marker = extra.partition(":")
extras_in_metadata.append(new_extra.strip())
# Only include requirements that match one of extras.
if not new_extra.strip() or safe_extra(new_extra.strip()) in extras:
marker = Marker(marker) if marker else None
for r in reqs:
r.marker = marker
result.append(r.as_line())
extras_not_found = [e for e in extras if e not in extras_in_metadata]
if extras_not_found:
warnings.warn(ExtrasError(extras_not_found), stacklevel=2)
return result
def identify(req: Union[Candidate, Requirement]) -> Optional[str]:
"""Get the identity of a candidate or requirement.
The result carries the extras information to distinguish from the same package
......@@ -79,6 +108,10 @@ class Candidate:
self.wheel = None
self.metadata = None
# Dependencies from lockfile content.
self.dependencies = None
self.summary = None
def __hash__(self):
return hash((self.name, self.version))
......@@ -150,30 +183,12 @@ class Candidate:
"""Get the dependencies of a candidate from metadata."""
extras = self.req.extras or ()
metadata = self.get_metadata()
result = []
if self.req.editable:
if not metadata:
return result
extras_in_metadata = []
dep_map = self.ireq.get_dist()._build_dep_map()
for extra, reqs in dep_map.items():
reqs = [Requirement.from_pkg_requirement(r) for r in reqs]
if not extra:
result.extend(r.as_line() for r in reqs)
else:
new_extra, _, marker = extra.partition(":")
extras_in_metadata.append(new_extra.strip())
if not new_extra.strip() or safe_extra(new_extra.strip()) in extras:
marker = Marker(marker) if marker else None
for r in reqs:
r.marker = marker
result.append(r.as_line())
extras_not_found = [e for e in extras if e not in extras_in_metadata]
if extras_not_found:
warnings.warn(ExtrasError(extras_not_found), stacklevel=2)
return []
return get_requirements_from_dist(self.ireq.get_dist(), extras)
else:
result = filter_requirements_with_extras(metadata.run_requires, extras)
return result
return filter_requirements_with_extras(metadata.run_requires, extras)
@property
def requires_python(self) -> str:
......@@ -197,6 +212,10 @@ class Candidate:
requires_python = f">={requires_python},<{int(requires_python) + 1}"
return requires_python
@requires_python.setter
def requires_python(self, value: str) -> None:
self._requires_python = value
def as_lockfile_entry(self) -> Dict[str, Any]:
"""Build a lockfile entry dictionary for the candidate."""
result = {
......
......@@ -201,8 +201,14 @@ class PyPIRepository(BaseRepository):
return requirements, requires_python, summary
raise CandidateInfoNotFound(candidate)
def _get_dependencies_from_lockfile(self, candidate: Candidate) -> CandidateInfo:
if candidate.dependencies is None:
raise CandidateInfoNotFound(candidate)
return candidate.dependencies, candidate.requires_python, candidate.summary
def dependency_generators(self) -> Iterable[Callable[[Candidate], CandidateInfo]]:
return (
self._get_dependencies_from_lockfile,
self._get_dependencies_from_cache,
self._get_dependencies_from_json,
self._get_dependencies_from_metadata,
......
......@@ -184,9 +184,17 @@ class Project:
if version:
package["version"] = f"=={version}"
package_name = package.pop("name")
summary = package.pop("summary", None)
dependencies = [
Requirement.from_req_dict(k, v)
for k, v in package.pop("dependencies", {}).items()
]
req = Requirement.from_req_dict(package_name, dict(package))
can = Candidate(req, self.environment, name=package_name, version=version)
can.marker = req.marker
can.requires_python = str(req.requires_python)
can.dependencies = dependencies
can.summary = summary
can.hashes = {
item["file"]: item["hash"]
for item in self.lockfile["metadata"].get(
......
......@@ -339,3 +339,11 @@ def test_project_no_init_error(project_no_init):
PdmException, match="The pyproject.toml has not been initialized yet"
):
handler(project_no_init)
def test_list_dependency_graph(capsys):
project = Project()
actions.do_list(project, True)
content, _ = capsys.readouterr()
assert "halo 0.0.28 [ required: <1.0.0,>=0.0.28 ]" in content
assert "six 1.14.0 [ required: >=1.12.0 ]" in content
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册