未验证 提交 82a37008 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #16863 from taosdata/feature/3_liaohj

fix(query): fix memory leak.
...@@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader); ...@@ -285,8 +285,8 @@ int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData); int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData);
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ============================================================================================== // tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap); int32_t tsdbTakeReadSnap(STsdb *pTsdb, STsdbReadSnap **ppSnap, const char* id);
void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap); void tsdbUntakeReadSnap(STsdb *pTsdb, STsdbReadSnap *pSnap, const char* id);
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(STsdb *pTsdb); int32_t tsdbMerge(STsdb *pTsdb);
......
...@@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs ...@@ -847,7 +847,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
tb_uid_t suid = getTableSuidByUid(uid, pTsdb); tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap); tsdbTakeReadSnap(pTsdb, &pIter->pReadSnap, NULL);
STbData *pMem = NULL; STbData *pMem = NULL;
if (pIter->pReadSnap->pMem) { if (pIter->pReadSnap->pMem) {
...@@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) { ...@@ -941,7 +941,7 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
taosArrayDestroy(pIter->pSkyline); taosArrayDestroy(pIter->pSkyline);
} }
tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap); tsdbUntakeReadSnap(pIter->pTsdb, pIter->pReadSnap, NULL);
_err: _err:
return code; return code;
......
...@@ -2096,8 +2096,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2096,8 +2096,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// it is a clean block, load it directly // it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) { if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader)) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo); if (pReader->order == TSDB_ORDER_ASC || (pReader->order == TSDB_ORDER_DESC && (!hasDataInLastBlock(pLastBlockReader)))) {
goto _end; copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
goto _end;
}
} }
} else { // file blocks not exist } else { // file blocks not exist
pBlockScanInfo = pReader->status.pTableIter; pBlockScanInfo = pReader->status.pTableIter;
...@@ -3360,7 +3362,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3360,7 +3362,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err; goto _err;
} }
code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap); code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
...@@ -3384,7 +3386,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3384,7 +3386,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
STsdbReader* pPrevReader = pReader->innerReader[0]; STsdbReader* pPrevReader = pReader->innerReader[0];
SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter;
code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap); code = tsdbTakeReadSnap(pPrevReader->pTsdb, &pPrevReader->pReadSnap, pReader->idStr);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _err; goto _err;
} }
...@@ -3441,7 +3443,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3441,7 +3443,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbDataFReaderClose(&pReader->pFileReader); tsdbDataFReaderClose(&pReader->pFileReader);
} }
tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap); tsdbUntakeReadSnap(pReader->pTsdb, pReader->pReadSnap, pReader->idStr);
taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); taosMemoryFree(pReader->status.uidCheckInfo.tableUidList);
SIOCostSummary* pCost = &pReader->cost; SIOCostSummary* pCost = &pReader->cost;
...@@ -3717,8 +3719,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -3717,8 +3719,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
} }
} }
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%"PRId64", query range:%" PRId64 " - %" PRId64 " in query %s",
pReader, pReader->suid, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr);
return code; return code;
} }
...@@ -3863,7 +3865,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 ...@@ -3863,7 +3865,7 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr) {
int32_t code = 0; int32_t code = 0;
// alloc // alloc
...@@ -3906,12 +3908,12 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) { ...@@ -3906,12 +3908,12 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap) {
goto _exit; goto _exit;
} }
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
_exit: _exit:
return code; return code;
} }
void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) {
if (pSnap) { if (pSnap) {
if (pSnap->pMem) { if (pSnap->pMem) {
tsdbUnrefMemTable(pSnap->pMem); tsdbUnrefMemTable(pSnap->pMem);
...@@ -3924,6 +3926,5 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) { ...@@ -3924,6 +3926,5 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap) {
tsdbFSUnref(pTsdb, &pSnap->fs); tsdbFSUnref(pTsdb, &pSnap->fs);
taosMemoryFree(pSnap); taosMemoryFree(pSnap);
} }
tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr);
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
} }
...@@ -533,6 +533,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR ...@@ -533,6 +533,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
taosArrayPush(rsp.pArray, &cRsp); taosArrayPush(rsp.pArray, &cRsp);
} }
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
tqUpdateTbUidList(pVnode->pTq, tbUids, true); tqUpdateTbUidList(pVnode->pTq, tbUids, true);
if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) { if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
goto _exit; goto _exit;
...@@ -885,8 +886,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -885,8 +886,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
if (NULL != submitBlkRsp.pMeta) { if (NULL != submitBlkRsp.pMeta) {
vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta); vnodeUpdateMetaRsp(pVnode, submitBlkRsp.pMeta);
} }
taosArrayPush(newTbUids, &createTbReq.uid);
} }
taosArrayPush(newTbUids, &createTbReq.uid);
submitBlkRsp.uid = createTbReq.uid; submitBlkRsp.uid = createTbReq.uid;
submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2); submitBlkRsp.tblFName = taosMemoryMalloc(strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2);
...@@ -917,6 +919,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -917,6 +919,11 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
submitRsp.affectedRows += submitBlkRsp.affectedRows; submitRsp.affectedRows += submitBlkRsp.affectedRows;
taosArrayPush(submitRsp.pArray, &submitBlkRsp); taosArrayPush(submitRsp.pArray, &submitBlkRsp);
} }
if (taosArrayGetSize(newTbUids) > 0) {
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode), (int32_t)taosArrayGetSize(newTbUids));
}
tqUpdateTbUidList(pVnode->pTq, newTbUids, true); tqUpdateTbUidList(pVnode->pTq, newTbUids, true);
_exit: _exit:
......
...@@ -265,6 +265,15 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S ...@@ -265,6 +265,15 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STableListInfo* pListInfo = &pTaskInfo->tableqinfoList;
if (isAdd) {
qDebug("add %d tables id into query list, %s", (int32_t) taosArrayGetSize(tableIdList), pTaskInfo->id.str);
}
if (pListInfo->map == NULL) {
pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
}
// traverse to the stream scanner node to add this table id // traverse to the stream scanner node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot; SOperatorInfo* pInfo = pTaskInfo->pRoot;
...@@ -311,13 +320,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -311,13 +320,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
} }
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); bool exists = false;
if (pTaskInfo->tableqinfoList.map == NULL) { for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) {
pTaskInfo->tableqinfoList.map = STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k);
taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pKeyInfo->uid == keyInfo.uid) {
qWarn("ignore duplicated query table uid:%" PRIu64 " added, %s", pKeyInfo->uid, pTaskInfo->id.str);
exists = true;
}
} }
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); if (!exists) {
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
}
} }
if (keyBuf != NULL) { if (keyBuf != NULL) {
...@@ -480,6 +495,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { ...@@ -480,6 +495,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
pTaskInfo->code = ret; pTaskInfo->code = ret;
cleanUpUdfs(); cleanUpUdfs();
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
atomic_store_64(&pTaskInfo->owner, 0); atomic_store_64(&pTaskInfo->owner, 0);
...@@ -512,8 +528,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { ...@@ -512,8 +528,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
} }
cleanUpUdfs(); cleanUpUdfs();
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0); GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
......
...@@ -617,19 +617,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { ...@@ -617,19 +617,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table // if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
if (pInfo->noTable) return NULL; if (pInfo->noTable) {
return NULL;
}
int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
while (1) { while (1) {
SSDataBlock* result = doTableScanGroup(pOperator); SSDataBlock* result = doTableScanGroup(pOperator);
if (result) { if (result) {
return result; return result;
} }
// if no data, switch to next table and continue scan // if no data, switch to next table and continue scan
pInfo->currentTable++; pInfo->currentTable++;
if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) { if (pInfo->currentTable >= numOfTables) {
return NULL; return NULL;
} }
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond); tsdbReaderReset(pInfo->dataReader, &pInfo->cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
} }
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "tdatablock.h"
SQWorkerMgmt gQwMgmt = { SQWorkerMgmt gQwMgmt = {
.lock = 0, .lock = 0,
...@@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = { ...@@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = {
.qwNum = 0, .qwNum = 0,
}; };
static void freeBlock(void* param) {
SSDataBlock* pBlock = *(SSDataBlock**)param;
blockDataDestroy(pBlock);
}
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
...@@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
// if *taskHandle is NULL, it's killed right now // if *taskHandle is NULL, it's killed right now
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
code = qExecTaskOpt(taskHandle, pResList, &useconds); code = qExecTaskOpt(taskHandle, pResList, &useconds);
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
...@@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ...@@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} }
_return: _return:
taosArrayDestroyEx(pResList, freeBlock);
taosArrayDestroy(pResList);
QW_RET(code); QW_RET(code);
} }
...@@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { ...@@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
void *pIter = taosHashIterate(mgmt->schHash, NULL); void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) { while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
if (NULL == sch->hbConnInfo.handle) { if (NULL == sch1->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL); uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
taosHashGetSize(sch->tasksHash) <= 0) { taosHashGetSize(sch1->tasksHash) <= 0) {
taosArrayPush(pExpiredSch, sId); taosArrayPush(pExpiredSch, sId);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册