未验证 提交 ea875c58 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #22263 from taosdata/mark/tmq-main

fix:add committed & seek process logic
...@@ -1251,7 +1251,8 @@ TEST(clientCase, td_25129) { ...@@ -1251,7 +1251,8 @@ TEST(clientCase, td_25129) {
} }
for(int i = 0; i < numOfAssign; i++){ for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("assign i:%d, vgId:%d, committed:%lld, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, committed, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
} }
while (1) { while (1) {
......
...@@ -692,6 +692,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -692,6 +692,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosArrayDestroy(rebOutput.modifyConsumers); taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
taosHashCancelIterate(pReq->rebSubHash, pIter);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mInfo("mq re-balance failed, due to out of memory"); mInfo("mq re-balance failed, due to out of memory");
taosHashCleanup(pReq->rebSubHash); taosHashCleanup(pReq->rebSubHash);
......
...@@ -336,10 +336,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -336,10 +336,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
STqOffset* pOffset = &vgOffset.offset; STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) { if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts); pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
} else if (pOffset->val.type == TMQ_OFFSET__LOG) { } else if (pOffset->val.type == TMQ_OFFSET__LOG) {
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId, tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
pOffset->val.version); pOffset->val.version);
} else { } else {
tqError("invalid commit offset type:%d", pOffset->val.type); tqError("invalid commit offset type:%d", pOffset->val.type);
...@@ -367,12 +367,12 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -367,12 +367,12 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
SRpcMsg rsp = {.info = pMsg->info}; SRpcMsg rsp = {.info = pMsg->info};
int code = 0; int code = 0;
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) { if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
} }
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
......
...@@ -624,6 +624,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -624,6 +624,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg); // return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO: case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_COMMITTEDINFO:
return tqProcessVgCommittedInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册