提交 c638c33a 编写于 作者: H Haojun Liao

enh(query): retrieve in-memory data.

上级 aac1fb64
......@@ -63,8 +63,8 @@ typedef struct STableBlockScanInfo {
// int32_t numOfBlocks : 29; // number of qualified data blocks not the original blocks
uint8_t chosen : 2; // indicate which iterator should move forward
bool iterInit; // whether to initialize the in-memory skip list iterator or not
STbDataIter iter; // mem buffer skip list iterator
STbDataIter iiter; // imem buffer skip list iterator
STbDataIter* iter; // mem buffer skip list iterator
STbDataIter* iiter; // imem buffer skip list iterator
bool memHasVal;
bool imemHasVal;
} STableBlockScanInfo;
......@@ -126,17 +126,14 @@ typedef struct SComposedDataBlock {
typedef struct SReaderStatus {
SQueryFilePos cur; // current position
int32_t tableListIndex;
bool loadFromFile; // check file stage
bool initStartPos;
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
int32_t realNumOfRows;
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
SFileBlockDumpInfo fBlockDumpInfo;
SBlockData fileBlockData;
SFileSetIter fileIter;
SDataBlockIter blockIter;
bool composedDataBlock;// the returned data block is a composed block or not
} SReaderStatus;
struct STsdbReader {
......@@ -173,10 +170,13 @@ struct STsdbReader {
};
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int tsdbReadRowsFromCache(STableBlockScanInfo* pScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader);
static int buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader);
static TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader);
static int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pData, SFileBlockDumpInfo* pDumpInfo, int64_t ts, SRowMerger *pMerger,
STsdbReader* pReader, STSRow** pRow);
static int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger);
static int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
// static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
// pBlockLoadInfo->slot = -1;
......@@ -204,7 +204,7 @@ static int32_t setColumnIdList(STsdbReader* pReader, SSDataBlock* pBlock) {
return TSDB_CODE_SUCCESS;
}
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const uint64_t* idList, int32_t numOfTables) {
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
ASSERT(numOfTables >= 1);
// allocate buffer in order to load data blocks from file
......@@ -216,7 +216,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const uint64_
// todo apply the lastkey of table check to avoid to load header file
for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j]};
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
info.lastKey = pTsdbReader->window.skey;
......@@ -337,7 +337,7 @@ static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->numOfBlocks = -1;
}
static bool nextFilesetIterator(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
if (pIter->index >= pIter->numOfFiles) {
return false;
}
......@@ -370,8 +370,7 @@ static bool nextFilesetIterator(SFileSetIter* pIter, int32_t order, STsdbReader*
static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->cur.fid = INT32_MIN;
pStatus->cur.win = TSWINDOW_INITIALIZER;
pStatus->initStartPos = false;
pStatus->tableListIndex = 0; // current active table index
pStatus->pTableIter = NULL;
pStatus->loadFromFile = true;
}
......@@ -384,13 +383,18 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
}
initReaderStatus(&pReader->status);
pReader->pTsdb = pVnode->pTsdb;
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = 4096;
pReader->idStr = strdup(idstr);
pReader->startVersion= pCond->startVersion;
pReader->endVersion = pCond->endVersion;
pReader->endVersion = 100000;//pCond->endVersion; // todo for test purpose
pReader->type = pCond->type;
pReader->window = *pCond->twindows;
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1);
// todo remove this
setQueryTimewindow(pReader, pCond, 0);
......@@ -405,7 +409,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
goto _end;
}
// todo use new api refactor this
// todo use new api refactor this after merge with 3.0
pReader->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pReader->pResBlock == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -418,13 +422,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
colInfo.info = pCond->colList[i];
taosArrayPush(pReader->pResBlock->pDataBlock, &colInfo);
}
pReader->pResBlock->info.numOfCols = taosArrayGetSize(pReader->pResBlock->pDataBlock);
blockDataEnsureCapacity(pReader->pResBlock, pReader->capacity);
setColumnIdList(pReader, pReader->pResBlock);
pReader->suppInfo.slotIds = taosMemoryCalloc(pCond->numOfCols, sizeof(int32_t));
pReader->suppInfo.plist = taosMemoryCalloc(pCond->numOfCols, POINTER_BYTES);
}
// todo refactor
STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState);
resetDataBlockIterator(&pReader->status.blockIter);
......@@ -443,39 +449,6 @@ _end:
return code;
}
// static int32_t setCurrentSchema(SVnode* pVnode, STsdbReader* pTsdbReadHandle) {
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, 0);
// int32_t sversion = 1;
// SMetaReader mr = {0};
// metaReaderInit(&mr, pVnode->pMeta, 0);
// int32_t code = metaGetTableEntryByUid(&mr, pCheckInfo->tableId);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
// metaReaderClear(&mr);
// return terrno;
// }
// if (mr.me.type == TSDB_CHILD_TABLE) {
// tb_uid_t suid = mr.me.ctbEntry.suid;
// code = metaGetTableEntryByUid(&mr, suid);
// if (code != TSDB_CODE_SUCCESS) {
// terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
// metaReaderClear(&mr);
// return terrno;
// }
// sversion = mr.me.stbEntry.schemaRow.version;
// } else {
// ASSERT(mr.me.type == TSDB_NORMAL_TABLE);
// sversion = mr.me.ntbEntry.schemaRow.version;
// }
// metaReaderClear(&mr);
// pTsdbReadHandle->pSchema = metaGetTbTSchema(pVnode->pMeta, pCheckInfo->tableId, sversion);
// return TSDB_CODE_SUCCESS;
// }
// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
// int32_t tWinIdx) {
// STsdbReader* pTsdbReadHandle = queryHandle;
......@@ -796,7 +769,7 @@ _end:
// int32_t step = ASCENDING_TRAVERSE(pHandle->order) ? 1 : -1;
// STimeWindow* win = &pHandle->cur.win;
// pHandle->cur.rows = tsdbReadRowsFromCache(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
// pHandle->cur.rows = buildInmemDataBlockImpl(pCheckInfo, pHandle->window.ekey, pHandle->outputCapacity, win, pHandle);
// // update the last key value
// pCheckInfo->lastKey = win->ekey + step;
......@@ -1036,7 +1009,7 @@ _error:
// TSKEY maxKey = ascScan ? (binfo.window.skey - step) : (binfo.window.ekey - step);
// cur->rows =
// tsdbReadRowsFromCache(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
// buildInmemDataBlockImpl(pCheckInfo, maxKey, pTsdbReadHandle->outputCapacity, &cur->win, pTsdbReadHandle);
// pTsdbReadHandle->realNumOfRows = cur->rows;
// // update the last key value
......@@ -2083,6 +2056,15 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
return TSDB_CODE_SUCCESS;
}
static bool blockIteratorNext(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
return false;
}
pBlockIter->index += 1;
return true;
}
// static int32_t getFirstFileDataBlock(STsdbReader* pTsdbReadHandle, bool* exists);
//static int32_t getDataBlock(STsdbReader* pTsdbReadHandle, SFileBlockInfo* pNext, bool* exists) {
......@@ -2228,77 +2210,271 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
return pFBlockInfo;
}
static bool overlapWithNeighborBlock(SBlock* pBlock, int32_t blockIndex) {
static bool overlapWithNeighborBlock(SFileDataBlockInfo *pFBlockInfo, SBlock* pBlock, STableBlockScanInfo* pTableBlockScanInfo) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if(pFBlockInfo->tbBlockIdx == taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { // last block in current file,
return false;
}
SBlock* pNext = taosArrayGet(pTableBlockScanInfo->pBlockList, pFBlockInfo->tbBlockIdx + 1);
return (pNext->minKey.ts == pBlock->maxKey.ts);
}
static bool bufferDataInFileBlockGap(int32_t order, int64_t key, SBlock* pBlock) {
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
bool ascScan = ASCENDING_TRAVERSE(order);
return (ascScan && (key != TSKEY_INITIAL_VAL && key <= pBlock->minKey.ts)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key >= pBlock->maxKey.ts));
return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) ||
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts));
}
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlock* pBlock, TSDBKEY *key) {
int32_t code = TSDB_CODE_SUCCESS;
bool ascScan = ASCENDING_TRAVERSE(pReader->order);
bool cacheDataInFileBlockHole = (ascScan && (key->ts != TSKEY_INITIAL_VAL && key->ts < pBlock->minKey.ts)) ||
(!ascScan && (key->ts != TSKEY_INITIAL_VAL && key->ts > pBlock->maxKey.ts));
ASSERT(cacheDataInFileBlockHole);
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
if (pBlockScanInfo->iter != NULL) {
pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter);
} else {
pBlockScanInfo->memHasVal = false;
}
// do not load file block into buffer
int32_t step = ascScan ? 1 : -1;
if (pBlockScanInfo->iiter != NULL) {
pBlockScanInfo->imemHasVal = tsdbTbDataIterNext(pBlockScanInfo->iiter);
} else {
pBlockScanInfo->imemHasVal = false;
}
TSDBKEY maxKey = {.version = pReader->endVersion};
maxKey.ts = ascScan ? (pBlock->minKey.ts - step) : (pBlock->maxKey.ts - step);
if (!(pBlockScanInfo->imemHasVal || pBlockScanInfo->memHasVal)) {
return TSDB_CODE_SUCCESS;
}
pBlockScanInfo->memHasVal = tsdbTbDataIterNext(&pBlockScanInfo->iter);
pBlockScanInfo->imemHasVal = tsdbTbDataIterNext(&pBlockScanInfo->iiter);
int32_t code = buildInmemDataBlockImpl(pBlockScanInfo, *key, pReader->capacity, pReader);
setComposedBlockFlag(pReader, true);
code = tsdbReadRowsFromCache(pBlockScanInfo, maxKey, pReader->capacity, pReader);
// set the correct block data info
return code;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) {
SFileBlockDumpInfo *pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pData = &pReader->status.fileBlockData;
SBlockData* pBlockData = &pReader->status.fileBlockData;
STSchema* pSchema = NULL;
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
TSKEY mergeTs = TSKEY_INITIAL_VAL;
int64_t key = pData->aTSKEY[0];
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
if (pBlockScanInfo->memHasVal && pBlockScanInfo->imemHasVal) {
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
// todo check version in file
if (key < k.ts || key < ik.ts) {
tRowMergerInit(&merge, NULL, pSchema);
STSRow* pTsRow = NULL;
doLoadRowsOfIdenticalTsInFileBlock(pData, pDumpInfo, key, &merge, pReader, &pTsRow);
// [1&2] key <= [k.ts|ik.ts]
if (key <= k.ts || key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
}
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
} else {
// [3] ik.ts < key <= k.ts
if (ik.ts < k.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [4] k.ts < key <= ik.ts
if (k.ts < ik.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [5] k.ts == ik.ts < key
if (k.ts == ik.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [6] k.ts < ik.ts < key
if (k.ts < ik.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
// [6] ik.ts < k.ts < key
if (ik.ts < k.ts) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
}
} else if (pBlockScanInfo->imemHasVal) {
TSDBKEY ik = TSDBROW_KEY(piRow);
if (key <= ik.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (ik.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (ik.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
} else { // pBlockScanInfo->memHasVal != NULL
TSDBKEY k = TSDBROW_KEY(pRow);
if (key <= k.ts) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doLoadRowsOfIdenticalTsInFileBlock(pFBlock, pBlock, pBlockData, pBlockScanInfo, pReader, &merge);
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (k.ts < key) {
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->pResBlock;
while(1) {
buildComposedDataBlockImpl(pReader, pFBlock, pBlock, pBlockScanInfo);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo->tbBlockIdx == pFBlock->tbBlockIdx) { // still in the same file block now
if (pDumpInfo->rowIndex >= pBlock->nRow) {
break;
}
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
} else { // todo traverse to next file due to time window overlap
if (pResBlock->info.rows >= pReader->capacity) {
ASSERT(0);
return TSDB_CODE_SUCCESS;
}
}
}
pResBlock->info.uid = pBlockScanInfo->uid;
setComposedBlockFlag(pReader, true);
return TSDB_CODE_SUCCESS;
}
void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; }
static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
if (pBlockScanInfo->iterInit) {
return TSDB_CODE_SUCCESS;
}
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion};
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
}
STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
}
pBlockScanInfo->iterInit = true;
return TSDB_CODE_SUCCESS;
}
static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pReader) {
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
initMemIterator(pScanInfo, pReader);
if (pScanInfo->memHasVal) {
TSDBROW* pRow = getValidRow(pScanInfo->iter, &pScanInfo->memHasVal, pReader);
key = TSDBROW_KEY(pRow);
}
if (pScanInfo->imemHasVal) {
TSDBROW* pRow = getValidRow(pScanInfo->iiter, &pScanInfo->imemHasVal, pReader);
TSDBKEY k = TSDBROW_KEY(pRow);
if (key.ts > k.ts) {
key = k;
}
}
return key;
}
static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter;
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
if (pFIter->index < pFIter->numOfFiles) {
if (pReader->status.blockIter.index == -1) {
int32_t numOfBlocks = 0;
while (1) {
bool hasNext = nextFilesetIterator(&pStatus->fileIter, pReader->order, pReader);
if (!hasNext) {
// no data files on disk
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader);
if (!hasNext) { // no data files on disk
break;
}
......@@ -2323,51 +2499,61 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
// no blocks in current file, try next files
}
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
int32_t code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int64_t key = 0; // todo get the first qualified key in buffer
// todo extract method: getCurrentKeyInBuf()
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
if (pScanInfo->iterInit == false) {
STbData* d = NULL;
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pScanInfo->uid, &d);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion};
tsdbTbDataIterOpen(d, &startKey, 0, &pScanInfo->iter);
STbData* di = NULL;
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pScanInfo->uid, &di);
tsdbTbDataIterOpen(di, &startKey, 0, &pScanInfo->iiter);
pScanInfo->iterInit = true;
}
if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pBlock, pFBlock->tbBlockIdx)) {
if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) /*|| points overlaps with data block*/) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion};
buildInmemDataBlock(pReader, pScanInfo, pBlock, &maxKey);
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
SDataBlockInfo info = {0};
info.rows = pBlock->nRow;
info.uid = pScanInfo->uid;
info.window.skey = pBlock->minKey.ts;
info.window.ekey = pBlock->maxKey.ts;
// todo check the data version
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
// current block are exhausted, try the next file block
if (pDumpInfo->rowIndex >= pBlock->nRow) {
bool hasNext = blockIteratorNext(pReader, &pReader->status.blockIter);
if (!hasNext) {
// current file is exhausted, let's try the next file
} else { // try next data block in current file
// 1. check if ts in buffer is overlap with current file data block
TSDBKEY key1 = getCurrentKeyInBuf(pBlockIter, pReader);
}
} else {
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
return TSDB_CODE_SUCCESS;
}
// repeat the previous procedure.
}
}
......@@ -2375,20 +2561,6 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
return TSDB_CODE_SUCCESS;
}
// static bool doHasDataInBuffer(STsdbReader* pTsdbReadHandle) {
// size_t numOfTables = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// while (pTsdbReadHandle->activeIndex < numOfTables) {
// if (hasMoreDataInCache(pTsdbReadHandle)) {
// return true;
// }
// pTsdbReadHandle->activeIndex += 1;
// }
// return false;
// }
// // todo not unref yet, since it is not support multi-group interpolation query
// static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) {
// // filter the queried time stamp in the first place
......@@ -2423,7 +2595,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
// }
TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
if (!hasVal) {
if (!(*hasVal)) {
return NULL;
}
......@@ -2462,7 +2634,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SRowMerger* pMerger, STsdbReader* pReader) {
while (1) {
*hasVal = tsdbTbDataIterNext(pIter);
if (!*hasVal) {
if (!(*hasVal)) {
break;
}
......@@ -2478,33 +2650,47 @@ int32_t doLoadRowsOfIdenticalTs(STbDataIter *pIter, bool* hasVal, int64_t ts, SR
return TSDB_CODE_SUCCESS;
}
int32_t doLoadRowsOfIdenticalTsInFileBlock(SBlockData* pData, SFileBlockDumpInfo* pDumpInfo, int64_t ts, SRowMerger *pMerger,
STsdbReader* pReader, STSRow** pRow) {
int64_t key = pData->aTSKEY[pDumpInfo->rowIndex];
if ((pDumpInfo->rowIndex < pData->nRow - 1)) {
if (pData->aTSKEY[pDumpInfo->rowIndex + 1] < key) {
SRowMerger merger = {0};
tRowMergerInit(&merger, NULL, NULL);
int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock* pBlock, SBlockData* pBlockData,
STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (pDumpInfo->rowIndex < pBlockData->nRow - 1) {
if (pBlockData->aTSKEY[pDumpInfo->rowIndex + 1] == key) {
int32_t rowIndex = pDumpInfo->rowIndex + 1;
while (pData->aTSKEY[rowIndex] == key) {
tRowMerge(&merger, NULL);
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->endVersion) {
continue;
}
tRowMergerGetRow(&merger, pRow);
tRowMergerClear(&merger);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex);
tRowMerge(pMerger, &fRow);
rowIndex += 1;
}
} else {
pDumpInfo->rowIndex = rowIndex;
}
} else { // last row of current block, check if current block is overlapped with neighbor block
pDumpInfo->rowIndex += 1;
bool overlap = overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo);
if (overlap) {
// load next block
}
}
return TSDB_CODE_SUCCESS;
}
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) {
STSchema* pSchema = NULL; // todo set the correct schema
TSKEY mergeTs = TSKEY_INITIAL_VAL;
SRowMerger merge = {0};
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBROW* pRow = getValidRow(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, pReader);
TSDBROW* piRow = getValidRow(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, pReader);
TSDBKEY k = {.ts = TSKEY_INITIAL_VAL};
TSDBKEY ik = {.ts = TSKEY_INITIAL_VAL};
......@@ -2514,33 +2700,35 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
ik = TSDBROW_KEY(piRow);
if (ik.ts <= k.ts) {
tRowMergerInit(&merge, piRow, pSchema);
doLoadRowsOfIdenticalTs(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
if (k.ts == mergeTs) {
doLoadRowsOfIdenticalTs(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
}
tRowMergerGetRow(&merge, pTSRow);
return TSDB_CODE_SUCCESS;
} else { // k.ts < ik.ts
tRowMergerInit(&merge, pRow, pSchema);
doLoadRowsOfIdenticalTs(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
return TSDB_CODE_SUCCESS;
}
}
if (pBlockScanInfo->memHasVal) {
tRowMergerInit(&merge, pRow, pSchema);
doLoadRowsOfIdenticalTs(&pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
k = TSDBROW_KEY(pRow);
tRowMergerInit(&merge, pRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iter, &pBlockScanInfo->memHasVal, k.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
return TSDB_CODE_SUCCESS;
}
if (pBlockScanInfo->imemHasVal) {
tRowMergerInit(&merge, piRow, pSchema);
doLoadRowsOfIdenticalTs(&pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
ik = TSDBROW_KEY(piRow);
tRowMergerInit(&merge, piRow, pReader->pSchema);
doLoadRowsOfIdenticalTs(pBlockScanInfo->iiter, &pBlockScanInfo->imemHasVal, ik.ts, &merge, pReader);
tRowMergerGetRow(&merge, pTSRow);
return TSDB_CODE_SUCCESS;
}
......@@ -2548,22 +2736,33 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS;
}
int32_t tsdbReadRowsFromCache(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) {
int32_t numOfRows = 0;
int32_t numOfCols = (int32_t)taosArrayGetSize(pReader->pResBlock->pDataBlock);
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
int32_t numOfRows = pBlock->info.rows;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
SColVal colVal = {0};
for(int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
tTSRowGetVal(pTSRow, pReader->pSchema, pColInfoData->info.colId, &colVal);
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull);
}
pBlock->info.rows += 1;
return TSDB_CODE_SUCCESS;
}
int32_t buildInmemDataBlockImpl(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKey, int32_t capacity, STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock;
int64_t st = taosGetTimestampUs();
STSchema* pSchema = NULL;
do {
STSRow* pTSRow = NULL;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow);
// todo assign to ssdatablock
doAppendOneRow(pBlock, pReader, pTSRow);
if (pBlockScanInfo->memHasVal) {
TSDBROW* pRow = tsdbTbDataIterGet(&pBlockScanInfo->iter);
TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iter);
TSDBKEY k = TSDBROW_KEY(pRow);
if (k.ts >= maxKey.ts) {
break;
......@@ -2571,7 +2770,7 @@ int32_t tsdbReadRowsFromCache(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKe
}
if (pBlockScanInfo->imemHasVal) {
TSDBROW* pRow = tsdbTbDataIterGet(&pBlockScanInfo->iiter);
TSDBROW* pRow = tsdbTbDataIterGet(pBlockScanInfo->iiter);
TSDBKEY k = TSDBROW_KEY(pRow);
if (k.ts >= maxKey.ts) {
break;
......@@ -2583,13 +2782,15 @@ int32_t tsdbReadRowsFromCache(STableBlockScanInfo* pBlockScanInfo, TSDBKEY maxKe
break;
}
if (numOfRows >= capacity) {
if (pBlock->info.rows >= capacity) {
break;
}
} while (1);
taosMemoryFreeClear(pSchema);
assert(numOfRows <= capacity);
ASSERT(pBlock->info.rows <= capacity);
pBlock->info.uid = pBlockScanInfo->uid;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
int64_t elapsedTime = taosGetTimestampUs() - st;
tsdbDebug("%p build data block from cache completed, elapsed time:%" PRId64 " us, numOfRows:%d, numOfCols:%d, %s",
......@@ -2810,7 +3011,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInf
return TSDB_CODE_SUCCESS;
}
pReader->status.pTableMap = createDataBlockScanInfo(pReader, pCond->uidList, pCond->numOfTables);
pReader->status.pTableMap = createDataBlockScanInfo(pReader, pTableList->pTableList->pData, taosArrayGetSize(pTableList->pTableList));
if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(pReader);
*ppReader = NULL;
......@@ -2908,19 +3109,44 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
SReaderStatus* pStatus = &pReader->status;
if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
if (pReader->status.loadFromFile) {
if (pStatus->loadFromFile) {
bool exists = true;
int32_t code = loadDataInFiles(pReader, &exists);
} else { // no data in files, let's try in-memory buffer
} else { // no data in files, let's try the buffer
while(1) {
if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) {
return false;
}
}
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (pReader->pResBlock->info.rows > 0) {
return true;
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, &pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return false;
}
}
}
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
} else if (pReader->type == BLOCK_LOAD_EXTERN_ORDER) {
} else {
ASSERT(0);
}
// if (pReader->loadType == BLOCK_LOAD_TABLE_SEQ_ORDER) {
// return loadDataBlockFromTableSeq(pReader);
......@@ -2957,32 +3183,10 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
}
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
// SQueryFilePos* cur = &pReader->cur;
// uint64_t uid = 0;
// // there are data in file
// if (pReader->cur.fid != INT32_MIN) {
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[cur->slot];
// uid = pBlockInfo->pTableCheckInfo->tableId;
// } else {
// STableBlockScanInfo* pCheckInfo = taosArrayGet(pReader->pTableCheckInfo, pReader->activeIndex);
// uid = pCheckInfo->tableId;
// }
// tsdbDebug("data block generated, uid:%" PRIu64 " numOfRows:%d, tsrange:%" PRId64 " - %" PRId64 " %s", uid,
// cur->rows,
// cur->win.skey, cur->win.ekey, pReader->idStr);
// pDataBlockInfo->uid = uid;
// #if 0
// // for multi-group data query processing test purpose
// pDataBlockInfo->groupId = uid;
// #endif
// pDataBlockInfo->rows = cur->rows;
// pDataBlockInfo->window = cur->win;
ASSERT(pDataBlockInfo != NULL && pReader != NULL);
pDataBlockInfo->rows = pReader->pResBlock->info.rows;
pDataBlockInfo->uid = pReader->pResBlock->info.uid;
pDataBlockInfo->window = pReader->pResBlock->info.window;
}
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
......@@ -3061,6 +3265,17 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
}
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
if (pReader->status.composedDataBlock) {
return pReader->pResBlock->pDataBlock;
} else {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlockData data = {0};
doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &data);
// todo convert blockData to ssdatablock
}
// /**
// * In the following two cases, the data has been loaded to SColumnInfoData.
// * 1. data is from cache, 2. data block is not completed qualified to query time range
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册