diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 8fb52ceb858d01919ab8a9d5b9abed9b6b5b7231..3a9dd3c713b09d627243e76884a5e5db887b3116 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -101,6 +101,7 @@ typedef enum ENodeType { QUERY_NODE_LOGIC_PLAN_PROJECT, QUERY_NODE_LOGIC_PLAN_VNODE_MODIF, QUERY_NODE_LOGIC_PLAN_EXCHANGE, + QUERY_NODE_LOGIC_PLAN_WINDOW, QUERY_NODE_LOGIC_SUBPLAN, QUERY_NODE_LOGIC_PLAN, @@ -115,6 +116,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_AGG, QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, QUERY_NODE_PHYSICAL_PLAN_SORT, + QUERY_NODE_PHYSICAL_PLAN_INTERVAL, QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_SUBPLAN, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 717baf24cdb261cda7849d8e0c0cd8b174084ca9..c028c4a1b99663abaa80b4c8ab4a8a4b4d1b1f48 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -80,6 +80,22 @@ typedef struct SExchangeLogicNode { int32_t srcGroupId; } SExchangeLogicNode; +typedef enum EWindowType { + WINDOW_TYPE_INTERVAL = 1, + WINDOW_TYPE_SESSION, + WINDOW_TYPE_STATE +} EWindowType; + +typedef struct SWindowLogicNode { + SLogicNode node; + EWindowType winType; + SNodeList* pFuncs; + int64_t interval; + int64_t offset; + int64_t sliding; + SFillNode* pFill; +} SWindowLogicNode; + typedef enum ESubplanType { SUBPLAN_TYPE_MERGE = 1, SUBPLAN_TYPE_PARTIAL, @@ -191,6 +207,16 @@ typedef struct SExchangePhysiNode { SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; +typedef struct SIntervalPhysiNode { + SPhysiNode node; + SNodeList* pExprs; // these are expression list of parameter expression of function + SNodeList* pFuncs; + int64_t interval; + int64_t offset; + int64_t sliding; + SFillNode* pFill; +} SIntervalPhysiNode; + typedef struct SDataSinkNode { ENodeType type; SDataBlockDescNode* pInputDataBlockDesc; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fdc9368b76e989791453813581327e416007bc3c..8dfd736df6f3984f823b73d2703f154f68803afc 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -71,6 +71,10 @@ typedef struct SRpcInit { // call back to keep conn or not bool (*pfp)(void *parent, tmsg_t msgType); + // to support Send messages multiple times on a link + // + void* (*mfp)(void *parent, tmsg_t msgType); + void *parent; } SRpcInit; @@ -89,6 +93,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +// just release client conn to rpc instance, no close sock +void rpcReleaseHandle(void *handle); + void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/include/os/osLocale.h b/include/os/osLocale.h index 6e313eb8cde14e67f56a9e8334239ca714b88afd..ddafd2e93c5f549beb30304d17d96dfe408ae518 100644 --- a/include/os/osLocale.h +++ b/include/os/osLocale.h @@ -17,12 +17,16 @@ #define _TD_OS_LOCALE_H_ #include "os.h" -#include "osString.h" #ifdef __cplusplus extern "C" { #endif +// If the error is in a third-party library, place this header file under the third-party library header file. +#ifndef ALLOW_FORBID_FUNC + #define setlocale SETLOCALE_FUNC_TAOS_FORBID +#endif + char *taosCharsetReplace(char *charsetstr); void taosGetSystemLocale(char *outLocale, char *outCharset); void taosSetSystemLocale(const char *inLocale, const char *inCharSet); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 13a4fc70f70b062ac8bcebdeea27714a5779efc0..646b443fb748ccb3106fa87b25f8737d693594aa 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -195,11 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { pRequest->type = pQuery->msgType; SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId }; - int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); - if (code != 0) { - return code; - } - return code; + return qCreateQueryPlan(&cxt, pPlan, pNodeList); } void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) { diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index c675827253ee08085383dcc0f91a69b54679f5cc..34c211561bd3840caa612e6a3fe074138f1545cc 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -183,6 +183,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode return (SNode*)pDst; } +static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) { + COPY_SCALAR_FIELD(mode); + CLONE_NODE_FIELD(pValues); + return (SNode*)pDst; +} + static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { COPY_SCALAR_FIELD(id); CLONE_NODE_LIST_FIELD(pTargets); @@ -248,6 +254,17 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo return (SNode*)pDst; } +static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) { + COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); + COPY_SCALAR_FIELD(winType); + CLONE_NODE_LIST_FIELD(pFuncs); + COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(offset); + COPY_SCALAR_FIELD(sliding); + CLONE_NODE_FIELD(pFill); + return (SNode*)pDst; +} + static SNode* logicSubplanCopy(const SSubLogicPlan* pSrc, SSubLogicPlan* pDst) { CLONE_NODE_FIELD(pNode); COPY_SCALAR_FIELD(subplanType); @@ -309,6 +326,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { case QUERY_NODE_ORDER_BY_EXPR: case QUERY_NODE_LIMIT: break; + case QUERY_NODE_FILL: + return fillNodeCopy((const SFillNode*)pNode, (SFillNode*)pDst); case QUERY_NODE_DATABLOCK_DESC: return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst); case QUERY_NODE_SLOT_DESC: @@ -325,6 +344,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) { return logicVnodeModifCopy((const SVnodeModifLogicNode*)pNode, (SVnodeModifLogicNode*)pDst); case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return logicExchangeCopy((const SExchangeLogicNode*)pNode, (SExchangeLogicNode*)pDst); + case QUERY_NODE_LOGIC_PLAN_WINDOW: + return logicWindowCopy((const SWindowLogicNode*)pNode, (SWindowLogicNode*)pDst); case QUERY_NODE_LOGIC_SUBPLAN: return logicSubplanCopy((const SSubLogicPlan*)pNode, (SSubLogicPlan*)pDst); default: diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 782cdcb71a934fbc56d2f8f915210e883bea5d7e..57ebc2c4b6fb3956a6a80b6e688aae8751406c19 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -117,6 +117,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiExchange"; case QUERY_NODE_PHYSICAL_PLAN_SORT: return "PhysiSort"; + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: + return "PhysiInterval"; case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return "PhysiDispatch"; case QUERY_NODE_PHYSICAL_PLAN_INSERT: @@ -573,6 +575,65 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkIntervalPhysiPlanExprs = "Exprs"; +static const char* jkIntervalPhysiPlanFuncs = "Funcs"; +static const char* jkIntervalPhysiPlanInterval = "Interval"; +static const char* jkIntervalPhysiPlanOffset = "Offset"; +static const char* jkIntervalPhysiPlanSliding = "Sliding"; +static const char* jkIntervalPhysiPlanFill = "Fill"; + +static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { + const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; + + int32_t code = physicPlanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkIntervalPhysiPlanExprs, pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodeListToJson(pJson, jkIntervalPhysiPlanFuncs, pNode->pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanInterval, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanOffset, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill); + } + + return code; +} + +static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) { + SIntervalPhysiNode* pNode = (SIntervalPhysiNode*)pObj; + + int32_t code = jsonToPhysicPlanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkIntervalPhysiPlanExprs, &pNode->pExprs); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeList(pJson, jkIntervalPhysiPlanFuncs, &pNode->pFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanInterval, &pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanOffset, &pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill); + } + + return code; +} + static const char* jkDataSinkInputDataBlockDesc = "InputDataBlockDesc"; static int32_t physicDataSinkNodeToJson(const void* pObj, SJson* pJson) { @@ -1500,6 +1561,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiExchangeNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_SORT: break; + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: + return physiIntervalNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return physiDispatchNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_INSERT: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7e1d15c191ac3237b089ea961c44ed5121c9f7c1..0adbf8cb2f0afbd0ec64c1cb5f1876e8bbbd764d 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -134,6 +134,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SVnodeModifLogicNode)); case QUERY_NODE_LOGIC_PLAN_EXCHANGE: return makeNode(type, sizeof(SExchangeLogicNode)); + case QUERY_NODE_LOGIC_PLAN_WINDOW: + return makeNode(type, sizeof(SWindowLogicNode)); case QUERY_NODE_LOGIC_SUBPLAN: return makeNode(type, sizeof(SSubLogicPlan)); case QUERY_NODE_LOGIC_PLAN: @@ -156,6 +158,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SExchangePhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_SORT: return makeNode(type, sizeof(SNode)); + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: + return makeNode(type, sizeof(SIntervalPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return makeNode(type, sizeof(SDataDispatcherNode)); case QUERY_NODE_PHYSICAL_PLAN_INSERT: diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 310da2e966c0d1ac6f3b68a018a08e458d1bae20..f4ed3d85978a537ff676ad69642c2bd354c3ed68 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -198,7 +198,7 @@ col_name(A) ::= column_name(B). cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); } cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); } -/************************************************ show vgroups ********************************************************/ +/************************************************ show mnodes *********************************************************/ cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL); } /************************************************ select **************************************************************/ diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ac1509462bee1dad787f78ffb7e3aadf027b0acf..c1ba59dd8409b70cd81b6bb085149b8853ac8849 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -706,9 +706,17 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList return translateExprList(pCxt, pGroupByList); } +static int32_t doTranslateWindow(STranslateContext* pCxt, SNode* pWindow) { + return TSDB_CODE_SUCCESS; +} + static int32_t translateWindow(STranslateContext* pCxt, SNode* pWindow) { pCxt->currClause = SQL_CLAUSE_WINDOW; - return translateExpr(pCxt, pWindow); + int32_t code = translateExpr(pCxt, pWindow); + if (TSDB_CODE_SUCCESS == code) { + code = doTranslateWindow(pCxt, pWindow); + } + return code; } static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { diff --git a/source/libs/parser/test/parserAstTest.cpp b/source/libs/parser/test/parserAstTest.cpp index f9a2bafe1066f522d3c880155643523e89acdf56..d30c21cc2113355e3671d8e53ad0b1aece63454e 100644 --- a/source/libs/parser/test/parserAstTest.cpp +++ b/source/libs/parser/test/parserAstTest.cpp @@ -183,6 +183,13 @@ TEST_F(ParserTest, selectClause) { ASSERT_TRUE(run()); } +TEST_F(ParserTest, selectWindow) { + setDatabase("root", "test"); + + bind("SELECT count(*) FROM t1 interval(10s)"); + ASSERT_TRUE(run()); +} + TEST_F(ParserTest, selectSyntaxError) { setDatabase("root", "test"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a93985e8ba26cec7af6935222548378dc3656961..7520ea3c9e3c36d503d1358845d44bb42bf105a9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -304,6 +304,50 @@ static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel return (SLogicNode*)pAgg; } +static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SIntervalWindowNode* pInterval, SSelectStmt* pSelect) { + SWindowLogicNode* pWindow = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_WINDOW); + CHECK_ALLOC(pWindow, NULL); + pWindow->node.id = pCxt->planNodeId++; + + pWindow->winType = WINDOW_TYPE_INTERVAL; + pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i; + pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0); + pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0); + if (NULL != pInterval->pFill) { + pWindow->pFill = nodesCloneNode(pInterval->pFill); + CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow); + } + + SNodeList* pFuncs = NULL; + CHECK_CODE(nodesCollectFuncs(pSelect, fmIsAggFunc, &pFuncs), NULL); + if (NULL != pFuncs) { + pWindow->pFuncs = nodesCloneList(pFuncs); + CHECK_ALLOC(pWindow->pFuncs, (SLogicNode*)pWindow); + } + + CHECK_CODE(rewriteExpr(pWindow->node.id, 1, pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW), (SLogicNode*)pWindow); + + pWindow->node.pTargets = createColumnByRewriteExps(pCxt, pWindow->pFuncs); + CHECK_ALLOC(pWindow->node.pTargets, (SLogicNode*)pWindow); + + return (SLogicNode*)pWindow; +} + +static SLogicNode* createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect) { + if (NULL == pSelect->pWindow) { + return NULL; + } + + switch (nodeType(pSelect->pWindow)) { + case QUERY_NODE_INTERVAL_WINDOW: + return createWindowLogicNodeByInterval(pCxt, (SIntervalWindowNode*)pSelect->pWindow, pSelect); + default: + break; + } + + return NULL; +} + static SNodeList* createColumnByProjections(SLogicPlanContext* pCxt, SNodeList* pExprs) { SNodeList* pList = nodesMakeList(); CHECK_ALLOC(pList, NULL); @@ -345,6 +389,9 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p pRoot->pConditions = nodesCloneNode(pSelect->pWhere); CHECK_ALLOC(pRoot->pConditions, pRoot); } + if (TSDB_CODE_SUCCESS == pCxt->errCode) { + pRoot = pushLogicNode(pCxt, pRoot, createWindowLogicNode(pCxt, pSelect)); + } if (TSDB_CODE_SUCCESS == pCxt->errCode) { pRoot = pushLogicNode(pCxt, pRoot, createAggLogicNode(pCxt, pSelect)); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 5af26b3e32cf6168b79aa1da7194f786d42c3665..7b7dd26df1a985463f915408f8984d00f245653f 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -473,14 +473,58 @@ static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLog return (SPhysiNode*)pExchange; } +static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) { + SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_INTERVAL); + CHECK_ALLOC(pInterval, NULL); + + pInterval->interval = pWindowLogicNode->interval; + pInterval->offset = pWindowLogicNode->offset; + pInterval->sliding = pWindowLogicNode->sliding; + pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill); + + SNodeList* pPrecalcExprs = NULL; + SNodeList* pFuncs = NULL; + CHECK_CODE(rewritePrecalcExprs(pCxt, pWindowLogicNode->pFuncs, &pPrecalcExprs, &pFuncs), (SPhysiNode*)pInterval); + + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + // push down expression to pOutputDataBlockDesc of child node + if (NULL != pPrecalcExprs) { + pInterval->pExprs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pPrecalcExprs); + CHECK_ALLOC(pInterval->pExprs, (SPhysiNode*)pInterval); + CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pExprs, pChildTupe), (SPhysiNode*)pInterval); + } + + if (NULL != pFuncs) { + pInterval->pFuncs = setListSlotId(pCxt, pChildTupe->dataBlockId, -1, pFuncs); + CHECK_ALLOC(pInterval->pFuncs, (SPhysiNode*)pInterval); + CHECK_CODE(addDataBlockDesc(pCxt, pInterval->pFuncs, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval); + } + + CHECK_CODE(setSlotOutput(pCxt, pWindowLogicNode->node.pTargets, pInterval->node.pOutputDataBlockDesc), (SPhysiNode*)pInterval); + + return (SPhysiNode*)pInterval; +} + +static SPhysiNode* createWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode) { + switch (pWindowLogicNode->winType) { + case WINDOW_TYPE_INTERVAL: + return createIntervalPhysiNode(pCxt, pChildren, pWindowLogicNode); + case WINDOW_TYPE_SESSION: + case WINDOW_TYPE_STATE: + break; + default: + break; + } + return NULL; +} + static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SLogicNode* pLogicPlan) { SNodeList* pChildren = nodesMakeList(); CHECK_ALLOC(pChildren, NULL); SNode* pLogicChild; FOREACH(pLogicChild, pLogicPlan->pChildren) { - SNode* pChildPhyNode = (SNode*)createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild); - if (TSDB_CODE_SUCCESS != nodesListAppend(pChildren, pChildPhyNode)) { + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pChildren, createPhysiNode(pCxt, pSubplan, (SLogicNode*)pLogicChild))) { pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; nodesDestroyList(pChildren); return NULL; @@ -504,6 +548,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, case QUERY_NODE_LOGIC_PLAN_EXCHANGE: pPhyNode = createExchangePhysiNode(pCxt, (SExchangeLogicNode*)pLogicPlan); break; + case QUERY_NODE_LOGIC_PLAN_WINDOW: + pPhyNode = createWindowPhysiNode(pCxt, pChildren, (SWindowLogicNode*)pLogicPlan); + break; default: break; } diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 1af89d71c94e9c9cd2ea298e923178f4489e6631..8baa64d8fb78b92bbee314442ccf2f0b30cc2213 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -166,3 +166,10 @@ TEST_F(PlannerTest, subquery) { bind("SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b"); ASSERT_TRUE(run()); } + +TEST_F(PlannerTest, interval) { + setDatabase("root", "test"); + + bind("SELECT count(*) FROM t1 interval(10s)"); + ASSERT_TRUE(run()); +} diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index f2ac77fe61e41cbc8d041cfb82d80d3a39f0e505..99f890d3a0288b6da16c8047267986213a43e5fd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -120,6 +120,10 @@ typedef struct { // SEpSet* pSet; // for synchronous API } SRpcReqContext; +typedef SRpcMsg STransMsg; +typedef SRpcInfo STrans; +typedef SRpcConnInfo STransHandleInfo; + typedef struct { SEpSet epSet; // ip list provided by app void* ahandle; // handle provided by app @@ -134,8 +138,8 @@ typedef struct { int8_t connType; // connection type int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + STransMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API int hThrdIdx; char* ip; @@ -249,4 +253,15 @@ void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); +void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); +void transSendResponse(const STransMsg* pMsg); +int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); + +void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); + +void transCloseClient(void* arg); +void transCloseServer(void* arg); + #endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 4e4dcf7aa4b033959f793c705768694f7ebcf0a3..3924a5cf1a022349233eef43e57a00ff5d56bb40 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,6 +64,7 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); bool (*pfp)(void* parent, tmsg_t msgType); + void* (*mfp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 4d244665c7d84dbfedded1e9339097d4a1bdfb3c..015018f73f32d113706f0b27f4b75a0e1713caed 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -18,8 +18,9 @@ #include "transComm.h" void* (*taosInitHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { - taosInitServer, taosInitClient}; -void (*taosCloseHandle[])(void* arg) = {taosCloseServer, taosCloseClient}; + transInitServer, transInitClient}; + +void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient}; void* rpcOpen(const SRpcInit* pInit) { SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); @@ -34,11 +35,12 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; pRpc->pfp = pInit->pfp; + pRpc->mfp = pInit->mfp; if (pInit->connType == TAOS_CONN_SERVER) { pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } else { - pRpc->numOfThreads = pInit->numOfThreads; + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; } pRpc->connType = pInit->connType; @@ -116,6 +118,24 @@ int32_t rpcInit() { return 0; } +void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg* pMsg, int64_t *pRid) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRequest(shandle, ip, port, pMsg); +} +void rpcSendRecv(void* shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); + uint32_t port = pEpSet->eps[pEpSet->inUse].port; + transSendRecv(shandle, ip, port, pMsg, pRsp); +} + +void rpcSendResponse(const SRpcMsg *pMsg) { + transSendResponse(pMsg); +} +int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) { + return transGetConnInfo((void *)thandle, pInfo); +} + void rpcCleanup(void) { // impl later // @@ -129,6 +149,7 @@ void rpcRefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosRefHandle[type])(handle); } + void rpcUnrefHandle(void* handle, int8_t type) { assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); (*taosUnRefHandle[type])(handle); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 727845b7a978fe56fd0a9de5a89db40796392b07..4af02a982ef30e150cb8556ae969067df3d4dd6d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -42,7 +42,7 @@ typedef struct SCliConn { typedef struct SCliMsg { STransConnCtx* ctx; - SRpcMsg msg; + STransMsg msg; queue q; uint64_t st; } SCliMsg; @@ -105,9 +105,9 @@ static void cliHandleExcept(SCliConn* conn); static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd); static void cliSendQuit(SCliThrdObj* thrd); -static void destroyUserdata(SRpcMsg* userdata); +static void destroyUserdata(STransMsg* userdata); -static int cliRBChoseIdx(SRpcInfo* pTransInst); +static int cliRBChoseIdx(STrans* pTransInst); static void destroyCmsg(SCliMsg* cmsg); static void transDestroyConnCtx(STransConnCtx* ctx); @@ -118,11 +118,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) -#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) +#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ - cliHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0) @@ -130,20 +130,25 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_BROKEN(conn) \ do { \ if (conn->broken) { \ - cliHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0); -static void* cliWorkThread(void* arg); +#define CONN_SET_PERSIST_BY_APP(conn) \ + do { \ + if (conn->persist == false) { \ + conn->persist = true; \ + transRefCliHandle(conn); \ + } \ + } while (0) +#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) -static void* cliNotifyApp() {} -static void cliHandleResp(SCliConn* conn) { - SCliMsg* pMsg = conn->data; - STransConnCtx* pCtx = pMsg->ctx; +static void* cliWorkThread(void* arg); +void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -152,19 +157,29 @@ static void cliHandleResp(SCliConn* conn) { // buf's mem alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); - SRpcMsg rpcMsg = {0}; + STransMsg rpcMsg = {0}; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.code = pHead->code; rpcMsg.msgType = pHead->msgType; - rpcMsg.ahandle = pCtx->ahandle; + rpcMsg.ahandle = NULL; - if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { - rpcMsg.handle = conn; - transRefCliHandle(conn); + SCliMsg* pMsg = conn->data; + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { + rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; + } else { + rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + } + // if (rpcMsg.ahandle == NULL) { + // tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn); + // return; + //} - conn->persist = 1; - tDebug("cli conn %p persist by app", conn); + if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { + rpcMsg.handle = conn; + CONN_SET_PERSIST_BY_APP(conn); + tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, @@ -173,7 +188,7 @@ static void cliHandleResp(SCliConn* conn) { conn->secured = pHead->secured; - if (pCtx->pSem == NULL) { + if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { @@ -184,8 +199,7 @@ static void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); - // user owns conn->persist = 1 - if (conn->persist == 0) { + if (CONN_NO_PERSIST_BY_APP(conn)) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); } destroyCmsg(conn->data); @@ -196,24 +210,32 @@ static void cliHandleResp(SCliConn* conn) { // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } -static void cliHandleExcept(SCliConn* pConn) { + +void cliHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { - // handle conn except in conn pool - transUnrefCliHandle(pConn); - return; + if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { + transUnrefCliHandle(pConn); + return; + } } SCliThrdObj* pThrd = pConn->hostThrd; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; SCliMsg* pMsg = pConn->data; - STransConnCtx* pCtx = pMsg->ctx; + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; - SRpcMsg rpcMsg = {0}; - rpcMsg.ahandle = pCtx->ahandle; + STransMsg rpcMsg = {0}; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcMsg.msgType = pMsg->msg.msgType + 1; + rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + rpcMsg.ahandle = NULL; + + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { + rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; + } else { + rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + } - if (pCtx->pSem == NULL) { + if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { @@ -228,9 +250,9 @@ static void cliHandleExcept(SCliConn* pConn) { transUnrefCliHandle(pConn); } -static void cliTimeoutCb(uv_timer_t* handle) { +void cliTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; - SRpcInfo* pRpc = pThrd->pTransInst; + STrans* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); @@ -252,11 +274,12 @@ static void cliTimeoutCb(uv_timer_t* handle) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -static void* createConnPool(int size) { + +void* createConnPool(int size) { // thread local, no lock return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); } -static void* destroyConnPool(void* pool) { +void* destroyConnPool(void* pool) { SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { @@ -301,7 +324,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); - SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + STrans* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); @@ -358,6 +381,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; + conn->persist = false; conn->broken = false; transRefCliHandle(conn); return conn; @@ -395,16 +419,16 @@ static void cliSendCb(uv_write_t* req, int status) { uv_read_start((uv_stream_t*)pConn->stream, cliAllocBufferCb, cliRecvCb); } -static void cliSend(SCliConn* pConn) { +void cliSend(SCliConn* pConn) { CONN_HANDLE_BROKEN(pConn); SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; - SRpcMsg* pMsg = (SRpcMsg*)(&pCliMsg->msg); + STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); STransMsgHead* pHead = transHeadFromCont(pMsg->pCont); int msgLen = transMsgLenFromCont(pMsg->contLen); @@ -442,7 +466,8 @@ static void cliSend(SCliConn* pConn) { _RETURE: return; } -static void cliConnCb(uv_connect_t* req, int status) { + +void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; if (status != 0) { @@ -472,11 +497,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } -static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { + +SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); - transUnrefCliHandle(conn); if (conn != NULL) { tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); } @@ -487,13 +512,14 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { } return conn; } -static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { + +void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); + tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { @@ -514,6 +540,7 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } + conn->hThrdIdx = pCtx->hThrdIdx; } static void cliAsyncCb(uv_async_t* handle) { @@ -522,7 +549,7 @@ static void cliAsyncCb(uv_async_t* handle) { SCliMsg* pMsg = NULL; // batch process to avoid to lock/unlock frequently - queue wq; + queue wq; pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); @@ -551,10 +578,10 @@ static void* cliWorkThread(void* arg) { uv_run(pThrd->loop, UV_RUN_DEFAULT); } -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { +void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SCliObj* cli = calloc(1, sizeof(SCliObj)); - SRpcInfo* pRpc = shandle; + STrans* pRpc = shandle; memcpy(cli->label, label, strlen(label)); cli->numOfThreads = numOfThreads; cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); @@ -573,7 +600,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, return cli; } -static void destroyUserdata(SRpcMsg* userdata) { +static void destroyUserdata(STransMsg* userdata) { if (userdata->pCont == NULL) { return; } @@ -629,12 +656,20 @@ static void transDestroyConnCtx(STransConnCtx* ctx) { free(ctx); } // -static void cliSendQuit(SCliThrdObj* thrd) { +void cliSendQuit(SCliThrdObj* thrd) { // cli can stop gracefully SCliMsg* msg = calloc(1, sizeof(SCliMsg)); transSendAsync(thrd->asyncPool, &msg->q); } -void taosCloseClient(void* arg) { +int cliRBChoseIdx(STrans* pTransInst) { + int64_t index = pTransInst->index; + if (pTransInst->index++ >= pTransInst->numOfThreads) { + pTransInst->index = 0; + } + return index % pTransInst->numOfThreads; +} + +void transCloseClient(void* arg) { SCliObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { cliSendQuit(cli->pThreadObj[i]); @@ -643,13 +678,6 @@ void taosCloseClient(void* arg) { free(cli->pThreadObj); free(cli); } -static int cliRBChoseIdx(SRpcInfo* pTransInst) { - int64_t index = pTransInst->index; - if (pTransInst->index++ >= pTransInst->numOfThreads) { - pTransInst->index = 0; - } - return index % pTransInst->numOfThreads; -} void transRefCliHandle(void* handle) { if (handle == NULL) { return; @@ -665,17 +693,11 @@ void transUnrefCliHandle(void* handle) { if (ref == 0) { cliDestroyConn((SCliConn*)handle, true); } - - // unref cli handle } -void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { - // impl later - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - - SRpcInfo* pTransInst = (SRpcInfo*)shandle; - int index = CONN_HOST_THREAD_INDEX(pMsg->handle); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { + STrans* pTransInst = (STrans*)shandle; + int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } @@ -683,6 +705,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } + tDebug("send request at thread:%d %p", index, pMsg); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; @@ -701,14 +724,9 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } - -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { - char* ip = (char*)(pEpSet->eps[pEpSet->inUse].fqdn); - uint32_t port = pEpSet->eps[pEpSet->inUse].port; - - SRpcInfo* pTransInst = (SRpcInfo*)shandle; - - int index = CONN_HOST_THREAD_INDEX(pReq->handle); +void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { + STrans* pTransInst = (STrans*)shandle; + int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } @@ -734,7 +752,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { tsem_wait(pSem); tsem_destroy(pSem); free(pSem); - - return; } + #endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 1abca9ad975ef6be1900a2b84f24d94a624889a1..c236a69f4ee0663112797d34f6623776365abd70 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -37,8 +37,7 @@ typedef struct SSrvConn { struct sockaddr_in addr; struct sockaddr_in locaddr; - // SRpcMsg sendMsg; - // del later + char secured; int spi; char info[64]; @@ -49,7 +48,7 @@ typedef struct SSrvConn { typedef struct SSrvMsg { SSrvConn* pConn; - SRpcMsg msg; + STransMsg msg; queue q; } SSrvMsg; @@ -207,20 +206,20 @@ static void uvHandleReq(SSrvConn* pConn) { pConn->inType = pHead->msgType; - SRpcInfo* pRpc = (SRpcInfo*)p->shandle; + STrans* pRpc = (STrans*)p->shandle; pHead->code = htonl(pHead->code); int32_t dlen = 0; if (transDecompressMsg(NULL, 0, NULL)) { // add compress later - // pHead = rpcDecompressRpcMsg(pHead); + // pHead = rpcDecompresSTransMsg(pHead); } else { pHead->msgLen = htonl(pHead->msgLen); // impl later // } - SRpcMsg rpcMsg; + STransMsg rpcMsg; rpcMsg.contLen = transContLenFromMsg(pHead->msgLen); rpcMsg.pCont = pHead->content; rpcMsg.msgType = pHead->msgType; @@ -260,7 +259,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } tError("server conn %p read error: %s", conn, uv_err_name(nread)); - if (nread < 0 || nread == UV_EOF) { + if (nread < 0) { conn->broken = true; transUnrefSrvHandle(conn); @@ -318,8 +317,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { // impl later; tTrace("server conn %p prepare to send resp", smsg->pConn); - SSrvConn* pConn = smsg->pConn; - SRpcMsg* pMsg = &smsg->msg; + SSrvConn* pConn = smsg->pConn; + STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); pMsg->contLen = 0; @@ -547,7 +546,7 @@ static bool addHandleToWorkloop(void* arg) { return false; } - // SRpcInfo* pRpc = pThrd->shandle; + // STrans* pRpc = pThrd->shandle; uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_open(pThrd->pipe, pThrd->fd); @@ -668,7 +667,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { return msgLen; } -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { +void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { SServerObj* srv = calloc(1, sizeof(SServerObj)); srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); srv->numOfThreads = numOfThreads; @@ -720,7 +719,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, return srv; End: - taosCloseServer(srv); + transCloseServer(srv); return NULL; } @@ -740,7 +739,7 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { transSendAsync(pThrd->asyncPool, &srvMsg->q); } -void taosCloseServer(void* arg) { +void transCloseServer(void* arg) { // impl later SServerObj* srv = arg; for (int i = 0; i < srv->numOfThreads; i++) { @@ -786,7 +785,7 @@ void transUnrefSrvHandle(void* handle) { } // unref srv handle } -void rpcSendResponse(const SRpcMsg* pMsg) { +void transSendResponse(const STransMsg* pMsg) { if (pMsg->handle == NULL) { return; } @@ -799,14 +798,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) { tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } - -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { - SSrvConn* pConn = thandle; - +int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { + SSrvConn* pConn = thandle; struct sockaddr_in addr = pConn->addr; + pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr); pInfo->clientPort = ntohs(addr.sin_port); - tstrncpy(pInfo->user, pConn->user, sizeof(pInfo->user)); return 0; } diff --git a/source/os/src/osLocale.c b/source/os/src/osLocale.c index 47546f7deb2ddaf03106fd63e6f82a716f23a7fd..e9d6ed7c5424aff687175391942171f57c43c5f8 100644 --- a/source/os/src/osLocale.c +++ b/source/os/src/osLocale.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define ALLOW_FORBID_FUNC #define _DEFAULT_SOURCE #include "osLocale.h" diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh index 787da09b85dea03311795b828a209bf8c7468be4..c6c92bf72458c110ffa25a0127468d0bf2723c99 100755 --- a/tests/script/sh/massiveTable/compileVersion.sh +++ b/tests/script/sh/massiveTable/compileVersion.sh @@ -45,10 +45,6 @@ function gitPullBranchInfo () { git pull origin $branch_name ||: echo "==== git pull $branch_name end ====" git pull --recurse-submodules - cd tests - git checkout $branch_name - git pull - cd .. } function compileTDengineVersion() {