From ecfe0c871aecb8cf01c96211a138d9e231589976 Mon Sep 17 00:00:00 2001 From: zengbin93 Date: Mon, 24 Aug 2020 16:00:34 +0800 Subject: [PATCH] =?UTF-8?q?0.5.3=20=E6=96=B0=E5=A2=9E=20is=5Fvalid=5Fxd=20?= =?UTF-8?q?=E6=96=B9=E6=B3=95=EF=BC=8C=E5=88=A4=E6=96=AD=E7=BA=BF=E6=AE=B5?= =?UTF-8?q?=E6=A0=87=E8=AE=B0=E6=98=AF=E5=90=A6=E6=9C=89=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- czsc/analyze.py | 243 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 180 insertions(+), 63 deletions(-) diff --git a/czsc/analyze.py b/czsc/analyze.py index c2fd91b..9125eb2 100644 --- a/czsc/analyze.py +++ b/czsc/analyze.py @@ -124,15 +124,110 @@ def find_zs(points): return k_zs -def has_gap(k1, k2): +def has_gap(k1, k2, min_gap=0.002): """判断 k1, k2 之间是否有缺口""" assert k2['dt'] > k1['dt'] - if k1['high'] < k2['low'] * 0.998 or k2['high'] < k1['low'] * 0.998: + if k1['high'] < k2['low'] * (1-min_gap) \ + or k2['high'] < k1['low'] * (1-min_gap): return True else: return False +def make_standard_seq(bi_seq): + """计算标准特征序列 + + :return: list of dict + """ + if bi_seq[0]['fx_mark'] == 'd': + direction = "up" + elif bi_seq[0]['fx_mark'] == 'g': + direction = "down" + else: + raise ValueError + + raw_seq = [{"dt": bi_seq[i].dt, + 'high': max(bi_seq[i].price, bi_seq[i + 1].price), + 'low': min(bi_seq[i].price, bi_seq[i + 1].price)} + for i in range(1, len(bi_seq), 2) if i <= len(bi_seq) - 2] + + seq = [] + for row in raw_seq: + if not seq: + seq.append(row) + continue + last = seq[-1] + cur_h, cur_l = row['high'], row['low'] + last_h, last_l = last['high'], last['low'] + + # 左包含 or 右包含 + if (cur_h <= last_h and cur_l >= last_l) or (cur_h >= last_h and cur_l <= last_l): + seq.pop(-1) + # 有包含关系,按方向分别处理 + if direction == "up": + last_h = max(last_h, cur_h) + last_l = max(last_l, cur_l) + elif direction == "down": + last_h = min(last_h, cur_h) + last_l = min(last_l, cur_l) + else: + raise ValueError + seq.append({"dt": row['dt'], "high": last_h, "low": last_l}) + else: + seq.append(row) + return seq + + +def is_valid_xd(bi_seq1, bi_seq2, bi_seq3): + """判断线段标记是否有效(第二个线段标记) + + :param bi_seq1: list of dict + 第一个线段标记到第二个线段标记之间的笔序列 + :param bi_seq2: + 第二个线段标记到第三个线段标记之间的笔序列 + :param bi_seq3: + 第三个线段标记之后的笔序列 + :return: + """ + assert bi_seq2[0] == bi_seq1[-1] and bi_seq3[0] == bi_seq2[-1] + + standard_bi_seq1 = make_standard_seq(bi_seq1) + + # 第一种情况(向下线段) + if bi_seq2[0]['fx_mark'] == 'd' and bi_seq2[1]['bi'] >= standard_bi_seq1[-1]['low']: + if bi_seq2[-1]['bi'] < bi_seq2[1]['bi']: + return False + + # 第一种情况(向上线段) + if bi_seq2[0]['fx_mark'] == 'g' and bi_seq2[1]['bi'] <= standard_bi_seq1[-1]['high']: + if bi_seq2[-1]['bi'] > bi_seq2[1]['bi']: + return False + + # 第二种情况(向下线段) + if bi_seq2[0]['fx_mark'] == 'd' and bi_seq2[1]['bi'] < standard_bi_seq1[-1]['low']: + bi_seq2.extend(bi_seq3) + standard_bi_seq2 = make_standard_seq(bi_seq2) + standard_bi_seq2_g = [] + for i in range(len(standard_bi_seq2) -2): + bi1, bi2, bi3 = standard_bi_seq2[i-1: i+2] + if bi1['high'] < bi2['high'] > bi3['high']: + standard_bi_seq2_g.append(bi2) + if len(standard_bi_seq2_g) == 0: + return False + + # 第二种情况(向上线段) + if bi_seq2[0]['fx_mark'] == 'g' and bi_seq2[1]['bi'] > standard_bi_seq1[-1]['high']: + bi_seq2.extend(bi_seq3) + standard_bi_seq2 = make_standard_seq(bi_seq2) + standard_bi_seq2_d = [] + for i in range(len(standard_bi_seq2) -2): + bi1, bi2, bi3 = standard_bi_seq2[i-1: i+2] + if bi1['low'] > bi2['low'] < bi3['low']: + standard_bi_seq2_d.append(bi2) + if len(standard_bi_seq2_d) == 0: + return False + return True + class KlineAnalyze: def __init__(self, kline, name="本级别", bi_mode="old", max_raw_len=10000, ma_params=(5, 20, 120), verbose=False): """ @@ -342,22 +437,7 @@ class KlineAnalyze: i += 1 def _update_bi_list(self): - """更新笔序列 - - 笔标记样例: - {'dt': Timestamp('2020-05-25 15:00:00'), - 'fx_mark': 'd', - 'fx_high': 2821.5, - 'fx_low': 2802.47, - 'bi': 2802.47} - - {'dt': Timestamp('2020-07-09 15:00:00'), - 'fx_mark': 'g', - 'fx_high': 3456.97, - 'fx_low': 3366.08, - 'bi': 3456.97} - - """ + """更新笔序列""" if len(self.fx_list) < 2: return @@ -409,57 +489,90 @@ class KlineAnalyze: print("新增笔标记:{}".format(bi)) self.bi_list.append(bi) - if (self.bi_list[-1]['fx_mark'] == 'd' and self.kline_new[-1]['low'] < self.bi_list[-1]['bi']) \ - or (self.bi_list[-1]['fx_mark'] == 'g' and self.kline_new[-1]['high'] > self.bi_list[-1]['bi']): - if self.verbose: - print("最后一个笔标记无效,{}".format(self.bi_list[-1])) - self.bi_list.pop(-1) + # if (self.bi_list[-1]['fx_mark'] == 'd' and self.kline_new[-1]['low'] < self.bi_list[-1]['bi']) \ + # or (self.bi_list[-1]['fx_mark'] == 'g' and self.kline_new[-1]['high'] > self.bi_list[-1]['bi']): + # if self.verbose: + # print("最后一个笔标记无效,{}".format(self.bi_list[-1])) + # self.bi_list.pop(-1) - @staticmethod - def _make_standard_seq(bi_seq): - """计算标准特征序列 + def _update_xd_list_v1(self): + """更新线段序列""" + if len(self.bi_list) < 4: + return - :return: list of dict - """ - if bi_seq[0]['fx_mark'] == 'd': - direction = "up" - elif bi_seq[0]['fx_mark'] == 'g': - direction = "down" + self.xd_list = self.xd_list[:-2] + if len(self.xd_list) == 0: + for i in range(3): + xd = dict(self.bi_list[i]) + xd['xd'] = xd.pop('bi') + self.xd_list.append(xd) + + if len(self.xd_list) <= 3: + right_bi = [x for x in self.bi_list if x['dt'] >= self.xd_list[-1]['dt']] else: - raise ValueError + right_bi = [x for x in self.bi_list[-200:] if x['dt'] >= self.xd_list[-1]['dt']] + xd_p = [] + bi_d = [x for x in right_bi if x['fx_mark'] == 'd'] + bi_g = [x for x in right_bi if x['fx_mark'] == 'g'] + for i in range(1, len(bi_d) - 2): + d1, d2, d3 = bi_d[i - 1: i + 2] + if d1['bi'] > d2['bi'] < d3['bi']: + xd_p.append(d2) + for j in range(1, len(bi_g) - 2): + g1, g2, g3 = bi_g[j - 1: j + 2] + if g1['bi'] < g2['bi'] > g3['bi']: + xd_p.append(g2) - raw_seq = [{"dt": bi_seq[i].dt, - 'high': max(bi_seq[i].price, bi_seq[i + 1].price), - 'low': min(bi_seq[i].price, bi_seq[i + 1].price)} - for i in range(1, len(bi_seq), 2) if i <= len(bi_seq) - 2] - - seq = [] - for row in raw_seq: - if not seq: - seq.append(row) - continue - last = seq[-1] - cur_h, cur_l = row['high'], row['low'] - last_h, last_l = last['high'], last['low'] - - # 左包含 or 右包含 - if (cur_h <= last_h and cur_l >= last_l) or (cur_h >= last_h and cur_l <= last_l): - seq.pop(-1) - # 有包含关系,按方向分别处理 - if direction == "up": - last_h = max(last_h, cur_h) - last_l = max(last_l, cur_l) - elif direction == "down": - last_h = min(last_h, cur_h) - last_l = min(last_l, cur_l) - else: - raise ValueError - seq.append({"dt": row['dt'], "high": last_h, "low": last_l}) + xd_p = sorted(xd_p, key=lambda x: x['dt'], reverse=False) + for xp in xd_p: + xd = dict(xp) + xd['xd'] = xd.pop('bi') + last_xd = self.xd_list[-1] + if last_xd['fx_mark'] == xd['fx_mark']: + if (last_xd['fx_mark'] == 'd' and last_xd['xd'] > xd['xd']) \ + or (last_xd['fx_mark'] == 'g' and last_xd['xd'] < xd['xd']): + if self.verbose: + print("更新线段标记:from {} to {}".format(last_xd, xd)) + self.xd_list[-1] = xd else: - seq.append(row) - return seq + if (last_xd['fx_mark'] == 'd' and last_xd['xd'] > xd['xd']) \ + or (last_xd['fx_mark'] == 'g' and last_xd['xd'] < xd['xd']): + continue - def _update_xd_list(self): + bi_inside = [x for x in right_bi if last_xd['dt'] <= x['dt'] <= xd['dt']] + if len(bi_inside) < 4: + if self.verbose: + print("{} - {} 之间笔标记数量少于4,跳过".format(last_xd['dt'], xd['dt'])) + continue + else: + if len(bi_inside) > 4: + if self.verbose: + print("新增线段标记(笔标记数量大于4):{}".format(xd)) + self.xd_list.append(xd) + else: + bi_r = [x for x in right_bi if x['dt'] >= xd['dt']] + assert bi_r[1]['fx_mark'] == bi_inside[-2]['fx_mark'] + # 第一种情况:没有缺口 + if (bi_r[1]['fx_mark'] == "g" and bi_r[1]['bi'] > bi_inside[-3]['bi']) \ + or (bi_r[1]['fx_mark'] == "d" and bi_r[1]['bi'] < bi_inside[-3]['bi']): + if self.verbose: + print("新增线段标记(第一种情况):{}".format(xd)) + self.xd_list.append(xd) + # 第二种情况:有缺口 + else: + if (bi_r[1]['fx_mark'] == "g" and bi_r[1]['bi'] < bi_inside[-2]['bi']) \ + or (bi_r[1]['fx_mark'] == "d" and bi_r[1]['bi'] > bi_inside[-2]['bi']): + if self.verbose: + print("新增线段标记(第二种情况):{}".format(xd)) + self.xd_list.append(xd) + + if (self.xd_list[-1]['fx_mark'] == 'd' and self.kline_new[-1]['low'] < self.xd_list[-1]['xd']) \ + or (self.xd_list[-1]['fx_mark'] == 'g' and self.kline_new[-1]['high'] > self.xd_list[-1]['xd']): + if self.verbose: + print("最后一个线段标记无效,{}".format(self.xd_list[-1])) + self.xd_list.pop(-1) + + def _update_xd_list_v2(self): """更新线段序列""" if len(self.bi_list) < 4: return @@ -536,6 +649,10 @@ class KlineAnalyze: print("最后一个线段标记无效,{}".format(self.xd_list[-1])) self.xd_list.pop(-1) + + def _update_xd_list(self): + self._update_xd_list_v1() + def update(self, k): """更新分析结果 -- GitLab