diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 12876e177423406f29c27bf3dc8d1cf16c215544..47bfc47b532b83d05e6be64b5e985c64dc28278f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -474,7 +474,6 @@ typedef struct { int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); -// int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c68c3fad955e5f5e3b64651c2eb044a0192af1e1..f7d45dc6ff965bfa18693a5a1f21855c3540516a 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -189,20 +189,6 @@ typedef struct { tsem_t rspSem; } SMqPollCbParam; -#if 0 -typedef struct { - tmq_t* tmq; - int8_t async; - int8_t automatic; - int8_t freeOffsets; - tmq_commit_cb* userCb; - tsem_t rspSem; - int32_t rspErr; - SArray* offsets; - void* userParam; -} SMqCommitCbParam; -#endif - typedef struct { tmq_t* tmq; int8_t automatic; @@ -385,29 +371,6 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } -#if 0 -int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; - pParam->rspErr = code; - if (pParam->async) { - if (pParam->automatic && pParam->tmq->commitCb) { - pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam); - } else if (!pParam->automatic && pParam->userCb) { - pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam); - } - - if (pParam->freeOffsets) { - taosArrayDestroy(pParam->offsets); - } - - taosMemoryFree(pParam); - } else { - tsem_post(&pParam->rspSem); - } - return 0; -} -#endif - int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; @@ -660,123 +623,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ return 0; } -#if 0 -int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, - tmq_commit_cb* userCb, void* userParam) { - SMqCMCommitOffsetReq req; - SArray* pOffsets = NULL; - void* buf = NULL; - SMqCommitCbParam* pParam = NULL; - SMsgSendInfo* sendInfo = NULL; - int8_t freeOffsets; - int32_t code = -1; - - if (msg == NULL) { - freeOffsets = 1; - pOffsets = taosArrayInit(0, sizeof(SMqOffset)); - for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - SMqOffset offset; - tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN); - tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN); - offset.vgId = pVg->vgId; - offset.offset = pVg->currentOffset; - taosArrayPush(pOffsets, &offset); - } - } - } else { - freeOffsets = 0; - pOffsets = (SArray*)&msg->container; - } - - req.num = (int32_t)pOffsets->size; - req.offsets = pOffsets->pData; - - SEncoder encoder; - - tEncoderInit(&encoder, NULL, 0); - code = tEncodeSMqCMCommitOffsetReq(&encoder, &req); - if (code < 0) { - goto END; - } - int32_t tlen = encoder.pos; - buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - tEncoderClear(&encoder); - goto END; - } - tEncoderClear(&encoder); - - tEncoderInit(&encoder, buf, tlen); - tEncodeSMqCMCommitOffsetReq(&encoder, &req); - tEncoderClear(&encoder); - - pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); - if (pParam == NULL) { - goto END; - } - pParam->tmq = tmq; - pParam->automatic = automatic; - pParam->async = async; - pParam->offsets = pOffsets; - pParam->freeOffsets = freeOffsets; - pParam->userCb = userCb; - pParam->userParam = userParam; - if (!async) tsem_init(&pParam->rspSem, 0, 0); - - sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) goto END; - sendInfo->msgInfo = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; - - sendInfo->requestId = generateRequestId(); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqCommitCb; - sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET; - - SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - - if (!async) { - tsem_wait(&pParam->rspSem); - code = pParam->rspErr; - tsem_destroy(&pParam->rspSem); - taosMemoryFree(pParam); - } else { - code = 0; - } - - // avoid double free if msg is sent - buf = NULL; - -END: - if (buf) taosMemoryFree(buf); - /*if (pParam) taosMemoryFree(pParam);*/ - /*if (sendInfo) taosMemoryFree(sendInfo);*/ - - if (code != 0 && async) { - if (automatic) { - tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam); - } else { - userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam); - } - } - - if (!async && freeOffsets) { - taosArrayDestroy(pOffsets); - } - return code; -} -#endif - void tmqAssignAskEpTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); @@ -1839,13 +1685,21 @@ int32_t tmq_consumer_close(tmq_t* tmq) { return rsp; } + int32_t retryCnt = 0; tmq_list_t* lst = tmq_list_new(); - rsp = tmq_subscribe(tmq, lst); + while (1) { + rsp = tmq_subscribe(tmq, lst); + if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) { + break; + } else { + retryCnt++; + taosMsleep(500); + } + } + tmq_list_destroy(lst); - if (rsp != 0) { - return rsp; - } + return rsp; } // TODO: free resources return 0; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 34d5e04310b8db9fe3e88032f2dd077ccf8794b4..84a1191ef3f916fb0ef0bf9bd9640eae6e98aeb1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -216,9 +216,11 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { if (offset.val.type == TMQ_OFFSET__LOG) { STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); - if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - ASSERT(0); - return -1; + if (pHandle) { + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { + ASSERT(0); + return -1; + } } } @@ -515,7 +517,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { // todo lock STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { - ASSERT(req.oldConsumerId == -1); + if (req.oldConsumerId != -1) { + tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey, + req.newConsumerId, req.oldConsumerId); + } ASSERT(req.newConsumerId != -1); STqHandle tqHandle = {0}; pHandle = &tqHandle; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 70065882466e96b4bd86536bf3542939cb52dc55..82da396b30a8941bc8329f2d147b6dd61cb8b7bd 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -47,7 +47,7 @@ void streamCleanUp() { } } -void streamTriggerByTimer(void* param, void* tmrId) { +void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) { @@ -71,39 +71,17 @@ void streamTriggerByTimer(void* param, void* tmrId) { streamSchedExec(pTask); } - taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); } int32_t streamSetupTrigger(SStreamTask* pTask) { if (pTask->triggerParam != 0) { - pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); + pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; } return 0; } -#if 0 -int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) { - int8_t schedStatus = atomic_load_8(&pTask->schedStatus); - if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { - SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); - if (pRunReq == NULL) return -1; - - // TODO: do we need htonl? - pRunReq->head.vgId = vgId; - pRunReq->streamId = pTask->streamId; - pRunReq->taskId = pTask->taskId; - SRpcMsg msg = { - .msgType = TDMT_STREAM_TASK_RUN, - .pCont = pRunReq, - .contLen = sizeof(SStreamTaskRunReq), - }; - tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); - } - return 0; -} -#endif - int32_t streamSchedExec(SStreamTask* pTask) { int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING); @@ -296,6 +274,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S ASSERT(pTask->execType != TASK_EXEC__NONE); streamSchedExec(pTask); + /*streamTryExec(pTask);*/ /*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/