提交 318807ff 编写于 作者: Z zengbin93

0.4.2 优化性能,彻底清除deepcopy,优化线段标记

上级 2aafe8bd
...@@ -7,7 +7,7 @@ from .solid import SolidAnalyze ...@@ -7,7 +7,7 @@ from .solid import SolidAnalyze
from .solid import is_in_tolerance, is_first_buy, is_first_sell, is_second_buy, \ from .solid import is_in_tolerance, is_first_buy, is_first_sell, is_second_buy, \
is_second_sell, is_third_buy, is_third_sell, is_xd_buy, is_xd_sell is_second_sell, is_third_buy, is_third_sell, is_xd_buy, is_xd_sell
__version__ = "0.4.1" __version__ = "0.4.2"
__author__ = "zengbin93" __author__ = "zengbin93"
__email__ = "zeng_bin8888@163.com" __email__ = "zeng_bin8888@163.com"
......
# coding: utf-8 # coding: utf-8
import traceback import traceback
from copy import deepcopy
import pandas as pd import pandas as pd
from functools import lru_cache from functools import lru_cache
...@@ -292,19 +291,34 @@ class KlineAnalyze(object): ...@@ -292,19 +291,34 @@ class KlineAnalyze(object):
raise ValueError raise ValueError
k_new.pop(-1) k_new.pop(-1)
k_new.append({ if k['open'] >= k['close']:
"symbol": k['symbol'], k_new.append({
"dt": k['dt'], "symbol": k['symbol'],
"open": k['open'], "dt": k['dt'],
"close": k['close'], "open": last_h,
"high": last_h, "close": last_l,
"low": last_l, "high": last_h,
"vol": k['vol'], "low": last_l,
"fx_mark": k['fx_mark'], "vol": k['vol'],
"fx": k['fx'], "fx_mark": k['fx_mark'],
"bi": k['bi'], "fx": k['fx'],
"xd": k['xd'], "bi": k['bi'],
}) "xd": k['xd'],
})
else:
k_new.append({
"symbol": k['symbol'],
"dt": k['dt'],
"open": last_l,
"close": last_h,
"high": last_h,
"low": last_l,
"vol": k['vol'],
"fx_mark": k['fx_mark'],
"fx": k['fx'],
"bi": k['bi'],
"xd": k['xd'],
})
else: else:
# 无包含关系,更新 K 线 # 无包含关系,更新 K 线
k_new.append({ k_new.append({
...@@ -367,21 +381,31 @@ class KlineAnalyze(object): ...@@ -367,21 +381,31 @@ class KlineAnalyze(object):
p = [seq[0]] p = [seq[0]]
i = 1 i = 1
while i < len(seq): while i < len(seq):
s1 = p[-1]
s2 = seq[i]
if fx_mark == 'd': if fx_mark == 'd':
# 对于底,前面的高于后面的,只保留后面的 # 对于底,前面的高于后面的,只保留后面的
if s1[mode] >= s2[mode]: s1 = seq[i-1]
p.pop(-1) s2 = seq[i]
p.append(s2) if i == len(seq) - 1:
p.append(s2)
else:
s3 = seq[i + 1]
if s1[mode] > s2[mode] < s3[mode]:
p.append(s2)
elif fx_mark == 'g': elif fx_mark == 'g':
# 对于顶,前面的低于后面的,只保留后面的 # 对于顶,前面的低于后面的,只保留后面的
if s1[mode] <= s2[mode]: s1 = seq[i-1]
p.pop(-1) s2 = seq[i]
p.append(s2) if i == len(seq) - 1:
p.append(s2)
else:
s3 = seq[i + 1]
if s1[mode] < s2[mode] > s3[mode]:
p.append(s2)
else: else:
raise ValueError raise ValueError
i += 1 i += 1
return p return p
def __handle_hist_bi(self): def __handle_hist_bi(self):
...@@ -415,9 +439,11 @@ class KlineAnalyze(object): ...@@ -415,9 +439,11 @@ class KlineAnalyze(object):
# 确认哪些分型可以构成笔 # 确认哪些分型可以构成笔
bi = [] bi = []
for i in range(len(fx_p)): for i in range(len(fx_p)):
k = deepcopy(fx_p[i]) k = {
k['bi'] = k['fx'] "dt": fx_p[i]['dt'],
del k['fx'] "fx_mark": fx_p[i]['fx_mark'],
"bi": fx_p[i]['fx'],
}
if len(bi) == 0: if len(bi) == 0:
bi.append(k) bi.append(k)
else: else:
...@@ -434,7 +460,7 @@ class KlineAnalyze(object): ...@@ -434,7 +460,7 @@ class KlineAnalyze(object):
bi.pop(-1) bi.pop(-1)
continue continue
# 一笔的顶底分型之间至少包含5根K线(新笔只需要4根) # 一笔的顶底分型之间至少包含5根K线
k_num = [x for x in kn if k0['dt'] <= x['dt'] <= k['dt']] k_num = [x for x in kn if k0['dt'] <= x['dt'] <= k['dt']]
if len(k_num) >= min_k_num: if len(k_num) >= min_k_num:
bi.append(k) bi.append(k)
...@@ -477,9 +503,11 @@ class KlineAnalyze(object): ...@@ -477,9 +503,11 @@ class KlineAnalyze(object):
xd = [] xd = []
for i in range(len(bi_p)): for i in range(len(bi_p)):
k = deepcopy(bi_p[i]) k = {
k['xd'] = k['bi'] "dt": bi_p[i]['dt'],
del k['bi'] "fx_mark": bi_p[i]['fx_mark'],
"xd": bi_p[i]['bi'],
}
if len(xd) == 0: if len(xd) == 0:
xd.append(k) xd.append(k)
else: else:
...@@ -496,6 +524,7 @@ class KlineAnalyze(object): ...@@ -496,6 +524,7 @@ class KlineAnalyze(object):
(k0['fx_mark'] == 'd' and k['xd'] <= k0['xd']): (k0['fx_mark'] == 'd' and k['xd'] <= k0['xd']):
xd.pop(-1) xd.pop(-1)
continue continue
bi_m = [x for x in self.bi if k0['dt'] <= x['dt'] <= k['dt']] bi_m = [x for x in self.bi if k0['dt'] <= x['dt'] <= k['dt']]
bi_r = [x for x in self.bi if x['dt'] >= k['dt']] bi_r = [x for x in self.bi if x['dt'] >= k['dt']]
# 一线段内部至少三笔 # 一线段内部至少三笔
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册