提交 ad35a6e4 编写于 作者: Y yihaoDeng

TD-1733

上级 8714b05e
......@@ -190,6 +190,7 @@ typedef struct SQInfo {
int32_t code; // error code to returned to client
int64_t owner; // if it is in execution
void* tsdb;
int32_t ref; // tsdb mem/immem ref count
int32_t vgId;
STableGroupInfo tableGroupInfo; // table <tid, last_key> list SArray<STableKeyInfo>
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
......
......@@ -425,8 +425,8 @@ SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem, bool ref);
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem, bool unRef);
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
......@@ -576,4 +576,4 @@ void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);
}
#endif
#endif
\ No newline at end of file
#endif
......@@ -171,30 +171,36 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
return 0;
}
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem, bool ref) {
if (tsdbLockRepo(pRepo) < 0) return -1;
*pMem = pRepo->mem;
*pIMem = pRepo->imem;
*pMem = pRepo->mem;
*pIMem = pRepo->imem;
tsdbRefMemTable(pRepo, *pMem);
tsdbRefMemTable(pRepo, *pIMem);
if (ref) {
tsdbRefMemTable(pRepo, *pMem);
tsdbRefMemTable(pRepo, *pIMem);
}
if (tsdbUnlockRepo(pRepo) < 0) return -1;
if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));
if (*pMem != NULL) {
if (ref) taosRLockLatch(&((*pMem)->latch));
}
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
return 0;
}
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem, bool unRef) {
if (pMem != NULL) {
taosRUnLockLatch(&(pMem->latch));
tsdbUnRefMemTable(pRepo, pMem);
if (unRef) {
taosRUnLockLatch(&(pMem->latch));
tsdbUnRefMemTable(pRepo, pMem);
}
}
if (pIMem != NULL) {
tsdbUnRefMemTable(pRepo, pIMem);
if (unRef) tsdbUnRefMemTable(pRepo, pIMem);
}
}
......@@ -780,4 +786,4 @@ static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
taosTFree(tData);
return 0;
}
\ No newline at end of file
}
......@@ -20,6 +20,7 @@
#include "exception.h"
#include "../../query/inc/qAst.h" // todo move to common module
#include "../../query/inc/qExecutor.h"
#include "tlosertree.h"
#include "tsdb.h"
#include "tsdbMain.h"
......@@ -203,8 +204,8 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
goto out_of_memory;
}
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem);
SQInfo *pQInfo = (SQInfo *)(pQueryHandle->qinfo);
tsdbTakeMemSnapshot(pQueryHandle->pTsdb, &pQueryHandle->mem, &pQueryHandle->imem, pQInfo->ref++ == 0);
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
......@@ -1939,14 +1940,15 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
pSecQueryHandle->checkFiles = true;
pSecQueryHandle->activeIndex = 0;
pSecQueryHandle->outputCapacity = ((STsdbRepo*)pSecQueryHandle->pTsdb)->config.maxRowsPerFileBlock;
pSecQueryHandle->qinfo = pQueryHandle->qinfo;
if (tsdbInitReadHelper(&pSecQueryHandle->rhelper, (STsdbRepo*) pSecQueryHandle->pTsdb) != 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
free(pSecQueryHandle);
return false;
}
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem);
SQInfo *pQInfo = (SQInfo *)(pSecQueryHandle->qinfo);
tsdbTakeMemSnapshot(pSecQueryHandle->pTsdb, &pSecQueryHandle->mem, &pSecQueryHandle->imem, (pQInfo->ref++ == 0));
// allocate buffer in order to load data blocks from file
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
......@@ -2707,7 +2709,8 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
taosTFree(pQueryHandle->statis);
// todo check error
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem);
SQInfo *qInfo = (SQInfo *)(pQueryHandle->qinfo);
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem, --qInfo->ref == 0);
tsdbDestroyHelper(&pQueryHandle->rhelper);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册