提交 76b5c08f 编写于 作者: dengyihao's avatar dengyihao

add checkpoint

上级 53ac9fe4
...@@ -174,13 +174,13 @@ enum { ...@@ -174,13 +174,13 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPTIME_TIMER, "uptime-timer", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, "lost-consumer-clear", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
// TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_BALANCE_VGROUP_LEADER, "balance-vgroup-leader", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......
...@@ -70,8 +70,8 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -70,8 +70,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
...@@ -792,8 +792,6 @@ _OVER: ...@@ -792,8 +792,6 @@ _OVER:
return code; return code;
} }
#if 0
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -834,8 +832,8 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con ...@@ -834,8 +832,8 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
req.checkpointId = pMsg->checkpointId; req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->nodeId; req.nodeId = pTask->nodeId;
req.expireTime = -1; req.expireTime = -1;
req.streamId = pTask->streamId; req.streamId = pTask->id.streamId;
req.taskId = pTask->taskId; req.taskId = pTask->id.taskId;
int32_t code; int32_t code;
int32_t blen; int32_t blen;
...@@ -880,7 +878,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -880,7 +878,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName); SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName);
if (pStream == NULL || pStream->uid != pMsg->streamId) { if (pStream == NULL || pStream->uid != pMsg->streamId) {
mError("start checkpointing failed since stream %s not found", pMsg->streamName); mError("failed to checkpoint since stream %s not found", pMsg->streamName);
return -1; return -1;
} }
...@@ -889,6 +887,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -889,6 +887,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
if (pTrans == NULL) return -1; if (pTrans == NULL) return -1;
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoin since stream %s not found", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
...@@ -958,8 +957,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { ...@@ -958,8 +957,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
return 0; return 0;
} }
#endif
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
...@@ -1401,7 +1398,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { ...@@ -1401,7 +1398,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) { static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
......
...@@ -197,7 +197,7 @@ void tqClose(STQ*); ...@@ -197,7 +197,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
...@@ -220,6 +220,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msg ...@@ -220,6 +220,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
...@@ -228,6 +229,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); ...@@ -228,6 +229,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version); int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
...@@ -71,7 +71,7 @@ static void destroyTqHandle(void* data) { ...@@ -71,7 +71,7 @@ static void destroyTqHandle(void* data) {
walCloseReader(pData->pWalReader); walCloseReader(pData->pWalReader);
tqCloseReader(pData->execHandle.pTqReader); tqCloseReader(pData->execHandle.pTqReader);
} }
if(pData->msg != NULL) { if (pData->msg != NULL) {
rpcFreeCont(pData->msg->pCont); rpcFreeCont(pData->msg->pCont);
taosMemoryFree(pData->msg); taosMemoryFree(pData->msg);
pData->msg = NULL; pData->msg = NULL;
...@@ -233,14 +233,15 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { ...@@ -233,14 +233,15 @@ int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
int64_t sver = 0, ever = 0; int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, ever); tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
ever);
char buf1[80] = {0}; char buf1[80] = {0};
char buf2[80] = {0}; char buf2[80] = {0};
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
vgId, dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
return 0; return 0;
} }
...@@ -256,8 +257,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* ...@@ -256,8 +257,8 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, vgId,
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return 0; return 0;
} }
...@@ -332,8 +333,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) ...@@ -332,8 +333,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
pOffset->subKey);
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
...@@ -349,7 +349,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) ...@@ -349,7 +349,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
} }
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
//3. check the offset info // 3. check the offset info
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL) { if (pSavedOffset != NULL) {
if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
...@@ -377,7 +377,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) ...@@ -377,7 +377,7 @@ int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen)
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
pSavedOffset->val.version); pSavedOffset->val.version);
} else { } else {
tqDebug("vgId:%d sub:%s seek to:%"PRId64" not saved yet", vgId, pOffset->subKey, pOffset->val.version); tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
} }
if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
...@@ -525,7 +525,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -525,7 +525,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__LOG) { if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.rspOffset.version = currentVer; // return current consume offset value dataRsp.rspOffset.version = currentVer; // return current consume offset value
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
...@@ -545,16 +545,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -545,16 +545,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); tqDebug("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey);
int32_t code = 0; int32_t code = 0;
// taosWLockLatch(&pTq->lock); // taosWLockLatch(&pTq->lock);
// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); // int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
// if (code != 0) { // if (code != 0) {
// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); // tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
// } // }
// taosWUnLockLatch(&pTq->lock); // taosWUnLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
if (pHandle) { if (pHandle) {
...@@ -618,7 +618,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -618,7 +618,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
} }
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int ret = 0; int ret = 0;
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
...@@ -730,7 +730,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -730,7 +730,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// remove if it has been register in the push manager, and return one empty block to consumer // remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo); qStreamCloseTsdbReader(pTaskInfo);
} }
...@@ -745,9 +744,7 @@ end: ...@@ -745,9 +744,7 @@ end:
return ret; return ret;
} }
void freePtr(void *ptr) { void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
taosMemoryFree(*(void**)ptr);
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
...@@ -769,7 +766,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -769,7 +766,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->chkInfo.currentVer = ver; pTask->chkInfo.currentVer = ver;
// expand executor // expand executor
pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; pTask->status.taskStatus = (pTask->fillHistory) ? TASK_STATUS__WAIT_DOWNSTREAM : TASK_STATUS__NORMAL;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
...@@ -833,8 +830,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -833,8 +830,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
streamSetupTrigger(pTask); streamSetupTrigger(pTask);
tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId,
pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel);
// next valid version will add one // next valid version will add one
pTask->chkInfo.version += 1; pTask->chkInfo.version += 1;
...@@ -1052,7 +1049,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -1052,7 +1049,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
// do recovery step 2 // do recovery step 2
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); tqDebug("s-task:%s start step2 recover, ts:%" PRId64, pTask->id.idStr, st);
code = streamSourceRecoverScanStep2(pTask, sversion); code = streamSourceRecoverScanStep2(pTask, sversion);
if (code < 0) { if (code < 0) {
...@@ -1080,7 +1077,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -1080,7 +1077,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1; return -1;
} }
double el = (taosGetTimestampMs() - st)/ 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el); tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
// dispatch recover finish req to all related downstream task // dispatch recover finish req to all related downstream task
...@@ -1265,8 +1262,11 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { ...@@ -1265,8 +1262,11 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
if (ASSERT(pHandle->msg != NULL)) { if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null"); tqError("pHandle->msg should not be null");
break; break;
}else{ } else {
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME,
.pCont = pHandle->msg->pCont,
.contLen = pHandle->msg->contLen,
.info = pHandle->msg->info};
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
taosMemoryFree(pHandle->msg); taosMemoryFree(pHandle->msg);
pHandle->msg = NULL; pHandle->msg = NULL;
...@@ -1297,8 +1297,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1297,8 +1297,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->id.idStr, pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
} else { } else {
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
...@@ -1356,7 +1356,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL ...@@ -1356,7 +1356,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg; SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus); atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
...@@ -1368,18 +1368,18 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1368,18 +1368,18 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version // no lock needs to secure the access of the version
if (pReq->igUntreated) { // discard all the data when the stream task is suspended. if (pReq->igUntreated) { // discard all the data when the stream task is suspended.
pTask->chkInfo.currentVer = sversion; pTask->chkInfo.currentVer = sversion;
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion); pTq->pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
} else { // from the previous paused version and go on } else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion); pTq->pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
...@@ -1523,3 +1523,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { ...@@ -1523,3 +1523,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
return 0; return 0;
} }
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
return 0;
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册