diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 15d2c6cd5e213955f81e6203b63e2ba26125e44e..db81d8843e5415b4bf11fd33a013fb700f5914bc 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,12 +27,10 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); - int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int64_t watermark); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 9d8ed3701ee9a7001c20540b3e6c97f87db40a29..eb0ccbb5f77f51420b37c00d9280b9bcd206e190 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -97,37 +97,7 @@ END: return terrno; } -int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { - SEncoder encoder; - tEncoderInit(&encoder, NULL, 0); - tEncodeSStreamTask(&encoder, pTask); - int32_t size = encoder.pos; - int32_t tlen = sizeof(SMsgHead) + size; - tEncoderClear(&encoder); - void* buf = taosMemoryCalloc(1, tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - ((SMsgHead*)buf)->vgId = htonl(nodeId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tEncoderInit(&encoder, abuf, size); - tEncodeSStreamTask(&encoder, pTask); - tEncoderClear(&encoder); - - STransAction action = {0}; - memcpy(&action.epSet, pEpSet, sizeof(SEpSet)); - action.pCont = buf; - action.contLen = tlen; - action.msgType = type; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(buf); - return -1; - } - return 0; -} - -int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { +int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { pTask->dispatchType = TASK_DISPATCH__NONE; // sink if (pStream->smaId != 0) { @@ -142,7 +112,7 @@ int32_t mndAddSinkToTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SS return 0; } -int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, SStreamTask* pTask) { +int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { pTask->sinkType = TASK_SINK__NONE; if (pStream->fixedSinkVgId == 0) { pTask->dispatchType = TASK_DISPATCH__SHUFFLE; @@ -187,7 +157,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj* return 0; } -int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { +int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) { int32_t msgLen; pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); @@ -196,11 +166,11 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { + ASSERT(0); terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE); - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId); return 0; } @@ -212,8 +182,7 @@ SSnodeObj* mndSchedFetchOneSnode(SMnode* pMnode) { return pObj; } -int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, - const SSnodeObj* pSnode) { +int32_t mndAssignTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SSnodeObj* pSnode) { int32_t msgLen; pTask->nodeId = SNODE_HANDLE; @@ -223,10 +192,10 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, plan->execNode.epSet = pTask->epSet; if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) { + ASSERT(0); terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_STREAM_TASK_DEPLOY, SNODE_HANDLE); return 0; } @@ -245,7 +214,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); @@ -262,6 +231,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb } SStreamTask* pTask = tNewSStreamTask(pStream->uid); if (pTask == NULL) { + sdbRelease(pSdb, pVgroup); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -290,13 +260,11 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, STrans* pTrans, SStreamOb // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; - - mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pVgroup->vgId); } return 0; } -int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { ASSERT(pStream->fixedSinkVgId != 0); SArray* tasks = taosArrayGetP(pStream->tasks, 0); SStreamTask* pTask = tNewSStreamTask(pStream->uid); @@ -337,13 +305,10 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* // dispatch pTask->dispatchType = TASK_DISPATCH__NONE; - - mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_STREAM_TASK_DEPLOY, pStream->fixedSinkVg.vgId); - return 0; } -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { @@ -368,9 +333,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // add extra sink hasExtraSink = true; if (pStream->fixedSinkVgId == 0) { - mndAddShuffleSinkTasksToStream(pMnode, pTrans, pStream); + if (mndAddShuffleSinkTasksToStream(pMnode, pStream) < 0) { + // TODO free + return -1; + } } else { - mndAddFixedSinkTaskToStream(pMnode, pTrans, pStream); + if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) { + // TODO free + return -1; + } } } @@ -386,6 +357,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE); pInnerTask = tNewSStreamTask(pStream->uid); + if (pInnerTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + qDestroyQueryPlan(pPlan); + return -1; + } mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); @@ -397,7 +373,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pInnerTask->triggerParam = pStream->triggerParam; // dispatch - if (mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pInnerTask) < 0) { + if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) { qDestroyQueryPlan(pPlan); return -1; } @@ -409,14 +385,13 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); if (pSnode == NULL) { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) { + if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; } } else { - if (mndAssignTaskToSnode(pMnode, pTrans, pInnerTask, plan, pSnode) < 0) { - ASSERT(0); + if (mndAssignTaskToSnode(pMnode, pInnerTask, plan, pSnode) < 0) { sdbRelease(pSdb, pSnode); qDestroyQueryPlan(pPlan); return -1; @@ -424,7 +399,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { } } else { SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid); - if (mndAssignTaskToVg(pMnode, pTrans, pInnerTask, plan, pVgroup) < 0) { + if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; @@ -450,6 +425,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { continue; } SStreamTask* pTask = tNewSStreamTask(pStream->uid); + if (pInnerTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } mndAddTaskToTaskSet(taskSourceLevel, pTask); // source @@ -466,7 +447,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // exec pTask->execType = TASK_EXEC__PIPE; - if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; @@ -507,6 +488,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { continue; } SStreamTask* pTask = tNewSStreamTask(pStream->uid); + if (pTask == NULL) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryPlan(pPlan); + return -1; + } mndAddTaskToTaskSet(taskOneLevel, pTask); // source @@ -517,14 +503,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // sink or dispatch if (hasExtraSink) { - mndAddDispatcherToInnerTask(pMnode, pTrans, pStream, pTask); + mndAddDispatcherToInnerTask(pMnode, pStream, pTask); } else { - mndAddSinkToTask(pMnode, pTrans, pStream, pTask); + mndAddSinkToTask(pMnode, pStream, pTask); } // exec pTask->execType = TASK_EXEC__PIPE; - if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) { + if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); return -1; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index c44fb03be23c7740759cf31233b4847756d1af08..05603f855483b85d21352512aae7a0ef345a0069 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -533,7 +533,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea #if 0 smaObj.timezone = pCreate->timezone; #endif - smaObj.timezone = tsTimezone; // use timezone of server + smaObj.timezone = tsTimezone; // use timezone of server smaObj.interval = pCreate->interval; smaObj.offset = pCreate->offset; smaObj.sliding = pCreate->sliding; @@ -623,7 +623,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER; // if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER; - if (mndScheduleStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; + if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER; if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7e9069d5fcf0fc62a349254d10b251d80ef4f0ec..96a82e2c18fa44cb982c9729739049cd29ffd5fe 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -320,70 +320,72 @@ FAIL: return 0; } -int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); +int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { + SEncoder encoder; + tEncoderInit(&encoder, NULL, 0); + tEncodeSStreamTask(&encoder, pTask); + int32_t size = encoder.pos; + int32_t tlen = sizeof(SMsgHead) + size; + tEncoderClear(&encoder); + void *buf = taosMemoryCalloc(1, tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} + ((SMsgHead *)buf)->vgId = htonl(pTask->nodeId); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncoderInit(&encoder, abuf, size); + tEncodeSStreamTask(&encoder, pTask); + tEncoderClear(&encoder); -int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); + STransAction action = {0}; + memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + action.pCont = buf; + action.contLen = tlen; + action.msgType = TDMT_STREAM_TASK_DEPLOY; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); return -1; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); return 0; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { - SNode *pAst = NULL; - - if (nodesStringToNode(ast, &pAst) < 0) { - return -1; - } - - if (qExtractResultSchema(pAst, (int32_t *)&pStream->outputSchema.nCols, &pStream->outputSchema.pSchema) != 0) { - nodesDestroyNode(pAst); - return -1; - } - // free - nodesDestroyNode(pAst); - -#if 0 - printf("|"); - for (int i = 0; i < pStream->outputSchema.nCols; i++) { - printf(" %15s |", (char *)pStream->outputSchema.pSchema[i].name); +int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + int32_t level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + if (mndPersistTaskDeployReq(pTrans, pTask) < 0) { + return -1; + } + } } - printf("\n=======================================================\n"); - -#endif + return 0; +} - if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, pStream->trigger, pStream->watermark, &pStream->physicalPlan)) { - mError("topic:%s, failed to get plan since %s", pStream->name, terrstr()); +int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) { return -1; } - - if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { - mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); + SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); return -1; } - mDebug("trans:%d, used to create stream:%s", pTrans->id, pStream->name); + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + return 0; +} +int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; } - sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - + sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); return 0; } @@ -503,64 +505,6 @@ static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pS return 0; } -static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { - mDebug("stream:%s to create", pCreate->name); - SStreamObj streamObj = {0}; - tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); - tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); - tstrncpy(streamObj.targetSTbName, pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); - streamObj.createTime = taosGetTimestampMs(); - streamObj.updateTime = streamObj.createTime; - streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); - streamObj.targetStbUid = mndGenerateUid(pCreate->targetStbFullName, TSDB_TABLE_FNAME_LEN); - streamObj.sourceDbUid = pDb->uid; - streamObj.version = 1; - streamObj.sql = pCreate->sql; - // TODO - streamObj.fixedSinkVgId = 0; - streamObj.smaId = 0; - streamObj.trigger = pCreate->triggerType; - streamObj.watermark = pCreate->watermark; - streamObj.triggerParam = pCreate->maxDelay; - - if (streamObj.targetSTbName[0]) { - pDb = mndAcquireDbByStb(pMnode, streamObj.targetSTbName); - if (pDb == NULL) { - terrno = TSDB_CODE_MND_DB_NOT_SELECTED; - return -1; - } - tstrncpy(streamObj.targetDb, pDb->name, TSDB_DB_FNAME_LEN); - } - - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); - if (pTrans == NULL) { - mError("stream:%s, failed to create since %s", pCreate->name, terrstr()); - return -1; - } - mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { - mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) { - mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - return -1; - } - - mndTransDrop(pTrans); - return 0; -} - static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; int32_t code = -1; @@ -631,7 +575,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } // schedule stream task for stream obj - if (mndScheduleStream(pMnode, pTrans, &streamObj) < 0) { + if (mndScheduleStream(pMnode, &streamObj) < 0) { mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr()); mndTransDrop(pTrans); goto _OVER; @@ -653,8 +597,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); - /*code = mndCreateStream(pMnode, pReq, &createStreamReq, pDb);*/ - /*if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;*/ code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5d2fb81cf0669e07f703fc0006416353ada7bc2d..0243f2a9f01ed83256d07c850d485638b979bbc9 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -146,8 +146,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ int32_t* pNumOfRows) { *pUid = 0; - // TODO set to real sversion - /*int32_t sversion = 1;*/ + // TODO: cache multiple schema int32_t sversion = htonl(pHandle->pBlock->sversion); if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaSuid != pHandle->msgIter.suid) { @@ -161,7 +160,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ return -1; } - // this interface use suid instead of uid if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true); if (pHandle->pSchemaWrapper == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1bd1ce14b1d3f6102f3a909ba53462a1f6e5e538..ad400d720aecb1cf1bd6969bb533635e8a511660 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -40,15 +40,8 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu SStreamBlockScanInfo* pInfo = pOperator->info; pInfo->assignBlockUid = assignUid; - // no need to check -#if 0 - if (pInfo->blockType == 0) { - pInfo->blockType = type; - } else if (pInfo->blockType != type) { - ASSERT(0); - return TSDB_CODE_QRY_APP_ERROR; - } -#endif + // TODO: if a block was set but not consumed, + // prevent setting a different type of block pInfo->blockType = type; if (type == STREAM_DATA_TYPE_SUBMIT_BLOCK) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index afe0a1f8fa9391857df89ebae4ad74fbc635a76f..2db10973a5b951cae1c96786e2cf7ab242e80bfe 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1321,7 +1321,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); SFilterColumnParam param1 = {.numOfCols = numOfCols, .pDataBlock = pBlock->pDataBlock}; code = filterSetDataFromSlotId(filter, ¶m1); @@ -2048,7 +2048,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLo SArray* pColList) { if (pColList == NULL) { // data from other sources blockDataCleanup(pRes); -// blockDataEnsureCapacity(pRes, numOfRows); + // blockDataEnsureCapacity(pRes, numOfRows); blockCompressDecode(pRes, numOfOutput, numOfRows, pData); } else { // extract data according to pColList ASSERT(numOfOutput == taosArrayGetSize(pColList)); @@ -2402,14 +2402,14 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode tsem_init(&pInfo->ready, 0, 0); - pInfo->seqLoadData = false; + pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; - pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); - pOperator->name = "ExchangeOperator"; + pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pOperator->name = "ExchangeOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pResult->pDataBlock); pOperator->pTaskInfo = pTaskInfo; @@ -4361,6 +4361,62 @@ _error: return NULL; } +int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { + if (pNode->pChildren == NULL || LIST_LENGTH(pNode->pChildren) == 0) { + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == pNode->type) { + *ppNode = (STableScanPhysiNode*)pNode; + return 0; + } else { + ASSERT(0); + terrno = TSDB_CODE_QRY_APP_ERROR; + return -1; + } + } else { + if (LIST_LENGTH(pNode->pChildren) != 1) { + ASSERT(0); + terrno = TSDB_CODE_QRY_APP_ERROR; + return -1; + } + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pNode->pChildren, 0); + return extractTableScanNode(pChildNode, ppNode); + } + return -1; +} + +int32_t doRebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->numOfDownstream == 0) { + qError("failed to find stream scan operator"); + return TSDB_CODE_QRY_APP_ERROR; + } + + if (pOperator->numOfDownstream > 1) { + qError("join not supported for stream block scan"); + return TSDB_CODE_QRY_APP_ERROR; + } + return doRebuildReader(pOperator->pDownstream[0], plan, pHandle); + } else { + SStreamBlockScanInfo* pInfo = pOperator->info; + ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; + + tsdbCleanupReadHandle(pTableScanInfo->dataReader); + STableScanPhysiNode* pNode = NULL; + if (extractTableScanNode(plan->pNode, &pNode) < 0) { + ASSERT(0); + } + + STableListInfo info = {0}; + pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, 0, 0); + if (pTableScanInfo->dataReader == NULL) { + ASSERT(0); + qError("failed to create data reader"); + return TSDB_CODE_QRY_APP_ERROR; + } + } + return 0; +} + int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) { int32_t code = TDB_CODE_SUCCESS; char* pCurrent = NULL;