提交 a94ace4c 编写于 作者: H Haojun Liao

refactor:do some internal refactor.

上级 6abf8ff8
......@@ -161,7 +161,6 @@ typedef struct STableUidList {
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
// bool mapDataCleaned; // mapData has been cleaned up alreay or not
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
......@@ -173,6 +172,7 @@ typedef struct SReaderStatus {
SArray* pLDataIterArray;
SRowMerger merger;
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
TFileSetArray* pfSetArray;
} SReaderStatus;
typedef struct SBlockInfoBuf {
......@@ -182,15 +182,15 @@ typedef struct SBlockInfoBuf {
int32_t numOfTables;
} SBlockInfoBuf;
typedef struct STsdbReaderAttr {
typedef struct STsdbReaderInfo {
uint64_t suid;
STSchema* pSchema;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window;
bool freeBlock;
SVersionRange verRange;
int16_t order;
} STsdbReaderAttr;
} STsdbReaderInfo;
typedef struct SResultBlockInfo {
SSDataBlock* pResBlock;
......@@ -200,15 +200,11 @@ typedef struct SResultBlockInfo {
struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
STsdbReaderInfo info;
TdThreadMutex readerMutex;
EReaderStatus flag;
int32_t code;
uint64_t suid;
int16_t order;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window; // the primary query time window that applies to all queries
SResultBlockInfo resBlockInfo;
SReaderStatus status;
char* idStr; // query info handle, for debug purpose
......@@ -217,15 +213,11 @@ struct STsdbReader {
STsdbReadSnap* pReadSnap;
SCostSummary cost;
SHashObj** pIgnoreTables;
STSchema* pSchema; // the newest version schema
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFileReader* pFileReader; // the file reader
SDelFReader* pDelFReader; // the del file reader, todo remove it
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
EContentData step;
STsdbReader* innerReader[2];
TFileSetArray* pfSetArray;
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
......@@ -437,12 +429,12 @@ static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBu
pScanInfo->uid = idList[j].uid;
pUidList->tableUidList[j] = idList[j].uid;
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
int64_t skey = pTsdbReader->window.skey;
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
int64_t skey = pTsdbReader->info.window.skey;
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pScanInfo->lastKeyInStt = skey;
} else {
int64_t ekey = pTsdbReader->window.ekey;
int64_t ekey = pTsdbReader->info.window.ekey;
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pScanInfo->lastKeyInStt = ekey;
}
......@@ -539,8 +531,8 @@ static STimeWindow updateQueryTimeWindow(STsdb* pTsdb, STimeWindow* pWindow) {
static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetArray, STsdbReader* pReader) {
size_t numOfFileset = TARRAY2_SIZE(pFileSetArray);
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
pIter->order = pReader->order;
pIter->index = ASCENDING_TRAVERSE(pReader->info.order) ? -1 : numOfFileset;
pIter->order = pReader->info.order;
pIter->pFilesetList = pFileSetArray;
pIter->numOfFiles = numOfFileset;
......@@ -554,9 +546,9 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA
}
SLastBlockReader* pLReader = pIter->pLastBlockReader;
pLReader->order = pReader->order;
pLReader->window = pReader->window;
pLReader->verRange = pReader->verRange;
pLReader->order = pReader->info.order;
pLReader->window = pReader->info.window;
pLReader->verRange = pReader->info.verRange;
pLReader->uid = 0;
tMergeTreeClose(&pLReader->mergeTree);
......@@ -631,14 +623,14 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
// current file are no longer overlapped with query time window, ignore remain files
if ((asc && win.skey > pReader->window.ekey) || (!asc && win.ekey < pReader->window.skey)) {
if ((asc && win.skey > pReader->info.window.ekey) || (!asc && win.ekey < pReader->info.window.skey)) {
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pReader,
pReader->window.skey, pReader->window.ekey, pReader->idStr);
pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr);
*hasNext = false;
return TSDB_CODE_SUCCESS;
}
if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
if ((asc && (win.ekey < pReader->info.window.skey)) || ((!asc) && (win.skey > pReader->info.window.ekey))) {
pIter->index += step;
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
*hasNext = false;
......@@ -647,8 +639,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
continue;
}
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->window.skey,
pReader->window.ekey, pReader->idStr);
tsdbDebug("%p file found fid:%d for qrange:%" PRId64 "-%" PRId64 ", %s", pReader, fid, pReader->info.window.skey,
pReader->info.window.ekey, pReader->idStr);
*hasNext = true;
return TSDB_CODE_SUCCESS;
}
......@@ -792,13 +784,13 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
initReaderStatus(&pReader->status);
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->info.suid = pCond->suid;
pReader->info.order = pCond->order;
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->info.verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
......@@ -887,22 +879,22 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead
while (i < TARRAY2_SIZE(pBlkArray)) {
pBrinBlk = &pBlkArray->data[i];
if (pBrinBlk->maxTbid.suid < pReader->suid) {
if (pBrinBlk->maxTbid.suid < pReader->info.suid) {
i += 1;
continue;
}
if (pBrinBlk->minTbid.suid > pReader->suid) { // not include the queried table/super table, quit the loop
if (pBrinBlk->minTbid.suid > pReader->info.suid) { // not include the queried table/super table, quit the loop
break;
}
ASSERT(pBrinBlk->minTbid.suid <= pReader->suid && pBrinBlk->maxTbid.suid >= pReader->suid);
if (pBrinBlk->maxTbid.suid == pReader->suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
ASSERT(pBrinBlk->minTbid.suid <= pReader->info.suid && pBrinBlk->maxTbid.suid >= pReader->info.suid);
if (pBrinBlk->maxTbid.suid == pReader->info.suid && pBrinBlk->maxTbid.uid < pList->tableUidList[0]) {
i += 1;
continue;
}
if (pBrinBlk->minTbid.suid == pReader->suid && pBrinBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) {
if (pBrinBlk->minTbid.suid == pReader->info.suid && pBrinBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) {
break;
}
......@@ -1003,32 +995,28 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
int32_t k = 0;
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
STimeWindow w = pReader->window;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
STimeWindow w = pReader->info.window;
SBrinRecord* pRecord = NULL;
SBrinRecordIter iter = {0};
initBrinRecordIter(&iter, pReader->pFileReader, pIndexList);
while (k < numOfTables) {
pRecord = getNextBrinRecord(&iter);
uint64_t uid = pReader->status.uidList.tableUidList[k];
if (pRecord == NULL || pRecord->suid > pReader->suid) {
while (((pRecord = getNextBrinRecord(&iter)) != NULL)) {
if (pRecord->suid > pReader->info.suid) {
break;
}
if (pRecord->suid < pReader->suid) {
uint64_t uid = pReader->status.uidList.tableUidList[k];
if (pRecord->suid < pReader->info.suid) {
continue;
}
ASSERT(pRecord->suid == pReader->suid);
if (pRecord->uid < uid) {
continue;
if (uid < pRecord->uid) { // forward the table uid index
while (pReader->status.uidList.tableUidList[k] < pRecord->uid && k < numOfTables) {
k += 1;
}
while (pRecord->uid > uid && k < numOfTables) {
k += 1;
if (k >= numOfTables) {
break;
}
......@@ -1036,18 +1024,14 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
uid = pReader->status.uidList.tableUidList[k];
}
if (k >= numOfTables) {
break;
}
if (pRecord->uid < uid) {
continue;
}
ASSERT(pRecord->suid == pReader->suid && uid == pRecord->uid);
ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid);
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (ASCENDING_TRAVERSE(pReader->order)) {
if (ASCENDING_TRAVERSE(pReader->info.order)) {
w.skey = pScanInfo->lastKey + step;
} else {
w.ekey = pScanInfo->lastKey + step;
......@@ -1064,7 +1048,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
// 2. version range check
if (pRecord->minVer > pReader->verRange.maxVer || pRecord->maxVer < pReader->verRange.minVer) {
if (pRecord->minVer > pReader->info.verRange.maxVer || pRecord->maxVer < pReader->info.verRange.minVer) {
continue;
}
......@@ -1201,30 +1185,30 @@ static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int or
static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SBrinRecord* pRecord, int32_t pos) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
if (asc && pReader->window.ekey >= pRecord->lastKey) {
if (asc && pReader->info.window.ekey >= pRecord->lastKey) {
endPos = pRecord->numRow - 1;
} else if (!asc && pReader->window.skey <= pRecord->firstKey) {
} else if (!asc && pReader->info.window.skey <= pRecord->firstKey) {
endPos = 0;
} else {
int64_t key = asc ? pReader->window.ekey : pReader->window.skey;
endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->order);
int64_t key = asc ? pReader->info.window.ekey : pReader->info.window.skey;
endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->info.order);
}
if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)||
(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) {
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)||
(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) {
int32_t i = endPos;
if (asc) {
for(; i >= 0; --i) {
if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) {
if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) {
break;
}
}
} else {
for(; i < pRecord->numRow; ++i) {
if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) {
break;
}
}
......@@ -1360,7 +1344,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SColVal cv = {0};
int64_t st = taosGetTimestampUs();
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t step = asc ? 1 : -1;
SBrinRecord* pRecord = &pBlockInfo->record;
......@@ -1375,14 +1359,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// row index of dump info remain the initial position, let's find the appropriate start position.
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pRecord->firstKey && pReader->verRange.minVer <= pRecord->minVer) {
if (asc && pReader->info.window.skey <= pRecord->firstKey && pReader->info.verRange.minVer <= pRecord->minVer) {
// pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->window.ekey >= pRecord->lastKey && pReader->verRange.maxVer >= pRecord->maxVer) {
} else if (!asc && pReader->info.window.ekey >= pRecord->lastKey && pReader->info.verRange.maxVer >= pRecord->maxVer) {
// pDumpInfo->rowIndex = pRecord->numRow - 1;
} else { // find the appropriate the start position in current block, and set it to be the current rowIndex
int32_t pos = asc ? pRecord->numRow - 1 : 0;
int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int64_t key = asc ? pReader->window.skey : pReader->window.ekey;
int64_t key = asc ? pReader->info.window.skey : pReader->info.window.ekey;
pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, order);
if (pDumpInfo->rowIndex < 0) {
......@@ -1394,21 +1378,21 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
return TSDB_CODE_INVALID_PARA;
}
ASSERT(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.maxVer >= pRecord->minVer);
ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer);
// find the appropriate start position that satisfies the version requirement.
if ((pReader->verRange.maxVer >= pRecord->minVer && pReader->verRange.maxVer < pRecord->maxVer)||
(pReader->verRange.minVer <= pRecord->maxVer && pReader->verRange.minVer > pRecord->minVer)) {
if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)||
(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) {
int32_t i = pDumpInfo->rowIndex;
if (asc) {
for(; i < pRecord->numRow; ++i) {
if (pBlockData->aVersion[i] >= pReader->verRange.minVer) {
if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) {
break;
}
}
} else {
for(; i >= 0; --i) {
if (pBlockData->aVersion[i] <= pReader->verRange.maxVer) {
if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) {
break;
}
}
......@@ -1422,7 +1406,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// time window check
int32_t endIndex = getEndPosInDataBlock(pReader, pBlockData, pRecord, pDumpInfo->rowIndex);
if (endIndex == -1) {
setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
setBlockAllDumped(pDumpInfo, pReader->info.window.ekey, pReader->info.order);
return TSDB_CODE_SUCCESS;
}
......@@ -1431,7 +1415,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check
dumpedRows = pReader->resBlockInfo.capacity;
} else if (dumpedRows <= 0) { // no qualified rows in current data block, abort directly.
setBlockAllDumped(pDumpInfo, pReader->window.ekey, pReader->order);
setBlockAllDumped(pDumpInfo, pReader->info.window.ekey, pReader->info.order);
return TSDB_CODE_SUCCESS;
}
......@@ -1494,12 +1478,12 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (outOfTimeWindow(ts, &pReader->window)) { // the remain data has out of query time window, ignore current block
setBlockAllDumped(pDumpInfo, ts, pReader->order);
if (outOfTimeWindow(ts, &pReader->info.window)) { // the remain data has out of query time window, ignore current block
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
}
} else {
int64_t ts = asc ? pRecord->lastKey : pRecord->firstKey;
setBlockAllDumped(pDumpInfo, ts, pReader->order);
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
}
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
......@@ -1515,34 +1499,34 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
}
static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t uid) {
ASSERT(pReader->pSchema == NULL);
ASSERT(pReader->info.pSchema == NULL);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, -1, &pReader->pSchema);
if (code != TSDB_CODE_SUCCESS || pReader->pSchema == NULL) {
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, -1, &pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS || pReader->info.pSchema == NULL) {
terrno = code;
tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
return NULL;
}
code = tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
code = tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
tsdbError("failed to init merger, code:%s, %s", tstrerror(code), pReader->idStr);
return NULL;
}
return pReader->pSchema;
return pReader->info.pSchema;
}
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
uint64_t uid) {
int32_t code = 0;
STSchema* pSchema = pReader->pSchema;
STSchema* pSchema = pReader->info.pSchema;
int64_t st = taosGetTimestampUs();
tBlockDataReset(pBlockData);
if (pReader->pSchema == NULL) {
if (pReader->info.pSchema == NULL) {
pSchema = getTableSchemaImpl(pReader, uid);
if (pSchema == NULL) {
tsdbDebug("%p table uid:%" PRIu64 " has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
......@@ -1648,7 +1632,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr)
}
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SBlockOrderSupporter sup = {0};
pBlockIter->numOfBlocks = numOfBlocks;
......@@ -1948,16 +1932,16 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
int32_t neighborIndex = 0;
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);
// overlap with neighbor
if (hasNeighbor) {
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order);
pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order);
}
// has duplicated ts of different version in this block
pInfo->hasDupTs = (pBlockInfo->record.numRow > pBlockInfo->record.count);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->order);
pInfo->overlapWithDelInfo = overlapWithDelSkyline(pScanInfo, &pBlockInfo->record, pReader->info.order);
if (hasDataInLastBlock(pLastBlockReader)) {
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
......@@ -1965,8 +1949,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
}
pInfo->moreThanCapcity = pBlockInfo->record.numRow > pReader->resBlockInfo.capacity;
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlockInfo);
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlockInfo, &pReader->verRange);
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->info.window, &pReader->info.verRange, pBlockInfo);
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlockInfo, &pReader->info.verRange);
}
// 1. the version of all rows should be less than the endVersion
......@@ -2039,9 +2023,9 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
int32_t code = TSDB_CODE_SUCCESS;
*copied = false;
bool asc = (pReader->order == TSDB_ORDER_ASC);
bool asc = (pReader->info.order == TSDB_ORDER_ASC);
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
int32_t step = pReader->info.order == TSDB_ORDER_ASC ? 1 : -1;
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
if (nextKey != key) { // merge is not needed
......@@ -2088,7 +2072,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
*copied = false;
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->info.verRange);
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) {
......@@ -2114,16 +2098,16 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
}
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
// always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) {
// always set the newest schema version in pReader->info.pSchema
if (pReader->info.pSchema == NULL) {
STSchema* ps = getTableSchemaImpl(pReader, uid);
if (ps == NULL) {
return NULL;
}
}
if (pReader->pSchema && sversion == pReader->pSchema->version) {
return pReader->pSchema;
if (pReader->info.pSchema && sversion == pReader->info.pSchema->version) {
return pReader->info.pSchema;
}
void** p = tSimpleHashGet(pReader->pSchemaMap, &sversion, sizeof(sversion));
......@@ -2132,7 +2116,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
}
STSchema* ptr = NULL;
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &ptr);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &ptr);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
......@@ -2161,9 +2145,9 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
ASSERT(pReader->info.pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
......@@ -2171,7 +2155,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
int64_t minKey = 0;
if (pReader->order == TSDB_ORDER_ASC) {
if (pReader->info.order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && hasDataInLastBlock(pLastBlockReader)) {
minKey = tsLast;
......@@ -2204,10 +2188,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
// ASC: file block ---> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
if (pReader->order == TSDB_ORDER_ASC) {
if (pReader->info.order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2220,12 +2204,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
}
if (minKey == k.ts) {
......@@ -2272,12 +2256,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, fRow1, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
}
if (minKey == key) {
......@@ -2285,7 +2269,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, &fRow, NULL);
} else {
init = true;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2335,14 +2319,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
pBlockScanInfo->lastKey = tsLastBlock;
return TSDB_CODE_SUCCESS;
} else {
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2359,12 +2343,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
}
}
} else { // not merge block data
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr);
// merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
......@@ -2394,9 +2378,9 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
ASSERT(pReader->info.pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
......@@ -2413,12 +2397,12 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
if (ASCENDING_TRAVERSE(pReader->order)) {
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
SRow* pTSRow = NULL;
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2428,7 +2412,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, pMerger, &pReader->info.verRange, pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) {
......@@ -2481,9 +2465,9 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
ASSERT(pReader->info.pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
......@@ -2491,7 +2475,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
}
int64_t minKey = 0;
if (ASCENDING_TRAVERSE(pReader->order)) {
if (ASCENDING_TRAVERSE(pReader->info.order)) {
minKey = INT64_MAX; // let's find the minimum
if (minKey > k.ts) {
minKey = k.ts;
......@@ -2531,11 +2515,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// ASC: file block -----> last block -----> imem -----> mem
// DESC: mem -----> imem -----> last block -----> file block
if (ASCENDING_TRAVERSE(pReader->order)) {
if (ASCENDING_TRAVERSE(pReader->info.order)) {
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2549,13 +2533,13 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
}
if (minKey == ik.ts) {
......@@ -2626,18 +2610,18 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tsdbRowMergerAdd(pMerger, pRow1, NULL);
} else {
init = true;
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, pRow1, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->verRange, pReader->idStr);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
}
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) {
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2663,11 +2647,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScanInfo* pBlockScanInfo, TSDBKEY* pKey,
SMemTable* pMem, SIterInfo* pIter, const char* type) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t backward = (!ASCENDING_TRAVERSE(pReader->order));
int32_t backward = (!ASCENDING_TRAVERSE(pReader->info.order));
pIter->hasVal = false;
if (pMem != NULL) {
*pData = tsdbGetTbDataFromMemTable(pMem, pReader->suid, pBlockScanInfo->uid);
*pData = tsdbGetTbDataFromMemTable(pMem, pReader->info.suid, pBlockScanInfo->uid);
if ((*pData) != NULL) {
code = tsdbTbDataIterCreate((*pData), pKey, backward, &pIter->iter);
......@@ -2676,7 +2660,7 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
tsdbDebug("%p uid:%" PRIu64 ", check data in %s from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, type, pKey->ts, pReader->order, (*pData)->minKey, (*pData)->maxKey,
pReader, pBlockScanInfo->uid, type, pKey->ts, pReader->info.order, (*pData)->minKey, (*pData)->maxKey,
pReader->idStr);
} else {
tsdbError("%p uid:%" PRIu64 ", failed to create iterator for %s, code:%s, %s", pReader, pBlockScanInfo->uid,
......@@ -2726,10 +2710,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
STbData* d = NULL;
TSDBKEY startKey = {0};
if (ASCENDING_TRAVERSE(pReader->order)) {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->verRange.minVer};
if (ASCENDING_TRAVERSE(pReader->info.order)) {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey + 1, .version = pReader->info.verRange.minVer};
} else {
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->verRange.maxVer};
startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer};
}
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem,
......@@ -2745,7 +2729,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
return code;
}
doLoadMemTombData(pBlockScanInfo, d, di, pReader->verRange.maxVer);
doLoadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer);
pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS;
......@@ -2763,17 +2747,17 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
// check for version and time range
int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) {
if (ver > pReader->info.verRange.maxVer || ver < pReader->info.verRange.minVer) {
return false;
}
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (ts > pReader->window.ekey || ts < pReader->window.skey) {
if (ts > pReader->info.window.ekey || ts < pReader->info.window.skey) {
return false;
}
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->order,
&pReader->verRange)) {
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, ts, ver, pReader->info.order,
&pReader->info.verRange)) {
return false;
}
......@@ -2809,8 +2793,8 @@ static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t
return TSDB_CODE_SUCCESS;
}
static int32_t loadTombRecordInfoFromSttFiles(SArray* pLDataIterList, uint64_t suid,
STableBlockScanInfo* pBlockScanInfo, uint64_t maxVer) {
static int32_t loadTombRecordsFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo,
uint64_t maxVer) {
int32_t size = taosArrayGetSize(pLDataIterList);
if (size <= 0) {
return TSDB_CODE_SUCCESS;
......@@ -2883,41 +2867,44 @@ static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfT
return code;
}
if (record.suid < pReader->suid) {
if (record.suid < pReader->info.suid) {
continue;
}
if (record.suid > pReader->suid) {
if (record.suid > pReader->info.suid) {
tTombBlockDestroy(&block);
return TSDB_CODE_SUCCESS;
}
ASSERT(record.suid == pReader->suid);
if (record.uid < uid) {
continue;
}
bool newTable = false;
while (uid < record.uid && j < (numOfTables - 1)) {
if (uid < record.uid) {
while (pReader->status.uidList.tableUidList[j] < record.uid && j < numOfTables) {
j += 1;
uid = pReader->status.uidList.tableUidList[j];
newTable = true;
}
if (uid != record.uid) {
if (j >= numOfTables) {
tTombBlockDestroy(&block);
return TSDB_CODE_SUCCESS;
} else {
break;
}
uid = pReader->status.uidList.tableUidList[j];
}
if (record.uid < uid) {
continue;
}
ASSERT(record.suid == pReader->info.suid && uid == record.uid);
if (newTable) {
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
if (pScanInfo->pfileDelData == NULL) {
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
}
}
}
ASSERT(record.uid == uid);
if (record.version <= pReader->verRange.maxVer) {
if (record.version <= pReader->info.verRange.maxVer) {
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
taosArrayPush(pScanInfo->pfileDelData, &delData);
}
......@@ -2954,22 +2941,22 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan
pScanInfo->uid, pReader->idStr);
int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb,
pReader->suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false,
pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->pSchema,
pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false,
pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->info.pSchema,
pReader->suppInfo.colId, pReader->suppInfo.numOfCols);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = loadTombRecordInfoFromSttFiles(pReader->status.pLDataIterArray, pReader->suid, pScanInfo, pReader->verRange.maxVer);
code = loadTombRecordsFromSttFiles(pReader->status.pLDataIterArray, pReader->info.suid, pScanInfo, pReader->info.verRange.maxVer);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
initMemDataIterator(pScanInfo, pReader);
initDelSkylineIterator(pScanInfo, pReader->order, &pReader->cost);
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->verRange);
initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost);
code = nextRowFromLastBlocks(pLBlockReader, pScanInfo, &pReader->info.verRange);
int64_t el = taosGetTimestampUs() - st;
pReader->cost.initLastBlockReader += (el / 1000.0);
......@@ -2998,9 +2985,9 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return code;
}
// merge is not initialized yet, due to the fact that the pReader->pSchema is not initialized
// merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized
if (pMerger->pArray == NULL) {
ASSERT(pReader->pSchema == NULL);
ASSERT(pReader->info.pSchema == NULL);
STSchema* ps = getTableSchemaImpl(pReader, pBlockScanInfo->uid);
if (ps == NULL) {
return terrno;
......@@ -3014,7 +3001,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SRow* pTSRow = NULL;
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->pSchema);
code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -3069,18 +3056,18 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, bool* loadNeighbor) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
int32_t nextIndex = -1;
*loadNeighbor = false;
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec);
if (!hasNeighbor) { // do nothing
return code;
}
if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->order)) { // load next block
if (overlapWithNeighborBlock2(pBlockInfo, &rec, pReader->info.order)) { // load next block
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
......@@ -3126,7 +3113,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
......@@ -3137,7 +3124,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
return code;
}
......@@ -3167,7 +3154,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
ASSERT(0);
pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order);
return code;
}
}
......@@ -3194,7 +3181,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
bool loadNeighbor = false;
code = loadNeighborIfOverlap(pBlockInfo, pBlockScanInfo, pReader, &loadNeighbor);
if ((!loadNeighbor) || (code != 0)) {
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
break;
}
}
......@@ -3214,7 +3201,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
// pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->order);
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
break;
}
......@@ -3287,7 +3274,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde
}
TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL};
bool hasKey = false, hasIKey = false;
......@@ -3484,7 +3471,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->order);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
return code;
}
......@@ -3508,13 +3495,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// build composed data block
code = buildComposedDataBlock(pReader);
} else if (bufferDataInFileBlockGap(pReader->order, keyInBuf, pBlockInfo)) {
} else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo)) {
// data in memory that are earlier than current file block
// rows in buffer should be less than the file block in asc, greater than file block in desc
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->order)) {
if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) {
// only return the rows in last block
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
ASSERT(tsLast >= pBlockInfo->record.lastKey);
......@@ -3561,10 +3548,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pInfo->dataLoad = 0;
pInfo->window = (STimeWindow){.skey = pBlockInfo->record.firstKey, .ekey = pBlockInfo->record.lastKey};
setComposedBlockFlag(pReader, false);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->order);
setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->record.lastKey, pReader->info.order);
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow,
......@@ -3595,7 +3582,7 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade
SBlockIdx* pBlockIdx = NULL;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
if (pBlockIdx->suid != pReader->info.suid) {
continue;
}
......@@ -3637,7 +3624,7 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->suid) {
if (pStart->suid != pReader->info.suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
......@@ -3650,13 +3637,13 @@ static int32_t doSumSttBlockRows(STsdbReader* pReader) {
for (int32_t j = 0; j < size; ++j) {
SSttBlk* p = taosArrayGet(pBlockLoadInfo->aSttBlk, j);
uint64_t s = p->suid;
if (s < pReader->suid) {
if (s < pReader->info.suid) {
continue;
}
if (s == pReader->suid) {
if (s == pReader->info.suid) {
pReader->rowsNum += p->nRow;
} else if (s > pReader->suid) {
} else if (s > pReader->info.suid) {
break;
}
}
......@@ -3734,9 +3721,9 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
initMemDataIterator(*pBlockScanInfo, pReader);
initDelSkylineIterator(*pBlockScanInfo, pReader->order, &pReader->cost);
initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? INT64_MAX : INT64_MIN;
int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -3756,7 +3743,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
// set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
int64_t lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MIN : INT64_MAX;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
......@@ -3768,7 +3755,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
}
pDumpInfo->totalRows = pBlockInfo->record.numRow;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlockInfo->record.numRow - 1;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->record.numRow - 1;
} else {
pDumpInfo->totalRows = 0;
pDumpInfo->rowIndex = 0;
......@@ -3800,7 +3787,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList);
} else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
resetDataBlockIterator(pBlockIter, pReader->info.order);
resetTableListIndex(&pReader->status);
}
......@@ -3858,7 +3845,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
......@@ -3897,7 +3884,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
}
tBlockDataReset(pBlockData);
resetDataBlockIterator(pBlockIter, pReader->order);
resetDataBlockIterator(pBlockIter, pReader->info.order);
resetTableListIndex(&pReader->status);
ERetrieveType type = doReadDataFromLastFiles(pReader);
......@@ -4085,14 +4072,14 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
TSDBKEY key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, &pReader->window)) {
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return NULL;
}
// it is a valid data version
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) &&
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
if ((key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer) &&
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->info.order, &pReader->info.verRange))) {
return pRow;
}
......@@ -4105,13 +4092,13 @@ TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* p
pRow = tsdbTbDataIterGet(pIter->iter);
key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, &pReader->window)) {
if (outOfTimeWindow(key.ts, &pReader->info.window)) {
pIter->hasVal = false;
return NULL;
}
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer &&
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->order, &pReader->verRange))) {
if (key.version <= pReader->info.verRange.maxVer && key.version >= pReader->info.verRange.minVer &&
(!hasBeenDropped(pDelList, &pIter->index, key.ts, key.version, pReader->info.order, &pReader->info.verRange))) {
return pRow;
}
}
......@@ -4179,17 +4166,17 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
CHECK_FILEBLOCK_STATE* state) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
*state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
bool loadNeighbor = true;
int32_t code = loadNeighborIfOverlap(pFBlock, pScanInfo, pReader, &loadNeighbor);
if (loadNeighbor && (code == TSDB_CODE_SUCCESS)) {
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step);
if ((pDumpInfo->rowIndex >= pDumpInfo->totalRows && asc) || (pDumpInfo->rowIndex < 0 && !asc)) {
*state = CHECK_FILEBLOCK_CONT;
}
......@@ -4202,14 +4189,14 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SRowMerger* pMerger = &pReader->status.merger;
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
int32_t step = asc ? 1 : -1;
pDumpInfo->rowIndex += step;
if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) {
pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->info.verRange, step);
}
// all rows are consumed, let's try next file block
......@@ -4303,7 +4290,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1);
} else { // let's merge rows in file block
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pReader->pSchema);
code = tsdbRowMergerAdd(&pReader->status.merger, &current, pReader->info.pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -4344,7 +4331,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return terrno;
}
if (ASCENDING_TRAVERSE(pReader->order)) { // ascending order imem --> mem
if (ASCENDING_TRAVERSE(pReader->info.order)) { // ascending order imem --> mem
int32_t code = tsdbRowMergerAdd(&pReader->status.merger, piRow, piSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -4393,7 +4380,7 @@ static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbRea
uint64_t uid = pBlockScanInfo->uid;
// todo refactor
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
if (pBlockScanInfo->iter.hasVal) {
TSDBKEY k = TSDBROW_KEY(pRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
......@@ -4627,12 +4614,12 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t
pUidList->tableUidList[i] = pList[i].uid;
// todo extract method
if (ASCENDING_TRAVERSE(pReader->order)) {
int64_t skey = pReader->window.skey;
if (ASCENDING_TRAVERSE(pReader->info.order)) {
int64_t skey = pReader->info.window.skey;
pInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
pInfo->lastKeyInStt = skey;
} else {
int64_t ekey = pReader->window.ekey;
int64_t ekey = pReader->info.window.ekey;
pInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
pInfo->lastKeyInStt = ekey;
}
......@@ -4657,19 +4644,19 @@ void* tsdbGetIvtIdx2(SMeta* pMeta) {
return metaGetIvtIdx(pMeta);
}
uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->verRange.maxVer; }
uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.verRange.maxVer; }
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
initFilesetIterator(&pStatus->fileIter, pReader->pfSetArray, pReader);
resetDataBlockIterator(&pStatus->blockIter, pReader->order);
initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader);
resetDataBlockIterator(&pStatus->blockIter, pReader->info.order);
int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false;
} else if (READ_MODE_COUNT_ONLY == pReader->readMode) {
} else if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
// DO NOTHING
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
......@@ -4705,7 +4692,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
// check for query time window
STsdbReader* pReader = *ppReader;
if (isEmptyQueryTimeWindow(&pReader->window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
if (isEmptyQueryTimeWindow(&pReader->info.window) && pCond->type == TIMEWINDOW_RANGE_CONTAINED) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -4748,20 +4735,20 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
// no valid error code set in metaGetTbTSchema, so let's set the error code here.
// we should proceed in case of tmq processing.
if (pCond->suid != 0) {
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, -1, 1);
if (pReader->info.pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->info.suid, pReader->idStr);
}
} else if (numOfTables > 0) {
STableKeyInfo* pKey = pTableList;
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
if (pReader->pSchema == NULL) {
pReader->info.pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1);
if (pReader->info.pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr);
}
}
if (pReader->pSchema != NULL) {
tsdbRowMergerInit(&pReader->status.merger, pReader->pSchema);
if (pReader->info.pSchema != NULL) {
tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema);
}
pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash);
......@@ -4772,8 +4759,8 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
}
tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc);
if (pReader->pSchema != NULL) {
code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
if (pReader->info.pSchema != NULL) {
code = updateBlockSMAInfo(pReader->info.pSchema, &pReader->suppInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
......@@ -4797,15 +4784,15 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
pReader->flag = READER_STATUS_SUSPEND;
if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY;
pReader->info.readMode = READ_MODE_COUNT_ONLY;
}
pReader->pIgnoreTables = pIgnoreTables;
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
" in this query %s",
pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->verRange.minVer,
pReader->verRange.maxVer, pReader->idStr);
pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer,
pReader->info.verRange.maxVer, pReader->idStr);
return code;
......@@ -4819,22 +4806,22 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
static void clearSharedPtr(STsdbReader* p) {
p->status.pTableMap = NULL;
p->status.uidList.tableUidList = NULL;
p->status.pfSetArray = NULL;
p->info.pSchema = NULL;
p->pReadSnap = NULL;
p->pSchema = NULL;
p->pSchemaMap = NULL;
p->pfSetArray = NULL;
}
static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) {
pDst->status.pTableMap = pSrc->status.pTableMap;
pDst->status.uidList = pSrc->status.uidList;
pDst->pSchema = pSrc->pSchema;
pDst->status.pfSetArray = pSrc->status.pfSetArray;
pDst->info.pSchema = pSrc->info.pSchema;
pDst->pSchemaMap = pSrc->pSchemaMap;
pDst->pReadSnap = pSrc->pReadSnap;
pDst->pfSetArray = pSrc->pfSetArray;
if (pDst->pSchema) {
tsdbRowMergerInit(&pDst->status.merger, pDst->pSchema);
if (pDst->info.pSchema) {
tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema);
}
}
......@@ -4885,15 +4872,6 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader);
}
if (pReader->pDelFReader != NULL) {
tsdbDelFReaderClose(&pReader->pDelFReader);
}
if (pReader->pDelIdx != NULL) {
taosArrayDestroy(pReader->pDelIdx);
pReader->pDelIdx = NULL;
}
qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL;
......@@ -4926,7 +4904,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
tsdbRowMergerCleanup(&pReader->status.merger);
taosMemoryFree(pReader->pSchema);
taosMemoryFree(pReader->info.pSchema);
tSimpleHashCleanup(pReader->pSchemaMap);
taosMemoryFreeClear(pReader);
......@@ -5001,7 +4979,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
// reset current current table's data block scan info,
pBlockScanInfo->iterInit = false;
......@@ -5148,7 +5126,7 @@ static int32_t doTsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
return code;
}
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
if (READ_MODE_COUNT_ONLY == pReader->info.readMode) {
return tsdbReadRowsCountOnly(pReader);
}
......@@ -5176,7 +5154,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
*hasNext = false;
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) {
if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) {
return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code;
}
......@@ -5216,7 +5194,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
// prepare for the main scan
code = doOpenReaderImpl(pReader);
int32_t step = 1;
resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey, step);
resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->info.window.ekey, step);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -5244,7 +5222,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
// prepare for the next row scan
int32_t step = -1;
code = doOpenReaderImpl(pReader->innerReader[1]);
resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey, step);
resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->info.window.ekey, step);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -5476,7 +5454,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
}
}
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->pReadSnap == NULL) {
tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr);
tsdbReleaseReader(pReader);
return TSDB_CODE_SUCCESS;
......@@ -5485,11 +5463,11 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
pReader->order = pCond->order;
pReader->info.order = pCond->order;
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
pStatus->loadFromFile = true;
pStatus->pTableIter = NULL;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->info.window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
// allocate buffer in order to load data blocks from file
memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg));
......@@ -5499,13 +5477,13 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap);
initFilesetIterator(&pStatus->fileIter, pReader->pfSetArray, pReader);
resetDataBlockIterator(pBlockIter, pReader->order);
initFilesetIterator(&pStatus->fileIter, pReader->status.pfSetArray, pReader);
resetDataBlockIterator(pBlockIter, pReader->info.order);
resetTableListIndex(&pReader->status);
bool asc = ASCENDING_TRAVERSE(pReader->order);
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
int32_t step = asc ? 1 : -1;
int64_t ts = asc ? pReader->window.skey - 1 : pReader->window.ekey + 1;
int64_t ts = asc ? pReader->info.window.skey - 1 : pReader->info.window.ekey + 1;
resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step);
// no data in files, let's try buffer in memory
......@@ -5516,7 +5494,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
code = initForFirstBlockInFile(pReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader,
numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr);
numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr);
tsdbReleaseReader(pReader);
return code;
......@@ -5525,7 +5503,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64
" in query %s",
pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey,
pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, pReader->info.window.ekey,
pReader->idStr);
tsdbReleaseReader(pReader);
......@@ -5648,7 +5626,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
STbData* d = NULL;
if (pReader->pReadSnap->pMem != NULL) {
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->info.suid, pBlockScanInfo->uid);
if (d != NULL) {
rows += tsdbGetNRowsInTbData(d);
}
......@@ -5656,7 +5634,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) {
STbData* di = NULL;
if (pReader->pReadSnap->pIMem != NULL) {
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->info.suid, pBlockScanInfo->uid);
if (di != NULL) {
rows += tsdbGetNRowsInTbData(di);
}
......@@ -5710,7 +5688,7 @@ int32_t tsdbGetTableSchema2(void* pVnode, int64_t uid, STSchema** pSchema, int64
int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STsdbReadSnap** ppSnap) {
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
SVersionRange* pRange = &pReader->verRange;
SVersionRange* pRange = &pReader->info.verRange;
// lock
taosThreadRwlockRdlock(&pTsdb->rwLock);
......@@ -5752,7 +5730,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
}
// fs
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->pfSetArray);
code = tsdbFSCreateRefSnapshot(pTsdb->pFS, &pReader->status.pfSetArray);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
goto _exit;
......@@ -5795,7 +5773,7 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
taosMemoryFree(pSnap);
tsdbFSDestroyRefSnapshot(&pReader->pfSetArray);
tsdbFSDestroyRefSnapshot(&pReader->status.pfSetArray);
}
tsdbTrace("vgId:%d, untake read snapshot", TD_VID(pTsdb->pVnode));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册