Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
45c7f7dd
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
45c7f7dd
编写于
5月 20, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-14481-3.0
上级
f5499150
e6a02f74
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
421 addition
and
135 deletion
+421
-135
cmake/cmake.define
cmake/cmake.define
+7
-1
cmake/cmake.options
cmake/cmake.options
+12
-6
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+3
-4
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+26
-34
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
+7
-5
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+11
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+8
-3
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+39
-29
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+15
-17
source/libs/tdb/src/db/tdbPCache.c
source/libs/tdb/src/db/tdbPCache.c
+36
-22
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+12
-1
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+7
-7
source/libs/tdb/test/tdbTest.cpp
source/libs/tdb/test/tdbTest.cpp
+5
-5
source/util/src/tqueue.c
source/util/src/tqueue.c
+1
-1
tests/script/tsim/insert/update0.sim
tests/script/tsim/insert/update0.sim
+230
-0
未找到文件。
cmake/cmake.define
浏览文件 @
45c7f7dd
...
@@ -46,11 +46,17 @@ ENDIF ()
...
@@ -46,11 +46,17 @@ ENDIF ()
IF (TD_WINDOWS)
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/
W3
/D_WIN32")
SET(COMMON_FLAGS "/
w
/D_WIN32")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
# ENDIF ()
# ENDIF ()
IF (CMAKE_DEPFILE_FLAGS_C)
SET(CMAKE_DEPFILE_FLAGS_C "")
ENDIF ()
IF (CMAKE_DEPFILE_FLAGS_CXX)
SET(CMAKE_DEPFILE_FLAGS_CXX "")
ENDIF ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMMON_FLAGS}")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMMON_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMMON_FLAGS}")
...
...
cmake/cmake.options
浏览文件 @
45c7f7dd
...
@@ -46,6 +46,18 @@ IF(${TD_WINDOWS})
...
@@ -46,6 +46,18 @@ IF(${TD_WINDOWS})
ON
ON
)
)
option(
BUILD_TEST
"If build unit tests using googletest"
OFF
)
ELSE ()
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
ENDIF ()
ENDIF ()
option(
option(
...
@@ -54,12 +66,6 @@ option(
...
@@ -54,12 +66,6 @@ option(
OFF
OFF
)
)
option(
BUILD_TEST
"If build unit tests using googletest"
ON
)
option(
option(
BUILD_WITH_LEVELDB
BUILD_WITH_LEVELDB
"If build with leveldb"
"If build with leveldb"
...
...
source/client/test/clientTests.cpp
浏览文件 @
45c7f7dd
...
@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
...
@@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_free_result(pRes);
taos_close(pConn);
taos_close(pConn);
}
}
#endif
TEST
(
testCase
,
projection_query_tables
)
{
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
@@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) {
...
@@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) {
}
}
taos_free_result
(
pRes
);
taos_free_result
(
pRes
);
for(int32_t i = 0; i < 100000
00
; i += 20) {
for
(
int32_t
i
=
0
;
i
<
100000
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
sprintf
(
sql
,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
@@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) {
...
@@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) {
printf
(
"start to insert next table
\n
"
);
printf
(
"start to insert next table
\n
"
);
for(int32_t i = 0; i < 100000
00
; i += 20) {
for
(
int32_t
i
=
0
;
i
<
100000
;
i
+=
20
)
{
char
sql
[
1024
]
=
{
0
};
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
sprintf
(
sql
,
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
...
@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
...
@@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
taos_close
(
pConn
);
taos_close
(
pConn
);
}
}
#endif
TEST
(
testCase
,
agg_query_tables
)
{
TEST
(
testCase
,
agg_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
45c7f7dd
...
@@ -22,21 +22,19 @@
...
@@ -22,21 +22,19 @@
static
inline
void
vmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
static
inline
void
vmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
SRpcMsg
rsp
=
{
.
code
=
code
,
.
code
=
code
,
.
info
=
pMsg
->
info
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
pCont
=
pMsg
->
info
.
rsp
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
contLen
=
pMsg
->
info
.
rspLen
,
.
info
=
pMsg
->
info
,
};
};
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
}
}
static
void
vmProcessMgmtMonitorQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
static
void
vmProcessMgmtMonitorQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SVnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
SVnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, get from vnode queue, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
int32_t
code
=
-
1
;
switch
(
pMsg
->
msgType
)
{
tmsg_t
msgType
=
pMsg
->
msgType
;
dTrace
(
"msg:%p, get from vnode queue, type:%s"
,
pMsg
,
TMSG_INFO
(
msgType
));
switch
(
msgType
)
{
case
TDMT_MON_VM_INFO
:
case
TDMT_MON_VM_INFO
:
code
=
vmProcessGetMonitorInfoReq
(
pMgmt
,
pMsg
);
code
=
vmProcessGetMonitorInfoReq
(
pMgmt
,
pMsg
);
break
;
break
;
...
@@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
...
@@ -54,7 +52,7 @@ static void vmProcessMgmtMonitorQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
dError
(
"msg:%p, not processed in vnode queue"
,
pMsg
);
dError
(
"msg:%p, not processed in vnode queue"
,
pMsg
);
}
}
if
(
msgType
&
1u
)
{
if
(
IsReq
(
pMsg
)
)
{
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
if
(
code
!=
0
&&
terrno
!=
0
)
code
=
terrno
;
vmSendRsp
(
pMsg
,
code
);
vmSendRsp
(
pMsg
,
code
);
}
}
...
@@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
...
@@ -96,7 +94,6 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
static
void
vmProcessWriteQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
static
void
vmProcessWriteQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
SArray
*
pArray
=
taosArrayInit
(
numOfMsgs
,
sizeof
(
SRpcMsg
*
));
if
(
pArray
==
NULL
)
{
if
(
pArray
==
NULL
)
{
dError
(
"failed to process %d msgs in write-queue since %s"
,
numOfMsgs
,
terrstr
());
dError
(
"failed to process %d msgs in write-queue since %s"
,
numOfMsgs
,
terrstr
());
...
@@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
...
@@ -116,7 +113,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
i
++
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
i
++
)
{
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
,
.
pCont
=
NULL
,
.
contLen
=
0
};
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
};
int32_t
ret
=
syncPropose
(
vnodeGetSyncHandle
(
pVnode
->
pImpl
),
pMsg
,
false
);
int32_t
ret
=
syncPropose
(
vnodeGetSyncHandle
(
pVnode
->
pImpl
),
pMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
if
(
ret
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
...
@@ -130,7 +127,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
...
@@ -130,7 +127,6 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rsp
.
code
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
rsp
.
code
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
}
else
if
(
ret
==
TAOS_SYNC_PROPOSE_SUCCESS
)
{
}
else
if
(
ret
==
TAOS_SYNC_PROPOSE_SUCCESS
)
{
// ok
// send response in applyQ
// send response in applyQ
}
else
{
}
else
{
assert
(
0
);
assert
(
0
);
...
@@ -149,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
...
@@ -149,16 +145,13 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static
void
vmProcessApplyQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
static
void
vmProcessApplyQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pMsg
=
NULL
;
SRpcMsg
rsp
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
// init response rpc msg
// init response rpc msg
rsp
.
code
=
0
;
SRpcMsg
rsp
=
{
0
};
rsp
.
pCont
=
NULL
;
rsp
.
contLen
=
0
;
// get original rpc msg
// get original rpc msg
assert
(
pMsg
->
msgType
==
TDMT_VND_SYNC_APPLY_MSG
);
assert
(
pMsg
->
msgType
==
TDMT_VND_SYNC_APPLY_MSG
);
...
@@ -177,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
...
@@ -177,7 +170,6 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
rpcFreeCont
(
originalRpcMsg
.
pCont
);
rpcFreeCont
(
originalRpcMsg
.
pCont
);
// if leader, send response
// if leader, send response
// if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
if
(
pMsg
->
info
.
handle
!=
NULL
)
{
if
(
pMsg
->
info
.
handle
!=
NULL
)
{
rsp
.
info
=
pMsg
->
info
;
rsp
.
info
=
pMsg
->
info
;
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
...
@@ -190,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
...
@@ -190,21 +182,19 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
static
void
vmProcessSyncQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
static
void
vmProcessSyncQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
// todo
int32_t
code
=
vnodeProcessSyncReq
(
pVnode
->
pImpl
,
pMsg
,
NULL
);
SRpcMsg
*
pRsp
=
NULL
;
if
(
code
!=
0
)
{
int32_t
ret
=
vnodeProcessSyncReq
(
pVnode
->
pImpl
,
pMsg
,
&
pRsp
);
if
(
ret
!=
0
)
{
// if leader, send response
if
(
pMsg
->
info
.
handle
!=
NULL
)
{
if
(
pMsg
->
info
.
handle
!=
NULL
)
{
SRpcMsg
rsp
=
{
0
};
SRpcMsg
rsp
=
{
rsp
.
code
=
terrno
;
.
code
=
(
terrno
<
0
)
?
terrno
:
code
,
rsp
.
info
=
pMsg
->
info
;
.
info
=
pMsg
->
info
,
dTrace
(
"msg:%p, process sync queue error since code:%s"
,
pMsg
,
terrstr
());
};
dTrace
(
"msg:%p, failed to process sync queue since %s"
,
pMsg
,
terrstr
());
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
}
}
}
}
...
@@ -216,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
...
@@ -216,9 +206,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
static
void
vmProcessMergeQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
static
void
vmProcessMergeQueue
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SVnodeObj
*
pVnode
=
pInfo
->
ahandle
;
SRpcMsg
*
pMsg
=
NULL
;
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
SRpcMsg
*
pMsg
=
NULL
;
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
taosGetQitem
(
qall
,
(
void
**
)
&
pMsg
);
dTrace
(
"msg:%p, get from vnode-merge queue"
,
pMsg
);
dTrace
(
"msg:%p, get from vnode-merge queue"
,
pMsg
);
...
@@ -308,22 +298,24 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
...
@@ -308,22 +298,24 @@ int32_t vmPutNodeMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t
vmPutNodeMsgToMonitorQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
vmPutNodeMsgToMonitorQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
SSingleWorker
*
pWorker
=
&
pMgmt
->
monitorWorker
;
dTrace
(
"msg:%p, put into vnode-monitor worker, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
dTrace
(
"msg:%p, put into vnode-monitor worker, type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
return
0
;
return
0
;
}
}
static
int32_t
vmPutRpcMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pRpc
,
EQueueType
qtype
)
{
static
int32_t
vmPutRpcMsgToQueue
(
SVnodeMgmt
*
pMgmt
,
SRpcMsg
*
pRpc
,
EQueueType
qtype
)
{
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SMsgHead
*
pHead
=
pRpc
->
pCont
;
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
return
-
1
;
if
(
pVnode
==
NULL
)
return
-
1
;
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
pMsg
!=
NULL
)
{
if
(
pMsg
==
NULL
)
{
rpcFreeCont
(
pRpc
->
pCont
);
pRpc
->
pCont
=
NULL
;
code
=
-
1
;
}
else
{
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
switch
(
qtype
)
{
switch
(
qtype
)
{
case
WRITE_QUEUE
:
case
WRITE_QUEUE
:
...
@@ -428,7 +420,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -428,7 +420,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
return
-
1
;
return
-
1
;
}
}
dDebug
(
"vgId:%d,
vnode
queue is alloced"
,
pVnode
->
vgId
);
dDebug
(
"vgId:%d, queue is alloced"
,
pVnode
->
vgId
);
return
0
;
return
0
;
}
}
...
@@ -445,7 +437,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
...
@@ -445,7 +437,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pQueryQ
=
NULL
;
pVnode
->
pFetchQ
=
NULL
;
pVnode
->
pFetchQ
=
NULL
;
pVnode
->
pMergeQ
=
NULL
;
pVnode
->
pMergeQ
=
NULL
;
dDebug
(
"vgId:%d,
vnode
queue is freed"
,
pVnode
->
vgId
);
dDebug
(
"vgId:%d, queue is freed"
,
pVnode
->
vgId
);
}
}
int32_t
vmStartWorker
(
SVnodeMgmt
*
pMgmt
)
{
int32_t
vmStartWorker
(
SVnodeMgmt
*
pMgmt
)
{
...
@@ -496,7 +488,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
...
@@ -496,7 +488,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
.
param
=
pMgmt
,
.
param
=
pMgmt
,
};
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
dError
(
"failed to start
mnode
vnode-monitor worker since %s"
,
terrstr
());
dError
(
"failed to start vnode-monitor worker since %s"
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/mgmt/node_mgmt/src/dmMonitor.c
浏览文件 @
45c7f7dd
...
@@ -161,10 +161,12 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
...
@@ -161,10 +161,12 @@ void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
void
dmGetMnodeLoads
(
SMonMloadInfo
*
pInfo
)
{
void
dmGetMnodeLoads
(
SMonMloadInfo
*
pInfo
)
{
SDnode
*
pDnode
=
dmInstance
();
SDnode
*
pDnode
=
dmInstance
();
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
MNODE
];
SMgmtWrapper
*
pWrapper
=
&
pDnode
->
wrappers
[
MNODE
];
if
(
tsMultiProcess
)
{
if
(
dmMarkWrapper
(
pWrapper
)
==
0
)
{
dmSendLocalRecv
(
pDnode
,
TDMT_MON_MM_LOAD
,
tDeserializeSMonMloadInfo
,
pInfo
);
if
(
tsMultiProcess
)
{
}
else
if
(
pWrapper
->
pMgmt
!=
NULL
)
{
dmSendLocalRecv
(
pDnode
,
TDMT_MON_MM_LOAD
,
tDeserializeSMonMloadInfo
,
pInfo
);
mmGetMnodeLoads
(
pWrapper
->
pMgmt
,
pInfo
);
}
else
if
(
pWrapper
->
pMgmt
!=
NULL
)
{
mmGetMnodeLoads
(
pWrapper
->
pMgmt
,
pInfo
);
}
dmReleaseWrapper
(
pWrapper
);
}
}
dmReleaseWrapper
(
pWrapper
);
}
}
source/dnode/vnode/inc/vnode.h
浏览文件 @
45c7f7dd
...
@@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
...
@@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
);
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
);
int32_t
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleAddTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleAddTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleRemoveTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
);
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
int32_t
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitReq
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
bool
tqNextDataBlockFilterOut
(
STqReadHandle
*
pHandle
,
SHashObj
*
filterOutUids
);
bool
tqNextDataBlockFilterOut
(
STqReadHandle
*
pHandle
,
SHashObj
*
filterOutUids
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
45c7f7dd
...
@@ -231,3 +231,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
...
@@ -231,3 +231,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
return
0
;
return
0
;
}
}
int
tqReadHandleRemoveTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
ASSERT
(
pHandle
->
tbIdHash
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashRemove
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
));
}
return
0
;
}
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
45c7f7dd
...
@@ -2076,8 +2076,14 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
...
@@ -2076,8 +2076,14 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
}
#endif
#endif
if
(
TD_SUPPORT_UPDATE
(
pCfg
->
update
))
{
if
(
TD_SUPPORT_UPDATE
(
pCfg
->
update
))
{
if
(
lastKeyAppend
!=
key
)
{
if
(
lastKeyAppend
!=
TSKEY_INITIAL_VAL
)
{
++
curRow
;
}
lastKeyAppend
=
key
;
}
// load data from file firstly
numOfRows
=
doCopyRowsFromFileBlock
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
curRow
,
pos
,
pos
);
numOfRows
=
doCopyRowsFromFileBlock
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
curRow
,
pos
,
pos
);
lastKeyAppend
=
key
;
if
(
rv1
!=
TD_ROW_SVER
(
row1
))
{
if
(
rv1
!=
TD_ROW_SVER
(
row1
))
{
rv1
=
TD_ROW_SVER
(
row1
);
rv1
=
TD_ROW_SVER
(
row1
);
...
@@ -2087,7 +2093,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
...
@@ -2087,7 +2093,7 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
}
// still assign data into current row
// still assign data into current row
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
&
curRow
,
row1
,
row2
,
numOfCols
,
numOfRows
+=
mergeTwoRowFromMem
(
pTsdbReadHandle
,
pTsdbReadHandle
->
outputCapacity
,
&
curRow
,
row1
,
row2
,
numOfCols
,
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
lastKeyAppend
);
pCheckInfo
->
tableId
,
pSchema1
,
pSchema2
,
pCfg
->
update
,
&
lastKeyAppend
);
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
if
(
cur
->
win
.
skey
==
TSKEY_INITIAL_VAL
)
{
...
@@ -2099,7 +2105,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
...
@@ -2099,7 +2105,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur
->
mixBlock
=
true
;
cur
->
mixBlock
=
true
;
moveToNextRowInMem
(
pCheckInfo
);
moveToNextRowInMem
(
pCheckInfo
);
++
curRow
;
pos
+=
step
;
pos
+=
step
;
}
else
{
}
else
{
...
...
source/libs/executor/src/executor.c
浏览文件 @
45c7f7dd
...
@@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
...
@@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return
pTaskInfo
;
return
pTaskInfo
;
}
}
static
SArray
*
filterQualifiedChildTables
(
const
SStreamBlockScanInfo
*
pScanInfo
,
const
SArray
*
tableIdList
)
{
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
// let's discard the tables those are not created according to the queried super table.
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pScanInfo
->
readHandle
.
meta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tableIdList
);
++
i
)
{
int64_t
*
id
=
(
int64_t
*
)
taosArrayGet
(
tableIdList
,
i
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
taosArrayPush
(
qa
,
id
);
}
metaReaderClear
(
&
mr
);
return
qa
;
}
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
int32_t
qUpdateQualifiedTableId
(
qTaskInfo_t
tinfo
,
const
SArray
*
tableIdList
,
bool
isAdd
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
...
@@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
...
@@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
pInfo
=
pInfo
->
pDownstream
[
0
];
pInfo
=
pInfo
->
pDownstream
[
0
];
}
}
int32_t
code
=
0
;
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
SStreamBlockScanInfo
*
pScanInfo
=
pInfo
->
info
;
if
(
isAdd
)
{
if
(
isAdd
)
{
// add new table id
SArray
*
qa
=
taosArrayInit
(
4
,
sizeof
(
tb_uid_t
));
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pScanInfo
->
readHandle
.
meta
,
0
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tableIdList
);
++
i
)
{
int64_t
*
id
=
(
int64_t
*
)
taosArrayGet
(
tableIdList
,
i
);
int32_t
code
=
metaGetTableEntryByUid
(
&
mr
,
*
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get table meta, uid:%"
PRIu64
" code:%s"
,
*
id
,
tstrerror
(
terrno
));
continue
;
}
ASSERT
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
);
if
(
mr
.
me
.
ctbEntry
.
suid
!=
pScanInfo
->
tableUid
)
{
continue
;
}
taosArrayPush
(
qa
,
id
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
}
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
qa
);
taosArrayDestroy
(
qa
);
metaReaderClear
(
&
mr
);
}
else
{
// remove the table id in current list
SArray
*
qa
=
filterQualifiedChildTables
(
pScanInfo
,
tableIdList
);
qDebug
(
" %d qualified child tables added into stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
qa
));
qDebug
(
" %d remove child tables from the stream scanner"
,
(
int32_t
)
taosArrayGetSize
(
tableIdList
));
int32_t
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
qa
);
code
=
tqReadHandleAddTbUidList
(
pScanInfo
->
streamBlockReader
,
tableIdList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosArrayDestroy
(
qa
);
return
code
;
}
}
else
{
assert
(
0
);
}
}
return
TSDB_CODE_SUCCESS
;
return
code
;
}
}
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
int32_t
qGetQueriedTableSchemaVersion
(
qTaskInfo_t
tinfo
,
char
*
dbName
,
char
*
tableName
,
int32_t
*
sversion
,
int32_t
*
tversion
)
{
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
45c7f7dd
...
@@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
...
@@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
pAggInfo
->
groupId
=
groupId
;
pAggInfo
->
groupId
=
groupId
;
}
}
/**
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
* For interval query of both super table and table, copy the data in ascending order, since the output results are
* ordered in SWindowResutl already. While handling the group by query for both table and super table,
* all group result are completed already.
*
* @param pQInfo
* @param result
*/
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
start
=
pGroupResInfo
->
index
;
int32_t
start
=
pGroupResInfo
->
index
;
...
@@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
...
@@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
continue
;
continue
;
}
}
if
(
pBlock
->
info
.
groupId
==
0
)
{
pBlock
->
info
.
groupId
=
pPos
->
groupId
;
}
else
{
// current value belongs to different group, it can't be packed into one datablock
if
(
pBlock
->
info
.
groupId
!=
pPos
->
groupId
)
{
break
;
}
}
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
if
(
pBlock
->
info
.
rows
+
pRow
->
numOfRows
>
pBlock
->
info
.
capacity
)
{
break
;
break
;
}
}
...
@@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
...
@@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
if
(
pCtx
[
j
].
fpSet
.
finalize
)
{
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
int32_t
code
=
pCtx
[
j
].
fpSet
.
finalize
(
&
pCtx
[
j
],
pBlock
);
if
(
TAOS_FAILED
(
code
))
{
if
(
TAOS_FAILED
(
code
))
{
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
taskInfo
),
tstrerror
(
code
));
qError
(
"%s build result data block error, code %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
taskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
longjmp
(
taskInfo
->
env
,
code
);
}
}
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
}
else
if
(
strcmp
(
pCtx
[
j
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
// do nothing, todo refactor
// do nothing, todo refactor
...
@@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
...
@@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
}
}
}
}
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)
);
qDebug
(
"%s result generated, rows:%d, groupId:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pBlock
->
info
.
rows
,
pBlock
->
info
.
groupId
);
blockDataUpdateTsWindow
(
pBlock
);
blockDataUpdateTsWindow
(
pBlock
);
return
0
;
return
0
;
}
}
...
@@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
...
@@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
return
;
return
;
}
}
// clear the existed group id
pBlock
->
info
.
groupId
=
0
;
doCopyToSDataBlock
(
pTaskInfo
,
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
rowCellOffset
,
pCtx
,
numOfExprs
);
doCopyToSDataBlock
(
pTaskInfo
,
pBlock
,
pExprInfo
,
pBuf
,
pGroupResInfo
,
rowCellOffset
,
pCtx
,
numOfExprs
);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow
(
pBlock
);
}
}
static
void
updateNumOfRowsInResultRows
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SResultRowInfo
*
pResultRowInfo
,
static
void
updateNumOfRowsInResultRows
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SResultRowInfo
*
pResultRowInfo
,
...
@@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
...
@@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
doSetOperatorCompleted
(
pOperator
);
doSetOperatorCompleted
(
pOperator
);
}
}
doSetOperatorCompleted
(
pOperator
);
return
(
blockDataGetNumOfRows
(
pInfo
->
pRes
)
!=
0
)
?
pInfo
->
pRes
:
NULL
;
return
(
blockDataGetNumOfRows
(
pInfo
->
pRes
)
!=
0
)
?
pInfo
->
pRes
:
NULL
;
}
}
...
...
source/libs/tdb/src/db/tdbPCache.c
浏览文件 @
45c7f7dd
...
@@ -14,6 +14,9 @@
...
@@ -14,6 +14,9 @@
*/
*/
#include "tdbInt.h"
#include "tdbInt.h"
// #include <sys/types.h>
// #include <unistd.h>
struct
SPCache
{
struct
SPCache
{
int
szPage
;
int
szPage
;
int
nPages
;
int
nPages
;
...
@@ -32,7 +35,6 @@ static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) {
...
@@ -32,7 +35,6 @@ static inline uint32_t tdbPCachePageHash(const SPgid *pPgid) {
uint32_t
*
t
=
(
uint32_t
*
)((
pPgid
)
->
fileid
);
uint32_t
*
t
=
(
uint32_t
*
)((
pPgid
)
->
fileid
);
return
(
uint32_t
)(
t
[
0
]
+
t
[
1
]
+
t
[
2
]
+
t
[
3
]
+
t
[
4
]
+
t
[
5
]
+
(
pPgid
)
->
pgno
);
return
(
uint32_t
)(
t
[
0
]
+
t
[
1
]
+
t
[
2
]
+
t
[
3
]
+
t
[
4
]
+
t
[
5
]
+
(
pPgid
)
->
pgno
);
}
}
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
static
int
tdbPCacheOpenImpl
(
SPCache
*
pCache
);
static
int
tdbPCacheOpenImpl
(
SPCache
*
pCache
);
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
);
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
);
...
@@ -80,16 +82,22 @@ int tdbPCacheClose(SPCache *pCache) {
...
@@ -80,16 +82,22 @@ int tdbPCacheClose(SPCache *pCache) {
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
)
{
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
SPage
*
pPage
;
i32
nRef
;
tdbPCacheLock
(
pCache
);
tdbPCacheLock
(
pCache
);
pPage
=
tdbPCacheFetchImpl
(
pCache
,
pPgid
,
pTxn
);
pPage
=
tdbPCacheFetchImpl
(
pCache
,
pPgid
,
pTxn
);
if
(
pPage
)
{
if
(
pPage
)
{
tdbRefPage
(
pPage
);
nRef
=
tdbRefPage
(
pPage
);
}
}
ASSERT
(
pPage
);
tdbPCacheUnlock
(
pCache
);
tdbPCacheUnlock
(
pCache
);
// printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
return
pPage
;
return
pPage
;
}
}
...
@@ -98,30 +106,31 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
...
@@ -98,30 +106,31 @@ void tdbPCacheRelease(SPCache *pCache, SPage *pPage, TXN *pTxn) {
ASSERT
(
pTxn
);
ASSERT
(
pTxn
);
nRef
=
tdbUnrefPage
(
pPage
);
//
nRef = tdbUnrefPage(pPage);
ASSERT
(
nRef
>=
0
);
//
ASSERT(nRef >= 0);
tdbPCacheLock
(
pCache
);
nRef
=
tdbUnrefPage
(
pPage
);
if
(
nRef
==
0
)
{
if
(
nRef
==
0
)
{
tdbPCacheLock
(
pCache
);
// test the nRef again to make sure
// test the nRef again to make sure
// it is safe th handle the page
// it is safe th handle the page
nRef
=
tdbGetPageRef
(
pPage
);
// nRef = tdbGetPageRef(pPage);
if
(
nRef
==
0
)
{
// if (nRef == 0) {
if
(
pPage
->
isLocal
)
{
if
(
pPage
->
isLocal
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
tdbPCacheUnpinPage
(
pCache
,
pPage
);
}
else
{
}
else
{
if
(
TDB_TXN_IS_WRITE
(
pTxn
))
{
if
(
TDB_TXN_IS_WRITE
(
pTxn
))
{
// remove from hash
// remove from hash
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
}
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
}
}
}
tdbPCacheUnlock
(
pCache
);
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
}
// }
}
}
tdbPCacheUnlock
(
pCache
);
// printf("thread %" PRId64 " relas page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage, nRef);
}
}
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
)
{
return
pCache
->
szPage
;
}
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
)
{
return
pCache
->
szPage
;
}
...
@@ -223,6 +232,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
...
@@ -223,6 +232,7 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
pCache
->
nRecyclable
--
;
pCache
->
nRecyclable
--
;
// printf("pin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"pin page %d"
,
pPage
->
id
);
tdbTrace
(
"pin page %d"
,
pPage
->
id
);
}
}
}
}
...
@@ -243,6 +253,7 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
...
@@ -243,6 +253,7 @@ static void tdbPCacheUnpinPage(SPCache *pCache, SPage *pPage) {
pCache
->
nRecyclable
++
;
pCache
->
nRecyclable
++
;
// printf("unpin page %d pgno %d pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"unpin page %d"
,
pPage
->
id
);
tdbTrace
(
"unpin page %d"
,
pPage
->
id
);
}
}
...
@@ -253,10 +264,12 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
...
@@ -253,10 +264,12 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) {
h
=
tdbPCachePageHash
(
&
(
pPage
->
pgid
));
h
=
tdbPCachePageHash
(
&
(
pPage
->
pgid
));
for
(
ppPage
=
&
(
pCache
->
pgHash
[
h
%
pCache
->
nHash
]);
(
*
ppPage
)
&&
*
ppPage
!=
pPage
;
ppPage
=
&
((
*
ppPage
)
->
pHashNext
))
for
(
ppPage
=
&
(
pCache
->
pgHash
[
h
%
pCache
->
nHash
]);
(
*
ppPage
)
&&
*
ppPage
!=
pPage
;
ppPage
=
&
((
*
ppPage
)
->
pHashNext
))
;
;
ASSERT
(
*
ppPage
==
pPage
);
*
ppPage
=
pPage
->
pHashNext
;
pCache
->
nPage
--
;
if
(
*
ppPage
)
{
*
ppPage
=
pPage
->
pHashNext
;
pCache
->
nPage
--
;
// printf("rmv page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
}
tdbTrace
(
"remove page %d to hash"
,
pPage
->
id
);
tdbTrace
(
"remove page %d to hash"
,
pPage
->
id
);
}
}
...
@@ -271,6 +284,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
...
@@ -271,6 +284,7 @@ static void tdbPCacheAddPageToHash(SPCache *pCache, SPage *pPage) {
pCache
->
nPage
++
;
pCache
->
nPage
++
;
// printf("add page %d to hash, pgno %d, pPage %p\n", pPage->id, TDB_PAGE_PGNO(pPage), pPage);
tdbTrace
(
"add page %d to hash"
,
pPage
->
id
);
tdbTrace
(
"add page %d to hash"
,
pPage
->
id
);
}
}
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
45c7f7dd
...
@@ -265,6 +265,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
...
@@ -265,6 +265,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
pgid
.
pgno
=
pgno
;
pgid
.
pgno
=
pgno
;
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
pTxn
);
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
pTxn
);
if
(
pPage
==
NULL
)
{
if
(
pPage
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
...
@@ -272,10 +273,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
...
@@ -272,10 +273,14 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
if
(
!
TDB_PAGE_INITIALIZED
(
pPage
))
{
if
(
!
TDB_PAGE_INITIALIZED
(
pPage
))
{
ret
=
tdbPagerInitPage
(
pPager
,
pPage
,
initPage
,
arg
,
loadPage
);
ret
=
tdbPagerInitPage
(
pPager
,
pPage
,
initPage
,
arg
,
loadPage
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
}
}
// printf("thread %" PRId64 " pager fetch page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
ASSERT
(
TDB_PAGE_INITIALIZED
(
pPage
));
ASSERT
(
TDB_PAGE_INITIALIZED
(
pPage
));
ASSERT
(
pPage
->
pPager
==
pPager
);
ASSERT
(
pPage
->
pPager
==
pPager
);
...
@@ -284,7 +289,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
...
@@ -284,7 +289,11 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
return
0
;
return
0
;
}
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
// printf("thread %" PRId64 " pager retun page %d pgno %d ppage %p\n", taosGetSelfPthreadId(), pPage->id,
// TDB_PAGE_PGNO(pPage), pPage);
}
static
int
tdbPagerAllocFreePage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
static
int
tdbPagerAllocFreePage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
// TODO: Allocate a page from the free list
// TODO: Allocate a page from the free list
...
@@ -352,6 +361,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
...
@@ -352,6 +361,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
ret
=
(
*
initPage
)(
pPage
,
arg
,
init
);
ret
=
(
*
initPage
)(
pPage
,
arg
,
init
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
ASSERT
(
0
);
TDB_UNLOCK_PAGE
(
pPage
);
TDB_UNLOCK_PAGE
(
pPage
);
return
-
1
;
return
-
1
;
}
}
...
@@ -370,6 +380,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
...
@@ -370,6 +380,7 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
}
}
}
}
}
else
{
}
else
{
ASSERT
(
0
);
return
-
1
;
return
-
1
;
}
}
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
45c7f7dd
...
@@ -275,15 +275,15 @@ static inline i32 tdbUnrefPage(SPage *pPage) {
...
@@ -275,15 +275,15 @@ static inline i32 tdbUnrefPage(SPage *pPage) {
#define P_LOCK_FAIL -1
#define P_LOCK_FAIL -1
static
inline
int
tdbTryLockPage
(
tdb_spinlock_t
*
pLock
)
{
static
inline
int
tdbTryLockPage
(
tdb_spinlock_t
*
pLock
)
{
int
ret
;
int
ret
=
tdbSpinlockTrylock
(
pLock
)
;
if
(
tdbSpinlockTrylock
(
pLock
)
==
0
)
{
if
(
ret
==
0
)
{
ret
=
P_LOCK_SUCC
;
ret
urn
P_LOCK_SUCC
;
}
else
if
(
errno
==
EBUSY
)
{
}
else
if
(
ret
==
EBUSY
)
{
ret
=
P_LOCK_BUSY
;
ret
urn
P_LOCK_BUSY
;
}
else
{
}
else
{
ret
=
P_LOCK_FAIL
;
ASSERT
(
0
);
return
P_LOCK_FAIL
;
}
}
return
ret
;
}
}
#define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
#define TDB_INIT_PAGE_LOCK(pPage) tdbSpinlockInit(&((pPage)->lock), 0)
...
...
source/libs/tdb/test/tdbTest.cpp
浏览文件 @
45c7f7dd
...
@@ -486,18 +486,18 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
...
@@ -486,18 +486,18 @@ TEST(tdb_test, DISABLED_simple_upsert1) {
tdbClose
(
pEnv
);
tdbClose
(
pEnv
);
}
}
TEST
(
tdb_test
,
DISABLED_
multi_thread_query
)
{
TEST
(
tdb_test
,
multi_thread_query
)
{
int
ret
;
int
ret
;
TDB
*
pEnv
;
TDB
*
pEnv
;
TTB
*
pDb
;
TTB
*
pDb
;
tdb_cmpr_fn_t
compFunc
;
tdb_cmpr_fn_t
compFunc
;
int
nData
=
100000
;
int
nData
=
100000
0
;
TXN
txn
;
TXN
txn
;
taosRemoveDir
(
"tdb"
);
taosRemoveDir
(
"tdb"
);
// Open Env
// Open Env
ret
=
tdbOpen
(
"tdb"
,
512
,
1
,
&
pEnv
);
ret
=
tdbOpen
(
"tdb"
,
4096
,
10
,
&
pEnv
);
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// Create a database
// Create a database
...
@@ -507,7 +507,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
...
@@ -507,7 +507,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
char
key
[
64
];
char
key
[
64
];
char
val
[
64
];
char
val
[
64
];
int64_t
poolLimit
=
4096
;
// 1M pool limit
int64_t
poolLimit
=
4096
*
20
;
// 1M pool limit
int64_t
txnid
=
0
;
int64_t
txnid
=
0
;
SPoolMem
*
pPool
;
SPoolMem
*
pPool
;
...
@@ -600,7 +600,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
...
@@ -600,7 +600,7 @@ TEST(tdb_test, DISABLED_multi_thread_query) {
GTEST_ASSERT_EQ
(
ret
,
0
);
GTEST_ASSERT_EQ
(
ret
,
0
);
}
}
TEST
(
tdb_test
,
multi_thread1
)
{
TEST
(
tdb_test
,
DISABLED_
multi_thread1
)
{
#if 0
#if 0
int ret;
int ret;
TDB *pDb;
TDB *pDb;
...
...
source/util/src/tqueue.c
浏览文件 @
45c7f7dd
...
@@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
...
@@ -162,7 +162,7 @@ void *taosAllocateQitem(int32_t size, EQItype itype) {
uTrace
(
"item:%p, node:%p is allocated"
,
pNode
->
item
,
pNode
);
uTrace
(
"item:%p, node:%p is allocated"
,
pNode
->
item
,
pNode
);
}
}
return
(
void
*
)
pNode
->
item
;
return
pNode
->
item
;
}
}
void
taosFreeQitem
(
void
*
pItem
)
{
void
taosFreeQitem
(
void
*
pItem
)
{
...
...
tests/script/tsim/insert/update0.sim
0 → 100644
浏览文件 @
45c7f7dd
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database d0 keep 365000d,365000d,365000d
sql use d0
print =============== create super table and register rsma
sql create table if not exists stb (ts timestamp, c1 int) tags (city binary(20),district binary(20)) rollup(min) file_factor 0.1 delay 2;
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct1 using stb tags("BeiJing", "ChaoYang")
sql create table ct2 using stb tags("BeiJing", "HaiDian")
sql show tables
if $rows != 2 then
return -1
endi
print =============== step3-1 insert records into ct1
sql insert into ct1 values('2022-05-03 16:59:00.010', 10);
sql insert into ct1 values('2022-05-03 16:59:00.011', 11);
sql insert into ct1 values('2022-05-03 16:59:00.016', 16);
sql insert into ct1 values('2022-05-03 16:59:00.016', 17);
sql insert into ct1 values('2022-05-03 16:59:00.020', 20);
sql insert into ct1 values('2022-05-03 16:59:00.016', 18);
sql insert into ct1 values('2022-05-03 16:59:00.021', 21);
sql insert into ct1 values('2022-05-03 16:59:00.022', 22);
print =============== step3-1 query records of ct1 from memory
sql select * from ct1;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
print $data50 $data51
if $rows != 6 then
print rows $rows != 6
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data21 != 18 then
print data21 $data21 != 18
return -1
endi
if $data51 != 22 then
print data51 $data51 != 22
return -1
endi
print =============== step3-1 insert records into ct2
sql insert into ct2 values('2022-03-02 16:59:00.010', 1),('2022-03-02 16:59:00.010',11),('2022-04-01 16:59:00.011',2),('2022-04-01 16:59:00.011',5),('2022-03-06 16:59:00.013',7);
sql insert into ct2 values('2022-03-02 16:59:00.010', 3),('2022-03-02 16:59:00.010',33),('2022-04-01 16:59:00.011',4),('2022-04-01 16:59:00.011',6),('2022-03-06 16:59:00.013',8);
sql insert into ct2 values('2022-03-02 16:59:00.010', 103),('2022-03-02 16:59:00.010',303),('2022-04-01 16:59:00.011',40),('2022-04-01 16:59:00.011',60),('2022-03-06 16:59:00.013',80);
print =============== step3-1 query records of ct2 from memory
sql select * from ct2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows != 3 then
print rows $rows != 3
return -1
endi
if $data01 != 103 then
print data01 $data01 != 103
return -1
endi
if $data11 != 80 then
print data11 $data11 != 80
return -1
endi
if $data21 != 40 then
print data21 $data21 != 40
return -1
endi
#==================== reboot to trigger commit data to file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== step3-2 query records of ct1 from file
sql select * from ct1;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
print $data50 $data51
if $rows != 6 then
print rows $rows != 6
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data21 != 18 then
print data21 $data21 != 18
return -1
endi
if $data51 != 22 then
print data51 $data51 != 22
return -1
endi
print =============== step3-2 query records of ct2 from file
sql select * from ct2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
if $rows != 3 then
print rows $rows != 3
return -1
endi
if $data01 != 103 then
print data01 $data01 != 103
return -1
endi
if $data11 != 80 then
print data11 $data11 != 80
return -1
endi
if $data21 != 40 then
print data21 $data21 != 40
return -1
endi
print =============== step3-3 query records of ct1 from memory and file(merge)
sql insert into ct1 values('2022-05-03 16:59:00.010', 100);
sql insert into ct1 values('2022-05-03 16:59:00.022', 200);
sql insert into ct1 values('2022-05-03 16:59:00.016', 160);
sql select * from ct1;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
print $data50 $data51
if $rows != 6 then
print rows $rows != 6
return -1
endi
if $data01 != 100 then
print data01 $data01 != 100
return -1
endi
if $data21 != 160 then
print data21 $data21 != 160
return -1
endi
if $data51 != 200 then
print data51 $data51 != 200
return -1
endi
print =============== step3-3 query records of ct2 from memory and file(merge)
sql insert into ct2(ts) values('2022-04-02 16:59:00.016');
sql insert into ct2 values('2022-03-06 16:59:00.013', NULL);
sql insert into ct2 values('2022-03-01 16:59:00.016', 10);
sql insert into ct2(ts) values('2022-04-01 16:59:00.011');
sql select * from ct2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
print $data40 $data41
if $rows != 5 then
print rows $rows != 5
return -1
endi
if $data01 != 10 then
print data01 $data01 != 10
return -1
endi
if $data11 != 103 then
print data11 $data11 != 103
return -1
endi
if $data21 != NULL then
print data21 $data21 != NULL
return -1
endi
if $data31 != 40 then
print data31 $data31 != 40
return -1
endi
if $data41 != NULL then
print data41 $data41 != NULL
return -1
endi
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录