Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
OpenDILab开源决策智能平台
DI-engine
提交
fd863c00
D
DI-engine
项目概览
OpenDILab开源决策智能平台
/
DI-engine
上一次同步 接近 3 年
通知
66
Star
322
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
D
DI-engine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
fd863c00
编写于
11月 02, 2021
作者:
X
Xu Jingxin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Rename next to chain
上级
3de05532
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
30 addition
and
30 deletion
+30
-30
ding/worker/buffer/buffer.py
ding/worker/buffer/buffer.py
+4
-4
ding/worker/buffer/middleware/clone_object.py
ding/worker/buffer/middleware/clone_object.py
+8
-8
ding/worker/buffer/middleware/use_time_check.py
ding/worker/buffer/middleware/use_time_check.py
+8
-8
ding/worker/buffer/tests/test_buffer.py
ding/worker/buffer/tests/test_buffer.py
+10
-10
未找到文件。
ding/worker/buffer/buffer.py
浏览文件 @
fd863c00
...
...
@@ -11,8 +11,8 @@ def apply_middleware(func_name: str):
"""
Overview:
The real processing starts here, we apply the middleware one by one,
each middleware will receive
a `next
` function, which is an executor of next
middleware. You can change the input arguments to the
`next
` middleware, and you
each middleware will receive
next `chained
` function, which is an executor of next
middleware. You can change the input arguments to the
next `chained
` middleware, and you
also can get the return value from the next middleware, so you have the
maximum freedom to choose at what stage to implement your method.
"""
...
...
@@ -21,11 +21,11 @@ def apply_middleware(func_name: str):
if
len
(
middleware
)
==
0
:
return
base_func
(
buffer
,
*
args
,
**
kwargs
)
def
next
(
*
args
,
**
kwargs
):
def
chain
(
*
args
,
**
kwargs
):
return
wrap_handler
(
middleware
[
1
:],
*
args
,
**
kwargs
)
func
=
middleware
[
0
]
return
func
(
func_name
,
next
,
*
args
,
**
kwargs
)
return
func
(
func_name
,
chain
,
*
args
,
**
kwargs
)
return
wrap_handler
(
buffer
.
middleware
,
*
args
,
**
kwargs
)
...
...
ding/worker/buffer/middleware/clone_object.py
浏览文件 @
fd863c00
...
...
@@ -58,19 +58,19 @@ def clone_object():
"""
fastcopy
=
FastCopy
()
def
push
(
next
:
Callable
,
data
:
Any
,
*
args
,
**
kwargs
)
->
None
:
def
push
(
chain
:
Callable
,
data
:
Any
,
*
args
,
**
kwargs
)
->
None
:
data
=
fastcopy
.
copy
(
data
)
return
next
(
data
,
*
args
,
**
kwargs
)
return
chain
(
data
,
*
args
,
**
kwargs
)
def
sample
(
next
:
Callable
,
*
args
,
**
kwargs
)
->
List
[
Any
]:
data
=
next
(
*
args
,
**
kwargs
)
def
sample
(
chain
:
Callable
,
*
args
,
**
kwargs
)
->
List
[
Any
]:
data
=
chain
(
*
args
,
**
kwargs
)
return
fastcopy
.
copy
(
data
)
def
_immutable_object
(
action
:
str
,
next
:
Callable
,
*
args
,
**
kwargs
):
def
_immutable_object
(
action
:
str
,
chain
:
Callable
,
*
args
,
**
kwargs
):
if
action
==
"push"
:
return
push
(
next
,
*
args
,
**
kwargs
)
return
push
(
chain
,
*
args
,
**
kwargs
)
elif
action
==
"sample"
:
return
sample
(
next
,
*
args
,
**
kwargs
)
return
next
(
*
args
,
**
kwargs
)
return
sample
(
chain
,
*
args
,
**
kwargs
)
return
chain
(
*
args
,
**
kwargs
)
return
_immutable_object
ding/worker/buffer/middleware/use_time_check.py
浏览文件 @
fd863c00
...
...
@@ -9,28 +9,28 @@ def use_time_check(max_use: int = float("inf")) -> Callable:
greater than max_use, this data will be removed from buffer as soon as possible.
"""
def
push
(
next
:
Callable
,
data
:
Any
,
*
args
,
**
kwargs
)
->
None
:
def
push
(
chain
:
Callable
,
data
:
Any
,
*
args
,
**
kwargs
)
->
None
:
if
'meta'
in
kwargs
:
kwargs
[
'meta'
][
'use_count'
]
=
0
else
:
kwargs
[
'meta'
]
=
{
'use_count'
:
0
}
return
next
(
data
,
*
args
,
**
kwargs
)
return
chain
(
data
,
*
args
,
**
kwargs
)
def
sample
(
next
:
Callable
,
*
args
,
**
kwargs
)
->
List
[
Any
]:
def
sample
(
chain
:
Callable
,
*
args
,
**
kwargs
)
->
List
[
Any
]:
kwargs
[
'return_index'
]
=
True
kwargs
[
'return_meta'
]
=
True
data
=
next
(
*
args
,
**
kwargs
)
data
=
chain
(
*
args
,
**
kwargs
)
for
i
,
(
d
,
idx
,
meta
)
in
enumerate
(
data
):
meta
[
'use_count'
]
+=
1
if
meta
[
'use_count'
]
>=
max_use
:
print
(
'max_use trigger'
)
# TODO(nyz)
return
data
def
_immutable_object
(
action
:
str
,
next
:
Callable
,
*
args
,
**
kwargs
)
->
Any
:
def
_immutable_object
(
action
:
str
,
chain
:
Callable
,
*
args
,
**
kwargs
)
->
Any
:
if
action
==
"push"
:
return
push
(
next
,
*
args
,
**
kwargs
)
return
push
(
chain
,
*
args
,
**
kwargs
)
elif
action
==
"sample"
:
return
sample
(
next
,
*
args
,
**
kwargs
)
return
next
(
*
args
,
**
kwargs
)
return
sample
(
chain
,
*
args
,
**
kwargs
)
return
chain
(
*
args
,
**
kwargs
)
return
_immutable_object
ding/worker/buffer/tests/test_buffer.py
浏览文件 @
fd863c00
...
...
@@ -17,20 +17,20 @@ class RateLimit:
def
handler
(
self
)
->
Callable
:
def
_handler
(
action
:
str
,
next
:
Callable
,
*
args
,
**
kwargs
):
def
_handler
(
action
:
str
,
chain
:
Callable
,
*
args
,
**
kwargs
):
if
action
==
"push"
:
return
self
.
push
(
next
,
*
args
,
**
kwargs
)
return
next
(
*
args
,
**
kwargs
)
return
self
.
push
(
chain
,
*
args
,
**
kwargs
)
return
chain
(
*
args
,
**
kwargs
)
return
_handler
def
push
(
self
,
next
,
data
,
*
args
,
**
kwargs
)
->
None
:
def
push
(
self
,
chain
,
data
,
*
args
,
**
kwargs
)
->
None
:
current
=
time
.
time
()
# Cut off stale records
self
.
buffered
=
[
t
for
t
in
self
.
buffered
if
t
>
current
-
self
.
window_seconds
]
if
len
(
self
.
buffered
)
<
self
.
max_rate
:
self
.
buffered
.
append
(
current
)
return
next
(
data
,
*
args
,
**
kwargs
)
return
chain
(
data
,
*
args
,
**
kwargs
)
else
:
return
None
...
...
@@ -40,14 +40,14 @@ def add_10() -> Callable:
Transform data on sampling
"""
def
sample
(
next
:
Callable
,
size
:
int
,
replace
:
bool
=
False
,
*
args
,
**
kwargs
):
data
=
next
(
size
,
replace
,
*
args
,
**
kwargs
)
def
sample
(
chain
:
Callable
,
size
:
int
,
replace
:
bool
=
False
,
*
args
,
**
kwargs
):
data
=
chain
(
size
,
replace
,
*
args
,
**
kwargs
)
return
[
d
+
10
for
d
in
data
]
def
_subview
(
action
:
str
,
next
:
Callable
,
*
args
,
**
kwargs
):
def
_subview
(
action
:
str
,
chain
:
Callable
,
*
args
,
**
kwargs
):
if
action
==
"sample"
:
return
sample
(
next
,
*
args
,
**
kwargs
)
return
next
(
*
args
,
**
kwargs
)
return
sample
(
chain
,
*
args
,
**
kwargs
)
return
chain
(
*
args
,
**
kwargs
)
return
_subview
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录