Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e777ee72
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e777ee72
编写于
1月 25, 2022
作者:
S
Shengliang
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/mnode
上级
808efe80
2e56a0c3
变更
41
隐藏空白更改
内联
并排
Showing
41 changed file
with
560 addition
and
594 deletion
+560
-594
2.0/src/query/src/queryMain.c
2.0/src/query/src/queryMain.c
+2
-2
include/common/tmsg.h
include/common/tmsg.h
+24
-17
include/libs/executor/executor.h
include/libs/executor/executor.h
+1
-1
include/libs/parser/parser.h
include/libs/parser/parser.h
+1
-0
include/os/os.h
include/os/os.h
+1
-0
include/os/osDef.h
include/os/osDef.h
+2
-1
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+23
-7
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+48
-3
source/dnode/mgmt/impl/src/dndMgmt.c
source/dnode/mgmt/impl/src/dndMgmt.c
+2
-0
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+5
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+13
-8
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+16
-60
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+65
-36
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+9
-8
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+6
-5
source/dnode/vnode/src/vnd/vnodeMgr.c
source/dnode/vnode/src/vnd/vnodeMgr.c
+2
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+2
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-9
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+9
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-3
source/libs/executor/src/executorMain.c
source/libs/executor/src/executorMain.c
+30
-30
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+33
-280
source/libs/parser/inc/dataBlockMgt.h
source/libs/parser/inc/dataBlockMgt.h
+2
-0
source/libs/parser/src/astValidate.c
source/libs/parser/src/astValidate.c
+25
-7
source/libs/parser/src/dataBlockMgt.c
source/libs/parser/src/dataBlockMgt.c
+20
-3
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+19
-1
source/libs/parser/src/parser.c
source/libs/parser/src/parser.c
+8
-3
source/libs/parser/src/parserUtil.c
source/libs/parser/src/parserUtil.c
+1
-11
source/libs/parser/src/queryInfoUtil.c
source/libs/parser/src/queryInfoUtil.c
+6
-1
source/libs/planner/src/logicPlan.c
source/libs/planner/src/logicPlan.c
+9
-5
source/libs/planner/src/physicalPlan.c
source/libs/planner/src/physicalPlan.c
+36
-0
source/libs/planner/src/physicalPlanJson.c
source/libs/planner/src/physicalPlanJson.c
+7
-3
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+1
-19
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+1
-1
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+1
-3
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+5
-4
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+6
-3
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+75
-39
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+7
-0
source/libs/transport/src/transSrv.c
source/libs/transport/src/transSrv.c
+29
-18
source/libs/transport/test/rclient.c
source/libs/transport/test/rclient.c
+1
-1
未找到文件。
2.0/src/query/src/queryMain.c
浏览文件 @
e777ee72
...
...
@@ -461,7 +461,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
}
qDebug
(
"QInfo:0x%"
PRIx64
" query killed"
,
pQInfo
->
qId
);
set
Query
Killed
(
pQInfo
);
set
Task
Killed
(
pQInfo
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
...
...
@@ -634,7 +634,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qWarn
(
"QId:0x%"
PRIx64
" be killed(no memory commit)."
,
pQInfo
->
qId
);
set
Query
Killed
(
pQInfo
);
set
Task
Killed
(
pQInfo
);
// wait query stop
int32_t
loop
=
0
;
...
...
include/common/tmsg.h
浏览文件 @
e777ee72
...
...
@@ -1519,15 +1519,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef
struct
SMqSetCVgReq
{
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SSubQueryMsg
msg
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
uint32_t
qmsgLen
;
void
*
qmsg
;
//SSubQueryMsg msg;
}
SMqSetCVgReq
;
static
FORCE_INLINE
int32_t
tEncodeSSubQueryMsg
(
void
**
buf
,
const
SSubQueryMsg
*
pMsg
)
{
...
...
@@ -1536,7 +1538,7 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg*
tlen
+=
taosEncodeFixedU64
(
buf
,
pMsg
->
queryId
);
tlen
+=
taosEncodeFixedU64
(
buf
,
pMsg
->
taskId
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pMsg
->
contentLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pMsg
->
msg
,
pMsg
->
contentLen
);
//
tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
return
tlen
;
}
...
...
@@ -1545,7 +1547,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
buf
=
taosDecodeFixedU64
(
buf
,
&
pMsg
->
queryId
);
buf
=
taosDecodeFixedU64
(
buf
,
&
pMsg
->
taskId
);
buf
=
taosDecodeFixedU32
(
buf
,
&
pMsg
->
contentLen
);
buf
=
taosDecodeBinaryTo
(
buf
,
pMsg
->
msg
,
pMsg
->
contentLen
);
//
buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
return
buf
;
}
...
...
@@ -1559,7 +1561,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen
+=
taosEncodeString
(
buf
,
pReq
->
sql
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
logicalPlan
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
physicalPlan
);
tlen
+=
tEncodeSSubQueryMsg
(
buf
,
&
pReq
->
msg
);
tlen
+=
taosEncodeFixedU32
(
buf
,
pReq
->
qmsgLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pReq
->
qmsg
,
pReq
->
qmsgLen
);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return
tlen
;
}
...
...
@@ -1572,15 +1576,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf
=
taosDecodeString
(
buf
,
&
pReq
->
sql
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
logicalPlan
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
physicalPlan
);
buf
=
tDecodeSSubQueryMsg
(
buf
,
&
pReq
->
msg
);
buf
=
taosDecodeFixedU32
(
buf
,
&
pReq
->
qmsgLen
);
buf
=
taosDecodeBinary
(
buf
,
&
pReq
->
qmsg
,
pReq
->
qmsgLen
);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return
buf
;
}
typedef
struct
SMqSetCVgRsp
{
int32_t
vgId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cGroup
[
TSDB_CONSUMER_GROUP_LEN
];
SMsgHead
header
;
int32_t
vgId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cGroup
[
TSDB_CONSUMER_GROUP_LEN
];
}
SMqSetCVgRsp
;
typedef
struct
SMqColData
{
...
...
include/libs/executor/executor.h
浏览文件 @
e777ee72
...
...
@@ -32,7 +32,7 @@ struct SSubplan;
* @param streamReadHandle
* @return
*/
qTaskInfo_t
qCreateStreamExecTaskInfo
(
SSubQueryMsg
*
pM
sg
,
void
*
streamReadHandle
);
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
m
sg
,
void
*
streamReadHandle
);
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
void
*
input
);
...
...
include/libs/parser/parser.h
浏览文件 @
e777ee72
...
...
@@ -74,6 +74,7 @@ void columnListCopy(SArray* dst, const SArray* src, uint64_t uid);
void
columnListDestroy
(
SArray
*
pColumnList
);
void
dropAllExprInfo
(
SArray
**
pExprInfo
,
int32_t
numOfLevel
);
void
dropOneLevelExprInfo
(
SArray
*
pExprInfo
);
typedef
struct
SSourceParam
{
SArray
*
pExprNodeList
;
//Array<struct tExprNode*>
...
...
include/os/os.h
浏览文件 @
e777ee72
...
...
@@ -51,6 +51,7 @@ extern "C" {
#include <libgen.h>
#include <sys/mman.h>
#include <sys/prctl.h>
#include "osAtomic.h"
#include "osDef.h"
...
...
include/os/osDef.h
浏览文件 @
e777ee72
...
...
@@ -181,7 +181,8 @@ extern "C" {
#endif
#else
// Windows
#define setThreadName(name)
// #define setThreadName(name)
#define setThreadName(name) do { prctl(PR_SET_NAME, (name)); } while (0)
#endif
#if defined(_WIN32)
...
...
source/client/src/clientImpl.c
浏览文件 @
e777ee72
...
...
@@ -218,6 +218,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag,
if
(
pQueryNode
->
type
==
TSDB_SQL_SELECT
)
{
setResSchemaInfo
(
&
pRequest
->
body
.
resInfo
,
pSchema
,
numOfCols
);
tfree
(
pSchema
);
pRequest
->
type
=
TDMT_VND_QUERY
;
}
else
{
tfree
(
pSchema
);
...
...
@@ -313,10 +314,10 @@ tmq_conf_t* tmq_conf_new() {
}
int32_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
)
{
if
(
strcmp
(
key
,
"group.id"
))
{
if
(
strcmp
(
key
,
"group.id"
)
==
0
)
{
strcpy
(
conf
->
groupId
,
value
);
}
if
(
strcmp
(
key
,
"client.id"
))
{
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
}
return
0
;
...
...
@@ -364,7 +365,7 @@ tmq_list_t* tmq_list_new() {
int32_t
tmq_list_append
(
tmq_list_t
*
ptr
,
char
*
src
)
{
if
(
ptr
->
cnt
>=
ptr
->
tot
-
1
)
return
-
1
;
ptr
->
elems
[
ptr
->
cnt
]
=
s
rc
;
ptr
->
elems
[
ptr
->
cnt
]
=
s
trdup
(
src
)
;
ptr
->
cnt
++
;
return
0
;
}
...
...
@@ -376,8 +377,23 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
int32_t
sz
=
topic_list
->
cnt
;
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topicName
=
strdup
(
topic_list
->
elems
[
i
]);
taosArrayPush
(
tmq
->
clientTopics
,
&
topicName
);
char
*
topicName
=
topic_list
->
elems
[
i
];
SName
name
=
{
0
};
char
*
dbName
=
getDbOfConnection
(
tmq
->
pTscObj
);
tNameSetDbName
(
&
name
,
tmq
->
pTscObj
->
acctId
,
dbName
,
strlen
(
dbName
));
tNameFromString
(
&
name
,
topicName
,
T_NAME_TABLE
);
char
*
topicFname
=
calloc
(
1
,
TSDB_TOPIC_FNAME_LEN
);
if
(
topicFname
==
NULL
)
{
}
tNameExtractFullName
(
&
name
,
topicFname
);
tscDebug
(
"subscribe topic: %s"
,
topicFname
);
taosArrayPush
(
tmq
->
clientTopics
,
&
topicFname
);
/*SMqClientTopic topic = {*/
/*.*/
/*};*/
}
SCMSubscribeReq
req
;
req
.
topicNum
=
taosArrayGetSize
(
tmq
->
clientTopics
);
...
...
@@ -401,7 +417,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
}
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
type
=
TDMT_MND_
CREATE_TOPIC
;
pRequest
->
type
=
TDMT_MND_
SUBSCRIBE
;
SMsgSendInfo
*
body
=
buildMsgInfoImpl
(
pRequest
);
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
...
@@ -534,7 +550,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
SCMCreateTopicReq
req
=
{
.
name
=
(
char
*
)
topicFname
,
.
igExists
=
0
,
.
igExists
=
1
,
.
physicalPlan
=
(
char
*
)
pStr
,
.
sql
=
(
char
*
)
sql
,
.
logicalPlan
=
"no logic plan"
,
...
...
source/client/test/clientTests.cpp
浏览文件 @
e777ee72
...
...
@@ -458,10 +458,9 @@ TEST(testCase, show_table_Test) {
assert
(
pConn
!=
NULL
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"show tables"
);
ASSERT_NE
(
taos_errno
(
pRes
),
0
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"expected failed to show tables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to show tables, reason:%s
\n
"
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
}
taos_free_result
(
pRes
);
...
...
@@ -537,6 +536,7 @@ TEST(testCase, create_topic_Test) {
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
taos_free_result
(
pRes
);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
nullptr
);
...
...
@@ -570,6 +570,51 @@ TEST(testCase, insert_test) {
taos_close
(
pConn
);
}
#if 0
TEST(testCase, tmq_subscribe_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg1");
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_topic_1");
tmq_subscribe(tmq, topic_list);
while (1) {
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
printf("get msg\n");
if (msg == NULL) break;
}
}
TEST(testCase, tmq_consume_Test) {
}
TEST(testCase, tmq_commit_TEST) {
}
#endif
//TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "insert into t_2 values(now, 1)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TEST
(
testCase
,
projection_query_tables
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
...
...
source/dnode/mgmt/impl/src/dndMgmt.c
浏览文件 @
e777ee72
...
...
@@ -487,6 +487,8 @@ static void *dnodeThreadRoutine(void *param) {
SDnodeMgmt
*
pMgmt
=
&
pDnode
->
dmgmt
;
int32_t
ms
=
pDnode
->
cfg
.
statusInterval
*
1000
;
setThreadName
(
"dnode-hb"
);
while
(
true
)
{
pthread_testcancel
();
taosMsleep
(
ms
);
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
e777ee72
...
...
@@ -112,6 +112,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_TOPIC
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_TOPIC
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_TOPIC
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SUBSCRIBE
)]
=
dndProcessMnodeWriteMsg
;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN_RSP
)]
=
dndProcessMnodeWriteMsg
;
// Requests handled by VNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SUBMIT
)]
=
dndProcessVnodeWriteMsg
;
...
...
@@ -142,6 +145,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_TABLE
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES_FETCH
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CUR
)]
=
dndProcessVnodeFetchMsg
;
}
static
void
dndProcessResponse
(
void
*
parent
,
SRpcMsg
*
pRsp
,
SEpSet
*
pEpSet
)
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
e777ee72
...
...
@@ -363,8 +363,9 @@ typedef struct SMqConsumerEp {
int64_t
consumerId
;
// -1 for unassigned
int64_t
lastConsumerHbTs
;
int64_t
lastVgHbTs
;
int32_t
execLen
;
SSubQueryMsg
qExec
;
uint32_t
qmsgLen
;
char
*
qmsg
;
//SSubQueryMsg qExec;
}
SMqConsumerEp
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerEp
(
void
**
buf
,
SMqConsumerEp
*
pConsumerEp
)
{
...
...
@@ -373,7 +374,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumerEp
->
status
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
tEncodeSSubQueryMsg
(
buf
,
&
pConsumerEp
->
qExec
);
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen
+=
taosEncodeFixedU32
(
buf
,
pConsumerEp
->
qmsgLen
);
tlen
+=
taosEncodeBinary
(
buf
,
pConsumerEp
->
qmsg
,
pConsumerEp
->
qmsgLen
);
return
tlen
;
}
...
...
@@ -382,8 +385,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumerEp
->
status
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumerEp
->
epSet
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
tDecodeSSubQueryMsg
(
buf
,
&
pConsumerEp
->
qExec
);
pConsumerEp
->
execLen
=
sizeof
(
SSubQueryMsg
)
+
pConsumerEp
->
qExec
.
contentLen
;
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
buf
=
taosDecodeFixedU32
(
buf
,
&
pConsumerEp
->
qmsgLen
);
buf
=
taosDecodeBinary
(
buf
,
(
void
**
)
&
pConsumerEp
->
qmsg
,
pConsumerEp
->
qmsgLen
);
return
buf
;
}
...
...
@@ -402,11 +406,12 @@ typedef struct SMqSubscribeObj {
static
FORCE_INLINE
SMqSubscribeObj
*
tNewSubscribeObj
()
{
SMqSubscribeObj
*
pSub
=
malloc
(
sizeof
(
SMqSubscribeObj
));
pSub
->
key
[
0
]
=
0
;
pSub
->
epoch
=
0
;
if
(
pSub
==
NULL
)
{
return
NULL
;
}
pSub
->
key
[
0
]
=
0
;
pSub
->
epoch
=
0
;
pSub
->
availConsumer
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pSub
->
availConsumer
==
NULL
)
{
free
(
pSub
);
...
...
@@ -433,7 +438,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
free
(
pSub
);
return
NULL
;
}
return
NULL
;
return
pSub
;
}
static
FORCE_INLINE
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
)
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
e777ee72
...
...
@@ -56,7 +56,9 @@ void mndCleanupConsumer(SMnode *pMnode) {}
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
tlen
=
tEncodeSMqConsumerObj
(
NULL
,
pConsumer
);
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CONSUMER
,
MND_CONSUMER_VER_NUMBER
,
tlen
);
int32_t
size
=
sizeof
(
int32_t
)
+
tlen
+
MND_CONSUMER_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_CONSUMER
,
MND_CONSUMER_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
CM_ENCODE_OVER
;
void
*
buf
=
malloc
(
tlen
);
...
...
@@ -68,34 +70,6 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
tlen
,
CM_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
CM_ENCODE_OVER
);
#if 0
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
int32_t len = strlen(pConsumer->cgroup);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t len;
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i);
len = strlen(pConsumerTopic->name);
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER);
int vgSize;
if (pConsumerTopic->vgroups == NULL) {
vgSize = 0;
} else {
vgSize = listNEles(pConsumerTopic->vgroups);
}
SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER);
for (int j = 0; j < vgSize; j++) {
// SList* head;
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
}
}
#endif
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CM_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
CM_ENCODE_OVER
);
...
...
@@ -116,53 +90,35 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
C
ONSUME
_DECODE_OVER
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
goto
C
M
_DECODE_OVER
;
if
(
sver
!=
MND_CONSUMER_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
goto
C
ONSUME
_DECODE_OVER
;
goto
C
M
_DECODE_OVER
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SMqConsumerObj
));
if
(
pRow
==
NULL
)
goto
C
ONSUME
_DECODE_OVER
;
if
(
pRow
==
NULL
)
goto
C
M
_DECODE_OVER
;
SMqConsumerObj
*
pConsumer
=
sdbGetRowObj
(
pRow
);
if
(
pConsumer
==
NULL
)
goto
C
ONSUME
_DECODE_OVER
;
if
(
pConsumer
==
NULL
)
goto
C
M
_DECODE_OVER
;
int32_t
dataPos
=
0
;
int32_t
len
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
C
ONSUME
_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
C
M
_DECODE_OVER
);
void
*
buf
=
malloc
(
len
);
if
(
buf
==
NULL
)
goto
CONSUME_DECODE_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
CONSUME_DECODE_OVER
);
tDecodeSMqConsumerObj
(
buf
,
pConsumer
);
if
(
buf
==
NULL
)
goto
CM_DECODE_OVER
;
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
CM_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CM_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_CONSUMER_RESERVE_SIZE
,
CONSUME_DECODE_OVER
);
if
(
tDecodeSMqConsumerObj
(
buf
,
pConsumer
)
==
NULL
)
{
goto
CM_DECODE_OVER
;
}
terrno
=
TSDB_CODE_SUCCESS
;
#if 0
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
for (int i = 0; i < topicNum; i++) {
int32_t topicLen;
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
if (pConsumerTopic == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
// TODO
return NULL;
}
/*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/
SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER);
SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER);
int32_t vgSize;
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
}
#endif
CONSUME_DECODE_OVER:
if
(
terrno
!=
0
)
{
CM_DECODE_OVER:
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"consumer:%ld, failed to decode from raw:%p since %s"
,
pConsumer
->
consumerId
,
pRaw
,
terrstr
());
tfree
(
pRow
);
return
NULL
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
e777ee72
...
...
@@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
.
deleteFp
=
(
SdbDeleteFp
)
mndSubActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_
SUBSCRIBE
_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_
MQ_SET_CONN
_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_TIMER
,
mndProcessMqTimerMsg
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
...
...
@@ -96,25 +96,27 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
pSub
->
nextConsumerIdx
=
(
pSub
->
nextConsumerIdx
+
1
)
%
taosArrayGetSize
(
pSub
->
availConsumer
);
// build msg
SMqSetCVgReq
req
=
{
.
vgId
=
pCEp
->
vgId
,
.
oldConsumerId
=
-
1
,
.
newConsumerId
=
consumerId
,
};
strcpy
(
req
.
cgroup
,
cgroup
);
strcpy
(
req
.
topicName
,
topic
);
strcpy
(
req
.
sql
,
pTopic
->
sql
);
strcpy
(
req
.
logicalPlan
,
pTopic
->
logicalPlan
);
strcpy
(
req
.
physicalPlan
,
pTopic
->
physicalPlan
);
memcpy
(
&
req
.
msg
,
&
pCEp
->
qExec
,
pCEp
->
execLen
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
SMqSetCVgReq
*
pReq
=
malloc
(
sizeof
(
SMqSetCVgReq
)
+
pCEp
->
qmsgLen
);
if
(
pReq
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
strcpy
(
pReq
->
cgroup
,
cgroup
);
strcpy
(
pReq
->
topicName
,
topic
);
pReq
->
sql
=
strdup
(
pTopic
->
sql
);
pReq
->
logicalPlan
=
strdup
(
pTopic
->
logicalPlan
);
pReq
->
physicalPlan
=
strdup
(
pTopic
->
physicalPlan
);
pReq
->
qmsgLen
=
pCEp
->
qmsgLen
;
memcpy
(
pReq
->
qmsg
,
pCEp
->
qmsg
,
pCEp
->
qmsgLen
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
pReq
);
void
*
reqStr
=
malloc
(
tlen
);
if
(
reqStr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
void
*
abuf
=
reqStr
;
tEncodeSMqSetCVgReq
(
abuf
,
&
r
eq
);
tEncodeSMqSetCVgReq
(
&
abuf
,
pR
eq
);
// persist msg
STransAction
action
=
{
0
};
...
...
@@ -128,6 +130,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
free
(
pReq
);
tfree
(
topic
);
tfree
(
cgroup
);
}
...
...
@@ -146,6 +149,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
//convert phyplan to dag
SQueryDag
*
pDag
=
qStringToDag
(
pTopic
->
physicalPlan
);
SArray
*
pArray
;
SArray
*
inner
=
taosArrayGet
(
pDag
->
pSubplans
,
0
);
SSubplan
*
plan
=
taosArrayGetP
(
inner
,
0
);
plan
->
execNode
.
inUse
=
0
;
strcpy
(
plan
->
execNode
.
epAddr
[
0
].
fqdn
,
"localhost"
);
plan
->
execNode
.
epAddr
[
0
].
port
=
6030
;
plan
->
execNode
.
nodeId
=
2
;
plan
->
execNode
.
numOfEps
=
1
;
if
(
schedulerConvertDagToTaskList
(
pDag
,
&
pArray
)
<
0
)
{
return
-
1
;
}
...
...
@@ -157,11 +168,18 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
CEp
.
lastConsumerHbTs
=
CEp
.
lastVgHbTs
=
-
1
;
STaskInfo
*
pTaskInfo
=
taosArrayGet
(
pArray
,
i
);
tConvertQueryAddrToEpSet
(
&
CEp
.
epSet
,
&
pTaskInfo
->
addr
);
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp
.
vgId
=
pTaskInfo
->
addr
.
nodeId
;
CEp
.
qmsgLen
=
pTaskInfo
->
msg
->
contentLen
;
CEp
.
qmsg
=
malloc
(
CEp
.
qmsgLen
);
if
(
CEp
.
qmsg
==
NULL
)
{
return
-
1
;
}
memcpy
(
CEp
.
qmsg
,
pTaskInfo
->
msg
->
msg
,
pTaskInfo
->
msg
->
contentLen
);
taosArrayPush
(
unassignedVg
,
&
CEp
);
}
qDestroyQueryDag
(
pDag
);
/*qDestroyQueryDag(pDag);*/
return
0
;
}
...
...
@@ -178,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
};
strcpy
(
req
.
cgroup
,
pConsumer
->
cgroup
);
strcpy
(
req
.
topicName
,
pTopic
->
name
);
strcpy
(
req
.
sql
,
pTopic
->
sql
)
;
strcpy
(
req
.
logicalPlan
,
pTopic
->
logicalPlan
)
;
strcpy
(
req
.
physicalPlan
,
pTopic
->
physicalPlan
)
;
req
.
sql
=
pTopic
->
sql
;
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
reqStr
=
malloc
(
tlen
);
if
(
reqStr
==
NULL
)
{
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
void
*
abuf
=
reqStr
;
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgObj
);
action
.
pCont
=
reqStr
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndReleaseVgroup
(
pMnode
,
pVgObj
);
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
reqStr
);
free
(
buf
);
return
-
1
;
}
}
...
...
@@ -208,19 +232,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
void
mndCleanupSubscribe
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
pSub
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
tlen
=
tEncodeSubscribeObj
(
NULL
,
pSub
);
int32_t
size
=
tlen
+
MND_SUBSCRIBE_RESERVE_SIZE
;
int32_t
size
=
sizeof
(
int32_t
)
+
tlen
+
MND_SUBSCRIBE_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SUBSCRIBE
,
MND_SUBSCRIBE_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
SUB_ENCODE_OVER
;
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
goto
SUB_ENCODE_OVER
;
}
void
*
abuf
=
buf
;
if
(
buf
==
NULL
)
goto
SUB_ENCODE_OVER
;
tEncodeSubscribeObj
(
&
buf
,
pSub
);
void
*
abuf
=
buf
;
tEncodeSubscribeObj
(
&
abuf
,
pSub
);
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
tlen
,
SUB_ENCODE_OVER
);
...
...
@@ -228,6 +251,8 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_SUBSCRIBE_RESERVE_SIZE
,
SUB_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
SUB_ENCODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
SUB_ENCODE_OVER:
if
(
terrno
!=
0
)
{
mError
(
"subscribe:%s, failed to encode to raw:%p since %s"
,
pSub
->
key
,
pRaw
,
terrstr
());
...
...
@@ -259,9 +284,9 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
int32_t
dataPos
=
0
;
int32_t
tlen
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
tlen
,
SUB_DECODE_OVER
);
void
*
buf
=
malloc
(
tlen
+
1
);
if
(
buf
==
NULL
)
goto
SUB_DECODE_OVER
;
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
tlen
,
SUB_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
tlen
,
SUB_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_SUBSCRIBE_RESERVE_SIZE
,
SUB_DECODE_OVER
);
...
...
@@ -269,8 +294,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
goto
SUB_DECODE_OVER
;
}
terrno
=
TSDB_CODE_SUCCESS
;
SUB_DECODE_OVER:
if
(
terrno
!=
0
)
{
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"subscribe:%s, failed to decode from raw:%p since %s"
,
pSub
->
key
,
pRaw
,
terrstr
());
// TODO free subscribeobj
tfree
(
pRow
);
...
...
@@ -379,10 +406,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
oldTopicName
=
((
SMqConsumerTopic
*
)
taosArrayGet
(
oldSub
,
j
))
->
name
;
j
++
;
}
else
if
(
j
>=
oldTopicNum
)
{
newTopicName
=
taosArrayGet
(
newSub
,
i
);
newTopicName
=
taosArrayGet
P
(
newSub
,
i
);
i
++
;
}
else
{
newTopicName
=
taosArrayGet
(
newSub
,
i
);
newTopicName
=
taosArrayGet
P
(
newSub
,
i
);
oldTopicName
=
((
SMqConsumerTopic
*
)
taosArrayGet
(
oldSub
,
j
))
->
name
;
int
comp
=
compareLenPrefixedStr
(
newTopicName
,
oldTopicName
);
...
...
@@ -466,6 +493,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
char
*
key
=
mndMakeSubscribeKey
(
consumerGroup
,
newTopicName
);
strcpy
(
pSub
->
key
,
key
);
// set unassigned vg
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
);
//TODO: disable alter
...
...
@@ -486,7 +515,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
...
...
@@ -519,8 +548,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
mndTransAppendRedolog(pTrans, pTopicRaw);
#endif
mndReleaseTopic
(
pMnode
,
pTopic
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
/*mndReleaseTopic(pMnode, pTopic);*/
/*mndReleaseSubscribe(pMnode, pSub);*/
}
}
// part3. persist consumerObj
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
e777ee72
...
...
@@ -60,7 +60,9 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
MND_TOPIC_RESERVE_SIZE
;
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
logicalPlanLen
+
physicalPlanLen
+
pTopic
->
sqlLen
+
MND_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
MND_TOPIC_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
...
...
@@ -74,12 +76,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
version
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
strlen
(
pTopic
->
logicalPlan
)
+
1
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
logicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
logicalPlanLen
,
TOPIC_ENCODE_OVER
);
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
strlen
(
pTopic
->
physicalPlan
)
+
1
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_ENCODE_OVER
);
...
...
@@ -135,7 +135,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
logicalPlan
,
len
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
physicalPlan
=
calloc
(
len
+
1
,
sizeof
(
char
));
...
...
@@ -144,7 +144,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
+
1
,
TOPIC_DECODE_OVER
);
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
,
TOPIC_DECODE_OVER
);
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
);
...
...
@@ -231,6 +231,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) {
}
static
int32_t
mndCreateTopic
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SCMCreateTopicReq
*
pCreate
,
SDbObj
*
pDb
)
{
mDebug
(
"topic:%s to create"
,
pCreate
->
name
);
SMqTopicObj
topicObj
=
{
0
};
tstrncpy
(
topicObj
.
name
,
pCreate
->
name
,
TSDB_TOPIC_FNAME_LEN
);
tstrncpy
(
topicObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
...
...
@@ -273,7 +274,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_TOPIC_ALREADY_EXIST
;
mError
(
"
db:%s, failed to create since %s"
,
createTopicReq
.
name
,
terrstr
()
);
mError
(
"
topic:%s, failed to create since already exists"
,
createTopicReq
.
name
);
return
-
1
;
}
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
e777ee72
...
...
@@ -799,9 +799,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
return
-
1
;
}
strcpy
(
pTopic
->
topicName
,
req
.
topicName
);
strcpy
(
pTopic
->
sql
,
req
.
sql
);
strcpy
(
pTopic
->
logicalPlan
,
req
.
logicalPlan
);
strcpy
(
pTopic
->
physicalPlan
,
req
.
physicalPlan
);
pTopic
->
sql
=
strdup
(
req
.
sql
);
pTopic
->
logicalPlan
=
strdup
(
req
.
logicalPlan
);
pTopic
->
physicalPlan
=
strdup
(
req
.
physicalPlan
);
pTopic
->
buffer
.
firstOffset
=
-
1
;
pTopic
->
buffer
.
lastOffset
=
-
1
;
...
...
@@ -811,9 +811,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for
(
int
i
=
0
;
i
<
TQ_BUFFER_SIZE
;
i
++
)
{
pTopic
->
buffer
.
output
[
i
].
status
=
0
;
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pMeta
);
pTopic
->
buffer
.
output
[
i
].
task
=
qCreateStreamExecTaskInfo
(
&
req
.
msg
,
pReadHandle
);
pTopic
->
buffer
.
output
[
i
].
task
=
qCreateStreamExecTaskInfo
(
&
req
.
q
msg
,
pReadHandle
);
}
taosArrayPush
(
pConsumer
->
topics
,
pTopic
);
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
}
...
...
@@ -826,7 +827,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle
->
pMsg
=
NULL
;
pReadHandle
->
ver
=
-
1
;
pReadHandle
->
pColIdList
=
NULL
;
return
NULL
;
return
pReadHandle
;
}
void
tqReadHandleSetMsg
(
STqReadHandle
*
pReadHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
)
{
...
...
source/dnode/vnode/src/vnd/vnodeMgr.c
浏览文件 @
e777ee72
...
...
@@ -98,6 +98,8 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
/* ------------------------ STATIC METHODS ------------------------ */
static
void
*
loop
(
void
*
arg
)
{
setThreadName
(
"vnode-commit"
);
SVnodeTask
*
pTask
;
for
(;;)
{
pthread_mutex_lock
(
&
(
vnodeMgr
.
mutex
));
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
e777ee72
...
...
@@ -115,7 +115,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
break
;
case
TDMT_VND_MQ_SET_CONN
:
{
if
(
tqProcessSetConnReq
(
pVnode
->
pTq
,
ptr
)
<
0
)
{
if
(
tqProcessSetConnReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
ptr
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO: handle error
}
}
break
;
default:
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
e777ee72
...
...
@@ -250,9 +250,8 @@ typedef struct SExecTaskInfo {
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
uint64_t
totalRows
;
// total number of rows
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t
lock
;
// used to synchronize the rsp/query threads
char
*
sql
;
// query sql string
jmp_buf
env
;
//
struct
SOperatorInfo
*
pRoot
;
...
...
@@ -622,8 +621,6 @@ int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t nu
int32_t
createQueryFilter
(
char
*
data
,
uint16_t
len
,
SFilterInfo
**
pFilters
);
SGroupbyExpr
*
createGroupbyExprFromMsg
(
SQueryTableReq
*
pQueryMsg
,
SColIndex
*
pColIndex
,
int32_t
*
code
);
SQInfo
*
createQInfoImpl
(
SQueryTableReq
*
pQueryMsg
,
SGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
SFilterInfo
*
pFilters
,
int32_t
vgId
,
char
*
sql
,
uint64_t
qId
,
struct
SUdfInfo
*
pUdfInfo
);
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
void
*
sourceOptr
,
SQInfo
*
pQInfo
,
STaskParam
*
param
,
char
*
start
,
int32_t
prevResultLen
,
void
*
merger
);
...
...
@@ -645,20 +642,18 @@ void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status);
bool
onlyQueryTags
(
STaskAttr
*
pQueryAttr
);
//void destroyUdfInfo(struct SUdfInfo* pUdfInfo);
bool
isValidQInfo
(
void
*
param
);
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
);
size_t
getResultSize
(
SQInfo
*
pQInfo
,
int64_t
*
numOfRows
);
void
set
QueryKilled
(
SQInfo
*
pQ
Info
);
void
set
TaskKilled
(
SExecTaskInfo
*
pTask
Info
);
void
publishOperatorProfEvent
(
SOperatorInfo
*
operatorInfo
,
EQueryProfEventType
eventType
);
void
publishQueryAbortEvent
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
code
);
void
calculateOperatorProfResults
(
SQInfo
*
pQInfo
);
void
queryCostStatis
(
S
QInfo
*
pQ
Info
);
void
queryCostStatis
(
S
ExecTaskInfo
*
pTask
Info
);
void
doDestroyTask
(
S
QInfo
*
pQ
Info
);
void
doDestroyTask
(
S
ExecTaskInfo
*
pTask
Info
);
void
freeQueryAttr
(
STaskAttr
*
pQuery
);
int32_t
getMaximumIdleDurationSec
();
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
e777ee72
...
...
@@ -121,11 +121,19 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat
}
static
bool
allocBuf
(
SDataDispatchHandle
*
pDispatcher
,
const
SInputData
*
pInput
,
SDataDispatchBuf
*
pBuf
)
{
if
(
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
>=
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
)
{
uint32_t
capacity
=
pDispatcher
->
pManager
->
cfg
.
maxDataBlockNumPerQuery
;
if
(
taosQueueSize
(
pDispatcher
->
pDataBlocks
)
>
capacity
)
{
qError
(
"SinkNode queue is full, no capacity, max:%d, current:%d, no capacity"
,
capacity
,
taosQueueSize
(
pDispatcher
->
pDataBlocks
));
return
false
;
}
pBuf
->
allocSize
=
DATA_META_LENGTH
(
pInput
->
pTableRetrieveTsMap
)
+
pDispatcher
->
schema
.
resultRowSize
*
pInput
->
pData
->
info
.
rows
;
pBuf
->
pData
=
malloc
(
pBuf
->
allocSize
);
if
(
pBuf
->
pData
==
NULL
)
{
qError
(
"SinkNode failed to malloc memory, size:%d, code:%d"
,
pBuf
->
allocSize
,
TAOS_SYSTEM_ERROR
(
errno
));
}
return
NULL
!=
pBuf
->
pData
;
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
e777ee72
...
...
@@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
return
code
;
}
qTaskInfo_t
qCreateStreamExecTaskInfo
(
SSubQueryMsg
*
pM
sg
,
void
*
streamReadHandle
)
{
if
(
pM
sg
==
NULL
||
streamReadHandle
==
NULL
)
{
qTaskInfo_t
qCreateStreamExecTaskInfo
(
void
*
m
sg
,
void
*
streamReadHandle
)
{
if
(
m
sg
==
NULL
||
streamReadHandle
==
NULL
)
{
return
NULL
;
}
...
...
@@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle
#endif
struct
SSubplan
*
plan
=
NULL
;
int32_t
code
=
qStringToSubplan
(
pMsg
->
msg
,
&
plan
);
int32_t
code
=
qStringToSubplan
(
msg
,
&
plan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
code
;
return
NULL
;
...
...
source/libs/executor/src/executorMain.c
浏览文件 @
e777ee72
...
...
@@ -149,7 +149,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int64_t
curOwner
=
0
;
if
((
curOwner
=
atomic_val_compare_exchange_64
(
&
pTaskInfo
->
owner
,
0
,
threadId
))
!=
0
)
{
qError
(
"QI
nfo
:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
qError
(
"QI
D
:0x%"
PRIx64
"-%p qhandle is now executed by thread:%p"
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
,
(
void
*
)
curOwner
);
pTaskInfo
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
pTaskInfo
->
code
;
...
...
@@ -160,7 +160,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
}
if
(
isTaskKilled
(
pTaskInfo
))
{
qDebug
(
"QI
nfo
:0x%"
PRIx64
" it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"QI
D
:0x%"
PRIx64
" it is already killed, abort"
,
GET_TASKID
(
pTaskInfo
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -169,12 +169,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
publishQueryAbortEvent
(
pTaskInfo
,
ret
);
pTaskInfo
->
code
=
ret
;
qDebug
(
"QI
nfo
:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
qDebug
(
"QI
D
:0x%"
PRIx64
" query abort due to error/cancel occurs, code:%s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
pTaskInfo
->
code
));
return
pTaskInfo
->
code
;
}
qDebug
(
"QI
nfo
:0x%"
PRIx64
" query task is launched"
,
GET_TASKID
(
pTaskInfo
));
qDebug
(
"QI
D
:0x%"
PRIx64
" query task is launched"
,
GET_TASKID
(
pTaskInfo
));
bool
newgroup
=
false
;
publishOperatorProfEvent
(
pTaskInfo
->
pRoot
,
QUERY_PROF_BEFORE_OPERATOR_EXEC
);
...
...
@@ -190,8 +190,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
*
useconds
=
pTaskInfo
->
cost
.
elapsedTime
;
}
qDebug
(
"QInfo:0x%"
PRIx64
" query paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
GET_TASKID
(
pTaskInfo
),
0
,
0L
,
0
);
int32_t
current
=
(
*
pRes
!=
NULL
)
?
(
*
pRes
)
->
info
.
rows
:
0
;
pTaskInfo
->
totalRows
+=
current
;
qDebug
(
"QID:0x%"
PRIx64
" task paused, %d rows returned, total:%"
PRId64
" rows, in sinkNode:%d"
,
GET_TASKID
(
pTaskInfo
),
current
,
pTaskInfo
->
totalRows
,
0
);
atomic_store_64
(
&
pTaskInfo
->
owner
,
0
);
return
pTaskInfo
->
code
;
...
...
@@ -200,14 +203,14 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
int32_t
qRetrieveQueryResultInfo
(
qTaskInfo_t
qinfo
,
bool
*
buildRes
,
void
*
pRspContext
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qinfo
;
if
(
pQInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
)
)
{
if
(
pQInfo
==
NULL
)
{
qError
(
"QInfo invalid qhandle"
);
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
*
buildRes
=
false
;
if
(
IS_QUERY_KILLED
(
pQInfo
))
{
qDebug
(
"QI
nfo
:0x%"
PRIx64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
qDebug
(
"QI
D
:0x%"
PRIx64
" query is killed, code:0x%08x"
,
pQInfo
->
qId
,
pQInfo
->
code
);
return
pQInfo
->
code
;
}
...
...
@@ -227,11 +230,11 @@ int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspCo
assert
(
pQInfo
->
rspContext
==
NULL
);
if
(
pQInfo
->
dataReady
==
QUERY_RESULT_READY
)
{
*
buildRes
=
true
;
qDebug
(
"QI
nfo
:0x%"
PRIx64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQueryAttr
->
resultRowSize
,
qDebug
(
"QI
D
:0x%"
PRIx64
" retrieve result info, rowsize:%d, rows:%d, code:%s"
,
pQInfo
->
qId
,
pQueryAttr
->
resultRowSize
,
GET_NUM_OF_RESULTS
(
pRuntimeEnv
),
tstrerror
(
pQInfo
->
code
));
}
else
{
*
buildRes
=
false
;
qDebug
(
"QI
nfo
:0x%"
PRIx64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
qDebug
(
"QI
D
:0x%"
PRIx64
" retrieve req set query return result after paused"
,
pQInfo
->
qId
);
pQInfo
->
rspContext
=
pRspContext
;
assert
(
pQInfo
->
rspContext
!=
NULL
);
}
...
...
@@ -251,18 +254,18 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
}
int32_t
qKillTask
(
qTaskInfo_t
qinfo
)
{
S
QInfo
*
pQInfo
=
(
SQ
Info
*
)
qinfo
;
S
ExecTaskInfo
*
pTaskInfo
=
(
SExecTask
Info
*
)
qinfo
;
if
(
p
QInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
)
)
{
if
(
p
TaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"QI
nfo:0x%"
PRIx64
" query killed"
,
pQInfo
->
q
Id
);
set
QueryKilled
(
pQ
Info
);
qDebug
(
"QI
D:0x%"
PRIx64
" execTask killed"
,
pTaskInfo
->
id
.
query
Id
);
set
TaskKilled
(
pTask
Info
);
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
while
(
p
Q
Info
->
owner
!=
0
)
{
while
(
p
Task
Info
->
owner
!=
0
)
{
taosMsleep
(
100
);
}
...
...
@@ -270,14 +273,14 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
}
int32_t
qAsyncKillTask
(
qTaskInfo_t
qinfo
)
{
S
QInfo
*
pQInfo
=
(
SQ
Info
*
)
qinfo
;
S
ExecTaskInfo
*
pTaskInfo
=
(
SExecTask
Info
*
)
qinfo
;
if
(
p
QInfo
==
NULL
||
!
isValidQInfo
(
pQInfo
)
)
{
if
(
p
TaskInfo
==
NULL
)
{
return
TSDB_CODE_QRY_INVALID_QHANDLE
;
}
qDebug
(
"QI
nfo:0x%"
PRIx64
" query async killed"
,
pQInfo
->
q
Id
);
set
QueryKilled
(
pQ
Info
);
qDebug
(
"QI
D:0x%"
PRIx64
" query async killed"
,
pTaskInfo
->
id
.
query
Id
);
set
TaskKilled
(
pTask
Info
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -292,15 +295,12 @@ int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
return
isTaskKilled
(
pTaskInfo
)
||
Q_STATUS_EQUAL
(
pTaskInfo
->
status
,
TASK_OVER
);
}
void
qDestroyTask
(
qTaskInfo_t
qHandle
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
qHandle
;
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
}
void
qDestroyTask
(
qTaskInfo_t
qTaskHandle
)
{
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
qTaskHandle
;
qDebug
(
"QID:0x%"
PRIx64
" execTask completed, numOfRows:%"
PRId64
,
pTaskInfo
->
id
.
queryId
,
pTaskInfo
->
totalRows
);
qDebug
(
"QInfo:0x%"
PRIx64
" query completed"
,
pQInfo
->
qId
);
queryCostStatis
(
pQInfo
);
// print the query cost summary
doDestroyTask
(
pQInfo
);
queryCostStatis
(
pTaskInfo
);
// print the query cost summary
doDestroyTask
(
pTaskInfo
);
}
void
*
qOpenTaskMgmt
(
int32_t
vgId
)
{
...
...
@@ -381,7 +381,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
STaskMgmt
*
pQueryMgmt
=
pMgmt
;
if
(
pQueryMgmt
->
qinfoPool
==
NULL
)
{
qError
(
"QI
nfo
:0x%"
PRIx64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
qError
(
"QI
D
:0x%"
PRIx64
"-%p failed to add qhandle into qMgmt, since qMgmt is closed"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
...
...
@@ -389,7 +389,7 @@ void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo) {
pthread_mutex_lock
(
&
pQueryMgmt
->
lock
);
if
(
pQueryMgmt
->
closed
)
{
pthread_mutex_unlock
(
&
pQueryMgmt
->
lock
);
qError
(
"QI
nfo
:0x%"
PRIx64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
qError
(
"QI
D
:0x%"
PRIx64
"-%p failed to add qhandle into cache, since qMgmt is colsing"
,
qId
,
(
void
*
)
qInfo
);
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
return
NULL
;
}
else
{
...
...
@@ -445,7 +445,7 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qWarn("QId:0x%"PRIx64" be killed(no memory commit).", pQInfo->qId);
set
Query
Killed(pQInfo);
set
Task
Killed(pQInfo);
// wait query stop
int32_t loop = 0;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
e777ee72
...
...
@@ -2432,9 +2432,9 @@ static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
if
(
IS_QUERY_KILLED
(
pTaskInfo
))
{
return
true
;
}
//
if (IS_QUERY_KILLED(pTaskInfo)) {
//
return true;
//
}
// query has been executed more than tsShellActivityTimer, and the retrieve has not arrived
// abort current query execution.
...
...
@@ -2444,13 +2444,13 @@ bool isTaskKilled(SExecTaskInfo *pTaskInfo) {
assert
(
pTaskInfo
->
cost
.
start
!=
0
);
// qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64
// ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec());
return
true
;
//
return true;
}
return
false
;
}
void
set
QueryKilled
(
SQInfo
*
pQInfo
)
{
pQ
Info
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;}
void
set
TaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
pTask
Info
->
code
=
TSDB_CODE_TSC_QUERY_CANCELLED
;}
//static bool isFixedOutputQuery(STaskAttr* pQueryAttr) {
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
...
...
@@ -4420,33 +4420,32 @@ void calculateOperatorProfResults(SQInfo* pQInfo) {
taosArrayDestroy
(
opStack
);
}
void
queryCostStatis
(
SQInfo
*
pQInfo
)
{
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STaskCostInfo
*
pSummary
=
&
pQInfo
->
summary
;
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
)
{
STaskCostInfo
*
pSummary
=
&
pTaskInfo
->
cost
;
uint64_t
hashSize
=
taosHashGetMemSize
(
pQInfo
->
runtimeEnv
.
pResultRowHashTable
);
hashSize
+=
taosHashGetMemSize
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
map
);
pSummary
->
hashSize
=
hashSize
;
//
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
//
hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
//
pSummary->hashSize = hashSize;
// add the merge time
pSummary
->
elapsedTime
+=
pSummary
->
firstStageMergeTime
;
SResultRowPool
*
p
=
pQInfo
->
runtimeEnv
.
pool
;
if
(
p
!=
NULL
)
{
pSummary
->
winInfoSize
=
getResultRowPoolMemSize
(
p
);
pSummary
->
numOfTimeWindows
=
getNumOfAllocatedResultRows
(
p
);
}
else
{
pSummary
->
winInfoSize
=
0
;
pSummary
->
numOfTimeWindows
=
0
;
}
calculateOperatorProfResults
(
pQInfo
);
//qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
// "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
// pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
// pSummary->loadBlocks, pSummary->totalRows, pSummary->totalCheckedRows);
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
// pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
// } else {
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// }
//
// calculateOperatorProfResults(pQInfo);
qDebug
(
"QID:0x%"
PRIx64
" :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
pTaskInfo
->
id
.
queryId
,
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pSummary
->
totalBlocks
,
pSummary
->
loadBlockStatis
,
pSummary
->
loadBlocks
,
pSummary
->
totalRows
,
pSummary
->
totalCheckedRows
);
//
//qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
...
...
@@ -7733,7 +7732,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId) {
SExecTaskInfo
*
pTaskInfo
=
calloc
(
1
,
sizeof
(
SExecTaskInfo
));
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pthread_mutex_init
(
&
pTaskInfo
->
lock
,
NULL
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampMs
();
pTaskInfo
->
id
.
queryId
=
queryId
;
return
pTaskInfo
;
...
...
@@ -8673,229 +8671,6 @@ FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) {
return
((
SQInfo
*
)
qHandle
)
->
qId
==
qId
;
}
SQInfo
*
createQInfoImpl
(
SQueryTableReq
*
pQueryMsg
,
SGroupbyExpr
*
pGroupbyExpr
,
SExprInfo
*
pExprs
,
SExprInfo
*
pSecExprs
,
STableGroupInfo
*
pTableGroupInfo
,
SColumnInfo
*
pTagCols
,
SFilterInfo
*
pFilters
,
int32_t
vgId
,
char
*
sql
,
uint64_t
qId
,
struct
SUdfInfo
*
pUdfInfo
)
{
int16_t
numOfCols
=
pQueryMsg
->
numOfCols
;
int16_t
numOfOutput
=
pQueryMsg
->
numOfOutput
;
SQInfo
*
pQInfo
=
(
SQInfo
*
)
calloc
(
1
,
sizeof
(
SQInfo
));
if
(
pQInfo
==
NULL
)
{
goto
_cleanup_qinfo
;
}
pQInfo
->
qId
=
qId
;
pQInfo
->
startExecTs
=
0
;
pQInfo
->
runtimeEnv
.
pUdfInfo
=
pUdfInfo
;
// to make sure third party won't overwrite this structure
pQInfo
->
signature
=
pQInfo
;
STaskAttr
*
pQueryAttr
=
&
pQInfo
->
query
;
pQInfo
->
runtimeEnv
.
pQueryAttr
=
pQueryAttr
;
pQueryAttr
->
tableGroupInfo
=
*
pTableGroupInfo
;
pQueryAttr
->
numOfCols
=
numOfCols
;
pQueryAttr
->
numOfOutput
=
numOfOutput
;
pQueryAttr
->
limit
.
limit
=
pQueryMsg
->
limit
;
pQueryAttr
->
limit
.
offset
=
pQueryMsg
->
offset
;
pQueryAttr
->
order
.
order
=
pQueryMsg
->
order
;
pQueryAttr
->
order
.
col
.
info
.
colId
=
pQueryMsg
->
orderColId
;
pQueryAttr
->
pExpr1
=
pExprs
;
pQueryAttr
->
pExpr2
=
pSecExprs
;
pQueryAttr
->
numOfExpr2
=
pQueryMsg
->
secondStageOutput
;
pQueryAttr
->
pGroupbyExpr
=
pGroupbyExpr
;
memcpy
(
&
pQueryAttr
->
interval
,
&
pQueryMsg
->
interval
,
sizeof
(
pQueryAttr
->
interval
));
pQueryAttr
->
fillType
=
pQueryMsg
->
fillType
;
pQueryAttr
->
numOfTags
=
pQueryMsg
->
numOfTags
;
pQueryAttr
->
tagColList
=
pTagCols
;
pQueryAttr
->
prjInfo
.
vgroupLimit
=
pQueryMsg
->
vgroupLimit
;
pQueryAttr
->
prjInfo
.
ts
=
(
pQueryMsg
->
order
==
TSDB_ORDER_ASC
)
?
INT64_MIN
:
INT64_MAX
;
// pQueryAttr->sw = pQueryMsg->sw;
pQueryAttr
->
vgId
=
vgId
;
pQueryAttr
->
stableQuery
=
pQueryMsg
->
stableQuery
;
pQueryAttr
->
topBotQuery
=
pQueryMsg
->
topBotQuery
;
pQueryAttr
->
groupbyColumn
=
pQueryMsg
->
groupbyColumn
;
pQueryAttr
->
hasTagResults
=
pQueryMsg
->
hasTagResults
;
pQueryAttr
->
timeWindowInterpo
=
pQueryMsg
->
timeWindowInterpo
;
pQueryAttr
->
queryBlockDist
=
pQueryMsg
->
queryBlockDist
;
pQueryAttr
->
stabledev
=
pQueryMsg
->
stabledev
;
pQueryAttr
->
tsCompQuery
=
pQueryMsg
->
tsCompQuery
;
pQueryAttr
->
simpleAgg
=
pQueryMsg
->
simpleAgg
;
pQueryAttr
->
pointInterpQuery
=
pQueryMsg
->
pointInterpQuery
;
pQueryAttr
->
needReverseScan
=
pQueryMsg
->
needReverseScan
;
pQueryAttr
->
stateWindow
=
pQueryMsg
->
stateWindow
;
pQueryAttr
->
vgId
=
vgId
;
// pQueryAttr->pFilters = pFilters;
pQueryAttr
->
tableCols
=
calloc
(
numOfCols
,
sizeof
(
SSingleColumnFilterInfo
));
if
(
pQueryAttr
->
tableCols
==
NULL
)
{
goto
_cleanup
;
}
pQueryAttr
->
srcRowSize
=
0
;
pQueryAttr
->
maxTableColumnWidth
=
0
;
for
(
int16_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
pQueryAttr
->
tableCols
[
i
]
=
pQueryMsg
->
tableCols
[
i
];
// pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pQueryMsg->tableCols[i].flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters);
pQueryAttr
->
srcRowSize
+=
pQueryAttr
->
tableCols
[
i
].
bytes
;
if
(
pQueryAttr
->
maxTableColumnWidth
<
pQueryAttr
->
tableCols
[
i
].
bytes
)
{
pQueryAttr
->
maxTableColumnWidth
=
pQueryAttr
->
tableCols
[
i
].
bytes
;
}
}
for
(
int16_t
col
=
0
;
col
<
numOfOutput
;
++
col
)
{
assert
(
pExprs
[
col
].
base
.
resSchema
.
bytes
>
0
);
pQueryAttr
->
resultRowSize
+=
pExprs
[
col
].
base
.
resSchema
.
bytes
;
// keep the tag length
if
(
TSDB_COL_IS_TAG
(
pExprs
[
col
].
base
.
pColumns
->
flag
))
{
pQueryAttr
->
tagLen
+=
pExprs
[
col
].
base
.
resSchema
.
bytes
;
}
// if (pExprs[col].base.flist.filterInfo) {
// ++pQueryAttr->havingNum;
// }
}
doUpdateExprColumnIndex
(
pQueryAttr
);
if
(
pSecExprs
!=
NULL
)
{
int32_t
resultRowSize
=
0
;
// calculate the result row size
for
(
int16_t
col
=
0
;
col
<
pQueryAttr
->
numOfExpr2
;
++
col
)
{
assert
(
pSecExprs
[
col
].
base
.
resSchema
.
bytes
>
0
);
resultRowSize
+=
pSecExprs
[
col
].
base
.
resSchema
.
bytes
;
}
if
(
resultRowSize
>
pQueryAttr
->
resultRowSize
)
{
pQueryAttr
->
resultRowSize
=
resultRowSize
;
}
}
if
(
pQueryAttr
->
fillType
!=
TSDB_FILL_NONE
)
{
pQueryAttr
->
fillVal
=
malloc
(
sizeof
(
int64_t
)
*
pQueryAttr
->
numOfOutput
);
if
(
pQueryAttr
->
fillVal
==
NULL
)
{
goto
_cleanup
;
}
// the first column is the timestamp
memcpy
(
pQueryAttr
->
fillVal
,
(
char
*
)
pQueryMsg
->
fillVal
,
pQueryAttr
->
numOfOutput
*
sizeof
(
int64_t
));
}
size_t
numOfGroups
=
0
;
if
(
pTableGroupInfo
->
pGroupList
!=
NULL
)
{
numOfGroups
=
taosArrayGetSize
(
pTableGroupInfo
->
pGroupList
);
STableGroupInfo
*
pTableqinfo
=
&
pQInfo
->
runtimeEnv
.
tableqinfoGroupInfo
;
pTableqinfo
->
pGroupList
=
taosArrayInit
(
numOfGroups
,
POINTER_BYTES
);
pTableqinfo
->
numOfTables
=
pTableGroupInfo
->
numOfTables
;
pTableqinfo
->
map
=
taosHashInit
(
pTableGroupInfo
->
numOfTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
}
pQInfo
->
pBuf
=
calloc
(
pTableGroupInfo
->
numOfTables
,
sizeof
(
STableQueryInfo
));
if
(
pQInfo
->
pBuf
==
NULL
)
{
goto
_cleanup
;
}
pQInfo
->
dataReady
=
QUERY_RESULT_NOT_READY
;
pQInfo
->
rspContext
=
NULL
;
pQInfo
->
sql
=
sql
;
pthread_mutex_init
(
&
pQInfo
->
lock
,
NULL
);
tsem_init
(
&
pQInfo
->
ready
,
0
,
0
);
pQueryAttr
->
window
=
pQueryMsg
->
window
;
updateDataCheckOrder
(
pQInfo
,
pQueryMsg
,
pQueryAttr
->
stableQuery
);
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
STimeWindow
window
=
pQueryAttr
->
window
;
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
numOfGroups
;
++
i
)
{
SArray
*
pa
=
taosArrayGetP
(
pQueryAttr
->
tableGroupInfo
.
pGroupList
,
i
);
size_t
s
=
taosArrayGetSize
(
pa
);
SArray
*
p1
=
taosArrayInit
(
s
,
POINTER_BYTES
);
if
(
p1
==
NULL
)
{
goto
_cleanup
;
}
taosArrayPush
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
pGroupList
,
&
p1
);
for
(
int32_t
j
=
0
;
j
<
s
;
++
j
)
{
// STableKeyInfo* info = taosArrayGet(pa, j);
// window.skey = info->lastKey;
//
// void* buf = (char*) pQInfo->pBuf + index * sizeof(STableQueryInfo);
// STableQueryInfo* item = createTableQueryInfo(pQueryAttr, info->pTable, pQueryAttr->groupbyColumn, window, buf);
// if (item == NULL) {
// goto _cleanup;
// }
//
// item->groupIndex = i;
// taosArrayPush(p1, &item);
// STableId* id = TSDB_TABLEID(info->pTable);
// taosHashPut(pRuntimeEnv->tableqinfoGroupInfo.map, &id->tid, sizeof(id->tid), &item, POINTER_BYTES);
// index += 1;
}
}
colIdCheck
(
pQueryAttr
,
pQInfo
->
qId
);
// int32_t functionId = getExprFunctionId(&pExpr[0]);
// pQInfo->query.queryBlockDist = (functionId == FUNCTION_BLKINFO);
//qDebug("qmsg:%p vgId:%d, QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->query.vgId, pQInfo->qId, pQInfo);
return
pQInfo
;
_cleanup_qinfo:
// tsdbDestroyTableGroup(pTableGroupInfo);
if
(
pGroupbyExpr
!=
NULL
)
{
taosArrayDestroy
(
pGroupbyExpr
->
columnInfo
);
free
(
pGroupbyExpr
);
}
tfree
(
pTagCols
);
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SExprInfo
*
pExprInfo
=
&
pExprs
[
i
];
if
(
pExprInfo
->
pExpr
!=
NULL
)
{
tExprTreeDestroy
(
pExprInfo
->
pExpr
,
NULL
);
pExprInfo
->
pExpr
=
NULL
;
}
// if (pExprInfo->base.flist.filterInfo) {
// freeColumnFilterInfo(pExprInfo->base.flist.filterInfo, pExprInfo->base.flist.numOfFilters);
// }
}
tfree
(
pExprs
);
// filterFreeInfo(pFilters);
_cleanup:
doDestroyTask
(
pQInfo
);
return
NULL
;
}
bool
isValidQInfo
(
void
*
param
)
{
SQInfo
*
pQInfo
=
(
SQInfo
*
)
param
;
if
(
pQInfo
==
NULL
)
{
return
false
;
}
/*
* pQInfo->signature may be changed by another thread, so we assign value of signature
* into local variable, then compare by using local variable
*/
uint64_t
sig
=
(
uint64_t
)
pQInfo
->
signature
;
return
(
sig
==
(
uint64_t
)
pQInfo
);
}
int32_t
initQInfo
(
STsBufInfo
*
pTsBufInfo
,
void
*
tsdb
,
void
*
sourceOptr
,
SQInfo
*
pQInfo
,
STaskParam
*
param
,
char
*
start
,
int32_t
prevResultLen
,
void
*
merger
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -8957,7 +8732,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo*
_error:
// table query ref will be decrease during error handling
doDestroyTask
(
pQInfo
);
//
doDestroyTask(pQInfo);
return
code
;
}
...
...
@@ -9038,36 +8813,14 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) {
return
NULL
;
}
void
doDestroyTask
(
SQInfo
*
pQInfo
)
{
if
(
!
isValidQInfo
(
pQInfo
))
{
return
;
}
//qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId);
STaskRuntimeEnv
*
pRuntimeEnv
=
&
pQInfo
->
runtimeEnv
;
releaseQueryBuf
(
pRuntimeEnv
->
tableqinfoGroupInfo
.
numOfTables
);
doDestroyTableQueryInfo
(
&
pRuntimeEnv
->
tableqinfoGroupInfo
);
teardownQueryRuntimeEnv
(
&
pQInfo
->
runtimeEnv
);
STaskAttr
*
pQueryAttr
=
pQInfo
->
runtimeEnv
.
pQueryAttr
;
freeQueryAttr
(
pQueryAttr
);
// tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo);
tfree
(
pQInfo
->
pBuf
);
tfree
(
pQInfo
->
sql
);
taosArrayDestroy
(
pQInfo
->
summary
.
queryProfEvents
);
taosHashCleanup
(
pQInfo
->
summary
.
operatorProfResults
);
taosArrayDestroy
(
pRuntimeEnv
->
groupResInfo
.
pRows
);
pQInfo
->
signature
=
0
;
//qDebug("QInfo:0x%"PRIx64" QInfo is freed", pQInfo->qId);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"QID:0x%"
PRIx64
" start to free execTask"
,
pTaskInfo
->
id
.
queryId
);
doDestroyTableQueryInfo
(
&
pTaskInfo
->
tableqinfoGroupInfo
);
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults);
tfree
(
pQInfo
);
qDebug
(
"QID:0x%"
PRIx64
" execTask is freed"
,
pTaskInfo
->
id
.
queryId
);
tfree
(
pTaskInfo
);
}
int32_t
doDumpQueryResult
(
SQInfo
*
pQInfo
,
char
*
data
,
int8_t
compressed
,
int32_t
*
compLen
)
{
...
...
source/libs/parser/inc/dataBlockMgt.h
浏览文件 @
e777ee72
...
...
@@ -171,6 +171,8 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs);
void
setBoundColumnInfo
(
SParsedDataColInfo
*
pColList
,
SSchema
*
pSchema
,
int32_t
numOfCols
);
void
destroyBoundColumnInfo
(
SParsedDataColInfo
*
pColList
);
void
destroyBlockArrayList
(
SArray
*
pDataBlockList
);
void
destroyBlockHashmap
(
SHashObj
*
pDataBlockHash
);
int32_t
initMemRowBuilder
(
SMemRowBuilder
*
pBuilder
,
uint32_t
nRows
,
uint32_t
nCols
,
uint32_t
nBoundCols
,
int32_t
allNullLen
);
int32_t
allocateMemIfNeed
(
STableDataBlocks
*
pDataBlock
,
int32_t
rowSize
,
int32_t
*
numOfRows
);
int32_t
getDataBlockFromList
(
SHashObj
*
pHashList
,
int64_t
id
,
int32_t
size
,
int32_t
startOffset
,
int32_t
rowSize
,
...
...
source/libs/parser/src/astValidate.c
浏览文件 @
e777ee72
...
...
@@ -213,10 +213,11 @@ SQueryStmtInfo *createQueryInfo() {
pQueryInfo
->
slimit
.
limit
=
-
1
;
pQueryInfo
->
slimit
.
offset
=
0
;
pQueryInfo
->
pDownstream
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pQueryInfo
->
pDownstream
=
taosArrayInit
(
4
,
POINTER_BYTES
);
pQueryInfo
->
window
=
TSWINDOW_INITIALIZER
;
pQueryInfo
->
exprList
=
calloc
(
10
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
10
;
++
i
)
{
pQueryInfo
->
exprList
[
i
]
=
taosArrayInit
(
4
,
POINTER_BYTES
);
}
...
...
@@ -232,7 +233,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) {
cleanupFieldInfo
(
&
pQueryInfo
->
fieldsInfo
);
dropAllExprInfo
(
pQueryInfo
->
exprList
,
10
);
pQueryInfo
->
exprList
=
NULL
;
tfree
(
pQueryInfo
->
exprList
);
columnListDestroy
(
pQueryInfo
->
colList
);
pQueryInfo
->
colList
=
NULL
;
...
...
@@ -258,10 +260,10 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) {
size_t
numOfUpstream
=
taosArrayGetSize
(
pQueryInfo
->
pDownstream
);
for
(
int32_t
i
=
0
;
i
<
numOfUpstream
;
++
i
)
{
SQueryStmtInfo
*
p
UpQueryInfo
=
taosArrayGetP
(
pQueryInfo
->
pDownstream
,
i
);
destroyQueryInfoImpl
(
p
UpQueryInfo
);
clearAllTableMetaInfo
(
p
UpQueryInfo
,
false
,
0
);
tfree
(
p
UpQueryInfo
);
SQueryStmtInfo
*
p
Downstream
=
taosArrayGetP
(
pQueryInfo
->
pDownstream
,
i
);
destroyQueryInfoImpl
(
p
Downstream
);
clearAllTableMetaInfo
(
p
Downstream
,
false
,
0
);
tfree
(
p
Downstream
);
}
destroyQueryInfoImpl
(
pQueryInfo
);
...
...
@@ -1395,6 +1397,13 @@ int32_t validateFillNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf
static
void
pushDownAggFuncExprInfo
(
SQueryStmtInfo
*
pQueryInfo
);
static
void
addColumnNodeFromLowerLevel
(
SQueryStmtInfo
*
pQueryInfo
);
static
void
freeItemHelper
(
void
*
pItem
)
{
void
**
p
=
pItem
;
if
(
*
p
!=
NULL
)
{
tfree
(
*
p
);
}
}
int32_t
validateSqlNode
(
SSqlNode
*
pSqlNode
,
SQueryStmtInfo
*
pQueryInfo
,
SMsgBuf
*
pMsgBuf
)
{
assert
(
pSqlNode
!=
NULL
&&
(
pSqlNode
->
from
==
NULL
||
taosArrayGetSize
(
pSqlNode
->
from
->
list
)
>
0
));
...
...
@@ -1590,7 +1599,10 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
SArray
*
functionList
=
extractFunctionList
(
pQueryInfo
->
exprList
[
i
]);
extractFunctionDesc
(
functionList
,
&
pQueryInfo
->
info
);
if
((
code
=
checkForInvalidExpr
(
pQueryInfo
,
pMsgBuf
))
!=
TSDB_CODE_SUCCESS
)
{
code
=
checkForInvalidExpr
(
pQueryInfo
,
pMsgBuf
);
taosArrayDestroyEx
(
functionList
,
freeItemHelper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
...
...
@@ -2902,6 +2914,8 @@ int32_t doAddOneProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, S
}
pQueryInfo
->
info
.
projectionQuery
=
true
;
taosArrayDestroy
(
pColumnList
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3983,5 +3997,9 @@ int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtI
validateSqlNode
(
p
,
pQueryInfo
,
&
buf
);
}
taosArrayDestroy
(
data
.
pTableMeta
);
taosArrayDestroy
(
req
.
pUdf
);
taosArrayDestroy
(
req
.
pTableName
);
return
code
;
}
source/libs/parser/src/dataBlockMgt.c
浏览文件 @
e777ee72
...
...
@@ -249,7 +249,7 @@ static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlo
}
}
void
destroyDataBlock
(
STableDataBlocks
*
pDataBlock
)
{
static
void
destroyDataBlock
(
STableDataBlocks
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
)
{
return
;
}
...
...
@@ -273,12 +273,29 @@ void destroyBlockArrayList(SArray* pDataBlockList) {
size_t
size
=
taosArrayGetSize
(
pDataBlockList
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
destroyDataBlock
(
taosArrayGetP
(
pDataBlockList
,
i
));
void
*
p
=
taosArrayGetP
(
pDataBlockList
,
i
);
destroyDataBlock
(
p
);
}
taosArrayDestroy
(
pDataBlockList
);
}
void
destroyBlockHashmap
(
SHashObj
*
pDataBlockHash
)
{
if
(
pDataBlockHash
==
NULL
)
{
return
;
}
void
**
p1
=
taosHashIterate
(
pDataBlockHash
,
NULL
);
while
(
p1
)
{
STableDataBlocks
*
pBlocks
=
*
p1
;
destroyDataBlock
(
pBlocks
);
p1
=
taosHashIterate
(
pDataBlockHash
,
p1
);
}
taosHashCleanup
(
pDataBlockHash
);
}
// data block is disordered, sort it in ascending order
void
sortRemoveDataBlockDupRowsRaw
(
STableDataBlocks
*
dataBuf
)
{
SSubmitBlk
*
pBlocks
=
(
SSubmitBlk
*
)
dataBuf
->
pData
;
...
...
@@ -490,7 +507,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
}
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
int32_t
expandSize
=
isRawPayload
?
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
)
:
0
;
int32_t
expandSize
=
isRawPayload
?
getRowExpandSize
(
pOneTableBlock
->
pTableMeta
)
:
0
;
int64_t
destSize
=
dataBuf
->
size
+
pOneTableBlock
->
size
+
pBlocks
->
numOfRows
*
expandSize
+
sizeof
(
STColumn
)
*
getNumOfColumns
(
pOneTableBlock
->
pTableMeta
);
...
...
source/libs/parser/src/insertParser.c
浏览文件 @
e777ee72
...
...
@@ -525,10 +525,28 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
tdDestroyKVRowBuilder
(
&
pCxt
->
tagsBuilder
);
}
static
void
destroyDataBlock
(
STableDataBlocks
*
pDataBlock
)
{
if
(
pDataBlock
==
NULL
)
{
return
;
}
tfree
(
pDataBlock
->
pData
);
if
(
!
pDataBlock
->
cloned
)
{
// free the refcount for metermeta
if
(
pDataBlock
->
pTableMeta
!=
NULL
)
{
tfree
(
pDataBlock
->
pTableMeta
);
}
destroyBoundColumnInfo
(
&
pDataBlock
->
boundColumnInfo
);
}
tfree
(
pDataBlock
);
}
static
void
destroyInsertParseContext
(
SInsertParseContext
*
pCxt
)
{
destroyInsertParseContextForTable
(
pCxt
);
taosHashCleanup
(
pCxt
->
pVgroupsHashObj
);
taosHashCleanup
(
pCxt
->
pTableBlockHashObj
);
destroyBlockHashmap
(
pCxt
->
pTableBlockHashObj
);
destroyBlockArrayList
(
pCxt
->
pTableDataBlocks
);
destroyBlockArrayList
(
pCxt
->
pVgDataBlocks
);
}
...
...
source/libs/parser/src/parser.c
浏览文件 @
e777ee72
...
...
@@ -248,10 +248,15 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
if
(
NULL
==
pQueryNode
)
{
return
;
}
if
(
nodeType
(
pQueryNode
)
==
TSDB_SQL_INSERT
||
nodeType
(
pQueryNode
)
==
TSDB_SQL_CREATE_TABLE
)
{
int32_t
type
=
nodeType
(
pQueryNode
);
if
(
type
==
TSDB_SQL_INSERT
||
type
==
TSDB_SQL_CREATE_TABLE
)
{
SVnodeModifOpStmtInfo
*
pModifInfo
=
(
SVnodeModifOpStmtInfo
*
)
pQueryNode
;
taosArrayDestroy
(
pModifInfo
->
pDataBlocks
);
}
tfree
(
pQueryNode
);
tfree
(
pQueryNode
);
}
else
if
(
type
==
TSDB_SQL_SELECT
)
{
SQueryStmtInfo
*
pQueryStmtInfo
=
(
SQueryStmtInfo
*
)
pQueryNode
;
destroyQueryInfo
(
pQueryStmtInfo
);
}
}
source/libs/parser/src/parserUtil.c
浏览文件 @
e777ee72
...
...
@@ -732,18 +732,8 @@ void cleanupFieldInfo(SFieldInfo* pFieldInfo) {
return
;
}
if
(
pFieldInfo
->
internalField
!=
NULL
)
{
size_t
num
=
taosArrayGetSize
(
pFieldInfo
->
internalField
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
// SInternalField* pfield = taosArrayGet(pFieldInfo->internalField, i);
// if (pfield->pExpr != NULL && pfield->pExpr->pExpr != NULL) {
// sqlExprDestroy(pfield->pExpr);
// }
}
}
taosArrayDestroy
(
pFieldInfo
->
internalField
);
//
tfree(pFieldInfo->final);
tfree
(
pFieldInfo
->
final
);
memset
(
pFieldInfo
,
0
,
sizeof
(
SFieldInfo
));
}
...
...
source/libs/parser/src/queryInfoUtil.c
浏览文件 @
e777ee72
...
...
@@ -191,10 +191,12 @@ void destroyExprInfo(SExprInfo* pExprInfo) {
for
(
int32_t
i
=
0
;
i
<
pExprInfo
->
base
.
numOfParams
;
++
i
)
{
taosVariantDestroy
(
&
pExprInfo
->
base
.
param
[
i
]);
}
tfree
(
pExprInfo
->
base
.
pColumns
);
tfree
(
pExprInfo
);
}
static
void
dropOneLevelExprInfo
(
SArray
*
pExprInfo
)
{
void
dropOneLevelExprInfo
(
SArray
*
pExprInfo
)
{
size_t
size
=
taosArrayGetSize
(
pExprInfo
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
...
...
@@ -239,6 +241,9 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
#endif
dst
->
pExpr
=
exprdup
(
src
->
pExpr
);
dst
->
base
.
pColumns
=
calloc
(
src
->
base
.
numOfCols
,
sizeof
(
SColumn
));
memcpy
(
dst
->
base
.
pColumns
,
src
->
base
.
pColumns
,
sizeof
(
SColumn
)
*
src
->
base
.
numOfCols
);
memset
(
dst
->
base
.
param
,
0
,
sizeof
(
SVariant
)
*
tListLen
(
dst
->
base
.
param
));
for
(
int32_t
j
=
0
;
j
<
src
->
base
.
numOfParams
;
++
j
)
{
taosVariantAssign
(
&
dst
->
base
.
param
[
j
],
&
src
->
base
.
param
[
j
]);
...
...
source/libs/planner/src/logicPlan.c
浏览文件 @
e777ee72
...
...
@@ -265,7 +265,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
}
else
{
// here we can push down the projection to tablescan operator.
pNode
->
numOfExpr
=
num
;
pNode
->
pExpr
=
taosArrayInit
(
num
,
POINTER_BYTES
);
taosArrayAddAll
(
pNode
->
pExpr
,
p
);
}
}
...
...
@@ -357,7 +356,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
SArray
*
exprList
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
copyExprInfoList
(
exprList
,
pQueryInfo
->
exprList
[
0
],
uid
,
true
)
!=
0
)
{
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
// dropAllExprInfo(exprList);
exit
(
-
1
);
}
...
...
@@ -373,7 +371,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
// 4. add the projection query node
SQueryPlanNode
*
pNode
=
doAddTableColumnNode
(
pQueryInfo
,
&
info
,
exprList
,
tableColumnList
);
columnListDestroy
(
tableColumnList
);
// dropAllExprInfo(exprList);
taosArrayPush
(
pDownstream
,
&
pNode
);
}
...
...
@@ -398,7 +395,8 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
}
static
void
doDestroyQueryNode
(
SQueryPlanNode
*
pQueryNode
)
{
if
(
pQueryNode
->
info
.
type
==
QNODE_MODIFY
)
{
int32_t
type
=
nodeType
(
pQueryNode
);
if
(
type
==
QNODE_MODIFY
)
{
SDataPayloadInfo
*
pInfo
=
pQueryNode
->
pExtInfo
;
size_t
size
=
taosArrayGetSize
(
pInfo
->
payload
);
...
...
@@ -410,10 +408,16 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
taosArrayDestroy
(
pInfo
->
payload
);
}
if
(
type
==
QNODE_STREAMSCAN
||
type
==
QNODE_TABLESCAN
)
{
SQueryTableInfo
*
pQueryTableInfo
=
pQueryNode
->
pExtInfo
;
tfree
(
pQueryTableInfo
->
tableName
);
}
taosArrayDestroy
(
pQueryNode
->
pExpr
);
tfree
(
pQueryNode
->
pExtInfo
);
tfree
(
pQueryNode
->
pSchema
);
tfree
(
pQueryNode
->
info
.
name
);
// dropAllExprInfo(pQueryNode->pExpr);
if
(
pQueryNode
->
pChildren
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pQueryNode
->
pChildren
);
...
...
source/libs/planner/src/physicalPlan.c
浏览文件 @
e777ee72
...
...
@@ -155,6 +155,16 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si
return
node
;
}
static
void
cleanupPhyNode
(
SPhyNode
*
pPhyNode
)
{
if
(
pPhyNode
==
NULL
)
{
return
;
}
dropOneLevelExprInfo
(
pPhyNode
->
pTargets
);
tfree
(
pPhyNode
->
targetSchema
.
pSchema
);
tfree
(
pPhyNode
);
}
static
SPhyNode
*
initScanNode
(
SQueryPlanNode
*
pPlanNode
,
SQueryTableInfo
*
pTable
,
int32_t
type
,
int32_t
size
)
{
SScanPhyNode
*
node
=
(
SScanPhyNode
*
)
initPhyNode
(
pPlanNode
,
type
,
size
);
...
...
@@ -445,3 +455,29 @@ void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyN
void
setSubplanExecutionNode
(
SSubplan
*
subplan
,
uint64_t
templateId
,
SDownstreamSource
*
pSource
)
{
setExchangSourceNode
(
templateId
,
pSource
,
subplan
->
pNode
);
}
static
void
destroyDataSinkNode
(
SDataSink
*
pSinkNode
)
{
if
(
pSinkNode
==
NULL
)
{
return
;
}
if
(
nodeType
(
pSinkNode
)
==
DSINK_Dispatch
)
{
SDataDispatcher
*
pDdSink
=
(
SDataDispatcher
*
)
pSinkNode
;
tfree
(
pDdSink
->
sink
.
schema
.
pSchema
);
}
tfree
(
pSinkNode
);
}
void
qDestroySubplan
(
SSubplan
*
pSubplan
)
{
if
(
pSubplan
==
NULL
)
{
return
;
}
taosArrayDestroy
(
pSubplan
->
pChildren
);
taosArrayDestroy
(
pSubplan
->
pParents
);
destroyDataSinkNode
(
pSubplan
->
pDataSink
);
cleanupPhyNode
(
pSubplan
->
pNode
);
tfree
(
pSubplan
);
}
source/libs/planner/src/physicalPlanJson.c
浏览文件 @
e777ee72
...
...
@@ -87,6 +87,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
static
const
char
*
jkPnodeType
=
"Type"
;
static
int32_t
getPnodeTypeSize
(
cJSON
*
json
)
{
switch
(
getNumber
(
json
,
jkPnodeType
))
{
case
OP_StreamScan
:
case
OP_TableScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
...
...
@@ -869,6 +870,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
SPhyNode
*
phyNode
=
(
SPhyNode
*
)
obj
;
switch
(
phyNode
->
info
.
type
)
{
case
OP_TableScan
:
case
OP_StreamScan
:
case
OP_DataBlocksOptScan
:
case
OP_TableSeqScan
:
return
tableScanNodeFromJson
(
json
,
obj
);
...
...
@@ -1121,6 +1123,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
}
*
str
=
cJSON_Print
(
json
);
cJSON_Delete
(
json
);
// printf("====Physical plan:====\n");
// printf("%s\n", *str);
*
len
=
strlen
(
*
str
)
+
1
;
...
...
@@ -1187,14 +1191,14 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) {
if
(
pDag
==
NULL
)
{
return
NULL
;
}
pDag
->
numOfSubplans
=
cJSON_GetNumberValue
(
cJSON_GetObjectItem
(
pRoot
,
"
numOfSubplans
"
));
pDag
->
queryId
=
cJSON_GetNumberValue
(
cJSON_GetObjectItem
(
pRoot
,
"
q
ueryId"
));
pDag
->
numOfSubplans
=
cJSON_GetNumberValue
(
cJSON_GetObjectItem
(
pRoot
,
"
Number
"
));
pDag
->
queryId
=
cJSON_GetNumberValue
(
cJSON_GetObjectItem
(
pRoot
,
"
Q
ueryId"
));
pDag
->
pSubplans
=
taosArrayInit
(
0
,
sizeof
(
SArray
));
if
(
pDag
->
pSubplans
==
NULL
)
{
free
(
pDag
);
return
NULL
;
}
cJSON
*
pLevels
=
cJSON_GetObjectItem
(
pRoot
,
"
p
Subplans"
);
cJSON
*
pLevels
=
cJSON_GetObjectItem
(
pRoot
,
"Subplans"
);
int
level
=
cJSON_GetArraySize
(
pLevels
);
for
(
int
i
=
0
;
i
<
level
;
i
++
)
{
SArray
*
plansOneLevel
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
source/libs/planner/src/planner.c
浏览文件 @
e777ee72
...
...
@@ -18,25 +18,6 @@
static
void
extractResSchema
(
struct
SQueryDag
*
const
*
pDag
,
SSchema
**
pResSchema
,
int32_t
*
numOfCols
);
static
void
destroyDataSinkNode
(
SDataSink
*
pSinkNode
)
{
if
(
pSinkNode
==
NULL
)
{
return
;
}
tfree
(
pSinkNode
);
}
void
qDestroySubplan
(
SSubplan
*
pSubplan
)
{
if
(
pSubplan
==
NULL
)
{
return
;
}
taosArrayDestroy
(
pSubplan
->
pChildren
);
taosArrayDestroy
(
pSubplan
->
pParents
);
destroyDataSinkNode
(
pSubplan
->
pDataSink
);
// todo destroy pNode
tfree
(
pSubplan
);
}
void
qDestroyQueryDag
(
struct
SQueryDag
*
pDag
)
{
if
(
pDag
==
NULL
)
{
return
;
...
...
@@ -51,6 +32,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
SSubplan
*
pSubplan
=
taosArrayGetP
(
pa
,
j
);
qDestroySubplan
(
pSubplan
);
}
taosArrayDestroy
(
pa
);
}
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
e777ee72
...
...
@@ -492,7 +492,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
SInputData
inputData
=
{.
pData
=
pRes
,
.
pTableRetrieveTsMap
=
NULL
};
code
=
dsPutDataBlock
(
sinkHandle
,
&
inputData
,
&
qcontinue
);
if
(
code
)
{
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%
x"
,
code
);
QW_TASK_ELOG
(
"dsPutDataBlock failed, code:%
s"
,
tstrerror
(
code
)
);
QW_ERR_JRET
(
code
);
}
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
e777ee72
...
...
@@ -1490,7 +1490,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
SSubQueryMsg
*
pMsg
=
(
SSubQueryMsg
*
)
msg
;
pMsg
->
header
.
vgId
=
htonl
(
tInfo
.
addr
.
nodeId
)
;
pMsg
->
header
.
vgId
=
tInfo
.
addr
.
nodeId
;
pMsg
->
sId
=
schMgmt
.
sId
;
pMsg
->
queryId
=
plan
->
id
.
queryId
;
...
...
@@ -1512,9 +1512,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
info
=
NULL
;
_return:
schedulerFreeTaskList
(
info
);
SCH_RET
(
code
);
}
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
e777ee72
...
...
@@ -123,9 +123,9 @@ typedef struct {
}
SRpcReqContext
;
typedef
struct
{
SRpcInfo
*
p
Rpc
;
// associated SRpcInfo
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
SRpcInfo
*
p
TransInst
;
// associated SRpcInfo
SEpSet
epSet
;
// ip list provided by app
void
*
ahandle
;
// handle provided by app
// struct SRpcConn* pConn; // pConn allocated
tmsg_t
msgType
;
// message type
uint8_t
*
pCont
;
// content provided by app
...
...
@@ -182,7 +182,7 @@ typedef struct {
#define TRANS_RESERVE_SIZE (sizeof(STranConnCtx))
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)
+ sizeof(STransDigestMsg)
)
#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead))
#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead)))
#define transContFromHead(msg) (msg + sizeof(STransMsgHead))
#define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead))
...
...
@@ -201,6 +201,7 @@ bool transDecompressMsg(char* msg, int32_t len, int32_t* flen);
void
transConnCtxDestroy
(
STransConnCtx
*
ctx
);
void
transFreeMsg
(
void
*
msg
);
typedef
struct
SConnBuffer
{
char
*
buf
;
int
len
;
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
e777ee72
...
...
@@ -242,11 +242,14 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc
=
(
SRpcInfo
*
)
calloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
return
NULL
;
if
(
pInit
->
label
)
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
strl
en
(
pInit
->
label
));
if
(
pInit
->
label
)
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
tListL
en
(
pInit
->
label
));
pRpc
->
connType
=
pInit
->
connType
;
if
(
pRpc
->
connType
==
TAOS_CONN_CLIENT
)
{
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
;
if
(
pRpc
->
numOfThreads
>=
10
)
{
pRpc
->
numOfThreads
=
10
;
}
}
else
{
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
}
...
...
@@ -769,8 +772,8 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
}
taosHashPut
(
pRpc
->
hash
,
hashstr
,
size
,
(
char
*
)
&
pConn
,
POINTER_BYTES
);
tDebug
(
"%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d"
,
pRpc
->
label
,
pConn
,
pConn
->
linkUid
,
sid
,
hashstr
,
pConn
->
spi
);
tDebug
(
"%s %p server connection is allocated, uid:0x%x sid:%d key:%s spi:%d"
,
pRpc
->
label
,
pConn
,
pConn
->
linkUid
,
sid
,
hashstr
,
pConn
->
spi
);
}
return
pConn
;
...
...
source/libs/transport/src/transCli.c
浏览文件 @
e777ee72
...
...
@@ -30,6 +30,7 @@ typedef struct SCliConn {
char
spi
;
char
secured
;
uint64_t
expireTime
;
int8_t
notifyCount
;
// timers already notify to client
}
SCliConn
;
typedef
struct
SCliMsg
{
...
...
@@ -72,8 +73,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* co
// register timer in each thread to clear expire conn
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
);
// process data read from server, auth/decompress etc later
static
void
clientHandleResp
(
SCliConn
*
conn
);
// check whether already read complete packet from server
static
bool
clientReadComplete
(
SConnBuffer
*
pBuf
);
// alloc buf for read
...
...
@@ -88,10 +87,15 @@ static void clientAsyncCb(uv_async_t* handle);
static
void
clientDestroy
(
uv_handle_t
*
handle
);
static
void
clientConnDestroy
(
SCliConn
*
pConn
,
bool
clear
/*clear tcp handle or not*/
);
static
void
clientMsgDestroy
(
SCliMsg
*
pMsg
);
// process data read from server, auth/decompress etc later
static
void
clientHandleResp
(
SCliConn
*
conn
);
// handle except about conn
static
void
clientHandleExcept
(
SCliConn
*
conn
);
// handle req from app
static
void
clientHandleReq
(
SCliMsg
*
pMsg
,
SCliThrdObj
*
pThrd
);
static
void
clientMsgDestroy
(
SCliMsg
*
pMsg
);
static
void
destroyTransConnCtx
(
STransConnCtx
*
ctx
);
// thread obj
static
SCliThrdObj
*
createThrdObj
();
static
void
destroyThrdObj
(
SCliThrdObj
*
pThrd
);
...
...
@@ -100,22 +104,50 @@ static void* clientThread(void* arg);
static
void
clientHandleResp
(
SCliConn
*
conn
)
{
STransConnCtx
*
pCtx
=
((
SCliMsg
*
)
conn
->
data
)
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
pRpc
;
SRpcMsg
rpcMsg
;
SRpcInfo
*
pRpc
=
pCtx
->
pTransInst
;
STransMsgHead
*
pHead
=
(
STransMsgHead
*
)(
conn
->
readBuf
.
buf
);
pHead
->
code
=
htonl
(
pHead
->
code
);
pHead
->
msgLen
=
htonl
(
pHead
->
msgLen
);
rpcMsg
.
pCont
=
conn
->
readBuf
.
buf
;
rpcMsg
.
contLen
=
conn
->
readBuf
.
len
;
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
transContLenFromMsg
(
pHead
->
msgLen
);
rpcMsg
.
pCont
=
transContFromHead
(
pHead
);
rpcMsg
.
code
=
pHead
->
code
;
rpcMsg
.
msgType
=
pHead
->
msgType
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
conn
->
notifyCount
+=
1
;
SCliThrdObj
*
pThrd
=
conn
->
hostThrd
;
tfree
(
conn
->
data
);
addConnToPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
,
conn
);
// start thread's timer of conn pool if not active
if
(
!
uv_is_active
((
uv_handle_t
*
)
pThrd
->
pTimer
)
&&
pRpc
->
idleTime
>
0
)
{
uv_timer_start
((
uv_timer_t
*
)
pThrd
->
pTimer
,
clientTimeoutCb
,
CONN_PERSIST_TIME
(
pRpc
->
idleTime
)
/
2
,
0
);
}
free
(
pCtx
->
ip
);
free
(
pCtx
);
// impl
destroyTransConnCtx
(
pCtx
);
}
static
void
clientHandleExcept
(
SCliConn
*
pConn
)
{
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
pTransInst
;
transFreeMsg
((
pMsg
->
msg
.
pCont
));
pMsg
->
msg
.
pCont
=
NULL
;
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
rpcMsg
.
code
=
-
1
;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
tfree
(
pConn
->
data
);
pConn
->
notifyCount
+=
1
;
destroyTransConnCtx
(
pCtx
);
clientConnDestroy
(
pConn
,
true
);
}
static
void
clientTimeoutCb
(
uv_timer_t
*
handle
)
{
...
...
@@ -191,6 +223,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) {
SRpcInfo
*
pRpc
=
((
SCliThrdObj
*
)
conn
->
hostThrd
)
->
pTransInst
;
conn
->
expireTime
=
taosGetTimestampMs
()
+
CONN_PERSIST_TIME
(
pRpc
->
idleTime
);
SConnList
*
plist
=
taosHashGet
((
SHashObj
*
)
pool
,
key
,
strlen
(
key
));
conn
->
notifyCount
=
0
;
// list already create before
assert
(
plist
!=
NULL
);
QUEUE_PUSH
(
&
plist
->
conn
,
&
conn
->
conn
);
...
...
@@ -246,19 +279,21 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
if
(
clientReadComplete
(
pBuf
))
{
tDebug
(
"
alread read complete"
);
tDebug
(
"
conn %p read complete"
,
conn
);
clientHandleResp
(
conn
);
}
else
{
tDebug
(
"
read half packet, continue to read"
);
tDebug
(
"
conn %p read partial packet, continue to read"
,
conn
);
}
return
;
}
assert
(
nread
<=
0
);
if
(
nread
==
0
)
{
tError
(
"conn %p closed"
,
conn
);
return
;
}
if
(
nread
!=
UV_EOF
)
{
tDebug
(
"read error %s"
,
uv_err_name
(
nread
));
if
(
nread
<
0
)
{
tError
(
"conn %p read error: %s"
,
conn
,
uv_err_name
(
nread
));
clientHandleExcept
(
conn
);
}
// tDebug("Read error %s\n", uv_err_name(nread));
// uv_close((uv_handle_t*)handle, clientDestroy);
...
...
@@ -282,19 +317,24 @@ static void clientDestroy(uv_handle_t* handle) {
static
void
clientWriteCb
(
uv_write_t
*
req
,
int
status
)
{
SCliConn
*
pConn
=
req
->
data
;
SCliMsg
*
pMsg
=
pConn
->
data
;
transFreeMsg
((
pMsg
->
msg
.
pCont
));
pMsg
->
msg
.
pCont
=
NULL
;
if
(
status
==
0
)
{
tDebug
(
"
data already was written on stream"
);
tDebug
(
"
conn %p data already was written out"
,
pConn
);
}
else
{
tError
(
"
failed to write: %s"
,
uv_err_name
(
status
));
client
ConnDestroy
(
pConn
,
true
);
tError
(
"
conn %p failed to write: %s"
,
pConn
,
uv_err_name
(
status
));
client
HandleExcept
(
pConn
);
return
;
}
SCliThrdObj
*
pThrd
=
pConn
->
hostThrd
;
if
(
pConn
->
stream
==
NULL
)
{
pConn
->
stream
=
(
uv_stream_t
*
)
malloc
(
sizeof
(
uv_tcp_t
));
uv_tcp_init
(
pThrd
->
loop
,
(
uv_tcp_t
*
)
pConn
->
stream
);
pConn
->
stream
->
data
=
pConn
;
}
//
if (pConn->stream == NULL) {
//
pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
//
uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream);
//
pConn->stream->data = pConn;
//
}
uv_read_start
((
uv_stream_t
*
)
pConn
->
stream
,
clientAllocReadBufferCb
,
clientReadCb
);
// impl later
}
...
...
@@ -310,30 +350,19 @@ static void clientWrite(SCliConn* pConn) {
pHead
->
msgLen
=
(
int32_t
)
htonl
((
uint32_t
)
msgLen
);
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
tDebug
(
"
data write out, msgType : %d, len: %d"
,
pHead
->
msgType
,
msgLen
);
tDebug
(
"
conn %p data write out, msgType : %d, len: %d"
,
pConn
,
pHead
->
msgType
,
msgLen
);
uv_write
(
pConn
->
writeReq
,
(
uv_stream_t
*
)
pConn
->
stream
,
&
wb
,
1
,
clientWriteCb
);
}
static
void
clientConnCb
(
uv_connect_t
*
req
,
int
status
)
{
// impl later
SCliConn
*
pConn
=
req
->
data
;
SCliMsg
*
pMsg
=
pConn
->
data
;
STransConnCtx
*
pCtx
=
pMsg
->
ctx
;
SRpcInfo
*
pRpc
=
pCtx
->
pRpc
;
if
(
status
!=
0
)
{
// tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status));
tError
(
"failed to connect server, errmsg: %s"
,
uv_strerror
(
status
));
// call user fp later
SRpcMsg
rpcMsg
;
rpcMsg
.
ahandle
=
pCtx
->
ahandle
;
// SRpcInfo* pRpc = pMsg->ctx->pRpc;
(
pRpc
->
cfp
)(
NULL
,
&
rpcMsg
,
NULL
);
clientConnDestroy
(
pConn
,
true
);
// uv_close((uv_handle_t*)req->handle, clientDestroy);
tError
(
"conn %p failed to connect server: %s"
,
pConn
,
uv_strerror
(
status
));
clientHandleExcept
(
pConn
);
return
;
}
tDebug
(
"conn %p create"
,
pConn
);
assert
(
pConn
->
stream
==
req
->
handle
);
clientWrite
(
pConn
);
...
...
@@ -349,6 +378,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn
*
conn
=
getConnFromPool
(
pThrd
->
pool
,
pCtx
->
ip
,
pCtx
->
port
);
if
(
conn
!=
NULL
)
{
// impl later
tDebug
(
"conn %p get from conn pool"
,
conn
);
conn
->
data
=
pMsg
;
conn
->
writeReq
->
data
=
conn
;
...
...
@@ -462,6 +492,13 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
free
(
pThrd
->
loop
);
free
(
pThrd
);
}
static
void
destroyTransConnCtx
(
STransConnCtx
*
ctx
)
{
if
(
ctx
!=
NULL
)
{
free
(
ctx
->
ip
);
}
free
(
ctx
);
}
//
void
taosCloseClient
(
void
*
arg
)
{
// impl later
...
...
@@ -472,7 +509,6 @@ void taosCloseClient(void* arg) {
free
(
cli
->
pThreadObj
);
free
(
cli
);
}
void
rpcSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
pRid
)
{
// impl later
char
*
ip
=
(
char
*
)(
pEpSet
->
fqdn
[
pEpSet
->
inUse
]);
...
...
@@ -487,7 +523,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t*
STransConnCtx
*
pCtx
=
calloc
(
1
,
sizeof
(
STransConnCtx
));
pCtx
->
p
Rpc
=
(
SRpcInfo
*
)
shandle
;
pCtx
->
p
TransInst
=
(
SRpcInfo
*
)
shandle
;
pCtx
->
ahandle
=
pMsg
->
ahandle
;
pCtx
->
msgType
=
pMsg
->
msgType
;
pCtx
->
ip
=
strdup
(
ip
);
...
...
source/libs/transport/src/transComm.c
浏览文件 @
e777ee72
...
...
@@ -191,4 +191,11 @@ void transConnCtxDestroy(STransConnCtx* ctx) {
free
(
ctx
->
ip
);
free
(
ctx
);
}
void
transFreeMsg
(
void
*
msg
)
{
if
(
msg
==
NULL
)
{
return
;
}
free
((
char
*
)
msg
-
sizeof
(
STransMsgHead
));
}
#endif
source/libs/transport/src/transSrv.c
浏览文件 @
e777ee72
...
...
@@ -16,6 +16,7 @@
#ifdef USE_UV
#include "transComm.h"
typedef
struct
SConn
{
uv_tcp_t
*
pTcp
;
uv_write_t
*
pWriter
;
...
...
@@ -26,7 +27,6 @@ typedef struct SConn {
int
ref
;
int
persist
;
// persist connection or not
SConnBuffer
connBuf
;
// read buf,
int
count
;
int
inType
;
void
*
pTransInst
;
// rpc init
void
*
ahandle
;
//
...
...
@@ -226,7 +226,7 @@ static void uvHandleActivityTimeout(uv_timer_t* handle) {
tDebug
(
"%p timeout since no activity"
,
conn
);
}
static
void
uv
ProcessData
(
SConn
*
pConn
)
{
static
void
uv
HandleReq
(
SConn
*
pConn
)
{
SRecvInfo
info
;
SRecvInfo
*
p
=
&
info
;
SConnBuffer
*
pBuf
=
&
pConn
->
connBuf
;
...
...
@@ -271,6 +271,7 @@ static void uvProcessData(SConn* pConn) {
rpcMsg
.
ahandle
=
NULL
;
rpcMsg
.
handle
=
pConn
;
pConn
->
ref
++
;
(
*
(
pRpc
->
cfp
))(
pRpc
->
parent
,
&
rpcMsg
,
NULL
);
// uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
// auth
...
...
@@ -283,20 +284,23 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer
*
pBuf
=
&
conn
->
connBuf
;
if
(
nread
>
0
)
{
pBuf
->
len
+=
nread
;
tDebug
(
"
on read %p, total read: %d, current read: %d"
,
cli
,
pBuf
->
len
,
(
int
)
nread
);
tDebug
(
"
conn %p read summroy, total read: %d, current read: %d"
,
conn
,
pBuf
->
len
,
(
int
)
nread
);
if
(
readComplete
(
pBuf
))
{
tDebug
(
"
alread read complete packet"
);
uv
ProcessData
(
conn
);
tDebug
(
"
conn %p alread read complete packet"
,
conn
);
uv
HandleReq
(
conn
);
}
else
{
tDebug
(
"
read half packet, continue to read"
);
tDebug
(
"
conn %p read partial packet, continue to read"
,
conn
);
}
return
;
}
if
(
nread
==
0
)
{
tDebug
(
"conn %p except read"
,
conn
);
// destroyConn(conn, true);
return
;
}
if
(
nread
!=
UV_EOF
)
{
tDebug
(
"read error %s"
,
uv_err_name
(
nread
));
tDebug
(
"conn %p read error: %s"
,
conn
,
uv_err_name
(
nread
));
destroyConn
(
conn
,
true
);
}
}
void
uvAllocConnBufferCb
(
uv_handle_t
*
handle
,
size_t
suggested_size
,
uv_buf_t
*
buf
)
{
...
...
@@ -306,7 +310,8 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void
uvOnTimeoutCb
(
uv_timer_t
*
handle
)
{
// opt
tDebug
(
"time out"
);
SConn
*
pConn
=
handle
->
data
;
tDebug
(
"conn %p time out"
,
pConn
);
}
void
uvOnWriteCb
(
uv_write_t
*
req
,
int
status
)
{
...
...
@@ -316,10 +321,14 @@ void uvOnWriteCb(uv_write_t* req, int status) {
buf
->
len
=
0
;
memset
(
buf
->
buf
,
0
,
buf
->
cap
);
buf
->
left
=
-
1
;
SRpcMsg
*
pMsg
=
&
conn
->
sendMsg
;
transFreeMsg
(
pMsg
->
pCont
);
if
(
status
==
0
)
{
tDebug
(
"
data already was written on stream"
);
tDebug
(
"
conn %p data already was written on stream"
,
conn
);
}
else
{
tDebug
(
"
failed to write data, %s"
,
uv_err_name
(
status
));
tDebug
(
"
conn %p failed to write data, %s"
,
conn
,
uv_err_name
(
status
));
destroyConn
(
conn
,
true
);
}
// opt
...
...
@@ -334,7 +343,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
static
void
uvPrepareSendData
(
SConn
*
conn
,
uv_buf_t
*
wb
)
{
// impl later;
tDebug
(
"
prepare to send back"
);
tDebug
(
"
conn %p prepare to send resp"
,
conn
);
SRpcMsg
*
pMsg
=
&
conn
->
sendMsg
;
if
(
pMsg
->
pCont
==
0
)
{
pMsg
->
pCont
=
(
void
*
)
rpcMallocCont
(
0
);
...
...
@@ -427,6 +436,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
assert
(
pending
==
UV_TCP
);
SConn
*
pConn
=
createConn
();
pConn
->
pTransInst
=
pThrd
->
pTransInst
;
/* init conn timer*/
pConn
->
pTimer
=
malloc
(
sizeof
(
uv_timer_t
));
...
...
@@ -448,7 +458,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
if
(
uv_accept
(
q
,
(
uv_stream_t
*
)(
pConn
->
pTcp
))
==
0
)
{
uv_os_fd_t
fd
;
uv_fileno
((
const
uv_handle_t
*
)
pConn
->
pTcp
,
&
fd
);
tDebug
(
"
new connection created: %d"
,
fd
);
tDebug
(
"
conn %p created, fd: %d"
,
pConn
,
fd
);
uv_read_start
((
uv_stream_t
*
)(
pConn
->
pTcp
),
uvAllocReadBufferCb
,
uvOnReadCb
);
}
else
{
tDebug
(
"failed to create new connection"
);
...
...
@@ -515,19 +525,19 @@ void* workerThread(void* arg) {
static
SConn
*
createConn
()
{
SConn
*
pConn
=
(
SConn
*
)
calloc
(
1
,
sizeof
(
SConn
));
++
pConn
->
ref
;
return
pConn
;
}
static
void
connCloseCb
(
uv_handle_t
*
handle
)
{
// impl later
//
}
static
void
destroyConn
(
SConn
*
conn
,
bool
clear
)
{
if
(
conn
==
NULL
)
{
return
;
}
if
(
--
conn
->
ref
==
0
)
{
return
;
}
if
(
clear
)
{
uv_handle_t
handle
=
*
((
uv_handle_t
*
)
conn
->
pTcp
);
uv_close
(
&
handle
,
NULL
);
uv_close
((
uv_handle_t
*
)
conn
->
pTcp
,
NULL
);
}
uv_timer_stop
(
conn
->
pTimer
);
free
(
conn
->
pTimer
);
...
...
@@ -646,6 +656,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) {
pthread_mutex_lock
(
&
pThrd
->
connMtx
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
pthread_mutex_unlock
(
&
pThrd
->
connMtx
);
tDebug
(
"conn %p start to send resp"
,
pConn
);
uv_async_send
(
pConn
->
pWorkerAsync
);
}
...
...
source/libs/transport/test/rclient.c
浏览文件 @
e777ee72
...
...
@@ -63,7 +63,7 @@ static void *sendRequest(void *param) {
if
(
pInfo
->
num
%
20000
==
0
)
tInfo
(
"thread:%d, %d requests have been sent"
,
pInfo
->
index
,
pInfo
->
num
);
// tsem_wait(&pInfo->rspSem);
tsem_wait
(
&
pInfo
->
rspSem
);
tDebug
(
"recv response"
);
tDebug
(
"recv response
succefully
"
);
// usleep(100000000);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录