提交 fcdc8512 编写于 作者: H Haojun Liao

[TD-1980] release mem/imem table reference once the data in cache are all scanned.

上级 8d1c2d37
...@@ -6926,7 +6926,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -6926,7 +6926,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) { if (IS_QUERY_KILLED(pQInfo) || Q_STATUS_EQUAL(pQuery->status, QUERY_OVER)) {
// here current thread hold the refcount, so it is safe to free tsdbQueryHandle. // here current thread hold the refcount, so it is safe to free tsdbQueryHandle.
doFreeQueryHandle(pQInfo); // doFreeQueryHandle(pQInfo);
*continueExec = false; *continueExec = false;
(*pRsp)->completed = 1; // notify no more result to client (*pRsp)->completed = 1; // notify no more result to client
} else { } else {
......
...@@ -197,7 +197,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) ...@@ -197,7 +197,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
tsdbUnRefMemTable(pRepo, pIMem); tsdbUnRefMemTable(pRepo, pIMem);
} }
tsdbDebug("vgId:%d utake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem); tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
} }
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
......
...@@ -120,8 +120,6 @@ typedef struct STsdbQueryHandle { ...@@ -120,8 +120,6 @@ typedef struct STsdbQueryHandle {
SDataCols *pDataCols; // in order to hold current file data block SDataCols *pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size int32_t allocSize; // allocated data block size
SMemRef *pMemRef; SMemRef *pMemRef;
// SMemTable *mem; // mem-table
// SMemTable *imem; // imem-table, acquired from snapshot
SArray *defaultLoadColumn;// default load column SArray *defaultLoadColumn;// default load column
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */ SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */ SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
...@@ -194,9 +192,12 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { ...@@ -194,9 +192,12 @@ static void tsdbMayTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
} }
static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
assert(pQueryHandle != NULL && pQueryHandle->pMemRef != NULL); assert(pQueryHandle != NULL);
SMemRef* pMemRef = pQueryHandle->pMemRef; SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pMemRef == NULL) { // it has been freed
return;
}
if (--pMemRef->ref == 0) { if (--pMemRef->ref == 0) {
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pMemRef->mem, pMemRef->imem); tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pMemRef->mem, pMemRef->imem);
pMemRef->mem = NULL; pMemRef->mem = NULL;
...@@ -205,6 +206,7 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) { ...@@ -205,6 +206,7 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
pQueryHandle->pMemRef = NULL; pQueryHandle->pMemRef = NULL;
} }
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) { static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList); size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
assert(sizeOfGroup >= 1 && pMeta != NULL); assert(sizeOfGroup >= 1 && pMeta != NULL);
...@@ -1821,6 +1823,8 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) { ...@@ -1821,6 +1823,8 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
pQueryHandle->activeIndex += 1; pQueryHandle->activeIndex += 1;
} }
// no data in memtable or imemtable, decrease the memory reference.
tsdbMayUnTakeMemSnapshot(pQueryHandle);
return false; return false;
} }
...@@ -1947,116 +1951,129 @@ static void destroyHelper(void* param) { ...@@ -1947,116 +1951,129 @@ static void destroyHelper(void* param) {
free(param); free(param);
} }
// handle data in cache situation static bool getNeighborRows(STsdbQueryHandle* pQueryHandle) {
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) { assert (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL);
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
int64_t stime = taosGetTimestampUs(); SDataBlockInfo blockInfo = {{0}, 0};
int64_t elapsedTime = stime;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
assert(numOfTables > 0); pQueryHandle->order = TSDB_ORDER_DESC;
SDataBlockInfo blockInfo = {{0}, 0}; if (!tsdbNextDataBlock((void*) pQueryHandle)) {
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) { return false;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; }
pQueryHandle->order = TSDB_ORDER_DESC;
if (!tsdbNextDataBlock(pHandle)) { tsdbRetrieveDataBlockInfo((void*) pQueryHandle, &blockInfo);
return false; /*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pQueryHandle, pQueryHandle->defaultLoadColumn);
if (terrno != TSDB_CODE_SUCCESS) {
return false;
}
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
} }
tsdbRetrieveDataBlockInfo(pHandle, &blockInfo); pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
/*SArray *pDataBlock = */tsdbRetrieveDataBlock(pHandle, pQueryHandle->defaultLoadColumn); pQueryHandle->window = pQueryHandle->cur.win;
if (terrno != TSDB_CODE_SUCCESS) { pQueryHandle->cur.rows = 1;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
return true;
} else {
STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
STsdbQueryCond cond = {
.order = TSDB_ORDER_ASC,
.numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle))
};
cond.twindow = win;
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return false; return false;
} }
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) { for(int32_t i = 0; i < cond.numOfCols; ++i) {
// data already retrieve, discard other data rows and return SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i);
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle)); memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
for (int32_t i = 0; i < numOfCols; ++i) { }
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
}
pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey}; STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pQueryHandle->pMemRef);
pQueryHandle->window = pQueryHandle->cur.win; taosTFree(cond.colList);
pQueryHandle->cur.rows = 1;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
return true;
} else {
STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
STsdbQueryCond cond = {
.order = TSDB_ORDER_ASC,
.numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle))
};
cond.twindow = win;
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return false;
}
for(int32_t i = 0; i < cond.numOfCols; ++i) { pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey);
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i); if (pSecQueryHandle->pTableCheckInfo == NULL) {
memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo)); tsdbCleanupQueryHandle(pSecQueryHandle);
} return false;
}
STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pQueryHandle->pMemRef); if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
taosTFree(cond.colList); tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey); int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle));
if (pSecQueryHandle->pTableCheckInfo == NULL) { size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo);
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) { for (int32_t i = 0; i < numOfCols; ++i) {
tsdbCleanupQueryHandle(pSecQueryHandle); SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
return false; memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
}
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo); SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn); assert(pCol->info.colId == pCol1->info.colId);
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle)); memcpy((char*)pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo); }
for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i); // it is ascending order
assert(pCol->info.colId == pCol1->info.colId); pQueryHandle->order = TSDB_ORDER_DESC;
pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
pQueryHandle->cur.rows = 2;
pQueryHandle->cur.mixBlock = true;
memcpy((char*)pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes); int32_t step = -1;// one step for ascending order traverse
} for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
pCheckInfo->lastKey = pQueryHandle->cur.win.ekey + step;
}
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0); tsdbCleanupQueryHandle(pSecQueryHandle);
}
// it is ascending order //disable it after retrieve data
pQueryHandle->order = TSDB_ORDER_DESC; pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
pQueryHandle->window = pQueryHandle->cur.win; pQueryHandle->checkFiles = false;
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]}; return true;
pQueryHandle->cur.rows = 2; }
pQueryHandle->cur.mixBlock = true;
int32_t step = -1;// one step for ascending order traverse // handle data in cache situation
for (int32_t j = 0; j < si; ++j) { bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j); STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
pCheckInfo->lastKey = pQueryHandle->cur.win.ekey + step;
}
tsdbCleanupQueryHandle(pSecQueryHandle); int64_t stime = taosGetTimestampUs();
} int64_t elapsedTime = stime;
//disable it after retrieve data size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL; assert(numOfTables > 0);
pQueryHandle->checkFiles = false;
return true; if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
SMemRef* pMemRef = pQueryHandle->pMemRef;
tsdbMayTakeMemSnapshot(pQueryHandle);
bool ret = getNeighborRows(pQueryHandle);
tsdbMayUnTakeMemSnapshot(pQueryHandle);
// restore the pMemRef
pQueryHandle->pMemRef = pMemRef;
return ret;
} }
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册