From c195ea6371c348cd63c08cb3ee79f76592547d55 Mon Sep 17 00:00:00 2001 From: zengbin93 Date: Sun, 26 Apr 2020 23:09:26 +0800 Subject: [PATCH] =?UTF-8?q?0.3.1=20=E6=96=B0=E5=A2=9E=E7=BA=BF=E6=AE=B5?= =?UTF-8?q?=E3=80=81=E7=AC=94=E8=AF=86=E5=88=AB=E6=8E=A7=E5=88=B6=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- chan/analyze.py | 90 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 15 deletions(-) diff --git a/chan/analyze.py b/chan/analyze.py index 91698c3..b0e59d3 100644 --- a/chan/analyze.py +++ b/chan/analyze.py @@ -34,7 +34,7 @@ def is_bei_chi(ka, zs1, zs2, direction="down", mode="bi"): if mode == 'bi': macd_sum1 = sum([abs(x) for x in k1.macd]) macd_sum2 = sum([abs(x) for x in k2.macd]) - if macd_sum1 < macd_sum2: + if macd_sum1 < macd_sum2 * 0.9: bc = True elif mode == 'xd': @@ -46,7 +46,7 @@ def is_bei_chi(ka, zs1, zs2, direction="down", mode="bi"): macd_sum2 = sum([abs(x) for x in k2.macd if x > 0]) else: raise ValueError('direction value error') - if macd_sum1 < macd_sum2: + if macd_sum1 < macd_sum2 * 0.9: bc = True else: @@ -96,7 +96,7 @@ def is_macd_cross(ka, direction="up"): class KlineAnalyze(object): - def __init__(self, kline): + def __init__(self, kline, bi_mode="new", xd_mode="loose"): """ :param kline: list of dict or pd.DataFrame @@ -108,7 +108,16 @@ class KlineAnalyze(object): {'symbol': '600797.SH', 'dt': '2020-01-08 14:30:00', 'open': 10.42, 'close': 10.41, 'high': 10.48, 'low': 10.35, 'vol': 6610000.0}, {'symbol': '600797.SH', 'dt': '2020-01-08 15:00:00', 'open': 10.42, 'close': 10.39, 'high': 10.48, 'low': 10.36, 'vol': 7160500.0} ] + :param bi_mode: str + 笔识别控制参数,默认为 new,表示新笔;如果不用新笔定义识别,设置为 old + :param xd_mode: str + 线段识别控制参数,默认为 loose,在这种模式下,只要线段标记内有三笔就满足会识别;另外一个可选值是 strict, + 在 strict 模式下,对于三笔形成的线段,要求其后的一笔不跌破或升破线段最后一笔的起始位置。 """ + assert bi_mode in ['new', 'old'], "bi_mode 参数错误" + assert xd_mode in ['loose', 'strict'], "bi_mode 参数错误" + self.bi_mode = bi_mode + self.xd_mode = xd_mode self.kline = self._preprocess(kline) self.symbol = self.kline[0]['symbol'] self.latest_price = self.kline[-1]['close'] @@ -222,8 +231,8 @@ class KlineAnalyze(object): seq = sorted(seq, key=lambda x: x['dt'], reverse=False) p = [] - for i in range(len(seq)-2): - window = seq[i: i+3] + for i in range(len(seq) - 2): + window = seq[i: i + 3] if fx_mark == 'd': if window[0][mode] >= window[1][mode] <= window[2][mode]: p.append(deepcopy(window[1])) @@ -237,17 +246,24 @@ class KlineAnalyze(object): def __handle_hist_bi(self): """识别笔标记:从已经识别出来的分型中确定能够构建笔的分型 """ + if self.bi_mode == "new": + min_k_num = 4 + elif self.bi_mode == "old": + min_k_num = 5 + else: + raise ValueError + # 符合标准的分型 kn = self.kline_new - fx_p = [] # 存储潜在笔标记 + fx_p = [] # 存储潜在笔标记 fx_p.extend(self.__extract_potential(mode='fx', fx_mark='d')) fx_p.extend(self.__extract_potential(mode='fx', fx_mark='g')) # 加入满足笔条件的连续两个分型 fx = self.fx - for i in range(len(fx)-1): + for i in range(len(fx) - 1): fx1 = fx[i] - fx2 = fx[i+1] + fx2 = fx[i + 1] k_num = [x for x in kn if fx1['dt'] <= x['dt'] <= fx2['dt']] if len(k_num) >= 4: fx_p.append(fx1) @@ -279,7 +295,7 @@ class KlineAnalyze(object): # 一笔的顶底分型之间至少包含5根K线(新笔只需要4根) k_num = [x for x in kn if k0['dt'] <= x['dt'] <= k['dt']] - if len(k_num) >= 4: + if len(k_num) >= min_k_num: bi.append(k) return bi @@ -304,7 +320,7 @@ class KlineAnalyze(object): def __handle_hist_xd(self): """识别线段标记:从已经识别出来的笔中识别线段""" - bi_p = [] # 存储潜在线段标记 + bi_p = [] # 存储潜在线段标记 bi_p.extend(self.__extract_potential(mode='bi', fx_mark='d')) bi_p.extend(self.__extract_potential(mode='bi', fx_mark='g')) bi_p = sorted(bi_p, key=lambda x: x['dt'], reverse=False) @@ -331,6 +347,15 @@ class KlineAnalyze(object): xd.pop(-1) continue bi_m = [x for x in self.bi if k0['dt'] <= x['dt'] <= k['dt']] + + if len(bi_m) == 2 and len(xd) >= 2: + k1 = xd[-2] + if (k1['fx_mark'] == 'g' and k1['xd'] < k['xd']) or \ + (k1['fx_mark'] == 'd' and k1['xd'] > k['xd']): + xd.pop(-1) + xd.pop(-1) + xd.append(k) + # 一线段内部至少三笔 if len(bi_m) >= 4: xd.append(k) @@ -449,21 +474,56 @@ class KlineAnalyze(object): :return: float """ if mode == 'xd': - latest_zs = deepcopy(self.xd[-n-1:]) + latest_zs = deepcopy(self.xd[-n - 1:]) for x in latest_zs: x['fx'] = x['xd'] elif mode == 'bi': - latest_zs = deepcopy(self.bi[-n-1:]) + latest_zs = deepcopy(self.bi[-n - 1:]) for x in latest_zs: x['fx'] = x['bi'] else: raise ValueError("mode value error, only support 'xd' or 'bi'") wave = [] - for i in range(len(latest_zs)-1): + for i in range(len(latest_zs) - 1): x1 = latest_zs[i]['fx'] - x2 = latest_zs[i+1]['fx'] - w = abs(x1-x2)/x1 + x2 = latest_zs[i + 1]['fx'] + w = abs(x1 - x2) / x1 wave.append(w) return round(sum(wave) / len(wave), 2) + def bi_bei_chi(self): + """判断最后一笔是否背驰""" + bi = deepcopy(self.bi) + # 最后一笔背驰出现的两种情况: + # 1)向上笔新高且和前一个向上笔不存在包含关系; + # 2)向下笔新低且和前一个向下笔不存在包含关系。 + if (bi[-1]['fx_mark'] == 'g' and bi[-1]["bi"] > bi[-3]["bi"] and bi[-2]["bi"] > bi[-4]["bi"]) or \ + (bi[-1]['fx_mark'] == 'd' and bi[-1]['bi'] < bi[-3]['bi'] and bi[-2]['bi'] < bi[-4]['bi']): + zs1 = [bi[-2]['dt'], bi[-1]['dt']] + zs2 = [bi[-4]['dt'], bi[-3]['dt']] + return is_bei_chi(self, zs1, zs2, mode="bi") + else: + return False + + def xd_bei_chi(self): + """判断最后一个线段是否背驰""" + xd = deepcopy(self.xd) + last_xd = xd[-1] + if last_xd['fx_mark'] == 'g': + direction = "up" + elif last_xd['fx_mark'] == 'd': + direction = "down" + else: + raise ValueError + + # 最后一个线段背驰出现的两种情况: + # 1)向上线段新高且和前一个向上线段不存在包含关系; + # 2)向下线段新低且和前一个向下线段不存在包含关系。 + if (last_xd['fx_mark'] == 'g' and xd[-1]["xd"] > xd[-3]["xd"] and xd[-2]["xd"] > xd[-4]["xd"]) or \ + (last_xd['fx_mark'] == 'd' and xd[-1]['xd'] < xd[-3]['xd'] and xd[-2]['xd'] < xd[-4]['xd']): + zs1 = [xd[-2]['dt'], xd[-1]['dt']] + zs2 = [xd[-4]['dt'], xd[-3]['dt']] + return is_bei_chi(self, zs1, zs2, direction=direction, mode="xd") + else: + return False -- GitLab