提交 5a0ae19c 编写于 作者: C Cary Xu

feat: submit req msg iter refactor

上级 7cecc341
...@@ -240,21 +240,18 @@ typedef struct { ...@@ -240,21 +240,18 @@ typedef struct {
// head of SSubmitBlk // head of SSubmitBlk
const void* pMsg; const void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
#if 0
int32_t tInitSubmitMsgIterOrigin(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIterOrigin(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNextOrigin(SSubmitBlkIter* pIter);
#endif
// TODO: KEEP one suite of iterator API finally.
int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIter(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
// TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter);
typedef struct { typedef struct {
int32_t index; // index of failed block in submit blocks int32_t index; // index of failed block in submit blocks
int32_t vnode; // vnode index of failed block int32_t vnode; // vnode index of failed block
......
...@@ -27,8 +27,8 @@ ...@@ -27,8 +27,8 @@
#define TD_MSG_DICT_ #define TD_MSG_DICT_
#undef TD_MSG_SEG_CODE_ #undef TD_MSG_SEG_CODE_
#include "tmsgdef.h" #include "tmsgdef.h"
#if 0
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { int32_t tInitSubmitMsgIterOrigin(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1; return -1;
...@@ -46,7 +46,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { ...@@ -46,7 +46,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
return 0; return 0;
} }
int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { int32_t tGetSubmitMsgNextOrigin(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(pIter->len >= 0); ASSERT(pIter->len >= 0);
if (pIter->len == 0) { if (pIter->len == 0) {
...@@ -72,7 +72,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ...@@ -72,7 +72,7 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
return 0; return 0;
} }
int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { int32_t tInitSubmitBlkIterOrigin(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1; if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen; pIter->totalLen = pBlock->dataLen;
pIter->len = 0; pIter->len = 0;
...@@ -80,7 +80,7 @@ int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { ...@@ -80,7 +80,7 @@ int32_t tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
return 0; return 0;
} }
STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { STSRow *tGetSubmitBlkNextOrigin(SSubmitBlkIter *pIter) {
STSRow *row = pIter->row; STSRow *row = pIter->row;
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) {
...@@ -93,13 +93,10 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -93,13 +93,10 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
return row; return row;
} }
} }
#endif
// TODO: KEEP one suite of iterator API finally. // TODO: KEEP one suite of iterator API finally.
// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later
// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter
int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1; return -1;
...@@ -117,7 +114,7 @@ int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { ...@@ -117,7 +114,7 @@ int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
return 0; return 0;
} }
int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
ASSERT(pIter->len >= 0); ASSERT(pIter->len >= 0);
if (pIter->len == 0) { if (pIter->len == 0) {
...@@ -152,7 +149,7 @@ int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { ...@@ -152,7 +149,7 @@ int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
return 0; return 0;
} }
int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { int32_t tInitSubmitBlkIter(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pMsgIter->dataLen <= 0) return -1; if (pMsgIter->dataLen <= 0) return -1;
pIter->totalLen = pMsgIter->dataLen; pIter->totalLen = pMsgIter->dataLen;
pIter->len = 0; pIter->len = 0;
...@@ -160,7 +157,7 @@ int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubm ...@@ -160,7 +157,7 @@ int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubm
return 0; return 0;
} }
STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
STSRow *row = pIter->row; STSRow *row = pIter->row;
if (pIter->len >= pIter->totalLen) { if (pIter->len >= pIter->totalLen) {
......
...@@ -40,7 +40,7 @@ typedef struct STable STable; ...@@ -40,7 +40,7 @@ typedef struct STable STable;
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable); int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable); void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows); int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
......
...@@ -188,7 +188,7 @@ struct STbUidStore { ...@@ -188,7 +188,7 @@ struct STbUidStore {
#define TD_VID(PVNODE) (PVNODE)->config.vgId #define TD_VID(PVNODE) (PVNODE)->config.vgId
static FORCE_INLINE bool tsdbIsRollup(SVnode* pVnode) { static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) {
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]); SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
return (pRetention->freq > 0 && pRetention->keep > 0); return (pRetention->freq > 0 && pRetention->keep > 0);
} }
......
...@@ -37,9 +37,9 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ...@@ -37,9 +37,9 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// iterate and convert // iterate and convert
if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNextEx(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1; if (tGetSubmitMsgNext(&pReadHandle->msgIter, &pReadHandle->pBlock) < 0) return -1;
if (pReadHandle->pBlock == NULL) break; if (pReadHandle->pBlock == NULL) break;
// pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid); // pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
...@@ -50,7 +50,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ...@@ -50,7 +50,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
// pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows); // pReadHandle->pBlock->numOfRows = htons(pReadHandle->pBlock->numOfRows);
} }
if (tInitSubmitMsgIterEx(pMsg, &pReadHandle->msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter) < 0) return -1;
pReadHandle->ver = ver; pReadHandle->ver = ver;
memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter));
return 0; return 0;
...@@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t ...@@ -58,7 +58,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
while (1) { while (1) {
if (tGetSubmitMsgNextEx(&pHandle->msgIter, &pHandle->pBlock) < 0) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false; return false;
} }
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) return false;
...@@ -169,8 +169,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -169,8 +169,8 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
tdSTSRowIterInit(&iter, pTschema); tdSTSRowIterInit(&iter, pTschema);
STSRow* row; STSRow* row;
int32_t curRow = 0; int32_t curRow = 0;
tInitSubmitBlkIterEx(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter); tInitSubmitBlkIter(&pHandle->msgIter, pHandle->pBlock, &pHandle->blkIter);
while ((row = tGetSubmitBlkNextEx(&pHandle->blkIter)) != NULL) { while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) {
tdSTSRowIterReset(&iter, row); tdSTSRowIterReset(&iter, row);
// get all wanted col of that block // get all wanted col of that block
for (int32_t i = 0; i < colActual; i++) { for (int32_t i = 0; i < colActual; i++) {
......
...@@ -190,35 +190,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey ...@@ -190,35 +190,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0; return 0;
} }
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg) { int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SSubmitBlk *pBlock = NULL;
SSubmitBlkIter blkIter = {0};
STSRow *row = NULL;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid);
pBlock->suid = htobe64(pBlock->suid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows);
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
return 0;
}
int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows) {
// STsdbMeta *pMeta = pRepo->tsdbMeta; // STsdbMeta *pMeta = pRepo->tsdbMeta;
// int32_t points = 0; // int32_t points = 0;
// STable *pTable = NULL; // STable *pTable = NULL;
...@@ -232,15 +204,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows ...@@ -232,15 +204,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows
SSubmitBlk *pBlkCopy; SSubmitBlk *pBlkCopy;
// create container is nedd // create container is nedd
tptr = taosHashGet(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid)); tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid));
if (tptr == NULL) { if (tptr == NULL) {
pTbData = tsdbNewTbData(pBlock->uid); pTbData = tsdbNewTbData(pMsgIter->uid);
if (pTbData == NULL) { if (pTbData == NULL) {
return -1; return -1;
} }
// Put into hash // Put into hash
taosHashPut(pMemTable->pHashIdx, &(pBlock->uid), sizeof(pBlock->uid), &(pTbData), sizeof(pTbData)); taosHashPut(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid), &(pTbData), sizeof(pTbData));
// Put into skiplist // Put into skiplist
tSkipListPut(pMemTable->pSlIdx, pTbData); tSkipListPut(pMemTable->pSlIdx, pTbData);
...@@ -249,10 +221,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows ...@@ -249,10 +221,10 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows
} }
// copy data to buffer pool // copy data to buffer pool
pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pBlock->dataLen + sizeof(*pBlock)); pBlkCopy = (SSubmitBlk *)vnodeBufPoolMalloc(pTsdb->mem->pPool, pMsgIter->dataLen + sizeof(*pBlock));
memcpy(pBlkCopy, pBlock, pBlock->dataLen + sizeof(*pBlock)); memcpy(pBlkCopy, pBlock, pMsgIter->dataLen + sizeof(*pBlock));
tInitSubmitBlkIter(pBlkCopy, &blkIter); tInitSubmitBlkIter(pMsgIter, pBlkCopy, &blkIter);
if (blkIter.row == NULL) return 0; if (blkIter.row == NULL) return 0;
keyMin = TD_ROW_KEY(blkIter.row); keyMin = TD_ROW_KEY(blkIter.row);
...@@ -261,15 +233,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows ...@@ -261,15 +233,15 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *pAffectedRows
// Set statistics // Set statistics
keyMax = TD_ROW_KEY(blkIter.row); keyMax = TD_ROW_KEY(blkIter.row);
pTbData->nrows += pBlock->numOfRows; pTbData->nrows += pMsgIter->numOfRows;
if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin; if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin;
if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax; if (pTbData->keyMax < keyMax) pTbData->keyMax = keyMax;
pMemTable->nRow += pBlock->numOfRows; pMemTable->nRow += pMsgIter->numOfRows;
if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin;
if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax;
(*pAffectedRows) += pBlock->numOfRows; (*pAffectedRows) += pMsgIter->numOfRows;
return 0; return 0;
} }
......
...@@ -67,7 +67,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) { ...@@ -67,7 +67,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
goto _err; goto _err;
} }
tsdbDebug("vgId: %d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path); tsdbDebug("vgId:%d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path);
*ppTsdb = pTsdb; *ppTsdb = pTsdb;
return 0; return 0;
......
...@@ -351,14 +351,25 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData ...@@ -351,14 +351,25 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
} }
} }
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions) {
if (vnodeIsRollup(pVnode)) {
// for(int32_t i=0; i< TSDB_; ) {
// }
}
return pVnode->pTsdb;
}
static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) { static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* pCond, uint64_t qId, uint64_t taskId) {
STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle)); STsdbReadHandle* pReadHandle = taosMemoryCalloc(1, sizeof(STsdbReadHandle));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {
goto _end; goto _end;
} }
STsdb* pTsdb = getTsdbByRetentions(pVnode, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions);
pReadHandle->order = pCond->order; pReadHandle->order = pCond->order;
pReadHandle->pTsdb = pVnode->pTsdb; pReadHandle->pTsdb = pTsdb;
pReadHandle->type = TSDB_QUERY_TYPE_ALL; pReadHandle->type = TSDB_QUERY_TYPE_ALL;
pReadHandle->cur.fid = INT32_MIN; pReadHandle->cur.fid = INT32_MIN;
pReadHandle->cur.win = TSWINDOW_INITIALIZER; pReadHandle->cur.win = TSWINDOW_INITIALIZER;
...@@ -376,7 +387,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* ...@@ -376,7 +387,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId); snprintf(buf, tListLen(buf), "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, qId);
pReadHandle->idStr = strdup(buf); pReadHandle->idStr = strdup(buf);
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)pVnode->pTsdb) != 0) { if (tsdbInitReadH(&pReadHandle->rhelper, pReadHandle->pTsdb) != 0) {
goto _end; goto _end;
} }
...@@ -413,7 +424,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* ...@@ -413,7 +424,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES); pReadHandle->suppInfo.plist = taosMemoryCalloc(taosArrayGetSize(pReadHandle->suppInfo.defaultLoadColumn), POINTER_BYTES);
} }
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->pVnode->config.tsdbCfg.maxRows); pReadHandle->pDataCols = tdNewDataCols(1000, pVnode->config.tsdbCfg.maxRows);
if (pReadHandle->pDataCols == NULL) { if (pReadHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
......
...@@ -702,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers ...@@ -702,25 +702,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SInterval interval = {0}; SInterval interval = {0};
TSKEY lastWinSKey = INT64_MIN; TSKEY lastWinSKey = INT64_MIN;
if (tInitSubmitMsgIterEx(pMsg, &msgIter) != TSDB_CODE_SUCCESS) { if (tInitSubmitMsgIter(pMsg, &msgIter) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
while (true) { while (true) {
tGetSubmitMsgNextEx(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (!pBlock) break; if (!pBlock) break;
STSmaWrapper *pSW = NULL; STSmaWrapper *pSW = NULL;
STSma *pTSma = NULL; STSma *pTSma = NULL;
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) { if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tdFreeTSmaWrapper(pSW);
break; break;
} }
while (true) { while (true) {
STSRow *row = tGetSubmitBlkNextEx(&blkIter); STSRow *row = tGetSubmitBlkNext(&blkIter);
if (!row) { if (!row) {
tdFreeTSmaWrapper(pSW); tdFreeTSmaWrapper(pSW);
break; break;
...@@ -1966,9 +1966,9 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { ...@@ -1966,9 +1966,9 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (tInitSubmitMsgIterEx(pMsg, &msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1; if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (!pBlock) break; if (!pBlock) break;
tsdbUidStorePut(pStore, msgIter.suid, NULL); tsdbUidStorePut(pStore, msgIter.suid, NULL);
......
...@@ -38,11 +38,11 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * ...@@ -38,11 +38,11 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
while (true) { while (true) {
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break; if (pBlock == NULL) break;
if (tsdbInsertTableData(pTsdb, pBlock, &affectedrows) < 0) { if (tsdbInsertTableData(pTsdb, &msgIter, pBlock, &affectedrows) < 0) {
return -1; return -1;
} }
numOfRows += pBlock->numOfRows; numOfRows += msgIter.numOfRows;
} }
if (pRsp != NULL) { if (pRsp != NULL) {
...@@ -66,20 +66,20 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ...@@ -66,20 +66,20 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days; TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length); // pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
while (true) { while (true) {
if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
if (pBlock == NULL) break; if (pBlock == NULL) break;
pBlock->uid = htobe64(pBlock->uid); // pBlock->uid = htobe64(pBlock->uid);
pBlock->suid = htobe64(pBlock->suid); // pBlock->suid = htobe64(pBlock->suid);
pBlock->sversion = htonl(pBlock->sversion); // pBlock->sversion = htonl(pBlock->sversion);
pBlock->dataLen = htonl(pBlock->dataLen); // pBlock->dataLen = htonl(pBlock->dataLen);
pBlock->schemaLen = htonl(pBlock->schemaLen); // pBlock->schemaLen = htonl(pBlock->schemaLen);
pBlock->numOfRows = htons(pBlock->numOfRows); // pBlock->numOfRows = htons(pBlock->numOfRows);
#if 0 #if 0
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) { if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
......
...@@ -96,24 +96,24 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -96,24 +96,24 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
} }
// open tsdb // open tsdb
if (tsdbIsRollup(pVnode)) { if (vnodeIsRollup(pVnode)) {
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) { if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L0) < 0) {
vError("vgId: %d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) { if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L1) < 0) {
vError("vgId: %d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) { if (tsdbOpen(pVnode, TSDB_TYPE_RSMA_L2) < 0) {
vError("vgId: %d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
} else { } else {
if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) { if (tsdbOpen(pVnode, TSDB_TYPE_TSDB) < 0) {
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
} }
} }
...@@ -160,8 +160,8 @@ _err: ...@@ -160,8 +160,8 @@ _err:
if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(pVnode->pTsdb);
if (pVnode->pMeta) metaClose(pVnode->pMeta); if (pVnode->pMeta) metaClose(pVnode->pMeta);
tsdbClose(VND_RSMA1(pVnode)); tsdbClose(VND_RSMA1(pVnode));
tsdbClose(VND_RSMA2(pVnode)); tsdbClose(VND_RSMA2(pVnode));
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
return NULL; return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册