未验证 提交 516fcc53 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21943 from taosdata/fix/TD-25071

fix:[TD-25071] pSub is null & return wal not exist if no data
......@@ -1426,7 +1426,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecoderClear(&decoder);
memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
char buf[TSDB_OFFSET_LEN];
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
......@@ -1572,7 +1572,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
char buf[TSDB_OFFSET_LEN];
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf);
......@@ -1800,7 +1800,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN];
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
......@@ -1946,7 +1946,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId);
char buf[TSDB_OFFSET_LEN];
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
if (pDataRsp->blockNum == 0) {
tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
......@@ -2043,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN];
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
......@@ -2677,7 +2677,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;
int64_t transporterId = 0;
char offsetFormatBuf[TSDB_OFFSET_LEN];
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
......
......@@ -419,6 +419,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName);
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
continue;
}
taosWLockLatch(&pSub->lock);
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
if(pConsumerEp){
......
......@@ -151,7 +151,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta
int32_t tqMetaOpen(STQ* pTq);
......
......@@ -232,26 +232,43 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
return 0;
}
int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqDataRsp dataRsp = {0};
dataRsp.head.consumerId = pHandle->consumerId;
dataRsp.head.epoch = pHandle->epoch;
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
int64_t sver = 0, ever = 0;
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
ever);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pHandle->msg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
char buf1[TSDB_OFFSET_LEN] = {0};
char buf2[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req);
dataRsp.blockNum = 0;
dataRsp.rspOffset = dataRsp.reqOffset;
tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
return 0;
}
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
// SMqDataRsp dataRsp = {0};
// dataRsp.head.consumerId = pHandle->consumerId;
// dataRsp.head.epoch = pHandle->epoch;
// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
// ever);
//
// char buf1[TSDB_OFFSET_LEN] = {0};
// char buf2[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
// return 0;
//}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId) {
int64_t sver = 0, ever = 0;
......@@ -510,7 +527,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pHandle->epoch = reqEpoch;
}
char buf[TSDB_OFFSET_LEN];
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
......
......@@ -64,7 +64,9 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
} else {
tqPushDataRsp(pHandle, vgId);
// tqPushDataRsp(pHandle, vgId);
tqPushEmptyDataRsp(pHandle, vgId);
void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;
......@@ -89,7 +91,8 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pHandle, vgId);
// tqPushDataRsp(pHandle, vgId);
tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont);
taosMemoryFree(pHandle->msg);
......
......@@ -99,7 +99,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
if (pOffset != NULL) {
*pOffsetVal = pOffset->val;
char formatBuf[TSDB_OFFSET_LEN];
char formatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64
", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64,
......@@ -162,6 +162,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
int code = 0;
terrno = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
......
......@@ -127,7 +127,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem
if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){
code = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildCol error col not same %s", pTagSchema->name);
uError("SML smlBuildTagRow error col not same %s", pTagSchema->name);
goto end;
}
......@@ -210,7 +210,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32
SSmlKv* kv = (SSmlKv*)data;
if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){
ret = TSDB_CODE_SML_INVALID_DATA;
uError("SML smlBuildCol error col not same %s", pColSchema->name);
uInfo("SML smlBuildCol error col not same %s", pColSchema->name);
goto end;
}
if (kv->type == TSDB_DATA_TYPE_NCHAR) {
......
......@@ -82,6 +82,11 @@ int32_t walNextValidMsg(SWalReader *pReader) {
", applied index:%" PRId64", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
if (fetchVer > endVer){
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
while (fetchVer <= endVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1;
......
......@@ -546,7 +546,7 @@ class TDTestCase:
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
......@@ -569,7 +569,7 @@ class TDTestCase:
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:none'
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("again start consume processor")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册