提交 cf64d4c9 编写于 作者: wmmhello's avatar wmmhello

fix:set get_assignment offset to first version of response block

上级 8dd7f369
...@@ -3371,6 +3371,12 @@ typedef struct { ...@@ -3371,6 +3371,12 @@ typedef struct {
int8_t reserved; int8_t reserved;
} SMqHbRsp; } SMqHbRsp;
typedef struct {
SMsgHead head;
int64_t consumerId;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
} SMqSeekReq;
#define TD_AUTO_CREATE_TABLE 0x1 #define TD_AUTO_CREATE_TABLE 0x1
typedef struct { typedef struct {
int64_t suid; int64_t suid;
...@@ -3500,6 +3506,8 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); ...@@ -3500,6 +3506,8 @@ int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq);
int32_t tDeatroySMqHbReq(SMqHbReq* pReq); int32_t tDeatroySMqHbReq(SMqHbReq* pReq);
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq);
int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq);
#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 #define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
......
...@@ -306,7 +306,7 @@ enum { ...@@ -306,7 +306,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK_TO_OFFSET, "vnode-tmq-seekto-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK, "vnode-tmq-seek", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp) TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
......
...@@ -140,6 +140,7 @@ enum { ...@@ -140,6 +140,7 @@ enum {
typedef struct SVgOffsetInfo { typedef struct SVgOffsetInfo {
STqOffsetVal committedOffset; STqOffsetVal committedOffset;
STqOffsetVal currentOffset; STqOffsetVal currentOffset;
STqOffsetVal seekOffset; // the first version in block for seek operation
int64_t walVerBegin; int64_t walVerBegin;
int64_t walVerEnd; int64_t walVerEnd;
} SVgOffsetInfo; } SVgOffsetInfo;
...@@ -214,6 +215,11 @@ typedef struct SMqVgCommon { ...@@ -214,6 +215,11 @@ typedef struct SMqVgCommon {
int32_t code; int32_t code;
} SMqVgCommon; } SMqVgCommon;
typedef struct SMqSeekParam {
tsem_t sem;
int32_t code;
} SMqSeekParam;
typedef struct SMqVgWalInfoParam { typedef struct SMqVgWalInfoParam {
int32_t vgId; int32_t vgId;
int32_t epoch; int32_t epoch;
...@@ -821,7 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) { ...@@ -821,7 +827,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1); OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
offRows->vgId = pVg->vgId; offRows->vgId = pVg->vgId;
offRows->rows = pVg->numOfRows; offRows->rows = pVg->numOfRows;
offRows->offset = pVg->offsetInfo.currentOffset; offRows->offset = pVg->offsetInfo.seekOffset;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows); tscInfo("consumer:0x%" PRIx64 ",report offset: vgId:%d, offset:%s, rows:%"PRId64, tmq->consumerId, offRows->vgId, buf, offRows->rows);
...@@ -1479,6 +1485,7 @@ CREATE_MSG_FAIL: ...@@ -1479,6 +1485,7 @@ CREATE_MSG_FAIL:
typedef struct SVgroupSaveInfo { typedef struct SVgroupSaveInfo {
STqOffsetVal currentOffset; STqOffsetVal currentOffset;
STqOffsetVal commitOffset; STqOffsetVal commitOffset;
STqOffsetVal seekOffset;
int64_t numOfRows; int64_t numOfRows;
} SVgroupSaveInfo; } SVgroupSaveInfo;
...@@ -1518,6 +1525,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic ...@@ -1518,6 +1525,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew; clientVg.offsetInfo.currentOffset = pInfo ? pInfo->currentOffset : offsetNew;
clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew; clientVg.offsetInfo.committedOffset = pInfo ? pInfo->commitOffset : offsetNew;
clientVg.offsetInfo.seekOffset = pInfo ? pInfo->seekOffset : offsetNew;
clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1; clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false; clientVg.seekUpdated = false;
...@@ -1577,7 +1585,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) ...@@ -1577,7 +1585,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf); vgKey, buf);
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.currentOffset, .seekOffset = pVgCur->offsetInfo.seekOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
} }
} }
...@@ -1879,10 +1887,11 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p ...@@ -1879,10 +1887,11 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
return 0; return 0;
} }
static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId){
if (!pVg->seekUpdated) { if (!pVg->seekUpdated) {
tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId);
pVg->offsetInfo.currentOffset = *offset; pVg->offsetInfo.seekOffset = *reqOffset;
pVg->offsetInfo.currentOffset = *rspOffset;
} else { } else {
tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId);
} }
...@@ -1944,7 +1953,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1944,7 +1953,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pVg->epSet = *pollRspWrapper->pEpset; pVg->epSet = *pollRspWrapper->pEpset;
} }
updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
...@@ -1994,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -1994,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId);
// build rsp // build rsp
SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
taosFreeQitem(pollRspWrapper); taosFreeQitem(pollRspWrapper);
...@@ -2022,7 +2031,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { ...@@ -2022,7 +2031,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
return NULL; return NULL;
} }
updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId);
if (pollRspWrapper->taosxRsp.blockNum == 0) { if (pollRspWrapper->taosxRsp.blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
...@@ -2545,6 +2554,8 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -2545,6 +2554,8 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
tsem_post(&pCommon->rsp); tsem_post(&pCommon->rsp);
} }
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam); taosMemoryFree(pParam);
return 0; return 0;
} }
...@@ -2615,7 +2626,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2615,7 +2626,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
} }
tmq_topic_assignment* pAssignment = &(*assignment)[j]; tmq_topic_assignment* pAssignment = &(*assignment)[j];
pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; pAssignment->currentOffset = pClientVg->offsetInfo.seekOffset.version;
pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgId = pClientVg->vgId; pAssignment->vgId = pClientVg->vgId;
...@@ -2654,6 +2665,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2654,6 +2665,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqPollReq req = {0}; SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg); tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
req.reqOffset = pClientVg->offsetInfo.seekOffset;
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) { if (msgSize < 0) {
...@@ -2750,6 +2762,17 @@ void tmq_free_assignment(tmq_topic_assignment* pAssignment) { ...@@ -2750,6 +2762,17 @@ void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
taosMemoryFree(pAssignment); taosMemoryFree(pAssignment);
} }
static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
SMqSeekParam* pParam = param;
pParam->code = code;
tsem_post(&pParam->sem);
return 0;
}
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL) { if (tmq == NULL) {
tscError("invalid tmq handle, null"); tscError("invalid tmq handle, null");
...@@ -2803,35 +2826,71 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ ...@@ -2803,35 +2826,71 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
// update the offset, and then commit to vnode // update the offset, and then commit to vnode
pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0;
pOffsetInfo->seekOffset = pOffsetInfo->currentOffset;
// pOffsetInfo->committedOffset.version = INT64_MIN; // pOffsetInfo->committedOffset.version = INT64_MIN;
pVg->seekUpdated = true; pVg->seekUpdated = true;
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId);
SMqSeekReq req = {0};
snprintf(req.subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s:%s", tmq->groupId, pTopic->topicName);
req.head.vgId = pVg->vgId;
req.consumerId = tmq->consumerId;
int32_t msgSize = tSerializeSMqSeekReq(NULL, 0, &req);
if (msgSize < 0) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSerializeSMqSeekReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(msg);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
SMqSeekParam* pParam = taosMemoryMalloc(sizeof(SMqSeekParam));
if (pParam == NULL) {
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&pParam->sem, 0, 0);
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqSeekCb;
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); int64_t transporterId = 0;
tscInfo("consumer:0x%" PRIx64 " %s send seek info vgId:%d, epoch %d" PRIx64,
tmq->consumerId, pTopic->topicName, vgId, tmq->epoch);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
// SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tsem_wait(&pParam->sem);
// tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); int32_t code = pParam->code;
// tsem_destroy(&pParam->sem);
// SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); taosMemoryFree(pParam);
// if (pInfo == NULL) {
// tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
// return TSDB_CODE_OUT_OF_MEMORY;
// }
//
// tsem_init(&pInfo->sem, 0, 0);
// pInfo->code = 0;
//
// asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);
//
// tsem_wait(&pInfo->sem);
// int32_t code = pInfo->code;
//
// tsem_destroy(&pInfo->sem);
// taosMemoryFree(pInfo);
//
// if (code != TSDB_CODE_SUCCESS) {
// tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code));
// }
return 0; if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, vgId, tstrerror(code));
}
return code;
} }
\ No newline at end of file
...@@ -34,6 +34,8 @@ namespace { ...@@ -34,6 +34,8 @@ namespace {
void printSubResults(void* pRes, int32_t* totalRows) { void printSubResults(void* pRes, int32_t* totalRows) {
char buf[1024]; char buf[1024];
int32_t vgId = tmq_get_vgroup_id(pRes);
int64_t offset = tmq_get_vgroup_offset(pRes);
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(pRes); TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) { if (row == NULL) {
...@@ -45,7 +47,7 @@ void printSubResults(void* pRes, int32_t* totalRows) { ...@@ -45,7 +47,7 @@ void printSubResults(void* pRes, int32_t* totalRows) {
int32_t precision = taos_result_precision(pRes); int32_t precision = taos_result_precision(pRes);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
*totalRows += 1; *totalRows += 1;
printf("precision: %d, row content: %s\n", precision, buf); printf("vgId: %d, offset: %"PRId64", precision: %d, row content: %s\n", vgId, offset, precision, buf);
} }
// taos_free_result(pRes); // taos_free_result(pRes);
...@@ -1160,6 +1162,7 @@ TEST(clientCase, td_25129) { ...@@ -1160,6 +1162,7 @@ TEST(clientCase, td_25129) {
} }
while (1) { while (1) {
printf("start to poll\n");
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) { if (pRes) {
char buf[128]; char buf[128];
...@@ -1173,9 +1176,24 @@ TEST(clientCase, td_25129) { ...@@ -1173,9 +1176,24 @@ TEST(clientCase, td_25129) {
// printf("vgroup id: %d\n", vgroupId); // printf("vgroup id: %d\n", vgroupId);
printSubResults(pRes, &totalRows); printSubResults(pRes, &totalRows);
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
if (code != 0) {
printf("error occurs:%s\n", tmq_err2str(code));
tmq_free_assignment(pAssign);
tmq_consumer_close(tmq);
taos_close(pConn);
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
return;
}
for(int i = 0; i < numOfAssign; i++){
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
}
} else { } else {
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
tmq_commit_sync(tmq, pRes);
continue; continue;
} }
......
...@@ -5382,6 +5382,48 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) { ...@@ -5382,6 +5382,48 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
return 0; return 0;
} }
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) {
buf = (char *)buf + headLen;
bufLen -= headLen;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->consumerId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->subKey) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
if (buf != NULL) {
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
pHead->vgId = htonl(pReq->head.vgId);
pHead->contLen = htonl(tlen + headLen);
}
return tlen + headLen;
}
int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
int32_t headLen = sizeof(SMsgHead);
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->consumerId) < 0) return -1;
tDecodeCStrTo(&decoder, pReq->subKey);
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) {
int32_t headLen = sizeof(SMsgHead); int32_t headLen = sizeof(SMsgHead);
if (buf != NULL) { if (buf != NULL) {
......
...@@ -726,7 +726,7 @@ SArray *vmGetMsgHandles() { ...@@ -726,7 +726,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
......
...@@ -228,7 +228,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m ...@@ -228,7 +228,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
......
...@@ -330,86 +330,124 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t ...@@ -330,86 +330,124 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
return 0; return 0;
} }
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
SMqVgOffset vgOffset = {0}; SMqSeekReq req = {0};
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SRpcMsg rsp = {.info = pMsg->info};
int code = 0;
SDecoder decoder; tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
tDecoderInit(&decoder, (uint8_t*)msg, msgLen); if (tDeserializeSMqSeekReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { code = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to decode seek msg", vgId); goto end;
return -1;
}
tDecoderClear(&decoder);
tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
return -1;
} }
STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG; code = 0;
return -1; goto end;
} }
// 2. check consumer-vg assignment status // 2. check consumer-vg assignment status
taosRLockLatch(&pTq->lock); taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != vgOffset.consumerId) { if (pHandle->consumerId != req.consumerId) {
tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); req.consumerId, vgId, req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->lock); taosRUnLockLatch(&pTq->lock);
return -1; code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
} goto end;
taosRUnLockLatch(&pTq->lock);
// 3. check the offset info
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL) {
if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
return 0; // no need to update the offset value
}
if (pSavedOffset->val.version == pOffset->val.version) {
tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
pOffset->val.version, pSavedOffset->val.version);
return 0;
}
}
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
if (pOffset->val.version < sver) {
pOffset->val.version = sver;
} else if (pOffset->val.version > ever) {
pOffset->val.version = ever;
} }
// save the new offset value //if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to TMQ_VG_STATUS__IDLE,
if (pSavedOffset != NULL) { //otherwise poll data failed after seek.
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, tqUnregisterPushHandle(pTq, pHandle);
pSavedOffset->val.version); taosRUnLockLatch(&pTq->lock);
} else {
tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
}
if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
return -1;
}
tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
vgOffset.consumerId, vgOffset.offset.val.version);
end:
rsp.code = code;
tmsgSendRsp(&rsp);
return 0; return 0;
// SMqVgOffset vgOffset = {0};
// int32_t vgId = TD_VID(pTq->pVnode);
//
// SDecoder decoder;
// tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) {
// tqError("vgId:%d failed to decode seek msg", vgId);
// return -1;
// }
//
// tDecoderClear(&decoder);
//
// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64,
// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version);
//
// STqOffset* pOffset = &vgOffset.offset;
// if (pOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type);
// return -1;
// }
//
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey));
// if (pHandle == NULL) {
// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey);
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
// // 2. check consumer-vg assignment status
// taosRLockLatch(&pTq->lock);
// if (pHandle->consumerId != vgOffset.consumerId) {
// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId);
// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
// taosRUnLockLatch(&pTq->lock);
// return -1;
// }
// taosRUnLockLatch(&pTq->lock);
//
// // 3. check the offset info
// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
// if (pSavedOffset != NULL) {
// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) {
// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey);
// return 0; // no need to update the offset value
// }
//
// if (pSavedOffset->val.version == pOffset->val.version) {
// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey,
// pOffset->val.version, pSavedOffset->val.version);
// return 0;
// }
// }
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// if (pOffset->val.version < sver) {
// pOffset->val.version = sver;
// } else if (pOffset->val.version > ever) {
// pOffset->val.version = ever;
// }
//
// // save the new offset value
// if (pSavedOffset != NULL) {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version,
// pSavedOffset->val.version);
// } else {
// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version);
// }
//
// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) {
// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version);
// return -1;
// }
//
// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId,
// vgOffset.consumerId, vgOffset.offset.val.version);
//
// return 0;
} }
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
......
...@@ -214,7 +214,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ...@@ -214,7 +214,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
SMqMetaRsp metaRsp = {0}; SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0}; STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest); tqInitTaosxRsp(&taosxRsp, pRequest);
taosxRsp.reqOffset.type = offset->type; // stroe origin type for getting offset in tmq_get_vgroup_offset taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset
if (offset->type != TMQ_OFFSET__LOG) { if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {
......
...@@ -466,11 +466,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg ...@@ -466,11 +466,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err; goto _err;
} }
break; break;
case TDMT_VND_TMQ_SEEK_TO_OFFSET:
if (tqProcessSeekReq(pVnode->pTq, ver, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_VND_TMQ_ADD_CHECKINFO: case TDMT_VND_TMQ_ADD_CHECKINFO:
if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) { if (tqProcessAddCheckInfoReq(pVnode->pTq, ver, pReq, len) < 0) {
goto _err; goto _err;
...@@ -643,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -643,6 +638,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
// return tqProcessPollReq(pVnode->pTq, pMsg); // return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO: case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg); return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_SEEK:
return tqProcessSeekReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册