diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 00acc4741d78af3bebece1c04ed791e02b2e9fe9..23fb9d2ee571edfa9cf4845788c4ed67da302677 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -36,7 +36,6 @@ typedef struct SReadHandle {
void* vnode;
void* mnd;
SMsgCb* pMsgCb;
-// int8_t initTsdbReader;
} SReadHandle;
enum {
@@ -140,12 +139,6 @@ int32_t qKillTask(qTaskInfo_t tinfo);
*/
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
-/**
- * return whether query is completed or not
- * @param tinfo
- * @return
- */
-int32_t qIsTaskCompleted(qTaskInfo_t tinfo);
/**
* destroy query info structure
@@ -176,6 +169,15 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len);
+/**
+ * return the scan info, in the form of tuple of two items, including table uid and current timestamp
+ * @param tinfo
+ * @param uid
+ * @param ts
+ * @return
+ */
+int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 29dd2b1656b7e94955782e094f6c423d58a00002..dfc73420511d667c4ed5efc26be0377368558ff4 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -253,18 +253,15 @@ typedef struct STableScanInfo {
SReadHandle readHandle;
SFileBlockLoadRecorder readRecorder;
- int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SNode* pFilterNode; // filter info, which is push down by optimizer
- SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context
- SResultRowInfo* pResultRowInfo;
- int32_t* rowEntryInfoOffset;
- SExprInfo* pExpr;
+ SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context,todo: remove this by using SExprSup
+ int32_t* rowEntryInfoOffset; // todo: remove this by using SExprSup
+ SExprInfo* pExpr;// todo: remove this by using SExprSup
+
SSDataBlock* pResBlock;
SArray* pColMatchInfo;
- int32_t numOfOutput;
-
SExprSupp pseudoSup;
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
@@ -275,8 +272,13 @@ typedef struct STableScanInfo {
int32_t curTWinIdx;
int32_t currentGroupId;
- uint64_t queryId;
- uint64_t taskId;
+ uint64_t queryId; // todo remove it
+ uint64_t taskId; // todo remove it
+
+ struct {
+ uint64_t uid;
+ int64_t t;
+ } scanStatus;
} STableScanInfo;
typedef struct STagScanInfo {
@@ -321,31 +323,31 @@ typedef struct SessionWindowSupporter {
} SessionWindowSupporter;
typedef struct SStreamBlockScanInfo {
+ uint64_t tableUid; // queried super table uid
+ SExprInfo* pPseudoExpr;
+ int32_t numOfPseudoExpr;
+ int32_t primaryTsIndex; // primary time stamp slot id
+ SReadHandle readHandle;
+ SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
+ SArray* pColMatchInfo; //
+ SNode* pCondition;
+
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
- SColumnInfo* pCols; // the output column info
uint64_t numOfExec; // execution times
void* streamBlockReader;// stream block reader handle
- SArray* pColMatchInfo; //
- SNode* pCondition;
+
int32_t tsArrayIndex;
SArray* tsArray;
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
- SExprInfo* pPseudoExpr;
- int32_t numOfPseudoExpr;
-
- int32_t primaryTsIndex; // primary time stamp slot id
- SReadHandle readHandle;
- uint64_t tableUid; // queried super table uid
EStreamScanMode scanMode;
SOperatorInfo* pSnapshotReadOp;
- SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SArray* childIds;
SessionWindowSupporter sessionSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
@@ -683,7 +685,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
-void cleanupExprSup(SExprSupp* pSup);
+void cleanupExprSupp(SExprSupp* pSup);
int32_t initAggInfo(SExprSupp *pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
@@ -707,7 +709,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
-void cleanupExecSupp(SExprSupp* pSupp);
+int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
SSDataBlock* loadNextDataBlock(void* param);
diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c
index a9e1e03178b0a28780ecfdd696d44e244d2648a1..3a89f7136a7fe62489964ba8c8a5877a1bc630bc 100644
--- a/source/libs/executor/src/executorMain.c
+++ b/source/libs/executor/src/executorMain.c
@@ -191,16 +191,6 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
return TSDB_CODE_SUCCESS;
}
-int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
- SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
-
- if (pTaskInfo == NULL) {
- return TSDB_CODE_QRY_INVALID_QHANDLE;
- }
-
- return isTaskKilled(pTaskInfo);
-}
-
void qDestroyTask(qTaskInfo_t qTaskHandle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
@@ -236,3 +226,10 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
}
+int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
+ SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index ff3baf64bd5654e9cb979d03100190536498f8fe..b176d8e88f152d73d985975a14b367e46d0e8236 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -1034,7 +1034,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
uint32_t status = BLK_DATA_NOT_LOAD;
- int32_t numOfOutput = pTableScanInfo->numOfOutput;
+ int32_t numOfOutput = 0;//pTableScanInfo->numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
@@ -2822,6 +2822,24 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
}
}
+int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
+ int32_t type = pOperator->operatorType;
+ if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
+ SStreamBlockScanInfo* pScanInfo = pOperator->info;
+ STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
+ *uid = pSnapShotScanInfo->scanStatus.uid;
+ *ts = pSnapShotScanInfo->scanStatus.t;
+ } else {
+ if (pOperator->pDownstream[0] == NULL) {
+ return TSDB_CODE_INVALID_PARA;
+ } else {
+ doGetScanStatus(pOperator->pDownstream[0], uid, ts);
+ }
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
// this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
if (OPTR_IS_OPENED(pOperator)) {
@@ -3544,7 +3562,7 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo);
}
-void cleanupExecSupp(SExprSupp* pSupp) {
+void cleanupExprSupp(SExprSupp* pSupp) {
destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs);
destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
@@ -3557,7 +3575,7 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) {
taosArrayDestroy(pInfo->pPseudoColInfo);
cleanupAggSup(&pInfo->aggSup);
- cleanupExecSupp(&pInfo->scalarSup);
+ cleanupExprSupp(&pInfo->scalarSup);
}
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 4e4aaba7f466595417b9878ebb367cd23ccf785e..0a14993c21233da11b9dba156215ab5ef34149b2 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
- cleanupExecSupp(&pInfo->scalarSup);
+ cleanupExprSupp(&pInfo->scalarSup);
}
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
@@ -701,7 +701,7 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
taosHashCleanup(pInfo->pGroupSet);
taosMemoryFree(pInfo->columnOffset);
- cleanupExecSupp(&pInfo->scalarSup);
+ cleanupExprSupp(&pInfo->scalarSup);
}
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index d89e6f88746c75e8797b080b4aff7ef30502d2c4..6c38e4d70c488346429435b96147eba246274d87 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#include
#include
#include "filter.h"
#include "function.h"
@@ -413,6 +414,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
+
+ // todo refactor
+ pTableScanInfo->scanStatus.uid = pBlock->info.uid;
+ pTableScanInfo->scanStatus.t = pBlock->info.window.ekey;
+
return pBlock;
}
return NULL;
@@ -459,7 +465,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
int32_t total = pTableScanInfo->scanInfo.numOfAsc + pTableScanInfo->scanInfo.numOfDesc;
if (pTableScanInfo->scanTimes < total) {
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
- prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
+ prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, 0);
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->curTWinIdx = 0;
}