提交 1e531711 编写于 作者: Z zengbin93

0.4.2: 优化线段处理方法

上级 744d4eb7
......@@ -187,9 +187,9 @@ def create_df(ka, ma_params=(5, 20, 120, 250), use_macd=True, use_boll=True):
class KlineAnalyze(object):
def __init__(self, kline, name="本级别", bi_mode="new", xd_mode="strict", handle_last=True, debug=False):
def __init__(self, kline, name="本级别", bi_mode="new", xd_mode="strict",
min_bi_gap=0.001, handle_last=True, debug=False):
:param kline: list of dict or pd.DataFrame
example kline:
kline = [
......@@ -206,6 +206,8 @@ class KlineAnalyze(object):
:param xd_mode: str
线段识别控制参数,默认为 loose,在这种模式下,只要线段标记内有三笔就满足会识别;另外一个可选值是 strict,
在 strict 模式下,对于三笔形成的线段,要求其后的一笔不跌破或升破线段最后一笔的起始位置。
:param min_bi_gap: float
笔内部缺口的最小百分比,默认值 0.001
:param handle_last: bool
是否使用默认的 handle_last 方法,默认值为 True
......@@ -215,6 +217,7 @@ class KlineAnalyze(object):
self.bi_mode = bi_mode
self.xd_mode = xd_mode
self.handle_last = handle_last
self.min_bi_gap = min_bi_gap
self.debug = debug
self.kline = self._preprocess(kline)
self.symbol = self.kline[0]['symbol']
......@@ -419,22 +422,6 @@ class KlineAnalyze(object):
raise ValueError
self.min_k_num = min_k_num
kn = self.kline_new
# fx_p = [] # 存储潜在笔标记
# fx_p.extend(self.__extract_potential(mode='fx', fx_mark='d'))
# fx_p.extend(self.__extract_potential(mode='fx', fx_mark='g'))
# # 加入满足笔条件的连续两个分型
# fx = self.fx
# for i in range(len(fx) - 1):
# fx1 = fx[i]
# fx2 = fx[i + 1]
# k_num = [x for x in kn if fx1['dt'] <= x['dt'] <= fx2['dt']]
# if len(k_num) >= min_k_num:
# fx_p.append(fx1)
# fx_p.append(fx2)
# fx_p = sorted(fx_p, key=lambda x: x['dt'], reverse=False)
fx_p = self.fx
# 确认哪些分型可以构成笔
......@@ -463,12 +450,12 @@ class KlineAnalyze(object):
for pair in k_pair:
kr, kl = pair
# 向下缺口
if kr['low'] > kl['high']:
if kr['low'] > kl['high'] * (1+self.min_bi_gap):
has_gap = True
# 向上缺口
if kr['high'] < kl['low']:
if kr['high'] < kl['low'] * (1-self.min_bi_gap):
has_gap = True
......@@ -483,17 +470,6 @@ class KlineAnalyze(object):
if (k0['fx_mark'] == 'g' and k['bi'] < k0['bi'] and k['bi'] == min_low) or \
(k0['fx_mark'] == 'd' and k['bi'] > k0['bi'] and k['bi'] == max_high):
# # 确保相邻两个顶底之间顶大于底
# if (k0['fx_mark'] == 'g' and k['bi'] >= k0['bi']) or \
# (k0['fx_mark'] == 'd' and k['bi'] <= k0['bi']):
# bi.pop(-1)
# continue
# # 一笔的顶底分型之间至少包含5根K线
# k_num = [x for x in kn if k0['dt'] <= x['dt'] <= k['dt']]
# if len(k_num) >= min_k_num:
# bi.append(k)
return bi
def __handle_last_bi(self, bi):
......@@ -559,25 +535,34 @@ class KlineAnalyze(object):
bi_m = [x for x in self.bi if k0['dt'] <= x['dt'] <= k['dt']]
bi_r = [x for x in self.bi if x['dt'] >= k['dt']]
# 一线段内部至少三笔
if len(bi_m) >= 4:
# 两个连续线段标记之间只有三笔的处理,这里区分 loose 和 strict 两种模式
if len(bi_m) == 4:
if self.xd_mode == 'loose':
if (k['fx_mark'] == "g" and bi_m[-1]['bi'] > bi_m[-3]['bi']) \
or (k['fx_mark'] == "d" and bi_m[-1]['bi'] < bi_m[-3]['bi']):
elif self.xd_mode == 'strict':
if len(bi_r) <= 1:
lp2 = bi_m[-2]
rp2 = bi_r[1]
if (k['fx_mark'] == "g" and lp2['bi'] < rp2['bi'] and bi_m[-1]['bi'] > bi_m[-3]['bi']) \
or (k['fx_mark'] == "d" and lp2['bi'] > rp2['bi']
and bi_m[-1]['bi'] < bi_m[-3]['bi']):
raise ValueError("xd_mode value error")
if len(bi_m) < 4 or len(bi_r) < 4:
# 线段的顶必然大于相邻的两个顶;线段的底必然小于相邻的两个底
assert k['fx_mark'] == bi_m[-3]['fx_mark'] == bi_r[2]['fx_mark']
if k['fx_mark'] == "d" and not (bi_m[-3]['bi'] > k['xd'] < bi_r[2]['bi']):
print(bi_m[-3], k, bi_r[2])
if k['fx_mark'] == "g" and not (bi_m[-3]['bi'] < k['xd'] > bi_r[2]['bi']):
print(bi_m[-3], k, bi_r[2])
# 判断线段标记是否有效
left_last = bi_m[-3]
right_first = bi_r[1]
assert left_last['fx_mark'] != right_first['fx_mark']
if k['fx_mark'] == 'd':
max_g = max([x['bi'] for x in bi_r[:8] if x['fx_mark'] == 'g'])
if max_g > right_first['bi'] and max_g > left_last['bi']:
if k['fx_mark'] == 'g':
min_d = min([x['bi'] for x in bi_r[:8] if x['fx_mark'] == 'd'])
if min_d < right_first['bi'] and min_d < left_last['bi']:
return xd
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册