diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 484b6646b5498f29b726d37480ff97cdabcd41a8..925197d708ff15464df3835e55f98a38dc2861f1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -91,9 +91,9 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->keep = pCreate->daysToKeep0; pCfg->streamMode = pCreate->streamMode; pCfg->isWeak = true; - pCfg->tsdbCfg.keep = pCreate->daysToKeep0; - pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0; + pCfg->tsdbCfg.keep0 = pCreate->daysToKeep2; + pCfg->tsdbCfg.keep1 = pCreate->daysToKeep0; pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; pCfg->tsdbCfg.retentions = pCreate->pRetensions; pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; @@ -121,6 +121,8 @@ static void vmGenerateWrapperCfg(SVnodesMgmt *pMgmt, SCreateVnodeReq *pCreate, S int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SRpcMsg *pReq = &pMsg->rpcMsg; SCreateVnodeReq createReq = {0}; + char path[TSDB_FILENAME_LEN]; + if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; @@ -143,6 +145,14 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } + // create vnode + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); + if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { + tFreeSCreateVnodeReq(&createReq); + dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); + return -1; + } + SMsgCb msgCb = pMgmt->pDnode->data.msgCb; msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 25c6ffc1ad36126e65701d3647a306e69dac9e1d..303376dba44f2cda60ae39875b9a96a0aab8c053 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -124,12 +124,12 @@ struct STsdbCfg { int8_t precision; int8_t update; int8_t compression; - int32_t daysPerFile; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; - int32_t keep; - int32_t keep1; + int32_t days; + int32_t minRows; + int32_t maxRows; int32_t keep2; + int32_t keep0; + int32_t keep1; uint64_t lruCacheSize; SArray *retentions; }; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 913fec64ed3e9f88f417641f4d007b4a5a6a0ce4..cb40900e81c5e8b695d95232c794de26ab09b98f 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -43,6 +43,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); // vnodeCommit ==================== int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); +int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); #if 1 // SVBufPool diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b0b87665edcd793537e1d35d089415a500b401f9..e95878b04e3b88f42de31135d0d295344c1a3037 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -27,6 +27,7 @@ #include "tdbInt.h" #include "tfs.h" #include "tglobal.h" +#include "tjson.h" #include "tlist.h" #include "tlockfree.h" #include "tlosertree.h" diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index eb3266338720ae8e1222d54db16389b192e98b45..a1edb7cd9c32148ccfda3ccef7597b4f20544304 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -55,7 +55,7 @@ typedef struct { #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) -#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) +#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRows) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static void tsdbStartCommit(STsdb *pRepo); @@ -217,14 +217,14 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { TSKEY minKey, midKey, maxKey, now; now = taosGetTimestamp(pCfg->precision); - minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision]; - midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; - maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; + minKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; + midKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; + maxKey = now - pCfg->keep0 * tsTickPerDay[pCfg->precision]; pRtn->minKey = minKey; - pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); - pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision)); - pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision)); + pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision)); + pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision)); + pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision)); tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, pRtn->minFid, pRtn->midFid, pRtn->maxFid); } @@ -286,7 +286,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) { return -1; } - pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows); if (pCommith->pDataCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCommitH(pCommith); @@ -319,7 +319,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) { if (nextKey == TSDB_DATA_TIMESTAMP_NULL) { continue; } else { - int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision)); + int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->days, pCfg->precision)); if (fid == TSDB_IVLD_FID || fid > tfid) { fid = tfid; } @@ -346,7 +346,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { ASSERT(pSet == NULL || pSet->fid == fid); tsdbResetCommitFile(pCommith); - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); + tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); // Set and open files if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { @@ -1210,8 +1210,8 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDF int64_t offset = 0, offsetAggr = 0; int rowsToWrite = pDataCols->numOfRows; - ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); - ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); + ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows); + ASSERT((!isLast) || rowsToWrite < pCfg->minRows); // Make buffer space if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { @@ -1460,7 +1460,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi if (pCommith->pDataCols->numOfRows <= 0) break; - if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRows) { pDFile = TSDB_COMMIT_DATA_FILE(pCommith); isLast = false; } else { @@ -1619,7 +1619,7 @@ static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols if (pCommith->pDataCols->numOfRows == 0) break; if (isLastOneBlock) { - if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { + if (pCommith->pDataCols->numOfRows < pCfg->minRows) { pDFile = TSDB_COMMIT_LAST_FILE(pCommith); isLast = true; } else { @@ -1667,7 +1667,8 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { TASSERT(0); } - tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, pTarget->bitmapMode); + tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, + pTarget->bitmapMode); } ++pTarget->numOfRows; @@ -1774,11 +1775,11 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p ASSERT(mergeRows > 0); - if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { + if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRows) { if (pBlock->last) { - if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; + if (pCommith->isLFileSame && mergeRows < pCfg->minRows) return true; } else { - if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; + if (pCommith->isDFileSame && mergeRows <= pCfg->maxRows) return true; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index bd3888864d17a9de59bbe3661464606e765fe7c1..866c02cbb316063b122ba87e2d85cbcbe8a39d46 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -190,8 +190,8 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) { // ================== STsdbFS STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) { - int keep = pCfg->keep; - int days = pCfg->daysPerFile; + int keep = pCfg->keep2; + int days = pCfg->days; int maxFSet = TSDB_MAX_FSETS(keep, days); STsdbFS *pfs; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 5a477e646cb4ea2cf94cde85608dd813bb929a14..5f401c9b2bd2cb4ff871d04c0dabcc1dbf2d5f4c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -19,9 +19,9 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows); static STbData *tsdbNewTbData(tb_uid_t uid); static void tsdbFreeTbData(STbData *pTbData); -static char * tsdbGetTsTupleKey(const void *data); +static char *tsdbGetTsTupleKey(const void *data); static int tsdbTbDataComp(const void *arg1, const void *arg2); -static char * tsdbTbDataGetUid(const void *arg); +static char *tsdbTbDataGetUid(const void *arg); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row); STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { @@ -74,7 +74,7 @@ void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) { } int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp) { - SSubmitBlk * pBlock = NULL; + SSubmitBlk *pBlock = NULL; SSubmitMsgIter msgIter = {0}; int32_t affectedrows = 0, numOfRows = 0; @@ -119,12 +119,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo) { ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); if (pIter == NULL) return 0; - STSchema * pSchema = NULL; + STSchema *pSchema = NULL; TSKEY rowKey = 0; TSKEY fKey = 0; bool isRowDel = false; int filterIter = 0; - STSRow * row = NULL; + STSRow *row = NULL; SMergeInfo mInfo; if (pMergeInfo == NULL) pMergeInfo = &mInfo; @@ -259,12 +259,12 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) { ASSERT(pMsg != NULL); // STsdbMeta * pMeta = pTsdb->tsdbMeta; SSubmitMsgIter msgIter = {0}; - SSubmitBlk * pBlock = NULL; + SSubmitBlk *pBlock = NULL; SSubmitBlkIter blkIter = {0}; - STSRow * row = NULL; + STSRow *row = NULL; TSKEY now = taosGetTimestamp(pTsdb->config.precision); - TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep; - TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.daysPerFile; + TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep2; + TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.days; terrno = TSDB_CODE_SUCCESS; pMsg->length = htonl(pMsg->length); @@ -332,9 +332,9 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p // STable *pTable = NULL; SSubmitBlkIter blkIter = {0}; STsdbMemTable *pMemTable = pTsdb->mem; - void * tptr; - STbData * pTbData; - STSRow * row; + void *tptr; + STbData *pTbData; + STSRow *row; TSKEY keyMin; TSKEY keyMax; @@ -504,7 +504,7 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitReq *pMsg) { #include "tskiplist.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 -#define TSDB_MAX_INSERT_BATCH 512 +#define TSDB_MAX_INSERT_BATCH 512 typedef struct { int32_t totalLen; diff --git a/source/dnode/vnode/src/tsdb/tsdbOptions.c b/source/dnode/vnode/src/tsdb/tsdbOptions.c index 2c57a7406e4733799c88b7a1286a64028249ea72..3560c9feaadfbef4c02ac2a647857e48d5ccf4e2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOptions.c +++ b/source/dnode/vnode/src/tsdb/tsdbOptions.c @@ -17,12 +17,12 @@ const STsdbCfg defautlTsdbOptions = {.precision = 0, .lruCacheSize = 0, - .daysPerFile = 10, - .minRowsPerFileBlock = 100, - .maxRowsPerFileBlock = 4096, - .keep = 3650, - .keep1 = 3650, + .days = 10, + .minRows = 100, + .maxRows = 4096, .keep2 = 3650, + .keep0 = 3650, + .keep1 = 3650, .update = 0, .compression = TWO_STAGE_COMP}; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 9509dfa462a9e53aae9d284f6b5c5254de92dfe3..1314ef7d1eef95397125198e45fa4c6ba98478a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -314,7 +314,7 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { STsdbCfg* pCfg = &pTsdb->config; int64_t now = taosGetTimestamp(pCfg->precision); - return now - (tsTickPerDay[pCfg->precision] * pCfg->keep) + 1; // needs to add one tick + return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick } static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, STsdbQueryCond* pCond) { @@ -404,7 +404,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true); } - pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRowsPerFileBlock); + pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows); if (pReadHandle->pDataCols == NULL) { tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr); terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; @@ -2199,7 +2199,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi break; } - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); + tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files if ((ASCENDING_TRAVERSE(pTsdbReadHandle->order) && win.skey > pTsdbReadHandle->window.ekey) || @@ -2295,7 +2295,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* // find the start data block in file pTsdbReadHandle->locateStart = true; STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; - int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision); + int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); @@ -2321,7 +2321,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo* break; } - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); + tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey); // current file are not overlapped with query time window, ignore remain files if ((ascTraverse && win.skey > pTsdbReadHandle->window.ekey) || @@ -2396,7 +2396,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis if (!pTsdbReadHandle->locateStart) { pTsdbReadHandle->locateStart = true; STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config; - int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->daysPerFile, pCfg->precision); + int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision); tsdbRLockFS(pFileHandle); tsdbFSIterInit(&pTsdbReadHandle->fileIter, pFileHandle, pTsdbReadHandle->order); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index e31ede09cc0aa2cefdacca42980783785de4ab1a..0b5a53dd9c8a278598522fb95e7df995107ec50b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -43,14 +43,14 @@ int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) { return -1; } - pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + pReadh->pDCols[0] = tdNewDataCols(0, pCfg->maxRows); if (pReadh->pDCols[0] == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyReadH(pReadh); return -1; } - pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + pReadh->pDCols[1] = tdNewDataCols(0, pCfg->maxRows); if (pReadh->pDCols[1] == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyReadH(pReadh); @@ -276,8 +276,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) { return 0; } -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, - int numOfColsIds, bool mergeBitmap) { +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, + bool mergeBitmap) { ASSERT(pBlock->numOfSubBlocks > 0); int8_t update = pReadh->pRepo->config.update; @@ -513,7 +513,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat tdResetDataCols(pDataCols); - if(tsdbIsSupBlock(pBlock)) { + if (tsdbIsSupBlock(pBlock)) { tdDataColsSetBitmapI(pDataCols); } @@ -710,7 +710,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * tdResetDataCols(pDataCols); - if(tsdbIsSupBlock(pBlock)) { + if (tsdbIsSupBlock(pBlock)) { tdDataColsSetBitmapI(pDataCols); } @@ -836,7 +836,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc } if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlockCol->blen, pBlock->algorithm, - pBlock->numOfRows, tLenBitmap, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, + pBlock->numOfRows, tLenBitmap, pCfg->maxRows, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), pBlockCol->colId, offset); diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 7de5a0d5a9cf0419246658857eb6e4131afeffe0..7abdf22073ce61cbf9fa4272444c122f2f6a5491 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -106,7 +106,8 @@ struct SSmaStat { // expired window static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); -static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version); +static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, + int64_t version); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); @@ -197,7 +198,7 @@ static SPoolMem *openPool() { static void clearPool(SPoolMem *pPool) { if (!pPool) return; - + SPoolMem *pMem; do { @@ -544,7 +545,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_SUCCESS; }; -static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) { +static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, + int64_t version) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later @@ -946,7 +948,7 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t */ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) { STsdbCfg *pCfg = REPO_CFG(pTsdb); - int32_t daysPerFile = pCfg->daysPerFile; + int32_t daysPerFile = pCfg->days; if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index fc74ee92537e7ef533998dde04c465665d229d3b..fa249d3ba1fe6323e0ca9af8f2d9d4f496aceac6 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -18,8 +18,8 @@ #define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" -static int vnodeEncodeInfo(const SVnodeInfo *pInfo, uint8_t **ppData, int *len); -static int vnodeDecodeInfo(uint8_t *pData, int len, SVnodeInfo *pInfo); +static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); +static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeStartCommit(SVnode *pVnode); static int vnodeEndCommit(SVnode *pVnode); static int vnodeCommit(void *arg); @@ -28,16 +28,14 @@ static void vnodeWaitCommit(SVnode *pVnode); int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { char fname[TSDB_FILENAME_LEN]; TdFilePtr pFile; - uint8_t *data; - int len; + char *data; snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP); // encode info data = NULL; - len = 0; - if (vnodeEncodeInfo(pInfo, &data, &len) < 0) { + if (vnodeEncodeInfo(pInfo, &data) < 0) { return -1; } @@ -48,7 +46,7 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { return -1; } - if (taosWriteFile(pFile, data, len) < 0) { + if (taosWriteFile(pFile, data, strlen(data)) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } @@ -90,9 +88,53 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) { return 0; } -int vnodeLoadInfo(const char *dir) { - // TODO +int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) { + char fname[TSDB_FILENAME_LEN]; + TdFilePtr pFile = NULL; + char *pData = NULL; + int64_t size; + + snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME); + + // read info + pFile = taosOpenFile(fname, TD_FILE_READ); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + + if (taosFStatFile(pFile, &size, NULL) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + pData = taosMemoryMalloc(size); + if (pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + if (taosReadFile(pFile, pData, size) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + taosCloseFile(&pFile); + + // decode info + if (vnodeDecodeInfo(pData, pInfo) < 0) { + taosMemoryFree(pData); + return -1; + } + + taosMemoryFree(pData); + return 0; + +_err: + taosCloseFile(&pFile); + taosMemoryFree(pData); + return -1; } int vnodeAsyncCommit(SVnode *pVnode) { @@ -137,12 +179,131 @@ static int vnodeEndCommit(SVnode *pVnode) { static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); } -static int vnodeEncodeInfo(const SVnodeInfo *pInfo, uint8_t **ppData, int *len) { - // TODO +static int vnodeEncodeConfig(const void *pObj, SJson *pJson) { + const SVnodeCfg *pCfg = (SVnodeCfg *)pObj; + + if (tjsonAddIntegerToObject(pJson, "vgId", pCfg->vgId) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; + if (tjsonAddIntegerToObject(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; + return 0; } -static int vnodeDecodeInfo(uint8_t *pData, int len, SVnodeInfo *pInfo) { - // TODO +static int vnodeDecodeConfig(const SJson *pJson, void *pObj) { + SVnodeCfg *pCfg = (SVnodeCfg *)pObj; + + if (tjsonGetNumberValue(pJson, "vgId", pCfg->vgId) < 0) return -1; + if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1; + if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1; + if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1; + if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1; + if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1; + if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1; + if (tjsonGetNumberValue(pJson, "isWeak", pCfg->isWeak) < 0) return -1; + if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1; + if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1; + if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1; + if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1; + if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1; + if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1; + if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1; + if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1; + if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1; + if (tjsonGetNumberValue(pJson, "lruCacheSize", pCfg->tsdbCfg.lruCacheSize) < 0) return -1; + + return 0; +} + +static int vnodeEncodeState(const void *pObj, SJson *pJson) { + const SVState *pState = (SVState *)pObj; + + if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; + return 0; } + +static int vnodeDecodeState(const SJson *pJson, void *pObj) { + SVState *pState = (SVState *)pObj; + + if (tjsonGetNumberValue(pJson, "commit version", pState->committed) < 0) return -1; + + return 0; +} + +static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) { + SJson *pJson; + char *pData; + + *ppData = NULL; + + pJson = tjsonCreateObject(); + if (pJson == NULL) { + return -1; + } + + if (tjsonAddObject(pJson, "config", vnodeEncodeConfig, (void *)&pInfo->config) < 0) { + goto _err; + } + + if (tjsonAddObject(pJson, "state", vnodeEncodeState, (void *)&pInfo->state) < 0) { + goto _err; + } + + pData = tjsonToString(pJson); + if (pData == NULL) { + goto _err; + } + + tjsonDelete(pJson); + + *ppData = pData; + return 0; + +_err: + tjsonDelete(pJson); + return -1; +} + +static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) { + SJson *pJson = NULL; + + pJson = tjsonCreateObject(); + if (pJson == NULL) { + return -1; + } + + if (tjsonToObject(pJson, "config", vnodeDecodeConfig, (void *)&pInfo->config) < 0) { + goto _err; + } + + if (tjsonToObject(pJson, "state", vnodeDecodeState, (void *)&pInfo->state) < 0) { + goto _err; + } + + tjsonDelete(pJson); + + return 0; + +_err: + tjsonDelete(pJson); + return -1; +} diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 5fa14f6018bd2bd78516214aec067b74bd82fecd..241c26ab1c38f6ef54d85da35d2b7bb113c2b050 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -46,6 +46,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { return -1; } + vInfo("vgId: %d vnode is created", pCfg->vgId); + return 0; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 93bc16a7a2e2669d5e8c36c59408f3e1fb72d66b..4e59e1bea6548d4c63ddc9b180639d129e3f480f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -14,12 +14,12 @@ */ #include "catalog.h" +#include "command.h" #include "query.h" #include "schedulerInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" -#include "command.h" SSchedulerMgmt schMgmt = {0}; @@ -68,8 +68,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * } int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, - int64_t startTs, bool syncSchedule) { - int32_t code = 0; + int64_t startTs, bool syncSchedule) { + int32_t code = 0; SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); if (NULL == pJob) { qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); @@ -141,7 +141,6 @@ _return: SCH_RET(code); } - void schFreeRpcCtx(SRpcCtx *pCtx) { if (NULL == pCtx) { return; @@ -1047,12 +1046,12 @@ _return: int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); - + atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_ptr(&pJob->resData, pRsp); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); - + schProcessOnDataFetched(pJob); return TSDB_CODE_SUCCESS; @@ -1146,7 +1145,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (NULL == msg) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - + if (!SCH_IS_EXPLAIN_JOB(pJob)) { SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType)); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); @@ -1180,13 +1179,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } if (SCH_IS_EXPLAIN_JOB(pJob)) { - if (rsp->completed) { + if (rsp->completed) { SRetrieveTableRsp *pRsp = NULL; SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp)); if (pRsp) { SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp)); } - + return TSDB_CODE_SUCCESS; } @@ -1238,23 +1237,24 @@ _return: } int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { - int32_t s = taosHashGetSize(pTaskList); - if (s <= 0) { - return TSDB_CODE_SUCCESS; - } - - SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId)); - if (NULL == task || NULL == (*task)) { - return TSDB_CODE_SUCCESS; - } + int32_t s = taosHashGetSize(pTaskList); + if (s <= 0) { + return TSDB_CODE_SUCCESS; + } + + SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId)); + if (NULL == task || NULL == (*task)) { + return TSDB_CODE_SUCCESS; + } - *pTask = *task; + *pTask = *task; - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCode) { - if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || taosArrayGetSize(pTask->execNodes) <= 0) { + if (rspCode || NULL == pTask->execNodes || taosArrayGetSize(pTask->execNodes) > 1 || + taosArrayGetSize(pTask->execNodes) <= 0) { return TSDB_CODE_SUCCESS; } @@ -1264,7 +1264,6 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo return TSDB_CODE_SUCCESS; } - int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; @@ -1282,13 +1281,15 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in if (TDMT_VND_EXPLAIN_RSP == msgType) { schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask); } else { - SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); + SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, + pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } } - + if (NULL == pTask) { - SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); + SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, + pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -1444,7 +1445,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) { } int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) { - int32_t code = 0; + int32_t code = 0; SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (NULL == msgSendInfo) { SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo)); @@ -1565,7 +1566,7 @@ _return: } int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { - int32_t code = 0; + int32_t code = 0; SMsgSendInfo *pReadyMsgSendInfo = NULL; SMsgSendInfo *pExplainMsgSendInfo = NULL; @@ -1578,7 +1579,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo)); SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo)); - int32_t msgType = TDMT_VND_RES_READY_RSP; + int32_t msgType = TDMT_VND_RES_READY_RSP; SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal}; if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) { SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType); @@ -1599,7 +1600,7 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) { _return: taosHashCleanup(pCtx->args); - + if (pReadyMsgSendInfo) { taosMemoryFreeClear(pReadyMsgSendInfo->param); taosMemoryFreeClear(pReadyMsgSendInfo); @@ -1818,7 +1819,7 @@ _return: taosMemoryFreeClear(pMsgSendInfo->param); taosMemoryFreeClear(pMsgSendInfo); } - + SCH_RET(code); } @@ -2319,7 +2320,7 @@ _return: } int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, - bool syncSchedule) { + bool syncSchedule) { qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); int32_t code = 0; @@ -2608,7 +2609,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) { SCH_ERR_JRET(schFetchFromRemote(pJob)); tsem_wait(&pJob->rspSem); - } + } } else { SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status)); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); @@ -2670,11 +2671,11 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { for (int32_t i = pJob->levelNum - 1; i >= 0; --i) { SSchLevel *pLevel = taosArrayGet(pJob->levels, i); - + for (int32_t m = 0; m < pLevel->taskNum; ++m) { - SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); + SSchTask *pTask = taosArrayGet(pLevel->subTasks, m); SQuerySubDesc subDesc = {.tid = pTask->taskId, .status = pTask->status}; - + taosArrayPush(pSub, &subDesc); } } @@ -2682,7 +2683,6 @@ int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { return TSDB_CODE_SUCCESS; } - int32_t scheduleCancelJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { @@ -2734,15 +2734,17 @@ void schedulerFreeTaskList(SArray *taskList) { void schedulerDestroy(void) { if (schMgmt.jobRef) { SSchJob *pJob = taosIterateRef(schMgmt.jobRef, 0); - + int64_t refId = 0; + while (pJob) { + refId = pJob->refId; + taosRemoveRef(schMgmt.jobRef, pJob->refId); - pJob = taosIterateRef(schMgmt.jobRef, pJob->refId); + pJob = taosIterateRef(schMgmt.jobRef, refId); } taosCloseRef(schMgmt.jobRef); schMgmt.jobRef = 0; } } -