提交 32edeb88 编写于 作者: Z zengbin93

add macd, boll

上级 eeaf93ad
......@@ -21,9 +21,9 @@ def preprocess(kline):
last_l = kline.loc[0, 'low']
if last_h >= kline.loc[1, 'high']:
direction = 0 # 下跌
direction = 0 # 下跌
else:
direction = 1 # 上涨
direction = 1 # 上涨
for i, row in kline.iterrows():
cur_h, cur_l = row['high'], row['low']
......@@ -46,8 +46,8 @@ def preprocess(kline):
continue
# 当期行与上一行之间无包含关系,更新上一行 high low
kline.loc[i-1, 'high_m'] = last_h
kline.loc[i-1, 'low_m'] = last_l
kline.loc[i - 1, 'high_m'] = last_h
kline.loc[i - 1, 'low_m'] = last_l
# 更新 direction, last_h, last_l
if last_h >= cur_h:
......@@ -67,7 +67,7 @@ def preprocess(kline):
return kline_new
def find_fx(kline):
def find_bi(kline):
"""找出全部分型,并验证有效性
0 - 顶分型
......@@ -126,3 +126,83 @@ def find_fx(kline):
return kline
def find_xd(kline):
"""线段查找。输入:确定了分型的 K 线;输出:加入线段查找结果的 K 线"""
# 找出所有可能的线段终点
gd1 = kline[kline['bi_mark'] == 0].iloc[0]
gd2 = kline[kline['bi_mark'] == 2].iloc[0]
if gd1['high'] < gd2['high']:
direction = "向上"
else:
direction = "向下"
i = 4
mark = 0
kline['xd_mark'] = None
while i <= kline['bi_mark'].max():
gd1 = kline[kline['bi_mark'] == i - 3].iloc[0]
dd1 = kline[kline['bi_mark'] == i - 2].iloc[0]
gd2 = kline[kline['bi_mark'] == i - 1].iloc[0]
dd2 = kline[kline['bi_mark'] == i].iloc[0]
# 第二个顶分型的最高价小于或等于第一个顶分型的最高价,向上过程有可能结束
if direction == "向上" and gd2['high'] <= gd1['high']:
kline.loc[gd1.name, 'xd_mark'] = mark
mark += 1
direction = "向下"
# 第二个底分型的最低价大于或等于第一个底分型的最低价,向下过程有可能结束
elif direction == "向下" and dd2['low'] >= dd1['low']:
kline.loc[dd1.name, 'xd_mark'] = mark
mark += 1
direction = "向上"
i += 2
# 线段有效的基础: 标准特征序列中至少含一笔
# TODO(zengbin): 检查线段的有效性
return kline
def macd(kline):
"""计算 MACD 指标
:param kline: pd.DataFrame
K线,确保含有 close 列
:return: pd.DataFrame
在原始数据中新增 diff,dea,macd 三列
"""
short_, long_, m = 12, 26, 9
kline['diff'] = kline['close'].ewm(adjust=False, alpha=2 / (short_ + 1), ignore_na=True).mean() - \
kline['close'].ewm(adjust=False, alpha=2 / (long_ + 1), ignore_na=True).mean()
kline['dea'] = kline['diff'].ewm(adjust=False, alpha=2 / (m + 1), ignore_na=True).mean()
kline['macd'] = 2 * (kline['diff'] - kline['dea'])
kline['diff'] = kline['diff'].apply(round, args=(4,))
kline['dea'] = kline['dea'].apply(round, args=(4,))
kline['macd'] = kline['macd'].apply(round, args=(4,))
return kline
def boll(kline):
"""计算 BOLL 指标
:param kline: pd.DataFrame
K线,确保含有 close 列
:return: pd.DataFrame
在原始数据中新增 BOLL 指标结果
"""
kline['boll-mid'] = kline['close'].rolling(26).mean()
kline['boll-tmp2'] = kline['close'].rolling(20).std()
kline['boll-top'] = kline['boll-mid'] + 2*kline['boll-tmp2']
kline['boll-bottom'] = kline['boll-mid'] - 2*kline['boll-tmp2']
kline['boll-mid'] = kline['boll-mid'].apply(round, args=(4,))
kline['boll-tmp2'] = kline['boll-tmp2'].apply(round, args=(4,))
kline['boll-top'] = kline['boll-top'].apply(round, args=(4,))
kline['boll-bottom'] = kline['boll-bottom'].apply(round, args=(4,))
return kline
# coding: utf-8
import unittest
from chan.a import get_kline
from chan.utils import preprocess, find_fx
class TestAnalyze(unittest.TestCase):
def test_analyze(self):
kline = get_kline(ts_code='600977.SH', start_date='20190501', end_date='20190725', freq='5min')
kline_new = preprocess(kline)
kline_fx = find_fx(kline_new)
# coding: utf-8
import unittest
from chan.a import get_kline
import chan.utils as u
class TestAnalyze(unittest.TestCase):
def setUp(self):
self.kline = get_kline(ts_code='600977.SH', start_date='20190501', end_date='20190725', freq='5min')
def test_utils(self):
kline_new = u.preprocess(self.kline)
self.assertTrue(len(kline_new) < len(self.kline))
kline_bi = u.find_bi(kline_new)
self.assertTrue("bi_mark" in kline_bi.columns)
kline_xd = u.find_xd(kline_bi)
self.assertTrue("xd_mark" in kline_xd.columns)
kline_macd = u.macd(kline_xd)
self.assertTrue("macd" in kline_macd.columns)
kline_boll = u.boll(kline_macd)
self.assertTrue("boll-top" in kline_boll.columns)
if __name__ == "__main__":
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册