提交 0f4de9da 编写于 作者: X Xiaoyu Wang

feat: compatible with older versions of wal

上级 d5cd2d36
...@@ -257,7 +257,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -257,7 +257,6 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
SSubmitTbData tbData = {0}; SSubmitTbData tbData = {0};
if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) { if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow *)))) {
goto _end; goto _end;
...@@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema * ...@@ -313,14 +312,15 @@ int32_t smaBlockToSubmit(SVnode *pVnode, const SArray *pBlocks, const STSchema *
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) { if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder; SEncoder encoder;
len += sizeof(SMsgHead); len += sizeof(SSubmitReq2Msg);
pBuf = rpcMallocCont(len); pBuf = rpcMallocCont(len);
if (NULL == pBuf) { if (NULL == pBuf) {
goto _end; goto _end;
} }
((SMsgHead *)pBuf)->vgId = TD_VID(pVnode); ((SSubmitReq2Msg *)pBuf)->header.vgId = TD_VID(pVnode);
((SMsgHead *)pBuf)->contLen = htonl(len); ((SSubmitReq2Msg *)pBuf)->header.contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); ((SSubmitReq2Msg *)pBuf)->version = htobe64(1);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SSubmitReq2Msg)), len - sizeof(SSubmitReq2Msg));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
/*vError("failed to encode submit req since %s", terrstr());*/ /*vError("failed to encode submit req since %s", terrstr());*/
......
...@@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { ...@@ -311,8 +311,8 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version); tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
return -1; return -1;
} }
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SMsgHead)); void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SMsgHead); int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version; int64_t ver = pReader->pWalReader->pHead->head.version;
#if 0 #if 0
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) { if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
...@@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD ...@@ -560,7 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid; int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
...@@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1012,7 +1012,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
pReader->nextBlk++; pReader->nextBlk++;
if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; if (pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData;
int32_t sversion = pSubmitTbData->sver; int32_t sversion = pSubmitTbData->sver;
int64_t suid = pSubmitTbData->suid; int64_t suid = pSubmitTbData->suid;
int64_t uid = pSubmitTbData->uid; int64_t uid = pSubmitTbData->uid;
...@@ -1022,7 +1022,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1022,7 +1022,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1);
if (pReader->pSchema == NULL) { if (pReader->pSchema == NULL) {
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
"), version %d, possibly dropped table", "), version %d, possibly dropped table",
pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion);
pReader->cachedSchemaSuid = 0; pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
...@@ -1041,7 +1041,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1041,7 +1041,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
STSchema* pTschema = pReader->pSchema; STSchema* pTschema = pReader->pSchema;
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
int32_t numOfRows = 0; int32_t numOfRows = 0;
if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
SArray* pCols = pSubmitTbData->aCol; SArray* pCols = pSubmitTbData->aCol;
...@@ -1054,7 +1054,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1054,7 +1054,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
int32_t curRow = 0; int32_t curRow = 0;
int32_t lastRow = 0; int32_t lastRow = 0;
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
if (assigned == NULL) return -1; if (assigned == NULL) return -1;
// convert and scan one block // convert and scan one block
...@@ -1064,9 +1064,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1064,9 +1064,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
for (int32_t i = 0; i < numOfRows; i++) { for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false; bool buildNew = false;
for (int32_t j = 0; j < numOfCols; j++){ for (int32_t j = 0; j < numOfCols; j++) {
SColData* pCol = taosArrayGet(pCols, j); SColData* pCol = taosArrayGet(pCols, j);
SColVal colVal; SColVal colVal;
tColDataGetValue(pCol, i, &colVal); tColDataGetValue(pCol, i, &colVal);
if (curRow == 0) { if (curRow == 0) {
assigned[j] = !COL_VAL_IS_NONE(&colVal); assigned[j] = !COL_VAL_IS_NONE(&colVal);
...@@ -1087,9 +1087,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1087,9 +1087,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
lastRow = curRow; lastRow = curRow;
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){ if (pSW == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
...@@ -1158,10 +1158,10 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1158,10 +1158,10 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
} else { } else {
SArray* pRows = pSubmitTbData->aRowP; SArray* pRows = pSubmitTbData->aRowP;
for (int32_t i = 0; i < numOfRows; i++) { for (int32_t i = 0; i < numOfRows; i++) {
SRow* pRow = taosArrayGetP(pRows, i); SRow* pRow = taosArrayGetP(pRows, i);
bool buildNew = false; bool buildNew = false;
for (int32_t j = 0; j < pTschema->numOfCols; j++){ for (int32_t j = 0; j < pTschema->numOfCols; j++) {
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTschema, j, &colVal); tRowGet(pRow, pTschema, j, &colVal);
if (curRow == 0) { if (curRow == 0) {
...@@ -1183,9 +1183,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1183,9 +1183,9 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
lastRow = curRow; lastRow = curRow;
} }
SSDataBlock block = {0}; SSDataBlock block = {0};
SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if(pSW == NULL){ if (pSW == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL; goto FAIL;
} }
...@@ -1220,7 +1220,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1220,7 +1220,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
int32_t colActual = blockDataGetNumOfCols(pBlock); int32_t colActual = blockDataGetNumOfCols(pBlock);
while (targetIdx < colActual) { while (targetIdx < colActual) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTschema, sourceIdx, &colVal); tRowGet(pRow, pTschema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) { if (colVal.cid < pColData->info.colId) {
...@@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema ...@@ -1256,7 +1256,7 @@ int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schema
taosMemoryFree(assigned); taosMemoryFree(assigned);
return 0; return 0;
FAIL: FAIL:
taosMemoryFree(assigned); taosMemoryFree(assigned);
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册