提交 b7456053 编写于 作者: L Liu Jicong

fix(stream): check task exist

上级 bf37f3fa
......@@ -506,7 +506,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.initTqReader = true,
.version = ver,
};
pHandle->execHandle.execCol.task[i] = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
pHandle->execHandle.execCol.task[i] =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols);
ASSERT(pHandle->execHandle.execCol.task[i]);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task[i], &scanner);
......@@ -679,9 +680,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
//
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessRunReq(pTask);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRunReq(*ppTask);
return 0;
} else {
return -1;
......@@ -696,14 +697,14 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessDispatchReq(pTask, &req, &rsp);
streamProcessDispatchReq(*ppTask, &req, &rsp);
return 0;
} else {
return -1;
......@@ -713,9 +714,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessRecoverReq(pTask, pReq, pMsg);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverReq(*ppTask, pReq, pMsg);
return 0;
} else {
return -1;
......@@ -725,9 +726,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessDispatchRsp(*ppTask, pRsp);
return 0;
} else {
return -1;
......@@ -737,9 +738,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverRsp(*ppTask, pRsp);
return 0;
} else {
return -1;
......@@ -749,10 +750,10 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (pTask) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (ppTask) {
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING);
}
// todo
// clear queue
......@@ -780,16 +781,17 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask* pTask = *(SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
int32_t taskId = req.dstTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(*ppTask, &req, &rsp);
} else {
return -1;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}
......
......@@ -94,7 +94,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
haystack = candidate + 1;
}
if (found || offset == 0) break;
offset = TMIN(0, offset - readSize + 8);
offset = TMIN(0, offset - readSize + sizeof(uint64_t));
int64_t offset2 = taosLSeekFile(pFile, offset, SEEK_SET);
ASSERT(offset == offset2);
if (readSize != taosReadFile(pFile, buf, readSize)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册