提交 4c929973 编写于 作者: H Haojun Liao

fix(stream): use streamId&taskId to identify the stream task in the stream meta.

上级 3cdb1635
...@@ -2767,6 +2767,7 @@ typedef struct { ...@@ -2767,6 +2767,7 @@ typedef struct {
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t leftForVer; int64_t leftForVer;
int64_t streamId;
int32_t taskId; int32_t taskId;
} SVDropStreamTaskReq; } SVDropStreamTaskReq;
...@@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset); ...@@ -2958,6 +2959,7 @@ int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset);
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t streamId;
int32_t taskId; int32_t taskId;
} SVPauseStreamTaskReq; } SVPauseStreamTaskReq;
...@@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq ...@@ -2976,6 +2978,7 @@ int32_t tDeserializeSMPauseStreamReq(void* buf, int32_t bufLen, SMPauseStreamReq
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int32_t taskId; int32_t taskId;
int64_t streamId;
int8_t igUntreated; int8_t igUntreated;
} SVResumeStreamTaskReq; } SVResumeStreamTaskReq;
......
...@@ -646,7 +646,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); ...@@ -646,7 +646,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); // todo remove it
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
......
...@@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -649,6 +649,8 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
...@@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) { ...@@ -1361,6 +1363,8 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
...@@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig ...@@ -1501,7 +1505,9 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
} }
pReq->head.vgId = htonl(pTask->info.nodeId); pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated; pReq->igUntreated = igUntreated;
STransAction action = {0}; STransAction action = {0};
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet)); memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq; action.pCont = pReq;
......
...@@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -35,9 +35,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.taskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
...@@ -181,7 +179,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -181,7 +179,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg; SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId); qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId); qError("vgId:%d failed to acquire s-task:0x%x when dropping it", pSnode->pMeta->vgId, pReq->taskId);
return 0; return 0;
...@@ -194,8 +192,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) { ...@@ -194,8 +192,7 @@ int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTaskRunReq *pReq = pMsg->pCont; SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
...@@ -213,9 +210,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { ...@@ -213,9 +210,8 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamProcessDispatchMsg(pTask, &req, &rsp, exec);
...@@ -235,8 +231,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -235,8 +231,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = { .info = pMsg->info, .code = 0}; SRpcMsg rsp = { .info = pMsg->info, .code = 0};
...@@ -252,7 +247,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -252,7 +247,7 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId); int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, taskId);
if (pTask) { if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
...@@ -297,7 +292,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) ...@@ -297,7 +292,7 @@ int32_t sndProcessStreamTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg)
tDecoderClear(&decoder); tDecoderClear(&decoder);
// find task // find task
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.downstreamTaskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
} }
...@@ -340,7 +335,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { ...@@ -340,7 +335,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
.upstreamTaskId = req.upstreamTaskId, .upstreamTaskId = req.upstreamTaskId,
}; };
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
...@@ -400,7 +395,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) { ...@@ -400,7 +395,7 @@ int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId); pSnode->pMeta->vgId);
......
...@@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1062,7 +1062,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
.upstreamTaskId = req.upstreamTaskId, .upstreamTaskId = req.upstreamTaskId,
}; };
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
...@@ -1099,7 +1099,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) { ...@@ -1099,7 +1099,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pTq->pStreamMeta->vgId); pTq->pStreamMeta->vgId);
...@@ -1160,7 +1160,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1160,7 +1160,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// not added into meta store // not added into meta store
if (added) { if (added) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); SStreamTask* p = streamMetaAcquireTask(pStreamMeta, pTask->id.streamId, taskId);
if (p != NULL) { // reset the downstreamReady flag. if (p != NULL) { // reset the downstreamReady flag.
streamTaskCheckDownstreamTasks(p); streamTaskCheckDownstreamTasks(p);
} }
...@@ -1178,7 +1178,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1178,7 +1178,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed",
pMeta->vgId, pReq->taskId); pMeta->vgId, pReq->taskId);
...@@ -1234,7 +1234,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1234,7 +1234,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
bool done = false; bool done = false;
// 1. get the related stream task // 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped // todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s", qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
...@@ -1350,7 +1350,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1350,7 +1350,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
return -1; return -1;
...@@ -1386,7 +1386,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1386,7 +1386,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.downstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.downstreamTaskId); pTq->pStreamMeta->vgId, req.downstreamTaskId);
...@@ -1412,7 +1412,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1412,7 +1412,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
tDecodeCompleteHistoryDataMsg(&decoder, &req); tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.upstreamTaskId); pTq->pStreamMeta->vgId, req.upstreamTaskId);
...@@ -1503,7 +1503,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1503,7 +1503,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
return 0; return 0;
} }
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId);
if (pTask != NULL) { if (pTask != NULL) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t st = pTask->status.taskStatus; int8_t st = pTask->status.taskStatus;
...@@ -1538,7 +1538,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1538,7 +1538,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamProcessDispatchMsg(pTask, &req, &rsp, exec);
...@@ -1553,7 +1553,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -1553,7 +1553,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = ntohl(pRsp->upstreamTaskId); int32_t taskId = ntohl(pRsp->upstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRsp->streamId, taskId);
int32_t vgId = pTq->pStreamMeta->vgId; int32_t vgId = pTq->pStreamMeta->vgId;
if (pTask) { if (pTask) {
...@@ -1569,7 +1569,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1569,7 +1569,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId); tqDebug("vgId:%d receive msg to drop stream task:0x%x", TD_VID(pTq->pVnode), pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId); tqError("vgId:%d failed to acquire s-task:0x%x when dropping it", pTq->pStreamMeta->vgId, pReq->taskId);
return 0; return 0;
...@@ -1584,7 +1584,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1584,7 +1584,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId, tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
pReq->taskId); pReq->taskId);
...@@ -1597,7 +1597,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1597,7 +1597,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SStreamTask* pHistoryTask = NULL; SStreamTask* pHistoryTask = NULL;
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); pHistoryTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask == NULL) { if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already", tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%x, it may have been dropped already",
pMeta->vgId, pTask->historyTaskId.taskId); pMeta->vgId, pTask->historyTaskId.taskId);
...@@ -1656,13 +1656,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, ...@@ -1656,13 +1656,13 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
if (code != 0) { if (code != 0) {
return code; return code;
} }
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.taskId); SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHistoryTask) { if (pHistoryTask) {
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
} }
...@@ -1681,8 +1681,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1681,8 +1681,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.dstTaskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask) { if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
...@@ -1720,7 +1719,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { ...@@ -1720,7 +1719,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask != NULL) { if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, false); streamProcessDispatchMsg(pTask, &req, &rsp, false);
......
...@@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) { ...@@ -72,8 +72,8 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
...@@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -242,8 +242,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList); numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i); SStreamId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, *pTaskId); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
continue; continue;
} }
......
...@@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -290,7 +290,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy the fill-history task here // todo: destroy the fill-history task here
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
......
...@@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -66,14 +66,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
_hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK); pMeta->pTasks = taosHashInit(64, fp, true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) { if (pMeta->pTasks == NULL) {
goto _err; goto _err;
} }
// task list // task list
pMeta->pTaskList = taosArrayInit(4, sizeof(int32_t)); pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamId));
if (pMeta->pTaskList == NULL) { if (pMeta->pTaskList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
...@@ -248,7 +248,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa ...@@ -248,7 +248,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id);
if (streamMetaSaveTask(pMeta, pTask) < 0) { if (streamMetaSaveTask(pMeta, pTask) < 0) {
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
...@@ -274,10 +274,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { ...@@ -274,10 +274,11 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size; return (int32_t)size;
} }
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); int64_t keys[2] = {streamId, taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (ppTask != NULL) { if (ppTask != NULL) {
if (!streamTaskShouldStop(&(*ppTask)->status)) { if (!streamTaskShouldStop(&(*ppTask)->status)) {
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
...@@ -304,10 +305,10 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -304,10 +305,10 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
} }
} }
static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) { static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamId* id) {
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (*pTaskId == taskId) { if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
taosArrayRemove(pMeta->pTaskList, i); taosArrayRemove(pMeta->pTaskList, i);
break; break;
} }
...@@ -360,9 +361,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { ...@@ -360,9 +361,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
ASSERT(pTask->status.timerActive == 0); ASSERT(pTask->status.timerActive == 0);
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
doRemoveIdFromList(pMeta, num, pTask->id.taskId);
// remove the ref by timer // remove the ref by timer
if (pTask->triggerParam != 0) { if (pTask->triggerParam != 0) {
...@@ -484,7 +483,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -484,7 +483,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); taosArrayPush(pMeta->pTaskList, &pTask->id);
} else { } else {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
typedef struct SStreamTaskRetryInfo { typedef struct SStreamTaskRetryInfo {
SStreamMeta* pMeta; SStreamMeta* pMeta;
int32_t taskId; int32_t taskId;
int64_t streamId;
} SStreamTaskRetryInfo; } SStreamTaskRetryInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
...@@ -556,12 +557,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -556,12 +557,12 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->streamId, pInfo->taskId);
if (pTask != NULL) { if (pTask != NULL) {
ASSERT(pTask->status.timerActive == 1); ASSERT(pTask->status.timerActive == 1);
// abort the timer if intend to stop task // abort the timer if intend to stop task
SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.taskId); SStreamTask* pHTask = streamMetaAcquireTask(pMeta, pTask->historyTaskId.streamId, pTask->historyTaskId.taskId);
if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) { if (pHTask == NULL && (!streamTaskShouldStop(&pTask->status))) {
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qWarn( qWarn(
...@@ -603,6 +604,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { ...@@ -603,6 +604,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo)); SStreamTaskRetryInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTaskRetryInfo));
pInfo->taskId = pTask->id.taskId; pInfo->taskId = pTask->id.taskId;
pInfo->streamId = pTask->id.streamId;
pInfo->pMeta = pTask->pMeta; pInfo->pMeta = pTask->pMeta;
if (pTask->launchTaskTimer == NULL) { if (pTask->launchTaskTimer == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册