提交 7d204483 编写于 作者: Z zengbin93

0.5.1: 删除文件

上级 b204d7bd
# coding: utf-8
import os
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, KFold
from sklearn.metrics import classification_report
import xgboost as xgb
data = []
for file in os.listdir("./data"):
if file.endswith("xlsx") and "30min" in file:
file_data = f"./data/{file}"
df_ = pd.read_excel(file_data)
print(f"load {file_data} success.")
df = pd.concat(data)
print("data shape(before drop duplicates): ", df.shape)
x_cols = ['1分钟分型标记', '1分钟笔标记', '1分钟线段标记', '1分钟MACD金叉',
'1分钟MACD死叉', '5分钟分型标记', '5分钟笔标记', '5分钟线段标记', '5分钟MACD金叉', '5分钟MACD死叉',
'30分钟分型标记', '30分钟笔标记', '30分钟线段标记', '30分钟MACD金叉', '30分钟MACD死叉', '日线分型标记',
'日线笔标记', '日线线段标记', '日线MACD金叉', '日线MACD死叉']
y_col = '30min线段状态'
df.drop_duplicates(subset=x_cols + [y_col], inplace=True)
print("data shape(after drop duplicates): ", df.shape)
df0 = df[df[y_col] == '向上段']
df1 = df[df[y_col] == '向下段']
# 降采样获取均衡数据集
n_sample = min(len(df0), len(df1))
df = pd.concat([df0.sample(n_sample, random_state=42), df1.sample(n_sample, random_state=42)])
X = df[x_cols].values
y = df[y_col].apply(lambda x: 1 if x == '向上段' else 0).values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
k_fold = KFold(n_splits=5, shuffle=True, random_state=42)
def run_logistic_regression():
model = LogisticRegression(penalty='l1', random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
for k, (train, test) in enumerate(k_fold.split(X, y)):
model.fit(X[train], y[train])
y_pred = model.predict(X[test])
print(k, "=" * 100)
print(classification_report(y[test], y_pred), '\n\n')
def run_svc():
model = LinearSVC(penalty='l2', tol=1e-8, max_iter=10000, random_state=42, verbose=True)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
for k, (train, test) in enumerate(k_fold.split(X, y)):
model.fit(X[train], y[train])
y_pred = model.predict(X[test])
print(k, "=" * 100)
print(classification_report(y[test], y_pred), '\n\n')
def run_ada_boost():
model = AdaBoostClassifier(n_estimators=100, random_state=0)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
for k, (train, test) in enumerate(k_fold.split(X, y)):
model.fit(X[train], y[train])
y_pred = model.predict(X[test])
print(k, "=" * 100)
print(classification_report(y[test], y_pred), '\n\n')
def run_random_forest():
model = RandomForestClassifier(n_estimators=100, max_depth=2, random_state=0)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
for k, (train, test) in enumerate(k_fold.split(X, y)):
model.fit(X[train], y[train])
y_pred = model.predict(X[test])
print(k, "=" * 100)
print(classification_report(y[test], y_pred), '\n\n')
def run_xgboost():
model = xgb.XGBClassifier()
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(classification_report(y_test, y_pred))
for k, (train, test) in enumerate(k_fold.split(X, y)):
model = model.fit(X[train], y[train])
y_pred = model.predict(X[test])
print(k, "=" * 100)
print(classification_report(y[test], y_pred), '\n\n')
if __name__ == '__main__':
# run_ada_boost()
# run_random_forest()
# coding: utf-8
import traceback
import os
import pandas as pd
from copy import deepcopy
from datetime import timedelta, datetime
from cobra.data.kline import kline_simulator, get_kline
from cobra.data.basic import is_trade_day
from czsc import SolidAnalyze, KlineAnalyze
from czsc.analyze import is_macd_cross
data_path = "./data"
if not os.path.exists(data_path):
def get_data(sa):
mark_convert = {"d": 0, "g": 1}
signals = {"交易标的": sa.symbol, "交易时间": sa.kas['1分钟'].end_dt}
for freq, ka in sa.kas.items():
signals[freq + "分型标记"] = mark_convert[ka.fx[-1]['fx_mark']]
signals[freq + "笔标记"] = mark_convert[ka.bi[-1]['fx_mark']]
signals[freq + "线段标记"] = mark_convert[ka.xd[-1]['fx_mark']]
signals[freq + "MACD金叉"] = int(is_macd_cross(ka, direction='up'))
signals[freq + "MACD死叉"] = int(is_macd_cross(ka, direction='down'))
return signals
def trade_simulator(ts_code, end_date, start_date, asset="E", watch_interval=5):
:param ts_code: str
标的代码,如 300033.SZ
:param end_date: str
截止日期,如 2020-03-12
:param start_date: str
:param asset: str
tushare 中的资产类型编码
:param watch_interval: int
看盘间隔,单位:分钟;默认值为 5分钟看盘一次
:return: None
file_signals = os.path.join(data_path, "%s_%s_%s_signals.txt" % (ts_code, start_date, end_date))
end_date = datetime.strptime(end_date.replace("-", ""), "%Y%m%d")
start_date = datetime.strptime(start_date.replace("-", ""), "%Y%m%d")
while start_date <= end_date:
if (asset in ["E", "I"]) and (not is_trade_day(start_date.strftime('%Y%m%d'))):
start_date += timedelta(days=1)
ks = kline_simulator(ts_code, trade_dt=start_date.strftime('%Y-%m-%d'), asset=asset, count=1000)
for i, klines in enumerate(ks.__iter__(), 1):
if i % watch_interval != 0:
sa = SolidAnalyze(klines)
signals = get_data(sa)
with open(file_signals, 'a', encoding='utf-8') as f:
f.write(str(signals) + "\n")
start_date += timedelta(days=1)
def make_one_day(ts_code, trade_date, asset="E"):
if "-" in trade_date:
end_date = datetime.strptime(trade_date, '%Y-%m-%d')
end_date = datetime.strptime(trade_date, '%Y-%m-%d')
start_date = end_date - timedelta(days=1)
end_dt = end_date + timedelta(days=30)
start_date = start_date.strftime("%Y-%m-%d")
end_date = end_date.strftime("%Y-%m-%d")
if not is_trade_day(start_date):
print(f"start trade simulator on {start_date}")
trade_simulator(ts_code=ts_code, start_date=start_date,
end_date=end_date, asset=asset, watch_interval=1)
for freq in ['1min', '5min', '30min']:
file_signals = os.path.join(data_path, f"{ts_code}_{start_date}_{end_date}_signals.txt")
signals = [eval(x) for x in open(file_signals, encoding='utf-8').readlines()]
df = pd.DataFrame(signals)
kline = get_kline(ts_code, end_dt=end_dt.strftime("%Y-%m-%d %H:%M:%S"), freq=freq, asset=asset)
ka = KlineAnalyze(kline)
print(kline.head(), "\n\n")
xd = deepcopy(ka.xd)
xd = sorted(xd, key=lambda row: row['dt'], reverse=False)
print(xd, "\n\n")
def ___xd_status(dt):
for x in xd:
if x['dt'] >= dt:
if x['fx_mark'] == 'd':
s = "向下段"
elif x['fx_mark'] == 'g':
s = "向上段"
raise ValueError
return s
return "o"
col = f'{freq}线段状态'
df[col] = df['交易时间'].apply(___xd_status)
file_excel = "./data/%s_%s_%s_%s.xlsx" % (ts_code, start_date, end_date, freq)
df.to_excel(file_excel, index=False)
if __name__ == '__main__':
ts_code = "000001.SH"
start_date = "2019-08-01"
end_date = "2019-10-01"
asset = 'I'
start_date = datetime.strptime(start_date, "%Y-%m-%d")
end_date = datetime.strptime(end_date, "%Y-%m-%d")
while start_date < end_date:
start_date += timedelta(days=1)
make_one_day(ts_code, start_date.strftime("%Y-%m-%d"), asset)
# 线段当下结束的判断
* 1)仿真交易,获取判断线段结束需要的特征,并构建线段方向的分类数据集;
* 2)训练分类器,得到模型,实盘中,输入特征,得到线段方向。
**Note:** 仿真依赖 `cobra`,执行 `pip install git+git://github.com/zengbin93/cobra.git -U` 进行安装
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册