提交 023b8377 编写于 作者: M Minglei Jin

enh(tsdb/cache): new block index cache for querying

上级 aa12f502
......@@ -321,6 +321,8 @@ struct STsdb {
STsdbFS fs;
SLRUCache *lruCache;
TdThreadMutex lruMutex;
SLRUCache *biCache;
TdThreadMutex biMutex;
};
struct TSDBKEY {
......@@ -746,6 +748,9 @@ int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr,
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle);
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h);
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
......
......@@ -15,6 +15,34 @@
#include "tsdb.h"
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = taosLRUCacheInit(1 * 1024 * 1024, -1, .5);
if (pCache == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->biMutex, NULL);
_err:
pTsdb->biCache = pCache;
return code;
}
static void tsdbCloseBICache(STsdb *pTsdb) {
SLRUCache *pCache = pTsdb->biCache;
if (pCache) {
taosLRUCacheEraseUnrefEntries(pCache);
taosLRUCacheCleanup(pCache);
taosThreadMutexDestroy(&pTsdb->biMutex);
}
}
int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = NULL;
......@@ -26,6 +54,12 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err;
}
code = tsdbOpenBICache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
taosLRUCacheSetStrictCapacity(pCache, false);
taosThreadMutexInit(&pTsdb->lruMutex, NULL);
......@@ -44,6 +78,8 @@ void tsdbCloseCache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->lruMutex);
}
tsdbCloseBICache(pTsdb);
}
static void getTableCacheKey(tb_uid_t uid, int cacheType, char *key, int *len) {
......@@ -1475,3 +1511,84 @@ size_t tsdbCacheGetUsage(SVnode *pVnode) {
return usage;
}
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
struct {
int32_t fid;
int64_t commitID;
} biKey = {0};
biKey.fid = fid;
biKey.commitID = commitID;
*len = sizeof(biKey);
memcpy(key, &biKey, *len);
}
static int32_t tsdbCacheLoadBlockIdx(SDataFReader *pFileReader, SArray **aBlockIdx) {
SArray *pArray = taosArrayInit(8, sizeof(SBlockIdx));
int32_t code = tsdbReadBlockIdx(pFileReader, pArray);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pArray);
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
*aBlockIdx = pArray;
return code;
}
static void deleteBICache(const void *key, size_t keyLen, void *value) {
SArray *pArray = (SArray *)value;
taosArrayDestroy(pArray);
}
int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHandle **handle) {
int32_t code = 0;
char key[128] = {0};
int keyLen = 0;
getBICacheKey(pFileReader->pSet->fid, pFileReader->pSet->pHeadF->commitID, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
STsdb *pTsdb = pFileReader->pTsdb;
taosThreadMutexLock(&pTsdb->biMutex);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (!h) {
SArray *pArray = NULL;
code = tsdbCacheLoadBlockIdx(pFileReader, &pArray);
// if table's empty or error, return code of -1
if (code != TSDB_CODE_SUCCESS || pArray == NULL) {
taosThreadMutexUnlock(&pTsdb->biMutex);
*handle = NULL;
return 0;
}
size_t charge = pArray->capacity * pArray->elemSize + sizeof(*pArray);
_taos_lru_deleter_t deleter = deleteBICache;
LRUStatus status = taosLRUCacheInsert(pCache, key, keyLen, pArray, charge, deleter, &h, TAOS_LRU_PRIORITY_LOW);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->biMutex);
}
*handle = h;
return code;
}
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
taosLRUCacheRelease(pCache, h, false);
return code;
}
......@@ -222,17 +222,18 @@ static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, int32_t numOfCols) {
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
int32_t numOfCols) {
pSupInfo->smaValid = true;
pSupInfo->numOfCols = numOfCols;
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t)*2 + POINTER_BYTES));
pSupInfo->colId = taosMemoryMalloc(numOfCols * (sizeof(int16_t) * 2 + POINTER_BYTES));
if (pSupInfo->colId == NULL) {
taosMemoryFree(pSupInfo->colId);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSupInfo->slotId = (int16_t*)((char*)pSupInfo->colId + (sizeof(int16_t) * numOfCols));
pSupInfo->buildBuf = (char**) ((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
pSupInfo->buildBuf = (char**)((char*)pSupInfo->slotId + (sizeof(int16_t) * numOfCols));
for (int32_t i = 0; i < numOfCols; ++i) {
pSupInfo->colId[i] = pCols[i].colId;
pSupInfo->slotId[i] = pSlotIdList[i];
......@@ -250,7 +251,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC
static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
int32_t i = 0, j = 0;
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
STColumn* pTCol = &pSchema->columns[i];
if (pTCol->colId == pSupInfo->colId[j]) {
if (!IS_BSMA_ON(pTCol)) {
......@@ -315,7 +316,8 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap =
......@@ -401,9 +403,7 @@ static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
taosHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) {
return pWindow->skey > pWindow->ekey;
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
......@@ -647,14 +647,17 @@ _end:
}
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
// SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
int64_t st = taosGetTimestampUs();
int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
if (code != TSDB_CODE_SUCCESS) {
// int32_t code = tsdbReadBlockIdx(pFileReader, aBlockIdx);
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
taosArrayDestroy(aBlockIdx);
......@@ -693,7 +696,8 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pReader->cost.headFileLoadTime += (et1 - st) / 1000.0;
_end:
taosArrayDestroy(aBlockIdx);
// taosArrayDestroy(aBlockIdx);
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
......@@ -772,7 +776,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el;
......@@ -915,22 +918,21 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
p = pData->pData + tDataTypes[pData->type].bytes * startIndex;
}
int32_t step = asc? 1:-1;
int32_t step = asc ? 1 : -1;
// make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
// 1. copy data in a batch model
memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes);
// 2. reverse the array list in case of descending order scan data block
if (!asc) {
switch(pColData->info.type) {
switch (pColData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
{
case TSDB_DATA_TYPE_UBIGINT: {
int32_t mid = dumpedRows >> 1u;
int64_t* pts = (int64_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) {
......@@ -1116,7 +1118,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%"PRIu64" elapsed time:%.2f ms, %s",
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, dumpedRows,
unDumpedRows, pBlock->minVer, pBlock->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
......@@ -2661,7 +2663,6 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
taosArrayDestroy(pIndexList);
if (pReader->pReadSnap != NULL) {
SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile;
if (pReader->pDelFReader == NULL && pDelFile != NULL) {
int32_t code = tsdbDelFReaderOpen(&pReader->pDelFReader, pDelFile, pReader->pTsdb);
......@@ -2849,7 +2850,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
ASSERT(pBlockInfo != NULL);
// if (pBlockInfo != NULL) {
// if (pBlockInfo != NULL) {
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
// } else {
// pScanInfo = *pReader->status.pTableIter;
......@@ -2861,9 +2862,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
// if (pBlockInfo != NULL) {
// if (pBlockInfo != NULL) {
pBlock = getCurrentBlock(pBlockIter);
// }
// }
initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader);
......@@ -2928,7 +2929,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
pBlock->maxKey.ts, pReader->idStr);
......@@ -3183,7 +3185,8 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false;
} else if (pKey->ts == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
......@@ -3619,7 +3622,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
SColVal colVal = {0};
int32_t i = 0, j = 0;
if (pSupInfo->colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pSupInfo->colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pTSRow->ts;
i += 1;
......@@ -3664,7 +3667,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t outputRowIndex = pResBlock->info.rows;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colId[i]== PRIMARYKEY_TIMESTAMP_COL_ID) {
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
......@@ -4006,17 +4009,16 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pLReader);
}
tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64
", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, creatTime:%.2f ms,"
", getTbFromMem-time:%.2f ms, getTbFromIMem-time:%.2f ms, initDelSkylineIterTime:%.2f ms, %s",
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime,
pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad,
pCost->lastBlockLoadTime, pCost->composedBlocks, pCost->buildComposedBlockTime,
numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaDataLoad, pCost->smaLoadTime, pCost->numOfBlocks,
pCost->blockLoadTime, pCost->buildmemBlock, pCost->lastBlockLoad, pCost->lastBlockLoadTime, pCost->composedBlocks,
pCost->buildComposedBlockTime, numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pCost->createScanInfoList,
pCost->getTbFromMemTime, pCost->getTbFromIMemTime, pCost->initDelSkylineIterTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
......@@ -4033,7 +4035,7 @@ static bool doTsdbNextDataBlock(STsdbReader* pReader) {
blockDataCleanup(pBlock);
SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0){
if (taosHashGetSize(pStatus->pTableMap) == 0) {
return false;
}
......@@ -4123,12 +4125,10 @@ void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64
}
}
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols,
SColumnDataAgg* pTsAgg) {
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
// do fill all null column value SMA info
int32_t i = 0, j = 0;
int32_t size = (int32_t) taosArrayGetSize(pSup->pColAgg);
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
while (j < numOfCols && i < size) {
......@@ -4141,7 +4141,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
} else if (pSup->colId[j] < pAgg->colId) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i ,&nullColAgg);
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
}
j += 1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册