提交 e8f0d4df 编写于 作者: H Haojun Liao

[td-13039] support stream execution interval query.

上级 39036ea5
......@@ -37,6 +37,11 @@ typedef struct SReadHandle {
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
typedef enum {
OPTR_EXEC_MODEL_BATCH = 0x1,
OPTR_EXEC_MODEL_STREAM = 0x2,
} EOPTR_EXEC_MODEL;
/**
* Create the exec task for streaming mode
* @param pMsg
......@@ -84,7 +89,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA
* @return
*/
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
/**
* The main task execution function, including query on both table and multiple tables,
......
......@@ -233,8 +233,8 @@ struct SOperatorInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef struct STaskIdInfo {
......@@ -255,7 +255,8 @@ typedef struct SExecTaskInfo {
uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string
jmp_buf env; //
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot;
} SExecTaskInfo;
......@@ -466,11 +467,6 @@ typedef struct SAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef enum {
OPTR_EXEC_MODEL_BATCH = 0x1,
OPTR_EXEC_MODEL_STREAM = 0x2,
} OPTR_EXEC_MODEL;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter
......@@ -481,9 +477,9 @@ typedef struct STableIntervalOperatorInfo {
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo *pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
......@@ -718,7 +714,7 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
#ifdef __cplusplus
}
......
......@@ -113,7 +113,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL);
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
......
......@@ -51,11 +51,12 @@ static void freeqinfoFn(void *qhandle) {
qDestroyTask(*handle);
}
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -6930,6 +6930,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return NULL;
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->getStreamResFn(pOperator, newgroup);
}
pTaskInfo->code = pOperator->_openFn(pOperator);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
return NULL;
......@@ -6946,7 +6950,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
}
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
STableIntervalOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC;
......@@ -6964,14 +6968,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
}
// STimeWindow win = {0};
bool newgroup = false;
*newgroup = false;
SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL;
while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
......@@ -7882,9 +7886,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval;
pInfo->execModel = OPTR_EXEC_MODEL_BATCH;
pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window;
pInfo->win.skey = 0;
pInfo->win.ekey = INT64_MAX;
......@@ -7909,6 +7914,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->info = pInfo;
pOperator->_openFn = doOpenIntervalAgg;
pOperator->getNextFn = doBuildIntervalResult;
pOperator->getStreamResFn= doStreamIntervalAgg;
pOperator->closeFn = destroyIntervalOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
......@@ -8736,12 +8742,13 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
return pExprs;
}
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) {
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampMs();
pTaskInfo->id.queryId = queryId;
pTaskInfo->execModel = model;
char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
......@@ -9102,11 +9109,11 @@ _error:
return NULL;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) {
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
int32_t code = TSDB_CODE_SUCCESS;
*pTaskInfo = createExecTaskInfo(queryId, taskId);
*pTaskInfo = createExecTaskInfo(queryId, taskId, model);
if (*pTaskInfo == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
......
......@@ -959,7 +959,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(code);
}
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册