未验证 提交 8d0a4772 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21070 from taosdata/fix/liaohj_main

enh(query): opt last row read performance. TD-23313
......@@ -1664,11 +1664,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
return handleErrorBeforePoll(pVg, pTmq);
}
sendInfo->msgInfo = (SDataBuf){
.pData = msg,
.len = msgSize,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){ .pData = msg, .len = msgSize, .handle = NULL };
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
......
......@@ -124,11 +124,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_TIMER,
.pCont = pReq,
.contLen = contLen,
};
SRpcMsg rpcMsg = { .msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen };
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
......
......@@ -162,7 +162,6 @@ int32_t metaTbCursorPrev(SMTbCursor *pTbCur, ETableType jumpTableType);
#endif
// tsdb
// typedef struct STsdb STsdb;
typedef struct STsdbReader STsdbReader;
#define TSDB_DEFAULT_STT_FILE 8
......
......@@ -16,6 +16,7 @@
#ifndef _TD_VNODE_TSDB_H_
#define _TD_VNODE_TSDB_H_
#include "tsimplehash.h"
#include "vnodeInt.h"
#ifdef __cplusplus
......@@ -125,11 +126,13 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter);
// SRowMerger
int32_t tsdbRowMergerInit(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema);
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
// int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tsdbRowMergerClear(SRowMerger *pMerger);
// int32_t tsdbRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema);
void tsdbRowMergerClear_rv(SRowMerger* pMerger);
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger);
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY
......@@ -224,7 +227,7 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj *pTableMap, int64_t *rowsNum);
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
......@@ -706,7 +709,6 @@ typedef struct SSttBlockLoadInfo {
typedef struct SMergeTree {
int8_t backward;
SRBTree rbt;
SArray *pIterList;
SLDataIter *pIter;
bool destroyLoadInfo;
SSttBlockLoadInfo *pLoadInfo;
......@@ -752,13 +754,29 @@ struct SDiskDataBuilder {
SBlkInfo bi;
};
typedef struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
SDataFReader *pReader;
int32_t iStt;
int8_t backward;
int32_t iSttBlk;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs;
} SLDataIter;
#define tMergeTreeGetRow(_t) (&((_t)->pIter->rInfo.row))
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, int32_t numOfStt);
......@@ -783,6 +801,7 @@ typedef struct SCacheRowsReader {
STableKeyInfo *pTableList; // table id list
int32_t numOfTables;
SSttBlockLoadInfo *pLoadInfo;
SLDataIter *pDataIter;
STsdbReadSnap *pReadSnap;
SDataFReader *pDataFReader;
SDataFReader *pDataFReaderLast;
......
......@@ -598,6 +598,7 @@ typedef struct {
SMergeTree mergeTree;
SMergeTree *pMergeTree;
SSttBlockLoadInfo *pLoadInfo;
SLDataIter* pDataIter;
int64_t lastTs;
} SFSLastNextRowIter;
......@@ -645,7 +646,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
}
tMergeTreeOpen(&state->mergeTree, 1, *state->pDataFReader, state->suid, state->uid,
&(STimeWindow){.skey = state->lastTs, .ekey = TSKEY_MAX},
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true);
&(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}, state->pLoadInfo, false, NULL, true, state->pDataIter);
state->pMergeTree = &state->mergeTree;
state->state = SFSLASTNEXTROW_BLOCKROW;
}
......@@ -667,7 +668,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
state->state = SFSLASTNEXTROW_FILESET;
goto _next_fileset;
}
state->row = tMergeTreeGetRow(&state->mergeTree);
state->row = *tMergeTreeGetRow(&state->mergeTree);
*ppRow = &state->row;
if (TSDBROW_TS(&state->row) <= state->lastTs) {
......@@ -1211,7 +1212,7 @@ typedef struct {
} CacheNextRowIter;
static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid,
SSttBlockLoadInfo *pLoadInfo, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SSttBlockLoadInfo *pLoadInfo, SLDataIter* pLDataIter, STsdbReadSnap *pReadSnap, SDataFReader **pDataFReader,
SDataFReader **pDataFReaderLast, int64_t lastTs) {
int code = 0;
......@@ -1274,6 +1275,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
pIter->fsLastState.pLoadInfo = pLoadInfo;
pIter->fsLastState.pDataFReader = pDataFReaderLast;
pIter->fsLastState.lastTs = lastTs;
pIter->fsLastState.pDataIter = pLDataIter;
pIter->fsState.state = SFSNEXTROW_FS;
pIter->fsState.pTsdb = pTsdb;
......@@ -1465,7 +1467,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, SArray **ppCo
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
......@@ -1622,7 +1624,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCach
TSKEY lastRowTs = TSKEY_MAX;
CacheNextRowIter iter = {0};
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader,
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pDataIter, pr->pReadSnap, &pr->pDataFReader,
&pr->pDataFReaderLast, pr->lastTs);
do {
......
......@@ -187,13 +187,21 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
}
}
int32_t numOfStt = ((SVnode*)pVnode)->config.sttTrigger;
SVnodeCfg* pCfg = &((SVnode*)pVnode)->config;
int32_t numOfStt = pCfg->sttTrigger;
p->pLoadInfo = tCreateLastBlockLoadInfo(p->pSchema, NULL, 0, numOfStt);
if (p->pLoadInfo == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->pDataIter = taosMemoryCalloc(pCfg->sttTrigger, sizeof(SLDataIter));
if (p->pDataIter == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
p->idstr = taosStrdup(idstr);
taosThreadMutexInit(&p->readerMutex, NULL);
......@@ -215,6 +223,7 @@ void* tsdbCacherowsReaderClose(void* pReader) {
taosMemoryFree(p->pSchema);
}
taosMemoryFreeClear(p->pDataIter);
taosMemoryFree(p->pCurrSchema);
destroyLastBlockLoadInfo(p->pLoadInfo);
......
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <util/tsimplehash.h>
#include "tsdb.h"
#define MEM_MIN_HASH 1024
......@@ -298,12 +299,12 @@ int64_t tsdbCountTbDataRows(STbData *pTbData) {
return rowsNum;
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
void* p = tSimpleHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;
......
......@@ -16,22 +16,6 @@
#include "tsdb.h"
// SLDataIter =================================================
struct SLDataIter {
SRBTreeNode node;
SSttBlk *pSttBlk;
SDataFReader *pReader;
int32_t iStt;
int8_t backward;
int32_t iSttBlk;
int32_t iRow;
SRowInfo rInfo;
uint64_t uid;
STimeWindow timeWindow;
SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo;
bool ignoreEarlierTs;
};
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
int32_t numOfSttTrigger) {
SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(numOfSttTrigger, sizeof(SSttBlockLoadInfo));
......@@ -268,25 +252,21 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint
}
}
int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
int32_t tLDataIterOpen(struct SLDataIter *pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
int32_t code = TSDB_CODE_SUCCESS;
*pIter = taosMemoryCalloc(1, sizeof(SLDataIter));
if (*pIter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pIter->uid = uid;
pIter->pReader = pReader;
pIter->iStt = iStt;
pIter->backward = backward;
pIter->verRange.minVer = pRange->minVer;
pIter->verRange.maxVer = pRange->maxVer;
pIter->timeWindow.skey = pTimeWindow->skey;
pIter->timeWindow.ekey = pTimeWindow->ekey;
(*pIter)->uid = uid;
(*pIter)->pReader = pReader;
(*pIter)->iStt = iStt;
(*pIter)->backward = backward;
(*pIter)->verRange = *pRange;
(*pIter)->timeWindow = *pTimeWindow;
(*pIter)->pBlockLoadInfo = pBlockLoadInfo;
pIter->pBlockLoadInfo = pBlockLoadInfo;
if (!pBlockLoadInfo->sttBlockLoaded) {
int64_t st = taosGetTimestampUs();
......@@ -294,7 +274,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
code = tsdbReadSttBlk(pReader, iStt, pBlockLoadInfo->aSttBlk);
if (code) {
goto _exit;
return code;
}
// only apply to the child tables, ordinary tables will not incur this filter procedure.
......@@ -310,7 +290,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
(*pIter)->iSttBlk = -1;
pIter->iSttBlk = -1;
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
return code;
......@@ -343,31 +323,27 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
// find the start block
(*pIter)->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
if ((*pIter)->iSttBlk != -1) {
(*pIter)->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, (*pIter)->iSttBlk);
(*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1;
if ((!backward) && ((strictTimeRange && (*pIter)->pSttBlk->minKey >= (*pIter)->timeWindow.ekey) ||
(!strictTimeRange && (*pIter)->pSttBlk->minKey > (*pIter)->timeWindow.ekey))) {
(*pIter)->pSttBlk = NULL;
pIter->iSttBlk = binarySearchForStartBlock(pBlockLoadInfo->aSttBlk->pData, size, uid, backward);
if (pIter->iSttBlk != -1) {
pIter->pSttBlk = taosArrayGet(pBlockLoadInfo->aSttBlk, pIter->iSttBlk);
pIter->iRow = (pIter->backward) ? pIter->pSttBlk->nRow : -1;
if ((!backward) && ((strictTimeRange && pIter->pSttBlk->minKey >= pIter->timeWindow.ekey) ||
(!strictTimeRange && pIter->pSttBlk->minKey > pIter->timeWindow.ekey))) {
pIter->pSttBlk = NULL;
}
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
(*pIter)->pSttBlk = NULL;
(*pIter)->ignoreEarlierTs = true;
if (backward && ((strictTimeRange && pIter->pSttBlk->maxKey <= pIter->timeWindow.skey) ||
(!strictTimeRange && pIter->pSttBlk->maxKey < pIter->timeWindow.skey))) {
pIter->pSttBlk = NULL;
pIter->ignoreEarlierTs = true;
}
}
return code;
_exit:
taosMemoryFree(*pIter);
return code;
}
void tLDataIterClose(SLDataIter *pIter) { taosMemoryFree(pIter); }
void tLDataIterClose(SLDataIter *pIter) { /*taosMemoryFree(pIter); */}
void tLDataIterNextBlock(SLDataIter *pIter, const char *idStr) {
int32_t step = pIter->backward ? -1 : 1;
......@@ -594,43 +570,38 @@ static FORCE_INLINE int32_t tLDataIterDescCmprFn(const SRBTreeNode *p1, const SR
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, SSttBlockLoadInfo *pBlockLoadInfo,
bool destroyLoadInfo, const char *idStr, bool strictTimeRange) {
bool destroyLoadInfo, const char *idStr, bool strictTimeRange, SLDataIter* pLDataIter) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
pMTree->pIter = NULL;
pMTree->pIterList = taosArrayInit(4, POINTER_BYTES);
if (pMTree->pIterList == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMTree->idStr = idStr;
if (!pMTree->backward) { // asc
tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn);
} else { // desc
tRBTreeCreate(&pMTree->rbt, tLDataIterDescCmprFn);
}
int32_t code = TSDB_CODE_SUCCESS;
pMTree->pLoadInfo = pBlockLoadInfo;
pMTree->destroyLoadInfo = destroyLoadInfo;
pMTree->ignoreEarlierTs = false;
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
struct SLDataIter *pIter = NULL;
code = tLDataIterOpen(&pIter, pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
memset(&pLDataIter[i], 0, sizeof(SLDataIter));
code = tLDataIterOpen(&pLDataIter[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
&pMTree->pLoadInfo[i], pMTree->idStr, strictTimeRange);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
bool hasVal = tLDataIterNextRow(&pLDataIter[i], pMTree->idStr);
if (hasVal) {
taosArrayPush(pMTree->pIterList, &pIter);
tMergeTreeAddIter(pMTree, pIter);
tMergeTreeAddIter(pMTree, &pLDataIter[i]);
} else {
if (!pMTree->ignoreEarlierTs) {
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
pMTree->ignoreEarlierTs = pLDataIter[i].ignoreEarlierTs;
}
tLDataIterClose(pIter);
}
}
......@@ -678,18 +649,8 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
return pMTree->pIter != NULL;
}
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree) { return pMTree->pIter->rInfo.row; }
void tMergeTreeClose(SMergeTree *pMTree) {
size_t size = taosArrayGetSize(pMTree->pIterList);
for (int32_t i = 0; i < size; ++i) {
SLDataIter *pIter = taosArrayGetP(pMTree->pIterList, i);
tLDataIterClose(pIter);
}
pMTree->pIterList = taosArrayDestroy(pMTree->pIterList);
pMTree->pIter = NULL;
if (pMTree->destroyLoadInfo) {
pMTree->pLoadInfo = destroyLastBlockLoadInfo(pMTree->pLoadInfo);
pMTree->destroyLoadInfo = false;
......
......@@ -18,6 +18,7 @@
#include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
......@@ -108,6 +109,7 @@ typedef struct SLastBlockReader {
uint64_t uid;
SMergeTree mergeTree;
SSttBlockLoadInfo* pInfo;
int64_t currentKey;
} SLastBlockReader;
typedef struct SFilesetIter {
......@@ -125,12 +127,12 @@ typedef struct SFileDataBlockInfo {
} SFileDataBlockInfo;
typedef struct SDataBlockIter {
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SDataBlk block; // current SDataBlk data
SHashObj* pTableMap;
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SDataBlk block; // current SDataBlk data
SSHashObj* pTableMap;
} SDataBlockIter;
typedef struct SFileBlockDumpInfo {
......@@ -148,7 +150,8 @@ typedef struct STableUidList {
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
bool mapDataCleaned; // mapData has been cleaned up alreay or not
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
SFileBlockDumpInfo fBlockDumpInfo;
......@@ -156,6 +159,9 @@ typedef struct SReaderStatus {
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
SLDataIter* pLDataIter;
SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
} SReaderStatus;
typedef struct SBlockInfoBuf {
......@@ -165,6 +171,15 @@ typedef struct SBlockInfoBuf {
int32_t numOfTables;
} SBlockInfoBuf;
typedef struct STsdbReaderAttr {
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
bool freeBlock;
SVersionRange verRange;
} STsdbReaderAttr;
struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
......@@ -184,37 +199,34 @@ struct STsdbReader {
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SIOCostSummary cost;
STSchema* pSchema; // the newest version schema
// STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
STSchema* pSchema; // the newest version schema
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pInfo);
STableBlockScanInfo* pScanInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
TSDBROW* pTSRow, STsdbReader* pReader, bool* freeTSRow);
TSDBROW* pResRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
......@@ -225,7 +237,6 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
......@@ -233,9 +244,9 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id);
static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
......@@ -384,12 +395,11 @@ static int32_t uidComparFunc(const void* p1, const void* p2) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap =
taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pTableMap == NULL) {
return NULL;
}
......@@ -399,7 +409,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (pUidList->tableUidList == NULL) {
taosHashCleanup(pTableMap);
tSimpleHashCleanup(pTableMap);
return NULL;
}
......@@ -421,7 +431,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
pScanInfo->lastKeyInStt = ekey;
}
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
}
......@@ -436,9 +446,11 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
return pTableMap;
}
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts, int32_t step) {
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
void *p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
......@@ -478,13 +490,15 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
tMapDataClear(&p->mapData);
}
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
static void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
void* p = NULL;
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
int32_t iter = 0;
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*(STableBlockScanInfo**)p);
}
taosHashCleanup(pTableMap);
tSimpleHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
......@@ -767,13 +781,16 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
code = tBlockDataCreate(&pReader->status.fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _end;
}
setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols);
ASSERT (pReader->suppInfo.colId[0] == PRIMARYKEY_TIMESTAMP_COL_ID);
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]);
tsdbInitReaderLock(pReader);
......@@ -794,7 +811,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
......@@ -858,28 +875,42 @@ _end:
return code;
}
static void cleanupTableScanInfo(SHashObj* pTableMap) {
static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) {
// reset the index in last block when handing a new file
tMapDataClear(&pScanInfo->mapData);
taosArrayClear(pScanInfo->pBlockList);
}
static void cleanupTableScanInfo(SReaderStatus* pStatus) {
if (pStatus->mapDataCleaned) {
return;
}
SSHashObj* pTableMap = pStatus->pTableMap;
STableBlockScanInfo** px = NULL;
int32_t iter = 0;
while (1) {
px = taosHashIterate(pTableMap, px);
px = tSimpleHashIterate(pTableMap, px, &iter);
if (px == NULL) {
break;
}
// reset the index in last block when handing a new file
tMapDataClear(&(*px)->mapData);
taosArrayClear((*px)->pBlockList);
doCleanupTableScanInfo(*px);
}
pStatus->mapDataCleaned = true;
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum) {
int32_t numOfQTable = 0;
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
size_t sizeInDisk = 0;
size_t numOfTables = taosArrayGetSize(pIndexList);
int64_t st = taosGetTimestampUs();
cleanupTableScanInfo(pReader->status.pTableMap);
cleanupTableScanInfo(&pReader->status);
// set the flag for the new file
pReader->status.mapDataCleaned = false;
for (int32_t i = 0; i < numOfTables; ++i) {
SBlockIdx* pBlockIdx = taosArrayGet(pIndexList, i);
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockIdx->uid, pReader->idStr);
......@@ -933,7 +964,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
if (taosArrayGetSize(pScanInfo->pBlockList) > 0) {
numOfQTable += 1;
taosArrayPush(pTableScanInfoList, &pScanInfo);
}
}
......@@ -944,8 +975,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
tsdbDebug(
"load block of %ld tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr);
numOfTables, pBlockNum->numOfBlocks, (int32_t)taosArrayGetSize(pTableScanInfoList), pBlockNum->numOfLastFiles,
sizeInDisk / 1000.0, el, pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el;
......@@ -1424,7 +1455,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
return TSDB_CODE_SUCCESS;
}
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
bool asc = ASCENDING_TRAVERSE(pReader->order);
SBlockOrderSupporter sup = {0};
......@@ -1433,7 +1464,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
// int32_t numOfTables = (int32_t)tSimpleHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = taosArrayGetSize(pTableList);
int64_t st = taosGetTimestampUs();
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
......@@ -1442,17 +1474,21 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
int32_t cnt = 0;
void* ptr = NULL;
while (1) {
ptr = taosHashIterate(pReader->status.pTableMap, ptr);
if (ptr == NULL) {
break;
}
STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
continue;
}
// void* ptr = NULL;
// int32_t iter = 0;
// while (1) {
// ptr = tSimpleHashIterate(pReader->status.pTableMap, ptr, &iter);
// if (ptr == NULL) {
// break;
// }
for(int32_t i = 0; i < numOfTables; ++i) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
// if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
// continue;
// }
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
sup.numOfBlocksPerTable[sup.numOfTables] = num;
......@@ -1832,11 +1868,14 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
return false;
}
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBKEY k = TSDBROW_KEY(&row);
pScanInfo->lastKeyInStt = k.ts;
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order, pVerRange)) {
pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
// the qualifed ts may equal to k.ts, only a greater version one.
// here we need to fallback one step.
return true;
......@@ -1876,7 +1915,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return code;
}
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
if (pReader->pSchema != NULL) {
return pReader->pSchema;
}
......@@ -1899,6 +1938,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
terrno = code;
return NULL;
}
code = tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
if (code != 0) {
terrno = code;
return NULL;
}
}
if (pReader->pSchema && sversion == pReader->pSchema->version) {
......@@ -1927,7 +1972,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -1976,25 +2021,25 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true; // todo check if pReader->pSchema is null or not
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL);
tsdbRowMergerAdd(&pReader->status.merger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &pReader->status.merger, &pReader->verRange, pReader->idStr);
}
if (minKey == k.ts) {
......@@ -2003,15 +2048,15 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
if (init) {
tsdbRowMergerAdd(&merge, pRow, pSchema);
tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
int32_t code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2024,46 +2069,46 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
code = doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code;
}
}
if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL);
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == key) {
if (init) {
tsdbRowMergerAdd(&merge, &fRow, NULL);
tsdbRowMergerAdd(pMerger, &fRow, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
int32_t code = tsdbRowMergerGetRow(&merge, &pTSRow);
int32_t code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2071,7 +2116,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(pMerger);
return code;
}
......@@ -2079,14 +2124,19 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
bool copied = false;
int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
bool copied = false;
int32_t code = TSDB_CODE_SUCCESS;
SRow* pTSRow = NULL;
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
// create local variable to hold the row value
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, pReader->idStr);
// only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
......@@ -2099,16 +2149,16 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(&merge, &fRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2116,26 +2166,26 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
} else { // not merge block data
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
code = tsdbRowMergerGetRow(&merge, &pTSRow);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2143,7 +2193,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(pMerger);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2172,22 +2222,22 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
SRow* pTSRow = NULL;
SRowMerger merge = {0};
SRow* pTSRow = NULL;
SRowMerger* pMerger = &pReader->status.merger;
int32_t code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(&merge, &fRow1, NULL);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->verRange, pReader->idStr);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2195,7 +2245,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(pMerger);
return code;
} else {
return TSDB_CODE_SUCCESS;
......@@ -2210,7 +2260,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
SRowMerger* pMerger = &pReader->status.merger;
SRow* pTSRow = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -2283,42 +2333,41 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == ik.ts) {
if (init) {
tsdbRowMergerAdd(&merge, piRow, piSchema);
tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else {
init = true;
code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2326,20 +2375,15 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) {
if (init) {
if (merge.pTSchema == NULL) {
return code;
}
tsdbRowMergerAdd(&merge, pRow, pSchema);
tsdbRowMergerAdd(pMerger, pRow, pSchema);
} else {
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2347,13 +2391,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else {
if (minKey == k.ts) {
init = true;
code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2361,58 +2404,49 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == ik.ts) {
if (init) {
tsdbRowMergerAdd(&merge, piRow, piSchema);
tsdbRowMergerAdd(pMerger, piRow, piSchema);
} else {
init = true;
// STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
code = tsdbRowMergerAdd(pMerger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (minKey == tsLast) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
if (init) {
tsdbRowMergerAdd(&merge, &fRow1, NULL);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
code = tsdbRowMergerInit(&merge, NULL, &fRow1, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
}
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) {
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
if (merge.pTSchema == NULL) {
return code;
}
tsdbRowMergerAdd(&merge, &fRow, NULL);
tsdbRowMergerAdd(pMerger, &fRow, NULL);
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
}
}
if (merge.pTSchema == NULL) {
return code;
}
code = tsdbRowMergerGetRow(&merge, &pTSRow);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2420,7 +2454,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(pMerger);
return code;
}
......@@ -2514,8 +2548,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return false;
}
TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k, pReader->order,
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->order,
&pReader->verRange)) {
return false;
}
......@@ -2547,7 +2580,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC),
pReader->pFileReader, pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange,
pLBlockReader->pInfo, false, pReader->idStr, false);
pLBlockReader->pInfo, false, pReader->idStr, false, pReader->status.pLDataIter);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
......@@ -2555,11 +2588,6 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
return nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
}
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
TSDBROW row = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
return TSDBROW_TS(&row);
}
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; }
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
......@@ -2585,15 +2613,13 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
SRowMerger merge = {0};
code = tsdbRowMergerInit(&merge, NULL, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
code = tsdbRowMergerGetRow(&merge, &pTSRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader);
code = tsdbRowMergerGetRow(&pReader->status.merger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2601,7 +2627,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(&pReader->status.merger);
return code;
}
}
......@@ -2905,12 +2931,12 @@ TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader)
}
}
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pTableList) {
SReaderStatus* pStatus = &pReader->status;
pBlockNum->numOfBlocks = 0;
pBlockNum->numOfLastFiles = 0;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx));
while (1) {
......@@ -2933,13 +2959,14 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
}
if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) {
code = doLoadFileBlock(pReader, pIndexList, pBlockNum);
code = doLoadFileBlock(pReader, pIndexList, pBlockNum, pTableList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList);
return code;
}
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastFiles > 0) {
// ASSERT(taosArrayGetSize(pTableList) > 0);
break;
}
}
......@@ -2979,18 +3006,18 @@ static void resetTableListIndex(SReaderStatus* pStatus) {
pList->currentIndex = 0;
uint64_t uid = pList->tableUidList[0];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
}
static bool moveToNextTable(STableUidList* pOrderedCheckInfo, SReaderStatus* pStatus) {
pOrderedCheckInfo->currentIndex += 1;
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
if (pOrderedCheckInfo->currentIndex >= tSimpleHashGetSize(pStatus->pTableMap)) {
pStatus->pTableIter = NULL;
return false;
}
uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex];
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
pStatus->pTableIter = tSimpleHashGet(pStatus->pTableMap, &uid, sizeof(uid));
return (pStatus->pTableIter != NULL);
}
......@@ -3000,7 +3027,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
int32_t code = TSDB_CODE_SUCCESS;
if (taosHashGetSize(pStatus->pTableMap) == 0) {
if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -3010,8 +3037,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasVal) {
// reset the index in last block when handing a new file
doCleanupTableScanInfo(pScanInfo);
pStatus->mapDataCleaned = true;
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
......@@ -3162,7 +3193,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
......@@ -3172,14 +3203,13 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
}
SBlockIdx* pBlockIdx = NULL;
int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
STableBlockScanInfo** p = tSimpleHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
......@@ -3225,13 +3255,13 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t i = 0; i < size; ++i) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
......@@ -3301,13 +3331,6 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
STableUidList* pUidList = &pStatus->uidList;
while (1) {
// if (pStatus->pTableIter == NULL) {
// pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
// if (pStatus->pTableIter == NULL) {
// return TSDB_CODE_SUCCESS;
// }
// }
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
initMemDataIterator(*pBlockScanInfo, pReader);
......@@ -3335,7 +3358,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
STableBlockScanInfo* pScanInfo = tSimpleHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
if (pScanInfo) {
lastKey = pScanInfo->lastKey;
}
......@@ -3352,20 +3375,24 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SBlockNumber num = {0};
int32_t code = moveToNextFile(pReader, &num);
SArray* pTableList = taosArrayInit(40, POINTER_BYTES);
int32_t code = moveToNextFile(pReader, &num, pTableList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pTableList);
return code;
}
// all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastFiles == 0) {
pReader->status.loadFromFile = false;
taosArrayDestroy(pTableList);
return code;
}
// initialize the block iterator for a new fileset
if (num.numOfBlocks > 0) {
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
......@@ -3374,6 +3401,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
// set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter);
taosArrayDestroy(pTableList);
return code;
}
......@@ -3537,7 +3565,7 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order, SVersionRange* pVerRange) {
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, SVersionRange* pVerRange) {
if (pDelList == NULL) {
return false;
}
......@@ -3549,29 +3577,29 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if (asc) {
if (*index >= num - 1) {
TSDBKEY* last = taosArrayGetLast(pDelList);
ASSERT(pKey->ts >= last->ts);
ASSERT(key >= last->ts);
if (pKey->ts > last->ts) {
if (key > last->ts) {
return false;
} else if (pKey->ts == last->ts) {
} else if (key == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
return (prev->version >= pKey->version && prev->version <= pVerRange->maxVer &&
return (prev->version >= ver && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
if (pKey->ts < pCurrent->ts) {
if (key < pCurrent->ts) {
return false;
}
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
if (pCurrent->ts <= key && pNext->ts >= key && pCurrent->version >= ver &&
pVerRange->maxVer >= pCurrent->version) {
return true;
}
while (pNext->ts <= pKey->ts && (*index) < num - 1) {
while (pNext->ts <= key && (*index) < num - 1) {
(*index) += 1;
if ((*index) < num - 1) {
......@@ -3583,7 +3611,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
continue;
}
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version &&
if (pCurrent->ts <= key && pNext->ts >= key && pCurrent->version >= ver &&
pVerRange->maxVer >= pCurrent->version) {
return true;
}
......@@ -3596,10 +3624,10 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
if (*index <= 0) {
TSDBKEY* pFirst = taosArrayGet(pDelList, 0);
if (pKey->ts < pFirst->ts) {
if (key < pFirst->ts) {
return false;
} else if (pKey->ts == pFirst->ts) {
return pFirst->version >= pKey->version;
} else if (key == pFirst->ts) {
return pFirst->version >= ver;
} else {
ASSERT(0);
}
......@@ -3607,15 +3635,15 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
TSDBKEY* pPrev = taosArrayGet(pDelList, (*index) - 1);
if (pKey->ts > pCurrent->ts) {
if (key > pCurrent->ts) {
return false;
}
if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
if (pPrev->ts <= key && pCurrent->ts >= key && pPrev->version >= ver) {
return true;
}
while (pPrev->ts >= pKey->ts && (*index) > 1) {
while (pPrev->ts >= key && (*index) > 1) {
(*index) += step;
if ((*index) >= 1) {
......@@ -3627,7 +3655,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
continue;
}
if (pPrev->ts <= pKey->ts && pCurrent->ts >= pKey->ts && pPrev->version >= pKey->version) {
if (pPrev->ts <= key && pCurrent->ts >= key && pPrev->version >= ver) {
return true;
}
}
......@@ -3655,7 +3683,7 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
// it is a valid data version
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
return pRow;
}
......@@ -3674,14 +3702,15 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
}
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
(!hasBeenDropped(pDelList, &pIter->index, &key, pReader->order, &pReader->verRange))) {
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
return pRow;
}
}
}
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader) {
int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader) {
SRowMerger* pMerger = &pReader->status.merger;
while (1) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
......@@ -3760,10 +3789,10 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
return code;
}
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger) {
int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
bool asc = ASCENDING_TRAVERSE(pReader->order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
......@@ -3801,8 +3830,8 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo, pVerRange)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, &fRow1, NULL);
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid,
pScanInfo->lastBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), pScanInfo->lastKeyInStt,
......@@ -3842,7 +3871,6 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
}
}
SRowMerger merge = {0};
terrno = 0;
int32_t code = 0;
......@@ -3854,8 +3882,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
STSchema* ps = (pReader->pSchema != NULL)? pReader->pSchema:pTSchema;
code = tsdbRowMergerInit(&merge, ps, &current, pTSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3865,28 +3892,28 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
tsdbRowMergerAdd(&merge, pNextRow, pTSchema1);
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1);
} else { // let's merge rows in file block
code = tsdbRowMergerInit(&merge, NULL, &current, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&merge, pNextRow, NULL);
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, NULL);
}
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, &merge, pReader);
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tsdbRowMergerGetRow(&merge, &pResRow->pTSRow);
code = tsdbRowMergerGetRow(&pReader->status.merger, &pResRow->pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResRow->type = TSDBROW_ROW_FMT;
tsdbRowMergerClear(&merge);
tsdbRowMergerClear_rv(&pReader->status.merger);
*freeTSRow = true;
return TSDB_CODE_SUCCESS;
......@@ -3894,7 +3921,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
SRow** pTSRow) {
SRowMerger merge = {0};
SRowMerger* pMerger = &pReader->status.merger;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
......@@ -3909,46 +3936,43 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
}
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
int32_t code = tsdbRowMergerInit(&merge, pSchema, piRow, piSchema);
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&merge, pRow, pSchema);
tsdbRowMergerAdd(&pReader->status.merger,pRow, pSchema);
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
int32_t code = tsdbRowMergerInit(&merge, NULL, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS || merge.pTSchema == NULL) {
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS || pMerger->pTSchema == NULL) {
return code;
}
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&merge, piRow, piSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge,
pReader);
tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t code = tsdbRowMergerGetRow(&merge, pTSRow);
tsdbRowMergerClear(&merge);
int32_t code = tsdbRowMergerGetRow(pMerger, pTSRow);
tsdbRowMergerClear_rv(pMerger);
return code;
}
......@@ -4073,11 +4097,12 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t code = TSDB_CODE_SUCCESS;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
// ASSERT (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID);// {
// SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, pSupInfo->slotId[i]);
// ((int64_t*)pColData->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
((int64_t*)pReader->status.pPrimaryTsCol->pData)[outputRowIndex] = pBlockData->aTSKEY[rowIndex];
i += 1;
}
// }
SColVal cv = {0};
int32_t numOfInputCols = pBlockData->nColData;
......@@ -4163,10 +4188,12 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
// TODO refactor: with createDataBlockScanInfo
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
int32_t size = taosHashGetSize(pReader->status.pTableMap);
int32_t size = tSimpleHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL;
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
int32_t iter = 0;
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*p);
}
......@@ -4184,7 +4211,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pReader->status.uidList.tableUidList = (uint64_t*)p1;
}
taosHashClear(pReader->status.pTableMap);
tSimpleHashClear(pReader->status.pTableMap);
STableUidList* pUidList = &pReader->status.uidList;
pUidList->currentIndex = 0;
......@@ -4205,7 +4232,7 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
pInfo->lastKeyInStt = ekey;
}
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
tSimpleHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
}
return TDB_CODE_SUCCESS;
......@@ -4327,6 +4354,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
}
}
if (pReader->pSchema != NULL) {
tsdbRowMergerInit_rv(&pReader->status.merger, pReader->pSchema);
}
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
if (pReader->pSchemaMap == NULL) {
tsdbError("failed init schema hash for reader %s", pReader->idStr);
......@@ -4351,6 +4382,12 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
goto _err;
}
pReader->status.pLDataIter = taosMemoryCalloc(pVnode->config.sttTrigger, sizeof(SLDataIter));
if (pReader->status.pLDataIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->suspended = true;
if (countOnly) {
......@@ -4367,29 +4404,38 @@ _err:
return code;
}
static void clearSharedPtr(STsdbReader* p) {
p->status.pLDataIter = NULL;
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pSchemaMap = NULL;
}
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst->status.pTableMap = pSrc->status.pTableMap;
pDst->status.pLDataIter = pSrc->status.pLDataIter;
pDst->status.uidList = pSrc->status.uidList;
pDst->pSchema = pSrc->pSchema;
pDst->pSchemaMap = pSrc->pSchemaMap;
pDst->pReadSnap = pSrc->pReadSnap;
}
void tsdbReaderClose(STsdbReader* pReader) {
if (pReader == NULL) {
return;
}
tsdbAcquireReader(pReader);
{
if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) {
STsdbReader* p = pReader->innerReader[0];
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pSchemaMap = NULL;
clearSharedPtr(p);
p = pReader->innerReader[1];
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pSchemaMap = NULL;
clearSharedPtr(p);
tsdbReaderClose(pReader->innerReader[0]);
tsdbReaderClose(pReader->innerReader[1]);
......@@ -4413,7 +4459,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
tBlockDataDestroy(&pReader->status.fileBlockData);
cleanupDataBlockIterator(&pReader->status.blockIter);
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (pReader->status.pTableMap != NULL) {
destroyAllBlockScanInfo(pReader->status.pTableMap);
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
......@@ -4440,7 +4486,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
tsdbUninitReaderLock(pReader);
taosMemoryFree(pReader->status.uidList.tableUidList);
taosMemoryFreeClear(pReader->status.pLDataIter);
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
SIOCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
......@@ -4467,6 +4514,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
pCost->initDelSkylineIterTime, pReader->idStr);
taosMemoryFree(pReader->idStr);
tsdbRowMergerCleanup_rv(&pReader->status.merger);
taosMemoryFree(pReader->pSchema);
tSimpleHashCleanup(pReader->pSchemaMap);
......@@ -4495,8 +4544,9 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
......@@ -4517,8 +4567,9 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = taosHashIterate(pStatus->pTableMap, p)) != NULL) {
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
......@@ -4607,7 +4658,7 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
// restore reader's state
// task snapshot
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
if (numOfTables > 0) {
qTrace("tsdb/reader: %p, take snapshot", pReader);
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
......@@ -4626,18 +4677,10 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
// we need only one row
pPrevReader->capacity = 1;
pPrevReader->status.pTableMap = pReader->status.pTableMap;
pPrevReader->status.uidList = pReader->status.uidList;
pPrevReader->pSchema = pReader->pSchema;
pPrevReader->pSchemaMap = pReader->pSchemaMap;
pPrevReader->pReadSnap = pReader->pReadSnap;
setSharedPtr(pPrevReader, pReader);
pNextReader->capacity = 1;
pNextReader->status.pTableMap = pReader->status.pTableMap;
pNextReader->status.uidList = pReader->status.uidList;
pNextReader->pSchema = pReader->pSchema;
pNextReader->pSchemaMap = pReader->pSchemaMap;
pNextReader->pReadSnap = pReader->pReadSnap;
setSharedPtr(pNextReader, pReader);
code = doOpenReaderImpl(pPrevReader);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4694,7 +4737,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
*hasNext = false;
SReaderStatus* pStatus = &pReader->status;
if (taosHashGetSize(pStatus->pTableMap) == 0) {
if (tSimpleHashGetSize(pStatus->pTableMap) == 0) {
return code;
}
......@@ -4947,11 +4990,11 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
return code;
}
STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = taosHashGet(pTableMap, &uid, sizeof(uid));
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
if (p == NULL || *p == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
int32_t size = taosHashGetSize(pTableMap);
int32_t size = tSimpleHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return NULL;
}
......@@ -5037,7 +5080,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
tsdbDataFReaderClose(&pReader->pFileReader);
int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(pBlockIter, pReader->order);
......@@ -5108,7 +5151,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
pTableBlockInfo->numOfFiles += 1;
int32_t numOfTables = (int32_t)taosHashGetSize(pStatus->pTableMap);
int32_t numOfTables = (int32_t)tSimpleHashGetSize(pStatus->pTableMap);
int defaultRows = 4096;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
......@@ -5172,7 +5215,8 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
tsdbReaderResume(pReader);
}
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
int32_t iter = 0;
pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
while (pStatus->pTableIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
......@@ -5194,7 +5238,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter);
}
tsdbReleaseReader(pReader);
......
......@@ -712,124 +712,163 @@ _exit:
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
SColVal * pColVal = &(SColVal){0};
STColumn *pTColumn;
int32_t iCol, jCol = 1;
if (NULL == pTSchema) {
pTSchema = pMerger->pTSchema;
}
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol];
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
++jCol;
--iCol;
continue;
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
continue;
if (taosArrayGetSize(pMerger->pArray) == 0) {
// ts
jCol = 0;
pTColumn = &pTSchema->columns[jCol++];
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
// goto _exit;
}
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
// other
for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pMerger->pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol];
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
++jCol;
--iCol;
continue;
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
continue;
}
if (key.version > pMerger->version) {
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) return code;
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
}
for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol];
taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type));
}
pMerger->version = key.version;
return 0;
} else {
ASSERT(((SColVal *)pMerger->pArray->pData)->value.val == key.ts);
for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) {
pTColumn = &pMerger->pTSchema->columns[iCol];
if (pTSchema->columns[jCol].colId < pTColumn->colId) {
++jCol;
--iCol;
continue;
} else if (pTSchema->columns[jCol].colId > pTColumn->colId) {
continue;
}
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if (key.version > pMerger->version) {
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) return code;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
taosArraySet(pMerger->pArray, iCol, pColVal);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
} else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) return code;
} else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
code = tRealloc(&tColVal->value.pData, pColVal->value.nData);
if (code) return code;
tColVal->value.nData = pColVal->value.nData;
if (pColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
tColVal->value.nData = pColVal->value.nData;
if (pColVal->value.nData) {
memcpy(tColVal->value.pData, pColVal->value.pData, pColVal->value.nData);
}
tColVal->flag = 0;
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
tColVal->flag = 0;
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
ASSERT(0 && "dup versions not allowed");
}
} else {
ASSERT(0 && "dup versions not allowed");
}
}
pMerger->version = key.version;
return code;
pMerger->version = key.version;
return code;
}
}
/*
int32_t tsdbRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn;
pMerger->pTSchema = pTSchema;
pMerger->version = key.version;
pMerger->pArray = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal));
int32_t tsdbRowMergerInit_rv(SRowMerger* pMerger, STSchema *pSchema) {
pMerger->pTSchema = pSchema;
pMerger->pArray = taosArrayInit(pSchema->numOfCols, sizeof(SColVal));
if (pMerger->pArray == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
return TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_SUCCESS;
}
}
// ts
pTColumn = &pTSchema->columns[0];
ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP);
*pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.val = key.ts});
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
void tsdbRowMergerClear_rv(SRowMerger* pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
tFree(pTColVal->value.pData);
}
}
// other
for (int16_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal);
if ((!COL_VAL_IS_NONE(pColVal)) && (!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
pColVal->value.pData = NULL;
code = tRealloc(&pColVal->value.pData, pColVal->value.nData);
if (code) goto _exit;
if (pColVal->value.nData) {
memcpy(pColVal->value.pData, pVal, pColVal->value.nData);
}
}
taosArrayClear(pMerger->pArray);
}
if (taosArrayPush(pMerger->pArray, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
void tsdbRowMergerCleanup_rv(SRowMerger* pMerger) {
int32_t numOfCols = taosArrayGetSize(pMerger->pArray);
for (int32_t iCol = 1; iCol < numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (IS_VAR_DATA_TYPE(pTColVal->type)) {
tFree(pTColVal->value.pData);
}
}
_exit:
return code;
taosArrayDestroy(pMerger->pArray);
}
*/
void tsdbRowMergerClear(SRowMerger *pMerger) {
for (int32_t iCol = 1; iCol < pMerger->pTSchema->numOfCols; iCol++) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
......
......@@ -361,10 +361,6 @@ int32_t tSimpleHashIterateRemove(SSHashObj *pHashObj, const void *key, size_t ke
return TSDB_CODE_SUCCESS;
}
static void destroyItems(void* pItem) {
taosMemoryFree(*(void**)pItem);
}
void tSimpleHashClear(SSHashObj *pHashObj) {
if (!pHashObj || taosHashTableEmpty(pHashObj)) {
return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册