提交 a9bf05bc 编写于 作者: L Liu Jicong

refactor: stream meta ref count

上级 76cd3122
...@@ -338,7 +338,7 @@ typedef struct SStreamTask { ...@@ -338,7 +338,7 @@ typedef struct SStreamTask {
int32_t recoverWaitingUpstream; int32_t recoverWaitingUpstream;
int64_t checkReqId; int64_t checkReqId;
SArray* checkReqIds; // shuffle SArray* checkReqIds; // shuffle
int32_t refCnt;
} SStreamTask; } SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
...@@ -565,6 +565,7 @@ typedef struct SStreamMeta { ...@@ -565,6 +565,7 @@ typedef struct SStreamMeta {
TXN txn; TXN txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
SRWLatch lock;
} SStreamMeta; } SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
...@@ -575,6 +576,10 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, c ...@@ -575,6 +576,10 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, c
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta);
......
...@@ -738,6 +738,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -738,6 +738,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
} }
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
......
...@@ -36,13 +36,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -36,13 +36,14 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp, false); streamProcessDispatchReq(pTask, &req, &rsp, false);
streamMetaReleaseTask(pSnode->pMeta, pTask);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
return; return;
...@@ -63,6 +64,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -63,6 +64,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG); ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen();
...@@ -166,15 +168,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -166,15 +168,17 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
return streamMetaRemoveTask(pSnode->pMeta, pReq->taskId); streamMetaRemoveTask1(pSnode->pMeta, pReq->taskId);
return 0;
} }
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTaskRunReq *pReq = pMsg->pCont; SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -191,13 +195,14 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { ...@@ -191,13 +195,14 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp, exec); streamProcessDispatchReq(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -215,13 +220,14 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -215,13 +220,14 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessRetrieveReq(pTask, &req, &rsp); streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask);
tDeleteStreamRetrieveReq(&req); tDeleteStreamRetrieveReq(&req);
return 0; return 0;
} else { } else {
...@@ -232,9 +238,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -232,9 +238,10 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId); int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -274,15 +281,17 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -274,15 +281,17 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask *pTask = streamMetaGetTask(pSnode->pMeta, req.taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
// do process request // do process request
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1; return -1;
} }
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0; return 0;
} }
......
...@@ -882,6 +882,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -882,6 +882,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
} }
pTask->refCnt = 1;
pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(); pTask->inputQueue = streamQueueOpen();
...@@ -975,13 +976,15 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -975,13 +976,15 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamNodeId = req.upstreamNodeId, .upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId, .upstreamTaskId = req.upstreamTaskId,
}; };
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) {
rsp.status = 1; rsp.status = 1;
} else { } else {
rsp.status = 0; rsp.status = 0;
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
...@@ -1027,12 +1030,14 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_ ...@@ -1027,12 +1030,14 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
return streamProcessTaskCheckRsp(pTask, &rsp, version); code = streamProcessTaskCheckRsp(pTask, &rsp, version);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code;
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
...@@ -1077,15 +1082,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1077,15 +1082,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
int32_t msgLen = pMsg->contLen; int32_t msgLen = pMsg->contLen;
SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg; SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
ASSERT(pReq->taskId == pTask->taskId);
// check param // check param
int64_t fillVer1 = pTask->startVer; int64_t fillVer1 = pTask->startVer;
if (fillVer1 <= 0) { if (fillVer1 <= 0) {
ASSERT(0); ASSERT(0);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
...@@ -1096,10 +1103,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1096,10 +1103,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
SStreamRecoverStep2Req req; SStreamRecoverStep2Req req;
code = streamBuildSourceRecover2Req(pTask, &req); code = streamBuildSourceRecover2Req(pTask, &req);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
ASSERT(pReq->taskId == pTask->taskId); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
// serialize msg // serialize msg
int32_t len = sizeof(SStreamRecoverStep1Req); int32_t len = sizeof(SStreamRecoverStep1Req);
...@@ -1127,7 +1135,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1127,7 +1135,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
int32_t code; int32_t code;
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
...@@ -1135,27 +1143,33 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -1135,27 +1143,33 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
// do recovery step 2 // do recovery step 2
code = streamSourceRecoverScanStep2(pTask, version); code = streamSourceRecoverScanStep2(pTask, version);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
// restore param // restore param
code = streamRestoreParam(pTask); code = streamRestoreParam(pTask);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
// set status normal // set status normal
code = streamSetStatusNormal(pTask); code = streamSetStatusNormal(pTask);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
// dispatch recover finish req to all related downstream task // dispatch recover finish req to all related downstream task
code = streamDispatchRecoverFinishReq(pTask); code = streamDispatchRecoverFinishReq(pTask);
if (code < 0) { if (code < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
...@@ -1172,15 +1186,17 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1172,15 +1186,17 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
// do process request // do process request
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) { if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return -1; return -1;
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} }
...@@ -1354,9 +1370,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -1354,9 +1370,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont; SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -1373,13 +1390,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1373,13 +1390,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp, exec); streamProcessDispatchReq(pTask, &req, &rsp, exec);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -1389,10 +1407,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1389,10 +1407,11 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId); int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
tqDebug("recv dispatch rsp, code: %x", pMsg->code); tqDebug("recv dispatch rsp, code: %x", pMsg->code);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -1401,7 +1420,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1401,7 +1420,8 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId); streamMetaRemoveTask1(pTq->pStreamMeta, pReq->taskId);
return 0;
} }
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
...@@ -1414,13 +1434,14 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1414,13 +1434,14 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessRetrieveReq(pTask, &req, &rsp); streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tDeleteStreamRetrieveReq(&req); tDeleteStreamRetrieveReq(&req);
return 0; return 0;
} else { } else {
...@@ -1452,13 +1473,14 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { ...@@ -1452,13 +1473,14 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(pTask, &req, &rsp, false); streamProcessDispatchReq(pTask, &req, &rsp, false);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
return 0; return 0;
......
...@@ -2385,6 +2385,8 @@ static void destroyStreamScanOperatorInfo(void* param) { ...@@ -2385,6 +2385,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
taosMemoryFree(pStreamScan->pPseudoExpr); taosMemoryFree(pStreamScan->pPseudoExpr);
} }
cleanupExprSupp(&pStreamScan->tbnameCalSup);
updateInfoDestroy(pStreamScan->pUpdateInfo); updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pUpdateRes);
......
...@@ -169,6 +169,47 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -169,6 +169,47 @@ SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
} }
} }
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__DROPPING) {
atomic_add_fetch_32(&pTask->refCnt, 1);
taosRUnLockLatch(&pMeta->lock);
return pTask;
} else {
taosRUnLockLatch(&pMeta->lock);
return NULL;
}
}
taosRUnLockLatch(&pMeta->lock);
return NULL;
}
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
ASSERT(left >= 0);
if (left == 0) {
ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING);
tFreeSStreamTask(pTask);
}
}
void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
taosWLockLatch(&pMeta->lock);
streamMetaReleaseTask(pMeta, pTask);
taosWUnLockLatch(&pMeta->lock);
}
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
......
...@@ -32,7 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) { ...@@ -32,7 +32,7 @@ SWalRef *walOpenRef(SWal *pWal) {
return pRef; return pRef;
} }
#if 0 #if 1
void walCloseRef(SWal *pWal, int64_t refId) { void walCloseRef(SWal *pWal, int64_t refId) {
SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); SWalRef **ppRef = taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t));
if (ppRef == NULL) return; if (ppRef == NULL) return;
...@@ -67,7 +67,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { ...@@ -67,7 +67,7 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) {
return 0; return 0;
} }
#if 0 #if 1
void walUnrefVer(SWalRef *pRef) { void walUnrefVer(SWalRef *pRef) {
pRef->refId = -1; pRef->refId = -1;
pRef->refFile = -1; pRef->refFile = -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册