提交 27e8a077 编写于 作者: Z zengbin93

0.3.7 优化中枢识别代码

上级 bd7b3799
...@@ -80,7 +80,7 @@ def up_zs_number(ka): ...@@ -80,7 +80,7 @@ def up_zs_number(ka):
k_zs = ka.zs[::-1] k_zs = ka.zs[::-1]
zs_cur = k_zs[0] zs_cur = k_zs[0]
for zs_next in k_zs[1:]: for zs_next in k_zs[1:]:
if zs_cur['zs'][0] >= zs_next['zs'][1]: if zs_cur["ZD"] >= zs_next["ZG"]:
zs_num += 1 zs_num += 1
zs_cur = zs_next zs_cur = zs_next
else: else:
...@@ -95,7 +95,7 @@ def down_zs_number(ka): ...@@ -95,7 +95,7 @@ def down_zs_number(ka):
k_zs = ka.zs[::-1] k_zs = ka.zs[::-1]
zs_cur = k_zs[0] zs_cur = k_zs[0]
for zs_next in k_zs[1:]: for zs_next in k_zs[1:]:
if zs_cur['zs'][1] <= zs_next['zs'][0]: if zs_cur["ZG"] <= zs_next["ZD"]:
zs_num += 1 zs_num += 1
zs_cur = zs_next zs_cur = zs_next
else: else:
...@@ -129,6 +129,80 @@ def get_ka_feature(ka): ...@@ -129,6 +129,80 @@ def get_ka_feature(ka):
return feature return feature
def find_zs(points):
"""输入笔或线段标记点,输出中枢识别结果"""
if len(points) <= 4:
return []
# 当输入为笔的标记点时,新增 xd 值
for i, x in enumerate(points):
if x.get("bi", 0):
points[i]['xd'] = x["bi"]
k_xd = points
k_zs = []
zs_xd = []
for i in range(len(k_xd)):
if len(zs_xd) < 5:
zs_xd.append(k_xd[i])
continue
xd_p = k_xd[i]
zs_d = max([x['xd'] for x in zs_xd[:4] if x['fx_mark'] == 'd'])
zs_g = min([x['xd'] for x in zs_xd[:4] if x['fx_mark'] == 'g'])
if zs_g <= zs_d:
zs_xd.append(k_xd[i])
zs_xd.pop(0)
continue
# 定义四个指标,GG=max(gn),G=min(gn),D=max(dn),DD=min(dn),
# n遍历中枢中所有Zn。特别地,再定义ZG=min(g1、g2),
# ZD=max(d1、d2),显然,[ZD,ZG]就是缠中说禅走势中枢的区间
if xd_p['fx_mark'] == "d" and xd_p['xd'] > zs_g:
# 线段在中枢上方结束,形成三买
k_zs.append({
'ZD': zs_d,
"ZG": zs_g,
'G': min([x['xd'] for x in zs_xd if x['fx_mark'] == 'g']),
'GG': max([x['xd'] for x in zs_xd if x['fx_mark'] == 'g']),
'D': max([x['xd'] for x in zs_xd if x['fx_mark'] == 'd']),
'DD': min([x['xd'] for x in zs_xd if x['fx_mark'] == 'd']),
"points": deepcopy(zs_xd),
"third_buy": deepcopy(xd_p)
})
zs_xd = deepcopy(k_xd[i - 1: i + 1])
elif xd_p['fx_mark'] == "g" and xd_p['xd'] < zs_d:
# 线段在中枢下方结束,形成三卖
k_zs.append({
'ZD': zs_d,
"ZG": zs_g,
'G': min([x['xd'] for x in zs_xd if x['fx_mark'] == 'g']),
'GG': max([x['xd'] for x in zs_xd if x['fx_mark'] == 'g']),
'D': max([x['xd'] for x in zs_xd if x['fx_mark'] == 'd']),
'DD': min([x['xd'] for x in zs_xd if x['fx_mark'] == 'd']),
"points": deepcopy(zs_xd),
"third_sell": deepcopy(xd_p)
})
zs_xd = deepcopy(k_xd[i - 1: i + 1])
else:
zs_xd.append(deepcopy(xd_p))
if len(zs_xd) >= 5:
zs_d = max([x['xd'] for x in zs_xd[:4] if x['fx_mark'] == 'd'])
zs_g = min([x['xd'] for x in zs_xd[:4] if x['fx_mark'] == 'g'])
k_zs.append({
'ZD': zs_d,
"ZG": zs_g,
'G': min([x['xd'] for x in zs_xd if x['fx_mark'] == 'g']),
'GG': max([x['xd'] for x in zs_xd if x['fx_mark'] == 'g']),
'D': max([x['xd'] for x in zs_xd if x['fx_mark'] == 'd']),
'DD': min([x['xd'] for x in zs_xd if x['fx_mark'] == 'd']),
"points": deepcopy(zs_xd),
})
return k_zs
@lru_cache(maxsize=64) @lru_cache(maxsize=64)
def create_df(ka, ma_params=(5, 20, 120, 250)): def create_df(ka, ma_params=(5, 20, 120, 250)):
df = pd.DataFrame(deepcopy(ka.kline)) df = pd.DataFrame(deepcopy(ka.kline))
...@@ -175,7 +249,8 @@ class KlineAnalyze(object): ...@@ -175,7 +249,8 @@ class KlineAnalyze(object):
self.fx = self._find_fx() self.fx = self._find_fx()
self.bi = self._find_bi() self.bi = self._find_bi()
self.xd = self._find_xd() self.xd = self._find_xd()
self.zs = self._find_zs() self.zs = find_zs(self.xd)
# self.zs = self._find_zs()
self.__update_kline() self.__update_kline()
def __repr__(self): def __repr__(self):
...@@ -454,52 +529,52 @@ class KlineAnalyze(object): ...@@ -454,52 +529,52 @@ class KlineAnalyze(object):
traceback.print_exc() traceback.print_exc()
return [] return []
def _find_zs(self): # def _find_zs(self):
"""查找中枢""" # """查找中枢"""
if len(self.xd) <= 4: # if len(self.xd) <= 4:
return [] # return []
#
k_xd = self.xd # k_xd = self.xd
k_zs = [] # k_zs = []
zs_xd = [] # zs_xd = []
#
for i in range(len(k_xd)): # for i in range(len(k_xd)):
if len(zs_xd) < 5: # if len(zs_xd) < 5:
zs_xd.append(k_xd[i]) # zs_xd.append(k_xd[i])
continue # continue
xd_p = k_xd[i] # xd_p = k_xd[i]
zs_d = max([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'd']) # zs_d = max([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'd'])
zs_g = min([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'g']) # zs_g = min([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'g'])
if zs_g <= zs_d: # if zs_g <= zs_d:
zs_xd.append(k_xd[i]) # zs_xd.append(k_xd[i])
zs_xd.pop(0) # zs_xd.pop(0)
continue # continue
#
if xd_p['fx_mark'] == "d" and xd_p['xd'] > zs_g: # if xd_p['fx_mark'] == "d" and xd_p['xd'] > zs_g:
# 线段在中枢上方结束,形成三买 # # 线段在中枢上方结束,形成三买
k_zs.append({ # k_zs.append({
'zs': (zs_d, zs_g), # 'zs': (zs_d, zs_g),
"zs_xd": deepcopy(zs_xd), # "zs_xd": deepcopy(zs_xd),
"third_buy": deepcopy(xd_p) # "third_buy": deepcopy(xd_p)
}) # })
zs_xd = deepcopy(k_xd[i: i+1]) # zs_xd = deepcopy(k_xd[i: i+1])
elif xd_p['fx_mark'] == "g" and xd_p['xd'] < zs_d: # elif xd_p['fx_mark'] == "g" and xd_p['xd'] < zs_d:
# 线段在中枢下方结束,形成三卖 # # 线段在中枢下方结束,形成三卖
k_zs.append({ # k_zs.append({
'zs': (zs_d, zs_g), # 'zs': (zs_d, zs_g),
"zs_xd": deepcopy(zs_xd), # "zs_xd": deepcopy(zs_xd),
"third_sell": deepcopy(xd_p) # "third_sell": deepcopy(xd_p)
}) # })
zs_xd = deepcopy(k_xd[i: i+1]) # zs_xd = deepcopy(k_xd[i: i+1])
else: # else:
zs_xd.append(deepcopy(xd_p)) # zs_xd.append(deepcopy(xd_p))
#
if len(zs_xd) >= 5: # if len(zs_xd) >= 5:
zs_d = max([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'd']) # zs_d = max([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'd'])
zs_g = min([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'g']) # zs_g = min([x['xd'] for x in zs_xd[1:5] if x['fx_mark'] == 'g'])
k_zs.append({'zs': (zs_d, zs_g), "zs_xd": deepcopy(zs_xd)}) # k_zs.append({'zs': (zs_d, zs_g), "zs_xd": deepcopy(zs_xd)})
#
return k_zs # return k_zs
def __update_kline(self): def __update_kline(self):
kn_map = {x['dt']: x for x in self.kline_new} kn_map = {x['dt']: x for x in self.kline_new}
......
...@@ -5,17 +5,27 @@ sys.path.insert(0, '.') ...@@ -5,17 +5,27 @@ sys.path.insert(0, '.')
sys.path.insert(0, '..') sys.path.insert(0, '..')
import czsc import czsc
from czsc import KlineAnalyze from czsc import KlineAnalyze
from czsc.analyze import is_bei_chi from czsc.analyze import is_bei_chi, find_zs, down_zs_number, up_zs_number
from czsc.utils import plot_ka from czsc.utils import plot_ka
print(czsc.__version__) print(czsc.__version__)
df = get_kline(ts_code="000001.SH", end_dt="2020-04-28 15:00:00", freq='D', asset='I')
ka = KlineAnalyze(df, name="日线")
def test_kline_analyze():
assert ka.bi[-1]['fx_mark'] == 'g'
assert ka.xd[-1]['fx_mark'] == 'd'
# 测试背驰识别
assert not ka.bi_bei_chi()
assert ka.xd_bei_chi()
print(ka.zs[-2])
def test_bei_chi(): def test_bei_chi():
df = get_kline(ts_code="000001.SH", end_dt="2020-04-28 15:00:00", freq='D', asset='I') plot_ka(ka, file_image="test.png")
ka = KlineAnalyze(df, name="日线")
plot_ka(ka, file_image="test.jpg")
# 线段背驰 # 线段背驰
zs1 = {"start_dt": '2018-07-26 15:00:00', "end_dt": '2018-10-19 15:00:00', "direction": "down"} zs1 = {"start_dt": '2018-07-26 15:00:00', "end_dt": '2018-10-19 15:00:00', "direction": "down"}
...@@ -36,17 +46,21 @@ def test_bei_chi(): ...@@ -36,17 +46,21 @@ def test_bei_chi():
assert not is_bei_chi(ka, zs1, zs2, mode='bi', adjust=0.9) assert not is_bei_chi(ka, zs1, zs2, mode='bi', adjust=0.9)
def test_kline_analyze(): def test_find_zs():
df = get_kline(ts_code="300008.SZ", end_dt="2020-03-23 15:00:00", freq='30min', asset='E') assert down_zs_number(ka) == 2
ka = KlineAnalyze(df) assert up_zs_number(ka) == 1
xd_zs = find_zs(ka.xd)
bi_zs = find_zs(ka.bi)
# 测试识别结果 assert xd_zs[-2]["ZD"] == 2850.71
assert ka.bi[-1]['fx_mark'] == 'g' assert xd_zs[-2]["ZG"] == 3684.57
assert ka.xd[-1]['fx_mark'] == 'g'
# 测试背驰识别 assert xd_zs[-1]["ZD"] == 2691.02
assert not ka.bi_bei_chi() assert xd_zs[-1]["ZG"] == 2827.34
assert not ka.xd_bei_chi()
print(ka.zs[-2]) assert bi_zs[-2]['ZD'] == 2987.77
assert bi_zs[-2]['ZG'] == 3125.02
assert bi_zs[-1]['ZD'] == 2838.38
assert bi_zs[-1]['ZG'] == 2956.78
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册