diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ffeabc5684c7ac1f98f2adb43477223c3503126d..8b10e4217cddc74868b1a726b2f971afe13ce9df 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -772,6 +772,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) #define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) #define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006) +#define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007) +#define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008) +#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0821719f4eb4f7ef8452ccfd42b357f681f30037..78f45be6bff7631766c27e591a53cc908c1e8eaa 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2259,9 +2259,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { int64_t tmq_get_vgroup_offset(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*) res; - STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset; + STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset; if (pOffset->type == TMQ_OFFSET__LOG) { - return pRspObj->rsp.rspOffset.version; + return pRspObj->rsp.reqOffset.version; } } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res; @@ -2270,8 +2270,8 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { } } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res; - if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) { - return pRspObj->rsp.rspOffset.version; + if (pRspObj->rsp.reqOffset.type == TMQ_OFFSET__LOG) { + return pRspObj->rsp.reqOffset.version; } } @@ -2761,7 +2761,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pTopic == NULL) { tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_TMQ_INVALID_TOPIC; } SMqClientVg* pVg = NULL; @@ -2777,7 +2777,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (pVg == NULL) { tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_TMQ_INVALID_VGID; } SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; @@ -2793,7 +2793,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd); taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; } // update the offset, and then commit to vnode diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 38806b6042461c483de7d0d5b842261f4a26f822..f6b3d0ca498ec1f5e1ccdec06d765e7f37029c96 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7351,27 +7351,8 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) { } int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { - if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; - if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; - if (pRsp->blockNum != 0) { - if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1; + if (tEncodeMqDataRsp(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1; - for (int32_t i = 0; i < pRsp->blockNum; i++) { - int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); - void *data = taosArrayGetP(pRsp->blockData, i); - if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1; - if (pRsp->withSchema) { - SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); - if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1; - } - if (pRsp->withTbName) { - char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i); - if (tEncodeCStr(pEncoder, tbName) < 0) return -1; - } - } - } if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { for (int32_t i = 0; i < pRsp->createTableNum; i++) { @@ -7384,46 +7365,8 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { } int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) { - if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; - if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; - if (pRsp->blockNum != 0) { - pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *)); - pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t)); - if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1; - if (pRsp->withTbName) { - pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *)); - } - if (pRsp->withSchema) { - pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *)); - } + if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; - for (int32_t i = 0; i < pRsp->blockNum; i++) { - void *data; - uint64_t bLen; - if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1; - taosArrayPush(pRsp->blockData, &data); - int32_t len = bLen; - taosArrayPush(pRsp->blockDataLen, &len); - - if (pRsp->withSchema) { - SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pSW == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) { - taosMemoryFree(pSW); - return -1; - } - taosArrayPush(pRsp->blockSchema, &pSW); - } - - if (pRsp->withTbName) { - char *tbName; - if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1; - taosArrayPush(pRsp->blockTbName, &tbName); - } - } - } if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 696549fa05633cde2e0f500272e259f3613fe603..44dbfe6b12e8afb934ff55528025c24344335593 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -137,12 +137,11 @@ typedef enum { } EDndReason; typedef enum { - CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance - CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance - CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance -// CONSUMER_UPDATE_TIMER_LOST, - CONSUMER_UPDATE_RECOVER, - CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req + CONSUMER_UPDATE_REB = 1, // update after rebalance + CONSUMER_ADD_REB, // add after rebalance + CONSUMER_REMOVE_REB, // remove after rebalance + CONSUMER_UPDATE_REC, // update after recover + CONSUMER_UPDATE_SUB, // update after subscribe req } ECsmUpdateType; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index bdf9931ca25cc61c20580ccfe02cc8ae6eee87eb..2b538eccc9f9f279dd74a6991be7e0486184e499 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -184,7 +184,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER; + pConsumerNew->updateType = CONSUMER_UPDATE_REC; mndReleaseConsumer(pMnode, pConsumer); @@ -701,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; -// pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; // use insert logic +// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -731,7 +731,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; + pConsumerNew->updateType = CONSUMER_UPDATE_SUB; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -984,7 +984,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, taosWLockLatch(&pOldConsumer->lock); - if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) { + if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) { TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics); TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics); TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics); @@ -1004,7 +1004,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", // pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status), // pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) { + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); @@ -1013,12 +1013,12 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) { + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); pOldConsumer->rebalanceTime = taosGetTimestampMs(); mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) { + } else if (pNewConsumer->updateType == CONSUMER_ADD_REB) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // check if exist in current topic @@ -1049,7 +1049,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics), (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) { + } else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) { char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); // remove from removed topic diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 48de21199b9bc1ae0611b97ba270fa802ac65a8e..b2235c8b503d503d63a03c066931d4cd7e81c356 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -597,7 +597,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC; + pConsumerNew->updateType = CONSUMER_UPDATE_REB; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew, true); @@ -613,7 +613,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC; + pConsumerNew->updateType = CONSUMER_ADD_REB; char* topicTmp = taosStrdup(topic); taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp); @@ -633,7 +633,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE; + pConsumerNew->updateType = CONSUMER_REMOVE_REB; char* topicTmp = taosStrdup(topic); taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 34c5112eeeb2dc9ef9efe535f51c659e60bd7b36..4365cf63a7349714cbd48acc1d673902d73f7c2a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -157,18 +157,23 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } +static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ + if(offset->type == TMQ_OFFSET__LOG){ + offset->version = ver + 1; + } +} + static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); - int code = 0; terrno = 0; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; } @@ -183,11 +188,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); goto end; - } else { - taosWUnLockLatch(&pTq->lock); } + taosWUnLockLatch(&pTq->lock); } - + setRequestVersion(&dataRsp.reqOffset, pOffset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); end : { @@ -261,6 +265,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } @@ -273,6 +278,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); + setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } @@ -302,6 +308,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); goto end; } else { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f0c7b22bb1bfc5a1329763c2772ea0fac932823a..4c52f89bdc01c969353815b35d686c281fbc83d7 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -629,6 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")