conftest.py 7.4 KB
Newer Older
F
frostming 已提交
1
import json
F
Frost Ming 已提交
2 3
import os
import shutil
F
frostming 已提交
4 5
from io import BytesIO
from pathlib import Path
F
frostming 已提交
6
from typing import Callable, Iterable, List, Optional, Tuple
F
frostming 已提交
7 8
from urllib.parse import urlparse

F
Frost Ming 已提交
9 10
from pip._internal.vcs import versioncontrol
from pip._vendor import requests
F
frostming 已提交
11
from pip._vendor.pkg_resources import safe_name
F
Frost Ming 已提交
12

F
Frost Ming 已提交
13
import pytest
F
frostming 已提交
14
from pdm.exceptions import CandidateInfoNotFound
F
frostming 已提交
15
from pdm.installers import Synchronizer
F
frostming 已提交
16 17 18 19
from pdm.models.candidates import Candidate
from pdm.models.repositories import BaseRepository
from pdm.models.requirements import Requirement
from pdm.models.specifiers import PySpecSet
F
frostming 已提交
20
from pdm.project import Project
F
frostming 已提交
21
from pdm.types import CandidateInfo
F
frostming 已提交
22 23 24 25
from pdm.utils import get_finder
from tests import FIXTURES


F
frostming 已提交
26
class LocalFileAdapter(requests.adapters.BaseAdapter):
F
frostming 已提交
27 28 29 30 31
    def __init__(self, base_path):
        super().__init__()
        self.base_path = base_path
        self._opened_files = []

F
Frost Ming 已提交
32 33 34 35 36 37
    def send(
        self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None
    ):
        file_path = self.base_path / urlparse(request.url).path.lstrip(
            "/"
        )  # type: Path
F
Frost Ming 已提交
38
        response = requests.models.Response()
F
frostming 已提交
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
        response.request = request
        if not file_path.exists():
            response.status_code = 404
            response.reason = "Not Found"
            response.raw = BytesIO(b"Not Found")
        else:
            response.status_code = 200
            response.reason = "OK"
            response.raw = file_path.open("rb")
        self._opened_files.append(response.raw)
        return response

    def close(self):
        for fp in self._opened_files:
            fp.close()
        self._opened_files.clear()


F
Frost Ming 已提交
57 58 59 60 61 62 63
class MockVersionControl(versioncontrol.VersionControl):
    def obtain(self, dest, url):
        url, _ = self.get_url_rev_options(url)
        path = os.path.splitext(os.path.basename(urlparse(str(url)).path))[0]
        mocked_path = FIXTURES / "projects" / path
        shutil.copytree(mocked_path, dest)

F
frostming 已提交
64 65 66 67
    @classmethod
    def get_revision(cls, location):
        return "1234567890abcdef"

F
Frost Ming 已提交
68

F
frostming 已提交
69
class TestRepository(BaseRepository):
F
frostming 已提交
70 71
    def __init__(self, sources, environment):
        super().__init__(sources, environment)
F
frostming 已提交
72 73 74 75 76 77 78 79 80 81 82 83 84 85
        self._pypi_data = {}
        self.load_fixtures()

    def add_candidate(self, name, version, requires_python=""):
        pypi_data = self._pypi_data.setdefault(safe_name(name), {}).setdefault(
            version, {}
        )
        pypi_data["requires_python"] = requires_python

    def add_dependencies(self, name, version, requirements):
        pypi_data = self._pypi_data[safe_name(name)][version]
        pypi_data.setdefault("dependencies", []).extend(requirements)

    def _get_dependencies_from_fixture(
F
Frost Ming 已提交
86
        self, candidate: Candidate
F
frostming 已提交
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    ) -> Tuple[List[str], str, str]:
        try:
            pypi_data = self._pypi_data[candidate.req.key][candidate.version]
        except KeyError:
            raise CandidateInfoNotFound(candidate)
        deps = pypi_data.get("dependencies", [])
        for extra in candidate.req.extras or ():
            deps.extend(pypi_data.get("extras_require", {}).get(extra, []))
        return deps, pypi_data.get("requires_python", ""), ""

    def dependency_generators(self) -> Iterable[Callable[[Candidate], CandidateInfo]]:
        return (
            self._get_dependencies_from_cache,
            self._get_dependencies_from_fixture,
            self._get_dependencies_from_metadata,
        )

    def get_hashes(self, candidate: Candidate) -> None:
        candidate.hashes = {}
F
frostming 已提交
106

F
Frost Ming 已提交
107 108 109
    def _find_named_matches(
        self,
        requirement: Requirement,
F
frostming 已提交
110 111 112
        requires_python: PySpecSet = PySpecSet(),
        allow_prereleases: Optional[bool] = None,
        allow_all: bool = False,
F
Frost Ming 已提交
113
    ) -> List[Candidate]:
F
frostming 已提交
114 115 116 117 118 119
        if allow_prereleases is None:
            allow_prereleases = requirement.allow_prereleases

        cans = []
        for version, candidate in self._pypi_data.get(requirement.key, {}).items():
            c = Candidate(
F
frostming 已提交
120 121
                requirement, self.environment,
                name=requirement.project_name, version=version
F
frostming 已提交
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
            )
            c._requires_python = PySpecSet(candidate.get("requires_python", ""))
            cans.append(c)

        sorted_cans = sorted(
            (
                c
                for c in cans
                if requirement.specifier.contains(c.version, allow_prereleases)
            ),
            key=lambda c: c.version,
        )
        if not allow_all:
            sorted_cans = [
                can
                for can in sorted_cans
                if requires_python.is_subset(can.requires_python)
            ]
        if not sorted_cans and allow_prereleases is None:
            # No non-pre-releases is found, force pre-releases now
            sorted_cans = sorted(
                (c for c in cans if requirement.specifier.contains(c.version, True)),
                key=lambda c: c.version,
            )
        return sorted_cans
F
frostming 已提交
147

F
frostming 已提交
148 149 150 151
    def load_fixtures(self):
        json_file = FIXTURES / "pypi.json"
        self._pypi_data = json.loads(json_file.read_text())

F
frostming 已提交
152

F
frostming 已提交
153
class TestProject(Project):
F
frostming 已提交
154 155 156
    pass


F
frostming 已提交
157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188
class MockSynchronizer(Synchronizer):
    def __init__(self):
        super().__init__({}, None)
        self.working_set = {}

    def compare_with_working_set(self):
        working_set = self.working_set
        to_update, to_remove = [], []
        candidates = self.candidates.copy()
        for key, version in working_set.items():
            if key not in candidates:
                to_remove.append(key)
            else:
                can = candidates.pop(key)
                if version != can.version:
                    to_update.append(key)
        to_add = list(candidates)
        return to_add, to_update, to_remove

    def install_candidates(self, candidates):
        self.working_set.update({
            can.req.key: can.version for can in candidates
        })

    def remove_distributions(self, distributions):
        for dist in distributions:
            try:
                del self.working_set[dist]
            except KeyError:
                pass


F
frostming 已提交
189 190 191 192
def get_local_finder(*args, **kwargs):
    finder = get_finder(*args, **kwargs)
    finder.session.mount("http://fixtures.test/", LocalFileAdapter(FIXTURES))
    return finder
F
frostming 已提交
193 194 195


@pytest.fixture()
F
frostming 已提交
196
def project(tmp_path, mocker):
F
frostming 已提交
197 198
    p = TestProject(tmp_path.as_posix())
    p.config["cache_dir"] = tmp_path.joinpath("caches").as_posix()
F
frostming 已提交
199 200
    mocker.patch("pdm.utils.get_finder", get_local_finder)
    mocker.patch("pdm.models.environment.get_finder", get_local_finder)
F
frostming 已提交
201
    p.init_pyproject()
F
frostming 已提交
202
    return p
F
Frost Ming 已提交
203 204


F
frostming 已提交
205 206
@pytest.fixture()
def repository(project):
F
frostming 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
    rv = TestRepository([], project.environment)
    project.get_repository = lambda: rv
    return rv


@pytest.fixture()
def synchronizer(mocker):
    rv = MockSynchronizer()

    def new_synchronizer(candidates, environment):
        rv.candidates = candidates
        rv.environment = environment
        return rv

    mocker.patch("pdm.cli.actions.Synchronizer", new_synchronizer)
    yield rv
F
frostming 已提交
223 224


F
Frost Ming 已提交
225 226 227 228 229 230 231 232 233 234 235 236 237
@pytest.fixture()
def vcs(mocker):
    ret = MockVersionControl()
    mocker.patch(
        "pip._internal.vcs.versioncontrol.VcsSupport.get_backend", return_value=ret
    )
    mocker.patch("pip._internal.download._get_used_vcs_backend", return_value=ret)
    yield ret


@pytest.fixture(params=[False, True])
def is_editable(request):
    return request.param
F
frostming 已提交
238 239 240 241 242


@pytest.fixture(params=[False, True])
def is_dev(request):
    return request.param