From 18479d8115f26c52e726b56d75ddb76bcada6209 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 11 Apr 2023 15:42:24 +0800 Subject: [PATCH] refactor: do some internall refactor. --- include/libs/stream/tstream.h | 8 ++++---- source/dnode/mnode/impl/src/mndDef.c | 4 ++-- source/dnode/mnode/impl/src/mndScheduler.c | 10 +++++----- source/dnode/mnode/impl/src/mndStream.c | 8 ++++---- source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 10 +++++----- source/dnode/vnode/src/tq/tqRestore.c | 5 +++-- source/libs/stream/src/streamExec.c | 14 +++++++++----- source/libs/stream/src/streamMeta.c | 8 ++++---- source/libs/stream/src/streamTask.c | 6 +++--- 10 files changed, 40 insertions(+), 35 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7dcb2e1796..2368788824 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -50,7 +50,7 @@ enum { TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, - TASK_STATUS_RESTORE, // only available for source task to replay WAL from the checkpoint + TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint }; enum { @@ -353,9 +353,9 @@ struct SStreamTask { int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewSStreamTask(int64_t streamId); -int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); -int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); +SStreamTask* tNewStreamTask(int64_t streamId); +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem); bool tInputQueueIsFull(const SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index e221a64619..c69f08eb6b 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -70,7 +70,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, innerSz) < 0) return -1; for (int32_t j = 0; j < innerSz; j++) { SStreamTask *pTask = taosArrayGetP(pArray, j); - if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; + if (tEncodeStreamTask(pEncoder, pTask) < 0) return -1; } } @@ -130,7 +130,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) { taosArrayDestroy(pArray); return -1; } - if (tDecodeSStreamTask(pDecoder, pTask) < 0) { + if (tDecodeStreamTask(pDecoder, pTask) < 0) { taosMemoryFree(pTask); taosArrayDestroy(pArray); return -1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 504749df49..36521fd778 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -224,7 +224,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid); if (pTask == NULL) { sdbRelease(pSdb, pVgroup); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -260,7 +260,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { SArray* tasks = taosArrayGetP(pStream->tasks, 0); - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -350,7 +350,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - pInnerTask = tNewSStreamTask(pStream->uid); + pInnerTask = tNewStreamTask(pStream->uid); if (pInnerTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; qDestroyQueryPlan(pPlan); @@ -421,7 +421,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; sdbRelease(pSdb, pVgroup); @@ -491,7 +491,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { continue; } - SStreamTask* pTask = tNewSStreamTask(pStream->uid); + SStreamTask* pTask = tNewStreamTask(pStream->uid); if (pTask == NULL) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a0118ee749..b8c540266f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -39,8 +39,8 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); -// static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); -/*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/ +static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); +static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -418,7 +418,7 @@ FAIL: int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { SEncoder encoder; tEncoderInit(&encoder, NULL, 0); - tEncodeSStreamTask(&encoder, pTask); + tEncodeStreamTask(&encoder, pTask); int32_t size = encoder.pos; int32_t tlen = sizeof(SMsgHead) + size; tEncoderClear(&encoder); @@ -430,7 +430,7 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { ((SMsgHead *)buf)->vgId = htonl(pTask->nodeId); void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, abuf, size); - tEncodeSStreamTask(&encoder, pTask); + tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); STransAction action = {0}; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ee6f649d52..f8e4268aad 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -139,7 +139,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { } SDecoder decoder; tDecoderInit(&decoder, (uint8_t *)msg, msgLen); - code = tDecodeSStreamTask(&decoder, pTask); + code = tDecodeStreamTask(&decoder, pTask); if (code < 0) { tDecoderClear(&decoder); taosMemoryFree(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index afdbf4e7c8..6fca9cc808 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -906,14 +906,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; - pTask->chkInfo.version = ver; pTask->pMeta = pTq->pStreamMeta; // expand executor if (pTask->fillHistory) { pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; } else { - pTask->taskStatus = TASK_STATUS_RESTORE; + pTask->taskStatus = TASK_STATUS__RESTORE; } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { @@ -1089,7 +1088,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - code = tDecodeSStreamTask(&decoder, pTask); + code = tDecodeStreamTask(&decoder, pTask); if (code < 0) { tDecoderClear(&decoder); taosMemoryFree(pTask); @@ -1485,8 +1484,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask->taskStatus == TASK_STATUS__NORMAL) { tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); streamProcessRunReq(pTask); - } else if (pTask->taskStatus == TASK_STATUS_RESTORE) { - tqDebug("vgId:%d s-task:%s start to restore from last ck", vgId, pTask->id.idStr); + } else if (pTask->taskStatus == TASK_STATUS__RESTORE) { + tqDebug("vgId:%d s-task:%s start to process in restore procedure from last chk point:%" PRId64, vgId, + pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); } else { tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9377e3d58f..a123bdb1dc 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -49,9 +49,10 @@ int32_t transferToNormalTask(SStreamMeta* pStreamMeta, SArray* pTaskList) { } // todo: add lock - for(int32_t i = 0; i < numOfTask; ++i){ + for (int32_t i = 0; i < numOfTask; ++i) { SStreamTask* pTask = taosArrayGetP(pTaskList, i); - tqDebug("vgId:%d transfer s-task:%s state restore -> ready", pStreamMeta->vgId, pTask->id.idStr); + tqDebug("vgId:%d transfer s-task:%s state restore -> ready, checkpoint:%" PRId64 " checkpoint id:%" PRId64, + pStreamMeta->vgId, pTask->id.idStr, pTask->chkInfo.version, pTask->chkInfo.id); taosHashRemove(pStreamMeta->pRestoreTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); // NOTE: do not change the following order diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 98052ec6ba..075e477eb3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,11 +21,15 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; - while (pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, - atomic_load_8(&pTask->taskStatus)); - taosMsleep(2); - continue; + while (pTask->taskLevel == TASK_LEVEL__SOURCE) { + int8_t status = atomic_load_8(&pTask->taskStatus); + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) { + qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + atomic_load_8(&pTask->taskStatus)); + taosMsleep(2); + } else { + break; + } } // set input diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a22d768a89..ae65753bed 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -109,7 +109,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, } SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - if (tDecodeSStreamTask(&decoder, pTask) < 0) { + if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); goto FAIL; } @@ -142,7 +142,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; int32_t len; int32_t code; - tEncodeSize(tEncodeSStreamTask, pTask, len, code); + tEncodeSize(tEncodeStreamTask, pTask, len, code); if (code < 0) { return -1; } @@ -153,7 +153,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, len); - tEncodeSStreamTask(&encoder, pTask); + tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { @@ -321,7 +321,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } tDecoderInit(&decoder, (uint8_t*)pVal, vLen); - tDecodeSStreamTask(&decoder, pTask); + tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); if (pMeta->expandFunc(pMeta->ahandle, pTask, -1) < 0) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b1f0a63c2e..f45b6ad7b7 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -16,7 +16,7 @@ #include "executor.h" #include "tstream.h" -SStreamTask* tNewSStreamTask(int64_t streamId) { +SStreamTask* tNewStreamTask(int64_t streamId) { SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return NULL; @@ -54,7 +54,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { return 0; } -int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; @@ -107,7 +107,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { return pEncoder->pos; } -int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; -- GitLab