提交 c7559a81 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 27b719fe
......@@ -130,7 +130,7 @@ bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond, int32_t tWinIdx);
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
......
......@@ -16,17 +16,22 @@
#include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
typedef struct {
STbDataIter *iter;
int32_t index;
bool hasVal;
} SIterInfo;
typedef struct STableBlockScanInfo {
uint64_t uid;
TSKEY lastKey;
SBlockIdx blockIdx;
SArray* pBlockList; // block data index list
bool iterInit; // whether to initialize the in-memory skip list iterator or not
STbDataIter* iter; // mem buffer skip list iterator
STbDataIter* iiter; // imem buffer skip list iterator
SIterInfo iter; // mem buffer skip list iterator
SIterInfo iiter; // imem buffer skip list iterator
SArray* delSkyline; // delete info for this table
bool memHasVal;
bool imemHasVal;
int32_t fileDelIndex;
bool iterInit; // whether to initialize the in-memory skip list iterator or not
} STableBlockScanInfo;
typedef struct SBlockOrderWrapper {
......@@ -112,14 +117,14 @@ struct STsdbReader {
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo;
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SIOCostSummary cost;
STSchema* pSchema;
SDataFReader* pFileReader;
SVersionRange verRange;
SDataFReader* pFileReader;
SVersionRange verRange;
#if 0
SArray* prev; // previous row which is before than time window
SArray* next; // next row which is after the query time window
SFileBlockInfo* pDataBlockInfo;
SDataCols* pDataCols; // in order to hold current file data block
int32_t allocSize; // allocated data block size
......@@ -136,19 +141,19 @@ struct STsdbReader {
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger);
static int32_t doMergeRowsInBuf(STbDataIter* pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doMergeRowsInBuf(SIterInfo *pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey);
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow** pTSRow,
STsdbReader* pReader);
static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader);
static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData);
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
......@@ -372,7 +377,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = 4096;
pReader->idStr = strdup(idstr);
pReader->idStr = (idstr != NULL)? strdup(idstr):NULL;
pReader->verRange = (SVersionRange) {.minVer = pCond->startVersion, .maxVer = 10000};
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows);
......@@ -1708,7 +1713,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
}
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) {
if (!(pBlockScanInfo->iiter.hasVal || pBlockScanInfo->iter.hasVal)) {
return TSDB_CODE_SUCCESS;
}
......@@ -1728,13 +1733,14 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
}
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
STSRow* pTSRow, STbDataIter* pIter, bool* hasVal, int64_t key) {
STSRow* pTSRow, SIterInfo* pIter, int64_t key) {
SRowMerger merge = {0};
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SArray* pDelList = pBlockScanInfo->delSkyline;
// ascending order traverse
if (ASCENDING_TRAVERSE(pReader->order)) {
......@@ -1744,19 +1750,19 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
} else if (k.ts < key) { // k.ts < key
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader);
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
} else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMerge(&merge, pRow);
doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader);
doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
}
} else { // descending order scan
if (key < k.ts) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, hasVal, &pTSRow, pReader);
doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader);
} else if (k.ts < key) {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
......@@ -1766,7 +1772,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
updateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(pIter, hasVal, k.ts, &merge, pReader);
doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
......@@ -1786,9 +1792,10 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SArray* pDelList = pBlockScanInfo->delSkyline;
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
ASSERT(pRow != NULL && piRow != NULL);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
......@@ -1808,12 +1815,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (ik.ts == key) {
tRowMerge(&merge, piRow);
doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (k.ts == key) {
tRowMerge(&merge, pRow);
doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
......@@ -1825,7 +1832,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [3] ik.ts < key <= k.ts
// [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) {
doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader);
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -1833,7 +1840,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [5] k.ts < key <= ik.ts
// [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) {
doMergeMultiRows(pRow, uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, &pTSRow, pReader);
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -1853,11 +1860,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
updateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, key, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader);
if (ik.ts == k.ts) {
tRowMerge(&merge, piRow);
doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, key, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (k.ts == key) {
......@@ -1875,7 +1882,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [3] ik.ts > k.ts >= Key
// [4] ik.ts > key >= k.ts
if (ik.ts > key) {
doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader);
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -1894,7 +1901,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
//[7] key = ik.ts > k.ts
if (key == ik.ts) {
doMergeMultiRows(piRow, uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, &pTSRow, pReader);
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMerge(&merge, &fRow);
......@@ -1909,7 +1916,8 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
ASSERT(0);
}
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STsdbReader* pReader) {
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader) {
// check for version and time range
int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) {
......@@ -1921,6 +1929,11 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
return false;
}
TSDBKEY k = {.ts = ts, .version = ver};
if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->fileDelIndex, &k)) {
return false;
}
return true;
}
......@@ -1934,22 +1947,20 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
STSRow* pTSRow = NULL;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) {
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
return doMergeThreeLevelRows(pReader, pBlockScanInfo);
} else {
// imem + file
if (pBlockScanInfo->imemHasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, pBlockScanInfo->iiter,
&pBlockScanInfo->imemHasVal, key);
if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, pTSRow, &pBlockScanInfo->iiter, key);
}
// mem + file
if (pBlockScanInfo->memHasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, pBlockScanInfo->iter,
&pBlockScanInfo->memHasVal, key);
if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, pTSRow, &pBlockScanInfo->iter,key);
}
// imem & mem are all empty, only file exist
......@@ -1973,7 +1984,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
while (1) {
// todo check the validate of row in file block
{
if (!isValidFileBlockRow(pBlockData, pDumpInfo, pReader)) {
if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
pDumpInfo->rowIndex += step;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
......@@ -2018,7 +2029,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo*
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
if (pBlockScanInfo->iterInit) {
return TSDB_CODE_SUCCESS;
}
......@@ -2038,9 +2049,9 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter);
code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter);
if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->memHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter) != NULL);
pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
......@@ -2059,9 +2070,9 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter);
code = tsdbTbDataIterCreate(di, &startKey, backward, &pBlockScanInfo->iiter.iter);
if (code == TSDB_CODE_SUCCESS) {
pBlockScanInfo->imemHasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter) != NULL);
pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL);
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
......@@ -2076,18 +2087,22 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
}
initDelSkylineIterator(pBlockScanInfo, pReader, d, di);
pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS;
}
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData) {
if (pBlockScanInfo->delSkyline != NULL) {
return TSDB_CODE_SUCCESS;
}
#if 0
int32_t code = 0;
STsdb* pTsdb = pReader->pTsdb;
SArray* pDelData = taosArrayInit(4, sizeof(SDelData));
SDelFile *pDelFile = tsdbFSStateGetDelFile(pTsdb->fs->cState);
if (pDelFile) {
SDelFReader* pDelFReader = NULL;
......@@ -2101,27 +2116,51 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb
goto _err;
}
SDelFile* pDelFileR = pReader->pTsdb->fs->nState->pDelFile;
if (pDelFileR) {
code = tsdbDelFReaderOpen(&pDelFReader, pDelFileR, pTsdb, NULL);
if (code) {
goto _err;
}
code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
if (code) {
goto _err;
}
code = tsdbReadDelIdx(pDelFReader, aDelIdx, NULL);
if (code) {
goto _err;
}
SDelIdx idx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid};
SDelIdx* pIdx = taosArraySearch(aDelIdx, &idx, tCmprDelIdx, TD_EQ);
code = tsdbReadDelData(pDelFReader, pIdx, pDelData, NULL);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
}
code = tsdbBuildDeleteSkyline(pBlockScanInfo->delSkyline, 0, (int32_t)(nDelData - 1), aSkyline);
SDelData* p = NULL;
if (pMemTbData != NULL) {
p = pMemTbData->pHead;
while (p) {
taosArrayPush(pDelData, p);
p = p->pNext;
}
}
_err:
if (piMemTbData != NULL) {
p = piMemTbData->pHead;
while (p) {
taosArrayPush(pDelData, p);
p = p->pNext;
}
}
if (taosArrayGetSize(pDelData) > 0) {
pBlockScanInfo->delSkyline = taosArrayInit(4, sizeof(TSDBKEY));
code = tsdbBuildDeleteSkyline(pDelData, 0, (int32_t)(taosArrayGetSize(pDelData) - 1), pBlockScanInfo->delSkyline);
}
taosArrayDestroy(pDelData);
pBlockScanInfo->iter.index = ASCENDING_TRAVERSE(pReader->order)? 0:taosArrayGetSize(pBlockScanInfo->delSkyline) - 1;
pBlockScanInfo->iiter.index = pBlockScanInfo->iter.index;
pBlockScanInfo->fileDelIndex = pBlockScanInfo->iter.index;
return code;
#endif
return 0;
_err:
taosArrayDestroy(pDelData);
return code;
}
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
......@@ -2130,13 +2169,13 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
initMemIterator(pScanInfo, pReader);
TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader);
initMemDataIterator(pScanInfo, pReader);
TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
if (pRow != NULL) {
key = TSDBROW_KEY(pRow);
}
pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader);
pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
if (pRow != NULL) {
TSDBKEY k = TSDBROW_KEY(pRow);
if (key.ts > k.ts) {
......@@ -2230,7 +2269,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
}
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
initMemDataIterator(pBlockScanInfo, pReader);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
......@@ -2369,51 +2408,89 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
// taosArrayPush(pTsdbReadHandle->pTableCheckInfo, &info);
// }
TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
if (!(*hasVal)) {
bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey) {
ASSERT(pKey != NULL);
if (pDelList == NULL) {
return false;
}
if (*index >= taosArrayGetSize(pDelList) - 1) {
TSDBKEY* last = taosArrayGetLast(pDelList);
if (pKey->ts > last->ts) {
return false;
} else if (pKey->ts == last->ts) {
size_t size = taosArrayGetSize(pDelList);
TSDBKEY* prev = taosArrayGet(pDelList, size - 2);
if (prev->version >= pKey->version) {
return true;
} else {
return false;
}
} else {
ASSERT(0);
}
} else {
TSDBKEY* pCurrent = taosArrayGet(pDelList, *index);
TSDBKEY* pNext = taosArrayGet(pDelList, (*index) + 1);
if (pCurrent->ts <= pKey->ts && pNext->ts >= pKey->ts && pCurrent->version >= pKey->version) {
return true;
} else {
while (pNext->ts < pKey->ts && (*index) < taosArrayGetSize(pDelList) - 1) {
(*index) += 1;
}
return false;
}
}
}
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
if (!pIter->hasVal) {
return NULL;
}
TSDBROW* pRow = tsdbTbDataIterGet(pIter);
TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter);
TSDBKEY key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, &pReader->window)) {
*hasVal = false;
pIter->hasVal = false;
return NULL;
}
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) {
// it is a valid data version
if ((key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) && (!hasBeenDropped(pDelList, &pIter->index, &key))) {
return pRow;
}
while (1) {
*hasVal = tsdbTbDataIterNext(pIter);
if (!(*hasVal)) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
return NULL;
}
pRow = tsdbTbDataIterGet(pIter);
pRow = tsdbTbDataIterGet(pIter->iter);
key = TSDBROW_KEY(pRow);
if (outOfTimeWindow(key.ts, &pReader->window)) {
*hasVal = false;
pIter->hasVal = false;
return NULL;
}
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer) {
if (key.version <= pReader->verRange.maxVer && key.version >= pReader->verRange.minVer && (!hasBeenDropped(pDelList, &pIter->index, &key))) {
return pRow;
}
}
}
int32_t doMergeRowsInBuf(STbDataIter* pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) {
int32_t doMergeRowsInBuf(SIterInfo *pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) {
while (1) {
*hasVal = tsdbTbDataIterNext(pIter);
if (!(*hasVal)) {
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
if (!pIter->hasVal) {
break;
}
// data exists but not valid
TSDBROW* pRow = getValidRow(pIter, hasVal, pReader);
TSDBROW* pRow = getValidRow(pIter, pDelList, pReader);
if (pRow == NULL) {
break;
}
......@@ -2539,15 +2616,14 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) {
}
}
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, STbDataIter* dIter, bool* hasVal, STSRow** pTSRow,
STsdbReader* pReader) {
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader) {
SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow);
updateSchema(pRow, uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(dIter, hasVal, k.ts, &merge, pReader);
doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
}
......@@ -2562,18 +2638,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
updateSchema(piRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, pRow);
doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
} else {
updateSchema(pRow, pBlockScanInfo->uid, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doMergeRowsInBuf(pBlockScanInfo->iiter, &pBlockScanInfo->memHasVal, ik.ts, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, piRow);
doMergeRowsInBuf(pBlockScanInfo->iter, &pBlockScanInfo->imemHasVal, k.ts, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
tRowMergerGetRow(&merge, pTSRow);
......@@ -2581,33 +2657,34 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow,
int64_t endKey) {
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
SArray* pDelList = pBlockScanInfo->delSkyline;
// todo refactor
bool asc = ASCENDING_TRAVERSE(pReader->order);
if (pBlockScanInfo->memHasVal) {
if (pBlockScanInfo->iter.hasVal) {
TSDBKEY k = TSDBROW_KEY(pRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
pRow = NULL;
}
}
if (pBlockScanInfo->imemHasVal) {
if (pBlockScanInfo->iiter.hasVal) {
TSDBKEY k = TSDBROW_KEY(piRow);
if ((k.ts >= endKey && asc) || (k.ts <= endKey && !asc)) {
piRow = NULL;
}
}
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal && pRow != NULL && piRow != NULL) {
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal && pRow != NULL && piRow != NULL) {
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
if (ik.ts < k.ts) { // ik.ts < k.ts
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader);
doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader);
} else if (k.ts < ik.ts) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader);
} else { // ik.ts == k.ts
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow);
}
......@@ -2615,13 +2692,13 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS;
}
if (pBlockScanInfo->memHasVal && pRow != NULL) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pTSRow, pReader);
if (pBlockScanInfo->iter.hasVal && pRow != NULL) {
doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader);
return TSDB_CODE_SUCCESS;
}
if (pBlockScanInfo->imemHasVal && piRow != NULL) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pTSRow, pReader);
if (pBlockScanInfo->iiter.hasVal && piRow != NULL) {
doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader);
return TSDB_CODE_SUCCESS;
}
......@@ -2686,7 +2763,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
doAppendOneRow(pBlock, pReader, pTSRow);
// no data in buffer, return immediately
if (!(pBlockScanInfo->memHasVal || pBlockScanInfo->imemHasVal)) {
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
break;
}
......@@ -2699,12 +2776,13 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
return TSDB_CODE_SUCCESS;
}
// todo refactor, use arraylist instead
int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
// if (pReader->pTableCheckInfo) taosArrayDestroy(pReader->pTableCheckInfo);
// pReader->pTableCheckInfo = createCheckInfoFromUid(pReader, uid);
// if (pReader->pTableCheckInfo == NULL) {
// return TSDB_CODE_TDB_OUT_OF_MEMORY;
// }
ASSERT(pReader != NULL);
taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
return TDB_CODE_SUCCESS;
}
......@@ -3165,40 +3243,46 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
return pReader->pResBlock->pDataBlock;
}
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
// if (isEmptyQueryTimeWindow(pReader)) {
// if (pCond->order != pReader->order) {
// pReader->order = pCond->order;
// TSWAP(pReader->window.skey, pReader->window.ekey);
// }
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
if (isEmptyQueryTimeWindow(&pReader->window)) {
return TSDB_CODE_SUCCESS;
}
// return;
// }
setQueryTimewindow(pReader, pCond, tWinIdx);
// pReader->order = pCond->order;
// setQueryTimewindow(pReader, pCond, tWinIdx);
// pReader->type = TSDB_QUERY_TYPE_ALL;
// pReader->cur.fid = -1;
// pReader->cur.win = TSWINDOW_INITIALIZER;
// pReader->checkFiles = true;
// pReader->activeIndex = 0; // current active table index
// pReader->locateStart = false;
// pReader->loadExternalRow = pCond->loadExternalRows;
// if (ASCENDING_TRAVERSE(pCond->order)) {
// assert(pReader->window.skey <= pReader->window.ekey);
// } else {
// assert(pReader->window.skey >= pReader->window.ekey);
// }
pReader->order = pCond->order;
pReader->type = BLOCK_LOAD_OFFSET_ORDER;
pReader->status.loadFromFile = true;
pReader->status.pTableIter = NULL;
// // allocate buffer in order to load data blocks from file
// memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
// memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows[tWinIdx]);
// tsdbInitDataBlockLoadInfo(&pReader->dataBlockLoadInfo);
// tsdbInitCompBlockLoadInfo(&pReader->compBlockLoadInfo);
// allocate buffer in order to load data blocks from file
memset(pReader->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
memset(pReader->suppInfo.plist, 0, POINTER_BYTES);
// todo set the correct numOfTables
int32_t numOfTables = 1;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFilesetIterator(&pReader->status.fileIter, pFState, pReader->order, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
int32_t code = 0;
// no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) {
pReader->status.loadFromFile = false;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// resetCheckInfo(pReader);
ASSERT(0);
tsdbDebug("%p reset tsdbreader in query %s", pReader, numOfTables, pReader->idStr);
return code;
}
int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTableBlockInfo) {
......
......@@ -2866,7 +2866,7 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
tsdbSetTableId(pInfo->dataReader, uid);
int64_t oldSkey = pInfo->cond.twindows[0].skey;
pInfo->cond.twindows[0].skey = ts + 1;
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
......
......@@ -440,7 +440,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
}
pTableScanInfo->curTWinIdx += 1;
if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
}
}
......@@ -455,7 +455,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
}
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->curTWinIdx = 0;
}
}
......@@ -464,7 +464,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < total) {
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, 0);
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->curTWinIdx = 0;
}
......@@ -482,7 +482,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
}
pTableScanInfo->curTWinIdx += 1;
if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
}
}
......@@ -498,7 +498,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
qDebug("%s qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
}
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->curTWinIdx = 0;
}
}
......@@ -526,7 +526,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
}
......@@ -560,7 +560,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId);
// tsdbSetTableList(pInfo->dataReader, tableList);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
tsdbReaderReset(pInfo->dataReader, &pInfo->cond, 0);
pInfo->curTWinIdx = 0;
pInfo->scanTimes = 0;
......@@ -859,7 +859,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info;
pTableScanInfo->cond.twindows[0] = win;
pTableScanInfo->curTWinIdx = 0;
// tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
// if (!pTableScanInfo->dataReader) {
// return false;
// }
......
......@@ -131,6 +131,7 @@ if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
print expect $expectmsgcnt , actual $data02
return -1
endi
if $data[0][3] != $expectmsgcnt then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册