Commit 78a240be authored by Haojun Liao

enh(stream): add dispatch msg.

Parent 69c9eda7
@@ -253,6 +253,7 @@ enum {
   TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
...
@@ -270,6 +270,7 @@ typedef struct SStreamStatus {
   int8_t taskStatus;
   int8_t schedStatus;
   int8_t keepTaskStatus;
+  bool   transferState;
 } SStreamStatus;

 typedef struct SHistDataRange {
@@ -454,10 +455,10 @@ typedef struct {
   int64_t streamId;
   int32_t taskId;
   int32_t childId;
-} SStreamRecoverFinishReq;
+} SStreamRecoverFinishReq, SStreamTransferReq;

-int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
-int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
+int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
+int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);

 typedef struct {
   int64_t streamId;
@@ -583,6 +584,9 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
 int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
 int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
 int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask);
+int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);

 // agg level
 int32_t streamAggRecoverPrepare(SStreamTask* pTask);
 int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
...
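Note (editor's addition, not part of the commit): the typedef alias above lets SStreamTransferReq reuse the SStreamRecoverFinishReq codec. A minimal sketch of the intended round trip, mirroring the tEncodeSize / tEncoderInit / tDecoderInit call pattern used elsewhere in this diff; the header names, the allocator calls, and the helper itself are assumptions.

// Editor's sketch: encode a transfer-state request and decode it back, using only the
// codec functions renamed in this commit. Header names are assumptions.
#include "os.h"       // taosMemoryMalloc/taosMemoryFree (assumed location)
#include "tencode.h"  // SEncoder/SDecoder (assumed location)
#include "tstream.h"  // SStreamTransferReq and the codec declarations above (assumed location)

static int32_t transferReqRoundTrip(const SStreamTransferReq* pIn, SStreamTransferReq* pOut) {
  int32_t code = 0;
  int32_t tlen = 0;

  // compute the serialized size, as streamDoDispatchRecoverFinishMsg does
  tEncodeSize(tEncodeStreamRecoverFinishReq, pIn, tlen, code);
  if (code < 0) return -1;

  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) return -1;

  SEncoder encoder;
  tEncoderInit(&encoder, buf, tlen);
  code = tEncodeStreamRecoverFinishReq(&encoder, pIn);
  tEncoderClear(&encoder);
  if (code < 0) {
    taosMemoryFree(buf);
    return -1;
  }

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)buf, tlen);
  code = tDecodeStreamRecoverFinishReq(&decoder, pOut);
  tDecoderClear(&decoder);

  taosMemoryFree(buf);
  return (code < 0) ? -1 : 0;
}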
@@ -728,8 +728,9 @@ SArray *vmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
-  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
...
@@ -277,7 +277,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
   SDecoder decoder;
   tDecoderInit(&decoder, msg, msgLen);
-  tDecodeSStreamRecoverFinishReq(&decoder, &req);
+  tDecodeStreamRecoverFinishReq(&decoder, &req);
   tDecoderClear(&decoder);

   // find task
...
@@ -245,7 +245,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
+int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
 int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
...
@@ -1102,29 +1102,35 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
   double el = (taosGetTimestampMs() - st) / 1000.0;
   tqDebug("s-task:%s history data scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);

-  if (pTask->info.fillHistory) {/*
-    // 1. stop the related stream task, get the current scan wal version of stream task, ver1.
+  if (pTask->info.fillHistory) {
+    // 1. stop the related stream task, get the current scan wal version of stream task, ver.
     SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
     if (pStreamTask == NULL) {
       // todo handle error
     }

+    // todo here we should use another flag to avoid user resume the task
     pStreamTask->status.taskStatus = TASK_STATUS__PAUSE;

     // if it's an source task, extract the last version in wal.
+    int64_t ver = pTask->dataRange.range.maxVer;
+    int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
+    ASSERT(latestVer >= ver);
+    ver = latestVer;

-    // 2. wait for downstream tasks to completed
-    // 3. do secondary scan of the history data scan, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
+    // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
+
+    // 3. notify the downstream tasks to transfer executor state after handle all history blocks.
+    code = streamDispatchTransferStateMsg(pTask);
+    if (code != TSDB_CODE_SUCCESS) {
+      // todo handle error
+    }

     // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
     // 5. resume the related stream task.
-*/
+    streamMetaReleaseTask(pMeta, pTask);
   } else {
     // todo update the chkInfo version for current task.
     // this task has an associated history stream task, so we need to scan wal from the end version of
@@ -1138,53 +1144,23 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
     return code;
   }

-#if 0
-  // build msg to launch next step
-  SStreamRecoverStep2Req req;
-  code = streamBuildSourceRecover2Req(pTask, &req);
-  if (code < 0) {
-    streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-    return -1;
-  }
-
-  streamMetaReleaseTask(pTq->pStreamMeta, pTask);
-  if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
-    return 0;
-  }
-
-  // serialize msg
-  int32_t len = sizeof(SStreamRecoverStep1Req);
-  void* serializedReq = rpcMallocCont(len);
-  if (serializedReq == NULL) {
-    tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr);
-    return -1;
-  }
-
-  memcpy(serializedReq, &req, len);
-
-  // dispatch msg
-  tqDebug("s-task:%s start recover block stage", pTask->id.idStr);
-
-  SRpcMsg rpcMsg = {
-      .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
-  tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
-#endif
-
   return 0;
 }

-int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
-  int32_t code = 0;
-  SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
+// notify the downstream tasks to transfer executor state after handle all history blocks.
+int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
+  SStreamTransferReq* pReq = (SStreamTransferReq*)msg;
   SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
   if (pTask == NULL) {
     return -1;
   }

-  // do recovery step 2
+  ASSERT(pTask->streamTaskId.taskId != 0);
+  pTask->status.transferState = true;  // persistent data?
+
+#if 0
+  // do check if current task handle all data in the input queue
   int64_t st = taosGetTimestampMs();
   tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);
@@ -1231,8 +1207,11 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
   atomic_store_8(&pTask->info.fillHistory, 0);
   streamMetaSaveTask(pTq->pStreamMeta, pTask);
+#endif

+  streamSchedExec(pTask);
   streamMetaReleaseTask(pTq->pStreamMeta, pTask);
   return 0;
 }
@@ -1245,7 +1224,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
   SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
-  tDecodeSStreamRecoverFinishReq(&decoder, &req);
+  tDecodeStreamRecoverFinishReq(&decoder, &req);
   tDecoderClear(&decoder);

   // find task
...
@@ -326,10 +326,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
   if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;

-  if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE || pMsg->msgType == TDMT_STREAM_TASK_CHECK_RSP) {
-    if (tqCheckLogInWal(pVnode->pTq, ver)) return 0;
-  }
-
   // skip header
   pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
   len = pMsg->contLen - sizeof(SMsgHead);
@@ -425,11 +421,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
         goto _err;
       }
     } break;
-    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: {
-      if (tqProcessTaskRecover2Req(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
-        goto _err;
-      }
-    } break;
     case TDMT_STREAM_TASK_CHECK_RSP: {
       if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
         goto _err;
@@ -593,6 +584,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
       return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
     case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
       return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
+    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE:
+      return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
     case TDMT_STREAM_RECOVER_FINISH:
       return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_RECOVER_FINISH_RSP:
...
@@ -267,8 +267,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
   msg.pCont = buf;
   msg.msgType = TDMT_STREAM_TASK_CHECK;

-  qDebug("s-task:%s dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
-         pReq->streamId, pReq->downstreamTaskId, nodeId);
+  qDebug("s-task:%s (level:%d) dispatch check msg to s-task:%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
+         pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);

   tmsgSendReq(pEpSet, &msg);
   return 0;
@@ -281,7 +281,7 @@ int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecove
   SRpcMsg msg = {0};
   int32_t tlen;

-  tEncodeSize(tEncodeSStreamRecoverFinishReq, pReq, tlen, code);
+  tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
   if (code < 0) {
     return -1;
   }
@@ -297,7 +297,7 @@ int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecove
   SEncoder encoder;
   tEncoderInit(&encoder, abuf, tlen);
-  if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) {
+  if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) {
     if (buf) {
       rpcFreeCont(buf);
     }
@@ -385,8 +385,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
     snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName);
   }

-  SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
-
   /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/
   SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo;
   hashValue =
...
@@ -386,6 +386,23 @@ int32_t streamExecForAll(SStreamTask* pTask) {
       }

       if (pInput == NULL) {
+        if (pTask->info.fillHistory && pTask->status.transferState) {
+          // todo transfer task state here
+          SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
+          ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
+          ASSERT(pStreamTask->status.taskStatus == STREAM_STATUS__PAUSE);
+
+          // update the scan data range for source task.
+          streamSetStatusNormal(pStreamTask);
+          streamSchedExec(pStreamTask);
+          streamMetaReleaseTask(pTask->pMeta, pStreamTask);
+
+          // todo set the task with specified status, to avoid execute this process again
+        }
+
         break;
       }
...
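Note (editor's addition, not part of the commit): the new branch above is the executor-side half of the handoff; once the fill-history task drains its input queue, it wakes the stream task that tqProcessTaskRecover1Req paused earlier. A minimal sketch of that handoff factored into a helper, using only calls that appear in this diff; the helper name is hypothetical.

// Hypothetical helper (editor's sketch): resume the paused stream task once the
// fill-history task has no more queued input, mirroring the inline branch above.
static void sketchTransferStateToStreamTask(SStreamTask* pTask) {
  SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
  if (pStreamTask == NULL) {
    return;  // related stream task no longer exists; error handling is still a todo in the commit
  }

  // the stream task was set to TASK_STATUS__PAUSE before the history scan started
  // (see tqProcessTaskRecover1Req); put it back to normal and schedule it so it
  // resumes scanning the wal from where it stopped.
  streamSetStatusNormal(pStreamTask);
  streamSchedExec(pStreamTask);
  streamMetaReleaseTask(pTask->pMeta, pStreamTask);
}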
@@ -285,6 +285,72 @@ int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) {
   return 0;
 }

+static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
+  void*   buf = NULL;
+  int32_t code = -1;
+  SRpcMsg msg = {0};
+
+  int32_t tlen;
+  tEncodeSize(tEncodeStreamRecoverFinishReq, pReq, tlen, code);
+  if (code < 0) {
+    return -1;
+  }
+
+  buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
+  if (buf == NULL) {
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    return -1;
+  }
+
+  ((SMsgHead*)buf)->vgId = htonl(vgId);
+  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+
+  SEncoder encoder;
+  tEncoderInit(&encoder, abuf, tlen);
+  if ((code = tEncodeStreamRecoverFinishReq(&encoder, pReq)) < 0) {
+    if (buf) {
+      rpcFreeCont(buf);
+    }
+    return code;
+  }
+
+  tEncoderClear(&encoder);
+
+  msg.contLen = tlen + sizeof(SMsgHead);
+  msg.pCont = buf;
+  msg.msgType = TDMT_STREAM_TRANSFER_STATE;
+  msg.info.noResp = 1;
+
+  tmsgSendReq(pEpSet, &msg);
+  qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
+
+  return 0;
+}
+
+int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
+  SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
+
+  // serialize
+  if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
+    qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
+           pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
+
+    req.taskId = pTask->fixedEpDispatcher.taskId;
+    doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
+  } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+    SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
+
+    int32_t numOfVgs = taosArrayGetSize(vgInfo);
+    for (int32_t i = 0; i < numOfVgs; i++) {
+      SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
+      req.taskId = pVgInfo->taskId;
+      doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
+    }
+  }
+
+  return 0;
+}
+
 // agg
 int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
   pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
@@ -465,7 +531,7 @@ int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp
   return 0;
 }

-int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
+int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq) {
   if (tStartEncode(pEncoder) < 0) return -1;
   if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
   if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
@@ -473,7 +539,7 @@ int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverF
   tEndEncode(pEncoder);
   return pEncoder->pos;
 }

-int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
+int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq) {
   if (tStartDecode(pDecoder) < 0) return -1;
   if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
   if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
...
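Note (editor's addition, not part of the commit): doDispatchTransferMsg above ships the request as an SMsgHead followed by the encoded body. A minimal sketch of how a receiver could parse that payload, mirroring the decode pattern of tqProcessTaskRecoverFinishReq; the helper name is hypothetical.

// Hypothetical helper (editor's sketch): decode a transfer-state request from an RPC
// message produced by doDispatchTransferMsg (SMsgHead header followed by a body
// encoded with tEncodeStreamRecoverFinishReq).
static int32_t sketchParseTransferStateReq(SRpcMsg* pMsg, SStreamTransferReq* pReq) {
  char*   msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);

  SDecoder decoder;
  tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
  int32_t code = tDecodeStreamRecoverFinishReq(&decoder, pReq);
  tDecoderClear(&decoder);

  return (code < 0) ? -1 : 0;
}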