提交 5dfdb911 编写于 作者: H Haojun Liao

fix(query): check if table exists or not during build data block in tsdbread.

上级 5f89fe76
......@@ -178,7 +178,6 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
......
......@@ -515,7 +515,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit:
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
}
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
......
......@@ -220,6 +220,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
......@@ -998,7 +1000,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
}
}
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
......@@ -1015,6 +1017,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
// no data exists, return directly.
if (pBlockData->nRow == 0 || pBlockData->aTSKEY == 0) {
tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader, pBlockInfo->uid,
pReader->idStr);
pResBlock->info.rows = 0;
return 0;
}
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pBlock->minKey.ts) {
// pDumpInfo->rowIndex = 0;
......@@ -1128,12 +1138,19 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
uint64_t uid) {
int32_t code = 0;
int64_t st = taosGetTimestampUs();
tBlockDataReset(pBlockData);
STSchema* pSchema = getLatestTableSchema(pReader, uid);
if (pSchema == NULL) {
tsdbDebug("%p table uid:%"PRIu64" has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
return code;
}
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
TABLEID tid = {.suid = pReader->suid, .uid = uid};
int32_t code =
tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1656,6 +1673,19 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return false;
}
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
if (pReader->pSchema != NULL) {
return pReader->pSchema;
}
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
}
return pReader->pSchema;
}
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
// always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) {
......@@ -2459,7 +2489,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
copyBlockDataToSDataBlock(pReader);
// record the last key value
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
......@@ -2936,8 +2966,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
pBlock->maxKey.ts, pReader->idStr);
......@@ -3867,6 +3896,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
// no valid error code set in metaGetTbTSchema, so let's set the error code here.
// we should proceed in case of tmq processing.
if (pCond->suid != 0) {
taosSsleep(20);
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pReader->suid, -1, 1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr);
......@@ -4029,9 +4060,11 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
if (pReader->pMemSchema != pReader->pSchema) {
taosMemoryFree(pReader->pMemSchema);
}
taosMemoryFreeClear(pReader);
}
......@@ -4111,26 +4144,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
}
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
*rows = pReader->pResBlock->info.rows;
*uid = pReader->pResBlock->info.id.uid;
*pWindow = pReader->pResBlock->info.window;
}
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_MAIN) {
setBlockInfo(pReader, rows, uid, pWindow);
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
setBlockInfo(pReader->innerReader[0], rows, uid, pWindow);
} else {
setBlockInfo(pReader->innerReader[1], rows, uid, pWindow);
}
} else {
setBlockInfo(pReader, rows, uid, pWindow);
}
}
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
// do fill all null column value SMA info
int32_t i = 0, j = 0;
......@@ -4266,7 +4279,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return NULL;
}
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
copyBlockDataToSDataBlock(pReader);
return pReader->pResBlock;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册