From e8f0d4df0b220c0562e1bfa81a095b78ca125e85 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 30 Mar 2022 14:54:00 +0800 Subject: [PATCH] [td-13039] support stream execution interval query. --- include/libs/executor/executor.h | 7 ++++++- source/libs/executor/inc/executorimpl.h | 18 +++++++----------- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorMain.c | 5 +++-- source/libs/executor/src/executorimpl.c | 23 +++++++++++++++-------- source/libs/qworker/src/qworker.c | 2 +- 6 files changed, 33 insertions(+), 24 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fbdb1d66b4..d3cd828cf5 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -37,6 +37,11 @@ typedef struct SReadHandle { #define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1 #define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 +typedef enum { + OPTR_EXEC_MODEL_BATCH = 0x1, + OPTR_EXEC_MODEL_STREAM = 0x2, +} EOPTR_EXEC_MODEL; + /** * Create the exec task for streaming mode * @param pMsg @@ -84,7 +89,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA * @return */ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, - qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model); /** * The main task execution function, including query on both table and multiple tables, diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bc55f46986..1d7023930d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -233,8 +233,8 @@ struct SOperatorInfo; typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); -typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param); -typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup); +typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); +typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); typedef void (*__optr_close_fn_t)(void* param, int32_t num); typedef struct STaskIdInfo { @@ -255,7 +255,8 @@ typedef struct SExecTaskInfo { uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure char* sql; // query sql string - jmp_buf env; // + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] struct SOperatorInfo* pRoot; } SExecTaskInfo; @@ -466,11 +467,6 @@ typedef struct SAggSupporter { int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; -typedef enum { - OPTR_EXEC_MODEL_BATCH = 0x1, - OPTR_EXEC_MODEL_STREAM = 0x2, -} OPTR_EXEC_MODEL; - typedef struct STableIntervalOperatorInfo { SOptrBasicInfo binfo; // basic info SGroupResInfo groupResInfo; // multiple results build supporter @@ -481,9 +477,9 @@ typedef struct STableIntervalOperatorInfo { SAggSupporter aggSup; // aggregate supporter STableQueryInfo *pCurrent; // current tableQueryInfo struct int32_t order; // current SSDataBlock scan order - OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -718,7 +714,7 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 615bcdcdbb..15203d91ca 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -113,7 +113,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL); + code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index cc9921ce73..6d78fa835a 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -51,11 +51,12 @@ static void freeqinfoFn(void *qhandle) { qDestroyTask(*handle); } -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, + qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 55def79958..2df18f135c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6930,6 +6930,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return NULL; } + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + return pOperator->getStreamResFn(pOperator, newgroup); + } + pTaskInfo->code = pOperator->_openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; @@ -6946,7 +6950,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } -static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) { +static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { STableIntervalOperatorInfo* pInfo = pOperator->info; int32_t order = TSDB_ORDER_ASC; @@ -6964,14 +6968,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) { } // STimeWindow win = {0}; - bool newgroup = false; + *newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = NULL; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -7882,9 +7886,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->order = TSDB_ORDER_ASC; - pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - pInfo->execModel = OPTR_EXEC_MODEL_BATCH; + pInfo->execModel = pTaskInfo->execModel; + + pInfo->win = pTaskInfo->window; pInfo->win.skey = 0; pInfo->win.ekey = INT64_MAX; @@ -7909,6 +7914,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->info = pInfo; pOperator->_openFn = doOpenIntervalAgg; pOperator->getNextFn = doBuildIntervalResult; + pOperator->getStreamResFn= doStreamIntervalAgg; pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -8736,12 +8742,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* return pExprs; } -static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { +static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; + pTaskInfo->execModel = model; char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -9102,11 +9109,11 @@ _error: return NULL; } -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) { +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model) { uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; - *pTaskInfo = createExecTaskInfo(queryId, taskId); + *pTaskInfo = createExecTaskInfo(queryId, taskId, model); if (*pTaskInfo == NULL) { code = TSDB_CODE_QRY_OUT_OF_MEMORY; goto _complete; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 34159a1355..42d10a2361 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -959,7 +959,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle); + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); -- GitLab