未验证 提交 1e72a957 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #19846 from taosdata/fix/nodisk

fix(query): check if table exists or not during build data blocks
......@@ -60,6 +60,6 @@ target_link_libraries(
PRIVATE os util common transport nodes parser command planner catalog scheduler function qcom
)
if(${BUILD_TEST})
#if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
endif(${BUILD_TEST})
\ No newline at end of file
#endif(${BUILD_TEST})
\ No newline at end of file
......@@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
}
taos_free_result(pRes);
for(int32_t i = 0; i < 20; i += 20) {
for(int32_t i = 0; i < 2000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
......@@ -692,6 +692,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -725,7 +726,7 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
for (int32_t i = 0; i < 200000; ++i) {
for (int32_t i = 0; i < 2; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
......@@ -751,6 +752,7 @@ TEST(testCase, projection_query_tables) {
taos_close(pConn);
}
#if 0
TEST(testCase, tsbs_perf_test) {
TdThread qid[20] = {0};
......@@ -760,8 +762,6 @@ TEST(testCase, tsbs_perf_test) {
getchar();
}
#endif
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -790,7 +790,6 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
#if 0
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
......@@ -831,7 +830,7 @@ TEST(testCase, async_api_test) {
ASSERT_NE(pConn, nullptr);
taos_query(pConn, "use abc1");
#if 0
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
if (taos_errno(pRes) != 0) {
printf("failed, reason:%s\n", taos_errstr(pRes));
......@@ -854,7 +853,6 @@ TEST(testCase, async_api_test) {
printf("%s\n", str);
memset(str, 0, sizeof(str));
}
#endif
taos_query_a(pConn, "select count(*) from tu", queryCallback, pConn);
getchar();
......
......@@ -178,7 +178,6 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
void tsdbReaderClose(STsdbReader *pReader);
bool tsdbNextDataBlock(STsdbReader *pReader);
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
......
......@@ -515,7 +515,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit:
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
}
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }
......
......@@ -220,6 +220,8 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList,
......@@ -998,7 +1000,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
}
}
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
......@@ -1015,6 +1017,14 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1;
// no data exists, return directly.
if (pBlockData->nRow == 0 || pBlockData->aTSKEY == 0) {
tsdbWarn("%p no need to copy since no data in blockData, table uid:%" PRIu64 " has been dropped, %s", pReader, pBlockInfo->uid,
pReader->idStr);
pResBlock->info.rows = 0;
return 0;
}
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pBlock->nRow - 1 && (!asc))) {
if (asc && pReader->window.skey <= pBlock->minKey.ts) {
// pDumpInfo->rowIndex = 0;
......@@ -1128,12 +1138,19 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData,
uint64_t uid) {
int32_t code = 0;
int64_t st = taosGetTimestampUs();
tBlockDataReset(pBlockData);
STSchema* pSchema = getLatestTableSchema(pReader, uid);
if (pSchema == NULL) {
tsdbDebug("%p table uid:%"PRIu64" has been dropped, no data existed, %s", pReader, uid, pReader->idStr);
return code;
}
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
TABLEID tid = {.suid = pReader->suid, .uid = uid};
int32_t code =
tBlockDataInit(pBlockData, &tid, pReader->pSchema, &pReader->suppInfo.colId[1], pReader->suppInfo.numOfCols - 1);
code = tBlockDataInit(pBlockData, &tid, pSchema, &pSup->colId[1], pSup->numOfCols - 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1656,6 +1673,19 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return false;
}
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
if (pReader->pSchema != NULL) {
return pReader->pSchema;
}
pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1, 1);
if (pReader->pSchema == NULL) {
tsdbError("failed to get table schema, uid:%" PRIu64 ", it may have been dropped, ver:-1, %s", uid, pReader->idStr);
}
return pReader->pSchema;
}
static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader, uint64_t uid) {
// always set the newest schema version in pReader->pSchema
if (pReader->pSchema == NULL) {
......@@ -2459,7 +2489,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
copyBlockDataToSDataBlock(pReader);
// record the last key value
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
......@@ -2936,8 +2966,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// update the last key for the corresponding table
pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? pInfo->window.ekey : pInfo->window.skey;
tsdbDebug("%p uid:%" PRIu64
" clean file block retrieved from file, global index:%d, "
tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, "
"table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s",
pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->nRow, pBlock->minKey.ts,
pBlock->maxKey.ts, pReader->idStr);
......@@ -4029,9 +4058,11 @@ void tsdbReaderClose(STsdbReader* pReader) {
taosMemoryFree(pReader->idStr);
taosMemoryFree(pReader->pSchema);
if (pReader->pMemSchema != pReader->pSchema) {
taosMemoryFree(pReader->pMemSchema);
}
taosMemoryFreeClear(pReader);
}
......@@ -4111,26 +4142,6 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
return false;
}
static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
*rows = pReader->pResBlock->info.rows;
*uid = pReader->pResBlock->info.id.uid;
*pWindow = pReader->pResBlock->info.window;
}
void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) {
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_MAIN) {
setBlockInfo(pReader, rows, uid, pWindow);
} else if (pReader->step == EXTERNAL_ROWS_PREV) {
setBlockInfo(pReader->innerReader[0], rows, uid, pWindow);
} else {
setBlockInfo(pReader->innerReader[1], rows, uid, pWindow);
}
} else {
setBlockInfo(pReader, rows, uid, pWindow);
}
}
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
// do fill all null column value SMA info
int32_t i = 0, j = 0;
......@@ -4266,7 +4277,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return NULL;
}
copyBlockDataToSDataBlock(pReader, pBlockScanInfo);
copyBlockDataToSDataBlock(pReader);
return pReader->pResBlock;
}
......
......@@ -568,7 +568,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
needRemoveFile = true;
uDebug(
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
"size:%.2f Kb, %s\n",
"size:%.2f Kb, %s",
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
......@@ -585,8 +585,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages);
} else {
uDebug(
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f "
"Kb",
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPgSize:%.2f Kb",
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册