提交 d8525123 编写于 作者: H Haojun Liao

enh(stream): add API to retrieve last ts for multi-tables.

上级 7374ea2f
...@@ -198,6 +198,7 @@ void *tsdbGetIdx(SMeta *pMeta); ...@@ -198,6 +198,7 @@ void *tsdbGetIdx(SMeta *pMeta);
void *tsdbGetIvtIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta);
uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader);
void tsdbReaderSetCloseFlag(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader);
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr);
int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
......
...@@ -760,6 +760,7 @@ void freePtr(void *ptr) { ...@@ -760,6 +760,7 @@ void freePtr(void *ptr) {
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1; pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
...@@ -837,7 +838,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -837,7 +838,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
} }
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
SWalFilterCond cond = {.deleteMsg = 1}; SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
} }
......
...@@ -5506,3 +5506,58 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { ...@@ -5506,3 +5506,58 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
} }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/
// opt perf, do NOT create so many readers
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) {
SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC,
.startVersion = -1, .endVersion = -1};
cond.twindows.skey = INT64_MIN;
cond.twindows.ekey = INT64_MAX;
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols);
if (cond.colList == NULL || cond.pSlotList == NULL) {
// todo
}
cond.colList[0].colId = 1;
cond.colList[0].slotId = 0;
cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
cond.pSlotList[0] = 0;
STableKeyInfo* pTableKeyInfo = pTableList;
STsdbReader* pReader = NULL;
SSDataBlock* pBlock = createDataBlock();
SColumnInfoData data = {0};
data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE};
blockDataAppendColInfo(pBlock, &data);
int64_t key = INT64_MIN;
for(int32_t i = 0; i < numOfTables; ++i) {
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bool hasData = false;
code = tsdbNextDataBlock(pReader, &hasData);
if (!hasData || code != TSDB_CODE_SUCCESS) {
continue;
}
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
int64_t k = *(int64_t*)pCol->pData;
if (key < k) {
key = k;
}
tsdbReaderClose(pReader);
}
return 0;
}
...@@ -20,7 +20,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { ...@@ -20,7 +20,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus); qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus);
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
streamSourceRecoverPrepareStep1(pTask, version); streamSourceRecoverPrepareStep1(pTask, version);
...@@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { ...@@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) {
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask); streamAggRecoverPrepare(pTask);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) { } else if (pTask->taskLevel == TASK_LEVEL__SINK) {
// sink nodes has no specified operation for fill history
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
} }
...@@ -71,23 +72,23 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { ...@@ -71,23 +72,23 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) {
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
pTask->checkReqId = req.reqId; pTask->checkReqId = req.reqId;
qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, qDebug("s-task:%s at node %d check downstream task:%d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId,
req.downstreamNodeId); req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t vgSz = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->recoverTryingDownstream = vgSz; pTask->recoverTryingDownstream = numOfVgs;
pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
for (int32_t i = 0; i < vgSz; i++) { for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
taosArrayPush(pTask->checkReqIds, &req.reqId); taosArrayPush(pTask->checkReqIds, &req.reqId);
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, qDebug("s-task:%s at node %d check downstream task:%d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId,
req.downstreamTaskId, req.downstreamNodeId); req.downstreamTaskId, req.downstreamNodeId);
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
...@@ -161,7 +162,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ...@@ -161,7 +162,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs);
streamTaskLaunchRecover(pTask, version); streamTaskLaunchRecover(pTask, version);
} }
} else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册