From d8525123a2b03c54d35564d179b3077061a32f67 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 11:03:41 +0800 Subject: [PATCH] enh(stream): add API to retrieve last ts for multi-tables. --- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tq/tq.c | 3 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 55 ++++++++++++++++++++++++++ source/libs/stream/src/streamRecover.c | 17 ++++---- 4 files changed, 67 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index da5d6f8b3c..18fa893fa4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -198,6 +198,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); +int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr); int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a8c2a09319..7dc7f999fb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,6 +760,7 @@ void freePtr(void *ptr) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); + pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -837,7 +838,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - SWalFilterCond cond = {.deleteMsg = 1}; + SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } diff --git 
a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index 36b18ce9f4..cde2672541 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -5506,3 +5506,96 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
 }
 
 void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
+
+/*-------------todo:refactor the implementation of those APIs in this file to separate the API into two files------*/
+/**
+ * Return the most recent primary-key timestamp found among the given tables.
+ *
+ * A separate reader is opened for every table, using a descending-order query over
+ * the whole time range on the timestamp column (colId 1, slot 0). The first value of
+ * the first block produced for a table is that table's candidate, and the largest
+ * candidate over all tables is returned.
+ *
+ * @param pVnode       vnode handed to tsdbReaderOpen().
+ * @param pTableList   array of STableKeyInfo with at least numOfTables entries.
+ * @param numOfTables  number of entries of pTableList to inspect.
+ * @param pIdStr       id string handed to tsdbReaderOpen().
+ * @return the largest timestamp found, or INT64_MIN if no table produced data. When an
+ *         allocation, the result-block setup or tsdbReaderOpen() fails, the error code
+ *         is returned in place of a timestamp.
+ *
+ * NOTE(review): error codes and timestamps share the return value, so callers cannot
+ * reliably tell them apart; consider reporting the key through an out-parameter.
+ * NOTE(review): this assumes tsdbNextDataBlock() leaves the rows in pBlock, newest
+ * first, without an explicit retrieve call -- confirm against the reader API.
+ * todo: opt perf, do NOT create so many readers
+ */
+int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) {
+  int64_t        key = INT64_MIN;
+  int32_t        code = TSDB_CODE_SUCCESS;
+  SSDataBlock*   pBlock = NULL;
+  STableKeyInfo* pTableKeyInfo = pTableList;
+
+  SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC,
+                              .startVersion = -1, .endVersion = -1};
+  cond.twindows.skey = INT64_MIN;
+  cond.twindows.ekey = INT64_MAX;
+
+  SColumnInfoData data = {0};
+  data.info = (SColumnInfo){.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE};
+
+  cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
+  cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols);
+  if (cond.colList == NULL || cond.pSlotList == NULL) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _end;
+  }
+
+  cond.colList[0].colId = 1;
+  cond.colList[0].slotId = 0;
+  cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
+  cond.pSlotList[0] = 0;
+
+  pBlock = createDataBlock();
+  if (pBlock == NULL) {
+    code = TSDB_CODE_OUT_OF_MEMORY;
+    goto _end;
+  }
+
+  code = blockDataAppendColInfo(pBlock, &data);
+  if (code != TSDB_CODE_SUCCESS) {
+    goto _end;
+  }
+
+  for (int32_t i = 0; i < numOfTables; ++i) {
+    STsdbReader* pReader = NULL;
+    code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL);
+    if (code != TSDB_CODE_SUCCESS) {
+      goto _end;
+    }
+
+    // a table without data, or one whose read fails, is skipped; it does not abort the scan
+    bool    hasData = false;
+    int32_t ret = tsdbNextDataBlock(pReader, &hasData);
+    if (ret == TSDB_CODE_SUCCESS && hasData) {
+      SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
+      if (pCol != NULL && pCol->pData != NULL) {
+        int64_t k = *(int64_t*)pCol->pData;
+        if (key < k) {
+          key = k;
+        }
+      }
+    }
+
+    // close on every path so the reader is not leaked when the table is skipped
+    tsdbReaderClose(pReader);
+  }
+
+_end:
+  if (pBlock != NULL) {
+    blockDataDestroy(pBlock);
+  }
+  taosMemoryFree(cond.colList);
+  taosMemoryFree(cond.pSlotList);
+  return (code == TSDB_CODE_SUCCESS) ? key : code;
+}
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f71d078a3d..7236c6c4b9 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -20,7 +20,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); - qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus); + qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus); streamSetParamForRecover(pTask); streamSourceRecoverPrepareStep1(pTask, version); @@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + // sink nodes has no specified operation for fill history atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); } @@ -71,23 +72,23 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task:%d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); - pTask->recoverTryingDownstream = vgSz; - pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); + int32_t numOfVgs =
taosArrayGetSize(vgInfo); + pTask->recoverTryingDownstream = numOfVgs; + pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); - for (int32_t i = 0; i < vgSz; i++) { + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.reqId = tGenIdPI64(); taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:%d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -161,7 +162,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); + qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { -- GitLab