diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 7c394c4baf5119a6fae11f2c165a95fc4490a654..a97c8ff132be35cd89ae0c4b32fab7efdd85cf32 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,7 +88,7 @@ typedef struct { STqExecTb execTb; STqExecDb execDb; }; -// int32_t numOfCols; // number of out pout column, temporarily used + int32_t numOfCols; // number of out pout column, temporarily used SSchemaWrapper* pSchemaWrapper; // columns that are involved in query } STqExecHandle; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 26db68a1d442047318cb615c5754e560f3fc502f..54f764c6b335811a37095fe7ce58ddcf00a831ef 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -596,7 +596,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe req.qmsg = NULL; pHandle->execHandle.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, NULL, + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.task); void* scanner = NULL; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index e21125f3a4b9b0c89473460bd257dc31d5b0198d..a0b8141cfb3987d40bd4a077f0be184998c272d7 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -110,7 +110,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* taosArrayPush(pRsp->blockSchema, &pSW); } } - tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); + + if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){ + tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); + }else{ + tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock)); + } pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 6b6717ff57e1664b9cfb10f93f8339181b4ea9d1..a192d1f863819f560a2b7b3ce92fe15c603c0fc1 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -260,7 +260,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = qCreateQueueExecTaskInfo( - handle.execHandle.execCol.qmsg, &reader, NULL, &handle.execHandle.pSchemaWrapper); + handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); ASSERT(handle.execHandle.task); void* scanner = NULL; qExtractStreamScanner(handle.execHandle.task, &scanner); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 271a65647d3009ba3dc95cf5f3712c5df9b50c55..124f4b44b0b454e738b928f1c607ab6e5bd09c89 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -177,13 +177,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n // extract the number of output columns SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; - if(numOfCols) *numOfCols = 0; + *numOfCols = 0; SNode* pNode; FOREACH(pNode, pDescNode->pSlots) { SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; if (pSlotDesc->output) { - if(numOfCols) ++(*numOfCols); + ++(*numOfCols); } } diff --git a/tests/system-test/7-tmq/stbTagFilter-1ctb.py b/tests/system-test/7-tmq/stbTagFilter-1ctb.py index 526ff7181e92ca4e62945d64c6deaf079591a3c1..6cb152342be5c80b5f755d0b3f2f7e7bf1c7894a 100644 --- a/tests/system-test/7-tmq/stbTagFilter-1ctb.py +++ b/tests/system-test/7-tmq/stbTagFilter-1ctb.py @@ -250,15 +250,14 @@ class TDTestCase: tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") self.tmqCase1() - # self.tmqCase2() + self.tmqCase2() self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() - # self.tmqCase2() - + self.tmqCase2() def stop(self): tdSql.close()