From 5ab54481f0d8b974f60d549a8bce9124a3c8223b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Apr 2023 19:24:34 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 96 ++++++++++---------------- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 22 +++--- source/dnode/vnode/src/tq/tqRestore.c | 6 +- source/dnode/vnode/src/tq/tqUtil.c | 8 +-- source/libs/stream/src/stream.c | 9 +-- source/libs/stream/src/streamExec.c | 16 ++--- source/libs/stream/src/streamMeta.c | 26 ++----- source/libs/stream/src/streamRecover.c | 10 +-- source/libs/stream/src/streamTask.c | 10 +-- 10 files changed, 86 insertions(+), 119 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2368788824..9f7d366a46 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "executor.h" #include "os.h" +#include "executor.h" #include "query.h" #include "streamState.h" #include "tdatablock.h" @@ -104,21 +104,8 @@ typedef struct { int8_t type; } SStreamQueueItem; -#if 0 -typedef struct { - int8_t type; - int64_t ver; - int32_t* dataRef; - SSubmitReq* data; -} SStreamDataSubmit; - -typedef struct { - int8_t type; - int64_t ver; - SArray* dataRefs; // SArray - SArray* reqs; // SArray -} SStreamMergedSubmit; -#endif +typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); +typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef struct { int8_t type; @@ -220,7 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { } static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { - // return queue->qItem; } @@ -249,16 +235,13 @@ typedef struct { SUseDbRsp dbInfo; } STaskDispatcherShuffle; -typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data); - typedef struct { int64_t stbUid; char stbFullName[TSDB_TABLE_FNAME_LEN]; SSchemaWrapper* pSchemaWrapper; - // not applicable to encoder and decoder - void* vnode; - FTbSink* tbSinkFunc; - STSchema* pTSchema; + void* vnode; // not available to encoder and decoder + FTbSink* tbSinkFunc; + STSchema* pTSchema; } STaskSinkTb; typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); @@ -292,14 +275,18 @@ typedef struct SCheckpointInfo { int64_t version; // offset in WAL } SCheckpointInfo; +typedef struct SStreamStatus { + int8_t taskStatus; + int8_t schedStatus; +} SStreamStatus; + struct SStreamTask { SStreamId id; int32_t totalLevel; int8_t taskLevel; int8_t outputType; int16_t dispatchMsgType; - int8_t taskStatus; - int8_t schedStatus; + SStreamStatus status; int32_t selfChildId; int32_t nodeId; SEpSet epSet; @@ -329,15 +316,11 @@ struct SStreamTask { SStreamQueue* outputQueue; // trigger - int8_t triggerStatus; - int64_t triggerParam; - void* timer; - - // msg handle - SMsgCb* pMsgCb; - - // state backend - SStreamState* pState; + int8_t triggerStatus; + int64_t triggerParam; + void* timer; + SMsgCb* pMsgCb; // msg handle + SStreamState* pState; // state backend // the followings attributes don't be serialized int32_t recoverTryingDownstream; @@ -350,6 +333,21 @@ struct SStreamTask { struct SStreamMeta* pMeta; }; +// meta +typedef struct SStreamMeta { + char* path; + TDB* db; + TTB* pTaskDb; + TTB* pCheckpointDb; + SHashObj* pTasks; + SHashObj* pRestoreTasks; + void* ahandle; + TXN* txn; + FTaskExpand* expandFunc; + int32_t vgId; + SRWLatch lock; +} SStreamMeta; + int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); @@ -566,42 +564,22 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); -// expand and deploy -typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); - -// meta -typedef struct SStreamMeta { - char* path; - TDB* db; - TTB* pTaskDb; - TTB* pCheckpointDb; - SHashObj* pTasks; - SHashObj* pRestoreTasks; - void* ahandle; - TXN* txn; - FTaskExpand* expandFunc; - int32_t vgId; - SRWLatch lock; -} SStreamMeta; - SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); -// SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); +SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); -SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); - -int32_t streamMetaBegin(SStreamMeta* pMeta); -int32_t streamMetaCommit(SStreamMeta* pMeta); -int32_t streamMetaRollBack(SStreamMeta* pMeta); -int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); +int32_t streamMetaBegin(SStreamMeta* pMeta); +int32_t streamMetaCommit(SStreamMeta* pMeta); +int32_t streamMetaRollBack(SStreamMeta* pMeta); +int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f8e4268aad..7ccbb3b586 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -65,7 +65,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); pTask->refCnt = 1; - pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 48c9a4f445..b029afc935 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -551,7 +551,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); pTask->id.idStr = taosStrdup(buf); pTask->refCnt = 1; - pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); @@ -566,9 +566,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { // expand executor if (pTask->fillHistory) { - pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; } else { - pTask->taskStatus = TASK_STATUS__RESTORE; + pTask->status.taskStatus = TASK_STATUS__RESTORE; } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { @@ -661,7 +661,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { }; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { + if (pTask && atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) { rsp.status = 1; } else { rsp.status = 0; @@ -788,7 +788,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { // do recovery step 1 streamSourceRecoverScanStep1(pTask); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -803,7 +803,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { return 0; } @@ -845,7 +845,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -1061,9 +1061,9 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { continue; } - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->taskStatus); + pTask->status.taskStatus); continue; } @@ -1137,10 +1137,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } else { SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); if (pTask != NULL) { - if (pTask->taskStatus == TASK_STATUS__NORMAL) { + if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); streamProcessRunReq(pTask); - } else if (pTask->taskStatus == TASK_STATUS__RESTORE) { + } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) { tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index a123bdb1dc..877c686d35 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -56,7 +56,7 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); // NOTE: do not change the following order - atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); } @@ -78,8 +78,8 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS continue; } - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->taskStatus); + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); continue; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8b86cb6716..2e8c6a53bb 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -89,9 +89,9 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { continue; } - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->taskStatus); + pTask->status.taskStatus); continue; } @@ -120,9 +120,9 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { continue; } - if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->taskStatus); + pTask->status.taskStatus); continue; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 59ac8a61d6..71d4e5efd8 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -52,7 +52,7 @@ void streamCleanUp() { void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { streamMetaReleaseTask(NULL, pTask); return; } @@ -66,8 +66,8 @@ void streamSchedByTimer(void* param, void* tmrId) { taosFreeQitem(trigger); return; } - trigger->pBlock->info.type = STREAM_GET_ALL; + trigger->pBlock->info.type = STREAM_GET_ALL; atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) { @@ -75,6 +75,7 @@ void streamSchedByTimer(void* param, void* tmrId) { taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); return; } + streamSchedExec(pTask); } @@ -93,13 +94,13 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) { int8_t schedStatus = - atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); + atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); return -1; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 075e477eb3..db9be593c0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -22,10 +22,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* void* pExecutor = pTask->exec.pExecutor; while (pTask->taskLevel == TASK_LEVEL__SOURCE) { - int8_t status = atomic_load_8(&pTask->taskStatus); + int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, - atomic_load_8(&pTask->taskStatus)); + atomic_load_8(&pTask->status.taskStatus)); taosMsleep(2); } else { break; @@ -66,7 +66,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* // pExecutor while (1) { - if (pTask->taskStatus == TASK_STATUS__DROPPING) { + if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { return 0; } @@ -134,7 +134,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { - if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { taosArrayDestroy(pRes); return 0; } @@ -267,7 +267,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } - if (pTask->taskStatus == TASK_STATUS__DROPPING) { + if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { if (pInput) { streamFreeQitem(pInput); } @@ -343,17 +343,17 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = - atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); + atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); if (code < 0) { - atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; } // todo the task should be commit here - atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); if (!taosQueueEmpty(pTask->inputQueue->queue)) { streamSchedExec(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ae65753bed..2e9bb4d762 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -179,23 +179,11 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { } #endif -#if 0 -SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { - SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (ppTask) { - ASSERT((*ppTask)->taskId == taskId); - return *ppTask; - } else { - return NULL; - } -} -#endif - SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { taosRLockLatch(&pMeta->lock); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (ppTask != NULL && (atomic_load_8(&((*ppTask)->taskStatus)) != TASK_STATUS__DROPPING)) { + if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) { atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); return *ppTask; @@ -209,7 +197,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); ASSERT(left >= 0); if (left == 0) { - ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING); + ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING); tFreeStreamTask(pTask); } } @@ -223,7 +211,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t)); if (p != NULL) { pTask = *p; - if (pTask != NULL && (atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING)) { + if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) { atomic_add_fetch_32(&pTask->refCnt, 1); taosRUnLockLatch(&pMeta->lock); return pTask; @@ -233,7 +221,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (p != NULL) { pTask = *p; - if (pTask != NULL && atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING) { + if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) { atomic_add_fetch_32(&pTask->refCnt, 1); taosRUnLockLatch(&pMeta->lock); return pTask; @@ -255,7 +243,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { * taosTmrStop(pTask->timer);*/ /*pTask->timer = NULL;*/ /*}*/ - atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); taosWLockLatch(&pMeta->lock); streamMetaReleaseTask(pMeta, pTask); @@ -338,9 +326,9 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - /*pTask->taskStatus = TASK_STATUS__NORMAL;*/ + /*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/ if (pTask->fillHistory) { - pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3e7a02b8d5..9962cdfcc0 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -18,7 +18,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); streamSetParamForRecover(pTask); streamSourceRecoverPrepareStep1(pTask, version); @@ -44,11 +44,11 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { } } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->taskLevel == TASK_LEVEL__SINK) { - atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); } return 0; } @@ -122,7 +122,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp } int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) { - return atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL; + return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL; } int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { @@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { return qStreamRestoreParam(exec); } int32_t streamSetStatusNormal(SStreamTask* pTask) { - atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f45b6ad7b7..834c022a9a 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -29,7 +29,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) { sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = taosStrdup(buf); - pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; @@ -63,8 +63,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->schedStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; @@ -116,8 +116,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; -- GitLab