提交 96b93266 编写于 作者: wmmhello's avatar wmmhello

fix:[TS-3756] assignment is same even though there are data

上级 56679a7b
...@@ -2119,7 +2119,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { ...@@ -2119,7 +2119,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
rspObj = tmqHandleAllRsp(tmq, timeout, false); rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) { if (rspObj) {
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); tscInfo("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj; return (TAOS_RES*)rspObj;
} else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
...@@ -2878,7 +2878,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2878,7 +2878,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->msgType = TDMT_VND_TMQ_SEEK; sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0; int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64, tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d",
tmq->consumerId, pTopic->topicName, vgId, tmq->epoch); tmq->consumerId, pTopic->topicName, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
......
...@@ -1088,8 +1088,8 @@ TEST(clientCase, td_25129) { ...@@ -1088,8 +1088,8 @@ TEST(clientCase, td_25129) {
tmq_conf_set(conf, "group.id", "group_id_2"); tmq_conf_set(conf, "group.id", "group_id_2");
tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "auto.offset.reset", "latest");
tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf); tmq_conf_destroy(conf);
...@@ -1107,7 +1107,7 @@ TEST(clientCase, td_25129) { ...@@ -1107,7 +1107,7 @@ TEST(clientCase, td_25129) {
int32_t precision = 0; int32_t precision = 0;
int32_t totalRows = 0; int32_t totalRows = 0;
int32_t msgCnt = 0; int32_t msgCnt = 0;
int32_t timeout = 2000; int32_t timeout = 200;
int32_t count = 0; int32_t count = 0;
...@@ -1177,7 +1177,10 @@ TEST(clientCase, td_25129) { ...@@ -1177,7 +1177,10 @@ TEST(clientCase, td_25129) {
printSubResults(pRes, &totalRows); printSubResults(pRes, &totalRows);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); tmq_topic_assignment* pAssignTmp = NULL;
int32_t numOfAssignTmp = 0;
code = tmq_get_topic_assignment(tmq, "tp", &pAssignTmp, &numOfAssignTmp);
if (code != 0) { if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code)); printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign); tmq_free_assignment(pAssign);
...@@ -1188,12 +1191,28 @@ TEST(clientCase, td_25129) { ...@@ -1188,12 +1191,28 @@ TEST(clientCase, td_25129) {
} }
for(int i = 0; i < numOfAssign; i++){ for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssignTmp[i].vgId, pAssignTmp[i].currentOffset, pAssignTmp[i].begin, pAssignTmp[i].end);
} }
if(numOfAssign != 0){
int i = 0;
for(; i < numOfAssign; i++){
if(pAssign[i].currentOffset != pAssignTmp[i].currentOffset){
break;
}
}
if(i == numOfAssign){
printf("all position is same\n");
break;
}
tmq_free_assignment(pAssign);
}
numOfAssign = numOfAssignTmp;
pAssign = pAssignTmp;
} else { } else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); // tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); // tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
tmq_commit_sync(tmq, pRes); // tmq_commit_sync(tmq, pRes);
continue; continue;
} }
...@@ -1204,7 +1223,7 @@ TEST(clientCase, td_25129) { ...@@ -1204,7 +1223,7 @@ TEST(clientCase, td_25129) {
// break; // break;
// } // }
} else { } else {
break; // break;
} }
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); // tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
......
...@@ -157,9 +157,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand ...@@ -157,9 +157,12 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
return 0; return 0;
} }
static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ static void setRequestVersion(STqOffsetVal* offset, int64_t verStart, int64_t verEnd){
if(offset->type == TMQ_OFFSET__LOG){ if(offset->type == TMQ_OFFSET__LOG){
offset->version = ver + 1; offset->version = verStart + 1;
}
if(offset->version > verEnd){
offset->version = verEnd;
} }
} }
...@@ -192,7 +195,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -192,7 +195,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
} }
taosWUnLockLatch(&pTq->lock); taosWUnLockLatch(&pTq->lock);
} }
setRequestVersion(&dataRsp.reqOffset, pOffset->version); setRequestVersion(&dataRsp.reqOffset, pOffset->version, dataRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
end : { end : {
...@@ -267,7 +270,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -267,7 +270,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version); setRequestVersion(&taosxRsp.reqOffset, offset->version, taosxRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end; goto end;
} }
...@@ -280,7 +283,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -280,7 +283,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) { if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) { if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
setRequestVersion(&taosxRsp.reqOffset, offset->version); setRequestVersion(&taosxRsp.reqOffset, offset->version, taosxRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end; goto end;
} }
...@@ -310,7 +313,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -310,7 +313,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
setRequestVersion(&taosxRsp.reqOffset, offset->version); setRequestVersion(&taosxRsp.reqOffset, offset->version, taosxRsp.rspOffset.version);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
goto end; goto end;
} else { } else {
......
...@@ -76,7 +76,8 @@ int32_t walNextValidMsg(SWalReader *pReader) { ...@@ -76,7 +76,8 @@ int32_t walNextValidMsg(SWalReader *pReader) {
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer); wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64, pReader->pWal->cfg.vgId, appliedVer, committedVer);
} }
int64_t endVer = TMIN(appliedVer, committedVer); int64_t endVer = committedVer;
// int64_t endVer = TMIN(appliedVer, committedVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64", end index:%" PRId64, ", applied index:%" PRId64", end index:%" PRId64,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册