提交 0b0367af 编写于 作者: Z zengbin93

0.3.1 修改笔识别方法

上级 81bd666c
......@@ -6,36 +6,6 @@ import pandas as pd
from .ta import macd
def up_zs_number(ka):
"""检查最新走势的连续向上中枢数量"""
zs_num = 1
if len(ka.zs) > 1:
k_zs = ka.zs[::-1]
zs_cur = k_zs[0]
for zs_next in k_zs[1:]:
if zs_cur['zs'][0] >= zs_next['zs'][1]:
zs_num += 1
zs_cur = zs_next
else:
break
return zs_num
def down_zs_number(ka):
"""检查最新走势的连续向下中枢数量"""
zs_num = 1
if len(ka.zs) > 1:
k_zs = ka.zs[::-1]
zs_cur = k_zs[0]
for zs_next in k_zs[1:]:
if zs_cur['zs'][1] <= zs_next['zs'][0]:
zs_num += 1
zs_cur = zs_next
else:
break
return zs_num
def is_bei_chi(ka, zs1, zs2, direction="down", mode="bi"):
"""判断 zs1 对 zs2 是否有背驰
......@@ -199,18 +169,49 @@ class KlineAnalyze(object):
if x['fx_mark'] in ['d', 'g']]
return fx
def __get_potential_bi(self):
def __extract_potential(self, mode='fx', fx_mark='d'):
if mode == 'fx':
points = deepcopy(self.fx)
elif mode == 'bi':
points = deepcopy(self.bi)
else:
raise ValueError
seq = [x for x in points if x['fx_mark'] == fx_mark]
seq = sorted(seq, key=lambda x: x['dt'], reverse=False)
p = []
for i in range(len(seq)-2):
window = seq[i: i+3]
if fx_mark == 'd':
if window[0][mode] >= window[1][mode] <= window[2][mode]:
p.append(deepcopy(window[1]))
elif fx_mark == 'g':
if window[0][mode] <= window[1][mode] >= window[2][mode]:
p.append(deepcopy(window[1]))
else:
raise ValueError
return p
def __handle_hist_bi(self):
"""识别笔标记:从已经识别出来的分型中确定能够构建笔的分型
划分笔的步骤:
(1)确定所有符合标准的分型。
(2)如果前后两分型是同一性质的,对于顶,前面的低于后面的,只保留后面的,前面那个可以忽略掉;对于底,
前面的高于后面的,只保留后面的,前面那个可以忽略掉。不满足上面情况的,例如相等的,都可以先保留。
(3)经过步骤(2)的处理后,余下的分型,如果相邻的是顶和底,那么这就可以划为一笔。
(2)取出所有分型中的顶分型序列,用大小为3的窗口在顶分型序列上滑动,如果中间的分型值最大,则保留窗口中间的顶分型;
(3)取出所有分型中的底分型序列,同样用大小为3的窗口进行滑动,如果中间的分型值最小,则保留窗口中间的底分型;
(4)合并第2/3步保留下来的顶底分型序列,遍历,如果前后两分型是同一性质的,对于顶,前面的低于后面的,只保留后面的,
前面那个可以忽略掉;对于底,前面的高于后面的,只保留后面的,前面那个可以忽略掉。
不满足上面情况的,例如相等的,都可以先保留。
(5)经过步骤(4)的处理后,余下的分型,如果相邻的是顶和底,那么这就可以划为一笔。
"""
# 符合标准的分型
kn = deepcopy(self.kline_new)
fx_p = sorted(deepcopy(self.fx), key=lambda x: x['dt'], reverse=False)
fx_p = [] # 存储潜在笔标记
fx_p.extend(self.__extract_potential(mode='fx', fx_mark='d'))
fx_p.extend(self.__extract_potential(mode='fx', fx_mark='g'))
fx_p = sorted(fx_p, key=lambda x: x['dt'], reverse=False)
# 确认哪些分型可以构成笔
bi = []
for i in range(len(fx_p)):
......@@ -240,41 +241,18 @@ class KlineAnalyze(object):
return bi
def __handle_last_bi(self, bi):
"""判断最后一个笔标记是否有效,规则如下:
1)如果最后一个笔标记为顶分型,最近一根K线的最高价在这个顶分型上方,该标记无效;
2)如果最后一个笔标记为底分型,最近一根K线的最低价在这个底分型下方,该标记无效;
"""
last_bi = bi[-1]
last_k = self.kline_new[-1]
if (last_bi['fx_mark'] == 'g' and last_k['high'] >= last_bi['bi']) or \
(last_bi['fx_mark'] == 'd' and last_k['low'] <= last_bi['bi']):
bi.pop()
return bi
pass
def _find_bi(self):
bi = self.__get_potential_bi()
bi = self.__handle_last_bi(bi)
bi_list = [x["dt"] for x in bi]
bi = self.__handle_hist_bi()
# bi = self.__handle_last_bi(bi)
dts = [x["dt"] for x in bi]
for k in self.kline_new:
if k['dt'] in bi_list:
if k['dt'] in dts:
k['bi'] = k['fx']
return bi
def __handle_last_xd(self, xd_v):
"""判断最后一个线段标记是否有效,规则如下:
1)如果最后一个线段标记为顶分型,最近一根K线的最高价在这个顶分型上方,该标记无效;
2)如果最后一个线段标记为底分型,最近一根K线的最低价在这个底分型下方,该标记无效;
"""
last_xd = xd_v[-1]
last_k = self.kline_new[-1]
if (last_xd['fx_mark'] == 'g' and last_k['high'] >= last_xd['xd']) or \
(last_xd['fx_mark'] == 'd' and last_k['low'] <= last_xd['xd']):
xd_v.pop()
return xd_v
def __get_potential_xd(self):
def __handle_hist_xd(self):
"""识别线段标记:从已经识别出来的笔中识别线段
划分线段的步骤:
......@@ -283,7 +261,11 @@ class KlineAnalyze(object):
前面的高于后面的,只保留后面的,前面那个可以忽略掉。不满足上面情况的,例如相等的,都可以先保留。
(3)经过步骤(2)的处理后,余下的笔标记,如果相邻的是顶和底,那么这就可以划为线段。
"""
bi_p = sorted(deepcopy(self.bi), key=lambda x: x['dt'], reverse=False)
bi_p = [] # 存储潜在线段标记
bi_p.extend(self.__extract_potential(mode='bi', fx_mark='d'))
bi_p.extend(self.__extract_potential(mode='bi', fx_mark='g'))
bi_p = sorted(bi_p, key=lambda x: x['dt'], reverse=False)
xd = []
for i in range(len(bi_p)):
k = deepcopy(bi_p[i])
......@@ -307,42 +289,47 @@ class KlineAnalyze(object):
continue
bi_m = [x for x in self.bi if k0['dt'] <= x['dt'] <= k['dt']]
bi_r = [x for x in self.bi if x['dt'] >= k['dt']]
# 一线段内部至少三笔
if len(bi_m) >= 4:
if len(bi_m) == 4:
if len(bi_r) <= 1:
continue
# 两个连续线段标记之间只有三笔的处理
lp2 = bi_m[-2]
rp2 = bi_r[1]
if lp2['fx_mark'] != rp2['fx_mark']:
continue
# if (k['fx_mark'] == "g" and bi_m[-1]['bi'] > bi_m[-3]['bi']) \
# or (k['fx_mark'] == "d" and bi_m[-1]['bi'] < bi_m[-3]['bi']):
# xd.append(k)
if (k['fx_mark'] == "g" and lp2['bi'] < rp2['bi'] and bi_m[-1]['bi'] > bi_m[-3]['bi']) \
or (k['fx_mark'] == "d" and lp2['bi'] > rp2['bi']
and bi_m[-1]['bi'] < bi_m[-3]['bi']):
xd.append(k)
else:
xd.append(k)
xd.append(k)
# 后处理:在任一线段内部,底分型一定是最低点,顶分型一定是最高点
xd_new = []
for i in range(len(xd)):
if i == 0:
xd2 = xd[i+1]
bi_m = [x for x in self.bi if x['dt'] <= xd2['dt']]
elif i == len(xd)-1:
xd1 = xd[i-1]
bi_m = [x for x in self.bi if x['dt'] >= xd1['dt']]
else:
xd1, xd2 = xd[i-1], xd[i+1]
bi_m = [x for x in self.bi if xd1['dt'] <= x['dt'] <= xd2['dt']]
xd0 = xd[i]
if xd0['fx_mark'] == 'd':
bi_m = sorted(bi_m, key=lambda x: x['bi'], reverse=False)
elif xd0['fx_mark'] == 'g':
bi_m = sorted(bi_m, key=lambda x: x['bi'], reverse=True)
else:
raise ValueError
new = deepcopy(bi_m[0])
new['xd'] = new['bi']
del new['bi']
xd_new.append(new)
return xd
def __handle_last_xd(self, xd_v):
pass
def _find_xd(self):
try:
xd = self.__get_potential_xd()
xd = self.__handle_last_xd(xd)
dts = [x['dt'] for x in xd]
def __add_xd(k):
xd = self.__handle_hist_xd()
# xd = self.__handle_last_xd(xd)
dts = [x["dt"] for x in xd]
for k in self.kline_new:
if k['dt'] in dts:
k['xd'] = k['fx']
return k
self.kline_new = [__add_xd(k) for k in self.kline_new]
return xd
except:
traceback.print_exc()
......
......@@ -6,7 +6,7 @@ from functools import lru_cache
sys.path.insert(0, r'C:\git_repo\zengbin93\chan')
import chan
from chan import KlineAnalyze, SolidAnalyze
from chan import KlineAnalyze
print(chan.__version__)
......@@ -90,35 +90,6 @@ def get_klines(ts_code, end_date, freqs='1min,5min,30min,D', asset='E'):
def test_kline_analyze():
df = get_kline(ts_code="300803.SZ", freq='5min', end_date="20200316")
ka = KlineAnalyze(df)
print(ka.bi)
@lru_cache(maxsize=128)
def create_sa(ts_code, end_date):
klines = get_klines(ts_code=ts_code, freqs='1min,5min,30min,D', asset="E", end_date=end_date)
sa = SolidAnalyze(klines)
return sa
def test_solid_analyze():
test_data = [
{"ts_code": '300033.SZ', "freq": "5分钟", "end_date": "20200307", "bs": "二买"},
{"ts_code": '300033.SZ', "freq": "1分钟", "end_date": "20200307", "bs": "二买"},
{"ts_code": '000012.SZ', "freq": "5分钟", "end_date": "20200307", "bs": "二卖"},
{"ts_code": '002405.SZ', "freq": "5分钟", "end_date": "20200307", "bs": "一卖"},
{"ts_code": '603383.SH', "freq": "日线", "end_date": "20200227", "bs": "线卖"},
]
for row in test_data:
print("=" * 100)
print(row)
sa = create_sa(row['ts_code'], row['end_date'])
if row['bs'] == '二买':
b, detail = sa.is_second_buy(row['freq'], tolerance=0.1)
print(b, detail)
elif row['bs'] == '二卖':
b, detail = sa.is_second_sell(row['freq'], tolerance=0.1)
print(b, detail)
elif row['bs'] == '一卖':
b, detail = sa.is_first_sell(row['freq'], tolerance=0.1)
print(b, detail)
print('\n')
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册