Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
388aac88
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
388aac88
编写于
3月 14, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-11463-3.0
上级
65870538
4189328d
变更
21
隐藏空白更改
内联
并排
Showing
21 changed file
with
403 addition
and
116 deletion
+403
-116
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+2
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+26
-0
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+7
-0
include/os/osLocale.h
include/os/osLocale.h
+5
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-5
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+21
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+63
-0
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+4
-0
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+9
-1
source/libs/parser/test/parserAstTest.cpp
source/libs/parser/test/parserAstTest.cpp
+7
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+47
-0
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+49
-2
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+7
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+17
-2
source/libs/transport/inc/transportInt.h
source/libs/transport/inc/transportInt.h
+1
-0
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+24
-3
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+95
-78
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+16
-19
source/os/src/osLocale.c
source/os/src/osLocale.c
+1
-0
tests/script/sh/massiveTable/compileVersion.sh
tests/script/sh/massiveTable/compileVersion.sh
+0
-4
未找到文件。
include/libs/nodes/nodes.h
浏览文件 @
388aac88
...
...
@@ -101,6 +101,7 @@ typedef enum ENodeType {
QUERY_NODE_LOGIC_PLAN_PROJECT
,
QUERY_NODE_LOGIC_PLAN_VNODE_MODIF
,
QUERY_NODE_LOGIC_PLAN_EXCHANGE
,
QUERY_NODE_LOGIC_PLAN_WINDOW
,
QUERY_NODE_LOGIC_SUBPLAN
,
QUERY_NODE_LOGIC_PLAN
,
...
...
@@ -115,6 +116,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_AGG
,
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
,
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
,
QUERY_NODE_PHYSICAL_PLAN_INSERT
,
QUERY_NODE_PHYSICAL_SUBPLAN
,
...
...
include/libs/nodes/plannodes.h
浏览文件 @
388aac88
...
...
@@ -80,6 +80,22 @@ typedef struct SExchangeLogicNode {
int32_t
srcGroupId
;
}
SExchangeLogicNode
;
typedef
enum
EWindowType
{
WINDOW_TYPE_INTERVAL
=
1
,
WINDOW_TYPE_SESSION
,
WINDOW_TYPE_STATE
}
EWindowType
;
typedef
struct
SWindowLogicNode
{
SLogicNode
node
;
EWindowType
winType
;
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
SFillNode
*
pFill
;
}
SWindowLogicNode
;
typedef
enum
ESubplanType
{
SUBPLAN_TYPE_MERGE
=
1
,
SUBPLAN_TYPE_PARTIAL
,
...
...
@@ -191,6 +207,16 @@ typedef struct SExchangePhysiNode {
SNodeList
*
pSrcEndPoints
;
// element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
}
SExchangePhysiNode
;
typedef
struct
SIntervalPhysiNode
{
SPhysiNode
node
;
SNodeList
*
pExprs
;
// these are expression list of parameter expression of function
SNodeList
*
pFuncs
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
SFillNode
*
pFill
;
}
SIntervalPhysiNode
;
typedef
struct
SDataSinkNode
{
ENodeType
type
;
SDataBlockDescNode
*
pInputDataBlockDesc
;
...
...
include/libs/transport/trpc.h
浏览文件 @
388aac88
...
...
@@ -71,6 +71,10 @@ typedef struct SRpcInit {
// call back to keep conn or not
bool
(
*
pfp
)(
void
*
parent
,
tmsg_t
msgType
);
// to support Send messages multiple times on a link
//
void
*
(
*
mfp
)(
void
*
parent
,
tmsg_t
msgType
);
void
*
parent
;
}
SRpcInit
;
...
...
@@ -89,6 +93,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp)
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int
contLen
);
void
rpcCancelRequest
(
int64_t
rid
);
// just release client conn to rpc instance, no close sock
void
rpcReleaseHandle
(
void
*
handle
);
void
rpcRefHandle
(
void
*
handle
,
int8_t
type
);
void
rpcUnrefHandle
(
void
*
handle
,
int8_t
type
);
...
...
include/os/osLocale.h
浏览文件 @
388aac88
...
...
@@ -17,12 +17,16 @@
#define _TD_OS_LOCALE_H_
#include "os.h"
#include "osString.h"
#ifdef __cplusplus
extern
"C"
{
#endif
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define setlocale SETLOCALE_FUNC_TAOS_FORBID
#endif
char
*
taosCharsetReplace
(
char
*
charsetstr
);
void
taosGetSystemLocale
(
char
*
outLocale
,
char
*
outCharset
);
void
taosSetSystemLocale
(
const
char
*
inLocale
,
const
char
*
inCharSet
);
...
...
source/client/src/clientImpl.c
浏览文件 @
388aac88
...
...
@@ -195,11 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t
getPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SQueryPlan
**
pPlan
,
SArray
*
pNodeList
)
{
pRequest
->
type
=
pQuery
->
msgType
;
SPlanContext
cxt
=
{
.
queryId
=
pRequest
->
requestId
,
.
pAstRoot
=
pQuery
->
pRoot
,
.
acctId
=
pRequest
->
pTscObj
->
acctId
};
int32_t
code
=
qCreateQueryPlan
(
&
cxt
,
pPlan
,
pNodeList
);
if
(
code
!=
0
)
{
return
code
;
}
return
code
;
return
qCreateQueryPlan
(
&
cxt
,
pPlan
,
pNodeList
);
}
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
)
{
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
388aac88
...
...
@@ -183,6 +183,12 @@ static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode
return
(
SNode
*
)
pDst
;
}
static
SNode
*
fillNodeCopy
(
const
SFillNode
*
pSrc
,
SFillNode
*
pDst
)
{
COPY_SCALAR_FIELD
(
mode
);
CLONE_NODE_FIELD
(
pValues
);
return
(
SNode
*
)
pDst
;
}
static
SNode
*
logicNodeCopy
(
const
SLogicNode
*
pSrc
,
SLogicNode
*
pDst
)
{
COPY_SCALAR_FIELD
(
id
);
CLONE_NODE_LIST_FIELD
(
pTargets
);
...
...
@@ -248,6 +254,17 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo
return
(
SNode
*
)
pDst
;
}
static
SNode
*
logicWindowCopy
(
const
SWindowLogicNode
*
pSrc
,
SWindowLogicNode
*
pDst
)
{
COPY_BASE_OBJECT_FIELD
(
node
,
logicNodeCopy
);
COPY_SCALAR_FIELD
(
winType
);
CLONE_NODE_LIST_FIELD
(
pFuncs
);
COPY_SCALAR_FIELD
(
interval
);
COPY_SCALAR_FIELD
(
offset
);
COPY_SCALAR_FIELD
(
sliding
);
CLONE_NODE_FIELD
(
pFill
);
return
(
SNode
*
)
pDst
;
}
static
SNode
*
logicSubplanCopy
(
const
SSubLogicPlan
*
pSrc
,
SSubLogicPlan
*
pDst
)
{
CLONE_NODE_FIELD
(
pNode
);
COPY_SCALAR_FIELD
(
subplanType
);
...
...
@@ -309,6 +326,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
case
QUERY_NODE_ORDER_BY_EXPR
:
case
QUERY_NODE_LIMIT
:
break
;
case
QUERY_NODE_FILL
:
return
fillNodeCopy
((
const
SFillNode
*
)
pNode
,
(
SFillNode
*
)
pDst
);
case
QUERY_NODE_DATABLOCK_DESC
:
return
dataBlockDescCopy
((
const
SDataBlockDescNode
*
)
pNode
,
(
SDataBlockDescNode
*
)
pDst
);
case
QUERY_NODE_SLOT_DESC
:
...
...
@@ -325,6 +344,8 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return
logicVnodeModifCopy
((
const
SVnodeModifLogicNode
*
)
pNode
,
(
SVnodeModifLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
logicExchangeCopy
((
const
SExchangeLogicNode
*
)
pNode
,
(
SExchangeLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
logicWindowCopy
((
const
SWindowLogicNode
*
)
pNode
,
(
SWindowLogicNode
*
)
pDst
);
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
logicSubplanCopy
((
const
SSubLogicPlan
*
)
pNode
,
(
SSubLogicPlan
*
)
pDst
);
default:
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
388aac88
...
...
@@ -117,6 +117,8 @@ const char* nodesNodeName(ENodeType type) {
return
"PhysiExchange"
;
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
"PhysiSort"
;
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
"PhysiInterval"
;
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
"PhysiDispatch"
;
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
...
...
@@ -573,6 +575,65 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
return
code
;
}
static
const
char
*
jkIntervalPhysiPlanExprs
=
"Exprs"
;
static
const
char
*
jkIntervalPhysiPlanFuncs
=
"Funcs"
;
static
const
char
*
jkIntervalPhysiPlanInterval
=
"Interval"
;
static
const
char
*
jkIntervalPhysiPlanOffset
=
"Offset"
;
static
const
char
*
jkIntervalPhysiPlanSliding
=
"Sliding"
;
static
const
char
*
jkIntervalPhysiPlanFill
=
"Fill"
;
static
int32_t
physiIntervalNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SIntervalPhysiNode
*
pNode
=
(
const
SIntervalPhysiNode
*
)
pObj
;
int32_t
code
=
physicPlanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkIntervalPhysiPlanExprs
,
pNode
->
pExprs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodeListToJson
(
pJson
,
jkIntervalPhysiPlanFuncs
,
pNode
->
pFuncs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkIntervalPhysiPlanInterval
,
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkIntervalPhysiPlanOffset
,
pNode
->
offset
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkIntervalPhysiPlanSliding
,
pNode
->
sliding
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkIntervalPhysiPlanFill
,
nodeToJson
,
pNode
->
pFill
);
}
return
code
;
}
static
int32_t
jsonToPhysiIntervalNode
(
const
SJson
*
pJson
,
void
*
pObj
)
{
SIntervalPhysiNode
*
pNode
=
(
SIntervalPhysiNode
*
)
pObj
;
int32_t
code
=
jsonToPhysicPlanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkIntervalPhysiPlanExprs
,
&
pNode
->
pExprs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeList
(
pJson
,
jkIntervalPhysiPlanFuncs
,
&
pNode
->
pFuncs
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkIntervalPhysiPlanInterval
,
&
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkIntervalPhysiPlanOffset
,
&
pNode
->
offset
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkIntervalPhysiPlanSliding
,
&
pNode
->
sliding
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkIntervalPhysiPlanFill
,
(
SNode
**
)
&
pNode
->
pFill
);
}
return
code
;
}
static
const
char
*
jkDataSinkInputDataBlockDesc
=
"InputDataBlockDesc"
;
static
int32_t
physicDataSinkNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
@@ -1500,6 +1561,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return
physiExchangeNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
break
;
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
physiIntervalNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
physiDispatchNodeToJson
(
pObj
,
pJson
);
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
388aac88
...
...
@@ -134,6 +134,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SVnodeModifLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
return
makeNode
(
type
,
sizeof
(
SExchangeLogicNode
));
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
return
makeNode
(
type
,
sizeof
(
SWindowLogicNode
));
case
QUERY_NODE_LOGIC_SUBPLAN
:
return
makeNode
(
type
,
sizeof
(
SSubLogicPlan
));
case
QUERY_NODE_LOGIC_PLAN
:
...
...
@@ -156,6 +158,8 @@ SNodeptr nodesMakeNode(ENodeType type) {
return
makeNode
(
type
,
sizeof
(
SExchangePhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
return
makeNode
(
type
,
sizeof
(
SNode
));
case
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
:
return
makeNode
(
type
,
sizeof
(
SIntervalPhysiNode
));
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
makeNode
(
type
,
sizeof
(
SDataDispatcherNode
));
case
QUERY_NODE_PHYSICAL_PLAN_INSERT
:
...
...
source/libs/parser/inc/sql.y
浏览文件 @
388aac88
...
...
@@ -198,7 +198,7 @@ col_name(A) ::= column_name(B).
cmd ::= SHOW VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, NULL); }
cmd ::= SHOW db_name(B) NK_DOT VGROUPS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_VGROUPS_STMT, &B); }
/************************************************ show
vgroups
********************************************************/
/************************************************ show
mnodes *
********************************************************/
cmd ::= SHOW MNODES. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_MNODES_STMT, NULL); }
/************************************************ select **************************************************************/
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
388aac88
...
...
@@ -706,9 +706,17 @@ static int32_t translateGroupBy(STranslateContext* pCxt, SNodeList* pGroupByList
return
translateExprList
(
pCxt
,
pGroupByList
);
}
static
int32_t
doTranslateWindow
(
STranslateContext
*
pCxt
,
SNode
*
pWindow
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateWindow
(
STranslateContext
*
pCxt
,
SNode
*
pWindow
)
{
pCxt
->
currClause
=
SQL_CLAUSE_WINDOW
;
return
translateExpr
(
pCxt
,
pWindow
);
int32_t
code
=
translateExpr
(
pCxt
,
pWindow
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
doTranslateWindow
(
pCxt
,
pWindow
);
}
return
code
;
}
static
int32_t
translatePartitionBy
(
STranslateContext
*
pCxt
,
SNodeList
*
pPartitionByList
)
{
...
...
source/libs/parser/test/parserAstTest.cpp
浏览文件 @
388aac88
...
...
@@ -183,6 +183,13 @@ TEST_F(ParserTest, selectClause) {
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
selectWindow
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"SELECT count(*) FROM t1 interval(10s)"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
ParserTest
,
selectSyntaxError
)
{
setDatabase
(
"root"
,
"test"
);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
388aac88
...
...
@@ -304,6 +304,50 @@ static SLogicNode* createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel
return
(
SLogicNode
*
)
pAgg
;
}
static
SLogicNode
*
createWindowLogicNodeByInterval
(
SLogicPlanContext
*
pCxt
,
SIntervalWindowNode
*
pInterval
,
SSelectStmt
*
pSelect
)
{
SWindowLogicNode
*
pWindow
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_WINDOW
);
CHECK_ALLOC
(
pWindow
,
NULL
);
pWindow
->
node
.
id
=
pCxt
->
planNodeId
++
;
pWindow
->
winType
=
WINDOW_TYPE_INTERVAL
;
pWindow
->
interval
=
((
SValueNode
*
)
pInterval
->
pInterval
)
->
datum
.
i
;
pWindow
->
offset
=
(
NULL
!=
pInterval
->
pOffset
?
((
SValueNode
*
)
pInterval
->
pOffset
)
->
datum
.
i
:
0
);
pWindow
->
sliding
=
(
NULL
!=
pInterval
->
pSliding
?
((
SValueNode
*
)
pInterval
->
pSliding
)
->
datum
.
i
:
0
);
if
(
NULL
!=
pInterval
->
pFill
)
{
pWindow
->
pFill
=
nodesCloneNode
(
pInterval
->
pFill
);
CHECK_ALLOC
(
pWindow
->
pFill
,
(
SLogicNode
*
)
pWindow
);
}
SNodeList
*
pFuncs
=
NULL
;
CHECK_CODE
(
nodesCollectFuncs
(
pSelect
,
fmIsAggFunc
,
&
pFuncs
),
NULL
);
if
(
NULL
!=
pFuncs
)
{
pWindow
->
pFuncs
=
nodesCloneList
(
pFuncs
);
CHECK_ALLOC
(
pWindow
->
pFuncs
,
(
SLogicNode
*
)
pWindow
);
}
CHECK_CODE
(
rewriteExpr
(
pWindow
->
node
.
id
,
1
,
pWindow
->
pFuncs
,
pSelect
,
SQL_CLAUSE_WINDOW
),
(
SLogicNode
*
)
pWindow
);
pWindow
->
node
.
pTargets
=
createColumnByRewriteExps
(
pCxt
,
pWindow
->
pFuncs
);
CHECK_ALLOC
(
pWindow
->
node
.
pTargets
,
(
SLogicNode
*
)
pWindow
);
return
(
SLogicNode
*
)
pWindow
;
}
static
SLogicNode
*
createWindowLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
)
{
if
(
NULL
==
pSelect
->
pWindow
)
{
return
NULL
;
}
switch
(
nodeType
(
pSelect
->
pWindow
))
{
case
QUERY_NODE_INTERVAL_WINDOW
:
return
createWindowLogicNodeByInterval
(
pCxt
,
(
SIntervalWindowNode
*
)
pSelect
->
pWindow
,
pSelect
);
default:
break
;
}
return
NULL
;
}
static
SNodeList
*
createColumnByProjections
(
SLogicPlanContext
*
pCxt
,
SNodeList
*
pExprs
)
{
SNodeList
*
pList
=
nodesMakeList
();
CHECK_ALLOC
(
pList
,
NULL
);
...
...
@@ -345,6 +389,9 @@ static SLogicNode* createSelectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
pRoot
->
pConditions
=
nodesCloneNode
(
pSelect
->
pWhere
);
CHECK_ALLOC
(
pRoot
->
pConditions
,
pRoot
);
}
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
)
{
pRoot
=
pushLogicNode
(
pCxt
,
pRoot
,
createWindowLogicNode
(
pCxt
,
pSelect
));
}
if
(
TSDB_CODE_SUCCESS
==
pCxt
->
errCode
)
{
pRoot
=
pushLogicNode
(
pCxt
,
pRoot
,
createAggLogicNode
(
pCxt
,
pSelect
));
}
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
388aac88
...
...
@@ -473,14 +473,58 @@ static SPhysiNode* createExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLog
return
(
SPhysiNode
*
)
pExchange
;
}
static
SPhysiNode
*
createIntervalPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
)
{
SIntervalPhysiNode
*
pInterval
=
(
SIntervalPhysiNode
*
)
makePhysiNode
(
pCxt
,
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
);
CHECK_ALLOC
(
pInterval
,
NULL
);
pInterval
->
interval
=
pWindowLogicNode
->
interval
;
pInterval
->
offset
=
pWindowLogicNode
->
offset
;
pInterval
->
sliding
=
pWindowLogicNode
->
sliding
;
pInterval
->
pFill
=
nodesCloneNode
(
pWindowLogicNode
->
pFill
);
SNodeList
*
pPrecalcExprs
=
NULL
;
SNodeList
*
pFuncs
=
NULL
;
CHECK_CODE
(
rewritePrecalcExprs
(
pCxt
,
pWindowLogicNode
->
pFuncs
,
&
pPrecalcExprs
,
&
pFuncs
),
(
SPhysiNode
*
)
pInterval
);
SDataBlockDescNode
*
pChildTupe
=
(((
SPhysiNode
*
)
nodesListGetNode
(
pChildren
,
0
))
->
pOutputDataBlockDesc
);
// push down expression to pOutputDataBlockDesc of child node
if
(
NULL
!=
pPrecalcExprs
)
{
pInterval
->
pExprs
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pPrecalcExprs
);
CHECK_ALLOC
(
pInterval
->
pExprs
,
(
SPhysiNode
*
)
pInterval
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pInterval
->
pExprs
,
pChildTupe
),
(
SPhysiNode
*
)
pInterval
);
}
if
(
NULL
!=
pFuncs
)
{
pInterval
->
pFuncs
=
setListSlotId
(
pCxt
,
pChildTupe
->
dataBlockId
,
-
1
,
pFuncs
);
CHECK_ALLOC
(
pInterval
->
pFuncs
,
(
SPhysiNode
*
)
pInterval
);
CHECK_CODE
(
addDataBlockDesc
(
pCxt
,
pInterval
->
pFuncs
,
pInterval
->
node
.
pOutputDataBlockDesc
),
(
SPhysiNode
*
)
pInterval
);
}
CHECK_CODE
(
setSlotOutput
(
pCxt
,
pWindowLogicNode
->
node
.
pTargets
,
pInterval
->
node
.
pOutputDataBlockDesc
),
(
SPhysiNode
*
)
pInterval
);
return
(
SPhysiNode
*
)
pInterval
;
}
static
SPhysiNode
*
createWindowPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SNodeList
*
pChildren
,
SWindowLogicNode
*
pWindowLogicNode
)
{
switch
(
pWindowLogicNode
->
winType
)
{
case
WINDOW_TYPE_INTERVAL
:
return
createIntervalPhysiNode
(
pCxt
,
pChildren
,
pWindowLogicNode
);
case
WINDOW_TYPE_SESSION
:
case
WINDOW_TYPE_STATE
:
break
;
default:
break
;
}
return
NULL
;
}
static
SPhysiNode
*
createPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SLogicNode
*
pLogicPlan
)
{
SNodeList
*
pChildren
=
nodesMakeList
();
CHECK_ALLOC
(
pChildren
,
NULL
);
SNode
*
pLogicChild
;
FOREACH
(
pLogicChild
,
pLogicPlan
->
pChildren
)
{
SNode
*
pChildPhyNode
=
(
SNode
*
)
createPhysiNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pLogicChild
);
if
(
TSDB_CODE_SUCCESS
!=
nodesListAppend
(
pChildren
,
pChildPhyNode
))
{
if
(
TSDB_CODE_SUCCESS
!=
nodesListStrictAppend
(
pChildren
,
createPhysiNode
(
pCxt
,
pSubplan
,
(
SLogicNode
*
)
pLogicChild
)))
{
pCxt
->
errCode
=
TSDB_CODE_OUT_OF_MEMORY
;
nodesDestroyList
(
pChildren
);
return
NULL
;
...
...
@@ -504,6 +548,9 @@ static SPhysiNode* createPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
case
QUERY_NODE_LOGIC_PLAN_EXCHANGE
:
pPhyNode
=
createExchangePhysiNode
(
pCxt
,
(
SExchangeLogicNode
*
)
pLogicPlan
);
break
;
case
QUERY_NODE_LOGIC_PLAN_WINDOW
:
pPhyNode
=
createWindowPhysiNode
(
pCxt
,
pChildren
,
(
SWindowLogicNode
*
)
pLogicPlan
);
break
;
default:
break
;
}
...
...
source/libs/planner/test/plannerTest.cpp
浏览文件 @
388aac88
...
...
@@ -166,3 +166,10 @@ TEST_F(PlannerTest, subquery) {
bind
(
"SELECT count(*) FROM (SELECT c1 + c3 a, c1 + count(*) b FROM t1 where c2 = 'abc' GROUP BY c1, c3) where a > 100 group by b"
);
ASSERT_TRUE
(
run
());
}
TEST_F
(
PlannerTest
,
interval
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"SELECT count(*) FROM t1 interval(10s)"
);
ASSERT_TRUE
(
run
());
}
source/libs/transport/inc/transComm.h
浏览文件 @
388aac88
...
...
@@ -120,6 +120,10 @@ typedef struct {
// SEpSet* pSet; // for synchronous API
}
SRpcReqContext
;
typedef
SRpcMsg
STransMsg
;
typedef
SRpcInfo
STrans
;
typedef
SRpcConnInfo
STransHandleInfo
;
typedef
struct
{
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
...
...
@@ -134,8 +138,8 @@ typedef struct {
int8_t
connType
;
// connection type
int64_t
rid
;
// refId returned by taosAddRef
S
Rpc
Msg
*
pRsp
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
S
Trans
Msg
*
pRsp
;
// for synchronous API
tsem_t
*
pSem
;
// for synchronous API
int
hThrdIdx
;
char
*
ip
;
...
...
@@ -249,4 +253,15 @@ void transUnrefSrvHandle(void* handle);
void
transRefCliHandle
(
void
*
handle
);
void
transUnrefCliHandle
(
void
*
handle
);
void
transSendRequest
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
);
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
,
STransMsg
*
pRsp
);
void
transSendResponse
(
const
STransMsg
*
pMsg
);
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
);
void
*
transInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
transInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
transCloseClient
(
void
*
arg
);
void
transCloseServer
(
void
*
arg
);
#endif
source/libs/transport/inc/transportInt.h
浏览文件 @
388aac88
...
...
@@ -64,6 +64,7 @@ typedef struct {
void
(
*
cfp
)(
void
*
parent
,
SRpcMsg
*
,
SEpSet
*
);
int
(
*
afp
)(
void
*
parent
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
bool
(
*
pfp
)(
void
*
parent
,
tmsg_t
msgType
);
void
*
(
*
mfp
)(
void
*
parent
,
tmsg_t
msgType
);
int32_t
refCount
;
void
*
parent
;
...
...
source/libs/transport/src/trans.c
浏览文件 @
388aac88
...
...
@@ -18,8 +18,9 @@
#include "transComm.h"
void
*
(
*
taosInitHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
taosInitServer
,
taosInitClient
};
void
(
*
taosCloseHandle
[])(
void
*
arg
)
=
{
taosCloseServer
,
taosCloseClient
};
transInitServer
,
transInitClient
};
void
(
*
taosCloseHandle
[])(
void
*
arg
)
=
{
transCloseServer
,
transCloseClient
};
void
*
rpcOpen
(
const
SRpcInit
*
pInit
)
{
SRpcInfo
*
pRpc
=
calloc
(
1
,
sizeof
(
SRpcInfo
));
...
...
@@ -34,11 +35,12 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc
->
cfp
=
pInit
->
cfp
;
pRpc
->
afp
=
pInit
->
afp
;
pRpc
->
pfp
=
pInit
->
pfp
;
pRpc
->
mfp
=
pInit
->
mfp
;
if
(
pInit
->
connType
==
TAOS_CONN_SERVER
)
{
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
}
else
{
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
}
pRpc
->
connType
=
pInit
->
connType
;
...
...
@@ -116,6 +118,24 @@ int32_t rpcInit() {
return
0
;
}
void
rpcSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
pRid
)
{
char
*
ip
=
(
char
*
)(
pEpSet
->
eps
[
pEpSet
->
inUse
].
fqdn
);
uint32_t
port
=
pEpSet
->
eps
[
pEpSet
->
inUse
].
port
;
transSendRequest
(
shandle
,
ip
,
port
,
pMsg
);
}
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
SRpcMsg
*
pRsp
)
{
char
*
ip
=
(
char
*
)(
pEpSet
->
eps
[
pEpSet
->
inUse
].
fqdn
);
uint32_t
port
=
pEpSet
->
eps
[
pEpSet
->
inUse
].
port
;
transSendRecv
(
shandle
,
ip
,
port
,
pMsg
,
pRsp
);
}
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
transSendResponse
(
pMsg
);
}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
transGetConnInfo
((
void
*
)
thandle
,
pInfo
);
}
void
rpcCleanup
(
void
)
{
// impl later
//
...
...
@@ -129,6 +149,7 @@ void rpcRefHandle(void* handle, int8_t type) {
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
(
*
taosRefHandle
[
type
])(
handle
);
}
void
rpcUnrefHandle
(
void
*
handle
,
int8_t
type
)
{
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
(
*
taosUnRefHandle
[
type
])(
handle
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
388aac88
...
...
@@ -42,7 +42,7 @@ typedef struct SCliConn {
typedef
struct
SCliMsg
{
STransConnCtx
*
ctx
;
S
RpcMsg
msg
;
S
TransMsg
msg
;
queue
q
;
uint64_t
st
;
}
SCliMsg
;
...
...
@@ -105,9 +105,9 @@ static void cliHandleExcept(SCliConn* conn);
static
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
cliHandleQuit
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
cliSendQuit
(
SCliThrdObj
*
thrd
);
static
void
destroyUserdata
(
S
Rpc
Msg
*
userdata
);
static
void
destroyUserdata
(
S
Trans
Msg
*
userdata
);
static
int
cliRBChoseIdx
(
S
RpcInfo
*
pTransInst
);
static
int
cliRBChoseIdx
(
S
Trans
*
pTransInst
);
static
void
destroyCmsg
(
SCliMsg
*
cmsg
);
static
void
transDestroyConnCtx
(
STransConnCtx
*
ctx
);
...
...
@@ -118,11 +118,11 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
#define CONN_GET_INST_LABEL(conn) (((S
RpcInfo
*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
#define CONN_GET_INST_LABEL(conn) (((S
Trans
*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label)
#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \
do { \
if (thrd->quit) { \
cliHandleExcept(conn); \
cliHandleExcept(conn);
\
goto _RETURE; \
} \
} while (0)
...
...
@@ -130,20 +130,25 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
#define CONN_HANDLE_BROKEN(conn) \
do { \
if (conn->broken) { \
cliHandleExcept(conn); \
cliHandleExcept(conn);
\
goto _RETURE; \
} \
} while (0);
static
void
*
cliWorkThread
(
void
*
arg
);
#define CONN_SET_PERSIST_BY_APP(conn) \
do { \
if (conn->persist == false) { \
conn->persist = true; \
transRefCliHandle(conn); \
} \
} while (0)
#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false)
static
void
*
cliNotifyApp
()
{}
static
void
cliHandleResp
(
SCliConn
*
conn
)
{
SCliMsg
*
pMsg
=
conn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
static
void
*
cliWorkThread
(
void
*
arg
);
void
cliHandleResp
(
SCliConn
*
conn
)
{
SCliThrdObj
*
pThrd
=
conn
->
hostThrd
;
S
RpcInfo
*
pTransInst
=
pThrd
->
pTransInst
;
S
Trans
*
pTransInst
=
pThrd
->
pTransInst
;
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)(
conn
->
readBuf
.
buf
);
pHead
->
code
=
htonl
(
pHead
->
code
);
...
...
@@ -152,19 +157,29 @@ static void cliHandleResp(SCliConn* conn) {
// buf's mem alread translated to rpcMsg.pCont
transClearBuffer
(
&
conn
->
readBuf
);
S
Rpc
Msg
rpcMsg
=
{
0
};
S
Trans
Msg
rpcMsg
=
{
0
};
rpcMsg
.
contLen
=
transContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
pCont
=
transContFromHead
((
char
*
)
pHead
);
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
rpcMsg
.
ahandle
=
NULL
;
if
(
pTransInst
->
pfp
!=
NULL
&&
(
pTransInst
->
pfp
)(
pTransInst
->
parent
,
rpcMsg
.
msgType
))
{
rpcMsg
.
handle
=
conn
;
transRefCliHandle
(
conn
);
SCliMsg
*
pMsg
=
conn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
conn
))
{
rpcMsg
.
ahandle
=
pTransInst
->
mfp
?
(
*
pTransInst
->
mfp
)(
pTransInst
->
parent
,
rpcMsg
.
msgType
)
:
NULL
;
}
else
{
rpcMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
}
// if (rpcMsg.ahandle == NULL) {
// tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn);
// return;
//}
conn
->
persist
=
1
;
tDebug
(
"cli conn %p persist by app"
,
conn
);
if
(
pTransInst
->
pfp
!=
NULL
&&
(
*
pTransInst
->
pfp
)(
pTransInst
->
parent
,
rpcMsg
.
msgType
))
{
rpcMsg
.
handle
=
conn
;
CONN_SET_PERSIST_BY_APP
(
conn
);
tDebug
(
"%s cli conn %p ref by app"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
tDebug
(
"%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
pTransInst
->
label
,
conn
,
...
...
@@ -173,7 +188,7 @@ static void cliHandleResp(SCliConn* conn) {
conn
->
secured
=
pHead
->
secured
;
if
(
pCtx
->
pSem
==
NULL
)
{
if
(
pCtx
==
NULL
||
pCtx
->
pSem
==
NULL
)
{
tTrace
(
"%s cli conn %p handle resp"
,
pTransInst
->
label
,
conn
);
(
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
rpcMsg
,
NULL
);
}
else
{
...
...
@@ -184,8 +199,7 @@ static void cliHandleResp(SCliConn* conn) {
uv_read_start
((
uv_stream_t
*
)
conn
->
stream
,
cliAllocBufferCb
,
cliRecvCb
);
// user owns conn->persist = 1
if
(
conn
->
persist
==
0
)
{
if
(
CONN_NO_PERSIST_BY_APP
(
conn
))
{
addConnToPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
,
conn
);
}
destroyCmsg
(
conn
->
data
);
...
...
@@ -196,24 +210,32 @@ static void cliHandleResp(SCliConn* conn) {
// uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0);
}
}
static
void
cliHandleExcept
(
SCliConn
*
pConn
)
{
void
cliHandleExcept
(
SCliConn
*
pConn
)
{
if
(
pConn
->
data
==
NULL
)
{
// handle conn except in conn pool
transUnrefCliHandle
(
pConn
);
return
;
if
(
pConn
->
broken
==
true
||
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
transUnrefCliHandle
(
pConn
);
return
;
}
}
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
S
RpcInfo
*
pTransInst
=
pThrd
->
pTransInst
;
S
Trans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
STransConnCtx
*
pCtx
=
pMsg
?
pMsg
->
ctx
:
NULL
;
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
STransMsg
rpcMsg
=
{
0
};
rpcMsg
.
code
=
TSDB_CODE_RPC_NETWORK_UNAVAIL
;
rpcMsg
.
msgType
=
pMsg
->
msg
.
msgType
+
1
;
rpcMsg
.
msgType
=
pMsg
?
pMsg
->
msg
.
msgType
+
1
:
0
;
rpcMsg
.
ahandle
=
NULL
;
if
(
pMsg
==
NULL
&&
!
CONN_NO_PERSIST_BY_APP
(
pConn
))
{
rpcMsg
.
ahandle
=
pTransInst
->
mfp
?
(
*
pTransInst
->
mfp
)(
pTransInst
->
parent
,
rpcMsg
.
msgType
)
:
NULL
;
}
else
{
rpcMsg
.
ahandle
=
pCtx
?
pCtx
->
ahandle
:
NULL
;
}
if
(
pCtx
->
pSem
==
NULL
)
{
if
(
pCtx
==
NULL
||
pCtx
->
pSem
==
NULL
)
{
tTrace
(
"%s cli conn %p handle resp"
,
pTransInst
->
label
,
pConn
);
(
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
rpcMsg
,
NULL
);
}
else
{
...
...
@@ -228,9 +250,9 @@ static void cliHandleExcept(SCliConn* pConn) {
transUnrefCliHandle
(
pConn
);
}
static
void
cliTimeoutCb
(
uv_timer_t
*
handle
)
{
void
cliTimeoutCb
(
uv_timer_t
*
handle
)
{
SCliThrdObj
*
pThrd
=
handle
->
data
;
S
RpcInfo
*
pRpc
=
pThrd
->
pTransInst
;
S
Trans
*
pRpc
=
pThrd
->
pTransInst
;
int64_t
currentTime
=
pThrd
->
nextTimeout
;
tTrace
(
"%s, cli conn timeout, try to remove expire conn from conn pool"
,
pRpc
->
label
);
...
...
@@ -252,11 +274,12 @@ static void cliTimeoutCb(uv_timer_t* handle) {
pThrd
->
nextTimeout
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
);
uv_timer_start
(
handle
,
cliTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
}
static
void
*
createConnPool
(
int
size
)
{
void
*
createConnPool
(
int
size
)
{
// thread local, no lock
return
taosHashInit
(
size
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
}
static
void
*
destroyConnPool
(
void
*
pool
)
{
void
*
destroyConnPool
(
void
*
pool
)
{
SConnList
*
connList
=
taosHashIterate
((
SHashObj
*
)
pool
,
NULL
);
while
(
connList
!=
NULL
)
{
while
(
!
QUEUE_IS_EMPTY
(
&
connList
->
conn
))
{
...
...
@@ -301,7 +324,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
tstrncpy
(
key
+
strlen
(
key
),
(
char
*
)(
&
port
),
sizeof
(
port
));
tTrace
(
"cli conn %p added to conn pool, read buf cap: %d"
,
conn
,
conn
->
readBuf
.
cap
);
S
RpcInfo
*
pRpc
=
((
SCliThrdObj
*
)
conn
->
hostThrd
)
->
pTransInst
;
S
Trans
*
pRpc
=
((
SCliThrdObj
*
)
conn
->
hostThrd
)
->
pTransInst
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
);
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
...
...
@@ -358,6 +381,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) {
QUEUE_INIT
(
&
conn
->
conn
);
conn
->
hostThrd
=
pThrd
;
conn
->
persist
=
false
;
conn
->
broken
=
false
;
transRefCliHandle
(
conn
);
return
conn
;
...
...
@@ -395,16 +419,16 @@ static void cliSendCb(uv_write_t* req, int status) {
uv_read_start
((
uv_stream_t
*
)
pConn
->
stream
,
cliAllocBufferCb
,
cliRecvCb
);
}
static
void
cliSend
(
SCliConn
*
pConn
)
{
void
cliSend
(
SCliConn
*
pConn
)
{
CONN_HANDLE_BROKEN
(
pConn
);
SCliMsg
*
pCliMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pCliMsg
->
ctx
;
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
S
RpcInfo
*
pTransInst
=
pThrd
->
pTransInst
;
S
Trans
*
pTransInst
=
pThrd
->
pTransInst
;
S
RpcMsg
*
pMsg
=
(
SRpc
Msg
*
)(
&
pCliMsg
->
msg
);
S
TransMsg
*
pMsg
=
(
STrans
Msg
*
)(
&
pCliMsg
->
msg
);
STransMsgHead
*
pHead
=
transHeadFromCont
(
pMsg
->
pCont
);
int
msgLen
=
transMsgLenFromCont
(
pMsg
->
contLen
);
...
...
@@ -442,7 +466,8 @@ static void cliSend(SCliConn* pConn) {
_RETURE:
return
;
}
static
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
)
{
void
cliConnCb
(
uv_connect_t
*
req
,
int
status
)
{
// impl later
SCliConn
*
pConn
=
req
->
data
;
if
(
status
!=
0
)
{
...
...
@@ -472,11 +497,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
pThrd
->
quit
=
true
;
uv_stop
(
pThrd
->
loop
);
}
static
SCliConn
*
cliGetConn
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
SCliConn
*
cliGetConn
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
SCliConn
*
conn
=
NULL
;
if
(
pMsg
->
msg
.
handle
!=
NULL
)
{
conn
=
(
SCliConn
*
)(
pMsg
->
msg
.
handle
);
transUnrefCliHandle
(
conn
);
if
(
conn
!=
NULL
)
{
tTrace
(
"%s cli conn %p reused"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
}
...
...
@@ -487,13 +512,14 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) {
}
return
conn
;
}
static
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
void
cliHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
)
{
uint64_t
et
=
taosGetTimestampUs
();
uint64_t
el
=
et
-
pMsg
->
st
;
tTrace
(
"%s cli msg tran time cost: %"
PRIu64
"us"
,
((
S
RpcInfo
*
)
pThrd
->
pTransInst
)
->
label
,
el
);
tTrace
(
"%s cli msg tran time cost: %"
PRIu64
"us"
,
((
S
Trans
*
)
pThrd
->
pTransInst
)
->
label
,
el
);
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
S
RpcInfo
*
pTransInst
=
pThrd
->
pTransInst
;
S
Trans
*
pTransInst
=
pThrd
->
pTransInst
;
SCliConn
*
conn
=
cliGetConn
(
pMsg
,
pThrd
);
if
(
conn
!=
NULL
)
{
...
...
@@ -514,6 +540,7 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
tTrace
(
"%s cli conn %p try to connect to %s:%d"
,
pTransInst
->
label
,
conn
,
pMsg
->
ctx
->
ip
,
pMsg
->
ctx
->
port
);
uv_tcp_connect
(
&
conn
->
connReq
,
(
uv_tcp_t
*
)(
conn
->
stream
),
(
const
struct
sockaddr
*
)
&
addr
,
cliConnCb
);
}
conn
->
hThrdIdx
=
pCtx
->
hThrdIdx
;
}
static
void
cliAsyncCb
(
uv_async_t
*
handle
)
{
...
...
@@ -522,7 +549,7 @@ static void cliAsyncCb(uv_async_t* handle) {
SCliMsg
*
pMsg
=
NULL
;
// batch process to avoid to lock/unlock frequently
queue
wq
;
queue
wq
;
pthread_mutex_lock
(
&
item
->
mtx
);
QUEUE_MOVE
(
&
item
->
qmsg
,
&
wq
);
pthread_mutex_unlock
(
&
item
->
mtx
);
...
...
@@ -551,10 +578,10 @@ static void* cliWorkThread(void* arg) {
uv_run
(
pThrd
->
loop
,
UV_RUN_DEFAULT
);
}
void
*
t
ao
sInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
void
*
t
ran
sInitClient
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SCliObj
*
cli
=
calloc
(
1
,
sizeof
(
SCliObj
));
S
RpcInfo
*
pRpc
=
shandle
;
S
Trans
*
pRpc
=
shandle
;
memcpy
(
cli
->
label
,
label
,
strlen
(
label
));
cli
->
numOfThreads
=
numOfThreads
;
cli
->
pThreadObj
=
(
SCliThrdObj
**
)
calloc
(
cli
->
numOfThreads
,
sizeof
(
SCliThrdObj
*
));
...
...
@@ -573,7 +600,7 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return
cli
;
}
static
void
destroyUserdata
(
S
Rpc
Msg
*
userdata
)
{
static
void
destroyUserdata
(
S
Trans
Msg
*
userdata
)
{
if
(
userdata
->
pCont
==
NULL
)
{
return
;
}
...
...
@@ -629,12 +656,20 @@ static void transDestroyConnCtx(STransConnCtx* ctx) {
free
(
ctx
);
}
//
static
void
cliSendQuit
(
SCliThrdObj
*
thrd
)
{
void
cliSendQuit
(
SCliThrdObj
*
thrd
)
{
// cli can stop gracefully
SCliMsg
*
msg
=
calloc
(
1
,
sizeof
(
SCliMsg
));
transSendAsync
(
thrd
->
asyncPool
,
&
msg
->
q
);
}
void
taosCloseClient
(
void
*
arg
)
{
int
cliRBChoseIdx
(
STrans
*
pTransInst
)
{
int64_t
index
=
pTransInst
->
index
;
if
(
pTransInst
->
index
++
>=
pTransInst
->
numOfThreads
)
{
pTransInst
->
index
=
0
;
}
return
index
%
pTransInst
->
numOfThreads
;
}
void
transCloseClient
(
void
*
arg
)
{
SCliObj
*
cli
=
arg
;
for
(
int
i
=
0
;
i
<
cli
->
numOfThreads
;
i
++
)
{
cliSendQuit
(
cli
->
pThreadObj
[
i
]);
...
...
@@ -643,13 +678,6 @@ void taosCloseClient(void* arg) {
free
(
cli
->
pThreadObj
);
free
(
cli
);
}
static
int
cliRBChoseIdx
(
SRpcInfo
*
pTransInst
)
{
int64_t
index
=
pTransInst
->
index
;
if
(
pTransInst
->
index
++
>=
pTransInst
->
numOfThreads
)
{
pTransInst
->
index
=
0
;
}
return
index
%
pTransInst
->
numOfThreads
;
}
void
transRefCliHandle
(
void
*
handle
)
{
if
(
handle
==
NULL
)
{
return
;
...
...
@@ -665,17 +693,11 @@ void transUnrefCliHandle(void* handle) {
if
(
ref
==
0
)
{
cliDestroyConn
((
SCliConn
*
)
handle
,
true
);
}
// unref cli handle
}
void
rpcSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
pRid
)
{
// impl later
char
*
ip
=
(
char
*
)(
pEpSet
->
eps
[
pEpSet
->
inUse
].
fqdn
);
uint32_t
port
=
pEpSet
->
eps
[
pEpSet
->
inUse
].
port
;
SRpcInfo
*
pTransInst
=
(
SRpcInfo
*
)
shandle
;
int
index
=
CONN_HOST_THREAD_INDEX
(
pMsg
->
handle
);
void
transSendRequest
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pMsg
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
int
index
=
CONN_HOST_THREAD_INDEX
((
SCliConn
*
)
pMsg
->
handle
);
if
(
index
==
-
1
)
{
index
=
cliRBChoseIdx
(
pTransInst
);
}
...
...
@@ -683,6 +705,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
if
(
transCompressMsg
(
pMsg
->
pCont
,
pMsg
->
contLen
,
&
flen
))
{
// imp later
}
tDebug
(
"send request at thread:%d %p"
,
index
,
pMsg
);
STransConnCtx
*
pCtx
=
calloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
ahandle
=
pMsg
->
ahandle
;
pCtx
->
msgType
=
pMsg
->
msgType
;
...
...
@@ -701,14 +724,9 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
index
];
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
));
}
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
)
{
char
*
ip
=
(
char
*
)(
pEpSet
->
eps
[
pEpSet
->
inUse
].
fqdn
);
uint32_t
port
=
pEpSet
->
eps
[
pEpSet
->
inUse
].
port
;
SRpcInfo
*
pTransInst
=
(
SRpcInfo
*
)
shandle
;
int
index
=
CONN_HOST_THREAD_INDEX
(
pReq
->
handle
);
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
STrans
*
pTransInst
=
(
STrans
*
)
shandle
;
int
index
=
CONN_HOST_THREAD_INDEX
(
pReq
->
handle
);
if
(
index
==
-
1
)
{
index
=
cliRBChoseIdx
(
pTransInst
);
}
...
...
@@ -734,7 +752,6 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) {
tsem_wait
(
pSem
);
tsem_destroy
(
pSem
);
free
(
pSem
);
return
;
}
#endif
source/libs/transport/src/transSrv.c
浏览文件 @
388aac88
...
...
@@ -37,8 +37,7 @@ typedef struct SSrvConn {
struct
sockaddr_in
addr
;
struct
sockaddr_in
locaddr
;
// SRpcMsg sendMsg;
// del later
char
secured
;
int
spi
;
char
info
[
64
];
...
...
@@ -49,7 +48,7 @@ typedef struct SSrvConn {
typedef
struct
SSrvMsg
{
SSrvConn
*
pConn
;
S
RpcMsg
msg
;
S
TransMsg
msg
;
queue
q
;
}
SSrvMsg
;
...
...
@@ -207,20 +206,20 @@ static void uvHandleReq(SSrvConn* pConn) {
pConn
->
inType
=
pHead
->
msgType
;
S
RpcInfo
*
pRpc
=
(
SRpcInfo
*
)
p
->
shandle
;
S
Trans
*
pRpc
=
(
STrans
*
)
p
->
shandle
;
pHead
->
code
=
htonl
(
pHead
->
code
);
int32_t
dlen
=
0
;
if
(
transDecompressMsg
(
NULL
,
0
,
NULL
))
{
// add compress later
// pHead = rpcDecompres
sRpc
Msg(pHead);
// pHead = rpcDecompres
STrans
Msg(pHead);
}
else
{
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
// impl later
//
}
S
Rpc
Msg
rpcMsg
;
S
Trans
Msg
rpcMsg
;
rpcMsg
.
contLen
=
transContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
pCont
=
pHead
->
content
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
...
...
@@ -260,7 +259,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
}
tError
(
"server conn %p read error: %s"
,
conn
,
uv_err_name
(
nread
));
if
(
nread
<
0
||
nread
==
UV_EOF
)
{
if
(
nread
<
0
)
{
conn
->
broken
=
true
;
transUnrefSrvHandle
(
conn
);
...
...
@@ -318,8 +317,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
// impl later;
tTrace
(
"server conn %p prepare to send resp"
,
smsg
->
pConn
);
SSrvConn
*
pConn
=
smsg
->
pConn
;
S
RpcMsg
*
pMsg
=
&
smsg
->
msg
;
SSrvConn
*
pConn
=
smsg
->
pConn
;
S
TransMsg
*
pMsg
=
&
smsg
->
msg
;
if
(
pMsg
->
pCont
==
0
)
{
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
pMsg
->
contLen
=
0
;
...
...
@@ -547,7 +546,7 @@ static bool addHandleToWorkloop(void* arg) {
return
false
;
}
// S
RpcInfo
* pRpc = pThrd->shandle;
// S
Trans
* pRpc = pThrd->shandle;
uv_pipe_init
(
pThrd
->
loop
,
pThrd
->
pipe
,
1
);
uv_pipe_open
(
pThrd
->
pipe
,
pThrd
->
fd
);
...
...
@@ -668,7 +667,7 @@ static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) {
return
msgLen
;
}
void
*
t
ao
sInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
void
*
t
ran
sInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
{
SServerObj
*
srv
=
calloc
(
1
,
sizeof
(
SServerObj
));
srv
->
loop
=
(
uv_loop_t
*
)
malloc
(
sizeof
(
uv_loop_t
));
srv
->
numOfThreads
=
numOfThreads
;
...
...
@@ -720,7 +719,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
return
srv
;
End:
t
ao
sCloseServer
(
srv
);
t
ran
sCloseServer
(
srv
);
return
NULL
;
}
...
...
@@ -740,7 +739,7 @@ void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
}
void
t
ao
sCloseServer
(
void
*
arg
)
{
void
t
ran
sCloseServer
(
void
*
arg
)
{
// impl later
SServerObj
*
srv
=
arg
;
for
(
int
i
=
0
;
i
<
srv
->
numOfThreads
;
i
++
)
{
...
...
@@ -786,7 +785,7 @@ void transUnrefSrvHandle(void* handle) {
}
// unref srv handle
}
void
rpcSendResponse
(
const
SRpc
Msg
*
pMsg
)
{
void
transSendResponse
(
const
STrans
Msg
*
pMsg
)
{
if
(
pMsg
->
handle
==
NULL
)
{
return
;
}
...
...
@@ -799,14 +798,12 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
tTrace
(
"server conn %p start to send resp"
,
pConn
);
transSendAsync
(
pThrd
->
asyncPool
,
&
srvMsg
->
q
);
}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
SSrvConn
*
pConn
=
thandle
;
int
transGetConnInfo
(
void
*
thandle
,
STransHandleInfo
*
pInfo
)
{
SSrvConn
*
pConn
=
thandle
;
struct
sockaddr_in
addr
=
pConn
->
addr
;
pInfo
->
clientIp
=
(
uint32_t
)(
addr
.
sin_addr
.
s_addr
);
pInfo
->
clientPort
=
ntohs
(
addr
.
sin_port
);
tstrncpy
(
pInfo
->
user
,
pConn
->
user
,
sizeof
(
pInfo
->
user
));
return
0
;
}
...
...
source/os/src/osLocale.c
浏览文件 @
388aac88
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "osLocale.h"
...
...
tests/script/sh/massiveTable/compileVersion.sh
浏览文件 @
388aac88
...
...
@@ -45,10 +45,6 @@ function gitPullBranchInfo () {
git pull origin
$branch_name
||
:
echo
"==== git pull
$branch_name
end ===="
git pull
--recurse-submodules
cd
tests
git checkout
$branch_name
git pull
cd
..
}
function
compileTDengineVersion
()
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录