提交 b44447e6 编写于 作者: H Haojun Liao

enh(stream): support restore from disk.

上级 e392fb1a
...@@ -268,6 +268,7 @@ typedef struct SCheckpointInfo { ...@@ -268,6 +268,7 @@ typedef struct SCheckpointInfo {
typedef struct SStreamStatus { typedef struct SStreamStatus {
int8_t taskStatus; int8_t taskStatus;
int8_t checkDownstream;
int8_t schedStatus; int8_t schedStatus;
int8_t keepTaskStatus; int8_t keepTaskStatus;
bool transferState; bool transferState;
...@@ -528,11 +529,11 @@ typedef struct { ...@@ -528,11 +529,11 @@ typedef struct {
SArray* checkpointVer; // SArray<SStreamCheckpointInfo> SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp; } SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq); int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq); int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp); int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp); int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq); int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq); int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
...@@ -568,10 +569,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); ...@@ -568,10 +569,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
// recover and fill history // recover and fill history
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchRecover(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver); int32_t streamTaskStartHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
// common // common
......
...@@ -250,7 +250,8 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas ...@@ -250,7 +250,8 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs();
mDebug("0x%x----------------window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey,
pTask->dataRange.window.ekey);
// sink or dispatch // sink or dispatch
if (hasExtraSink) { if (hasExtraSink) {
...@@ -323,7 +324,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { ...@@ -323,7 +324,8 @@ static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
(*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId; (*pHTask)->streamTaskId.taskId = (*pStreamTask)->id.taskId;
(*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId; (*pHTask)->streamTaskId.streamId = (*pStreamTask)->id.streamId;
mDebug("s-task:0x%x related history task:0x%x", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId); mDebug("s-task:0x%x related history task:0x%x, level:%d", (*pStreamTask)->id.taskId, (*pHTask)->id.taskId,
(*pHTask)->info.taskLevel);
} }
} }
......
...@@ -45,27 +45,10 @@ extern "C" { ...@@ -45,27 +45,10 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
// tqPush // tqPush
#define EXTRACT_DATA_FROM_WAL_ID (-1)
// typedef struct { #define STREAM_TASK_STATUS_CHECK_ID (-2)
// // msg info
// int64_t consumerId;
// int64_t reqOffset;
// int64_t processedVer;
// int32_t epoch;
// // rpc info
// int64_t reqId;
// SRpcHandleInfo rpcInfo;
// tmr_h timerId;
// int8_t tmrStopped;
// // exec
// int8_t inputStatus;
// int8_t execStatus;
// SStreamQueue inputQ;
// SRWLatch lock;
// } STqPushHandle;
// tqExec // tqExec
typedef struct { typedef struct {
char* qmsg; // SubPlanToString char* qmsg; // SubPlanToString
} STqExecCol; } STqExecCol;
...@@ -181,6 +164,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); ...@@ -181,6 +164,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqStreamTasksScanWal(STQ* pTq); int32_t tqStreamTasksScanWal(STQ* pTq);
int32_t tqStreamTasksStatusCheck(STQ* pTq);
// tq util // tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
......
...@@ -218,6 +218,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); ...@@ -218,6 +218,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCheckforStreamStatus(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......
...@@ -18,8 +18,6 @@ ...@@ -18,8 +18,6 @@
// 0: not init // 0: not init
// 1: already inited // 1: already inited
// 2: wait to be inited or cleaup // 2: wait to be inited or cleaup
#define WAL_READ_TASKS_ID (-1)
static int32_t tqInitialize(STQ* pTq); static int32_t tqInitialize(STQ* pTq);
static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; } static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; }
...@@ -819,9 +817,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ...@@ -819,9 +817,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver; pTask->dataRange.range.minVer = ver;
// expand executor
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
...@@ -903,10 +898,11 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -903,10 +898,11 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeSStreamTaskCheckReq(&decoder, &req); tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder); tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId; int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = { SStreamTaskCheckRsp rsp = {
.reqId = req.reqId, .reqId = req.reqId,
.streamId = req.streamId, .streamId = req.streamId,
...@@ -924,7 +920,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -924,7 +920,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
...@@ -935,7 +932,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -935,7 +932,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code; int32_t code;
int32_t len; int32_t len;
tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) { if (code < 0) {
tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId); tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId);
return -1; return -1;
...@@ -946,7 +943,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -946,7 +943,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len); tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeSStreamTaskCheckRsp(&encoder, &rsp); tEncodeStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
...@@ -961,7 +958,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 ...@@ -961,7 +958,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp); code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) { if (code < 0) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -969,8 +966,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 ...@@ -969,8 +966,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d (vgId:%d) check req from task:0x%x (vgId:%d), status %d", tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
if (pTask == NULL) { if (pTask == NULL) {
...@@ -1034,7 +1031,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1034,7 +1031,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// calculate the correct start time window, and start the handle the history data for the main task. // calculate the correct start time window, and start the handle the history data for the main task.
if (pTask->historyTaskId.taskId != 0) { if (pTask->historyTaskId.taskId != 0) {
// launch the history fill stream task // launch the history fill stream task
streamTaskStartHistoryTask(pTask, sversion); streamTaskStartHistoryTask(pTask);
// launch current task // launch current task
SHistDataRange* pRange = &pTask->dataRange; SHistDataRange* pRange = &pTask->dataRange;
...@@ -1056,11 +1053,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1056,11 +1053,12 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} }
ASSERT(pTask->status.checkDownstream == 0);
streamTaskCheckDownstreamTasks(pTask); streamTaskCheckDownstreamTasks(pTask);
} }
tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, numOfTasks:%d", vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s is deployed and add into meta, status:%s, numOfTasks:%d", vgId, pTask->id.idStr,
pTask->status.taskStatus, numOfTasks); streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
return 0; return 0;
} }
...@@ -1120,8 +1118,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1120,8 +1118,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// wait for the stream task get ready for scan history data // wait for the stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM || while (pStreamTask->status.checkDownstream == 0 || pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr, tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
pStreamTask->info.taskLevel); pStreamTask->info.taskLevel);
taosMsleep(100); taosMsleep(100);
...@@ -1155,8 +1152,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1155,8 +1152,17 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// 5. resume the related stream task. // 5. resume the related stream task.
streamTryExec(pTask); streamTryExec(pTask);
streamMetaReleaseTask(pMeta, pStreamTask); pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s set status to be dropping", pTask->id.idStr);
streamMetaSaveTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
// streamMetaRemoveTask(pMeta, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pStreamTask);
if (streamMetaCommit(pTask->pMeta) < 0) {
}
} else { } else {
// todo update the chkInfo version for current task. // todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of // this task has an associated history stream task, so we need to scan wal from the end version of
...@@ -1338,7 +1344,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1338,7 +1344,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == WAL_READ_TASKS_ID) { // all tasks are extracted submit data from the wal if (taskId == STREAM_TASK_STATUS_CHECK_ID) {
tqStreamTasksStatusCheck(pTq);
return 0;
}
if (taskId == EXTRACT_DATA_FROM_WAL_ID) { // all tasks are extracted submit data from the wal
tqStreamTasksScanWal(pTq); tqStreamTasksScanWal(pTq);
return 0; return 0;
} }
...@@ -1553,43 +1564,3 @@ FAIL: ...@@ -1553,43 +1564,3 @@ FAIL:
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
pMeta->walScanCounter += 1;
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = WAL_READ_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include "tq.h" #include "tq.h"
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId);
// this function should be executed by stream threads. // this function should be executed by stream threads.
// extract submit block from WAL, and add them into the input queue for the sources tasks. // extract submit block from WAL, and add them into the input queue for the sources tasks.
...@@ -57,7 +58,110 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { ...@@ -57,7 +58,110 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
return 0; return 0;
} }
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { int32_t tqStreamTasksStatusCheck(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosWUnLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
if (pTask == NULL) {
continue;
}
streamTaskCheckDownstreamTasks(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
return 0;
}
int32_t tqCheckforStreamStatus(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d check for stream tasks status, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = STREAM_TASK_STATUS_CHECK_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
pMeta->walScanCounter += 1;
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = EXTRACT_DATA_FROM_WAL_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) { if (pTask->chkInfo.currentVer < firstVer) {
...@@ -192,3 +296,4 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -192,3 +296,4 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
taosArrayDestroy(pTaskList); taosArrayDestroy(pTaskList);
return 0; return 0;
} }
...@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) ...@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId); vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
} else { } else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq); tqCheckforStreamStatus(pVnode->pTq);
} }
} }
......
...@@ -241,7 +241,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR ...@@ -241,7 +241,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
SRpcMsg msg = {0}; SRpcMsg msg = {0};
int32_t tlen; int32_t tlen;
tEncodeSize(tEncodeSStreamTaskCheckReq, pReq, tlen, code); tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
if (code < 0) { if (code < 0) {
return -1; return -1;
} }
...@@ -256,7 +256,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR ...@@ -256,7 +256,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
SEncoder encoder; SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen); tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeSStreamTaskCheckReq(&encoder, pReq)) < 0) { if ((code = tEncodeStreamTaskCheckReq(&encoder, pReq)) < 0) {
rpcFreeCont(buf); rpcFreeCont(buf);
return code; return code;
} }
......
...@@ -389,8 +389,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -389,8 +389,6 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
if (pInput == NULL) { if (pInput == NULL) {
qDebug("789 %s", pTask->id.idStr);
if (pTask->info.fillHistory && pTask->status.transferState) { if (pTask->info.fillHistory && pTask->status.transferState) {
// todo transfer task state here // todo transfer task state here
...@@ -414,14 +412,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -414,14 +412,18 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
qDebug("s-task:%s no need to update time window", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
} }
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
streamSetStatusNormal(pStreamTask); streamSetStatusNormal(pStreamTask);
streamSchedExec(pStreamTask); streamMetaSaveTask(pTask->pMeta, pStreamTask);
if (streamMetaCommit(pTask->pMeta)) {
// persistent to disk for
}
streamSchedExec(pStreamTask);
streamMetaReleaseTask(pTask->pMeta, pStreamTask); streamMetaReleaseTask(pTask->pMeta, pStreamTask);
} }
......
...@@ -377,6 +377,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -377,6 +377,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
taosMemoryFree(pTask); taosMemoryFree(pTask);
continue; continue;
} }
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey); tdbFree(pKey);
tdbFree(pVal); tdbFree(pVal);
...@@ -385,12 +386,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { ...@@ -385,12 +386,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1; return -1;
} }
// todo handle the fill history task ASSERT(pTask->status.checkDownstream == 0);
ASSERT(0);
if (pTask->info.fillHistory) {
ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM);
streamTaskCheckDownstreamTasks(pTask);
}
} }
tdbFree(pKey); tdbFree(pKey);
......
...@@ -28,32 +28,45 @@ const char* streamGetTaskStatusStr(int32_t status) { ...@@ -28,32 +28,45 @@ const char* streamGetTaskStatusStr(int32_t status) {
} }
} }
int32_t streamTaskLaunchRecover(SStreamTask* pTask) { static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SVersionRange* pRange = &pTask->dataRange.range;
SVersionRange* pRange = &pTask->dataRange.range;
qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
streamSetParamForRecover(pTask); qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window); pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
SStreamRecoverStep1Req req; streamSetParamForRecover(pTask);
streamBuildSourceRecover1Req(pTask, &req); streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
int32_t len = sizeof(SStreamRecoverStep1Req);
void* serializedReq = rpcMallocCont(len); SStreamRecoverStep1Req req;
if (serializedReq == NULL) { streamBuildSourceRecover1Req(pTask, &req);
return -1; int32_t len = sizeof(SStreamRecoverStep1Req);
}
void* serializedReq = rpcMallocCont(len);
if (serializedReq == NULL) {
return -1;
}
memcpy(serializedReq, &req, len); memcpy(serializedReq, &req, len);
SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY }; SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
/*ASSERT(0);*/ /*ASSERT(0);*/
} }
return 0;
}
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
return doLaunchScanHistoryTask(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s no need to scan-history-data, status:%s, sched-status:%d, ver:%" PRId64, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus,
walReaderGetCurrentVer(pTask->exec.pWalReader));
}
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
streamSetParamForRecover(pTask); streamSetParamForRecover(pTask);
...@@ -68,9 +81,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { ...@@ -68,9 +81,9 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
// check status // check status
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
qDebug("s-task:%s in fill history stage, ver:%" PRId64 "-%"PRId64" window:%" PRId64"-%"PRId64, pTask->id.idStr, SHistDataRange* pRange = &pTask->dataRange;
pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer, pTask->dataRange.window.skey, qDebug("s-task:%s check downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
pTask->dataRange.window.ekey); pTask->id.idStr, pRange->range.minVer, pRange->range.maxVer, pRange->window.skey, pRange->window.ekey);
SStreamTaskCheckReq req = { SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
...@@ -108,8 +121,10 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { ...@@ -108,8 +121,10 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else { } else {
qDebug("s-task:%s (vgId:%d) direct launch recover since no downstream", pTask->id.idStr, pTask->info.nodeId); pTask->status.checkDownstream = 1;
streamTaskLaunchRecover(pTask); qDebug("s-task:%s (vgId:%d) set downstream task checked for task without downstream tasks, try to launch scan-history-data, status:%s",
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} }
return 0; return 0;
...@@ -178,8 +193,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -178,8 +193,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
taosArrayDestroy(pTask->checkReqIds); taosArrayDestroy(pTask->checkReqIds);
pTask->checkReqIds = NULL; pTask->checkReqIds = NULL;
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage", id, numOfReqs); if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamTaskLaunchRecover(pTask); qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs,
streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else { } else {
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id, qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left); pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, left);
...@@ -189,13 +211,20 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ...@@ -189,13 +211,20 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return -1; return -1;
} }
ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT); ASSERT(pTask->status.checkDownstream == 0);
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id, pTask->status.checkDownstream = 1;
streamGetTaskStatusStr(pTask->status.taskStatus)); ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
streamTaskLaunchRecover(pTask); if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else { } else {
ASSERT(0); ASSERT(0);
} }
...@@ -401,7 +430,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { ...@@ -401,7 +430,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64 qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
" ver range:%" PRId64 " - %" PRId64, " ver range:%" PRId64 " - %" PRId64,
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey, pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer); pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
} else { } else {
...@@ -430,7 +459,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { ...@@ -430,7 +459,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition // todo fix the bug: 2. race condition
// an fill history task needs to be started. // an fill history task needs to be started.
int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) { int32_t streamTaskStartHistoryTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
if (pTask->historyTaskId.taskId == 0) { if (pTask->historyTaskId.taskId == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -479,11 +508,13 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { ...@@ -479,11 +508,13 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
/*code = */streamSetStatusNormal(pTask); /*code = */streamSetStatusNormal(pTask);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
// todo check rsp
streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pTask);
return 0; return 0;
} }
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
...@@ -496,7 +527,7 @@ int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq ...@@ -496,7 +527,7 @@ int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) { int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
...@@ -509,7 +540,7 @@ int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq ...@@ -509,7 +540,7 @@ int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq
return 0; return 0;
} }
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) { int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->reqId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
...@@ -523,7 +554,7 @@ int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp ...@@ -523,7 +554,7 @@ int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) { int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->reqId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
......
...@@ -42,6 +42,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto ...@@ -42,6 +42,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
pTask->id.idStr = taosStrdup(buf); pTask->id.idStr = taosStrdup(buf);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册