提交 14844c7b 编写于 作者: H Haojun Liao

refactor(tsdb): opt tsdb read perf

上级 a94ace4c
......@@ -798,7 +798,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr,
bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema,
int16_t* pCols, int32_t numOfCols);
int16_t* pCols, int32_t numOfCols, void* pReader);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
......
......@@ -15,6 +15,7 @@
#include "tsdb.h"
#include "tsdbFSet2.h"
#include "tsdbReadUtil.h"
#include "tsdbSttFileRW.h"
static void tLDataIterClose2(SLDataIter *pIter);
......@@ -392,9 +393,9 @@ static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t sui
return TSDB_CODE_SUCCESS;
}
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid,
int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, uint64_t suid,
uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo,
const char *idStr, bool strictTimeRange) {
const char *idStr, bool strictTimeRange, void* pReader1) {
int32_t code = TSDB_CODE_SUCCESS;
pIter->uid = uid;
......@@ -404,7 +405,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32
pIter->verRange.maxVer = pRange->maxVer;
pIter->timeWindow.skey = pTimeWindow->skey;
pIter->timeWindow.ekey = pTimeWindow->ekey;
pIter->pReader = pReader;
pIter->pReader = pSttFileReader;
pIter->pBlockLoadInfo = pBlockLoadInfo;
if (!pBlockLoadInfo->sttBlockLoaded) {
......@@ -413,16 +414,18 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32
code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt blk, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid);
code = loadSttTombBlockData(pReader, suid, pBlockLoadInfo);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr);
return code;
}
code = loadSttTombDataForAll(pReader1, pIter->pReader, pBlockLoadInfo);
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr);
}
......@@ -721,7 +724,7 @@ _end:
int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid,
STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr,
bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema,
int16_t* pCols, int32_t numOfCols) {
int16_t* pCols, int32_t numOfCols, void* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
pMTree->backward = backward;
......@@ -785,7 +788,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6
memset(pIter, 0, sizeof(SLDataIter));
code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange,
pLoadInfo, pMTree->idStr, strictTimeRange);
pLoadInfo, pMTree->idStr, strictTimeRange, pReader);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
......
......@@ -20,208 +20,13 @@
#include "tsdbMerge.h"
#include "tsdbUtil2.h"
#include "tsimplehash.h"
#include "tsdbReadUtil.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInLastBlock(_r) ((_r)->currentKey)
typedef enum {
READER_STATUS_SUSPEND = 0x1,
READER_STATUS_NORMAL = 0x2,
} EReaderStatus;
typedef enum {
EXTERNAL_ROWS_PREV = 0x1,
EXTERNAL_ROWS_MAIN = 0x2,
EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef struct {
STbDataIter* iter;
int32_t index;
bool hasVal;
} SIterInfo;
typedef struct {
int32_t numOfBlocks;
int32_t numOfLastFiles;
} SBlockNumber;
typedef struct SBlockIndex {
int32_t ordinalIndex;
int64_t inFileOffset;
STimeWindow window; // todo replace it with overlap flag.
} SBlockIndex;
typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
TSKEY lastKeyInStt; // last accessed key in stt
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
SArray* pMemDelData; // SArray<SDelData>
SArray* pfileDelData; // SArray<SDelData> from each file set
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; // file block delete index
int32_t lastBlockDelIndex; // delete index for last block
bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo;
typedef struct SBlockOrderWrapper {
int64_t uid;
int64_t offset;
STableBlockScanInfo *pInfo;
} SBlockOrderWrapper;
typedef struct SBlockOrderSupporter {
SBlockOrderWrapper** pDataBlockInfo;
int32_t* indexPerTable;
int32_t* numOfBlocksPerTable;
int32_t numOfTables;
} SBlockOrderSupporter;
typedef struct SIOCostSummary {
int64_t numOfBlocks;
double blockLoadTime;
double buildmemBlock;
int64_t headFileLoad;
double headFileLoadTime;
int64_t smaDataLoad;
double smaLoadTime;
int64_t lastBlockLoad;
double lastBlockLoadTime;
int64_t composedBlocks;
double buildComposedBlockTime;
double createScanInfoList;
double createSkylineIterTime;
double initLastBlockReader;
} SCostSummary;
typedef struct SBlockLoadSuppInfo {
TColumnDataAggArray colAggArray;
SColumnDataAgg tsColAgg;
int16_t* colId;
int16_t* slotId;
int32_t numOfCols;
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
bool smaValid; // the sma on all queried columns are activated
} SBlockLoadSuppInfo;
typedef struct SLastBlockReader {
STimeWindow window;
SVersionRange verRange;
int32_t order;
uint64_t uid;
SMergeTree mergeTree;
SSttBlockLoadInfo* pInfo;
int64_t currentKey;
} SLastBlockReader;
typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list
TFileSetArray* pFilesetList;// data file set list
int32_t order;
SLastBlockReader* pLastBlockReader; // last file block reader
} SFilesetIter;
typedef struct SFileDataBlockInfo {
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
uint64_t uid;
int32_t tbBlockIdx;
SBrinRecord record;
} SFileDataBlockInfo;
typedef struct SDataBlockIter {
int32_t numOfBlocks;
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SDataBlk block; // current SDataBlk data
SSHashObj* pTableMap;
} SDataBlockIter;
typedef struct SFileBlockDumpInfo {
int32_t totalRows;
int32_t rowIndex;
int64_t lastKey;
bool allDumped;
} SFileBlockDumpInfo;
typedef struct STableUidList {
uint64_t* tableUidList; // access table uid list in uid ascending order list
int32_t currentIndex; // index in table uid list
} STableUidList;
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
SFileBlockDumpInfo fBlockDumpInfo;
STFileSet* pCurrentFileset;// current opened file set
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
SArray* pLDataIterArray;
SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
TFileSetArray* pfSetArray;
} SReaderStatus;
typedef struct SBlockInfoBuf {
int32_t currentIndex;
SArray* pData;
int32_t numPerBucket;
int32_t numOfTables;
} SBlockInfoBuf;
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
SVersionRange verRange;
int16_t order;
} STsdbReaderInfo;
typedef struct SResultBlockInfo {
SSDataBlock* pResBlock;
bool freeBlock;
int64_t capacity;
} SResultBlockInfo;
struct STsdbReader {
STsdb* pTsdb;
STsdbReaderInfo info;
TdThreadMutex readerMutex;
EReaderStatus flag;
int32_t code;
uint64_t rowsNum;
SResultBlockInfo resBlockInfo;
SReaderStatus status;
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap;
SCostSummary cost;
SHashObj** pIgnoreTables;
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFileReader* pFileReader; // the file reader
SBlockInfoBuf blockInfoBuf;
EContentData step;
STsdbReader* innerReader[2];
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
......@@ -243,9 +48,9 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t doBuildDataBlock(STsdbReader* pReader);
......@@ -254,8 +59,6 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
......@@ -297,8 +100,7 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf
i += 1;
j += 1;
} else if (pTCol->colId < pSupInfo->colId[j]) {
// do nothing
} else if (pTCol->colId < pSupInfo->colId[j]) { // do nothing
i += 1;
} else {
return TSDB_CODE_INVALID_PARA;
......@@ -308,207 +110,6 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf
return TSDB_CODE_SUCCESS;
}
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
int32_t num = numOfTables / pBuf->numPerBucket;
int32_t remainder = numOfTables % pBuf->numPerBucket;
if (pBuf->pData == NULL) {
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
}
for (int32_t i = 0; i < num; ++i) {
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
}
if (remainder > 0) {
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
}
pBuf->numOfTables = numOfTables;
return TSDB_CODE_SUCCESS;
}
static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
if (numOfTables <= pBuf->numOfTables) {
return TSDB_CODE_SUCCESS;
}
if (pBuf->numOfTables > 0) {
STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
taosMemoryFree(*p);
pBuf->numOfTables /= pBuf->numPerBucket;
}
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
if (pBuf->pData == NULL) {
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
}
for (int32_t i = 0; i < num; ++i) {
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
}
if (remainder > 0) {
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBuf->pData, &p);
}
pBuf->numOfTables = numOfTables;
return TSDB_CODE_SUCCESS;
}
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
size_t num = taosArrayGetSize(pBuf->pData);
for (int32_t i = 0; i < num; ++i) {
char** p = taosArrayGet(pBuf->pData, i);
taosMemoryFree(*p);
}
taosArrayDestroy(pBuf->pData);
}
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
int32_t bucketIndex = index / pBuf->numPerBucket;
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
}
static int32_t uidComparFunc(const void* p1, const void* p2) {
uint64_t pu1 = *(uint64_t*)p1;
uint64_t pu2 = *(uint64_t*)p2;
if (pu1 == pu2) {
return 0;
} else {
return (pu1 < pu2) ? -1 : 1;
}
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
STableUidList* pUidList, int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
if (pTableMap == NULL) {
return NULL;
}
int64_t st = taosGetTimestampUs();
initBlockScanInfoBuf(pBuf, numOfTables);
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
if (pUidList->tableUidList == NULL) {
tSimpleHashCleanup(pTableMap);
return NULL;
}
pUidList->currentIndex = 0;
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
pScanInfo->uid = idList[j].uid;
pUidList->tableUidList[j] = idList[j].uid;
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey;
} else {
int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey;
}
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
pScanInfo->lastKey, pTsdbReader->idStr);
}
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
pTsdbReader->idStr);
return pTableMap;
}
static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
void *p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
pInfo->iterInit = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;
if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
}
if (pInfo->iiter.iter != NULL) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
}
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->lastKey = ts;
pInfo->lastKeyInStt = ts + step;
}
}
static void clearBlockScanInfo(STableBlockScanInfo* p) {
p->iterInit = false;
p->iter.hasVal = false;
p->iiter.hasVal = false;
if (p->iter.iter != NULL) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
}
if (p->iiter.iter != NULL) {
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
}
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->pBlockList = taosArrayDestroy(p->pBlockList);
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
p->pfileDelData = taosArrayDestroy(p->pfileDelData);
}
static void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
void* p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*(STableBlockScanInfo**)p);
}
tSimpleHashCleanup(pTableMap);
}
static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; }
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
......@@ -572,7 +173,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);
pReader->status.pLDataIterArray =
destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// check file the time range of coverage
......@@ -645,7 +247,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
return TSDB_CODE_SUCCESS;
}
_err:
_err:
*hasNext = false;
return code;
}
......@@ -752,7 +354,8 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) {
}
}
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, SQueryTableDataCond* pCond) {
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock,
SQueryTableDataCond* pCond) {
pResBlockInfo->capacity = capacity;
pResBlockInfo->pResBlock = pResBlock;
terrno = 0;
......@@ -835,7 +438,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
*ppReader = pReader;
return code;
_end:
_end:
tsdbReaderClose(pReader);
*ppReader = NULL;
return code;
......@@ -898,15 +501,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
break;
}
// if (taosArrayGetSize(pIndexList) == 0) {
taosArrayPush(pIndexList, pBrinBlk);
// } else {
// if (newBlk) {
// taosArrayPush(pIndexList, pBrinBlk);
// }
// newBlk = false;
// }
taosArrayPush(pIndexList, pBrinBlk);
i += 1;
}
......@@ -922,76 +517,13 @@ _end:
return code;
}
static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) {
// reset the index in last block when handing a new file
taosArrayClear(pScanInfo->pBlockList);
taosArrayClear(pScanInfo->pfileDelData); // del data from each file set
}
static void cleanupTableScanInfo(SReaderStatus* pStatus) {
SSHashObj* pTableMap = pStatus->pTableMap;
STableBlockScanInfo** px = NULL;
int32_t iter = 0;
while (1) {
px = tSimpleHashIterate(pTableMap, px, &iter);
if (px == NULL) {
break;
}
doCleanupTableScanInfo(*px);
}
}
typedef struct SBrinRecordIter {
SArray* pBrinBlockList;
SBrinBlk* pCurrentBlk;
int32_t blockIndex;
int32_t recordIndex;
SDataFileReader* pReader;
SBrinBlock block;
SBrinRecord record;
} SBrinRecordIter;
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) {
memset(&pIter->block, 0, sizeof(SBrinBlock));
memset(&pIter->record, 0, sizeof(SBrinRecord));
pIter->blockIndex = -1;
pIter->recordIndex = -1;
pIter->pReader = pReader;
pIter->pBrinBlockList = pList;
}
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) {
pIter->blockIndex += 1;
if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
return NULL;
}
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
tBrinBlockClear(&pIter->block);
tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
pIter->recordIndex = -1;
}
pIter->recordIndex += 1;
tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record);
return &pIter->record;
}
void clearBrinBlockIter(SBrinRecordIter* pIter) {
tBrinBlockDestroy(&pIter->block);
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) {
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum,
SArray* pTableScanInfoList) {
size_t sizeInDisk = 0;
int64_t st = taosGetTimestampUs();
// clear info for the new file
cleanupTableScanInfo(&pReader->status);
cleanupInfoFoxNextFileset(pReader->status.pTableMap);
int32_t k = 0;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
......@@ -1196,18 +728,18 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->info.order);
}
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)||
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) ||
(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) {
int32_t i = endPos;
if (asc) {
for(; i >= 0; --i) {
for (; i >= 0; --i) {
if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) {
break;
}
}
} else {
for(; i < pRecord->numRow; ++i) {
for (; i < pRecord->numRow; ++i) {
if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) {
break;
}
......@@ -1361,7 +893,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) {
if (asc && pReader->info.window.skey <= pRecord->firstKey && pReader->info.verRange.minVer <= pRecord->minVer) {
// pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->info.window.ekey >= pRecord->lastKey && pReader->info.verRange.maxVer >= pRecord->maxVer) {
} else if (!asc && pReader->info.window.ekey >= pRecord->lastKey &&
pReader->info.verRange.maxVer >= pRecord->maxVer) {
// pDumpInfo->rowIndex = pRecord->numRow - 1;
} else { // find the appropriate the start position in current block, and set it to be the current rowIndex
int32_t pos = asc ? pRecord->numRow - 1 : 0;
......@@ -1381,17 +914,17 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer);
// find the appropriate start position that satisfies the version requirement.
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)||
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) ||
(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) {
int32_t i = pDumpInfo->rowIndex;
if (asc) {
for(; i < pRecord->numRow; ++i) {
for (; i < pRecord->numRow; ++i) {
if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) {
break;
}
}
} else {
for(; i >= 0; --i) {
for (; i >= 0; --i) {
if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) {
break;
}
......@@ -1478,7 +1011,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (outOfTimeWindow(ts, &pReader->info.window)) { // the remain data has out of query time window, ignore current block
if (outOfTimeWindow(ts,
&pReader->info.window)) { // the remain data has out of query time window, ignore current block
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
}
} else {
......@@ -1491,7 +1025,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
int32_t unDumpedRows = asc ? pRecord->numRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows,
unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
......@@ -1562,200 +1096,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
return TSDB_CODE_SUCCESS;
}
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
taosMemoryFreeClear(pSup->numOfBlocksPerTable);
taosMemoryFreeClear(pSup->indexPerTable);
for (int32_t i = 0; i < pSup->numOfTables; ++i) {
SBlockOrderWrapper* pBlockInfo = pSup->pDataBlockInfo[i];
taosMemoryFreeClear(pBlockInfo);
}
taosMemoryFreeClear(pSup->pDataBlockInfo);
}
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
cleanupBlockOrderSupporter(pSup);
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
int32_t leftIndex = *(int32_t*)pLeft;
int32_t rightIndex = *(int32_t*)pRight;
SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
/* left block is empty */
return 1;
} else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
/* right block is empty */
return -1;
}
SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
}
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) {
#if 0
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pBlockInfo->uid, idStr);
if (pScanInfo == NULL) {
return terrno;
}
SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk);
}
#endif
#if 0
qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset);
#endif
return TSDB_CODE_SUCCESS;
}
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SBlockOrderSupporter sup = {0};
pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList);
pBlockIter->pTableMap = pReader->status.pTableMap;
// access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = taosArrayGetSize(pTableList);
int64_t st = taosGetTimestampUs();
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t cnt = 0;
for (int32_t i = 0; i < numOfTables; ++i) {
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
// ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
sup.numOfBlocksPerTable[sup.numOfTables] = num;
char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
if (buf == NULL) {
cleanupBlockOrderSupporter(&sup);
return TSDB_CODE_OUT_OF_MEMORY;
}
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
for (int32_t k = 0; k < num; ++k) {
SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k);
sup.pDataBlockInfo[sup.numOfTables][k] =
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo};
cnt++;
}
sup.numOfTables += 1;
}
if (numOfBlocks != cnt && sup.numOfTables != numOfTables) {
cleanupBlockOrderSupporter(&sup);
return TSDB_CODE_INVALID_PARA;
}
// since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1) {
for (int32_t i = 0; i < numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
taosArrayPush(pBlockIter->blockList, &blockInfo);
}
int64_t et = taosGetTimestampUs();
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
cleanupBlockOrderSupporter(&sup);
doSetCurrentBlock(pBlockIter, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
pReader->idStr);
SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
if (ret != TSDB_CODE_SUCCESS) {
cleanupBlockOrderSupporter(&sup);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t numOfTotal = 0;
while (numOfTotal < cnt) {
int32_t pos = tMergeTreeGetChosenIndex(pTree);
int32_t index = sup.indexPerTable[pos]++;
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
taosArrayPush(pBlockIter->blockList, &blockInfo);
// set data block index overflow, in order to disable the offset comparator
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
}
numOfTotal += 1;
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
}
int64_t et = taosGetTimestampUs();
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
(et - st) / 1000.0, pReader->idStr);
cleanupBlockOrderSupporter(&sup);
taosMemoryFree(pTree);
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
doSetCurrentBlock(pBlockIter, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
int32_t step = asc ? 1 : -1;
if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) {
return false;
}
pBlockIter->index += step;
doSetCurrentBlock(pBlockIter, idStr);
return true;
}
/**
* This is an two rectangles overlap cases.
*/
......@@ -1778,8 +1118,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl
}
int32_t step = asc ? 1 : -1;
// *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
// *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
SBrinRecord* p = taosArrayGet(pTableBlockScanInfo->pBlockList, pBlockInfo->tbBlockIdx + step);
memcpy(pRecord, p, sizeof(SBrinRecord));
......@@ -1819,20 +1159,10 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx);
}
doSetCurrentBlock(pBlockIter, "");
return TSDB_CODE_SUCCESS;
}
// todo: this attribute could be acquired during extractin the global ordered block list.
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey;
} else {
return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey;
}
}
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
......@@ -1981,8 +1311,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock
return loadDataBlock;
}
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo,
TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) {
SDataBlockToLoadInfo info = {0};
getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader);
bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf ||
......@@ -2007,7 +1337,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s",
" - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr);
......@@ -2053,13 +1383,14 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
}
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow];
int64_t ver = pRow->pBlockData->aVersion[pRow->iRow];
pLastBlockReader->currentKey = key;
pScanInfo->lastKeyInStt = key;
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) {
if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order,
pVerRange)) {
return true;
}
}
......@@ -2116,7 +1447,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
}
STSchema* ptr = NULL;
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &ptr);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
......@@ -2209,7 +1540,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == k.ts) {
......@@ -2239,7 +1571,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return terrno;
}
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2261,7 +1593,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == key) {
......@@ -2294,7 +1627,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SRowMerger* pMerger = &pReader->status.merger;
SRowMerger* pMerger = &pReader->status.merger;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
......@@ -2304,9 +1637,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
// create local variable to hold the row value
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData};
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, pReader->idStr);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid,
pReader->idStr);
// only last block exists
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
......@@ -2326,7 +1660,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2348,7 +1683,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
......@@ -2376,7 +1712,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
SRowMerger* pMerger = &pReader->status.merger;
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
......@@ -2401,7 +1737,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
SRow* pTSRow = NULL;
SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2539,7 +1875,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == ik.ts) {
......@@ -2615,7 +1952,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange,
pReader->idStr);
}
if (minKey == key) {
......@@ -2675,34 +2013,6 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
return code;
}
static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
if (pScanInfo->pMemDelData == NULL) {
pScanInfo->pMemDelData = taosArrayInit(4, sizeof(SDelData));
}
SDelData* p = NULL;
if (pMemTbData != NULL) {
p = pMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pScanInfo->pMemDelData, p);
}
p = p->pNext;
}
}
if (piMemTbData != NULL) {
p = piMemTbData->pHead;
while (p) {
if (p->version <= ver) {
taosArrayPush(pScanInfo->pMemDelData, p);
}
p = p->pNext;
}
}
}
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
if (pBlockScanInfo->iterInit) {
return TSDB_CODE_SUCCESS;
......@@ -2716,20 +2026,20 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer};
}
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem,
&pBlockScanInfo->iter, "mem");
int32_t code =
doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STbData* di = NULL;
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem,
&pBlockScanInfo->iiter, "imem");
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, &pBlockScanInfo->iiter,
"imem");
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doLoadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer);
loadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer);
pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS;
......@@ -2764,159 +2074,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return true;
}
static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) {
STombRecord record = {0};
for (int32_t j = 0; j < pBlock->suid->size; ++j) {
int32_t code = tTombBlockGet(pBlock, j, &record);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (record.suid < suid) {
continue;
}
if (record.suid > suid || (record.suid == suid && record.uid > uid)) {
break;
}
if (record.uid < uid) {
continue;
}
if (record.version <= maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pData, &delData);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadTombRecordsFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo,
uint64_t maxVer) {
int32_t size = taosArrayGetSize(pLDataIterList);
if (size <= 0) {
return TSDB_CODE_SUCCESS;
}
uint64_t uid = pBlockScanInfo->uid;
if (pBlockScanInfo->pfileDelData == NULL) {
pBlockScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
for(int32_t i = 0; i < size; ++i) {
SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
int32_t numOfIter = taosArrayGetSize(pLeveledLDataIter);
if (numOfIter == 0) {
continue;
}
for (int32_t f = 0; f < numOfIter; ++f) {
SLDataIter* pIter = taosArrayGetP(pLeveledLDataIter, f);
SArray* pTombBlockArray = pIter->pBlockLoadInfo->pTombBlockArray;
int32_t numOfBlocks = taosArrayGetSize(pTombBlockArray);
for (int32_t k = 0; k < numOfBlocks; ++k) {
STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k);
int32_t code = checkTombBlockRecords(pBlockScanInfo->pfileDelData, pBlock, suid, uid, maxVer);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfTables) {
if (pReader->status.pCurrentFileset == NULL || pReader->status.pCurrentFileset->farr[3] == NULL) {
return TSDB_CODE_SUCCESS;
}
const TTombBlkArray* pBlkArray = NULL;
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// todo find the correct start position.
int32_t i = 0, j = 0;
while (i < pBlkArray->size && j < numOfTables) {
STombBlock block = {0};
code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
uint64_t uid = pReader->status.uidList.tableUidList[j];
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
STombRecord record = {0};
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) {
code = tTombBlockGet(&block, k, &record);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (record.suid < pReader->info.suid) {
continue;
}
if (record.suid > pReader->info.suid) {
tTombBlockDestroy(&block);
return TSDB_CODE_SUCCESS;
}
bool newTable = false;
if (uid < record.uid) {
while (pReader->status.uidList.tableUidList[j] < record.uid && j < numOfTables) {
j += 1;
newTable = true;
}
if (j >= numOfTables) {
tTombBlockDestroy(&block);
break;
}
uid = pReader->status.uidList.tableUidList[j];
}
if (record.uid < uid) {
continue;
}
ASSERT(record.suid == pReader->info.suid && uid == record.uid);
if (newTable) {
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
}
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pfileDelData, &delData);
}
}
i += 1;
tTombBlockDestroy(&block);
}
return TSDB_CODE_SUCCESS;
}
static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
// the last block reader has been initialized for this table.
if (pLBlockReader->uid == pScanInfo->uid) {
......@@ -2939,16 +2096,11 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
int64_t st = taosGetTimestampUs();
tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey,
pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb,
pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false,
pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->info.pSchema,
pReader->suppInfo.colId, pReader->suppInfo.numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = loadTombRecordsFromSttFiles(pReader->status.pLDataIterArray, pReader->info.suid, pScanInfo, pReader->info.verRange.maxVer);
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb,
pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr,
false, pReader->status.pLDataIterArray, pReader->status.pCurrentFileset,
pReader->info.pSchema, pReader->suppInfo.colId, pReader->suppInfo.numOfCols, pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
......@@ -2961,7 +2113,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initLastBlockReader += (el / 1000.0);
tsdbDebug("init last block reader completed, elapsed time:%"PRId64"us %s", el, pReader->idStr);
tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr);
return code;
}
......@@ -3000,7 +2152,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
SRow* pTSRow = NULL;
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -3055,9 +2207,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, bool* loadNeighbor) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
int32_t nextIndex = -1;
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
int32_t nextIndex = -1;
*loadNeighbor = false;
......@@ -3113,10 +2265,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
SBrinRecord* pRecord = &pBlockInfo->record;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -3134,7 +2286,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
pRecord = &pBlockInfo->record;
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader);
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
......@@ -3153,8 +2305,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} else { // file blocks not exist
ASSERT(0);
pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order);
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order);
return code;
}
}
......@@ -3200,7 +2353,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
// pBlock = getCurrentBlock(&pReader->status.blockIter);
// pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
break;
}
......@@ -3210,13 +2363,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
}
_end:
_end:
el = (taosGetTimestampUs() - st) / 1000.0;
updateComposedBlockInfo(pReader, el, pBlockScanInfo);
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
", elapsed time:%.2f ms %s",
", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -3243,7 +2396,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
}
int64_t st = taosGetTimestampUs();
if (pBlockScanInfo->delSkyline != NULL) {
taosArrayClear(pBlockScanInfo->delSkyline);
} else {
......@@ -3258,7 +2411,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
}
code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline);
taosArrayClear(pBlockScanInfo->pfileDelData);
int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, order);
......@@ -3269,12 +2422,12 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
double el = taosGetTimestampUs() - st;
pCost->createSkylineIterTime = el / 1000.0;
return code;
}
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL};
bool hasKey = false, hasIKey = false;
......@@ -3356,7 +2509,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
}
taosArrayDestroy(pIndexList);
return loadTombRecordsFromDataFiles(pReader, numOfTables);
return loadDataFileTombDataForAll(pReader);
}
static void resetTableListIndex(SReaderStatus* pStatus) {
......@@ -3401,7 +2554,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) {
// reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
// doCleanupTableScanInfo(pScanInfo);
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
......@@ -3411,7 +2564,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
// reset the index in last block when handing a new file
// doCleanupTableScanInfo(pScanInfo);
// doCleanupTableScanInfo(pScanInfo);
bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
if (!hasDataInLastFile) {
......@@ -3447,7 +2600,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
", elapsed time:%.2f ms %s",
", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
return TSDB_CODE_SUCCESS;
......@@ -3462,7 +2615,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
......@@ -3498,7 +2651,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
} else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo)) {
// data in memory that are earlier than current file block
// rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
int64_t endKey =
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) {
......@@ -3537,7 +2691,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64
", elapsed time:%.2f ms %s",
", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
......@@ -3552,14 +2706,15 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr);
}
}
return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
}
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
......@@ -3592,14 +2747,14 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
}
STableBlockScanInfo* pScanInfo = *p;
SDataBlk block = {0};
// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
// pReader->rowsNum += block.nRow;
// }
SDataBlk block = {0};
// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
// pReader->rowsNum += block.nRow;
// }
}
_end:
_end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
......@@ -3669,7 +2824,7 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
break;
}
// code = doSumFileBlockRows(pReader, pReader->pFileReader);
// code = doSumFileBlockRows(pReader, pReader->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3712,7 +2867,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) {
bool hasNexTable = moveToNextTable(pUidList, pStatus);
if (!hasNexTable) {
return TSDB_CODE_SUCCESS;
......@@ -3767,7 +2923,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SBlockNumber num = {0};
SArray* pTableList = taosArrayInit(40, POINTER_BYTES);
SArray* pTableList = taosArrayInit(40, POINTER_BYTES);
int32_t code = moveToNextFile(pReader, &num, pTableList);
if (code != TSDB_CODE_SUCCESS) {
......@@ -3812,7 +2968,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
while(1) {
while (1) {
terrno = 0;
code = doLoadLastBlockSequentially(pReader);
......@@ -3835,7 +2991,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
return TSDB_READ_RETURN;
}
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed.
return TSDB_READ_CONTINUE;
} else { // all blocks in data file are checked, let's check the data in last files
resetTableListIndex(&pReader->status);
......@@ -3848,7 +3004,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pBlockIter->numOfBlocks == 0) {
// let's try to extract data from stt files.
......@@ -3910,8 +3066,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
int8_t precision = pVnode->config.tsdbCfg.precision;
int64_t now = taosGetTimestamp(precision);
int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI) ? 1L
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
: 1000000L);
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L
: 1000000L);
for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
SRetention* pRetention = retentions + level;
......@@ -3961,7 +3117,8 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_
return (SVersionRange){.minVer = startVer, .maxVer = endVer};
}
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, SVersionRange* pVerRange) {
bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order,
SVersionRange* pVerRange) {
if (pDelList == NULL || (taosArrayGetSize(pDelList) == 0)) {
return false;
}
......@@ -3979,8 +3136,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t
return false;
} else if (key == last->ts) {
TSDBKEY* prev = taosArrayGet(pDelList, num - 2);
return (prev->version >= ver && prev->version <= pVerRange->maxVer &&
prev->version >= pVerRange->minVer);
return (prev->version >= ver && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
......@@ -4189,9 +3345,9 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
pDumpInfo->rowIndex += step;
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
......@@ -4205,7 +3361,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
CHECK_FILEBLOCK_STATE st;
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
// SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
// SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
if (pFileBlockInfo == NULL) {
st = CHECK_FILEBLOCK_QUIT;
break;
......@@ -4288,14 +3444,14 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
return terrno;
}
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1);
tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1);
} else { // let's merge rows in file block
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, NULL);
tsdbRowMergerAdd(&pReader->status.merger, pNextRow, NULL);
}
code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(&current), pDelList, pReader);
......@@ -4342,9 +3498,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code;
}
tsdbRowMergerAdd(&pReader->status.merger,pRow, pSchema);
code =
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema);
code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4372,8 +3527,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code;
}
static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey,
bool* freeTSRow) {
static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow,
int64_t endKey, bool* freeTSRow) {
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
SArray* pDelList = pBlockScanInfo->delSkyline;
......@@ -4583,7 +3738,7 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
int32_t size = tSimpleHashGetSize(pReader->status.pTableMap);
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) {
clearBlockScanInfo(*p);
......@@ -4670,15 +3825,38 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
}
static void freeSchemaFunc(void* param) {
void **p = (void **)param;
void** p = (void**)param;
taosMemoryFreeClear(*p);
}
static void clearSharedPtr(STsdbReader* p) {
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->status.pfSetArray = NULL;
p->info.pSchema = NULL;
p->pReadSnap = NULL;
p->pSchemaMap = NULL;
}
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst->status.pTableMap = pSrc->status.pTableMap;
pDst->status.uidList = pSrc->status.uidList;
pDst->status.pfSetArray = pSrc->status.pfSetArray;
pDst->info.pSchema = pSrc->info.pSchema;
pDst->pSchemaMap = pSrc->pSchemaMap;
pDst->pReadSnap = pSrc->pReadSnap;
if (pDst->info.pSchema) {
tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema);
}
}
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) {
SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly,
SHashObj** pIgnoreTables) {
STimeWindow window = pCond->twindows;
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
SVnodeCfg* pConf = &(((SVnode*)pVnode)->config);
int32_t capacity = pConf->tsdbCfg.maxRows;
if (pResBlock != NULL) {
......@@ -4790,41 +3968,19 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
pReader->pIgnoreTables = pIgnoreTables;
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
" in this query %s",
" in this query %s",
pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer,
pReader->info.verRange.maxVer, pReader->idStr);
return code;
_err:
_err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
tsdbReaderClose2(*ppReader);
*ppReader = NULL; // reset the pointer value.
return code;
}
static void clearSharedPtr(STsdbReader* p) {
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->status.pfSetArray = NULL;
p->info.pSchema = NULL;
p->pReadSnap = NULL;
p->pSchemaMap = NULL;
}
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst->status.pTableMap = pSrc->status.pTableMap;
pDst->status.uidList = pSrc->status.uidList;
pDst->status.pfSetArray = pSrc->status.pfSetArray;
pDst->info.pSchema = pSrc->info.pSchema;
pDst->pSchemaMap = pSrc->pSchemaMap;
pDst->pReadSnap = pSrc->pReadSnap;
if (pDst->info.pSchema) {
tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema);
}
}
void tsdbReaderClose2(STsdbReader* pReader) {
if (pReader == NULL) {
return;
......@@ -4880,7 +4036,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbUninitReaderLock(pReader);
SCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
tMergeTreeClose(&pLReader->mergeTree);
......@@ -4892,7 +4048,8 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbDebug(
"%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64
" SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, "
" SMA-time:%.2f ms, fileBlocks:%" PRId64
", fileBlocks-load-time:%.2f ms, "
"build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64
", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f "
"ms, initLastBlockReader:%.2fms, %s",
......@@ -4932,7 +4089,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
......@@ -4955,7 +4112,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
......@@ -5008,7 +4165,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
pReader->idStr);
return code;
_err:
_err:
tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr);
return code;
}
......@@ -5080,7 +4237,7 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) {
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
return code;
_err:
_err:
tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr);
return code;
}
......@@ -5154,8 +4311,9 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
*hasNext = false;
if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) {
return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->step == EXTERNAL_ROWS_NEXT ||
pReader->code != TSDB_CODE_SUCCESS) {
return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code;
}
SReaderStatus* pStatus = &pReader->status;
......@@ -5286,7 +4444,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
}
}
int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool *hasNullSMA) {
int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) {
SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg;
int32_t code = 0;
......@@ -5310,7 +4468,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
return TSDB_CODE_SUCCESS;
}
// int64_t st = taosGetTimestampUs();
// int64_t st = taosGetTimestampUs();
TARRAY2_CLEAR(&pSup->colAggArray, 0);
code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pFBlock->record, &pSup->colAggArray);
......@@ -5367,29 +4525,17 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock,
*pBlockSMA = pResBlock->pBlockAgg;
pReader->cost.smaDataLoad += 1;
// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.smaLoadTime += 0;//elapsedTime;
// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.smaLoadTime += 0; // elapsedTime;
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
return code;
}
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
if (p == NULL || *p == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
int32_t size = tSimpleHashGetSize(pTableMap);
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
return NULL;
}
return *p;
}
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
if (pReader->code != TSDB_CODE_SUCCESS) {
return NULL;
......@@ -5503,8 +4649,8 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
" in query %s",
pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, pReader->info.window.ekey,
pReader->idStr);
pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey,
pReader->info.window.ekey, pReader->idStr);
tsdbReleaseReader(pReader);
......@@ -5565,7 +4711,7 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT
while (true) {
if (hasNext) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
int32_t numOfRows = pBlockInfo->record.numRow;
int32_t numOfRows = pBlockInfo->record.numRow;
pTableBlockInfo->totalRows += numOfRows;
......@@ -5741,7 +4887,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
_exit:
_exit:
if (code) {
*ppSnap = NULL;
if (pSnap) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册