diff --git a/include/common/tcommon.h b/include/common/tcommon.h index cc51f96f6c7c0a5fed3b231a360335ad92429cf8..2e646f4769607947102be1dbae767410a167a6e8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -37,14 +37,6 @@ enum { TMQ_MSG_TYPE__EP_RSP, }; -enum { - STREAM_TRIGGER__AT_ONCE = 1, - STREAM_TRIGGER__WINDOW_CLOSE, - STREAM_TRIGGER__BY_COUNT, - STREAM_TRIGGER__BY_BATCH_COUNT, - STREAM_TRIGGER__BY_EVENT_TIME, -}; - typedef enum EStreamType { STREAM_NORMAL = 1, STREAM_INVERT, diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 58905cac19e852a130cf7472f09b392a603a930f..7d49c4206f9c300f15dba0f09e24d37918fc8237 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -408,7 +408,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ pParam->userParam = userParam; if (!async) tsem_init(&pParam->rspSem, 0, 0); - sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) goto END; sendInfo->msgInfo = (SDataBuf){ .pData = buf, @@ -704,7 +704,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { void* abuf = buf; tSerializeSCMSubscribeReq(&abuf, &req); - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) goto FAIL; SMqSubscribeCbParam param = { @@ -1008,7 +1008,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { pParam->async = async; tsem_init(&pParam->rspSem, 0, 0); - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); @@ -1162,7 +1162,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { pParam->vgId = pVg->vgId; pParam->epoch = tmq->epoch; - SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { taosMemoryFree(pReq); taosMemoryFree(pParam); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a4ddf966300aca2974fa90f802bac7ed2aeacacb..2454a4686d68fe00de8e0062a25f4e49bffa815a 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -351,9 +351,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ASSERT(totLevel <= 2); pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); - bool hasExtraSink = false; - bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; - if (totLevel == 2 || externalTargetDB) { + bool hasExtraSink = false; + bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; + SDbObj* pDbObj = mndAcquireDb(pMnode, pStream->targetDb); + ASSERT(pDbObj != NULL); + sdbRelease(pSdb, pDbObj); + + bool multiTarget = pDbObj->cfg.numOfVgroups > 1; + + if (totLevel == 2 || externalTargetDB || multiTarget) { SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); // add extra sink diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e810bb99086cc32a580717fb1633ce92682c0ac5..4a5ea49d79dce40c9bfaafd583546d3a80feb9e4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -136,6 +136,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pReq->subKey); return -1; } + if (pHandle->consumerId != consumerId) { tqError("tmq poll: consumer handle mismatch for consumer %ld in vg %d, subkey %s, handle consumer id %ld", consumerId, pTq->pVnode->config.vgId, pReq->subKey, pHandle->consumerId); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 069595390db7833e025f3f9498e169cfb6980581..d3828d9ee446f31676e4b0fb4f5ec7c125682d66 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -110,9 +110,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp return 0; } // continue dispatch - if (pTask->dispatchType != TASK_DISPATCH__NONE) { - streamDispatch(pTask, pMsgCb); - } + streamDispatch(pTask, pMsgCb); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d52337463874ab656c697ff72ce5123f5ac4ed64..59ec2b5cebcce508cc7b75bcea446ca17f9ddc43 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -182,6 +182,7 @@ FAIL: } int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { + ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); #if 1 int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);