Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4a3ee65e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4a3ee65e
编写于
7月 06, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feat/tsdb_refact' of github.com:taosdata/TDengine into feat/tsdb_refact
上级
3091f111
fc20a61c
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
399 addition
and
344 deletion
+399
-344
include/common/tmsg.h
include/common/tmsg.h
+6
-2
include/common/tmsgdef.h
include/common/tmsgdef.h
+7
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+0
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+2
-3
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+1
-2
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+4
-3
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+16
-25
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+1
-0
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+5
-4
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+6
-3
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/dnode/vnode/src/tq/tqOffset.c
source/dnode/vnode/src/tq/tqOffset.c
+1
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+174
-211
source/libs/command/src/command.c
source/libs/command/src/command.c
+2
-4
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+5
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+18
-16
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+76
-16
source/libs/planner/test/planOptimizeTest.cpp
source/libs/planner/test/planOptimizeTest.cpp
+5
-0
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+16
-11
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+8
-8
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+27
-17
tests/system-test/2-query/timetruncate.py
tests/system-test/2-query/timetruncate.py
+9
-8
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-1
tests/system-test/simpletest.bat
tests/system-test/simpletest.bat
+2
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
4a3ee65e
...
...
@@ -55,8 +55,12 @@ extern int32_t tMsgDict[];
#define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8)
#define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff)
#define TMSG_INFO(TYPE) \
(((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0)
#define TMSG_INFO(TYPE) \
((TYPE) >= 0 && \
((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \
(TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \
? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \
: 0
#define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE))
typedef
uint16_t
tmsg_t
;
...
...
include/common/tmsgdef.h
浏览文件 @
4a3ee65e
...
...
@@ -82,6 +82,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_DND_NET_TEST
,
"net-test"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CONFIG_DNODE
,
"config-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_SYSTABLE_RETRIEVE
,
"dnode-retrieve"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_MAX_MSG
,
"dnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_MND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CONNECT
,
"connect"
,
NULL
,
NULL
)
...
...
@@ -164,6 +165,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_SPLIT_VGROUP
,
"split-vgroup"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SHOW_VARIABLES
,
"show-variables"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SERVER_VERSION
,
"server-version"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MAX_MSG
,
"mnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBMIT
,
"submit"
,
SSubmitReq
,
SSubmitRsp
)
...
...
@@ -199,6 +201,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TTL_TABLE
,
"drop-ttl-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMMIT
,
"commit vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MAX_MSG
,
"vnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SCH_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_QUERY
,
"query"
,
NULL
,
NULL
)
...
...
@@ -210,6 +213,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SCH_DROP_TASK
,
"drop-task"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_EXPLAIN
,
"explain"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_LINK_BROKEN
,
"link-broken"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SCH_MAX_MSG
,
"sch-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_STREAM_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DEPLOY
,
"stream-task-deploy"
,
SStreamTaskDeployReq
,
SStreamTaskDeployRsp
)
...
...
@@ -218,6 +222,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_DISPATCH
,
"stream-task-dispatch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_TASK_RECOVER
,
"stream-task-recover"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_RETRIEVE
,
"stream-retrieve"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_STREAM_MAX_MSG
,
"stream-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_MON_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MM_INFO
,
"monitor-minfo"
,
NULL
,
NULL
)
...
...
@@ -228,6 +233,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MON_VM_LOAD
,
"monitor-vload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MM_LOAD
,
"monitor-mload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_QM_LOAD
,
"monitor-qload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MAX_MSG
,
"monitor-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SYNC_MSG
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_TIMEOUT
,
"sync-timer"
,
NULL
,
NULL
)
...
...
@@ -252,6 +258,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SYNC_LEADER_TRANSFER
,
"sync-leader-transfer"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_SET_MNODE_STANDBY
,
"set-mnode-standby"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_SET_VNODE_STANDBY
,
"set-vnode-standby"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_MAX_MSG
,
"sync-max"
,
NULL
,
NULL
)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
...
...
source/client/src/clientImpl.c
浏览文件 @
4a3ee65e
...
...
@@ -279,7 +279,6 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
}
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
// pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows);
}
int32_t
asyncExecDdlQuery
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
)
{
...
...
source/client/src/clientMain.c
浏览文件 @
4a3ee65e
...
...
@@ -665,8 +665,6 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
}
void
retrieveMetaCallback
(
SMetaData
*
pResultMeta
,
void
*
param
,
int32_t
code
)
{
tscDebug
(
"enter meta callback, code %s"
,
tstrerror
(
code
));
SqlParseWrapper
*
pWrapper
=
(
SqlParseWrapper
*
)
param
;
SQuery
*
pQuery
=
pWrapper
->
pQuery
;
SRequestObj
*
pRequest
=
pWrapper
->
pRequest
;
...
...
@@ -686,10 +684,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
TSWAP
(
pRequest
->
tableList
,
(
pQuery
)
->
pTableList
);
destorySqlParseWrapper
(
pWrapper
);
tscDebug
(
"0x%"
PRIx64
" analysis semantics completed, start async query, reqId:0x%"
PRIx64
,
pRequest
->
self
,
pRequest
->
requestId
);
launchAsyncQuery
(
pRequest
,
pQuery
,
pResultMeta
);
}
else
{
destorySqlParseWrapper
(
pWrapper
);
tscDebug
(
"error happens, code:%d"
,
code
);
if
(
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
retry
,
pRequest
->
requestId
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
4a3ee65e
...
...
@@ -32,8 +32,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) {
if
(
pVnode
&&
num
<
size
)
{
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
// dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
pVnodes
[
num
]
=
(
*
ppVnode
);
num
++
;
pVnodes
[
num
++
]
=
(
*
ppVnode
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
else
{
taosHashCancelIterate
(
pMgmt
->
hash
,
pIter
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
4a3ee65e
...
...
@@ -88,7 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
while
(
!
taosQueueEmpty
(
pVnode
->
pApplyQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pQueryQ
))
taosMsleep
(
10
);
while
(
!
taosQueueEmpty
(
pVnode
->
pFetchQ
))
taosMsleep
(
10
);
dTrace
(
"vgId:%d, vnode
-fetch
queue is empty"
,
pVnode
->
vgId
);
dTrace
(
"vgId:%d, vnode queue is empty"
,
pVnode
->
vgId
);
vmFreeQueue
(
pMgmt
,
pVnode
);
vnodeClose
(
pVnode
->
pImpl
);
...
...
@@ -140,7 +140,7 @@ static void *vmOpenVnodeInThread(void *param) {
}
static
int32_t
vmOpenVnodes
(
SVnodeMgmt
*
pMgmt
)
{
pMgmt
->
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_
NO
_LOCK
);
pMgmt
->
hash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_
ENTRY
_LOCK
);
if
(
pMgmt
->
hash
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
dError
(
"failed to init vnode hash since %s"
,
terrstr
());
...
...
@@ -156,7 +156,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
pMgmt
->
state
.
totalVnodes
=
numOfVnodes
;
int32_t
threadNum
=
1
;
int32_t
threadNum
=
tsNumOfCores
/
2
;
if
(
threadNum
<
1
)
threadNum
=
1
;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
taosMemoryCalloc
(
threadNum
,
sizeof
(
SVnodeThread
));
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
4a3ee65e
...
...
@@ -107,35 +107,13 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-sync queue"
,
pVnode
->
vgId
,
pMsg
);
int32_t
code
=
vnodeProcessSync
Req
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
// no response here
int32_t
code
=
vnodeProcessSync
Msg
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
// no response here
dGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
pVnode
->
vgId
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
static
void
vmProcessMergeQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
if
(
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
)
==
0
)
continue
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
dGTrace
(
"vgId:%d, msg:%p get from vnode-merge queue"
,
pVnode
->
vgId
,
pMsg
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"vgId:%d, msg:%p failed to merge since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
vmSendRsp
(
pMsg
,
code
);
}
dGTrace
(
"msg:%p, is freed, code:0x%x"
,
pMsg
,
code
);
rpcFreeCont
(
pMsg
->
pCont
);
taosFreeQitem
(
pMsg
);
}
}
static
int32_t
vmPutMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
,
EQueueType
qtype
)
{
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
SMsgHead
*
pHead
=
pMsg
->
pCont
;
...
...
@@ -207,7 +185,11 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t
vmPutRpcMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
EQueueType
qtype
,
SRpcMsg
*
pRpc
)
{
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
if
(
pMsg
==
NULL
)
{
rpcFreeCont
(
pMsg
->
pCont
);
pRpc
->
pCont
=
NULL
;
return
-
1
;
}
SMsgHead
*
pHead
=
pRpc
->
pCont
;
dTrace
(
"vgId:%d, msg:%p is created, type:%s"
,
pHead
->
vgId
,
pMsg
,
TMSG_INFO
(
pRpc
->
msgType
));
...
...
@@ -215,7 +197,16 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
return
vmPutMsgToQueue
(
pMgmt
,
pMsg
,
qtype
);
int32_t
code
=
vmPutMsgToQueue
(
pMgmt
,
pMsg
,
qtype
);
if
(
code
!=
0
)
{
dTrace
(
"msg:%p, is freed"
,
pMsg
);
taosFreeQitem
(
pMsg
);
rpcFreeCont
(
pMsg
->
pCont
);
pRpc
->
pCont
=
NULL
;
}
return
code
;
}
int32_t
vmGetQueueSize
(
SVnodeMgmt
*
pMgmt
,
int32_t
vgId
,
EQueueType
qtype
)
{
...
...
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
浏览文件 @
4a3ee65e
...
...
@@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) {
dmCleanupClient
(
pDnode
);
dmCleanupServer
(
pDnode
);
dmClearVars
(
pDnode
);
rpcCleanup
();
dDebug
(
"dnode is closed, ptr:%p"
,
pDnode
);
}
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
4a3ee65e
...
...
@@ -71,9 +71,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
}
static
void
dmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
,
SEpSet
*
pEpSet
)
{
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
SDnodeTrans
*
pTrans
=
&
pDnode
->
trans
;
int32_t
code
=
-
1
;
SRpcMsg
*
pMsg
=
NULL
;
SRpcMsg
*
pMsg
=
NULL
;
SMgmtWrapper
*
pWrapper
=
NULL
;
SDnodeHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
pRpc
->
msgType
)];
...
...
@@ -185,6 +185,7 @@ _OVER:
taosFreeQitem
(
pMsg
);
}
rpcFreeCont
(
pRpc
->
pCont
);
pRpc
->
pCont
=
NULL
;
}
dmReleaseWrapper
(
pWrapper
);
...
...
@@ -195,11 +196,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
for
(
EDndNodeType
ntype
=
DNODE
;
ntype
<
NODE_END
;
++
ntype
)
{
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
ntype
];
SArray
*
pArray
=
(
*
pWrapper
->
func
.
getHandlesFp
)();
SArray
*
pArray
=
(
*
pWrapper
->
func
.
getHandlesFp
)();
if
(
pArray
==
NULL
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
SMgmtHandle
*
pMgmt
=
taosArrayGet
(
pArray
,
i
);
SMgmtHandle
*
pMgmt
=
taosArrayGet
(
pArray
,
i
);
SDnodeHandle
*
pHandle
=
&
pTrans
->
msgHandles
[
TMSG_INDEX
(
pMgmt
->
msgType
)];
if
(
pMgmt
->
needCheckVgId
)
{
pHandle
->
needCheckVgId
=
pMgmt
->
needCheckVgId
;
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
4a3ee65e
...
...
@@ -739,9 +739,12 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) {
pDb
=
mndAcquireDb
(
pMnode
,
pVgroup
->
dbName
);
}
int64_t
vgroupMemroy
=
(
int64_t
)
pDb
->
cfg
.
buffer
*
1024
*
1024
+
(
int64_t
)
pDb
->
cfg
.
pages
*
pDb
->
cfg
.
pageSize
*
1024
;
if
(
pDb
->
cfg
.
cacheLastRow
>
0
)
{
vgroupMemroy
+=
(
int64_t
)
pDb
->
cfg
.
lastRowMem
*
1024
*
1024
;
int64_t
vgroupMemroy
=
0
;
if
(
pDb
!=
NULL
)
{
vgroupMemroy
=
(
int64_t
)
pDb
->
cfg
.
buffer
*
1024
*
1024
+
(
int64_t
)
pDb
->
cfg
.
pages
*
pDb
->
cfg
.
pageSize
*
1024
;
if
(
pDb
->
cfg
.
cacheLastRow
>
0
)
{
vgroupMemroy
+=
(
int64_t
)
pDb
->
cfg
.
lastRowMem
*
1024
*
1024
;
}
}
if
(
pDbInput
==
NULL
)
{
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
4a3ee65e
...
...
@@ -51,10 +51,10 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodeClose
(
SVnode
*
pVnode
);
int32_t
vnodePre
p
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodePre
P
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSync
Req
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSync
Msg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodePreprocessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessFetchMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SQueueInfo
*
pInfo
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
4a3ee65e
...
...
@@ -242,7 +242,7 @@ struct SVnode {
SSink
*
pSink
;
tsem_t
canCommit
;
int64_t
sync
;
int32_t
sync
Count
;
int32_t
block
Count
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
};
...
...
source/dnode/vnode/src/tq/tqOffset.c
浏览文件 @
4a3ee65e
...
...
@@ -85,6 +85,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) {
void
tqOffsetClose
(
STqOffsetStore
*
pStore
)
{
tqOffsetSnapshot
(
pStore
);
taosHashCleanup
(
pStore
->
pHash
);
taosMemoryFree
(
pStore
);
}
STqOffset
*
tqOffsetRead
(
STqOffsetStore
*
pStore
,
const
char
*
subscribeKey
)
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
4a3ee65e
...
...
@@ -83,7 +83,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode
->
state
.
commitID
=
info
.
state
.
commitID
;
pVnode
->
pTfs
=
pTfs
;
pVnode
->
msgCb
=
msgCb
;
pVnode
->
sync
Count
=
0
;
pVnode
->
block
Count
=
0
;
tsem_init
(
&
pVnode
->
syncSem
,
0
,
0
);
tsem_init
(
&
(
pVnode
->
canCommit
),
0
,
1
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
4a3ee65e
...
...
@@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
static
int32_t
vnodeProcessDropTtlTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDeleteReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
int32_t
vnodePre
p
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
vnodePre
P
rocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
0
;
SDecoder
dc
=
{
0
};
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
4a3ee65e
此差异已折叠。
点击以展开。
source/libs/command/src/command.c
浏览文件 @
4a3ee65e
...
...
@@ -548,19 +548,17 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) {
}
static
int32_t
createSelectResultDataBlock
(
SNodeList
*
pProjects
,
SSDataBlock
**
pOutput
)
{
SSDataBlock
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
)
);
SSDataBlock
*
pBlock
=
createDataBlock
(
);
if
(
NULL
==
pBlock
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pBlock
->
pDataBlock
=
taosArrayInit
(
LIST_LENGTH
(
pProjects
),
sizeof
(
SColumnInfoData
));
SNode
*
pProj
=
NULL
;
FOREACH
(
pProj
,
pProjects
)
{
SColumnInfoData
infoData
=
{
0
};
infoData
.
info
.
type
=
((
SExprNode
*
)
pProj
)
->
resType
.
type
;
infoData
.
info
.
bytes
=
((
SExprNode
*
)
pProj
)
->
resType
.
bytes
;
taosArrayPush
(
pBlock
->
pData
Block
,
&
infoData
);
blockDataAppendColInfo
(
p
Block
,
&
infoData
);
}
*
pOutput
=
pBlock
;
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
4a3ee65e
...
...
@@ -27,6 +27,10 @@ static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t
exchangeObjRefPool
=
-
1
;
static
void
initRefPool
()
{
exchangeObjRefPool
=
taosOpenRef
(
1024
,
doDestroyExchangeOperatorInfo
);
}
static
void
cleanupRefPool
()
{
int32_t
ref
=
atomic_val_compare_exchange_32
(
&
exchangeObjRefPool
,
exchangeObjRefPool
,
0
);
taosCloseRef
(
ref
);
}
int32_t
qCreateExecTask
(
SReadHandle
*
readHandle
,
int32_t
vgId
,
uint64_t
taskId
,
SSubplan
*
pSubplan
,
qTaskInfo_t
*
pTaskInfo
,
DataSinkHandle
*
handle
,
const
char
*
sql
,
EOPTR_EXEC_MODEL
model
)
{
...
...
@@ -34,7 +38,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
SExecTaskInfo
**
pTask
=
(
SExecTaskInfo
**
)
pTaskInfo
;
taosThreadOnce
(
&
initPoolOnce
,
initRefPool
);
atexit
(
cleanupRefPool
);
int32_t
code
=
createExecTaskInfoImpl
(
pSubplan
,
pTask
,
readHandle
,
taskId
,
sql
,
model
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
4a3ee65e
...
...
@@ -1367,6 +1367,7 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
}
taosArrayDestroy
(
pInfo
->
scanCols
);
taosMemoryFreeClear
(
pInfo
->
pUser
);
}
static
int32_t
getSysTableDbNameColId
(
const
char
*
pTable
)
{
...
...
@@ -1788,8 +1789,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
getPerfDbMeta
(
&
pSysDbTableMeta
,
&
size
);
p
->
info
.
rows
=
buildDbTableInfoBlock
(
p
,
pSysDbTableMeta
,
size
,
TSDB_PERFORMANCE_SCHEMA_DB
);
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
pInfo
->
pRes
->
info
.
rows
=
p
->
info
.
rows
;
relocateColumnData
(
pInfo
->
pRes
,
pInfo
->
scanCols
,
p
->
pDataBlock
,
false
);
blockDataDestroy
(
p
);
return
pInfo
->
pRes
->
info
.
rows
;
...
...
source/libs/function/src/builtins.c
浏览文件 @
4a3ee65e
...
...
@@ -48,13 +48,19 @@ static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) {
return
TIME_UNIT_INVALID
;
}
if
(
TSDB_TIME_PRECISION_MILLI
==
dbPrec
&&
0
==
strcasecmp
(
pVal
->
literal
,
"1u"
))
{
if
(
TSDB_TIME_PRECISION_MILLI
==
dbPrec
&&
(
0
==
strcasecmp
(
pVal
->
literal
,
"1u"
)
||
0
==
strcasecmp
(
pVal
->
literal
,
"1b"
)))
{
return
TIME_UNIT_TOO_SMALL
;
}
if
(
pVal
->
literal
[
0
]
!=
'1'
||
(
pVal
->
literal
[
1
]
!=
'u'
&&
pVal
->
literal
[
1
]
!=
'a'
&&
pVal
->
literal
[
1
]
!=
's'
&&
pVal
->
literal
[
1
]
!=
'm'
&&
pVal
->
literal
[
1
]
!=
'h'
&&
pVal
->
literal
[
1
]
!=
'd'
&&
pVal
->
literal
[
1
]
!=
'w'
))
{
if
(
TSDB_TIME_PRECISION_MICRO
==
dbPrec
&&
0
==
strcasecmp
(
pVal
->
literal
,
"1b"
))
{
return
TIME_UNIT_TOO_SMALL
;
}
if
(
pVal
->
literal
[
0
]
!=
'1'
||
(
pVal
->
literal
[
1
]
!=
'u'
&&
pVal
->
literal
[
1
]
!=
'a'
&&
pVal
->
literal
[
1
]
!=
's'
&&
pVal
->
literal
[
1
]
!=
'm'
&&
pVal
->
literal
[
1
]
!=
'h'
&&
pVal
->
literal
[
1
]
!=
'd'
&&
pVal
->
literal
[
1
]
!=
'w'
&&
pVal
->
literal
[
1
]
!=
'b'
))
{
return
TIME_UNIT_INVALID
;
}
...
...
@@ -700,9 +706,8 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"ELAPSED function time unit parameter should be greater than db precision"
);
}
else
if
(
ret
==
TIME_UNIT_INVALID
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
}
}
...
...
@@ -1223,9 +1228,8 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"STATEDURATION function time unit parameter should be greater than db precision"
);
}
else
if
(
ret
==
TIME_UNIT_INVALID
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
}
}
...
...
@@ -1735,9 +1739,8 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TIMETRUNCATE function time unit parameter should be greater than db precision"
);
}
else
if
(
ret
==
TIME_UNIT_INVALID
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
}
addDbPrecisonParam
(
&
pFunc
->
pParameterList
,
dbPrec
);
...
...
@@ -1775,9 +1778,8 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TIMEDIFF function time unit parameter should be greater than db precision"
);
}
else
if
(
ret
==
TIME_UNIT_INVALID
)
{
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
return
buildFuncErrMsg
(
pErrBuf
,
len
,
TSDB_CODE_FUNC_FUNTION_ERROR
,
"TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"
);
}
}
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
4a3ee65e
...
...
@@ -490,18 +490,7 @@ static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicN
}
static
int32_t
pushDownCondOptPushCondToChild
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pChild
,
SNode
**
pCond
)
{
switch
(
nodeType
(
pChild
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
pushDownCondOptPushCondToScan
(
pCxt
,
(
SScanLogicNode
*
)
pChild
,
pCond
);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
pushDownCondOptPushCondToProject
(
pCxt
,
(
SProjectLogicNode
*
)
pChild
,
pCond
);
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
return
pushDownCondOptPushCondToJoin
(
pCxt
,
(
SJoinLogicNode
*
)
pChild
,
pCond
);
default:
break
;
}
planError
(
"pushDownCondOptPushCondToChild failed, invalid logic plan node %s"
,
nodesNodeName
(
nodeType
(
pChild
)));
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
return
pushDownCondOptAppendCond
(
&
pChild
->
pConditions
,
pCond
);
}
static
bool
pushDownCondOptIsPriKey
(
SNode
*
pNode
,
SNodeList
*
pTableCols
)
{
...
...
@@ -802,10 +791,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
return
TSDB_CODE_SUCCESS
;
}
// TODO: remove it after full implementation of pushing down to child
if
(
1
!=
LIST_LENGTH
(
pAgg
->
node
.
pChildren
)
||
QUERY_NODE_LOGIC_PLAN_SCAN
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
))
&&
QUERY_NODE_LOGIC_PLAN_PROJECT
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
))
&&
QUERY_NODE_LOGIC_PLAN_JOIN
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
)))
{
if
(
1
!=
LIST_LENGTH
(
pAgg
->
node
.
pChildren
))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -832,6 +818,77 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
return
code
;
}
typedef
struct
SRewriteProjCondContext
{
SProjectLogicNode
*
pProj
;
int32_t
errCode
;
}
SRewriteProjCondContext
;
static
EDealRes
rewriteProjectCondForPushDownImpl
(
SNode
**
ppNode
,
void
*
pContext
)
{
SRewriteProjCondContext
*
pCxt
=
pContext
;
SProjectLogicNode
*
pProj
=
pCxt
->
pProj
;
if
(
QUERY_NODE_COLUMN
==
nodeType
(
*
ppNode
))
{
SNode
*
pTarget
=
NULL
;
FOREACH
(
pTarget
,
pProj
->
node
.
pTargets
)
{
if
(
nodesEqualNode
(
pTarget
,
*
ppNode
))
{
SNode
*
pProjection
=
NULL
;
FOREACH
(
pProjection
,
pProj
->
pProjections
)
{
if
(
0
==
strcmp
(((
SExprNode
*
)
pProjection
)
->
aliasName
,
((
SColumnNode
*
)(
*
ppNode
))
->
colName
))
{
SNode
*
pExpr
=
nodesCloneNode
(
pProjection
);
if
(
pExpr
==
NULL
)
{
pCxt
->
errCode
=
terrno
;
return
DEAL_RES_ERROR
;
}
nodesDestroyNode
(
*
ppNode
);
*
ppNode
=
pExpr
;
}
// end if expr alias name equal column name
}
// end for each project
}
// end if target node equals cond column node
}
// end for each targets
return
DEAL_RES_IGNORE_CHILD
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
rewriteProjectCondForPushDown
(
SOptimizeContext
*
pCxt
,
SProjectLogicNode
*
pProject
,
SNode
**
ppProjectCond
)
{
SRewriteProjCondContext
cxt
=
{.
pProj
=
pProject
,
.
errCode
=
TSDB_CODE_SUCCESS
};
SNode
*
pProjectCond
=
pProject
->
node
.
pConditions
;
nodesRewriteExpr
(
&
pProjectCond
,
rewriteProjectCondForPushDownImpl
,
&
cxt
);
*
ppProjectCond
=
pProjectCond
;
pProject
->
node
.
pConditions
=
NULL
;
return
cxt
.
errCode
;
}
static
int32_t
pushDownCondOptDealProject
(
SOptimizeContext
*
pCxt
,
SProjectLogicNode
*
pProject
)
{
if
(
NULL
==
pProject
->
node
.
pConditions
||
OPTIMIZE_FLAG_TEST_MASK
(
pProject
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
))
{
return
TSDB_CODE_SUCCESS
;
}
// TODO: remove it after full implementation of pushing down to child
if
(
1
!=
LIST_LENGTH
(
pProject
->
node
.
pChildren
))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
!=
pProject
->
node
.
pLimit
||
NULL
!=
pProject
->
node
.
pSlimit
)
{
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNode
*
pProjCond
=
NULL
;
code
=
rewriteProjectCondForPushDown
(
pCxt
,
pProject
,
&
pProjCond
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
SLogicNode
*
pChild
=
(
SLogicNode
*
)
nodesListGetNode
(
pProject
->
node
.
pChildren
,
0
);
code
=
pushDownCondOptPushCondToChild
(
pCxt
,
pChild
,
&
pProjCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
OPTIMIZE_FLAG_SET_MASK
(
pProject
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_PUSH_DOWN_CONDE
);
pCxt
->
optimized
=
true
;
}
else
{
nodesDestroyNode
(
pProjCond
);
}
return
code
;
}
static
int32_t
pushDownCondOptimizeImpl
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
pLogicNode
))
{
...
...
@@ -844,6 +901,9 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog
case
QUERY_NODE_LOGIC_PLAN_AGG
:
code
=
pushDownCondOptDealAgg
(
pCxt
,
(
SAggLogicNode
*
)
pLogicNode
);
break
;
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
code
=
pushDownCondOptDealProject
(
pCxt
,
(
SProjectLogicNode
*
)
pLogicNode
);
break
;
default:
break
;
}
...
...
source/libs/planner/test/planOptimizeTest.cpp
浏览文件 @
4a3ee65e
...
...
@@ -81,3 +81,8 @@ TEST_F(PlanOptimizeTest, eliminateProjection) {
run
(
"SELECT c1 FROM st1s3"
);
// run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first");
}
TEST_F
(
PlanOptimizeTest
,
pushDownProjectCond
)
{
useDb
(
"root"
,
"test"
);
run
(
"select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first"
);
}
\ No newline at end of file
source/libs/scalar/src/sclfunc.c
浏览文件 @
4a3ee65e
...
...
@@ -1174,7 +1174,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
timeU
nit
=
timeUnit
*
1000
/
factor
;
int64_t
u
nit
=
timeUnit
*
1000
/
factor
;
for
(
int32_t
i
=
0
;
i
<
pInput
[
0
].
numOfRows
;
++
i
)
{
if
(
colDataIsNull_s
(
pInput
[
0
].
columnData
,
i
))
{
...
...
@@ -1209,12 +1209,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
NUM_TO_STRING
(
TSDB_DATA_TYPE_BIGINT
,
&
timeVal
,
sizeof
(
buf
),
buf
);
int32_t
tsDigits
=
(
int32_t
)
strlen
(
buf
);
switch
(
timeU
nit
)
{
case
0
:
{
/* 1u */
switch
(
u
nit
)
{
case
0
:
{
/* 1u
or 1b
*/
if
(
tsDigits
==
TSDB_TIME_PRECISION_NANO_DIGITS
)
{
timeVal
=
timeVal
/
1000
*
1000
;
//} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
// //timeVal = timeVal / 1000;
if
(
timePrec
==
TSDB_TIME_PRECISION_NANO
&&
timeUnit
==
1
)
{
timeVal
=
timeVal
*
1
;
}
else
{
timeVal
=
timeVal
/
1000
*
1000
;
}
}
else
if
(
tsDigits
<=
TSDB_TIME_PRECISION_SEC_DIGITS
)
{
timeVal
=
timeVal
*
factor
;
}
else
{
...
...
@@ -1366,8 +1368,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
int64_t
factor
=
(
timePrec
==
TSDB_TIME_PRECISION_MILLI
)
?
1000
:
(
timePrec
==
TSDB_TIME_PRECISION_MICRO
?
1000000
:
1000000000
);
timeUnit
=
timeUnit
*
1000
/
factor
;
int32_t
numOfRows
=
0
;
for
(
int32_t
i
=
0
;
i
<
inputNum
;
++
i
)
{
if
(
pInput
[
i
].
numOfRows
>
numOfRows
)
{
...
...
@@ -1447,9 +1447,14 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
}
}
else
{
switch
(
timeUnit
)
{
case
0
:
{
/* 1u */
result
=
result
/
1000
;
int64_t
unit
=
timeUnit
*
1000
/
factor
;
switch
(
unit
)
{
case
0
:
{
/* 1u or 1b */
if
(
timePrec
==
TSDB_TIME_PRECISION_NANO
&&
timeUnit
==
1
)
{
result
=
result
/
1
;
}
else
{
result
=
result
/
1000
;
}
break
;
}
case
1
:
{
/* 1a */
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
4a3ee65e
...
...
@@ -96,8 +96,8 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL
15
// ms retry interval
#define TRANS_CONN_TIMEOUT
3
// connect timeout
#define TRANS_RETRY_INTERVAL
15
// ms retry interval
#define TRANS_CONN_TIMEOUT
3
// connect timeout
typedef
SRpcMsg
STransMsg
;
typedef
SRpcCtx
STransCtx
;
...
...
@@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType;
typedef
enum
{
ConnNormal
,
ConnAcquire
,
ConnRelease
,
ConnBroken
,
ConnInPool
}
ConnStatus
;
#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
#define RPC_RESERVE_SIZE (sizeof(STranConnCtx))
#define RPC_RESERVE_SIZE
(sizeof(STranConnCtx))
#define rpcIsReq(type) (type & 1U)
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define TRANS_MSG_OVERHEAD
(sizeof(STransMsgHead))
#define transHeadFromCont(cont)
((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg)
(msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U)
#define transContLenFromMsg(msgLen)
(msgLen - sizeof(STransMsgHead));
#define transIsReq(type)
(type & 1U)
#define transLabel(trans) ((STrans*)trans)->label
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
4a3ee65e
...
...
@@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) {
tDebug
(
"conn %p acquired by server app"
,
pConn
);
}
}
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STraceId
*
trace
=
&
pHead
->
traceId
;
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
transLabel
(
p
Conn
),
pConn
,
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d"
,
transLabel
(
p
TransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
);
}
else
{
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d"
,
transLabel
(
pConn
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
tGTrace
(
"%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
)
,
transMsg
.
co
ntLen
,
pHead
->
noResp
,
transMsg
.
co
de
);
// no ref here
}
...
...
@@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg
.
info
.
refId
=
pConn
->
refId
;
transMsg
.
info
.
traceId
=
pHead
->
traceId
;
tGTrace
(
"%s handle %p conn: %p translated to app, refId: %"
PRIu64
""
,
transLabel
(
p
Conn
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
tGTrace
(
"%s handle %p conn: %p translated to app, refId: %"
PRIu64
""
,
transLabel
(
p
TransInst
),
transMsg
.
info
.
handle
,
pConn
,
pConn
->
refId
);
assert
(
transMsg
.
info
.
handle
!=
NULL
);
if
(
pHead
->
noResp
==
1
)
{
...
...
@@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) {
transReleaseExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
STrans
*
pTransInst
=
pConn
->
pTransInst
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}
...
...
@@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SSvrConn
*
conn
=
cli
->
data
;
SConnBuffer
*
pBuf
=
&
conn
->
readBuf
;
STrans
*
pTransInst
=
conn
->
pTransInst
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
tTrace
(
"%s conn %p total read: %d, current read: %d"
,
transLabel
(
conn
->
pTransInst
),
conn
,
pBuf
->
len
,
(
int
)
nread
);
tTrace
(
"%s conn %p total read: %d, current read: %d"
,
transLabel
(
pTransInst
),
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
transReadComplete
(
pBuf
))
{
tTrace
(
"%s conn %p alread read complete packet"
,
transLabel
(
conn
->
pTransInst
),
conn
);
tTrace
(
"%s conn %p alread read complete packet"
,
transLabel
(
pTransInst
),
conn
);
uvHandleReq
(
conn
);
}
else
{
tTrace
(
"%s conn %p read partial packet, continue to read"
,
transLabel
(
conn
->
pTransInst
),
conn
);
tTrace
(
"%s conn %p read partial packet, continue to read"
,
transLabel
(
pTransInst
),
conn
);
}
return
;
}
...
...
@@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return
;
}
tError
(
"%s conn %p read error: %s"
,
transLabel
(
conn
->
pTransInst
),
conn
,
uv_err_name
(
nread
));
tError
(
"%s conn %p read error: %s"
,
transLabel
(
pTransInst
),
conn
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
conn
->
broken
=
true
;
if
(
conn
->
status
==
ConnAcquire
)
{
if
(
conn
->
regArg
.
init
)
{
tTrace
(
"%s conn %p broken, notify server app"
,
transLabel
(
conn
->
pTransInst
),
conn
);
tTrace
(
"%s conn %p broken, notify server app"
,
transLabel
(
pTransInst
),
conn
);
STrans
*
pTransInst
=
conn
->
pTransInst
;
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
(
conn
->
regArg
.
msg
),
NULL
);
memset
(
&
conn
->
regArg
,
0
,
sizeof
(
conn
->
regArg
));
...
...
@@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
char
*
msg
=
(
char
*
)
pHead
;
int32_t
len
=
transMsgLenFromCont
(
pMsg
->
contLen
);
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d"
,
transLabel
(
p
Conn
->
p
TransInst
),
pConn
,
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
len
);
pHead
->
msgLen
=
htonl
(
len
);
...
...
@@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) {
exh
->
refId
=
transAddExHandle
(
transGetRefMgt
(),
exh
);
transAcquireExHandle
(
transGetRefMgt
(),
exh
->
refId
);
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
pConn
->
refId
=
exh
->
refId
;
transRefSrvHandle
(
pConn
);
tTrace
(
"%s handle %p, conn %p created, refId: %"
PRId64
""
,
transLabel
(
pT
hrd
->
pT
ransInst
),
exh
,
pConn
,
pConn
->
refId
);
tTrace
(
"%s handle %p, conn %p created, refId: %"
PRId64
""
,
transLabel
(
pTransInst
),
exh
,
pConn
,
pConn
->
refId
);
return
pConn
;
}
...
...
@@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) {
transReleaseExHandle
(
transGetRefMgt
(),
conn
->
refId
);
transRemoveExHandle
(
transGetRefMgt
(),
conn
->
refId
);
tDebug
(
"%s conn %p destroy"
,
transLabel
(
thrd
->
pTransInst
),
conn
);
STrans
*
pTransInst
=
thrd
->
pTransInst
;
tDebug
(
"%s conn %p destroy"
,
transLabel
(
pTransInst
),
conn
);
for
(
int
i
=
0
;
i
<
transQueueSize
(
&
conn
->
srvMsgs
);
i
++
)
{
SSvrMsg
*
msg
=
transQueueGet
(
&
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
}
transQueueDestroy
(
&
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
...
...
@@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) {
m
->
msg
=
tmsg
;
m
->
type
=
Register
;
tTrace
(
"%s conn %p start to register brokenlink callback"
,
transLabel
(
pThrd
->
pTransInst
),
exh
->
handle
);
STrans
*
pTransInst
=
pThrd
->
pTransInst
;
tTrace
(
"%s conn %p start to register brokenlink callback"
,
transLabel
(
pTransInst
),
exh
->
handle
);
transAsyncSend
(
pThrd
->
asyncPool
,
&
m
->
q
);
transReleaseExHandle
(
transGetRefMgt
(),
refId
);
return
;
...
...
tests/system-test/2-query/timetruncate.py
浏览文件 @
4a3ee65e
...
...
@@ -24,7 +24,8 @@ class TDTestCase:
]
self
.
db_param_precision
=
[
'ms'
,
'us'
,
'ns'
]
self
.
time_unit
=
[
'1w'
,
'1d'
,
'1h'
,
'1m'
,
'1s'
,
'1a'
,
'1u'
]
self
.
error_unit
=
[
'1b'
,
'2w'
,
'2d'
,
'2h'
,
'2m'
,
'2s'
,
'2a'
,
'2u'
,
'1c'
,
'#1'
]
#self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1']
self
.
error_unit
=
[
'2w'
,
'2d'
,
'2h'
,
'2m'
,
'2s'
,
'2a'
,
'2u'
,
'1c'
,
'#1'
]
self
.
ntbname
=
'ntb'
self
.
stbname
=
'stb'
self
.
ctbname
=
'ctb'
...
...
@@ -92,7 +93,7 @@ class TDTestCase:
elif
unit
.
lower
()
==
'1d'
:
for
i
in
range
(
len
(
self
.
ts_str
)):
ts_result
=
self
.
get_ms_timestamp
(
str
(
tdSql
.
queryResult
[
i
][
0
]))
tdSql
.
checkEqual
(
ts_result
,
int
(
date_time
[
i
]
/
1000
/
60
/
60
/
24
)
*
24
*
60
*
60
*
1000
)
tdSql
.
checkEqual
(
ts_result
,
int
(
date_time
[
i
]
/
1000
/
60
/
60
/
24
)
*
24
*
60
*
60
*
1000
)
elif
unit
.
lower
()
==
'1w'
:
for
i
in
range
(
len
(
self
.
ts_str
)):
ts_result
=
self
.
get_ms_timestamp
(
str
(
tdSql
.
queryResult
[
i
][
0
]))
...
...
@@ -121,7 +122,7 @@ class TDTestCase:
elif
unit
.
lower
()
==
'1d'
:
for
i
in
range
(
len
(
self
.
ts_str
)):
ts_result
=
self
.
get_us_timestamp
(
str
(
tdSql
.
queryResult
[
i
][
0
]))
tdSql
.
checkEqual
(
ts_result
,
int
(
date_time
[
i
]
/
1000
/
1000
/
60
/
60
/
24
)
*
24
*
60
*
60
*
1000
*
1000
)
tdSql
.
checkEqual
(
ts_result
,
int
(
date_time
[
i
]
/
1000
/
1000
/
60
/
60
/
24
)
*
24
*
60
*
60
*
1000
*
1000
)
elif
unit
.
lower
()
==
'1w'
:
for
i
in
range
(
len
(
self
.
ts_str
)):
ts_result
=
self
.
get_us_timestamp
(
str
(
tdSql
.
queryResult
[
i
][
0
]))
...
...
@@ -144,21 +145,21 @@ class TDTestCase:
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
i
][
0
],
int
(
date_time
[
i
]
*
1000
/
1000
/
1000
/
1000
/
60
/
60
)
*
60
*
60
*
1000
*
1000
*
1000
)
elif
unit
.
lower
()
==
'1d'
:
for
i
in
range
(
len
(
self
.
ts_str
)):
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
i
][
0
],
int
(
date_time
[
i
]
*
1000
/
1000
/
1000
/
1000
/
60
/
60
/
24
)
*
24
*
60
*
60
*
1000
*
1000
*
1000
)
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
i
][
0
],
int
(
date_time
[
i
]
*
1000
/
1000
/
1000
/
1000
/
60
/
60
/
24
)
*
24
*
60
*
60
*
1000
*
1000
*
1000
)
elif
unit
.
lower
()
==
'1w'
:
for
i
in
range
(
len
(
self
.
ts_str
)):
tdSql
.
checkEqual
(
tdSql
.
queryResult
[
i
][
0
],
int
(
date_time
[
i
]
*
1000
/
1000
/
1000
/
1000
/
60
/
60
/
24
/
7
)
*
7
*
24
*
60
*
60
*
1000
*
1000
*
1000
)
def
data_check
(
self
,
date_time
,
precision
,
tb_type
):
for
unit
in
self
.
time_unit
:
if
(
unit
.
lower
()
==
'1u'
and
precision
.
lower
()
==
'ms'
)
or
()
:
if
tb_type
.
lower
()
==
'ntb'
:
if
tb_type
.
lower
()
==
'ntb'
:
tdSql
.
error
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ntbname
}
'
)
elif
tb_type
.
lower
()
==
'ctb'
:
tdSql
.
error
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ctbname
}
'
)
elif
tb_type
.
lower
()
==
'stb'
:
tdSql
.
error
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
stbname
}
'
)
elif
precision
.
lower
()
==
'ms'
:
if
tb_type
.
lower
()
==
'ntb'
:
if
tb_type
.
lower
()
==
'ntb'
:
tdSql
.
query
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ntbname
}
'
)
elif
tb_type
.
lower
()
==
'ctb'
:
tdSql
.
query
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ctbname
}
'
)
...
...
@@ -167,7 +168,7 @@ class TDTestCase:
tdSql
.
checkRows
(
len
(
self
.
ts_str
))
self
.
check_ms_timestamp
(
unit
,
date_time
)
elif
precision
.
lower
()
==
'us'
:
if
tb_type
.
lower
()
==
'ntb'
:
if
tb_type
.
lower
()
==
'ntb'
:
tdSql
.
query
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ntbname
}
'
)
elif
tb_type
.
lower
()
==
'ctb'
:
tdSql
.
query
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ctbname
}
'
)
...
...
@@ -176,7 +177,7 @@ class TDTestCase:
tdSql
.
checkRows
(
len
(
self
.
ts_str
))
self
.
check_us_timestamp
(
unit
,
date_time
)
elif
precision
.
lower
()
==
'ns'
:
if
tb_type
.
lower
()
==
'ntb'
:
if
tb_type
.
lower
()
==
'ntb'
:
tdSql
.
query
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ntbname
}
'
)
elif
tb_type
.
lower
()
==
'ctb'
:
tdSql
.
query
(
f
'select timetruncate(ts,
{
unit
}
) from
{
self
.
ctbname
}
'
)
...
...
tests/system-test/fulltest.sh
浏览文件 @
4a3ee65e
...
...
@@ -157,7 +157,7 @@ python3 ./test.py -f 7-tmq/tmqCheckData1.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py
-f
7-tmq/tmqConsumerGroup.py
#
python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py
-f
7-tmq/tmqShow.py
python3 ./test.py
-f
7-tmq/tmqAlterSchema.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1.py
...
...
tests/system-test/simpletest.bat
浏览文件 @
4a3ee65e
python3
.\test.py
-f
0
-others
\taosShell.py
@REM
python3 .\test.py -f 0-others\taosShell.py
python3
.\test.py
-f
0
-others
\taosShellError.py
python3
.\test.py
-f
0
-others
\taosShellNetChk.py
python3
.\test.py
-f
0
-others
\telemetry.py
python3
.\test.py
-f
0
-others
\taosdMonitor.py
python3
.\test.py
-f
0
-others
\udfTest.py
python3
.\test.py
-f
0
-others
\udf_create.py
python3
.\test.py
-f
0
-others
\udf_restart_taosd.py
@REM
python3 .\test.py -f 0-others\udf_restart_taosd.py
@REM python3 .\test.py -f 0-others\cachelast.py
@REM python3 .\test.py -f 0-others\user_control.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录