From 0f4de9daec0e0b9d6b7cd98bd5ef307e708e0fdd Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 17 Feb 2023 18:06:57 +0800 Subject: [PATCH] feat: compatible with older versions of wal --- source/dnode/vnode/src/sma/smaTimeRange.c | 10 +++---- source/dnode/vnode/src/tq/tqRead.c | 36 +++++++++++------------ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 1b191dd5a5..e7f03d668e 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -257,7 +257,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * int32_t rows = pDataBlock->info.rows; SSubmitTbData tbData = {0}; - if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) { goto _end; @@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); if (TSDB_CODE_SUCCESS == terrno) { SEncoder encoder; - len += sizeof(SMsgHead); + len += sizeof(SSubmitReq2Msg); pBuf = rpcMallocCont(len); if (NULL == pBuf) { goto _end; } - ((SMsgHead *)pBuf)->vgId = TD_VID(pVnode); - ((SMsgHead *)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode); + ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len); + ((SSubmitReq2Msg *)pBuf)->version = htobe64(1); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg)); if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; /*vError("failed to encode submit req since %s", terrstr());*/ diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 04af05cc44..414ffda544 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); return -1; } - void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead); + void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pReader->pWalReader->pHead->head.version; #if 0 if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { @@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; - if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; - if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -1022,7 +1022,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); if (pReader->pSchema == NULL) { tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 - "), version %d, possibly dropped table", + "), version %d, possibly dropped table", pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; @@ -1041,7 +1041,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema STSchema* pTschema = pReader->pSchema; SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - int32_t numOfRows = 0; + int32_t numOfRows = 0; if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { SArray* pCols = pSubmitTbData->aCol; @@ -1054,7 +1054,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema int32_t curRow = 0; int32_t lastRow = 0; - char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); + char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); if (assigned == NULL) return -1; // convert and scan one block @@ -1064,9 +1064,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; - for (int32_t j = 0; j < numOfCols; j++){ + for (int32_t j = 0; j < numOfCols; j++) { SColData* pCol = taosArrayGet(pCols, j); - SColVal colVal; + SColVal colVal; tColDataGetValue(pCol, i, &colVal); if (curRow == 0) { assigned[j] = !COL_VAL_IS_NONE(&colVal); @@ -1087,9 +1087,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema lastRow = curRow; } - SSDataBlock block = {0}; + SSDataBlock block = {0}; SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if(pSW == NULL){ + if (pSW == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } @@ -1158,10 +1158,10 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema } else { SArray* pRows = pSubmitTbData->aRowP; for (int32_t i = 0; i < numOfRows; i++) { - SRow* pRow = taosArrayGetP(pRows, i); - bool buildNew = false; + SRow* pRow = taosArrayGetP(pRows, i); + bool buildNew = false; - for (int32_t j = 0; j < pTschema->numOfCols; j++){ + for (int32_t j = 0; j < pTschema->numOfCols; j++) { SColVal colVal; tRowGet(pRow, pTschema, j, &colVal); if (curRow == 0) { @@ -1183,9 +1183,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema lastRow = curRow; } - SSDataBlock block = {0}; + SSDataBlock block = {0}; SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if(pSW == NULL){ + if (pSW == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto FAIL; } @@ -1220,7 +1220,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema int32_t colActual = blockDataGetNumOfCols(pBlock); while (targetIdx < colActual) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); - SColVal colVal; + SColVal colVal; tRowGet(pRow, pTschema, sourceIdx, &colVal); if (colVal.cid < pColData->info.colId) { @@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema taosMemoryFree(assigned); return 0; - FAIL: +FAIL: taosMemoryFree(assigned); return -1; } -- GitLab