提交 d22d53e9 编写于 作者: Z zengbin93

0.5.1: 新增Bar对象,为实时分析提供便利

上级 30b3c80b
......@@ -5,10 +5,32 @@
import pandas as pd
from .utils import plot_ka, plot_kline
from .ta import macd, ma, boll
class Bar:
"""K线蜡烛"""
def __init__(self, k: dict):
self.dt = k['dt']
self.dt = k['symbol']
self.open = k['open']
self.close = k['close']
self.high = k['high']
self.low = k['low']
self.is_end = False
def update(self, k: dict):
if k["open"] != self.open or k['dt'] != self.dt:
self.is_end = True
else:
self.close = k['close']
self.high = k['high']
self.low = k['low']
class FX:
"""分型"""
# __slots__ = ["elements", "number", 'mark', 'dt', 'price', 'is_end']
def __init__(self):
......@@ -93,6 +115,7 @@ class FX:
class BI:
"""笔"""
# __slots__ = ["elements", "number", 'mark', 'dt', 'price', 'is_end']
def __init__(self):
self.elements = []
......@@ -143,14 +166,15 @@ class BI:
class XD:
"""线段"""
def __init__(self):
self.elements = []
self.bi_g = []
self.bi_d = []
self.mark = None
self.dt = None
self.price = None
self.is_end = False
self.bi_g = []
self.bi_d = []
self.left = []
self.right = []
......@@ -168,9 +192,9 @@ class XD:
# assert self.right[0].mark == self.mark
raw_seq = [{"dt": self.right[i].dt,
'high': max(self.right[i].price, self.right[i+1].price),
'low': min(self.right[i].price, self.right[i+1].price)}
for i in range(1, len(self.right), 2) if i <= len(self.right)-2]
'high': max(self.right[i].price, self.right[i + 1].price),
'low': min(self.right[i].price, self.right[i + 1].price)}
for i in range(1, len(self.right), 2) if i <= len(self.right) - 2]
seq = []
for row in raw_seq:
......@@ -219,7 +243,7 @@ class XD:
self.mark = b2.mark
self.dt = b2.dt
self.price = b2.price
if self.elements[0].mark == "d" and len(self.bi_g) >= 3:
b1, b2, b3 = self.bi_g[-3:]
if b1.price < b2.price > b3.price:
......@@ -261,6 +285,7 @@ class XD:
class ZS:
"""中枢"""
def __init__(self):
# 组成中枢的元件:笔标记、线段标记、走势标记
self.elements = []
......@@ -319,14 +344,9 @@ class ZS:
self.inside = [x for x in self.elements if self.start_dt <= x.dt <= self.end_dt]
class PZ:
"""盘整"""
def __repr__(self):
pass
class FD:
"""走势分段:趋势 / 盘整"""
class QS:
"""趋势"""
def __repr__(self):
pass
......@@ -346,37 +366,39 @@ class KlineAnalyze(object):
:param name: str
级别名称,默认为 “本级别”
"""
self.name = name
if isinstance(kline, pd.DataFrame):
columns = kline.columns.to_list()
self.kline = [{k: v for k, v in zip(columns, row)} for row in kline.values]
bars = [{k: v for k, v in zip(columns, row)} for row in kline.values]
else:
self.kline = kline
bars = kline
self.name = name
self.kline = []
self.debug = debug
self.symbol = self.kline[0]['symbol']
self.latest_price = self.kline[-1]['close']
self.start_dt = self.kline[0]['dt']
self.end_dt = self.kline[-1]['dt']
self.symbol = bars[0]['symbol']
self.latest_price = bars[-1]['close']
self.start_dt = bars[0]['dt']
self.end_dt = bars[-1]['dt']
self.fxs = []
self.bis = []
self.xds = []
self.zss = []
# 初始化
for bar in self.kline:
self.update(bar, save=False)
for bar in bars:
self.update(bar)
def __repr__(self):
return "<KlineAnalyze of %s@%s, from %s to %s>" % (self.symbol, self.name, self.start_dt, self.end_dt)
@property
def df(self):
fx_list = {x.dt: {"fx_mark": x.mark, "fx": x.price} for x in self.fxs}
bi_list = {x.dt: {"fx_mark": x.mark, "bi": x.price} for x in self.bis}
xd_list = {x.dt: {"fx_mark": x.mark, "xd": x.price} for x in self.xds}
def to_df(self, ma_params=(5, 20), use_macd=True, use_boll=False):
bars = self.kline
fx_list = {x.dt: {"fx_mark": x.mark, "fx": x.price} for x in self.fxs if x.is_end}
bi_list = {x.dt: {"fx_mark": x.mark, "bi": x.price} for x in self.bis if x.is_end}
xd_list = {x.dt: {"fx_mark": x.mark, "xd": x.price} for x in self.xds if x.is_end}
results = []
for k in self.kline:
for k in bars:
k['fx_mark'], k['fx'], k['bi'], k['xd'] = "o", None, None, None
fx_ = fx_list.get(k['dt'], None)
bi_ = bi_list.get(k['dt'], None)
......@@ -392,13 +414,28 @@ class KlineAnalyze(object):
k['xd'] = xd_["xd"]
results.append(k)
return pd.DataFrame(results)
def update(self, bar, save=True):
df = pd.DataFrame(results)
df = ma(df, ma_params)
if use_macd:
df = macd(df)
if use_boll:
df = boll(df)
return df
def update(self, bar):
"""每次输入一根K线进行分析"""
if save:
if bar['dt'] == self.kline[-1]['dt']:
self.latest_price = bar['close']
if bar['dt'] != self.end_dt:
self.end_dt = bar['dt']
if self.kline:
if bar['open'] == self.kline[-1]['open'] or bar['dt'] == self.kline[-1]['dt']:
self.kline.pop(-1)
self.kline.append(bar)
else:
self.kline.append(bar)
else:
self.kline.append(bar)
if not self.fxs:
......@@ -406,21 +443,21 @@ class KlineAnalyze(object):
self.fxs.append(fx)
else:
fx = self.fxs[-1]
assert not fx.is_end, "最后一分型必须处于未完成状态"
assert not fx.is_end, f"最后一分型必须处于未完成状态:{fx}"
if not self.bis:
bi = BI()
self.bis.append(bi)
else:
bi = self.bis[-1]
assert not bi.is_end, "最后一笔必须处于未完成状态"
assert not bi.is_end, f"最后一笔必须处于未完成状态:{bi}"
if not self.xds:
xd = XD()
self.xds.append(xd)
else:
xd = self.xds[-1]
assert not xd.is_end, "最后一线段必须处于未完成状态"
assert not xd.is_end, f"最后一线段必须处于未完成状态:{xd}"
if not self.zss:
zs = ZS()
......@@ -458,7 +495,6 @@ class KlineAnalyze(object):
xd_new.update(bi_)
self.xds.append(xd_new)
if xd_new.is_end:
# print(f"{xd}确认结束,创建第{i}个线段标记 ...")
xd = xd_new
zs.update(xd)
continue
......@@ -497,4 +533,4 @@ class KlineAnalyze(object):
图片分辨率
:return:
"""
plot_ka(self, file_image=file_image, mav=mav, max_k_count=max_k_count, dpi=dpi)
\ No newline at end of file
plot_ka(self, file_image=file_image, mav=mav, max_k_count=max_k_count, dpi=dpi)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册