提交 392d5ef8 编写于 作者: S slzhou

fix: after first pass of select * from st order by ts on one vgroup

上级 68b369a4
......@@ -707,7 +707,8 @@ typedef struct SSttBlockLoadInfo {
SArray *aSttBlk;
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex;
LRUHandle *blockDataHandle;
SLRUCache* pBlockDataCache;
LRUHandle *blockDataHandle[2];
int32_t loadBlocks;
double elapsedTime;
STSchema *pSchema;
......@@ -800,6 +801,8 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
void setLastBlockLoadInfoCache(SSttBlockLoadInfo* pLoadInfo, SLRUCache* pBlockDataCache);
void releaseLastBlockLoadInfoCacheHandle(SSttBlockLoadInfo *pLoadInfo);
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
......
......@@ -1214,13 +1214,20 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
taosLRUCacheSetStrictCapacity(pTsdb->sttBlockCache, false);
pTsdb->sttBlkCache = taosLRUCacheInit(4 * 1024 * 1024, -1, 0.0);
taosLRUCacheSetStrictCapacity(pTsdb->sttBlkCache, false);
_err:
pTsdb->lruCache = pCache;
return code;
}
void tsdbCloseCache(STsdb *pTsdb) {
taosLRUCacheEraseUnrefEntries(pTsdb->sttBlockCache);
taosLRUCacheCleanup(pTsdb->sttBlockCache);
taosLRUCacheEraseUnrefEntries(pTsdb->sttBlkCache);
taosLRUCacheCleanup(pTsdb->sttBlkCache);
SLRUCache *pCache = pTsdb->lruCache;
if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache);
......
......@@ -45,11 +45,33 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList,
pLoadInfo[i].pSchema = pSchema;
pLoadInfo[i].colIds = colList;
pLoadInfo[i].numOfCols = numOfCols;
pLoadInfo[i].blockDataHandle[0] = NULL;
pLoadInfo[i].blockDataHandle[1] = NULL;
}
return pLoadInfo;
}
void setLastBlockLoadInfoCache(SSttBlockLoadInfo *pLoadInfo, SLRUCache *pBlockDataCache) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
pLoadInfo[i].pBlockDataCache = pBlockDataCache;
}
}
void releaseLastBlockLoadInfoCacheHandle(SSttBlockLoadInfo *pLoadInfo) {
if (pLoadInfo == NULL) return;
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[0]) {
taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[0], false);
pLoadInfo[i].blockDataHandle[0] = NULL;
}
if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[1]) {
taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[1], false);
pLoadInfo[i].blockDataHandle[1] = NULL;
}
}
}
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1;
......@@ -61,6 +83,8 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
pLoadInfo[i].elapsedTime = 0;
pLoadInfo[i].loadBlocks = 0;
pLoadInfo[i].sttBlockLoaded = false;
pLoadInfo[i].blockDataHandle[0] = NULL;
pLoadInfo[i].blockDataHandle[1] = NULL;
}
}
......@@ -80,7 +104,14 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1;
if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[0]) {
taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[0], false);
pLoadInfo[i].blockDataHandle[0] = NULL;
}
if (pLoadInfo[i].pBlockDataCache && pLoadInfo[i].blockDataHandle[1]) {
taosLRUCacheRelease(pLoadInfo[i].pBlockDataCache, pLoadInfo[i].blockDataHandle[1], false);
pLoadInfo[i].blockDataHandle[1] = NULL;
}
tBlockDataDestroy(&pLoadInfo[i].blockData[0]);
tBlockDataDestroy(&pLoadInfo[i].blockData[1]);
......@@ -91,70 +122,23 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
return NULL;
}
typedef struct SSttBlockDataCacheKey{
int32_t fid;
int32_t iStt;
int64_t cid;
int64_t offset;
typedef struct SSttBlockDataCacheKey {
int32_t fid;
int32_t iStt;
int64_t cid;
int64_t offset;
} SSttBlockDataCacheKey;
static void deleteSttBlockDataCache(const void *key, size_t keyLen, void *value, void *ud) {
SBlockData* pBlockData = value;
SBlockData *pBlockData = value;
tBlockDataDestroy(pBlockData);
taosMemoryFree(pBlockData);
}
static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
int32_t code = 0;
// (pReader->pSet->fid, iStt, pReader->pSet->pHeadF->commitID) -> aSttBlk global cache
// (pIter->pReader->pSet->fid, pIter->iStt, pIter->pReader->pSet->pHeadF->commitID, pIter->pSttBlk->bInfo.offset) -> SBlockData
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
pInfo->blockDataHandle = NULL;
return NULL;
}
SSttBlockDataCacheKey key = {.fid = pIter->pReader->pSet->fid, .iStt = pIter->iStt, .cid = pIter->pReader->pSet->pHeadF->commitID, .offset = pIter->pSttBlk->bInfo.offset};
int32_t code = 0;
LRUHandle* h = taosLRUCacheLookup(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(struct SSttBlockDataCacheKey));
bool bFound = false;
if (!h) {
int64_t st = taosGetTimestampUs();
SSttBlockLoadInfo *pInfo = pIter->pBlockLoadInfo;
SBlockData* pBlockData = taosMemoryMalloc(sizeof(SBlockData));
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
tBlockDataCreate(pBlockData);
tBlockDataInit(pBlockData, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlockData->nRow,
pBlockData, el, idStr);
int charge = 1 * 1024 * 1024; //TODO
taosLRUCacheInsert(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(key), pBlockData, charge, deleteSttBlockDataCache, &h, TAOS_LRU_PRIORITY_LOW, NULL);
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
} else {
tsdbDebug("use global cached last block, block index:%d, file index:%d, block data offset: %"PRId64 " due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->pSttBlk->bInfo.offset, pIter->uid, idStr);
bFound = true;
}
SBlockData* pBlockData = taosLRUCacheValue(pIter->pReader->pTsdb->sttBlockCache, h);
pInfo->blockDataHandle = h;
return pBlockData;
if (pInfo->blockIndex[0] == pIter->iSttBlk) {
if (pInfo->currentLoadBlockIndex != 0) {
......@@ -162,7 +146,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 0;
}
return &pInfo->blockData[0];
SBlockData *pBlockData = taosLRUCacheValue(pInfo->pBlockDataCache, pInfo->blockDataHandle[0]);
return pBlockData;
}
if (pInfo->blockIndex[1] == pIter->iSttBlk) {
......@@ -171,7 +156,8 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pIter->iSttBlk, pIter->iStt, pIter->uid, idStr);
pInfo->currentLoadBlockIndex = 1;
}
return &pInfo->blockData[1];
SBlockData *pBlockData = taosLRUCacheValue(pInfo->pBlockDataCache, pInfo->blockDataHandle[1]);
return pBlockData;
}
if (pIter->pSttBlk == NULL || pInfo->pSchema == NULL) {
......@@ -182,40 +168,57 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
pInfo->currentLoadBlockIndex ^= 1;
int64_t st = taosGetTimestampUs();
SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex];
SSttBlockDataCacheKey key = {.fid = pIter->pReader->pSet->fid,
.iStt = pIter->iStt,
.cid = pIter->pReader->pSet->pHeadF->commitID,
.offset = pIter->pSttBlk->bInfo.offset};
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
LRUHandle *h = taosLRUCacheLookup(pInfo->pBlockDataCache, &key, sizeof(struct SSttBlockDataCacheKey));
if (!h) {
int64_t st = taosGetTimestampUs();
code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
SBlockData *pBlockData = taosMemoryMalloc(sizeof(SBlockData));
TABLEID id = {0};
if (pIter->pSttBlk->suid != 0) {
id.suid = pIter->pSttBlk->suid;
} else {
id.uid = pIter->uid;
}
tBlockDataCreate(pBlockData);
tBlockDataInit(pBlockData, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols);
tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData);
double el = (taosGetTimestampUs() - st) / 1000.0;
double el = (taosGetTimestampUs() - st) / 1000.0;
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, block data offset: %" PRId64
", last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->pSttBlk->bInfo.offset, pIter->iSttBlk,
pInfo->currentLoadBlockIndex, pBlockData->nRow, pBlockData, el, idStr);
int charge = 1 * 1024 * 1024; // TODO
taosLRUCacheInsert(pIter->pReader->pTsdb->sttBlockCache, &key, sizeof(key), pBlockData, charge,
deleteSttBlockDataCache, &h, TAOS_LRU_PRIORITY_LOW, NULL);
pInfo->elapsedTime += el;
pInfo->loadBlocks += 1;
} else {
tsdbDebug("use global cached last block, block index:%d, file index:%d, block data offset: %" PRId64
" due to uid:%" PRIu64 ", load data, %s",
pIter->iSttBlk, pIter->iStt, pIter->pSttBlk->bInfo.offset, pIter->uid, idStr);
}
tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64
", last file index:%d, last block index:%d, entry:%d, rows:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock->nRow,
pBlock, el, idStr);
SBlockData *pBlockData = taosLRUCacheValue(pIter->pReader->pTsdb->sttBlockCache, h);
taosLRUCacheRelease(pInfo->pBlockDataCache, pInfo->blockDataHandle[pInfo->currentLoadBlockIndex], false);
pInfo->blockDataHandle[pInfo->currentLoadBlockIndex] = h;
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1;
tsdbDebug("last block index list:%d, %d, rowIndex:%d %s", pInfo->blockIndex[0], pInfo->blockIndex[1], pIter->iRow,
idStr);
return &pInfo->blockData[pInfo->currentLoadBlockIndex];
return pBlockData;
_exit:
if (code != TSDB_CODE_SUCCESS) {
......@@ -404,7 +407,8 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
return code;
}
void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */}
void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */
}
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1;
......@@ -468,7 +472,6 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
bool hasVal = false;
int32_t i = pIter->iRow;
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
// mostly we only need to find the start position for a given table
......@@ -573,7 +576,6 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
}
if (skipBlock || pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) {
taosLRUCacheRelease(pIter->pReader->pTsdb->sttBlockCache, pIter->pBlockLoadInfo->blockDataHandle, false);
tLDataIterNextBlock(pIter, idStr);
if (pIter->pSttBlk == NULL) { // no more data
goto _exit;
......@@ -632,7 +634,7 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) {
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter *pLDataIter) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
......@@ -641,7 +643,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
} else { // desc
} else { // desc
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
......
......@@ -561,6 +561,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
int32_t numOfStt = pReader->pTsdb->pVnode->config.sttTrigger;
pLReader->pInfo = tCreateLastBlockLoadInfo(pReader->pSchema, &pInfo->colId[1], pInfo->numOfCols - 1, numOfStt);
setLastBlockLoadInfoCache(pLReader->pInfo, pReader->pTsdb->sttBlockCache);
if (pLReader->pInfo == NULL) {
tsdbDebug("init fileset iterator failed, code:%s %s", tstrerror(terrno), pReader->idStr);
return terrno;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册