Commit 3dbe2087 authored by: H Haojun Liao

enh(tmq): fix memory error and add retrieval of wal info as required.

Parent c654f114
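For context, this commit makes tmq_get_topic_assignment fetch the valid WAL version range from each vnode (via the new TDMT_VND_TMQ_VG_WALINFO message) whenever the client has not yet learned it from a poll response. The following is a minimal, hypothetical consumer-side sketch based only on the signatures and field names visible in the hunks below (tmq_get_topic_assignment, tmq_offset_seek, and the begin/end/currentOffset/vgroupHandle fields); the header include, the topic name, the error handling, and the use of taosMemoryFree to release the assignment array are assumptions, not part of this diff.

#include "taos.h"  // assumed public header exposing the tmq_* API used below

static void rewind_topic_to_earliest(tmq_t* tmq) {
  tmq_topic_assignment* pAssign = NULL;
  int32_t numOfAssign = 0;

  // Per this commit, vgroups that have not yet reported their WAL range are
  // queried with TDMT_VND_TMQ_VG_WALINFO and the call blocks until all answer.
  int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
  if (code != 0) {
    return;  // error handling elided in this sketch
  }

  for (int32_t i = 0; i < numOfAssign; ++i) {
    tmq_topic_assignment* p = &pAssign[i];
    // p->begin / p->end: valid WAL version range reported by the vnode;
    // p->currentOffset: the consumer's current position in that vgroup.
    tmq_offset_seek(tmq, "topic_t1", p->vgroupHandle, p->begin);
  }

  taosMemoryFree(pAssign);  // assumed release call; the array is allocated by the client library
}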
......@@ -128,6 +128,7 @@ enum {
TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__TAOSX_RSP,
TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__END_RSP,
};
......
......@@ -3106,15 +3106,14 @@ typedef struct {
} SMqRspHead;
typedef struct {
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int8_t useSnapshot;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t timeout;
// int64_t currentOffset;
SMsgHead head;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int8_t withTbName;
int8_t useSnapshot;
int32_t epoch;
uint64_t reqId;
int64_t consumerId;
int64_t timeout;
STqOffsetVal reqOffset;
} SMqPollReq;
......
......@@ -304,6 +304,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_ADD_CHECKINFO, "vnode-tmq-add-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_DEL_CHECKINFO, "vnode-del-checkinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_CONSUME, "vnode-tmq-consume", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_VG_WALINFO, "vnode-tmq-vg-walinfo", SMqPollReq, SMqDataBlkRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TMQ_MAX_MSG, "vnd-tmq-max", NULL, NULL)
......
......@@ -147,6 +147,7 @@ typedef struct {
int32_t vgId;
int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups
bool receiveInfo;
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
SEpSet epSet;
} SMqClientVg;
......@@ -196,6 +197,23 @@ typedef struct {
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
typedef struct SMqVgCommon {
tsem_t rsp;
int32_t numOfRsp;
SArray* pList;
TdThreadMutex mutex;
int64_t consumerId;
char* pTopicName;
int32_t code;
} SMqVgCommon;
typedef struct SMqVgWalInfoParam {
int32_t vgId;
int32_t epoch;
int32_t totalReq;
SMqVgCommon* pCommon;
} SMqVgWalInfoParam;
typedef struct {
int64_t refId;
int32_t epoch;
......@@ -1100,7 +1118,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const int32_t MAX_RETRY_COUNT = 120 * 4; // let's wait for 4 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
void* buf = NULL;
......@@ -1153,22 +1171,13 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
goto FAIL;
}
SMqSubscribeCbParam param = {
.rspErr = 0,
.refId = tmq->refId,
.epoch = tmq->epoch,
};
SMqSubscribeCbParam param = { .rspErr = 0, .refId = tmq->refId, .epoch = tmq->epoch };
if (tsem_init(&param.rspSem, 0, 0) != 0) {
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
......@@ -1196,7 +1205,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, max retry reached:%d", tmq->consumerId, retryCnt);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto FAIL;
}
......@@ -1232,7 +1241,7 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
conf->commitCbUserParam = param;
}
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
static int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
int64_t refId = pParam->refId;
......@@ -1285,12 +1294,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < tmqEpoch) {
int32_t clientEpoch = atomic_load_32(&tmq->epoch);
if (msgEpoch < clientEpoch) {
// do not write into queue since updating epoch reset
tscWarn("consumer:0x%" PRIx64
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
......@@ -1300,9 +1309,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
return 0;
}
if (msgEpoch != tmqEpoch) {
if (msgEpoch != clientEpoch) {
tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, tmqEpoch, requestId);
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
}
// handle meta rsp
......@@ -1551,8 +1560,8 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
}
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
taosMemoryFree(pMsg->pEpSet);
......@@ -1725,13 +1734,6 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
pVg->vgId, vgSkipCnt);
continue;
#if 0
if (skipCnt < 30000) {
continue;
} else {
tscDebug("consumer:0x%" PRIx64 ",skip vgId:%d skip too much reset", tmq->consumerId, pVg->vgId);
}
#endif
}
atomic_store_32(&pVg->vgSkipCnt, 0);
......@@ -1815,6 +1817,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// update the valid wal version range
pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
pVg->receiveInfo = true;
char buf[80];
tFormatOffset(buf, 80, &pDataRsp->rspOffset);
......@@ -1931,15 +1934,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
timeout);
#if 0
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, timeout);
rspObj = tmqHandleAllRsp(tmq, timeout, false);
if (rspObj) {
return (TAOS_RES*)rspObj;
}
#endif
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
......@@ -2139,7 +2133,7 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
}
}
static void commitCallBackFn(tmq_t *UNUSED_PARAM(pTmq), int32_t code, void* param) {
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
pInfo->code = code;
tsem_post(&pInfo->sem);
......@@ -2341,13 +2335,57 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
return NULL;
}
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqVgWalInfoParam* pParam = param;
SMqVgCommon* pCommon = pParam->pCommon;
int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
pParam->vgId, pCommon->pTopicName);
pCommon->code = code;
} else {
SMqDataRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &rsp);
tDecoderClear(&decoder);
SMqRspHead* pHead = pMsg->pData;
tmq_topic_assignment assignment = {.begin = pHead->walsver,
.end = pHead->walever,
.currentOffset = rsp.rspOffset.version,
.vgroupHandle = pParam->vgId};
taosThreadMutexLock(&pCommon->mutex);
taosArrayPush(pCommon->pList, &assignment);
taosThreadMutexUnlock(&pCommon->mutex);
}
if (total == pParam->totalReq) {
tsem_post(&pCommon->rsp);
}
taosMemoryFree(pParam);
return 0;
}
static void destroyCommonInfo(SMqVgCommon* pCommon) {
taosArrayDestroy(pCommon->pList);
tsem_destroy(&pCommon->rsp);
taosThreadMutexDestroy(&pCommon->mutex);
taosMemoryFree(pCommon->pTopicName);
taosMemoryFree(pCommon);
}
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
int32_t* numOfAssignment) {
*numOfAssignment = 0;
*assignment = NULL;
int32_t accId = tmq->pTscObj->acctId;
char tname[128] = {0};
char tname[128] = {0};
sprintf(tname, "%d.%s", accId, pTopicName);
SMqClientTopic* pTopic = getTopicByName(tmq, tname);
......@@ -2365,8 +2403,14 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
return TSDB_CODE_OUT_OF_MEMORY;
}
bool needFetch = false;
for (int32_t j = 0; j < (*numOfAssignment); ++j) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
if (!pClientVg->receiveInfo) {
needFetch = true;
break;
}
tmq_topic_assignment* pAssignment = &(*assignment)[j];
if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
......@@ -2380,7 +2424,102 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pAssignment->vgroupHandle = pClientVg->vgId;
}
return TSDB_CODE_SUCCESS;
if (needFetch) {
SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
if (pCommon == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
tsem_init(&pCommon->rsp, 0, 0);
taosThreadMutexInit(&pCommon->mutex, 0);
pCommon->pTopicName = taosStrdup(pTopic->topicName);
pCommon->consumerId = tmq->consumerId;
terrno = TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < (*numOfAssignment); ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
if (pParam == NULL) {
destroyCommonInfo(pCommon);
return terrno;
}
pParam->epoch = tmq->epoch;
pParam->vgId = pClientVg->vgId;
pParam->totalReq = *numOfAssignment;
pParam->pCommon = pCommon;
SMqPollReq req = {0};
tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
taosMemoryFree(pParam);
destroyCommonInfo(pCommon);
return terrno;
}
char* msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
taosMemoryFree(pParam);
destroyCommonInfo(pCommon);
return terrno;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
taosMemoryFree(pParam);
destroyCommonInfo(pCommon);
return terrno;
}
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(msg);
destroyCommonInfo(pCommon);
return terrno;
}
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = req.reqId;
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqGetWalInfoCb;
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0;
char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
}
tsem_wait(&pCommon->rsp);
int32_t code = pCommon->code;
terrno = code;
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(*assignment);
*numOfAssignment = 0;
} else {
int32_t num = taosArrayGetSize(pCommon->pList);
for(int32_t i = 0; i < num; ++i) {
(*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
}
*numOfAssignment = num;
}
destroyCommonInfo(pCommon);
return code;
} else {
return TSDB_CODE_SUCCESS;
}
}
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) {
......
......@@ -1113,7 +1113,7 @@ TEST(clientCase, sub_tb_test) {
tmq_topic_assignment* pAssign = NULL;
int32_t numOfAssign = 0;
TAOS_RES* p = tmq_consumer_poll(tmq, timeout);
// TAOS_RES* p = tmq_consumer_poll(tmq, timeout);
int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
while (1) {
......
......@@ -521,6 +521,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_VG_WALINFO, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -188,6 +188,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq);
void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver);
void saveOffsetForAllTasks(STQ* pTq, int64_t ver);
void initOffsetForAllRestoreTasks(STQ* pTq);
......
......@@ -208,6 +208,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSeekReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg);
// tq-stream
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
......
......@@ -365,6 +365,81 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return tqExtractDataForMq(pTq, pHandle, &req, pMsg);
}
int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int64_t consumerId = req.consumerId;
STqOffsetVal reqOffset = req.reqOffset;
int32_t vgId = TD_VID(pTq->pVnode);
// 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// 2. check re-balance status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, vgId, req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
taosRUnLockLatch(&pTq->lock);
return -1;
}
taosRUnLockLatch(&pTq->lock);
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (pOffset != NULL) {
if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
dataRsp.rspOffset.version = pOffset->val.version;
} else {
if (req.useSnapshot == true) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
dataRsp.rspOffset.version = sver;
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
dataRsp.rspOffset.version = ever;
} else {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey,
reqOffset.type);
terrno = TSDB_CODE_INVALID_PARA;
tDeleteMqDataRsp(&dataRsp);
return -1;
}
}
tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever);
return 0;
}
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
......
......@@ -124,7 +124,7 @@ void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ve
tqOffsetWrite(pOffsetStore, &offset);
}
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq) {
pRsp->reqOffset = pReq->reqOffset;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
......@@ -214,7 +214,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
......@@ -252,7 +252,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
int32_t vgId = TD_VID(pTq->pVnode);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
tqInitDataRsp(&dataRsp, pRequest);
// lock
taosWLockLatch(&pTq->lock);
......@@ -489,7 +489,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0;
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP) {
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
......@@ -513,7 +513,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP) {
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
......
......@@ -544,6 +544,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_TMQ_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TMQ_VG_WALINFO:
return tqProcessVgWalInfoReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
......