提交 27af0747 编写于 作者: L Liu Jicong

feat(sma): using separate vg

上级 0dcd7ab8
...@@ -588,7 +588,8 @@ typedef struct { ...@@ -588,7 +588,8 @@ typedef struct {
int8_t status; int8_t status;
int8_t createdBy; // STREAM_CREATED_BY__USER or SMA int8_t createdBy; // STREAM_CREATED_BY__USER or SMA
int32_t fixedSinkVgId; // 0 for shuffle int32_t fixedSinkVgId; // 0 for shuffle
int64_t smaId; // 0 for unused SVgObj fixedSinkVg;
int64_t smaId; // 0 for unused
int8_t trigger; int8_t trigger;
int32_t triggerParam; int32_t triggerParam;
int64_t waterMark; int64_t waterMark;
......
...@@ -229,11 +229,14 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr ...@@ -229,11 +229,14 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
taosArrayPush(tasks, &pTask); taosArrayPush(tasks, &pTask);
pTask->nodeId = pStream->fixedSinkVgId; pTask->nodeId = pStream->fixedSinkVgId;
#if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
return -1; return -1;
} }
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
// source // source
pTask->sourceType = TASK_SOURCE__MERGE; pTask->sourceType = TASK_SOURCE__MERGE;
pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK; pTask->inputType = TASK_INPUT_TYPE__DATA_BLOCK;
...@@ -254,7 +257,8 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr ...@@ -254,7 +257,8 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr
// dispatch // dispatch
pTask->dispatchType = TASK_DISPATCH__NONE; pTask->dispatchType = TASK_DISPATCH__NONE;
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); /*mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);*/
mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pStream->fixedSinkVg.vgId);
return 0; return 0;
} }
......
...@@ -295,9 +295,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm ...@@ -295,9 +295,9 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
} }
static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) { static void *mndBuildVDropSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSma, int32_t *pContLen) {
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t contLen; int32_t contLen;
SName name = {0}; SName name = {0};
tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
SVDropTSmaReq req = {0}; SVDropTSmaReq req = {0};
...@@ -482,14 +482,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -482,14 +482,6 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
memcpy(smaObj.ast, pCreate->ast, smaObj.astLen); memcpy(smaObj.ast, pCreate->ast, smaObj.astLen);
} }
SVgObj smaVgObj = {0};
if (mndAllocSmaVgroup(pMnode, pDb, &smaVgObj) != 0) {
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
return -1;
}
smaObj.dstVgId = smaVgObj.vgId;
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
...@@ -502,7 +494,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -502,7 +494,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.createdBy = STREAM_CREATED_BY__SMA; streamObj.createdBy = STREAM_CREATED_BY__SMA;
streamObj.fixedSinkVgId = smaObj.dstVgId; streamObj.fixedSinkVgId = smaObj.dstVgId;
streamObj.smaId = smaObj.uid; streamObj.smaId = smaObj.uid;
/*streamObj.physicalPlan = "";*/
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
mError("sma:%s, failed to create since %s", smaObj.name, terrstr());
return -1;
}
smaObj.dstVgId = streamObj.fixedSinkVg.vgId;
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_SMA, pReq);
...@@ -512,11 +509,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -512,11 +509,11 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mndTransSetDbInfo(pTrans, pDb); mndTransSetDbInfo(pTrans, pDb);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &smaVgObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &smaVgObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &smaVgObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, STREAM_TRIGGER_AT_ONCE, 0, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
...@@ -656,7 +653,7 @@ static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj * ...@@ -656,7 +653,7 @@ static int32_t mndSetDropSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *
return 0; return 0;
} }
static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) { static int32_t mndSetDropSmaVgroupRedoLogs(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup); SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroup);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1; if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册