diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 2e4fb2320701bb9d74b61107f98c4c6297ba2059..7a8c074283c257c7e917cb47a1064885e4d50c21 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -221,7 +221,7 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo); -int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey); +int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 4cfbf2c2fa2c3ae35053b2ad1ce5ed6aef115a42..e9340a33c36595779655608d702d167b928a4e63 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -233,28 +233,6 @@ typedef struct SStoreSnapshotFn { int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid); } SStoreSnapshotFn; -/** -void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); -void metaReaderReleaseLock(SMetaReader *pReader); -void metaReaderClear(SMetaReader *pReader); -int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); -int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid); -int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList); -const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal); -int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); - -int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid); -int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); -bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid); -int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, - bool *acquired); -int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, - int32_t payloadLen, double selectivityRatio); -tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); -int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList); -int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t payloadLen); - */ - typedef struct SStoreMeta { SMTbCursor *(*openTableMetaCursor)(void *pVnode); // metaOpenTbCursor void (*closeTableMetaCursor)(SMTbCursor *pTbCur); // metaCloseTbCursor diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5d2c0e4e33261d3f193b093e7b36af92845b5e20..898db47e865762de2eb26904c6b3b0a57129e6da 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -557,18 +557,19 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver); +int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); // common int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); // source level -int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey); +int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq); int32_t streamSourceRecoverScanStep1(SStreamTask* pTask); int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq); int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); -int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask); +int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask); // agg level int32_t streamAggRecoverPrepare(SStreamTask* pTask); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 8886687f01cb64abddea2b6754aad451a21ad99d..a1c81a999f8dfc4e9df16270ad4a5bebcc2608f8 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,7 +25,7 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; -static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup); +static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, @@ -101,7 +101,7 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { return 0; } -int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) { +int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) { bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { @@ -202,7 +202,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { } // create sink node for each vgroup. -int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream) { +int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStream, int32_t fillHistory) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; @@ -218,15 +218,15 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea continue; } - mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup); + mndAddSinkTaskToStream(pStream, pTaskList, pMnode, pVgroup->vgId, pVgroup, fillHistory); sdbRelease(pSdb, pVgroup); } return 0; } -int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) { - SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->conf.fillHistory, 0, pTaskList); +int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) { + SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -246,14 +246,15 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas return terrno; } - if (fillHistory) { // todo set the correct ts, which should be last key of queried table. - pTask->dataRange.window.skey = INT64_MIN; - pTask->dataRange.window.ekey = taosGetTimestampMs(); - } + // todo set the correct ts, which should be last key of queried table. + pTask->dataRange.window.skey = INT64_MIN; + pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); + + mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); // sink or dispatch if (hasExtraSink) { - mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask); + mndAddDispatcherForInternalTask(pMnode, pStream, pSinkTaskList, pTask); } else { mndSetSinkTaskInfo(pStream, pTask); } @@ -318,11 +319,12 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { (*pStreamTask)->historyTaskId.taskId = (*pHTask)->id.taskId; (*pStreamTask)->historyTaskId.streamId = (*pHTask)->id.streamId; + mDebug("s-task:0x%x related history task:0x%x", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId); } } -static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, - bool hasExtraSink) { +static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream, + bool hasExtraSink) { // create exec stream task, since only one level, the exec task is also the source task SArray* pTaskList = addNewTaskList(pStream->tasks); @@ -368,8 +370,8 @@ static int32_t addSourceTasksForSingleLevelStream(SMnode* pMnode, const SQueryPl if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); - code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, pStream->conf.fillHistory, - hasExtraSink); + code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, + pStream->conf.fillHistory, hasExtraSink); setHTasksId(pTaskList, pHTaskList); } @@ -390,6 +392,13 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return -1; } + // todo set the correct ts, which should be last key of queried table. + pTask->dataRange.window.skey = INT64_MIN; + pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); + + mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + + // all the source tasks dispatch result to a single agg node. setFixedDownstreamEpInfo(pTask, pDownstreamTask); if (mndAssignStreamTaskToVgroup(pMnode, pTask, pPlan, pVgroup) < 0) { @@ -399,38 +408,38 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui return setEpToDownstreamTask(pTask, pDownstreamTask); } -static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, +static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, int32_t fillHistory, SStreamTask** pAggTask) { - *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, pStream->conf.fillHistory, - pStream->conf.triggerParam, pTaskList); + *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList); if (*pAggTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } // dispatch - if (mndAddDispatcherForInnerTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) { + if (mndAddDispatcherForInternalTask(pMnode, pStream, pSinkNodeList, *pAggTask) < 0) { return -1; } return 0; } -static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) { +static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, SStreamTask** pAggTask, + SStreamTask** pHAggTask) { SArray* pAggTaskList = addNewTaskList(pStream->tasks); SSdb* pSdb = pMnode->pSdb; - SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); - SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0); + SNodeListNode* pInnerNode = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0); + SSubplan* plan = (SSubplan*)nodesListGetNode(pInnerNode->pNodeList, 0); if (plan->subplanType != SUBPLAN_TYPE_MERGE) { terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - SStreamTask* pAggTask = NULL; - SArray* pSinkNodeList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); + *pAggTask = NULL; + SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL); - int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, &pAggTask); + int32_t code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, 0, pAggTask); if (code != TSDB_CODE_SUCCESS) { return -1; } @@ -448,16 +457,17 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan } if (pSnode != NULL) { - code = mndAssignStreamTaskToSnode(pMnode, pAggTask, plan, pSnode); + code = mndAssignStreamTaskToSnode(pMnode, *pAggTask, plan, pSnode); } else { - code = mndAssignStreamTaskToVgroup(pMnode, pAggTask, plan, pVgroup); + code = mndAssignStreamTaskToVgroup(pMnode, *pAggTask, plan, pVgroup); } if (pStream->conf.fillHistory) { SArray* pHAggTaskList = addNewTaskList(pStream->pHTasksList); + SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL); - SStreamTask* pHAggTask = NULL; - code = doAddAggTask(pStream->uid, pAggTaskList, pSinkNodeList, pMnode, pStream, &pHAggTask); + *pHAggTask = NULL; + code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory, pHAggTask); if (code != TSDB_CODE_SUCCESS) { if (pSnode != NULL) { sdbRelease(pSdb, pSnode); @@ -468,9 +478,9 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan } if (pSnode != NULL) { - code = mndAssignStreamTaskToSnode(pMnode, pHAggTask, plan, pSnode); + code = mndAssignStreamTaskToSnode(pMnode, *pHAggTask, plan, pSnode); } else { - code = mndAssignStreamTaskToVgroup(pMnode, pHAggTask, plan, pVgroup); + code = mndAssignStreamTaskToVgroup(pMnode, *pHAggTask, plan, pVgroup); } setHTasksId(pAggTaskList, pHAggTaskList); @@ -515,7 +525,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl continue; } - int32_t code = doAddSourceTask(pSourceTaskList, pStream->conf.fillHistory, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup); + int32_t code = doAddSourceTask(pSourceTaskList, 0, pStream->uid, pDownstreamTask, pMnode, plan, pVgroup); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pSdb, pVgroup); terrno = code; @@ -523,7 +533,7 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl } if (pStream->conf.fillHistory) { - code = doAddSourceTask(pHSourceTaskList, 0, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup); + code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup); sdbRelease(pSdb, pVgroup); if (code != TSDB_CODE_SUCCESS) { @@ -537,15 +547,15 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl return TSDB_CODE_SUCCESS; } -static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList) { +static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, int32_t fillHistory) { SArray* pSinkTaskList = addNewTaskList(pTasksList); if (pStream->fixedSinkVgId == 0) { - if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream) < 0) { + if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) { // TODO free return -1; } } else { - if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) { + if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, fillHistory) < 0) { // TODO free return -1; } @@ -578,7 +588,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* hasExtraSink = true; SArray* pSinkTaskList = NULL; - int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList); + int32_t code = addSinkTasks(pStream->tasks, pMnode, pStream, &pSinkTaskList, 0); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -586,7 +596,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* // check for fill history if (pStream->conf.fillHistory) { SArray* pHSinkTaskList = NULL; - code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList); + code = addSinkTasks(pStream->pHTasksList, pMnode, pStream, &pHSinkTaskList, pStream->conf.fillHistory); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -598,16 +608,18 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pStream->totalLevel = numOfPlanLevel + hasExtraSink; if (numOfPlanLevel > 1) { - SStreamTask* pInnerTask; - int32_t code = addAggTask(pStream, pMnode, pPlan); + SStreamTask* pAggTask = NULL; + SStreamTask* pHAggTask = NULL; + + int32_t code = addAggTask(pStream, pMnode, pPlan, &pAggTask, &pHAggTask); if (code != TSDB_CODE_SUCCESS) { return code; } // source level - return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pInnerTask, NULL); + return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask); } else if (numOfPlanLevel == 1) { - return addSourceTasksForSingleLevelStream(pMnode, pPlan, pStream, hasExtraSink); + return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink); } return 0; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 0187a9ac6e34ddc955f97579101d89071907105f..e29d2f7a8fcd2ef033952ebefc3616ca06693473 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -101,7 +101,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); // meta -void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); +void metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta* pAPI); void metaReaderReleaseLock(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader); int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); @@ -125,8 +125,6 @@ int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, int32_t payloadLen); -int64_t metaGetTbNum(SMeta *pMeta); - int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables); // tsdb diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index db285dc124f4c2081b67e0aa88381a85a7a27d17..607ff8665ac63b58c983da4510747bdb9f8602d9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -175,7 +175,8 @@ void* metaGetIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta); int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList); -void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); +int64_t metaGetTbNum(SMeta *pMeta); +void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index fa9eea5e29d743062b8b3fbf4b3f57c690774602..8cfe41340abca51bdecafe156eb3c960a479b151 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -17,13 +17,13 @@ #include "osMemory.h" #include "tencode.h" -void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) { +void metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) { SMeta* pMeta = ((SVnode*)pVnode)->pMeta; - metaReaderInit(pReader, pMeta, flags); + metaReaderDoInit(pReader, pMeta, flags); pReader->pAPI = pAPI; } -void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { +void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { memset(pReader, 0, sizeof(*pReader)); pReader->pMeta = pMeta; pReader->flags = flags; @@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) { int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); @@ -159,7 +159,7 @@ int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) { int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, (SMeta *)meta, 0); + metaReaderDoInit(&mr, (SMeta *)meta, 0); code = metaReaderGetTableEntryByUid(&mr, uid); if (code < 0) { metaReaderClear(&mr); @@ -174,7 +174,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) { int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0); SMetaReader *pReader = &mr; @@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) { int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) { int code = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0); code = metaGetTableEntryByName(&mr, tbName); if (code == 0) *tbType = mr.me.type; @@ -222,7 +222,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) { } SVnode* pVnodeObj = pVnode; - metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0); + metaReaderDoInit(&pTbCur->mr, pVnodeObj->pMeta, 0); tdbTbcOpen(pVnodeObj->pMeta->pUidIdx, (TBC **)&pTbCur->pDbc, NULL); @@ -760,7 +760,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { } SMetaReader mr = {0}; - metaReaderInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, 0); int64_t smaId; int smaIdx = 0; STSma *pTSma = NULL; @@ -815,7 +815,7 @@ _err: STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { STSma *pTSma = NULL; SMetaReader mr = {0}; - metaReaderInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, 0); if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) { metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid); metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index a49848f4421e1c0e103d2d083713582574aee609..91704f5c7ab1558a5610496757763e053dad570d 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -37,7 +37,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) { // validate req // save smaIndex - metaReaderInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, 0); if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) { #if 1 terrno = TSDB_CODE_TSMA_ALREADY_EXIST; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index df16304e28ecd67fcc6823d1b4d16c8951500571..def008311a800d6356ed76a53e244566d717137f 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -709,7 +709,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs } // validate req - metaReaderInit(&mr, pMeta, 0); + metaReaderDoInit(&mr, pMeta, 0); if (metaGetTableEntryByName(&mr, pReq->name) == 0) { if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) { terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 39aa5c30437a72f6ad26f22f4c2a01aa03a8c7dc..d393f4b6bc191f69d9405686c0fc9bf66e03eed5 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -896,7 +896,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { return TSDB_CODE_SUCCESS; } - metaReaderInit(&mr, SMA_META(pSma), 0); + metaReaderDoInit(&mr, SMA_META(pSma), 0); smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) { code = terrno; @@ -1116,7 +1116,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { } int64_t nRsmaTables = 0; - metaReaderInit(&mr, SMA_META(pSma), 0); + metaReaderDoInit(&mr, SMA_META(pSma), 0); if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4addda25b7b83751148eea03a2ca45927d4749c4..84e37649ac4835b4aafd055f79d2b3b4ad31b34c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -812,11 +812,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + pTask->chkInfo.version = ver; pTask->chkInfo.currentVer = ver; + pTask->dataRange.range.maxVer = ver; + pTask->dataRange.range.minVer = ver; + // expand executor - pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL; + pTask->status.taskStatus = /*(pTask->fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -919,16 +923,12 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 - ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%d, rsp status %d", + pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, pTask->status.taskStatus, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 - ") %d at node %d, check req from task:0x%x at node %d, rsp status %d", - taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, - rsp.status); + tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", + taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } SEncoder encoder; @@ -969,12 +969,12 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 } tDecoderClear(&decoder); - tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d", + tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d (vgId:%d) check req from task:0x%x (vgId:%d), status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); if (pTask == NULL) { - tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId, + tqError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId, pTq->pStreamMeta->vgId); return -1; } @@ -1027,13 +1027,27 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWUnLockLatch(&pStreamMeta->lock); - // 3. for fill history task, do nothing. wait for the main task to start it + // 3. It's an fill history task, do nothing. wait for the main task to start it if (pTask->fillHistory) { tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); } else { + // calculate the correct start time window, and start the handle the history data for the main task. if (pTask->historyTaskId.taskId != 0) { + // launch the history fill stream task streamTaskStartHistoryTask(pTask, sversion); + + // launch current task + SHistoryDataRange* pRange = &pTask->dataRange; + int64_t ekey = pRange->window.ekey; + int64_t ver = pRange->range.minVer; + + pRange->window.skey = ekey; + pRange->window.ekey = INT64_MAX; + pRange->range.minVer = 0; + pRange->range.maxVer = ver; } + + streamTaskCheckDownstreamTasks(pTask); } tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, @@ -1043,20 +1057,24 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { - int32_t code; + int32_t code = TSDB_CODE_SUCCESS; char* msg = pMsg->pCont; int32_t msgLen = pMsg->contLen; + SStreamMeta* pMeta = pTq->pStreamMeta; SStreamRecoverStep1Req* pReq = (SStreamRecoverStep1Req*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->taskId); if (pTask == NULL) { + tqError("vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed", + pMeta->vgId, pReq->taskId); return -1; } // check param int64_t fillVer1 = pTask->chkInfo.version; if (fillVer1 <= 0) { - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); return -1; } @@ -1068,14 +1086,27 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); return 0; } double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); - // todo transfer the executor status, and then destroy this stream task + if (pTask->fillHistory) { + // todo transfer the executor status, and then destroy this stream task + } else { + // todo update the chkInfo version for current task. + // this task has an associated history stream task, so we need to scan wal from the end version of + // history scan. The current version of chkInfo.current is not updated during the history scan + tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64 + ", window:%" PRId64 " - %" PRId64, + pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); + + code = streamTaskScanHistoryDataComplete(pTask); + streamMetaReleaseTask(pMeta, pTask); + return code; + } #if 0 // build msg to launch next step @@ -1162,7 +1193,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); // dispatch recover finish req to all related downstream task - code = streamDispatchRecoverFinishReq(pTask); + code = streamDispatchRecoverFinishMsg(pTask); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return -1; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 77a966715eb84410cecc245c38d0f1ba60bcd9aa..155ef92ae5025d6b6e7a42e7e95d87078d186225 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -114,7 +114,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { } SMetaReader mr = {0}; - metaReaderInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0); + metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0); if (metaGetTableEntryByName(&mr, req.tbName) < 0) { metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 1ff78c586f747ad74bce7d752818bddd90a1de67..2fd78f1309c0b73d4799e49fa9815fee6e23a640 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -48,7 +48,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { SMetaReader mr = {0}; - metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + metaReaderDoInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index db1b5ed9029dc53c7907d2322cf1f9c648268b2a..604b171daff38c9972d9ef6dc288f8902b49ebea 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -311,7 +311,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.uid = pTableSinkInfo->uid; } else { SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, 0); + metaReaderDoInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, ctbName) < 0) { metaReaderClear(&mr); taosMemoryFree(pTableSinkInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c659c8f4a2a917da32764f12494e6f318f7681b9..fc1600dc3a9acaaedfa6c2c5ebcef7a69f850a26 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1431,7 +1431,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { tb_uid_t suid = 0; SMetaReader mr = {0}; - metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); + metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, 0); if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) { metaReaderClear(&mr); // table not esist return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2500015ec1d87d05667b63ca305eac6a16bfe605..10d19da4c9c5b7c55039199c3f9a198b3528a835 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -5452,7 +5452,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) { int32_t tsdbGetTableSchema(void* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid) { SMetaReader mr = {0}; - metaReaderInit(&mr, ((SVnode*)pVnode)->pMeta, 0); + metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0); int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); if (code != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; @@ -5584,57 +5584,3 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } -/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/ -// opt perf, do NOT create so many readers -int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) { - SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC, - .startVersion = -1, .endVersion = -1}; - cond.twindows.skey = INT64_MIN; - cond.twindows.ekey = INT64_MAX; - - cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols); - if (cond.colList == NULL || cond.pSlotList == NULL) { - // todo - } - - cond.colList[0].colId = 1; - cond.colList[0].slotId = 0; - cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP; - - cond.pSlotList[0] = 0; - - STableKeyInfo* pTableKeyInfo = pTableList; - STsdbReader* pReader = NULL; - SSDataBlock* pBlock = createDataBlock(); - - SColumnInfoData data = {0}; - data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE}; - blockDataAppendColInfo(pBlock, &data); - - int64_t key = INT64_MIN; - - for(int32_t i = 0; i < numOfTables; ++i) { - int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - bool hasData = false; - code = tsdbNextDataBlock(pReader, &hasData); - if (!hasData || code != TSDB_CODE_SUCCESS) { - continue; - } - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0); - int64_t k = *(int64_t*)pCol->pData; - - if (key < k) { - key = k; - } - - tsdbReaderClose(pReader); - } - - return 0; -} diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 526b9b4e2db3f586f6cd8936d939e7da4c970be9..45dec6ebee9f42814a2a8f069dc0e7a053bce358 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -204,7 +204,7 @@ void initStateStoreAPI(SStateStore* pStore) { } void initMetaReaderAPI(SStoreMetaReader* pMetaReader) { - pMetaReader->initReader = _metaReaderInit; + pMetaReader->initReader = metaReaderInit; pMetaReader->clearReader = metaReaderClear; pMetaReader->getTableEntryByUid = metaReaderGetTableEntryByUid; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 022fc4c951cf2b1d27326dc4788affc8bb9caf48..c122a98a123850dbc4132cffb6d3719a5ae72e72 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -62,7 +62,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { } // query meta - metaReaderInit(&mer1, pVnode->pMeta, 0); + metaReaderDoInit(&mer1, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) { code = terrno; @@ -79,7 +79,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { schemaTag = mer1.me.stbEntry.schemaTag; metaRsp.suid = mer1.me.uid; } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); + metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK); if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; strcpy(metaRsp.stbName, mer2.me.name); @@ -175,7 +175,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { } // query meta - metaReaderInit(&mer1, pVnode->pMeta, 0); + metaReaderDoInit(&mer1, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) { code = terrno; @@ -188,7 +188,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { code = TSDB_CODE_VND_HASH_MISMATCH; goto _exit; } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderInit(&mer2, pVnode->pMeta, 0); + metaReaderDoInit(&mer2, pVnode->pMeta, 0); if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; strcpy(cfgRsp.stbName, mer2.me.name); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 28a5becfd41aa1d92069593fcf754307b276cac6..6d68e949890511501981e826d4596614563480b3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -564,24 +564,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TMQ_VG_WALINFO: return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_RUN: - return tqProcessTaskRunReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_DISPATCH: - return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); - case TDMT_STREAM_TASK_CHECK: - return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_DISPATCH_RSP: - return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); - case TDMT_STREAM_RETRIEVE: - return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); - case TDMT_STREAM_RETRIEVE_RSP: - return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE: - return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); - case TDMT_STREAM_RECOVER_FINISH: - return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); - case TDMT_STREAM_RECOVER_FINISH_RSP: - return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -1651,7 +1633,7 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe tDecodeSBatchDeleteReq(&decoder, &deleteReq); SMetaReader mr = {0}; - metaReaderInit(&mr, pVnode->pMeta, META_READER_NOLOCK); + metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK); int32_t sz = taosArrayGetSize(deleteReq.deleteReqs); for (int32_t i = 0; i < sz; i++) { diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 2b9256f08e453101c572f341d381ddde85f77a6f..6c2a16bb0c17342f1af7ee4f788c7a0f315c99b1 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -64,8 +64,10 @@ typedef struct { int8_t recoverStep; int8_t recoverScanFinished; SQueryTableDataCond tableCond; - int64_t fillHistoryVer1; - int64_t fillHisotryeKey1; + SVersionRange fillHistoryVer; + STimeWindow fillHistoryWindow; +// int64_t fillHistoryVer1; +// int64_t fillHisotryeKey1; int64_t fillHistoryVer2; SStreamState* pState; int64_t dataVersion; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0ea6ae714472124485f3365c43fc1cfe7cfdf5a3..d77323d7378a136654a9f5e15501b32a5fb45819 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -869,11 +869,11 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { } } -int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver, int64_t ekey) { +int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - pTaskInfo->streamInfo.fillHistoryVer1 = ver; - pTaskInfo->streamInfo.fillHisotryeKey1 = ekey; + pTaskInfo->streamInfo.fillHistoryVer = *pVerRange; + pTaskInfo->streamInfo.fillHistoryWindow = *pWindow; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE1; return 0; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4b53a9918d24615c6d4f568fba15a42c61361cc7..c892617783310ac86020bcc2ec16c77640d5f32e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1785,14 +1785,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE2) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; memcpy(&pTSInfo->base.cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE1) { - pTSInfo->base.cond.startVersion = 0; - pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; - qDebug("stream recover step1, verRange:%" PRId64 " - %" PRId64 ", %s", pTSInfo->base.cond.startVersion, - pTSInfo->base.cond.endVersion, id); + pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer; + pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer.maxVer; + + pTSInfo->base.cond.twindows = pTaskInfo->streamInfo.fillHistoryWindow; + qDebug("stream recover step1, verRange:%" PRId64 "-%" PRId64 " window:%"PRId64"-%"PRId64", %s", pTSInfo->base.cond.startVersion, + pTSInfo->base.cond.endVersion, pTSInfo->base.cond.twindows.skey, pTSInfo->base.cond.twindows.ekey, id); pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1; } else { - pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1; + pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer.minVer + 1; pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2; qDebug("stream recover step2, verRange:%" PRId64 " - %" PRId64", %s", pTSInfo->base.cond.startVersion, pTSInfo->base.cond.endVersion, id); @@ -2085,8 +2088,11 @@ FETCH_NEXT_BLOCK: return pInfo->pCreateTbRes; } + // todo apply time window range filter + doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); + pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 4fc4d3ccf33776c3a76254c7a465679d5c98c25b..e12a0fdd435689958b86972ac1c0a2c3d4ed8818 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -48,7 +48,7 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); -int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, +int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9cb0a5664493ab444c1e81d778f700c53cbcefdc..26dd19ce7ecc1ada962434c5c22fc976a7cefd86 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -164,7 +164,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) } buf = NULL; - qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr, + qDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); } code = 0; @@ -238,14 +238,14 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, + qDebug("s-task:%s dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); return 0; } -int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, +int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -283,8 +283,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s dispatch recover finish msg to downstream taskId:0x%x node %d: recover finish msg", pTask->id.idStr, - pReq->taskId, vgId); + qDebug("s-task:%s dispatch recover finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId); return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 5d366aca050dac8cf88774a9a9f5431ae20164cc..0bcb078d51b10b2538fa5833fba236131058ba33 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -15,16 +15,30 @@ #include "streamInc.h" #include "ttimer.h" - +#include "wal.h" + +const char* streamGetTaskStatusStr(int32_t status) { + switch(status) { + case TASK_STATUS__NORMAL: return "normal"; + case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream"; + case TASK_STATUS__RECOVER_PREPARE: return "scan-history-prepare"; + case TASK_STATUS__RECOVER1: return "scan-history-data"; + default:return ""; + } +} int32_t streamTaskLaunchRecover(SStreamTask* pTask) { - qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); + qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->nodeId); if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); - qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus); + + SVersionRange* pRange = &pTask->dataRange.range; + qDebug("s-task:%s set task status:%s and start to recover, ver:%" PRId64 "-%" PRId64, pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->dataRange.range.minVer, + pTask->dataRange.range.maxVer); streamSetParamForRecover(pTask); - streamSourceRecoverPrepareStep1(pTask, pTask->dataRange.range.maxVer, pTask->dataRange.window.ekey); + streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window); SStreamRecoverStep1Req req; streamBuildSourceRecover1Req(pTask, &req); @@ -43,12 +57,12 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { } } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); + streamSetStatusNormal(pTask); streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->taskLevel == TASK_LEVEL__SINK) { - // sink nodes has no specified operation for fill history - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); + streamSetStatusNormal(pTask); + qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr); } return 0; @@ -56,8 +70,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { // check status int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { - qDebug("s-task:%s in fill history stage, ver:%"PRId64" ekey:%"PRId64, pTask->id.idStr, pTask->dataRange.range.maxVer, - pTask->dataRange.window.ekey); + qDebug("s-task:%s in fill history stage, ver:%" PRId64 "-%"PRId64" window:%" PRId64"-%"PRId64, pTask->id.idStr, + pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer, pTask->dataRange.window.skey, + pTask->dataRange.window.ekey); SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -74,7 +89,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -90,7 +105,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s (vgId:%d) check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -113,7 +128,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .childId = pRsp->childId, }; - qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { @@ -139,9 +154,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); - - qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, - pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); + const char* id = pTask->id.idStr; if (pRsp->status == 1) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -167,21 +180,25 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); + qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", id, numOfReqs); streamTaskLaunchRecover(pTask); + } else { + qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id, + pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (pRsp->reqId != pTask->checkReqId) { return -1; } + qDebug("s-task:%s fixed downstream tasks is ready, now enter into recover stage", id); streamTaskLaunchRecover(pTask); } else { ASSERT(0); } - } else { // not ready, wait for 100ms and retry - qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, - pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } else { // not ready, wait for 100ms and retry + qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId, + pRsp->downstreamNodeId); taosMsleep(100); streamRecheckOneDownstream(pTask, pRsp); @@ -206,9 +223,9 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { } // source -int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver, int64_t ekey) { +int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) { void* exec = pTask->exec.pExecutor; - return qStreamSourceRecoverStep1(exec, ver, ekey); + return qStreamSourceRecoverStep1(exec, pVerRange, pWindow); } int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) { @@ -246,23 +263,23 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { return code; } -int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { +int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) { SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId }; // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr, + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr, pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus); req.taskId = pTask->fixedEpDispatcher.taskId; - streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t vgSz = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < vgSz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.taskId = pVgInfo->taskId; - streamDispatchOneRecoverFinishReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } return 0; @@ -271,7 +288,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo); - qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream); + qDebug("s-task:%s agg task is ready and wait for %d upstream tasks complete fill history procedure", pTask->id.idStr, + pTask->recoverWaitingUpstream); return 0; } @@ -303,8 +321,8 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { pHTask->dataRange.range.minVer = 0; pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; - qDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64 - " verrange:%" PRId64 " - %" PRId64, + qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 "-%" PRId64 + " verrange:%" PRId64 "-%" PRId64, pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); @@ -321,7 +339,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr, pMeta->vgId, pTask->historyTaskId.taskId); - taosTmrReset(tryLaunchHistoryTask, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + taosTmrReset(tryLaunchHistoryTask, 100, pTask, streamEnv.timer, &pTask->timer); return; } @@ -357,6 +375,42 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) { return TSDB_CODE_SUCCESS; } +int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { + SStreamMeta* pMeta = pTask->pMeta; + + qDebug("s-task:%s set start wal scan start ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); + ASSERT(walReaderGetCurrentVer(pTask->exec.pWalReader) == -1); + +// walReaderSeekVer(pTask->exec.pWalReader, sversion); +// pTask->chkInfo.currentVer = sversion; + + if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + return 0; + } + + // restore param + int32_t code = streamRestoreParam(pTask); + if (code < 0) { + return -1; + } + + // dispatch recover finish req to all related downstream task + code = streamDispatchRecoverFinishMsg(pTask); + if (code < 0) { + return -1; + } + + // set status normal + qDebug("s-task:%s set the status to be normal, and start wal scan", pTask->id.idStr); + code = streamSetStatusNormal(pTask); + if (code < 0) { + return -1; + } + + streamMetaSaveTask(pMeta, pTask); + return 0; +} + int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4f883b76e4e225e77c5fe685645a849643d6345b..de10f021d25f899702d421722917bb79abaf78a7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -17,7 +17,7 @@ #include "tstream.h" #include "wal.h" -static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) { +static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) { int32_t childId = taosArrayGetSize(pArray); pTask->selfChildId = childId; taosArrayPush(pArray, &pTask); @@ -45,110 +45,10 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; - mndAddToTaskset(pTaskList, pTask); + addToTaskset(pTaskList, pTask); return pTask; } -SStreamTask* streamTaskClone(SStreamTask* pTask) { - SStreamTask* pDst = taosMemoryCalloc(1, sizeof(SStreamTask)); - /* pDst-> - - SStreamId id; - int32_t totalLevel; - int8_t taskLevel; - int8_t outputType; - int16_t dispatchMsgType; - SStreamStatus status; - int32_t selfChildId; - int32_t nodeId; // vgroup id - SEpSet epSet; - SCheckpointInfo chkInfo; - STaskExec exec; - int8_t fillHistory; // fill history - int64_t ekey; // end ts key - int64_t endVer; // end version - - // children info - SArray* childEpInfo; // SArray - int32_t nextCheckId; - SArray* checkpointInfo; // SArray - - // output - union { - STaskDispatcherFixedEp fixedEpDispatcher; - STaskDispatcherShuffle shuffleDispatcher; - STaskSinkTb tbSink; - STaskSinkSma smaSink; - STaskSinkFetch fetchSink; - }; - - int8_t inputStatus; - int8_t outputStatus; - SStreamQueue* inputQueue; - SStreamQueue* outputQueue; - - // trigger - int8_t triggerStatus; - int64_t triggerParam; - void* timer; - SMsgCb* pMsgCb; // msg handle - SStreamState* pState; // state backend - - // the followings attributes don't be serialized - int32_t recoverTryingDownstream; - int32_t recoverWaitingUpstream; - int64_t checkReqId; - SArray* checkReqIds; // shuffle - int32_t refCnt; - int64_t checkpointingId; - int32_t checkpointAlignCnt; - struct SStreamMeta* pMeta; - - int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); - if (pTask->inputQueue) { - streamQueueClose(pTask->inputQueue); - } - if (pTask->outputQueue) { - streamQueueClose(pTask->outputQueue); - } - if (pTask->exec.qmsg) { - taosMemoryFree(pTask->exec.qmsg); - } - - if (pTask->exec.pExecutor) { - qDestroyTask(pTask->exec.pExecutor); - pTask->exec.pExecutor = NULL; - } - - if (pTask->exec.pWalReader != NULL) { - walCloseReader(pTask->exec.pWalReader); - } - - taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); - if (pTask->outputType == TASK_OUTPUT__TABLE) { - tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); - taosMemoryFree(pTask->tbSink.pTSchema); - tSimpleHashCleanup(pTask->tbSink.pTblInfo); - } - - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); - taosArrayDestroy(pTask->checkReqIds); - pTask->checkReqIds = NULL; - } - - if (pTask->pState) { - streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING); - } - - if (pTask->id.idStr != NULL) { - taosMemoryFree((void*)pTask->id.idStr); - } - - taosMemoryFree(pTask);*/ - return NULL; -} - int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) { if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1;