diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f514bb616c0ba468609a28576ad88d4d2c4d2d00..f94570112bba1d4a152a4e6fb8ca40845f1b147e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1426,7 +1426,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset); tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); @@ -1572,7 +1572,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); @@ -1800,7 +1800,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->msgType = TDMT_VND_TMQ_CONSUME; int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN]; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset); tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, @@ -1946,7 +1946,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 @@ -2043,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2677,7 +2677,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO; int64_t transporterId = 0; - char offsetFormatBuf[TSDB_OFFSET_LEN]; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 47cc4a1ce7b4a0df57f54dfcd2d3e3af94acf399..bdf9931ca25cc61c20580ccfe02cc8ae6eee87eb 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -419,6 +419,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { mDebug("heartbeat report offset rows.%s:%s", pConsumer->cgroup, data->topicName); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); + if(pSub == NULL){ + continue; + } taosWLockLatch(&pSub->lock); SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t)); if(pConsumerEp){ diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b35dc71ed974e656b94b30562ee895a9f81fcd20..c390cdfd3863ef5dffc9cafab26f287179d00b11 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -151,7 +151,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); -int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); +//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId); +int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0c5ccab6a0d5e464f93f3ef090f6d952864eee4c..99f3c02f13365fd0e794a046d0b305933c99d321 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -232,26 +232,43 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { - SMqDataRsp dataRsp = {0}; - dataRsp.head.consumerId = pHandle->consumerId; - dataRsp.head.epoch = pHandle->epoch; - dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - - int64_t sver = 0, ever = 0; - walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); - tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, - ever); +int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { + SMqPollReq req = {0}; + if (tDeserializeSMqPollReq(pHandle->msg->pCont, pHandle->msg->contLen, &req) < 0) { + tqError("tDeserializeSMqPollReq %d failed", pHandle->msg->contLen); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } - char buf1[TSDB_OFFSET_LEN] = {0}; - char buf2[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); - tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); - tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, - dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, &req); + dataRsp.blockNum = 0; + dataRsp.rspOffset = dataRsp.reqOffset; + tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId); + tDeleteMqDataRsp(&dataRsp); return 0; } +//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) { +// SMqDataRsp dataRsp = {0}; +// dataRsp.head.consumerId = pHandle->consumerId; +// dataRsp.head.epoch = pHandle->epoch; +// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; +// +// int64_t sver = 0, ever = 0; +// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); +// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver, +// ever); +// +// char buf1[TSDB_OFFSET_LEN] = {0}; +// char buf2[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); +// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); +// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId, +// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); +// return 0; +//} + int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId) { int64_t sver = 0, ever = 0; @@ -510,7 +527,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pHandle->epoch = reqEpoch; } - char buf[TSDB_OFFSET_LEN]; + char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64, consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 4048ebe3f9ef844e4905ee06f4551c316dd49220..06af53d453cb4e189b82bcaff9c17c0e81936ed5 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -64,7 +64,9 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); } else { - tqPushDataRsp(pHandle, vgId); +// tqPushDataRsp(pHandle, vgId); + tqPushEmptyDataRsp(pHandle, vgId); + void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; @@ -89,7 +91,8 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { tqDebug("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); if(pHandle->msg != NULL) { - tqPushDataRsp(pHandle, vgId); +// tqPushDataRsp(pHandle, vgId); + tqPushEmptyDataRsp(pHandle, vgId); rpcFreeCont(pHandle->msg->pCont); taosMemoryFree(pHandle->msg); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index a301d82c30a603f6c45ccff200e90b6b3f87494b..34c5112eeeb2dc9ef9efe535f51c659e60bd7b36 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -99,7 +99,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand if (pOffset != NULL) { *pOffsetVal = pOffset->val; - char formatBuf[TSDB_OFFSET_LEN]; + char formatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%" PRIx64, @@ -162,6 +162,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); int code = 0; + terrno = 0; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest); diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c index 0e5ffc57da59e14583a3d86133191e3382356454..78b05b6df57d10d5f7d54669353d8906556a72d4 100644 --- a/source/libs/parser/src/parInsertSml.c +++ b/source/libs/parser/src/parInsertSml.c @@ -127,7 +127,7 @@ static int32_t smlBuildTagRow(SArray* cols, SBoundColInfo* tags, SSchema* pSchem if(kv->keyLen != strlen(pTagSchema->name) || memcmp(kv->key, pTagSchema->name, kv->keyLen) != 0 || kv->type != pTagSchema->type){ code = TSDB_CODE_SML_INVALID_DATA; - uError("SML smlBuildCol error col not same %s", pTagSchema->name); + uError("SML smlBuildTagRow error col not same %s", pTagSchema->name); goto end; } @@ -210,7 +210,7 @@ int32_t smlBuildCol(STableDataCxt* pTableCxt, SSchema* schema, void* data, int32 SSmlKv* kv = (SSmlKv*)data; if(kv->keyLen != strlen(pColSchema->name) || memcmp(kv->key, pColSchema->name, kv->keyLen) != 0 || kv->type != pColSchema->type){ ret = TSDB_CODE_SML_INVALID_DATA; - uError("SML smlBuildCol error col not same %s", pColSchema->name); + uInfo("SML smlBuildCol error col not same %s", pColSchema->name); goto end; } if (kv->type == TSDB_DATA_TYPE_NCHAR) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 1223e3756cc0ffc436c9eb5d04b4b178af66f6dc..786f48ce88fc9a66ed3c05506108f4759aa4e8a8 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -82,6 +82,11 @@ int32_t walNextValidMsg(SWalReader *pReader) { ", applied index:%" PRId64", end index:%" PRId64, pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + if (fetchVer > endVer){ + terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; + return -1; + } + while (fetchVer <= endVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; diff --git a/tests/system-test/7-tmq/subscribeStb3.py b/tests/system-test/7-tmq/subscribeStb3.py index 6f3230e68791c9c1806abf576936cd5878bd7b06..ed44ab1fb1940969e0011dd69211f76d58ece449 100644 --- a/tests/system-test/7-tmq/subscribeStb3.py +++ b/tests/system-test/7-tmq/subscribeStb3.py @@ -546,7 +546,7 @@ class TDTestCase: keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ - auto.offset.reset:none' + auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt/2,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("again start consume processor") @@ -569,7 +569,7 @@ class TDTestCase: keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ auto.commit.interval.ms:6000,\ - auto.offset.reset:none' + auto.offset.reset:earliest' self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tdLog.info("again start consume processor")