提交 5ab54481 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 f083697d
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "executor.h"
#include "os.h" #include "os.h"
#include "executor.h"
#include "query.h" #include "query.h"
#include "streamState.h" #include "streamState.h"
#include "tdatablock.h" #include "tdatablock.h"
...@@ -104,21 +104,8 @@ typedef struct { ...@@ -104,21 +104,8 @@ typedef struct {
int8_t type; int8_t type;
} SStreamQueueItem; } SStreamQueueItem;
#if 0 typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct { typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
int8_t type;
int64_t ver;
int32_t* dataRef;
SSubmitReq* data;
} SStreamDataSubmit;
typedef struct {
int8_t type;
int64_t ver;
SArray* dataRefs; // SArray<int32_t*>
SArray* reqs; // SArray<SSubmitReq*>
} SStreamMergedSubmit;
#endif
typedef struct { typedef struct {
int8_t type; int8_t type;
...@@ -220,7 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { ...@@ -220,7 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
} }
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) {
//
return queue->qItem; return queue->qItem;
} }
...@@ -249,16 +235,13 @@ typedef struct { ...@@ -249,16 +235,13 @@ typedef struct {
SUseDbRsp dbInfo; SUseDbRsp dbInfo;
} STaskDispatcherShuffle; } STaskDispatcherShuffle;
typedef void FTbSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
typedef struct { typedef struct {
int64_t stbUid; int64_t stbUid;
char stbFullName[TSDB_TABLE_FNAME_LEN]; char stbFullName[TSDB_TABLE_FNAME_LEN];
SSchemaWrapper* pSchemaWrapper; SSchemaWrapper* pSchemaWrapper;
// not applicable to encoder and decoder void* vnode; // not available to encoder and decoder
void* vnode; FTbSink* tbSinkFunc;
FTbSink* tbSinkFunc; STSchema* pTSchema;
STSchema* pTSchema;
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data); typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
...@@ -292,14 +275,18 @@ typedef struct SCheckpointInfo { ...@@ -292,14 +275,18 @@ typedef struct SCheckpointInfo {
int64_t version; // offset in WAL int64_t version; // offset in WAL
} SCheckpointInfo; } SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t schedStatus;
} SStreamStatus;
struct SStreamTask { struct SStreamTask {
SStreamId id; SStreamId id;
int32_t totalLevel; int32_t totalLevel;
int8_t taskLevel; int8_t taskLevel;
int8_t outputType; int8_t outputType;
int16_t dispatchMsgType; int16_t dispatchMsgType;
int8_t taskStatus; SStreamStatus status;
int8_t schedStatus;
int32_t selfChildId; int32_t selfChildId;
int32_t nodeId; int32_t nodeId;
SEpSet epSet; SEpSet epSet;
...@@ -329,15 +316,11 @@ struct SStreamTask { ...@@ -329,15 +316,11 @@ struct SStreamTask {
SStreamQueue* outputQueue; SStreamQueue* outputQueue;
// trigger // trigger
int8_t triggerStatus; int8_t triggerStatus;
int64_t triggerParam; int64_t triggerParam;
void* timer; void* timer;
SMsgCb* pMsgCb; // msg handle
// msg handle SStreamState* pState; // state backend
SMsgCb* pMsgCb;
// state backend
SStreamState* pState;
// the followings attributes don't be serialized // the followings attributes don't be serialized
int32_t recoverTryingDownstream; int32_t recoverTryingDownstream;
...@@ -350,6 +333,21 @@ struct SStreamTask { ...@@ -350,6 +333,21 @@ struct SStreamTask {
struct SStreamMeta* pMeta; struct SStreamMeta* pMeta;
}; };
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRestoreTasks;
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
...@@ -566,42 +564,22 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask); ...@@ -566,42 +564,22 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask); // int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
// expand and deploy
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRestoreTasks;
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
} SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen);
// SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
......
...@@ -65,7 +65,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -65,7 +65,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
......
...@@ -551,7 +551,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -551,7 +551,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = taosStrdup(buf); pTask->id.idStr = taosStrdup(buf);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
...@@ -566,9 +566,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -566,9 +566,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
// expand executor // expand executor
if (pTask->fillHistory) { if (pTask->fillHistory) {
pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
} else { } else {
pTask->taskStatus = TASK_STATUS__RESTORE; pTask->status.taskStatus = TASK_STATUS__RESTORE;
} }
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
...@@ -661,7 +661,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -661,7 +661,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
}; };
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { if (pTask && atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) {
rsp.status = 1; rsp.status = 1;
} else { } else {
rsp.status = 0; rsp.status = 0;
...@@ -788,7 +788,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -788,7 +788,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
// do recovery step 1 // do recovery step 1
streamSourceRecoverScanStep1(pTask); streamSourceRecoverScanStep1(pTask);
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
...@@ -803,7 +803,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -803,7 +803,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
return 0; return 0;
} }
...@@ -845,7 +845,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -845,7 +845,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1; return -1;
} }
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
...@@ -1061,9 +1061,9 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { ...@@ -1061,9 +1061,9 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
continue; continue;
} }
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus); pTask->status.taskStatus);
continue; continue;
} }
...@@ -1137,10 +1137,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1137,10 +1137,10 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} else { } else {
SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
if (pTask->taskStatus == TASK_STATUS__NORMAL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
} else if (pTask->taskStatus == TASK_STATUS__RESTORE) { } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) {
tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId, tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId,
pTask->id.idStr, pTask->chkInfo.version); pTask->id.idStr, pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
......
...@@ -56,7 +56,7 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { ...@@ -56,7 +56,7 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) {
taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
// NOTE: do not change the following order // NOTE: do not change the following order
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); taosHashPut(pStreamMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
} }
...@@ -78,8 +78,8 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS ...@@ -78,8 +78,8 @@ int32_t restoreStreamTaskImpl(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS
continue; continue;
} }
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->taskStatus); tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus);
continue; continue;
} }
......
...@@ -89,9 +89,9 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { ...@@ -89,9 +89,9 @@ void initOffsetForAllRestoreTasks(STQ* pTq) {
continue; continue;
} }
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus); pTask->status.taskStatus);
continue; continue;
} }
...@@ -120,9 +120,9 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { ...@@ -120,9 +120,9 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) {
continue; continue;
} }
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->taskStatus); pTask->status.taskStatus);
continue; continue;
} }
......
...@@ -52,7 +52,7 @@ void streamCleanUp() { ...@@ -52,7 +52,7 @@ void streamCleanUp() {
void streamSchedByTimer(void* param, void* tmrId) { void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
streamMetaReleaseTask(NULL, pTask); streamMetaReleaseTask(NULL, pTask);
return; return;
} }
...@@ -66,8 +66,8 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -66,8 +66,8 @@ void streamSchedByTimer(void* param, void* tmrId) {
taosFreeQitem(trigger); taosFreeQitem(trigger);
return; return;
} }
trigger->pBlock->info.type = STREAM_GET_ALL;
trigger->pBlock->info.type = STREAM_GET_ALL;
atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE);
if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)trigger) < 0) {
...@@ -75,6 +75,7 @@ void streamSchedByTimer(void* param, void* tmrId) { ...@@ -75,6 +75,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
return; return;
} }
streamSchedExec(pTask); streamSchedExec(pTask);
} }
...@@ -93,13 +94,13 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { ...@@ -93,13 +94,13 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamSchedExec(SStreamTask* pTask) {
int8_t schedStatus = int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
return -1; return -1;
} }
......
...@@ -22,10 +22,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -22,10 +22,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
while (pTask->taskLevel == TASK_LEVEL__SOURCE) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->taskStatus); int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) { if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr,
atomic_load_8(&pTask->taskStatus)); atomic_load_8(&pTask->status.taskStatus));
taosMsleep(2); taosMsleep(2);
} else { } else {
break; break;
...@@ -66,7 +66,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* ...@@ -66,7 +66,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
// pExecutor // pExecutor
while (1) { while (1) {
if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
return 0; return 0;
} }
...@@ -134,7 +134,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -134,7 +134,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return 0; return 0;
} }
...@@ -267,7 +267,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -267,7 +267,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
} }
if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
if (pInput) { if (pInput) {
streamFreeQitem(pInput); streamFreeQitem(pInput);
} }
...@@ -343,17 +343,17 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -343,17 +343,17 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t streamTryExec(SStreamTask* pTask) { int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required. // this function may be executed by multi-threads, so status check is required.
int8_t schedStatus = int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
if (schedStatus == TASK_SCHED_STATUS__WAITING) { if (schedStatus == TASK_SCHED_STATUS__WAITING) {
int32_t code = streamExecForAll(pTask); int32_t code = streamExecForAll(pTask);
if (code < 0) { if (code < 0) {
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED);
return -1; return -1;
} }
// todo the task should be commit here // todo the task should be commit here
atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
if (!taosQueueEmpty(pTask->inputQueue->queue)) { if (!taosQueueEmpty(pTask->inputQueue->queue)) {
streamSchedExec(pTask); streamSchedExec(pTask);
......
...@@ -179,23 +179,11 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { ...@@ -179,23 +179,11 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) {
} }
#endif #endif
#if 0
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
ASSERT((*ppTask)->taskId == taskId);
return *ppTask;
} else {
return NULL;
}
}
#endif
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask != NULL && (atomic_load_8(&((*ppTask)->taskStatus)) != TASK_STATUS__DROPPING)) { if (ppTask != NULL && (atomic_load_8(&((*ppTask)->status.taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&(*ppTask)->refCnt, 1); atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
return *ppTask; return *ppTask;
...@@ -209,7 +197,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -209,7 +197,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
ASSERT(left >= 0); ASSERT(left >= 0);
if (left == 0) { if (left == 0) {
ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING); ASSERT(atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
} }
} }
...@@ -223,7 +211,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { ...@@ -223,7 +211,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t)); SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pRestoreTasks, &taskId, sizeof(int32_t));
if (p != NULL) { if (p != NULL) {
pTask = *p; pTask = *p;
if (pTask != NULL && (atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING)) { if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) {
atomic_add_fetch_32(&pTask->refCnt, 1); atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
return pTask; return pTask;
...@@ -233,7 +221,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { ...@@ -233,7 +221,7 @@ SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (p != NULL) { if (p != NULL) {
pTask = *p; pTask = *p;
if (pTask != NULL && atomic_load_8(&(pTask->taskStatus)) != TASK_STATUS__DROPPING) { if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) {
atomic_add_fetch_32(&pTask->refCnt, 1); atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock); taosRUnLockLatch(&pMeta->lock);
return pTask; return pTask;
...@@ -255,7 +243,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -255,7 +243,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
* taosTmrStop(pTask->timer);*/ * taosTmrStop(pTask->timer);*/
/*pTask->timer = NULL;*/ /*pTask->timer = NULL;*/
/*}*/ /*}*/
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
...@@ -338,9 +326,9 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -338,9 +326,9 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
/*pTask->taskStatus = TASK_STATUS__NORMAL;*/ /*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/
if (pTask->fillHistory) { if (pTask->fillHistory) {
pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
streamTaskCheckDownstream(pTask, ver); streamTaskCheckDownstream(pTask, ver);
} }
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version); streamSourceRecoverPrepareStep1(pTask, version);
...@@ -44,11 +44,11 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { ...@@ -44,11 +44,11 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
} }
} else if (pTask->taskLevel == TASK_LEVEL__AGG) { } else if (pTask->taskLevel == TASK_LEVEL__AGG) {
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask); streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) { } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
} }
return 0; return 0;
} }
...@@ -122,7 +122,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp ...@@ -122,7 +122,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp
} }
int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) { int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) {
return atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL; return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL;
} }
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) {
...@@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { ...@@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
return qStreamRestoreParam(exec); return qStreamRestoreParam(exec);
} }
int32_t streamSetStatusNormal(SStreamTask* pTask) { int32_t streamSetStatusNormal(SStreamTask* pTask) {
atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
return 0; return 0;
} }
......
...@@ -29,7 +29,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) { ...@@ -29,7 +29,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId);
pTask->id.idStr = taosStrdup(buf); pTask->id.idStr = taosStrdup(buf);
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
...@@ -63,8 +63,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -63,8 +63,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->schedStatus) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
...@@ -116,8 +116,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -116,8 +116,8 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册