提交 74bea443 编写于 作者: L liuyao

stream op transform

上级 aafbdcb4
......@@ -779,8 +779,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pSateTask = pTask;
SStreamTask task = {0};
if (pTask->info.fillHistory) {
pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
task.id = pTask->streamTaskId;
SStreamMeta meta = {0};
task.pMeta = pTask->pMeta;
pSateTask = &task;
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
if (pTask->pState == NULL) {
......@@ -798,8 +802,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTask* pSateTask = pTask;
SStreamTask task = {0};
if (pTask->info.fillHistory) {
pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t));
task.id = pTask->streamTaskId;
SStreamMeta meta = {0};
task.pMeta = pTask->pMeta;
pSateTask = &task;
}
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
if (pTask->pState == NULL) {
......
......@@ -682,6 +682,8 @@ void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExpr
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator);
#ifdef __cplusplus
}
......
......@@ -1086,3 +1086,17 @@ void qStreamCloseTsdbReader(void* task) {
}
}
}
void streamOpReleaseState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.releaseStreamStateFn) {
downstream->fpSet.releaseStreamStateFn(downstream);
}
}
void streamOpReloadState(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->fpSet.reloadStreamStateFn) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
......@@ -1560,6 +1560,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
......
......@@ -1326,6 +1326,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL,
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1);
......
......@@ -2531,12 +2531,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
setOperatorStreamStateFn(pOperator, streamScanReleaseState, streamScanReloadState);
return pOperator;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册