Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
zengbin93
czsc
提交
3ee5e66b
C
czsc
项目概览
zengbin93
/
czsc
通知
23
Star
2
Fork
1
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
4
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
C
czsc
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
4
Issue
4
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
提交
3ee5e66b
编写于
7月 26, 2020
作者:
Z
zengbin93
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
0.5.1 实现线段增量更新
上级
8ecf16c2
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
243 addition
and
27 deletion
+243
-27
czsc/__init__.py
czsc/__init__.py
+1
-1
czsc/analyze_v3.py
czsc/analyze_v3.py
+231
-15
test/test_analyze_v3.py
test/test_analyze_v3.py
+11
-11
未找到文件。
czsc/__init__.py
浏览文件 @
3ee5e66b
...
...
@@ -2,7 +2,7 @@
from
.ta
import
ma
,
macd
,
boll
from
.utils
import
plot_kline
from
.analyze
import
KlineAnalyze
,
is_bei_chi
from
.analyze
_v3
import
KlineAnalyze
__version__
=
"0.5.1"
__author__
=
"zengbin93"
...
...
czsc/analyze_v3.py
浏览文件 @
3ee5e66b
...
...
@@ -4,11 +4,145 @@ import pandas as pd
from
czsc.ta
import
ma
,
macd
,
boll
from
czsc.utils
import
plot_ka
,
plot_kline
def
is_bei_chi
(
ka
,
zs1
,
zs2
,
mode
=
"bi"
,
adjust
=
0.9
):
"""判断 zs1 对 zs2 是否有背驰
注意:力度的比较,并没有要求两段走势方向一致;但是如果两段走势之间存在包含关系,这样的力度比较是没有意义的。
:param ka: KlineAnalyze
缠论的分析结果,即去除包含关系后,识别出分型、笔、线段的K线
:param zs1: dict
用于比较的走势,通常是最近的走势,示例如下:
zs1 = {"start_dt": "2020-02-20 11:30:00", "end_dt": "2020-02-20 14:30:00", "direction": "up"}
:param zs2: dict
被比较的走势,通常是较前的走势,示例如下:
zs2 = {"start_dt": "2020-02-21 11:30:00", "end_dt": "2020-02-21 14:30:00", "direction": "down"}
:param mode: str
default `bi`, optional value [`xd`, `bi`]
xd 判断两个线段之间是否存在背驰
bi 判断两笔之间是否存在背驰
:param adjust: float
调整 zs2 的力度,建议设置范围在 0.6 ~ 1.0 之间,默认设置为 0.9;
其作用是确保 zs1 相比于 zs2 的力度足够小。
:return:
"""
assert
zs1
[
"start_dt"
]
>
zs2
[
"end_dt"
],
"zs1 必须是最近的走势,用于比较;zs2 必须是较前的走势,被比较。"
assert
zs1
[
"start_dt"
]
<
zs1
[
"end_dt"
],
"走势的时间区间定义错误,必须满足 start_dt < end_dt"
assert
zs2
[
"start_dt"
]
<
zs2
[
"end_dt"
],
"走势的时间区间定义错误,必须满足 start_dt < end_dt"
df
=
ka
.
to_df
(
ma_params
=
(
5
,),
use_macd
=
True
,
use_boll
=
False
)
k1
=
df
[(
df
[
'dt'
]
>=
zs1
[
"start_dt"
])
&
(
df
[
'dt'
]
<=
zs1
[
"end_dt"
])]
k2
=
df
[(
df
[
'dt'
]
>=
zs2
[
"start_dt"
])
&
(
df
[
'dt'
]
<=
zs2
[
"end_dt"
])]
bc
=
False
if
mode
==
'bi'
:
macd_sum1
=
sum
([
abs
(
x
)
for
x
in
k1
.
macd
])
macd_sum2
=
sum
([
abs
(
x
)
for
x
in
k2
.
macd
])
# print("bi: ", macd_sum1, macd_sum2)
if
macd_sum1
<
macd_sum2
*
adjust
:
bc
=
True
elif
mode
==
'xd'
:
assert
zs1
[
'direction'
]
in
[
'down'
,
'up'
],
"走势的 direction 定义错误,可取值为 up 或 down"
assert
zs2
[
'direction'
]
in
[
'down'
,
'up'
],
"走势的 direction 定义错误,可取值为 up 或 down"
if
zs1
[
'direction'
]
==
"down"
:
macd_sum1
=
sum
([
abs
(
x
)
for
x
in
k1
.
macd
if
x
<
0
])
else
:
macd_sum1
=
sum
([
abs
(
x
)
for
x
in
k1
.
macd
if
x
>
0
])
if
zs2
[
'direction'
]
==
"down"
:
macd_sum2
=
sum
([
abs
(
x
)
for
x
in
k2
.
macd
if
x
<
0
])
else
:
macd_sum2
=
sum
([
abs
(
x
)
for
x
in
k2
.
macd
if
x
>
0
])
# print("xd: ", macd_sum1, macd_sum2)
if
macd_sum1
<
macd_sum2
*
adjust
:
bc
=
True
else
:
raise
ValueError
(
"mode value error"
)
return
bc
def
find_zs
(
points
):
"""输入笔或线段标记点,输出中枢识别结果"""
if
len
(
points
)
<=
4
:
return
[]
# 当输入为笔的标记点时,新增 xd 值
for
i
,
x
in
enumerate
(
points
):
if
x
.
get
(
"bi"
,
0
):
points
[
i
][
'xd'
]
=
x
[
"bi"
]
k_xd
=
points
k_zs
=
[]
zs_xd
=
[]
for
i
in
range
(
len
(
k_xd
)):
if
len
(
zs_xd
)
<
5
:
zs_xd
.
append
(
k_xd
[
i
])
continue
xd_p
=
k_xd
[
i
]
zs_d
=
max
([
x
[
'xd'
]
for
x
in
zs_xd
[:
4
]
if
x
[
'fx_mark'
]
==
'd'
])
zs_g
=
min
([
x
[
'xd'
]
for
x
in
zs_xd
[:
4
]
if
x
[
'fx_mark'
]
==
'g'
])
if
zs_g
<=
zs_d
:
zs_xd
.
append
(
k_xd
[
i
])
zs_xd
.
pop
(
0
)
continue
# 定义四个指标,GG=max(gn),G=min(gn),D=max(dn),DD=min(dn),
# n遍历中枢中所有Zn。特别地,再定义ZG=min(g1、g2),
# ZD=max(d1、d2),显然,[ZD,ZG]就是缠中说禅走势中枢的区间
if
xd_p
[
'fx_mark'
]
==
"d"
and
xd_p
[
'xd'
]
>
zs_g
:
# 线段在中枢上方结束,形成三买
k_zs
.
append
({
'ZD'
:
zs_d
,
"ZG"
:
zs_g
,
'G'
:
min
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'g'
]),
'GG'
:
max
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'g'
]),
'D'
:
max
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'd'
]),
'DD'
:
min
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'd'
]),
"points"
:
zs_xd
,
"third_buy"
:
xd_p
})
zs_xd
=
k_xd
[
i
-
1
:
i
+
1
]
elif
xd_p
[
'fx_mark'
]
==
"g"
and
xd_p
[
'xd'
]
<
zs_d
:
# 线段在中枢下方结束,形成三卖
k_zs
.
append
({
'ZD'
:
zs_d
,
"ZG"
:
zs_g
,
'G'
:
min
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'g'
]),
'GG'
:
max
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'g'
]),
'D'
:
max
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'd'
]),
'DD'
:
min
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'd'
]),
"points"
:
zs_xd
,
"third_sell"
:
xd_p
})
zs_xd
=
k_xd
[
i
-
1
:
i
+
1
]
else
:
zs_xd
.
append
(
xd_p
)
if
len
(
zs_xd
)
>=
5
:
zs_d
=
max
([
x
[
'xd'
]
for
x
in
zs_xd
[:
4
]
if
x
[
'fx_mark'
]
==
'd'
])
zs_g
=
min
([
x
[
'xd'
]
for
x
in
zs_xd
[:
4
]
if
x
[
'fx_mark'
]
==
'g'
])
k_zs
.
append
({
'ZD'
:
zs_d
,
"ZG"
:
zs_g
,
'G'
:
min
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'g'
]),
'GG'
:
max
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'g'
]),
'D'
:
max
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'd'
]),
'DD'
:
min
([
x
[
'xd'
]
for
x
in
zs_xd
if
x
[
'fx_mark'
]
==
'd'
]),
"points"
:
zs_xd
,
})
return
k_zs
class
KlineAnalyze
:
def
__init__
(
self
,
name
=
"本级别"
,
verbose
=
True
):
def
__init__
(
self
,
kline
,
name
=
"本级别"
,
min_bi_k
=
5
,
verbose
=
True
):
self
.
name
=
name
self
.
verbose
=
verbose
self
.
min_bi_k
=
min_bi_k
self
.
symbol
=
None
self
.
latest_price
=
None
self
.
start_dt
=
None
...
...
@@ -31,6 +165,14 @@ class KlineAnalyze:
self
.
fd_list_l2
=
[]
self
.
fd_list_l3
=
[]
if
isinstance
(
kline
,
pd
.
DataFrame
):
columns
=
kline
.
columns
.
to_list
()
bars
=
[{
k
:
v
for
k
,
v
in
zip
(
columns
,
row
)}
for
row
in
kline
.
values
]
else
:
bars
=
kline
for
bar
in
bars
:
self
.
update
(
bar
)
def
_update_kline_new
(
self
):
"""更新去除包含关系的K线序列
...
...
@@ -102,8 +244,8 @@ class KlineAnalyze:
raise
ValueError
self
.
kline_new
.
append
(
k
)
if
self
.
verbose
:
print
(
f
"原始序列长度:
{
len
(
self
.
kline_raw
)
}
;去除包含关系之后的序列长度:
{
len
(
self
.
kline_new
)
}
"
)
if
self
.
verbose
:
print
(
f
"原始序列长度:
{
len
(
self
.
kline_raw
)
}
;去除包含关系之后的序列长度:
{
len
(
self
.
kline_new
)
}
"
)
def
_update_fx_list
(
self
):
"""更新分型序列
...
...
@@ -144,7 +286,6 @@ class KlineAnalyze:
"fx"
:
k2
[
'high'
],
"fx_high"
:
k2
[
'high'
],
"fx_low"
:
max
(
k1
[
'low'
],
k3
[
'low'
]),
# "left": [x for x in kn if x['dt'] <= k2['dt']]
}
self
.
fx_list
.
append
(
fx
)
...
...
@@ -157,7 +298,6 @@ class KlineAnalyze:
"fx"
:
k2
[
'low'
],
"fx_high"
:
min
(
k1
[
'high'
],
k2
[
'high'
]),
"fx_low"
:
k2
[
'low'
],
# "left": [x for x in kn if x['dt'] <= k2['dt']]
}
self
.
fx_list
.
append
(
fx
)
...
...
@@ -211,7 +351,7 @@ class KlineAnalyze:
self
.
bi_list
[
-
1
]
=
bi
else
:
kn_inside
=
[
x
for
x
in
right_kn
if
last_bi
[
'dt'
]
<=
x
[
'dt'
]
<=
bi
[
'dt'
]]
if
len
(
kn_inside
)
>=
5
:
if
len
(
kn_inside
)
>=
self
.
min_bi_k
:
# 确保相邻两个顶底之间不存在包含关系
if
(
last_bi
[
'fx_mark'
]
==
'g'
and
bi
[
'fx_high'
]
<
last_bi
[
'fx_low'
])
or
\
(
last_bi
[
'fx_mark'
]
==
'd'
and
bi
[
'fx_low'
]
>
last_bi
[
'fx_high'
]):
...
...
@@ -219,6 +359,50 @@ class KlineAnalyze:
print
(
f
"新增笔标记:
{
bi
}
"
)
self
.
bi_list
.
append
(
bi
)
@
staticmethod
def
_make_standard_seq
(
bi_seq
):
"""计算标准特征序列
:return: list of dict
"""
if
bi_seq
[
0
][
'fx_mark'
]
==
'd'
:
direction
=
"up"
elif
bi_seq
[
0
][
'fx_mark'
]
==
'g'
:
direction
=
"down"
else
:
raise
ValueError
raw_seq
=
[{
"dt"
:
bi_seq
[
i
].
dt
,
'high'
:
max
(
bi_seq
[
i
].
price
,
bi_seq
[
i
+
1
].
price
),
'low'
:
min
(
bi_seq
[
i
].
price
,
bi_seq
[
i
+
1
].
price
)}
for
i
in
range
(
1
,
len
(
bi_seq
),
2
)
if
i
<=
len
(
bi_seq
)
-
2
]
seq
=
[]
for
row
in
raw_seq
:
if
not
seq
:
seq
.
append
(
row
)
continue
last
=
seq
[
-
1
]
cur_h
,
cur_l
=
row
[
'high'
],
row
[
'low'
]
last_h
,
last_l
=
last
[
'high'
],
last
[
'low'
]
# 左包含 or 右包含
if
(
cur_h
<=
last_h
and
cur_l
>=
last_l
)
or
(
cur_h
>=
last_h
and
cur_l
<=
last_l
):
seq
.
pop
(
-
1
)
# 有包含关系,按方向分别处理
if
direction
==
"up"
:
last_h
=
max
(
last_h
,
cur_h
)
last_l
=
max
(
last_l
,
cur_l
)
elif
direction
==
"down"
:
last_h
=
min
(
last_h
,
cur_h
)
last_l
=
min
(
last_l
,
cur_l
)
else
:
raise
ValueError
seq
.
append
({
"dt"
:
row
[
'dt'
],
"high"
:
last_h
,
"low"
:
last_l
})
else
:
seq
.
append
(
row
)
return
seq
def
_update_xd_list
(
self
):
"""更新线段序列"""
if
len
(
self
.
bi_list
)
<
4
:
...
...
@@ -226,7 +410,7 @@ class KlineAnalyze:
if
len
(
self
.
xd_list
)
==
0
:
for
i
in
range
(
3
):
xd
=
dict
(
self
.
xd
_list
[
i
])
xd
=
dict
(
self
.
bi
_list
[
i
])
xd
[
'xd'
]
=
xd
.
pop
(
'bi'
)
self
.
xd_list
.
append
(
xd
)
...
...
@@ -234,11 +418,38 @@ class KlineAnalyze:
if
len
(
self
.
xd_list
)
==
0
:
return
right_bi
=
[
x
for
x
in
self
.
bi_list
if
x
[
'dt'
]
>
self
.
xd_list
[
-
1
][
'dt'
]]
for
bi
in
right_bi
:
last_xd
=
self
.
xd_list
[
-
1
]
xd
=
dict
(
bi
)
right_bi
=
[
x
for
x
in
self
.
bi_list
if
x
[
'dt'
]
>=
self
.
xd_list
[
-
1
][
'dt'
]]
xd_p
=
[]
bi_d
=
[
x
for
x
in
right_bi
if
x
[
'fx_mark'
]
==
'd'
]
bi_g
=
[
x
for
x
in
right_bi
if
x
[
'fx_mark'
]
==
'g'
]
for
i
in
range
(
1
,
len
(
bi_d
)
-
2
):
d1
,
d2
,
d3
=
bi_d
[
i
-
1
:
i
+
2
]
if
d1
[
'bi'
]
>
d2
[
'bi'
]
<
d3
[
'bi'
]:
xd_p
.
append
(
d2
)
for
j
in
range
(
1
,
len
(
bi_g
)
-
2
):
g1
,
g2
,
g3
=
bi_g
[
j
-
1
:
j
+
2
]
if
g1
[
'bi'
]
<
g2
[
'bi'
]
>
g3
[
'bi'
]:
xd_p
.
append
(
g2
)
xd_p
=
sorted
(
xd_p
,
key
=
lambda
x
:
x
[
'dt'
],
reverse
=
False
)
for
xp
in
xd_p
:
xd
=
dict
(
xp
)
xd
[
'xd'
]
=
xd
.
pop
(
'bi'
)
last_xd
=
self
.
xd_list
[
-
1
]
if
last_xd
[
'fx_mark'
]
==
xd
[
'fx_mark'
]:
if
(
last_xd
[
'fx_mark'
]
==
'd'
and
last_xd
[
'xd'
]
>
xd
[
'xd'
])
\
or
(
last_xd
[
'fx_mark'
]
==
'g'
and
last_xd
[
'xd'
]
<
xd
[
'xd'
]):
if
self
.
verbose
:
print
(
f
"更新线段标记:from
{
last_xd
}
to
{
xd
}
"
)
self
.
xd_list
[
-
1
]
=
xd
else
:
bi_inside
=
[
x
for
x
in
right_bi
if
last_xd
[
'dt'
]
<=
x
[
'dt'
]
<=
xd
[
'dt'
]]
if
len
(
bi_inside
)
<
4
:
if
self
.
verbose
:
print
(
f
"
{
last_xd
[
'dt'
]
}
-
{
xd
[
'dt'
]
}
之间没有4个笔标记,跳过"
)
continue
else
:
self
.
xd_list
.
append
(
xd
)
def
update
(
self
,
k
):
"""更新分析结果
...
...
@@ -251,10 +462,12 @@ class KlineAnalyze:
'close': 3210.1,
'high': 3373.53,
'low': 3209.76,
'vol': 486366915.0,
'is_end': True}
'vol': 486366915.0}
"""
if
k
[
'is_end'
]:
if
self
.
verbose
:
print
(
"="
*
100
)
print
(
f
"输入新K线:
{
k
}
"
)
if
not
self
.
kline_raw
or
k
[
'open'
]
!=
self
.
kline_raw
[
-
1
][
'open'
]:
self
.
kline_raw
.
append
(
k
)
else
:
if
self
.
verbose
:
...
...
@@ -269,7 +482,10 @@ class KlineAnalyze:
self
.
_update_kline_new
()
self
.
_update_fx_list
()
self
.
_update_bi_list
()
# self._update_xd_list()
self
.
_update_xd_list
()
if
self
.
verbose
:
print
(
"更新结束
\n\n
"
)
def
to_df
(
self
,
ma_params
=
(
5
,
20
),
use_macd
=
True
,
use_boll
=
False
,
max_count
=
5000
):
"""整理成 df 输出
...
...
test/test_analyze_v3.py
浏览文件 @
3ee5e66b
...
...
@@ -18,15 +18,15 @@ kline.loc[:, "is_end"] = True
# print(ka)
def
test_
objects
():
if
isinstance
(
kline
,
pd
.
DataFrame
):
columns
=
kline
.
columns
.
to_list
()
bars
=
[{
k
:
v
for
k
,
v
in
zip
(
columns
,
row
)}
for
row
in
kline
.
values
]
else
:
bars
=
kline
ka
=
KlineAnalyze
(
name
=
"日线"
)
for
bar
in
bars
:
ka
.
update
(
bar
)
def
test_
kline_analyze
():
ka
=
KlineAnalyze
(
kline
,
name
=
"日线"
)
# 测试增量更新
ka_raw_len
=
len
(
ka
.
kline_raw
)
for
x
in
[
2890
,
2910
,
2783
,
3120
]:
k
=
dict
(
ka
.
kline_raw
[
-
1
])
k
[
'close'
]
=
x
ka
.
update
(
k
)
assert
len
(
ka
.
kline_raw
)
==
ka_raw_len
assert
ka
.
kline_raw
[
-
1
][
'close'
]
==
x
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录