未验证 提交 d29a6049 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #15554 from taosdata/feature/3_liaohj

refactor: do some internal refactor and opt query performance.
...@@ -27,10 +27,6 @@ else () ...@@ -27,10 +27,6 @@ else ()
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif() endif()
if(TD_LINUX_64 AND JEMALLOC_ENABLED)
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# pthread # pthread
if(${BUILD_PTHREAD}) if(${BUILD_PTHREAD})
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
...@@ -418,18 +414,6 @@ if(${BUILD_ADDR2LINE}) ...@@ -418,18 +414,6 @@ if(${BUILD_ADDR2LINE})
endif(NOT ${TD_WINDOWS}) endif(NOT ${TD_WINDOWS})
endif(${BUILD_ADDR2LINE}) endif(${BUILD_ADDR2LINE})
# jemalloc
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
ENDIF ()
# ================================================================================================ # ================================================================================================
# Build test # Build test
......
...@@ -1404,7 +1404,7 @@ typedef struct STableScanAnalyzeInfo { ...@@ -1404,7 +1404,7 @@ typedef struct STableScanAnalyzeInfo {
uint32_t skipBlocks; uint32_t skipBlocks;
uint32_t filterOutBlocks; uint32_t filterOutBlocks;
double elapsedTime; double elapsedTime;
uint64_t filterTime; double filterTime;
} STableScanAnalyzeInfo; } STableScanAnalyzeInfo;
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
......
...@@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) { ...@@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 1000; i += 20) { for(int32_t i = 0; i < 100000; i += 20) {
char sql[1024] = {0}; char sql[1024] = {0};
sprintf(sql, sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...@@ -154,7 +154,7 @@ TEST(testCase, driverInit_Test) { ...@@ -154,7 +154,7 @@ TEST(testCase, driverInit_Test) {
} }
TEST(testCase, connect_Test) { TEST(testCase, connect_Test) {
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/tdengine/sim/dnode1/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) { if (pConn == NULL) {
...@@ -501,7 +501,6 @@ TEST(testCase, show_vgroup_Test) { ...@@ -501,7 +501,6 @@ TEST(testCase, show_vgroup_Test) {
taos_close(pConn); taos_close(pConn);
} }
TEST(testCase, create_multiple_tables) { TEST(testCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -665,6 +664,7 @@ TEST(testCase, insert_test) { ...@@ -665,6 +664,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...@@ -697,7 +697,7 @@ TEST(testCase, projection_query_tables) { ...@@ -697,7 +697,7 @@ TEST(testCase, projection_query_tables) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for(int32_t i = 0; i < 100; ++i) { for(int32_t i = 0; i < 1; ++i) {
printf("create table :%d\n", i); printf("create table :%d\n", i);
createNewTable(pConn, i); createNewTable(pConn, i);
} }
...@@ -723,6 +723,7 @@ TEST(testCase, projection_query_tables) { ...@@ -723,6 +723,7 @@ TEST(testCase, projection_query_tables) {
taos_close(pConn); taos_close(pConn);
} }
#if 0
TEST(testCase, projection_query_stables) { TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -820,21 +821,8 @@ TEST(testCase, async_api_test) { ...@@ -820,21 +821,8 @@ TEST(testCase, async_api_test) {
getchar(); getchar();
taos_close(pConn); taos_close(pConn);
} }
#endif
TEST(testCase, update_test) { TEST(testCase, update_test) {
SInterval interval = {0};
interval.offset = 8000;
interval.interval = 10000;
interval.sliding = 4000;
interval.intervalUnit = 's';
interval.offsetUnit = 's';
interval.slidingUnit = 's';
// STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1630000000000);
STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1629999999999);
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
...@@ -869,4 +857,8 @@ TEST(testCase, update_test) { ...@@ -869,4 +857,8 @@ TEST(testCase, update_test) {
taos_free_result(pRes); taos_free_result(pRes);
} }
} }
#endif
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
...@@ -145,7 +145,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI ...@@ -145,7 +145,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI
SRowMerger* pMerger); SRowMerger* pMerger);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow);
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
...@@ -691,16 +692,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -691,16 +692,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter); SBlock* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfCols = blockDataGetNumOfCols(pResBlock); int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t st = taosGetTimestampUs();
SColVal cv = {0}; SColVal cv = {0};
int32_t colIndex = 0; int64_t st = taosGetTimestampUs();
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
...@@ -724,7 +722,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -724,7 +722,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
i += 1; i += 1;
} }
while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) { int32_t colIndex = 0;
int32_t num = taosArrayGetSize(pBlockData->aIdx);
while (i < numOfOutputCols && colIndex < num) {
rowIndex = 0; rowIndex = 0;
pColData = taosArrayGet(pResBlock->pDataBlock, i); pColData = taosArrayGet(pResBlock->pDataBlock, i);
...@@ -744,7 +744,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -744,7 +744,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
i += 1; i += 1;
} }
while (i < numOfCols) { while (i < numOfOutputCols) {
pColData = taosArrayGet(pResBlock->pDataBlock, i); pColData = taosArrayGet(pResBlock->pDataBlock, i);
colDataAppendNNULL(pColData, 0, remain); colDataAppendNNULL(pColData, 0, remain);
i += 1; i += 1;
...@@ -1256,7 +1256,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1256,7 +1256,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
} }
tRowMergerClear(&merge); tRowMergerClear(&merge);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1300,7 +1300,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1300,7 +1300,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { // key > ik.ts || key > k.ts } else { // key > ik.ts || key > k.ts
ASSERT(key != ik.ts); ASSERT(key != ik.ts);
...@@ -1309,7 +1309,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1309,7 +1309,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [4] ik.ts < k.ts <= key // [4] ik.ts < k.ts <= key
if (ik.ts < k.ts) { if (ik.ts < k.ts) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1317,7 +1317,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1317,7 +1317,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [6] k.ts < ik.ts <= key // [6] k.ts < ik.ts <= key
if (k.ts < ik.ts) { if (k.ts < ik.ts) {
doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1326,7 +1326,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1326,7 +1326,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
ASSERT(key > ik.ts && key > k.ts); ASSERT(key > ik.ts && key > k.ts);
doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -1350,7 +1350,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1350,7 +1350,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} }
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch
...@@ -1359,7 +1359,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1359,7 +1359,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
// [4] ik.ts > key >= k.ts // [4] ik.ts > key >= k.ts
if (ik.ts > key) { if (ik.ts > key) {
doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1371,7 +1371,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1371,7 +1371,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1383,7 +1383,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1383,7 +1383,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow); tRowMerge(&merge, &fRow);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -1438,6 +1438,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1438,6 +1438,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
} }
// imem & mem are all empty, only file exist // imem & mem are all empty, only file exist
// opt version
// 1. it is not a border point
// 2. the direct next point is not an duplicated timestamp
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) ||
(pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) {
int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1;
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
if (nextKey != key) { // merge is not needed
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
pDumpInfo->rowIndex += step;
return TSDB_CODE_SUCCESS;
}
}
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
...@@ -1446,7 +1461,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1446,7 +1461,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
tRowMergerInit(&merge, &fRow, pReader->pSchema); tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendOneRow(pReader->pResBlock, pReader, pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tRowMergerClear(&merge);
...@@ -2201,7 +2216,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc ...@@ -2201,7 +2216,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) { if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) {
pDumpInfo->rowIndex = pDumpInfo->rowIndex =
doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step);
} }
...@@ -2325,7 +2340,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR ...@@ -2325,7 +2340,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) {
int32_t numOfRows = pBlock->info.rows; int32_t numOfRows = pBlock->info.rows;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
...@@ -2369,6 +2384,47 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow ...@@ -2369,6 +2384,47 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
int32_t i = 0, j = 0;
int32_t outputRowIndex = pResBlock->info.rows;
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]);
i += 1;
}
SColVal cv = {0};
int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx);
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
while(i < numOfOutputCols && j < numOfInputCols) {
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j);
if (pData->cid == pCol->info.colId) {
tColDataGetValue(pData, rowIndex, &cv);
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
j += 1;
} else { // the specified column does not exist in file block, fill with null data
colDataAppendNULL(pCol, outputRowIndex);
}
i += 1;
}
while (i < numOfOutputCols) {
SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i);
colDataAppendNULL(pCol, outputRowIndex);
i += 1;
}
pResBlock->info.rows += 1;
return TSDB_CODE_SUCCESS;
}
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader) { STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock; SSDataBlock* pBlock = pReader->pResBlock;
...@@ -2380,7 +2436,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e ...@@ -2380,7 +2436,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
break; break;
} }
doAppendOneRow(pBlock, pReader, pTSRow); doAppendRowFromTSRow(pBlock, pReader, pTSRow);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
// no data in buffer, return immediately // no data in buffer, return immediately
......
...@@ -103,7 +103,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int ...@@ -103,7 +103,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo); bool hasRemainResults(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
......
...@@ -297,6 +297,20 @@ enum { ...@@ -297,6 +297,20 @@ enum {
TABLE_SCAN__BLOCK_ORDER = 2, TABLE_SCAN__BLOCK_ORDER = 2,
}; };
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct {
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SInterval interval;
SAggSupporter *pAggSup;
SExprSupp *pExprSup; // expr supporter of aggregate operator
} SAggOptrPushDownInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
STsdbReader* dataReader; STsdbReader* dataReader;
SReadHandle readHandle; SReadHandle readHandle;
...@@ -312,12 +326,13 @@ typedef struct STableScanInfo { ...@@ -312,12 +326,13 @@ typedef struct STableScanInfo {
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. // SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
int8_t scanMode; int8_t scanMode;
int8_t noTable; int8_t noTable;
SAggOptrPushDownInfo pdInfo;
int8_t assignBlockUid; int8_t assignBlockUid;
} STableScanInfo; } STableScanInfo;
...@@ -505,13 +520,6 @@ typedef struct SOptrBasicInfo { ...@@ -505,13 +520,6 @@ typedef struct SOptrBasicInfo {
SSDataBlock* pRes; SSDataBlock* pRes;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct SIntervalAggOperatorInfo { typedef struct SIntervalAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
...@@ -1019,6 +1027,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -1019,6 +1027,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
bool groupbyTbname(SNodeList* pGroupList);
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey);
SSDataBlock* createSpecialDataBlock(EStreamType type); SSDataBlock* createSpecialDataBlock(EStreamType type);
void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput);
......
...@@ -137,7 +137,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL ...@@ -137,7 +137,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
} }
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo) { bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
if (pGroupResInfo->pRows == NULL) { if (pGroupResInfo->pRows == NULL) {
return false; return false;
} }
......
...@@ -242,25 +242,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo ...@@ -242,25 +242,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
} }
// todo refactor STableList // todo refactor STableList
bool assignUid = false;
size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0; size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
char* keyBuf = NULL; char* keyBuf = NULL;
if (bufLen > 0) { if (bufLen > 0) {
assignUid = groupbyTbname(pScanInfo->pGroupTags);
keyBuf = taosMemoryMalloc(bufLen); keyBuf = taosMemoryMalloc(bufLen);
if (keyBuf == NULL) { if (keyBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
} }
bool assignUid = false;
if (LIST_LENGTH(pScanInfo->pGroupTags) > 0) {
SNode* p = nodesListGetNode(pScanInfo->pGroupTags, 0);
if (p->type == QUERY_NODE_FUNCTION) {
// partition by tbname/group by tbname
assignUid = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);
}
}
for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) {
uint64_t* uid = taosArrayGet(qa, i); uint64_t* uid = taosArrayGet(qa, i);
STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0};
......
...@@ -141,8 +141,7 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, ...@@ -141,8 +141,7 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock,
SqlFunctionCtx* pCtx, int32_t numOfExprs); SqlFunctionCtx* pCtx, int32_t numOfExprs);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
uint64_t groupId);
// setup the output buffer for each operator // setup the output buffer for each operator
static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) {
...@@ -1393,10 +1392,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR ...@@ -1393,10 +1392,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
} }
} }
void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
uint64_t groupId) {
// for simple group by query without interval, all the tables belong to one group result. // for simple group by query without interval, all the tables belong to one group result.
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SAggOperatorInfo* pAggInfo = pOperator->info;
SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset;
...@@ -1420,14 +1420,13 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggIn ...@@ -1420,14 +1420,13 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggIn
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
} }
void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) { static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) { SAggOperatorInfo* pAggInfo = pOperator->info;
if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) {
return; return;
} }
#ifdef BUF_PAGE_DEBUG
qDebug("page_setbuf, groupId:%" PRIu64, groupId); doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId);
#endif
doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId);
// record the current active group id // record the current active group id
pAggInfo->groupId = groupId; pAggInfo->groupId = groupId;
...@@ -1594,7 +1593,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG ...@@ -1594,7 +1593,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
pBlock->info.version = pTaskInfo->version; pBlock->info.version = pTaskInfo->version;
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
if (!hasDataInGroupInfo(pGroupResInfo)) { if (!hasRemainResults(pGroupResInfo)) {
return; return;
} }
...@@ -2931,7 +2930,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -2931,7 +2930,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo); setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId);
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx); code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) { if (code != 0) {
...@@ -2966,7 +2965,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { ...@@ -2966,7 +2965,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); doFilter(pAggInfo->pCondition, pInfo->pRes, NULL);
if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) { if (!hasRemainResults(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
} }
...@@ -3501,7 +3500,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3501,7 +3500,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
pInfo->groupId = INT32_MIN; pInfo->groupId = UINT64_MAX;
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG;
...@@ -3513,6 +3512,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -3513,6 +3512,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
aggEncodeResultRow, aggDecodeResultRow, NULL); aggEncodeResultRow, aggDecodeResultRow, NULL);
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pTableScanInfo = downstream->info;
pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp;
pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup;
}
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
...@@ -3825,6 +3830,19 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) ...@@ -3825,6 +3830,19 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum)
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
} }
bool groupbyTbname(SNodeList* pGroupList) {
bool bytbname = false;
if (LIST_LENGTH(pGroupList) > 0) {
SNode* p = nodesListGetNode(pGroupList, 0);
if (p->type == QUERY_NODE_FUNCTION) {
// partition by tbname/group by tbname
bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0);
}
}
return bytbname;
}
int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) {
if (group == NULL) { if (group == NULL) {
return TDB_CODE_SUCCESS; return TDB_CODE_SUCCESS;
...@@ -3851,12 +3869,21 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, ...@@ -3851,12 +3869,21 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
bool assignUid = groupbyTbname(group);
int32_t groupNum = 0; int32_t groupNum = 0;
for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId);
if (code != TSDB_CODE_SUCCESS) { if (assignUid) {
return code; info->groupId = info->uid;
} else {
int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} }
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
......
...@@ -301,8 +301,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { ...@@ -301,8 +301,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL); doFilter(pInfo->pCondition, pRes, NULL);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemainResults(&pInfo->groupResInfo)) {
if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
} }
......
...@@ -166,6 +166,67 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn ...@@ -166,6 +166,67 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false; return false;
} }
// this function is for table scanner to extract temporary results of upstream aggregate results.
static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
return NULL;
}
int64_t buf[2] = {0};
SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId);
STableScanInfo* pTableScanInfo = pOperator->info;
SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId)));
if (p1 == NULL) {
return NULL;
}
*pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId);
return (SResultRow*)((char*)(*pPage) + p1->offset);
}
static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) {
STableScanInfo* pTableScanInfo = pOperator->info;
if (pTableScanInfo->pdInfo.pExprSup == NULL) {
return TSDB_CODE_SUCCESS;
}
SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup;
SFilePage* pPage = NULL;
SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage);
if (pRow == NULL) {
return TSDB_CODE_SUCCESS;
}
bool notLoadBlock = true;
for (int32_t i = 0; i < pSup1->numOfExprs; ++i) {
int32_t functionId = pSup1->pCtx[i].functionId;
SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset);
int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window);
if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) {
notLoadBlock = false;
break;
}
}
// release buffer pages
releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage);
if (notLoadBlock) {
*status = FUNC_DATA_REQUIRED_NOT_LOAD;
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) { uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
...@@ -178,7 +239,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -178,7 +239,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
*status = pInfo->dataBlockLoadFlag; *status = pInfo->dataBlockLoadFlag;
if (pTableScanInfo->pFilterNode != NULL || if (pTableScanInfo->pFilterNode != NULL ||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) {
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD; (*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
} }
...@@ -232,6 +293,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -232,6 +293,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
// todo filter data block according to the block sma data firstly // todo filter data block according to the block sma data firstly
doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
return TSDB_CODE_SUCCESS;
}
#if 0 #if 0
if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) {
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
...@@ -263,18 +334,20 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ...@@ -263,18 +334,20 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} }
} }
int64_t st = taosGetTimestampMs(); if (pTableScanInfo->pFilterNode != NULL) {
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); int64_t st = taosGetTimestampUs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
int64_t et = taosGetTimestampMs(); double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += (et - st); pTableScanInfo->readRecorder.filterTime += el;
if (pBlock->info.rows == 0) { if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else { } else {
qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -607,10 +680,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -607,10 +680,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
} }
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
// pInfo->cond.order = TSDB_ORDER_DESC;
pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode);
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec(); pInfo->sample.seed = taosGetTimestampSec();
...@@ -1489,14 +1563,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys ...@@ -1489,14 +1563,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->tqReader = pHandle->tqReader; pInfo->tqReader = pHandle->tqReader;
} }
if (pTSInfo->interval.interval > 0) { if (pTSInfo->pdInfo.interval.interval > 0) {
pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark); pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->pdInfo.interval, pInfo->twAggSup.waterMark);
} else { } else {
pInfo->pUpdateInfo = NULL; pInfo->pUpdateInfo = NULL;
} }
pInfo->pTableScanOp = pTableScanOp; pInfo->pTableScanOp = pTableScanOp;
pInfo->interval = pTSInfo->interval; pInfo->interval = pTSInfo->pdInfo.interval;
pInfo->readHandle = *pHandle; pInfo->readHandle = *pHandle;
pInfo->tableUid = pScanPhyNode->uid; pInfo->tableUid = pScanPhyNode->uid;
...@@ -2672,16 +2746,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc ...@@ -2672,16 +2746,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
} }
} }
int64_t st = taosGetTimestampMs(); if (pTableScanInfo->pFilterNode != NULL) {
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); int64_t st = taosGetTimestampMs();
doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo);
int64_t et = taosGetTimestampMs(); double el = (taosGetTimestampUs() - st) / 1000.0;
pTableScanInfo->readRecorder.filterTime += (et - st); pTableScanInfo->readRecorder.filterTime += el;
if (pBlock->info.rows == 0) { if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1; pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -1218,7 +1218,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1218,7 +1218,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL); doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
...@@ -1257,7 +1257,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -1257,7 +1257,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL); doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
...@@ -1294,7 +1294,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { ...@@ -1294,7 +1294,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBlock, NULL); doFilter(pInfo->pCondition, pBlock, NULL);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
...@@ -1563,7 +1563,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1563,7 +1563,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
} }
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
qDebug("===stream===single interval is done"); qDebug("===stream===single interval is done");
freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
...@@ -2010,7 +2010,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -2010,7 +2010,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL); doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
...@@ -2053,7 +2053,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -2053,7 +2053,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pBInfo->pRes, NULL); doFilter(pInfo->pCondition, pBInfo->pRes, NULL);
bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); bool hasRemain = hasRemainResults(&pInfo->groupResInfo);
if (!hasRemain) { if (!hasRemain) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
break; break;
...@@ -2219,7 +2219,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { ...@@ -2219,7 +2219,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// if (pOperator->status == OP_RES_TO_RETURN) { // if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { // if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) {
// doSetOperatorCompleted(pOperator); // doSetOperatorCompleted(pOperator);
// } // }
// //
...@@ -3860,7 +3860,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3860,7 +3860,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
...@@ -4420,7 +4420,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { ...@@ -4420,7 +4420,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes; return pInfo->pDelRes;
} }
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
printDataBlock(pBInfo->pRes, "single state"); printDataBlock(pBInfo->pRes, "single state");
......
...@@ -118,6 +118,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); ...@@ -118,6 +118,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
int32_t getFirstLastInfoSize(int32_t resBytes); int32_t getFirstLastInfoSize(int32_t resBytes);
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow);
int32_t lastRowFunction(SqlFunctionCtx *pCtx); int32_t lastRowFunction(SqlFunctionCtx *pCtx);
......
...@@ -2328,6 +2328,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -2328,6 +2328,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_LAST, .type = FUNCTION_TYPE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.dynDataRequiredFunc = lastDynDataReq,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = lastFunction, .processFunc = lastFunction,
......
...@@ -2700,6 +2700,22 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) ...@@ -2700,6 +2700,22 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) {
SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes;
// not initialized yet, data is required
if (pEntry == NULL) {
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry);
if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) {
return FUNC_DATA_REQUIRED_NOT_LOAD;
} else {
return FUNC_DATA_REQUIRED_DATA_LOAD;
}
}
int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; } int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; }
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......
...@@ -115,7 +115,12 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* ...@@ -115,7 +115,12 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow*
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow);
if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) {
return FUNC_DATA_REQUIRED_DATA_LOAD;
} else {
return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow);
}
} }
int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
......
...@@ -54,7 +54,8 @@ print $data30 $data31 $data32 $data33 ...@@ -54,7 +54,8 @@ print $data30 $data31 $data32 $data33
if $rows != 4 then if $rows != 4 then
return -1 return -1
endi endi
if $data01 != 10 then if $data01 != 10 then
print expect 10, actual: $data01
return -1 return -1
endi endi
if $data02 != 2.00000 then if $data02 != 2.00000 then
......
...@@ -500,11 +500,12 @@ if $rows != 2 then ...@@ -500,11 +500,12 @@ if $rows != 2 then
return -1 return -1
endi endi
sql select stddev(k), stddev(b), stddev(c),tbname, a from m1 group by tbname,a sql select stddev(k), stddev(b), stddev(c),tbname, a from m1 group by tbname,a order by a asc
if $rows != 2 then if $rows != 2 then
return -1 return -1
endi endi
if $data00 != 1.414213562 then if $data00 != 1.414213562 then
print expect 1.414213562, actual: $data00
return -1 return -1
endi endi
if $data01 != 14.142135624 then if $data01 != 14.142135624 then
...@@ -732,6 +733,7 @@ if $rows != 1 then ...@@ -732,6 +733,7 @@ if $rows != 1 then
return -1 return -1
endi endi
if $data00 != 0.005633334 then if $data00 != 0.005633334 then
print expect 0.005633334, actual: $data00
return -1 return -1
endi endi
......
...@@ -681,12 +681,13 @@ if $data14 != 1 then ...@@ -681,12 +681,13 @@ if $data14 != 1 then
return -1 return -1
endi endi
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname, t1, t2 interval(1m) sliding(15s) order by tbname desc limit 1; sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname, t1, t2 interval(1m) sliding(15s) order by tbname desc,_wstart asc limit 1;
if $rows != 1 then if $rows != 1 then
return -1 return -1
endi endi
if $data01 != 1.000000000 then if $data01 != 1.000000000 then
print expect 1.000000000, actual: $data01
return -1 return -1
endi endi
if $data02 != t2 then if $data02 != t2 then
......
...@@ -360,6 +360,7 @@ endi ...@@ -360,6 +360,7 @@ endi
#sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 > 0 and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0; #sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 > 0 and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0;
sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 = true and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0; sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 = true and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0;
if $rows != 1 then if $rows != 1 then
print expect 1, actual: $rows
return -1 return -1
endi endi
if $data00 != 5 then if $data00 != 5 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册