提交 3f42c415 编写于 作者: Z zengbin93

Merge branch '0.3.XX'

# Conflicts:
#	chan/solid.py
......@@ -4,13 +4,8 @@ from .analyze import KlineAnalyze
from .ta import ma, macd, boll
from .utils import plot_kline
from .solid import SolidAnalyze
from .solid import is_xd_buy, is_xd_sell
from .solid import is_first_buy, is_first_sell
from .solid import is_second_buy, is_second_sell
from .solid import is_third_buy, is_third_sell
__version__ = "0.2.21"
__version__ = "0.3.1"
__author__ = "zengbin93"
__email__ = "zeng_bin8888@163.com"
......
此差异已折叠。
此差异已折叠。
## 0.3.XX 开发文档
### 笔识别过程
划分笔的步骤:
(1)确定所有符合标准的分型。
(1.1)用大小为2的窗口在分型序列上滑动,如果窗口内的两个分型
之间存在4根以上无包含关系的K线,则保留窗口中的两个分型;
(1.2)取出所有分型中的顶分型序列,用大小为3的窗口在顶分型序列
上滑动,如果中间的分型值最大,则保留窗口中间的顶分型;
(1.3)取出所有分型中的底分型序列,同样用大小为3的窗口进行滑动,
如果中间的分型值最小,则保留窗口中间的底分型;
(2)合并第(1)步保留下来的顶底分型序列,按时间排序,遍历,如果前后两分型是同一性质的,对于顶,前面的低于后面的,只保留后面的,
前面那个可以忽略掉;对于底,前面的高于后面的,只保留后面的,前面那个可以忽略掉。不满足上面情况的,例如相等的,都可以先保留。
(3)经过步骤(2)的处理后,余下的分型,如果相邻的是顶和底,那么这就可以划为一笔。
### 线段识别过程
线段划分,按照走势是否完成可以拆分成两个任务:1)对已完成走势进行线段划分;2)对当下走势进行线段划分。
对已完成走势进行线段划分是很容易的,只要按定义来就可以,不存在模糊的概念。
如何对当下走势进行精确的线段划分是需要点功夫的,在实战中积累经验才行。
**目前,`chan` 能够对当下走势按照定义进行划分,但是这种划分结果的精度是不高的,准备使用`chan` 的交易者要理解这个问题。**
使用缠论的交易者能否赚钱,其关键就在于对当下走势的线段划分精度,
对已完成走势进行线段划分的步骤:
(1)确定所有符合标准的笔标记。
(1.1)取出所有笔标记中的顶分型序列,用大小为3的窗口在顶分型序列上滑动,如果中间的分型值最大,则保留窗口中间的笔标记;
(1.3)取出所有笔标记中的底分型序列,同样用大小为3的窗口进行滑动,如果中间的分型值最小,则保留窗口中间的的笔标记;
(2)合并第(1)步保留下来的笔标记序列,按时间排序,遍历,如果前后两分型是同一性质的,对于顶,前面的低于后面的,只保留后面的,
前面那个可以忽略掉;对于底,前面的高于后面的,只保留后面的,前面那个可以忽略掉。不满足上面情况的,例如相等的,都可以先保留。
(3)经过步骤(2)的处理后,余下的分型,如果相邻的是顶和底,那么这就可以划为一线段。
(4)处理当下走势的线段划分,即最后一个线段标记的处理。
## 买卖点识别逻辑
### 一买识别逻辑
1)必须:上级别最后一个线段标记和最后一个笔标记重合且为底分型;
2)必须:上级别最后一个向下线段内部笔标记数量大于等于6,且本级别最后一个线段标记为底分型;
3)必须:本级别向下线段背驰 或 本级别向下笔背驰;
4)辅助:下级别向下线段背驰 或 下级别向下笔背驰。
### 一卖识别逻辑
1)必须:上级别最后一个线段标记和最后一个笔标记重合且为顶分型;
2)必须:上级别最后一个向上线段内部笔标记数量大于等于6,且本级别最后一个线段标记为顶分型;
3)必须:本级别向上线段背驰 或 本级别向上笔背驰;
4)辅助:下级别向上线段背驰 或 下级别向上笔背驰。
### 二买识别逻辑
1)必须:上级别最后一个线段标记和最后一个笔标记都是底分型;
2)必须:上级别最后一个向下线段内部笔标记数量大于等于6,且本级别最后一个线段标记为底分型,不创新低;
3)必须:上级别最后一个线段标记后有且只有三个笔标记,且上级别向下笔不创新低;
4)辅助:下级别向下线段背驰 或 下级别向下笔背驰
### 二卖识别逻辑
1)必须:上级别最后一个线段标记和最后一个笔标记都是顶分型;
2)必须:上级别最后一个向上线段内部笔标记数量大于等于6,且本级别最后一个线段标记为顶分型,不创新高;
3)必须:上级别最后一个线段标记后有且只有三个笔标记,且上级别向上笔不创新低;
4)辅助:下级别向上线段背驰 或 下级别向上笔背驰
### 三买识别逻辑
1)必须:本级别有6个以上线段标记,且最后一个线段标记为底分型;
2)必须:前三段有价格重叠部分,构成中枢;
2)必须:第4段比第2段新高无背驰,第5段不跌回中枢;
4)辅助:暂无
### 三卖识别逻辑
1)必须:本级别有6个以上线段标记,且最后一个线段标记为顶分型;
2)必须:前三段有价格重叠部分,构成中枢;
2)必须:第4段比第2段新低无背驰,第5段不升回中枢;
4)辅助:暂无
### 线买识别逻辑
1) 必须:本级别至少有 3 个线段标记且最后一个线段标记为底分型;
2) 必须:本级别向下线段背驰 或 本级别向下线段不创新低;
3) 辅助:上级别向下笔背驰 或 上级别向下笔不创新低
4) 辅助:下级别向下笔背驰
### 线卖识别逻辑
1) 必须:本级别至少有 3 个线段标记且最后一个线段标记为顶分型;
2) 必须:本级别向上线段背驰 或 本级别向上线段不创新高;
3) 辅助:上级别向上笔背驰 或 上级别向上笔不创新高
4) 辅助:下级别向上笔背驰
......@@ -295,8 +295,8 @@ if __name__ == '__main__':
asset = "E"
end_date = '20200321'
freq = '1min'
file_bs = f"{ts_code}买卖点变化过程_{end_date}.xlsx"
file_html = f"{ts_code}_{freq}_{end_date}_bs.html"
file_bs = "%s买卖点变化过程_%s.xlsx" % (ts_code, end_date)
file_html = f"%s_%s_%s_bs.html" % (ts_code, freq, end_date)
# step 1. 仿真交易
trade_simulator(ts_code, end_date=end_date, file_bs=file_bs, days=150, asset=asset, watch_interval=5)
......
......@@ -8,7 +8,7 @@ from datetime import timedelta, datetime
from cobra.data.kline import kline_simulator, get_kline
from cobra.data.basic import is_trade_day
from chan import SolidAnalyze, KlineAnalyze
from chan.solid import is_macd_cross
from chan.analyze import is_macd_cross
data_path = "./data"
if not os.path.exists(data_path):
......@@ -43,8 +43,7 @@ def trade_simulator(ts_code, end_date, start_date, asset="E", watch_interval=5):
看盘间隔,单位:分钟;默认值为 5分钟看盘一次
:return: None
"""
file_signals = os.path.join(data_path, f"{ts_code}_{start_date}_{end_date}_signals.txt")
file_signals = os.path.join(data_path, "%s_%s_%s_signals.txt" % (ts_code, start_date, end_date))
end_date = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
start_date = datetime.strptime(start_date.replace("-", ""), "%Y%m%d")
......@@ -111,7 +110,8 @@ def make_one_day(ts_code, trade_date, asset="E"):
col = f'{freq}线段状态'
df[col] = df['交易时间'].apply(___xd_status)
df.to_excel(f"./data/{ts_code}_{start_date}_{end_date}_{freq}.xlsx", index=False)
file_excel = "./data/%s_%s_%s_%s.xlsx" % (ts_code, start_date, end_date, freq)
df.to_excel(file_excel, index=False)
if __name__ == '__main__':
......
# coding: utf-8
import tushare as ts
from datetime import datetime, timedelta
import sys
from functools import lru_cache
sys.path.insert(0, r'C:\git_repo\zengbin93\chan')
from cobra.data.kline import get_kline
sys.path.insert(0, '.')
sys.path.insert(0, '..')
import chan
from chan import KlineAnalyze, SolidAnalyze
from chan import KlineAnalyze
from chan.analyze import is_bei_chi
print(chan.__version__)
# 首次使用,需要在这里设置你的 tushare token,用于获取数据;在同一台机器上,tushare token 只需要设置一次
# 没有 token,到 https://tushare.pro/register?reg=7 注册获取
# ts.set_token("your tushare token")
def _get_start_date(end_date, freq):
end_date = datetime.strptime(end_date, '%Y%m%d')
if freq == '1min':
start_date = end_date - timedelta(days=60)
elif freq == '5min':
start_date = end_date - timedelta(days=150)
elif freq == '30min':
start_date = end_date - timedelta(days=1000)
elif freq == 'D':
start_date = end_date - timedelta(weeks=1000)
elif freq == 'W':
start_date = end_date - timedelta(weeks=1000)
else:
raise ValueError("'freq' value error, current value is %s, "
"optional valid values are ['1min', '5min', '30min', "
"'D', 'W']" % freq)
return start_date
def get_kline(ts_code, end_date, freq='30min', asset='E'):
"""获取指定级别的前复权K线
:param ts_code: str
股票代码,如 600122.SH
:param freq: str
K线级别,可选值 [1min, 5min, 15min, 30min, 60min, D, M, Y]
:param end_date: str
日期,如 20190610
:param asset: str
交易资产类型,可选值 E股票 I沪深指数 C数字货币 FT期货 FD基金 O期权 CB可转债(v1.2.39),默认E
:return: pd.DataFrame
columns = ["symbol", "dt", "open", "close", "high", "low", "vol"]
"""
start_date = _get_start_date(end_date, freq)
start_date = start_date.date().__str__().replace("-", "")
df = ts.pro_bar(ts_code=ts_code, freq=freq, start_date=start_date, end_date=end_date,
adj='qfq', asset=asset)
# 统一 k 线数据格式为 6 列,分别是 ["symbol", "dt", "open", "close", "high", "low", "vr"]
if "min" in freq:
df.rename(columns={'ts_code': "symbol", "trade_time": "dt"}, inplace=True)
else:
df.rename(columns={'ts_code': "symbol", "trade_date": "dt"}, inplace=True)
df.drop_duplicates(subset='dt', keep='first', inplace=True)
df.sort_values('dt', inplace=True)
df['dt'] = df.dt.apply(str)
if freq.endswith("min"):
# 清理 9:30 的空数据
df['not_start'] = df.dt.apply(lambda x: not x.endswith("09:30:00"))
df = df[df['not_start']]
df.reset_index(drop=True, inplace=True)
def test_bei_chi():
df = get_kline(ts_code="000001.SH", end_dt="2020-04-28 15:00:00", freq='D', asset='I')
ka = KlineAnalyze(df)
k = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol']]
# 线段背驰
zs1 = ['2018-07-26 15:00:00', '2018-10-19 15:00:00']
zs2 = ['2018-01-29 15:00:00', '2018-07-06 15:00:00']
assert is_bei_chi(ka, zs1, zs2, direction='down', mode='xd', adjust=0.9)
for col in ['open', 'close', 'high', 'low']:
k[col] = k[col].apply(round, args=(2,))
return k
zs1 = ['2013-12-10 15:00:00', '2014-01-20 15:00:00']
zs2 = ['2013-09-12 15:00:00', '2013-11-14 15:00:00']
assert not is_bei_chi(ka, zs1, zs2, direction='down', mode='xd', adjust=0.9)
# 笔背驰
zs1 = ['2019-05-17 15:00:00', '2019-06-10 15:00:00']
zs2 = ['2019-04-08 15:00:00', '2019-05-10 15:00:00']
assert is_bei_chi(ka, zs1, zs2, mode='bi', adjust=0.9)
def get_klines(ts_code, end_date, freqs='1min,5min,30min,D', asset='E'):
"""获取不同级别K线"""
freq_map = {"1min": "1分钟", "5min": "5分钟", "30min": "30分钟", "D": "日线"}
klines = dict()
freqs = freqs.split(",")
for freq in freqs:
df = get_kline(ts_code, end_date, freq=freq, asset=asset)
klines[freq_map[freq]] = df
return klines
zs1 = ['2018-09-28 15:00:00', '2018-10-19 15:00:00']
zs2 = ['2018-08-28 15:00:00', '2018-09-12 15:00:00']
assert not is_bei_chi(ka, zs1, zs2, mode='bi', adjust=0.9)
def test_kline_analyze():
df = get_kline(ts_code="300803.SZ", freq='5min', end_date="20200316")
df = get_kline(ts_code="300008.SZ", end_dt="2020-03-23 15:00:00", freq='30min', asset='E')
ka = KlineAnalyze(df)
# 测试识别结果
assert ka.bi[-1]['fx_mark'] == 'g'
assert ka.xd[-1]['fx_mark'] == 'g'
@lru_cache(maxsize=128)
def create_sa(ts_code, end_date):
klines = get_klines(ts_code=ts_code, freqs='1min,5min,30min,D', asset="E", end_date=end_date)
sa = SolidAnalyze(klines)
return sa
# 测试背驰识别
assert not ka.bi_bei_chi()
assert not ka.xd_bei_chi()
def test_solid_analyze():
test_data = [
{"ts_code": '300033.SZ', "freq": "5分钟", "end_date": "20200307", "bs": "二买"},
{"ts_code": '300033.SZ', "freq": "1分钟", "end_date": "20200307", "bs": "二买"},
{"ts_code": '000012.SZ', "freq": "5分钟", "end_date": "20200307", "bs": "二卖"},
{"ts_code": '002405.SZ', "freq": "5分钟", "end_date": "20200307", "bs": "一卖"},
{"ts_code": '603383.SH', "freq": "日线", "end_date": "20200227", "bs": "线卖"},
]
for row in test_data:
print("=" * 100)
print(row)
sa = create_sa(row['ts_code'], row['end_date'])
if row['bs'] == '二买':
b, detail = sa.is_second_buy(row['freq'], tolerance=0.1)
print(b, detail)
elif row['bs'] == '二卖':
b, detail = sa.is_second_sell(row['freq'], tolerance=0.1)
print(b, detail)
elif row['bs'] == '一卖':
b, detail = sa.is_first_sell(row['freq'], tolerance=0.1)
print(b, detail)
print('\n')
# coding: utf-8
import sys
from functools import lru_cache
from cobra.data.kline import get_klines
sys.path.insert(0, r'C:\git_repo\zengbin93\chan')
sys.path.insert(0, r'C:\git_repo\zengbin93\cobra')
sys.path.insert(0, '.')
sys.path.insert(0, '..')
import chan
from chan import KlineAnalyze, SolidAnalyze
from chan.solid import SolidAnalyze, nested_intervals, is_in_tolerance
print(chan.__version__)
def test_solid_analyze():
ts_code = "300033.SZ"
klines = get_klines(ts_code, end_date='2020-04-03 14:00:00', asset='E', freqs='1min,5min,30min,D')
def test_in_tolerance():
assert not is_in_tolerance(10, 10.31, tolerance=0.03)
assert not is_in_tolerance(10, 9.61, tolerance=0.03)
assert not is_in_tolerance(10, 9, tolerance=0.03)
assert is_in_tolerance(10, 10.3, tolerance=0.03)
assert is_in_tolerance(10, 10.15, tolerance=0.03)
assert is_in_tolerance(10, 9.8, tolerance=0.03)
for k, v in klines.items():
print(k, v.tail(5), '\n\n')
def test_solid_analyze():
ts_code = "000001.SH"
klines = get_klines(ts_code, end_date='2020-04-03 14:00:00', asset='I', freqs='1min,5min,30min,D')
sa = SolidAnalyze(klines)
for func in [sa.is_first_buy, sa.is_first_sell, sa.is_second_buy, sa.is_second_sell,
sa.is_third_buy, sa.is_third_sell, sa.is_xd_buy, sa.is_xd_sell]:
for freq in ['1分钟', '5分钟', '30分钟']:
b, detail = func(freq, tolerance=0.1)
if b:
print(detail)
ka = sa.kas['30分钟']
ka1 = sa.kas['日线']
ka2 = sa.kas['5分钟']
print(nested_intervals(ka, ka1, ka2))
# for func in [sa.is_first_buy, sa.is_first_sell, sa.is_second_buy, sa.is_second_sell,
# sa.is_third_buy, sa.is_third_sell, sa.is_xd_buy, sa.is_xd_sell]:
# for freq in ['1分钟', '5分钟', '30分钟']:
# b, detail = func(freq, tolerance=0.1)
# if b:
# print(detail)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册