From ef092c1cce8882c47ee9976958216fa44532d3fa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 15 Mar 2023 11:54:36 +0000 Subject: [PATCH] enh: add queue limit --- source/dnode/vnode/src/tq/tq.c | 27 +++++++++++++++++---------- source/libs/stream/src/streamMeta.c | 8 +++++++- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5d3350a69a..4c57e92a6c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,6 +15,10 @@ #include "tq.h" +// 0: not init +// 1: already inited +// 2: wait to be inited or cleaup + int32_t tqInit() { int8_t old; while (1) { @@ -275,8 +279,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), + pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); return 0; } @@ -497,7 +501,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // update epoch if need int32_t savedEpoch = atomic_load_32(&pHandle->epoch); while (savedEpoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); + tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, + reqEpoch); savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); } @@ -602,7 +607,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 + ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -612,7 +618,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // for taosx SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; + STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, &req); if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { @@ -887,14 +893,15 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId, - oldConsumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, + pHandle->consumerId, oldConsumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { return -1; } } else { // TODO handle qmsg and exec modification - tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId); + tqInfo("update the consumer info, old consumer id:0x%" PRIx64 ", new Id:0x%" PRIx64, pHandle->consumerId, + req.newConsumerId); atomic_store_32(&pHandle->epoch, -1); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); @@ -983,9 +990,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; - int32_t version = 1; + int32_t version = 1; SMetaInfo info = {0}; - int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); + int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); if (code == TSDB_CODE_SUCCESS) { version = info.skmVer; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 66d98e90bf..a638da1336 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -18,6 +18,7 @@ #include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { + int code = -1; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -33,7 +34,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); - taosMulModeMkDir(streamPath, 0755); + code = taosMulModeMkDir(streamPath, 0755) != 0; + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + taosMemoryFree(streamPath); + goto _err; + } taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { -- GitLab