提交 cb62d4cb 编写于 作者: wmmhello's avatar wmmhello

fix:set firset version to reqOffset of response

上级 31a8af9e
......@@ -772,6 +772,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004)
#define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005)
#define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006)
#define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007)
#define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008)
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
......
......@@ -2259,9 +2259,9 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*) res;
STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset;
STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset;
if (pOffset->type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
return pRspObj->rsp.reqOffset.version;
}
} else if (TD_RES_TMQ_META(res)) {
SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res;
......@@ -2270,8 +2270,8 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
}
} else if (TD_RES_TMQ_METADATA(res)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res;
if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.rspOffset.version;
if (pRspObj->rsp.reqOffset.type == TMQ_OFFSET__LOG) {
return pRspObj->rsp.reqOffset.version;
}
}
......@@ -2761,7 +2761,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pTopic == NULL) {
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_INVALID_TOPIC;
}
SMqClientVg* pVg = NULL;
......@@ -2777,7 +2777,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_INVALID_VGID;
}
SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;
......@@ -2793,7 +2793,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_INVALID_PARA;
return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE;
}
// update the offset, and then commit to vnode
......
......@@ -7351,27 +7351,8 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
if (tEncodeI8(pEncoder, pRsp->withTbName) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->withSchema) < 0) return -1;
if (tEncodeMqDataRsp(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1;
for (int32_t i = 0; i < pRsp->blockNum; i++) {
int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i);
void *data = taosArrayGetP(pRsp->blockData, i);
if (tEncodeBinary(pEncoder, (const uint8_t *)data, bLen) < 0) return -1;
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i);
if (tEncodeSSchemaWrapper(pEncoder, pSW) < 0) return -1;
}
if (pRsp->withTbName) {
char *tbName = (char *)taosArrayGetP(pRsp->blockTbName, i);
if (tEncodeCStr(pEncoder, tbName) < 0) return -1;
}
}
}
if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
for (int32_t i = 0; i < pRsp->createTableNum; i++) {
......@@ -7384,46 +7365,8 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
}
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
if (pRsp->blockNum != 0) {
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void *));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(int32_t));
if (tDecodeI8(pDecoder, &pRsp->withTbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->withSchema) < 0) return -1;
if (pRsp->withTbName) {
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
if (pRsp->withSchema) {
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void *));
}
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
uint64_t bLen;
if (tDecodeBinaryAlloc(pDecoder, &data, &bLen) < 0) return -1;
taosArrayPush(pRsp->blockData, &data);
int32_t len = bLen;
taosArrayPush(pRsp->blockDataLen, &len);
if (pRsp->withSchema) {
SSchemaWrapper *pSW = (SSchemaWrapper *)taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pSW == NULL) return -1;
if (tDecodeSSchemaWrapper(pDecoder, pSW) < 0) {
taosMemoryFree(pSW);
return -1;
}
taosArrayPush(pRsp->blockSchema, &pSW);
}
if (pRsp->withTbName) {
char *tbName;
if (tDecodeCStrAlloc(pDecoder, &tbName) < 0) return -1;
taosArrayPush(pRsp->blockTbName, &tbName);
}
}
}
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
if (pRsp->createTableNum) {
pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t));
......
......@@ -137,12 +137,11 @@ typedef enum {
} EDndReason;
typedef enum {
CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance
CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance
// CONSUMER_UPDATE_TIMER_LOST,
CONSUMER_UPDATE_RECOVER,
CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req
CONSUMER_UPDATE_REB = 1, // update after rebalance
CONSUMER_ADD_REB, // add after rebalance
CONSUMER_REMOVE_REB, // remove after rebalance
CONSUMER_UPDATE_REC, // update after recover
CONSUMER_UPDATE_SUB, // update after subscribe req
} ECsmUpdateType;
typedef struct {
......
......@@ -184,7 +184,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
}
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER;
pConsumerNew->updateType = CONSUMER_UPDATE_REC;
mndReleaseConsumer(pMnode, pConsumer);
......@@ -701,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval;
pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg;
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; // use insert logic
// pConsumerNew->updateType = CONSUMER_UPDATE_SUB; // use insert logic
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
......@@ -731,7 +731,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// set the update type
pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY;
pConsumerNew->updateType = CONSUMER_UPDATE_SUB;
taosArrayDestroy(pConsumerNew->assignedTopics);
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
......@@ -984,7 +984,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
taosWLockLatch(&pOldConsumer->lock);
if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) {
if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB) {
TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics);
TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics);
TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics);
......@@ -1004,7 +1004,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
// mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d",
// pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status),
// pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) {
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REC) {
int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i));
......@@ -1013,12 +1013,12 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) {
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB) {
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
pOldConsumer->rebalanceTime = taosGetTimestampMs();
mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId);
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) {
} else if (pNewConsumer->updateType == CONSUMER_ADD_REB) {
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// check if exist in current topic
......@@ -1049,7 +1049,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
(int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) {
} else if (pNewConsumer->updateType == CONSUMER_REMOVE_REB) {
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
// remove from removed topic
......
......@@ -597,7 +597,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC;
pConsumerNew->updateType = CONSUMER_UPDATE_REB;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew, true);
......@@ -613,7 +613,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC;
pConsumerNew->updateType = CONSUMER_ADD_REB;
char* topicTmp = taosStrdup(topic);
taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp);
......@@ -633,7 +633,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i);
SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE;
pConsumerNew->updateType = CONSUMER_REMOVE_REB;
char* topicTmp = taosStrdup(topic);
taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp);
......
......@@ -157,18 +157,23 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0;
}
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){
if(offset->type == TMQ_OFFSET__LOG){
offset->version = ver + 1;
}
}
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
int code = 0;
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end;
}
......@@ -183,11 +188,10 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
code = tqRegisterPushHandle(pTq, pHandle, pMsg);
taosWUnLockLatch(&pTq->lock);
goto end;
} else {
taosWUnLockLatch(&pTq->lock);
}
taosWUnLockLatch(&pTq->lock);
}
setRequestVersion(&dataRsp.reqOffset, pOffset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end : {
......@@ -261,6 +265,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
......@@ -273,6 +278,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
}
......@@ -302,6 +308,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end;
} else {
......
......@@ -629,6 +629,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval
//tmq
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE, "Offset out of range")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_VGID, "VgId does not belong to this consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_TOPIC, "Topic does not belong to this consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册