提交 0dd93301 编写于 作者: H Haojun Liao

enh(stream): make history task for stream running.

上级 e8549ce5
...@@ -552,10 +552,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus); ...@@ -552,10 +552,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
// recover and fill history // recover and fill history
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchRecover(SStreamTask* pTask); int32_t streamTaskLaunchRecover(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver);
// common // common
int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamSetParamForRecover(SStreamTask* pTask);
...@@ -570,7 +571,6 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); ...@@ -570,7 +571,6 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask); int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
// agg level // agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask); int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId); int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
void streamMetaInit(); void streamMetaInit();
......
...@@ -92,7 +92,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -92,7 +92,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) { if (code != 0) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
......
...@@ -238,10 +238,6 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p ...@@ -238,10 +238,6 @@ int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* p
return 0; return 0;
} }
// Placeholder for scheduling the fill-history tasks of a stream.
// Neither pMnode nor pStream is read; the function performs no work and always returns 0.
static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) {
return 0;
}
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList, static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SArray* pSinkTaskList,
SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory, SStreamObj* pStream, SSubplan* plan, uint64_t uid, int8_t fillHistory,
bool hasExtraSink) { bool hasExtraSink) {
...@@ -250,6 +246,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas ...@@ -250,6 +246,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
return terrno; return terrno;
} }
if (fillHistory) { // todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = taosGetTimestampMs();
}
// sink or dispatch // sink or dispatch
if (hasExtraSink) { if (hasExtraSink) {
mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask); mndAddDispatcherForInnerTask(pMnode, pStream, pSinkTaskList, pTask);
......
...@@ -95,6 +95,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -95,6 +95,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
......
...@@ -979,7 +979,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 ...@@ -979,7 +979,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
return -1; return -1;
} }
code = streamProcessTaskCheckRsp(pTask, &rsp, sversion); code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code; return code;
} }
...@@ -1032,25 +1032,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1032,25 +1032,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); tqDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
} else { } else {
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
// todo fix the bug: 1. maybe failed to located the fill history task, since it is not built yet. 2. race condition streamTaskStartHistoryTask(pTask, sversion);
// an fill history task needs to be started.
// Set the execute conditions, including the query time window and the version range
SStreamTask* pHTask = taosHashGet(pStreamMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
pHTask->dataRange.range.minVer = 0;
pHTask->dataRange.range.maxVer = sversion;
pHTask->dataRange.window.skey = INT64_MIN;
pHTask->dataRange.window.ekey = 1000000;
tqDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64
" verrange:%" PRId64 " - %" PRId64,
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
// check if downstream tasks have been ready
streamTaskCheckDownstream(pHTask, sversion);
} }
} }
...@@ -1091,8 +1073,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1091,8 +1073,11 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
} }
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s non-blocking recover stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el); tqDebug("s-task:%s history scan stage(step 1) ended, elapsed time:%.2fs", pTask->id.idStr, el);
// todo transfer the executor status, and then destroy this stream task
#if 0
// build msg to launch next step // build msg to launch next step
SStreamRecoverStep2Req req; SStreamRecoverStep2Req req;
code = streamBuildSourceRecover2Req(pTask, &req); code = streamBuildSourceRecover2Req(pTask, &req);
...@@ -1123,6 +1108,8 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1123,6 +1108,8 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
.code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq};
tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
#endif
return 0; return 0;
} }
......
...@@ -588,6 +588,40 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -588,6 +588,40 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
} }
} }
/**
 * Dispatch one message taken from the vnode stream queue to the matching tq handler.
 *
 * @param pVnode vnode owning the queue; pVnode->pTq is handed to every handler.
 * @param pMsg   message to process; pMsg->msgType selects the handler.
 * @param pInfo  queue information supplied by the caller (not read here).
 * @return the selected handler's return value, 0 when the message was redirected,
 *         or TSDB_CODE_APP_ERROR when the message type is not a supported stream message.
 */
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);

  // NOTE(review): none of the message types tested below is handled by the switch that follows,
  // so this guard looks like a leftover from the fetch-queue handler. It is kept unchanged so that
  // such a message is still redirected while the vnode is not ready for read -- confirm and remove.
  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
       pMsg->msgType == TDMT_VND_BATCH_META) &&
      !syncIsReadyForRead(pVnode->sync)) {
    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
    return 0;
  }

  switch (pMsg->msgType) {
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
    case TDMT_STREAM_TASK_CHECK:
      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE:
      return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH:
      return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_RECOVER_FINISH_RSP:
      return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
    default:
      // Report the queue this handler actually serves, so the log is not mistaken for a fetch-queue error.
      vError("unknown msg type:%d in stream queue", pMsg->msgType);
      return TSDB_CODE_APP_ERROR;
  }
}
// TODO: remove the function // TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO // TODO
......
...@@ -31,7 +31,7 @@ typedef struct { ...@@ -31,7 +31,7 @@ typedef struct {
void* timer; void* timer;
} SStreamGlobalEnv; } SStreamGlobalEnv;
static SStreamGlobalEnv streamEnv; extern SStreamGlobalEnv streamEnv;
int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#define ONE_MB_F (1048576.0) #define ONE_MB_F (1048576.0)
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F) #define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
SStreamGlobalEnv streamEnv;
int32_t streamInit() { int32_t streamInit() {
int8_t old; int8_t old;
while (1) { while (1) {
......
...@@ -387,9 +387,10 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -387,9 +387,10 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
} }
// todo handle the fill history task // todo handle the fill history task
ASSERT(0);
if (pTask->fillHistory) { if (pTask->fillHistory) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
streamTaskCheckDownstream(pTask, ver); streamTaskCheckDownstreamTasks(pTask);
} }
} }
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "streamInc.h" #include "streamInc.h"
#include "ttimer.h"
int32_t streamTaskLaunchRecover(SStreamTask* pTask) { int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId);
...@@ -54,8 +55,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { ...@@ -54,8 +55,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
} }
// check status // check status
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t ver) { int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, ver); qDebug("s-task:%s in fill history stage, ver:%"PRId64" ekey:%"PRId64, pTask->id.idStr, pTask->dataRange.range.maxVer,
pTask->dataRange.window.ekey);
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
...@@ -135,7 +137,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { ...@@ -135,7 +137,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) {
return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0; return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0;
} }
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t ver) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr,
...@@ -297,6 +299,64 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { ...@@ -297,6 +299,64 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
return 0; return 0;
} }
/**
 * Set the version range of a fill-history task and start its downstream check.
 *
 * The history task's version range becomes [0, pTask->chkInfo.currentVer]; its time window is
 * only logged here, not modified.
 *
 * @param pTask  stream task that owns the history task; supplies the upper version bound.
 * @param pHTask fill-history task to be configured and checked.
 */
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  const char* streamIdStr = pTask->id.idStr;
  const char* historyIdStr = pHTask->id.idStr;

  // upper bound comes from the stream task, lower bound is always the first version
  pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
  pHTask->dataRange.range.minVer = 0;

  qDebug("s-task:%s set the launch condition for fill history task:%s, window:%" PRId64 " - %" PRId64
         " verrange:%" PRId64 " - %" PRId64,
         streamIdStr, historyIdStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
         pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);

  // ask the downstream tasks of the history task whether they are ready
  streamTaskCheckDownstreamTasks(pHTask);
}
/**
 * Timer callback: retry launching the fill-history task of a stream task.
 *
 * Looks up the history task by pTask->historyTaskId.taskId in the meta's task table. If it is
 * still missing the timer is re-armed; otherwise the history task is configured and its
 * downstream check is started.
 *
 * @param param the stream task (SStreamTask*) that was registered with the timer.
 * @param tmrId timer handle passed by the timer module (not read here).
 */
static void tryLaunchHistoryTask(void* param, void* tmrId) {
  SStreamTask* pTask = param;
  SStreamMeta* pMeta = pTask->pMeta;

  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
          pMeta->vgId, pTask->historyTaskId.taskId);

    // Retry after 100ms, the same delay used when this timer is first started. The task's
    // triggerParam is not a retry interval and must not be used as one: it is unrelated to this
    // lookup and a value of 0 would re-fire the callback without any delay.
    taosTmrReset(tryLaunchHistoryTask, 100, pTask, streamEnv.timer, &pTask->timer);
    return;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
}
/**
 * Start the fill-history task that belongs to a stream task.
 *
 * The history task is looked up by pTask->historyTaskId.taskId in the task table of pTask->pMeta.
 * When it is found, its version range is set and its downstream check is issued immediately.
 * When it is not registered yet and pTask has no timer, a timer is started so that
 * tryLaunchHistoryTask() repeats the lookup later; an already existing timer is left untouched.
 *
 * TODO: a race condition is still open in this launch path.
 * NOTE(review): presumably between the timer callback and the life cycle of pTask -- confirm.
 *
 * @param pTask stream task whose fill-history task should be started.
 * @param ver   currently not used.
 * @return TSDB_CODE_SUCCESS when there is no history task, when it was launched, or when the launch
 *         was deferred; TSDB_CODE_FAILED when the retry timer could not be created.
 */
int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) {
  (void)ver;

  SStreamMeta* pMeta = pTask->pMeta;
  if (pTask->historyTaskId.taskId == 0) {
    return TSDB_CODE_SUCCESS;
  }

  // Set the execute conditions, including the query time window and the version range
  SStreamTask** pHTask = taosHashGet(pMeta->pTasks, &pTask->historyTaskId.taskId, sizeof(pTask->historyTaskId.taskId));
  if (pHTask == NULL) {
    qWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since it is not built yet", pTask->id.idStr,
          pMeta->vgId, pTask->historyTaskId.taskId);

    if (pTask->timer == NULL) {
      // try again in 100ms
      pTask->timer = taosTmrStart(tryLaunchHistoryTask, 100, pTask, streamEnv.timer);
      if (pTask->timer == NULL) {
        // Without the timer nothing will ever retry the launch: report it instead of claiming success.
        qError("s-task:%s vgId:%d failed to start timer to launch history task:0x%x", pTask->id.idStr, pMeta->vgId,
               pTask->historyTaskId.taskId);
        return TSDB_CODE_FAILED;
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  doCheckDownstreamStatus(pTask, *pHTask);
  return TSDB_CODE_SUCCESS;
}
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册