提交 e918982a 编写于 作者: K kailixu

refact: rsma adaption for new SSubmitReq

上级 385440be
...@@ -2253,7 +2253,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat ...@@ -2253,7 +2253,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type); SColVal cv = COL_VAL_NULL(PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
} else { } else {
void* data = colDataGetVarData(pColInfoData, j); void* data = colDataGetVarData(pColInfoData, j);
SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value SValue sv = (SValue){.nData = varDataLen(data), .pData = varDataVal(data)}; // address copy, no value
SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, sv); SColVal cv = COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, pCol->type, sv);
taosArrayPush(pVals, &cv); taosArrayPush(pVals, &cv);
......
...@@ -220,7 +220,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); ...@@ -220,7 +220,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t len, int32_t inputType); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pReq, void* pMsg, int32_t len, int32_t inputType);
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
......
...@@ -750,8 +750,6 @@ _err: ...@@ -750,8 +750,6 @@ _err:
*/ */
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid) { tb_uid_t suid) {
const SSubmitReq2 *pReq = (const SSubmitReq2 *)pMsg;
void *qItem = taosAllocateQitem(len, DEF_QITEM); void *qItem = taosAllocateQitem(len, DEF_QITEM);
if (!qItem) { if (!qItem) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
...@@ -1034,7 +1032,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int ...@@ -1034,7 +1032,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t len, int32_t inputType) { int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void* pMsg, int32_t len, int32_t inputType) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) { if (!pEnv) {
// only applicable when rsma env exists // only applicable when rsma env exists
...@@ -1048,7 +1046,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t len, int32_t inputTy ...@@ -1048,7 +1046,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t len, int32_t inputTy
} }
if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (inputType == STREAM_INPUT__DATA_SUBMIT) {
if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) { if (tdFetchSubmitReqSuids(pReq, &uidStore) < 0) {
smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr()); smaError("vgId:%d, failed to process rsma submit fetch suid since: %s", SMA_VID(pSma), terrstr());
goto _err; goto _err;
} }
......
...@@ -987,7 +987,7 @@ _exit: ...@@ -987,7 +987,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if (code == 0) { if (code == 0) {
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, pSubmitReq, len, STREAM_INPUT__DATA_SUBMIT); tdProcessRSmaSubmit(pVnode->pSma, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
} }
// clear // clear
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册