From 6d7fd79709197868a7ef461baea69b71175c8aba Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Jan 2022 11:19:26 +0800 Subject: [PATCH] fix query error --- source/client/test/clientTests.cpp | 2 -- source/dnode/mnode/impl/inc/mndDef.h | 6 +++-- source/dnode/vnode/src/tq/tq.c | 34 ++++++++++++++++++++-------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b73079741c..1136a813d3 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -570,7 +570,6 @@ TEST(testCase, create_topic_Test) { //taos_close(pConn); //} -#if 0 TEST(testCase, tmq_subscribe_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -595,7 +594,6 @@ TEST(testCase, tmq_subscribe_Test) { //if (msg == NULL) break; } } -#endif TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2a23618798..5ec9173fc8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -626,8 +626,10 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic int32_t tlen = 0; tlen += taosEncodeString(buf, pConsumerTopic->name); tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); - ASSERT(pConsumerTopic->pVgInfo); - int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + int32_t sz = 0; + if (pConsumerTopic->pVgInfo != NULL) { + sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + } tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4fb4b0eeae..4322f7be79 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -757,12 +757,12 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (pTopic->buffer.lastOffset == -1 || pReq->offset > pTopic->buffer.lastOffset) { pTopic->buffer.lastOffset = pReq->offset; } - // put output into rsp - SMqConsumeRsp rsp = { - .consumerId = consumerId, - .numOfTopics = 1 - }; } + // put output into rsp + SMqConsumeRsp rsp = { + .consumerId = consumerId, + .numOfTopics = 1 + }; return 0; } @@ -822,14 +822,29 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { pReadHandle->pMsg = pMsg; + pMsg->length = htonl(pMsg->length); + pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); tInitSubmitMsgIter(pMsg, &pReadHandle->msgIter); pReadHandle->ver = ver; memset(&pReadHandle->blkIter, 0, sizeof(SSubmitBlkIter)); } bool tqNextDataBlock(STqReadHandle* pHandle) { - while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) { - if (pHandle->tbUid == pHandle->pBlock->uid) return true; + while (1) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + return false; + } + if (pHandle->pBlock == NULL) return false; + + pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); + if (pHandle->tbUid == pHandle->pBlock->uid){ + pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); + pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); + pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); + pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); + pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); + return true; + } } return false; } @@ -845,8 +860,9 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { int32_t sversion = pHandle->pBlock->sversion; - SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); - STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); + //TODO : change sversion + SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, 0, true); + STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, 0); SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; -- GitLab