diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index e382fa4efd4eea7f22cf41da92ec751bd7a2fa4a..ba16acf7b00e6e485262c7ae372f5c1977c9cea9 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -104,6 +104,7 @@ typedef struct SJoinLogicNode { SNode* pMergeCondition; SNode* pOnConditions; bool isSingleTableJoin; + EOrder inputTsOrder; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -201,6 +202,7 @@ typedef struct SWindowLogicNode { int64_t watermark; int8_t igExpired; EWindowAlgorithm windowAlgo; + EOrder inputTsOrder; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -356,15 +358,14 @@ typedef struct SInterpFuncPhysiNode { SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; -typedef struct SJoinPhysiNode { +typedef struct SSortMergeJoinPhysiNode { SPhysiNode node; EJoinType joinType; SNode* pMergeCondition; SNode* pOnConditions; SNodeList* pTargets; -} SJoinPhysiNode; - -typedef SJoinPhysiNode SSortMergeJoinPhysiNode; + EOrder inputTsOrder; +} SSortMergeJoinPhysiNode; typedef struct SAggPhysiNode { SPhysiNode node; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index f8c702459119572f2830bfa96016df43f508926e..81ed5b5ecd2e1453bcde056b14980b6381c6861c 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -255,6 +255,7 @@ typedef struct SSelectStmt { int32_t selectFuncNum; bool isEmptyResult; bool isTimeLineResult; + bool isSubquery; bool hasAggFuncs; bool hasRepeatScanFuncs; bool hasIndefiniteRowsFunc; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 907b3be5609b119f7a7b9d48c68d88d1bc53f38c..220c4f73e0867b9ca71ef4adb2a3c9d5ac9ffa21 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -114,21 +114,30 @@ typedef struct SWal { int64_t refId; TdThreadMutex mutex; // ref - SHashObj *pRefHash; // ref -> SWalRef + SHashObj *pRefHash; // refId -> SWalRef // path char path[WAL_PATH_LEN]; // reusable write head SWalCkHead writeHead; -} SWal; // WAL HANDLE +} SWal; + +typedef struct { + int64_t refId; + int64_t refVer; + int64_t refFile; + SWal *pWal; +} SWalRef; typedef struct { int8_t scanUncommited; + int8_t scanNotApplied; int8_t scanMeta; int8_t enableRef; } SWalFilterCond; typedef struct { SWal *pWal; + int64_t readerId; TdFilePtr pLogFile; TdFilePtr pIdxFile; int64_t curFileFirstVer; @@ -138,7 +147,8 @@ typedef struct { int8_t curStopped; TdThreadMutex mutex; SWalFilterCond cond; - SWalCkHead *pHead; + // TODO remove it + SWalCkHead *pHead; } SWalReader; // module initialization @@ -157,11 +167,7 @@ int32_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_ int32_t walWriteWithSyncInfo(SWal *, int64_t index, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); -// This interface assign version automatically and return to caller. -// When using this interface with concurrent writes, -// wal will write all logs atomically, -// but not sure which one will be actually write first, -// and then the unique index of successful writen is returned. +// Assign version automatically and return to caller, // -1 will be returned for failed writes int64_t walAppendLog(SWal *, tmsg_t msgType, SWalSyncInfo syncMeta, const void *body, int32_t bodyLen); @@ -191,17 +197,15 @@ void walSetReaderCapacity(SWalReader *pRead, int32_t capacity); int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead); int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead); int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead); -typedef struct { - int64_t refId; - int64_t ver; -} SWalRef; + +SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); -void walCloseRef(SWalRef *); +void walCloseRef(SWal *pWal, int64_t refId); int32_t walRefVer(SWalRef *, int64_t ver); -int32_t walUnrefVer(SWal *); +void walUnrefVer(SWalRef *); -// help function for raft +// helper function for raft bool walLogExist(SWal *, int64_t ver); bool walIsEmpty(SWal *); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b063e552f683536736546d7a5e2f7df04ec303ec..262300a3e73f9f4f879f5f1e9e9304ce5b180aef 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -104,6 +104,8 @@ typedef struct { // TODO remove SWalReader* pWalReader; + SWalRef* pRef; + // push STqPushHandle pushHandle; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8ea4b8d98b5c672b3f9ff3bca341153ce6dd0be6..118e3a5d43d159abf7f3ed8a901c2707e7987cdf 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -212,6 +212,15 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { ASSERT(0); return -1; } + + if (offset.val.type == TMQ_OFFSET__LOG) { + STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey)); + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { + ASSERT(0); + return -1; + } + } + /*}*/ /*}*/ @@ -376,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { - int64_t fetchVer = fetchOffsetNew.version + 1; - SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + int64_t fetchVer = fetchOffsetNew.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { code = -1; goto OVER; @@ -534,11 +543,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.subType = req.subType; pHandle->fetchMeta = req.withMeta; + // TODO version should be assigned and refed during preprocess + SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal); + if (pRef == NULL) { + ASSERT(0); + } + int64_t ver = pRef->refVer; + pHandle->pRef = pRef; - pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - // TODO version should be assigned in preprocess - int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { pHandle->execHandle.execCol.qmsg = req.qmsg; pHandle->snapshotVer = ver; @@ -560,10 +572,14 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(pHandle->execHandle.pExecReader); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { + pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); vnodeGetCtbIdList(pTq->pVnode, req.suid, tbUidList); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 620417016f9fd44e395edf878cc511f44e9ce22b..290ffe5c8d6a16865df7ee1296c871ccebf0a847 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) { ASSERT(0); } - TXN txn; + TXN txn = {0}; if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { ASSERT(0); @@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) { STqHandle handle; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); - handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); + + handle.pRef = walOpenRef(pTq->pVnode->pWal); + if (handle.pRef == NULL) { + ASSERT(0); + } + walRefVer(handle.pRef, handle.snapshotVer); + if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SReadHandle reader = { .meta = pTq->pVnode->pMeta, @@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) { handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); ASSERT(handle.execHandle.pExecReader); } else { + handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); handle.execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 9ffdfc2289c6865a246d2782a121339dd095da57..266f96b41e89264e7813a47cdde1932ae7c48a79 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -135,7 +135,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; pPhysiChildren = pJoinNode->node.pChildren; break; } @@ -434,7 +434,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, - QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT : EXPLAIN_TBL_SCAN_FORMAT, + QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == pNode->type ? EXPLAIN_TBL_MERGE_SCAN_FORMAT + : EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { @@ -551,7 +552,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); if (pSTblScanNode->scan.pScanPseudoCols) { EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pSTblScanNode->scan.pScanPseudoCols->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -613,7 +614,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + SSortMergeJoinPhysiNode *pJoinNode = (SSortMergeJoinPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); if (pResNode->pExecInfo) { @@ -1180,7 +1181,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); if (pDistScanNode->pScanPseudoCols) { EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pDistScanNode->pScanPseudoCols->length); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDistScanNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -1367,7 +1368,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pInterpNode->pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } - + EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pInterpNode->fillMode)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); @@ -1419,7 +1420,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } } break; - } + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_QRY_APP_ERROR; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bde7a94c53aacd9e5146f232a3378706ee3895a0..0beb6f17842ecbb8c414c33a6de5d84270e0fda3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -320,6 +320,47 @@ typedef struct STableScanInfo { int8_t noTable; } STableScanInfo; +typedef struct STableMergeScanInfo { + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + + SFileBlockLoadRecorder readRecorder; + int64_t numOfRows; + SScanInfo scanInfo; + int32_t scanTimes; + SNode* pFilterNode; // filter info, which is push down by optimizer + SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowEntryInfoOffset; + SExprInfo* pExpr; + SSDataBlock* pResBlock; + SArray* pColMatchInfo; + int32_t numOfOutput; + + SExprSupp pseudoSup; + + SQueryTableDataCond cond; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + // if the upstream is an interval operator, the interval info is also kept here to get the time + // window to check if current data block needs to be loaded. + SInterval interval; + SSampleExecInfo sample; // sample execution info +} STableMergeScanInfo; + typedef struct STagScanInfo { SColumnInfo *pCols; SSDataBlock *pRes; @@ -886,7 +927,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d9cc2dbeb249f1aa16d675b5c0741f0fa95cd46f..36ae1d19ecbb7759d00657e9519a32c1ead3b81f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1325,7 +1325,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); if (pColMatchInfo != NULL) { - for(int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) { SColMatchInfo* pInfo = taosArrayGet(pColMatchInfo, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->targetSlotId); @@ -1646,10 +1646,10 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; if (pSummary->pRecoder != NULL) { qDebug( - "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%" - PRId64 ", check rows:%" PRId64, GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, - pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows, - pRecorder->totalCheckedRows); + "%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total " + "rows:%" PRId64 ", check rows:%" PRId64, + GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pRecorder->totalBlocks, pRecorder->loadBlockStatis, + pRecorder->loadBlocks, pRecorder->totalRows, pRecorder->totalCheckedRows); } // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, @@ -2783,11 +2783,16 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; - } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; *order = pTableScanInfo->cond.order; *scanFlag = pTableScanInfo->scanFlag; return TSDB_CODE_SUCCESS; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { + STableMergeScanInfo* pTableScanInfo = pOperator->info; + *order = pTableScanInfo->cond.order; + *scanFlag = pTableScanInfo->scanFlag; + return TSDB_CODE_SUCCESS; } else { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { return TSDB_CODE_INVALID_PARA; @@ -3728,7 +3733,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) { } // this the tags and pseudo function columns, we only keep the tag columns - for(int32_t i = 0; i < numOfTags; ++i) { + for (int32_t i = 0; i < numOfTags; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanPseudoCols, i); int32_t type = nodeType(pNode->pExpr); @@ -3844,7 +3849,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, int32_t groupNum = 0; for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); + int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4165,7 +4170,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { - pOptr = createMergeJoinOperatorInfo(ops, size, (SJoinPhysiNode*)pPhyNode, pTaskInfo); + pOptr = createMergeJoinOperatorInfo(ops, size, (SSortMergeJoinPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_FILL == type) { pOptr = createFillOperatorInfo(ops[0], (SFillPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC == type) { diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 2e6c9bd351b5c5b9bdff21ca6d44f4d6520e0eab..f26b2f4f0a0c9e9a4fd33111bd689de82b055a36 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -28,30 +28,30 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SJoinPhysiNode* pJoinNode, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); - int32_t numOfCols = 0; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pRes = pResBlock; - pOperator->name = "MergeJoinOperator"; + pInfo->pRes = pResBlock; + pOperator->name = "MergeJoinOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->exprSupp.pExprInfo = pExprInfo; + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; SNode* pMergeCondition = pJoinNode->pMergeCondition; if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { @@ -104,7 +104,7 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; nodesDestroyNode(pJoinOperator->pCondAfterMerge); - + taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a9d03aebbe365f4c4e25129cdeb1b2c02f51c505..2dcb5558346b1796c295bd8964168d1dfcabbdde 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -274,7 +274,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); } else { - qDebug("%s data block filter out, elapsed time:%"PRId64, GET_TASKID(pTaskInfo), (et - st)); + qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); } return TSDB_CODE_SUCCESS; @@ -1838,11 +1838,14 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { int8_t tagType = smr.me.stbEntry.schemaTag.pSchema[i].type; pColInfoData = taosArrayGet(p->pDataBlock, 4); char tagTypeStr[VARSTR_HEADER_SIZE + 32]; - int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); + int tagTypeLen = sprintf(varDataVal(tagTypeStr), "%s", tDataTypes[tagType].name); if (tagType == TSDB_DATA_TYPE_VARCHAR) { - tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); + tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", + (int32_t)(smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); } else if (tagType == TSDB_DATA_TYPE_NCHAR) { - tagTypeLen += sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); + tagTypeLen += + sprintf(varDataVal(tagTypeStr) + tagTypeLen, "(%d)", + (int32_t)((smr.me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(tagTypeStr, tagTypeLen); colDataAppend(pColInfoData, numOfRows, (char*)tagTypeStr, false); @@ -2527,49 +2530,6 @@ _error: return NULL; } -typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - - SFileBlockLoadRecorder readRecorder; - int64_t numOfRows; - SScanInfo scanInfo; - int32_t scanTimes; - SNode* pFilterNode; // filter info, which is push down by optimizer - SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context - SResultRowInfo* pResultRowInfo; - int32_t* rowEntryInfoOffset; - SExprInfo* pExpr; - SSDataBlock* pResBlock; - SArray* pColMatchInfo; - int32_t numOfOutput; - - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SqlFunctionCtx* pPseudoCtx; - - SQueryTableDataCond cond; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - // if the upstream is an interval operator, the interval info is also kept here to get the time - // window to check if current data block needs to be loaded. - SInterval interval; - SSampleExecInfo sample; // sample execution info -} STableMergeScanInfo; - int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) { @@ -2700,9 +2660,9 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true); // currently only the tbname pseudo column - if (pTableScanInfo->numOfPseudoExpr > 0) { - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, - pTableScanInfo->numOfPseudoExpr, pBlock, GET_TASKID(pTaskInfo)); + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -2869,29 +2829,31 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - tsortDestroySortHandle(pInfo->pSortHandle); + size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + + for (int32_t i = 0; i < numReaders; ++i) { + STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); + blockDataDestroy(param->inputBlock); + } taosArrayClear(pInfo->sortSourceParams); - for (int32_t i = 0; i < taosArrayGetSize(pInfo->dataReaders); ++i) { + tsortDestroySortHandle(pInfo->pSortHandle); + + for (int32_t i = 0; i < numReaders; ++i) { STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); tsdbReaderClose(reader); } - taosArrayDestroy(pInfo->dataReaders); pInfo->dataReaders = NULL; return TSDB_CODE_SUCCESS; } -SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capacity, SOperatorInfo* pOperator) { +SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* pResBlock, int32_t capacity, SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { - return NULL; - } - - blockDataEnsureCapacity(p, capacity); + blockDataCleanup(pResBlock); + blockDataEnsureCapacity(pResBlock, capacity); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -2899,14 +2861,15 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa break; } - appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { + appendOneRowToDataBlock(pResBlock, pTupleHandle); + if (pResBlock->info.rows >= capacity) { break; } } - qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows); - return (p->info.rows > 0) ? p : NULL; + + qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), pResBlock->info.rows); + return (pResBlock->info.rows > 0) ? pResBlock : NULL; } SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { @@ -2935,7 +2898,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } SSDataBlock* pBlock = NULL; while (pInfo->tableStartIndex < tableListSize) { - pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator); + pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pInfo->pResBlock, pOperator->resultInfo.capacity, pOperator); if (pBlock != NULL) { pBlock->info.groupId = pInfo->groupId; pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -2959,6 +2922,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); + taosArrayDestroy(pTableScanInfo->sortSourceParams); for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); @@ -2974,7 +2938,9 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); taosArrayDestroy(pTableScanInfo->pSortInfo); + cleanupExprSupp(&pTableScanInfo->pseudoSup); + taosMemoryFreeClear(pTableScanInfo->rowEntryInfoOffset); taosMemoryFreeClear(param); } @@ -3031,8 +2997,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN } if (pTableScanNode->scan.pScanPseudoCols != NULL) { - pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); - pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset); + SExprSupp* pSup = &pInfo->pseudoSup; + pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 121e69763040c42c9fc06f919a0ee59964dc5cd5..5279d015b438226fe6ceecc386213cd9faaf9b20 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -375,6 +375,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pMergeCondition); CLONE_NODE_FIELD(pOnConditions); COPY_SCALAR_FIELD(isSingleTableJoin); + COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; } @@ -440,6 +441,7 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(igExpired); COPY_SCALAR_FIELD(windowAlgo); + COPY_SCALAR_FIELD(inputTsOrder); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index eec4780293bdcad4ec3a81ad18b1492ef2a4a849..186a51f0001911ef1b0144e404c6dbdc6b7af89c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1717,7 +1717,7 @@ static const char* jkJoinPhysiPlanOnConditions = "OnConditions"; static const char* jkJoinPhysiPlanTargets = "Targets"; static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { - const SJoinPhysiNode* pNode = (const SJoinPhysiNode*)pObj; + const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; int32_t code = physicPlanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { @@ -1737,7 +1737,7 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) { } static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) { - SJoinPhysiNode* pNode = (SJoinPhysiNode*)pObj; + SSortMergeJoinPhysiNode* pNode = (SSortMergeJoinPhysiNode*)pObj; int32_t code = jsonToPhysicPlanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index b12e3b14c70d0ba471baf2172c16c6c7b5a617bc..77681af1bc2320fabefd75c326ca3f52123eaeff 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -468,7 +468,7 @@ static EDealRes dispatchPhysiPlan(SNode* pNode, ETraversalOrder order, FNodeWalk break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode* pJoin = (SJoinPhysiNode*)pNode; + SSortMergeJoinPhysiNode* pJoin = (SSortMergeJoinPhysiNode*)pNode; res = walkPhysiNode((SPhysiNode*)pNode, order, walker, pContext); if (DEAL_RES_ERROR != res && DEAL_RES_END != res) { res = walkPhysiPlan(pJoin->pMergeCondition, order, walker, pContext); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 23f0bb088d1aff274adc3fb4607a9acc13d784e7..3c6fbe409c6be5d40b0686376e4a0ec87c9e275e 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -287,7 +287,7 @@ SNode* nodesMakeNode(ENodeType type) { case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: - return makeNode(type, sizeof(SJoinPhysiNode)); + return makeNode(type, sizeof(SSortMergeJoinPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: return makeNode(type, sizeof(SAggPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: @@ -883,7 +883,7 @@ void nodesDestroyNode(SNode* pNode) { break; } case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN: { - SJoinPhysiNode* pPhyNode = (SJoinPhysiNode*)pNode; + SSortMergeJoinPhysiNode* pPhyNode = (SSortMergeJoinPhysiNode*)pNode; destroyPhysiNode((SPhysiNode*)pPhyNode); nodesDestroyNode(pPhyNode->pMergeCondition); nodesDestroyNode(pPhyNode->pOnConditions); diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index f3ea332fe248a4d63eaeaaeeebcee24a14c6948d..12c18733b10d4c7b35f73567fc1f433e65707c26 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -90,7 +90,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken* SNode* createDurationValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); SNode* createDefaultDatabaseCondValue(SAstCreateContext* pCxt); SNode* createPlaceholderValueNode(SAstCreateContext* pCxt, const SToken* pLiteral); -SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias); +SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias); SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType type, SNode* pParam1, SNode* pParam2); SNode* createOperatorNode(SAstCreateContext* pCxt, EOperatorType type, SNode* pLeft, SNode* pRight); SNode* createBetweenAnd(SAstCreateContext* pCxt, SNode* pExpr, SNode* pLeft, SNode* pRight); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index b237fd2c6edd8f176eb9a336b9f805d1c66a29a7..a54dae1ee98acdbc6dda57855885308c97c8387c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -527,6 +527,7 @@ SNode* createTempTableNode(SAstCreateContext* pCxt, SNode* pSubquery, const STok } if (QUERY_NODE_SELECT_STMT == nodeType(pSubquery)) { strcpy(((SSelectStmt*)pSubquery)->stmtName, tempTable->table.tableAlias); + ((SSelectStmt*)pSubquery)->isSubquery = true; } else if (QUERY_NODE_SET_OPERATOR == nodeType(pSubquery)) { strcpy(((SSetOperator*)pSubquery)->stmtName, tempTable->table.tableAlias); } @@ -637,8 +638,9 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd return createBetweenAnd(pCxt, createPrimaryKeyCol(pCxt), pStart, pEnd); } -SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* pAlias) { +SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) { CHECK_PARSER_STATUS(pCxt); + trimEscape(pAlias); int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n); strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len); ((SExprNode*)pNode)->aliasName[len] = '\0'; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 702422e022b5292d5829e09c221f77ef6ccaddc0..d564d536335db100d282230c7031d807c28f1f41 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -739,12 +739,13 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) { +static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, + SArray* tagName, uint8_t tagNum) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; pTbReq->ctb.tagNum = tagNum; - if(sname) pTbReq->ctb.name = strdup(sname); + if (sname) pTbReq->ctb.name = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->commentLen = -1; @@ -969,7 +970,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; - char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg); if (code != TSDB_CODE_SUCCESS) { goto end; @@ -1012,7 +1013,8 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint goto end; } - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, + pCxt->pTableMeta->tableInfo.numOfTags); end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { @@ -1650,7 +1652,6 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) { static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) { SName name; CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg)); - CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache)); CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache)); CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache)); CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache)); @@ -2332,7 +2333,8 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols return ret; } - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags); + buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, + pTableMeta->tableInfo.numOfTags); taosArrayDestroy(tagName); smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 74d5f03dc120a91dbb049733a1aab3605a22e581..7c9a8b10dd206d047154b5e1df5d02d119c814c6 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -92,7 +92,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG: return "sliding value no larger than the interval value"; case TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL: - return "sliding value can not less than 1% of interval value"; + return "sliding value can not less than 1%% of interval value"; case TSDB_CODE_PAR_ONLY_ONE_JSON_TAG: return "Only one tag if there is a json tag"; case TSDB_CODE_PAR_INCORRECT_NUM_OF_COL: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 84e712b4664b3d17b56f7b99e15f46308408212a..30e3b676df5b24080561b51d09741fe7a5064861 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -339,6 +339,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->joinType = pJoinTable->joinType; pJoin->isSingleTableJoin = pJoinTable->table.singleTable; + pJoin->inputTsOrder = ORDER_ASC; pJoin->node.groupAction = GROUP_ACTION_CLEAR; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; @@ -625,14 +626,14 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { - int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); - if (pCxt->pPlanCxt->streamQuery) { pWindow->triggerType = pCxt->pPlanCxt->triggerType; pWindow->watermark = pCxt->pPlanCxt->watermark; pWindow->igExpired = pCxt->pPlanCxt->igExpired; } + pWindow->inputTsOrder = ORDER_ASC; + int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_WINDOW, fmIsWindowClauseFunc, &pWindow->pFuncs); if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); } @@ -861,7 +862,8 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); - pProject->node.groupAction = GROUP_ACTION_CLEAR; + pProject->node.groupAction = + (!pSelect->isSubquery && pCxt->pPlanCxt->streamQuery) ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 7b60710c7d40dc569d37d77bbbe6d22f9a01aad8..fcc395af62b1e05d55707e326dc308bb1b5c9dff 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -993,25 +993,28 @@ static bool sortPriKeyOptMayBeOptimized(SLogicNode* pNode) { } static int32_t sortPriKeyOptGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) { - int32_t code = TSDB_CODE_SUCCESS; - switch (nodeType(pNode)) { - case QUERY_NODE_LOGIC_PLAN_SCAN: - if (TSDB_SUPER_TABLE != ((SScanLogicNode*)pNode)->tableType) { - return nodesListMakeAppend(pScanNodes, (SNode*)pNode); + case QUERY_NODE_LOGIC_PLAN_SCAN: { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (NULL != pScan->pGroupTags) { + *pNotOptimize = true; + return TSDB_CODE_SUCCESS; } - break; - case QUERY_NODE_LOGIC_PLAN_JOIN: - code = + return nodesListMakeAppend(pScanNodes, (SNode*)pNode); + } + case QUERY_NODE_LOGIC_PLAN_JOIN: { + int32_t code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); if (TSDB_CODE_SUCCESS == code) { code = sortPriKeyOptGetScanNodesImpl((SLogicNode*)nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes); } return code; + } case QUERY_NODE_LOGIC_PLAN_AGG: + case QUERY_NODE_LOGIC_PLAN_PARTITION: *pNotOptimize = true; - return code; + return TSDB_CODE_SUCCESS; default: break; } @@ -1037,17 +1040,33 @@ static EOrder sortPriKeyOptGetPriKeyOrder(SSortLogicNode* pSort) { return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order; } +static void sortPriKeyOptSetParentOrder(SLogicNode* pNode, EOrder order) { + if (NULL == pNode) { + return; + } + if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode)) { + ((SWindowLogicNode*)pNode)->inputTsOrder = order; + } else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode)) { + ((SJoinLogicNode*)pNode)->inputTsOrder = order; + } + sortPriKeyOptSetParentOrder(pNode->pParent, order); +} + static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SSortLogicNode* pSort, SNodeList* pScanNodes) { EOrder order = sortPriKeyOptGetPriKeyOrder(pSort); - if (ORDER_DESC == order) { - SNode* pScanNode = NULL; - FOREACH(pScanNode, pScanNodes) { - SScanLogicNode* pScan = (SScanLogicNode*)pScanNode; - if (pScan->scanSeq[0] > 0) { - TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); - } + SNode* pScanNode = NULL; + FOREACH(pScanNode, pScanNodes) { + SScanLogicNode* pScan = (SScanLogicNode*)pScanNode; + if (ORDER_DESC == order && pScan->scanSeq[0] > 0) { + TSWAP(pScan->scanSeq[0], pScan->scanSeq[1]); + } + if (TSDB_SUPER_TABLE == pScan->tableType) { + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; } + sortPriKeyOptSetParentOrder(pScan->node.pParent, order); } SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0); @@ -1613,10 +1632,10 @@ static void alignProjectionWithTarget(SLogicNode* pNode) { } SProjectLogicNode* pProjectNode = (SProjectLogicNode*)pNode; - SNode* pProjection = NULL; + SNode* pProjection = NULL; FOREACH(pProjection, pProjectNode->pProjections) { SNode* pTarget = NULL; - bool keep = false; + bool keep = false; FOREACH(pTarget, pNode->pTargets) { if (0 == strcmp(((SColumnNode*)pProjection)->node.aliasName, ((SColumnNode*)pTarget)->colName)) { keep = true; @@ -2214,7 +2233,7 @@ static bool tagScanMayBeOptimized(SLogicNode* pNode) { !planOptNodeListHasTbname(pAgg->pGroupKeys)) { return false; } - + SNode* pGroupKey = NULL; FOREACH(pGroupKey, pAgg->pGroupKeys) { SNode* pGroup = NULL; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 0a1f8bbd0ba2f9b10669ad348de976cd195ccadb..587e5669398474fee5f134a6e87e4d13039a7b9c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -415,7 +415,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SSubplan* pS SScanPhysiNode* pScanPhysiNode, SPhysiNode** pPhyNode) { int32_t code = createScanCols(pCxt, pScanPhysiNode, pScanLogicNode->pScanCols); if (TSDB_CODE_SUCCESS == code) { - // Data block describe also needs to be set without scanning column, such as SELECT COUNT(*) FROM t code = addDataBlockSlots(pCxt, pScanPhysiNode->pScanCols, pScanPhysiNode->node.pOutputDataBlockDesc); } @@ -622,8 +621,8 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SJoinLogicNode* pJoinLogicNode, SPhysiNode** pPhyNode) { - SJoinPhysiNode* pJoin = - (SJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); + SSortMergeJoinPhysiNode* pJoin = + (SSortMergeJoinPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pJoinLogicNode, QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN); if (NULL == pJoin) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -975,6 +974,9 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh } static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) { + if (GROUP_ACTION_KEEP == pProject->node.groupAction) { + return false; + } if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) { return true; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 8586234b7e28342b0e51890306a37c4b22e56487..81e2bff1799af7989f2c7b898d05a97ff4b852f3 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -469,7 +469,7 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent return code; } -static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) { +static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, EOrder order, SNodeList** pMergeKeys) { SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); if (NULL == pMergeKey) { return TSDB_CODE_OUT_OF_MEMORY; @@ -479,7 +479,7 @@ static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** nodesDestroyNode((SNode*)pMergeKey); return TSDB_CODE_OUT_OF_MEMORY; } - pMergeKey->order = ORDER_ASC; + pMergeKey->order = order; pMergeKey->nullOrder = NULL_ORDER_FIRST; return nodesListMakeStrictAppend(pMergeKeys, (SNode*)pMergeKey); } @@ -491,7 +491,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH; ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE; SNodeList* pMergeKeys = NULL; - code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, + ((SWindowLogicNode*)pInfo->pSplitNode)->inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow, true); } @@ -579,7 +580,8 @@ static int32_t stbSplSplitSessionOrStateForBatch(SSplitContext* pCxt, SStableSpl SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pWindow->pChildren, 0); SNodeList* pMergeKeys = NULL; - int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, &pMergeKeys); + int32_t code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pWindow)->pTspk, + ((SWindowLogicNode*)pWindow)->inputTsOrder, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pChild, pMergeKeys, (SLogicNode*)pChild, true); @@ -913,27 +915,70 @@ static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplit } static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) { + bool find = false; SNode* pCol = NULL; FOREACH(pCol, pScan->pScanCols) { if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) { + find = true; + break; + } + } + if (!find) { + return NULL; + } + SNode* pTarget = NULL; + FOREACH(pTarget, pScan->node.pTargets) { + if (nodesEqualNode(pTarget, pCol)) { return pCol; } } - return NULL; + nodesListStrictAppend(pScan->node.pTargets, nodesCloneNode(pCol)); + return pCol; +} + +static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOutputMergeScan, + SNodeList** pOutputMergeKeys) { + SNodeList* pChildren = pScan->node.pChildren; + pScan->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SScanLogicNode* pMergeScan = (SScanLogicNode*)nodesCloneNode((SNode*)pScan); + if (NULL == pMergeScan) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + SNodeList* pMergeKeys = NULL; + if (TSDB_CODE_SUCCESS == code) { + pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; + pMergeScan->node.pChildren = pChildren; + splSetParent((SLogicNode*)pMergeScan); + code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), + pMergeScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, &pMergeKeys); + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutputMergeScan = (SLogicNode*)pMergeScan; + *pOutputMergeKeys = pMergeKeys; + } else { + nodesDestroyNode((SNode*)pMergeScan); + nodesDestroyList(pMergeKeys); + } + + return code; } static int32_t stbSplSplitMergeScanNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan, bool groupSort) { - SNodeList* pMergeKeys = NULL; - int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys); + SLogicNode* pMergeScan = NULL; + SNodeList* pMergeKeys = NULL; + int32_t code = stbSplCreateMergeScanNode(pScan, &pMergeScan, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan, groupSort); + code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, pMergeScan, groupSort); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pSubplan->pChildren, - (SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pScan, SPLIT_FLAG_STABLE_SPLIT)); + (SNode*)splCreateScanSubplan(pCxt, pMergeScan, SPLIT_FLAG_STABLE_SPLIT)); } - pScan->scanType = SCAN_TYPE_TABLE_MERGE; ++(pCxt->groupId); return code; } @@ -978,14 +1023,14 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) } static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) { - SNode* pPrimaryKey = - nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0))); + SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0); + SNode* pPrimaryKey = nodesCloneNode(stbSplFindPrimaryKeyFromScan(pScan)); if (NULL == pPrimaryKey) { return TSDB_CODE_OUT_OF_MEMORY; } int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys); + code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pScan->scanSeq[0] > 0 ? ORDER_ASC : ORDER_DESC, pMergeKeys); } return code; } diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index bfa6079cb15f92588b6f9490fa8ebb63809cdfb7..7aab8a7ca3622874cdbbb6653c1cd6f05d804b45 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -124,7 +124,8 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* } static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { - if (SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) { + if ((SCAN_TYPE_TABLE != pScan->scanType && SCAN_TYPE_TABLE_MERGE != pScan->scanType) || + DATA_ORDER_LEVEL_GLOBAL == pScan->node.requireDataOrder) { return TSDB_CODE_SUCCESS; } // The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK diff --git a/source/libs/planner/test/planBasicTest.cpp b/source/libs/planner/test/planBasicTest.cpp index 8f9cd94c19eb29c20898a10a1167803a74384b2e..9cfae68d34a39186b33a162d1ca0fe0ab6cd13c7 100644 --- a/source/libs/planner/test/planBasicTest.cpp +++ b/source/libs/planner/test/planBasicTest.cpp @@ -24,9 +24,10 @@ TEST_F(PlanBasicTest, selectClause) { useDb("root", "test"); run("SELECT * FROM t1"); - run("SELECT 1 FROM t1"); - run("SELECT * FROM st1"); - run("SELECT 1 FROM st1"); + + run("SELECT MAX(c1) c2, c2 FROM t1"); + + run("SELECT MAX(c1) c2, c2 FROM st1"); } TEST_F(PlanBasicTest, whereClause) { diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 770ac94e5bd540a83193e7ea9636eeafd93fd116..058705403b75520bf74b1f4c75a74ac9390adc8d 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -53,6 +53,8 @@ TEST_F(PlanOptimizeTest, sortPrimaryKey) { run("SELECT c1 FROM t1 ORDER BY ts"); + run("SELECT c1 FROM st1 ORDER BY ts"); + run("SELECT c1 FROM t1 ORDER BY ts DESC"); run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTART DESC"); diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index 491b982968d4fdfb75d3c5fe2cc6e4b27d8c2ebf..85238e87b9f5c5549f2a1cf82435dca8fd85b5a4 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -93,7 +93,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } // init ref - pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + pWal->pRefHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (pWal->pRefHash == NULL) { taosMemoryFree(pWal); return NULL; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6d0e844e8e2f17681f0aa7559153bbd4e9eeb24c..ac62b7d98dfeec5e7df073ef165a79d1ad95b7f6 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -21,107 +21,112 @@ static int32_t walFetchBodyNew(SWalReader *pRead); static int32_t walSkipFetchBodyNew(SWalReader *pRead); SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { - SWalReader *pRead = taosMemoryCalloc(1, sizeof(SWalReader)); - if (pRead == NULL) { + SWalReader *pReader = taosMemoryCalloc(1, sizeof(SWalReader)); + if (pReader == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pRead->pWal = pWal; - pRead->pIdxFile = NULL; - pRead->pLogFile = NULL; - pRead->curVersion = -1; - pRead->curFileFirstVer = -1; - pRead->curInvalid = 1; - pRead->capacity = 0; + pReader->pWal = pWal; + pReader->readerId = tGenIdPI64(); + pReader->pIdxFile = NULL; + pReader->pLogFile = NULL; + pReader->curVersion = -1; + pReader->curFileFirstVer = -1; + pReader->curInvalid = 1; + pReader->capacity = 0; if (cond) { - pRead->cond = *cond; + pReader->cond = *cond; } else { - pRead->cond.scanMeta = 0; - pRead->cond.scanUncommited = 0; - pRead->cond.enableRef = 0; + pReader->cond.scanUncommited = 0; + pReader->cond.scanNotApplied = 0; + pReader->cond.scanMeta = 0; + pReader->cond.enableRef = 0; } - taosThreadMutexInit(&pRead->mutex, NULL); + taosThreadMutexInit(&pReader->mutex, NULL); - /*if (pRead->cond.enableRef) {*/ - /*walOpenRef(pWal);*/ - /*}*/ - - pRead->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); - if (pRead->pHead == NULL) { + pReader->pHead = taosMemoryMalloc(sizeof(SWalCkHead)); + if (pReader->pHead == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; - taosMemoryFree(pRead); + taosMemoryFree(pReader); return NULL; } - return pRead; + /*if (pReader->cond.enableRef) {*/ + /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ + /*}*/ + + return pReader; } -void walCloseReader(SWalReader *pRead) { - taosCloseFile(&pRead->pIdxFile); - taosCloseFile(&pRead->pLogFile); - taosMemoryFreeClear(pRead->pHead); - taosMemoryFree(pRead); +void walCloseReader(SWalReader *pReader) { + taosCloseFile(&pReader->pIdxFile); + taosCloseFile(&pReader->pLogFile); + /*if (pReader->cond.enableRef) {*/ + /*taosHashRemove(pReader->pWal->pRefHash, &pReader->readerId, sizeof(int64_t));*/ + /*}*/ + taosMemoryFreeClear(pReader->pHead); + taosMemoryFree(pReader); } -int32_t walNextValidMsg(SWalReader *pRead) { - int64_t fetchVer = pRead->curVersion; - int64_t lastVer = walGetLastVer(pRead->pWal); - int64_t committedVer = walGetCommittedVer(pRead->pWal); - int64_t appliedVer = walGetAppliedVer(pRead->pWal); - int64_t endVer = pRead->cond.scanUncommited ? lastVer : committedVer; +int32_t walNextValidMsg(SWalReader *pReader) { + int64_t fetchVer = pReader->curVersion; + int64_t lastVer = walGetLastVer(pReader->pWal); + int64_t committedVer = walGetCommittedVer(pReader->pWal); + int64_t appliedVer = walGetAppliedVer(pReader->pWal); + int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; endVer = TMIN(appliedVer, endVer); wDebug("vgId:%d wal start to fetch, ver %ld, last ver %ld commit ver %ld, applied ver %ld, end ver %ld", - pRead->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); - pRead->curStopped = 0; + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); + pReader->curStopped = 0; while (fetchVer <= endVer) { - if (walFetchHeadNew(pRead, fetchVer) < 0) { + if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } - if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT || - (IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) { - if (walFetchBodyNew(pRead) < 0) { + if (pReader->pHead->head.msgType == TDMT_VND_SUBMIT || + (IS_META_MSG(pReader->pHead->head.msgType) && pReader->cond.scanMeta)) { + if (walFetchBodyNew(pReader) < 0) { return -1; } return 0; } else { - if (walSkipFetchBodyNew(pRead) < 0) { + if (walSkipFetchBodyNew(pReader) < 0) { return -1; } fetchVer++; - ASSERT(fetchVer == pRead->curVersion); + ASSERT(fetchVer == pReader->curVersion); } } - pRead->curStopped = 1; + pReader->curStopped = 1; return -1; } -static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) { +static int64_t walReadSeekFilePos(SWalReader *pReader, int64_t fileFirstVer, int64_t ver) { int64_t ret = 0; - TdFilePtr pIdxTFile = pRead->pIdxFile; - TdFilePtr pLogTFile = pRead->pLogFile; + TdFilePtr pIdxTFile = pReader->pIdxFile; + TdFilePtr pLogTFile = pReader->pLogFile; // seek position int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry); ret = taosLSeekFile(pIdxTFile, offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, - offset, terrstr()); + wError("vgId:%d, failed to seek idx file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, + ver, offset, terrstr()); return -1; } SWalIdxEntry entry = {0}; if ((ret = taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry))) != sizeof(SWalIdxEntry)) { if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, failed to read idx file, since %s", pRead->pWal->cfg.vgId, terrstr()); + wError("vgId:%d, failed to read idx file, since %s", pReader->pWal->cfg.vgId, terrstr()); } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; wError("vgId:%d, read idx file incompletely, read bytes %" PRId64 ", bytes should be %" PRIu64, - pRead->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry)); + pReader->pWal->cfg.vgId, ret, sizeof(SWalIdxEntry)); } return -1; } @@ -130,79 +135,79 @@ static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64 ret = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); if (ret < 0) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, - entry.offset, terrstr()); + wError("vgId:%d, failed to seek log file, index:%" PRId64 ", pos:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, + ver, entry.offset, terrstr()); return -1; } return ret; } -static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) { +static int32_t walReadChangeFile(SWalReader *pReader, int64_t fileFirstVer) { char fnameStr[WAL_FILE_LEN]; - taosCloseFile(&pRead->pIdxFile); - taosCloseFile(&pRead->pLogFile); + taosCloseFile(&pReader->pIdxFile); + taosCloseFile(&pReader->pLogFile); - walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); - TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ); - if (pLogTFile == NULL) { + walBuildLogName(pReader->pWal, fileFirstVer, fnameStr); + TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_READ); + if (pLogFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr()); + wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr()); return -1; } - pRead->pLogFile = pLogTFile; + pReader->pLogFile = pLogFile; - walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); - TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ); - if (pIdxTFile == NULL) { + walBuildIdxName(pReader->pWal, fileFirstVer, fnameStr); + TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ); + if (pIdxFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, cannot open file %s, since %s", pRead->pWal->cfg.vgId, fnameStr, terrstr()); + wError("vgId:%d, cannot open file %s, since %s", pReader->pWal->cfg.vgId, fnameStr, terrstr()); return -1; } - pRead->pIdxFile = pIdxTFile; + pReader->pIdxFile = pIdxFile; return 0; } -int32_t walReadSeekVerImpl(SWalReader *pRead, int64_t ver) { - SWal *pWal = pRead->pWal; +int32_t walReadSeekVerImpl(SWalReader *pReader, int64_t ver) { + SWal *pWal = pReader->pWal; + // bsearch in fileSet SWalFileInfo tmpInfo; tmpInfo.firstVer = ver; - // bsearch in fileSet SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); - if (pRead->curFileFirstVer != pRet->firstVer) { + if (pReader->curFileFirstVer != pRet->firstVer) { // error code was set inner - if (walReadChangeFile(pRead, pRet->firstVer) < 0) { + if (walReadChangeFile(pReader, pRet->firstVer) < 0) { return -1; } } // error code was set inner - if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { + if (walReadSeekFilePos(pReader, pRet->firstVer, ver) < 0) { return -1; } - wDebug("wal version reset from %ld(invalid: %d) to %ld", pRead->curVersion, pRead->curInvalid, ver); + wDebug("wal version reset from %ld(invalid: %d) to %ld", pReader->curVersion, pReader->curInvalid, ver); - pRead->curVersion = ver; + pReader->curVersion = ver; return 0; } -int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { - SWal *pWal = pRead->pWal; - if (!pRead->curInvalid && ver == pRead->curVersion) { +int32_t walReadSeekVer(SWalReader *pReader, int64_t ver) { + SWal *pWal = pReader->pWal; + if (!pReader->curInvalid && ver == pReader->curVersion) { wDebug("wal version %ld match, no need to reset", ver); return 0; } - pRead->curInvalid = 1; - pRead->curVersion = ver; + pReader->curInvalid = 1; + pReader->curVersion = ver; if (ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { - wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, + wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, ver, pWal->vers.firstVer, pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; @@ -210,7 +215,7 @@ int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) { if (ver < pWal->vers.snapshotVer) { } - if (walReadSeekVerImpl(pRead, ver) < 0) { + if (walReadSeekVerImpl(pReader, ver) < 0) { return -1; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c new file mode 100644 index 0000000000000000000000000000000000000000..bd0f6fb1a8106d47b7c874f7e7ac3a99f9384911 --- /dev/null +++ b/source/libs/wal/src/walRef.c @@ -0,0 +1,89 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "cJSON.h" +#include "os.h" +#include "taoserror.h" +#include "tutil.h" +#include "walInt.h" + +SWalRef *walOpenRef(SWal *pWal) { + SWalRef *pRef = taosMemoryCalloc(1, sizeof(SWalRef)); + if (pRef == NULL) { + return NULL; + } + pRef->refId = tGenIdPI64(); + pRef->refVer = -1; + pRef->refFile = -1; + pRef->pWal = pWal; + taosHashPut(pWal->pRefHash, &pRef->refId, sizeof(int64_t), &pRef, sizeof(void *)); + return pRef; +} + +void walCloseRef(SWal *pWal, int64_t refId) { + SWalRef *pRef = *(SWalRef **)taosHashGet(pWal->pRefHash, &refId, sizeof(int64_t)); + taosHashRemove(pWal->pRefHash, &refId, sizeof(int64_t)); + taosMemoryFree(pRef); +} + +int32_t walRefVer(SWalRef *pRef, int64_t ver) { + SWal *pWal = pRef->pWal; + if (pRef->refVer != ver) { + taosThreadMutexLock(&pWal->mutex); + if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) { + taosThreadMutexUnlock(&pWal->mutex); + terrno = TSDB_CODE_WAL_INVALID_VER; + return -1; + } + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + } + + return 0; +} + +void walUnrefVer(SWalRef *pRef) { + pRef->refId = -1; + pRef->refFile = -1; +} + +SWalRef *walRefCommittedVer(SWal *pWal) { + SWalRef *pRef = walOpenRef(pWal); + if (pRef == NULL) { + return NULL; + } + taosThreadMutexLock(&pWal->mutex); + + int64_t ver = walGetCommittedVer(pWal); + + pRef->refVer = ver; + // bsearch in fileSet + SWalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + pRef->refFile = pRet->firstVer; + + taosThreadMutexUnlock(&pWal->mutex); + return pRef; +} diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index d6348cc5dd78776b8a4d17dbb6492e0a63f6ba37..81500d80882f930e253771b835ac0f9e63f0f57b 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -26,7 +26,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { pIter = taosHashIterate(pWal->pRefHash, pIter); if (pIter == NULL) break; SWalRef *pRef = (SWalRef *)pIter; - if (pRef->ver != -1) { + if (pRef->refVer != -1) { taosHashCancelIterate(pWal->pRefHash, pIter); return -1; } @@ -215,22 +215,23 @@ int32_t walRollback(SWal *pWal, int64_t ver) { static FORCE_INLINE int32_t walCheckAndRoll(SWal *pWal) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) { - /*pWal->vers.firstVer = index;*/ if (walRollImpl(pWal) < 0) { return -1; } - } else { - int64_t passed = walGetSeq() - pWal->lastRollSeq; - if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) { - if (walRollImpl(pWal) < 0) { - return -1; - } - } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { - if (walRollImpl(pWal) < 0) { - return -1; - } + return 0; + } + + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if (pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) { + if (walRollImpl(pWal) < 0) { + return -1; + } + } else if (pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { + if (walRollImpl(pWal) < 0) { + return -1; } } + return 0; } @@ -260,6 +261,16 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); + int64_t minVerToDelete = ver; + void *pIter = NULL; + while (1) { + pIter = taosHashIterate(pWal->pRefHash, pIter); + if (pIter == NULL) break; + SWalRef *pRef = *(SWalRef **)pIter; + if (pRef->refVer == -1) continue; + minVerToDelete = TMIN(minVerToDelete, pRef->refVer); + } + int deleteCnt = 0; int64_t newTotSize = pWal->totSize; SWalFileInfo tmp; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 80ad480e43aca9d42e4f755d23e1730aae5514de..ad6eff3c122af7b831ecb3b15157bff54dadc41c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -512,7 +512,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_UNIT, "Cannot use 'year' as TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_OFFSET_TOO_BIG, "Interval offset should be shorter than interval") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_UNIT, "Does not support sliding when interval is natural month/year") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_BIG, "sliding value no larger than the interval value") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1% of interval value") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_SLIDING_TOO_SMALL, "sliding value can not less than 1%% of interval value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_ONLY_ONE_JSON_TAG, "Only one tag if there is a json tag") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_NUM_OF_COL, "Query block has incorrect number of result columns") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL, "Incorrect TIMESTAMP value")