提交 c654f114 编写于 作者: H Haojun Liao

enh(stream): add new msg for seek, and do some internal refactor.

上级 95346cd4
......@@ -3101,6 +3101,8 @@ typedef struct {
int32_t code;
int32_t epoch;
int64_t consumerId;
int64_t walsver;
int64_t walever;
} SMqRspHead;
typedef struct {
......@@ -3147,43 +3149,9 @@ typedef struct {
SSchemaWrapper schema;
} SMqSubTopicEp;
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeString(buf, pTopicEp->db);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
buf = taosDecodeStringTo(buf, pTopicEp->db);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
if (pTopicEp->vgs == NULL) {
return NULL;
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp;
buf = tDecodeSMqSubVgEp(buf, &vgEp);
taosArrayPush(pTopicEp->vgs, &vgEp);
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
return buf;
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
pSubTopicEp->schema.nCols = 0;
int32_t tEncodeMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp);
void* tDecodeMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp);
void tDeleteMqSubTopicEp(SMqSubTopicEp* pSubTopicEp);
typedef struct {
SMqRspHead head;
......@@ -3193,8 +3161,8 @@ typedef struct {
void* metaRsp;
} SMqMetaRsp;
int32_t tEncodeSMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp);
int32_t tDecodeSMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp);
int32_t tEncodeMqMetaRsp(SEncoder* pEncoder, const SMqMetaRsp* pRsp);
int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp);
typedef struct {
SMqRspHead head;
......@@ -3209,9 +3177,9 @@ typedef struct {
SArray* blockSchema;
} SMqDataRsp;
int32_t tEncodeSMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeSMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteSMqDataRsp(SMqDataRsp* pRsp);
int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp);
int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp);
void tDeleteMqDataRsp(SMqDataRsp* pRsp);
typedef struct {
SMqRspHead head;
......@@ -3247,7 +3215,7 @@ static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pR
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i);
tlen += tEncodeSMqSubTopicEp(buf, pVgEp);
tlen += tEncodeMqSubTopicEp(buf, pVgEp);
return tlen;
......@@ -3262,14 +3230,14 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
buf = tDecodeSMqSubTopicEp(buf, &topicEp);
buf = tDecodeMqSubTopicEp(buf, &topicEp);
taosArrayPush(pRsp->topics, &topicEp);
return buf;
static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) {
taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteSMqSubTopicEp);
taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteMqSubTopicEp);
......@@ -300,6 +300,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SUBSCRIBE, "vnode-tmq-subscribe", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DELETE_SUB, "vnode-tmq-delete-sub", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_COMMIT_OFFSET, "vnode-tmq-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_SEEK_TO_OFFSET, "vnode-tmq-seekto-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
......@@ -190,7 +190,7 @@ int32_t walApplyVer(SWal *, int64_t ver);
// int32_t walDataCorrupted(SWal*);
// read
// wal reader
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
void walCloseReader(SWalReader *pRead);
void walReadReset(SWalReader *pReader);
......@@ -198,6 +198,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver);
int32_t walReadSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader* pReader);
void walReaderValidVersionRange(SWalReader* pReader, int64_t *sver, int64_t *ever);
// only for tq usage
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
......@@ -1511,7 +1511,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
rspObj.resType = RES_TYPE__TMQ;
tDecoderInit(&decoder, data, dataLen);
code = tDecodeSMqDataRsp(&decoder, &rspObj.rsp);
code = tDecodeMqDataRsp(&decoder, &rspObj.rsp);
if (code != 0) {
uError("WriteRaw:decode smqDataRsp error");
......@@ -1615,7 +1615,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
code = pRequest->code;
......@@ -1858,7 +1858,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqDataRsp, &rspObj->rsp, len, code);
tEncodeSize(tEncodeMqDataRsp, &rspObj->rsp, len, code);
if (code < 0) {
return -1;
......@@ -1866,7 +1866,7 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
void* buf = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeSMqDataRsp(&encoder, &rspObj->rsp);
tEncodeMqDataRsp(&encoder, &rspObj->rsp);
raw->raw = buf;
......@@ -225,7 +225,7 @@ static int32_t doAskEp(tmq_t* tmq);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups);
int32_t index, int32_t totalVgroups, int32_t type);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
......@@ -473,7 +473,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups) {
int32_t index, int32_t totalVgroups, int32_t type) {
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
......@@ -539,7 +539,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = type;
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
......@@ -575,7 +575,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
return NULL;
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
char* pTopicName = NULL;
int32_t vgId = 0;
int32_t code = 0;
......@@ -645,7 +645,7 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* p
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type);
// failed to commit, callback user function directly.
if (code != TSDB_CODE_SUCCESS) {
......@@ -686,7 +686,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(terrno),
......@@ -1328,7 +1328,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
......@@ -1339,7 +1339,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
......@@ -1808,8 +1808,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// update the local offset value only for the returned values.
pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
// update the status
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// update the valid wal version range
pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
char buf[80];
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) {
......@@ -1837,6 +1843,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
// todo handle the wal range and epset for each vgroup
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
......@@ -2128,11 +2135,11 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
if (pRes == NULL) { // here needs to commit all offsets.
asyncCommitAllOffsets(tmq, cb, param);
} else { // only commit one offset
asyncCommitOffset(tmq, pRes, cb, param);
asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) {
static void commitCallBackFn(tmq_t *UNUSED_PARAM(pTmq), int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
pInfo->code = code;
......@@ -2148,7 +2155,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
if (pRes == NULL) {
asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
} else {
asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo);
asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
......@@ -2339,13 +2346,25 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
*numOfAssignment = 0;
*assignment = NULL;
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
int32_t accId = tmq->pTscObj->acctId;
char tname[128] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
// in case of snapshot is opened, no valid offset will return
*numOfAssignment = taosArrayGetSize(pTopic->vgs);
*assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
if (*assignment == NULL) {
tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
(*numOfAssignment) * sizeof(tmq_topic_assignment));
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
......@@ -2370,9 +2389,13 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
int32_t accId = tmq->pTscObj->acctId;
char tname[128] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
if (pTopic == NULL) {
tscError("consumer:0x:" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
......@@ -2387,7 +2410,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle
if (pVg == NULL) {
tscError("consumer:0x:" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle);
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle);
......@@ -2395,26 +2418,43 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle
int32_t type = pOffsetInfo->currentOffset.type;
if (type != TMQ_OFFSET__LOG) {
tscError("consumer:0x:" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
if (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd) {
tscError("consumer:0x:" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset);
tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64, tmq->consumerId, offset);
// update the offset, and then commit to vnode
if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
pOffsetInfo->currentOffset.version = offset;
pOffsetInfo->committedOffset.version = offset;
pOffsetInfo->committedOffset.version = INT64_MIN;
SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
tstrncpy(rspObj.topic, pTopicName, tListLen(rspObj.topic));
tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));
tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);
int32_t code = tmq_commit_sync(tmq, &rspObj);
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
if (pInfo == NULL) {
tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
tsem_init(&pInfo->sem, 0, 0);
pInfo->code = 0;
asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);
int32_t code = pInfo->code;
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
......@@ -30,6 +30,27 @@
#include "taos.h"
namespace {
void printSubResults(void* pRes, int32_t* totalRows) {
char buf[1024];
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) {
TAOS_FIELD* fields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_field_count(pRes);
int32_t precision = taos_result_precision(pRes);
taos_print_row(buf, row, fields, numOfFields);
*totalRows += 1;
printf("precision: %d, row content: %s\n", precision, buf);
// taos_free_result(pRes);
void showDB(TAOS* pConn) {
TAOS_RES* pRes = taos_query(pConn, "show databases");
......@@ -1059,13 +1080,13 @@ TEST(clientCase, sub_tb_test) {
ASSERT_NE(pConn, nullptr);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName45");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
......@@ -1074,7 +1095,7 @@ TEST(clientCase, sub_tb_test) {
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t2");
tmq_list_append(topicList, "topic_t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
......@@ -1089,11 +1110,15 @@ TEST(clientCase, sub_tb_test) {
int32_t count = 0;
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
TAOS_RES* p = tmq_consumer_poll(tmq, timeout);
int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[1024];
if (pRes != NULL) {
const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes);
int32_t vgroupId = tmq_get_vgroup_id(pRes);
......@@ -1102,27 +1127,18 @@ TEST(clientCase, sub_tb_test) {
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) {
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes);
taos_print_row(buf, row, fields, numOfFields);
totalRows += 1;
printf("precision: %d, row content: %s\n", precision, buf);
printSubResults(pRes, &totalRows);
} else {
// tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin);
// break;
tmq_commit_sync(tmq, pRes);
if (pRes != NULL) {
// if ((++count) > 1) {
// break;
// }
} else {
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin);
......@@ -6973,21 +6973,21 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
return 0;
int32_t tEncodeSMqMetaRsp(SEncoder *pEncoder, const SMqMetaRsp *pRsp) {
int32_t tEncodeMqMetaRsp(SEncoder *pEncoder, const SMqMetaRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI16(pEncoder, pRsp->resMsgType)) return -1;
if (tEncodeBinary(pEncoder, pRsp->metaRsp, pRsp->metaRspLen)) return -1;
return 0;
int32_t tDecodeSMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) {
int32_t tDecodeMqMetaRsp(SDecoder *pDecoder, SMqMetaRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI16(pDecoder, &pRsp->resMsgType) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &pRsp->metaRsp, (uint64_t *)&pRsp->metaRspLen) < 0) return -1;
return 0;
int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1;
if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1;
......@@ -7012,7 +7012,7 @@ int32_t tEncodeSMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) {
return 0;
int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1;
if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1;
......@@ -7057,7 +7057,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return 0;
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
void tDeleteMqDataRsp(SMqDataRsp *pRsp) {
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
......@@ -7539,3 +7539,41 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) {
int32_t tEncodeMqSubTopicEp(void **buf, const SMqSubTopicEp *pTopicEp) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pTopicEp->topic);
tlen += taosEncodeString(buf, pTopicEp->db);
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp *pVgEp = (SMqSubVgEp *)taosArrayGet(pTopicEp->vgs, i);
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
return tlen;
void *tDecodeMqSubTopicEp(void *buf, SMqSubTopicEp *pTopicEp) {
buf = taosDecodeStringTo(buf, pTopicEp->topic);
buf = taosDecodeStringTo(buf, pTopicEp->db);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
if (pTopicEp->vgs == NULL) {
return NULL;
for (int32_t i = 0; i < sz; i++) {
SMqSubVgEp vgEp;
buf = tDecodeSMqSubVgEp(buf, &vgEp);
taosArrayPush(pTopicEp->vgs, &vgEp);
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
return buf;
void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) {
pSubTopicEp->schema.nCols = 0;
\ No newline at end of file
......@@ -517,6 +517,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SUBSCRIBE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DELETE_SUB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_SEEK_TO_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......@@ -556,9 +556,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
return -1;
((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
((SMqRspHead *)buf)->epoch = serverEpoch;
((SMqRspHead *)buf)->consumerId = pConsumer->consumerId;
SMqRspHead* pHead = buf;
pHead->mqMsgType = TMQ_MSG_TYPE__EP_RSP;
pHead->epoch = serverEpoch;
pHead->consumerId = pConsumer->consumerId;
pHead->walsver = 0;
pHead->walever = 0;
void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqAskEpRsp(&abuf, &rsp);
......@@ -106,6 +106,7 @@ typedef struct {
SMqDataRsp* pDataRsp;
SRpcHandleInfo info;
STqHandle* pHandle;
} STqPushEntry;
struct STQ {
......@@ -145,8 +146,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
......@@ -182,13 +184,13 @@ int32_t tqStreamTasksScanWal(STQ* pTq);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver);
int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
int32_t transferToWalReadTask(SStreamMeta* pStreamMeta, SArray* pTaskList);
#ifdef __cplusplus
......@@ -206,6 +206,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
......@@ -76,7 +76,7 @@ static void destroyTqHandle(void* data) {
static void tqPushEntryFree(void* data) {
STqPushEntry* p = *(void**)data;
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
......@@ -154,71 +154,30 @@ void tqClose(STQ* pTq) {
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
int64_t consumerId, int32_t type) {
int32_t len = 0;
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP) {
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
if (code < 0) {
return -1;
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
((SMqRspHead*)buf)->mqMsgType = type;
((SMqRspHead*)buf)->epoch = epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP) {
tEncodeSMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
SRpcMsg rsp = {
.info = *pRpcHandleInfo,
.pCont = buf,
.contLen = tlen,
.code = 0,
return 0;
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
int32_t tqPushDataRsp(STqPushEntry* pPushEntry, int32_t vgId) {
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pPushEntry->pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType, sver, ever);
char buf1[80] = {0};
char buf2[80] = {0};
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
vgId, pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0;
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId) {
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
char buf1[80] = {0};
char buf2[80] = {0};
......@@ -226,7 +185,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64,
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
return 0;
......@@ -259,6 +218,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, offset.subKey, offset.val.version, pSavedOffset->val.version);
return 0; // no need to update the offset value
......@@ -277,6 +238,57 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
return 0;
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqOffset offset = {0};
int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
return -1;
if (offset.val.type != TMQ_OFFSET__LOG) {
tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, offset.subKey, offset.val.type);
return -1;
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
if (pSavedOffset != NULL && pSavedOffset->val.type != TMQ_OFFSET__LOG) {
tqError("invalid saved offset type, vgId:%d sub:%s", vgId, offset.subKey);
return 0; // no need to update the offset value
if (pSavedOffset->val.version == offset.val.version) {
tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, offset.subKey, offset.val.version,
return 0;
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
if (offset.val.version < sver) {
offset.val.version = sver;
} else if (offset.val.version > ever) {
offset.val.version = ever;
// save the new offset value
tqDebug("vgId:%d sub:%s seek to %" PRId64 " prev offset:%" PRId64, vgId, offset.subKey, offset.val.version,
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, offset.subKey, offset.val.version);
return -1;
return 0;
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL;
......@@ -263,7 +263,7 @@ static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int6
if (pRsp->blockNum > 0) {
tqOffsetResetToLog(&pRsp->rspOffset, ver);
tqPushDataRsp(pTq, pPushEntry);
tqPushDataRsp(pPushEntry, vgId);
recordPushedEntry(pCachedKey, pIter);
......@@ -376,6 +376,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest
return -1;
pPushEntry->pHandle = pHandle;
pPushEntry->info = pRpcMsg->info;
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
......@@ -388,6 +389,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
pHead->consumerId = consumerId;
pHead->epoch = pRequest->epoch;
pHead->mqMsgType = type;
......@@ -411,7 +413,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint6
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
if (rspConsumer) { // rsp the old consumer with empty block.
tqPushDataRsp(pTq, *pEntry);
tqPushDataRsp(*pEntry, vgId);
taosHashRemove(pTq->pPushMgr, pKey, keyLen);
......@@ -17,7 +17,7 @@
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
static int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp);
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
......@@ -219,8 +219,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
*pBlockReturned = true;
return code;
......@@ -228,7 +228,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
*pBlockReturned = true;
......@@ -247,6 +247,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
char buf[80] = {0};
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
......@@ -257,37 +258,32 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
goto end;
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
return code;
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if (code == 0) {
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
return code;
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
// NOTE: this pHandle->consumerId may have been changed already.
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
// NOTE: this pHandle->consumerId may have been changed already.
tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64
" code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
char buf[80] = {0};
tFormatOffset(buf, 80, &dataRsp.rspOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d",
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code);
return code;
static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) {
int code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
......@@ -303,7 +299,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (metaRsp.metaRspLen > 0) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
code = tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64,
pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
......@@ -314,7 +310,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
return code;
}else {
......@@ -322,7 +318,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
......@@ -337,13 +332,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d", pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
", found new consumer epoch %d, discard req epoch %d",
pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
return code;
......@@ -357,7 +353,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if(totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
return code;
......@@ -368,7 +364,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
if (tqSendMetaPollRsp(pHandle, pMsg, pRequest, &metaRsp, vgId) < 0) {
code = -1;
......@@ -398,7 +394,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
return code;
......@@ -446,10 +442,19 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
static void initMqRspHead(SMqRspHead* pMsgHead, int32_t type, int32_t epoch, int64_t consumerId, int64_t sver,
int64_t ever) {
pMsgHead->consumerId = consumerId;
pMsgHead->epoch = epoch;
pMsgHead->mqMsgType = type;
pMsgHead->walsver = sver;
pMsgHead->walever = ever;
int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId) {
int32_t len = 0;
int32_t code = 0;
tEncodeSize(tEncodeSMqMetaRsp, pRsp, len, code);
tEncodeSize(tEncodeMqMetaRsp, pRsp, len, code);
if (code < 0) {
return -1;
......@@ -459,27 +464,64 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
return -1;
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_META_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
initMqRspHead(buf, TMQ_MSG_TYPE__POLL_META_RSP, pReq->epoch, pReq->consumerId, sver, ever);
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
tEncodeSMqMetaRsp(&encoder, pRsp);
tEncodeMqMetaRsp(&encoder, pRsp);
SRpcMsg resp = {
.info = pMsg->info,
.pCont = buf,
.contLen = tlen,
.code = 0,
SRpcMsg resp = { .info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0 };
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d", vgId,
pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
return 0;
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever) {
int32_t len = 0;
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
if (code < 0) {
return -1;
int32_t tlen = sizeof(SMqRspHead) + len;
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
return -1;
SMqRspHead* pHead = (SMqRspHead*)buf;
initMqRspHead(pHead, type, epoch, consumerId, sver, ever);
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP) {
tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
SRpcMsg rsp = { .info = *pRpcHandleInfo, .pCont = buf, .contLen = tlen, .code = 0 };
return 0;
\ No newline at end of file
......@@ -389,6 +389,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err;
if (tqProcessSeekReq(pVnode->pTq, version, pReq, pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
if (tqProcessAddCheckInfoReq(pVnode->pTq, version, pReq, len) < 0) {
goto _err;
......@@ -102,6 +102,13 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
*sver = walGetFirstVer(pReader->pWal);
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
*ever = pReader->cond.scanUncommited ? lastVer : committedVer;
static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) {
int64_t ret = 0;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册