diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index b27150c9fbe2dc422bd0abbc23dbe6f43b31e02c..a108cc6937d16cd41332d0a7e6dc1f3c60990d19 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -66,6 +66,7 @@ typedef enum ENodeType { QUERY_NODE_DATABLOCK_DESC, QUERY_NODE_SLOT_DESC, QUERY_NODE_COLUMN_DEF, + QUERY_NODE_DOWNSTREAM_SOURCE, // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR, @@ -98,6 +99,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_AGG, QUERY_NODE_LOGIC_PLAN_PROJECT, QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, + QUERY_NODE_LOGIC_PLAN_EXCHANGE, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c805eba3205540dc28b65e0a1d25f1015894f660..084d6422925159dbd247ccff5c58906ce23792ad 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -67,12 +67,17 @@ typedef struct SProjectLogicNode { } SProjectLogicNode; typedef struct SVnodeModifLogicNode { - ENodeType type;; + SLogicNode node;; int32_t msgType; SArray* pDataBlocks; SVgDataBlocks* pVgDataBlocks; } SVnodeModifLogicNode; +typedef struct SExchangeLogicNode { + SLogicNode node; + int32_t srcGroupId; +} SExchangeLogicNode; + typedef enum ESubplanType { SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_PARTIAL, @@ -80,13 +85,22 @@ typedef enum ESubplanType { SUBPLAN_TYPE_MODIFY } ESubplanType; +typedef struct SSubplanId { + uint64_t queryId; + int32_t groupId; + int32_t subplanId; +} SSubplanId; + typedef struct SSubLogicPlan { ENodeType type; + SSubplanId id; SNodeList* pChildren; SNodeList* pParents; SLogicNode* pNode; ESubplanType subplanType; + SVgroupsInfo* pVgroupList; int32_t level; + int32_t splitFlag; } SSubLogicPlan; typedef struct SQueryLogicPlan { @@ -161,20 +175,21 @@ typedef struct SAggPhysiNode { SNodeList* pAggFuncs; } SAggPhysiNode; -typedef struct SDownstreamSource { +typedef struct SDownstreamSourceNode { + ENodeType type; SQueryNodeAddr addr; - uint64_t taskId; - uint64_t schedId; -} SDownstreamSource; + uint64_t taskId; + uint64_t schedId; +} SDownstreamSourceNode; typedef struct SExchangePhysiNode { - SPhysiNode node; - uint64_t srcTemplateId; // template id of datasource suplans - SArray* pSrcEndPoints; // SArray, scheduler fill by calling qSetSuplanExecutionNode + SPhysiNode node; + int32_t srcGroupId; // group id of datasource suplans + SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; typedef struct SDataSinkNode { - ENodeType type;; + ENodeType type; SDataBlockDescNode* pInputDataBlockDesc; } SDataSinkNode; @@ -189,12 +204,6 @@ typedef struct SDataInserterNode { char *pData; } SDataInserterNode; -typedef struct SSubplanId { - uint64_t queryId; - int32_t templateId; - int32_t subplanId; -} SSubplanId; - typedef struct SSubplan { ENodeType type; SSubplanId id; // unique id of the subplan diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 0d62b7f0dfad6b0e998d29c130862e9508cf3665..5d6ec46d85e2c27ca1cde6ad119e42ea4b01fd5c 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -32,9 +32,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule -// @templateId templateId of a group of datasource subplans of this @subplan +// @groupId id of a group of datasource subplans of this @subplan // @ep one execution location of this group of datasource subplans -void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource); +void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource); // Convert to subplan to string for the scheduler to send to the executor int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ea97f10dd5cdbb5aa1c14ac8871c5d11df1a70fa..f5ef9ccc164c65335f9a7c6de663abb74f8fd69e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4984,7 +4984,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf return pTaskInfo->code; } - SDownstreamSource *pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); + SDownstreamSourceNode *pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, @@ -5082,7 +5082,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx } SRetrieveTableRsp* pRsp = pDataInfo->pRsp; - SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, i); + SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); SSDataBlock* pRes = pExchangeInfo->pResult; @@ -5179,7 +5179,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { tsem_wait(&pExchangeInfo->ready); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); - SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); + SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SRetrieveTableRsp* pRsp = pDataInfo->pRsp; if (pRsp->numOfRows == 0) { diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d9eeb6eeeb324ea7b9c03d432a5eee7552c43ce7..e3dd2b499c8b20cd33e0eb264e26c2a4c34b615f 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -51,7 +51,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { } }; -const int funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition)); +const int32_t funcMgtBuiltinsNum = (sizeof(funcMgtBuiltins) / sizeof(SBuiltinFuncDefinition)); int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 27d71ff53269e94979ea681c0b660c62e4d50f02..e11a7a6833abc9c9d3f20a6671942a8e3d685bf1 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -60,6 +60,9 @@ #define CLONE_OBJECT_FIELD(fldname, cloneFunc) \ do { \ + if (NULL == (pSrc)->fldname) { \ + break; \ + } \ (pDst)->fldname = cloneFunc((pSrc)->fldname); \ if (NULL == (pDst)->fldname) { \ nodesDestroyNode((SNode*)(pDst)); \ @@ -234,10 +237,17 @@ static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode* } static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); COPY_SCALAR_FIELD(msgType); return (SNode*)pDst; } +static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + COPY_SCALAR_FIELD(srcGroupId); + return (SNode*)pDst; +} + static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) { CLONE_NODE_FIELD(pNode); COPY_SCALAR_FIELD(subplanType); @@ -304,6 +314,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { return logicProjectCopy((const SProjectLogicNode*)pNode, (SProjectLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF: return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst); + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst); case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst); default: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b28c45e55407ffcee74660813aa2c2ee133ca542..2e004aa4b23148a66c7b8a9bb422f18c480621b2 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -496,6 +496,37 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId"; +static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints"; + +static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) { + const SExchangePhysiNode* pNode = (const SExchangePhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints); + } + + return code; +} + +static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { + SExchangePhysiNode* pNode = (SExchangePhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints); + } + + return code; +} + static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc"; static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) { @@ -517,7 +548,7 @@ static int32_t jsonToPhysiDispatchNode(const SJson* pJson, void* pObj) { } static const char* jkSubplanIdQueryId = "QueryId"; -static const char* jkSubplanIdTemplateId = "TemplateId"; +static const char* jkSubplanIdGroupId = "GroupId"; static const char* jkSubplanIdSubplanId = "SubplanId"; static int32_t subplanIdToJson(const void* pObj, SJson* pJson) { @@ -525,7 +556,7 @@ static int32_t subplanIdToJson(const void* pObj, SJson* pJson) { int32_t code = tjsonAddIntegerToObject(pJson, jkSubplanIdQueryId, pNode->queryId); if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkSubplanIdTemplateId, pNode->templateId); + code = tjsonAddIntegerToObject(pJson, jkSubplanIdGroupId, pNode->groupId); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkSubplanIdSubplanId, pNode->subplanId); @@ -539,7 +570,7 @@ static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) { int32_t code = tjsonGetUBigIntValue(pJson, jkSubplanIdQueryId, &pNode->queryId); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkSubplanIdTemplateId, &pNode->templateId); + code = tjsonGetIntValue(pJson, jkSubplanIdGroupId, &pNode->groupId); } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkSubplanIdSubplanId, &pNode->subplanId); @@ -1387,7 +1418,9 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_AGG: return physiAggNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + return physiExchangeNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SORT: + break; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return physiDispatchNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INSERT: @@ -1459,6 +1492,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiJoinNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_AGG: return jsonToPhysiAggNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: + return jsonToPhysiExchangeNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return jsonToPhysiDispatchNode(pJson, pObj); case QUERY_NODE_PHYSICAL_SUBPLAN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 39e64ba1075f20def7d526eb3be9027480dcafac..c2b96b1c997bbc0be66e054d5c595d005b5fca8b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -129,6 +129,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SProjectLogicNode)); case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF: return makeNode(type, sizeof(SVnodeModifLogicNode)); + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + return makeNode(type, sizeof(SExchangeLogicNode)); case QUERY_NODE_LOGIC_SUBPLAN: return makeNode(type, sizeof(SSubLogicPlan)); case QUERY_NODE_LOGIC_PLAN: diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h index 991ad72a3112c1ee4e3668c6a1cbdeb095311151..d57f40ca3555e872387e0b46ab20b4eeab627b6c 100644 --- a/source/libs/planner/inc/plannerInt.h +++ b/source/libs/planner/inc/plannerInt.h @@ -50,6 +50,7 @@ extern "C" { int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode); int32_t optimize(SPlanContext* pCxt, SLogicNode* pLogicNode); +int32_t applySplitRule(SSubLogicPlan* pSubplan); int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** pPlan, SArray* pExecNodeList); #ifdef __cplusplus diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 61c5f517a72c0de3da00b8ee0b00a395e37adaf6..b0c087b36a63ee981bf04507b911709039b337b5 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -28,6 +28,8 @@ typedef struct SPhysiPlanContext { int16_t nextDataBlockId; SArray* pLocationHelper; SArray* pExecNodeList; + int32_t groupId; + int32_t subplanId; } SPhysiPlanContext; static int32_t getSlotKey(SNode* pNode, char* pKey) { @@ -81,12 +83,7 @@ static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SData SNode* pNode = NULL; int16_t slotId = taosHashGetSize(pHash); FOREACH(pNode, pList) { - SNode* pSlot = createSlotDesc(pCxt, pNode, slotId); - CHECK_ALLOC(pSlot, TSDB_CODE_OUT_OF_MEMORY); - if (TSDB_CODE_SUCCESS != nodesListAppend(pDataBlockDesc->pSlots, (SNode*)pSlot)) { - nodesDestroyNode(pSlot); - return TSDB_CODE_OUT_OF_MEMORY; - } + CHECK_CODE_EXT(nodesListStrictAppend(pDataBlockDesc->pSlots, createSlotDesc(pCxt, pNode, slotId))); SSlotIndex index = { .dataBlockId = pDataBlockDesc->dataBlockId, .slotId = slotId }; char name[TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN]; @@ -97,7 +94,7 @@ static int32_t addDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SData CHECK_ALLOC(pTarget, TSDB_CODE_OUT_OF_MEMORY); REPLACE_NODE(pTarget); - pDataBlockDesc->resultRowSize += ((SSlotDescNode*)pSlot)->dataType.bytes; + pDataBlockDesc->resultRowSize += ((SExprNode*)pNode)->resType.bytes; ++slotId; } return TSDB_CODE_SUCCESS; @@ -467,6 +464,14 @@ static SPhysiNode* createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pC return (SPhysiNode*)pProject; } +static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode) { + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE); + CHECK_ALLOC(pExchange, NULL); + CHECK_CODE(addDataBlockDesc(pCxt, pExchangeLogicNode->node.pTargets, pExchange->node.pOutputDataBlockDesc), (SPhysiNode*)pExchange); + pExchange->srcGroupId = pExchangeLogicNode->srcGroupId; + return (SPhysiNode*)pExchange; +} + static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) { SNodeList* pChildren = nodesMakeList(); CHECK_ALLOC(pChildren, NULL); @@ -495,6 +500,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, case QUERY_NODE_LOGIC_PLAN_PROJECT: pPhyNode = createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicPlan); break; + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan); + break; default: break; } @@ -525,8 +533,15 @@ static SDataSinkNode* createDataDispatcher(SPhysiPlanContext* pCxt, const SPhysi return (SDataSinkNode*)pDispatcher; } +static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) { + SSubplan* pSubplan = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + CHECK_ALLOC(pSubplan, NULL); + pSubplan->id = pLogicSubplan->id; + return pSubplan; +} + static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan) { - SSubplan* pSubplan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan* pSubplan = makeSubplan(pCxt, pLogicSubplan); CHECK_ALLOC(pSubplan, NULL); if (SUBPLAN_TYPE_MODIFY == pLogicSubplan->subplanType) { SVnodeModifLogicNode* pModif = (SVnodeModifLogicNode*)pLogicSubplan->pNode; @@ -540,9 +555,22 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog pSubplan->msgType = TDMT_VND_QUERY; } pSubplan->subplanType = pLogicSubplan->subplanType; + pSubplan->level = pLogicSubplan->level; return pSubplan; } +static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) { + pNode->pParent = pParent; + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + doSetLogicNodeParent((SLogicNode*)pChild, pNode); + } +} + +static void setLogicNodeParent(SLogicNode* pNode) { + doSetLogicNodeParent(pNode, NULL); +} + static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, SSubLogicPlan** pSubLogicPlan) { *pSubLogicPlan = (SSubLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); CHECK_ALLOC(*pSubLogicPlan, TSDB_CODE_OUT_OF_MEMORY); @@ -553,8 +581,9 @@ static int32_t splitLogicPlan(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode, S } else { (*pSubLogicPlan)->subplanType = SUBPLAN_TYPE_MERGE; } - // todo split - return TSDB_CODE_SUCCESS; + (*pSubLogicPlan)->id.queryId = pCxt->pPlanCxt->queryId; + setLogicNodeParent((*pSubLogicPlan)->pNode); + return applySplitRule(*pSubLogicPlan); } static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t level, SNodeList* pSubplans) { @@ -571,6 +600,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l CHECK_ALLOC(pGroup->pNodeList, TSDB_CODE_OUT_OF_MEMORY); } CHECK_CODE(nodesListStrictAppend(pGroup->pNodeList, pSubplan), TSDB_CODE_OUT_OF_MEMORY); + return TSDB_CODE_SUCCESS; } SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) { @@ -583,29 +613,105 @@ SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* p } pDst->subplanType = pSrc->subplanType; pDst->level = level; + pDst->id.queryId = pCxt->pPlanCxt->queryId; + pDst->id.groupId = pCxt->groupId; + pDst->id.subplanId = pCxt->subplanId++; return pDst; } -static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan) { - if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) { - SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode; - size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks); - for (int32_t i = 0; i < numOfVgroups; ++i) { +static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) { + SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode; + size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks); + for (int32_t i = 0; i < numOfVgroups; ++i) { + SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); + CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); + SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i); + ((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks; + // CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); + CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan)); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t scaleOutForMerge(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) { + // SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); + // CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); + // CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); + // return TSDB_CODE_SUCCESS; + return nodesListStrictAppend(pGroup, singleCloneSubLogicPlan(pCxt, pSubplan, level)); +} + +static int32_t doSetScanVgroup(SPhysiPlanContext* pCxt, SLogicNode* pNode, const SVgroupInfo* pVgroup, bool* pFound) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + pScan->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); + CHECK_ALLOC(pScan->pVgroupList, TSDB_CODE_OUT_OF_MEMORY); + memcpy(pScan->pVgroupList->vgroups, pVgroup, sizeof(SVgroupInfo)); + *pFound = true; + return TSDB_CODE_SUCCESS; + } + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { + int32_t code = doSetScanVgroup(pCxt, (SLogicNode*)pChild, pVgroup, pFound); + if (TSDB_CODE_SUCCESS != code || *pFound) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t setScanVgroup(SPhysiPlanContext* pCxt, SLogicNode* pNode, const SVgroupInfo* pVgroup) { + bool found = false; + return doSetScanVgroup(pCxt, pNode, pVgroup, &found); +} + +static int32_t scaleOutForScan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) { + if (pSubplan->pVgroupList) { + for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) { SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); - SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i); - ((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks; - CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); + CHECK_CODE_EXT(setScanVgroup(pCxt, pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i)); + // CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); + CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan)); } + return TSDB_CODE_SUCCESS; } else { - SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); - CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); + return scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pGroup); } +} +// static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup, int32_t level, SQueryLogicPlan* pLogicPlan) { +// FOREACH() { + +// } +// } + +static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SHashObj* pHash, SNodeList* pParentsGroup) { + SNodeList* pCurrentGroup = nodesMakeList(); + CHECK_ALLOC(pCurrentGroup, TSDB_CODE_OUT_OF_MEMORY); + int32_t code = TSDB_CODE_SUCCESS; + switch (pSubplan->subplanType) { + case SUBPLAN_TYPE_MERGE: + code = scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup); + break; + case SUBPLAN_TYPE_SCAN: + code = scaleOutForScan(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup); + break; + case SUBPLAN_TYPE_MODIFY: + code = scaleOutForModify(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup); + break; + default: + break; + } + if (TSDB_CODE_SUCCESS != code) { + return code; + } + // pushHierarchicalPlan(pParentsGroup, pCurrentGroup, level, pLogicPlan); + CHECK_CODE(taosHashPut(pHash, &pCxt->groupId, sizeof(pCxt->groupId), &pSubplan->id.groupId, sizeof(pSubplan->id.groupId)), TSDB_CODE_OUT_OF_MEMORY); + ++(pCxt->groupId); SNode* pChild; FOREACH(pChild, pSubplan->pChildren) { - CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan)); + CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan, pHash, pCurrentGroup)); } return TSDB_CODE_SUCCESS; @@ -622,10 +728,49 @@ static SQueryLogicPlan* makeQueryLogicPlan(SPhysiPlanContext* pCxt) { return pLogicPlan; } +static int32_t doMappingLogicPlan(SLogicNode* pNode, SHashObj* pHash) { + if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pNode)) { + SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pNode; + int32_t* pGroupId = taosHashGet(pHash, &pExchange->srcGroupId, sizeof(pExchange->srcGroupId)); + if (NULL == pGroupId) { + return TSDB_CODE_FAILED; + } + pExchange->srcGroupId = *pGroupId; + return TSDB_CODE_SUCCESS; + } + + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + doMappingLogicPlan((SLogicNode*)pChild, pHash); + } + return TSDB_CODE_SUCCESS; +} + +static int32_t mappingLogicPlan(SQueryLogicPlan* pLogicPlan, SHashObj* pHash) { + SNode* pNode = NULL; + FOREACH(pNode, pLogicPlan->pSubplans) { + SNode* pSubplan = NULL; + FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { + int32_t code = doMappingLogicPlan(((SSubLogicPlan*)pSubplan)->pNode, pHash); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} + static int32_t scaleOutLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pRootSubLogicPlan, SQueryLogicPlan** pLogicPlan) { *pLogicPlan = makeQueryLogicPlan(pCxt); CHECK_ALLOC(*pLogicPlan, TSDB_CODE_OUT_OF_MEMORY); - return doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan); + SHashObj* pHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY); + int32_t code = doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan, pHash, NULL); + if (TSDB_CODE_SUCCESS == code) { + code = mappingLogicPlan(*pLogicPlan, pHash); + } + taosHashCleanup(pHash); + return code; } typedef struct SBuildPhysiSubplanCxt { @@ -681,13 +826,12 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** if (NULL == cxt.pLocationHelper) { return TSDB_CODE_OUT_OF_MEMORY; } - SQueryLogicPlan* pLogicPlan; - SSubLogicPlan* pSubLogicPlan; + SQueryLogicPlan* pLogicPlan = NULL; + SSubLogicPlan* pSubLogicPlan = NULL; int32_t code = splitLogicPlan(&cxt, pLogicNode, &pSubLogicPlan); if (TSDB_CODE_SUCCESS == code) { code = scaleOutLogicPlan(&cxt, pSubLogicPlan, &pLogicPlan); } - // todo maping if (TSDB_CODE_SUCCESS == code) { code = buildPhysiPlan(&cxt, pLogicPlan, pPlan); } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index d7d9f1d129b80b9c43d845f7cb5754cecd7c0a47..4fa9b6ba10c7df422262850bdd3be0d8cf2a1b5a 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -34,7 +34,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo return code; } -void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) { +void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { } @@ -62,5 +62,5 @@ SQueryPlan* qStringToQueryPlan(const char* pStr) { } void qDestroyQueryPlan(SQueryPlan* pPlan) { - + nodesDestroyNode(pPlan); } diff --git a/source/libs/planner/src/splitPlan.c b/source/libs/planner/src/splitPlan.c new file mode 100644 index 0000000000000000000000000000000000000000..97e2eefb673a8083a7278d02cb1d585451d43232 --- /dev/null +++ b/source/libs/planner/src/splitPlan.c @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "plannerInt.h" + +#define SPLIT_FLAG_MASK(n) (1 << n) + +#define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0) + +#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask) +#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) + +typedef struct SSplitContext { + int32_t errCode; + int32_t groupId; + bool match; + void* pInfo; +} SSplitContext; + +typedef int32_t (*FMatch)(SSplitContext* pCxt, SSubLogicPlan* pSubplan); +typedef int32_t (*FSplit)(SSplitContext* pCxt); + +typedef struct SSplitRule { + char* pName; + FMatch matchFunc; + FSplit splitFunc; +} SSplitRule; + +typedef struct SStsInfo { + SScanLogicNode* pScan; + SSubLogicPlan* pSubplan; +} SStsInfo; + +static SLogicNode* stsMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) { + return pNode; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = stsMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static int32_t stsMatch(SSplitContext* pCxt, SSubLogicPlan* pSubplan) { + if (SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS)) { + return TSDB_CODE_SUCCESS; + } + SLogicNode* pSplitNode = stsMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + SStsInfo* pInfo = calloc(1, sizeof(SStsInfo)); + CHECK_ALLOC(pInfo, TSDB_CODE_OUT_OF_MEMORY); + pInfo->pScan = (SScanLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + pCxt->pInfo = pInfo; + pCxt->match = true; + return TSDB_CODE_SUCCESS; + } + SNode* pChild; + FOREACH(pChild, pSubplan->pChildren) { + int32_t code = stsMatch(pCxt, (SSubLogicPlan*)pChild); + if (TSDB_CODE_SUCCESS != code || pCxt->match) { + return code; + } + } + return TSDB_CODE_SUCCESS; +} + +static SSubLogicPlan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan) { + SSubLogicPlan* pSubplan = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); + if (NULL == pSubplan) { + return NULL; + } + pSubplan->id.groupId = pCxt->groupId; + pSubplan->subplanType = SUBPLAN_TYPE_SCAN; + pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan); + TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo); + SPLIT_FLAG_SET_MASK(pSubplan->splitFlag, SPLIT_FLAG_STS); + return pSubplan; +} + +static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SSubLogicPlan* pSubplan, SScanLogicNode* pScan) { + SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE); + if (NULL == pExchange) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pExchange->srcGroupId = pCxt->groupId; + pExchange->node.pTargets = nodesCloneList(pScan->node.pTargets); + if (NULL == pExchange->node.pTargets) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (NULL == pScan->node.pParent) { + pSubplan->pNode = (SLogicNode*)pExchange; + return TSDB_CODE_SUCCESS; + } + + SNode* pNode; + FOREACH(pNode, pScan->node.pParent->pChildren) { + if (nodesEqualNode(pNode, pScan)) { + REPLACE_NODE(pExchange); + nodesDestroyNode(pNode); + return TSDB_CODE_SUCCESS; + } + } + nodesDestroyNode(pExchange); + return TSDB_CODE_FAILED; +} + +static int32_t stsSplit(SSplitContext* pCxt) { + SStsInfo* pInfo = pCxt->pInfo; + if (NULL == pInfo->pSubplan->pChildren) { + pInfo->pSubplan->pChildren = nodesMakeList(); + if (NULL == pInfo->pSubplan->pChildren) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + int32_t code = nodesListStrictAppend(pInfo->pSubplan->pChildren, stsCreateScanSubplan(pCxt, pInfo->pScan)); + if (TSDB_CODE_SUCCESS == code) { + code = stsCreateExchangeNode(pCxt, pInfo->pSubplan, pInfo->pScan); + } + ++(pCxt->groupId); + return code; +} + +static const SSplitRule splitRuleSet[] = { + { .pName = "SuperTableScan", .matchFunc = stsMatch, .splitFunc = stsSplit } +}; + +static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); + +int32_t applySplitRule(SSubLogicPlan* pSubplan) { + SSplitContext cxt = { .errCode = TSDB_CODE_SUCCESS, .groupId = pSubplan->id.groupId + 1, .match = false, .pInfo = NULL }; + bool split = false; + do { + split = false; + for (int32_t i = 0; i < splitRuleNum; ++i) { + cxt.match = false; + int32_t code = splitRuleSet[i].matchFunc(&cxt, pSubplan); + if (TSDB_CODE_SUCCESS == code && cxt.match) { + code = splitRuleSet[i].splitFunc(&cxt); + split = true; + } + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } while (split); + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 8ba80d1ab93699996deeb78da4572270d5849a32..ae25b157f60850ab0c8f49a67282182da6de750c 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -137,6 +137,13 @@ TEST_F(PlannerTest, simple) { ASSERT_TRUE(run()); } +TEST_F(PlannerTest, stSimple) { + setDatabase("root", "test"); + + bind("SELECT * FROM st1"); + ASSERT_TRUE(run()); +} + TEST_F(PlannerTest, groupBy) { setDatabase("root", "test"); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index fe886dfcdbe8f4f3f35f74d4406efcb48982e003..81398acc704b9d067a1966205a6d269152cc4972 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -797,8 +797,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { int32_t readyNum = atomic_add_fetch_32(&par->childReady, 1); SCH_LOCK(SCH_WRITE, &par->lock); - SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; - qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source); + SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr}; + qSetSubplanExecutionNode(par->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &par->lock); if (SCH_TASK_READY_TO_LUNCH(readyNum, par)) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 0347318ae59b4ef17f2ebe8a8f8decc0dbdb99c6..084a76ebdeb29af3c568dcd786886ea870298102 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -99,7 +99,7 @@ void schtBuildQueryDag(SQueryPlan *dag) { SSubplan *mergePlan = (SSubplan *)calloc(1, sizeof(SSubplan)); scanPlan->id.queryId = qId; - scanPlan->id.templateId = 0x0000000000000002; + scanPlan->id.groupId = 0x0000000000000002; scanPlan->id.subplanId = 0x0000000000000003; scanPlan->subplanType = SUBPLAN_TYPE_SCAN; @@ -114,7 +114,7 @@ void schtBuildQueryDag(SQueryPlan *dag) { scanPlan->msgType = TDMT_VND_QUERY; mergePlan->id.queryId = qId; - mergePlan->id.templateId = schtMergeTemplateId; + mergePlan->id.groupId = schtMergeTemplateId; mergePlan->id.subplanId = 0x5555; mergePlan->subplanType = SUBPLAN_TYPE_MERGE; mergePlan->level = 0; @@ -158,7 +158,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { for (int32_t i = 0; i < scanPlanNum; ++i) { scanPlan[i].id.queryId = qId; - scanPlan[i].id.templateId = 0x0000000000000002; + scanPlan[i].id.groupId = 0x0000000000000002; scanPlan[i].id.subplanId = 0x0000000000000003 + i; scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN; @@ -183,7 +183,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { } mergePlan->id.queryId = qId; - mergePlan->id.templateId = schtMergeTemplateId; + mergePlan->id.groupId = schtMergeTemplateId; mergePlan->id.subplanId = 0x5555; mergePlan->subplanType = SUBPLAN_TYPE_MERGE; mergePlan->level = 0; @@ -216,7 +216,7 @@ void schtBuildInsertDag(SQueryPlan *dag) { SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan)); insertPlan[0].id.queryId = qId; - insertPlan[0].id.templateId = 0x0000000000000003; + insertPlan[0].id.groupId = 0x0000000000000003; insertPlan[0].id.subplanId = 0x0000000000000004; insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY; insertPlan[0].level = 0; @@ -232,7 +232,7 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan[0].msgType = TDMT_VND_SUBMIT; insertPlan[1].id.queryId = qId; - insertPlan[1].id.templateId = 0x0000000000000003; + insertPlan[1].id.groupId = 0x0000000000000003; insertPlan[1].id.subplanId = 0x0000000000000005; insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY; insertPlan[1].level = 0; @@ -263,7 +263,7 @@ int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { return 0; } -void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { +void schtExecNode(SSubplan* subplan, uint64_t groupId, SQueryNodeAddr* ep) { }