提交 3f6768d8 编写于 作者: K kailixu

chore: rsma rocksdb adaption

上级 df7866e9
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "sma.h" #include "sma.h"
#include "tq.h"
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
...@@ -269,7 +270,10 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -269,7 +270,10 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
} }
taosMemoryFree(s); taosMemoryFree(s);
} }
pStreamState = streamStateOpen(taskInfDir, NULL, true, -1, -1);
SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value
task.pMeta = pVnode->pTq->pStreamMeta;
pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1);
if (!pStreamState) { if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -378,12 +378,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -378,12 +378,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
// open sma
if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open wal // open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
taosRealPath(tdir, NULL, sizeof(tdir)); taosRealPath(tdir, NULL, sizeof(tdir));
...@@ -403,6 +397,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -403,6 +397,12 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
goto _err; goto _err;
} }
// open sma
if (smaOpen(pVnode, rollback)) {
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open query // open query
if (vnodeQueryOpen(pVnode)) { if (vnodeQueryOpen(pVnode)) {
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
......
...@@ -5,7 +5,7 @@ sleep 50 ...@@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
#todo xukaili sma should use rocksdb. #todo xukaili sma should use rocksdb.
return 1 #return 1
print =============== create database with retentions print =============== create database with retentions
sql create database d0 retentions 5s:7d,5m:21d,15m:365d; sql create database d0 retentions 5s:7d,5m:21d,15m:365d;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册