diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 236878882494264b6ba894b7a82759c639109bbb..9f7d366a4602c25a5f6d9e876ae68dceeba048ec 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#include "executor.h"
#include "os.h"
+#include "executor.h"
#include "query.h"
#include "streamState.h"
#include "tdatablock.h"
@@ -104,21 +104,8 @@ typedef struct {
int8_t type;
} SStreamQueueItem;
-#if 0
-typedef struct {
- int8_t type;
- int64_t ver;
- int32_t* dataRef;
- SSubmitReq* data;
-} SStreamDataSubmit;
-
-typedef struct {
- int8_t type;
- int64_t ver;
- SArray* dataRefs; // SArray
- SArray* reqs; // SArray
-} SStreamMergedSubmit;
-#endif
+typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
+typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
typedef struct {
int8_t type;
@@ -220,7 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
}
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
- //
return queue->qItem;
}
@@ -249,16 +235,13 @@ typedef struct {
SUseDbRsp dbInfo;
} STaskDispatcherShuffle;
-typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
-
typedef struct {
int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN];
SSchemaWrapper* pSchemaWrapper;
- // not applicable to encoder and decoder
- void* vnode;
- FTbSink* tbSinkFunc;
- STSchema* pTSchema;
+ void* vnode; // not available to encoder and decoder
+ FTbSink* tbSinkFunc;
+ STSchema* pTSchema;
} STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
@@ -292,14 +275,18 @@ typedef struct SCheckpointInfo {
int64_t version; // offset in WAL
} SCheckpointInfo;
+typedef struct SStreamStatus {
+ int8_t taskStatus;
+ int8_t schedStatus;
+} SStreamStatus;
+
struct SStreamTask {
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
- int8_t taskStatus;
- int8_t schedStatus;
+ SStreamStatus status;
int32_t selfChildId;
int32_t nodeId;
SEpSet epSet;
@@ -329,15 +316,11 @@ struct SStreamTask {
SStreamQueue* outputQueue;
// trigger
- int8_t triggerStatus;
- int64_t triggerParam;
- void* timer;
-
- // msg handle
- SMsgCb* pMsgCb;
-
- // state backend
- SStreamState* pState;
+ int8_t triggerStatus;
+ int64_t triggerParam;
+ void* timer;
+ SMsgCb* pMsgCb; // msg handle
+ SStreamState* pState; // state backend
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
@@ -350,6 +333,21 @@ struct SStreamTask {
struct SStreamMeta* pMeta;
};
+// meta
+typedef struct SStreamMeta {
+ char* path;
+ TDB* db;
+ TTB* pTaskDb;
+ TTB* pCheckpointDb;
+ SHashObj* pTasks;
+ SHashObj* pRestoreTasks;
+ void* ahandle;
+ TXN* txn;
+ FTaskExpand* expandFunc;
+ int32_t vgId;
+ SRWLatch lock;
+} SStreamMeta;
+
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
@@ -566,42 +564,22 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
-// expand and deploy
-typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
-
-// meta
-typedef struct SStreamMeta {
- char* path;
- TDB* db;
- TTB* pTaskDb;
- TTB* pCheckpointDb;
- SHashObj* pTasks;
- SHashObj* pRestoreTasks;
- void* ahandle;
- TXN* txn;
- FTaskExpand* expandFunc;
- int32_t vgId;
- SRWLatch lock;
-} SStreamMeta;
-
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
-// SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
+SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
-SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
-
-int32_t streamMetaBegin(SStreamMeta* pMeta);
-int32_t streamMetaCommit(SStreamMeta* pMeta);
-int32_t streamMetaRollBack(SStreamMeta* pMeta);
-int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
+int32_t streamMetaBegin(SStreamMeta* pMeta);
+int32_t streamMetaCommit(SStreamMeta* pMeta);
+int32_t streamMetaRollBack(SStreamMeta* pMeta);
+int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index f8e4268aada287e5abe55326b686ad543ba076d8..7ccbb3b586d8734c6809631d3695efb46a03e518 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -65,7 +65,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
pTask->refCnt = 1;
- pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
+ pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 48c9a4f445e4bd56a72ae70c82cbcd4bef8e2ec0..b029afc93554e7b08c1590119130585fbea2eb57 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -551,7 +551,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = taosStrdup(buf);
pTask->refCnt = 1;
- pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
+ pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
@@ -566,9 +566,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
// expand executor
if (pTask->fillHistory) {
- pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
+ pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
} else {
- pTask->taskStatus = TASK_STATUS__RESTORE;
+ pTask->status.taskStatus = TASK_STATUS__RESTORE;
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
@@ -661,7 +661,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
};
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
- if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
+ if (pTask && atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
rsp.status = 1;
} else {
rsp.status = 0;
@@ -788,7 +788,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step 1
streamSourceRecoverScanStep1(pTask);
- if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
+ if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
@@ -803,7 +803,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
- if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
+ if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
return 0;
}
@@ -845,7 +845,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1;
}
- if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
+ if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
@@ -1061,9 +1061,9 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
continue;
}
- if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
+ if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
- pTask->taskStatus);
+ pTask->status.taskStatus);
continue;
}
@@ -1137,10 +1137,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} else {
SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId);
if (pTask != NULL) {
- if (pTask->taskStatus == TASK_STATUS__NORMAL) {
+ if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
streamProcessRunReq(pTask);
- } else if (pTask->taskStatus == TASK_STATUS__RESTORE) {
+ } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId,
pTask->id.idStr, pTask->chkInfo.version);
streamProcessRunReq(pTask);
diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c
index a123bdb1dc88244f027c8f216fdce71394a8e568..877c686d357b5675e9c3167419e83b9f73771cbe 100644
--- a/source/dnode/vnode/src/tq/tqRestore.c
+++ b/source/dnode/vnode/src/tq/tqRestore.c
@@ -56,7 +56,7 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
// NOTE: do not change the following order
- atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
}
@@ -78,8 +78,8 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
continue;
}
- if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
- tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->taskStatus);
+ if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
+ tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus);
continue;
}
diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c
index 8b86cb6716cf397f28b158ed100e3d76cc4296a7..2e8c6a53bb7858df62e2a26ab07285e3a8aa0210 100644
--- a/source/dnode/vnode/src/tq/tqUtil.c
+++ b/source/dnode/vnode/src/tq/tqUtil.c
@@ -89,9 +89,9 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
continue;
}
- if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
+ if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
- pTask->taskStatus);
+ pTask->status.taskStatus);
continue;
}
@@ -120,9 +120,9 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
continue;
}
- if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
+ if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
- pTask->taskStatus);
+ pTask->status.taskStatus);
continue;
}
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 59ac8a61d6856cc95e0d332679002fef4ce9de46..71d4e5efd8c6f8abaf3c116c8e924e02243958eb 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -52,7 +52,7 @@ void streamCleanUp() {
void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
- if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
+ if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(NULL, pTask);
return;
}
@@ -66,8 +66,8 @@ void streamSchedByTimer(void* param, void* tmrId) {
taosFreeQitem(trigger);
return;
}
- trigger->pBlock->info.type = STREAM_GET_ALL;
+ trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) {
@@ -75,6 +75,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
return;
}
+
streamSchedExec(pTask);
}
@@ -93,13 +94,13 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus =
- atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
+ atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
- atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
+ atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return -1;
}
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 075e477eb350b96506aa1411c4ca8593e4a332aa..db9be593c065c695d423e75953625b6b14964ad1 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -22,10 +22,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
void* pExecutor = pTask->exec.pExecutor;
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
- int8_t status = atomic_load_8(&pTask->taskStatus);
+ int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
- atomic_load_8(&pTask->taskStatus));
+ atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2);
} else {
break;
@@ -66,7 +66,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
// pExecutor
while (1) {
- if (pTask->taskStatus == TASK_STATUS__DROPPING) {
+ if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
return 0;
}
@@ -134,7 +134,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0;
while (1) {
- if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
+ if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
taosArrayDestroy(pRes);
return 0;
}
@@ -267,7 +267,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
}
- if (pTask->taskStatus == TASK_STATUS__DROPPING) {
+ if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
if (pInput) {
streamFreeQitem(pInput);
}
@@ -343,17 +343,17 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required.
int8_t schedStatus =
- atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
+ atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask);
if (code < 0) {
- atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED);
+ atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1;
}
// todo the task should be commit here
- atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
+ atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
if (!taosQueueEmpty(pTask->inputQueue->queue)) {
streamSchedExec(pTask);
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index ae65753bedd2191b7df95a002e2eb5b007c8ca97..2e9bb4d762674eb006a64547406a74ca1c210560 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -179,23 +179,11 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
}
#endif
-#if 0
-SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
- SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
- if (ppTask) {
- ASSERT((*ppTask)->taskId == taskId);
- return *ppTask;
- } else {
- return NULL;
- }
-}
-#endif
-
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
- if (ppTask != NULL && (atomic_load_8(&((*ppTask)->taskStatus)) != TASK_STATUS__DROPPING)) {
+ if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return *ppTask;
@@ -209,7 +197,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
ASSERT(left >= 0);
if (left == 0) {
- ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING);
+ ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING);
tFreeStreamTask(pTask);
}
}
@@ -223,7 +211,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t));
if (p != NULL) {
pTask = *p;
- if (pTask != NULL && (atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING)) {
+ if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
@@ -233,7 +221,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (p != NULL) {
pTask = *p;
- if (pTask != NULL && atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING) {
+ if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
@@ -255,7 +243,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
* taosTmrStop(pTask->timer);*/
/*pTask->timer = NULL;*/
/*}*/
- atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask);
@@ -338,9 +326,9 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1;
}
- /*pTask->taskStatus = TASK_STATUS__NORMAL;*/
+ /*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/
if (pTask->fillHistory) {
- pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
+ pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
streamTaskCheckDownstream(pTask, ver);
}
}
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 3e7a02b8d5a62d2412c5b6f5c10d336f54fe1b32..9962cdfcc0a1a4d5399b93d358038cd5c1f149df 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -18,7 +18,7 @@
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
- atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version);
@@ -44,11 +44,11 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
}
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
- atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
- atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
}
return 0;
}
@@ -122,7 +122,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
}
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) {
- return atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL;
+ return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL;
}
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
@@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
return qStreamRestoreParam(exec);
}
int32_t streamSetStatusNormal(SStreamTask* pTask) {
- atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL);
+ atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
return 0;
}
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index f45b6ad7b72f66de9850d0998b850a975a53f44a..834c022a9ae5e817eb3c380a55d4682d51b09d59 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -29,7 +29,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId);
pTask->id.idStr = taosStrdup(buf);
- pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
+ pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
@@ -63,8 +63,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
- if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
- if (tEncodeI8(pEncoder, pTask->schedStatus) < 0) return -1;
+ if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
+ if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
@@ -116,8 +116,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
- if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
- if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1;
+ if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
+ if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;