From d5cd2d36af188c3b5f451c8216f8fcad258ed3d2 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 17 Feb 2023 15:55:55 +0800 Subject: [PATCH] feat: compatible with older versions of wal --- source/dnode/vnode/src/tq/tqPush.c | 4 ++-- source/dnode/vnode/src/tq/tqSink.c | 30 ++++++++++++------------- source/libs/executor/src/dataInserter.c | 9 ++++---- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 559a3b76fe..2e3dc86ce9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -207,8 +207,8 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ #endif int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - void* pReq = POINTER_SHIFT(msg, sizeof(SMsgHead)); - int32_t len = msgLen - sizeof(SMsgHead); + void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); + int32_t len = msgLen - sizeof(SSubmitReq2Msg); tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 29a25e4cd0..7a8d899a19 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -71,7 +71,6 @@ int32_t tqBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl return 0; } - void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; @@ -324,7 +323,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d } static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { - int32_t ret = 0; + int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret); if (ret < 0) { @@ -340,7 +339,7 @@ static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t v ((SMsgHead*)(*pBuf))->vgId = vgId; ((SMsgHead*)(*pBuf))->contLen = htonl(*contLen); SEncoder coder = {0}; - tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead) ); + tEncoderInit(&coder, POINTER_SHIFT(*pBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead)); if (tEncodeSVCreateTbBatchReq(&coder, pReqs) < 0) { rpcFreeCont(*pBuf); *pBuf = NULL; @@ -440,7 +439,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* goto _end; } for (int32_t rowId = 0; rowId < rows; rowId++) { - SVCreateTbReq createTbReq = {0}; + SVCreateTbReq createTbReq = {0}; SVCreateTbReq* pCreateTbReq = &createTbReq; if (!pCreateTbReq) { goto _end; @@ -482,9 +481,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); - STagVal tagVal = { - .cid = pTSchema->numOfCols + step, - .type = pTagData->info.type, + STagVal tagVal = { + .cid = pTSchema->numOfCols + step, + .type = pTagData->info.type, }; void* pData = colDataGetData(pTagData, rowId); if (colDataIsNull_s(pTagData, rowId)) { @@ -514,7 +513,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* SColumnInfoData* pTbColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); if (colDataIsNull_s(pTbColInfo, rowId)) { SColumnInfoData* pGpIdColInfo = taosArrayGet(pDataBlock->pDataBlock, UD_GROUPID_COLUMN_INDEX); - void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); + void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); pCreateTbReq->name = buildCtbNameByGroupId(stbFullName, *(uint64_t*)pGpIdData); } else { void* pTbData = colDataGetData(pTbColInfo, rowId); @@ -639,16 +638,16 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* taosArrayClear(pVals); int32_t dataIndex = 0; for (int32_t k = 0; k < pTSchema->numOfCols; k++) { - const STColumn* pCol = &pTSchema->columns[k]; + const STColumn* pCol = &pTSchema->columns[k]; if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); - void* colData = colDataGetData(pColData, j); + void* colData = colDataGetData(pColData, j); tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData); } if (IS_SET_NULL(pCol)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); taosArrayPush(pVals, &cv); - } else{ + } else { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); if (colDataIsNull_s(pColData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); @@ -692,14 +691,15 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t code; tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code); SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = rpcMallocCont(len); if (NULL == pBuf) { goto _end; } - ((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); - ((SMsgHead*)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg*)pBuf)->header.vgId = TD_VID(pVnode); + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to encode submit req since %s", terrstr()); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index a823baa2ae..16b43b560c 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -129,14 +129,15 @@ static int32_t submitReqToMsg(int32_t vgId, SSubmitReq2* pReq, void** pData, int tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); if (TSDB_CODE_SUCCESS == code) { SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = taosMemoryMalloc(len); if (NULL == pBuf) { return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)pBuf)->vgId = htonl(vgId); - ((SMsgHead*)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg*)pBuf)->header.vgId = htonl(vgId); + ((SSubmitReq2Msg*)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg*)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); code = tEncodeSSubmitReq2(&encoder, pReq); tEncoderClear(&encoder); } -- GitLab