diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f17df3e6f1ffb906ef729f50769c80449a06c38f..426a62433b640d7075a3c09c4e707a9cd3abfee6 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -911,7 +911,7 @@ CREATE_MSG_FAIL: bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ - /*printf("tmq update ep epoch %d to epoch %d\n", tmq->epoch, epoch);*/ + tscDebug("tmq update ep epoch %d to epoch %d", tmq->epoch, epoch); bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; @@ -984,6 +984,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; + tscDebug("consumer %ld recv ep", tmq->consumerId); if (code != 0) { tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync); goto END; @@ -1032,12 +1033,14 @@ END: int32_t tmqAskEp(tmq_t* tmq, bool sync) { int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { + tscDebug("consumer %ld skip ask ep", tmq->consumerId); return 0; } int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen); if (req == NULL) { tscError("failed to malloc get subscribe ep buf"); + atomic_store_8(&tmq->epStatus, 0); return -1; } req->consumerId = htobe64(tmq->consumerId); @@ -1048,6 +1051,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { if (pParam == NULL) { tscError("failed to malloc subscribe param"); taosMemoryFree(req); + atomic_store_8(&tmq->epStatus, 0); return -1; } pParam->tmq = tmq; @@ -1059,6 +1063,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { tsem_destroy(&pParam->rspSem); taosMemoryFree(pParam); taosMemoryFree(req); + atomic_store_8(&tmq->epStatus, 0); return -1; } @@ -1076,6 +1081,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + tscDebug("consumer %ld ask ep", tmq->consumerId); + int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); @@ -1214,7 +1221,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { - tscDebug("skip vg %d", pVg->vgId); + tscDebug("consumer %ld skip vg %d", tmq->consumerId, pVg->vgId); continue; } SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); @@ -1258,7 +1265,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset); + tscDebug("consumer %ld send poll: vg %d, req offset %ld", tmq->consumerId, pVg->vgId, pVg->currentOffset); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; @@ -1322,7 +1329,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese tmqHandleNoPollRsp(tmq, rspHead, &reset); taosFreeQitem(rspHead); if (pollIfReset && reset) { - tscDebug("reset and repoll\n"); + tscDebug("consumer %ld reset and repoll", tmq->consumerId); tmqPollImpl(tmq, blockingTime); } } @@ -1561,24 +1568,3 @@ TAOS_ROW tmq_get_row(tmq_message_t* message) { } char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } - -#if 0 -tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { - tmq_t* pTmq = taosMemoryMalloc(sizeof(tmq_t)); - if (pTmq == NULL) { - return NULL; - } - strcpy(pTmq->groupId, conf->groupId); - strcpy(pTmq->clientId, conf->clientId); - pTmq->pTscObj = (STscObj*)conn; - pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ; - return pTmq; -} - - -static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { - assert(pMsgBody != NULL); - taosMemoryFreeClear(pMsgBody->msgInfo.pData); - taosMemoryFreeClear(pMsgBody); -} -#endif diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 1afbb740676f75c27002895a870a352efe58ca09..211163ce35e5e79d5c8cba611ec6ee76b0104ca9 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -507,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { // TODO: log rebalance statistics SSdbRaw *pSubRaw = mndSubActionEncode(pSub); - sdbSetRawStatus(pSubRaw, SDB_STATUS_UPDATING); + sdbSetRawStatus(pSubRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pSubRaw); } mndReleaseSubscribe(pMnode, pSub); diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 204cd870f4b01683891f6b04155f7a2b402292b0..dc2c12a2c4f8f07aaf39a00d6e8423fd0682cc45 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -40,6 +40,10 @@ const char *sdbTableName(ESdbType type) { return "auth"; case SDB_ACCT: return "acct"; + case SDB_STREAM: + return "stream"; + case SDB_OFFSET: + return "offset"; case SDB_SUBSCRIBE: return "subscribe"; case SDB_CONSUMER: