提交 a84efab2 编写于 作者: H Haojun Liao

enh(tsdb): add table filter according to stt block statistics data.

上级 a127e915
...@@ -710,9 +710,8 @@ typedef struct { ...@@ -710,9 +710,8 @@ typedef struct {
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockData blockData[2]; SBlockData blockData[2];
void *pBlockArray; void *pSttStatisBlkArray;
SArray *aSttBlk; SArray *aSttBlk;
SArray *pTombBlockArray; // tomb block array list
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position. int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
int32_t loadBlocks; int32_t loadBlocks;
...@@ -720,10 +719,9 @@ typedef struct SSttBlockLoadInfo { ...@@ -720,10 +719,9 @@ typedef struct SSttBlockLoadInfo {
STSchema *pSchema; STSchema *pSchema;
int16_t *colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
bool checkRemainingRow; bool checkRemainingRow; // todo: no assign value?
bool isLast; bool isLast;
bool sttBlockLoaded; bool sttBlockLoaded;
int32_t numOfStt;
// keep the last access position, this position may be used to reduce the binary times for // keep the last access position, this position may be used to reduce the binary times for
// starting last block data for a new table // starting last block data for a new table
......
...@@ -1799,10 +1799,6 @@ static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileRea ...@@ -1799,10 +1799,6 @@ static int32_t loadSttTomb(STsdbReader *pTsdbReader, SSttFileReader *pSttFileRea
SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader; SCacheRowsReader *pReader = (SCacheRowsReader *)pTsdbReader;
if (pLoadInfo->pTombBlockArray == NULL) {
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
}
const TTombBlkArray *pBlkArray = NULL; const TTombBlkArray *pBlkArray = NULL;
code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
...@@ -29,8 +29,6 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, ...@@ -29,8 +29,6 @@ SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList,
return NULL; return NULL;
} }
pLoadInfo->numOfStt = numOfSttTrigger;
for (int32_t i = 0; i < numOfSttTrigger; ++i) { for (int32_t i = 0; i < numOfSttTrigger; ++i) {
pLoadInfo[i].blockIndex[0] = -1; pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1; pLoadInfo[i].blockIndex[1] = -1;
...@@ -61,7 +59,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi ...@@ -61,7 +59,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pLoadInfo->numOfStt = 1;
pLoadInfo->blockIndex[0] = -1; pLoadInfo->blockIndex[0] = -1;
pLoadInfo->blockIndex[1] = -1; pLoadInfo->blockIndex[1] = -1;
...@@ -78,7 +75,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi ...@@ -78,7 +75,6 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
} }
pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk));
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
pLoadInfo->pSchema = pSchema; pLoadInfo->pSchema = pSchema;
pLoadInfo->colIds = colList; pLoadInfo->colIds = colList;
pLoadInfo->numOfCols = numOfCols; pLoadInfo->numOfCols = numOfCols;
...@@ -87,7 +83,7 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi ...@@ -87,7 +83,7 @@ SSttBlockLoadInfo *tCreateOneLastBlockLoadInfo(STSchema *pSchema, int16_t *colLi
} }
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { for (int32_t i = 0; i < 1; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1; pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1; pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1; pLoadInfo[i].blockIndex[1] = -1;
...@@ -101,7 +97,7 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { ...@@ -101,7 +97,7 @@ void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
} }
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) { void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el) {
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { for (int32_t i = 0; i < 1; ++i) {
*el += pLoadInfo[i].elapsedTime; *el += pLoadInfo[i].elapsedTime;
*blocks += pLoadInfo[i].loadBlocks; *blocks += pLoadInfo[i].loadBlocks;
} }
...@@ -118,7 +114,7 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { ...@@ -118,7 +114,7 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
return NULL; return NULL;
} }
for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { for (int32_t i = 0; i < 1; ++i) {
pLoadInfo[i].currentLoadBlockIndex = 1; pLoadInfo[i].currentLoadBlockIndex = 1;
pLoadInfo[i].blockIndex[0] = -1; pLoadInfo[i].blockIndex[0] = -1;
pLoadInfo[i].blockIndex[1] = -1; pLoadInfo[i].blockIndex[1] = -1;
...@@ -129,8 +125,6 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { ...@@ -129,8 +125,6 @@ void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) {
taosArrayDestroy(pLoadInfo[i].aSttBlk); taosArrayDestroy(pLoadInfo[i].aSttBlk);
} }
taosArrayDestroyEx(pLoadInfo->pTombBlockArray, freeTombBlock);
taosMemoryFree(pLoadInfo); taosMemoryFree(pLoadInfo);
return NULL; return NULL;
} }
...@@ -317,8 +311,8 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t ...@@ -317,8 +311,8 @@ int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t
return 0; return 0;
} }
static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo *pBlockLoadInfo, uint64_t suid) { static int32_t extractSttBlockInfo(SLDataIter *pIter, const TSttBlkArray *pArray, SSttBlockLoadInfo *pBlockLoadInfo,
TSttBlkArray *pArray = pBlockLoadInfo->pBlockArray; uint64_t suid) {
if (TARRAY2_SIZE(pArray) <= 0) { if (TARRAY2_SIZE(pArray) <= 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -358,40 +352,52 @@ static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo *pBlockLoad ...@@ -358,40 +352,52 @@ static int32_t loadSttBlockInfo(SLDataIter *pIter, SSttBlockLoadInfo *pBlockLoad
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t loadSttTombBlockData(SSttFileReader *pSttFileReader, uint64_t suid, SSttBlockLoadInfo *pLoadInfo) { static int32_t uidComparFn(const void* p1, const void* p2) {
if (pLoadInfo->pTombBlockArray == NULL) { const uint64_t* uid1 = p1;
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES); const uint64_t* uid2 = p2;
} return (*uid1) - (*uid2);
}
const TTombBlkArray *pBlkArray = NULL; static bool existsFromSttBlkStatis(const TStatisBlkArray *pStatisBlkArray, uint64_t suid, uint64_t uid,
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); SSttFileReader *pReader) {
if (code != TSDB_CODE_SUCCESS) { if (TARRAY2_SIZE(pStatisBlkArray) <= 0) {
return code; return true;
} }
for (int32_t j = 0; j < pBlkArray->size; ++j) { int32_t i = 0;
STombBlk *pTombBlk = &pBlkArray->data[j]; for (i = 0; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
if (pTombBlk->maxTbid.suid < suid) { SStatisBlk *p = &pStatisBlkArray->data[i];
continue; // todo use binary search instead if (p->minTbid.suid == suid) {
break;
} }
}
if (pTombBlk->minTbid.suid > suid) { for (; i < TARRAY2_SIZE(pStatisBlkArray); ++i) {
SStatisBlk *p = &pStatisBlkArray->data[i];
if (p->minTbid.uid <= uid && p->maxTbid.uid >= uid) {
break; break;
} }
STombBlock *pTombBlock = taosMemoryCalloc(1, sizeof(STombBlock)); if (p->maxTbid.uid < uid) {
code = tsdbSttFileReadTombBlock(pSttFileReader, pTombBlk, pTombBlock); break;
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
} }
}
void *p = taosArrayPush(pLoadInfo->pTombBlockArray, &pTombBlock); if (i >= TARRAY2_SIZE(pStatisBlkArray)) {
if (p == NULL) { return false;
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
return TSDB_CODE_SUCCESS; SStatisBlk *p = &pStatisBlkArray->data[i];
STbStatisBlock block = {0};
tsdbSttFileReadStatisBlock(pReader, p, &block);
int32_t index = tarray2SearchIdx(block.uid, &uid, sizeof(int64_t), uidComparFn, TD_EQ);
if (index == -1) { // has record, continue.
tStatisBlockDestroy(&block);
return false;
}
return true;
} }
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward,
...@@ -412,27 +418,45 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader ...@@ -412,27 +418,45 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader
if (!pBlockLoadInfo->sttBlockLoaded) { if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
const TSttBlkArray*pSttBlkArray = NULL;
pBlockLoadInfo->sttBlockLoaded = true; pBlockLoadInfo->sttBlockLoaded = true;
code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray); // load the stt block info for each stt-block
code = tsdbSttFileReadSttBlk(pIter->pReader, &pSttBlkArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt blk, code:%s, %s", tstrerror(code), idStr); tsdbError("load stt blk failed, code:%s, %s", tstrerror(code), idStr);
return code; return code;
} }
code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid); code = extractSttBlockInfo(pIter, pSttBlkArray, pBlockLoadInfo, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr); tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
return code; return code;
} }
code = loadTombFn(pReader1, pIter->pReader, pBlockLoadInfo); // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file
code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray **)&pBlockLoadInfo->pSttStatisBlkArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = loadTombFn(pReader1, pIter->pReader, pIter->pBlockLoadInfo);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr); tsdbDebug("load the stt file info completed, elapsed time:%.2fms, %s", el, idStr);
}
bool exists = existsFromSttBlkStatis(pBlockLoadInfo->pSttStatisBlkArray, suid, uid, pIter->pReader);
if (!exists) {
pIter->iSttBlk = -1;
pIter->pSttBlk = NULL;
return TSDB_CODE_SUCCESS;
} }
// find the start block // find the start block, actually we could load the position to avoid repeatly searching for the start position when
// the skey is updated.
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk); size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward); pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
if (pIter->iSttBlk != -1) { if (pIter->iSttBlk != -1) {
...@@ -744,6 +768,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) { ...@@ -744,6 +768,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf) {
goto _end; goto _end;
} }
// add the list/iter placeholder
while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) { while (taosArrayGetSize(pConf->pSttFileBlockIterArray) < size) {
SArray *pList = taosArrayInit(4, POINTER_BYTES); SArray *pList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pConf->pSttFileBlockIterArray, &pList); taosArrayPush(pConf->pSttFileBlockIterArray, &pList);
......
...@@ -590,10 +590,6 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader) { ...@@ -590,10 +590,6 @@ int32_t loadDataFileTombDataForAll(STsdbReader* pReader) {
} }
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) { int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) {
if (pLoadInfo->pTombBlockArray == NULL) {
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
}
const TTombBlkArray* pBlkArray = NULL; const TTombBlkArray* pBlkArray = NULL;
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray); int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册