From 00acf4520c3418350afd962607fbd6715ca6758e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 27 Jul 2022 16:49:03 +0800 Subject: [PATCH] refactor(stream): remove option --- include/libs/stream/tstream.h | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 2 ++ source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/src/streamTask.c | 4 ++-- source/libs/wal/inc/walInt.h | 6 ++++++ 6 files changed, 13 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 6199732cc8..ab1c00a694 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -270,7 +270,7 @@ typedef struct SStreamTask { int64_t startVer; int64_t checkpointVer; int64_t processedVer; - int32_t numOfVgroups; + // int32_t numOfVgroups; // children info SArray* childEpInfo; // SArray diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 5ad13e383a..9d1142801d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to stream since %s", pVnode->vgId, pMsg, terrstr()); + dGError("vgId:%d, msg:%p failed to process stream since %s", pVnode->vgId, pMsg, terrstr()); vmSendRsp(pMsg, code); } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 9882b0a9ae..9d7fa537bb 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -391,10 +391,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { // exec pInnerTask->execType = TASK_EXEC__PIPE; +#if 0 SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb); ASSERT(pDbObj != NULL); sdbRelease(pSdb, pSourceDb); pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups; +#endif if (tsSchedStreamToSnode) { SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 01f2f659ff..6b0e3944e3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -653,7 +653,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { } else { SReadHandle mgHandle = { .vnode = NULL, - .numOfVgroups = pTask->numOfVgroups, + .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5921e44a9c..216e3fa761 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -64,7 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1; + /*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/ int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; @@ -119,7 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1; + /*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/ int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 20667fc918..3aebb1c6ba 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -61,26 +61,31 @@ static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) } static inline int64_t walGetLastFileSize(SWal* pWal) { + if (taosArrayGetSize(pWal->fileInfoSet) == 0) return 0; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); return pInfo->fileSize; } static inline int64_t walGetLastFileFirstVer(SWal* pWal) { + if (taosArrayGetSize(pWal->fileInfoSet) == 0) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); return pInfo->firstVer; } static inline int64_t walGetCurFileFirstVer(SWal* pWal) { + if (pWal->writeCur == -1) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->firstVer; } static inline int64_t walGetCurFileLastVer(SWal* pWal) { + if (pWal->writeCur == -1) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->firstVer; } static inline int64_t walGetCurFileOffset(SWal* pWal) { + if (pWal->writeCur == -1) return -1; SWalFileInfo* pInfo = (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); return pInfo->fileSize; } @@ -88,6 +93,7 @@ static inline int64_t walGetCurFileOffset(SWal* pWal) { static inline bool walCurFileClosed(SWal* pWal) { return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur; } static inline SWalFileInfo* walGetCurFileInfo(SWal* pWal) { + if (pWal->writeCur == -1) return NULL; return (SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur); } -- GitLab