未验证 提交 4b159c54 编写于 作者: P Pradyun Gedam 提交者: GitHub

Merge pull request #7612 from sbidoul/pip609-v2-sbi

Better freeze of distributions installed from direct URL references
pip now implements PEP 610, so ``pip freeze`` has better fidelity
in presence of distributions installed from Direct URL requirements.
......@@ -270,6 +270,16 @@ class EphemWheelCache(SimpleWheelCache):
)
class CacheEntry(object):
def __init__(
self,
link, # type: Link
persistent, # type: bool
):
self.link = link
self.persistent = persistent
class WheelCache(Cache):
"""Wraps EphemWheelCache and SimpleWheelCache into a single Cache
......@@ -304,16 +314,36 @@ class WheelCache(Cache):
supported_tags, # type: List[Tag]
):
# type: (...) -> Link
cache_entry = self.get_cache_entry(link, package_name, supported_tags)
if cache_entry is None:
return link
return cache_entry.link
def get_cache_entry(
self,
link, # type: Link
package_name, # type: Optional[str]
supported_tags, # type: List[Tag]
):
# type: (...) -> Optional[CacheEntry]
"""Returns a CacheEntry with a link to a cached item if it exists or
None. The cache entry indicates if the item was found in the persistent
or ephemeral cache.
"""
retval = self._wheel_cache.get(
link=link,
package_name=package_name,
supported_tags=supported_tags,
)
if retval is not link:
return retval
return CacheEntry(retval, persistent=True)
return self._ephem_cache.get(
retval = self._ephem_cache.get(
link=link,
package_name=package_name,
supported_tags=supported_tags,
)
if retval is not link:
return CacheEntry(retval, persistent=False)
return None
""" PEP 610 """
import json
import re
from pip._vendor import six
from pip._vendor.six.moves.urllib import parse as urllib_parse
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
if MYPY_CHECK_RUNNING:
from typing import (
Any, Dict, Iterable, Optional, Type, TypeVar, Union
)
T = TypeVar("T")
DIRECT_URL_METADATA_NAME = "direct_url.json"
ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
__all__ = [
"DirectUrl",
"DirectUrlValidationError",
"DirInfo",
"ArchiveInfo",
"VcsInfo",
]
class DirectUrlValidationError(Exception):
pass
def _get(d, expected_type, key, default=None):
# type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
"""Get value from dictionary and verify expected type."""
if key not in d:
return default
value = d[key]
if six.PY2 and expected_type is str:
expected_type = six.string_types # type: ignore
if not isinstance(value, expected_type):
raise DirectUrlValidationError(
"{!r} has unexpected type for {} (expected {})".format(
value, key, expected_type
)
)
return value
def _get_required(d, expected_type, key, default=None):
# type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
value = _get(d, expected_type, key, default)
if value is None:
raise DirectUrlValidationError("{} must have a value".format(key))
return value
def _exactly_one_of(infos):
# type: (Iterable[Optional[InfoType]]) -> InfoType
infos = [info for info in infos if info is not None]
if not infos:
raise DirectUrlValidationError(
"missing one of archive_info, dir_info, vcs_info"
)
if len(infos) > 1:
raise DirectUrlValidationError(
"more than one of archive_info, dir_info, vcs_info"
)
assert infos[0] is not None
return infos[0]
def _filter_none(**kwargs):
# type: (Any) -> Dict[str, Any]
"""Make dict excluding None values."""
return {k: v for k, v in kwargs.items() if v is not None}
class VcsInfo(object):
name = "vcs_info"
def __init__(
self,
vcs, # type: str
commit_id, # type: str
requested_revision=None, # type: Optional[str]
resolved_revision=None, # type: Optional[str]
resolved_revision_type=None, # type: Optional[str]
):
self.vcs = vcs
self.requested_revision = requested_revision
self.commit_id = commit_id
self.resolved_revision = resolved_revision
self.resolved_revision_type = resolved_revision_type
@classmethod
def _from_dict(cls, d):
# type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
if d is None:
return None
return cls(
vcs=_get_required(d, str, "vcs"),
commit_id=_get_required(d, str, "commit_id"),
requested_revision=_get(d, str, "requested_revision"),
resolved_revision=_get(d, str, "resolved_revision"),
resolved_revision_type=_get(d, str, "resolved_revision_type"),
)
def _to_dict(self):
# type: () -> Dict[str, Any]
return _filter_none(
vcs=self.vcs,
requested_revision=self.requested_revision,
commit_id=self.commit_id,
resolved_revision=self.resolved_revision,
resolved_revision_type=self.resolved_revision_type,
)
class ArchiveInfo(object):
name = "archive_info"
def __init__(
self,
hash=None, # type: Optional[str]
):
self.hash = hash
@classmethod
def _from_dict(cls, d):
# type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
if d is None:
return None
return cls(hash=_get(d, str, "hash"))
def _to_dict(self):
# type: () -> Dict[str, Any]
return _filter_none(hash=self.hash)
class DirInfo(object):
name = "dir_info"
def __init__(
self,
editable=False, # type: bool
):
self.editable = editable
@classmethod
def _from_dict(cls, d):
# type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
if d is None:
return None
return cls(
editable=_get_required(d, bool, "editable", default=False)
)
def _to_dict(self):
# type: () -> Dict[str, Any]
return _filter_none(editable=self.editable or None)
if MYPY_CHECK_RUNNING:
InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
class DirectUrl(object):
def __init__(
self,
url, # type: str
info, # type: InfoType
subdirectory=None, # type: Optional[str]
):
self.url = url
self.info = info
self.subdirectory = subdirectory
def _remove_auth_from_netloc(self, netloc):
# type: (str) -> str
if "@" not in netloc:
return netloc
user_pass, netloc_no_user_pass = netloc.split("@", 1)
if (
isinstance(self.info, VcsInfo) and
self.info.vcs == "git" and
user_pass == "git"
):
return netloc
if ENV_VAR_RE.match(user_pass):
return netloc
return netloc_no_user_pass
@property
def redacted_url(self):
# type: () -> str
"""url with user:password part removed unless it is formed with
environment variables as specified in PEP 610, or it is ``git``
in the case of a git URL.
"""
purl = urllib_parse.urlsplit(self.url)
netloc = self._remove_auth_from_netloc(purl.netloc)
surl = urllib_parse.urlunsplit(
(purl.scheme, netloc, purl.path, purl.query, purl.fragment)
)
return surl
def validate(self):
# type: () -> None
self.from_dict(self.to_dict())
@classmethod
def from_dict(cls, d):
# type: (Dict[str, Any]) -> DirectUrl
return DirectUrl(
url=_get_required(d, str, "url"),
subdirectory=_get(d, str, "subdirectory"),
info=_exactly_one_of(
[
ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
DirInfo._from_dict(_get(d, dict, "dir_info")),
VcsInfo._from_dict(_get(d, dict, "vcs_info")),
]
),
)
def to_dict(self):
# type: () -> Dict[str, Any]
res = _filter_none(
url=self.redacted_url,
subdirectory=self.subdirectory,
)
res[self.info.name] = self.info._to_dict()
return res
@classmethod
def from_json(cls, s):
# type: (str) -> DirectUrl
return cls.from_dict(json.loads(s))
def to_json(self):
# type: () -> str
return json.dumps(self.to_dict(), sort_keys=True)
......@@ -19,6 +19,10 @@ from pip._internal.req.constructors import (
install_req_from_line,
)
from pip._internal.req.req_file import COMMENT_RE
from pip._internal.utils.direct_url_helpers import (
direct_url_as_pep440_direct_reference,
dist_get_direct_url,
)
from pip._internal.utils.misc import (
dist_is_editable,
get_installed_distributions,
......@@ -250,8 +254,20 @@ class FrozenRequirement(object):
@classmethod
def from_dist(cls, dist):
# type: (Distribution) -> FrozenRequirement
# TODO `get_requirement_info` is taking care of editable requirements.
# TODO This should be refactored when we will add detection of
# editable that provide .dist-info metadata.
req, editable, comments = get_requirement_info(dist)
if req is None and not editable:
# if PEP 610 metadata is present, attempt to use it
direct_url = dist_get_direct_url(dist)
if direct_url:
req = direct_url_as_pep440_direct_reference(
direct_url, dist.project_name
)
comments = []
if req is None:
# name==version requirement
req = dist.as_requirement()
return cls(dist.project_name, req, editable, comments=comments)
......
......@@ -27,6 +27,7 @@ from pip._vendor.six import StringIO
from pip._internal.exceptions import InstallationError
from pip._internal.locations import get_major_minor_version
from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
from pip._internal.utils.filesystem import adjacent_tmp_file, replace
from pip._internal.utils.misc import captured_stdout, ensure_dir, hash_file
from pip._internal.utils.temp_dir import TempDirectory
......@@ -289,7 +290,8 @@ def install_unpacked_wheel(
scheme, # type: Scheme
req_description, # type: str
pycompile=True, # type: bool
warn_script_location=True # type: bool
warn_script_location=True, # type: bool
direct_url=None, # type: Optional[DirectUrl]
):
# type: (...) -> None
"""Install a wheel.
......@@ -570,6 +572,14 @@ def install_unpacked_wheel(
replace(installer_file.name, installer_path)
generated.append(installer_path)
# Record the PEP 610 direct URL reference
if direct_url is not None:
direct_url_path = os.path.join(dest_info_dir, DIRECT_URL_METADATA_NAME)
with adjacent_tmp_file(direct_url_path) as direct_url_file:
direct_url_file.write(direct_url.to_json().encode("utf-8"))
replace(direct_url_file.name, direct_url_path)
generated.append(direct_url_path)
# Record details of all files installed
record_path = os.path.join(dest_info_dir, 'RECORD')
with open(record_path, **csv_io_kwargs('r')) as record_file:
......@@ -593,6 +603,7 @@ def install_wheel(
pycompile=True, # type: bool
warn_script_location=True, # type: bool
_temp_dir_for_testing=None, # type: Optional[str]
direct_url=None, # type: Optional[DirectUrl]
):
# type: (...) -> None
with TempDirectory(
......@@ -607,4 +618,5 @@ def install_wheel(
req_description=req_description,
pycompile=pycompile,
warn_script_location=warn_script_location,
direct_url=direct_url,
)
......@@ -31,6 +31,7 @@ from pip._internal.operations.install.wheel import install_wheel
from pip._internal.pyproject import load_pyproject_toml, make_pyproject_path
from pip._internal.req.req_uninstall import UninstallPathSet
from pip._internal.utils.deprecation import deprecated
from pip._internal.utils.direct_url_helpers import direct_url_from_link
from pip._internal.utils.hashes import Hashes
from pip._internal.utils.logging import indent_log
from pip._internal.utils.misc import (
......@@ -126,6 +127,7 @@ class InstallRequirement(object):
# PEP 508 URL requirement
link = Link(req.url)
self.link = self.original_link = link
self.original_link_is_in_wheel_cache = False
# Path to any downloaded or already-existing package.
self.local_file_path = None # type: Optional[str]
if self.link and self.link.is_file:
......@@ -785,6 +787,13 @@ class InstallRequirement(object):
if self.is_wheel:
assert self.local_file_path
direct_url = None
if self.original_link:
direct_url = direct_url_from_link(
self.original_link,
self.source_dir,
self.original_link_is_in_wheel_cache,
)
install_wheel(
self.name,
self.local_file_path,
......@@ -792,6 +801,7 @@ class InstallRequirement(object):
req_description=str(self.req),
pycompile=pycompile,
warn_script_location=warn_script_location,
direct_url=direct_url,
)
self.install_succeeded = True
return
......
......@@ -280,14 +280,16 @@ class Resolver(BaseResolver):
if self.wheel_cache is None or self.preparer.require_hashes:
return
cached_link = self.wheel_cache.get(
cache_entry = self.wheel_cache.get_cache_entry(
link=req.link,
package_name=req.name,
supported_tags=get_supported(),
)
if req.link != cached_link:
logger.debug('Using cached wheel link: %s', cached_link)
req.link = cached_link
if cache_entry is not None:
logger.debug('Using cached wheel link: %s', cache_entry.link)
if req.link is req.original_link and cache_entry.persistent:
req.original_link_is_in_wheel_cache = True
req.link = cache_entry.link
def _get_abstract_dist_for(self, req):
# type: (InstallRequirement) -> AbstractDistribution
......
import logging
from pip._internal.models.direct_url import (
DIRECT_URL_METADATA_NAME,
ArchiveInfo,
DirectUrl,
DirectUrlValidationError,
DirInfo,
VcsInfo,
)
from pip._internal.utils.typing import MYPY_CHECK_RUNNING
from pip._internal.vcs import vcs
try:
from json import JSONDecodeError
except ImportError:
# PY2
JSONDecodeError = ValueError # type: ignore
if MYPY_CHECK_RUNNING:
from typing import Optional
from pip._internal.models.link import Link
from pip._vendor.pkg_resources import Distribution
logger = logging.getLogger(__name__)
def direct_url_as_pep440_direct_reference(direct_url, name):
# type: (DirectUrl, str) -> str
"""Convert a DirectUrl to a pip requirement string."""
direct_url.validate() # if invalid, this is a pip bug
requirement = name + " @ "
fragments = []
if isinstance(direct_url.info, VcsInfo):
requirement += "{}+{}@{}".format(
direct_url.info.vcs, direct_url.url, direct_url.info.commit_id
)
elif isinstance(direct_url.info, ArchiveInfo):
requirement += direct_url.url
if direct_url.info.hash:
fragments.append(direct_url.info.hash)
else:
assert isinstance(direct_url.info, DirInfo)
# pip should never reach this point for editables, since
# pip freeze inspects the editable project location to produce
# the requirement string
assert not direct_url.info.editable
requirement += direct_url.url
if direct_url.subdirectory:
fragments.append("subdirectory=" + direct_url.subdirectory)
if fragments:
requirement += "#" + "&".join(fragments)
return requirement
def direct_url_from_link(link, source_dir=None, link_is_in_wheel_cache=False):
# type: (Link, Optional[str], bool) -> DirectUrl
if link.is_vcs:
vcs_backend = vcs.get_backend_for_scheme(link.scheme)
assert vcs_backend
url, requested_revision, _ = (
vcs_backend.get_url_rev_and_auth(link.url_without_fragment)
)
# For VCS links, we need to find out and add commit_id.
if link_is_in_wheel_cache:
# If the requested VCS link corresponds to a cached
# wheel, it means the requested revision was an
# immutable commit hash, otherwise it would not have
# been cached. In that case we don't have a source_dir
# with the VCS checkout.
assert requested_revision
commit_id = requested_revision
else:
# If the wheel was not in cache, it means we have
# had to checkout from VCS to build and we have a source_dir
# which we can inspect to find out the commit id.
assert source_dir
commit_id = vcs_backend.get_revision(source_dir)
return DirectUrl(
url=url,
info=VcsInfo(
vcs=vcs_backend.name,
commit_id=commit_id,
requested_revision=requested_revision,
),
subdirectory=link.subdirectory_fragment,
)
elif link.is_existing_dir():
return DirectUrl(
url=link.url_without_fragment,
info=DirInfo(),
subdirectory=link.subdirectory_fragment,
)
else:
hash = None
hash_name = link.hash_name
if hash_name:
hash = "{}={}".format(hash_name, link.hash)
return DirectUrl(
url=link.url_without_fragment,
info=ArchiveInfo(hash=hash),
subdirectory=link.subdirectory_fragment,
)
def dist_get_direct_url(dist):
# type: (Distribution) -> Optional[DirectUrl]
"""Obtain a DirectUrl from a pkg_resource.Distribution.
Returns None if the distribution has no `direct_url.json` metadata,
or if `direct_url.json` is invalid.
"""
if not dist.has_metadata(DIRECT_URL_METADATA_NAME):
return None
try:
return DirectUrl.from_json(dist.get_metadata(DIRECT_URL_METADATA_NAME))
except (
DirectUrlValidationError,
JSONDecodeError,
UnicodeDecodeError
) as e:
logger.warning(
"Error parsing %s for %s: %s",
DIRECT_URL_METADATA_NAME,
dist.project_name,
e,
)
return None
......@@ -816,3 +816,11 @@ def test_freeze_path_multiple(tmpdir, script, data):
simple2==3.0
<BLANKLINE>""")
_check_output(result.stdout, expected)
def test_freeze_direct_url_archive(script, shared_data, with_wheel):
req = "simple @ " + path_to_url(shared_data.packages / "simple-2.0.tar.gz")
assert req.startswith("simple @ file://")
script.pip("install", req)
result = script.pip("freeze")
assert req in result.stdout
import re
from pip._internal.models.direct_url import DIRECT_URL_METADATA_NAME, DirectUrl
from tests.lib import _create_test_package, path_to_url
def _get_created_direct_url(result, pkg):
direct_url_metadata_re = re.compile(
pkg + r"-[\d\.]+\.dist-info." + DIRECT_URL_METADATA_NAME + r"$"
)
for filename in result.files_created:
if direct_url_metadata_re.search(filename):
direct_url_path = result.test_env.base_path / filename
with open(direct_url_path) as f:
return DirectUrl.from_json(f.read())
return None
def test_install_find_links_no_direct_url(script, with_wheel):
result = script.pip_install_local("simple")
assert not _get_created_direct_url(result, "simple")
def test_install_vcs_editable_no_direct_url(script, with_wheel):
pkg_path = _create_test_package(script, name="testpkg")
args = ["install", "-e", "git+%s#egg=testpkg" % path_to_url(pkg_path)]
result = script.pip(*args)
# legacy editable installs do not generate .dist-info,
# hence no direct_url.json
assert not _get_created_direct_url(result, "testpkg")
def test_install_vcs_non_editable_direct_url(script, with_wheel):
pkg_path = _create_test_package(script, name="testpkg")
url = path_to_url(pkg_path)
args = ["install", "git+{}#egg=testpkg".format(url)]
result = script.pip(*args)
direct_url = _get_created_direct_url(result, "testpkg")
assert direct_url
assert direct_url.url == url
assert direct_url.info.vcs == "git"
def test_install_archive_direct_url(script, data, with_wheel):
req = "simple @ " + path_to_url(data.packages / "simple-2.0.tar.gz")
assert req.startswith("simple @ file://")
result = script.pip("install", req)
assert _get_created_direct_url(result, "simple")
......@@ -90,3 +90,26 @@ def test_get_with_legacy_entry_only(tmpdir):
os.path.normcase(os.path.dirname(cached_link.file_path)) ==
os.path.normcase(legacy_path)
)
def test_get_cache_entry(tmpdir):
wc = WheelCache(tmpdir, FormatControl())
persi_link = Link("https://g.c/o/r/persi")
persi_path = wc.get_path_for_link(persi_link)
ensure_dir(persi_path)
with open(os.path.join(persi_path, "persi-1.0.0-py3-none-any.whl"), "w"):
pass
ephem_link = Link("https://g.c/o/r/ephem")
ephem_path = wc.get_ephem_path_for_link(ephem_link)
ensure_dir(ephem_path)
with open(os.path.join(ephem_path, "ephem-1.0.0-py3-none-any.whl"), "w"):
pass
other_link = Link("https://g.c/o/r/other")
supported_tags = [Tag("py3", "none", "any")]
assert (
wc.get_cache_entry(persi_link, "persi", supported_tags).persistent
)
assert (
not wc.get_cache_entry(ephem_link, "ephem", supported_tags).persistent
)
assert wc.get_cache_entry(other_link, "other", supported_tags) is None
import pytest
from pip._internal.models.direct_url import (
ArchiveInfo,
DirectUrl,
DirectUrlValidationError,
DirInfo,
VcsInfo,
)
def test_from_json():
json = '{"url": "file:///home/user/project", "dir_info": {}}'
direct_url = DirectUrl.from_json(json)
assert direct_url.url == "file:///home/user/project"
assert direct_url.info.editable is False
def test_to_json():
direct_url = DirectUrl(
url="file:///home/user/archive.tgz",
info=ArchiveInfo(),
)
direct_url.validate()
assert direct_url.to_json() == (
'{"archive_info": {}, "url": "file:///home/user/archive.tgz"}'
)
def test_archive_info():
direct_url_dict = {
"url": "file:///home/user/archive.tgz",
"archive_info": {
"hash": "sha1=1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
},
}
direct_url = DirectUrl.from_dict(direct_url_dict)
assert isinstance(direct_url.info, ArchiveInfo)
assert direct_url.url == direct_url_dict["url"]
assert direct_url.info.hash == direct_url_dict["archive_info"]["hash"]
assert direct_url.to_dict() == direct_url_dict
def test_dir_info():
direct_url_dict = {
"url": "file:///home/user/project",
"dir_info": {"editable": True},
}
direct_url = DirectUrl.from_dict(direct_url_dict)
assert isinstance(direct_url.info, DirInfo)
assert direct_url.url == direct_url_dict["url"]
assert direct_url.info.editable is True
assert direct_url.to_dict() == direct_url_dict
# test editable default to False
direct_url_dict = {"url": "file:///home/user/project", "dir_info": {}}
direct_url = DirectUrl.from_dict(direct_url_dict)
assert direct_url.info.editable is False
def test_vcs_info():
direct_url_dict = {
"url": "https:///g.c/u/p.git",
"vcs_info": {
"vcs": "git",
"requested_revision": "master",
"commit_id": "1b8c5bc61a86f377fea47b4276c8c8a5842d2220",
},
}
direct_url = DirectUrl.from_dict(direct_url_dict)
assert isinstance(direct_url.info, VcsInfo)
assert direct_url.url == direct_url_dict["url"]
assert direct_url.info.vcs == "git"
assert direct_url.info.requested_revision == "master"
assert (
direct_url.info.commit_id == "1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
)
assert direct_url.to_dict() == direct_url_dict
def test_parsing_validation():
with pytest.raises(
DirectUrlValidationError, match="url must have a value"
):
DirectUrl.from_dict({"dir_info": {}})
with pytest.raises(
DirectUrlValidationError,
match="missing one of archive_info, dir_info, vcs_info",
):
DirectUrl.from_dict({"url": "http://..."})
with pytest.raises(
DirectUrlValidationError, match="unexpected type for editable"
):
DirectUrl.from_dict(
{"url": "http://...", "dir_info": {"editable": "false"}}
)
with pytest.raises(
DirectUrlValidationError, match="unexpected type for hash"
):
DirectUrl.from_dict({"url": "http://...", "archive_info": {"hash": 1}})
with pytest.raises(
DirectUrlValidationError, match="unexpected type for vcs"
):
DirectUrl.from_dict({"url": "http://...", "vcs_info": {"vcs": None}})
with pytest.raises(
DirectUrlValidationError, match="commit_id must have a value"
):
DirectUrl.from_dict({"url": "http://...", "vcs_info": {"vcs": "git"}})
with pytest.raises(
DirectUrlValidationError,
match="more than one of archive_info, dir_info, vcs_info",
):
DirectUrl.from_dict(
{"url": "http://...", "dir_info": {}, "archive_info": {}}
)
def test_redact_url():
def _redact_git(url):
direct_url = DirectUrl(
url=url,
info=VcsInfo(vcs="git", commit_id="1"),
)
return direct_url.redacted_url
def _redact_archive(url):
direct_url = DirectUrl(
url=url,
info=ArchiveInfo(),
)
return direct_url.redacted_url
assert (
_redact_git("https://user:password@g.c/u/p.git@branch#egg=pkg") ==
"https://g.c/u/p.git@branch#egg=pkg"
)
assert (
_redact_git("https://${USER}:password@g.c/u/p.git") ==
"https://g.c/u/p.git"
)
assert (
_redact_archive("file://${U}:${PIP_PASSWORD}@g.c/u/p.tgz") ==
"file://${U}:${PIP_PASSWORD}@g.c/u/p.tgz"
)
assert (
_redact_git("https://${PIP_TOKEN}@g.c/u/p.git") ==
"https://${PIP_TOKEN}@g.c/u/p.git"
)
assert (
_redact_git("ssh://git@g.c/u/p.git") ==
"ssh://git@g.c/u/p.git"
)
from functools import partial
from mock import MagicMock, patch
from pip._internal.models.direct_url import (
DIRECT_URL_METADATA_NAME,
ArchiveInfo,
DirectUrl,
DirInfo,
VcsInfo,
)
from pip._internal.models.link import Link
from pip._internal.utils.direct_url_helpers import (
direct_url_as_pep440_direct_reference,
direct_url_from_link,
dist_get_direct_url,
)
from pip._internal.utils.urls import path_to_url
def test_as_pep440_requirement_archive():
direct_url = DirectUrl(
url="file:///home/user/archive.tgz",
info=ArchiveInfo(),
)
direct_url.validate()
assert (
direct_url_as_pep440_direct_reference(direct_url, "pkg") ==
"pkg @ file:///home/user/archive.tgz"
)
direct_url.subdirectory = "subdir"
direct_url.validate()
assert (
direct_url_as_pep440_direct_reference(direct_url, "pkg") ==
"pkg @ file:///home/user/archive.tgz#subdirectory=subdir"
)
direct_url.info.hash = "sha1=1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
direct_url.validate()
assert (
direct_url_as_pep440_direct_reference(direct_url, "pkg") ==
"pkg @ file:///home/user/archive.tgz"
"#sha1=1b8c5bc61a86f377fea47b4276c8c8a5842d2220&subdirectory=subdir"
)
def test_as_pep440_requirement_dir():
direct_url = DirectUrl(
url="file:///home/user/project",
info=DirInfo(editable=False),
)
direct_url.validate()
assert (
direct_url_as_pep440_direct_reference(direct_url, "pkg") ==
"pkg @ file:///home/user/project"
)
def test_as_pep440_requirement_vcs():
direct_url = DirectUrl(
url="https:///g.c/u/p.git",
info=VcsInfo(
vcs="git", commit_id="1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
)
)
direct_url.validate()
assert (
direct_url_as_pep440_direct_reference(direct_url, "pkg") ==
"pkg @ git+https:///g.c/u/p.git"
"@1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
)
direct_url.subdirectory = "subdir"
direct_url.validate()
assert (
direct_url_as_pep440_direct_reference(direct_url, "pkg") ==
"pkg @ git+https:///g.c/u/p.git"
"@1b8c5bc61a86f377fea47b4276c8c8a5842d2220#subdirectory=subdir"
)
@patch("pip._internal.vcs.git.Git.get_revision")
def test_from_link_vcs(mock_get_backend_for_scheme):
_direct_url_from_link = partial(direct_url_from_link, source_dir="...")
direct_url = _direct_url_from_link(Link("git+https://g.c/u/p.git"))
assert direct_url.url == "https://g.c/u/p.git"
assert isinstance(direct_url.info, VcsInfo)
assert direct_url.info.vcs == "git"
direct_url = _direct_url_from_link(Link("git+https://g.c/u/p.git#egg=pkg"))
assert direct_url.url == "https://g.c/u/p.git"
direct_url = _direct_url_from_link(
Link("git+https://g.c/u/p.git#egg=pkg&subdirectory=subdir")
)
assert direct_url.url == "https://g.c/u/p.git"
assert direct_url.subdirectory == "subdir"
direct_url = _direct_url_from_link(Link("git+https://g.c/u/p.git@branch"))
assert direct_url.url == "https://g.c/u/p.git"
assert direct_url.info.requested_revision == "branch"
direct_url = _direct_url_from_link(
Link("git+https://g.c/u/p.git@branch#egg=pkg")
)
assert direct_url.url == "https://g.c/u/p.git"
assert direct_url.info.requested_revision == "branch"
direct_url = _direct_url_from_link(
Link("git+https://token@g.c/u/p.git")
)
assert direct_url.to_dict()["url"] == "https://g.c/u/p.git"
def test_from_link_vcs_with_source_dir_obtains_commit_id(script, tmpdir):
repo_path = tmpdir / 'test-repo'
repo_path.mkdir()
repo_dir = str(repo_path)
script.run('git', 'init', cwd=repo_dir)
(repo_path / "somefile").touch()
script.run('git', 'add', '.', cwd=repo_dir)
script.run('git', 'commit', '-m', 'commit msg', cwd=repo_dir)
commit_id = script.run(
'git', 'rev-parse', 'HEAD', cwd=repo_dir
).stdout.strip()
direct_url = direct_url_from_link(
Link("git+https://g.c/u/p.git"), source_dir=repo_dir
)
assert direct_url.url == "https://g.c/u/p.git"
assert direct_url.info.commit_id == commit_id
def test_from_link_vcs_without_source_dir(script, tmpdir):
direct_url = direct_url_from_link(
Link("git+https://g.c/u/p.git@1"), link_is_in_wheel_cache=True
)
assert direct_url.url == "https://g.c/u/p.git"
assert direct_url.info.commit_id == "1"
def test_from_link_archive():
direct_url = direct_url_from_link(Link("https://g.c/archive.tgz"))
assert direct_url.url == "https://g.c/archive.tgz"
assert isinstance(direct_url.info, ArchiveInfo)
direct_url = direct_url_from_link(
Link(
"https://g.c/archive.tgz"
"#sha1=1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
)
)
assert isinstance(direct_url.info, ArchiveInfo)
assert (
direct_url.info.hash == "sha1=1b8c5bc61a86f377fea47b4276c8c8a5842d2220"
)
def test_from_link_dir(tmpdir):
dir_url = path_to_url(tmpdir)
direct_url = direct_url_from_link(Link(dir_url))
assert direct_url.url == dir_url
assert isinstance(direct_url.info, DirInfo)
def test_from_link_hide_user_password():
# Basic test only here, other variants are covered by
# direct_url.redact_url tests.
direct_url = direct_url_from_link(
Link("git+https://user:password@g.c/u/p.git@branch#egg=pkg"),
link_is_in_wheel_cache=True,
)
assert direct_url.to_dict()["url"] == "https://g.c/u/p.git"
direct_url = direct_url_from_link(
Link("git+ssh://git@g.c/u/p.git@branch#egg=pkg"),
link_is_in_wheel_cache=True,
)
assert direct_url.to_dict()["url"] == "ssh://git@g.c/u/p.git"
def test_dist_get_direct_url_no_metadata():
dist = MagicMock()
dist.has_metadata.return_value = False
assert dist_get_direct_url(dist) is None
dist.has_metadata.assert_called()
def test_dist_get_direct_url_bad_metadata():
dist = MagicMock()
dist.has_metadata.return_value = True
dist.get_metadata.return_value = "{}" # invalid direct_url.json
assert dist_get_direct_url(dist) is None
dist.get_metadata.assert_called_with(DIRECT_URL_METADATA_NAME)
def test_dist_get_direct_url_valid_metadata():
dist = MagicMock()
dist.has_metadata.return_value = True
dist.get_metadata.return_value = (
'{"url": "https://e.c/p.tgz", "archive_info": {}}'
)
direct_url = dist_get_direct_url(dist)
dist.get_metadata.assert_called_with(DIRECT_URL_METADATA_NAME)
assert direct_url.url == "https://e.c/p.tgz"
assert isinstance(direct_url.info, ArchiveInfo)
......@@ -10,6 +10,11 @@ from mock import patch
from pip._vendor.packaging.requirements import Requirement
from pip._internal.locations import get_scheme
from pip._internal.models.direct_url import (
DIRECT_URL_METADATA_NAME,
ArchiveInfo,
DirectUrl,
)
from pip._internal.models.scheme import Scheme
from pip._internal.operations.build.wheel_legacy import (
get_legacy_build_wheel_path,
......@@ -259,6 +264,37 @@ class TestInstallUnpackedWheel(object):
)
self.assert_installed()
def test_std_install_with_direct_url(self, data, tmpdir):
"""Test that install_wheel creates direct_url.json metadata when
provided with a direct_url argument. Also test that the RECORDS
file contains an entry for direct_url.json in that case.
Note direct_url.url is intentionally different from wheelpath,
because wheelpath is typically the result of a local build.
"""
self.prep(data, tmpdir)
direct_url = DirectUrl(
url="file:///home/user/archive.tgz",
info=ArchiveInfo(),
)
wheel.install_wheel(
self.name,
self.wheelpath,
scheme=self.scheme,
req_description=str(self.req),
direct_url=direct_url,
)
direct_url_path = os.path.join(
self.dest_dist_info, DIRECT_URL_METADATA_NAME
)
assert os.path.isfile(direct_url_path)
with open(direct_url_path, 'rb') as f:
expected_direct_url_json = direct_url.to_json()
direct_url_json = f.read().decode("utf-8")
assert direct_url_json == expected_direct_url_json
# check that the direc_url file is part of RECORDS
with open(os.path.join(self.dest_dist_info, "RECORD")) as f:
assert DIRECT_URL_METADATA_NAME in f.read()
def test_install_prefix(self, data, tmpdir):
prefix = os.path.join(os.path.sep, 'some', 'path')
self.prep(data, tmpdir)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册