提交 7360a6b6 编写于 作者: H Haojun Liao

enh(query): opt hash.

上级 d73306f6
......@@ -16,6 +16,7 @@
#ifndef _TD_VNODE_TSDB_H_
#define _TD_VNODE_TSDB_H_
#include "tsimplehash.h"
#include "vnodeInt.h"
#ifdef __cplusplus
......@@ -224,7 +225,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum);
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <util/tsimplehash.h>
#include "tsdb.h"
#define MEM_MIN_HASH 1024
......@@ -298,12 +299,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return rowsNum;
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;
......
......@@ -125,12 +125,12 @@ typedef struct SFileDataBlockInfo {
} SFileDataBlockInfo;
typedef struct SDataBlockIter {
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SDataBlk block; // current SDataBlk data
SHashObj* pTableMap;
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SDataBlk block; // current SDataBlk data
SSHashObj* pTableMap;
} SDataBlockIter;
typedef struct SFileBlockDumpInfo {
......@@ -148,7 +148,7 @@ typedef struct STableUidList {
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
SFileBlockDumpInfo fBlockDumpInfo;
......@@ -233,7 +233,7 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);
static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
......@@ -384,12 +384,11 @@ static int32_t uidComparFunc(const void* p1, const void* p2) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap =
taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pTableMap == NULL) {
return NULL;
}
......@@ -399,7 +398,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (pUidList->tableUidList == NULL) {
taosHashCleanup(pTableMap);
tSimpleHashCleanup(pTableMap);
return NULL;
}
......@@ -421,7 +420,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
pScanInfo->lastKeyInStt = ekey;
}
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
}
......@@ -436,9 +435,11 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
return pTableMap;
}
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
void *p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
......@@ -478,13 +479,15 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
tMapDataClear(&p->mapData);
}
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
static void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
void* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
int32_t iter = 0;
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*(STableBlockScanInfo**)p);
}
taosHashCleanup(pTableMap);
tSimpleHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
......@@ -800,7 +803,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
......@@ -864,10 +867,12 @@ _end:
return code;
}
static void cleanupTableScanInfo(SHashObj* pTableMap) {
static void cleanupTableScanInfo(SSHashObj* pTableMap) {
STableBlockScanInfo** px = NULL;
int32_t iter = 0;
while (1) {
px = taosHashIterate(pTableMap, px);
px = tSimpleHashIterate(pTableMap, px, &iter);
if (px == NULL) {
break;
}
......@@ -1439,7 +1444,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = (int32_t)tSimpleHashGetSize(pReader->status.pTableMap);
int64_t st = taosGetTimestampUs();
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
......@@ -1449,8 +1454,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
int32_t cnt = 0;
void* ptr = NULL;
int32_t iter = 0;
while (1) {
ptr = taosHashIterate(pReader->status.pTableMap, ptr);
ptr = tSimpleHashIterate(pReader->status.pTableMap, ptr, &iter);
if (ptr == NULL) {
break;
}
......@@ -2916,7 +2923,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastFiles = 0;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
while (1) {
......@@ -2985,18 +2992,18 @@ static void resetTableListIndex(SReaderStatus* pStatus) {
pList->currentIndex = 0;
uint64_t uid = pList->tableUidList[0];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
pStatus->pTableIter = NULL;
return false;
}
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
return (pStatus->pTableIter != NULL);
}
......@@ -3006,7 +3013,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS;
if (taosHashGetSize(pStatus->pTableMap) == 0) {
if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -3168,7 +3175,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
......@@ -3178,14 +3185,13 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
}
SBlockIdx* pBlockIdx = NULL;
int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
STableBlockScanInfo** p = tSimpleHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
......@@ -3231,13 +3237,13 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t i = 0; i < size; ++i) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
......@@ -3307,13 +3313,6 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
while (1) {
// if (pStatus->pTableIter == NULL) {
// pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
// if (pStatus->pTableIter == NULL) {
// return TSDB_CODE_SUCCESS;
// }
// }
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
initMemDataIterator(*pBlockScanInfo, pReader);
......@@ -3341,7 +3340,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
}
......@@ -4169,10 +4168,12 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
// TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
int32_t size = taosHashGetSize(pReader->status.pTableMap);
int32_t size = tSimpleHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
int32_t iter = 0;
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*p);
}
......@@ -4190,7 +4191,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pReader->status.uidList.tableUidList = (uint64_t*)p1;
}
taosHashClear(pReader->status.pTableMap);
tSimpleHashClear(pReader->status.pTableMap);
STableUidList* pUidList = &pReader->status.uidList;
pUidList->currentIndex = 0;
......@@ -4211,7 +4212,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pInfo->lastKeyInStt = ekey;
}
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
return TDB_CODE_SUCCESS;
......@@ -4428,7 +4429,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
tBlockDataDestroy(&pReader->status.fileBlockData);
cleanupDataBlockIterator(&pReader->status.blockIter);
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (pReader->status.pTableMap != NULL) {
destroyAllBlockScanInfo(pReader->status.pTableMap);
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
......@@ -4510,8 +4511,9 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
......@@ -4532,8 +4534,9 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
......@@ -4622,7 +4625,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
// restore reader's state
// task snapshot
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (numOfTables > 0) {
qTrace("tsdb/reader: %p, take snapshot", pReader);
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
......@@ -4701,7 +4704,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
*hasNext = false;
SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0) {
if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return code;
}
......@@ -4954,11 +4957,11 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
return code;
}
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = taosHashGet(pTableMap, &uid, sizeof(uid));
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
if (p == NULL || *p == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
int32_t size = taosHashGetSize(pTableMap);
int32_t size = tSimpleHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return NULL;
}
......@@ -5044,7 +5047,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
tsdbDataFReaderClose(&pReader->pFileReader);
int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(pBlockIter, pReader->order);
......@@ -5115,7 +5118,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfFiles += 1;
int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
int32_t numOfTables = (int32_t)tSimpleHashGetSize(pStatus->pTableMap);
int defaultRows = 4096;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
......@@ -5179,7 +5182,8 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
tsdbReaderResume(pReader);
}
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
int32_t iter = 0;
pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
while (pStatus->pTableIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
......@@ -5201,7 +5205,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter);
}
tsdbReleaseReader(pReader);
......
......@@ -361,10 +361,6 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
return TSDB_CODE_SUCCESS;
}
static void destroyItems(void* pItem) {
taosMemoryFree(*(void**)pItem);
}
void tSimpleHashClear(SSHashObj *pHashObj) {
if (!pHashObj || taosHashTableEmpty(pHashObj)) {
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册