Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e0654bac
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e0654bac
编写于
4月 14, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
上级
9c9e980e
7323c88d
变更
46
展开全部
隐藏空白更改
内联
并排
Showing
46 changed file
with
3373 addition
and
2671 deletion
+3373
-2671
example/src/tmq.c
example/src/tmq.c
+17
-6
include/common/tmsg.h
include/common/tmsg.h
+6
-9
include/common/ttokendef.h
include/common/ttokendef.h
+42
-41
include/libs/nodes/nodes.h
include/libs/nodes/nodes.h
+1
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+14
-10
source/client/src/tmq.c
source/client/src/tmq.c
+14
-10
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+30
-14
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+147
-469
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+11
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+5
-1
source/dnode/vnode/src/meta/metaBDBImpl.c
source/dnode/vnode/src/meta/metaBDBImpl.c
+9
-12
source/dnode/vnode/src/meta/metaCfg.c
source/dnode/vnode/src/meta/metaCfg.c
+0
-5
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+3
-1
source/dnode/vnode/src/meta/metaTDBImpl.c
source/dnode/vnode/src/meta/metaTDBImpl.c
+34
-36
source/dnode/vnode/src/meta/metaTbCfg.c
source/dnode/vnode/src/meta/metaTbCfg.c
+0
-1
source/dnode/vnode/src/meta/metaTbTag.c
source/dnode/vnode/src/meta/metaTbTag.c
+3
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+22
-15
source/dnode/vnode/src/tq/tqCommit.c
source/dnode/vnode/src/tq/tqCommit.c
+2
-0
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+5
-5
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+40
-3
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
+2
-4
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+24
-26
source/dnode/vnode/src/tsdb/tsdbOptions.c
source/dnode/vnode/src/tsdb/tsdbOptions.c
+0
-9
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+0
-12
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+17
-30
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+2
-11
source/dnode/vnode/src/vnd/vnodeInt.c
source/dnode/vnode/src/vnd/vnodeInt.c
+0
-1
source/dnode/vnode/src/vnd/vnodeMgr.c
source/dnode/vnode/src/vnd/vnodeMgr.c
+0
-1
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+0
-1
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+8
-8
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/nodes/src/nodesTraverseFuncs.c
source/libs/nodes/src/nodesTraverseFuncs.c
+26
-24
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+20
-2
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+5
-0
source/libs/parser/src/parTokenizer.c
source/libs/parser/src/parTokenizer.c
+1
-0
source/libs/parser/src/sql.c
source/libs/parser/src/sql.c
+2242
-1827
source/libs/parser/test/mockCatalogService.cpp
source/libs/parser/test/mockCatalogService.cpp
+6
-5
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+0
-1
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+225
-17
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+105
-42
source/libs/planner/test/plannerTest.cpp
source/libs/planner/test/plannerTest.cpp
+11
-5
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+4
-4
tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim
tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim
+240
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+27
-0
未找到文件。
example/src/tmq.c
浏览文件 @
e0654bac
...
...
@@ -19,8 +19,20 @@
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"vg:%d
\n
"
,
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"%s
\n
"
,
buf
);
}
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -42,8 +54,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -90,7 +101,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1 from ct1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1
, c2, c4
from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -200,7 +211,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
/*msg_process(tmqmessage);*/
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
if
((
++
msg_count
%
MIN_COMMIT_COUNT
)
==
0
)
tmq_commit
(
tmq
,
NULL
,
0
);
...
...
include/common/tmsg.h
浏览文件 @
e0654bac
...
...
@@ -2370,11 +2370,10 @@ typedef struct {
}
SMqSubVgEp
;
typedef
struct
{
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
isSchemaAdaptive
;
SArray
*
vgs
;
// SArray<SMqSubVgEp>
int32_t
numOfFields
;
TAOS_FIELD
*
fields
;
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
isSchemaAdaptive
;
SArray
*
vgs
;
// SArray<SMqSubVgEp>
SSchemaWrapper
schema
;
}
SMqSubTopicEp
;
typedef
struct
{
...
...
@@ -2473,8 +2472,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
SMqSubVgEp
*
pVgEp
=
(
SMqSubVgEp
*
)
taosArrayGet
(
pTopicEp
->
vgs
,
i
);
tlen
+=
tEncodeSMqSubVgEp
(
buf
,
pVgEp
);
}
tlen
+=
taosEncodeFixedI32
(
buf
,
pTopicEp
->
numOfFields
);
// tlen += taosEncodeBinary(buf, pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
tlen
+=
taosEncodeSSchemaWrapper
(
buf
,
&
pTopicEp
->
schema
);
return
tlen
;
}
...
...
@@ -2492,8 +2490,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
buf
=
tDecodeSMqSubVgEp
(
buf
,
&
vgEp
);
taosArrayPush
(
pTopicEp
->
vgs
,
&
vgEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
pTopicEp
->
numOfFields
);
// buf = taosDecodeBinary(buf, (void**)&pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
buf
=
taosDecodeSSchemaWrapper
(
buf
,
&
pTopicEp
->
schema
);
return
buf
;
}
...
...
include/common/ttokendef.h
浏览文件 @
e0654bac
...
...
@@ -182,47 +182,48 @@
#define TK_FIRST 164
#define TK_LAST 165
#define TK_NOW 166
#define TK_ROWTS 167
#define TK_TBNAME 168
#define TK_QSTARTTS 169
#define TK_QENDTS 170
#define TK_WSTARTTS 171
#define TK_WENDTS 172
#define TK_WDURATION 173
#define TK_BETWEEN 174
#define TK_IS 175
#define TK_NK_LT 176
#define TK_NK_GT 177
#define TK_NK_LE 178
#define TK_NK_GE 179
#define TK_NK_NE 180
#define TK_MATCH 181
#define TK_NMATCH 182
#define TK_JOIN 183
#define TK_INNER 184
#define TK_SELECT 185
#define TK_DISTINCT 186
#define TK_WHERE 187
#define TK_PARTITION 188
#define TK_BY 189
#define TK_SESSION 190
#define TK_STATE_WINDOW 191
#define TK_SLIDING 192
#define TK_FILL 193
#define TK_VALUE 194
#define TK_NONE 195
#define TK_PREV 196
#define TK_LINEAR 197
#define TK_NEXT 198
#define TK_GROUP 199
#define TK_HAVING 200
#define TK_ORDER 201
#define TK_SLIMIT 202
#define TK_SOFFSET 203
#define TK_LIMIT 204
#define TK_OFFSET 205
#define TK_ASC 206
#define TK_NULLS 207
#define TK_TODAY 167
#define TK_ROWTS 168
#define TK_TBNAME 169
#define TK_QSTARTTS 170
#define TK_QENDTS 171
#define TK_WSTARTTS 172
#define TK_WENDTS 173
#define TK_WDURATION 174
#define TK_BETWEEN 175
#define TK_IS 176
#define TK_NK_LT 177
#define TK_NK_GT 178
#define TK_NK_LE 179
#define TK_NK_GE 180
#define TK_NK_NE 181
#define TK_MATCH 182
#define TK_NMATCH 183
#define TK_JOIN 184
#define TK_INNER 185
#define TK_SELECT 186
#define TK_DISTINCT 187
#define TK_WHERE 188
#define TK_PARTITION 189
#define TK_BY 190
#define TK_SESSION 191
#define TK_STATE_WINDOW 192
#define TK_SLIDING 193
#define TK_FILL 194
#define TK_VALUE 195
#define TK_NONE 196
#define TK_PREV 197
#define TK_LINEAR 198
#define TK_NEXT 199
#define TK_GROUP 200
#define TK_HAVING 201
#define TK_ORDER 202
#define TK_SLIMIT 203
#define TK_SOFFSET 204
#define TK_LIMIT 205
#define TK_OFFSET 206
#define TK_ASC 207
#define TK_NULLS 208
#define TK_NK_SPACE 300
#define TK_NK_COMMENT 301
...
...
include/libs/nodes/nodes.h
浏览文件 @
e0654bac
...
...
@@ -231,6 +231,7 @@ typedef enum EDealRes {
DEAL_RES_CONTINUE
=
1
,
DEAL_RES_IGNORE_CHILD
,
DEAL_RES_ERROR
,
DEAL_RES_END
}
EDealRes
;
typedef
EDealRes
(
*
FNodeWalker
)(
SNode
*
pNode
,
void
*
pContext
);
...
...
source/client/src/clientMain.c
浏览文件 @
e0654bac
...
...
@@ -171,21 +171,25 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
doFetchRow
(
pRequest
,
true
,
true
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
if
(
msg
->
resIter
==
-
1
)
msg
->
resIter
++
;
SReqResultInfo
*
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
if
(
pResultInfo
->
row
==
NULL
)
{
msg
->
resIter
++
;
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
if
(
pResultInfo
->
current
<
pResultInfo
->
numOfRows
)
{
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
else
{
msg
->
resIter
++
;
if
(
msg
->
resIter
<
taosArrayGetSize
(
msg
->
res
))
{
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
else
{
return
NULL
;
}
}
return
pResultInfo
->
row
;
}
else
{
// assert to avoid uninitialization error
ASSERT
(
0
);
...
...
source/client/src/tmq.c
浏览文件 @
e0654bac
...
...
@@ -119,14 +119,14 @@ typedef struct {
typedef
struct
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
SArray
*
vgs
;
// SArray<SMqClientVg>
int8_t
isSchemaAdaptive
;
int32_t
numOfFields
;
TAOS_FIELD
*
fields
;
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
SArray
*
vgs
;
// SArray<SMqClientVg>
int8_t
isSchemaAdaptive
;
int32_t
numOfFields
;
SSchemaWrapper
schema
;
}
SMqClientTopic
;
typedef
struct
{
...
...
@@ -956,6 +956,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for
(
int32_t
i
=
0
;
i
<
topicNumGet
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
pRsp
->
topics
,
i
);
topic
.
schema
=
pTopicEp
->
schema
;
taosHashClear
(
pHash
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
...
...
@@ -1191,7 +1192,10 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
int32_t
pos
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockPos
,
i
);
SRetrieveTableRsp
*
pRetrieve
=
POINTER_SHIFT
(
pRsp
->
blockData
,
pos
);
SReqResultInfo
resInfo
;
SReqResultInfo
resInfo
=
{
0
};
resInfo
.
totalRows
=
0
;
resInfo
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
setResSchemaInfo
(
&
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
setQueryResultFromRsp
(
&
resInfo
,
pRetrieve
,
true
);
taosArrayPush
(
pRspObj
->
res
,
&
resInfo
);
}
...
...
@@ -1386,7 +1390,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
rspWrapper
=
NULL
;
continue
;
}
// build
msg
// build
rsp
SMqRspObj
*
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
);
return
pRsp
;
}
else
{
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
e0654bac
...
...
@@ -60,8 +60,10 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
static
int32_t
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
);
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
);
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
...
...
@@ -102,7 +104,8 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return
pSub
;
}
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
SMqMVRebReq
req
=
{
.
vgId
=
pConsumerEp
->
vgId
,
.
oldConsumerId
=
pConsumerEp
->
oldConsumerId
,
...
...
@@ -131,7 +134,8 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
return
0
;
}
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
ASSERT
(
pConsumerEp
->
oldConsumerId
!=
-
1
);
void
*
buf
;
...
...
@@ -158,7 +162,8 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
return
0
;
}
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
SMqCancelConnReq
req
=
{
0
};
req
.
consumerId
=
pConsumerEp
->
consumerId
;
req
.
vgId
=
pConsumerEp
->
vgId
;
...
...
@@ -182,7 +187,8 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
return
0
;
}
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildCancelConnReq
(
&
buf
,
&
tlen
,
pConsumerEp
,
oldTopicName
)
<
0
)
{
...
...
@@ -219,13 +225,14 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_EXIST
;
return
-
1
;
}
//TODO add lock
//
TODO add lock
ASSERT
(
strcmp
(
pReq
->
cgroup
,
pConsumer
->
cgroup
)
==
0
);
int32_t
serverEpoch
=
pConsumer
->
epoch
;
int32_t
serverEpoch
=
pConsumer
->
epoch
;
// TODO
int32_t
hbStatus
=
atomic_load_32
(
&
pConsumer
->
hbStatus
);
mDebug
(
"consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d"
,
consumerId
,
epoch
,
serverEpoch
,
hbStatus
);
mDebug
(
"consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d"
,
consumerId
,
epoch
,
serverEpoch
,
hbStatus
);
atomic_store_32
(
&
pConsumer
->
hbStatus
,
0
);
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
...
...
@@ -233,7 +240,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
if
(
epoch
!=
serverEpoch
)
{
mInfo
(
"send new assignment to consumer %ld, consumer epoch %d, server epoch %d"
,
pConsumer
->
consumerId
,
epoch
,
serverEpoch
);
mInfo
(
"send new assignment to consumer %ld, consumer epoch %d, server epoch %d"
,
pConsumer
->
consumerId
,
epoch
,
serverEpoch
);
mDebug
(
"consumer %ld try r lock"
,
consumerId
);
taosRLockLatch
(
&
pConsumer
->
lock
);
mDebug
(
"consumer %ld r locked"
,
consumerId
);
...
...
@@ -251,8 +259,15 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
if
(
consumerId
==
pSubConsumer
->
consumerId
)
{
int32_t
vgsz
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
mInfo
(
"topic %s has %d vg"
,
topicName
,
serverEpoch
);
SMqSubTopicEp
topicEp
;
strcpy
(
topicEp
.
topic
,
topicName
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topicName
);
ASSERT
(
pTopic
!=
NULL
);
topicEp
.
schema
=
pTopic
->
schema
;
mndReleaseTopic
(
pMnode
,
pTopic
);
topicEp
.
vgs
=
taosArrayInit
(
vgsz
,
sizeof
(
SMqSubVgEp
));
for
(
int32_t
k
=
0
;
k
<
vgsz
;
k
++
)
{
char
offsetKey
[
TSDB_PARTITION_KEY_LEN
];
...
...
@@ -409,7 +424,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribeByKey
(
pMnode
,
pRebSub
->
key
);
taosMemoryFreeClear
(
pRebSub
->
key
);
mInfo
(
"mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d"
,
pSub
->
key
,
pSub
->
vgNum
,
(
int32_t
)
taosArrayGetSize
(
pSub
->
unassignedVg
));
mInfo
(
"mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d"
,
pSub
->
key
,
pSub
->
vgNum
,
(
int32_t
)
taosArrayGetSize
(
pSub
->
unassignedVg
));
// remove lost consumer
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pRebSub
->
lostConsumers
);
i
++
)
{
...
...
@@ -459,12 +475,12 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mDebug
(
"consumer %ld try w lock"
,
pRebConsumer
->
consumerId
);
taosWLockLatch
(
&
pRebConsumer
->
lock
);
mDebug
(
"consumer %ld w locked"
,
pRebConsumer
->
consumerId
);
int32_t
status
=
atomic_load_32
(
&
pRebConsumer
->
status
);
int32_t
status
=
atomic_load_32
(
&
pRebConsumer
->
status
);
if
(
vgThisConsumerAfterRb
!=
vgThisConsumerBeforeRb
||
(
vgThisConsumerAfterRb
!=
0
&&
status
!=
MQ_CONSUMER_STATUS__ACTIVE
)
||
(
vgThisConsumerAfterRb
==
0
&&
status
!=
MQ_CONSUMER_STATUS__LOST
))
{
/*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
/*pRebConsumer->epoch++;*/
/*pRebConsumer->epoch++;*/
/*}*/
if
(
vgThisConsumerAfterRb
!=
0
)
{
atomic_store_32
(
&
pRebConsumer
->
status
,
MQ_CONSUMER_STATUS__ACTIVE
);
...
...
@@ -500,7 +516,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
//TODO
//
TODO
pConsumerEp
->
epoch
=
0
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
e0654bac
此差异已折叠。
点击以展开。
source/dnode/vnode/src/inc/meta.h
浏览文件 @
e0654bac
...
...
@@ -24,6 +24,17 @@ typedef struct SMetaCache SMetaCache;
typedef
struct
SMetaIdx
SMetaIdx
;
typedef
struct
SMetaDB
SMetaDB
;
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaRemove
(
const
char
*
path
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
// SMetaDB
int
metaOpenDB
(
SMeta
*
pMeta
);
void
metaCloseDB
(
SMeta
*
pMeta
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
e0654bac
...
...
@@ -17,6 +17,9 @@
#define _TD_VNODE_DEF_H_
#include "executor.h"
#include "filter.h"
#include "qworker.h"
#include "sync.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcompression.h"
...
...
@@ -25,14 +28,15 @@
#include "tglobal.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlosertree.h"
#include "tmacro.h"
#include "tmallocator.h"
#include "tskiplist.h"
#include "tstream.h"
#include "ttime.h"
#include "ttimer.h"
#include "vnode.h"
#include "wal.h"
#include "qworker.h"
#ifdef __cplusplus
extern
"C"
{
...
...
source/dnode/vnode/src/meta/metaBDBImpl.c
浏览文件 @
e0654bac
...
...
@@ -16,10 +16,7 @@
#define ALLOW_FORBID_FUNC
#include "db.h"
#include "metaDef.h"
#include "tcoding.h"
#include "thash.h"
#include "vnodeInt.h"
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
...
...
@@ -262,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
return
0
;
}
int
metaRemoveSmaFromDb
(
SMeta
*
pMeta
,
int64_t
indexUid
)
{
int
metaRemoveSmaFromDb
(
SMeta
*
pMeta
,
int64_t
indexUid
)
{
// TODO
#if 0
DBT key = {0};
...
...
@@ -668,7 +665,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
}
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
)
{
STSma
*
pCfg
=
NULL
;
STSma
*
pCfg
=
NULL
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
DBT
key
=
{
0
};
DBT
value
=
{
0
};
...
...
@@ -711,9 +708,9 @@ static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_
int
ret
;
void
*
pBuf
;
// SSchema *pSchema;
SSchemaKey
schemaKey
=
{
uid
,
sver
,
0
};
DBT
key
=
{
0
};
DBT
value
=
{
0
};
SSchemaKey
schemaKey
=
{
uid
,
sver
,
0
};
DBT
key
=
{
0
};
DBT
value
=
{
0
};
// Set key/value properties
key
.
data
=
&
schemaKey
;
...
...
@@ -761,14 +758,14 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
}
int
metaGetTbNum
(
SMeta
*
pMeta
)
{
SMetaDB
*
pDB
=
pMeta
->
pDB
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
DB_BTREE_STAT
*
sp1
;
pDB
->
pTbDB
->
stat
(
pDB
->
pNtbIdx
,
NULL
,
&
sp1
,
0
);
DB_BTREE_STAT
*
sp2
;
pDB
->
pTbDB
->
stat
(
pDB
->
pCtbIdx
,
NULL
,
&
sp2
,
0
);
return
sp1
->
bt_nkeys
+
sp2
->
bt_nkeys
;
}
...
...
source/dnode/vnode/src/meta/metaCfg.c
浏览文件 @
e0654bac
...
...
@@ -18,11 +18,6 @@
const
SMetaCfg
defaultMetaOptions
=
{.
lruSize
=
0
};
/* ------------------------ EXPOSED METHODS ------------------------ */
void
metaOptionsInit
(
SMetaCfg
*
pMetaOptions
)
{
metaOptionsCopy
(
pMetaOptions
,
&
defaultMetaOptions
);
}
void
metaOptionsClear
(
SMetaCfg
*
pMetaOptions
)
{
// TODO
}
int
metaValidateOptions
(
const
SMetaCfg
*
pMetaOptions
)
{
// TODO
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
e0654bac
...
...
@@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "vnodeInt.h"
\ No newline at end of file
source/dnode/vnode/src/meta/metaTDBImpl.c
浏览文件 @
e0654bac
...
...
@@ -15,7 +15,6 @@
#include "vnodeInt.h"
#include "tdbInt.h"
typedef
struct
SPoolMem
{
int64_t
size
;
struct
SPoolMem
*
prev
;
...
...
@@ -27,18 +26,18 @@ typedef struct SPoolMem {
static
SPoolMem
*
openPool
();
static
void
clearPool
(
SPoolMem
*
pPool
);
static
void
closePool
(
SPoolMem
*
pPool
);
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
);
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
);
static
void
poolFree
(
void
*
arg
,
void
*
ptr
);
struct
SMetaDB
{
TXN
txn
;
TENV
*
pEnv
;
TDB
*
pTbDB
;
TDB
*
pSchemaDB
;
TDB
*
pNameIdx
;
TDB
*
pStbIdx
;
TDB
*
pNtbIdx
;
TDB
*
pCtbIdx
;
TENV
*
pEnv
;
TDB
*
pTbDB
;
TDB
*
pSchemaDB
;
TDB
*
pNameIdx
;
TDB
*
pStbIdx
;
TDB
*
pNtbIdx
;
TDB
*
pCtbIdx
;
SPoolMem
*
pPool
;
#ifdef META_TDB_SMA_TEST
TDB
*
pSmaDB
;
...
...
@@ -52,7 +51,7 @@ typedef struct __attribute__((__packed__)) {
}
SSchemaDbKey
;
typedef
struct
{
char
*
name
;
char
*
name
;
tb_uid_t
uid
;
}
SNameIdxKey
;
...
...
@@ -251,14 +250,14 @@ void metaCloseDB(SMeta *pMeta) {
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
tb_uid_t
uid
;
SMetaDB
*
pMetaDb
;
void
*
pKey
;
void
*
pVal
;
SMetaDB
*
pMetaDb
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
int
ret
;
char
buf
[
512
];
void
*
pBuf
;
void
*
pBuf
;
SCtbIdxKey
ctbIdxKey
;
SSchemaDbKey
schemaDbKey
;
SSchemaWrapper
schemaWrapper
;
...
...
@@ -375,11 +374,11 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
int
ret
;
SMetaDB
*
pMetaDb
=
pMeta
->
pDB
;
void
*
pKey
;
void
*
pVal
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
STbCfg
*
pTbCfg
;
STbCfg
*
pTbCfg
;
// Fetch
pKey
=
&
uid
;
...
...
@@ -431,14 +430,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
}
static
SSchemaWrapper
*
metaGetTableSchemaImpl
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
,
bool
isGetEx
)
{
void
*
pKey
;
void
*
pVal
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
int
ret
;
SSchemaDbKey
schemaDbKey
;
SSchemaWrapper
*
pSchemaWrapper
;
void
*
pBuf
;
void
*
pBuf
;
// fetch
schemaDbKey
.
uid
=
uid
;
...
...
@@ -465,9 +464,9 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tb_uid_t
quid
;
SSchemaWrapper
*
pSW
;
STSchemaBuilder
sb
;
SSchemaEx
*
pSchema
;
STSchema
*
pTSchema
;
STbCfg
*
pTbCfg
;
SSchemaEx
*
pSchema
;
STSchema
*
pTSchema
;
STbCfg
*
pTbCfg
;
pTbCfg
=
metaGetTbInfoByUid
(
pMeta
,
uid
);
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
...
...
@@ -498,7 +497,7 @@ struct SMTbCursor {
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
)
{
SMTbCursor
*
pTbCur
=
NULL
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
pTbCur
=
(
SMTbCursor
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pTbCur
));
if
(
pTbCur
==
NULL
)
{
...
...
@@ -520,12 +519,12 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
)
{
void
*
pKey
=
NULL
;
void
*
pVal
=
NULL
;
void
*
pKey
=
NULL
;
void
*
pVal
=
NULL
;
int
kLen
;
int
vLen
;
int
ret
;
void
*
pBuf
;
void
*
pBuf
;
STbCfg
tbCfg
;
for
(;;)
{
...
...
@@ -548,17 +547,17 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
}
struct
SMCtbCursor
{
TDBC
*
pCur
;
TDBC
*
pCur
;
tb_uid_t
suid
;
void
*
pKey
;
void
*
pVal
;
void
*
pKey
;
void
*
pVal
;
int
kLen
;
int
vLen
;
};
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
)
{
SMCtbCursor
*
pCtbCur
=
NULL
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
SMetaDB
*
pDB
=
pMeta
->
pDB
;
int
ret
;
pCtbCur
=
(
SMCtbCursor
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pCtbCur
));
...
...
@@ -654,7 +653,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
continue
;
}
++
pSW
->
number
;
STSma
*
tptr
=
(
STSma
*
)
taosMemoryRealloc
(
pSW
->
tSma
,
pSW
->
number
*
sizeof
(
STSma
));
if
(
tptr
==
NULL
)
{
...
...
@@ -709,10 +707,10 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// ASSERT(0);
#ifdef META_TDB_SMA_TEST
int32_t
ret
=
0
;
int32_t
ret
=
0
;
SMetaDB
*
pMetaDb
=
pMeta
->
pDB
;
void
*
pBuf
=
NULL
,
*
qBuf
=
NULL
;
void
*
key
=
{
0
},
*
val
=
{
0
};
void
*
pBuf
=
NULL
,
*
qBuf
=
NULL
;
void
*
key
=
{
0
},
*
val
=
{
0
};
// save sma info
int32_t
len
=
tEncodeTSma
(
NULL
,
pSmaCfg
);
...
...
@@ -1103,7 +1101,7 @@ static void closePool(SPoolMem *pPool) {
}
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
)
{
void
*
ptr
=
NULL
;
void
*
ptr
=
NULL
;
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
...
...
source/dnode/vnode/src/meta/metaTbCfg.c
浏览文件 @
e0654bac
...
...
@@ -14,7 +14,6 @@
*/
#include "vnodeInt.h"
#include "tcoding.h"
int
metaValidateTbCfg
(
SMeta
*
pMeta
,
const
STbCfg
*
pTbOptions
)
{
// TODO
...
...
source/dnode/vnode/src/meta/metaTbTag.c
浏览文件 @
e0654bac
...
...
@@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "vnodeInt.h"
\ No newline at end of file
source/dnode/vnode/src/tq/tq.c
浏览文件 @
e0654bac
...
...
@@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tcompare.h"
#include "tdatablock.h"
#include "tstream.h"
#include "vnodeInt.h"
int32_t
tqInit
()
{
return
tqPushMgrInit
();
}
...
...
@@ -82,7 +79,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
memcpy
(
data
,
msg
,
msgLen
);
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
)
!=
0
)
{
if
(
tsdbUpdateSmaWindow
(
pTq
->
pVnode
->
pTsdb
,
msg
,
version
)
!=
0
)
{
return
-
1
;
}
}
...
...
@@ -551,30 +548,40 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rspV2
.
rspOffset
=
fetchOffset
;
int32_t
blockSz
=
taosArrayGetSize
(
pRes
);
int32_t
tl
en
=
0
;
int32_t
dataBlockStrL
en
=
0
;
for
(
int32_t
i
=
0
;
i
<
blockSz
;
i
++
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
pRes
,
i
);
tl
en
+=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
dataBlockStrL
en
+=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
}
void
*
data
=
taosMemoryMalloc
(
tl
en
);
if
(
data
==
NULL
)
{
void
*
data
BlockBuf
=
taosMemoryMalloc
(
dataBlockStrL
en
);
if
(
data
BlockBuf
==
NULL
)
{
pMsg
->
code
=
-
1
;
taosMemoryFree
(
pHead
);
}
rspV2
.
blockData
=
data
;
rspV2
.
blockData
=
data
BlockBuf
;
void
*
dataBlockBuf
=
data
;
int32_t
pos
;
rspV2
.
blockPos
=
taosArrayInit
(
blockSz
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
blockSz
;
i
++
)
{
pos
=
0
;
SSDataBlock
*
pBlock
=
taosArrayGet
(
pRes
,
i
);
blockCompressEncode
(
pBlock
,
dataBlockBuf
,
&
pos
,
pBlock
->
info
.
numOfCols
,
false
);
SSDataBlock
*
pBlock
=
taosArrayGet
(
pRes
,
i
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
dataBlockBuf
;
pRetrieve
->
useconds
=
0
;
pRetrieve
->
precision
=
0
;
pRetrieve
->
compressed
=
0
;
pRetrieve
->
completed
=
1
;
pRetrieve
->
numOfRows
=
htonl
(
pBlock
->
info
.
rows
);
blockCompressEncode
(
pBlock
,
pRetrieve
->
data
,
&
pos
,
pBlock
->
info
.
numOfCols
,
false
);
taosArrayPush
(
rspV2
.
blockPos
,
&
rspV2
.
dataLen
);
rspV2
.
dataLen
+=
pos
;
dataBlockBuf
=
POINTER_SHIFT
(
dataBlockBuf
,
pos
);
int32_t
totLen
=
sizeof
(
SRetrieveTableRsp
)
+
pos
;
pRetrieve
->
compLen
=
htonl
(
totLen
);
rspV2
.
dataLen
+=
totLen
;
dataBlockBuf
=
POINTER_SHIFT
(
dataBlockBuf
,
totLen
);
}
ASSERT
(
POINTER_DISTANCE
(
dataBlockBuf
,
rspV2
.
blockData
)
<=
dataBlockStrLen
);
int32_t
msgLen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqPollRspV2
(
NULL
,
&
rspV2
);
void
*
buf
=
rpcMallocCont
(
msgLen
);
...
...
@@ -590,7 +597,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tl
en
;
pMsg
->
contLen
=
msgL
en
;
pMsg
->
code
=
0
;
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
...
...
source/dnode/vnode/src/tq/tqCommit.c
浏览文件 @
e0654bac
...
...
@@ -12,3 +12,5 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
source/dnode/vnode/src/tq/tqMetaStore.c
浏览文件 @
e0654bac
...
...
@@ -14,13 +14,13 @@
*/
#include "vnodeInt.h"
// TODO:replace by an abstract file layer
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include "osDir.h"
//
#include <fcntl.h>
//
#include <string.h>
//
#include <unistd.h>
//
#include "osDir.h"
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx"
#define TQ_IDX_NAME
"tq.idx"
static
int32_t
tqHandlePutCommitted
(
STqMetaStore
*
,
int64_t
key
,
void
*
value
);
static
void
*
tqHandleGetUncommitted
(
STqMetaStore
*
,
int64_t
key
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
e0654bac
...
...
@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "vnode.h"
#include "vnodeInt.h"
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
{
STqReadHandle
*
pReadHandle
=
taosMemoryMalloc
(
sizeof
(
STqReadHandle
));
...
...
@@ -88,7 +87,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
pBlockInfo
->
numOfCols
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
pBlockInfo
->
rows
=
pHandle
->
pBlock
->
numOfRows
;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return
0
;
}
...
...
@@ -177,3 +176,41 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
}
return
pArray
;
}
void
tqReadHandleSetColIdList
(
STqReadHandle
*
pReadHandle
,
SArray
*
pColIdList
)
{
pReadHandle
->
pColIdList
=
pColIdList
;
}
int
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
if
(
pHandle
->
tbIdHash
)
{
taosHashClear
(
pHandle
->
tbIdHash
);
}
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
),
NULL
,
0
);
}
return
0
;
}
int
tqReadHandleAddTbUidList
(
STqReadHandle
*
pHandle
,
const
SArray
*
tbUidList
)
{
if
(
pHandle
->
tbIdHash
==
NULL
)
{
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
),
NULL
,
0
);
}
return
0
;
}
source/dnode/vnode/src/tsdb/tsdbBDBImpl.c
浏览文件 @
e0654bac
...
...
@@ -16,9 +16,7 @@
#define ALLOW_FORBID_FUNC
#include "db.h"
#include "taoserror.h"
#include "tcoding.h"
#include "thash.h"
#include "vnodeInt.h"
#define IMPL_WITH_LOCK 1
...
...
@@ -139,7 +137,7 @@ int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data,
return
0
;
}
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
uint32_t
*
valueSize
)
{
void
*
tsdbGetSmaDataByKey
(
SDBFile
*
pDBF
,
void
*
key
,
uint32_t
keySize
,
uint32_t
*
valueSize
)
{
void
*
result
=
NULL
;
DBT
key1
=
{
0
};
DBT
value1
=
{
0
};
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
e0654bac
...
...
@@ -13,9 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <regex.h>
#include "vnodeInt.h"
#include "os.h"
typedef
enum
{
TSDB_TXN_TEMP_FILE
=
0
,
TSDB_TXN_CURR_FILE
}
TSDB_TXN_FILE_T
;
static
const
char
*
tsdbTxnFname
[]
=
{
"current.t"
,
"current"
};
...
...
@@ -97,8 +95,8 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return
tlen
;
}
static
void
*
tsdbDecodeDFileSetArray
(
STsdb
*
pRepo
,
void
*
buf
,
SArray
*
pArray
)
{
uint64_t
nset
=
0
;
static
void
*
tsdbDecodeDFileSetArray
(
STsdb
*
pRepo
,
void
*
buf
,
SArray
*
pArray
)
{
uint64_t
nset
=
0
;
taosArrayClear
(
pArray
);
...
...
@@ -122,7 +120,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return
tlen
;
}
static
void
*
tsdbDecodeFSStatus
(
STsdb
*
pRepo
,
void
*
buf
,
SFSStatus
*
pStatus
)
{
static
void
*
tsdbDecodeFSStatus
(
STsdb
*
pRepo
,
void
*
buf
,
SFSStatus
*
pStatus
)
{
tsdbResetFSStatus
(
pStatus
);
// pStatus->pmf = &(pStatus->mf);
...
...
@@ -407,8 +405,8 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFil
static
int
tsdbSaveFSStatus
(
STsdb
*
pRepo
,
SFSStatus
*
pStatus
)
{
SFSHeader
fsheader
;
void
*
pBuf
=
NULL
;
void
*
ptr
;
void
*
pBuf
=
NULL
;
void
*
ptr
;
char
hbuf
[
TSDB_FILE_HEAD_SIZE
]
=
"
\0
"
;
char
tfname
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
char
cfname
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
...
...
@@ -592,7 +590,7 @@ void tsdbFSIterSeek(SFSIter *pIter, int fid) {
}
SDFileSet
*
tsdbFSIterNext
(
SFSIter
*
pIter
)
{
STsdbFS
*
pfs
=
pIter
->
pfs
;
STsdbFS
*
pfs
=
pIter
->
pfs
;
SDFileSet
*
pSet
;
if
(
pIter
->
index
<
0
)
{
...
...
@@ -651,12 +649,12 @@ static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
}
static
int
tsdbOpenFSFromCurrent
(
STsdb
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
TdFilePtr
pFile
=
NULL
;
void
*
buffer
=
NULL
;
void
*
buffer
=
NULL
;
SFSHeader
fsheader
;
char
current
[
TSDB_FILENAME_LEN
]
=
"
\0
"
;
void
*
ptr
;
void
*
ptr
;
tsdbGetTxnFname
(
pRepo
,
TSDB_TXN_CURR_FILE
,
current
);
...
...
@@ -746,7 +744,7 @@ _err:
// Scan and try to fix incorrect files
static
int
tsdbScanAndTryFixFS
(
STsdb
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SFSStatus
*
pStatus
=
pfs
->
cstatus
;
// if (tsdbScanAndTryFixMFile(pRepo) < 0) {
...
...
@@ -908,9 +906,9 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) {
// }
static
int
tsdbScanRootDir
(
STsdb
*
pRepo
)
{
char
rootDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
char
rootDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
const
STfsFile
*
pf
;
tsdbGetRootDir
(
REPO_ID
(
pRepo
),
rootDir
);
...
...
@@ -942,9 +940,9 @@ static int tsdbScanRootDir(STsdb *pRepo) {
}
static
int
tsdbScanDataDir
(
STsdb
*
pRepo
)
{
char
dataDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
char
dataDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
const
STfsFile
*
pf
;
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
dataDir
);
...
...
@@ -1107,14 +1105,14 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
// }
static
int
tsdbRestoreDFileSet
(
STsdb
*
pRepo
)
{
char
dataDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
STfsDir
*
tdir
=
NULL
;
char
dataDir
[
TSDB_FILENAME_LEN
];
char
bname
[
TSDB_FILENAME_LEN
];
STfsDir
*
tdir
=
NULL
;
const
STfsFile
*
pf
=
NULL
;
const
char
*
pattern
=
"^v[0-9]+f[0-9]+
\\
.(head|data|last|smad|smal)(-ver[0-9]+)?$"
;
SArray
*
fArray
=
NULL
;
regex_t
regex
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
const
char
*
pattern
=
"^v[0-9]+f[0-9]+
\\
.(head|data|last|smad|smal)(-ver[0-9]+)?$"
;
SArray
*
fArray
=
NULL
;
regex_t
regex
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
tsdbGetDataDir
(
REPO_ID
(
pRepo
),
dataDir
);
...
...
@@ -1327,7 +1325,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
}
static
void
tsdbScanAndTryFixDFilesHeader
(
STsdb
*
pRepo
,
int32_t
*
nExpired
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
SFSStatus
*
pStatus
=
pfs
->
cstatus
;
SDFInfo
info
;
...
...
source/dnode/vnode/src/tsdb/tsdbOptions.c
浏览文件 @
e0654bac
...
...
@@ -26,15 +26,6 @@ const STsdbCfg defautlTsdbOptions = {.precision = 0,
.
update
=
0
,
.
compression
=
TWO_STAGE_COMP
};
int
tsdbOptionsInit
(
STsdbCfg
*
pTsdbOptions
)
{
// TODO
return
0
;
}
void
tsdbOptionsClear
(
STsdbCfg
*
pTsdbOptions
)
{
// TODO
}
int
tsdbValidateOptions
(
const
STsdbCfg
*
pTsdbOptions
)
{
// TODO
return
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
e0654bac
...
...
@@ -13,18 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "texception.h"
#include "vnodeInt.h"
#include "filter.h"
#include "taosdef.h"
#include "tlosertree.h"
#include "tmsg.h"
#include "vnodeInt.h"
#define EXTRA_BYTES 2
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
e0654bac
...
...
@@ -105,8 +105,8 @@ struct SSmaStat {
// declaration of static functions
// expired window
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
);
static
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
int64_t
version
);
static
int32_t
tsdbInitSmaStat
(
SSmaStat
**
pSmaStat
);
static
void
*
tsdbFreeSmaStatItem
(
SSmaStatItem
*
pSmaStatItem
);
static
int32_t
tsdbDestroySmaState
(
SSmaStat
*
pSmaStat
);
...
...
@@ -544,7 +544,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
};
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
)
{
static
int32_t
tsdbSetExpiredWindow
(
STsdb
*
pTsdb
,
SHashObj
*
pItemsHash
,
int64_t
indexUid
,
int64_t
winSKey
,
int64_t
version
)
{
SSmaStatItem
*
pItem
=
taosHashGet
(
pItemsHash
,
&
indexUid
,
sizeof
(
indexUid
));
if
(
pItem
==
NULL
)
{
// TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later
...
...
@@ -578,8 +578,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
return
TSDB_CODE_FAILED
;
}
int8_t
state
=
TSDB_SMA_STAT_EXPIRED
;
if
(
taosHashPut
(
pItem
->
expiredWindows
,
&
winSKey
,
sizeof
(
TSKEY
),
&
state
,
sizeof
(
state
))
!=
0
)
{
if
(
taosHashPut
(
pItem
->
expiredWindows
,
&
winSKey
,
sizeof
(
TSKEY
),
&
version
,
sizeof
(
version
))
!=
0
)
{
// If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would
// tell query module to query raw TS data.
// N.B.
...
...
@@ -606,7 +605,8 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
* @param msg SSubmitReq
* @return int32_t
*/
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
int32_t
tsdbUpdateExpiredWindowImpl
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
)
{
// no time-range-sma, just return success
if
(
atomic_load_16
(
&
REPO_TSMA_NUM
(
pTsdb
))
<=
0
)
{
tsdbTrace
(
"vgId:%d not update expire window since no tSma"
,
REPO_ID
(
pTsdb
));
return
TSDB_CODE_SUCCESS
;
...
...
@@ -621,20 +621,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
return
TSDB_CODE_FAILED
;
}
// TODO: decode the msg from Stream Computing module => start
#ifdef TSDB_SMA_TESTx
int64_t
indexUid
=
SMA_TEST_INDEX_UID
;
const
int32_t
SMA_TEST_EXPIRED_WINDOW_SIZE
=
10
;
TSKEY
expiredWindows
[
SMA_TEST_EXPIRED_WINDOW_SIZE
];
TSKEY
skey1
=
1646987196
*
1e3
;
for
(
int32_t
i
=
0
;
i
<
SMA_TEST_EXPIRED_WINDOW_SIZE
;
++
i
)
{
expiredWindows
[
i
]
=
skey1
+
i
;
}
#else
#endif
// TODO: decode the msg <= end
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
...
...
@@ -700,7 +686,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) {
TSKEY
winSKey
=
taosTimeTruncate
(
TD_ROW_KEY
(
row
),
&
interval
,
interval
.
precision
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
winSKey
);
tsdbSetExpiredWindow
(
pTsdb
,
pItemsHash
,
pTSma
->
indexUid
,
winSKey
,
version
);
// TODO: release only when suid changes.
tdDestroyTSmaWrapper
(
pSW
);
...
...
@@ -975,7 +961,7 @@ static int tsdbSmaBeginCommit(SSmaEnv *pEnv) {
// start a new txn
tdbTxnOpen
(
pTxn
,
0
,
poolMalloc
,
poolFree
,
pEnv
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
if
(
tdbBegin
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
tsdbWarn
(
"tsdbSma tdb
restart txn
fail"
);
tsdbWarn
(
"tsdbSma tdb
begin commit
fail"
);
return
-
1
;
}
return
0
;
...
...
@@ -986,7 +972,7 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
// Commit current txn
if
(
tdbCommit
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
tsdbWarn
(
"tsdbSma tdb commit fail"
);
tsdbWarn
(
"tsdbSma tdb
end
commit fail"
);
return
-
1
;
}
tdbTxnClose
(
pTxn
);
...
...
@@ -1009,12 +995,12 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) {
static
int32_t
tsdbInsertTSmaDataImpl
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pTsdb
);
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
SSmaEnv
*
pEnv
=
atomic_load_ptr
(
&
REPO_TSMA_ENV
(
pTsdb
));
if
(
pEnv
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
tsdbWarn
(
"vgId:%d insert tSma data failed since pTSmaEnv is NULL"
,
REPO_ID
(
pTsdb
));
return
terrno
;
// For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus
// the sma data would arrive ahead of the update-expired-window msg.
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
if
(
pDataBlocks
==
NULL
)
{
...
...
@@ -1029,6 +1015,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
return
TSDB_CODE_FAILED
;
}
SSmaEnv
*
pEnv
=
REPO_TSMA_ENV
(
pTsdb
);
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SSmaStatItem
*
pItem
=
NULL
;
...
...
@@ -1683,9 +1670,9 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) {
return
code
;
}
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tsdbUpdateExpiredWindowImpl
(
pTsdb
,
pMsg
))
<
0
)
{
if
((
code
=
tsdbUpdateExpiredWindowImpl
(
pTsdb
,
pMsg
,
version
))
<
0
)
{
tsdbWarn
(
"vgId:%d update expired sma window failed since %s"
,
REPO_ID
(
pTsdb
),
tstrerror
(
terrno
));
}
return
code
;
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
e0654bac
...
...
@@ -18,13 +18,6 @@
const
SVnodeCfg
defaultVnodeOptions
=
{
.
wsize
=
96
*
1024
*
1024
,
.
ssize
=
1
*
1024
*
1024
,
.
lsize
=
1024
,
.
walCfg
=
{.
level
=
TAOS_WAL_WRITE
}};
/* TODO */
void
vnodeOptionsInit
(
SVnodeCfg
*
pVnodeOptions
)
{
/* TODO */
vnodeOptionsCopy
(
pVnodeOptions
,
&
defaultVnodeOptions
);
}
void
vnodeOptionsClear
(
SVnodeCfg
*
pVnodeOptions
)
{
/* TODO */
}
int
vnodeValidateOptions
(
const
SVnodeCfg
*
pVnodeOptions
)
{
// TODO
return
0
;
...
...
@@ -36,14 +29,14 @@ void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) {
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableFName
)
{
uint32_t
hashValue
=
0
;
switch
(
pVnodeOptions
->
hashMethod
)
{
default:
hashValue
=
MurmurHash3_32
(
tableFName
,
strlen
(
tableFName
));
break
;
}
// TODO OPEN THIS !!!!!!!
// TODO OPEN THIS !!!!!!!
#if 0
if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) {
terrno = TSDB_CODE_VND_HASH_MISMATCH;
...
...
@@ -53,5 +46,3 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
return
TSDB_CODE_SUCCESS
;
}
source/dnode/vnode/src/vnd/vnodeInt.c
浏览文件 @
e0654bac
...
...
@@ -14,7 +14,6 @@
*/
#define _DEFAULT_SOURCE
#include "sync.h"
#include "vnodeInt.h"
// #include "vnodeInt.h"
...
...
source/dnode/vnode/src/vnd/vnodeMgr.c
浏览文件 @
e0654bac
...
...
@@ -14,7 +14,6 @@
*/
#include "vnodeInt.h"
#include "tglobal.h"
SVnodeMgr
vnodeMgr
=
{.
vnodeInitFlag
=
TD_MOD_UNINITIALIZED
};
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
e0654bac
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "vnodeInt.h"
static
int
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
e0654bac
...
...
@@ -81,7 +81,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: to encapsule a free API
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pSchema
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pTagSchema
);
if
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
)
{
if
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
)
{
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
->
pFuncIds
);
taosMemoryFree
(
vCreateTbReq
.
stbCfg
.
pRSmaParam
);
}
...
...
@@ -235,13 +235,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if
(
tsdbCreateTSma
(
pVnode
->
pTsdb
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)))
<
0
)
{
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
#if 0
tsdbTSmaSub(pVnode->pTsdb, 1);
SVDropTSmaReq vDropSmaReq = {0};
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
e0654bac
...
...
@@ -409,7 +409,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
EXPECT_EQ
(
tdScanAndConvertSubmitMsg
(
pMsg
),
TSDB_CODE_SUCCESS
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
pMsg
),
0
);
EXPECT_EQ
(
tsdbUpdateSmaWindow
(
pTsdb
,
pMsg
,
0
),
0
);
// init
const
int32_t
tSmaGroupSize
=
4
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
e0654bac
...
...
@@ -543,7 +543,7 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
char
*
dbName
=
nodesGetValueFromNode
(
node
);
strncpy
(
pContext
,
varDataVal
(
dbName
),
varDataLen
(
dbName
));
*
((
char
*
)
pContext
+
varDataLen
(
dbName
))
=
0
;
return
DEAL_RES_E
RROR
;
// stop walk
return
DEAL_RES_E
ND
;
// stop walk
}
default:
break
;
...
...
source/libs/nodes/src/nodesTraverseFuncs.c
浏览文件 @
e0654bac
...
...
@@ -46,7 +46,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_OPERATOR
:
{
SOperatorNode
*
pOpNode
=
(
SOperatorNode
*
)
pNode
;
res
=
walkNode
(
pOpNode
->
pLeft
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pOpNode
->
pRight
,
order
,
walker
,
pContext
);
}
break
;
...
...
@@ -63,10 +63,10 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_JOIN_TABLE
:
{
SJoinTableNode
*
pJoinTableNode
=
(
SJoinTableNode
*
)
pNode
;
res
=
walkNode
(
pJoinTableNode
->
pLeft
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pJoinTableNode
->
pRight
,
order
,
walker
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pJoinTableNode
->
pOnCond
,
order
,
walker
,
pContext
);
}
break
;
...
...
@@ -80,7 +80,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_STATE_WINDOW
:
{
SStateWindowNode
*
pState
=
(
SStateWindowNode
*
)
pNode
;
res
=
walkNode
(
pState
->
pExpr
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pState
->
pCol
,
order
,
walker
,
pContext
);
}
break
;
...
...
@@ -88,7 +88,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_SESSION_WINDOW
:
{
SSessionWindowNode
*
pSession
=
(
SSessionWindowNode
*
)
pNode
;
res
=
walkNode
(
pSession
->
pCol
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pSession
->
pGap
,
order
,
walker
,
pContext
);
}
break
;
...
...
@@ -96,16 +96,16 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
case
QUERY_NODE_INTERVAL_WINDOW
:
{
SIntervalWindowNode
*
pInterval
=
(
SIntervalWindowNode
*
)
pNode
;
res
=
walkNode
(
pInterval
->
pInterval
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pInterval
->
pOffset
,
order
,
walker
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pInterval
->
pSliding
,
order
,
walker
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pInterval
->
pFill
,
order
,
walker
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
walkNode
(
pInterval
->
pCol
,
order
,
walker
,
pContext
);
}
break
;
...
...
@@ -126,7 +126,7 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
break
;
}
if
(
DEAL_RES_ERROR
!=
res
&&
TRAVERSAL_POSTORDER
==
order
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
&&
TRAVERSAL_POSTORDER
==
order
)
{
res
=
walker
(
pNode
,
pContext
);
}
...
...
@@ -136,8 +136,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker
static
EDealRes
walkList
(
SNodeList
*
pNodeList
,
ETraversalOrder
order
,
FNodeWalker
walker
,
void
*
pContext
)
{
SNode
*
node
;
FOREACH
(
node
,
pNodeList
)
{
if
(
DEAL_RES_ERROR
==
walkNode
(
node
,
order
,
walker
,
pContext
))
{
return
DEAL_RES_ERROR
;
EDealRes
res
=
walkNode
(
node
,
order
,
walker
,
pContext
);
if
(
DEAL_RES_ERROR
==
res
||
DEAL_RES_END
==
res
)
{
return
res
;
}
}
return
DEAL_RES_CONTINUE
;
...
...
@@ -185,7 +186,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case
QUERY_NODE_OPERATOR
:
{
SOperatorNode
*
pOpNode
=
(
SOperatorNode
*
)
pNode
;
res
=
rewriteNode
(
&
(
pOpNode
->
pLeft
),
order
,
rewriter
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pOpNode
->
pRight
),
order
,
rewriter
,
pContext
);
}
break
;
...
...
@@ -202,10 +203,10 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case
QUERY_NODE_JOIN_TABLE
:
{
SJoinTableNode
*
pJoinTableNode
=
(
SJoinTableNode
*
)
pNode
;
res
=
rewriteNode
(
&
(
pJoinTableNode
->
pLeft
),
order
,
rewriter
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pJoinTableNode
->
pRight
),
order
,
rewriter
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pJoinTableNode
->
pOnCond
),
order
,
rewriter
,
pContext
);
}
break
;
...
...
@@ -219,7 +220,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case
QUERY_NODE_STATE_WINDOW
:
{
SStateWindowNode
*
pState
=
(
SStateWindowNode
*
)
pNode
;
res
=
rewriteNode
(
&
pState
->
pExpr
,
order
,
rewriter
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
pState
->
pCol
,
order
,
rewriter
,
pContext
);
}
break
;
...
...
@@ -227,7 +228,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case
QUERY_NODE_SESSION_WINDOW
:
{
SSessionWindowNode
*
pSession
=
(
SSessionWindowNode
*
)
pNode
;
res
=
rewriteNode
(
&
pSession
->
pCol
,
order
,
rewriter
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
pSession
->
pGap
,
order
,
rewriter
,
pContext
);
}
break
;
...
...
@@ -235,16 +236,16 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
case
QUERY_NODE_INTERVAL_WINDOW
:
{
SIntervalWindowNode
*
pInterval
=
(
SIntervalWindowNode
*
)
pNode
;
res
=
rewriteNode
(
&
(
pInterval
->
pInterval
),
order
,
rewriter
,
pContext
);
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pInterval
->
pOffset
),
order
,
rewriter
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pInterval
->
pSliding
),
order
,
rewriter
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pInterval
->
pFill
),
order
,
rewriter
,
pContext
);
}
if
(
DEAL_RES_ERROR
!=
res
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
)
{
res
=
rewriteNode
(
&
(
pInterval
->
pCol
),
order
,
rewriter
,
pContext
);
}
break
;
...
...
@@ -265,7 +266,7 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
break
;
}
if
(
DEAL_RES_ERROR
!=
res
&&
TRAVERSAL_POSTORDER
==
order
)
{
if
(
DEAL_RES_ERROR
!=
res
&&
DEAL_RES_END
!=
res
&&
TRAVERSAL_POSTORDER
==
order
)
{
res
=
rewriter
(
pRawNode
,
pContext
);
}
...
...
@@ -275,8 +276,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit
static
EDealRes
rewriteList
(
SNodeList
*
pNodeList
,
ETraversalOrder
order
,
FNodeRewriter
rewriter
,
void
*
pContext
)
{
SNode
**
pNode
;
FOREACH_FOR_REWRITE
(
pNode
,
pNodeList
)
{
if
(
DEAL_RES_ERROR
==
rewriteNode
(
pNode
,
order
,
rewriter
,
pContext
))
{
return
DEAL_RES_ERROR
;
EDealRes
res
=
rewriteNode
(
pNode
,
order
,
rewriter
,
pContext
);
if
(
DEAL_RES_ERROR
==
res
||
DEAL_RES_END
==
res
)
{
return
res
;
}
}
return
DEAL_RES_CONTINUE
;
...
...
source/libs/parser/inc/sql.y
浏览文件 @
e0654bac
...
...
@@ -588,6 +588,7 @@ column_reference(A) ::= column_name(B).
column_reference(A) ::= table_name(B) NK_DOT column_name(C). { A = createRawExprNodeExt(pCxt, &B, &C, createColumnNode(pCxt, &B, &C)); }
pseudo_column(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= TODAY(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= ROWTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= TBNAME(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
pseudo_column(A) ::= QSTARTTS(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
e0654bac
...
...
@@ -335,8 +335,26 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
CHECK_OUT_OF_MEM
(
cond
);
cond
->
condType
=
type
;
cond
->
pParameterList
=
nodesMakeList
();
nodesListAppend
(
cond
->
pParameterList
,
pParam1
);
nodesListAppend
(
cond
->
pParameterList
,
pParam2
);
if
((
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam1
)
&&
type
!=
((
SLogicConditionNode
*
)
pParam1
)
->
condType
)
||
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam2
)
&&
type
!=
((
SLogicConditionNode
*
)
pParam2
)
->
condType
))
{
nodesListAppend
(
cond
->
pParameterList
,
pParam1
);
nodesListAppend
(
cond
->
pParameterList
,
pParam2
);
}
else
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam1
))
{
nodesListAppendList
(
cond
->
pParameterList
,
((
SLogicConditionNode
*
)
pParam1
)
->
pParameterList
);
((
SLogicConditionNode
*
)
pParam1
)
->
pParameterList
=
NULL
;
nodesDestroyNode
(
pParam1
);
}
else
{
nodesListAppend
(
cond
->
pParameterList
,
pParam1
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam2
))
{
nodesListAppendList
(
cond
->
pParameterList
,
((
SLogicConditionNode
*
)
pParam2
)
->
pParameterList
);
((
SLogicConditionNode
*
)
pParam2
)
->
pParameterList
=
NULL
;
nodesDestroyNode
(
pParam2
);
}
else
{
nodesListAppend
(
cond
->
pParameterList
,
pParam2
);
}
}
return
(
SNode
*
)
cond
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
e0654bac
...
...
@@ -325,6 +325,11 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time
for
(
int
k
=
pToken
->
n
;
pToken
->
z
[
k
]
!=
'\0'
;
k
++
)
{
if
(
pToken
->
z
[
k
]
==
' '
||
pToken
->
z
[
k
]
==
'\t'
)
continue
;
if
(
pToken
->
z
[
k
]
==
'('
&&
pToken
->
z
[
k
+
1
]
==
')'
)
{
//for insert NOW()/TODAY()
*
end
=
pTokenEnd
=
&
pToken
->
z
[
k
+
2
];
k
++
;
continue
;
}
if
(
pToken
->
z
[
k
]
==
','
)
{
*
end
=
pTokenEnd
;
*
time
=
ts
;
...
...
source/libs/parser/src/parTokenizer.c
浏览文件 @
e0654bac
...
...
@@ -175,6 +175,7 @@ static SKeyword keywordTable[] = {
{
"TBNAME"
,
TK_TBNAME
},
{
"TIMESTAMP"
,
TK_TIMESTAMP
},
{
"TINYINT"
,
TK_TINYINT
},
{
"TODAY"
,
TK_TODAY
},
{
"TOPIC"
,
TK_TOPIC
},
{
"TOPICS"
,
TK_TOPICS
},
{
"TSERIES"
,
TK_TSERIES
},
...
...
source/libs/parser/src/sql.c
浏览文件 @
e0654bac
此差异已折叠。
点击以展开。
source/libs/parser/test/mockCatalogService.cpp
浏览文件 @
e0654bac
...
...
@@ -146,6 +146,7 @@ public:
meta_
[
db
][
tbname
].
reset
(
new
MockTableMeta
());
meta_
[
db
][
tbname
]
->
schema
=
table
.
release
();
meta_
[
db
][
tbname
]
->
schema
->
uid
=
id_
++
;
meta_
[
db
][
tbname
]
->
schema
->
tableType
=
TSDB_CHILD_TABLE
;
SVgroupInfo
vgroup
=
{.
vgId
=
vgid
,
.
hashBegin
=
0
,
.
hashEnd
=
0
,};
addEpIntoEpSet
(
&
vgroup
.
epSet
,
"dnode_1"
,
6030
);
...
...
@@ -197,11 +198,11 @@ public:
std
::
cout
<<
"Table:"
<<
table
.
first
<<
std
::
endl
;
std
::
cout
<<
SH
(
"Field"
)
<<
SH
(
"Type"
)
<<
SH
(
"DataType"
)
<<
IH
(
"Bytes"
)
<<
std
::
endl
;
std
::
cout
<<
SL
(
3
,
1
)
<<
std
::
endl
;
int16_t
numOf
Tags
=
schema
->
tableInfo
.
numOfTag
s
;
int16_t
numOfFields
=
numOf
Tags
+
schema
->
tableInfo
.
numOfColumn
s
;
int16_t
numOf
Columns
=
schema
->
tableInfo
.
numOfColumn
s
;
int16_t
numOfFields
=
numOf
Columns
+
schema
->
tableInfo
.
numOfTag
s
;
for
(
int16_t
i
=
0
;
i
<
numOfFields
;
++
i
)
{
const
SSchema
*
col
=
schema
->
schema
+
i
;
std
::
cout
<<
SF
(
std
::
string
(
col
->
name
))
<<
SH
(
ftToString
(
i
,
numOf
Tag
s
))
<<
SH
(
dtToString
(
col
->
type
))
<<
IF
(
col
->
bytes
)
<<
std
::
endl
;
std
::
cout
<<
SF
(
std
::
string
(
col
->
name
))
<<
SH
(
ftToString
(
i
,
numOf
Column
s
))
<<
SH
(
dtToString
(
col
->
type
))
<<
IF
(
col
->
bytes
)
<<
std
::
endl
;
}
std
::
cout
<<
std
::
endl
;
}
...
...
@@ -262,8 +263,8 @@ private:
return
tDataTypes
[
type
].
name
;
}
std
::
string
ftToString
(
int16_t
colid
,
int16_t
numOf
Tag
s
)
const
{
return
(
0
==
colid
?
"column"
:
(
colid
<=
numOf
Tag
s
?
"tag"
:
"column"
));
std
::
string
ftToString
(
int16_t
colid
,
int16_t
numOf
Column
s
)
const
{
return
(
0
==
colid
?
"column"
:
(
colid
<=
numOf
Column
s
?
"tag"
:
"column"
));
}
STableMeta
*
getTableSchemaMeta
(
const
std
::
string
&
db
,
const
std
::
string
&
tbname
)
const
{
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
e0654bac
...
...
@@ -694,7 +694,6 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
}
return
code
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
createDistinctLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
e0654bac
...
...
@@ -41,6 +41,21 @@ typedef struct SOsdInfo {
SNodeList
*
pDsoFuncs
;
}
SOsdInfo
;
typedef
struct
SCpdIsMultiTableCondCxt
{
SNodeList
*
pLeftCols
;
SNodeList
*
pRightCols
;
bool
havaLeftCol
;
bool
haveRightCol
;
}
SCpdIsMultiTableCondCxt
;
typedef
enum
ECondAction
{
COND_ACTION_STAY
=
1
,
COND_ACTION_PUSH_JOIN
,
COND_ACTION_PUSH_LEFT_CHILD
,
COND_ACTION_PUSH_RIGHT_CHILD
// after supporting outer join, there are other possibilities
}
ECondAction
;
static
bool
osdMayBeOptimized
(
SLogicNode
*
pNode
)
{
if
(
OPTIMIZE_FLAG_TEST_MASK
(
pNode
->
optimizedFlag
,
OPTIMIZE_FLAG_OSD
))
{
return
false
;
...
...
@@ -152,36 +167,229 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cpdPartitionCondition
(
SJoinLogicNode
*
pJoin
,
SNodeList
**
pMultiTableCond
,
SNodeList
**
pSingleTableCond
)
{
// todo
return
TSDB_CODE_SUCCESS
;
static
bool
belongThisTable
(
SNode
*
pCondCol
,
SNodeList
*
pTableCols
)
{
SNode
*
pTableCol
=
NULL
;
FOREACH
(
pTableCol
,
pTableCols
)
{
if
(
nodesEqualNode
(
pCondCol
,
pTableCol
))
{
return
true
;
}
}
return
false
;
}
static
int32_t
cpdPushJoinCondToOnCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SNodeList
*
pMultiTableCond
)
{
// todo
return
TSDB_CODE_SUCCESS
;
static
EDealRes
cpdIsMultiTableCondImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
SCpdIsMultiTableCondCxt
*
pCxt
=
pContext
;
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
if
(
belongThisTable
(
pNode
,
pCxt
->
pLeftCols
))
{
pCxt
->
havaLeftCol
=
true
;
}
else
if
(
belongThisTable
(
pNode
,
pCxt
->
pRightCols
))
{
pCxt
->
haveRightCol
=
true
;
}
return
pCxt
->
havaLeftCol
&&
pCxt
->
haveRightCol
?
DEAL_RES_END
:
DEAL_RES_CONTINUE
;
}
return
DEAL_RES_CONTINUE
;
}
static
int32_t
cpdPushJoinCondToChildren
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SNodeList
*
pSingleTableCond
)
{
// todo
static
ECondAction
cpdCondAction
(
EJoinType
joinType
,
SNodeList
*
pLeftCols
,
SNodeList
*
pRightCols
,
SNode
*
pNode
)
{
SCpdIsMultiTableCondCxt
cxt
=
{
.
pLeftCols
=
pLeftCols
,
.
pRightCols
=
pRightCols
,
.
havaLeftCol
=
false
,
.
haveRightCol
=
false
};
nodesWalkExpr
(
pNode
,
cpdIsMultiTableCondImpl
,
&
cxt
);
return
(
JOIN_TYPE_INNER
!=
joinType
?
COND_ACTION_STAY
:
(
cxt
.
havaLeftCol
&&
cxt
.
haveRightCol
?
COND_ACTION_PUSH_JOIN
:
(
cxt
.
havaLeftCol
?
COND_ACTION_PUSH_LEFT_CHILD
:
COND_ACTION_PUSH_RIGHT_CHILD
)));
}
static
int32_t
cpdMakeCond
(
SNodeList
**
pConds
,
SNode
**
pCond
)
{
if
(
NULL
==
*
pConds
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
1
==
LIST_LENGTH
(
*
pConds
))
{
*
pCond
=
nodesListGetNode
(
*
pConds
,
0
);
nodesClearList
(
*
pConds
);
}
else
{
SLogicConditionNode
*
pLogicCond
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
if
(
NULL
==
pLogicCond
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
pLogicCond
->
pParameterList
=
*
pConds
;
*
pCond
=
(
SNode
*
)
pLogicCond
;
}
*
pConds
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cpdPushJoinCondition
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
!=
pJoin
->
node
.
pConditions
)
{
SNodeList
*
pMultiTableCond
=
NULL
;
SNodeList
*
pSingleTableCond
=
NULL
;
int32_t
code
=
cpdPartitionCondition
(
pJoin
,
&
pMultiTableCond
,
&
pSingleTableCond
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pMultiTableCond
)
{
code
=
cpdPushJoinCondToOnCond
(
pCxt
,
pJoin
,
pMultiTableCond
);
static
int32_t
cpdPartitionLogicCond
(
SJoinLogicNode
*
pJoin
,
SNode
**
pOnCond
,
SNode
**
pLeftChildCond
,
SNode
**
pRightChildCond
)
{
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)
pJoin
->
node
.
pConditions
;
if
(
LOGIC_COND_TYPE_AND
!=
pLogicCond
->
condType
)
{
return
TSDB_CODE_SUCCESS
;
}
SNodeList
*
pLeftCols
=
((
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
0
))
->
pTargets
;
SNodeList
*
pRightCols
=
((
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
))
->
pTargets
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNodeList
*
pOnConds
=
NULL
;
SNodeList
*
pLeftChildConds
=
NULL
;
SNodeList
*
pRightChildConds
=
NULL
;
SNodeList
*
pRemainConds
=
NULL
;
SNode
*
pCond
=
NULL
;
FOREACH
(
pCond
,
pLogicCond
->
pParameterList
)
{
ECondAction
condAction
=
cpdCondAction
(
pJoin
->
joinType
,
pLeftCols
,
pRightCols
,
pCond
);
if
(
COND_ACTION_PUSH_JOIN
==
condAction
)
{
code
=
nodesListMakeAppend
(
&
pOnConds
,
nodesCloneNode
(
pCond
));
}
else
if
(
COND_ACTION_PUSH_LEFT_CHILD
==
condAction
)
{
code
=
nodesListMakeAppend
(
&
pLeftChildConds
,
nodesCloneNode
(
pCond
));
}
else
if
(
COND_ACTION_PUSH_RIGHT_CHILD
==
condAction
)
{
code
=
nodesListMakeAppend
(
&
pRightChildConds
,
nodesCloneNode
(
pCond
));
}
else
{
code
=
nodesListMakeAppend
(
&
pRemainConds
,
nodesCloneNode
(
pCond
));
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pSingleTableCond
)
{
code
=
cpdPushJoinCondToChildren
(
pCxt
,
pJoin
,
pSingleTableCond
)
;
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
break
;
}
}
SNode
*
pTempOnCond
=
NULL
;
SNode
*
pTempLeftChildCond
=
NULL
;
SNode
*
pTempRightChildCond
=
NULL
;
SNode
*
pTempRemainCond
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMakeCond
(
&
pOnConds
,
&
pTempOnCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMakeCond
(
&
pLeftChildConds
,
&
pTempLeftChildCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMakeCond
(
&
pRightChildConds
,
&
pTempRightChildCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMakeCond
(
&
pRemainConds
,
&
pTempRemainCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pOnCond
=
pTempOnCond
;
*
pLeftChildCond
=
pTempLeftChildCond
;
*
pRightChildCond
=
pTempRightChildCond
;
nodesDestroyNode
(
pJoin
->
node
.
pConditions
);
pJoin
->
node
.
pConditions
=
pTempRemainCond
;
}
else
{
nodesDestroyList
(
pOnConds
);
nodesDestroyList
(
pLeftChildConds
);
nodesDestroyList
(
pRightChildConds
);
nodesDestroyList
(
pRemainConds
);
nodesDestroyNode
(
pTempOnCond
);
nodesDestroyNode
(
pTempLeftChildCond
);
nodesDestroyNode
(
pTempRightChildCond
);
nodesDestroyNode
(
pTempRemainCond
);
}
return
code
;
}
static
int32_t
cpdPartitionOpCond
(
SJoinLogicNode
*
pJoin
,
SNode
**
pOnCond
,
SNode
**
pLeftChildCond
,
SNode
**
pRightChildCond
)
{
SNodeList
*
pLeftCols
=
((
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
0
))
->
pTargets
;
SNodeList
*
pRightCols
=
((
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
))
->
pTargets
;
ECondAction
condAction
=
cpdCondAction
(
pJoin
->
joinType
,
pLeftCols
,
pRightCols
,
pJoin
->
node
.
pConditions
);
if
(
COND_ACTION_STAY
==
condAction
)
{
return
TSDB_CODE_SUCCESS
;
}
else
if
(
COND_ACTION_PUSH_JOIN
==
condAction
)
{
*
pOnCond
=
pJoin
->
node
.
pConditions
;
}
else
if
(
COND_ACTION_PUSH_LEFT_CHILD
==
condAction
)
{
*
pLeftChildCond
=
pJoin
->
node
.
pConditions
;
}
else
if
(
COND_ACTION_PUSH_RIGHT_CHILD
==
condAction
)
{
*
pRightChildCond
=
pJoin
->
node
.
pConditions
;
}
pJoin
->
node
.
pConditions
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cpdPartitionCond
(
SJoinLogicNode
*
pJoin
,
SNode
**
pOnCond
,
SNode
**
pLeftChildCond
,
SNode
**
pRightChildCond
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pJoin
->
node
.
pConditions
))
{
return
cpdPartitionLogicCond
(
pJoin
,
pOnCond
,
pLeftChildCond
,
pRightChildCond
);
}
else
{
return
cpdPartitionOpCond
(
pJoin
,
pOnCond
,
pLeftChildCond
,
pRightChildCond
);
}
}
static
int32_t
cpdCondAppend
(
SOptimizeContext
*
pCxt
,
SNode
**
pCond
,
SNode
**
pAdditionalCond
)
{
if
(
NULL
==
*
pCond
)
{
TSWAP
(
*
pCond
,
*
pAdditionalCond
,
SNode
*
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
*
pCond
))
{
code
=
nodesListAppend
(((
SLogicConditionNode
*
)
*
pCond
)
->
pParameterList
,
*
pAdditionalCond
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pAdditionalCond
=
NULL
;
}
}
else
{
SLogicConditionNode
*
pLogicCond
=
nodesMakeNode
(
QUERY_NODE_LOGIC_CONDITION
);
if
(
NULL
==
pLogicCond
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pLogicCond
->
condType
=
LOGIC_COND_TYPE_AND
;
code
=
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
*
pAdditionalCond
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pAdditionalCond
=
NULL
;
code
=
nodesListMakeAppend
(
&
pLogicCond
->
pParameterList
,
*
pCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pCond
=
(
SNode
*
)
pLogicCond
;
}
else
{
nodesDestroyNode
(
pLogicCond
);
}
}
return
code
;
}
static
int32_t
cpdPushCondToOnCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SNode
**
pCond
)
{
return
cpdCondAppend
(
pCxt
,
&
pJoin
->
pOnConditions
,
pCond
);
}
static
int32_t
cpdPushCondToScan
(
SOptimizeContext
*
pCxt
,
SScanLogicNode
*
pScan
,
SNode
**
pCond
)
{
return
cpdCondAppend
(
pCxt
,
&
pScan
->
node
.
pConditions
,
pCond
);
}
static
int32_t
cpdPushCondToChild
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pChild
,
SNode
**
pCond
)
{
switch
(
nodeType
(
pChild
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
cpdPushCondToScan
(
pCxt
,
(
SScanLogicNode
*
)
pChild
,
pCond
);
default:
break
;
}
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
static
int32_t
cpdPushJoinCondition
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
==
pJoin
->
node
.
pConditions
)
{
return
TSDB_CODE_SUCCESS
;
}
SNode
*
pOnCond
=
NULL
;
SNode
*
pLeftChildCond
=
NULL
;
SNode
*
pRightChildCond
=
NULL
;
int32_t
code
=
cpdPartitionCond
(
pJoin
,
&
pOnCond
,
&
pLeftChildCond
,
&
pRightChildCond
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pOnCond
)
{
code
=
cpdPushCondToOnCond
(
pCxt
,
pJoin
,
&
pOnCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pLeftChildCond
)
{
code
=
cpdPushCondToChild
(
pCxt
,
(
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
0
),
&
pLeftChildCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pRightChildCond
)
{
code
=
cpdPushCondToChild
(
pCxt
,
(
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
),
&
pRightChildCond
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
nodesDestroyNode
(
pOnCond
);
nodesDestroyNode
(
pLeftChildCond
);
nodesDestroyNode
(
pRightChildCond
);
}
return
code
;
}
static
int32_t
cpdPushAggCondition
(
SOptimizeContext
*
pCxt
,
SAggLogicNode
*
pAgg
)
{
// todo
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/planner/src/planSpliter.c
浏览文件 @
e0654bac
...
...
@@ -18,6 +18,7 @@
#define SPLIT_FLAG_MASK(n) (1 << n)
#define SPLIT_FLAG_STS SPLIT_FLAG_MASK(0)
#define SPLIT_FLAG_CTJ SPLIT_FLAG_MASK(1)
#define SPLIT_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
...
...
@@ -39,43 +40,14 @@ typedef struct SStsInfo {
SLogicSubplan
*
pSubplan
;
}
SStsInfo
;
static
SLogicNode
*
stsMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
)
&&
NULL
!=
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
&&
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
->
numOfVgroups
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
stsMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
typedef
struct
SCtjInfo
{
SScanLogicNode
*
pScan
;
SLogicSubplan
*
pSubplan
;
}
SCtjInfo
;
static
void
stsFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SStsInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stsMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pScan
=
(
SScanLogicNode
*
)
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
}
}
static
void
stsMatch
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SStsInfo
*
pInfo
)
{
if
(
!
SPLIT_FLAG_TEST_MASK
(
pSubplan
->
splitFlag
,
SPLIT_FLAG_STS
))
{
stsFindSplitNode
(
pSubplan
,
pInfo
);
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pSubplan
->
pChildren
)
{
stsMatch
(
pCxt
,
(
SLogicSubplan
*
)
pChild
,
pInfo
);
if
(
NULL
!=
pInfo
->
pScan
)
{
break
;
}
}
return
;
}
typedef
bool
(
*
FSplFindSplitNode
)(
SLogicSubplan
*
pSubplan
,
SStsInfo
*
pInfo
);
static
SLogicSubplan
*
s
tsCreateScanSubplan
(
SSplitContext
*
pCxt
,
SScanLogicNode
*
pScan
)
{
static
SLogicSubplan
*
s
plCreateScanSubplan
(
SSplitContext
*
pCxt
,
SScanLogicNode
*
pScan
,
int32_t
flag
)
{
SLogicSubplan
*
pSubplan
=
nodesMakeNode
(
QUERY_NODE_LOGIC_SUBPLAN
);
if
(
NULL
==
pSubplan
)
{
return
NULL
;
...
...
@@ -84,11 +56,11 @@ static SLogicSubplan* stsCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
pSubplan
->
subplanType
=
SUBPLAN_TYPE_SCAN
;
pSubplan
->
pNode
=
(
SLogicNode
*
)
nodesCloneNode
(
pScan
);
TSWAP
(
pSubplan
->
pVgroupList
,
((
SScanLogicNode
*
)
pSubplan
->
pNode
)
->
pVgroupList
,
SVgroupsInfo
*
);
SPLIT_FLAG_SET_MASK
(
pSubplan
->
splitFlag
,
SPLIT_FLAG_STS
);
SPLIT_FLAG_SET_MASK
(
pSubplan
->
splitFlag
,
flag
);
return
pSubplan
;
}
static
int32_t
s
tsCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SScanLogicNode
*
pScan
)
{
static
int32_t
s
plCreateExchangeNode
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
SScanLogicNode
*
pScan
,
ESubplanType
subplanType
)
{
SExchangeLogicNode
*
pExchange
=
nodesMakeNode
(
QUERY_NODE_LOGIC_PLAN_EXCHANGE
);
if
(
NULL
==
pExchange
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -119,10 +91,100 @@ static int32_t stsCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
return
TSDB_CODE_FAILED
;
}
static
bool
splMatch
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
,
int32_t
flag
,
FSplFindSplitNode
func
,
void
*
pInfo
)
{
if
(
!
SPLIT_FLAG_TEST_MASK
(
pSubplan
->
splitFlag
,
flag
))
{
if
(
func
(
pSubplan
,
pInfo
))
{
return
true
;
}
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pSubplan
->
pChildren
)
{
if
(
splMatch
(
pCxt
,
(
SLogicSubplan
*
)
pChild
,
flag
,
func
,
pInfo
))
{
return
true
;
}
}
return
false
;
}
static
SLogicNode
*
stsMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pNode
)
&&
NULL
!=
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
&&
((
SScanLogicNode
*
)
pNode
)
->
pVgroupList
->
numOfVgroups
>
1
)
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
stsMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
stsFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SStsInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
stsMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pScan
=
(
SScanLogicNode
*
)
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
}
return
NULL
!=
pSplitNode
;
}
static
int32_t
stsSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
SStsInfo
info
=
{
0
};
stsMatch
(
pCxt
,
pSubplan
,
&
info
);
if
(
NULL
==
info
.
pScan
)
{
if
(
!
splMatch
(
pCxt
,
pSubplan
,
SPLIT_FLAG_STS
,
stsFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
info
.
pSubplan
->
pChildren
)
{
info
.
pSubplan
->
pChildren
=
nodesMakeList
();
if
(
NULL
==
info
.
pSubplan
->
pChildren
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
int32_t
code
=
nodesListStrictAppend
(
info
.
pSubplan
->
pChildren
,
splCreateScanSubplan
(
pCxt
,
info
.
pScan
,
SPLIT_FLAG_STS
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
splCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pScan
,
SUBPLAN_TYPE_MERGE
);
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
return
code
;
}
static
bool
ctjIsSingleTable
(
int8_t
tableType
)
{
return
(
TSDB_CHILD_TABLE
==
tableType
||
TSDB_NORMAL_TABLE
==
tableType
);
}
static
SLogicNode
*
ctjMatchByNode
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_JOIN
==
nodeType
(
pNode
))
{
SLogicNode
*
pLeft
=
(
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
0
);
SLogicNode
*
pRight
=
(
SLogicNode
*
)
nodesListGetNode
(
pNode
->
pChildren
,
1
);
if
(
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pLeft
)
&&
ctjIsSingleTable
(((
SScanLogicNode
*
)
pLeft
)
->
pMeta
->
tableType
)
&&
QUERY_NODE_LOGIC_PLAN_SCAN
==
nodeType
(
pRight
)
&&
ctjIsSingleTable
(((
SScanLogicNode
*
)
pRight
)
->
pMeta
->
tableType
))
{
return
pRight
;
}
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pSplitNode
=
ctjMatchByNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pSplitNode
)
{
return
pSplitNode
;
}
}
return
NULL
;
}
static
bool
ctjFindSplitNode
(
SLogicSubplan
*
pSubplan
,
SStsInfo
*
pInfo
)
{
SLogicNode
*
pSplitNode
=
ctjMatchByNode
(
pSubplan
->
pNode
);
if
(
NULL
!=
pSplitNode
)
{
pInfo
->
pScan
=
(
SScanLogicNode
*
)
pSplitNode
;
pInfo
->
pSubplan
=
pSubplan
;
}
return
NULL
!=
pSplitNode
;
}
static
int32_t
ctjSplit
(
SSplitContext
*
pCxt
,
SLogicSubplan
*
pSubplan
)
{
SCtjInfo
info
=
{
0
};
if
(
!
splMatch
(
pCxt
,
pSubplan
,
SPLIT_FLAG_CTJ
,
ctjFindSplitNode
,
&
info
))
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
info
.
pSubplan
->
pChildren
)
{
...
...
@@ -131,9 +193,9 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
int32_t
code
=
nodesListStrictAppend
(
info
.
pSubplan
->
pChildren
,
s
tsCreateScanSubplan
(
pCxt
,
info
.
pScan
));
int32_t
code
=
nodesListStrictAppend
(
info
.
pSubplan
->
pChildren
,
s
plCreateScanSubplan
(
pCxt
,
info
.
pScan
,
SPLIT_FLAG_CTJ
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
s
tsCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pScan
);
code
=
s
plCreateExchangeNode
(
pCxt
,
info
.
pSubplan
,
info
.
pScan
,
info
.
pSubplan
->
subplanType
);
}
++
(
pCxt
->
groupId
);
pCxt
->
split
=
true
;
...
...
@@ -141,7 +203,8 @@ static int32_t stsSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
}
static
const
SSplitRule
splitRuleSet
[]
=
{
{
.
pName
=
"SuperTableScan"
,
.
splitFunc
=
stsSplit
}
{
.
pName
=
"SuperTableScan"
,
.
splitFunc
=
stsSplit
},
{
.
pName
=
"ChildTableJoin"
,
.
splitFunc
=
ctjSplit
},
};
static
const
int32_t
splitRuleNum
=
(
sizeof
(
splitRuleSet
)
/
sizeof
(
SSplitRule
));
...
...
source/libs/planner/test/plannerTest.cpp
浏览文件 @
e0654bac
...
...
@@ -70,6 +70,12 @@ protected:
cout
<<
"unformatted logic plan : "
<<
endl
;
cout
<<
toString
((
const
SNode
*
)
pLogicNode
,
false
)
<<
endl
;
code
=
optimizeLogicPlan
(
&
cxt
,
pLogicNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"sql:["
<<
cxt_
.
pSql
<<
"] optimizeLogicPlan code:"
<<
code
<<
", strerror:"
<<
tstrerror
(
code
)
<<
endl
;
return
false
;
}
SLogicSubplan
*
pLogicSubplan
=
nullptr
;
code
=
splitLogicPlan
(
&
cxt
,
pLogicNode
,
&
pLogicSubplan
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -174,13 +180,13 @@ TEST_F(PlannerTest, selectStableBasic) {
TEST_F
(
PlannerTest
,
selectJoin
)
{
setDatabase
(
"root"
,
"test"
);
bind
(
"SELECT *
FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts"
);
ASSERT_TRUE
(
run
());
// bind("SELECT t1.c1, t2.c2
FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts");
//
ASSERT_TRUE(run());
bind
(
"SELECT * FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1
"
);
ASSERT_TRUE
(
run
());
// bind("SELECT t1.*, t2.* FROM st1s1 t1, st1s2 t2 where t1.ts = t2.ts
");
//
ASSERT_TRUE(run());
bind
(
"SELECT t1.
* FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1
"
);
bind
(
"SELECT t1.
c1, t2.c1 FROM st1s1 t1 join st1s2 t2 on t1.ts = t2.ts where t1.c1 > t2.c1 and t1.c2 = 'abc' and t2.c2 = 'qwe'
"
);
ASSERT_TRUE
(
run
());
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
e0654bac
...
...
@@ -59,13 +59,13 @@
./test.sh -f tsim/tmq/oneTopic.sim
./test.sh -f tsim/tmq/multiTopic.sim
./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim
./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim
#
./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim
#
./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim
#fail ./test.sh -f tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim
#fail ./test.sh -f tsim/tmq/main2Con1Cgrp1TopicFrStb.sim
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrStb.sim
#
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim
#
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrStb.sim
# --- stable
...
...
tests/script/tsim/tmq/overlapTopic2Con1Cgrp.sim
0 → 100644
浏览文件 @
e0654bac
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
# scene1: vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene2: vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene3: vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# scene4: vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
######## This test case include scene2 and scene4
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
$loop_cnt = 0
$vgroups = 1
$dbNamme = d0
loop_vgroups:
print =============== create database $dbNamme vgroups $vgroups
sql create database $dbNamme vgroups $vgroups
sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
if $loop_cnt == 0 then
if $rows != 2 then
return -1
endi
if $data02 != 1 then # vgroups
print vgroups: $data02
return -1
endi
else
if $rows != 3 then
return -1
endi
if $data00 == d1 then
if $data02 != 4 then # vgroups
print vgroups: $data02
return -1
endi
else
if $data12 != 4 then # vgroups
print vgroups: $data12
return -1
endi
endi
endi
sql use $dbNamme
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
$tbPrefix = ct
$tbNum = 10
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using stb tags( $i )
$i = $i + 1
endw
print =============== create normal table
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
print =============== create multi topics. notes: now only support:
print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb
print =============== will support: * from stb
sql create topic topic_stb_column as select ts, c1, c3 from stb
sql create topic topic_stb_all as select sqrt(c1) from stb
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
sql create topic topic_ctb_all as select * from ct0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
sql create topic topic_ntb_all as select * from ntb
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb
sql show tables
if $rows != 11 then
return -1
endi
print =============== insert data
$rowNum = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
sql insert into $tb values ($tstart , $c , $x , $binary )
sql insert into ntb values ($tstart , $c , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
# $tstart = 1640966400000
endw
#root@trd02 /home $ tmq_sim --help
# -c Configuration directory, default is
# -d The name of the database for cosumer, no default
# -t The topic string for cosumer, no default
# -k The key-value string for cosumer, no default
# -g showMsgFlag, default is 0
#
$consumeDelay = 2
$expectMsgCntFromCtb = $rowNum
$expectMsgCntFromStb = $rowNum * $tbNum
$expectMsgCntFromNtb = $rowNum * $tbNum
print expectMsgCntFromCtb: $expectMsgCntFromCtb
print expectMsgCntFromStb: $expectMsgCntFromStb
print expectMsgCntFromNtb: $expectMsgCntFromNtb
# supported key:
# group.id:<xxx>
# enable.auto.commit:<true | false>
# auto.offset.reset:<earliest | latest | none>
# td.connect.ip:<fqdn | ipaddress>
# td.connect.user:root
# td.connect.pass:taosdata
# td.connect.port:6030
# td.connect.db:db
$numOfTopics = 3
$totalMsgCntOfmultiTopics = $expectMsgCntFromStb * $numOfTopics
$expect_result = @{consume success: @
$expect_result = $expect_result . $totalMsgCntOfmultiTopics
$expect_result = $expect_result . @, @
$expect_result = $expect_result . 0}
print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 2
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg2" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 2
print cmd result----> $system_content
if $system_content != success then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg1" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 3
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t1 "topic_stb_column, topic_stb_function" -k1 "group.id:tg1" -t "topic_stb_function, topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromStb -j 3
print cmd result----> $system_content
if $system_content != success then
return -1
endi
#$numOfTopics = 3
#$totalMsgCntOfmultiTopics = $rowNum * $numOfTopics
#$expect_result = @{consume success: @
#$expect_result = $expect_result . $totalMsgCntOfmultiTopics
#$expect_result = $expect_result . @, @
#$expect_result = $expect_result . 0}
#print expect_result----> $expect_result
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 4
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -t "topic_ctb_column, topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 4
print cmd result----> $system_content
if $system_content != success then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function" -k "group.id:tg1" -t "topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 3
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column, topic_ctb_function" -k "group.id:tg1" -t "topic_ctb_function, topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromCtb -j 3
print cmd result----> $system_content
if $system_content != success then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_function, topic_ntb_all" -k "group.id:tg2" -t "topic_ntb_column, topic_ntb_function, topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromNtb -j 4
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_function, topic_ntb_all" -k "group.id:tg2" -t "topic_ntb_column, topic_ntb_function, topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromNtb -j 4
print cmd result----> $system_content
if $system_content != success then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_function" -k "group.id:tg1" -t "topic_ntb_function, topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromNtb -j 3
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column, topic_ntb_function" -k "group.id:tg1" -t "topic_ntb_function, topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectMsgCntFromNtb -j 3
print cmd result----> $system_content
if $system_content != success then
return -1
endi
if $loop_cnt == 0 then
$loop_cnt = 1
$vgroups = 4
$dbNamme = d1
goto loop_vgroups
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/test/c/tmqSim.c
浏览文件 @
e0654bac
...
...
@@ -136,6 +136,10 @@ void parseArgument(int32_t argc, char* argv[]) {
}
}
if
(
0
==
g_stConfInfo
.
consumeMsgCnt
)
{
g_stConfInfo
.
consumeMsgCnt
=
0x7fffffff
;
}
#if 0
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
...
...
@@ -493,6 +497,29 @@ int main(int32_t argc, char* argv[]) {
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
2
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
+
pInfo
->
consumeMsgCnt
)
==
3
*
g_stConfInfo
.
consumeMsgCnt
)
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
3
==
g_stConfInfo
.
checkMode
)
{
if
((
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
if
(
4
==
g_stConfInfo
.
checkMode
)
{
if
(((
totalMsgs
==
0
)
&&
(
pInfo
->
consumeMsgCnt
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
0
)
&&
(
totalMsgs
==
3
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
2
*
g_stConfInfo
.
consumeMsgCnt
))
||
((
pInfo
->
consumeMsgCnt
==
2
*
g_stConfInfo
.
consumeMsgCnt
)
&&
(
totalMsgs
==
g_stConfInfo
.
consumeMsgCnt
)))
{
printf
(
"success"
);
}
else
{
printf
(
"fail, consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
else
{
printf
(
"fail, check mode unknow. consumer msg cnt: %d, %d"
,
totalMsgs
,
pInfo
->
consumeMsgCnt
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录