提交 bf3231ab 编写于 作者: wmmhello's avatar wmmhello

fix: cols num error in tmq for query

上级 ecc043a3
...@@ -88,7 +88,7 @@ typedef struct { ...@@ -88,7 +88,7 @@ typedef struct {
STqExecTb execTb; STqExecTb execTb;
STqExecDb execDb; STqExecDb execDb;
}; };
// int32_t numOfCols; // number of out pout column, temporarily used int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper* pSchemaWrapper; // columns that are involved in query SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
} STqExecHandle; } STqExecHandle;
......
...@@ -596,7 +596,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe ...@@ -596,7 +596,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
req.qmsg = NULL; req.qmsg = NULL;
pHandle->execHandle.task = pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, NULL, qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper); &pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.task); ASSERT(pHandle->execHandle.task);
void* scanner = NULL; void* scanner = NULL;
......
...@@ -110,7 +110,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp* ...@@ -110,7 +110,12 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqMetaRsp*
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
} }
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
if(pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN){
tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols);
}else{
tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock));
}
pRsp->blockNum++; pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
continue; continue;
......
...@@ -260,7 +260,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { ...@@ -260,7 +260,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task = qCreateQueueExecTaskInfo( handle.execHandle.task = qCreateQueueExecTaskInfo(
handle.execHandle.execCol.qmsg, &reader, NULL, &handle.execHandle.pSchemaWrapper); handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
ASSERT(handle.execHandle.task); ASSERT(handle.execHandle.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.task, &scanner); qExtractStreamScanner(handle.execHandle.task, &scanner);
......
...@@ -177,13 +177,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n ...@@ -177,13 +177,13 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
// extract the number of output columns // extract the number of output columns
SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc; SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
if(numOfCols) *numOfCols = 0; *numOfCols = 0;
SNode* pNode; SNode* pNode;
FOREACH(pNode, pDescNode->pSlots) { FOREACH(pNode, pDescNode->pSlots) {
SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode; SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
if (pSlotDesc->output) { if (pSlotDesc->output) {
if(numOfCols) ++(*numOfCols); ++(*numOfCols);
} }
} }
......
...@@ -250,15 +250,14 @@ class TDTestCase: ...@@ -250,15 +250,14 @@ class TDTestCase:
tdLog.printNoPrefix("=============================================") tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1() self.tmqCase1()
# self.tmqCase2() self.tmqCase2()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1 self.snapshot = 1
self.tmqCase1() self.tmqCase1()
# self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册