未验证 提交 1c3e7540 编写于 作者: F Frost Ming

pdm search fix

上级 b6c1c3d3
Re-implement the `pdm search` to query the `/search` HTTP endpoint.
from typing import Dict, List, Tuple, Union
from typing import Dict, List, NamedTuple, Tuple, Union
Source = Dict[str, Union[str, bool]]
RequirementDict = Union[str, Dict[str, Union[bool, str]]]
CandidateInfo = Tuple[List[str], str, str]
SearchResult = List[Dict[str, Union[str, List[str]]]]
class Package(NamedTuple):
name: str
version: str
summary: str
SearchResult = List[Package]
......@@ -2,32 +2,30 @@ import argparse
import sys
import textwrap
from shutil import get_terminal_size
from typing import Optional
from pip._vendor.pkg_resources import safe_name
from pdm._types import SearchResult
from pdm.cli.commands.base import BaseCommand
from pdm.iostream import stream
from pdm.models.environment import WorkingSet
from pdm.project import Project
from pdm.utils import highest_version
def print_results(hits, working_set, terminal_width=None):
def print_results(
hits: SearchResult, working_set: WorkingSet, terminal_width: Optional[int] = None
):
if not hits:
return
name_column_width = (
max(
[
len(hit["name"]) + len(highest_version(hit.get("versions", ["-"])))
for hit in hits
]
)
+ 4
max([len(hit.name) + len(hit.version or "") for hit in hits]) + 4
)
for hit in hits:
name = hit["name"]
summary = hit["summary"] or ""
latest = highest_version(hit.get("versions", ["-"]))
name = hit.name
summary = hit.summary or ""
latest = hit.version or ""
if terminal_width is not None:
target_width = terminal_width - name_column_width - 5
if target_width > 10:
......
from __future__ import annotations
import sys
import xmlrpc.client as xmlrpc_client
from functools import lru_cache, wraps
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
from pdm._types import CandidateInfo, SearchResult, Source
from pip._vendor.html5lib import parse
from pdm._types import CandidateInfo, Package, SearchResult, Source
from pdm.exceptions import CandidateInfoNotFound, CorruptedCacheError, PackageIndexError
from pdm.models.candidates import Candidate
from pdm.models.requirements import (
......@@ -14,8 +15,7 @@ from pdm.models.requirements import (
parse_requirement,
)
from pdm.models.specifiers import PySpecSet, SpecifierSet
from pdm.models.xmlrpc import PyPIXmlrpcTransport
from pdm.utils import allow_all_wheels, highest_version
from pdm.utils import allow_all_wheels
if TYPE_CHECKING:
from pdm.models.environment import Environment
......@@ -234,32 +234,36 @@ class PyPIRepository(BaseRepository):
]
def search(self, query: str) -> SearchResult:
pypi_simple = self.sources[0]["url"]
if not pypi_simple.endswith("/simple"):
raise PackageIndexError(f"{pypi_simple} doesn't support '/pypi' endpoint.")
pypi_url = pypi_simple[:-6] + "pypi"
pypi_simple = self.sources[0]["url"].rstrip("/")
results = []
if pypi_simple.endswith("/simple"):
search_url = pypi_simple[:-6] + "search"
else:
search_url = pypi_simple + "/search"
with self.environment.get_finder() as finder:
transport = PyPIXmlrpcTransport(pypi_url, finder.session)
pypi = xmlrpc_client.ServerProxy(pypi_url, transport)
hits = pypi.search({"name": query, "summary": query}, "or")
packages = {}
for hit in hits:
name = hit["name"]
summary = hit["summary"]
version = hit["version"]
if name not in packages.keys():
packages[name] = {
"name": name,
"summary": summary,
"versions": [version],
}
else:
packages[name]["versions"].append(version)
# if this is the highest version, replace summary and score
if version == highest_version(packages[name]["versions"]):
packages[name]["summary"] = summary
return list(packages.values())
session = finder.session
resp = session.get(search_url, params={"q": query})
if resp.status_code == 404:
raise PackageIndexError(
f"{pypi_simple!r} doesn't support '/search' endpoint."
)
resp.raise_for_status()
content = parse(resp.content, namespaceHTMLElements=False)
for result in content.findall(".//*[@class='package-snippet']"):
name = result.find("h3/*[@class='package-snippet__name']").text
version = result.find("h3/*[@class='package-snippet__version']").text
if not name or not version:
continue
description = result.find("p[@class='package-snippet__description']").text
if not description:
description = ""
result = Package(name, version, description)
results.append(result)
return results
import urllib.parse as urllib_parse
import xmlrpc.client as xmlrpc_client
class PyPIXmlrpcTransport(xmlrpc_client.Transport):
"""Provide a `xmlrpclib.Transport` implementation via a `PipSession`
object.
"""
def __init__(self, index_url, session, use_datetime=False):
xmlrpc_client.Transport.__init__(self, use_datetime)
index_parts = urllib_parse.urlparse(index_url)
self._scheme = index_parts.scheme
self._session = session
def request(self, host, handler, request_body, verbose=False):
parts = (self._scheme, host, handler, None, None, None)
url = urllib_parse.urlunparse(parts)
headers = {"Content-Type": "text/xml"}
response = self._session.post(
url, data=request_body, headers=headers, stream=True
)
response.raise_for_status()
self.verbose = verbose
return self.parse_response(response.raw)
......@@ -15,7 +15,6 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from distlib.wheel import Wheel
from packaging.version import parse as parse_version
from pdm._types import Source
from pdm.models.pip_shims import (
......@@ -367,11 +366,6 @@ def open_file(url, session=None):
result.close()
def highest_version(versions: List[str]) -> str:
"""Return the highest version of a given list."""
return max(versions, key=parse_version)
def populate_link(
finder: PackageFinder,
ireq: InstallRequirement,
......
......@@ -240,6 +240,13 @@ def test_import_other_format_file(project, invoke, filename):
assert result.exit_code == 0
@pytest.mark.pypi
def test_search_package(project, invoke):
result = invoke(["search", "requests"], obj=project)
assert result.exit_code == 0
assert len(result.output.splitlines()) > 0
@pytest.mark.pypi
def test_show_package_on_pypi(invoke):
result = invoke(["show", "ipython"])
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册