提交 ecfe0c87 编写于 作者: Z zengbin93

0.5.3 新增 is_valid_xd 方法,判断线段标记是否有效

上级 b2ae91f9
......@@ -124,15 +124,110 @@ def find_zs(points):
return k_zs
def has_gap(k1, k2):
def has_gap(k1, k2, min_gap=0.002):
"""判断 k1, k2 之间是否有缺口"""
assert k2['dt'] > k1['dt']
if k1['high'] < k2['low'] * 0.998 or k2['high'] < k1['low'] * 0.998:
if k1['high'] < k2['low'] * (1-min_gap) \
or k2['high'] < k1['low'] * (1-min_gap):
return True
else:
return False
def make_standard_seq(bi_seq):
"""计算标准特征序列
:return: list of dict
"""
if bi_seq[0]['fx_mark'] == 'd':
direction = "up"
elif bi_seq[0]['fx_mark'] == 'g':
direction = "down"
else:
raise ValueError
raw_seq = [{"dt": bi_seq[i].dt,
'high': max(bi_seq[i].price, bi_seq[i + 1].price),
'low': min(bi_seq[i].price, bi_seq[i + 1].price)}
for i in range(1, len(bi_seq), 2) if i <= len(bi_seq) - 2]
seq = []
for row in raw_seq:
if not seq:
seq.append(row)
continue
last = seq[-1]
cur_h, cur_l = row['high'], row['low']
last_h, last_l = last['high'], last['low']
# 左包含 or 右包含
if (cur_h <= last_h and cur_l >= last_l) or (cur_h >= last_h and cur_l <= last_l):
seq.pop(-1)
# 有包含关系,按方向分别处理
if direction == "up":
last_h = max(last_h, cur_h)
last_l = max(last_l, cur_l)
elif direction == "down":
last_h = min(last_h, cur_h)
last_l = min(last_l, cur_l)
else:
raise ValueError
seq.append({"dt": row['dt'], "high": last_h, "low": last_l})
else:
seq.append(row)
return seq
def is_valid_xd(bi_seq1, bi_seq2, bi_seq3):
"""判断线段标记是否有效(第二个线段标记)
:param bi_seq1: list of dict
第一个线段标记到第二个线段标记之间的笔序列
:param bi_seq2:
第二个线段标记到第三个线段标记之间的笔序列
:param bi_seq3:
第三个线段标记之后的笔序列
:return:
"""
assert bi_seq2[0] == bi_seq1[-1] and bi_seq3[0] == bi_seq2[-1]
standard_bi_seq1 = make_standard_seq(bi_seq1)
# 第一种情况(向下线段)
if bi_seq2[0]['fx_mark'] == 'd' and bi_seq2[1]['bi'] >= standard_bi_seq1[-1]['low']:
if bi_seq2[-1]['bi'] < bi_seq2[1]['bi']:
return False
# 第一种情况(向上线段)
if bi_seq2[0]['fx_mark'] == 'g' and bi_seq2[1]['bi'] <= standard_bi_seq1[-1]['high']:
if bi_seq2[-1]['bi'] > bi_seq2[1]['bi']:
return False
# 第二种情况(向下线段)
if bi_seq2[0]['fx_mark'] == 'd' and bi_seq2[1]['bi'] < standard_bi_seq1[-1]['low']:
bi_seq2.extend(bi_seq3)
standard_bi_seq2 = make_standard_seq(bi_seq2)
standard_bi_seq2_g = []
for i in range(len(standard_bi_seq2) -2):
bi1, bi2, bi3 = standard_bi_seq2[i-1: i+2]
if bi1['high'] < bi2['high'] > bi3['high']:
standard_bi_seq2_g.append(bi2)
if len(standard_bi_seq2_g) == 0:
return False
# 第二种情况(向上线段)
if bi_seq2[0]['fx_mark'] == 'g' and bi_seq2[1]['bi'] > standard_bi_seq1[-1]['high']:
bi_seq2.extend(bi_seq3)
standard_bi_seq2 = make_standard_seq(bi_seq2)
standard_bi_seq2_d = []
for i in range(len(standard_bi_seq2) -2):
bi1, bi2, bi3 = standard_bi_seq2[i-1: i+2]
if bi1['low'] > bi2['low'] < bi3['low']:
standard_bi_seq2_d.append(bi2)
if len(standard_bi_seq2_d) == 0:
return False
return True
class KlineAnalyze:
def __init__(self, kline, name="本级别", bi_mode="old", max_raw_len=10000, ma_params=(5, 20, 120), verbose=False):
"""
......@@ -342,22 +437,7 @@ class KlineAnalyze:
i += 1
def _update_bi_list(self):
"""更新笔序列
笔标记样例:
{'dt': Timestamp('2020-05-25 15:00:00'),
'fx_mark': 'd',
'fx_high': 2821.5,
'fx_low': 2802.47,
'bi': 2802.47}
{'dt': Timestamp('2020-07-09 15:00:00'),
'fx_mark': 'g',
'fx_high': 3456.97,
'fx_low': 3366.08,
'bi': 3456.97}
"""
"""更新笔序列"""
if len(self.fx_list) < 2:
return
......@@ -409,57 +489,90 @@ class KlineAnalyze:
print("新增笔标记:{}".format(bi))
self.bi_list.append(bi)
if (self.bi_list[-1]['fx_mark'] == 'd' and self.kline_new[-1]['low'] < self.bi_list[-1]['bi']) \
or (self.bi_list[-1]['fx_mark'] == 'g' and self.kline_new[-1]['high'] > self.bi_list[-1]['bi']):
if self.verbose:
print("最后一个笔标记无效,{}".format(self.bi_list[-1]))
self.bi_list.pop(-1)
# if (self.bi_list[-1]['fx_mark'] == 'd' and self.kline_new[-1]['low'] < self.bi_list[-1]['bi']) \
# or (self.bi_list[-1]['fx_mark'] == 'g' and self.kline_new[-1]['high'] > self.bi_list[-1]['bi']):
# if self.verbose:
# print("最后一个笔标记无效,{}".format(self.bi_list[-1]))
# self.bi_list.pop(-1)
@staticmethod
def _make_standard_seq(bi_seq):
"""计算标准特征序列
def _update_xd_list_v1(self):
"""更新线段序列"""
if len(self.bi_list) < 4:
return
:return: list of dict
"""
if bi_seq[0]['fx_mark'] == 'd':
direction = "up"
elif bi_seq[0]['fx_mark'] == 'g':
direction = "down"
self.xd_list = self.xd_list[:-2]
if len(self.xd_list) == 0:
for i in range(3):
xd = dict(self.bi_list[i])
xd['xd'] = xd.pop('bi')
self.xd_list.append(xd)
if len(self.xd_list) <= 3:
right_bi = [x for x in self.bi_list if x['dt'] >= self.xd_list[-1]['dt']]
else:
raise ValueError
right_bi = [x for x in self.bi_list[-200:] if x['dt'] >= self.xd_list[-1]['dt']]
xd_p = []
bi_d = [x for x in right_bi if x['fx_mark'] == 'd']
bi_g = [x for x in right_bi if x['fx_mark'] == 'g']
for i in range(1, len(bi_d) - 2):
d1, d2, d3 = bi_d[i - 1: i + 2]
if d1['bi'] > d2['bi'] < d3['bi']:
xd_p.append(d2)
for j in range(1, len(bi_g) - 2):
g1, g2, g3 = bi_g[j - 1: j + 2]
if g1['bi'] < g2['bi'] > g3['bi']:
xd_p.append(g2)
raw_seq = [{"dt": bi_seq[i].dt,
'high': max(bi_seq[i].price, bi_seq[i + 1].price),
'low': min(bi_seq[i].price, bi_seq[i + 1].price)}
for i in range(1, len(bi_seq), 2) if i <= len(bi_seq) - 2]
seq = []
for row in raw_seq:
if not seq:
seq.append(row)
continue
last = seq[-1]
cur_h, cur_l = row['high'], row['low']
last_h, last_l = last['high'], last['low']
# 左包含 or 右包含
if (cur_h <= last_h and cur_l >= last_l) or (cur_h >= last_h and cur_l <= last_l):
seq.pop(-1)
# 有包含关系,按方向分别处理
if direction == "up":
last_h = max(last_h, cur_h)
last_l = max(last_l, cur_l)
elif direction == "down":
last_h = min(last_h, cur_h)
last_l = min(last_l, cur_l)
else:
raise ValueError
seq.append({"dt": row['dt'], "high": last_h, "low": last_l})
xd_p = sorted(xd_p, key=lambda x: x['dt'], reverse=False)
for xp in xd_p:
xd = dict(xp)
xd['xd'] = xd.pop('bi')
last_xd = self.xd_list[-1]
if last_xd['fx_mark'] == xd['fx_mark']:
if (last_xd['fx_mark'] == 'd' and last_xd['xd'] > xd['xd']) \
or (last_xd['fx_mark'] == 'g' and last_xd['xd'] < xd['xd']):
if self.verbose:
print("更新线段标记:from {} to {}".format(last_xd, xd))
self.xd_list[-1] = xd
else:
seq.append(row)
return seq
if (last_xd['fx_mark'] == 'd' and last_xd['xd'] > xd['xd']) \
or (last_xd['fx_mark'] == 'g' and last_xd['xd'] < xd['xd']):
continue
def _update_xd_list(self):
bi_inside = [x for x in right_bi if last_xd['dt'] <= x['dt'] <= xd['dt']]
if len(bi_inside) < 4:
if self.verbose:
print("{} - {} 之间笔标记数量少于4,跳过".format(last_xd['dt'], xd['dt']))
continue
else:
if len(bi_inside) > 4:
if self.verbose:
print("新增线段标记(笔标记数量大于4):{}".format(xd))
self.xd_list.append(xd)
else:
bi_r = [x for x in right_bi if x['dt'] >= xd['dt']]
assert bi_r[1]['fx_mark'] == bi_inside[-2]['fx_mark']
# 第一种情况:没有缺口
if (bi_r[1]['fx_mark'] == "g" and bi_r[1]['bi'] > bi_inside[-3]['bi']) \
or (bi_r[1]['fx_mark'] == "d" and bi_r[1]['bi'] < bi_inside[-3]['bi']):
if self.verbose:
print("新增线段标记(第一种情况):{}".format(xd))
self.xd_list.append(xd)
# 第二种情况:有缺口
else:
if (bi_r[1]['fx_mark'] == "g" and bi_r[1]['bi'] < bi_inside[-2]['bi']) \
or (bi_r[1]['fx_mark'] == "d" and bi_r[1]['bi'] > bi_inside[-2]['bi']):
if self.verbose:
print("新增线段标记(第二种情况):{}".format(xd))
self.xd_list.append(xd)
if (self.xd_list[-1]['fx_mark'] == 'd' and self.kline_new[-1]['low'] < self.xd_list[-1]['xd']) \
or (self.xd_list[-1]['fx_mark'] == 'g' and self.kline_new[-1]['high'] > self.xd_list[-1]['xd']):
if self.verbose:
print("最后一个线段标记无效,{}".format(self.xd_list[-1]))
self.xd_list.pop(-1)
def _update_xd_list_v2(self):
"""更新线段序列"""
if len(self.bi_list) < 4:
return
......@@ -536,6 +649,10 @@ class KlineAnalyze:
print("最后一个线段标记无效,{}".format(self.xd_list[-1]))
self.xd_list.pop(-1)
def _update_xd_list(self):
self._update_xd_list_v1()
def update(self, k):
"""更新分析结果
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册