Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
镜像
Python_Packaging_Authority
pip
提交
cf7ebdbb
P
pip
项目概览
镜像
/
Python_Packaging_Authority
/
pip
12 个月 前同步成功
通知
0
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
P
pip
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
cf7ebdbb
编写于
9月 26, 2019
作者:
C
Chris Hunt
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Move PipSession to network.session
上级
6ce7217a
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
467 addition
and
451 deletion
+467
-451
src/pip/_internal/cli/req_command.py
src/pip/_internal/cli/req_command.py
+1
-1
src/pip/_internal/collector.py
src/pip/_internal/collector.py
+1
-1
src/pip/_internal/download.py
src/pip/_internal/download.py
+5
-262
src/pip/_internal/legacy_resolve.py
src/pip/_internal/legacy_resolve.py
+1
-1
src/pip/_internal/network/session.py
src/pip/_internal/network/session.py
+275
-3
src/pip/_internal/operations/prepare.py
src/pip/_internal/operations/prepare.py
+1
-1
src/pip/_internal/req/req_file.py
src/pip/_internal/req/req_file.py
+1
-1
src/pip/_internal/utils/outdated.py
src/pip/_internal/utils/outdated.py
+2
-1
tests/lib/__init__.py
tests/lib/__init__.py
+1
-1
tests/unit/test_collector.py
tests/unit/test_collector.py
+1
-1
tests/unit/test_download.py
tests/unit/test_download.py
+1
-172
tests/unit/test_index.py
tests/unit/test_index.py
+1
-1
tests/unit/test_network_session.py
tests/unit/test_network_session.py
+173
-2
tests/unit/test_req.py
tests/unit/test_req.py
+1
-1
tests/unit/test_req_file.py
tests/unit/test_req_file.py
+1
-1
tests/unit/test_unit_outdated.py
tests/unit/test_unit_outdated.py
+1
-1
未找到文件。
src/pip/_internal/cli/req_command.py
浏览文件 @
cf7ebdbb
...
...
@@ -10,11 +10,11 @@ from functools import partial
from
pip._internal.cli.base_command
import
Command
from
pip._internal.cli.command_context
import
CommandContextMixIn
from
pip._internal.download
import
PipSession
from
pip._internal.exceptions
import
CommandError
from
pip._internal.index
import
PackageFinder
from
pip._internal.legacy_resolve
import
Resolver
from
pip._internal.models.selection_prefs
import
SelectionPreferences
from
pip._internal.network.session
import
PipSession
from
pip._internal.operations.prepare
import
RequirementPreparer
from
pip._internal.req.constructors
import
(
install_req_from_editable
,
...
...
src/pip/_internal/collector.py
浏览文件 @
cf7ebdbb
...
...
@@ -32,7 +32,7 @@ if MYPY_CHECK_RUNNING:
from
pip._vendor.requests
import
Response
from
pip._internal.models.search_scope
import
SearchScope
from
pip._internal.
download
import
PipSession
from
pip._internal.
network.session
import
PipSession
HTMLElement
=
xml
.
etree
.
ElementTree
.
Element
ResponseHeaders
=
MutableMapping
[
str
,
str
]
...
...
src/pip/_internal/download.py
浏览文件 @
cf7ebdbb
from
__future__
import
absolute_import
import
cgi
import
email.utils
import
logging
import
mimetypes
import
os
...
...
@@ -9,11 +8,8 @@ import re
import
shutil
import
sys
from
pip._vendor
import
requests
,
six
,
urllib3
from
pip._vendor.cachecontrol
import
CacheControlAdapter
from
pip._vendor.requests.adapters
import
BaseAdapter
,
HTTPAdapter
from
pip._vendor
import
requests
from
pip._vendor.requests.models
import
CONTENT_CHUNK_SIZE
,
Response
from
pip._vendor.requests.structures
import
CaseInsensitiveDict
from
pip._vendor.six
import
PY2
# NOTE: XMLRPC Client is not annotated in typeshed as on 2017-07-17, which is
# why we ignore the type on this import
...
...
@@ -22,22 +18,16 @@ from pip._vendor.six.moves.urllib import parse as urllib_parse
from
pip._internal.exceptions
import
HashMismatch
,
InstallationError
from
pip._internal.models.index
import
PyPI
from
pip._internal.network.auth
import
MultiDomainBasicAuth
from
pip._internal.network.cache
import
SafeFileCache
from
pip._internal.network.session
import
SECURE_ORIGINS
,
user_agent
# Import ssl from compat so the initial import occurs in only one place.
from
pip._internal.utils.compat
import
ipaddress
from
pip._internal.network.session
import
PipSession
from
pip._internal.utils.encoding
import
auto_decode
from
pip._internal.utils.filesystem
import
c
heck_path_owner
,
c
opy2_fixed
from
pip._internal.utils.filesystem
import
copy2_fixed
from
pip._internal.utils.misc
import
(
ask_path_exists
,
backup_dir
,
build_url_from_netloc
,
consume
,
display_path
,
format_size
,
hide_url
,
parse_netloc
,
path_to_display
,
rmtree
,
splitext
,
...
...
@@ -46,18 +36,17 @@ from pip._internal.utils.temp_dir import TempDirectory
from
pip._internal.utils.typing
import
MYPY_CHECK_RUNNING
from
pip._internal.utils.ui
import
DownloadProgressProvider
from
pip._internal.utils.unpacking
import
unpack_file
from
pip._internal.utils.urls
import
get_url_scheme
,
url_to_path
from
pip._internal.utils.urls
import
get_url_scheme
from
pip._internal.vcs
import
vcs
if
MYPY_CHECK_RUNNING
:
from
typing
import
(
IO
,
Callable
,
Iterator
,
List
,
Optional
,
Text
,
Tuple
,
IO
,
Callable
,
List
,
Optional
,
Text
,
Tuple
,
)
from
mypy_extensions
import
TypedDict
from
pip._internal.models.link
import
Link
from
pip._internal.network.requests
import
SecureOrigin
from
pip._internal.utils.hashes
import
Hashes
from
pip._internal.vcs.versioncontrol
import
VersionControl
...
...
@@ -93,252 +82,6 @@ __all__ = ['get_file_content',
logger
=
logging
.
getLogger
(
__name__
)
class
LocalFSAdapter
(
BaseAdapter
):
def
send
(
self
,
request
,
stream
=
None
,
timeout
=
None
,
verify
=
None
,
cert
=
None
,
proxies
=
None
):
pathname
=
url_to_path
(
request
.
url
)
resp
=
Response
()
resp
.
status_code
=
200
resp
.
url
=
request
.
url
try
:
stats
=
os
.
stat
(
pathname
)
except
OSError
as
exc
:
resp
.
status_code
=
404
resp
.
raw
=
exc
else
:
modified
=
email
.
utils
.
formatdate
(
stats
.
st_mtime
,
usegmt
=
True
)
content_type
=
mimetypes
.
guess_type
(
pathname
)[
0
]
or
"text/plain"
resp
.
headers
=
CaseInsensitiveDict
({
"Content-Type"
:
content_type
,
"Content-Length"
:
stats
.
st_size
,
"Last-Modified"
:
modified
,
})
resp
.
raw
=
open
(
pathname
,
"rb"
)
resp
.
close
=
resp
.
raw
.
close
return
resp
def
close
(
self
):
pass
class
InsecureHTTPAdapter
(
HTTPAdapter
):
def
cert_verify
(
self
,
conn
,
url
,
verify
,
cert
):
conn
.
cert_reqs
=
'CERT_NONE'
conn
.
ca_certs
=
None
class
PipSession
(
requests
.
Session
):
timeout
=
None
# type: Optional[int]
def
__init__
(
self
,
*
args
,
**
kwargs
):
"""
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
"""
retries
=
kwargs
.
pop
(
"retries"
,
0
)
cache
=
kwargs
.
pop
(
"cache"
,
None
)
trusted_hosts
=
kwargs
.
pop
(
"trusted_hosts"
,
[])
# type: List[str]
index_urls
=
kwargs
.
pop
(
"index_urls"
,
None
)
super
(
PipSession
,
self
).
__init__
(
*
args
,
**
kwargs
)
# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self
.
pip_trusted_origins
=
[]
# type: List[Tuple[str, Optional[int]]]
# Attach our User Agent to the request
self
.
headers
[
"User-Agent"
]
=
user_agent
()
# Attach our Authentication handler to the session
self
.
auth
=
MultiDomainBasicAuth
(
index_urls
=
index_urls
)
# Create our urllib3.Retry instance which will allow us to customize
# how we handle retries.
retries
=
urllib3
.
Retry
(
# Set the total number of retries that a particular request can
# have.
total
=
retries
,
# A 503 error from PyPI typically means that the Fastly -> Origin
# connection got interrupted in some way. A 503 error in general
# is typically considered a transient error so we'll go ahead and
# retry it.
# A 500 may indicate transient error in Amazon S3
# A 520 or 527 - may indicate transient error in CloudFlare
status_forcelist
=
[
500
,
503
,
520
,
527
],
# Add a small amount of back off between failed requests in
# order to prevent hammering the service.
backoff_factor
=
0.25
,
)
# Check to ensure that the directory containing our cache directory
# is owned by the user current executing pip. If it does not exist
# we will check the parent directory until we find one that does exist.
if
cache
and
not
check_path_owner
(
cache
):
logger
.
warning
(
"The directory '%s' or its parent directory is not owned by "
"the current user and the cache has been disabled. Please "
"check the permissions and owner of that directory. If "
"executing pip with sudo, you may want sudo's -H flag."
,
cache
,
)
cache
=
None
# We want to _only_ cache responses on securely fetched origins. We do
# this because we can't validate the response of an insecurely fetched
# origin, and we don't want someone to be able to poison the cache and
# require manual eviction from the cache to fix it.
if
cache
:
secure_adapter
=
CacheControlAdapter
(
cache
=
SafeFileCache
(
cache
),
max_retries
=
retries
,
)
else
:
secure_adapter
=
HTTPAdapter
(
max_retries
=
retries
)
# Our Insecure HTTPAdapter disables HTTPS validation. It does not
# support caching (see above) so we'll use it for all http:// URLs as
# well as any https:// host that we've marked as ignoring TLS errors
# for.
insecure_adapter
=
InsecureHTTPAdapter
(
max_retries
=
retries
)
# Save this for later use in add_insecure_host().
self
.
_insecure_adapter
=
insecure_adapter
self
.
mount
(
"https://"
,
secure_adapter
)
self
.
mount
(
"http://"
,
insecure_adapter
)
# Enable file:// urls
self
.
mount
(
"file://"
,
LocalFSAdapter
())
for
host
in
trusted_hosts
:
self
.
add_trusted_host
(
host
,
suppress_logging
=
True
)
def
add_trusted_host
(
self
,
host
,
source
=
None
,
suppress_logging
=
False
):
# type: (str, Optional[str], bool) -> None
"""
:param host: It is okay to provide a host that has previously been
added.
:param source: An optional source string, for logging where the host
string came from.
"""
if
not
suppress_logging
:
msg
=
'adding trusted host: {!r}'
.
format
(
host
)
if
source
is
not
None
:
msg
+=
' (from {})'
.
format
(
source
)
logger
.
info
(
msg
)
host_port
=
parse_netloc
(
host
)
if
host_port
not
in
self
.
pip_trusted_origins
:
self
.
pip_trusted_origins
.
append
(
host_port
)
self
.
mount
(
build_url_from_netloc
(
host
)
+
'/'
,
self
.
_insecure_adapter
)
if
not
host_port
[
1
]:
# Mount wildcard ports for the same host.
self
.
mount
(
build_url_from_netloc
(
host
)
+
':'
,
self
.
_insecure_adapter
)
def
iter_secure_origins
(
self
):
# type: () -> Iterator[SecureOrigin]
for
secure_origin
in
SECURE_ORIGINS
:
yield
secure_origin
for
host
,
port
in
self
.
pip_trusted_origins
:
yield
(
'*'
,
host
,
'*'
if
port
is
None
else
port
)
def
is_secure_origin
(
self
,
location
):
# type: (Link) -> bool
# Determine if this url used a secure transport mechanism
parsed
=
urllib_parse
.
urlparse
(
str
(
location
))
origin_protocol
,
origin_host
,
origin_port
=
(
parsed
.
scheme
,
parsed
.
hostname
,
parsed
.
port
,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol
=
origin_protocol
.
rsplit
(
'+'
,
1
)[
-
1
]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for
secure_origin
in
self
.
iter_secure_origins
():
secure_protocol
,
secure_host
,
secure_port
=
secure_origin
if
origin_protocol
!=
secure_protocol
and
secure_protocol
!=
"*"
:
continue
try
:
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr
=
ipaddress
.
ip_address
(
origin_host
if
(
isinstance
(
origin_host
,
six
.
text_type
)
or
origin_host
is
None
)
else
origin_host
.
decode
(
"utf8"
)
)
network
=
ipaddress
.
ip_network
(
secure_host
if
isinstance
(
secure_host
,
six
.
text_type
)
# setting secure_host to proper Union[bytes, str]
# creates problems in other places
else
secure_host
.
decode
(
"utf8"
)
# type: ignore
)
except
ValueError
:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if
(
origin_host
and
origin_host
.
lower
()
!=
secure_host
.
lower
()
and
secure_host
!=
"*"
):
continue
else
:
# We have a valid address and network, so see if the address
# is contained within the network.
if
addr
not
in
network
:
continue
# Check to see if the port matches.
if
(
origin_port
!=
secure_port
and
secure_port
!=
"*"
and
secure_port
is
not
None
):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return
True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger
.
warning
(
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host %s'."
,
origin_host
,
origin_host
,
)
return
False
def
request
(
self
,
method
,
url
,
*
args
,
**
kwargs
):
# Allow setting a default timeout on a session
kwargs
.
setdefault
(
"timeout"
,
self
.
timeout
)
# Dispatch the actual request
return
super
(
PipSession
,
self
).
request
(
method
,
url
,
*
args
,
**
kwargs
)
def
get_file_content
(
url
,
comes_from
=
None
,
session
=
None
):
# type: (str, Optional[str], Optional[PipSession]) -> Tuple[str, Text]
"""Gets the content of a file; it may be a filename, file: URL, or
...
...
src/pip/_internal/legacy_resolve.py
浏览文件 @
cf7ebdbb
...
...
@@ -44,7 +44,7 @@ if MYPY_CHECK_RUNNING:
from
pip._vendor
import
pkg_resources
from
pip._internal.distributions
import
AbstractDistribution
from
pip._internal.
download
import
PipSession
from
pip._internal.
network.session
import
PipSession
from
pip._internal.index
import
PackageFinder
from
pip._internal.operations.prepare
import
RequirementPreparer
from
pip._internal.req.req_install
import
InstallRequirement
...
...
src/pip/_internal/network/session.py
浏览文件 @
cf7ebdbb
import
email.utils
import
json
import
logging
import
mimetypes
import
os
import
platform
import
sys
from
pip._vendor
import
requests
,
six
,
urllib3
from
pip._vendor.cachecontrol
import
CacheControlAdapter
from
pip._vendor.requests.adapters
import
BaseAdapter
,
HTTPAdapter
from
pip._vendor.requests.models
import
Response
from
pip._vendor.requests.structures
import
CaseInsensitiveDict
from
pip._vendor.six.moves.urllib
import
parse
as
urllib_parse
import
pip
from
pip._internal.utils.compat
import
HAS_TLS
,
ssl
from
pip._internal.network.auth
import
MultiDomainBasicAuth
from
pip._internal.network.cache
import
SafeFileCache
# Import ssl from compat so the initial import occurs in only one place.
from
pip._internal.utils.compat
import
HAS_TLS
,
ipaddress
,
ssl
from
pip._internal.utils.filesystem
import
check_path_owner
from
pip._internal.utils.glibc
import
libc_ver
from
pip._internal.utils.misc
import
get_installed_version
from
pip._internal.utils.misc
import
(
build_url_from_netloc
,
get_installed_version
,
parse_netloc
,
)
from
pip._internal.utils.typing
import
MYPY_CHECK_RUNNING
from
pip._internal.utils.urls
import
url_to_path
if
MYPY_CHECK_RUNNING
:
from
typing
import
List
,
Optional
,
Tuple
,
Union
from
typing
import
(
Iterator
,
List
,
Optional
,
Tuple
,
Union
,
)
from
pip._internal.models.link
import
Link
SecureOrigin
=
Tuple
[
str
,
str
,
Optional
[
Union
[
int
,
str
]]]
logger
=
logging
.
getLogger
(
__name__
)
SECURE_ORIGINS
=
[
# protocol, hostname, port
# Taken from Chrome's list of secure origins (See: http://bit.ly/1qrySKC)
...
...
@@ -135,3 +161,249 @@ def user_agent():
data
=
data
,
json
=
json
.
dumps
(
data
,
separators
=
(
","
,
":"
),
sort_keys
=
True
),
)
class
LocalFSAdapter
(
BaseAdapter
):
def
send
(
self
,
request
,
stream
=
None
,
timeout
=
None
,
verify
=
None
,
cert
=
None
,
proxies
=
None
):
pathname
=
url_to_path
(
request
.
url
)
resp
=
Response
()
resp
.
status_code
=
200
resp
.
url
=
request
.
url
try
:
stats
=
os
.
stat
(
pathname
)
except
OSError
as
exc
:
resp
.
status_code
=
404
resp
.
raw
=
exc
else
:
modified
=
email
.
utils
.
formatdate
(
stats
.
st_mtime
,
usegmt
=
True
)
content_type
=
mimetypes
.
guess_type
(
pathname
)[
0
]
or
"text/plain"
resp
.
headers
=
CaseInsensitiveDict
({
"Content-Type"
:
content_type
,
"Content-Length"
:
stats
.
st_size
,
"Last-Modified"
:
modified
,
})
resp
.
raw
=
open
(
pathname
,
"rb"
)
resp
.
close
=
resp
.
raw
.
close
return
resp
def
close
(
self
):
pass
class
InsecureHTTPAdapter
(
HTTPAdapter
):
def
cert_verify
(
self
,
conn
,
url
,
verify
,
cert
):
conn
.
cert_reqs
=
'CERT_NONE'
conn
.
ca_certs
=
None
class
PipSession
(
requests
.
Session
):
timeout
=
None
# type: Optional[int]
def
__init__
(
self
,
*
args
,
**
kwargs
):
"""
:param trusted_hosts: Domains not to emit warnings for when not using
HTTPS.
"""
retries
=
kwargs
.
pop
(
"retries"
,
0
)
cache
=
kwargs
.
pop
(
"cache"
,
None
)
trusted_hosts
=
kwargs
.
pop
(
"trusted_hosts"
,
[])
# type: List[str]
index_urls
=
kwargs
.
pop
(
"index_urls"
,
None
)
super
(
PipSession
,
self
).
__init__
(
*
args
,
**
kwargs
)
# Namespace the attribute with "pip_" just in case to prevent
# possible conflicts with the base class.
self
.
pip_trusted_origins
=
[]
# type: List[Tuple[str, Optional[int]]]
# Attach our User Agent to the request
self
.
headers
[
"User-Agent"
]
=
user_agent
()
# Attach our Authentication handler to the session
self
.
auth
=
MultiDomainBasicAuth
(
index_urls
=
index_urls
)
# Create our urllib3.Retry instance which will allow us to customize
# how we handle retries.
retries
=
urllib3
.
Retry
(
# Set the total number of retries that a particular request can
# have.
total
=
retries
,
# A 503 error from PyPI typically means that the Fastly -> Origin
# connection got interrupted in some way. A 503 error in general
# is typically considered a transient error so we'll go ahead and
# retry it.
# A 500 may indicate transient error in Amazon S3
# A 520 or 527 - may indicate transient error in CloudFlare
status_forcelist
=
[
500
,
503
,
520
,
527
],
# Add a small amount of back off between failed requests in
# order to prevent hammering the service.
backoff_factor
=
0.25
,
)
# Check to ensure that the directory containing our cache directory
# is owned by the user current executing pip. If it does not exist
# we will check the parent directory until we find one that does exist.
if
cache
and
not
check_path_owner
(
cache
):
logger
.
warning
(
"The directory '%s' or its parent directory is not owned by "
"the current user and the cache has been disabled. Please "
"check the permissions and owner of that directory. If "
"executing pip with sudo, you may want sudo's -H flag."
,
cache
,
)
cache
=
None
# We want to _only_ cache responses on securely fetched origins. We do
# this because we can't validate the response of an insecurely fetched
# origin, and we don't want someone to be able to poison the cache and
# require manual eviction from the cache to fix it.
if
cache
:
secure_adapter
=
CacheControlAdapter
(
cache
=
SafeFileCache
(
cache
),
max_retries
=
retries
,
)
else
:
secure_adapter
=
HTTPAdapter
(
max_retries
=
retries
)
# Our Insecure HTTPAdapter disables HTTPS validation. It does not
# support caching (see above) so we'll use it for all http:// URLs as
# well as any https:// host that we've marked as ignoring TLS errors
# for.
insecure_adapter
=
InsecureHTTPAdapter
(
max_retries
=
retries
)
# Save this for later use in add_insecure_host().
self
.
_insecure_adapter
=
insecure_adapter
self
.
mount
(
"https://"
,
secure_adapter
)
self
.
mount
(
"http://"
,
insecure_adapter
)
# Enable file:// urls
self
.
mount
(
"file://"
,
LocalFSAdapter
())
for
host
in
trusted_hosts
:
self
.
add_trusted_host
(
host
,
suppress_logging
=
True
)
def
add_trusted_host
(
self
,
host
,
source
=
None
,
suppress_logging
=
False
):
# type: (str, Optional[str], bool) -> None
"""
:param host: It is okay to provide a host that has previously been
added.
:param source: An optional source string, for logging where the host
string came from.
"""
if
not
suppress_logging
:
msg
=
'adding trusted host: {!r}'
.
format
(
host
)
if
source
is
not
None
:
msg
+=
' (from {})'
.
format
(
source
)
logger
.
info
(
msg
)
host_port
=
parse_netloc
(
host
)
if
host_port
not
in
self
.
pip_trusted_origins
:
self
.
pip_trusted_origins
.
append
(
host_port
)
self
.
mount
(
build_url_from_netloc
(
host
)
+
'/'
,
self
.
_insecure_adapter
)
if
not
host_port
[
1
]:
# Mount wildcard ports for the same host.
self
.
mount
(
build_url_from_netloc
(
host
)
+
':'
,
self
.
_insecure_adapter
)
def
iter_secure_origins
(
self
):
# type: () -> Iterator[SecureOrigin]
for
secure_origin
in
SECURE_ORIGINS
:
yield
secure_origin
for
host
,
port
in
self
.
pip_trusted_origins
:
yield
(
'*'
,
host
,
'*'
if
port
is
None
else
port
)
def
is_secure_origin
(
self
,
location
):
# type: (Link) -> bool
# Determine if this url used a secure transport mechanism
parsed
=
urllib_parse
.
urlparse
(
str
(
location
))
origin_protocol
,
origin_host
,
origin_port
=
(
parsed
.
scheme
,
parsed
.
hostname
,
parsed
.
port
,
)
# The protocol to use to see if the protocol matches.
# Don't count the repository type as part of the protocol: in
# cases such as "git+ssh", only use "ssh". (I.e., Only verify against
# the last scheme.)
origin_protocol
=
origin_protocol
.
rsplit
(
'+'
,
1
)[
-
1
]
# Determine if our origin is a secure origin by looking through our
# hardcoded list of secure origins, as well as any additional ones
# configured on this PackageFinder instance.
for
secure_origin
in
self
.
iter_secure_origins
():
secure_protocol
,
secure_host
,
secure_port
=
secure_origin
if
origin_protocol
!=
secure_protocol
and
secure_protocol
!=
"*"
:
continue
try
:
# We need to do this decode dance to ensure that we have a
# unicode object, even on Python 2.x.
addr
=
ipaddress
.
ip_address
(
origin_host
if
(
isinstance
(
origin_host
,
six
.
text_type
)
or
origin_host
is
None
)
else
origin_host
.
decode
(
"utf8"
)
)
network
=
ipaddress
.
ip_network
(
secure_host
if
isinstance
(
secure_host
,
six
.
text_type
)
# setting secure_host to proper Union[bytes, str]
# creates problems in other places
else
secure_host
.
decode
(
"utf8"
)
# type: ignore
)
except
ValueError
:
# We don't have both a valid address or a valid network, so
# we'll check this origin against hostnames.
if
(
origin_host
and
origin_host
.
lower
()
!=
secure_host
.
lower
()
and
secure_host
!=
"*"
):
continue
else
:
# We have a valid address and network, so see if the address
# is contained within the network.
if
addr
not
in
network
:
continue
# Check to see if the port matches.
if
(
origin_port
!=
secure_port
and
secure_port
!=
"*"
and
secure_port
is
not
None
):
continue
# If we've gotten here, then this origin matches the current
# secure origin and we should return True
return
True
# If we've gotten to this point, then the origin isn't secure and we
# will not accept it as a valid location to search. We will however
# log a warning that we are ignoring it.
logger
.
warning
(
"The repository located at %s is not a trusted or secure host and "
"is being ignored. If this repository is available via HTTPS we "
"recommend you use HTTPS instead, otherwise you may silence "
"this warning and allow it anyway with '--trusted-host %s'."
,
origin_host
,
origin_host
,
)
return
False
def
request
(
self
,
method
,
url
,
*
args
,
**
kwargs
):
# Allow setting a default timeout on a session
kwargs
.
setdefault
(
"timeout"
,
self
.
timeout
)
# Dispatch the actual request
return
super
(
PipSession
,
self
).
request
(
method
,
url
,
*
args
,
**
kwargs
)
src/pip/_internal/operations/prepare.py
浏览文件 @
cf7ebdbb
...
...
@@ -32,8 +32,8 @@ if MYPY_CHECK_RUNNING:
from
typing
import
Optional
from
pip._internal.distributions
import
AbstractDistribution
from
pip._internal.download
import
PipSession
from
pip._internal.index
import
PackageFinder
from
pip._internal.network.session
import
PipSession
from
pip._internal.req.req_install
import
InstallRequirement
from
pip._internal.req.req_tracker
import
RequirementTracker
...
...
src/pip/_internal/req/req_file.py
浏览文件 @
cf7ebdbb
...
...
@@ -33,7 +33,7 @@ if MYPY_CHECK_RUNNING:
from
pip._internal.req
import
InstallRequirement
from
pip._internal.cache
import
WheelCache
from
pip._internal.index
import
PackageFinder
from
pip._internal.
download
import
PipSession
from
pip._internal.
network.session
import
PipSession
ReqFileLines
=
Iterator
[
Tuple
[
int
,
Text
]]
...
...
src/pip/_internal/utils/outdated.py
浏览文件 @
cf7ebdbb
...
...
@@ -33,7 +33,8 @@ if MYPY_CHECK_RUNNING:
import
optparse
from
optparse
import
Values
from
typing
import
Any
,
Dict
,
Text
,
Union
from
pip._internal.download
import
PipSession
from
pip._internal.network.session
import
PipSession
SELFCHECK_DATE_FMT
=
"%Y-%m-%dT%H:%M:%SZ"
...
...
tests/lib/__init__.py
浏览文件 @
cf7ebdbb
...
...
@@ -14,11 +14,11 @@ import pytest
from
scripttest
import
FoundDir
,
TestFileEnvironment
from
pip._internal.collector
import
LinkCollector
from
pip._internal.download
import
PipSession
from
pip._internal.index
import
PackageFinder
from
pip._internal.locations
import
get_major_minor_version
from
pip._internal.models.search_scope
import
SearchScope
from
pip._internal.models.selection_prefs
import
SelectionPreferences
from
pip._internal.network.session
import
PipSession
from
pip._internal.utils.deprecation
import
DEPRECATION_MSG_PREFIX
from
pip._internal.utils.typing
import
MYPY_CHECK_RUNNING
from
tests.lib.path
import
Path
,
curdir
...
...
tests/unit/test_collector.py
浏览文件 @
cf7ebdbb
...
...
@@ -22,9 +22,9 @@ from pip._internal.collector import (
group_locations
,
parse_links
,
)
from
pip._internal.download
import
PipSession
from
pip._internal.models.index
import
PyPI
from
pip._internal.models.link
import
Link
from
pip._internal.network.session
import
PipSession
from
tests.lib
import
make_test_link_collector
...
...
tests/unit/test_download.py
浏览文件 @
cf7ebdbb
import
hashlib
import
logging
import
os
import
shutil
import
sys
...
...
@@ -11,7 +10,6 @@ import pytest
from
mock
import
Mock
,
patch
from
pip._internal.download
import
(
PipSession
,
_copy_source_tree
,
_download_http_url
,
parse_content_disposition
,
...
...
@@ -21,6 +19,7 @@ from pip._internal.download import (
)
from
pip._internal.exceptions
import
HashMismatch
from
pip._internal.models.link
import
Link
from
pip._internal.network.session
import
PipSession
from
pip._internal.utils.hashes
import
Hashes
from
pip._internal.utils.urls
import
path_to_url
from
tests.lib
import
create_file
...
...
@@ -453,173 +452,3 @@ def test_unpack_file_url_excludes_expected_dirs(tmpdir, exclude_dir):
assert
not
os
.
path
.
isfile
(
dst_excluded_file
)
assert
os
.
path
.
isfile
(
dst_included_file
)
assert
os
.
path
.
isdir
(
dst_included_dir
)
class
TestPipSession
:
def
test_cache_defaults_off
(
self
):
session
=
PipSession
()
assert
not
hasattr
(
session
.
adapters
[
"http://"
],
"cache"
)
assert
not
hasattr
(
session
.
adapters
[
"https://"
],
"cache"
)
def
test_cache_is_enabled
(
self
,
tmpdir
):
session
=
PipSession
(
cache
=
tmpdir
.
joinpath
(
"test-cache"
))
assert
hasattr
(
session
.
adapters
[
"https://"
],
"cache"
)
assert
(
session
.
adapters
[
"https://"
].
cache
.
directory
==
tmpdir
.
joinpath
(
"test-cache"
))
def
test_http_cache_is_not_enabled
(
self
,
tmpdir
):
session
=
PipSession
(
cache
=
tmpdir
.
joinpath
(
"test-cache"
))
assert
not
hasattr
(
session
.
adapters
[
"http://"
],
"cache"
)
def
test_insecure_host_adapter
(
self
,
tmpdir
):
session
=
PipSession
(
cache
=
tmpdir
.
joinpath
(
"test-cache"
),
trusted_hosts
=
[
"example.com"
],
)
assert
"https://example.com/"
in
session
.
adapters
# Check that the "port wildcard" is present.
assert
"https://example.com:"
in
session
.
adapters
# Check that the cache isn't enabled.
assert
not
hasattr
(
session
.
adapters
[
"https://example.com/"
],
"cache"
)
def
test_add_trusted_host
(
self
):
# Leave a gap to test how the ordering is affected.
trusted_hosts
=
[
'host1'
,
'host3'
]
session
=
PipSession
(
trusted_hosts
=
trusted_hosts
)
insecure_adapter
=
session
.
_insecure_adapter
prefix2
=
'https://host2/'
prefix3
=
'https://host3/'
prefix3_wildcard
=
'https://host3:'
# Confirm some initial conditions as a baseline.
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
)
]
assert
session
.
adapters
[
prefix3
]
is
insecure_adapter
assert
session
.
adapters
[
prefix3_wildcard
]
is
insecure_adapter
assert
prefix2
not
in
session
.
adapters
# Test adding a new host.
session
.
add_trusted_host
(
'host2'
)
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
),
(
'host2'
,
None
)
]
# Check that prefix3 is still present.
assert
session
.
adapters
[
prefix3
]
is
insecure_adapter
assert
session
.
adapters
[
prefix2
]
is
insecure_adapter
# Test that adding the same host doesn't create a duplicate.
session
.
add_trusted_host
(
'host3'
)
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
),
(
'host2'
,
None
)
],
'actual: {}'
.
format
(
session
.
pip_trusted_origins
)
session
.
add_trusted_host
(
'host4:8080'
)
prefix4
=
'https://host4:8080/'
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
),
(
'host2'
,
None
),
(
'host4'
,
8080
)
]
assert
session
.
adapters
[
prefix4
]
is
insecure_adapter
def
test_add_trusted_host__logging
(
self
,
caplog
):
"""
Test logging when add_trusted_host() is called.
"""
trusted_hosts
=
[
'host0'
,
'host1'
]
session
=
PipSession
(
trusted_hosts
=
trusted_hosts
)
with
caplog
.
at_level
(
logging
.
INFO
):
# Test adding an existing host.
session
.
add_trusted_host
(
'host1'
,
source
=
'somewhere'
)
session
.
add_trusted_host
(
'host2'
)
# Test calling add_trusted_host() on the same host twice.
session
.
add_trusted_host
(
'host2'
)
actual
=
[(
r
.
levelname
,
r
.
message
)
for
r
in
caplog
.
records
]
# Observe that "host0" isn't included in the logs.
expected
=
[
(
'INFO'
,
"adding trusted host: 'host1' (from somewhere)"
),
(
'INFO'
,
"adding trusted host: 'host2'"
),
(
'INFO'
,
"adding trusted host: 'host2'"
),
]
assert
actual
==
expected
def
test_iter_secure_origins
(
self
):
trusted_hosts
=
[
'host1'
,
'host2'
,
'host3:8080'
]
session
=
PipSession
(
trusted_hosts
=
trusted_hosts
)
actual
=
list
(
session
.
iter_secure_origins
())
assert
len
(
actual
)
==
9
# Spot-check that SECURE_ORIGINS is included.
assert
actual
[
0
]
==
(
'https'
,
'*'
,
'*'
)
assert
actual
[
-
3
:]
==
[
(
'*'
,
'host1'
,
'*'
),
(
'*'
,
'host2'
,
'*'
),
(
'*'
,
'host3'
,
8080
)
]
def
test_iter_secure_origins__trusted_hosts_empty
(
self
):
"""
Test iter_secure_origins() after passing trusted_hosts=[].
"""
session
=
PipSession
(
trusted_hosts
=
[])
actual
=
list
(
session
.
iter_secure_origins
())
assert
len
(
actual
)
==
6
# Spot-check that SECURE_ORIGINS is included.
assert
actual
[
0
]
==
(
'https'
,
'*'
,
'*'
)
@
pytest
.
mark
.
parametrize
(
'location, trusted, expected'
,
[
(
"http://pypi.org/something"
,
[],
False
),
(
"https://pypi.org/something"
,
[],
True
),
(
"git+http://pypi.org/something"
,
[],
False
),
(
"git+https://pypi.org/something"
,
[],
True
),
(
"git+ssh://git@pypi.org/something"
,
[],
True
),
(
"http://localhost"
,
[],
True
),
(
"http://127.0.0.1"
,
[],
True
),
(
"http://example.com/something/"
,
[],
False
),
(
"http://example.com/something/"
,
[
"example.com"
],
True
),
# Try changing the case.
(
"http://eXample.com/something/"
,
[
"example.cOm"
],
True
),
# Test hosts with port.
(
"http://example.com:8080/something/"
,
[
"example.com"
],
True
),
# Test a trusted_host with a port.
(
"http://example.com:8080/something/"
,
[
"example.com:8080"
],
True
),
(
"http://example.com/something/"
,
[
"example.com:8080"
],
False
),
(
"http://example.com:8888/something/"
,
[
"example.com:8080"
],
False
),
],
)
def
test_is_secure_origin
(
self
,
caplog
,
location
,
trusted
,
expected
):
class
MockLogger
(
object
):
def
__init__
(
self
):
self
.
called
=
False
def
warning
(
self
,
*
args
,
**
kwargs
):
self
.
called
=
True
session
=
PipSession
(
trusted_hosts
=
trusted
)
actual
=
session
.
is_secure_origin
(
location
)
assert
actual
==
expected
log_records
=
[(
r
.
levelname
,
r
.
message
)
for
r
in
caplog
.
records
]
if
expected
:
assert
not
log_records
return
assert
len
(
log_records
)
==
1
actual_level
,
actual_message
=
log_records
[
0
]
assert
actual_level
==
'WARNING'
assert
'is not a trusted or secure host'
in
actual_message
tests/unit/test_index.py
浏览文件 @
cf7ebdbb
...
...
@@ -4,7 +4,6 @@ import pytest
from
pip._vendor.packaging.specifiers
import
SpecifierSet
from
pip._internal.collector
import
LinkCollector
from
pip._internal.download
import
PipSession
from
pip._internal.index
import
(
CandidateEvaluator
,
CandidatePreferences
,
...
...
@@ -21,6 +20,7 @@ from pip._internal.models.link import Link
from
pip._internal.models.search_scope
import
SearchScope
from
pip._internal.models.selection_prefs
import
SelectionPreferences
from
pip._internal.models.target_python
import
TargetPython
from
pip._internal.network.session
import
PipSession
from
pip._internal.pep425tags
import
get_supported
from
pip._internal.utils.hashes
import
Hashes
from
tests.lib
import
CURRENT_PY_VERSION_INFO
...
...
tests/unit/test_network_session.py
浏览文件 @
cf7ebdbb
import
logging
import
pytest
import
pip
from
pip._internal.download
import
PipSession
from
pip._internal.network.requests
import
CI_ENVIRONMENT_VARIABLES
from
pip._internal.network.session
import
CI_ENVIRONMENT_VARIABLES
,
PipSession
def
get_user_agent
():
...
...
@@ -46,3 +47,173 @@ def test_user_agent__ci(monkeypatch, name, expected_like_ci):
def
test_user_agent_user_data
(
monkeypatch
):
monkeypatch
.
setenv
(
"PIP_USER_AGENT_USER_DATA"
,
"some_string"
)
assert
"some_string"
in
PipSession
().
headers
[
"User-Agent"
]
class
TestPipSession
:
def
test_cache_defaults_off
(
self
):
session
=
PipSession
()
assert
not
hasattr
(
session
.
adapters
[
"http://"
],
"cache"
)
assert
not
hasattr
(
session
.
adapters
[
"https://"
],
"cache"
)
def
test_cache_is_enabled
(
self
,
tmpdir
):
session
=
PipSession
(
cache
=
tmpdir
.
joinpath
(
"test-cache"
))
assert
hasattr
(
session
.
adapters
[
"https://"
],
"cache"
)
assert
(
session
.
adapters
[
"https://"
].
cache
.
directory
==
tmpdir
.
joinpath
(
"test-cache"
))
def
test_http_cache_is_not_enabled
(
self
,
tmpdir
):
session
=
PipSession
(
cache
=
tmpdir
.
joinpath
(
"test-cache"
))
assert
not
hasattr
(
session
.
adapters
[
"http://"
],
"cache"
)
def
test_insecure_host_adapter
(
self
,
tmpdir
):
session
=
PipSession
(
cache
=
tmpdir
.
joinpath
(
"test-cache"
),
trusted_hosts
=
[
"example.com"
],
)
assert
"https://example.com/"
in
session
.
adapters
# Check that the "port wildcard" is present.
assert
"https://example.com:"
in
session
.
adapters
# Check that the cache isn't enabled.
assert
not
hasattr
(
session
.
adapters
[
"https://example.com/"
],
"cache"
)
def
test_add_trusted_host
(
self
):
# Leave a gap to test how the ordering is affected.
trusted_hosts
=
[
'host1'
,
'host3'
]
session
=
PipSession
(
trusted_hosts
=
trusted_hosts
)
insecure_adapter
=
session
.
_insecure_adapter
prefix2
=
'https://host2/'
prefix3
=
'https://host3/'
prefix3_wildcard
=
'https://host3:'
# Confirm some initial conditions as a baseline.
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
)
]
assert
session
.
adapters
[
prefix3
]
is
insecure_adapter
assert
session
.
adapters
[
prefix3_wildcard
]
is
insecure_adapter
assert
prefix2
not
in
session
.
adapters
# Test adding a new host.
session
.
add_trusted_host
(
'host2'
)
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
),
(
'host2'
,
None
)
]
# Check that prefix3 is still present.
assert
session
.
adapters
[
prefix3
]
is
insecure_adapter
assert
session
.
adapters
[
prefix2
]
is
insecure_adapter
# Test that adding the same host doesn't create a duplicate.
session
.
add_trusted_host
(
'host3'
)
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
),
(
'host2'
,
None
)
],
'actual: {}'
.
format
(
session
.
pip_trusted_origins
)
session
.
add_trusted_host
(
'host4:8080'
)
prefix4
=
'https://host4:8080/'
assert
session
.
pip_trusted_origins
==
[
(
'host1'
,
None
),
(
'host3'
,
None
),
(
'host2'
,
None
),
(
'host4'
,
8080
)
]
assert
session
.
adapters
[
prefix4
]
is
insecure_adapter
def
test_add_trusted_host__logging
(
self
,
caplog
):
"""
Test logging when add_trusted_host() is called.
"""
trusted_hosts
=
[
'host0'
,
'host1'
]
session
=
PipSession
(
trusted_hosts
=
trusted_hosts
)
with
caplog
.
at_level
(
logging
.
INFO
):
# Test adding an existing host.
session
.
add_trusted_host
(
'host1'
,
source
=
'somewhere'
)
session
.
add_trusted_host
(
'host2'
)
# Test calling add_trusted_host() on the same host twice.
session
.
add_trusted_host
(
'host2'
)
actual
=
[(
r
.
levelname
,
r
.
message
)
for
r
in
caplog
.
records
]
# Observe that "host0" isn't included in the logs.
expected
=
[
(
'INFO'
,
"adding trusted host: 'host1' (from somewhere)"
),
(
'INFO'
,
"adding trusted host: 'host2'"
),
(
'INFO'
,
"adding trusted host: 'host2'"
),
]
assert
actual
==
expected
def
test_iter_secure_origins
(
self
):
trusted_hosts
=
[
'host1'
,
'host2'
,
'host3:8080'
]
session
=
PipSession
(
trusted_hosts
=
trusted_hosts
)
actual
=
list
(
session
.
iter_secure_origins
())
assert
len
(
actual
)
==
9
# Spot-check that SECURE_ORIGINS is included.
assert
actual
[
0
]
==
(
'https'
,
'*'
,
'*'
)
assert
actual
[
-
3
:]
==
[
(
'*'
,
'host1'
,
'*'
),
(
'*'
,
'host2'
,
'*'
),
(
'*'
,
'host3'
,
8080
)
]
def
test_iter_secure_origins__trusted_hosts_empty
(
self
):
"""
Test iter_secure_origins() after passing trusted_hosts=[].
"""
session
=
PipSession
(
trusted_hosts
=
[])
actual
=
list
(
session
.
iter_secure_origins
())
assert
len
(
actual
)
==
6
# Spot-check that SECURE_ORIGINS is included.
assert
actual
[
0
]
==
(
'https'
,
'*'
,
'*'
)
@
pytest
.
mark
.
parametrize
(
'location, trusted, expected'
,
[
(
"http://pypi.org/something"
,
[],
False
),
(
"https://pypi.org/something"
,
[],
True
),
(
"git+http://pypi.org/something"
,
[],
False
),
(
"git+https://pypi.org/something"
,
[],
True
),
(
"git+ssh://git@pypi.org/something"
,
[],
True
),
(
"http://localhost"
,
[],
True
),
(
"http://127.0.0.1"
,
[],
True
),
(
"http://example.com/something/"
,
[],
False
),
(
"http://example.com/something/"
,
[
"example.com"
],
True
),
# Try changing the case.
(
"http://eXample.com/something/"
,
[
"example.cOm"
],
True
),
# Test hosts with port.
(
"http://example.com:8080/something/"
,
[
"example.com"
],
True
),
# Test a trusted_host with a port.
(
"http://example.com:8080/something/"
,
[
"example.com:8080"
],
True
),
(
"http://example.com/something/"
,
[
"example.com:8080"
],
False
),
(
"http://example.com:8888/something/"
,
[
"example.com:8080"
],
False
),
],
)
def
test_is_secure_origin
(
self
,
caplog
,
location
,
trusted
,
expected
):
class
MockLogger
(
object
):
def
__init__
(
self
):
self
.
called
=
False
def
warning
(
self
,
*
args
,
**
kwargs
):
self
.
called
=
True
session
=
PipSession
(
trusted_hosts
=
trusted
)
actual
=
session
.
is_secure_origin
(
location
)
assert
actual
==
expected
log_records
=
[(
r
.
levelname
,
r
.
message
)
for
r
in
caplog
.
records
]
if
expected
:
assert
not
log_records
return
assert
len
(
log_records
)
==
1
actual_level
,
actual_message
=
log_records
[
0
]
assert
actual_level
==
'WARNING'
assert
'is not a trusted or secure host'
in
actual_message
tests/unit/test_req.py
浏览文件 @
cf7ebdbb
...
...
@@ -11,7 +11,6 @@ from pip._vendor.packaging.markers import Marker
from
pip._vendor.packaging.requirements
import
Requirement
from
pip._internal.commands
import
create_command
from
pip._internal.download
import
PipSession
from
pip._internal.exceptions
import
(
HashErrors
,
InstallationError
,
...
...
@@ -19,6 +18,7 @@ from pip._internal.exceptions import (
PreviousBuildDirError
,
)
from
pip._internal.legacy_resolve
import
Resolver
from
pip._internal.network.session
import
PipSession
from
pip._internal.operations.prepare
import
RequirementPreparer
from
pip._internal.req
import
InstallRequirement
,
RequirementSet
from
pip._internal.req.constructors
import
(
...
...
tests/unit/test_req_file.py
浏览文件 @
cf7ebdbb
...
...
@@ -8,12 +8,12 @@ from mock import Mock, patch
from
pretend
import
stub
import
pip._internal.index
from
pip._internal.download
import
PipSession
from
pip._internal.exceptions
import
(
InstallationError
,
RequirementsFileParseError
,
)
from
pip._internal.models.format_control
import
FormatControl
from
pip._internal.network.session
import
PipSession
from
pip._internal.req.constructors
import
(
install_req_from_editable
,
install_req_from_line
,
...
...
tests/unit/test_unit_outdated.py
浏览文件 @
cf7ebdbb
...
...
@@ -9,8 +9,8 @@ import pytest
from
mock
import
patch
from
pip._vendor
import
pkg_resources
from
pip._internal.download
import
PipSession
from
pip._internal.index
import
InstallationCandidate
from
pip._internal.network.session
import
PipSession
from
pip._internal.utils
import
outdated
from
pip._internal.utils.outdated
import
(
SelfCheckState
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录