Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1e32228d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1e32228d
编写于
5月 14, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
上级
ec6efecf
a5210ee1
变更
29
展开全部
显示空白变更内容
内联
并排
Showing
29 changed file
with
858 addition
and
734 deletion
+858
-734
include/common/tglobal.h
include/common/tglobal.h
+2
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tprocess.h
include/util/tprocess.h
+2
-1
include/util/tqueue.h
include/util/tqueue.h
+10
-4
packaging/release.sh
packaging/release.sh
+4
-4
source/client/src/tmq.c
source/client/src/tmq.c
+5
-5
source/common/src/tglobal.c
source/common/src/tglobal.c
+7
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+1
-1
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
+3
-3
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+7
-7
source/dnode/mgmt/node_mgmt/src/dmTransport.c
source/dnode/mgmt/node_mgmt/src/dmTransport.c
+1
-1
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-5
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+4
-4
source/libs/nodes/src/nodesUtilFuncs.c
source/libs/nodes/src/nodesUtilFuncs.c
+13
-0
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+3
-3
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+53
-14
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+638
-621
source/libs/parser/test/parSelectTest.cpp
source/libs/parser/test/parSelectTest.cpp
+8
-0
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+5
-1
source/libs/planner/test/planSubqueryTest.cpp
source/libs/planner/test/planSubqueryTest.cpp
+6
-3
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+1
-0
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+3
-3
source/libs/tdb/CMakeLists.txt
source/libs/tdb/CMakeLists.txt
+1
-1
source/libs/transport/test/pushServer.c
source/libs/transport/test/pushServer.c
+1
-1
source/libs/transport/test/rserver.c
source/libs/transport/test/rserver.c
+1
-1
source/util/src/terror.c
source/util/src/terror.c
+1
-0
source/util/src/tprocess.c
source/util/src/tprocess.c
+2
-2
source/util/src/tqueue.c
source/util/src/tqueue.c
+73
-49
未找到文件。
include/common/tglobal.h
浏览文件 @
1e32228d
...
...
@@ -51,6 +51,7 @@ extern int32_t tsVnodeShmSize;
extern
int32_t
tsQnodeShmSize
;
extern
int32_t
tsSnodeShmSize
;
extern
int32_t
tsBnodeShmSize
;
extern
int32_t
tsNumOfShmThreads
;
// queue & threads
extern
int32_t
tsNumOfRpcThreads
;
...
...
@@ -67,6 +68,7 @@ extern int32_t tsNumOfQnodeQueryThreads;
extern
int32_t
tsNumOfQnodeFetchThreads
;
extern
int32_t
tsNumOfSnodeSharedThreads
;
extern
int32_t
tsNumOfSnodeUniqueThreads
;
extern
int64_t
tsRpcQueueMemoryAllowed
;
// monitor
extern
bool
tsEnableMonitor
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
1e32228d
...
...
@@ -345,6 +345,7 @@ bool nodesIsUnaryOp(const SOperatorNode* pOp);
bool
nodesIsArithmeticOp
(
const
SOperatorNode
*
pOp
);
bool
nodesIsComparisonOp
(
const
SOperatorNode
*
pOp
);
bool
nodesIsJsonOp
(
const
SOperatorNode
*
pOp
);
bool
nodesIsRegularOp
(
const
SOperatorNode
*
pOp
);
bool
nodesIsTimeorderQuery
(
const
SNode
*
pQuery
);
bool
nodesIsTimelineQuery
(
const
SNode
*
pQuery
);
...
...
include/util/taoserror.h
浏览文件 @
1e32228d
...
...
@@ -89,6 +89,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_REPEAT_INIT TAOS_DEF_ERROR_CODE(0, 0x0115)
#define TSDB_CODE_DUP_KEY TAOS_DEF_ERROR_CODE(0, 0x0116)
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0117)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0118)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0140)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0141)
...
...
include/util/tprocess.h
浏览文件 @
1e32228d
...
...
@@ -17,6 +17,7 @@
#define _TD_UTIL_PROCESS_H_
#include "os.h"
#include "tqueue.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -25,7 +26,7 @@ extern "C" {
typedef
enum
{
PROC_FUNC_REQ
=
1
,
PROC_FUNC_RSP
,
PROC_FUNC_REGIST
,
PROC_FUNC_RELEASE
}
EProcFuncType
;
typedef
struct
SProcObj
SProcObj
;
typedef
void
*
(
*
ProcMallocFp
)(
int32_t
contLen
);
typedef
void
*
(
*
ProcMallocFp
)(
int32_t
contLen
,
EQItype
itype
);
typedef
void
*
(
*
ProcFreeFp
)(
void
*
pCont
);
typedef
void
(
*
ProcConsumeFp
)(
void
*
parent
,
void
*
pHead
,
int16_t
headLen
,
void
*
pBody
,
int32_t
bodyLen
,
EProcFuncType
ftype
);
...
...
include/util/tqueue.h
浏览文件 @
1e32228d
...
...
@@ -48,18 +48,24 @@ typedef struct {
int32_t
threadNum
;
}
SQueueInfo
;
typedef
enum
{
DEF_QITEM
=
0
,
RPC_QITEM
=
1
,
}
EQItype
;
typedef
void
(
*
FItem
)(
SQueueInfo
*
pInfo
,
void
*
pItem
);
typedef
void
(
*
FItems
)(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfItems
);
STaosQueue
*
taosOpenQueue
();
void
taosCloseQueue
(
STaosQueue
*
queue
);
void
taosSetQueueFp
(
STaosQueue
*
queue
,
FItem
itemFp
,
FItems
itemsFp
);
void
*
taosAllocateQitem
(
int32_t
size
);
void
*
taosAllocateQitem
(
int32_t
size
,
EQItype
itype
);
void
taosFreeQitem
(
void
*
pItem
);
void
taosWriteQitem
(
STaosQueue
*
queue
,
void
*
pItem
);
int32_t
taosReadQitem
(
STaosQueue
*
queue
,
void
**
ppItem
);
bool
taosQueueEmpty
(
STaosQueue
*
queue
);
int32_t
taosQueueSize
(
STaosQueue
*
queue
);
int32_t
taosQueueItemSize
(
STaosQueue
*
queue
);
int64_t
taosQueueMemorySize
(
STaosQueue
*
queue
);
STaosQall
*
taosAllocateQall
();
void
taosFreeQall
(
STaosQall
*
qall
);
...
...
@@ -77,8 +83,8 @@ int32_t taosGetQueueNumber(STaosQset *qset);
int32_t
taosReadQitemFromQset
(
STaosQset
*
qset
,
void
**
ppItem
,
void
**
ahandle
,
FItem
*
itemFp
);
int32_t
taosReadAllQitemsFromQset
(
STaosQset
*
qset
,
STaosQall
*
qall
,
void
**
ahandle
,
FItems
*
itemsFp
);
void
taosResetQsetThread
(
STaosQset
*
qset
,
void
*
pItem
);
int32_t
taosGetQueueItemsNumber
(
STaosQueue
*
queue
);
int32_t
taosGetQsetItemsNumber
(
STaosQset
*
qset
)
;
extern
int64_t
tsRpcQueueMemoryAllowed
;
#ifdef __cplusplus
}
...
...
packaging/release.sh
浏览文件 @
1e32228d
...
...
@@ -63,12 +63,12 @@ cp ${install_files} ${install_dir}
header_files
=
"
${
top_dir
}
/include/client/taos.h
${
top_dir
}
/include/util/taoserror.h"
cp
${
header_files
}
${
install_dir
}
/inc
bin_files
=
"
${
compile_dir
}
/
source/dnode/mgmt/taosd
${
compile_dir
}
/tools/shell/taos
${
compile_dir
}
/tests/test/c/create_table
${
compile_dir
}
/tests/test/c
/tmq_sim
${
script_dir
}
/remove.sh
${
compile_dir
}
/build/bin/taosBenchmark
${
compile_dir
}
/build/bin/taosdump"
bin_files
=
"
${
compile_dir
}
/
build/bin/taosd
${
compile_dir
}
/build/bin/taos
${
compile_dir
}
/build/bin/create_table
${
compile_dir
}
/build/bin
/tmq_sim
${
script_dir
}
/remove.sh
${
compile_dir
}
/build/bin/taosBenchmark
${
compile_dir
}
/build/bin/taosdump"
cp
-rf
${
bin_files
}
${
install_dir
}
/bin
&&
chmod
a+x
${
install_dir
}
/bin/
*
||
:
cp
-rf
${
compile_dir
}
/source/client
/libtaos.so
${
install_dir
}
/lib/
cp
-rf
${
compile_dir
}
/source/libs/td
b/libtdb.so
${
install_dir
}
/lib/
cp
-rf
${
compile_dir
}
/build/lib/libavro
*
${
install_dir
}
/lib/
>
/dev/null
||
echo
-e
"failed to copy avro libraries"
cp
${
compile_dir
}
/build/lib
/libtaos.so
${
install_dir
}
/lib/
cp
${
compile_dir
}
/build/li
b/libtdb.so
${
install_dir
}
/lib/
cp
${
compile_dir
}
/build/lib/libavro
*
${
install_dir
}
/lib/
>
/dev/null
||
echo
-e
"failed to copy avro libraries"
cp
-rf
${
compile_dir
}
/build/lib/pkgconfig
${
install_dir
}
/lib/
>
/dev/null
||
echo
-e
"failed to copy pkgconfig directory"
...
...
source/client/src/tmq.c
浏览文件 @
1e32228d
...
...
@@ -316,7 +316,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
void
tmqAssignDelayedHbTask
(
void
*
param
,
void
*
tmrId
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
param
;
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
)
,
DEF_QITEM
);
*
pTaskType
=
TMQ_DELAYED_TASK__HB
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -324,7 +324,7 @@ void tmqAssignDelayedHbTask(void* param, void* tmrId) {
void
tmqAssignDelayedCommitTask
(
void
*
param
,
void
*
tmrId
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
param
;
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
)
,
DEF_QITEM
);
*
pTaskType
=
TMQ_DELAYED_TASK__COMMIT
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -332,7 +332,7 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
void
tmqAssignDelayedReportTask
(
void
*
param
,
void
*
tmrId
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
param
;
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
));
int8_t
*
pTaskType
=
taosAllocateQitem
(
sizeof
(
int8_t
)
,
DEF_QITEM
);
*
pTaskType
=
TMQ_DELAYED_TASK__REPORT
;
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -848,7 +848,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
tscWarn
(
"mismatch rsp from vg %d, epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
}
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
));
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
)
,
DEF_QITEM
);
if
(
pRspWrapper
==
NULL
)
{
tscWarn
(
"msg discard from vg %d, epoch %d since out of memory"
,
pParam
->
vgId
,
pParam
->
epoch
);
goto
CREATE_MSG_FAIL
;
...
...
@@ -987,7 +987,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
tmqUpdateEp
(
tmq
,
head
->
epoch
,
&
rsp
);
tDeleteSMqAskEpRsp
(
&
rsp
);
}
else
{
SMqAskEpRspWrapper
*
pWrapper
=
taosAllocateQitem
(
sizeof
(
SMqAskEpRspWrapper
));
SMqAskEpRspWrapper
*
pWrapper
=
taosAllocateQitem
(
sizeof
(
SMqAskEpRspWrapper
)
,
DEF_QITEM
);
if
(
pWrapper
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
...
...
source/common/src/tglobal.c
浏览文件 @
1e32228d
...
...
@@ -44,6 +44,7 @@ int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128;
int32_t
tsQnodeShmSize
=
TSDB_MAX_WAL_SIZE
*
4
+
128
;
int32_t
tsSnodeShmSize
=
TSDB_MAX_WAL_SIZE
*
4
+
128
;
int32_t
tsBnodeShmSize
=
TSDB_MAX_WAL_SIZE
*
4
+
128
;
int32_t
tsNumOfShmThreads
=
1
;
// queue & threads
int32_t
tsNumOfRpcThreads
=
1
;
...
...
@@ -375,6 +376,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if
(
cfgAddInt32
(
pCfg
,
"qnodeShmSize"
,
tsQnodeShmSize
,
TSDB_MAX_WAL_SIZE
+
128
,
INT32_MAX
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"snodeShmSize"
,
tsSnodeShmSize
,
TSDB_MAX_WAL_SIZE
+
128
,
INT32_MAX
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"bnodeShmSize"
,
tsBnodeShmSize
,
TSDB_MAX_WAL_SIZE
+
128
,
INT32_MAX
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"mumOfShmThreads"
,
tsNumOfShmThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfRpcThreads
=
tsNumOfCores
/
2
;
tsNumOfRpcThreads
=
TRANGE
(
tsNumOfRpcThreads
,
1
,
4
);
...
...
@@ -428,6 +430,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeUniqueThreads
=
TRANGE
(
tsNumOfSnodeUniqueThreads
,
2
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfSnodeUniqueThreads"
,
tsNumOfSnodeUniqueThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
tsRpcQueueMemoryAllowed
=
tsTotalMemoryKB
*
1024
*
0
.
1
;
tsRpcQueueMemoryAllowed
=
TRANGE
(
tsRpcQueueMemoryAllowed
,
TSDB_MAX_WAL_SIZE
*
10L
,
TSDB_MAX_WAL_SIZE
*
10000L
);
if
(
cfgAddInt64
(
pCfg
,
"rpcQueueMemoryAllowed"
,
tsRpcQueueMemoryAllowed
,
1
,
INT64_MAX
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"monitor"
,
tsEnableMonitor
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"monitorInterval"
,
tsMonitorInterval
,
1
,
200000
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"monitorFqdn"
,
tsMonitorFqdn
,
0
)
!=
0
)
return
-
1
;
...
...
@@ -568,6 +574,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfQnodeFetchThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeFetchThreads"
)
->
i32
;
tsNumOfSnodeSharedThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeSharedThreads"
)
->
i32
;
tsNumOfSnodeUniqueThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeUniqueThreads"
)
->
i32
;
tsRpcQueueMemoryAllowed
=
cfgGetItem
(
pCfg
,
"rpcQueueMemoryAllowed"
)
->
i64
;
tsEnableMonitor
=
cfgGetItem
(
pCfg
,
"monitor"
)
->
bval
;
tsMonitorInterval
=
cfgGetItem
(
pCfg
,
"monitorInterval"
)
->
i32
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
1e32228d
...
...
@@ -104,7 +104,7 @@ int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
static
inline
int32_t
mmPutRpcMsgToWorker
(
SSingleWorker
*
pWorker
,
SRpcMsg
*
pRpc
)
{
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
));
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)
,
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
dTrace
(
"msg:%p, is created and put into worker:%s, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pRpc
->
msgType
));
...
...
source/dnode/mgmt/mgmt_qnode/src/qmWorker.c
浏览文件 @
1e32228d
...
...
@@ -102,7 +102,7 @@ int32_t qmPutNodeMsgToMonitorQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {
}
static
int32_t
qmPutRpcMsgToWorker
(
SQnodeMgmt
*
pMgmt
,
SSingleWorker
*
pWorker
,
SRpcMsg
*
pRpc
)
{
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
));
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)
,
RPC_QITEM
);
if
(
pMsg
==
NULL
)
{
return
-
1
;
}
...
...
@@ -126,10 +126,10 @@ int32_t qmGetQueueSize(SQnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
switch
(
qtype
)
{
case
QUERY_QUEUE
:
size
=
taosQueueSize
(
pMgmt
->
queryWorker
.
queue
);
size
=
taosQueue
Item
Size
(
pMgmt
->
queryWorker
.
queue
);
break
;
case
FETCH_QUEUE
:
size
=
taosQueueSize
(
pMgmt
->
fetchWorker
.
queue
);
size
=
taosQueue
Item
Size
(
pMgmt
->
fetchWorker
.
queue
);
break
;
default:
break
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
1e32228d
...
...
@@ -326,7 +326,7 @@ static int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pRpc, EQueueType q
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
pHead
->
vgId
);
if
(
pVnode
==
NULL
)
return
-
1
;
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
));
SNodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)
,
RPC_QITEM
);
int32_t
code
=
0
;
if
(
pMsg
!=
NULL
)
{
...
...
@@ -397,22 +397,22 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
if
(
pVnode
!=
NULL
)
{
switch
(
qtype
)
{
case
WRITE_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pWriteQ
);
size
=
taosQueue
Item
Size
(
pVnode
->
pWriteQ
);
break
;
case
SYNC_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pSyncQ
);
size
=
taosQueue
Item
Size
(
pVnode
->
pSyncQ
);
break
;
case
APPLY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pApplyQ
);
size
=
taosQueue
Item
Size
(
pVnode
->
pApplyQ
);
break
;
case
QUERY_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pQueryQ
);
size
=
taosQueue
Item
Size
(
pVnode
->
pQueryQ
);
break
;
case
FETCH_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pFetchQ
);
size
=
taosQueue
Item
Size
(
pVnode
->
pFetchQ
);
break
;
case
MERGE_QUEUE
:
size
=
taosQueueSize
(
pVnode
->
pMergeQ
);
size
=
taosQueue
Item
Size
(
pVnode
->
pMergeQ
);
break
;
default:
break
;
...
...
source/dnode/mgmt/node_mgmt/src/dmTransport.c
浏览文件 @
1e32228d
...
...
@@ -79,7 +79,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
needRelease
=
true
;
if
((
msgFp
=
dmGetMsgFp
(
pWrapper
,
pRpc
))
==
NULL
)
goto
_OVER
;
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)))
==
NULL
)
goto
_OVER
;
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)
,
RPC_QITEM
))
==
NULL
)
goto
_OVER
;
if
(
dmBuildMsg
(
pMsg
,
pRpc
)
!=
0
)
goto
_OVER
;
if
(
pWrapper
->
procType
!=
DND_PROC_PARENT
)
{
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
1e32228d
...
...
@@ -66,11 +66,7 @@ static void mndPullupTrans(SMnode *pMnode) {
static
void
mndCalMqRebalance
(
SMnode
*
pMnode
)
{
int32_t
contLen
=
0
;
void
*
pReq
=
mndBuildTimerMsg
(
&
contLen
);
SRpcMsg
rpcMsg
=
{
.
msgType
=
TDMT_MND_MQ_TIMER
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
};
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_MND_MQ_TIMER
,
.
pCont
=
pReq
,
.
contLen
=
contLen
};
tmsgPutToQueue
(
&
pMnode
->
msgCb
,
READ_QUEUE
,
&
rpcMsg
);
}
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
1e32228d
...
...
@@ -88,9 +88,9 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
uint32_t
capacity
=
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
;
if
(
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
>
capacity
)
{
if
(
taosQueue
Item
Size
(
pDispatcher
->
pDataBlocks
)
>
capacity
)
{
qError
(
"SinkNode queue is full, no capacity, max:%d, current:%d, no capacity"
,
capacity
,
taosQueueSize
(
pDispatcher
->
pDataBlocks
));
taosQueue
Item
Size
(
pDispatcher
->
pDataBlocks
));
return
false
;
}
...
...
@@ -106,7 +106,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static
int32_t
updateStatus
(
SDataDispatchHandle
*
pDispatcher
)
{
taosThreadMutexLock
(
&
pDispatcher
->
mutex
);
int32_t
blockNums
=
taosQueueSize
(
pDispatcher
->
pDataBlocks
);
int32_t
blockNums
=
taosQueue
Item
Size
(
pDispatcher
->
pDataBlocks
);
int32_t
status
=
(
0
==
blockNums
?
DS_BUF_EMPTY
:
(
blockNums
<
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
?
DS_BUF_LOW
:
DS_BUF_FULL
));
...
...
@@ -124,7 +124,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static
int32_t
putDataBlock
(
SDataSinkHandle
*
pHandle
,
const
SInputData
*
pInput
,
bool
*
pContinue
)
{
SDataDispatchHandle
*
pDispatcher
=
(
SDataDispatchHandle
*
)
pHandle
;
SDataDispatchBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDispatchBuf
));
SDataDispatchBuf
*
pBuf
=
taosAllocateQitem
(
sizeof
(
SDataDispatchBuf
)
,
DEF_QITEM
);
if
(
NULL
==
pBuf
||
!
allocBuf
(
pDispatcher
,
pInput
,
pBuf
))
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
...
...
source/libs/nodes/src/nodesUtilFuncs.c
浏览文件 @
1e32228d
...
...
@@ -1112,6 +1112,19 @@ bool nodesIsJsonOp(const SOperatorNode* pOp) {
return
false
;
}
bool
nodesIsRegularOp
(
const
SOperatorNode
*
pOp
)
{
switch
(
pOp
->
opType
)
{
case
OP_TYPE_LIKE
:
case
OP_TYPE_NOT_LIKE
:
case
OP_TYPE_MATCH
:
case
OP_TYPE_NMATCH
:
return
true
;
default:
break
;
}
return
false
;
}
bool
nodesIsTimeorderQuery
(
const
SNode
*
pQuery
)
{
return
false
;
}
bool
nodesIsTimelineQuery
(
const
SNode
*
pQuery
)
{
return
false
;
}
...
...
source/libs/parser/inc/sql.y
浏览文件 @
1e32228d
...
...
@@ -868,9 +868,9 @@ query_expression_body(A) ::=
query_expression_body(B) UNION query_expression_body(D). { A = createSetOperator(pCxt, SET_OP_TYPE_UNION, B, D); }
query_primary(A) ::= query_specification(B). { A = B; }
//
query_primary(A) ::=
//
NK_LP query_expression_body(B)
//
order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B; }
query_primary(A) ::=
NK_LP query_expression_body(B)
order_by_clause_opt slimit_clause_opt limit_clause_opt NK_RP. { A = B; }
%type order_by_clause_opt { SNodeList* }
%destructor order_by_clause_opt { nodesDestroyList($$); }
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
1e32228d
...
...
@@ -480,8 +480,8 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
return
res
;
}
static
EDealRes
translateValue
(
STranslateContext
*
pCxt
,
SValueNode
*
pVal
)
{
uint8_t
precision
=
(
NULL
!=
pCxt
->
pCurrStmt
?
pCxt
->
pCurrStmt
->
precision
:
pVal
->
node
.
resType
.
precision
);
static
EDealRes
translateValue
Impl
(
STranslateContext
*
pCxt
,
SValueNode
*
pVal
,
SDataType
targetDt
)
{
uint8_t
precision
=
(
NULL
!=
pCxt
->
pCurrStmt
?
pCxt
->
pCurrStmt
->
precision
:
targetDt
.
precision
);
pVal
->
node
.
resType
.
precision
=
precision
;
if
(
pVal
->
placeholderNo
>
0
)
{
return
DEAL_RES_CONTINUE
;
...
...
@@ -493,7 +493,7 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
}
*
(
int64_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
}
else
{
switch
(
pVal
->
node
.
resType
.
type
)
{
switch
(
targetDt
.
type
)
{
case
TSDB_DATA_TYPE_NULL
:
break
;
case
TSDB_DATA_TYPE_BOOL
:
...
...
@@ -562,35 +562,54 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
}
case
TSDB_DATA_TYPE_VARCHAR
:
case
TSDB_DATA_TYPE_VARBINARY
:
{
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
pVal
->
node
.
resType
.
bytes
+
VARSTR_HEADER_SIZE
+
1
);
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
targetDt
.
bytes
+
VARSTR_HEADER_SIZE
+
1
);
if
(
NULL
==
pVal
->
datum
.
p
)
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_OUT_OF_MEMORY
);
}
varDataSetLen
(
pVal
->
datum
.
p
,
pVal
->
node
.
resType
.
bytes
);
strncpy
(
varDataVal
(
pVal
->
datum
.
p
),
pVal
->
literal
,
pVal
->
node
.
resType
.
bytes
);
varDataSetLen
(
pVal
->
datum
.
p
,
targetDt
.
bytes
);
strncpy
(
varDataVal
(
pVal
->
datum
.
p
),
pVal
->
literal
,
targetDt
.
bytes
);
break
;
}
case
TSDB_DATA_TYPE_TIMESTAMP
:
{
if
(
taosParseTime
(
pVal
->
literal
,
&
pVal
->
datum
.
i
,
pVal
->
node
.
resType
.
bytes
,
precision
,
tsDaylight
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
taosParseTime
(
pVal
->
literal
,
&
pVal
->
datum
.
i
,
targetDt
.
bytes
,
precision
,
tsDaylight
)
!=
TSDB_CODE_SUCCESS
)
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
pVal
->
literal
);
}
*
(
int64_t
*
)
&
pVal
->
typeData
=
pVal
->
datum
.
i
;
break
;
}
case
TSDB_DATA_TYPE_NCHAR
:
case
TSDB_DATA_TYPE_NCHAR
:
{
int32_t
bytes
=
targetDt
.
bytes
*
TSDB_NCHAR_SIZE
;
pVal
->
datum
.
p
=
taosMemoryCalloc
(
1
,
bytes
+
VARSTR_HEADER_SIZE
+
1
);
if
(
NULL
==
pVal
->
datum
.
p
)
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_OUT_OF_MEMORY
);
;
}
int32_t
output
=
0
;
if
(
!
taosMbsToUcs4
(
pVal
->
literal
,
pVal
->
node
.
resType
.
bytes
,
(
TdUcs4
*
)
varDataVal
(
pVal
->
datum
.
p
),
bytes
,
&
output
))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
pVal
->
literal
);
}
varDataSetLen
(
pVal
->
datum
.
p
,
output
);
break
;
}
case
TSDB_DATA_TYPE_JSON
:
case
TSDB_DATA_TYPE_DECIMAL
:
case
TSDB_DATA_TYPE_BLOB
:
// todo
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
pVal
->
literal
);
default:
break
;
}
}
pVal
->
node
.
resType
=
targetDt
;
pVal
->
translate
=
true
;
return
DEAL_RES_CONTINUE
;
}
static
EDealRes
translateValue
(
STranslateContext
*
pCxt
,
SValueNode
*
pVal
)
{
return
translateValueImpl
(
pCxt
,
pVal
,
pVal
->
node
.
resType
);
}
static
EDealRes
translateOperator
(
STranslateContext
*
pCxt
,
SOperatorNode
*
pOp
)
{
if
(
nodesIsUnaryOp
(
pOp
))
{
if
(
OP_TYPE_MINUS
==
pOp
->
opType
)
{
...
...
@@ -635,6 +654,14 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
if
(
OP_TYPE_IN
==
pOp
->
opType
||
OP_TYPE_NOT_IN
==
pOp
->
opType
)
{
((
SExprNode
*
)
pOp
->
pRight
)
->
resType
=
((
SExprNode
*
)
pOp
->
pLeft
)
->
resType
;
}
if
(
nodesIsRegularOp
(
pOp
))
{
if
(
!
IS_STR_DATA_TYPE
(((
SExprNode
*
)(
pOp
->
pLeft
))
->
resType
.
type
))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pLeft
))
->
aliasName
);
}
if
(
QUERY_NODE_VALUE
!=
nodeType
(
pOp
->
pRight
)
||
!
IS_STR_DATA_TYPE
(((
SExprNode
*
)(
pOp
->
pRight
))
->
resType
.
type
))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
}
}
pOp
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_BOOL
;
pOp
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_BOOL
].
bytes
;
}
else
if
(
nodesIsJsonOp
(
pOp
))
{
...
...
@@ -3858,11 +3885,23 @@ static int32_t createValueFromFunction(STranslateContext* pCxt, SFunctionNode* p
return
scalarCalculateConstants
((
SNode
*
)
pFunc
,
(
SNode
**
)
pVal
);
}
static
int32_t
translateTagVal
(
STranslateContext
*
pCxt
,
SNode
*
pNode
,
SValueNode
**
pVal
)
{
static
SDataType
schemaToDataType
(
SSchema
*
pSchema
)
{
SDataType
dt
=
{.
type
=
pSchema
->
type
,
.
bytes
=
pSchema
->
bytes
,
.
precision
=
0
,
.
scale
=
0
};
if
(
TSDB_DATA_TYPE_VARCHAR
==
dt
.
type
||
TSDB_DATA_TYPE_BINARY
==
dt
.
type
||
TSDB_DATA_TYPE_VARBINARY
==
dt
.
type
)
{
dt
.
bytes
-=
VARSTR_HEADER_SIZE
;
}
else
if
(
TSDB_DATA_TYPE_NCHAR
==
dt
.
type
)
{
dt
.
bytes
=
(
dt
.
bytes
-
VARSTR_HEADER_SIZE
)
/
TSDB_NCHAR_SIZE
;
}
return
dt
;
}
static
int32_t
translateTagVal
(
STranslateContext
*
pCxt
,
SSchema
*
pSchema
,
SNode
*
pNode
,
SValueNode
**
pVal
)
{
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pNode
))
{
return
createValueFromFunction
(
pCxt
,
(
SFunctionNode
*
)
pNode
,
pVal
);
}
else
if
(
QUERY_NODE_VALUE
==
nodeType
(
pNode
))
{
return
(
DEAL_RES_ERROR
==
translateValue
(
pCxt
,
(
SValueNode
*
)
pNode
)
?
pCxt
->
errCode
:
TSDB_CODE_SUCCESS
);
return
(
DEAL_RES_ERROR
==
translateValueImpl
(
pCxt
,
(
SValueNode
*
)
pNode
,
schemaToDataType
(
pSchema
))
?
pCxt
->
errCode
:
TSDB_CODE_SUCCESS
);
}
else
{
return
TSDB_CODE_FAILED
;
}
...
...
@@ -3891,7 +3930,7 @@ static int32_t buildKVRowForBindTags(STranslateContext* pCxt, SCreateSubTableCla
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_TAG_NAME
,
pCol
->
colName
);
}
SValueNode
*
pVal
=
NULL
;
int32_t
code
=
translateTagVal
(
pCxt
,
pNode
,
&
pVal
);
int32_t
code
=
translateTagVal
(
pCxt
,
p
Schema
,
p
Node
,
&
pVal
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
NULL
==
pVal
)
{
pVal
=
(
SValueNode
*
)
pNode
;
...
...
@@ -3921,7 +3960,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
int32_t
index
=
0
;
FOREACH
(
pNode
,
pStmt
->
pValsOfTags
)
{
SValueNode
*
pVal
=
NULL
;
int32_t
code
=
translateTagVal
(
pCxt
,
pNode
,
&
pVal
);
int32_t
code
=
translateTagVal
(
pCxt
,
p
TagSchema
+
index
,
p
Node
,
&
pVal
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
NULL
==
pVal
)
{
pVal
=
(
SValueNode
*
)
pNode
;
...
...
source/libs/parser/src/sql.c
浏览文件 @
1e32228d
此差异已折叠。
点击以展开。
source/libs/parser/test/parSelectTest.cpp
浏览文件 @
1e32228d
...
...
@@ -232,4 +232,12 @@ TEST_F(ParserSelectTest, semanticError) {
PARSER_STAGE_TRANSLATE
);
}
TEST_F
(
ParserSelectTest
,
setOperator
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT * FROM t1 UNION ALL SELECT * FROM t1"
);
run
(
"(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)"
);
}
}
// namespace ParserTest
source/libs/planner/src/planOptimizer.c
浏览文件 @
1e32228d
...
...
@@ -723,7 +723,10 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
if
(
TSDB_SUPER_TABLE
!=
((
SScanLogicNode
*
)
pNode
)
->
pMeta
->
tableType
)
{
return
nodesListMakeAppend
(
pScanNodes
,
pNode
);
}
break
;
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
code
=
opkGetScanNodesImpl
(
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScanNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -739,6 +742,7 @@ static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeL
if
(
1
!=
LIST_LENGTH
(
pNode
->
pChildren
))
{
*
pNotOptimize
=
true
;
return
TSDB_CODE_SUCCESS
;
}
return
opkGetScanNodesImpl
(
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScanNodes
);
...
...
source/libs/planner/test/planSubqueryTest.cpp
浏览文件 @
1e32228d
...
...
@@ -23,12 +23,15 @@ class PlanSubqeuryTest : public PlannerTestBase {};
TEST_F
(
PlanSubqeuryTest
,
basic
)
{
useDb
(
"root"
,
"test"
);
run
(
"select * from (select * from t1)"
);
run
(
"SELECT * FROM (SELECT * FROM t1)"
);
// run("SELECT LAST(c1) FROM ( SELECT * FROM t1)");
}
TEST_F
(
PlanSubqeuryTest
,
doubleGroupBy
)
{
useDb
(
"root"
,
"test"
);
run
(
"select count(*) from (select c1 + c3 a, c1 + count(*) b from t1 where c2 = 'abc' group by c1, c3) where a > 100 "
"group by b"
);
run
(
"SELECT COUNT(*) FROM ("
"SELECT c1 + c3 a, c1 + COUNT(*) b FROM t1 WHERE c2 = 'abc' GROUP BY c1, c3) "
"WHERE a > 100 GROUP BY b"
);
}
source/libs/scalar/src/scalar.c
浏览文件 @
1e32228d
...
...
@@ -27,6 +27,7 @@ void sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
valueNode
->
datum
.
i
=
0
;
}
taosMemoryFree
(
timeStr
);
valueNode
->
typeData
=
valueNode
->
datum
.
i
;
valueNode
->
node
.
resType
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
valueNode
->
node
.
resType
.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
;
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
1e32228d
...
...
@@ -92,7 +92,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
syncRpcMsgLog2
((
char
*
)
"==syncIOEqMsg=="
,
pMsg
);
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
)
,
DEF_QITEM
);
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
STaosQueue
*
pMsgQ
=
queue
;
...
...
@@ -360,7 +360,7 @@ static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
syncRpcMsgLog2
((
char
*
)
"==syncIOProcessRequest=="
,
pMsg
);
SSyncIO
*
io
=
pParent
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
)
,
DEF_QITEM
);
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
}
...
...
@@ -420,7 +420,7 @@ static void syncIOTickQ(void *param, void *tmrId) {
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
)
,
DEF_QITEM
);
memcpy
(
pTemp
,
&
rpcMsg
,
sizeof
(
SRpcMsg
));
syncRpcMsgLog2
((
char
*
)
"==syncIOTickQ=="
,
&
rpcMsg
);
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
...
...
source/libs/tdb/CMakeLists.txt
浏览文件 @
1e32228d
# tdb
add_library
(
tdb S
TATIC
""
)
add_library
(
tdb S
HARED
""
)
target_sources
(
tdb
PRIVATE
"src/db/tdbPCache.c"
...
...
source/libs/transport/test/pushServer.c
浏览文件 @
1e32228d
...
...
@@ -114,7 +114,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
)
,
DEF_QITEM
);
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
tDebug
(
"request is received, type:%d, contLen:%d, item:%p"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pTemp
);
...
...
source/libs/transport/test/rserver.c
浏览文件 @
1e32228d
...
...
@@ -103,7 +103,7 @@ int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char
void
processRequestMsg
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
)
,
DEF_QITEM
);
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
tDebug
(
"request is received, type:%d, contLen:%d, item:%p"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
pTemp
);
...
...
source/util/src/terror.c
浏览文件 @
1e32228d
...
...
@@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
TAOS_DEFINE_ERROR
(
TSDB_CODE_REPEAT_INIT
,
"Repeat initialization"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_DUP_KEY
,
"Cannot add duplicate keys to hash"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_NEED_RETRY
,
"Retry needed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE
,
"Out of memory in rpc queue"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_NO_MEMORY
,
"Ref out of memory"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_REF_FULL
,
"too many Ref Objs"
)
...
...
source/util/src/tprocess.c
浏览文件 @
1e32228d
...
...
@@ -258,8 +258,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
int16_t
headLen
=
CEIL8
(
rawHeadLen
);
int32_t
bodyLen
=
CEIL8
(
rawBodyLen
);
void
*
pHead
=
(
*
mallocHeadFp
)(
headLen
);
void
*
pBody
=
(
*
mallocBodyFp
)(
bodyLen
);
void
*
pHead
=
(
*
mallocHeadFp
)(
headLen
,
RPC_QITEM
);
void
*
pBody
=
(
*
mallocBodyFp
)(
bodyLen
,
RPC_QITEM
);
if
(
pHead
==
NULL
||
pBody
==
NULL
)
{
taosThreadMutexUnlock
(
&
pQueue
->
mutex
);
tsem_post
(
&
pQueue
->
sem
);
...
...
source/util/src/tqueue.c
浏览文件 @
1e32228d
...
...
@@ -18,18 +18,21 @@
#include "taoserror.h"
#include "tlog.h"
int64_t
tsRpcQueueMemoryAllowed
=
0
;
int64_t
tsRpcQueueMemoryUsed
=
0
;
typedef
struct
STaosQnode
STaosQnode
;
typedef
struct
STaosQnode
{
STaosQnode
*
next
;
STaosQueue
*
queue
;
int32_t
size
;
int8_t
itype
;
int8_t
reserved
[
3
];
char
item
[];
}
STaosQnode
;
typedef
struct
STaosQueue
{
int32_t
itemSize
;
int32_t
numOfItems
;
int32_t
threadId
;
STaosQnode
*
head
;
STaosQnode
*
tail
;
STaosQueue
*
next
;
// for queue set
...
...
@@ -38,21 +41,22 @@ typedef struct STaosQueue {
FItem
itemFp
;
FItems
itemsFp
;
TdThreadMutex
mutex
;
int64_t
memOfItems
;
int32_t
numOfItems
;
}
STaosQueue
;
typedef
struct
STaosQset
{
STaosQueue
*
head
;
STaosQueue
*
current
;
TdThreadMutex
mutex
;
tsem_t
sem
;
int32_t
numOfQueues
;
int32_t
numOfItems
;
tsem_t
sem
;
}
STaosQset
;
typedef
struct
STaosQall
{
STaosQnode
*
current
;
STaosQnode
*
start
;
int32_t
itemSize
;
int32_t
numOfItems
;
}
STaosQall
;
...
...
@@ -118,32 +122,61 @@ bool taosQueueEmpty(STaosQueue *queue) {
return
empty
;
}
int32_t
taosQueueSize
(
STaosQueue
*
queue
)
{
int32_t
taosQueueItemSize
(
STaosQueue
*
queue
)
{
if
(
queue
==
NULL
)
return
0
;
taosThreadMutexLock
(
&
queue
->
mutex
);
int32_t
numOfItems
=
queue
->
numOfItems
;
taosThreadMutexUnlock
(
&
queue
->
mutex
);
return
numOfItems
;
}
void
*
taosAllocateQitem
(
int32_t
size
)
{
int64_t
taosQueueMemorySize
(
STaosQueue
*
queue
)
{
if
(
queue
==
NULL
)
return
0
;
taosThreadMutexLock
(
&
queue
->
mutex
);
int64_t
memOfItems
=
queue
->
memOfItems
;
taosThreadMutexUnlock
(
&
queue
->
mutex
);
return
memOfItems
;
}
void
*
taosAllocateQitem
(
int32_t
size
,
EQItype
itype
)
{
STaosQnode
*
pNode
=
taosMemoryCalloc
(
1
,
sizeof
(
STaosQnode
)
+
size
);
pNode
->
size
=
size
;
pNode
->
itype
=
itype
;
if
(
pNode
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
if
(
itype
==
RPC_QITEM
)
{
int64_t
alloced
=
atomic_add_fetch_64
(
&
tsRpcQueueMemoryUsed
,
size
);
if
(
alloced
>
tsRpcQueueMemoryUsed
)
{
taosMemoryFree
(
pNode
);
terrno
=
TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE
;
return
NULL
;
}
uTrace
(
"item:%p, node:%p is allocated, alloc:%"
PRId64
,
pNode
->
item
,
pNode
,
alloced
);
}
else
{
uTrace
(
"item:%p, node:%p is allocated"
,
pNode
->
item
,
pNode
);
}
return
(
void
*
)
pNode
->
item
;
}
void
taosFreeQitem
(
void
*
pItem
)
{
if
(
pItem
==
NULL
)
return
;
char
*
temp
=
pItem
;
temp
-=
sizeof
(
STaosQnode
);
uTrace
(
"item:%p, node:%p is freed"
,
pItem
,
temp
);
taosMemoryFree
(
temp
);
STaosQnode
*
pNode
=
(
STaosQnode
*
)((
char
*
)
pItem
-
sizeof
(
STaosQnode
));
if
(
pNode
->
itype
>
0
)
{
int64_t
alloced
=
atomic_sub_fetch_64
(
&
tsRpcQueueMemoryUsed
,
pNode
->
size
);
uTrace
(
"item:%p, node:%p is freed, alloc:%"
PRId64
,
pItem
,
pNode
,
alloced
);
}
else
{
uTrace
(
"item:%p, node:%p is freed"
,
pItem
,
pNode
);
}
taosMemoryFree
(
pNode
);
}
void
taosWriteQitem
(
STaosQueue
*
queue
,
void
*
pItem
)
{
...
...
@@ -161,8 +194,9 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) {
}
queue
->
numOfItems
++
;
queue
->
memOfItems
+=
pNode
->
size
;
if
(
queue
->
qset
)
atomic_add_fetch_32
(
&
queue
->
qset
->
numOfItems
,
1
);
uTrace
(
"item:%p is put into queue:%p, items:%d
"
,
pItem
,
queue
,
queue
->
nu
mOfItems
);
uTrace
(
"item:%p is put into queue:%p, items:%d
mem:%"
PRId64
,
pItem
,
queue
,
queue
->
numOfItems
,
queue
->
me
mOfItems
);
taosThreadMutexUnlock
(
&
queue
->
mutex
);
...
...
@@ -181,9 +215,11 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
queue
->
head
=
pNode
->
next
;
if
(
queue
->
head
==
NULL
)
queue
->
tail
=
NULL
;
queue
->
numOfItems
--
;
queue
->
memOfItems
-=
pNode
->
size
;
if
(
queue
->
qset
)
atomic_sub_fetch_32
(
&
queue
->
qset
->
numOfItems
,
1
);
code
=
1
;
uTrace
(
"item:%p is read out from queue:%p, items:%d"
,
*
ppItem
,
queue
,
queue
->
numOfItems
);
uTrace
(
"item:%p is read out from queue:%p, items:%d mem:%"
PRId64
,
*
ppItem
,
queue
,
queue
->
numOfItems
,
queue
->
memOfItems
);
}
taosThreadMutexUnlock
(
&
queue
->
mutex
);
...
...
@@ -191,7 +227,13 @@ int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
return
code
;
}
STaosQall
*
taosAllocateQall
()
{
return
taosMemoryCalloc
(
1
,
sizeof
(
STaosQall
));
}
STaosQall
*
taosAllocateQall
()
{
STaosQall
*
qall
=
taosMemoryCalloc
(
1
,
sizeof
(
STaosQall
));
if
(
qall
!=
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
}
return
qall
;
}
void
taosFreeQall
(
STaosQall
*
qall
)
{
taosMemoryFree
(
qall
);
}
...
...
@@ -207,12 +249,12 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
qall
->
current
=
queue
->
head
;
qall
->
start
=
queue
->
head
;
qall
->
numOfItems
=
queue
->
numOfItems
;
qall
->
itemSize
=
queue
->
itemSize
;
code
=
qall
->
numOfItems
;
queue
->
head
=
NULL
;
queue
->
tail
=
NULL
;
queue
->
numOfItems
=
0
;
queue
->
memOfItems
=
0
;
if
(
queue
->
qset
)
atomic_sub_fetch_32
(
&
queue
->
qset
->
numOfItems
,
qall
->
numOfItems
);
}
...
...
@@ -377,9 +419,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FI
queue
->
head
=
pNode
->
next
;
if
(
queue
->
head
==
NULL
)
queue
->
tail
=
NULL
;
queue
->
numOfItems
--
;
queue
->
memOfItems
-=
pNode
->
size
;
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
1
);
code
=
1
;
uTrace
(
"item:%p is read out from queue:%p, items:%d"
,
*
ppItem
,
queue
,
queue
->
numOfItems
);
uTrace
(
"item:%p is read out from queue:%p, items:%d mem:%"
PRId64
,
*
ppItem
,
queue
,
queue
->
numOfItems
,
queue
->
memOfItems
);
}
taosThreadMutexUnlock
(
&
queue
->
mutex
);
...
...
@@ -411,7 +455,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
qall
->
current
=
queue
->
head
;
qall
->
start
=
queue
->
head
;
qall
->
numOfItems
=
queue
->
numOfItems
;
qall
->
itemSize
=
queue
->
itemSize
;
code
=
qall
->
numOfItems
;
if
(
ahandle
)
*
ahandle
=
queue
->
ahandle
;
if
(
itemsFp
)
*
itemsFp
=
queue
->
itemsFp
;
...
...
@@ -419,6 +462,7 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
queue
->
head
=
NULL
;
queue
->
tail
=
NULL
;
queue
->
numOfItems
=
0
;
queue
->
memOfItems
=
0
;
atomic_sub_fetch_32
(
&
qset
->
numOfItems
,
qall
->
numOfItems
);
for
(
int32_t
j
=
1
;
j
<
qall
->
numOfItems
;
++
j
)
{
tsem_wait
(
&
qset
->
sem
);
...
...
@@ -444,23 +488,3 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
}
taosThreadMutexUnlock
(
&
qset
->
mutex
);
}
int32_t
taosGetQueueItemsNumber
(
STaosQueue
*
queue
)
{
if
(
!
queue
)
return
0
;
int32_t
num
;
taosThreadMutexLock
(
&
queue
->
mutex
);
num
=
queue
->
numOfItems
;
taosThreadMutexUnlock
(
&
queue
->
mutex
);
return
num
;
}
int32_t
taosGetQsetItemsNumber
(
STaosQset
*
qset
)
{
if
(
!
qset
)
return
0
;
int32_t
num
=
0
;
taosThreadMutexLock
(
&
qset
->
mutex
);
num
=
qset
->
numOfItems
;
taosThreadMutexUnlock
(
&
qset
->
mutex
);
return
num
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录