diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 21ecde332646d0af23970b1e456b0379480aff9e..1835b4f55885247f8617d363ab300d588c859cda 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -225,16 +225,16 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { tmqMsgHead msgHead = pConsumeMsg->head; //extract head STQ *pTq = pVnode->pTQ; - tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId); + /*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/ //return msg if offset not moved - if(pConsumeMsg->commitOffset == pHandle->consumeOffset) { + /*if(pConsumeMsg->commitOffset == pHandle->consumeOffset) {*/ //return msg - return 0; - } + /*return 0;*/ + /*}*/ //or move offset - tqMoveOffsetToNext(pHandle); + /*tqMoveOffsetToNext(pHandle);*/ //fetch or register context - tqFetchMsg(pHandle, pRead); + /*tqFetchMsg(pHandle, pRead);*/ //judge mode, tail read or catch up read /*int64_t lastVer = walLastVer(pVnode->wal);*/ //launch new query diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index b0a1c08673998096c2699601e8eb2c3b65ff8428..52702057d692ce905dff14ea9be05264f0dc42e3 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -148,6 +148,19 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { return totSize; } + +tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { + return NULL; +} + +int tqLaunchQuery(tqGroupHandle* ghandle) { + return 0; +} + +int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) { + return 0; +} + /*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ /*return 0;*/ /*}*/ @@ -268,3 +281,16 @@ int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { return 0; } + +int tqGetGHandleSSize(const tqGroupHandle *gHandle) { + return 0; +} +int tqListHandleSSize(const tqListHandle *listHandle) { + return 0; +} +int tqBufHandleSSize(const tqBufferHandle *bufHandle) { + return 0; +} +int tqBufItemSSize(const tqBufferItem *bufItem) { + return 0; +}