提交 9f7ff6c4 编写于 作者: HansBug's avatar HansBug 😆

add new function code

上级 f97b7ceb
......@@ -2,8 +2,7 @@ from functools import wraps
import pytest
from treevalue.utils import args_iter, dynamic_call, static_call, post_process
from treevalue.utils.func import freduce
from treevalue.utils import args_iter, dynamic_call, static_call, post_process, pre_process, freduce
@pytest.mark.unittest
......@@ -53,6 +52,37 @@ class TestUtilsFunc:
with pytest.raises(TypeError):
_ = static_call(another_f, static_ok=False)
def test_pre_process(self):
@pre_process(lambda x, y: (-x, (x + 2) * y))
def plus(a, b):
return a + b
assert plus(1, 2) == 5
@pre_process(lambda x, y: ((), {'a': -x, 'b': (x + 2) * y}))
def plus2(a, b):
return a + b
assert plus2(1, 2) == 5
@pre_process(lambda x, y: {'a': -x, 'b': (x + 2) * y})
def plus3(a, b):
return a + b
assert plus3(1, 2) == 5
@pre_process(lambda x, y: ((-x, -x + 1, -x + 2), (y, y + 1, y + 2)))
def plus4(a, b):
return a + b
assert plus4(1, 2) == (-1, 0, 1, 2, 3, 4)
@pre_process(lambda x: -x)
def pw(a):
return a ** a
assert pw(-3) == 27
def test_post_process(self):
@post_process(lambda x: -x)
def plus(a, b):
......
import colorsys
from enum import IntEnum, unique
from functools import wraps
from typing import Type, Callable, Union, Optional, Tuple
from typing import Type, Callable, Union, Optional, Tuple, Any
import enum_tools
from graphviz import Digraph
from .tree import TreeValue, get_data_property
from ...utils import get_class_full_name, seed_random, post_process, build_graph, dynamic_call
from ...utils.func import freduce
from ...utils import get_class_full_name, seed_random, post_process, build_graph, dynamic_call, \
int_enum_loads, freduce
from ...utils.tree import SUFFIXED_TAG
_PRIME_P, _PRIME_Q, _PRIME_R, _PRIME_S = 482480892821, 697797055633, 251526220339, 572076910547
......@@ -52,12 +54,33 @@ def _rgb_str_wrap(func):
return _new_func
_H_CLUSTERS = 18
_H_UNIT = 1 / _H_CLUSTERS
_H_MIN = 0
_H_MAX = _H_CLUSTERS - 1
_H_CLUSTER_MAPPING = [11, 8, 7, 4, 16, 0, 13, 6, 14, 3, 15, 2, 17, 1, 12, 9, 10, 5]
_S_CLUSTERS = 12
_S_UNIT = 1 / _S_CLUSTERS
_S_MIN = 6
_S_MAX = 9
@post_process(lambda s: '#%s' % (s,))
@_rgb_str_wrap
@_rrgb_wrap
def _color_from_tag(tag: str, alpha=None):
def _color_from_tag(tag: str, alpha=None, hx: int = None, sr: float = None):
if not isinstance(tag, str):
tag = 'hash_%x' % (hash(tag),)
with seed_random(_str_hash(tag)) as rnd:
h, s, v = rnd.random(), rnd.random() * 0.4 + 0.6, rnd.random() * 0.4 + 0.6
h, s, v = _H_CLUSTER_MAPPING[rnd.randint(_H_MIN, _H_MAX)] * _H_UNIT, \
rnd.randint(_S_MIN, _S_MAX) * _S_UNIT, rnd.random() * 0.12 + 0.88
if hx is not None:
h = _H_CLUSTER_MAPPING[hx % _H_CLUSTERS] * _H_UNIT
if sr is not None:
s = max(0.0, min(1.0, sr))
r, g, b = colorsys.hsv_to_rgb(h, s, v)
if alpha is None:
return r, g, b
......@@ -65,12 +88,30 @@ def _color_from_tag(tag: str, alpha=None):
return r, g, b, alpha
def _color_from_type(type_: Type[TreeValue], alpha=None):
return _color_from_tag(get_class_full_name(type_), alpha)
def _color_from_data(data, alpha=None, is_edge: bool = False):
if isinstance(data, tuple):
hx, tag = data
return _color_from_tag(tag, alpha, hx)
else:
return _color_from_tag(data, alpha, None)
@enum_tools.documentation.document_enum
@int_enum_loads(name_preprocess=str.upper)
@unique
class ColorTheme(IntEnum):
TYPE = 1 # doc: Distinct colors by the tree types
INDEX = 2 # doc: Distinct colors by the index of the argument tree
NAME = 3 # doc: Distinct colors by the index of the argument's name
def _color_from_node(n, alpha=None):
return _color_from_type(type(n), alpha)
@dynamic_call
def __call__(self, root_node, root_title: str, root_index: int):
if self == self.__class__.TYPE:
return get_class_full_name(type(root_node))
elif self == self.__class__.INDEX:
return root_index, 'root_%d' % (root_index,)
elif self == self.__class__.NAME:
return root_title
@freduce(init=lambda: (lambda: {}))
......@@ -126,6 +167,7 @@ def _dup_value_func(dup_value):
def graphics(*trees, title: Optional[str] = None, cfg: Optional[dict] = None,
dup_value: Union[bool, Callable, type, Tuple[Type, ...]] = False,
color_theme_gen: Union[Callable, str, ColorTheme, Any, None] = None,
repr_gen: Optional[Callable] = None,
node_cfg_gen: Optional[Callable] = None,
edge_cfg_gen: Optional[Callable] = None) -> Digraph:
......@@ -153,6 +195,11 @@ def graphics(*trees, title: Optional[str] = None, cfg: Optional[dict] = None,
- graph (:obj:`Digraph`): Generated graph of tree values.
"""
color_theme_gen = color_theme_gen or ColorTheme.TYPE
if not hasattr(color_theme_gen, '__call__'):
color_theme_gen = ColorTheme.loads(color_theme_gen)
color_theme_gen = post_process(lambda x: '%x' % _str_hash(x))(dynamic_call(color_theme_gen))
def _node_tag(current, parent, current_path, parent_path, is_node):
if is_node:
return _node_id(current, current_path)
......@@ -170,19 +217,19 @@ def graphics(*trees, title: Optional[str] = None, cfg: Optional[dict] = None,
graph_cfg=cfg or {},
repr_gen=repr_gen or (lambda x: repr(x)),
iter_gen=lambda n: iter(n) if isinstance(n, TreeValue) else None,
node_cfg_gen=_dict_call_merge(lambda n, p, np, pp, is_node, is_root: {
'fillcolor': _color_from_node(n if is_node else p, 0.5),
'color': _color_from_node(n if is_node else p, 0.7 if is_node else 0.0),
node_cfg_gen=_dict_call_merge(lambda n, p, np, pp, is_node, is_root, root: {
'fillcolor': _color_from_data(color_theme_gen(*root), 0.5),
'color': _color_from_data(color_theme_gen(*root), 0.7 if is_node else 0.0),
'style': 'filled',
'shape': 'diamond' if is_root else ('ellipse' if is_node else 'box'),
'penwidth': 3 if is_root else 1.5,
'fontname': "Times-Roman bold" if is_node else "Times-Roman",
}, (node_cfg_gen or (lambda: {}))),
edge_cfg_gen=_dict_call_merge(lambda n, p, np, pp, is_node: {
edge_cfg_gen=_dict_call_merge(lambda n, p, np, pp, is_node, root: {
'arrowhead': 'vee' if is_node else 'dot',
'arrowsize': 1.0 if is_node else 0.5,
'color': _color_from_node(n if is_node else p, 0.7 if is_node else 0.9),
'fontcolor': _color_from_node(n if is_node else p, 1.0),
'fontname': "Times-Roman",
'color': _color_from_data(color_theme_gen(*root), 0.7 if is_node else 0.9),
'fontcolor': _color_from_data(color_theme_gen(*root), 2.0),
'fontname': "Times-Roman bold",
}, (edge_cfg_gen or (lambda: {}))),
)
from .clazz import init_magic, class_wraps, common_bases, common_direct_base, get_class_full_name
from .enum import int_enum_loads
from .final import FinalMeta
from .func import args_iter, dynamic_call, static_call, post_process
from .func import args_iter, dynamic_call, static_call, post_process, pre_process, freduce
from .random import seed_random, random_hex, random_hex_with_timestamp
from .singleton import SingletonMeta, ValueBasedSingletonMeta, SingletonMark
from .tree import build_tree, build_graph
......@@ -106,6 +106,49 @@ def static_call(func: Callable, static_ok: bool = True):
return getattr(func, '__wrapped__', func)
def pre_process(processor: Callable):
"""
Overview:
Pre processor for function.
Arguments:
- processor (:obj:`Callable`): Pre processor.
Returns:
- decorator (:obj:`Callable`): Function decorator
Example:
>>> @pre_process(lambda x, y: (-x, (x + 2) * y))
>>> def plus(a, b):
>>> return a + b
>>>
>>> plus(1, 2) # 5, 5 = -1 + (1 + 2) * 2
"""
_processor = dynamic_call(processor)
def _decorator(func):
@wraps(func)
def _new_func(*args, **kwargs):
pargs = _processor(*args, **kwargs)
if isinstance(pargs, tuple) and len(pargs) == 2 \
and isinstance(pargs[0], (list, tuple)) \
and isinstance(pargs[1], (dict,)):
args_, kwargs_ = tuple(pargs[0]), dict(pargs[1])
elif isinstance(pargs, (tuple, list)):
args_, kwargs_ = tuple(pargs), {}
elif isinstance(pargs, (dict,)):
args_, kwargs_ = (), dict(pargs)
else:
args_, kwargs_ = (pargs,), {}
return func(*args_, **kwargs_)
return _new_func
return _decorator
def post_process(processor: Callable):
"""
Overview:
......@@ -115,7 +158,7 @@ def post_process(processor: Callable):
- processor (:obj:`Callable`): Post processor.
Returns:
- result (:obj:`Any`): Final result.
- decorator (:obj:`Callable`): Function decorator
Example:
>>> @post_process(lambda x: -x)
......
......@@ -133,9 +133,9 @@ def _root_process(root, index):
elif len(root) == 1:
return _root_process(root[0], index)
else:
return root[0], str(root[1])
return root[0], str(root[1]), index
else:
return root, '<root_%d>' % (index,)
return root, '<root_%d>' % (index,), index
def build_graph(*roots, node_id_gen: Optional[Callable] = None,
......@@ -189,18 +189,20 @@ def build_graph(*roots, node_id_gen: Optional[Callable] = None,
_queue = Queue()
_queued_node_ids = set()
_queued_edges = set()
for root, root_title in roots:
for root_info in roots:
root, root_title, root_index = root_info
root_node_id = node_id_gen(root, None, [], [], True)
if root_node_id not in _queued_node_ids:
graph.node(
name=root_node_id, label=root_title,
**node_cfg_gen(root, None, [], [], True, True)
**node_cfg_gen(root, None, [], [], True, True, root_info)
)
_queue.put((root_node_id, root, root_title, []))
_queue.put((root_node_id, root, (root, root_title, root_index), []))
_queued_node_ids.add(root_node_id)
while not _queue.empty():
_parent_id, _parent_node, _root_title, _parent_path = _queue.get()
_parent_id, _parent_node, _root_info, _parent_path = _queue.get()
_root_node, _root_title, _root_index = _root_info
for key, _current_node in iter_gen(_parent_node, _parent_path):
_current_path = [*_parent_path, key]
......@@ -213,13 +215,15 @@ def build_graph(*roots, node_id_gen: Optional[Callable] = None,
if _current_id not in _queued_node_ids:
graph.node(_current_id, label=_current_label,
**node_cfg_gen(_current_node, _parent_node, _current_path, _parent_path, _is_node, False))
**node_cfg_gen(_current_node, _parent_node, _current_path, _parent_path,
_is_node, False, _root_info))
if iter_gen(_current_node, _current_path):
_queue.put((_current_id, _current_node, _root_title, _current_path))
_queue.put((_current_id, _current_node, _root_info, _current_path))
_queued_node_ids.add(_current_id)
if (_parent_id, _current_id, key) not in _queued_edges:
graph.edge(_parent_id, _current_id, label=key,
**edge_cfg_gen(_current_node, _parent_node, _current_path, _parent_path, _is_node))
**edge_cfg_gen(_current_node, _parent_node, _current_path, _parent_path,
_is_node, _root_info))
_queued_edges.add((_parent_id, _current_id, key))
return graph
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册