提交 6f377e32 编写于 作者: K kailixu

refact: tsma/rsma data format

上级 096e7054
......@@ -2246,23 +2246,17 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
continue;
}
SSubmitTbData* pTbData = (SSubmitTbData*)taosMemoryCalloc(1, sizeof(SSubmitTbData));
if (!pTbData) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _end;
}
SSubmitTbData pTbData = {0};
if (!(pTbData->aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
taosMemoryFree(pTbData);
if (!(pTbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) {
goto _end;
}
pTbData->suid = suid;
pTbData->uid = pDataBlock->info.id.groupId;
pTbData->sver = pTSchema->version;
pTbData.suid = suid;
pTbData.uid = pDataBlock->info.id.groupId;
pTbData.sver = pTSchema->version;
if (!pVals && !(pVals = taosArrayInit(colNum, sizeof(SColVal)))) {
taosArrayDestroy(pTbData->aRowP);
taosMemoryFree(pTbData);
taosArrayDestroy(pTbData.aRowP);
goto _end;
}
......@@ -2360,14 +2354,14 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
}
SRow* pRow = NULL;
if ((terrno = tRowBuild(pVals, pTSchema, &pRow)) < 0) {
tDestroySSubmitTbData(pTbData, TSDB_MSG_FLG_ENCODE);
tDestroySSubmitTbData(&pTbData, TSDB_MSG_FLG_ENCODE);
goto _end;
}
ASSERT(pRow);
taosArrayPush(pTbData->aRowP, &pRow);
taosArrayPush(pTbData.aRowP, &pRow);
}
taosArrayPush(pReq->aSubmitTbData, pTbData);
taosArrayPush(pReq->aSubmitTbData, &pTbData);
}
_end:
taosArrayDestroy(pVals);
......
......@@ -215,7 +215,7 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pReq, void* pMsg, int32_t len, int32_t inputType);
int32_t tdProcessRSmaSubmit(SSma* pSma, int64_t version, void* pReq, void* pMsg, int32_t len, int32_t inputType);
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
......
......@@ -741,6 +741,7 @@ _err:
* @brief Copy msg to rsmaQueueBuffer for batch process
*
* @param pSma
* @param version
* @param pMsg
* @param len
* @param inputType
......@@ -748,16 +749,20 @@ _err:
* @param suid
* @return int32_t
*/
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo,
tb_uid_t suid) {
int32_t size = sizeof(int32_t) + len;
static int32_t tdExecuteRSmaImplAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
SRSmaInfo *pInfo, tb_uid_t suid) {
int32_t size = sizeof(int32_t) + sizeof(int64_t) + len;
void *qItem = taosAllocateQitem(size, DEF_QITEM);
if (!qItem) {
return TSDB_CODE_FAILED;
}
*(int32_t *)qItem = len;
memcpy(POINTER_SHIFT(qItem, sizeof(int32_t)), pMsg, len);
void *pItem = qItem;
*(int32_t *)pItem = len;
pItem = POINTER_SHIFT(pItem, sizeof(int32_t));
*(int64_t *)pItem = version;
memcpy(POINTER_SHIFT(pItem, sizeof(int64_t)), pMsg, len);
taosWriteQitem(pInfo->queue, qItem);
......@@ -999,12 +1004,14 @@ static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) {
* @brief async mode
*
* @param pSma
* @param version
* @param pMsg
* @param inputType
* @param suid
* @return int32_t
*/
static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, tb_uid_t suid) {
static int32_t tdExecuteRSmaAsync(SSma *pSma, int64_t version, const void *pMsg, int32_t len, int32_t inputType,
tb_uid_t suid) {
SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid);
if (!pRSmaInfo) {
smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid);
......@@ -1012,7 +1019,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int
}
if (inputType == STREAM_INPUT__DATA_SUBMIT) {
if (tdExecuteRSmaImplAsync(pSma, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
if (tdExecuteRSmaImplAsync(pSma, version, pMsg, len, inputType, pRSmaInfo, suid) < 0) {
tdReleaseRSmaInfo(pSma, pRSmaInfo);
return TSDB_CODE_FAILED;
}
......@@ -1034,7 +1041,7 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t len, int
return TSDB_CODE_SUCCESS;
}
int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
int32_t tdProcessRSmaSubmit(SSma *pSma, int64_t version, void *pReq, void *pMsg, int32_t len, int32_t inputType) {
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
if (!pEnv) {
// only applicable when rsma env exists
......@@ -1054,7 +1061,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void *pMsg, int32_t len, int
}
if (uidStore.suid != 0) {
if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, uidStore.suid) < 0) {
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, uidStore.suid) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 1 since: %s", SMA_VID(pSma), terrstr());
goto _err;
}
......@@ -1062,7 +1069,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pReq, void *pMsg, int32_t len, int
void *pIter = NULL;
while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
if (tdExecuteRSmaAsync(pSma, pMsg, len, inputType, *pTbSuid) < 0) {
if (tdExecuteRSmaAsync(pSma, version, pMsg, len, inputType, *pTbSuid) < 0) {
smaError("vgId:%d, failed to process rsma submit exec 2 since: %s", SMA_VID(pSma), terrstr());
goto _err;
}
......@@ -1370,7 +1377,7 @@ _end:
static void tdFreeRSmaSubmitItems(SArray *pItems) {
for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) {
SPackedData *packData = taosArrayGet(pItems, i);
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t)));
taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t) - sizeof(int64_t)));
}
taosArrayClear(pItems);
}
......@@ -1439,7 +1446,10 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
void *msg = NULL;
taosGetQitem(qall, (void **)&msg);
if (msg) {
SPackedData packData = {.msgLen = *(int32_t *)msg, .msgStr = POINTER_SHIFT(msg, sizeof(int32_t))};
SPackedData packData = {.msgLen = *(int32_t *)msg,
.ver = *(int64_t *)POINTER_SHIFT(msg, sizeof(int32_t)),
.msgStr = POINTER_SHIFT(msg, sizeof(int32_t) + sizeof(int64_t))};
if (!taosArrayPush(pSubmitArr, &packData)) {
tdFreeRSmaSubmitItems(pSubmitArr);
goto _err;
......
......@@ -1026,7 +1026,7 @@ _exit:
atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1);
if (code == 0) {
atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1);
tdProcessRSmaSubmit(pVnode->pSma, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
tdProcessRSmaSubmit(pVnode->pSma, version, pSubmitReq, pReq, len, STREAM_INPUT__DATA_SUBMIT);
}
// clear
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册