From fcdc8512d21dbe7881975aad95d41cd87f9099cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 10 Nov 2020 22:50:34 +0800 Subject: [PATCH] [TD-1980] release mem/imem table reference once the data in cache are all scanned. --- src/query/src/qExecutor.c | 2 +- src/tsdb/src/tsdbMemTable.c | 2 +- src/tsdb/src/tsdbRead.c | 195 ++++++++++++++++++++---------------- 3 files changed, 108 insertions(+), 91 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6191f536a3..8ede4b7c61 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6926,7 +6926,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { // here current thread hold the refcount, so it is safe to free tsdbQueryHandle. - doFreeQueryHandle(pQInfo); +// doFreeQueryHandle(pQInfo); *continueExec = false; (*pRsp)->completed = 1; // notify no more result to client } else { diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 8864602ba1..0cd29cddb1 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -197,7 +197,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) tsdbUnRefMemTable(pRepo, pIMem); } - tsdbDebug("vgId:%d utake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); + tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); } void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index a96f764ea0..0a34feebad 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -120,8 +120,6 @@ typedef struct STsdbQueryHandle { SDataCols *pDataCols; // in order to hold current file data block int32_t allocSize; // allocated data block size SMemRef *pMemRef; -// SMemTable *mem; // mem-table -// SMemTable *imem; // imem-table, acquired from snapshot SArray *defaultLoadColumn;// default load column SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ @@ -194,9 +192,12 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { } static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { - assert(pQueryHandle != NULL && pQueryHandle->pMemRef != NULL); - + assert(pQueryHandle != NULL); SMemRef* pMemRef = pQueryHandle->pMemRef; + if (pMemRef == NULL) { // it has been freed + return; + } + if (--pMemRef->ref == 0) { tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pMemRef->mem, pMemRef->imem); pMemRef->mem = NULL; @@ -205,6 +206,7 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { pQueryHandle->pMemRef = NULL; } + static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) { size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList); assert(sizeOfGroup >= 1 && pMeta != NULL); @@ -1821,6 +1823,8 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { pQueryHandle->activeIndex += 1; } + // no data in memtable or imemtable, decrease the memory reference. + tsdbMayUnTakeMemSnapshot(pQueryHandle); return false; } @@ -1947,116 +1951,129 @@ static void destroyHelper(void* param) { free(param); } -// handle data in cache situation -bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { - STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; +static bool getNeighborRows(STsdbQueryHandle* pQueryHandle) { + assert (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL); - int64_t stime = taosGetTimestampUs(); - int64_t elapsedTime = stime; + SDataBlockInfo blockInfo = {{0}, 0}; - size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - assert(numOfTables > 0); + pQueryHandle->type = TSDB_QUERY_TYPE_ALL; + pQueryHandle->order = TSDB_ORDER_DESC; - SDataBlockInfo blockInfo = {{0}, 0}; - if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) { - pQueryHandle->type = TSDB_QUERY_TYPE_ALL; - pQueryHandle->order = TSDB_ORDER_DESC; + if (!tsdbNextDataBlock((void*) pQueryHandle)) { + return false; + } - if (!tsdbNextDataBlock(pHandle)) { - return false; + tsdbRetrieveDataBlockInfo((void*) pQueryHandle, &blockInfo); + /*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pQueryHandle, pQueryHandle->defaultLoadColumn); + if (terrno != TSDB_CODE_SUCCESS) { + return false; + } + + if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { + // data already retrieve, discard other data rows and return + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); + memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); } - tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); - /*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn); - if (terrno != TSDB_CODE_SUCCESS) { + pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey}; + pQueryHandle->window = pQueryHandle->cur.win; + pQueryHandle->cur.rows = 1; + pQueryHandle->type = TSDB_QUERY_TYPE_ALL; + return true; + } else { + STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; + STsdbQueryCond cond = { + .order = TSDB_ORDER_ASC, + .numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)) + }; + cond.twindow = win; + + cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return false; } - if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { - // data already retrieve, discard other data rows and return - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); - memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); - } + for(int32_t i = 0; i < cond.numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i); + memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); + } - pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey}; - pQueryHandle->window = pQueryHandle->cur.win; - pQueryHandle->cur.rows = 1; - pQueryHandle->type = TSDB_QUERY_TYPE_ALL; - return true; - } else { - STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX}; - STsdbQueryCond cond = { - .order = TSDB_ORDER_ASC, - .numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)) - }; - cond.twindow = win; - - cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo)); - if (cond.colList == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return false; - } + STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pQueryHandle->pMemRef); + taosTFree(cond.colList); - for(int32_t i = 0; i < cond.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i); - memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); - } + pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey); + if (pSecQueryHandle->pTableCheckInfo == NULL) { + tsdbCleanupQueryHandle(pSecQueryHandle); + return false; + } - STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pQueryHandle->pMemRef); + if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { + tsdbCleanupQueryHandle(pSecQueryHandle); + return false; + } - taosTFree(cond.colList); + tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); + tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); - pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey); - if (pSecQueryHandle->pTableCheckInfo == NULL) { - tsdbCleanupQueryHandle(pSecQueryHandle); - return false; - } + int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle)); + size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo); - if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { - tsdbCleanupQueryHandle(pSecQueryHandle); - return false; - } + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); + memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); - tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); - tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); + SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i); + assert(pCol->info.colId == pCol1->info.colId); - int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle)); - size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo); + memcpy((char*)pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes); + } - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); - memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes); + SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); - SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i); - assert(pCol->info.colId == pCol1->info.colId); + // it is ascending order + pQueryHandle->order = TSDB_ORDER_DESC; + pQueryHandle->window = pQueryHandle->cur.win; + pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; + pQueryHandle->cur.rows = 2; + pQueryHandle->cur.mixBlock = true; - memcpy((char*)pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes); - } + int32_t step = -1;// one step for ascending order traverse + for (int32_t j = 0; j < si; ++j) { + STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); + pCheckInfo->lastKey = pQueryHandle->cur.win.ekey + step; + } - SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); + tsdbCleanupQueryHandle(pSecQueryHandle); + } - // it is ascending order - pQueryHandle->order = TSDB_ORDER_DESC; - pQueryHandle->window = pQueryHandle->cur.win; - pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; - pQueryHandle->cur.rows = 2; - pQueryHandle->cur.mixBlock = true; + //disable it after retrieve data + pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; + pQueryHandle->checkFiles = false; + return true; +} - int32_t step = -1;// one step for ascending order traverse - for (int32_t j = 0; j < si; ++j) { - STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); - pCheckInfo->lastKey = pQueryHandle->cur.win.ekey + step; - } +// handle data in cache situation +bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { + STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle; - tsdbCleanupQueryHandle(pSecQueryHandle); - } + int64_t stime = taosGetTimestampUs(); + int64_t elapsedTime = stime; - //disable it after retrieve data - pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; - pQueryHandle->checkFiles = false; - return true; + size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); + assert(numOfTables > 0); + + if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) { + SMemRef* pMemRef = pQueryHandle->pMemRef; + tsdbMayTakeMemSnapshot(pQueryHandle); + bool ret = getNeighborRows(pQueryHandle); + tsdbMayUnTakeMemSnapshot(pQueryHandle); + + // restore the pMemRef + pQueryHandle->pMemRef = pMemRef; + return ret; } if (pQueryHandle->checkFiles) { -- GitLab