提交 f7648eb2 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -251,9 +251,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb); ...@@ -251,9 +251,6 @@ int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(SLRUCache *pCache); void tsdbCloseCache(SLRUCache *pCache);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
// bug api, deprecated, USE H version
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h); int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h); int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
......
...@@ -22,12 +22,6 @@ ...@@ -22,12 +22,6 @@
.rows = (_block)->numOfRows, \ .rows = (_block)->numOfRows, \
.uid = (_checkInfo)->tableId}) .uid = (_checkInfo)->tableId})
enum {
TSDB_CACHED_TYPE_NONE = 0,
TSDB_CACHED_TYPE_LASTROW = 1,
TSDB_CACHED_TYPE_LAST = 2,
};
typedef struct SQueryFilePos { typedef struct SQueryFilePos {
int32_t fid; int32_t fid;
int32_t slot; int32_t slot;
...@@ -46,12 +40,6 @@ typedef struct SDataBlockLoadInfo { ...@@ -46,12 +40,6 @@ typedef struct SDataBlockLoadInfo {
SArray* pLoadedCols; SArray* pLoadedCols;
} SDataBlockLoadInfo; } SDataBlockLoadInfo;
enum {
CHECKINFO_CHOSEN_MEM = 0,
CHECKINFO_CHOSEN_IMEM = 1,
CHECKINFO_CHOSEN_BOTH = 2 // for update=2(merge case)
};
typedef struct STableBlockScanInfo { typedef struct STableBlockScanInfo {
uint64_t uid; uint64_t uid;
TSKEY lastKey; TSKEY lastKey;
...@@ -74,10 +62,10 @@ typedef struct SBlockOrderWrapper { ...@@ -74,10 +62,10 @@ typedef struct SBlockOrderWrapper {
} SBlockOrderWrapper; } SBlockOrderWrapper;
typedef struct SBlockOrderSupporter { typedef struct SBlockOrderSupporter {
int32_t numOfTables;
SBlockOrderWrapper** pDataBlockInfo; SBlockOrderWrapper** pDataBlockInfo;
int32_t* indexPerTable; int32_t* indexPerTable;
int32_t* numOfBlocksPerTable; int32_t* numOfBlocksPerTable;
int32_t numOfTables;
} SBlockOrderSupporter; } SBlockOrderSupporter;
typedef struct SIOCostSummary { typedef struct SIOCostSummary {
...@@ -123,11 +111,6 @@ typedef struct SVersionRange { ...@@ -123,11 +111,6 @@ typedef struct SVersionRange {
uint64_t maxVer; uint64_t maxVer;
} SVersionRange; } SVersionRange;
typedef struct SComposedDataBlock {
bool composed;
int32_t rows;
} SComposedDataBlock;
typedef struct SReaderStatus { typedef struct SReaderStatus {
SQueryFilePos cur; // current position SQueryFilePos cur; // current position
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
...@@ -289,13 +272,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -289,13 +272,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
// return pNew; // return pNew;
// } // }
static bool isEmptyQueryTimeWindow(STsdbReader* pTsdbReader) { static bool isEmptyQueryTimeWindow(STimeWindow* pWindow, int32_t order) {
ASSERT(pTsdbReader != NULL); ASSERT(pWindow != NULL);
bool asc = ASCENDING_TRAVERSE(order);
STimeWindow* w = &pTsdbReader->window; return ((asc && pWindow->skey > pWindow->ekey) || (!asc && pWindow->ekey > pWindow->skey));
bool asc = ASCENDING_TRAVERSE(pTsdbReader->order);
return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
} }
// // Update the query time window according to the data time to live(TTL) information, in order to avoid to return // // Update the query time window according to the data time to live(TTL) information, in order to avoid to return
...@@ -334,34 +314,29 @@ static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond, ...@@ -334,34 +314,29 @@ static void setQueryTimewindow(STsdbReader* pReader, SQueryTableDataCond* pCond,
// } // }
} }
static void checkResultSize(const SQueryTableDataCond* pCond, STsdbReader* pReader) { static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* capacity) {
int32_t rowLen = 0; int32_t rowLen = 0;
for (int32_t i = 0; i < pCond->numOfCols; ++i) { for (int32_t i = 0; i < pCond->numOfCols; ++i) {
rowLen += pCond->colList[i].bytes; rowLen += pCond->colList[i].bytes;
} }
// make sure the output SSDataBlock size be less than 2MB. // make sure the output SSDataBlock size be less than 2MB.
int32_t TWOMB = 2 * 1024 * 1024; const int32_t TWOMB = 2 * 1024 * 1024;
if (pReader->capacity * rowLen > TWOMB) { if ((*capacity) * rowLen > TWOMB) {
pReader->capacity = TWOMB / rowLen; (*capacity) = TWOMB / rowLen;
} }
} }
// init file iterator // init file iterator
static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState) { static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState, const char* idstr) {
pIter->index = -1; pIter->index = -1;
pIter->numOfFiles = taosArrayGetSize(pFState->aDFileSet); pIter->numOfFiles = taosArrayGetSize(pFState->aDFileSet);
pIter->pFileList = taosArrayDup(pFState->aDFileSet); pIter->pFileList = taosArrayDup(pFState->aDFileSet);
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->numOfBlocks = -1;
pIter->index = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) { static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
pIter->index += 1; pIter->index += 1;
if (pIter->index >= pIter->numOfFiles) { if (pIter->index >= pIter->numOfFiles) {
...@@ -394,6 +369,12 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* ...@@ -394,6 +369,12 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
return false; return false;
} }
static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->index = -1;
pIter->numOfBlocks = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}
static void initReaderStatus(SReaderStatus* pStatus) { static void initReaderStatus(SReaderStatus* pStatus) {
pStatus->cur.fid = INT32_MIN; pStatus->cur.fid = INT32_MIN;
pStatus->cur.win = TSWINDOW_INITIALIZER; pStatus->cur.win = TSWINDOW_INITIALIZER;
...@@ -427,7 +408,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -427,7 +408,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
setQueryTimewindow(pReader, pCond, 0); setQueryTimewindow(pReader, pCond, 0);
if (pCond->numOfCols > 0) { if (pCond->numOfCols > 0) {
checkResultSize(pCond, pReader); limitOutputBufferSize(pCond, &pReader->capacity);
// allocate buffer in order to load data blocks from file // allocate buffer in order to load data blocks from file
pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg)); pReader->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
...@@ -450,7 +431,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd ...@@ -450,7 +431,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
} }
STsdbFSState* pFState = pReader->pTsdb->fs->cState; STsdbFSState* pFState = pReader->pTsdb->fs->cState;
initFileIterator(&pReader->status.fileIter, pFState); initFileIterator(&pReader->status.fileIter, pFState, pReader->idStr);
resetDataBlockIterator(&pReader->status.blockIter); resetDataBlockIterator(&pReader->status.blockIter);
// no data in files, let's try buffer in memory // no data in files, let's try buffer in memory
...@@ -520,93 +501,6 @@ _end: ...@@ -520,93 +501,6 @@ _end:
// return res; // return res;
// } // }
// static bool initTableMemIterator(STsdbReader* pHandle, STableBlockScanInfo* pCheckInfo) {
// if (pCheckInfo->initBuf) {
// return true;
// }
// pCheckInfo->initBuf = true;
// int32_t order = pHandle->order;
// STbData* pMem = NULL;
// STbData* pIMem = NULL;
// int8_t backward = (pHandle->order == TSDB_ORDER_DESC) ? 1 : 0;
// TSKEY tLastKey = keyToTkey(pCheckInfo->lastKey);
// if (pHandle->pTsdb->mem != NULL) {
// tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pMem);
// if (pMem != NULL) {
// tsdbTbDataIterCreate(pMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iter);
// }
// }
// if (pHandle->pTsdb->imem != NULL) {
// tsdbGetTbDataFromMemTable(pHandle->pTsdb->mem, pCheckInfo->suid, pCheckInfo->tableId, &pIMem);
// if (pIMem != NULL) {
// tsdbTbDataIterCreate(pIMem, &(TSDBKEY){.version = 0, .ts = tLastKey}, backward, &pCheckInfo->iiter);
// }
// }
// // both iterators are NULL, no data in buffer right now
// if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
// return false;
// }
// bool memEmpty =
// (pCheckInfo->iter == NULL) || (pCheckInfo->iter != NULL && !tsdbTbDataIterGet(pCheckInfo->iter, NULL));
// bool imemEmpty =
// (pCheckInfo->iiter == NULL) || (pCheckInfo->iiter != NULL && !tsdbTbDataIterGet(pCheckInfo->iiter, NULL));
// if (memEmpty && imemEmpty) { // buffer is empty
// return false;
// }
// if (!memEmpty) {
// TSDBROW row;
// tsdbTbDataIterGet(pCheckInfo->iter, &row);
// TSKEY key = row.pTSRow->ts; // first timestamp in buffer
// tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
// "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
// pHandle, pCheckInfo->tableId, key, order, pMem->minKey.ts, pMem->maxKey.ts, pCheckInfo->lastKey,
// pMem->sl.size, pHandle->idStr);
// if (ASCENDING_TRAVERSE(order)) {
// assert(pCheckInfo->lastKey <= key);
// } else {
// assert(pCheckInfo->lastKey >= key);
// }
// } else {
// tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
// }
// if (!imemEmpty) {
// TSDBROW row;
// tsdbTbDataIterGet(pCheckInfo->iter, &row);
// TSKEY key = row.pTSRow->ts; // first timestamp in buffer
// tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
// "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%" PRId64 ", %s",
// pHandle, pCheckInfo->tableId, key, order, pIMem->minKey.ts, pIMem->maxKey.ts, pCheckInfo->lastKey,
// pIMem->sl.size, pHandle->idStr);
// if (ASCENDING_TRAVERSE(order)) {
// assert(pCheckInfo->lastKey <= key);
// } else {
// assert(pCheckInfo->lastKey >= key);
// }
// } else {
// tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pHandle, pCheckInfo->tableId, pHandle->idStr);
// }
// return true;
// }
// static void destroyTableMemIterator(STableBlockScanInfo* pCheckInfo) {
// tsdbTbDataIterDestroy(pCheckInfo->iter);
// tsdbTbDataIterDestroy(pCheckInfo->iiter);
// }
// static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) { // static TSKEY extractFirstTraverseKey(STableBlockScanInfo* pCheckInfo, int32_t order, int32_t update, TDRowVerT maxVer) {
// TSDBROW row = {0}; // TSDBROW row = {0};
// STSRow *rmem = NULL, *rimem = NULL; // STSRow *rmem = NULL, *rimem = NULL;
...@@ -958,77 +852,79 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ ...@@ -958,77 +852,79 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlockData* pBlockData) { static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlock* pBlock, int32_t order) {
pDumpInfo->rowIndex = pBlockData->nRow; int32_t step = ASCENDING_TRAVERSE(order)? 1:-1;
pDumpInfo->totalRows = pBlockData->nRow;
pDumpInfo->lastKey = pBlockData->aTSKEY[pBlockData->nRow - 1] + 1; // todo step value pDumpInfo->rowIndex = pBlock->nRow;
pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->lastKey = pBlock->maxKey.ts + step;
}
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
if (pColVal->isNull) {
colDataAppendNULL(pColInfoData, rowIndex);
} else {
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
colDataAppend(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false);
}
} else {
colDataAppend(pColInfoData, rowIndex, (const char*)&pColVal->value, pColVal->isNull);
}
} }
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx); SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
SSDataBlock* pResBlock = pReader->pResBlock;
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) {
SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false);
}
} else {
SColVal cv = {0};
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i] - 1);
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
}
}
}
pReader->pResBlock->info.rows = pBlockData->nRow; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
/*
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pReader)), true);
if (ret != TSDB_CODE_SUCCESS) {
int32_t c = terrno;
assert(c != TSDB_CODE_SUCCESS);
goto _error;
}
SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo; uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pBlockLoadInfo->fileGroup = pReader->pFileGroup; int32_t numOfCols = blockDataGetNumOfCols(pResBlock);
pBlockLoadInfo->slot = pReader->cur.slot;
pBlockLoadInfo->uid = pCheckInfo->tableId;
SDataCols* pCols = pReader->rhelper.pDCols[0]; SColVal cv = {0};
assert(pCols->numOfRows != 0 && pCols->numOfRows <= pBlock->numOfRows); for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false);
}
} else {
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pSupInfo->slotIds[i] - 1);
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
doCopyColVal(pColData, j, i, &cv, pSupInfo);
}
}
}
pBlock->numOfRows = pCols->numOfRows; pResBlock->info.rows = pBlockData->nRow;
*/ setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlock, pReader->order);
int64_t elapsedTime = (taosGetTimestampUs() - st); int64_t elapsedTime = (taosGetTimestampUs() - st);
pReader->cost.blockLoadTime += elapsedTime; pReader->cost.blockLoadTime += elapsedTime;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s", ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%" PRId64 " us, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow, pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlockData->nRow,
pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr); pBlock->minVersion, pBlock->maxVersion, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, %s", tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pReader->idStr); ", rows:%d, %s",
return code; pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pReader->idStr);
return code;
} }
// static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableBlockScanInfo* pCheckInfo) { // static int32_t handleDataMergeIfNeeded(STsdbReader* pTsdbReadHandle, SBlock* pBlock, STableBlockScanInfo* pCheckInfo) {
...@@ -2492,9 +2388,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* ...@@ -2492,9 +2388,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (d != NULL) { if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter); code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code; return code;
} else {
tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, d->minKey, d->maxKey,
pReader->idStr);
} }
} }
} else {
tsdbDebug("%p uid:%" PRId64 ", no data in mem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
} }
STbData* di = NULL; STbData* di = NULL;
...@@ -2503,9 +2408,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* ...@@ -2503,9 +2408,18 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
if (di != NULL) { if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter); code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s",
pReader, pBlockScanInfo->uid, tstrerror(code), pReader->idStr);
return code; return code;
} else {
tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 " %s",
pReader, pBlockScanInfo->uid, pReader->window.skey, pReader->order, di->minKey, di->maxKey,
pReader->idStr);
} }
} }
} else {
tsdbDebug("%p uid:%" PRId64 ", no data in imem, %s", pReader, pBlockScanInfo->uid, pReader->idStr);
} }
pBlockScanInfo->iterInit = true; pBlockScanInfo->iterInit = true;
...@@ -2605,24 +2519,24 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { ...@@ -2605,24 +2519,24 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// 2. current block should not overlap with next neighbor block // 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other // 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info; SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow; pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid; pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts; pInfo->window = (STimeWindow) {.skey = pBlock->minKey.ts, .ekey = pBlock->maxKey.ts};
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false); setComposedBlockFlag(pReader, false);
setBlockDumpCompleted(&pStatus->fBlockDumpInfo, pBlock, pReader->order);
} }
return code; return code;
} }
static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
while(1) { while(1) {
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
return false; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -2630,20 +2544,26 @@ static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) { ...@@ -2630,20 +2544,26 @@ static int32_t buildBlockFromBufferSeqentially(STsdbReader* pReader) {
initMemIterator(pBlockScanInfo, pReader); initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer}; TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey); int32_t code = buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->pResBlock->info.rows > 0) { if (pReader->pResBlock->info.rows > 0) {
return true; return TSDB_CODE_SUCCESS;
} }
// current table is exhausted, let's try the next table // current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) { if (pStatus->pTableIter == NULL) {
return false; return TSDB_CODE_SUCCESS;
} }
} }
} }
static int32_t buildBlockFromFiles(STsdbReader* pReader) { static int32_t buildBlockFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter; SFileSetIter* pFIter = &pStatus->fileIter;
...@@ -2652,7 +2572,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2652,7 +2572,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
if (pFIter->index < pFIter->numOfFiles) { if (pFIter->index < pFIter->numOfFiles) {
if (pReader->status.blockIter.index == -1) { if (pReader->status.blockIter.index == -1) {
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks); code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2664,7 +2584,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2664,7 +2584,9 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
...@@ -2676,7 +2598,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2676,7 +2598,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (!hasNext) { // current file is exhausted, let's try the next file if (!hasNext) { // current file is exhausted, let's try the next file
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks); code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
...@@ -2692,15 +2614,21 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2692,15 +2614,21 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code; return code;
} }
doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} else { // try next data block in current file } else { // try next data block in current file
blockIteratorNext(pBlockIter); blockIteratorNext(pBlockIter);
doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
} else { } else {
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo); code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
return TSDB_CODE_SUCCESS; return code;
} }
// repeat the previous procedure. // repeat the previous procedure.
...@@ -2708,7 +2636,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2708,7 +2636,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
} }
} }
return TSDB_CODE_SUCCESS; return code;
} }
// // todo not unref yet, since it is not support multi-group interpolation query // // todo not unref yet, since it is not support multi-group interpolation query
...@@ -2901,18 +2829,7 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow ...@@ -2901,18 +2829,7 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false); colDataAppend(pColInfoData, numOfRows, (const char*) &pTSRow->ts, false);
} else { } else {
tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal); tTSRowGetVal(pTSRow, pReader->pSchema, slotId, &colVal);
doCopyColVal(pColInfoData, i, numOfRows, &colVal, pSupInfo);
if (IS_VAR_DATA_TYPE(colVal.type)) {
if (colVal.isNull) {
colDataAppendNULL(pColInfoData, numOfRows);
} else {
varDataSetLen(pSupInfo->buildBuf[i], colVal.value.nData);
memcpy(varDataVal(pSupInfo->buildBuf[i]), colVal.value.pData, colVal.value.nData);
colDataAppend(pColInfoData, numOfRows, pSupInfo->buildBuf[i], false);
}
} else {
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull);
}
} }
} }
...@@ -3198,7 +3115,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -3198,7 +3115,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
} }
STsdbReader* pReader = *ppReader; STsdbReader* pReader = *ppReader;
if (isEmptyQueryTimeWindow(pReader)) { if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr); tsdbDebug("%p query window not overlaps with the data set, no result returned, %s", pReader, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3259,7 +3176,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3259,7 +3176,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFreeClear(pReader->suppInfo.plist); taosMemoryFreeClear(pReader->suppInfo.plist);
taosMemoryFree(pReader->suppInfo.slotIds); taosMemoryFree(pReader->suppInfo.slotIds);
if (!isEmptyQueryTimeWindow(pReader)) { if (!isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
// tsdbMayUnTakeMemSnapshot(pTsdbReadHandle); // tsdbMayUnTakeMemSnapshot(pTsdbReadHandle);
} else { } else {
ASSERT(pReader->status.pTableMap == NULL); ASSERT(pReader->status.pTableMap == NULL);
...@@ -3291,7 +3208,7 @@ void tsdbReaderClose(STsdbReader* pReader) { ...@@ -3291,7 +3208,7 @@ void tsdbReaderClose(STsdbReader* pReader) {
} }
bool tsdbNextDataBlock(STsdbReader* pReader) { bool tsdbNextDataBlock(STsdbReader* pReader) {
if (isEmptyQueryTimeWindow(pReader)) { if (isEmptyQueryTimeWindow(&pReader->window, pReader->order)) {
return false; return false;
} }
...@@ -3313,11 +3230,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { ...@@ -3313,11 +3230,11 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pBlock->info.rows > 0) { if (pBlock->info.rows > 0) {
return true; return true;
} else { } else {
buildBlockFromBufferSeqentially(pReader); buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
} else { // no data in files, let's try the buffer } else { // no data in files, let's try the buffer
buildBlockFromBufferSeqentially(pReader); buildBlockFromBufferSequentially(pReader);
return pBlock->info.rows > 0; return pBlock->info.rows > 0;
} }
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) { } else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
...@@ -3370,13 +3287,12 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI ...@@ -3370,13 +3287,12 @@ void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockI
int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) { int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg*** pBlockStatis, bool* allHave) {
int32_t code = 0; int32_t code = 0;
// *allHave = false; *allHave = false;
// SQueryFilePos* c = &pReader->cur; if (pReader->status.composedDataBlock) {
// if (c->mixBlock) { *pBlockStatis = NULL;
// *pBlockStatis = NULL; return TSDB_CODE_SUCCESS;
// return TSDB_CODE_SUCCESS; }
// }
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot]; // SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[c->slot];
// assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0))); // assert((c->slot >= 0 && c->slot < pReader->numOfBlocks) || ((c->slot == pReader->numOfBlocks) && (c->slot == 0)));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册