提交 c409760f 编写于 作者: L Liu Jicong

feat(stream): drop stream

上级 7571d9d9
...@@ -187,7 +187,6 @@ enum { ...@@ -187,7 +187,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_DROP, "vnode-stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
...@@ -204,6 +203,7 @@ enum { ...@@ -204,6 +203,7 @@ enum {
//shared by snode and vnode //shared by snode and vnode
TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL)
......
...@@ -30,9 +30,14 @@ extern "C" { ...@@ -30,9 +30,14 @@ extern "C" {
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
enum { enum {
TASK_STATUS__IDLE = 1, TASK_STATUS__NORMAL = 0,
TASK_STATUS__EXECUTING, TASK_STATUS__DROPPING,
TASK_STATUS__CLOSING, };
enum {
TASK_EXEC_STATUS__IDLE = 1,
TASK_EXEC_STATUS__EXECUTING,
TASK_EXEC_STATUS__CLOSING,
}; };
enum { enum {
...@@ -50,16 +55,12 @@ enum { ...@@ -50,16 +55,12 @@ enum {
TASK_OUTPUT_STATUS__BLOCKED, TASK_OUTPUT_STATUS__BLOCKED,
}; };
enum {
STREAM_CREATED_BY__USER = 1,
STREAM_CREATED_BY__SMA,
};
enum { enum {
STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__DATA_BLOCK,
STREAM_INPUT__TRIGGER, STREAM_INPUT__TRIGGER,
STREAM_INPUT__CHECKPOINT, STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__DROP,
}; };
typedef struct { typedef struct {
...@@ -237,7 +238,9 @@ struct SStreamTask { ...@@ -237,7 +238,9 @@ struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int8_t inputType; int8_t inputType;
int8_t status; int8_t taskStatus;
int8_t execStatus;
int8_t execType; int8_t execType;
int8_t sinkType; int8_t sinkType;
......
...@@ -216,7 +216,7 @@ SArray *mmGetMsgHandles() { ...@@ -216,7 +216,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -95,7 +95,7 @@ SArray *smGetMsgHandles() { ...@@ -95,7 +95,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MON_SM_INFO, smPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToSharedQueue, 1) == NULL) goto _OVER;
......
...@@ -351,7 +351,7 @@ SArray *vmGetMsgHandles() { ...@@ -351,7 +351,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
...@@ -54,9 +54,10 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -54,9 +54,10 @@ int32_t mndInitStream(SMnode *pMnode) {
}; };
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
...@@ -477,7 +478,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -477,7 +478,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq); action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_VND_STREAM_TASK_DROP; action.msgType = TDMT_STREAM_TASK_DROP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return -1; return -1;
...@@ -670,20 +671,24 @@ _OVER: ...@@ -670,20 +671,24 @@ _OVER:
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/ /*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/ /*SUserObj *pUser = NULL;*/
SMDropStreamReq dropReq = *(SMDropStreamReq *)pReq->pCont; SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
ASSERT(0);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
pStream = mndAcquireStream(pMnode, dropReq.name); pStream = mndAcquireStream(pMnode, dropReq.name);
if (pStream == NULL) { if (pStream == NULL) {
if (dropReq.igNotExists) { if (dropReq.igNotExists) {
mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name); mDebug("stream:%s, not exist, ignore not exist is set", dropReq.name);
code = 0; sdbRelease(pMnode->pSdb, pStream);
goto DROP_STREAM_OVER; return -1;
} else { } else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1; return -1;
...@@ -701,14 +706,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -701,14 +706,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
return code; sdbRelease(pMnode->pSdb, pStream);
return -1;
} }
mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); mDebug("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
// drop all tasks // drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
return code; sdbRelease(pMnode->pSdb, pStream);
return -1;
} }
// drop stream // drop stream
...@@ -717,8 +724,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -717,8 +724,16 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1; return -1;
} }
DROP_STREAM_OVER: if (mndTransPrepare(pMnode, pTrans) != 0) {
return code; mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
return TSDB_CODE_ACTION_IN_PROGRESS;
} }
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
......
...@@ -92,7 +92,7 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { ...@@ -92,7 +92,7 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
pTask->status = TASK_STATUS__IDLE; pTask->execStatus = TASK_EXEC_STATUS__IDLE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
...@@ -205,7 +205,7 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -205,7 +205,7 @@ int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY: case TDMT_STREAM_TASK_DEPLOY:
return sndProcessTaskDeployReq(pSnode, pMsg); return sndProcessTaskDeployReq(pSnode, pMsg);
case TDMT_VND_STREAM_TASK_DROP: case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pMsg); return sndProcessTaskDropReq(pSnode, pMsg);
default: default:
ASSERT(0); ASSERT(0);
......
...@@ -384,7 +384,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -384,7 +384,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
pTask->status = TASK_STATUS__IDLE; pTask->execStatus = TASK_EXEC_STATUS__IDLE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
...@@ -459,6 +459,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { ...@@ -459,6 +459,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter); pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
continue;
}
if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue; if (pTask->inputType != STREAM_INPUT__DATA_SUBMIT) continue;
if (!failed) { if (!failed) {
...@@ -487,6 +490,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -487,6 +490,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont; SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
return 0; return 0;
} }
...@@ -501,9 +507,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -501,9 +507,12 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
SRpcMsg rsp = { if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
.info = pMsg->info, return 0;
.code = 0, }
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp); streamProcessDispatchReq(pTask, &req, &rsp);
return 0; return 0;
...@@ -513,6 +522,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -513,6 +522,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont; SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRecoverReq(pTask, pReq, pMsg); streamProcessRecoverReq(pTask, pReq, pMsg);
return 0; return 0;
} }
...@@ -521,6 +533,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -521,6 +533,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId; int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
return 0;
}
streamProcessDispatchRsp(pTask, pRsp); streamProcessDispatchRsp(pTask, pRsp);
return 0; return 0;
} }
...@@ -529,16 +544,32 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -529,16 +544,32 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont; SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId; int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
return 0;
}
streamProcessRecoverRsp(pTask, pRsp); streamProcessRecoverRsp(pTask, pRsp);
return 0; return 0;
} }
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
// todo
// clear queue
// push drop req into queue
// launch exec to free memory
// remove from hash
return 0;
#if 0
int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); int32_t code = taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
// set status dropping
ASSERT(code == 0); ASSERT(code == 0);
if (code == 0) { if (code == 0) {
// sendrsp // sendrsp
} }
return code; return code;
#endif
} }
...@@ -45,9 +45,9 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm ...@@ -45,9 +45,9 @@ static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubm
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
// 1. guard and set status executing // 1. guard and set status executing
int8_t execStatus = int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_STATUS__IDLE) { if (execStatus == TASK_EXEC_STATUS__IDLE) {
SStreamDataSubmit* pSubmit = NULL; SStreamDataSubmit* pSubmit = NULL;
// 2. check processedVer // 2. check processedVer
// 2.1. if not missed, get msg from queue // 2.1. if not missed, get msg from queue
...@@ -68,18 +68,18 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { ...@@ -68,18 +68,18 @@ int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
goto SEND_RSP; goto SEND_RSP;
} }
// set exec status closing // set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__CLOSING); atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING);
// second run // second run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP; goto SEND_RSP;
} }
// set exec status idle // set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE); atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
} }
SEND_RSP: SEND_RSP:
// 4. if get result // 4. if get result
// 4.1 set exec input status blocked and exec status idle // 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_STATUS__IDLE); atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
// 4.2 rpc send // 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer; rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
...@@ -150,7 +150,7 @@ int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) { ...@@ -150,7 +150,7 @@ int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) {
continue; continue;
} }
int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus); int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
tqSendExecReq(pTq, pHandle); tqSendExecReq(pTq, pHandle);
} }
} }
......
...@@ -172,7 +172,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -172,7 +172,7 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err; goto _err;
} }
} break; } break;
case TDMT_VND_STREAM_TASK_DROP: { case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
......
...@@ -50,6 +50,10 @@ void streamCleanUp() { ...@@ -50,6 +50,10 @@ void streamCleanUp() {
void streamTriggerByTimer(void* param, void* tmrId) { void streamTriggerByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
return;
}
if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) {
SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM);
if (trigger == NULL) return; if (trigger == NULL) return;
...@@ -82,8 +86,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) { ...@@ -82,8 +86,8 @@ int32_t streamSetupTrigger(SStreamTask* pTask) {
} }
int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
int8_t execStatus = atomic_load_8(&pTask->status); int8_t execStatus = atomic_load_8(&pTask->execStatus);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) { if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) return -1; if (pRunReq == NULL) return -1;
...@@ -188,6 +192,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { ...@@ -188,6 +192,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
int32_t streamProcessRunReq(SStreamTask* pTask) { int32_t streamProcessRunReq(SStreamTask* pTask) {
streamExec(pTask, pTask->pMsgCb); streamExec(pTask, pTask->pMsgCb);
if (pTask->dispatchType != TASK_DISPATCH__NONE) { if (pTask->dispatchType != TASK_DISPATCH__NONE) {
streamDispatch(pTask, pTask->pMsgCb); streamDispatch(pTask, pTask->pMsgCb);
} }
......
...@@ -33,6 +33,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) ...@@ -33,6 +33,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK); ASSERT(pTask->inputType == STREAM_INPUT__DATA_BLOCK);
SArray* blocks = pBlock->blocks; SArray* blocks = pBlock->blocks;
qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_DATA_TYPE_SSDATA_BLOCK, false);
} else if (pItem->type == STREAM_INPUT__DROP) {
// TODO exec drop
return 0;
} }
// exec // exec
...@@ -58,6 +61,10 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { ...@@ -58,6 +61,10 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
streamTaskExecImpl(pTask, data, pRes); streamTaskExecImpl(pTask, data, pRes);
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
return NULL;
}
if (taosArrayGetSize(pRes) != 0) { if (taosArrayGetSize(pRes) != 0) {
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
if (qRes == NULL) { if (qRes == NULL) {
...@@ -94,25 +101,26 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -94,25 +101,26 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
if (pRes == NULL) return -1; if (pRes == NULL) return -1;
while (1) { while (1) {
int8_t execStatus = atomic_val_compare_exchange_8(&pTask->status, TASK_STATUS__IDLE, TASK_STATUS__EXECUTING); int8_t execStatus =
if (execStatus == TASK_STATUS__IDLE) { atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
// first run // first run
pRes = streamExecForQall(pTask, pRes); pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL; if (pRes == NULL) goto FAIL;
// set status closing // set status closing
atomic_store_8(&pTask->status, TASK_STATUS__CLOSING); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
// second run, make sure inputQ and qall are cleared // second run, make sure inputQ and qall are cleared
pRes = streamExecForQall(pTask, pRes); pRes = streamExecForQall(pTask, pRes);
if (pRes == NULL) goto FAIL; if (pRes == NULL) goto FAIL;
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
atomic_store_8(&pTask->status, TASK_STATUS__IDLE); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return 0; return 0;
} else if (execStatus == TASK_STATUS__CLOSING) { } else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
continue; continue;
} else if (execStatus == TASK_STATUS__EXECUTING) { } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
ASSERT(taosArrayGetSize(pRes) == 0); ASSERT(taosArrayGetSize(pRes) == 0);
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return 0; return 0;
...@@ -122,7 +130,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { ...@@ -122,7 +130,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
} }
FAIL: FAIL:
if (pRes) taosArrayDestroy(pRes); if (pRes) taosArrayDestroy(pRes);
atomic_store_8(&pTask->status, TASK_STATUS__IDLE); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
return -1; return -1;
} }
...@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { ...@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
} }
pTask->taskId = tGenIdPI32(); pTask->taskId = tGenIdPI32();
pTask->streamId = streamId; pTask->streamId = streamId;
pTask->status = TASK_STATUS__IDLE; pTask->execStatus = TASK_EXEC_STATUS__IDLE;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
...@@ -35,7 +35,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -35,7 +35,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->inputType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1;
...@@ -83,7 +84,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -83,7 +84,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->inputType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册