diff --git a/examples/c/tmq.c b/examples/c/tmq.c index eb41ad039a1852bb265165837d69edc3a2835684..1e3a828e0217b73c57cf2140f653abf40cc31ffe 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -191,21 +191,45 @@ tmq_t* build_consumer() { tmq_conf_res_t code; tmq_conf_t* conf = tmq_conf_new(); code = tmq_conf_set(conf, "enable.auto.commit", "true"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "group.id", "cgrpName"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "client.id", "user defined name"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "td.connect.user", "root"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } code = tmq_conf_set(conf, "experimental.snapshot.enable", "false"); - if (TMQ_CONF_OK != code) return NULL; + if (TMQ_CONF_OK != code) { + tmq_conf_destroy(conf); + return NULL; + } tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e86620337293e7796287d2203168fc4882a4f724..15aa7b4297e96619f3db80cbb5d4993f9726ca14 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1033,6 +1033,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* offset += pInfo->pColData->info.bytes; } + taosMemoryFree(buf); return phelper; } @@ -2370,7 +2371,11 @@ _end: taosArrayDestroy(pVals); if (terrno != 0) { *ppReq = NULL; - if (pReq) tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + if (pReq) { + tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); + taosMemoryFreeClear(pReq); + } + return TSDB_CODE_FAILED; } *ppReq = pReq; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 6ea169a5e332c2450d8d48d25ed5552c497ca34b..5ec516fa1b885a694147db40addc871a9219893d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -433,8 +433,10 @@ _end: tsdbUntakeReadSnap((STsdbReader*)pr, pr->pReadSnap, true); taosThreadMutexUnlock(&pr->readerMutex); - for (int32_t j = 0; j < pr->numOfCols; ++j) { - taosMemoryFree(pRes[j]); + if (pRes != NULL) { + for (int32_t j = 0; j < pr->numOfCols; ++j) { + taosMemoryFree(pRes[j]); + } } taosMemoryFree(pRes); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c index 2f49781276082dad126a1d2a1c6a396b7fd9dafe..3299a2f497a825a546c9e613cf74ffde1c859ca3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c @@ -219,7 +219,7 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* } ASSERT(pIter->rowInfo.suid == pIter->dIter.bData.suid); - ASSERT(pIter->rowInfo.uid = pIter->dIter.bData.uid); + ASSERT(pIter->rowInfo.uid == pIter->dIter.bData.uid); pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow); pIter->dIter.iRow++; goto _exit; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 42728be657cee044e3a003fb6b437b56b6df3e71..6851bb20437aa13c145326f586eb9e20d26942cc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -148,10 +148,10 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2) { return pDelFi int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) { int32_t code = 0; - int64_t size; + int64_t size = 0; int64_t n; TdFilePtr pFD; - char fname[TSDB_FILENAME_LEN]; + char fname[TSDB_FILENAME_LEN] = {0}; char hdr[TSDB_FHDR_SIZE] = {0}; // truncate diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 3db9ff2b42607a0b39c8cddb8f69bd3c21ce5f5e..554ec0f1f9a4bbee0870521cb29b8aa5028f521e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -473,7 +473,7 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN int8_t forward) { int32_t code = 0; int8_t level; - SMemSkipListNode *pNode; + SMemSkipListNode *pNode = NULL; SVBufPool *pPool = pMemTable->pTsdb->pVnode->inUse; int64_t nSize; @@ -591,7 +591,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, pBlockData->aColData = vnodeBufPoolMalloc(pPool, sizeof(SColData) * pBlockData->nColData); if (pBlockData->aColData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } + for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) { code = tColDataCopy(&aColData[iColData + 1], &pBlockData->aColData[iColData], (xMallocFn)vnodeBufPoolMalloc, pPool); if (code) goto _exit; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c83fdb2e4fc3a1b1073039ab3c65565ba69ee23e..027784e340aa755a3b215e0ae1db842560783bbb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -874,7 +874,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN pBlockNum->numOfBlocks += 1; } - if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) { + if ((pScanInfo->pBlockList != NULL )&& (taosArrayGetSize(pScanInfo->pBlockList) > 0)) { numOfQTable += 1; } } @@ -4532,7 +4532,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, SSDataBlock* pResBlock = pReader->pResBlock; if (pResBlock->pBlockAgg == NULL) { size_t num = taosArrayGetSize(pResBlock->pDataBlock); - pResBlock->pBlockAgg = taosMemoryCalloc(num, sizeof(SColumnDataAgg)); + pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES); } // do fill all null column value SMA info diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 639adb7ec60338817f8807ba790dbe44cd05d182..5ee1601a245ca13810d2ffa1304b13f7001516b8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1148,7 +1148,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, if (TSDB_CODE_SUCCESS == code) { REPLACE_NODE(pNew); } else { - taosMemoryFree(keyBuf); nodesDestroyList(groupNew); metaReaderClear(&mr); return code; @@ -1166,7 +1165,6 @@ int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, if (pValue->node.resType.type == TSDB_DATA_TYPE_JSON) { if (tTagIsJson(data)) { terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR; - taosMemoryFree(keyBuf); nodesDestroyList(groupNew); metaReaderClear(&mr); return terrno; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e6ccb99b10bc91a876818f2bf2eda58633b3e12d..1848bbdd5ff03d688b43b65843676d39bda91715 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2035,7 +2035,11 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, tDecoderClear(&mr.coder); tb_uid_t suid = mr.me.ctbEntry.suid; - metaGetTableEntryByUidCache(&mr, suid); + code = metaGetTableEntryByUidCache(&mr, suid); + if (code != TSDB_CODE_SUCCESS) { + return terrno; + } + pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9a56512683c505ef4556958a4c5bb6fc8bdea817..389fc98163683d984158c60b15b683729e53cd21 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3050,8 +3050,8 @@ int32_t tblCountScanGetInputs(SNodeList* groupTags, SName* tableName, STableCoun } } } else { - strncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN); - strncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN); + tstrncpy(supp->dbNameFilter, tNameGetDbNameP(tableName), TSDB_DB_NAME_LEN); + tstrncpy(supp->stbNameFilter, tNameGetTableName(tableName), TSDB_TABLE_NAME_LEN); } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 98ef6b8a3621abb1560d9f04924f92cff84b4935..de840890400c692902ad453ff84b40f1cb082995 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -46,8 +46,9 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pOperator->pTaskInfo = pTaskInfo; SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); + int32_t numOfCols = 0; + pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); + pOperator->exprSupp.numOfExprs = numOfCols; int32_t numOfOutputCols = 0; int32_t code = @@ -56,7 +57,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* goto _error; } - pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); + pOperator->exprSupp.pCtx = + createSqlFunctionCtx(pOperator->exprSupp.pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); initResultSizeInfo(&pOperator->resultInfo, 1024); code = filterInitFromNode((SNode*)pSortNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -68,8 +70,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); setOperatorInfo(pOperator, "SortOperator", QUERY_NODE_PHYSICAL_PLAN_SORT, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; + // lazy evaluation for the following parameter since the input datablock is not known till now. // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 1b2f1350647e0f5bcdffb06b27dde16321d9d96f..ff5be12d0ba5b66bd4469469becbd89f38590756 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1608,9 +1608,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan if (pInfo->tbnameSlotId != -1) { SColumnInfoData* pColumnInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, pInfo->tbnameSlotId); char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0}; - memcpy(varDataVal(varTbName), name, strlen(name)); - varDataSetLen(varTbName, strlen(name)); - + STR_TO_VARSTR(varTbName, name); colDataAppendNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows); } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 90f20f40b820c9503eeb13be1e52de8eb4c4a66b..3ce2ef34d356606378c7665082c58bd75f2b8842 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -212,6 +212,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); if (pPage == NULL) { + taosArrayDestroy(pPageIdList); blockDataDestroy(p); return terrno; } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 6bcf4ad39b729786e2d562d7a928b0a4965dc03d..d39e8599f178e5e0820db6518f917575ab688a81 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -391,7 +391,7 @@ _error: return TSDB_CODE_OUT_OF_MEMORY; } -static char* doExtractPage(SDiskbasedBuf* pBuf) { +static char* doExtractPage(SDiskbasedBuf* pBuf, bool* newPage) { char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { availablePage = evictBufPage(pBuf); @@ -405,6 +405,7 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) { if (availablePage == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; } + *newPage = true; } return availablePage; @@ -413,7 +414,8 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) { void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->statis.getPages += 1; - char* availablePage = doExtractPage(pBuf); + bool newPage = false; + char* availablePage = doExtractPage(pBuf, &newPage); if (availablePage == NULL) { return NULL; } @@ -432,6 +434,9 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { // register page id info pi = registerNewPageInfo(pBuf, *pageId); if (pi == NULL) { + if (newPage) { + taosMemoryFree(availablePage); + } return NULL; } @@ -492,7 +497,8 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); - (*pi)->pData = doExtractPage(pBuf); + bool newPage = false; + (*pi)->pData = doExtractPage(pBuf, &newPage); // failed to evict buffer page, return with error code. if ((*pi)->pData == NULL) { @@ -509,6 +515,10 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { if (HAS_DATA_IN_DISK(*pi)) { int32_t code = loadPageFromDisk(pBuf, *pi); if (code != 0) { + if (newPage) { + taosMemoryFree((*pi)->pData); + } + terrno = code; return NULL; } diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 55d7d4f6e7dc850e13b362289b545f57b65b5bbd..1f9ca7407e51240680466f736cb1cf4d3914b531 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -118,7 +118,7 @@ char **strsplit(char *z, const char *delim, int32_t *num) { if ((*num) >= size) { size = (size << 1); split = taosMemoryRealloc(split, POINTER_BYTES * size); - ASSERTS(NULL != split, "realloc memory failed. size=%d", POINTER_BYTES * size); + ASSERTS(NULL != split, "realloc memory failed. size=%d", (int32_t) POINTER_BYTES * size); } } diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 9c1dc2e063ee96e7e593154fcc602c6b3be9073f..8f7768635b81499ab299960ab6b88826116d8335 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -1381,7 +1381,7 @@ void startOmbConsume() { printf("SQL: %s\n", sql); queryDbExec(taos, sql, NO_INSERT_TYPE); - int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers); + int32_t producerRate = ceil(((double)g_stConfInfo.producerRate) / g_stConfInfo.producers); printf("==== create %d produce thread ====\n", g_stConfInfo.producers); for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {