未验证 提交 c082fb41 编写于 作者: F frostming

search command

上级 896b70b9
Add a new command to search for packages
......@@ -3,3 +3,4 @@ from typing import Dict, List, Tuple, Union
Source = Dict[str, Union[str, bool]]
RequirementDict = Union[str, Dict[str, Union[bool, str]]]
CandidateInfo = Tuple[List[str], str, str]
SearchResult = List[Dict[str, Union[str, List[str]]]]
......@@ -429,6 +429,7 @@ def do_info(
python_version = get_python_version(python_path, True)
if not python and not show_project and not env:
rows = [
(stream.cyan("PDM version:", bold=True), project.core.version),
(
stream.cyan("Python Interpreter:", bold=True),
python_path + f" ({python_version})",
......
import argparse
import sys
import textwrap
from shutil import get_terminal_size
from pkg_resources import safe_name
from pdm.cli.commands.base import BaseCommand
from pdm.iostream import stream
from pdm.project import Project
from pdm.utils import highest_version
def print_results(hits, working_set, terminal_width=None):
if not hits:
return
name_column_width = (
max(
[
len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
for hit in hits
]
)
+ 4
)
for hit in hits:
name = hit["name"]
summary = hit["summary"] or ""
latest = highest_version(hit.get("versions", ["-"]))
if terminal_width is not None:
target_width = terminal_width - name_column_width - 5
if target_width > 10:
# wrap and indent summary to fit terminal
summary = textwrap.wrap(summary, target_width)
summary = ("\n" + " " * (name_column_width + 2)).join(summary)
current_width = len(name) + len(latest) + 4
spaces = " " * (name_column_width - current_width)
line = "{name} ({latest}){spaces} - {summary}".format(
name=stream.green(name, bold=True),
latest=stream.yellow(latest),
spaces=spaces,
summary=summary,
)
try:
stream.echo(line)
if safe_name(name).lower() in working_set:
dist = working_set[safe_name(name).lower()]
if dist.version == latest:
stream.echo(" INSTALLED: %s (latest)" % dist.version)
else:
stream.echo(" INSTALLED: %s" % dist.version)
stream.echo(" LATEST: %s" % latest)
except UnicodeEncodeError:
pass
class Command(BaseCommand):
"""Search for PyPI packages"""
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument("query")
def handle(self, project: Project, options: argparse.Namespace) -> None:
result = project.get_repository().search(options.query)
terminal_width = None
if sys.stdout.isatty():
terminal_width = get_terminal_size()[0]
print_results(result, project.environment.get_working_set(), terminal_width)
......@@ -25,6 +25,10 @@ class CorruptedCacheError(PdmException):
pass
class PackageIndexError(PdmException):
pass
class CandidateInfoNotFound(PdmException):
def __init__(self, candidate):
message = (
......
......@@ -57,7 +57,7 @@ class IOStream:
return click.style(text, *args, **kwargs)
def display_columns(
self, rows: List[str], header: Optional[List[str]] = None
self, rows: List[List[str]], header: Optional[List[str]] = None
) -> None:
"""Print rows in aligned columns.
......
from __future__ import annotations
import sys
import xmlrpc.client as xmlrpc_client
from functools import wraps
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
from pdm._types import CandidateInfo, Source
from pdm.exceptions import CandidateInfoNotFound, CorruptedCacheError
from pdm._types import CandidateInfo, SearchResult, Source
from pdm.exceptions import CandidateInfoNotFound, CorruptedCacheError, PackageIndexError
from pdm.models.candidates import Candidate
from pdm.models.requirements import (
Requirement,
......@@ -13,7 +14,8 @@ from pdm.models.requirements import (
parse_requirement,
)
from pdm.models.specifiers import PySpecSet, SpecifierSet
from pdm.utils import allow_all_wheels
from pdm.models.xmlrpc import PyPIXmlrpcTransport
from pdm.utils import allow_all_wheels, highest_version
if TYPE_CHECKING:
from pdm.models.environment import Environment
......@@ -159,6 +161,14 @@ class BaseRepository:
"""
raise NotImplementedError
def search(self, query: str) -> SearchResult:
"""Search package by name or summary.
:param query: query string
:returns: search result, a dictionary of name: package metadata
"""
raise NotImplementedError
class PyPIRepository(BaseRepository):
"""Get package and metadata from PyPI source."""
......@@ -245,3 +255,34 @@ class PyPIRepository(BaseRepository):
key=lambda c: c.version,
)
return sorted_cans
def search(self, query: str) -> SearchResult:
pypi_simple = self.sources[0]["url"]
if not pypi_simple.endswith("/simple"):
raise PackageIndexError(f"{pypi_simple} doesn't support '/pypi' endpoint.")
pypi_url = pypi_simple[:-6] + "pypi"
with self.environment.get_finder() as finder:
transport = PyPIXmlrpcTransport(pypi_url, finder.session)
pypi = xmlrpc_client.ServerProxy(pypi_url, transport)
hits = pypi.search({"name": query, "summary": query}, "or")
packages = {}
for hit in hits:
name = hit["name"]
summary = hit["summary"]
version = hit["version"]
if name not in packages.keys():
packages[name] = {
"name": name,
"summary": summary,
"versions": [version],
}
else:
packages[name]["versions"].append(version)
# if this is the highest version, replace summary and score
if version == highest_version(packages[name]["versions"]):
packages[name]["summary"] = summary
return list(packages.values())
import urllib.parse as urllib_parse
import xmlrpc.client as xmlrpc_client
class PyPIXmlrpcTransport(xmlrpc_client.Transport):
"""Provide a `xmlrpclib.Transport` implementation via a `PipSession`
object.
"""
def __init__(self, index_url, session, use_datetime=False):
xmlrpc_client.Transport.__init__(self, use_datetime)
index_parts = urllib_parse.urlparse(index_url)
self._scheme = index_parts.scheme
self._session = session
def request(self, host, handler, request_body, verbose=False):
parts = (self._scheme, host, handler, None, None, None)
url = urllib_parse.urlunparse(parts)
headers = {"Content-Type": "text/xml"}
response = self._session.post(
url, data=request_body, headers=headers, stream=True
)
response.raise_for_status()
self.verbose = verbose
return self.parse_response(response.raw)
......@@ -213,7 +213,9 @@ class Project:
)
return sources
def get_repository(self, cls: Optional[Type[BaseRepository]]) -> BaseRepository:
def get_repository(
self, cls: Optional[Type[BaseRepository]] = None
) -> BaseRepository:
"""Get the repository object"""
if cls is None:
cls = PyPIRepository
......
......@@ -18,6 +18,7 @@ from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
from distlib.wheel import Wheel
from packaging.version import parse as parse_version
from pip_shims.shims import InstallCommand, PackageFinder, TargetPython, url_to_path
from pdm._types import Source
......@@ -500,3 +501,8 @@ def get_platform():
# pip pull request #3497
result = "linux_i686"
return result
def highest_version(versions: List[str]) -> str:
"""Return the highest version of a given list."""
return max(versions, key=parse_version)
......@@ -258,3 +258,9 @@ def test_pep582_not_loading_site_packages(project, invoke, capfd):
)
sys_path = json.loads(capfd.readouterr()[0])
assert not any("site-packages" in p for p in sys_path)
def test_search_package(project, invoke):
result = invoke(["search", "requests"], obj=project)
assert result.exit_code == 0
assert len(result.output.splitlines()) > 0
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册