未验证 提交 56b5482b 编写于 作者: F Frost Ming 提交者: GitHub

Merge pull request #169 from frostming/feature/progress

Feature: new lookings
......@@ -33,7 +33,7 @@ jobs:
with:
python-version: ${{ matrix.python-version }}
- name: Set PY
run: echo "::set-env name=PY::$(python -c 'import hashlib, sys;print(hashlib.sha256(sys.version.encode()+sys.executable.encode()).hexdigest())')"
run: echo "PY=$(python -c 'import hashlib, sys;print(hashlib.sha256(sys.version.encode()+sys.executable.encode()).hexdigest())')" >> $GITHUB_ENV
- name: Cache PIP
uses: actions/cache@v1
with:
......
......@@ -3,6 +3,7 @@ repos:
rev: 20.8b1
hooks:
- id: black
exclude: ^pdm/_vendor
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.3
hooks:
......
Patch the halo library to support parallel spinners. Change the looking of `pdm install`.
Vendorize `halo` and related libraries for patching.
此差异已折叠。
Copyright (c) 2010 Jonathan Hartley
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the copyright holders, nor those of its contributors
may be used to endorse or promote products derived from this software without
specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
from .initialise import init, deinit, reinit, colorama_text
from .ansi import Fore, Back, Style, Cursor
from .ansitowin32 import AnsiToWin32
__version__ = '0.4.4'
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
'''
This module generates ANSI character codes to printing colors to terminals.
See: http://en.wikipedia.org/wiki/ANSI_escape_code
'''
CSI = '\033['
OSC = '\033]'
BEL = '\a'
def code_to_chars(code):
return CSI + str(code) + 'm'
def set_title(title):
return OSC + '2;' + title + BEL
def clear_screen(mode=2):
return CSI + str(mode) + 'J'
def clear_line(mode=2):
return CSI + str(mode) + 'K'
class AnsiCodes(object):
def __init__(self):
# the subclasses declare class attributes which are numbers.
# Upon instantiation we define instance attributes, which are the same
# as the class attributes but wrapped with the ANSI escape sequence
for name in dir(self):
if not name.startswith('_'):
value = getattr(self, name)
setattr(self, name, code_to_chars(value))
class AnsiCursor(object):
def UP(self, n=1):
return CSI + str(n) + 'A'
def DOWN(self, n=1):
return CSI + str(n) + 'B'
def FORWARD(self, n=1):
return CSI + str(n) + 'C'
def BACK(self, n=1):
return CSI + str(n) + 'D'
def POS(self, x=1, y=1):
return CSI + str(y) + ';' + str(x) + 'H'
class AnsiFore(AnsiCodes):
BLACK = 30
RED = 31
GREEN = 32
YELLOW = 33
BLUE = 34
MAGENTA = 35
CYAN = 36
WHITE = 37
RESET = 39
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 90
LIGHTRED_EX = 91
LIGHTGREEN_EX = 92
LIGHTYELLOW_EX = 93
LIGHTBLUE_EX = 94
LIGHTMAGENTA_EX = 95
LIGHTCYAN_EX = 96
LIGHTWHITE_EX = 97
class AnsiBack(AnsiCodes):
BLACK = 40
RED = 41
GREEN = 42
YELLOW = 43
BLUE = 44
MAGENTA = 45
CYAN = 46
WHITE = 47
RESET = 49
# These are fairly well supported, but not part of the standard.
LIGHTBLACK_EX = 100
LIGHTRED_EX = 101
LIGHTGREEN_EX = 102
LIGHTYELLOW_EX = 103
LIGHTBLUE_EX = 104
LIGHTMAGENTA_EX = 105
LIGHTCYAN_EX = 106
LIGHTWHITE_EX = 107
class AnsiStyle(AnsiCodes):
BRIGHT = 1
DIM = 2
NORMAL = 22
RESET_ALL = 0
Fore = AnsiFore()
Back = AnsiBack()
Style = AnsiStyle()
Cursor = AnsiCursor()
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
import re
import sys
import os
from .ansi import AnsiFore, AnsiBack, AnsiStyle, Style, BEL
from .winterm import WinTerm, WinColor, WinStyle
from .win32 import windll, winapi_test
winterm = None
if windll is not None:
winterm = WinTerm()
class StreamWrapper(object):
'''
Wraps a stream (such as stdout), acting as a transparent proxy for all
attribute access apart from method 'write()', which is delegated to our
Converter instance.
'''
def __init__(self, wrapped, converter):
# double-underscore everything to prevent clashes with names of
# attributes on the wrapped stream object.
self.__wrapped = wrapped
self.__convertor = converter
def __getattr__(self, name):
return getattr(self.__wrapped, name)
def __enter__(self, *args, **kwargs):
# special method lookup bypasses __getattr__/__getattribute__, see
# https://stackoverflow.com/questions/12632894/why-doesnt-getattr-work-with-exit
# thus, contextlib magic methods are not proxied via __getattr__
return self.__wrapped.__enter__(*args, **kwargs)
def __exit__(self, *args, **kwargs):
return self.__wrapped.__exit__(*args, **kwargs)
def write(self, text):
self.__convertor.write(text)
def isatty(self):
stream = self.__wrapped
if 'PYCHARM_HOSTED' in os.environ:
if stream is not None and (stream is sys.__stdout__ or stream is sys.__stderr__):
return True
try:
stream_isatty = stream.isatty
except AttributeError:
return False
else:
return stream_isatty()
@property
def closed(self):
stream = self.__wrapped
try:
return stream.closed
except AttributeError:
return True
class AnsiToWin32(object):
'''
Implements a 'write()' method which, on Windows, will strip ANSI character
sequences from the text, and if outputting to a tty, will convert them into
win32 function calls.
'''
ANSI_CSI_RE = re.compile('\001?\033\\[((?:\\d|;)*)([a-zA-Z])\002?') # Control Sequence Introducer
ANSI_OSC_RE = re.compile('\001?\033\\]([^\a]*)(\a)\002?') # Operating System Command
def __init__(self, wrapped, convert=None, strip=None, autoreset=False):
# The wrapped stream (normally sys.stdout or sys.stderr)
self.wrapped = wrapped
# should we reset colors to defaults after every .write()
self.autoreset = autoreset
# create the proxy wrapping our output stream
self.stream = StreamWrapper(wrapped, self)
on_windows = os.name == 'nt'
# We test if the WinAPI works, because even if we are on Windows
# we may be using a terminal that doesn't support the WinAPI
# (e.g. Cygwin Terminal). In this case it's up to the terminal
# to support the ANSI codes.
conversion_supported = on_windows and winapi_test()
# should we strip ANSI sequences from our output?
if strip is None:
strip = conversion_supported or (not self.stream.closed and not self.stream.isatty())
self.strip = strip
# should we should convert ANSI sequences into win32 calls?
if convert is None:
convert = conversion_supported and not self.stream.closed and self.stream.isatty()
self.convert = convert
# dict of ansi codes to win32 functions and parameters
self.win32_calls = self.get_win32_calls()
# are we wrapping stderr?
self.on_stderr = self.wrapped is sys.stderr
def should_wrap(self):
'''
True if this class is actually needed. If false, then the output
stream will not be affected, nor will win32 calls be issued, so
wrapping stdout is not actually required. This will generally be
False on non-Windows platforms, unless optional functionality like
autoreset has been requested using kwargs to init()
'''
return self.convert or self.strip or self.autoreset
def get_win32_calls(self):
if self.convert and winterm:
return {
AnsiStyle.RESET_ALL: (winterm.reset_all, ),
AnsiStyle.BRIGHT: (winterm.style, WinStyle.BRIGHT),
AnsiStyle.DIM: (winterm.style, WinStyle.NORMAL),
AnsiStyle.NORMAL: (winterm.style, WinStyle.NORMAL),
AnsiFore.BLACK: (winterm.fore, WinColor.BLACK),
AnsiFore.RED: (winterm.fore, WinColor.RED),
AnsiFore.GREEN: (winterm.fore, WinColor.GREEN),
AnsiFore.YELLOW: (winterm.fore, WinColor.YELLOW),
AnsiFore.BLUE: (winterm.fore, WinColor.BLUE),
AnsiFore.MAGENTA: (winterm.fore, WinColor.MAGENTA),
AnsiFore.CYAN: (winterm.fore, WinColor.CYAN),
AnsiFore.WHITE: (winterm.fore, WinColor.GREY),
AnsiFore.RESET: (winterm.fore, ),
AnsiFore.LIGHTBLACK_EX: (winterm.fore, WinColor.BLACK, True),
AnsiFore.LIGHTRED_EX: (winterm.fore, WinColor.RED, True),
AnsiFore.LIGHTGREEN_EX: (winterm.fore, WinColor.GREEN, True),
AnsiFore.LIGHTYELLOW_EX: (winterm.fore, WinColor.YELLOW, True),
AnsiFore.LIGHTBLUE_EX: (winterm.fore, WinColor.BLUE, True),
AnsiFore.LIGHTMAGENTA_EX: (winterm.fore, WinColor.MAGENTA, True),
AnsiFore.LIGHTCYAN_EX: (winterm.fore, WinColor.CYAN, True),
AnsiFore.LIGHTWHITE_EX: (winterm.fore, WinColor.GREY, True),
AnsiBack.BLACK: (winterm.back, WinColor.BLACK),
AnsiBack.RED: (winterm.back, WinColor.RED),
AnsiBack.GREEN: (winterm.back, WinColor.GREEN),
AnsiBack.YELLOW: (winterm.back, WinColor.YELLOW),
AnsiBack.BLUE: (winterm.back, WinColor.BLUE),
AnsiBack.MAGENTA: (winterm.back, WinColor.MAGENTA),
AnsiBack.CYAN: (winterm.back, WinColor.CYAN),
AnsiBack.WHITE: (winterm.back, WinColor.GREY),
AnsiBack.RESET: (winterm.back, ),
AnsiBack.LIGHTBLACK_EX: (winterm.back, WinColor.BLACK, True),
AnsiBack.LIGHTRED_EX: (winterm.back, WinColor.RED, True),
AnsiBack.LIGHTGREEN_EX: (winterm.back, WinColor.GREEN, True),
AnsiBack.LIGHTYELLOW_EX: (winterm.back, WinColor.YELLOW, True),
AnsiBack.LIGHTBLUE_EX: (winterm.back, WinColor.BLUE, True),
AnsiBack.LIGHTMAGENTA_EX: (winterm.back, WinColor.MAGENTA, True),
AnsiBack.LIGHTCYAN_EX: (winterm.back, WinColor.CYAN, True),
AnsiBack.LIGHTWHITE_EX: (winterm.back, WinColor.GREY, True),
}
return dict()
def write(self, text):
if self.strip or self.convert:
self.write_and_convert(text)
else:
self.wrapped.write(text)
self.wrapped.flush()
if self.autoreset:
self.reset_all()
def reset_all(self):
if self.convert:
self.call_win32('m', (0,))
elif not self.strip and not self.stream.closed:
self.wrapped.write(Style.RESET_ALL)
def write_and_convert(self, text):
'''
Write the given text to our wrapped stream, stripping any ANSI
sequences from the text, and optionally converting them into win32
calls.
'''
cursor = 0
text = self.convert_osc(text)
for match in self.ANSI_CSI_RE.finditer(text):
start, end = match.span()
self.write_plain_text(text, cursor, start)
self.convert_ansi(*match.groups())
cursor = end
self.write_plain_text(text, cursor, len(text))
def write_plain_text(self, text, start, end):
if start < end:
self.wrapped.write(text[start:end])
self.wrapped.flush()
def convert_ansi(self, paramstring, command):
if self.convert:
params = self.extract_params(command, paramstring)
self.call_win32(command, params)
def extract_params(self, command, paramstring):
if command in 'Hf':
params = tuple(int(p) if len(p) != 0 else 1 for p in paramstring.split(';'))
while len(params) < 2:
# defaults:
params = params + (1,)
else:
params = tuple(int(p) for p in paramstring.split(';') if len(p) != 0)
if len(params) == 0:
# defaults:
if command in 'JKm':
params = (0,)
elif command in 'ABCD':
params = (1,)
return params
def call_win32(self, command, params):
if command == 'm':
for param in params:
if param in self.win32_calls:
func_args = self.win32_calls[param]
func = func_args[0]
args = func_args[1:]
kwargs = dict(on_stderr=self.on_stderr)
func(*args, **kwargs)
elif command in 'J':
winterm.erase_screen(params[0], on_stderr=self.on_stderr)
elif command in 'K':
winterm.erase_line(params[0], on_stderr=self.on_stderr)
elif command in 'Hf': # cursor position - absolute
winterm.set_cursor_position(params, on_stderr=self.on_stderr)
elif command in 'ABCD': # cursor position - relative
n = params[0]
# A - up, B - down, C - forward, D - back
x, y = {'A': (0, -n), 'B': (0, n), 'C': (n, 0), 'D': (-n, 0)}[command]
winterm.cursor_adjust(x, y, on_stderr=self.on_stderr)
def convert_osc(self, text):
for match in self.ANSI_OSC_RE.finditer(text):
start, end = match.span()
text = text[:start] + text[end:]
paramstring, command = match.groups()
if command == BEL:
if paramstring.count(";") == 1:
params = paramstring.split(";")
# 0 - change title and icon (we will only change title)
# 1 - change icon (we don't support this)
# 2 - change title
if params[0] in '02':
winterm.set_title(params[1])
return text
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
import atexit
import contextlib
import sys
from .ansitowin32 import AnsiToWin32
orig_stdout = None
orig_stderr = None
wrapped_stdout = None
wrapped_stderr = None
atexit_done = False
def reset_all():
if AnsiToWin32 is not None: # Issue #74: objects might become None at exit
AnsiToWin32(orig_stdout).reset_all()
def init(autoreset=False, convert=None, strip=None, wrap=True):
if not wrap and any([autoreset, convert, strip]):
raise ValueError('wrap=False conflicts with any other arg=True')
global wrapped_stdout, wrapped_stderr
global orig_stdout, orig_stderr
orig_stdout = sys.stdout
orig_stderr = sys.stderr
if sys.stdout is None:
wrapped_stdout = None
else:
sys.stdout = wrapped_stdout = \
wrap_stream(orig_stdout, convert, strip, autoreset, wrap)
if sys.stderr is None:
wrapped_stderr = None
else:
sys.stderr = wrapped_stderr = \
wrap_stream(orig_stderr, convert, strip, autoreset, wrap)
global atexit_done
if not atexit_done:
atexit.register(reset_all)
atexit_done = True
def deinit():
if orig_stdout is not None:
sys.stdout = orig_stdout
if orig_stderr is not None:
sys.stderr = orig_stderr
@contextlib.contextmanager
def colorama_text(*args, **kwargs):
init(*args, **kwargs)
try:
yield
finally:
deinit()
def reinit():
if wrapped_stdout is not None:
sys.stdout = wrapped_stdout
if wrapped_stderr is not None:
sys.stderr = wrapped_stderr
def wrap_stream(stream, convert, strip, autoreset, wrap):
if wrap:
wrapper = AnsiToWin32(stream,
convert=convert, strip=strip, autoreset=autoreset)
if wrapper.should_wrap():
stream = wrapper.stream
return stream
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
# from winbase.h
STDOUT = -11
STDERR = -12
try:
import ctypes
from ctypes import LibraryLoader
windll = LibraryLoader(ctypes.WinDLL)
from ctypes import wintypes
except (AttributeError, ImportError):
windll = None
SetConsoleTextAttribute = lambda *_: None
winapi_test = lambda *_: None
else:
from ctypes import byref, Structure, c_char, POINTER
COORD = wintypes._COORD
class CONSOLE_SCREEN_BUFFER_INFO(Structure):
"""struct in wincon.h."""
_fields_ = [
("dwSize", COORD),
("dwCursorPosition", COORD),
("wAttributes", wintypes.WORD),
("srWindow", wintypes.SMALL_RECT),
("dwMaximumWindowSize", COORD),
]
def __str__(self):
return '(%d,%d,%d,%d,%d,%d,%d,%d,%d,%d,%d)' % (
self.dwSize.Y, self.dwSize.X
, self.dwCursorPosition.Y, self.dwCursorPosition.X
, self.wAttributes
, self.srWindow.Top, self.srWindow.Left, self.srWindow.Bottom, self.srWindow.Right
, self.dwMaximumWindowSize.Y, self.dwMaximumWindowSize.X
)
_GetStdHandle = windll.kernel32.GetStdHandle
_GetStdHandle.argtypes = [
wintypes.DWORD,
]
_GetStdHandle.restype = wintypes.HANDLE
_GetConsoleScreenBufferInfo = windll.kernel32.GetConsoleScreenBufferInfo
_GetConsoleScreenBufferInfo.argtypes = [
wintypes.HANDLE,
POINTER(CONSOLE_SCREEN_BUFFER_INFO),
]
_GetConsoleScreenBufferInfo.restype = wintypes.BOOL
_SetConsoleTextAttribute = windll.kernel32.SetConsoleTextAttribute
_SetConsoleTextAttribute.argtypes = [
wintypes.HANDLE,
wintypes.WORD,
]
_SetConsoleTextAttribute.restype = wintypes.BOOL
_SetConsoleCursorPosition = windll.kernel32.SetConsoleCursorPosition
_SetConsoleCursorPosition.argtypes = [
wintypes.HANDLE,
COORD,
]
_SetConsoleCursorPosition.restype = wintypes.BOOL
_FillConsoleOutputCharacterA = windll.kernel32.FillConsoleOutputCharacterA
_FillConsoleOutputCharacterA.argtypes = [
wintypes.HANDLE,
c_char,
wintypes.DWORD,
COORD,
POINTER(wintypes.DWORD),
]
_FillConsoleOutputCharacterA.restype = wintypes.BOOL
_FillConsoleOutputAttribute = windll.kernel32.FillConsoleOutputAttribute
_FillConsoleOutputAttribute.argtypes = [
wintypes.HANDLE,
wintypes.WORD,
wintypes.DWORD,
COORD,
POINTER(wintypes.DWORD),
]
_FillConsoleOutputAttribute.restype = wintypes.BOOL
_SetConsoleTitleW = windll.kernel32.SetConsoleTitleW
_SetConsoleTitleW.argtypes = [
wintypes.LPCWSTR
]
_SetConsoleTitleW.restype = wintypes.BOOL
def _winapi_test(handle):
csbi = CONSOLE_SCREEN_BUFFER_INFO()
success = _GetConsoleScreenBufferInfo(
handle, byref(csbi))
return bool(success)
def winapi_test():
return any(_winapi_test(h) for h in
(_GetStdHandle(STDOUT), _GetStdHandle(STDERR)))
def GetConsoleScreenBufferInfo(stream_id=STDOUT):
handle = _GetStdHandle(stream_id)
csbi = CONSOLE_SCREEN_BUFFER_INFO()
success = _GetConsoleScreenBufferInfo(
handle, byref(csbi))
return csbi
def SetConsoleTextAttribute(stream_id, attrs):
handle = _GetStdHandle(stream_id)
return _SetConsoleTextAttribute(handle, attrs)
def SetConsoleCursorPosition(stream_id, position, adjust=True):
position = COORD(*position)
# If the position is out of range, do nothing.
if position.Y <= 0 or position.X <= 0:
return
# Adjust for Windows' SetConsoleCursorPosition:
# 1. being 0-based, while ANSI is 1-based.
# 2. expecting (x,y), while ANSI uses (y,x).
adjusted_position = COORD(position.Y - 1, position.X - 1)
if adjust:
# Adjust for viewport's scroll position
sr = GetConsoleScreenBufferInfo(STDOUT).srWindow
adjusted_position.Y += sr.Top
adjusted_position.X += sr.Left
# Resume normal processing
handle = _GetStdHandle(stream_id)
return _SetConsoleCursorPosition(handle, adjusted_position)
def FillConsoleOutputCharacter(stream_id, char, length, start):
handle = _GetStdHandle(stream_id)
char = c_char(char.encode())
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
success = _FillConsoleOutputCharacterA(
handle, char, length, start, byref(num_written))
return num_written.value
def FillConsoleOutputAttribute(stream_id, attr, length, start):
''' FillConsoleOutputAttribute( hConsole, csbi.wAttributes, dwConSize, coordScreen, &cCharsWritten )'''
handle = _GetStdHandle(stream_id)
attribute = wintypes.WORD(attr)
length = wintypes.DWORD(length)
num_written = wintypes.DWORD(0)
# Note that this is hard-coded for ANSI (vs wide) bytes.
return _FillConsoleOutputAttribute(
handle, attribute, length, start, byref(num_written))
def SetConsoleTitle(title):
return _SetConsoleTitleW(title)
# Copyright Jonathan Hartley 2013. BSD 3-Clause license, see LICENSE file.
from . import win32
# from wincon.h
class WinColor(object):
BLACK = 0
BLUE = 1
GREEN = 2
CYAN = 3
RED = 4
MAGENTA = 5
YELLOW = 6
GREY = 7
# from wincon.h
class WinStyle(object):
NORMAL = 0x00 # dim text, dim background
BRIGHT = 0x08 # bright text, dim background
BRIGHT_BACKGROUND = 0x80 # dim text, bright background
class WinTerm(object):
def __init__(self):
self._default = win32.GetConsoleScreenBufferInfo(win32.STDOUT).wAttributes
self.set_attrs(self._default)
self._default_fore = self._fore
self._default_back = self._back
self._default_style = self._style
# In order to emulate LIGHT_EX in windows, we borrow the BRIGHT style.
# So that LIGHT_EX colors and BRIGHT style do not clobber each other,
# we track them separately, since LIGHT_EX is overwritten by Fore/Back
# and BRIGHT is overwritten by Style codes.
self._light = 0
def get_attrs(self):
return self._fore + self._back * 16 + (self._style | self._light)
def set_attrs(self, value):
self._fore = value & 7
self._back = (value >> 4) & 7
self._style = value & (WinStyle.BRIGHT | WinStyle.BRIGHT_BACKGROUND)
def reset_all(self, on_stderr=None):
self.set_attrs(self._default)
self.set_console(attrs=self._default)
self._light = 0
def fore(self, fore=None, light=False, on_stderr=False):
if fore is None:
fore = self._default_fore
self._fore = fore
# Emulate LIGHT_EX with BRIGHT Style
if light:
self._light |= WinStyle.BRIGHT
else:
self._light &= ~WinStyle.BRIGHT
self.set_console(on_stderr=on_stderr)
def back(self, back=None, light=False, on_stderr=False):
if back is None:
back = self._default_back
self._back = back
# Emulate LIGHT_EX with BRIGHT_BACKGROUND Style
if light:
self._light |= WinStyle.BRIGHT_BACKGROUND
else:
self._light &= ~WinStyle.BRIGHT_BACKGROUND
self.set_console(on_stderr=on_stderr)
def style(self, style=None, on_stderr=False):
if style is None:
style = self._default_style
self._style = style
self.set_console(on_stderr=on_stderr)
def set_console(self, attrs=None, on_stderr=False):
if attrs is None:
attrs = self.get_attrs()
handle = win32.STDOUT
if on_stderr:
handle = win32.STDERR
win32.SetConsoleTextAttribute(handle, attrs)
def get_position(self, handle):
position = win32.GetConsoleScreenBufferInfo(handle).dwCursorPosition
# Because Windows coordinates are 0-based,
# and win32.SetConsoleCursorPosition expects 1-based.
position.X += 1
position.Y += 1
return position
def set_cursor_position(self, position=None, on_stderr=False):
if position is None:
# I'm not currently tracking the position, so there is no default.
# position = self.get_position()
return
handle = win32.STDOUT
if on_stderr:
handle = win32.STDERR
win32.SetConsoleCursorPosition(handle, position)
def cursor_adjust(self, x, y, on_stderr=False):
handle = win32.STDOUT
if on_stderr:
handle = win32.STDERR
position = self.get_position(handle)
adjusted_position = (position.Y + y, position.X + x)
win32.SetConsoleCursorPosition(handle, adjusted_position, adjust=False)
def erase_screen(self, mode=0, on_stderr=False):
# 0 should clear from the cursor to the end of the screen.
# 1 should clear from the cursor to the beginning of the screen.
# 2 should clear the entire screen, and move cursor to (1,1)
handle = win32.STDOUT
if on_stderr:
handle = win32.STDERR
csbi = win32.GetConsoleScreenBufferInfo(handle)
# get the number of character cells in the current buffer
cells_in_screen = csbi.dwSize.X * csbi.dwSize.Y
# get number of character cells before current cursor position
cells_before_cursor = csbi.dwSize.X * csbi.dwCursorPosition.Y + csbi.dwCursorPosition.X
if mode == 0:
from_coord = csbi.dwCursorPosition
cells_to_erase = cells_in_screen - cells_before_cursor
elif mode == 1:
from_coord = win32.COORD(0, 0)
cells_to_erase = cells_before_cursor
elif mode == 2:
from_coord = win32.COORD(0, 0)
cells_to_erase = cells_in_screen
else:
# invalid mode
return
# fill the entire screen with blanks
win32.FillConsoleOutputCharacter(handle, ' ', cells_to_erase, from_coord)
# now set the buffer's attributes accordingly
win32.FillConsoleOutputAttribute(handle, self.get_attrs(), cells_to_erase, from_coord)
if mode == 2:
# put the cursor where needed
win32.SetConsoleCursorPosition(handle, (1, 1))
def erase_line(self, mode=0, on_stderr=False):
# 0 should clear from the cursor to the end of the line.
# 1 should clear from the cursor to the beginning of the line.
# 2 should clear the entire line.
handle = win32.STDOUT
if on_stderr:
handle = win32.STDERR
csbi = win32.GetConsoleScreenBufferInfo(handle)
if mode == 0:
from_coord = csbi.dwCursorPosition
cells_to_erase = csbi.dwSize.X - csbi.dwCursorPosition.X
elif mode == 1:
from_coord = win32.COORD(0, csbi.dwCursorPosition.Y)
cells_to_erase = csbi.dwCursorPosition.X
elif mode == 2:
from_coord = win32.COORD(0, csbi.dwCursorPosition.Y)
cells_to_erase = csbi.dwSize.X
else:
# invalid mode
return
# fill the entire screen with blanks
win32.FillConsoleOutputCharacter(handle, ' ', cells_to_erase, from_coord)
# now set the buffer's attributes accordingly
win32.FillConsoleOutputAttribute(handle, self.get_attrs(), cells_to_erase, from_coord)
def set_title(self, title):
win32.SetConsoleTitle(title)
MIT License
Copyright (c) 2017 Manraj Singh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# -*- coding: utf-8 -*-
__author__ = 'Manraj Singh'
__email__ = 'manrajsinghgrover@gmail.com'
import logging
from .halo import Halo
from .halo_notebook import HaloNotebook
logging.getLogger(__name__).addHandler(logging.NullHandler())
# -*- coding: utf-8 -*-
"""Utilities for Halo library.
"""
import codecs
import platform
import six
try:
from shutil import get_terminal_size
except ImportError:
from backports.shutil_get_terminal_size import get_terminal_size
from pdm._vendor.colorama import init
from pdm._vendor.termcolor import colored
init(autoreset=True)
def is_supported():
"""Check whether operating system supports main symbols or not.
Returns
-------
boolean
Whether operating system supports main symbols or not
"""
os_arch = platform.system()
if os_arch != 'Windows':
return True
return False
def get_environment():
"""Get the environment in which halo is running
Returns
-------
str
Environment name
"""
try:
from IPython import get_ipython
except ImportError:
return 'terminal'
try:
shell = get_ipython().__class__.__name__
if shell == 'ZMQInteractiveShell': # Jupyter notebook or qtconsole
return 'jupyter'
elif shell == 'TerminalInteractiveShell': # Terminal running IPython
return 'ipython'
else:
return 'terminal' # Other type (?)
except NameError:
return 'terminal'
def colored_frame(frame, color):
"""Color the frame with given color and returns.
Parameters
----------
frame : str
Frame to be colored
color : str
Color to be applied
Returns
-------
str
Colored frame
"""
return colored(frame, color, attrs=['bold'])
def is_text_type(text):
"""Check if given parameter is a string or not
Parameters
----------
text : *
Parameter to be checked for text type
Returns
-------
bool
Whether parameter is a string or not
"""
if isinstance(text, six.text_type) or isinstance(text, six.string_types):
return True
return False
def decode_utf_8_text(text):
"""Decode the text from utf-8 format
Parameters
----------
text : str
String to be decoded
Returns
-------
str
Decoded string
"""
try:
return codecs.decode(text, 'utf-8')
except (TypeError, ValueError):
return text
def encode_utf_8_text(text):
"""Encodes the text to utf-8 format
Parameters
----------
text : str
String to be encoded
Returns
-------
str
Encoded string
"""
try:
return codecs.encode(text, 'utf-8', 'ignore')
except (TypeError, ValueError):
return text
def get_terminal_columns():
"""Determine the amount of available columns in the terminal
Returns
-------
int
Terminal width
"""
terminal_size = get_terminal_size()
# If column size is 0 either we are not connected
# to a terminal or something else went wrong. Fallback to 80.
if terminal_size.columns == 0:
return 80
else:
return terminal_size.columns
"""
Source: https://stackoverflow.com/a/10455937/2692667
"""
import sys
import os
if os.name == "nt":
import ctypes
class _CursorInfo(ctypes.Structure):
_fields_ = [("size", ctypes.c_int), ("visible", ctypes.c_byte)]
def hide(stream=sys.stdout):
"""Hide cursor.
Parameters
----------
stream: sys.stdout, Optional
Defines stream to write output to.
"""
if os.name == "nt":
ci = _CursorInfo()
handle = ctypes.windll.kernel32.GetStdHandle(-11)
ctypes.windll.kernel32.GetConsoleCursorInfo(handle, ctypes.byref(ci))
ci.visible = False
ctypes.windll.kernel32.SetConsoleCursorInfo(handle, ctypes.byref(ci))
elif os.name == "posix":
stream.write("\033[?25l")
stream.flush()
def show(stream=sys.stdout):
"""Show cursor.
Parameters
----------
stream: sys.stdout, Optional
Defines stream to write output to.
"""
if os.name == "nt":
ci = _CursorInfo()
handle = ctypes.windll.kernel32.GetStdHandle(-11)
ctypes.windll.kernel32.GetConsoleCursorInfo(handle, ctypes.byref(ci))
ci.visible = True
ctypes.windll.kernel32.SetConsoleCursorInfo(handle, ctypes.byref(ci))
elif os.name == "posix":
stream.write("\033[?25h")
stream.flush()
# -*- coding: utf-8 -*-
# pylint: disable=unsubscriptable-object
"""Beautiful terminal spinners in Python.
"""
from __future__ import absolute_import, unicode_literals
import atexit
import functools
import sys
import threading
import time
import pdm._vendor.halo.cursor as cursor
from pdm._vendor.log_symbols.symbols import LogSymbols, is_supported
from pdm._vendor.spinners.spinners import Spinners
from pdm._vendor.halo._utils import (
colored_frame,
decode_utf_8_text,
get_environment,
get_terminal_columns,
is_text_type,
encode_utf_8_text,
)
class Halo(object):
"""Halo library.
Attributes
----------
CLEAR_LINE : str
Code to clear the line
"""
CLEAR_LINE = "\033[K"
CLEAR_REST = "\033[J"
SPINNER_PLACEMENTS = (
"left",
"right",
)
# a global list to keep all Halo instances
_instances = []
_lock = threading.Lock()
def __init__(
self,
text="",
color="cyan",
text_color=None,
spinner=None,
animation=None,
placement="left",
interval=-1,
enabled=True,
indent="",
stream=sys.stdout,
):
"""Constructs the Halo object.
Parameters
----------
text : str, optional
Text to display.
text_color : str, optional
Color of the text.
color : str, optional
Color of the text to display.
spinner : str|dict, optional
String or dictionary representing spinner. String can be one of 60+ spinners
supported.
animation: str, optional
Animation to apply if text is too large. Can be one of `bounce`, `marquee`.
Defaults to ellipses.
placement: str, optional
Side of the text to place the spinner on. Can be `left` or `right`.
Defaults to `left`.
interval : integer, optional
Interval between each frame of the spinner in milliseconds.
enabled : boolean, optional
Spinner enabled or not.
stream : io, optional
Output.
"""
self._color = color
self._animation = animation
self.spinner = spinner
self.text = text
self._text_color = text_color
self._interval = (
int(interval) if int(interval) > 0 else self._spinner["interval"]
)
self._stream = stream
self.placement = placement
self._frame_index = 0
self._text_index = 0
self._spinner_thread = None
self._stop_spinner = None
self._spinner_id = None
self.enabled = enabled
self._stopped = False
self._content = ""
self.indent = indent
environment = get_environment()
def clean_up():
"""Handle cell execution"""
self.stop()
if environment in ("ipython", "jupyter"):
from IPython import get_ipython
ip = get_ipython()
ip.events.register("post_run_cell", clean_up)
else: # default terminal
atexit.register(clean_up)
def __enter__(self):
"""Starts the spinner on a separate thread. For use in context managers.
Returns
-------
self
"""
return self.start()
def __exit__(self, type, value, traceback):
"""Stops the spinner. For use in context managers."""
self.stop()
def __call__(self, f):
"""Allow the Halo object to be used as a regular function decorator."""
@functools.wraps(f)
def wrapped(*args, **kwargs):
with self:
return f(*args, **kwargs)
return wrapped
@property
def spinner(self):
"""Getter for spinner property.
Returns
-------
dict
spinner value
"""
return self._spinner
@spinner.setter
def spinner(self, spinner=None):
"""Setter for spinner property.
Parameters
----------
spinner : dict, str
Defines the spinner value with frame and interval
"""
self._spinner = self._get_spinner(spinner)
self._frame_index = 0
self._text_index = 0
@property
def text(self):
"""Getter for text property.
Returns
-------
str
text value
"""
return self._text["original"]
@text.setter
def text(self, text):
"""Setter for text property.
Parameters
----------
text : str
Defines the text value for spinner
"""
self._text = self._get_text(text)
@property
def text_color(self):
"""Getter for text color property.
Returns
-------
str
text color value
"""
return self._text_color
@text_color.setter
def text_color(self, text_color):
"""Setter for text color property.
Parameters
----------
text_color : str
Defines the text color value for spinner
"""
self._text_color = text_color
@property
def color(self):
"""Getter for color property.
Returns
-------
str
color value
"""
return self._color
@color.setter
def color(self, color):
"""Setter for color property.
Parameters
----------
color : str
Defines the color value for spinner
"""
self._color = color
@property
def placement(self):
"""Getter for placement property.
Returns
-------
str
spinner placement
"""
return self._placement
@placement.setter
def placement(self, placement):
"""Setter for placement property.
Parameters
----------
placement: str
Defines the placement of the spinner
"""
if placement not in self.SPINNER_PLACEMENTS:
raise ValueError(
"Unknown spinner placement '{0}', available are {1}".format(
placement, self.SPINNER_PLACEMENTS
)
)
self._placement = placement
@property
def spinner_id(self):
"""Getter for spinner id
Returns
-------
str
Spinner id value
"""
return self._spinner_id
@property
def animation(self):
"""Getter for animation property.
Returns
-------
str
Spinner animation
"""
return self._animation
@animation.setter
def animation(self, animation):
"""Setter for animation property.
Parameters
----------
animation: str
Defines the animation of the spinner
"""
self._animation = animation
self._text = self._get_text(self._text["original"])
def _check_stream(self):
"""Returns whether the stream is open, and if applicable, writable
Returns
-------
bool
Whether the stream is open
"""
if self._stream.closed:
return False
try:
# Attribute access kept separate from invocation, to avoid
# swallowing AttributeErrors from the call which should bubble up.
check_stream_writable = self._stream.writable
except AttributeError:
pass
else:
return check_stream_writable()
return True
def _pop_stream_content_until_self(self, clear_self=False):
"""Move cursor to the end of this instance's content and erase all contents
following it.
Parameters
----------
clear_self: bool
If equals True, the content of current line will also get cleared
Returns
-------
str
The content of stream following this instance.
"""
erased_content = []
lines_to_erase = self._content.count("\n") if clear_self else 0
for inst in Halo._instances[::-1]:
if inst is self:
break
erased_content.append(inst._content)
lines_to_erase += inst._content.count("\n")
if lines_to_erase > 0:
# Move cursor up n lines
self._write_stream("\033[{}A".format(lines_to_erase))
# Erase rest content
self._write_stream(self.CLEAR_REST)
return "".join(reversed(erased_content))
def _write_stream(self, s):
"""Write to the stream, if writable
Parameters
----------
s : str
Characters to write to the stream
"""
if self._check_stream():
self._stream.write(s)
def _write(self, s, overwrite=False):
"""Write to the stream and keep following lines unchanged.
Parameters
----------
s : str
Characters to write to the stream
overwrite: bool
If set to True, overwrite the content of current instance.
"""
if s.startswith("\r"):
s = f"\r{self.indent}{s[1:]}"
else:
s = f"{self.indent}{s}"
with Halo._lock:
erased_content = self._pop_stream_content_until_self(overwrite)
self._write_stream(s)
# Write back following lines
self._write_stream(erased_content)
self._content = s if overwrite else self._content + s
def _hide_cursor(self):
"""Disable the user's blinking cursor"""
if self._check_stream() and self._stream.isatty():
cursor.hide(stream=self._stream)
def _show_cursor(self):
"""Re-enable the user's blinking cursor"""
if self._check_stream() and self._stream.isatty():
cursor.show(stream=self._stream)
def _get_spinner(self, spinner):
"""Extracts spinner value from options and returns value
containing spinner frames and interval, defaults to 'dots' spinner.
Parameters
----------
spinner : dict, str
Contains spinner value or type of spinner to be used
Returns
-------
dict
Contains frames and interval defining spinner
"""
default_spinner = Spinners["dots"].value
if spinner and type(spinner) == dict:
return spinner
if is_supported():
if all([is_text_type(spinner), spinner in Spinners.__members__]):
return Spinners[spinner].value
else:
return default_spinner
else:
return Spinners["line"].value
def _get_text(self, text):
"""Creates frames based on the selected animation
Returns
-------
self
"""
animation = self._animation
stripped_text = text.strip()
# Check which frame of the animation is the widest
max_spinner_length = max([len(i) for i in self._spinner["frames"]])
# Subtract to the current terminal size the max spinner length
# (-1 to leave room for the extra space between spinner and text)
terminal_width = get_terminal_columns() - max_spinner_length - 1
text_length = len(stripped_text)
frames = []
if terminal_width < text_length and animation:
if animation == "bounce":
"""
Make the text bounce back and forth
"""
for x in range(0, text_length - terminal_width + 1):
frames.append(stripped_text[x : terminal_width + x])
frames.extend(list(reversed(frames)))
elif "marquee":
"""
Make the text scroll like a marquee
"""
stripped_text = stripped_text + " " + stripped_text[:terminal_width]
for x in range(0, text_length + 1):
frames.append(stripped_text[x : terminal_width + x])
elif terminal_width < text_length and not animation:
# Add ellipsis if text is larger than terminal width and no animation was specified
frames = [stripped_text[: terminal_width - 6] + " (...)"]
else:
frames = [stripped_text]
return {"original": text, "frames": frames}
def clear(self):
"""Clears the line and returns cursor to the start.
of line
Returns
-------
self
"""
with Halo._lock:
erased_content = self._pop_stream_content_until_self(True)
self._content = ""
self._write_stream(erased_content)
return self
def _render_frame(self):
"""Renders the frame on the line after clearing it."""
if not self.enabled:
# in case we're disabled or stream is closed while still rendering,
# we render the frame and increment the frame index, so the proper
# frame is rendered if we're reenabled or the stream opens again.
return
frame = self.frame()
output = "\r{}\n".format(frame)
try:
self._write(output, True)
except UnicodeEncodeError:
self._write(encode_utf_8_text(output), True)
def render(self):
"""Runs the render until thread flag is set.
Returns
-------
self
"""
while not self._stop_spinner.is_set():
self._render_frame()
time.sleep(0.001 * self._interval)
return self
def frame(self):
"""Builds and returns the frame to be rendered
Returns
-------
self
"""
frames = self._spinner["frames"]
frame = frames[self._frame_index]
if self._color:
frame = colored_frame(frame, self._color)
self._frame_index += 1
self._frame_index = self._frame_index % len(frames)
text_frame = self.text_frame()
return "{0} {1}".format(
*[
(text_frame, frame)
if self._placement == "right"
else (frame, text_frame)
][0]
)
def text_frame(self):
"""Builds and returns the text frame to be rendered
Returns
-------
self
"""
if len(self._text["frames"]) == 1:
if self._text_color:
return colored_frame(self._text["frames"][0], self._text_color)
# Return first frame (can't return original text because at this point it might be ellipsed)
return self._text["frames"][0]
frames = self._text["frames"]
frame = frames[self._text_index]
self._text_index += 1
self._text_index = self._text_index % len(frames)
if self._text_color:
return colored_frame(frame, self._text_color)
return frame
def start(self, text=None):
"""Starts the spinner on a separate thread.
Parameters
----------
text : None, optional
Text to be used alongside spinner
Returns
-------
self
"""
if text is not None:
self.text = text
if self._spinner_id is not None:
return self
if not (self.enabled and self._check_stream()):
return self
# Clear all stale Halo instances created before
# Check against Halo._instances instead of self._instances
# to avoid possible overriding in subclasses.
if all(inst._stopped for inst in Halo._instances):
Halo._instances[:] = []
# Allow for calling start() multiple times
if self not in Halo._instances:
Halo._instances.append(self)
self._hide_cursor()
self._stop_spinner = threading.Event()
self._spinner_thread = threading.Thread(target=self.render)
self._spinner_thread.setDaemon(True)
self._render_frame()
self._spinner_id = self._spinner_thread.name
self._spinner_thread.start()
self._stopped = False
return self
def stop(self):
"""Stops the spinner and clears the line.
Returns
-------
self
"""
if self._spinner_thread and self._spinner_thread.is_alive():
self._stop_spinner.set()
self._spinner_thread.join()
if self._stopped:
return
if self.enabled:
self.clear()
self._frame_index = 0
self._spinner_id = None
self._show_cursor()
self._stopped = True
return self
def succeed(self, text=None):
"""Shows and persists success symbol and text and exits.
Parameters
----------
text : None, optional
Text to be shown alongside success symbol.
Returns
-------
self
"""
return self.stop_and_persist(symbol=LogSymbols.SUCCESS.value, text=text)
def fail(self, text=None):
"""Shows and persists fail symbol and text and exits.
Parameters
----------
text : None, optional
Text to be shown alongside fail symbol.
Returns
-------
self
"""
return self.stop_and_persist(symbol=LogSymbols.ERROR.value, text=text)
def warn(self, text=None):
"""Shows and persists warn symbol and text and exits.
Parameters
----------
text : None, optional
Text to be shown alongside warn symbol.
Returns
-------
self
"""
return self.stop_and_persist(symbol=LogSymbols.WARNING.value, text=text)
def info(self, text=None):
"""Shows and persists info symbol and text and exits.
Parameters
----------
text : None, optional
Text to be shown alongside info symbol.
Returns
-------
self
"""
return self.stop_and_persist(symbol=LogSymbols.INFO.value, text=text)
def stop_and_persist(self, symbol=" ", text=None):
"""Stops the spinner and persists the final frame to be shown.
Parameters
----------
symbol : str, optional
Symbol to be shown in final frame
text: str, optional
Text to be shown in final frame
Returns
-------
self
"""
if not self.enabled:
return self
symbol = decode_utf_8_text(symbol)
if text is not None:
text = decode_utf_8_text(text)
else:
text = self._text["original"]
text = text.strip()
if self._text_color:
text = colored_frame(text, self._text_color)
self.stop()
output = "{0} {1}\n".format(
*[(text, symbol) if self._placement == "right" else (symbol, text)][0]
)
try:
self._write(output)
except UnicodeEncodeError:
self._write(encode_utf_8_text(output))
return self
from __future__ import absolute_import, print_function, unicode_literals
import sys
import threading
import pdm._vendor.halo.cursor as cursor
from pdm._vendor.halo import Halo
from pdm._vendor.halo._utils import colored_frame, decode_utf_8_text
class HaloNotebook(Halo):
def __init__(
self,
text="",
color="cyan",
text_color=None,
spinner=None,
placement="left",
animation=None,
interval=-1,
enabled=True,
stream=sys.stdout,
):
super(HaloNotebook, self).__init__(
text=text,
color=color,
text_color=text_color,
spinner=spinner,
placement=placement,
animation=animation,
interval=interval,
enabled=enabled,
stream=stream,
)
self.output = self._make_output_widget()
def _make_output_widget(self):
from ipywidgets.widgets import Output
return Output()
# TODO: using property and setter
def _output(self, text=""):
return ({"name": "stdout", "output_type": "stream", "text": text},)
def clear(self):
if not self.enabled:
return self
with self.output:
self.output.outputs += self._output("\r")
self.output.outputs += self._output(self.CLEAR_LINE)
self.output.outputs = self._output()
return self
def _render_frame(self):
frame = self.frame()
output = "\r{}".format(frame)
with self.output:
self.output.outputs += self._output(output)
def start(self, text=None):
if text is not None:
self.text = text
if not self.enabled or self._spinner_id is not None:
return self
if self._stream.isatty():
cursor.hide()
self.output = self._make_output_widget()
from IPython.display import display
display(self.output)
self._stop_spinner = threading.Event()
self._spinner_thread = threading.Thread(target=self.render)
self._spinner_thread.setDaemon(True)
self._render_frame()
self._spinner_id = self._spinner_thread.name
self._spinner_thread.start()
return self
def stop_and_persist(self, symbol=" ", text=None):
"""Stops the spinner and persists the final frame to be shown.
Parameters
----------
symbol : str, optional
Symbol to be shown in final frame
text: str, optional
Text to be shown in final frame
Returns
-------
self
"""
if not self.enabled:
return self
symbol = decode_utf_8_text(symbol)
if text is not None:
text = decode_utf_8_text(text)
else:
text = self._text["original"]
text = text.strip()
if self._text_color:
text = colored_frame(text, self._text_color)
self.stop()
output = "\r{} {}\n".format(
*[(text, symbol) if self._placement == "right" else (symbol, text)][0]
)
with self.output:
self.output.outputs = self._output(output)
MIT License
Copyright (c) 2017 Manraj Singh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
# -*- coding: utf-8 -*-
__author__ = 'Manraj Singh'
__email__ = 'manrajsinghgrover@gmail.com'
from .symbols import LogSymbols
# -*- coding: utf-8 -*-
"""Provide log symbols for various log levels."""
import codecs
import locale
import os
import sys
from enum import Enum
from pdm._vendor.colorama import init, deinit, Fore
init(autoreset=True)
_MAIN = {
'info': 'ℹ',
'success': '✔',
'warning': '⚠',
'error': '✖'
}
_FALLBACKS = {
'info': '¡',
'success': 'v',
'warning': '!!',
'error': '×'
}
def is_supported():
"""Check whether operating system supports main symbols or not.
Returns
-------
boolean
Whether operating system supports main symbols or not
"""
if os.getenv("DISABLE_UNICODE_OUTPUT"):
return False
encoding = getattr(sys.stdout, "encoding")
if encoding is None:
encoding = locale.getpreferredencoding(False)
try:
encoding = codecs.lookup(encoding).name
except Exception:
encoding = "utf-8"
return encoding == "utf-8"
_SYMBOLS = _MAIN if is_supported() else _FALLBACKS
class LogSymbols(Enum): # pylint: disable=too-few-public-methods
"""LogSymbol enum class.
Attributes
----------
ERROR : str
Colored error symbol
INFO : str
Colored info symbol
SUCCESS : str
Colored success symbol
WARNING : str
Colored warning symbol
"""
INFO = Fore.BLUE + _SYMBOLS['info'] + Fore.RESET
SUCCESS = Fore.GREEN + _SYMBOLS['success'] + Fore.RESET
WARNING = Fore.YELLOW + _SYMBOLS['warning'] + Fore.RESET
ERROR = Fore.RED + _SYMBOLS['error'] + Fore.RESET
deinit()
MIT License
Copyright (c) 2017 Manraj Singh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
__author__ = 'Manraj Singh'
__email__ = 'manrajsinghgrover@gmail.com'
from .spinners import Spinners
# -*- coding: utf-8 -*-
"""
Python wrapper for beautiful terminal spinner library.
Spinners are from:
* cli-spinners:
MIT License
Copyright (c) Sindre Sorhus <sindresorhus@gmail.com> (sindresorhus.com)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included
in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.
"""
from __future__ import unicode_literals
from enum import Enum
Spinners = Enum('Spinners', {
"dots": {
"interval": 80,
"frames": [
"⠋",
"⠙",
"⠹",
"⠸",
"⠼",
"⠴",
"⠦",
"⠧",
"⠇",
"⠏"
]
},
"dots2": {
"interval": 80,
"frames": [
"⣾",
"⣽",
"⣻",
"⢿",
"⡿",
"⣟",
"⣯",
"⣷"
]
},
"dots3": {
"interval": 80,
"frames": [
"⠋",
"⠙",
"⠚",
"⠞",
"⠖",
"⠦",
"⠴",
"⠲",
"⠳",
"⠓"
]
},
"dots4": {
"interval": 80,
"frames": [
"⠄",
"⠆",
"⠇",
"⠋",
"⠙",
"⠸",
"⠰",
"⠠",
"⠰",
"⠸",
"⠙",
"⠋",
"⠇",
"⠆"
]
},
"dots5": {
"interval": 80,
"frames": [
"⠋",
"⠙",
"⠚",
"⠒",
"⠂",
"⠂",
"⠒",
"⠲",
"⠴",
"⠦",
"⠖",
"⠒",
"⠐",
"⠐",
"⠒",
"⠓",
"⠋"
]
},
"dots6": {
"interval": 80,
"frames": [
"⠁",
"⠉",
"⠙",
"⠚",
"⠒",
"⠂",
"⠂",
"⠒",
"⠲",
"⠴",
"⠤",
"⠄",
"⠄",
"⠤",
"⠴",
"⠲",
"⠒",
"⠂",
"⠂",
"⠒",
"⠚",
"⠙",
"⠉",
"⠁"
]
},
"dots7": {
"interval": 80,
"frames": [
"⠈",
"⠉",
"⠋",
"⠓",
"⠒",
"⠐",
"⠐",
"⠒",
"⠖",
"⠦",
"⠤",
"⠠",
"⠠",
"⠤",
"⠦",
"⠖",
"⠒",
"⠐",
"⠐",
"⠒",
"⠓",
"⠋",
"⠉",
"⠈"
]
},
"dots8": {
"interval": 80,
"frames": [
"⠁",
"⠁",
"⠉",
"⠙",
"⠚",
"⠒",
"⠂",
"⠂",
"⠒",
"⠲",
"⠴",
"⠤",
"⠄",
"⠄",
"⠤",
"⠠",
"⠠",
"⠤",
"⠦",
"⠖",
"⠒",
"⠐",
"⠐",
"⠒",
"⠓",
"⠋",
"⠉",
"⠈",
"⠈"
]
},
"dots9": {
"interval": 80,
"frames": [
"⢹",
"⢺",
"⢼",
"⣸",
"⣇",
"⡧",
"⡗",
"⡏"
]
},
"dots10": {
"interval": 80,
"frames": [
"⢄",
"⢂",
"⢁",
"⡁",
"⡈",
"⡐",
"⡠"
]
},
"dots11": {
"interval": 100,
"frames": [
"⠁",
"⠂",
"⠄",
"⡀",
"⢀",
"⠠",
"⠐",
"⠈"
]
},
"dots12": {
"interval": 80,
"frames": [
"⢀⠀",
"⡀⠀",
"⠄⠀",
"⢂⠀",
"⡂⠀",
"⠅⠀",
"⢃⠀",
"⡃⠀",
"⠍⠀",
"⢋⠀",
"⡋⠀",
"⠍⠁",
"⢋⠁",
"⡋⠁",
"⠍⠉",
"⠋⠉",
"⠋⠉",
"⠉⠙",
"⠉⠙",
"⠉⠩",
"⠈⢙",
"⠈⡙",
"⢈⠩",
"⡀⢙",
"⠄⡙",
"⢂⠩",
"⡂⢘",
"⠅⡘",
"⢃⠨",
"⡃⢐",
"⠍⡐",
"⢋⠠",
"⡋⢀",
"⠍⡁",
"⢋⠁",
"⡋⠁",
"⠍⠉",
"⠋⠉",
"⠋⠉",
"⠉⠙",
"⠉⠙",
"⠉⠩",
"⠈⢙",
"⠈⡙",
"⠈⠩",
"⠀⢙",
"⠀⡙",
"⠀⠩",
"⠀⢘",
"⠀⡘",
"⠀⠨",
"⠀⢐",
"⠀⡐",
"⠀⠠",
"⠀⢀",
"⠀⡀"
]
},
"line": {
"interval": 130,
"frames": [
"-",
"\\",
"|",
"/"
]
},
"line2": {
"interval": 100,
"frames": [
"⠂",
"-",
"–",
"—",
"–",
"-"
]
},
"pipe": {
"interval": 100,
"frames": [
"┤",
"┘",
"┴",
"└",
"├",
"┌",
"┬",
"┐"
]
},
"simpleDots": {
"interval": 400,
"frames": [
". ",
".. ",
"...",
" "
]
},
"simpleDotsScrolling": {
"interval": 200,
"frames": [
". ",
".. ",
"...",
" ..",
" .",
" "
]
},
"star": {
"interval": 70,
"frames": [
"✶",
"✸",
"✹",
"✺",
"✹",
"✷"
]
},
"star2": {
"interval": 80,
"frames": [
"+",
"x",
"*"
]
},
"flip": {
"interval": 70,
"frames": [
"_",
"_",
"_",
"-",
"`",
"`",
"'",
"´",
"-",
"_",
"_",
"_"
]
},
"hamburger": {
"interval": 100,
"frames": [
"☱",
"☲",
"☴"
]
},
"growVertical": {
"interval": 120,
"frames": [
"▁",
"▃",
"▄",
"▅",
"▆",
"▇",
"▆",
"▅",
"▄",
"▃"
]
},
"growHorizontal": {
"interval": 120,
"frames": [
"▏",
"▎",
"▍",
"▌",
"▋",
"▊",
"▉",
"▊",
"▋",
"▌",
"▍",
"▎"
]
},
"balloon": {
"interval": 140,
"frames": [
" ",
".",
"o",
"O",
"@",
"*",
" "
]
},
"balloon2": {
"interval": 120,
"frames": [
".",
"o",
"O",
"°",
"O",
"o",
"."
]
},
"noise": {
"interval": 100,
"frames": [
"▓",
"▒",
"░"
]
},
"bounce": {
"interval": 120,
"frames": [
"⠁",
"⠂",
"⠄",
"⠂"
]
},
"boxBounce": {
"interval": 120,
"frames": [
"▖",
"▘",
"▝",
"▗"
]
},
"boxBounce2": {
"interval": 100,
"frames": [
"▌",
"▀",
"▐",
"▄"
]
},
"triangle": {
"interval": 50,
"frames": [
"◢",
"◣",
"◤",
"◥"
]
},
"arc": {
"interval": 100,
"frames": [
"◜",
"◠",
"◝",
"◞",
"◡",
"◟"
]
},
"circle": {
"interval": 120,
"frames": [
"◡",
"⊙",
"◠"
]
},
"squareCorners": {
"interval": 180,
"frames": [
"◰",
"◳",
"◲",
"◱"
]
},
"circleQuarters": {
"interval": 120,
"frames": [
"◴",
"◷",
"◶",
"◵"
]
},
"circleHalves": {
"interval": 50,
"frames": [
"◐",
"◓",
"◑",
"◒"
]
},
"squish": {
"interval": 100,
"frames": [
"╫",
"╪"
]
},
"toggle": {
"interval": 250,
"frames": [
"⊶",
"⊷"
]
},
"toggle2": {
"interval": 80,
"frames": [
"▫",
"▪"
]
},
"toggle3": {
"interval": 120,
"frames": [
"□",
"■"
]
},
"toggle4": {
"interval": 100,
"frames": [
"■",
"□",
"▪",
"▫"
]
},
"toggle5": {
"interval": 100,
"frames": [
"▮",
"▯"
]
},
"toggle6": {
"interval": 300,
"frames": [
"ဝ",
"၀"
]
},
"toggle7": {
"interval": 80,
"frames": [
"⦾",
"⦿"
]
},
"toggle8": {
"interval": 100,
"frames": [
"◍",
"◌"
]
},
"toggle9": {
"interval": 100,
"frames": [
"◉",
"◎"
]
},
"toggle10": {
"interval": 100,
"frames": [
"㊂",
"㊀",
"㊁"
]
},
"toggle11": {
"interval": 50,
"frames": [
"⧇",
"⧆"
]
},
"toggle12": {
"interval": 120,
"frames": [
"☗",
"☖"
]
},
"toggle13": {
"interval": 80,
"frames": [
"=",
"*",
"-"
]
},
"arrow": {
"interval": 100,
"frames": [
"←",
"↖",
"↑",
"↗",
"→",
"↘",
"↓",
"↙"
]
},
"arrow2": {
"interval": 80,
"frames": [
"⬆️ ",
"↗️ ",
"➡️ ",
"↘️ ",
"⬇️ ",
"↙️ ",
"⬅️ ",
"↖️ "
]
},
"arrow3": {
"interval": 120,
"frames": [
"▹▹▹▹▹",
"▸▹▹▹▹",
"▹▸▹▹▹",
"▹▹▸▹▹",
"▹▹▹▸▹",
"▹▹▹▹▸"
]
},
"bouncingBar": {
"interval": 80,
"frames": [
"[ ]",
"[= ]",
"[== ]",
"[=== ]",
"[ ===]",
"[ ==]",
"[ =]",
"[ ]",
"[ =]",
"[ ==]",
"[ ===]",
"[====]",
"[=== ]",
"[== ]",
"[= ]"
]
},
"bouncingBall": {
"interval": 80,
"frames": [
"( ● )",
"( ● )",
"( ● )",
"( ● )",
"( ●)",
"( ● )",
"( ● )",
"( ● )",
"( ● )",
"(● )"
]
},
"smiley": {
"interval": 200,
"frames": [
"😄 ",
"😝 "
]
},
"monkey": {
"interval": 300,
"frames": [
"🙈 ",
"🙈 ",
"🙉 ",
"🙊 "
]
},
"hearts": {
"interval": 100,
"frames": [
"💛 ",
"💙 ",
"💜 ",
"💚 ",
"❤️ "
]
},
"clock": {
"interval": 100,
"frames": [
"🕛 ",
"🕐 ",
"🕑 ",
"🕒 ",
"🕓 ",
"🕔 ",
"🕕 ",
"🕖 ",
"🕗 ",
"🕘 ",
"🕙 ",
"🕚 "
]
},
"earth": {
"interval": 180,
"frames": [
"🌍 ",
"🌎 ",
"🌏 "
]
},
"moon": {
"interval": 80,
"frames": [
"🌑 ",
"🌒 ",
"🌓 ",
"🌔 ",
"🌕 ",
"🌖 ",
"🌗 ",
"🌘 "
]
},
"runner": {
"interval": 140,
"frames": [
"🚶 ",
"🏃 "
]
},
"pong": {
"interval": 80,
"frames": [
"▐⠂ ▌",
"▐⠈ ▌",
"▐ ⠂ ▌",
"▐ ⠠ ▌",
"▐ ⡀ ▌",
"▐ ⠠ ▌",
"▐ ⠂ ▌",
"▐ ⠈ ▌",
"▐ ⠂ ▌",
"▐ ⠠ ▌",
"▐ ⡀ ▌",
"▐ ⠠ ▌",
"▐ ⠂ ▌",
"▐ ⠈ ▌",
"▐ ⠂▌",
"▐ ⠠▌",
"▐ ⡀▌",
"▐ ⠠ ▌",
"▐ ⠂ ▌",
"▐ ⠈ ▌",
"▐ ⠂ ▌",
"▐ ⠠ ▌",
"▐ ⡀ ▌",
"▐ ⠠ ▌",
"▐ ⠂ ▌",
"▐ ⠈ ▌",
"▐ ⠂ ▌",
"▐ ⠠ ▌",
"▐ ⡀ ▌",
"▐⠠ ▌"
]
},
"shark": {
"interval": 120,
"frames": [
"▐|\\____________▌",
"▐_|\\___________▌",
"▐__|\\__________▌",
"▐___|\\_________▌",
"▐____|\\________▌",
"▐_____|\\_______▌",
"▐______|\\______▌",
"▐_______|\\_____▌",
"▐________|\\____▌",
"▐_________|\\___▌",
"▐__________|\\__▌",
"▐___________|\\_▌",
"▐____________|\\▌",
"▐____________/|▌",
"▐___________/|_▌",
"▐__________/|__▌",
"▐_________/|___▌",
"▐________/|____▌",
"▐_______/|_____▌",
"▐______/|______▌",
"▐_____/|_______▌",
"▐____/|________▌",
"▐___/|_________▌",
"▐__/|__________▌",
"▐_/|___________▌",
"▐/|____________▌"
]
},
"dqpb": {
"interval": 100,
"frames": [
"d",
"q",
"p",
"b"
]
},
"weather": {
"interval": 100,
"frames": [
"☀️ ",
"☀️ ",
"☀️ ",
"🌤 ",
"⛅️ ",
"🌥 ",
"☁️ ",
"🌧 ",
"🌨 ",
"🌧 ",
"🌨 ",
"🌧 ",
"🌨 ",
"⛈ ",
"🌨 ",
"🌧 ",
"🌨 ",
"☁️ ",
"🌥 ",
"⛅️ ",
"🌤 ",
"☀️ ",
"☀️ "
]
},
"christmas": {
"interval": 400,
"frames": [
"🌲",
"🎄"
]
},
"grenade": {
"interval": 80,
"frames": [
"، ",
"′ ",
" ´ ",
" ‾ ",
" ⸌",
" ⸊",
" |",
" ⁎",
" ⁕",
" ෴ ",
" ⁓",
" ",
" ",
" "
]
},
"point": {
"interval": 125,
"frames": [
"∙∙∙",
"●∙∙",
"∙●∙",
"∙∙●",
"∙∙∙"
]
},
"layer": {
"interval": 150,
"frames": [
"-",
"=",
"≡"
]
}
})
Copyright (c) 2008-2011 Volvox Development Team
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
# coding: utf-8
# Copyright (c) 2008-2011 Volvox Development Team
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Author: Konstantin Lepa <konstantin.lepa@gmail.com>
"""ANSII Color formatting for output in terminal."""
from __future__ import print_function
import os
__ALL__ = [ 'colored', 'cprint' ]
VERSION = (1, 1, 0)
ATTRIBUTES = dict(
list(zip([
'bold',
'dark',
'',
'underline',
'blink',
'',
'reverse',
'concealed'
],
list(range(1, 9))
))
)
del ATTRIBUTES['']
HIGHLIGHTS = dict(
list(zip([
'on_grey',
'on_red',
'on_green',
'on_yellow',
'on_blue',
'on_magenta',
'on_cyan',
'on_white'
],
list(range(40, 48))
))
)
COLORS = dict(
list(zip([
'grey',
'red',
'green',
'yellow',
'blue',
'magenta',
'cyan',
'white',
],
list(range(30, 38))
))
)
RESET = '\033[0m'
def colored(text, color=None, on_color=None, attrs=None):
"""Colorize text.
Available text colors:
red, green, yellow, blue, magenta, cyan, white.
Available text highlights:
on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan, on_white.
Available attributes:
bold, dark, underline, blink, reverse, concealed.
Example:
colored('Hello, World!', 'red', 'on_grey', ['blue', 'blink'])
colored('Hello, World!', 'green')
"""
if os.getenv('ANSI_COLORS_DISABLED') is None:
fmt_str = '\033[%dm%s'
if color is not None:
text = fmt_str % (COLORS[color], text)
if on_color is not None:
text = fmt_str % (HIGHLIGHTS[on_color], text)
if attrs is not None:
for attr in attrs:
text = fmt_str % (ATTRIBUTES[attr], text)
text += RESET
return text
def cprint(text, color=None, on_color=None, attrs=None, **kwargs):
"""Print colorize text.
It accepts arguments of print function.
"""
print((colored(text, color, on_color, attrs)), **kwargs)
if __name__ == '__main__':
print('Current terminal type: %s' % os.getenv('TERM'))
print('Test basic colors:')
cprint('Grey color', 'grey')
cprint('Red color', 'red')
cprint('Green color', 'green')
cprint('Yellow color', 'yellow')
cprint('Blue color', 'blue')
cprint('Magenta color', 'magenta')
cprint('Cyan color', 'cyan')
cprint('White color', 'white')
print(('-' * 78))
print('Test highlights:')
cprint('On grey color', on_color='on_grey')
cprint('On red color', on_color='on_red')
cprint('On green color', on_color='on_green')
cprint('On yellow color', on_color='on_yellow')
cprint('On blue color', on_color='on_blue')
cprint('On magenta color', on_color='on_magenta')
cprint('On cyan color', on_color='on_cyan')
cprint('On white color', color='grey', on_color='on_white')
print('-' * 78)
print('Test attributes:')
cprint('Bold grey color', 'grey', attrs=['bold'])
cprint('Dark red color', 'red', attrs=['dark'])
cprint('Underline green color', 'green', attrs=['underline'])
cprint('Blink yellow color', 'yellow', attrs=['blink'])
cprint('Reversed blue color', 'blue', attrs=['reverse'])
cprint('Concealed Magenta color', 'magenta', attrs=['concealed'])
cprint('Bold underline reverse cyan color', 'cyan',
attrs=['bold', 'underline', 'reverse'])
cprint('Dark blink concealed white color', 'white',
attrs=['dark', 'blink', 'concealed'])
print(('-' * 78))
print('Test mixing:')
cprint('Underline red on grey color', 'red', 'on_grey',
['underline'])
cprint('Reversed green on red color', 'green', 'on_red', ['reverse'])
halo==0.0.31
log_symbols==0.0.14
spinners==0.0.24
termcolor==1.1.0
colorama==0.4.4
......@@ -50,17 +50,19 @@ def do_lock(
requirements = [
r for deps in project.all_dependencies.values() for r in deps.values()
]
with stream.open_spinner(
title="Resolving dependencies", spinner="dots"
) as spin, stream.logging("lock"):
reporter = project.get_reporter(requirements, tracked_names, spin)
resolver = project.core.resolver_class(provider, reporter)
mapping, dependencies, summaries = resolve(
resolver, requirements, project.environment.python_requires
)
data = format_lockfile(mapping, dependencies, summaries)
spin.succeed("Resolution success")
with stream.logging("lock"):
# The context managers are nested to ensure the spinner is stopped before
# any message is thrown to the output.
with stream.open_spinner(
title="Resolving dependencies", spinner="dots"
) as spin:
reporter = project.get_reporter(requirements, tracked_names, spin)
resolver = project.core.resolver_class(provider, reporter)
mapping, dependencies, summaries = resolve(
resolver, requirements, project.environment.python_requires
)
data = format_lockfile(mapping, dependencies, summaries)
spin.succeed("Resolution succeeds")
project.write_lockfile(data)
return mapping
......
import contextlib
import functools
import os
import multiprocessing
import traceback
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from typing import Dict, List, Tuple
from click import progressbar
from pip._vendor.pkg_resources import Distribution, safe_name
from pdm.exceptions import InstallationError
......@@ -63,8 +61,6 @@ class DummyExecutor:
class Synchronizer:
"""Synchronize the working set with given installation candidates"""
BAR_FILLED_CHAR = "=" if os.name == "nt" else "▉"
BAR_EMPTY_CHAR = " "
RETRY_TIMES = 1
SEQUENTIAL_PACKAGES = ("pip", "setuptools", "wheel")
......@@ -80,23 +76,16 @@ class Synchronizer:
self.working_set = environment.get_working_set()
@contextlib.contextmanager
def progressbar(self, label: str, total: int):
bar = progressbar(
length=total,
fill_char=stream.green(self.BAR_FILLED_CHAR),
empty_char=self.BAR_EMPTY_CHAR,
show_percent=False,
show_pos=True,
label=label,
bar_template="%(label)s %(bar)s %(info)s",
)
def create_executor(self):
if self.parallel:
executor = ThreadPoolExecutor()
executor = ThreadPoolExecutor(
max_workers=min(multiprocessing.cpu_count(), 8)
)
else:
executor = DummyExecutor()
with executor:
try:
yield bar, executor
yield executor
except KeyboardInterrupt:
pass
......@@ -129,13 +118,21 @@ class Synchronizer:
and strip_extras(name)[0] not in working_set
}
)
return to_add, to_update, to_remove
return sorted(to_add), sorted(to_update), sorted(to_remove)
def install_candidate(self, key: str) -> Candidate:
"""Install candidate"""
can = self.candidates[key]
installer = self.get_installer()
installer.install(can)
with stream.open_spinner(f"Installing {can.format()}...") as spinner:
try:
installer.install(can)
except Exception:
spinner.fail(f"Install {can.format()} failed")
raise
else:
spinner.succeed(f"Install {can.format()} successful")
return can
def update_candidate(self, key: str) -> Tuple[Distribution, Candidate]:
......@@ -143,8 +140,24 @@ class Synchronizer:
can = self.candidates[key]
dist = self.working_set[safe_name(can.name).lower()]
installer = self.get_installer()
installer.uninstall(dist)
installer.install(can)
with stream.open_spinner(
f"Updating {stream.green(key, bold=True)} {stream.yellow(dist.version)} "
f"-> {stream.yellow(can.version)}..."
) as spinner:
try:
installer.uninstall(dist)
installer.install(can)
except Exception:
spinner.fail(
f"Update {stream.green(key, bold=True)} "
f"-> {stream.yellow(can.version)} failed"
)
raise
else:
spinner.succeed(
f"Update {stream.green(key, bold=True)} "
f"-> {stream.yellow(can.version)} successful"
)
return dist, can
def remove_distribution(self, key: str) -> Distribution:
......@@ -154,40 +167,62 @@ class Synchronizer:
"""
installer = self.get_installer()
dist = self.working_set[key]
installer.uninstall(dist)
with stream.open_spinner(
f"Removing {stream.green(key, bold=True)} {stream.yellow(dist.version)}..."
) as spinner:
try:
installer.uninstall(dist)
except Exception:
spinner.fail(
f"Remove {stream.green(key, bold=True)} "
f"{stream.yellow(dist.version)} failed"
)
raise
else:
spinner.succeed(
f"Remove {stream.green(key, bold=True)} "
f"{stream.yellow(dist.version)} successful"
)
return dist
def _print_section_title(
self, action: str, number_of_packages: int, dry_run: bool
) -> None:
plural = "s" if number_of_packages > 1 else ""
verb = "will be" if dry_run else "are" if plural else "is"
stream.echo(f"{number_of_packages} package{plural} {verb} {action}:")
def summarize(self, result, dry_run=False):
added, updated, removed = result["add"], result["update"], result["remove"]
if added:
stream.echo("\n")
self._print_section_title("installed", len(added), dry_run)
for item in sorted(added, key=lambda x: x.name):
stream.echo(f" - {item.format()}")
if updated:
stream.echo("\n")
self._print_section_title("updated", len(updated), dry_run)
for old, can in sorted(updated, key=lambda x: x[1].name):
stream.echo(
f" - {stream.green(can.name, bold=True)} "
f"{stream.yellow(old.version)} "
f"-> {stream.yellow(can.version)}"
def _show_headline(self, packages: Dict[str, List[str]]) -> None:
add, update, remove = packages["add"], packages["update"], packages["remove"]
results = [stream.bold("Synchronizing working set with lock file:")]
results.extend(
[
f"{stream.green(str(len(add)))} to add,",
f"{stream.yellow(str(len(update)))} to update,",
f"{stream.red(str(len(remove)))} to remove",
]
)
stream.echo(" ".join(results) + "\n")
def _show_summary(self, packages: Dict[str, List[str]]) -> None:
to_add = [self.candidates[key] for key in packages["add"]]
to_update = [
(self.working_set[key], self.candidates[key]) for key in packages["update"]
]
to_remove = [self.working_set[key] for key in packages["remove"]]
lines = []
if to_add:
lines.append(stream.bold("Packages to add:"))
for can in to_add:
lines.append(f" - {can.format()}")
if to_update:
lines.append(stream.bold("Packages to add:"))
for prev, cur in to_update:
lines.append(
f" - {stream.green(cur.name, bold=True)} "
f"{stream.yellow(prev.version)} -> {stream.yellow(cur.version)}"
)
if removed:
stream.echo("\n")
self._print_section_title("removed", len(removed), dry_run)
for dist in sorted(removed, key=lambda x: x.key):
stream.echo(
if to_remove:
lines.append(stream.bold("Packages to add:"))
for dist in to_remove:
lines.append(
f" - {stream.green(dist.key, bold=True)} "
f"{stream.yellow(dist.version)}"
)
stream.echo("\n".join(lines))
def synchronize(self, clean: bool = True, dry_run: bool = False) -> None:
"""Synchronize the working set with pinned candidates.
......@@ -198,22 +233,18 @@ class Synchronizer:
to_add, to_update, to_remove = self.compare_with_working_set()
if not clean:
to_remove = []
lists_to_check = [to_add, to_update, to_remove]
if not any(lists_to_check):
if not any([to_add, to_update, to_remove]):
if not dry_run:
self.environment.write_site_py()
stream.echo("All packages are synced to date, nothing to do.")
stream.echo(
stream.yellow("All packages are synced to date, nothing to do.")
)
return
to_do = {"remove": to_remove, "update": to_update, "add": to_add}
self._show_headline(to_do)
if dry_run:
result = dict(
add=[self.candidates[key] for key in to_add],
update=[
(self.working_set[key], self.candidates[key]) for key in to_update
],
remove=[self.working_set[key] for key in to_remove],
)
self.summarize(result, dry_run)
self._show_summary(to_do)
return
handlers = {
......@@ -222,88 +253,58 @@ class Synchronizer:
"remove": self.remove_distribution,
}
result = defaultdict(list)
failed = defaultdict(list)
to_do = {"add": to_add, "update": to_update, "remove": to_remove}
# Keep track of exceptions
errors = []
def update_progress(future, section, key, bar):
sequential_jobs = []
parallel_jobs = []
# Self package will be installed after all other dependencies are installed.
install_self = None
for kind in to_do:
for key in to_do[kind]:
if key == self.environment.project.meta.project_name.lower():
install_self = (kind, key)
elif key in self.SEQUENTIAL_PACKAGES:
sequential_jobs.append((kind, key))
elif key in self.candidates and self.candidates[key].req.editable:
# Editable packages are installed sequentially.
sequential_jobs.append((kind, key))
else:
parallel_jobs.append((kind, key))
errors: List[str] = []
with stream.indent(" "):
for job in sequential_jobs:
kind, key = job
try:
handlers[kind](key)
except Exception as err:
errors.append(f"{kind} {stream.green(key)} failed:\n")
errors.extend(
traceback.format_exception(type(err), err, err.__traceback__)
)
def update_progress(future, kind, key):
if future.exception():
failed[section].append(key)
errors.append(future.exception())
else:
result[section].append(future.result())
bar.update(1)
errors.append(f"{kind} {stream.green(key)} failed:\n")
error = future.exception()
errors.extend(
traceback.format_exception(type(error), error, error.__traceback__)
)
with stream.logging("install"):
with self.progressbar(
"Synchronizing:", sum(len(lst) for lst in to_do.values())
) as (bar, pool):
# First update packages, then remove and add
for section in sorted(to_do, reverse=True):
# setup toolkits are installed sequentially before other packages.
for key in sorted(
to_do[section], key=lambda x: x not in self.SEQUENTIAL_PACKAGES
):
future = pool.submit(handlers[section], key)
future.add_done_callback(
functools.partial(
update_progress, section=section, key=key, bar=bar
)
)
if key in self.SEQUENTIAL_PACKAGES:
future.result()
# Retry for failed items
for i in range(self.RETRY_TIMES):
if not any(failed.values()):
break
stream.echo(
stream.yellow("\nSome packages failed to install, retrying...")
)
to_do = failed
failed = defaultdict(list)
errors.clear()
with self.progressbar(
f"Retrying ({i + 1}/{self.RETRY_TIMES}):",
sum(len(lst) for lst in to_do.values()),
) as (bar, pool):
for section in sorted(to_do, reverse=True):
for key in sorted(
to_do[section],
key=lambda x: x not in self.SEQUENTIAL_PACKAGES,
):
future = pool.submit(handlers[section], key)
future.add_done_callback(
functools.partial(
update_progress, section=section, key=key, bar=bar
)
)
if key in self.SEQUENTIAL_PACKAGES:
future.result()
# End installation
self.summarize(result)
with stream.indent(" "), self.create_executor() as executor:
for job in parallel_jobs:
kind, key = job
future = executor.submit(handlers[kind], key)
future.add_done_callback(
partial(update_progress, kind=kind, key=key)
)
if errors:
stream.echo(stream.red("\nERRORS:"))
stream.echo("".join(errors), err=True)
raise InstallationError("Some package operations are not complete yet")
self.environment.write_site_py()
if not any(failed.values()):
return
stream.echo("\n")
error_msg = []
if failed["add"] + failed["update"]:
error_msg.append(
"Installation failed: "
f"{', '.join(failed['add'] + failed['update'])}"
)
if failed["remove"]:
error_msg.append(f"Removal failed: {', '.join(failed['remove'])}")
for error in errors:
stream.echo(
"".join(
traceback.format_exception(
type(error), error, error.__traceback__
)
),
verbosity=stream.DEBUG,
)
raise InstallationError("\n" + "\n".join(error_msg))
if install_self:
stream.echo("Installing the project as an editable package...")
with stream.indent(" "):
handlers[install_self[0]](install_self[1])
stream.echo("\nAll complete!")
import contextlib
import functools
import io
import logging
import os
import re
import sys
from itertools import zip_longest
from tempfile import mktemp
from typing import List, Optional
import click
import halo
from pdm._vendor import halo
from pdm.utils import cached_property
COLORS = ("red", "green", "yellow", "blue", "black", "magenta", "cyan", "white")
......@@ -51,7 +55,6 @@ class IOStream:
def __init__(self, verbosity: int = NORMAL, disable_colors: bool = False) -> None:
self.verbosity = verbosity
self._disable_colors = disable_colors
self._indent = ""
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
......@@ -61,12 +64,21 @@ class IOStream:
for color in COLORS:
setattr(self, color, functools.partial(self._style, fg=color))
def disable_colors(self) -> None:
self._disable_colors = True
def set_verbosity(self, verbosity: int) -> None:
self.verbosity = verbosity
@cached_property
def supports_ansi(self) -> bool:
if os.getenv("CI"):
return False
stream = sys.stdout
if not hasattr(stream, "fileno"):
return False
try:
return os.isatty(stream.fileno())
except io.UnsupportedOperation:
return False
def echo(
self, message: str = "", err: bool = False, verbosity: int = NORMAL, **kwargs
) -> None:
......@@ -74,10 +86,12 @@ class IOStream:
click.echo(self._indent + str(message), err=err, **kwargs)
def _style(self, text: str, *args, **kwargs) -> str:
if self._disable_colors:
if not self.supports_ansi:
return text
else:
return click.style(text, *args, **kwargs)
return click.style(text, *args, **kwargs)
def bold(self, text: str, **kwargs) -> str:
return self._style(text, bold=True, **kwargs)
def display_columns(
self, rows: List[List[str]], header: Optional[List[str]] = None
......@@ -135,10 +149,10 @@ class IOStream:
@contextlib.contextmanager
def open_spinner(self, title: str, spinner: str = "dots"):
if self.verbosity >= self.DETAIL:
if self.verbosity >= self.DETAIL or not self.supports_ansi:
bar = DummySpinner()
else:
bar = halo.Halo(title, spinner=spinner)
bar = halo.Halo(title, spinner=spinner, indent=self._indent)
with bar as bar:
yield bar
......
......@@ -6,7 +6,6 @@ import re
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Type, Union
import halo
import tomlkit
from pip._vendor.pkg_resources import safe_name
from pip_shims import shims
......@@ -34,6 +33,7 @@ if TYPE_CHECKING:
from resolvelib.reporters import BaseReporter
from tomlkit.container import Container
from pdm._vendor import halo
from pdm.resolver.providers import BaseProvider
......@@ -294,7 +294,7 @@ class Project:
with atomic_open_for_write(self.lockfile_file) as fp:
fp.write(tomlkit.dumps(toml_data))
if show_message:
stream.echo("Changes are written to pdm.lock.")
stream.echo(f"Changes are written to {stream.green('pdm.lock')}.")
self._lockfile = None
def make_self_candidate(self, editable: bool = True) -> Candidate:
......@@ -327,7 +327,7 @@ class Project:
} or None
result[req.identify()] = can
if section in ("default", "__all__") and self.meta.name and self.meta.version:
result[safe_name(self.meta.name).lower()] = self.make_self_candidate(True)
result[self.meta.project_name.lower()] = self.make_self_candidate(True)
return result
def get_content_hash(self, algo: str = "md5") -> str:
......
......@@ -2,7 +2,6 @@ from __future__ import annotations
from typing import TYPE_CHECKING, Dict, List, Optional
import halo
from resolvelib import BaseReporter
from pdm.iostream import stream
......@@ -10,6 +9,7 @@ from pdm.iostream import stream
if TYPE_CHECKING:
from resolvelib.resolvers import State
from pdm._vendor import halo
from pdm.models.candidates import Candidate
from pdm.models.requirements import Requirement
......@@ -48,8 +48,10 @@ class SpinnerReporter(BaseReporter):
log_title("Resolution Result")
stream.logger.info("Stable pins:")
for k, can in state.mapping.items():
stream.logger.info(f"\t{can.name}\t{can.version}")
if state.mapping:
column_width = max(map(len, state.mapping.keys()))
for k, can in state.mapping.items():
stream.logger.info(f" {k.rjust(column_width)} {can.version}")
def extract_metadata(self):
self.spinner.start("Extracting package metadata")
......@@ -63,15 +65,15 @@ class SpinnerReporter(BaseReporter):
dependency, or None if ``requirement`` is one of the root
requirements passed in from ``Resolver.resolve()``.
"""
parent_line = f"(from {parent.name}-{parent.version})" if parent else ""
parent_line = f"(from {parent.name} {parent.version})" if parent else ""
stream.logger.info(f"\tAdding requirement {requirement.as_line()}{parent_line}")
def backtracking(self, candidate: Candidate) -> None:
"""Called when rejecting a candidate during backtracking."""
stream.logger.info(f"Candidate rejected: {candidate.name}-{candidate.version}")
stream.logger.info(f"Candidate rejected: {candidate.name} {candidate.version}")
stream.logger.info("Backtracking...")
def pinning(self, candidate: Candidate) -> None:
"""Called when adding a candidate to the potential solution."""
self.spinner.text = "Resolving: " + candidate.format()
stream.logger.info(f"\tNew pin: {candidate.name}-{candidate.version}")
stream.logger.info(f"\tNew pin: {candidate.name} {candidate.version}")
......@@ -32,7 +32,6 @@ pip = ">=20.1"
pip_shims = "*"
pythonfinder = "*"
tomlkit = "*"
halo = "<1.0.0,>=0.0.28"
python-cfonts = "*"
resolvelib = "<1.0.0,>=0.3.0"
pdm-pep517 = "<1.0.0,>=0.1.0"
......@@ -46,6 +45,7 @@ pytest-cov = "*"
pytest-mock = "*"
towncrier = "<20.0.0,>=19.2.0"
pytest-xdist = "<2.0.0,>=1.31.0"
vendoring = {version = "*", marker = "python_version ~= '3.8'"}
[tool.pdm.doc-dependencies]
mkdocs = "<2.0.0,>=1.1"
......@@ -57,6 +57,21 @@ pdm = "pdm.core:main"
[tool.black]
line-length = 88
exclude = '''
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
| pdm/_vendor/*
)/
'''
[tool.towncrier]
package = "pdm"
......@@ -104,7 +119,7 @@ build-backend = "pdm.pep517.api"
[tool.isort]
profile = "black"
atomic = true
skip_glob = ["*/setup.py"]
skip_glob = ["*/setup.py", "pdm/_vendor/*"]
filter_files = true
known_first_party = ["pdm"]
known_third_party = [
......@@ -119,3 +134,38 @@ known_third_party = [
"cfonts",
"packaging",
]
[tool.vendoring]
destination = "pdm/_vendor/"
requirements = "pdm/_vendor/vendors.txt"
namespace = "pdm._vendor"
protected-files = ["__init__.py", "README.md", "vendors.txt"]
patches-dir = "vendors/patches"
[tool.vendoring.transformations]
substitute = [
{match = 'import halo\.', replace = 'import pdm._vendor.halo'}
]
drop = [
"bin/",
"*.so",
"typing.*",
"*/tests/"
]
[tool.vendoring.typing-stubs]
halo = []
log_symbols = []
spinners = []
termcolor = []
colorama = []
[tool.vendoring.license.directories]
[tool.vendoring.license.fallback-urls]
[tool.pytest.ini_options]
filterwarnings = [
"ignore::DeprecationWarning"
]
......@@ -2,6 +2,7 @@
exclude =
.git,
tests/fixtures/*,
pdm/_vendor/*,
env,
dist,
build,
......
......@@ -24,8 +24,8 @@ def test_sync_only_different(project, repository, working_set, capsys):
working_set.add_distribution(make_distribution("idna", "2.7"))
actions.do_add(project, packages=["requests"])
out, _ = capsys.readouterr()
assert "4 packages are installed" in out
assert "1 package is updated" in out
assert "4 to add" in out
assert "1 to update" in out
assert "foo" in working_set
assert "test-project" in working_set
assert working_set["chardet"].version == "3.0.4"
......@@ -35,7 +35,7 @@ def test_sync_in_sequential_mode(project, repository, working_set, capsys):
project.project_config["parallel_install"] = False
actions.do_add(project, packages=["requests"])
out, _ = capsys.readouterr()
assert "6 packages are installed" in out
assert "6 to add" in out
assert "test-project" in working_set
assert working_set["chardet"].version == "3.0.4"
......@@ -205,7 +205,7 @@ def test_update_all_packages(project, repository, working_set, capsys):
assert locked_candidates["chardet"].version == "3.0.5"
assert locked_candidates["pytz"].version == "2019.6"
out, _ = capsys.readouterr()
assert "3 packages are updated" in out
assert "3 to update" in out
actions.do_sync(project)
out, _ = capsys.readouterr()
......@@ -326,7 +326,7 @@ def test_list_packages(capsys):
out, _ = capsys.readouterr()
assert "pdm" in out
assert "tomlkit" in out
assert "halo" in out
assert "pip" in out
def test_lock_dependencies(project, repository):
......
......@@ -20,7 +20,6 @@ from pdm._types import CandidateInfo
from pdm.cli.actions import do_init, do_use
from pdm.core import main
from pdm.exceptions import CandidateInfoNotFound
from pdm.iostream import stream
from pdm.models.candidates import Candidate
from pdm.models.environment import Environment
from pdm.models.repositories import BaseRepository
......@@ -30,7 +29,6 @@ from pdm.project.config import Config
from pdm.utils import cached_property, get_finder, temp_environ
from tests import FIXTURES
stream.disable_colors()
os.environ["CI"] = "1"
......
diff --git a/pdm/_vendor/halo/halo.py b/pdm/_vendor/halo/halo.py
index c42941b..32281ef 100644
--- a/pdm/_vendor/halo/halo.py
+++ b/pdm/_vendor/halo/halo.py
@@ -12,7 +12,7 @@ import time
import halo.cursor as cursor
-from pdm._vendor.log_symbols.symbols import LogSymbols
+from pdm._vendor.log_symbols.symbols import LogSymbols, is_supported
from pdm._vendor.spinners.spinners import Spinners
from pdm._vendor.halo._utils import (
@@ -20,7 +20,6 @@ from pdm._vendor.halo._utils import (
decode_utf_8_text,
get_environment,
get_terminal_columns,
- is_supported,
is_text_type,
encode_utf_8_text,
)
@@ -35,11 +34,16 @@ class Halo(object):
"""
CLEAR_LINE = "\033[K"
+ CLEAR_REST = "\033[J"
SPINNER_PLACEMENTS = (
"left",
"right",
)
+ # a global list to keep all Halo instances
+ _instances = []
+ _lock = threading.Lock()
+
def __init__(
self,
text="",
@@ -50,6 +54,7 @@ class Halo(object):
placement="left",
interval=-1,
enabled=True,
+ indent="",
stream=sys.stdout,
):
"""Constructs the Halo object.
@@ -96,6 +101,9 @@ class Halo(object):
self._stop_spinner = None
self._spinner_id = None
self.enabled = enabled
+ self._stopped = False
+ self._content = ""
+ self.indent = indent
environment = get_environment()
@@ -294,7 +302,34 @@ class Halo(object):
return True
- def _write(self, s):
+ def _pop_stream_content_until_self(self, clear_self=False):
+ """Move cursor to the end of this instance's content and erase all contents
+ following it.
+ Parameters
+ ----------
+ clear_self: bool
+ If equals True, the content of current line will also get cleared
+ Returns
+ -------
+ str
+ The content of stream following this instance.
+ """
+ erased_content = []
+ lines_to_erase = self._content.count("\n") if clear_self else 0
+ for inst in Halo._instances[::-1]:
+ if inst is self:
+ break
+ erased_content.append(inst._content)
+ lines_to_erase += inst._content.count("\n")
+
+ if lines_to_erase > 0:
+ # Move cursor up n lines
+ self._write_stream("\033[{}A".format(lines_to_erase))
+ # Erase rest content
+ self._write_stream(self.CLEAR_REST)
+ return "".join(reversed(erased_content))
+
+ def _write_stream(self, s):
"""Write to the stream, if writable
Parameters
----------
@@ -304,15 +339,33 @@ class Halo(object):
if self._check_stream():
self._stream.write(s)
- def _hide_cursor(self):
- """Disable the user's blinking cursor
+ def _write(self, s, overwrite=False):
+ """Write to the stream and keep following lines unchanged.
+ Parameters
+ ----------
+ s : str
+ Characters to write to the stream
+ overwrite: bool
+ If set to True, overwrite the content of current instance.
"""
+ if s.startswith("\r"):
+ s = f"\r{self.indent}{s[1:]}"
+ else:
+ s = f"{self.indent}{s}"
+ with Halo._lock:
+ erased_content = self._pop_stream_content_until_self(overwrite)
+ self._write_stream(s)
+ # Write back following lines
+ self._write_stream(erased_content)
+ self._content = s if overwrite else self._content + s
+
+ def _hide_cursor(self):
+ """Disable the user's blinking cursor"""
if self._check_stream() and self._stream.isatty():
cursor.hide(stream=self._stream)
def _show_cursor(self):
- """Re-enable the user's blinking cursor
- """
+ """Re-enable the user's blinking cursor"""
if self._check_stream() and self._stream.isatty():
cursor.show(stream=self._stream)
@@ -390,26 +443,26 @@ class Halo(object):
-------
self
"""
- self._write("\r")
- self._write(self.CLEAR_LINE)
+ with Halo._lock:
+ erased_content = self._pop_stream_content_until_self(True)
+ self._content = ""
+ self._write_stream(erased_content)
return self
def _render_frame(self):
- """Renders the frame on the line after clearing it.
- """
+ """Renders the frame on the line after clearing it."""
if not self.enabled:
# in case we're disabled or stream is closed while still rendering,
# we render the frame and increment the frame index, so the proper
# frame is rendered if we're reenabled or the stream opens again.
return
- self.clear()
frame = self.frame()
- output = "\r{}".format(frame)
+ output = "\r{}\n".format(frame)
try:
- self._write(output)
+ self._write(output, True)
except UnicodeEncodeError:
- self._write(encode_utf_8_text(output))
+ self._write(encode_utf_8_text(output), True)
def render(self):
"""Runs the render until thread flag is set.
@@ -490,6 +543,14 @@ class Halo(object):
if not (self.enabled and self._check_stream()):
return self
+ # Clear all stale Halo instances created before
+ # Check against Halo._instances instead of self._instances
+ # to avoid possible overriding in subclasses.
+ if all(inst._stopped for inst in Halo._instances):
+ Halo._instances[:] = []
+ # Allow for calling start() multiple times
+ if self not in Halo._instances:
+ Halo._instances.append(self)
self._hide_cursor()
self._stop_spinner = threading.Event()
@@ -498,6 +559,7 @@ class Halo(object):
self._render_frame()
self._spinner_id = self._spinner_thread.name
self._spinner_thread.start()
+ self._stopped = False
return self
@@ -511,12 +573,17 @@ class Halo(object):
self._stop_spinner.set()
self._spinner_thread.join()
+ if self._stopped:
+ return
+
if self.enabled:
self.clear()
self._frame_index = 0
self._spinner_id = None
self._show_cursor()
+ self._stopped = True
+
return self
def succeed(self, text=None):
diff --git a/pdm/_vendor/log_symbols/symbols.py b/pdm/_vendor/log_symbols/symbols.py
index b7047fc..3ed2ef0 100644
--- a/pdm/_vendor/log_symbols/symbols.py
+++ b/pdm/_vendor/log_symbols/symbols.py
@@ -1,6 +1,9 @@
# -*- coding: utf-8 -*-
"""Provide log symbols for various log levels."""
-import platform
+import codecs
+import locale
+import os
+import sys
from enum import Enum
from pdm._vendor.colorama import init, deinit, Fore
@@ -30,13 +33,17 @@ def is_supported():
boolean
Whether operating system supports main symbols or not
"""
-
- os_arch = platform.system()
-
- if os_arch != 'Windows':
- return True
-
- return False
+ if os.getenv("DISABLE_UNICODE_OUTPUT"):
+ return False
+ encoding = getattr(sys.stdout, "encoding")
+ if encoding is None:
+ encoding = locale.getpreferredencoding(False)
+
+ try:
+ encoding = codecs.lookup(encoding).name
+ except Exception:
+ encoding = "utf-8"
+ return encoding == "utf-8"
_SYMBOLS = _MAIN if is_supported() else _FALLBACKS
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册