提交 3cdb1635 编写于 作者: H Haojun Liao

fix(stream): avoid launching check downstream when failed to add stream task into stream meta.

上级 bb39d94f
...@@ -74,7 +74,7 @@ typedef enum { ...@@ -74,7 +74,7 @@ typedef enum {
* @param vgId * @param vgId
* @return * @return
*/ */
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId); qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId);
/** /**
* Create the exec task for queue mode * Create the exec task for queue mode
...@@ -95,8 +95,6 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList ...@@ -95,8 +95,6 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList
*/ */
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor // todo refactor
......
...@@ -88,7 +88,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { ...@@ -88,7 +88,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
initStreamStateAPI(&handle.api); initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0, pTask->id.taskId);
ASSERT(pTask->exec.pExecutor); ASSERT(pTask->exec.pExecutor);
taosThreadMutexInit(&pTask->lock, NULL); taosThreadMutexInit(&pTask->lock, NULL);
......
...@@ -267,7 +267,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat ...@@ -267,7 +267,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode)); pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) { if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
......
...@@ -956,7 +956,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -956,7 +956,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.winRange = pTask->dataRange.window}; .winRange = pTask->dataRange.window};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) { if (pTask->exec.pExecutor == NULL) {
return -1; return -1;
} }
...@@ -983,7 +983,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -983,7 +983,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
.winRange = pTask->dataRange.window}; .winRange = pTask->dataRange.window};
initStorageAPI(&handle.api); initStorageAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) { if (pTask->exec.pExecutor == NULL) {
return -1; return -1;
} }
...@@ -1149,32 +1149,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1149,32 +1149,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWLockLatch(&pStreamMeta->lock); taosWLockLatch(&pStreamMeta->lock);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added); code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
taosWUnLockLatch(&pStreamMeta->lock);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks); tqError("vgId:%d failed to add s-task:0x%x, total:%d", vgId, pTask->id.taskId, numOfTasks);
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
taosWUnLockLatch(&pStreamMeta->lock);
return -1; return -1;
} }
// not added into meta store // not added into meta store
if (!added) { if (added) {
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
pTask = NULL;
}
taosWUnLockLatch(&pStreamMeta->lock);
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
// 3. It's an fill history task, do nothing. wait for the main task to start it
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId); SStreamTask* p = streamMetaAcquireTask(pStreamMeta, taskId);
if (p != NULL) { // reset the downstreamReady flag. if (p != NULL) { // reset the downstreamReady flag.
streamTaskCheckDownstreamTasks(p); streamTaskCheckDownstreamTasks(p);
} }
streamMetaReleaseTask(pStreamMeta, p); streamMetaReleaseTask(pStreamMeta, p);
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
}
return 0; return 0;
} }
......
...@@ -304,7 +304,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 ...@@ -304,7 +304,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
return pTaskInfo; return pTaskInfo;
} }
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId) { qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
if (msg == NULL) { if (msg == NULL) {
return NULL; return NULL;
} }
...@@ -317,7 +317,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v ...@@ -317,7 +317,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pPlan); nodesDestroyNode((SNode*)pPlan);
qDestroyTask(pTaskInfo); qDestroyTask(pTaskInfo);
......
...@@ -216,7 +216,7 @@ static void freeItem(void* p) { ...@@ -216,7 +216,7 @@ static void freeItem(void* p) {
} }
void tFreeStreamTask(SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) {
qDebug("free s-task:%s, %p", pTask->id.idStr, pTask); qDebug("free s-task:0x%x, %p", pTask->id.taskId, pTask);
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) { if (pTask->inputQueue) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册