Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d05d704a
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d05d704a
编写于
4月 25, 2022
作者:
S
slzhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0_udfd
上级
e29141a4
b05c263e
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
539 addition
and
365 deletion
+539
-365
example/src/tmq.c
example/src/tmq.c
+6
-2
include/common/tmsg.h
include/common/tmsg.h
+10
-12
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+1
-1
include/libs/wal/wal.h
include/libs/wal/wal.h
+7
-1
source/client/src/clientMsgHandler.c
source/client/src/clientMsgHandler.c
+19
-11
source/client/src/tmq.c
source/client/src/tmq.c
+7
-5
source/common/src/tmsg.c
source/common/src/tmsg.c
+10
-6
source/dnode/mgmt/interface/src/dmInt.c
source/dnode/mgmt/interface/src/dmInt.c
+6
-2
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+0
-2
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+0
-3
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+3
-2
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+2
-177
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+4
-8
source/dnode/qnode/src/qnode.c
source/dnode/qnode/src/qnode.c
+6
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+0
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+33
-3
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+5
-4
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+70
-57
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+4
-12
source/libs/qworker/src/qworkerMsg.c
source/libs/qworker/src/qworkerMsg.c
+8
-8
source/libs/scalar/inc/sclInt.h
source/libs/scalar/inc/sclInt.h
+3
-0
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+120
-38
source/libs/scalar/test/scalar/scalarTests.cpp
source/libs/scalar/test/scalar/scalarTests.cpp
+116
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+87
-0
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+3
-5
tools/shell/src/shellNettest.c
tools/shell/src/shellNettest.c
+7
-3
未找到文件。
example/src/tmq.c
浏览文件 @
d05d704a
...
...
@@ -17,6 +17,7 @@
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include "taos.h"
static
int
running
=
1
;
...
...
@@ -47,6 +48,7 @@ int32_t init_env() {
return
-
1
;
}
taos_free_result
(
pRes
);
sleep
(
1
);
pRes
=
taos_query
(
pConn
,
"use abc1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
...
...
@@ -58,6 +60,7 @@ int32_t init_env() {
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
assert
(
0
);
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
...
...
@@ -265,10 +268,11 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
}
int
main
(
int
argc
,
char
*
argv
[])
{
int
code
;
if
(
argc
>
1
)
{
printf
(
"env init
\n
"
);
code
=
init_env
();
if
(
init_env
()
<
0
)
{
return
-
1
;
}
create_topic
();
}
tmq_t
*
tmq
=
build_consumer
();
...
...
include/common/tmsg.h
浏览文件 @
d05d704a
...
...
@@ -1314,7 +1314,6 @@ typedef struct {
}
SMqConsumerLostMsg
;
typedef
struct
{
int32_t
topicNum
;
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
SArray
*
topicNames
;
// SArray<char*>
...
...
@@ -1322,22 +1321,27 @@ typedef struct {
static
FORCE_INLINE
int32_t
tSerializeSCMSubscribeReq
(
void
**
buf
,
const
SCMSubscribeReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
topicNum
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
cgroup
);
for
(
int32_t
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
int32_t
topicNum
=
taosArrayGetSize
(
pReq
->
topicNames
);
tlen
+=
taosEncodeFixedI32
(
buf
,
topicNum
);
for
(
int32_t
i
=
0
;
i
<
topicNum
;
i
++
)
{
tlen
+=
taosEncodeString
(
buf
,
(
char
*
)
taosArrayGetP
(
pReq
->
topicNames
,
i
));
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDeserializeSCMSubscribeReq
(
void
*
buf
,
SCMSubscribeReq
*
pReq
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
topicNum
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
cgroup
);
pReq
->
topicNames
=
taosArrayInit
(
pReq
->
topicNum
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
int32_t
topicNum
;
buf
=
taosDecodeFixedI32
(
buf
,
&
topicNum
);
pReq
->
topicNames
=
taosArrayInit
(
topicNum
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
topicNum
;
i
++
)
{
char
*
name
;
buf
=
taosDecodeString
(
buf
,
&
name
);
taosArrayPush
(
pReq
->
topicNames
,
&
name
);
...
...
@@ -1969,7 +1973,6 @@ typedef struct {
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
qmsg
;
}
SMqRebVgReq
;
...
...
@@ -1984,7 +1987,6 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pReq
->
withTagSchema
);
if
(
pReq
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
}
...
...
@@ -2001,7 +2003,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pReq
->
withTagSchema
);
if
(
pReq
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
}
...
...
@@ -2590,7 +2591,6 @@ typedef struct {
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
SArray
*
blockDataLen
;
// SArray<int32_t>
SArray
*
blockData
;
// SArray<SRetrieveTableRsp*>
SArray
*
blockTbName
;
// SArray<char*>
...
...
@@ -2609,7 +2609,6 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withTagSchema
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
blockNum
;
i
++
)
{
int32_t
bLen
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockDataLen
,
i
);
...
...
@@ -2632,7 +2631,6 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTagSchema
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
blockNum
;
i
++
)
{
int32_t
bLen
=
0
;
...
...
include/libs/catalog/catalog.h
浏览文件 @
d05d704a
...
...
@@ -51,7 +51,7 @@ typedef struct SMetaData {
SArray
*
pTableMeta
;
// STableMeta array
SArray
*
pVgroupInfo
;
// SVgroupInfo list
SArray
*
pUdfList
;
// udf info list
SArray
*
p
EpSetList
;
// qnode epset list, SArray<SEpSet
>
SArray
*
p
QnodeList
;
// qnode list, SArray<SQueryNodeAddr
>
}
SMetaData
;
typedef
struct
SCatalogCfg
{
...
...
include/libs/wal/wal.h
浏览文件 @
d05d704a
...
...
@@ -192,7 +192,13 @@ int32_t walEndSnapshot(SWal *);
SWalReadHandle
*
walOpenReadHandle
(
SWal
*
);
void
walCloseReadHandle
(
SWalReadHandle
*
);
int32_t
walReadWithHandle
(
SWalReadHandle
*
pRead
,
int64_t
ver
);
int32_t
walReadWithHandle_s
(
SWalReadHandle
*
pRead
,
int64_t
ver
,
SWalReadHead
**
ppHead
);
// only for tq usage
// int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **ppHead);
void
walSetReaderCapacity
(
SWalReadHandle
*
pRead
,
int32_t
capacity
);
int32_t
walFetchHead
(
SWalReadHandle
*
pRead
,
int64_t
ver
,
SWalHead
*
pHead
);
int32_t
walFetchBody
(
SWalReadHandle
*
pRead
,
SWalHead
**
ppHead
);
int32_t
walSkipFetchBody
(
SWalReadHandle
*
pRead
,
const
SWalHead
*
pHead
);
// deprecated
#if 0
...
...
source/client/src/clientMsgHandler.c
浏览文件 @
d05d704a
...
...
@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tdef.h"
#include "tname.h"
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "
catalog
.h"
#include "
os
.h"
#include "query.h"
#include "tdef.h"
#include "tname.h"
int32_t
(
*
handleRequestRspFp
[
TDMT_MAX
])(
void
*
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
...
...
@@ -50,7 +50,13 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SConnectRsp
connectRsp
=
{
0
};
tDeserializeSConnectRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
connectRsp
);
assert
(
connectRsp
.
epSet
.
numOfEps
>
0
);
/*assert(connectRsp.epSet.numOfEps > 0);*/
if
(
connectRsp
.
epSet
.
numOfEps
==
0
)
{
taosMemoryFree
(
pMsg
->
pData
);
setErrno
(
pRequest
,
TSDB_CODE_MND_APP_ERROR
);
tsem_post
(
&
pRequest
->
body
.
rspSem
);
return
code
;
}
if
(
!
isEpsetEqual
(
&
pTscObj
->
pAppInfo
->
mgmtEp
.
epSet
,
&
connectRsp
.
epSet
))
{
updateEpSet_s
(
&
pTscObj
->
pAppInfo
->
mgmtEp
,
&
connectRsp
.
epSet
);
...
...
@@ -82,18 +88,20 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return
0
;
}
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pRequest
)
{
SMsgSendInfo
*
pMsgSendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
pMsgSendInfo
->
requestObjRefId
=
pRequest
->
self
;
pMsgSendInfo
->
requestId
=
pRequest
->
requestId
;
pMsgSendInfo
->
param
=
pRequest
;
pMsgSendInfo
->
msgType
=
pRequest
->
type
;
pMsgSendInfo
->
requestId
=
pRequest
->
requestId
;
pMsgSendInfo
->
param
=
pRequest
;
pMsgSendInfo
->
msgType
=
pRequest
->
type
;
assert
(
pRequest
!=
NULL
);
pMsgSendInfo
->
msgInfo
=
pRequest
->
body
.
requestMsg
;
pMsgSendInfo
->
fp
=
(
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)]
==
NULL
)
?
genericRspCallback
:
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)];
pMsgSendInfo
->
fp
=
(
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)]
==
NULL
)
?
genericRspCallback
:
handleRequestRspFp
[
TMSG_INDEX
(
pRequest
->
type
)];
return
pMsgSendInfo
;
}
...
...
@@ -114,7 +122,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
TSDB_CODE_MND_DB_NOT_EXIST
==
code
)
{
SUseDbRsp
usedbRsp
=
{
0
};
tDeserializeSUseDbRsp
(
pMsg
->
pData
,
pMsg
->
len
,
&
usedbRsp
);
struct
SCatalog
*
pCatalog
=
NULL
;
struct
SCatalog
*
pCatalog
=
NULL
;
if
(
usedbRsp
.
vgVersion
>=
0
)
{
int32_t
code1
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
pCatalog
);
...
...
source/client/src/tmq.c
浏览文件 @
d05d704a
...
...
@@ -382,12 +382,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq
->
pTscObj
=
taos_connect_internal
(
conf
->
ip
,
user
,
pass
,
NULL
,
conf
->
db
,
conf
->
port
,
CONN_TYPE__TMQ
);
if
(
pTmq
->
pTscObj
==
NULL
)
return
NULL
;
/*pTmq->inWaiting = 0;*/
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq
->
epStatus
=
0
;
pTmq
->
epSkipCnt
=
0
;
// set conf
...
...
@@ -509,7 +506,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
SCMSubscribeReq
req
;
req
.
topicNum
=
sz
;
req
.
consumerId
=
tmq
->
consumerId
;
strcpy
(
req
.
cgroup
,
tmq
->
groupId
);
req
.
topicNames
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
...
...
@@ -519,12 +515,16 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
char
*
topicName
=
taosArrayGetP
(
container
,
i
);
SName
name
=
{
0
};
#if 0
char* dbName = getDbOfConnection(tmq->pTscObj);
if (dbName == NULL) {
return TMQ_RESP_ERR__FAIL;
}
tNameSetDbName
(
&
name
,
tmq
->
pTscObj
->
acctId
,
dbName
,
strlen
(
dbName
));
#endif
tNameSetDbName
(
&
name
,
tmq
->
pTscObj
->
acctId
,
topicName
,
strlen
(
topicName
));
#if 0
tNameFromString(&name, topicName, T_NAME_TABLE);
#endif
char
*
topicFname
=
taosMemoryCalloc
(
1
,
TSDB_TOPIC_FNAME_LEN
);
if
(
topicFname
==
NULL
)
{
...
...
@@ -542,7 +542,9 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
topic
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
SMqClientVg
));
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
taosArrayPush
(
req
.
topicNames
,
&
topicFname
);
#if 0
taosMemoryFree(dbName);
#endif
}
int
tlen
=
tSerializeSCMSubscribeReq
(
NULL
,
&
req
);
...
...
source/common/src/tmsg.c
浏览文件 @
d05d704a
...
...
@@ -126,7 +126,6 @@ int32_t tDecodeSQueryNodeAddr(SCoder *pDecoder, SQueryNodeAddr *pAddr) {
return
0
;
}
int32_t
taosEncodeSEpSet
(
void
**
buf
,
const
SEpSet
*
pEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI8
(
buf
,
pEp
->
inUse
);
...
...
@@ -2084,10 +2083,15 @@ int32_t tDeserializeSQnodeListRsp(void *buf, int32_t bufLen, SQnodeListRsp *pRsp
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
int32_t
num
=
0
;
if
(
tDecodeI32
(
&
decoder
,
&
num
)
<
0
)
return
-
1
;
pRsp
->
addrsList
=
taosArrayInit
(
num
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pRsp
->
addrsList
)
return
-
1
;
if
(
NULL
==
pRsp
->
addrsList
)
{
pRsp
->
addrsList
=
taosArrayInit
(
num
,
sizeof
(
SQueryNodeAddr
));
if
(
NULL
==
pRsp
->
addrsList
)
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
if
(
tDecodeSQueryNodeAddr
(
&
decoder
,
TARRAY_GET_ELEM
(
pRsp
->
addrsList
,
i
))
<
0
)
return
-
1
;
SQueryNodeAddr
addr
=
{
0
};
if
(
tDecodeSQueryNodeAddr
(
&
decoder
,
&
addr
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
addrsList
,
&
addr
);
}
tEndDecode
(
&
decoder
);
...
...
@@ -2740,11 +2744,11 @@ int32_t tSerializeSCMCreateTopicReq(void *buf, int32_t bufLen, const SCMCreateTo
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withSchema
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
withTag
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
subscribeDbName
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
astLen
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
astLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
if
(
0
==
astLen
&&
tEncodeCStr
(
&
encoder
,
pReq
->
subscribeDbName
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -2766,6 +2770,7 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withSchema
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
withTag
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
subscribeDbName
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
sqlLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
astLen
)
<
0
)
return
-
1
;
...
...
@@ -2780,7 +2785,6 @@ int32_t tDeserializeSCMCreateTopicReq(void *buf, int32_t bufLen, SCMCreateTopicR
if
(
pReq
->
ast
==
NULL
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
}
else
{
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
subscribeDbName
)
<
0
)
return
-
1
;
}
tEndDecode
(
&
decoder
);
...
...
source/dnode/mgmt/interface/src/dmInt.c
浏览文件 @
d05d704a
...
...
@@ -173,9 +173,13 @@ static void dmGetServerStatus(SDnode *pDnode, SServerStatusRsp *pStatus) {
void
dmProcessNettestReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpc
)
{
dDebug
(
"net test req is received"
);
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
refId
=
pRpc
->
refId
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
pRpc
->
contLen
);
rsp
.
contLen
=
pRpc
->
contLen
;
if
(
rsp
.
pCont
==
NULL
)
{
rsp
.
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
rsp
.
contLen
=
pRpc
->
contLen
;
}
rpcSendResponse
(
&
rsp
);
}
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
d05d704a
...
...
@@ -177,6 +177,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_DROP_MNODE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_QNODE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_DROP_QNODE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_QNODE_LIST
,
mmProcessReadMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_SNODE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_DROP_SNODE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_BNODE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
d05d704a
...
...
@@ -450,7 +450,6 @@ typedef struct {
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
SRWLatch
lock
;
int32_t
sqlLen
;
int32_t
astLen
;
...
...
@@ -517,7 +516,6 @@ typedef struct {
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
SHashObj
*
consumerHash
;
// consumerId -> SMqConsumerEpInSub
// TODO put -1 into unassignVgs
// SArray* unassignedVgs;
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
d05d704a
...
...
@@ -237,7 +237,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pSubNew
->
withTbName
=
pSub
->
withTbName
;
pSubNew
->
withSchema
=
pSub
->
withSchema
;
pSubNew
->
withTag
=
pSub
->
withTag
;
pSubNew
->
withTagSchema
=
pSub
->
withTagSchema
;
pSubNew
->
vgNum
=
pSub
->
vgNum
;
pSubNew
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -270,7 +269,6 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
withTagSchema
);
void
*
pIter
=
NULL
;
int32_t
sz
=
taosHashGetSize
(
pSub
->
consumerHash
);
...
...
@@ -297,7 +295,6 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
withTagSchema
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/dnode/mnode/impl/src/mndQnode.c
浏览文件 @
d05d704a
...
...
@@ -451,8 +451,9 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
goto
_OVER
;
}
void
*
pIter
=
NULL
;
while
(
1
)
{
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_QNODE
,
NULL
,
(
void
**
)
&
pObj
);
pIter
=
sdbFetch
(
pSdb
,
SDB_QNODE
,
pIter
,
(
void
**
)
&
pObj
);
if
(
pIter
==
NULL
)
break
;
SQueryNodeAddr
nodeAddr
=
{
0
};
...
...
@@ -472,7 +473,7 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
}
int32_t
rspLen
=
tSerializeSQnodeListRsp
(
NULL
,
0
,
&
qlistRsp
);
void
*
pRsp
=
taosMemoryMalloc
(
rspLen
);
void
*
pRsp
=
rpcMallocCont
(
rspLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
d05d704a
...
...
@@ -35,11 +35,6 @@
#define MND_SUBSCRIBE_REBALANCE_CNT 3
enum
{
MQ_SUBSCRIBE_STATUS__ACTIVE
=
1
,
MQ_SUBSCRIBE_STATUS__DELETED
,
};
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
);
static
SSdbRow
*
mndSubActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndSubActionInsert
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
);
...
...
@@ -89,7 +84,6 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
pSub
->
withTbName
=
pTopic
->
withTbName
;
pSub
->
withSchema
=
pTopic
->
withSchema
;
pSub
->
withTag
=
pTopic
->
withTag
;
pSub
->
withTagSchema
=
pTopic
->
withTagSchema
;
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
1
);
...
...
@@ -115,7 +109,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req
.
withTbName
=
pSub
->
withTbName
;
req
.
withSchema
=
pSub
->
withSchema
;
req
.
withTag
=
pSub
->
withTag
;
req
.
withTagSchema
=
pSub
->
withTagSchema
;
strncpy
(
req
.
subKey
,
pSub
->
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSMqRebVgReq
(
NULL
,
&
req
);
...
...
@@ -514,9 +507,11 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
// TODO replace assert with error check
ASSERT
(
mndDoRebalance
(
pMnode
,
&
rebInput
,
&
rebOutput
)
==
0
);
// if add more consumer to balanced subscribe,
// possibly no vg is changed
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
ASSERT
(
mndPersistRebResult
(
pMnode
,
pMsg
,
&
rebOutput
)
==
0
);
if
(
rebInput
.
pTopic
)
{
...
...
@@ -673,177 +668,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease
(
pSdb
,
pSub
);
}
#if 0
static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
char *msgStr = pMsg->rpcMsg.pCont;
SCMSubscribeReq subscribe;
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.consumerGroup;
SArray *newSub = subscribe.topicNames;
int32_t newTopicNum = subscribe.topicNum;
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int32_t oldTopicNum = 0;
bool createConsumer = false;
// create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
pConsumer = mndCreateConsumer(consumerId, cgroup);
createConsumer = true;
} else {
pConsumer->epoch++;
oldSub = pConsumer->currentTopics;
}
pConsumer->currentTopics = newSub;
if (oldSub != NULL) {
oldTopicNum = taosArrayGetSize(oldSub);
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) {
// TODO: free memory
return -1;
}
int32_t i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
char *newTopicName = NULL;
char *oldTopicName = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
oldTopicName = taosArrayGetP(oldSub, j);
j++;
} else if (j >= oldTopicNum) {
newTopicName = taosArrayGetP(newSub, i);
i++;
} else {
newTopicName = taosArrayGetP(newSub, i);
oldTopicName = taosArrayGetP(oldSub, j);
int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) {
// do nothing
oldTopicName = newTopicName = NULL;
i++;
j++;
continue;
} else if (comp < 0) {
oldTopicName = NULL;
i++;
} else {
newTopicName = NULL;
j++;
}
}
if (oldTopicName != NULL) {
ASSERT(newTopicName == NULL);
// cancel subscribe of old topic
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName);
ASSERT(pSub);
int32_t csz = taosArrayGetSize(pSub->consumers);
for (int32_t ci = 0; ci < csz; ci++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci);
if (pSubConsumer->consumerId == consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
for (int32_t vgi = 0; vgi < vgsz; vgi++) {
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp, oldTopicName);
taosArrayPush(pSub->unassignedVg, pConsumerEp);
}
taosArrayRemove(pSub->consumers, ci);
break;
}
}
char *oldTopicNameDup = strdup(oldTopicName);
taosArrayPush(pConsumer->recentRemovedTopics, &oldTopicNameDup);
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
} else if (newTopicName != NULL) {
ASSERT(oldTopicName == NULL);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName);
if (pTopic == NULL) {
mError("topic being subscribed not exist: %s", newTopicName);
continue;
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, newTopicName);
bool createSub = false;
if (pSub == NULL) {
mDebug("create new subscription by consumer %" PRId64 ", group: %s, topic %s", consumerId, cgroup,
newTopicName);
pSub = mndCreateSubscription(pMnode, pTopic, cgroup);
createSub = true;
mndCreateOffset(pTrans, cgroup, newTopicName, pSub->unassignedVg);
}
SMqSubConsumer mqSubConsumer;
mqSubConsumer.consumerId = consumerId;
mqSubConsumer.vgInfo = taosArrayInit(0, sizeof(SMqConsumerEp));
taosArrayPush(pSub->consumers, &mqSubConsumer);
// if have un assigned vg, assign one to the consumer
if (taosArrayGetSize(pSub->unassignedVg) > 0) {
SMqConsumerEp *pConsumerEp = taosArrayPop(pSub->unassignedVg);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = consumerId;
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
if (pConsumerEp->oldConsumerId == -1) {
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, newTopicName,
pConsumerEp->consumerId);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
} else {
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp, newTopicName);
}
// to trigger rebalance at once, do not set status active
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
}
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
if (!createSub) mndReleaseSubscribe(pMnode, pSub);
mndReleaseTopic(pMnode, pTopic);
}
}
/*if (oldSub) taosArrayDestroyEx(oldSub, (void (*)(void *))taosMemoryFree);*/
// persist consumerObj
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("mq-subscribe-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return -1;
}
mndTransDrop(pTrans);
if (!createConsumer) mndReleaseConsumer(pMnode, pConsumer);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
#endif
static
int32_t
mndProcessSubscribeInternalRsp
(
SNodeMsg
*
pRsp
)
{
mndTransProcessRsp
(
pRsp
);
return
0
;
}
static
void
mndCancelGetNextConsumer
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
d05d704a
...
...
@@ -82,7 +82,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withTbName
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withSchema
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withTag
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
withTagSchema
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
...
...
@@ -146,7 +145,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withTbName
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withSchema
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withTag
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
withTagSchema
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
pTopic
->
sql
=
taosMemoryCalloc
(
pTopic
->
sqlLen
,
sizeof
(
char
));
...
...
@@ -234,6 +232,7 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
sdbRelease
(
pSdb
,
pTopic
);
}
#if 0
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
...
...
@@ -243,6 +242,7 @@ static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
return mndAcquireDb(pMnode, db);
}
#endif
static
SDDropTopicReq
*
mndBuildDropTopicMsg
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SMqTopicObj
*
pTopic
)
{
int32_t
contLen
=
sizeof
(
SDDropTopicReq
);
...
...
@@ -262,15 +262,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq
}
static
int32_t
mndCheckCreateTopicReq
(
SCMCreateTopicReq
*
pCreate
)
{
if
(
pCreate
->
name
[
0
]
==
0
||
pCreate
->
sql
==
NULL
||
pCreate
->
sql
[
0
]
==
0
)
{
if
(
pCreate
->
name
[
0
]
==
0
||
pCreate
->
sql
==
NULL
||
pCreate
->
sql
[
0
]
==
0
||
pCreate
->
subscribeDbName
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
if
((
pCreate
->
ast
==
NULL
||
pCreate
->
ast
[
0
]
==
0
)
&&
pCreate
->
subscribeDbName
[
0
]
==
0
)
{
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_OPTION
;
return
-
1
;
}
return
0
;
}
...
...
@@ -386,7 +382,7 @@ static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq) {
goto
CREATE_TOPIC_OVER
;
}
pDb
=
mndAcquireDb
ByTopic
(
pMnode
,
createTopicReq
.
n
ame
);
pDb
=
mndAcquireDb
(
pMnode
,
createTopicReq
.
subscribeDbN
ame
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_SELECTED
;
goto
CREATE_TOPIC_OVER
;
...
...
source/dnode/qnode/src/qnode.c
浏览文件 @
d05d704a
...
...
@@ -17,6 +17,7 @@
#include "qndInt.h"
#include "query.h"
#include "qworker.h"
//#include "tudf.h"
SQnode
*
qndOpen
(
const
SQnodeOpt
*
pOption
)
{
SQnode
*
pQnode
=
taosMemoryCalloc
(
1
,
sizeof
(
SQnode
));
...
...
@@ -25,6 +26,8 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return
NULL
;
}
//udfcOpen();
if
(
qWorkerInit
(
NODE_TYPE_QNODE
,
pQnode
->
qndId
,
NULL
,
(
void
**
)
&
pQnode
->
pQuery
,
&
pOption
->
msgCb
))
{
taosMemoryFreeClear
(
pQnode
);
return
NULL
;
...
...
@@ -37,13 +40,15 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
void
qndClose
(
SQnode
*
pQnode
)
{
qWorkerDestroy
((
void
**
)
&
pQnode
->
pQuery
);
//udfcClose();
taosMemoryFree
(
pQnode
);
}
int32_t
qndGetLoad
(
SQnode
*
pQnode
,
SQnodeLoad
*
pLoad
)
{
return
0
;
}
int32_t
qndProcessQueryMsg
(
SQnode
*
pQnode
,
SRpcMsg
*
pMsg
)
{
qTrace
(
"message in query queue is processing"
);
qTrace
(
"message in q
node q
uery queue is processing"
);
SReadHandle
handle
=
{
0
};
switch
(
pMsg
->
msgType
)
{
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
d05d704a
...
...
@@ -159,7 +159,6 @@ typedef struct {
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
char
*
qmsg
;
STqPushHandle
pushHandle
;
// SRWLatch lock;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
d05d704a
...
...
@@ -31,6 +31,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
path
=
strdup
(
path
);
pTq
->
pVnode
=
pVnode
;
pTq
->
pWal
=
pWal
;
#if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
...
...
@@ -401,6 +402,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
consumerEpoch
=
atomic_val_compare_exchange_32
(
&
pExec
->
epoch
,
consumerEpoch
,
reqEpoch
);
}
SWalHead
*
pHeadWithCkSum
=
taosMemoryMalloc
(
sizeof
(
SWalHead
)
+
2048
);
if
(
pHeadWithCkSum
==
NULL
)
{
return
-
1
;
}
walSetReaderCapacity
(
pExec
->
pWalReader
,
2048
);
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
@@ -414,6 +422,26 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
break
;
}
taosThreadMutexLock
(
&
pExec
->
pWalReader
->
mutex
);
if
(
walFetchHead
(
pExec
->
pWalReader
,
fetchOffset
,
pHeadWithCkSum
)
<
0
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
);
taosThreadMutexUnlock
(
&
pExec
->
pWalReader
->
mutex
);
break
;
}
if
(
pHeadWithCkSum
->
head
.
msgType
!=
TDMT_VND_SUBMIT
)
{
walSkipFetchBody
(
pExec
->
pWalReader
,
pHeadWithCkSum
);
}
else
{
walFetchBody
(
pExec
->
pWalReader
,
&
pHeadWithCkSum
);
}
SWalReadHead
*
pHead
=
&
pHeadWithCkSum
->
head
;
taosThreadMutexUnlock
(
&
pExec
->
pWalReader
->
mutex
);
#if 0
SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
...
...
@@ -443,14 +471,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
#endif
break
;
}
break;
}
#endif
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHead
->
msgType
);
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
SSubmitReq
*
pCont
=
(
SSubmitReq
*
)
&
pHead
->
body
;
// table subscribe
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
qTaskInfo_t
task
=
pExec
->
task
[
workerId
];
ASSERT
(
task
);
...
...
@@ -484,6 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush
(
rsp
.
blockData
,
&
buf
);
rsp
.
blockNum
++
;
}
// db subscribe
}
else
if
(
pExec
->
subType
==
TOPIC_SUB_TYPE__DB
)
{
STqReadHandle
*
pReader
=
pExec
->
pExecReader
[
workerId
];
tqReadHandleSetMsg
(
pReader
,
pCont
,
0
);
...
...
@@ -789,7 +820,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pExec
->
withTbName
=
req
.
withTbName
;
pExec
->
withSchema
=
req
.
withSchema
;
pExec
->
withTag
=
req
.
withTag
;
pExec
->
withTagSchema
=
req
.
withTagSchema
;
pExec
->
qmsg
=
req
.
qmsg
;
req
.
qmsg
=
NULL
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
d05d704a
...
...
@@ -135,7 +135,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
}
int
vnodeProcessQueryMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
vTrace
(
"message in query queue is processing"
);
vTrace
(
"message in
vnode
query queue is processing"
);
SReadHandle
handle
=
{.
reader
=
pVnode
->
pTsdb
,
.
meta
=
pVnode
->
pMeta
,
.
config
=
&
pVnode
->
config
};
switch
(
pMsg
->
msgType
)
{
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
d05d704a
...
...
@@ -494,7 +494,7 @@ _return:
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetQnodeListFromMnode
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SArray
*
*
out
)
{
int32_t
ctgGetQnodeListFromMnode
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
SArray
*
out
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
...
...
@@ -526,7 +526,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
CTG_ERR_RET
(
code
);
}
ctgDebug
(
"Got qnode list from mnode, listNum:%d"
,
(
int32_t
)
taosArrayGetSize
(
*
out
));
ctgDebug
(
"Got qnode list from mnode, listNum:%d"
,
(
int32_t
)
taosArrayGetSize
(
out
));
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -2778,7 +2778,8 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
}
if
(
pReq
->
qNodeRequired
)
{
CTG_ERR_JRET
(
ctgGetQnodeListFromMnode
(
pCtg
,
pTrans
,
pMgmtEps
,
&
pRsp
->
pEpSetList
));
pRsp
->
pQnodeList
=
taosArrayInit
(
10
,
sizeof
(
SQueryNodeAddr
));
CTG_ERR_JRET
(
ctgGetQnodeListFromMnode
(
pCtg
,
pTrans
,
pMgmtEps
,
pRsp
->
pQnodeList
));
}
CTG_API_LEAVE
(
TSDB_CODE_SUCCESS
);
...
...
@@ -2807,7 +2808,7 @@ int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_ERR_JRET
(
ctgGetQnodeListFromMnode
(
pCtg
,
pRpc
,
pMgmtEps
,
&
pQnodeList
));
CTG_ERR_JRET
(
ctgGetQnodeListFromMnode
(
pCtg
,
pRpc
,
pMgmtEps
,
pQnodeList
));
_return:
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
d05d704a
此差异已折叠。
点击以展开。
source/libs/qcom/src/querymsg.c
浏览文件 @
d05d704a
...
...
@@ -342,28 +342,20 @@ PROCESS_META_OVER:
int32_t
queryProcessQnodeListRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SQnodeListRsp
out
=
{
0
};
int32_t
code
=
-
1
;
int32_t
code
=
0
;
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
code
=
TSDB_CODE_TSC_INVALID_INPUT
;
goto
PROCESS_QLIST_OVER
;
return
code
;
}
out
.
addrsList
=
(
SArray
*
)
output
;
if
(
tDeserializeSQnodeListRsp
(
msg
,
msgSize
,
&
out
)
!=
0
)
{
qError
(
"invalid qnode list rsp msg, msgSize:%d"
,
msgSize
);
code
=
TSDB_CODE_INVALID_MSG
;
goto
PROCESS_QLIST_OVER
;
return
code
;
}
PROCESS_QLIST_OVER:
if
(
code
!=
0
)
{
tFreeSQnodeListRsp
(
&
out
);
out
.
addrsList
=
NULL
;
}
*
(
SArray
**
)
output
=
out
.
addrsList
;
return
code
;
}
...
...
source/libs/qworker/src/qworkerMsg.c
浏览文件 @
d05d704a
...
...
@@ -46,7 +46,7 @@ int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code) {
SQueryTableRsp
rsp
=
{.
code
=
code
};
int32_t
contLen
=
tSerializeSQueryTableRsp
(
NULL
,
0
,
&
rsp
);
void
*
msg
=
rpcMallocCont
(
contLen
);
void
*
msg
=
rpcMallocCont
(
contLen
);
tSerializeSQueryTableRsp
(
msg
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -87,7 +87,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
SExplainRsp
rsp
=
{.
numOfPlans
=
num
,
.
subplanInfo
=
execInfo
};
int32_t
contLen
=
tSerializeSExplainRsp
(
NULL
,
0
,
&
rsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSExplainRsp
(
pRsp
,
contLen
,
&
rsp
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -107,7 +107,7 @@ int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo,
int32_t
qwBuildAndSendHbRsp
(
SQWConnInfo
*
pConn
,
SSchedulerHbRsp
*
pStatus
,
int32_t
code
)
{
int32_t
contLen
=
tSerializeSSchedulerHbRsp
(
NULL
,
0
,
pStatus
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
tSerializeSSchedulerHbRsp
(
pRsp
,
contLen
,
pStatus
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -223,7 +223,7 @@ int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
showRsp
.
tableMeta
.
numOfColumns
=
cols
;
int32_t
bufLen
=
tSerializeSShowRsp
(
NULL
,
0
,
&
showRsp
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
tSerializeSShowRsp
(
pBuf
,
bufLen
,
&
showRsp
);
SRpcMsg
rpcMsg
=
{
...
...
@@ -403,8 +403,8 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
bool
queryDone
=
false
;
SQueryContinueReq
*
msg
=
(
SQueryContinueReq
*
)
pMsg
->
pCont
;
bool
needStop
=
false
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWTaskCtx
*
handles
=
NULL
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
QW_ELOG
(
"invalid cquery msg, msg:%p, msgLen:%d"
,
msg
,
pMsg
->
contLen
);
...
...
@@ -538,7 +538,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
int32_t
code
=
0
;
STaskCancelReq
*
msg
=
pMsg
->
pCont
;
if
(
NULL
==
msg
||
pMsg
->
contLen
<
sizeof
(
*
msg
))
{
...
...
@@ -620,7 +620,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t
code
=
0
;
SSchedulerHbReq
req
=
{
0
};
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
SQWorkerMgmt
*
mgmt
=
(
SQWorkerMgmt
*
)
qWorkerMgmt
;
if
(
NULL
==
pMsg
->
pCont
)
{
QW_ELOG
(
"invalid hb msg, msg:%p, msgLen:%d"
,
pMsg
->
pCont
,
pMsg
->
contLen
);
...
...
source/libs/scalar/inc/sclInt.h
浏览文件 @
d05d704a
...
...
@@ -32,6 +32,9 @@ typedef struct SScalarCtx {
#define SCL_DATA_TYPE_DUMMY_HASH 9000
#define SCL_DEFAULT_OP_NUM 10
#define SCL_IS_CONST_NODE(_node) ((NULL == (_node)) || (QUERY_NODE_VALUE == (_node)->type) || (QUERY_NODE_NODE_LIST == (_node)->type))
#define SCL_IS_CONST_CALC(_ctx) (NULL == (_ctx)->pBlockList)
#define sclFatal(...) qFatal(__VA_ARGS__)
#define sclError(...) qError(__VA_ARGS__)
#define sclWarn(...) qWarn(__VA_ARGS__)
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
d05d704a
...
...
@@ -244,23 +244,53 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
sclInitParamList
(
SScalarParam
**
pParams
,
SNodeList
*
pParamList
,
SScalarCtx
*
ctx
,
int32_t
*
rowNum
)
{
int32_t
sclInitParamList
(
SScalarParam
**
pParams
,
SNodeList
*
pParamList
,
SScalarCtx
*
ctx
,
int32_t
*
paramNum
,
int32_t
*
rowNum
)
{
int32_t
code
=
0
;
SScalarParam
*
paramList
=
taosMemoryCalloc
(
pParamList
->
length
,
sizeof
(
SScalarParam
));
if
(
NULL
==
pParamList
)
{
if
(
ctx
->
pBlockList
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
ctx
->
pBlockList
,
0
);
*
rowNum
=
pBlock
->
info
.
rows
;
}
else
{
*
rowNum
=
1
;
}
*
paramNum
=
1
;
}
else
{
*
paramNum
=
pParamList
->
length
;
}
SScalarParam
*
paramList
=
taosMemoryCalloc
(
*
paramNum
,
sizeof
(
SScalarParam
));
if
(
NULL
==
paramList
)
{
sclError
(
"calloc %d failed"
,
(
int32_t
)(
pParamList
->
length
*
sizeof
(
SScalarParam
)));
sclError
(
"calloc %d failed"
,
(
int32_t
)(
(
*
paramNum
)
*
sizeof
(
SScalarParam
)));
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
SListCell
*
cell
=
pParamList
->
pHead
;
for
(
int32_t
i
=
0
;
i
<
pParamList
->
length
;
++
i
)
{
if
(
NULL
==
cell
||
NULL
==
cell
->
pNode
)
{
sclError
(
"invalid cell, cell:%p, pNode:%p"
,
cell
,
cell
->
pNode
);
SCL_ERR_JRET
(
TSDB_CODE_QRY_INVALID_INPUT
);
if
(
pParamList
)
{
SNode
*
tnode
=
NULL
;
int32_t
i
=
0
;
if
(
SCL_IS_CONST_CALC
(
ctx
))
{
WHERE_EACH
(
tnode
,
pParamList
)
{
if
(
!
SCL_IS_CONST_NODE
(
tnode
))
{
WHERE_NEXT
;
}
else
{
SCL_ERR_JRET
(
sclInitParam
(
tnode
,
&
paramList
[
i
],
ctx
,
rowNum
));
ERASE_NODE
(
pParamList
);
}
++
i
;
}
}
else
{
FOREACH
(
tnode
,
pParamList
)
{
SCL_ERR_JRET
(
sclInitParam
(
tnode
,
&
paramList
[
i
],
ctx
,
rowNum
));
++
i
;
}
}
}
else
{
paramList
[
0
].
numOfRows
=
*
rowNum
;
}
SCL_ERR_JRET
(
sclInitParam
(
cell
->
pNode
,
&
paramList
[
i
],
ctx
,
rowNum
));
cell
=
cell
->
pNext
;
if
(
0
==
*
rowNum
)
{
taosMemoryFreeClear
(
paramList
);
}
*
pParams
=
paramList
;
...
...
@@ -299,37 +329,45 @@ _return:
}
int32_t
sclExecFunction
(
SFunctionNode
*
node
,
SScalarCtx
*
ctx
,
SScalarParam
*
output
)
{
if
(
NULL
==
node
->
pParameterList
||
node
->
pParameterList
->
length
<=
0
)
{
sclError
(
"invalid function parameter list, list:%p, paramNum:%d"
,
node
->
pParameterList
,
node
->
pParameterList
?
node
->
pParameterList
->
length
:
0
);
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SScalarFuncExecFuncs
ffpSet
=
{
0
};
int32_t
code
=
fmGetScalarFuncExecFuncs
(
node
->
funcId
,
&
ffpSet
);
if
(
code
)
{
sclError
(
"fmGetFuncExecFuncs failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
SCL_ERR_RET
(
code
);
}
SScalarParam
*
params
=
NULL
;
int32_t
rowNum
=
0
;
SCL_ERR_RET
(
sclInitParamList
(
&
params
,
node
->
pParameterList
,
ctx
,
&
rowNum
));
output
->
columnData
=
createColumnInfoData
(
&
node
->
node
.
resType
,
rowNum
);
if
(
output
->
columnData
==
NULL
)
{
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
output
->
columnData
->
info
.
bytes
));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
int32_t
paramNum
=
0
;
int32_t
code
=
0
;
SCL_ERR_RET
(
sclInitParamList
(
&
params
,
node
->
pParameterList
,
ctx
,
&
paramNum
,
&
rowNum
));
code
=
(
*
ffpSet
.
process
)(
params
,
node
->
pParameterList
->
length
,
output
);
if
(
code
)
{
sclError
(
"scalar function exec failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
if
(
fmIsUserDefinedFunc
(
node
->
funcId
))
{
#if 0
UdfcFuncHandle udfHandle = NULL;
SCL_ERR_JRET(setupUdf(node->functionName, &udfHandle));
code = callUdfScalarFunc(udfHandle, params, paramNum, output);
teardownUdf(udfHandle);
SCL_ERR_JRET(code);
#endif
}
else
{
SScalarFuncExecFuncs
ffpSet
=
{
0
};
code
=
fmGetScalarFuncExecFuncs
(
node
->
funcId
,
&
ffpSet
);
if
(
code
)
{
sclError
(
"fmGetFuncExecFuncs failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
SCL_ERR_JRET
(
code
);
}
output
->
columnData
=
createColumnInfoData
(
&
node
->
node
.
resType
,
rowNum
);
if
(
output
->
columnData
==
NULL
)
{
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
output
->
columnData
->
info
.
bytes
));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
code
=
(
*
ffpSet
.
process
)(
params
,
paramNum
,
output
);
if
(
code
)
{
sclError
(
"scalar function exec failed, funcId:%d, code:%s"
,
node
->
funcId
,
tstrerror
(
code
));
SCL_ERR_JRET
(
code
);
}
}
_return:
for
(
int32_t
i
=
0
;
i
<
node
->
pParameterList
->
length
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
paramNum
;
++
i
)
{
// sclFreeParamNoData(params + i);
}
...
...
@@ -355,8 +393,13 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
SScalarParam
*
params
=
NULL
;
int32_t
rowNum
=
0
;
int32_t
paramNum
=
0
;
int32_t
code
=
0
;
SCL_ERR_RET
(
sclInitParamList
(
&
params
,
node
->
pParameterList
,
ctx
,
&
rowNum
));
SCL_ERR_RET
(
sclInitParamList
(
&
params
,
node
->
pParameterList
,
ctx
,
&
paramNum
,
&
rowNum
));
if
(
NULL
==
params
)
{
output
->
numOfRows
=
0
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
type
=
node
->
node
.
resType
.
type
;
output
->
numOfRows
=
rowNum
;
...
...
@@ -369,25 +412,41 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
}
bool
value
=
false
;
bool
complete
=
true
;
for
(
int32_t
i
=
0
;
i
<
rowNum
;
++
i
)
{
for
(
int32_t
m
=
0
;
m
<
node
->
pParameterList
->
length
;
++
m
)
{
complete
=
true
;
for
(
int32_t
m
=
0
;
m
<
paramNum
;
++
m
)
{
if
(
NULL
==
params
[
m
].
columnData
)
{
complete
=
false
;
continue
;
}
char
*
p
=
colDataGetData
(
params
[
m
].
columnData
,
i
);
GET_TYPED_DATA
(
value
,
bool
,
params
[
m
].
columnData
->
info
.
type
,
p
);
if
(
LOGIC_COND_TYPE_AND
==
node
->
condType
&&
(
false
==
value
))
{
complete
=
true
;
break
;
}
else
if
(
LOGIC_COND_TYPE_OR
==
node
->
condType
&&
value
)
{
complete
=
true
;
break
;
}
else
if
(
LOGIC_COND_TYPE_NOT
==
node
->
condType
)
{
value
=
!
value
;
}
}
colDataAppend
(
output
->
columnData
,
i
,
(
char
*
)
&
value
,
false
);
if
(
complete
)
{
colDataAppend
(
output
->
columnData
,
i
,
(
char
*
)
&
value
,
false
);
}
}
if
(
SCL_IS_CONST_CALC
(
ctx
)
&&
(
false
==
complete
))
{
sclFreeParam
(
output
);
output
->
numOfRows
=
0
;
}
_return:
for
(
int32_t
i
=
0
;
i
<
node
->
pParameterList
->
length
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
paramNum
;
++
i
)
{
// sclFreeParamNoData(params + i);
}
...
...
@@ -426,6 +485,17 @@ _return:
EDealRes
sclRewriteFunction
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SFunctionNode
*
node
=
(
SFunctionNode
*
)
*
pNode
;
SNode
*
tnode
=
NULL
;
if
(
fmIsUserDefinedFunc
(
node
->
funcId
))
{
return
DEAL_RES_CONTINUE
;
}
FOREACH
(
tnode
,
node
->
pParameterList
)
{
if
(
!
SCL_IS_CONST_NODE
(
tnode
))
{
return
DEAL_RES_CONTINUE
;
}
}
SScalarParam
output
=
{
0
};
ctx
->
code
=
sclExecFunction
(
node
,
ctx
,
&
output
);
...
...
@@ -470,6 +540,10 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
return
DEAL_RES_ERROR
;
}
if
(
0
==
output
.
numOfRows
)
{
return
DEAL_RES_CONTINUE
;
}
SValueNode
*
res
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
...
...
@@ -498,6 +572,14 @@ EDealRes sclRewriteLogic(SNode** pNode, SScalarCtx *ctx) {
EDealRes
sclRewriteOperator
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
*
pNode
;
if
(
!
SCL_IS_CONST_NODE
(
node
->
pLeft
))
{
return
DEAL_RES_CONTINUE
;
}
if
(
!
SCL_IS_CONST_NODE
(
node
->
pRight
))
{
return
DEAL_RES_CONTINUE
;
}
SScalarParam
output
=
{.
columnData
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
))};
ctx
->
code
=
sclExecOperator
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
...
...
@@ -530,7 +612,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
}
EDealRes
sclConstantsRewriter
(
SNode
**
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_VALUE
==
nodeType
(
*
pNode
)
||
QUERY_NODE_NODE_LIST
==
nodeType
(
*
pNode
))
{
if
(
QUERY_NODE_VALUE
==
nodeType
(
*
pNode
)
||
QUERY_NODE_
COLUMN
==
nodeType
(
*
pNode
)
||
QUERY_NODE_
NODE_LIST
==
nodeType
(
*
pNode
))
{
return
DEAL_RES_CONTINUE
;
}
...
...
source/libs/scalar/test/scalar/scalarTests.cpp
浏览文件 @
d05d704a
...
...
@@ -137,6 +137,11 @@ void scltMakeColumnNode(SNode **pNode, SSDataBlock **block, int32_t dataType, in
rnode
->
node
.
resType
.
bytes
=
dataBytes
;
rnode
->
dataBlockId
=
0
;
if
(
NULL
==
block
)
{
*
pNode
=
(
SNode
*
)
rnode
;
return
;
}
if
(
NULL
==
*
block
)
{
SSDataBlock
*
res
=
(
SSDataBlock
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
res
->
info
.
numOfCols
=
3
;
...
...
@@ -889,6 +894,8 @@ TEST(constantTest, int_greater_int_is_true2) {
}
TEST
(
constantTest
,
greater_and_lower
)
{
scltInitLogFile
();
SNode
*
pval1
=
NULL
,
*
pval2
=
NULL
,
*
opNode1
=
NULL
,
*
opNode2
=
NULL
,
*
logicNode
=
NULL
,
*
res
=
NULL
;
bool
eRes
[
5
]
=
{
false
,
false
,
true
,
true
,
true
};
int64_t
v1
=
333
,
v2
=
222
,
v3
=
-
10
,
v4
=
20
;
...
...
@@ -913,6 +920,115 @@ TEST(constantTest, greater_and_lower) {
nodesDestroyNode
(
res
);
}
TEST
(
constantTest
,
column_and_value1
)
{
scltInitLogFile
();
SNode
*
pval1
=
NULL
,
*
pval2
=
NULL
,
*
opNode1
=
NULL
,
*
opNode2
=
NULL
,
*
logicNode
=
NULL
,
*
res
=
NULL
;
bool
eRes
[
5
]
=
{
false
,
false
,
true
,
true
,
true
};
int64_t
v1
=
333
,
v2
=
222
,
v3
=
-
10
,
v4
=
20
;
SNode
*
list
[
2
]
=
{
0
};
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v1
);
scltMakeValueNode
(
&
pval2
,
TSDB_DATA_TYPE_BIGINT
,
&
v2
);
scltMakeOpNode
(
&
opNode1
,
OP_TYPE_GREATER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v3
);
scltMakeColumnNode
(
&
pval2
,
NULL
,
TSDB_DATA_TYPE_BIGINT
,
sizeof
(
int64_t
),
0
,
NULL
);
scltMakeOpNode
(
&
opNode2
,
OP_TYPE_LOWER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
list
[
0
]
=
opNode1
;
list
[
1
]
=
opNode2
;
scltMakeLogicNode
(
&
logicNode
,
LOGIC_COND_TYPE_AND
,
list
,
2
);
int32_t
code
=
scalarCalculateConstants
(
logicNode
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
res
);
ASSERT_EQ
(
nodeType
(
res
),
QUERY_NODE_LOGIC_CONDITION
);
SLogicConditionNode
*
v
=
(
SLogicConditionNode
*
)
res
;
ASSERT_EQ
(
v
->
condType
,
LOGIC_COND_TYPE_AND
);
ASSERT_EQ
(
v
->
pParameterList
->
length
,
1
);
nodesDestroyNode
(
res
);
}
TEST
(
constantTest
,
column_and_value2
)
{
scltInitLogFile
();
SNode
*
pval1
=
NULL
,
*
pval2
=
NULL
,
*
opNode1
=
NULL
,
*
opNode2
=
NULL
,
*
logicNode
=
NULL
,
*
res
=
NULL
;
bool
eRes
[
5
]
=
{
false
,
false
,
true
,
true
,
true
};
int64_t
v1
=
333
,
v2
=
222
,
v3
=
-
10
,
v4
=
20
;
SNode
*
list
[
2
]
=
{
0
};
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v1
);
scltMakeValueNode
(
&
pval2
,
TSDB_DATA_TYPE_BIGINT
,
&
v2
);
scltMakeOpNode
(
&
opNode1
,
OP_TYPE_LOWER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v3
);
scltMakeColumnNode
(
&
pval2
,
NULL
,
TSDB_DATA_TYPE_BIGINT
,
sizeof
(
int64_t
),
0
,
NULL
);
scltMakeOpNode
(
&
opNode2
,
OP_TYPE_LOWER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
list
[
0
]
=
opNode1
;
list
[
1
]
=
opNode2
;
scltMakeLogicNode
(
&
logicNode
,
LOGIC_COND_TYPE_AND
,
list
,
2
);
int32_t
code
=
scalarCalculateConstants
(
logicNode
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
res
);
ASSERT_EQ
(
nodeType
(
res
),
QUERY_NODE_VALUE
);
SValueNode
*
v
=
(
SValueNode
*
)
res
;
ASSERT_EQ
(
v
->
node
.
resType
.
type
,
TSDB_DATA_TYPE_BOOL
);
ASSERT_EQ
(
v
->
datum
.
b
,
false
);
nodesDestroyNode
(
res
);
}
TEST
(
constantTest
,
column_and_value3
)
{
scltInitLogFile
();
SNode
*
pval1
=
NULL
,
*
pval2
=
NULL
,
*
opNode1
=
NULL
,
*
opNode2
=
NULL
,
*
logicNode
=
NULL
,
*
res
=
NULL
;
bool
eRes
[
5
]
=
{
false
,
false
,
true
,
true
,
true
};
int64_t
v1
=
333
,
v2
=
222
,
v3
=
-
10
,
v4
=
20
;
SNode
*
list
[
2
]
=
{
0
};
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v1
);
scltMakeValueNode
(
&
pval2
,
TSDB_DATA_TYPE_BIGINT
,
&
v2
);
scltMakeOpNode
(
&
opNode1
,
OP_TYPE_GREATER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v3
);
scltMakeColumnNode
(
&
pval2
,
NULL
,
TSDB_DATA_TYPE_BIGINT
,
sizeof
(
int64_t
),
0
,
NULL
);
scltMakeOpNode
(
&
opNode2
,
OP_TYPE_LOWER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
list
[
0
]
=
opNode1
;
list
[
1
]
=
opNode2
;
scltMakeLogicNode
(
&
logicNode
,
LOGIC_COND_TYPE_OR
,
list
,
2
);
int32_t
code
=
scalarCalculateConstants
(
logicNode
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
res
);
ASSERT_EQ
(
nodeType
(
res
),
QUERY_NODE_VALUE
);
SValueNode
*
v
=
(
SValueNode
*
)
res
;
ASSERT_EQ
(
v
->
node
.
resType
.
type
,
TSDB_DATA_TYPE_BOOL
);
ASSERT_EQ
(
v
->
datum
.
b
,
true
);
nodesDestroyNode
(
res
);
}
TEST
(
constantTest
,
column_and_value4
)
{
scltInitLogFile
();
SNode
*
pval1
=
NULL
,
*
pval2
=
NULL
,
*
opNode1
=
NULL
,
*
opNode2
=
NULL
,
*
logicNode
=
NULL
,
*
res
=
NULL
;
bool
eRes
[
5
]
=
{
false
,
false
,
true
,
true
,
true
};
int64_t
v1
=
333
,
v2
=
222
,
v3
=
-
10
,
v4
=
20
;
SNode
*
list
[
2
]
=
{
0
};
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v1
);
scltMakeValueNode
(
&
pval2
,
TSDB_DATA_TYPE_BIGINT
,
&
v2
);
scltMakeOpNode
(
&
opNode1
,
OP_TYPE_LOWER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
scltMakeValueNode
(
&
pval1
,
TSDB_DATA_TYPE_BIGINT
,
&
v3
);
scltMakeColumnNode
(
&
pval2
,
NULL
,
TSDB_DATA_TYPE_BIGINT
,
sizeof
(
int64_t
),
0
,
NULL
);
scltMakeOpNode
(
&
opNode2
,
OP_TYPE_LOWER_THAN
,
TSDB_DATA_TYPE_BOOL
,
pval1
,
pval2
);
list
[
0
]
=
opNode1
;
list
[
1
]
=
opNode2
;
scltMakeLogicNode
(
&
logicNode
,
LOGIC_COND_TYPE_OR
,
list
,
2
);
int32_t
code
=
scalarCalculateConstants
(
logicNode
,
&
res
);
ASSERT_EQ
(
code
,
0
);
ASSERT_TRUE
(
res
);
ASSERT_EQ
(
nodeType
(
res
),
QUERY_NODE_LOGIC_CONDITION
);
SLogicConditionNode
*
v
=
(
SLogicConditionNode
*
)
res
;
ASSERT_EQ
(
v
->
condType
,
LOGIC_COND_TYPE_OR
);
ASSERT_EQ
(
v
->
pParameterList
->
length
,
1
);
nodesDestroyNode
(
res
);
}
void
makeJsonArrow
(
SSDataBlock
**
src
,
SNode
**
opNode
,
void
*
json
,
char
*
key
){
char
keyVar
[
32
]
=
{
0
};
memcpy
(
varDataVal
(
keyVar
),
key
,
strlen
(
key
));
...
...
source/libs/wal/src/walRead.c
浏览文件 @
d05d704a
...
...
@@ -138,6 +138,91 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return
0
;
}
void
walSetReaderCapacity
(
SWalReadHandle
*
pRead
,
int32_t
capacity
)
{
pRead
->
capacity
=
capacity
;
}
int32_t
walFetchHead
(
SWalReadHandle
*
pRead
,
int64_t
ver
,
SWalHead
*
pHead
)
{
int32_t
code
;
// TODO: valid ver
if
(
pRead
->
curVersion
!=
ver
)
{
code
=
walReadSeekVer
(
pRead
,
ver
);
if
(
code
<
0
)
return
-
1
;
}
if
(
!
taosValidFile
(
pRead
->
pReadLogTFile
))
{
return
-
1
;
}
code
=
taosReadFile
(
pRead
->
pReadLogTFile
,
pHead
,
sizeof
(
SWalHead
));
if
(
code
!=
sizeof
(
SWalHead
))
{
return
-
1
;
}
code
=
walValidHeadCksum
(
pHead
);
if
(
code
!=
0
)
{
wError
(
"unexpected wal log version: % "
PRId64
", since head checksum not passed"
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
return
0
;
}
int32_t
walSkipFetchBody
(
SWalReadHandle
*
pRead
,
const
SWalHead
*
pHead
)
{
int32_t
code
;
ASSERT
(
pRead
->
curVersion
==
pHead
->
head
.
version
);
code
=
taosLSeekFile
(
pRead
->
pReadLogTFile
,
pHead
->
head
.
bodyLen
,
SEEK_CUR
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pRead
->
curVersion
=
-
1
;
return
-
1
;
}
pRead
->
curVersion
++
;
return
0
;
}
int32_t
walFetchBody
(
SWalReadHandle
*
pRead
,
SWalHead
**
ppHead
)
{
SWalReadHead
*
pReadHead
=
&
((
*
ppHead
)
->
head
);
int64_t
ver
=
pReadHead
->
version
;
if
(
pRead
->
capacity
<
pReadHead
->
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
*
ppHead
,
sizeof
(
SWalHead
)
+
pReadHead
->
bodyLen
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
*
ppHead
=
ptr
;
pRead
->
capacity
=
pReadHead
->
bodyLen
;
}
if
(
pReadHead
->
bodyLen
!=
taosReadFile
(
pRead
->
pReadLogTFile
,
pReadHead
->
body
,
pReadHead
->
bodyLen
))
{
return
-
1
;
}
if
(
pReadHead
->
version
!=
ver
)
{
wError
(
"unexpected wal log version: %"
PRId64
", read request version:%"
PRId64
""
,
pRead
->
pHead
->
head
.
version
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
walValidBodyCksum
(
*
ppHead
)
!=
0
)
{
wError
(
"unexpected wal log version: % "
PRId64
", since body checksum not passed"
,
ver
);
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
pRead
->
curVersion
=
ver
+
1
;
return
0
;
}
int32_t
walReadWithHandle_s
(
SWalReadHandle
*
pRead
,
int64_t
ver
,
SWalReadHead
**
ppHead
)
{
taosThreadMutexLock
(
&
pRead
->
mutex
);
if
(
walReadWithHandle
(
pRead
,
ver
)
<
0
)
{
...
...
@@ -172,12 +257,14 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
if
(
code
!=
sizeof
(
SWalHead
))
{
return
-
1
;
}
code
=
walValidHeadCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
wError
(
"unexpected wal log version: % "
PRId64
", since head checksum not passed"
,
ver
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
if
(
pRead
->
capacity
<
pRead
->
pHead
->
head
.
bodyLen
)
{
void
*
ptr
=
taosMemoryRealloc
(
pRead
->
pHead
,
sizeof
(
SWalHead
)
+
pRead
->
pHead
->
head
.
bodyLen
);
if
(
ptr
==
NULL
)
{
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
d05d704a
...
...
@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
...
...
@@ -298,14 +296,14 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
pWal
->
writeHead
.
head
.
bodyLen
=
bodyLen
;
pWal
->
writeHead
.
head
.
msgType
=
msgType
;
// sync info
// sync info
for sync module
pWal
->
writeHead
.
head
.
syncMeta
=
syncMeta
;
pWal
->
writeHead
.
cksumHead
=
walCalcHeadCksum
(
&
pWal
->
writeHead
);
pWal
->
writeHead
.
cksumBody
=
walCalcBodyCksum
(
body
,
bodyLen
);
if
(
taosWriteFile
(
pWal
->
pWriteLogTFile
,
&
pWal
->
writeHead
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
// ftruncate
//
TODO
ftruncate
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
cfg
.
vgId
,
walGetLastFileFirstVer
(
pWal
),
strerror
(
errno
));
...
...
@@ -313,7 +311,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
}
if
(
taosWriteFile
(
pWal
->
pWriteLogTFile
,
(
char
*
)
body
,
bodyLen
)
!=
bodyLen
)
{
// ftruncate
//
TODO
ftruncate
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
cfg
.
vgId
,
walGetLastFileFirstVer
(
pWal
),
strerror
(
errno
));
...
...
tools/shell/src/shellNettest.c
浏览文件 @
d05d704a
...
...
@@ -67,7 +67,7 @@ static void shellWorkAsClient() {
printf
(
"request is sent, size:%d
\n
"
,
rpcMsg
.
contLen
);
rpcSendRecv
(
clientRpc
,
&
epSet
,
&
rpcMsg
,
&
rpcRsp
);
if
(
rpcRsp
.
code
==
0
&&
rpcRsp
.
contLen
==
rpcMsg
.
contLen
)
{
if
(
rpcRsp
.
code
==
0
&&
rpcRsp
.
contLen
==
rpcMsg
.
contLen
)
{
printf
(
"response is received, size:%d
\n
"
,
rpcMsg
.
contLen
);
if
(
rpcRsp
.
code
==
0
)
totalSucc
++
;
}
else
{
...
...
@@ -97,8 +97,12 @@ static void shellProcessMsg(void *p, SRpcMsg *pRpc, SEpSet *pEpSet) {
printf
(
"request is received, size:%d
\n
"
,
pRpc
->
contLen
);
fflush
(
stdout
);
SRpcMsg
rsp
=
{.
handle
=
pRpc
->
handle
,
.
refId
=
pRpc
->
refId
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
0
};
rsp
.
pCont
=
rpcMallocCont
(
shell
.
args
.
pktLen
);
rsp
.
contLen
=
shell
.
args
.
pktLen
;
rsp
.
pCont
=
rpcMallocCont
(
pRpc
->
contLen
);
if
(
rsp
.
pCont
==
NULL
)
{
rsp
.
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
rsp
.
contLen
=
pRpc
->
contLen
;
}
rpcSendResponse
(
&
rsp
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录