From 56c98d77684049c00481330460992b53a73cd3ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 29 Mar 2023 19:35:04 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/executor/executor.h | 3 ++- source/dnode/vnode/src/tq/tq.c | 7 +++--- source/dnode/vnode/src/tq/tqMeta.c | 10 ++++---- source/libs/executor/inc/executorimpl.h | 8 ++----- source/libs/executor/src/executor.c | 24 +++++++------------ source/libs/executor/src/executorimpl.c | 4 ++-- tests/system-test/2-query/columnLenUpdated.py | 2 +- 7 files changed, 23 insertions(+), 35 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 19a6407b77..8d63306a88 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v * @param SReadHandle * @return */ -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); /** * set the task Id, usually used by message queue process @@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); + /** * Set multiple input data blocks for the stream scan. * @param tinfo diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4db53c1627..484a0559a8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -799,7 +799,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle tqHandle = {0}; pHandle = &tqHandle; - /*taosInitRWLatch(&pExec->lock);*/ uint64_t oldConsumerId = pHandle->consumerId; memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -834,7 +833,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.qmsg = NULL; pHandle->execHandle.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL); + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); @@ -847,7 +846,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); pHandle->execHandle.execTb.suid = req.suid; @@ -865,7 +864,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); - pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL); + pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId); } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index b6bca1e4ca..cb4be7a539 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { handle.execHandle.task = - qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL); +` qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0); if (handle.execHandle.task == NULL) { tqError("cannot create exec task for %s", handle.subKey); code = -1; @@ -332,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); @@ -341,7 +341,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid); for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid); + tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); @@ -349,9 +349,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); - handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL); + handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } - tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode)); + tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c269367eb2..0cb9955056 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -482,12 +482,6 @@ typedef struct SStreamScanInfo { } SStreamScanInfo; typedef struct { - // int8_t subType; - // bool withMeta; - // int64_t suid; - // int64_t snapVersion; - // void *metaInfo; - // void *dataInfo; SVnode* vnode; SSDataBlock pRes; // result SSDataBlock STsdbReader* dataReader; @@ -690,6 +684,8 @@ typedef struct SStreamFillOperatorInfo { #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) +SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName); + SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain); int32_t optrDummyOpenFn(SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6c4a4bbe88..06813f0148 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -242,30 +242,28 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, return code; } -qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) { - if (msg == NULL) { - // create raw scan - SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); +qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) { + if (msg == NULL) { // create raw scan + SExecTaskInfo* pTaskInfo = doCreateExecTaskInfo(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, ""); if (NULL == pTaskInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); - - pTaskInfo->cost.created = taosGetTimestampUs(); - pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE; pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo); if (NULL == pTaskInfo->pRoot) { terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(pTaskInfo); return NULL; } + + qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo)); return pTaskInfo; } struct SSubplan* pPlan = NULL; - int32_t code = qStringToSubplan(msg, &pPlan); + + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -292,9 +290,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 } } - if (pSchema) { - *pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); - } return pTaskInfo; } @@ -1138,6 +1133,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; + pScanInfo->currentTable = 0; } else { taosRUnLockLatch(&pTaskInfo->lock); qError("no table in table list, %s", id); @@ -1210,10 +1206,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; -// if (pTableListInfo == NULL) { -// pTableListInfo = tableListCreate(); -// } - tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3a6ab4463a..c2a068e1cb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1974,7 +1974,7 @@ char* buildTaskId(uint64_t taskId, uint64_t queryId) { return p; } -static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) { +SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) { SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (pTaskInfo == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1982,6 +1982,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in } setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); + pTaskInfo->cost.created = taosGetTimestampUs(); pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName); pTaskInfo->execModel = model; @@ -2465,7 +2466,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand goto _complete; } - (*pTaskInfo)->cost.created = taosGetTimestampUs(); return TSDB_CODE_SUCCESS; _complete: diff --git a/tests/system-test/2-query/columnLenUpdated.py b/tests/system-test/2-query/columnLenUpdated.py index d6940fde8b..e4555b4e79 100644 --- a/tests/system-test/2-query/columnLenUpdated.py +++ b/tests/system-test/2-query/columnLenUpdated.py @@ -83,7 +83,7 @@ class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor()) + tdSql.init(conn.cursor(), True) def getBuildPath(self): selfPath = os.path.dirname(os.path.realpath(__file__)) -- GitLab