提交 64365666 编写于 作者: H Haojun Liao

fix(stream): update the info, and do some internal refactor.

上级 4c82558e
......@@ -253,7 +253,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
......
......@@ -568,12 +568,15 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history
void streamPrepareNdoCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamTaskStartHistoryTask(SStreamTask* pTask);
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
......
......@@ -76,6 +76,9 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
......
......@@ -730,7 +730,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......
......@@ -177,6 +177,7 @@ int32_t mndAssignStreamTaskToSnode(SMnode* pMnode, SStreamTask* pTask, SSubplan*
plan->execNode.nodeId = SNODE_HANDLE;
plan->execNode.epSet = pTask->info.epSet;
mDebug("s-task:0x%x set the agg task to snode:%d", pTask->id.taskId, SNODE_HANDLE);
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
......@@ -240,7 +241,7 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory,
bool hasExtraSink) {
bool hasExtraSink, int64_t firstWindowSkey) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, pStream->conf.triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
......@@ -249,6 +250,7 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
// todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
// pTask->dataRange.window.ekey = firstWindowSkey - 1;//taosGetTimestampMs();
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey,
pTask->dataRange.window.ekey);
......@@ -330,7 +332,7 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
}
static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* pPlan, SStreamObj* pStream,
bool hasExtraSink) {
bool hasExtraSink, int64_t lastTs) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = addNewTaskList(pStream->tasks);
......@@ -368,7 +370,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid,
0, hasExtraSink);
0, hasExtraSink, lastTs);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup);
return -1;
......@@ -377,7 +379,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
if (pStream->conf.fillHistory) {
SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL);
code = addSourceStreamTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid,
pStream->conf.fillHistory, hasExtraSink);
pStream->conf.fillHistory, hasExtraSink, lastTs);
setHTasksId(pTaskList, pHTaskList);
}
......@@ -402,7 +404,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
mDebug("s-task:0x%x set time window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
mDebug("s-task:0x%x level:%d set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->info.taskLevel,
pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pDownstreamTask);
......@@ -570,7 +573,7 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return TSDB_CODE_SUCCESS;
}
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan) {
static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan, int64_t lastTs) {
SSdb* pSdb = pMnode->pSdb;
int32_t numOfPlanLevel = LIST_LENGTH(pPlan->pSubplans);
......@@ -624,7 +627,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan*
// source level
return addSourceTasksForMultiLevelStream(pMnode, pPlan, pStream, pAggTask, pHAggTask);
} else if (numOfPlanLevel == 1) {
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink);
return addSourceTasksForOneLevelStream(pMnode, pPlan, pStream, hasExtraSink, lastTs);
}
return 0;
......@@ -637,7 +640,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
int32_t code = doScheduleStream(pStream, pMnode, pPlan);
int32_t code = doScheduleStream(pStream, pMnode, pPlan, 0);
qDestroyQueryPlan(pPlan);
return code;
......
......@@ -52,10 +52,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
};
SRpcMsg rsp = { .code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
......@@ -65,10 +62,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->inputQueue = streamQueueOpen(0);
pTask->outputQueue = streamQueueOpen(0);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputQueue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
return -1;
......@@ -93,6 +91,10 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);
return 0;
}
......@@ -149,6 +151,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
......@@ -161,19 +164,20 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
return -1;
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock);
// 3.go through recover steps to fill history
if (pTask->info.fillHistory) {
streamSetParamForScanHistoryData(pTask);
streamAggRecoverPrepare(pTask);
}
streamPrepareNdoCheckDownstream(pTask);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
return 0;
}
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
return 0;
}
......@@ -255,13 +259,15 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
}
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY:
case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return sndProcessTaskDeployReq(pSnode, pReq, len);
}
case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pReq, len);
return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
default:
ASSERT(0);
}
......@@ -300,6 +306,102 @@ int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
return 0;
}
int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
} else {
rsp.status = 0;
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
return -1;
}
void *buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead *)buf)->vgId = htonl(req.upstreamNodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t *)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
}
tDecoderClear(&decoder);
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId);
if (pTask == NULL) {
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId);
return -1;
}
code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return code;
}
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
......@@ -312,10 +414,14 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return sndProcessTaskRetrieveReq(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_STREAM_RECOVER_FINISH:
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return sndProcessTaskRecoverFinishReq(pSnode, pMsg);
case TDMT_STREAM_RECOVER_FINISH_RSP:
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK:
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP:
return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
default:
ASSERT(0);
}
......
......@@ -171,7 +171,6 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq);
// tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
......
......@@ -218,7 +218,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCheckforStreamStatus(STQ* pTq);
int tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......@@ -240,7 +240,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
......
......@@ -920,12 +920,15 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
......@@ -993,37 +996,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
taosWUnLockLatch(&pStreamMeta->lock);
// 3. It's an fill history task, do nothing. wait for the main task to start it
if (pTask->info.fillHistory) {
tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
} else {
// calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) {
// launch the history fill stream task
streamTaskStartHistoryTask(pTask);
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
tqDebug("s-task:%s fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} else {
SHistDataRange* pRange = &pTask->dataRange;
tqDebug("s-task:%s no associated task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
" - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
ASSERT(pTask->status.checkDownstream == 0);
streamTaskCheckDownstreamTasks(pTask);
}
streamPrepareNdoCheckDownstream(pTask);
tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
......@@ -1160,9 +1133,16 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of
// history scan. The current version of chkInfo.current is not updated during the history scan
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
", window:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
if (pTask->historyTaskId.taskId == 0) {
pTask->dataRange.window.ekey = INT64_MAX;
pTask->dataRange.window.skey = INT64_MIN;
tqDebug("s-task:%s without associated stream task, reset the time window:%"PRId64" - %"PRId64, pTask->id.idStr,
pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
} else {
tqDebug("s-task:%s history data scan completed, now start to scan data from wal, start ver:%" PRId64
", window:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->chkInfo.currentVer, pTask->dataRange.window.skey, pTask->dataRange.window.ekey);
}
code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask);
......
......@@ -87,7 +87,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
return 0;
}
int32_t tqCheckforStreamStatus(STQ* pTq) {
int32_t tqCheckStreamStatus(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
......
......@@ -20,12 +20,6 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
return taosStrdup(buf);
}
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
......
......@@ -579,11 +579,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP: {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pReq, len);
}
case TDMT_STREAM_TASK_CHECK_RSP:
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
......@@ -595,9 +592,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
}
case TDMT_STREAM_RECOVER_FINISH:
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_RECOVER_FINISH_RSP:
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
......
......@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqCheckforStreamStatus(pVnode->pTq);
tqCheckStreamStatus(pVnode->pTq);
}
}
......
......@@ -55,6 +55,12 @@ void streamCleanUp() {
}
}
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
return taosStrdup(buf);
}
void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
......
......@@ -308,7 +308,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_RECOVER_FINISH;
msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
msg.info.noResp = 1;
tmsgSendReq(pEpSet, &msg);
......
......@@ -98,7 +98,6 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
......@@ -126,8 +125,9 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
}
} else {
pTask->status.checkDownstream = 1;
qDebug("s-task:%s (vgId:%d) set downstream task checked for task without downstream tasks, try to launch scan-history-data, status:%s",
qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
}
......@@ -460,11 +460,8 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition
// an fill history task needs to be started.
int32_t streamTaskStartHistoryTask(SStreamTask* pTask) {
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
if (pTask->historyTaskId.taskId == 0) {
return TSDB_CODE_SUCCESS;
}
// Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
......@@ -585,3 +582,40 @@ int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishRe
tEndDecode(pDecoder);
return 0;
}
void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
if (pTask->info.fillHistory) {
qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
} else {
// calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) {
// check downstream tasks for associated scan-history-data tasks
streamCheckHistoryTaskDownstrem(pTask);
// launch current task
SHistDataRange* pRange = &pTask->dataRange;
int64_t ekey = pRange->window.ekey;
int64_t ver = pRange->range.minVer;
pRange->window.skey = ekey;
pRange->window.ekey = INT64_MAX;
pRange->range.minVer = 0;
pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
", ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer);
} else {
SHistDataRange* pRange = &pTask->dataRange;
qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64
" - %" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
}
ASSERT(pTask->status.checkDownstream == 0);
// check downstream tasks for itself
streamTaskCheckDownstreamTasks(pTask);
}
}
......@@ -14,6 +14,7 @@ sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into test0.streamtST1 as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from st partition by ta,tb,tc interval(10s);
sleep 500
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册