提交 60d9acb7 编写于 作者: H Haojun Liao

feature(query): support last function cache and retrieve data.

上级 5d4d2ec1
...@@ -139,7 +139,7 @@ typedef struct SqlFunctionCtx { ...@@ -139,7 +139,7 @@ typedef struct SqlFunctionCtx {
struct SExprInfo *pExpr; struct SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf; struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
int32_t curBufPage; int32_t curBufPage;
bool increase; bool increase;
bool isStream; bool isStream;
......
...@@ -128,8 +128,10 @@ typedef struct STsdbReader STsdbReader; ...@@ -128,8 +128,10 @@ typedef struct STsdbReader STsdbReader;
#define TIMEWINDOW_RANGE_CONTAINED 1 #define TIMEWINDOW_RANGE_CONTAINED 1
#define TIMEWINDOW_RANGE_EXTERNAL 2 #define TIMEWINDOW_RANGE_EXTERNAL 2
#define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define CACHESCAN_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 #define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader,
...@@ -146,9 +148,9 @@ void *tsdbGetIdx(SMeta *pMeta); ...@@ -146,9 +148,9 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t getReaderMaxVersion(STsdbReader *pReader); uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbLastrowReaderClose(void *pReader); int32_t tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity); void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "tcommon.h" #include "tcommon.h"
#include "tsdb.h" #include "tsdb.h"
typedef struct SLastrowReader { typedef struct SCacheRowsReader {
SVnode* pVnode; SVnode* pVnode;
STSchema* pSchema; STSchema* pSchema;
uint64_t uid; uint64_t uid;
...@@ -27,9 +27,9 @@ typedef struct SLastrowReader { ...@@ -27,9 +27,9 @@ typedef struct SLastrowReader {
int32_t type; int32_t type;
int32_t tableIndex; // currently returned result tables int32_t tableIndex; // currently returned result tables
SArray* pTableList; // table id list SArray* pTableList; // table id list
} SLastrowReader; } SCacheRowsReader;
static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds) {
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
...@@ -61,8 +61,10 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade ...@@ -61,8 +61,10 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) { int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
SLastrowReader* p = taosMemoryCalloc(1, sizeof(SLastrowReader)); *pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) { if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -81,9 +83,17 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, ...@@ -81,9 +83,17 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
p->pTableList = pTableIdList; p->pTableList = pTableIdList;
p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES);
if (p->transferBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) { if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) {
p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes); p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes);
if (p->transferBuf[i] == NULL) {
tsdbCacherowsReaderClose(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
} }
...@@ -91,8 +101,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, ...@@ -91,8 +101,8 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbLastrowReaderClose(void* pReader) { int32_t tsdbCacherowsReaderClose(void* pReader) {
SLastrowReader* p = pReader; SCacheRowsReader* p = pReader;
if (p->pSchema != NULL) { if (p->pSchema != NULL) {
for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) {
...@@ -107,28 +117,56 @@ int32_t tsdbLastrowReaderClose(void* pReader) { ...@@ -107,28 +117,56 @@ int32_t tsdbLastrowReaderClose(void* pReader) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) { static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint64_t uid, STSRow** pRow, LRUHandle** h) {
int32_t code = TSDB_CODE_SUCCESS;
if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
*pRow = (STSRow*)taosLRUCacheValue(lruCache, *h);
}
} else {
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// no data in the table of Uid
if (*h != NULL) {
SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, *h);
tsdbCacheLastArray2Row(pLast, pRow, pr->pSchema);
}
}
return code;
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) { if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
SLastrowReader* pr = pReader; SCacheRowsReader* pr = pReader;
int32_t code = TSDB_CODE_SUCCESS;
SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache;
LRUHandle* h = NULL; LRUHandle* h = NULL;
STSRow* pRow = NULL; STSRow* pRow = NULL;
size_t numOfTables = taosArrayGetSize(pr->pTableList); size_t numOfTables = taosArrayGetSize(pr->pTableList);
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.
if (pr->type == LASTROW_RETRIEVE_TYPE_SINGLE) { if ((pr->type & CACHESCAN_RETRIEVE_TYPE_SINGLE) == CACHESCAN_RETRIEVE_TYPE_SINGLE) {
int64_t lastKey = INT64_MIN; int64_t lastKey = INT64_MIN;
bool internalResult = false; bool internalResult = false;
for (int32_t i = 0; i < numOfTables; ++i) { for (int32_t i = 0; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -136,9 +174,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t ...@@ -136,9 +174,6 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
if (pRow->ts > lastKey) { if (pRow->ts > lastKey) {
// Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already // Set result row into the same rowIndex repeatly, so we need to check if the internal result row has already
// appended or not. // appended or not.
...@@ -155,25 +190,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t ...@@ -155,25 +190,18 @@ int32_t tsdbRetrieveLastRow(void* pReader, SSDataBlock* pResBlock, const int32_t
tsdbCacheRelease(lruCache, h); tsdbCacheRelease(lruCache, h);
} }
} else if (pr->type == LASTROW_RETRIEVE_TYPE_ALL) { } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) {
STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i);
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
int32_t code = tsdbCacheGetLastrowH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h); if (code != TSDB_CODE_SUCCESS) {
// int32_t code = tsdbCacheGetLastH(lruCache, pKeyInfo->uid, pr->pVnode->pTsdb, &h);
if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// no data in the table of Uid
if (h == NULL) { if (h == NULL) {
continue; continue;
} }
pRow = (STSRow*)taosLRUCacheValue(lruCache, h);
// SArray* pLast = (SArray*)taosLRUCacheValue(lruCache, h);
// tsdbCacheLastArray2Row(pLast, &pRow, pr->pSchema);
saveOneRow(pRow, pResBlock, pr, slotIds); saveOneRow(pRow, pResBlock, pr, slotIds);
taosArrayPush(pTableUidList, &pKeyInfo->uid); taosArrayPush(pTableUidList, &pKeyInfo->uid);
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
#include "osDef.h" #include "osDef.h"
#include "tsdb.h" #include "tsdb.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define ALL_ROWS_CHECKED_INDEX (INT16_MIN) #define ALL_ROWS_CHECKED_INDEX (INT16_MIN)
#define DEFAULT_ROW_INDEX_VAL (-1) #define INITIAL_ROW_INDEX_VAL (-1)
typedef enum { typedef enum {
EXTERNAL_ROWS_PREV = 0x1, EXTERNAL_ROWS_PREV = 0x1,
...@@ -234,7 +234,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -234,7 +234,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
} }
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
info.lastKey = pTsdbReader->window.skey; info.lastKey = pTsdbReader->window.skey;
...@@ -266,7 +266,9 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) { ...@@ -266,7 +266,9 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
} }
p->delSkyline = taosArrayDestroy(p->delSkyline); p->fileDelIndex = -1;
p->delSkyline = taosArrayDestroy(p->delSkyline);
p->lastBlockDelIndex = INITIAL_ROW_INDEX_VAL;
} }
} }
...@@ -414,7 +416,7 @@ _err: ...@@ -414,7 +416,7 @@ _err:
return false; return false;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) { static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = 0; pIter->numOfBlocks = 0;
...@@ -423,7 +425,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashOb ...@@ -423,7 +425,6 @@ static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashOb
} else { } else {
taosArrayClear(pIter->blockList); taosArrayClear(pIter->blockList);
} }
pIter->pTableMap = pTableMap;
} }
static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); } static void cleanupDataBlockIterator(SDataBlockIter* pIter) { taosArrayDestroy(pIter->blockList); }
...@@ -579,7 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { ...@@ -579,7 +580,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
} }
// reset the index in last block when handing a new file // reset the index in last block when handing a new file
px->indexInBlockL = DEFAULT_ROW_INDEX_VAL; px->indexInBlockL = INITIAL_ROW_INDEX_VAL;
tMapDataClear(&px->mapData); tMapDataClear(&px->mapData);
taosArrayClear(px->pBlockList); taosArrayClear(px->pBlockList);
} }
...@@ -2403,7 +2404,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2403,7 +2404,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
int32_t index = pScanInfo->indexInBlockL; int32_t index = pScanInfo->indexInBlockL;
if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { if (index == INITIAL_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
if (!hasData) { // current table does not have rows in last block, try next table if (!hasData) { // current table does not have rows in last block, try next table
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
...@@ -2470,7 +2471,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2470,7 +2471,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// note: the lastblock may be null here // note: the lastblock may be null here
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == DEFAULT_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) { if (pScanInfo->indexInBlockL == INITIAL_ROW_INDEX_VAL || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo);
} }
} }
...@@ -2582,7 +2583,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl ...@@ -2582,7 +2583,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks); code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
} else { // no block data, only last block exists } else { // no block data, only last block exists
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); resetDataBlockIterator(pBlockIter, pReader->order);
} }
SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader; SLastBlockReader* pLReader = pReader->status.fileIter.pLastBlockReader;
...@@ -2654,7 +2655,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2654,7 +2655,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now
tBlockDataReset(&pReader->status.fileBlockData); tBlockDataReset(&pReader->status.fileBlockData);
resetDataBlockIterator(pBlockIter, pReader->order, pReader->status.pTableMap); resetDataBlockIterator(pBlockIter, pReader->order);
goto _begin; goto _begin;
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
...@@ -3276,7 +3277,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { ...@@ -3276,7 +3277,7 @@ int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
taosHashClear(pReader->status.pTableMap); taosHashClear(pReader->status.pTableMap);
STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = DEFAULT_ROW_INDEX_VAL}; STableBlockScanInfo info = {.lastKey = 0, .uid = uid, .indexInBlockL = INITIAL_ROW_INDEX_VAL};
taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
...@@ -3372,7 +3373,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3372,7 +3373,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
if (pReader->status.fileIter.numOfFiles == 0) { if (pReader->status.fileIter.numOfFiles == 0) {
...@@ -3393,7 +3394,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3393,7 +3394,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader); initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap); resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order);
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
if (pPrevReader->status.fileIter.numOfFiles == 0) { if (pPrevReader->status.fileIter.numOfFiles == 0) {
...@@ -3696,7 +3697,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { ...@@ -3696,7 +3697,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose(&pReader->pFileReader); tsdbDataFReaderClose(&pReader->pFileReader);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
resetDataBlockScanInfo(pReader->status.pTableMap); resetDataBlockScanInfo(pReader->status.pTableMap);
int32_t code = 0; int32_t code = 0;
......
...@@ -908,7 +908,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy ...@@ -908,7 +908,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
......
...@@ -25,24 +25,27 @@ ...@@ -25,24 +25,27 @@
#include "thash.h" #include "thash.h"
#include "ttypes.h" #include "ttypes.h"
static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyLastrowScanOperator(void* param); static void destroyLastrowScanOperator(void* param);
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo)); SLastrowScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SLastrowScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc); pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
int32_t numOfCols = 0; int32_t numOfCols = 0;
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols, pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc, &numOfCols,
COL_MATCH_FROM_COL_ID); COL_MATCH_FROM_COL_ID);
int32_t code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds); code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
...@@ -55,13 +58,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead ...@@ -55,13 +58,17 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
// partition by tbname // partition by tbname
if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) {
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_ALL; pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL|CACHESCAN_RETRIEVE_LAST_ROW;
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false); pInfo->pBufferredRes = createOneDataBlock(pInfo->pRes, false);
blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pBufferredRes, pOperator->resultInfo.capacity);
} else { // by tags } else { // by tags
pInfo->retrieveType = LASTROW_RETRIEVE_TYPE_SINGLE; pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_SINGLE|CACHESCAN_RETRIEVE_LAST_ROW;
} }
if (pScanNode->scan.pScanPseudoCols != NULL) { if (pScanNode->scan.pScanPseudoCols != NULL) {
...@@ -80,19 +87,19 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead ...@@ -80,19 +87,19 @@ SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SRead
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doScanLastrow, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL); createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;
_error: _error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = code;
taosMemoryFree(pInfo); destroyLastrowScanOperator(pInfo);
taosMemoryFree(pOperator); taosMemoryFree(pOperator);
return NULL; return NULL;
} }
SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
...@@ -109,14 +116,14 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { ...@@ -109,14 +116,14 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
// check if it is a group by tbname // check if it is a group by tbname
if (pInfo->retrieveType == LASTROW_RETRIEVE_TYPE_ALL) { if ((pInfo->retrieveType & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) { if (pInfo->indexOfBufferedRes >= pInfo->pBufferredRes->info.rows) {
blockDataCleanup(pInfo->pBufferredRes); blockDataCleanup(pInfo->pBufferredRes);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList); int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
// check for tag values // check for tag values
...@@ -172,11 +179,11 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { ...@@ -172,11 +179,11 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
while (pInfo->currentGroupIndex < totalGroups) { while (pInfo->currentGroupIndex < totalGroups) {
SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex);
tsdbLastRowReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList,
taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader); taosArrayGetSize(pInfo->pColMatchInfo), &pInfo->pLastrowReader);
taosArrayClear(pInfo->pUidList); taosArrayClear(pInfo->pUidList);
int32_t code = tsdbRetrieveLastRow(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code); longjmp(pTaskInfo->env, code);
} }
...@@ -200,7 +207,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { ...@@ -200,7 +207,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
} }
} }
tsdbLastrowReaderClose(pInfo->pLastrowReader); tsdbCacherowsReaderClose(pInfo->pLastrowReader);
return pInfo->pRes; return pInfo->pRes;
} }
} }
......
...@@ -4138,7 +4138,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -4138,7 +4138,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return NULL; return NULL;
} }
pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
} else { } else {
......
...@@ -3643,7 +3643,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3643,7 +3643,7 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx);
int16_t type = pCtx->input.pData[0]->info.type; int16_t type = pCtx->pExpr->base.resSchema.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......
...@@ -293,16 +293,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ...@@ -293,16 +293,6 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
} }
} }
// int32_t pos = listNEles(pBuf->lruList);
// SListIter iter1 = {0};
// tdListInitIter(pBuf->lruList, &iter1, TD_LIST_BACKWARD);
// SListNode* pn1 = NULL;
// while((pn1 = tdListNext(&iter1)) != NULL) {
// SPageInfo* pageInfo = *(SPageInfo**) pn1->data;
// printf("page %d is used, dirty:%d, pos:%d\n", pageInfo->pageId, pageInfo->dirty, pos - 1);
// pos -= 1;
// }
return pn; return pn;
} }
...@@ -661,10 +651,14 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ...@@ -661,10 +651,14 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id); listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
printf( if (ps->loadPages > 0) {
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", printf(
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages, "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
ps->loadBytes / (1024.0 * ps->loadPages)); ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
} else {
printf("no page loaded\n");
}
} }
void clearDiskbasedBuf(SDiskbasedBuf* pBuf) { void clearDiskbasedBuf(SDiskbasedBuf* pBuf) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册