提交 787708cc 编写于 作者: HansBug's avatar HansBug 😆

save the great changes about graph

上级 0165b367
......@@ -24,3 +24,9 @@ common_direct_base
.. autofunction:: treevalue.utils.clazz.common_direct_base
get_class_full_name
~~~~~~~~~~~~~~~~~~~~~~
.. autofunction:: treevalue.utils.clazz.get_class_full_name
......@@ -18,3 +18,9 @@ static_call
.. autofunction:: treevalue.utils.func.static_call
post_process
~~~~~~~~~~~~~~~~~~
.. autofunction:: treevalue.utils.func.post_process
......@@ -2,7 +2,11 @@ import pytest
from easydict import EasyDict
from treevalue.tree import FastTreeValue, TreeValue
from treevalue.utils import init_magic, common_direct_base, common_bases
from treevalue.utils import init_magic, common_direct_base, common_bases, get_class_full_name
class _TestClass:
pass
@pytest.mark.unittest
......@@ -88,3 +92,7 @@ class TestUtilsClazz:
assert common_direct_base(T3, T4) == T1
assert common_direct_base(T4, T5) == object
def test_get_class_full_name(self):
assert get_class_full_name(str) == 'str'
assert get_class_full_name(_TestClass) == 'test.utils.test_clazz._TestClass'
......@@ -2,7 +2,7 @@ from functools import wraps
import pytest
from treevalue.utils import args_iter, dynamic_call, static_call
from treevalue.utils import args_iter, dynamic_call, static_call, post_process
@pytest.mark.unittest
......@@ -51,3 +51,16 @@ class TestUtilsFunc:
with pytest.raises(TypeError):
_ = static_call(another_f, static_ok=False)
def test_post_process(self):
@post_process(lambda x: -x)
def plus(a, b):
return a + b
assert plus(1, 2) == -3
@post_process(lambda: None)
def plus2(a, b):
return a + b
assert plus2(1, 2) is None
from .graph import graphics
from .tree import TreeValue
from .utils import jsonify, view, clone, typetrans, mapping, filter_, mask, shrink, union, subside, rise, \
NO_RISE_TEMPLATE
import colorsys
from functools import wraps
from typing import Type
from graphviz import Digraph
from .tree import TreeValue, get_data_property
from ...utils import get_class_full_name, seed_random, post_process, build_graph, dynamic_call
_PRIME_P, _PRIME_Q, _PRIME_R, _PRIME_S = 482480892821, 697797055633, 251526220339, 572076910547
def _str_hash(string):
sum_ = 0
for ch in str(string):
sum_ = (sum_ + ord(ch) * _PRIME_P) % _PRIME_Q
return sum_
_MAX_RGB = 0xff
def _max_mul(r):
return int(round(r * _MAX_RGB))
def _rrgb_wrap(func):
@wraps(func)
def _new_func(*args, **kwargs):
_result = func(*args, **kwargs)
if len(_result) > 3:
r, g, b, a = _result[:4]
return _max_mul(r), _max_mul(g), _max_mul(b), _max_mul(a)
else:
r, g, b = _result[:3]
return _max_mul(r), _max_mul(g), _max_mul(b)
return _new_func
def _rgb_str_wrap(func):
@wraps(func)
def _new_func(*args, **kwargs):
_result = func(*args, **kwargs)
if len(_result) > 3:
return '%02x%02x%02x%02x' % tuple(_result[:4])
else:
return '%02x%02x%02x' % tuple(_result[:3])
return _new_func
@_rgb_str_wrap
@_rrgb_wrap
def _color_from_class_raw(type_: Type[TreeValue], alpha=None):
with seed_random(_str_hash(get_class_full_name(type_))) as rnd:
h, s, v = rnd.random(), rnd.random() * 0.4 + 0.6, rnd.random() * 0.4 + 0.6
r, g, b = colorsys.hsv_to_rgb(h, s, v)
if alpha is None:
return r, g, b
else:
return r, g, b, alpha
_color_from_class = post_process(lambda s: '#%s' % (s,))(_color_from_class_raw)
def _color_from_node(n, alpha=None):
return _color_from_class(type(n), alpha)
def _dict_call_merge(d1, d2):
d1 = dynamic_call(d1)
d2 = dynamic_call(d2)
def _new_func(*args, **kwargs):
_r1 = d1(*args, **kwargs)
_r2 = d2(*args, **kwargs)
_return = dict(_r1)
_return.update(_r2)
return _return
return _new_func
def graphics(*trees, title=None, cfg=None, repr_gen=None,
node_cfg_gen=None, edge_cfg_gen=None) -> Digraph:
return build_graph(
*trees,
node_id_gen=lambda n: 'node_%x' % (id(get_data_property(n).actual())),
graph_title=title or "<untitled>",
graph_cfg=cfg or {},
repr_gen=repr_gen or (lambda x: repr(x)),
iter_gen=lambda n: iter(n) if isinstance(n, TreeValue) else None,
node_cfg_gen=_dict_call_merge(lambda n, p, np, pp, is_node, is_root: {
'fillcolor': _color_from_node(n if is_node else p, 0.5),
'color': _color_from_node(n if is_node else p, 0.7 if is_node else 0.0),
'style': 'filled',
'shape': 'diamond' if is_root else ('ellipse' if is_node else 'box'),
'penwidth': 3 if is_root else 1.5,
'fontname': "Times-Roman bold" if is_node else "Times-Roman",
}, (node_cfg_gen or (lambda: {}))),
edge_cfg_gen=_dict_call_merge(lambda n, p, np, pp, is_node: {
'arrowhead': 'vee' if is_node else 'dot',
'arrowsize': 1.0 if is_node else 0.5,
'color': _color_from_node(n if is_node else p, 0.7 if is_node else 0.9),
'fontcolor': _color_from_node(n if is_node else p, 1.0),
'fontname': "Times-Roman",
}, (edge_cfg_gen or (lambda: {}))),
)
from functools import wraps
from typing import Union
from typing import Union, Any, Mapping
from ..common import BaseTree, Tree
from ...utils import init_magic, build_tree
......@@ -14,16 +14,22 @@ def get_data_property(t: 'TreeValue') -> BaseTree:
return getattr(t, _DATA_PROPERTY)
def _dict_unpack(t: Union['TreeValue', Mapping[str, Any]]) -> Union[BaseTree, Any]:
if isinstance(t, BaseTree):
return t
elif isinstance(t, TreeValue):
return get_data_property(t)
elif isinstance(t, dict):
return Tree({str(key): _dict_unpack(value) for key, value in t.items()})
else:
return t
def _init_decorate(init_func):
@wraps(init_func)
def _new_init_func(data):
if isinstance(data, TreeValue):
_new_init_func(get_data_property(data))
elif isinstance(data, dict):
_new_init_func(Tree({
str(key): get_data_property(value) if isinstance(value, TreeValue) else value
for key, value in data.items()
}))
if isinstance(data, (TreeValue, dict)):
_new_init_func(_dict_unpack(data))
elif isinstance(data, BaseTree):
init_func(data)
else:
......
from .clazz import init_magic, class_wraps, common_bases, common_direct_base
from .clazz import init_magic, class_wraps, common_bases, common_direct_base, get_class_full_name
from .enum import int_enum_loads
from .final import FinalMeta
from .func import args_iter, dynamic_call, static_call
from .func import args_iter, dynamic_call, static_call, post_process
from .random import seed_random, random_hex, random_hex_with_timestamp
from .singleton import SingletonMeta, ValueBasedSingletonMeta, SingletonMark
from .tree import build_tree, build_graph
......@@ -181,3 +181,21 @@ def common_direct_base(*classes: Type, base: Union[Collection[Type], Type, None]
template = "No common base found with {classes}."
raise TypeError(template.format(classes=repr(classes), bases=repr(base)))
def get_class_full_name(clazz: type):
"""
Overview:
Get full name of a class.
Arguments:
- clazz (:obj:`type`): Given class.
Returns:
- name (:obj:`str`): Full name of the given class.
"""
module = clazz.__module__
if not module or module == 'builtins':
return clazz.__name__
else:
return module + '.' + clazz.__name__
......@@ -101,3 +101,34 @@ def static_call(func: Callable, static_ok: bool = True):
raise TypeError("Given callable is already static.")
return getattr(func, '__wrapped__', func)
def post_process(processor: Callable):
"""
Overview:
Post processor for function.
Arguments:
- processor (:obj:`Callable`): Post processor.
Returns:
- result (:obj:`Any`): Final result.
Example:
>>> @post_process(lambda x: -x)
>>> def plus(a, b):
>>> return a + b
>>>
>>> plus(1, 2) # -3
"""
processor = dynamic_call(processor)
def _decorator(func):
@wraps(func)
def _new_func(*args, **kwargs):
_result = func(*args, **kwargs)
return processor(_result)
return _new_func
return _decorator
......@@ -6,7 +6,7 @@ from typing import Optional, Mapping, Any, Callable
from graphviz import Digraph
from treelib import Tree as LibTree
from .func import dynamic_call
from .func import dynamic_call, post_process
from .random import random_hex_with_timestamp
_ROOT_ID = '_root'
......@@ -81,14 +81,20 @@ def _title_flatten(title):
return title
def _value_to_string(dict_) -> dict:
return type(dict_)({key: str(value) for key, value in dict_.items()})
def _no_none_value(dict_) -> dict:
return type(dict_)({key: value for key, value in dict_.items() if value is not None})
def _none_value_filter(func):
@wraps(func)
@post_process(_value_to_string)
@post_process(_no_none_value)
def _new_func(*args, **kwargs):
return _no_none_value(func(*args, **kwargs))
return func(*args, **kwargs)
return _new_func
......@@ -188,7 +194,7 @@ def build_graph(*roots, node_id_gen: Optional[Callable] = None,
if root_node_id not in _queued_node_ids:
graph.node(
name=root_node_id, label=root_title,
**node_cfg_gen(root, [])
**node_cfg_gen(root, None, [], [], True, True)
)
_queue.put((root_node_id, root, root_title, []))
_queued_node_ids.add(root_node_id)
......@@ -198,21 +204,22 @@ def build_graph(*roots, node_id_gen: Optional[Callable] = None,
for key, _current_node in iter_gen(_parent_node, _parent_path):
_current_path = [*_parent_path, key]
_current_id = node_id_gen(_current_node, _parent_node, _current_path, _parent_path,
not not iter_gen(_current_node, _current_path))
_is_node = not not iter_gen(_current_node, _current_path)
_current_id = node_id_gen(_current_node, _parent_node, _current_path, _parent_path, _is_node)
if iter_gen(_current_node, _current_path):
_current_label = '.'.join([_root_title, *_current_path])
else:
_current_label = repr_gen(_current_node, _current_path)
if _current_id not in _queued_node_ids:
graph.node(_current_id, label=_current_label, **node_cfg_gen(_current_node, _current_path))
graph.node(_current_id, label=_current_label,
**node_cfg_gen(_current_node, _parent_node, _current_path, _parent_path, _is_node, False))
if iter_gen(_current_node, _current_path):
_queue.put((_current_id, _current_node, _root_title, _current_path))
_queued_node_ids.add(_current_id)
if (_parent_id, _current_id) not in _queued_edges:
graph.edge(_parent_id, _current_id, label=key,
**edge_cfg_gen(_current_node, _parent_node, _current_path, _parent_path))
**edge_cfg_gen(_current_node, _parent_node, _current_path, _parent_path, _is_node))
_queued_edges.add((_parent_id, _current_id))
return graph
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册