Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a1203e40
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a1203e40
编写于
7月 31, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into 3.0
上级
3cd2b105
79646fff
变更
45
隐藏空白更改
内联
并排
Showing
45 changed file
with
641 addition
and
432 deletion
+641
-432
docs/en/14-reference/03-connector/06-rust.mdx
docs/en/14-reference/03-connector/06-rust.mdx
+2
-1
docs/zh/08-connector/26-rust.mdx
docs/zh/08-connector/26-rust.mdx
+2
-1
include/common/tglobal.h
include/common/tglobal.h
+2
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+0
-1
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+1
-1
source/client/inc/clientSml.h
source/client/inc/clientSml.h
+3
-2
source/client/src/clientSml.c
source/client/src/clientSml.c
+27
-8
source/client/src/clientSmlJson.c
source/client/src/clientSmlJson.c
+4
-4
source/client/src/clientSmlLine.c
source/client/src/clientSmlLine.c
+4
-2
source/client/src/clientSmlTelnet.c
source/client/src/clientSmlTelnet.c
+2
-2
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+44
-34
source/common/src/tglobal.c
source/common/src/tglobal.c
+10
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+3
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+9
-4
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-1
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+17
-9
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+124
-110
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+3
-3
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+3
-3
source/dnode/vnode/src/tq/tqRestore.c
source/dnode/vnode/src/tq/tqRestore.c
+4
-4
source/dnode/vnode/src/tq/tqUtil.c
source/dnode/vnode/src/tq/tqUtil.c
+1
-0
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+13
-110
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+97
-92
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+4
-9
source/libs/parser/inc/parUtil.h
source/libs/parser/inc/parUtil.h
+6
-0
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+18
-16
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+15
-0
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+1
-1
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+10
-1
source/libs/stream/src/streamRecover.c
source/libs/stream/src/streamRecover.c
+13
-1
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+3
-3
source/util/CMakeLists.txt
source/util/CMakeLists.txt
+7
-0
source/util/src/terror.c
source/util/src/terror.c
+1
-0
source/util/src/tlog.c
source/util/src/tlog.c
+4
-0
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+3
-2
tests/system-test/1-insert/opentsdb_json_taosc_insert.py
tests/system-test/1-insert/opentsdb_json_taosc_insert.py
+2
-0
tests/system-test/1-insert/opentsdb_telnet_line_taosc_insert.py
...system-test/1-insert/opentsdb_telnet_line_taosc_insert.py
+2
-0
tests/system-test/2-query/sml.py
tests/system-test/2-query/sml.py
+10
-1
tests/system-test/2-query/sml_TS-3724.py
tests/system-test/2-query/sml_TS-3724.py
+125
-0
tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
...st/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
+2
-2
utils/test/c/sml_test.c
utils/test/c/sml_test.c
+33
-0
未找到文件。
docs/en/14-reference/03-connector/06-rust.mdx
浏览文件 @
a1203e40
...
...
@@ -31,7 +31,8 @@ Websocket connections are supported on all platforms that can run Go.
| connector-rust version | TDengine version | major features |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | TMQ: Get consuming progress and seek offset to consume. |
| v0.9.2 | 3.0.7.0 or later | STMT: Get tag_fields and col_fields under ws. |
| v0.8.12 | 3.0.5.0 | TMQ: Get consuming progress and seek offset to consume. |
| v0.8.0 | 3.0.4.0 | Support schemaless insert. |
| v0.7.6 | 3.0.3.0 | Support req_id in query. |
| v0.6.0 | 3.0.0.0 | Base features. |
...
...
docs/zh/08-connector/26-rust.mdx
浏览文件 @
a1203e40
...
...
@@ -30,7 +30,8 @@ Websocket 连接支持所有能运行 Rust 的平台。
| Rust 连接器版本 | TDengine 版本 | 主要功能 |
| :----------------: | :--------------: | :--------------------------------------------------: |
| v0.8.12 | 3.0.5.0 or later | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.9.2 | 3.0.7.0 or later | STMT:ws 下获取 tag_fields、col_fields。 |
| v0.8.12 | 3.0.5.0 | 消息订阅:获取消费进度及按照指定进度开始消费。 |
| v0.8.0 | 3.0.4.0 | 支持无模式写入。 |
| v0.7.6 | 3.0.3.0 | 支持在请求中使用 req_id。 |
| v0.6.0 | 3.0.0.0 | 基础功能。 |
...
...
include/common/tglobal.h
浏览文件 @
a1203e40
...
...
@@ -169,6 +169,8 @@ extern char tsUdfdLdLibPath[];
// schemaless
extern
char
tsSmlChildTableName
[];
extern
char
tsSmlTagName
[];
extern
bool
tsSmlDot2Underline
;
extern
char
tsSmlTsDefaultName
[];
// extern bool tsSmlDataFormat;
// extern int32_t tsSmlBatchSize;
...
...
include/libs/stream/tstream.h
浏览文件 @
a1203e40
...
...
@@ -45,7 +45,6 @@ enum {
TASK_STATUS__FAIL
,
TASK_STATUS__STOP
,
TASK_STATUS__SCAN_HISTORY
,
// stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__SCAN_HISTORY_WAL
,
// scan history data in wal
TASK_STATUS__HALT
,
// pause, but not be manipulated by user command
TASK_STATUS__PAUSE
,
// pause
};
...
...
include/util/taoserror.h
浏览文件 @
a1203e40
...
...
@@ -780,6 +780,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
...
...
include/util/tdef.h
浏览文件 @
a1203e40
...
...
@@ -200,7 +200,7 @@ typedef enum ELogicConditionType {
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN
200
#define TSDB_PRIVILEDGE_CONDITION_LEN
48*1024
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_COMMENT_LEN 1024 * 1024
...
...
source/client/inc/clientSml.h
浏览文件 @
a1203e40
...
...
@@ -64,8 +64,8 @@ extern "C" {
#define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)
#define TS "_ts"
#define TS_LEN 3
//
#define TS "_ts"
//
#define TS_LEN 3
#define VALUE "_value"
#define VALUE_LEN 6
...
...
@@ -258,6 +258,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
int32_t
smlParseTelnetString
(
SSmlHandle
*
info
,
char
*
sql
,
char
*
sqlEnd
,
SSmlLineInfo
*
elements
);
int32_t
smlParseJSON
(
SSmlHandle
*
info
,
char
*
payload
);
void
smlStrReplace
(
char
*
src
,
int32_t
len
);
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientSml.c
浏览文件 @
a1203e40
...
...
@@ -104,7 +104,7 @@ static int32_t smlCheckAuth(SSmlHandle *info, SRequestConnInfo* conn, const cha
SUserAuthRes
authRes
=
{
0
};
code
=
catalogChkAuth
(
info
->
pCatalog
,
conn
,
&
pAuth
,
&
authRes
);
nodesDestroyNode
(
authRes
.
pCond
);
return
(
code
==
TSDB_CODE_SUCCESS
)
?
(
authRes
.
pass
?
TSDB_CODE_SUCCESS
:
TSDB_CODE_PAR_PERMISSION_DENIED
)
:
code
;
...
...
@@ -114,6 +114,15 @@ inline bool smlDoubleToInt64OverFlow(double num) {
return
false
;
}
void
smlStrReplace
(
char
*
src
,
int32_t
len
){
if
(
!
tsSmlDot2Underline
)
return
;
for
(
int
i
=
0
;
i
<
len
;
i
++
){
if
(
src
[
i
]
==
'.'
){
src
[
i
]
=
'_'
;
}
}
}
int32_t
smlBuildInvalidDataMsg
(
SSmlMsgBuf
*
pBuf
,
const
char
*
msg1
,
const
char
*
msg2
)
{
if
(
pBuf
->
buf
)
{
memset
(
pBuf
->
buf
,
0
,
pBuf
->
len
);
...
...
@@ -193,6 +202,9 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName) {
if
(
childTableNameLen
==
tag
->
keyLen
&&
strncmp
(
tag
->
key
,
tsSmlChildTableName
,
tag
->
keyLen
)
==
0
)
{
memset
(
childTableName
,
0
,
TSDB_TABLE_NAME_LEN
);
strncpy
(
childTableName
,
tag
->
value
,
(
tag
->
length
<
TSDB_TABLE_NAME_LEN
?
tag
->
length
:
TSDB_TABLE_NAME_LEN
));
if
(
tsSmlDot2Underline
){
smlStrReplace
(
childTableName
,
strlen
(
childTableName
));
}
taosArrayRemove
(
tags
,
i
);
break
;
}
...
...
@@ -838,6 +850,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
char
*
measure
=
taosMemoryMalloc
(
superTableLen
);
memcpy
(
measure
,
superTable
,
superTableLen
);
PROCESS_SLASH_IN_MEASUREMENT
(
measure
,
superTableLen
);
smlStrReplace
(
measure
,
superTableLen
);
memset
(
pName
.
tname
,
0
,
TSDB_TABLE_NAME_LEN
);
memcpy
(
pName
.
tname
,
measure
,
superTableLen
);
taosMemoryFree
(
measure
);
...
...
@@ -1051,7 +1064,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
taosMemoryFreeClear
(
sTableData
->
tableMeta
);
sTableData
->
tableMeta
=
pTableMeta
;
uDebug
(
"SML:0x%"
PRIx64
"modify schema uid:%"
PRIu64
", sversion:%d, tversion:%d"
,
info
->
id
,
pTableMeta
->
uid
,
pTableMeta
->
sversion
,
pTableMeta
->
tversion
)
tmp
=
(
SSmlSTableMeta
**
)
taosHashIterate
(
info
->
superTables
,
tmp
);
pTableMeta
->
sversion
,
pTableMeta
->
tversion
);
tmp
=
(
SSmlSTableMeta
**
)
taosHashIterate
(
info
->
superTables
,
tmp
);
}
uDebug
(
"SML:0x%"
PRIx64
" smlModifyDBSchemas end success, format:%d, needModifySchema:%d"
,
info
->
id
,
info
->
dataFormat
,
info
->
needModifySchema
);
...
...
@@ -1394,7 +1408,14 @@ static int32_t smlInsertData(SSmlHandle *info) {
SSmlTableInfo
**
oneTable
=
(
SSmlTableInfo
**
)
taosHashIterate
(
info
->
childTables
,
NULL
);
while
(
oneTable
)
{
SSmlTableInfo
*
tableData
=
*
oneTable
;
tstrncpy
(
pName
.
tname
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
+
1
);
int
measureLen
=
tableData
->
sTableNameLen
;
char
*
measure
=
(
char
*
)
taosMemoryMalloc
(
tableData
->
sTableNameLen
);
memcpy
(
measure
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
);
PROCESS_SLASH_IN_MEASUREMENT
(
measure
,
measureLen
);
smlStrReplace
(
measure
,
measureLen
);
memset
(
pName
.
tname
,
0
,
TSDB_TABLE_NAME_LEN
);
memcpy
(
pName
.
tname
,
measure
,
measureLen
);
if
(
info
->
pRequest
->
tableList
==
NULL
)
{
info
->
pRequest
->
tableList
=
taosArrayInit
(
1
,
sizeof
(
SName
));
...
...
@@ -1411,6 +1432,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
code
=
smlCheckAuth
(
info
,
&
conn
,
pName
.
tname
,
AUTH_TYPE_WRITE
);
if
(
code
!=
TSDB_CODE_SUCCESS
){
taosMemoryFree
(
measure
);
return
code
;
}
...
...
@@ -1418,6 +1440,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
code
=
catalogGetTableHashVgroup
(
info
->
pCatalog
,
&
conn
,
&
pName
,
&
vg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
uError
(
"SML:0x%"
PRIx64
" catalogGetTableHashVgroup failed. table name: %s"
,
info
->
id
,
tableData
->
childTableName
);
taosMemoryFree
(
measure
);
return
code
;
}
taosHashPut
(
info
->
pVgHash
,
(
const
char
*
)
&
vg
.
vgId
,
sizeof
(
vg
.
vgId
),
(
char
*
)
&
vg
,
sizeof
(
vg
));
...
...
@@ -1426,6 +1449,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
(
SSmlSTableMeta
**
)
taosHashGet
(
info
->
superTables
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
);
if
(
unlikely
(
NULL
==
pMeta
||
NULL
==
(
*
pMeta
)
->
tableMeta
))
{
uError
(
"SML:0x%"
PRIx64
" NULL == pMeta. table name: %s"
,
info
->
id
,
tableData
->
childTableName
);
taosMemoryFree
(
measure
);
return
TSDB_CODE_SML_INTERNAL_ERROR
;
}
...
...
@@ -1435,11 +1459,6 @@ static int32_t smlInsertData(SSmlHandle *info) {
uDebug
(
"SML:0x%"
PRIx64
" smlInsertData table:%s, uid:%"
PRIu64
", format:%d"
,
info
->
id
,
pName
.
tname
,
tableData
->
uid
,
info
->
dataFormat
);
int
measureLen
=
tableData
->
sTableNameLen
;
char
*
measure
=
(
char
*
)
taosMemoryMalloc
(
tableData
->
sTableNameLen
);
memcpy
(
measure
,
tableData
->
sTableName
,
tableData
->
sTableNameLen
);
PROCESS_SLASH_IN_MEASUREMENT
(
measure
,
measureLen
);
code
=
smlBindData
(
info
->
pQuery
,
info
->
dataFormat
,
tableData
->
tags
,
(
*
pMeta
)
->
cols
,
tableData
->
cols
,
(
*
pMeta
)
->
tableMeta
,
tableData
->
childTableName
,
measure
,
measureLen
,
info
->
ttl
,
info
->
msgBuf
.
buf
,
info
->
msgBuf
.
len
);
...
...
source/client/src/clientSmlJson.c
浏览文件 @
a1203e40
...
...
@@ -996,8 +996,8 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
uError
(
"OTD:0x%"
PRIx64
" Unable to parse timestamp from JSON payload"
,
info
->
id
);
return
TSDB_CODE_INVALID_TIMESTAMP
;
}
SSmlKv
kvTs
=
{.
key
=
TS
,
.
keyLen
=
TS_LEN
,
SSmlKv
kvTs
=
{.
key
=
tsSmlTsDefaultName
,
.
keyLen
=
strlen
(
tsSmlTsDefaultName
)
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
i
=
ts
,
.
length
=
(
size_t
)
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
};
...
...
@@ -1200,8 +1200,8 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *
return
TSDB_CODE_INVALID_TIMESTAMP
;
}
}
SSmlKv
kvTs
=
{.
key
=
TS
,
.
keyLen
=
TS_LEN
,
SSmlKv
kvTs
=
{.
key
=
tsSmlTsDefaultName
,
.
keyLen
=
strlen
(
tsSmlTsDefaultName
)
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
i
=
ts
,
.
length
=
(
size_t
)
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
};
...
...
source/client/src/clientSmlLine.c
浏览文件 @
a1203e40
...
...
@@ -157,6 +157,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
measure
=
(
char
*
)
taosMemoryMalloc
(
currElement
->
measureLen
);
memcpy
(
measure
,
currElement
->
measure
,
currElement
->
measureLen
);
PROCESS_SLASH_IN_MEASUREMENT
(
measure
,
measureLen
);
smlStrReplace
(
measure
,
measureLen
);
}
STableMeta
*
pTableMeta
=
smlGetMeta
(
info
,
measure
,
measureLen
);
if
(
currElement
->
measureEscaped
)
{
...
...
@@ -365,6 +366,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
measure
=
(
char
*
)
taosMemoryMalloc
(
currElement
->
measureLen
);
memcpy
(
measure
,
currElement
->
measure
,
currElement
->
measureLen
);
PROCESS_SLASH_IN_MEASUREMENT
(
measure
,
measureLen
);
smlStrReplace
(
measure
,
measureLen
);
}
STableMeta
*
pTableMeta
=
smlGetMeta
(
info
,
measure
,
measureLen
);
if
(
currElement
->
measureEscaped
)
{
...
...
@@ -651,8 +653,8 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
return
TSDB_CODE_INVALID_TIMESTAMP
;
}
// add ts to
SSmlKv
kv
=
{.
key
=
TS
,
.
keyLen
=
TS_LEN
,
SSmlKv
kv
=
{.
key
=
tsSmlTsDefaultName
,
.
keyLen
=
strlen
(
tsSmlTsDefaultName
)
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
i
=
ts
,
.
length
=
(
size_t
)
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
,
...
...
source/client/src/clientSmlTelnet.c
浏览文件 @
a1203e40
...
...
@@ -260,8 +260,8 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
smlBuildInvalidDataMsg
(
&
info
->
msgBuf
,
"invalid timestamp"
,
sql
);
return
TSDB_CODE_INVALID_TIMESTAMP
;
}
SSmlKv
kvTs
=
{.
key
=
TS
,
.
keyLen
=
TS_LEN
,
SSmlKv
kvTs
=
{.
key
=
tsSmlTsDefaultName
,
.
keyLen
=
strlen
(
tsSmlTsDefaultName
)
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
i
=
ts
,
.
length
=
(
size_t
)
tDataTypes
[
TSDB_DATA_TYPE_TIMESTAMP
].
bytes
};
...
...
source/client/src/clientTmq.c
浏览文件 @
a1203e40
...
...
@@ -586,31 +586,37 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
if
(
code
!=
0
){
goto
end
;
}
if
(
offsetVal
->
type
>
0
&&
!
tOffsetEqual
(
offsetVal
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
offsetVal
);
if
(
offsetVal
->
type
<=
0
)
{
code
=
TSDB_CODE_TMQ_INVALID_MSG
;
goto
end
;
}
if
(
tOffsetEqual
(
offsetVal
,
&
pVg
->
offsetInfo
.
committedOffset
)){
code
=
TSDB_CODE_TMQ_SAME_COMMITTED_VALUE
;
goto
end
;
}
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
offsetVal
);
char
commitBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
char
commitBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
SMqCommitCbParamSet
*
pParamSet
=
prepareCommitCbParamSet
(
tmq
,
pCommitFp
,
userParam
,
0
);
if
(
pParamSet
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
code
=
doSendCommitMsg
(
tmq
,
pVg
->
vgId
,
&
pVg
->
epSet
,
offsetVal
,
pTopicName
,
pParamSet
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s"
,
tmq
->
consumerId
,
pTopicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
tstrerror
(
terrno
));
taosMemoryFree
(
pParamSet
);
goto
end
;
}
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s"
,
tmq
->
consumerId
,
pTopicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
);
pVg
->
offsetInfo
.
committedOffset
=
*
offsetVal
;
SMqCommitCbParamSet
*
pParamSet
=
prepareCommitCbParamSet
(
tmq
,
pCommitFp
,
userParam
,
0
);
if
(
pParamSet
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
code
=
doSendCommitMsg
(
tmq
,
pVg
->
vgId
,
&
pVg
->
epSet
,
offsetVal
,
pTopicName
,
pParamSet
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s"
,
tmq
->
consumerId
,
pTopicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
tstrerror
(
terrno
));
taosMemoryFree
(
pParamSet
);
goto
end
;
}
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s"
,
tmq
->
consumerId
,
pTopicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
);
pVg
->
offsetInfo
.
committedOffset
=
*
offsetVal
;
end:
taosRUnLockLatch
(
&
tmq
->
lock
);
return
code
;
...
...
@@ -650,7 +656,8 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
code
=
asyncCommitOffset
(
tmq
,
pTopicName
,
vgId
,
&
offsetVal
,
pCommitFp
,
userParam
);
end:
if
(
code
!=
TSDB_CODE_SUCCESS
){
if
(
code
!=
TSDB_CODE_SUCCESS
&&
pCommitFp
!=
NULL
){
if
(
code
==
TSDB_CODE_TMQ_SAME_COMMITTED_VALUE
)
code
=
TSDB_CODE_SUCCESS
;
pCommitFp
(
tmq
,
code
,
userParam
);
}
}
...
...
@@ -1859,8 +1866,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
){
if
(
!
pVg
->
seekUpdated
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is update, since seekupdate not set"
,
consumerId
);
if
(
reqOffset
->
type
!=
0
)
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
if
(
rspOffset
->
type
!=
0
)
pVg
->
offsetInfo
.
endOffset
=
*
rspOffset
;
pVg
->
offsetInfo
.
beginOffset
=
*
reqOffset
;
pVg
->
offsetInfo
.
endOffset
=
*
rspOffset
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is NOT update, since seekupdate is set"
,
consumerId
);
}
...
...
@@ -1948,7 +1955,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
taosWUnLockLatch
(
&
tmq
->
lock
);
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pDataRsp
->
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -1979,7 +1986,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
metaRsp
.
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -2036,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
taosWUnLockLatch
(
&
tmq
->
lock
);
return
pRsp
;
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tsc
Info
(
"consumer:0x%"
PRIx64
" vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d"
,
tmq
->
consumerId
,
pollRspWrapper
->
vgId
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
pRspWrapper
=
tmqFreeRspWrapper
(
pRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
...
...
@@ -2350,7 +2357,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
tsem_destroy
(
&
pInfo
->
sem
);
taosMemoryFree
(
pInfo
);
tsc
Debug
(
"consumer:0x%"
PRIx64
" sync
commit done, code:%s"
,
tmq
->
consumerId
,
tstrerror
(
code
));
tsc
Info
(
"consumer:0x%"
PRIx64
" sync res
commit done, code:%s"
,
tmq
->
consumerId
,
tstrerror
(
code
));
return
code
;
}
...
...
@@ -2406,15 +2413,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
tsem_init
(
&
pInfo
->
sem
,
0
,
0
);
pInfo
->
code
=
0
;
asyncCommitOffset
(
tmq
,
tname
,
vgId
,
&
offsetVal
,
commitCallBackFn
,
pInfo
);
tsem_wait
(
&
pInfo
->
sem
);
code
=
pInfo
->
code
;
code
=
asyncCommitOffset
(
tmq
,
tname
,
vgId
,
&
offsetVal
,
commitCallBackFn
,
pInfo
);
if
(
code
==
0
){
tsem_wait
(
&
pInfo
->
sem
);
code
=
pInfo
->
code
;
}
if
(
code
==
TSDB_CODE_TMQ_SAME_COMMITTED_VALUE
)
code
=
TSDB_CODE_SUCCESS
;
tsem_destroy
(
&
pInfo
->
sem
);
taosMemoryFree
(
pInfo
);
tscInfo
(
"consumer:0x%"
PRIx64
" sync send
seek
to vgId:%d, offset:%"
PRId64
" code:%s"
,
tmq
->
consumerId
,
vgId
,
offset
,
tstrerror
(
code
));
tscInfo
(
"consumer:0x%"
PRIx64
" sync send
commit
to vgId:%d, offset:%"
PRId64
" code:%s"
,
tmq
->
consumerId
,
vgId
,
offset
,
tstrerror
(
code
));
return
code
;
}
...
...
@@ -2451,10 +2460,11 @@ void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, i
code
=
asyncCommitOffset
(
tmq
,
tname
,
vgId
,
&
offsetVal
,
cb
,
param
);
tscInfo
(
"consumer:0x%"
PRIx64
" async send
seek
to vgId:%d, offset:%"
PRId64
" code:%s"
,
tmq
->
consumerId
,
vgId
,
offset
,
tstrerror
(
code
));
tscInfo
(
"consumer:0x%"
PRIx64
" async send
commit
to vgId:%d, offset:%"
PRId64
" code:%s"
,
tmq
->
consumerId
,
vgId
,
offset
,
tstrerror
(
code
));
end:
if
(
code
!=
0
&&
cb
!=
NULL
){
if
(
code
==
TSDB_CODE_TMQ_SAME_COMMITTED_VALUE
)
code
=
TSDB_CODE_SUCCESS
;
cb
(
tmq
,
code
,
param
);
}
}
...
...
source/common/src/tglobal.c
浏览文件 @
a1203e40
...
...
@@ -105,6 +105,8 @@ char *tsClientCrashReportUri = "/ccrashreport";
char
*
tsSvrCrashReportUri
=
"/dcrashreport"
;
// schemaless
bool
tsSmlDot2Underline
=
true
;
char
tsSmlTsDefaultName
[
TSDB_COL_NAME_LEN
]
=
"_ts"
;
char
tsSmlTagName
[
TSDB_COL_NAME_LEN
]
=
"_tag_null"
;
char
tsSmlChildTableName
[
TSDB_TABLE_NAME_LEN
]
=
""
;
// user defined child table name can be specified in tag value.
// If set to empty system will generate table name using MD5 hash.
...
...
@@ -366,6 +368,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if
(
cfgAddBool
(
pCfg
,
"keepColumnName"
,
tsKeepColumnName
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlChildTableName"
,
""
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlTagName"
,
tsSmlTagName
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddString
(
pCfg
,
"smlTsDefaultName"
,
tsSmlTsDefaultName
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
if
(
cfgAddBool
(
pCfg
,
"smlDot2Underline"
,
tsSmlDot2Underline
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
// if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, CFG_SCOPE_CLIENT) != 0) return -1;
// if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, CFG_SCOPE_CLIENT) != 0) return -1;
if
(
cfgAddInt32
(
pCfg
,
"maxInsertBatchRows"
,
tsMaxInsertBatchRows
,
1
,
INT32_MAX
,
CFG_SCOPE_CLIENT
)
!=
0
)
return
-
1
;
...
...
@@ -801,6 +805,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy
(
tsSmlChildTableName
,
cfgGetItem
(
pCfg
,
"smlChildTableName"
)
->
str
,
TSDB_TABLE_NAME_LEN
);
tstrncpy
(
tsSmlTagName
,
cfgGetItem
(
pCfg
,
"smlTagName"
)
->
str
,
TSDB_COL_NAME_LEN
);
tstrncpy
(
tsSmlTsDefaultName
,
cfgGetItem
(
pCfg
,
"smlTsDefaultName"
)
->
str
,
TSDB_COL_NAME_LEN
);
tsSmlDot2Underline
=
cfgGetItem
(
pCfg
,
"smlDot2Underline"
)
->
bval
;
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
...
...
@@ -1243,6 +1249,10 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
// tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
// } else if (strcasecmp("smlBatchSize", name) == 0) {
// tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
}
else
if
(
strcasecmp
(
"smlTsDefaultName"
,
name
)
==
0
)
{
tstrncpy
(
tsSmlTsDefaultName
,
cfgGetItem
(
pCfg
,
"smlTsDefaultName"
)
->
str
,
TSDB_COL_NAME_LEN
);
}
else
if
(
strcasecmp
(
"smlDot2Underline"
,
name
)
==
0
)
{
tsSmlDot2Underline
=
cfgGetItem
(
pCfg
,
"smlDot2Underline"
)
->
bval
;
}
else
if
(
strcasecmp
(
"shellActivityTimer"
,
name
)
==
0
)
{
tsShellActivityTimer
=
cfgGetItem
(
pCfg
,
"shellActivityTimer"
)
->
i32
;
}
else
if
(
strcasecmp
(
"supportVnodes"
,
name
)
==
0
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
a1203e40
...
...
@@ -7227,6 +7227,9 @@ bool tOffsetEqual(const STqOffsetVal *pLeft, const STqOffsetVal *pRight) {
return
pLeft
->
uid
==
pRight
->
uid
&&
pLeft
->
ts
==
pRight
->
ts
;
}
else
if
(
pLeft
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
return
pLeft
->
uid
==
pRight
->
uid
;
}
else
{
uError
(
"offset type:%d"
,
pLeft
->
type
);
ASSERT
(
0
);
}
}
return
false
;
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
a1203e40
...
...
@@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
bool
mndRebTryStart
()
{
int32_t
old
=
atomic_val_compare_exchange_32
(
&
mqRebInExecCnt
,
0
,
1
);
m
Info
(
"tq timer, rebalance counter old val:%d"
,
old
);
m
Debug
(
"tq timer, rebalance counter old val:%d"
,
old
);
return
old
==
0
;
}
...
...
@@ -116,7 +116,7 @@ void mndRebCntDec() {
int32_t
newVal
=
val
-
1
;
int32_t
oldVal
=
atomic_val_compare_exchange_32
(
&
mqRebInExecCnt
,
val
,
newVal
);
if
(
oldVal
==
val
)
{
m
Info
(
"rebalance trans end, rebalance counter:%d"
,
newVal
);
m
Debug
(
"rebalance trans end, rebalance counter:%d"
,
newVal
);
break
;
}
}
...
...
@@ -420,6 +420,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
data
->
topicName
);
if
(
pSub
==
NULL
){
ASSERT
(
0
);
continue
;
}
taosWLockLatch
(
&
pSub
->
lock
);
...
...
@@ -515,7 +516,10 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
char
*
topic
=
taosArrayGetP
(
pConsumer
->
currentTopics
,
i
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
topic
);
// txn guarantees pSub is created
if
(
pSub
==
NULL
)
continue
;
if
(
pSub
==
NULL
)
{
ASSERT
(
0
);
continue
;
}
taosRLockLatch
(
&
pSub
->
lock
);
SMqSubTopicEp
topicEp
=
{
0
};
...
...
@@ -524,6 +528,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
// 2.1 fetch topic schema
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
if
(
pTopic
==
NULL
)
{
ASSERT
(
0
);
taosRUnLockLatch
(
&
pSub
->
lock
);
mndReleaseSubscribe
(
pMnode
,
pSub
);
continue
;
...
...
@@ -898,7 +903,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
mInfo
(
"consumer:0x%"
PRIx64
" sub insert, cgroup:%s status:%d(%s) epoch:%d"
,
pConsumer
->
consumerId
,
pConsumer
->
cgroup
,
pConsumer
->
status
,
mndConsumerStatusName
(
pConsumer
->
status
),
pConsumer
->
epoch
);
pConsumer
->
subscribeTime
=
taosGetTimestampMs
()
;
pConsumer
->
subscribeTime
=
pConsumer
->
createTime
;
return
0
;
}
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
a1203e40
...
...
@@ -1168,7 +1168,7 @@ static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t cons
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
consumerIdHex
,
consumerId
==
-
1
);
m
Debug
(
"mnd show subscriptions: topic %s, consumer:0x%"
PRIx64
" cgroup %s vgid %d"
,
varDataVal
(
topic
),
m
Info
(
"mnd show subscriptions: topic %s, consumer:0x%"
PRIx64
" cgroup %s vgid %d"
,
varDataVal
(
topic
),
consumerId
,
varDataVal
(
cgroup
),
pVgEp
->
vgId
);
// offset
...
...
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
a1203e40
...
...
@@ -1174,26 +1174,30 @@ static void mndLoopHash(SHashObj *hash, char *priType, SSDataBlock *pBlock, int3
if
(
strcmp
(
"t"
,
value
)
!=
0
)
{
SNode
*
pAst
=
NULL
;
int32_t
sqlLen
=
0
;
char
sql
[
TSDB_EXPLAIN_RESULT_ROW_SIZE
]
=
{
0
};
size_t
bufSz
=
strlen
(
value
)
+
1
;
char
*
sql
=
taosMemoryMalloc
(
bufSz
+
1
);
char
*
obj
=
taosMemoryMalloc
(
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
);
if
(
nodesStringToNode
(
value
,
&
pAst
)
==
0
)
{
nodesNodeToSQL
(
pAst
,
sql
,
TSDB_EXPLAIN_RESULT_ROW_SIZE
,
&
sqlLen
);
if
(
sql
!=
NULL
&&
obj
!=
NULL
&&
nodesStringToNode
(
value
,
&
pAst
)
==
0
)
{
nodesNodeToSQL
(
pAst
,
sql
,
bufSz
,
&
sqlLen
);
nodesDestroyNode
(
pAst
);
}
else
{
sqlLen
=
5
;
sprintf
(
sql
,
"error"
);
}
char
obj
[
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
obj
,
sql
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
obj
,
false
);
taosMemoryFree
(
obj
);
taosMemoryFree
(
sql
);
}
else
{
char
condition
[
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
char
*
condition
=
taosMemoryMalloc
(
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
)
;
STR_WITH_MAXSIZE_TO_VARSTR
(
condition
,
""
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
*
numOfRows
,
(
const
char
*
)
condition
,
false
);
taosMemoryFree
(
condition
);
}
(
*
numOfRows
)
++
;
...
...
@@ -1242,10 +1246,11 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
tableName
,
false
);
char
condition
[
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
char
*
condition
=
taosMemoryMalloc
(
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
)
;
STR_WITH_MAXSIZE_TO_VARSTR
(
condition
,
""
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
condition
,
false
);
taosMemoryFree
(
condition
);
numOfRows
++
;
}
...
...
@@ -1276,10 +1281,11 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
tableName
,
false
);
char
condition
[
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
char
*
condition
=
taosMemoryMalloc
(
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
)
;
STR_WITH_MAXSIZE_TO_VARSTR
(
condition
,
""
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
condition
,
false
);
taosMemoryFree
(
condition
);
numOfRows
++
;
db
=
taosHashIterate
(
pUser
->
readDbs
,
db
);
...
...
@@ -1311,10 +1317,11 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
tableName
,
false
);
char
condition
[
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
char
*
condition
=
taosMemoryMalloc
(
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
)
;
STR_WITH_MAXSIZE_TO_VARSTR
(
condition
,
""
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
condition
,
false
);
taosMemoryFree
(
condition
);
numOfRows
++
;
db
=
taosHashIterate
(
pUser
->
writeDbs
,
db
);
...
...
@@ -1348,10 +1355,11 @@ static int32_t mndRetrievePrivileges(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
tableName
,
false
);
char
condition
[
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
}
;
char
*
condition
=
taosMemoryMalloc
(
TSDB_PRIVILEDGE_CONDITION_LEN
+
VARSTR_HEADER_SIZE
)
;
STR_WITH_MAXSIZE_TO_VARSTR
(
condition
,
""
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataSetVal
(
pColInfo
,
numOfRows
,
(
const
char
*
)
condition
,
false
);
taosMemoryFree
(
condition
);
numOfRows
++
;
topic
=
taosHashIterate
(
pUser
->
topics
,
topic
);
...
...
source/dnode/snode/src/snode.c
浏览文件 @
a1203e40
...
...
@@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
rsp
.
status
=
streamTaskCheckStatus
(
pTask
);
streamMetaReleaseTask
(
pSnode
->
pMeta
,
pTask
);
qDebug
(
"s-task:%s recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), status:%s, rsp status %d"
,
pTask
->
id
.
idStr
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
)
,
rsp
.
status
);
const
char
*
pStatus
=
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
);
qDebug
(
"s-task:%s status:%s, recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), ready:%d"
,
pTask
->
id
.
idStr
,
pStatus
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
else
{
rsp
.
status
=
0
;
qDebug
(
"tq recv task check(taskId:0x%x not built yet) req(reqId:0x%"
PRIx64
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a1203e40
...
...
@@ -146,6 +146,20 @@ void tqClose(STQ* pTq) {
return
;
}
void
*
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
NULL
);
while
(
pIter
)
{
STqHandle
*
pHandle
=
*
(
STqHandle
**
)
pIter
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
if
(
pHandle
->
msg
!=
NULL
)
{
tqPushEmptyDataRsp
(
pHandle
,
vgId
);
rpcFreeCont
(
pHandle
->
msg
->
pCont
);
taosMemoryFree
(
pHandle
->
msg
);
pHandle
->
msg
=
NULL
;
}
pIter
=
taosHashIterate
(
pTq
->
pPushMgr
,
pIter
);
}
tqOffsetClose
(
pTq
->
pOffsetStore
);
taosHashCleanup
(
pTq
->
pHandle
);
taosHashCleanup
(
pTq
->
pPushMgr
);
...
...
@@ -278,6 +292,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tqInitDataRsp
(
&
dataRsp
,
&
req
);
dataRsp
.
blockNum
=
0
;
dataRsp
.
rspOffset
=
dataRsp
.
reqOffset
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
dataRsp
.
reqOffset
);
tqInfo
(
"tqPushEmptyDataRsp to consumer:0x%"
PRIx64
" vgId:%d, offset:%s, reqId:0x%"
PRIx64
,
req
.
consumerId
,
vgId
,
buf
,
req
.
reqId
);
tqSendDataRsp
(
pHandle
,
pHandle
->
msg
,
&
req
,
&
dataRsp
,
TMQ_MSG_TYPE__POLL_DATA_RSP
,
vgId
);
tDeleteMqDataRsp
(
&
dataRsp
);
return
0
;
...
...
@@ -515,10 +533,11 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
while
(
pIter
)
{
STqHandle
*
pHandle
=
*
(
STqHandle
**
)
pIter
;
tq
Debug
(
"vgId:%d start set submit for pHandle:%p, consumer:0x%"
PRIx64
,
vgId
,
pHandle
,
pHandle
->
consumerId
);
tq
Info
(
"vgId:%d start set submit for pHandle:%p, consumer:0x%"
PRIx64
,
vgId
,
pHandle
,
pHandle
->
consumerId
);
if
(
ASSERT
(
pHandle
->
msg
!=
NULL
))
{
tqError
(
"pHandle->msg should not be null"
);
taosHashCancelIterate
(
pTq
->
pPushMgr
,
pIter
);
break
;
}
else
{
SRpcMsg
msg
=
{.
msgType
=
TDMT_VND_TMQ_CONSUME
,
.
pCont
=
pHandle
->
msg
->
pCont
,
.
contLen
=
pHandle
->
msg
->
contLen
,
.
info
=
pHandle
->
msg
->
info
};
...
...
@@ -556,10 +575,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 1. find handle
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pHandle
==
NULL
)
{
tqError
(
"tmq poll: consumer:0x%"
PRIx64
" vgId:%d subkey %s not found"
,
consumerId
,
vgId
,
req
.
subKey
);
terrno
=
TSDB_CODE_INVALID_MSG
;
taosWUnLockLatch
(
&
pTq
->
lock
);
return
-
1
;
do
{
if
(
tqMetaGetHandle
(
pTq
,
req
.
subKey
)
==
0
){
pHandle
=
taosHashGet
(
pTq
->
pHandle
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pHandle
!=
NULL
){
break
;
}
}
tqError
(
"tmq poll: consumer:0x%"
PRIx64
" vgId:%d subkey %s not found"
,
consumerId
,
vgId
,
req
.
subKey
);
terrno
=
TSDB_CODE_INVALID_MSG
;
taosWUnLockLatch
(
&
pTq
->
lock
);
return
-
1
;
}
while
(
0
);
}
// 2. check re-balance status
...
...
@@ -849,30 +876,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosWLockLatch
(
&
pTq
->
lock
);
if
(
pHandle
->
consumerId
==
req
.
newConsumerId
)
{
// do nothing
tqInfo
(
"vgId:%d consumer:0x%"
PRIx64
" remains, no switch occurs, should not reach here"
,
req
.
vgId
,
req
.
newConsumerId
);
tqInfo
(
"vgId:%d no switch consumer:0x%"
PRIx64
" remains, because redo wal log"
,
req
.
vgId
,
req
.
newConsumerId
);
}
else
{
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
tqInfo
(
"vgId:%d switch consumer from Id:0x%"
PRIx64
" to Id:0x%"
PRIx64
,
req
.
vgId
,
pHandle
->
consumerId
,
req
.
newConsumerId
);
atomic_store_64
(
&
pHandle
->
consumerId
,
req
.
newConsumerId
);
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
// atomic_add_fetch_32(&pHandle->epoch, 1);
// kill executing task
// if(tqIsHandleExec(pHandle)) {
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
// if (pTaskInfo != NULL) {
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
// }
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
// qStreamCloseTsdbReader(pTaskInfo);
// }
// }
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle
(
pTq
,
pHandle
);
taosWUnLockLatch
(
&
pTq
->
lock
);
ret
=
tqMetaSaveHandle
(
pTq
,
req
.
subKey
,
pHandle
);
}
end:
...
...
@@ -1041,9 +1066,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
rsp
.
status
=
streamTaskCheckStatus
(
pTask
);
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
tqDebug
(
"s-task:%s recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), status:%s, rsp status %d"
,
pTask
->
id
.
idStr
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
)
,
rsp
.
status
);
const
char
*
pStatus
=
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
);
tqDebug
(
"s-task:%s status:%s, recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), ready:%d"
,
pTask
->
id
.
idStr
,
pStatus
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
else
{
rsp
.
status
=
0
;
tqDebug
(
"tq recv task check(taskId:0x%x not built yet) req(reqId:0x%"
PRIx64
") from task:0x%x (vgId:%d), rsp status %d"
,
...
...
@@ -1145,7 +1170,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// 3. It's an fill history task, do nothing. wait for the main task to start it
SStreamTask
*
p
=
streamMetaAcquireTask
(
pStreamMeta
,
taskId
);
if
(
p
!=
NULL
)
{
// reset the downstreamReady flag.
p
->
status
.
downstreamReady
=
0
;
streamTaskCheckDownstreamTasks
(
p
);
}
...
...
@@ -1154,12 +1178,10 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
}
int32_t
tqProcessTaskScanHistory
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
char
*
msg
=
pMsg
->
pCont
;
SStreamScanHistoryReq
*
pReq
=
(
SStreamScanHistoryReq
*
)
pMsg
->
pCont
;
SStreamMeta
*
pMeta
=
pTq
->
pStreamMeta
;
SStreamScanHistoryReq
*
pReq
=
(
SStreamScanHistoryReq
*
)
msg
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
SStreamTask
*
pTask
=
streamMetaAcquireTask
(
pMeta
,
pReq
->
taskId
);
if
(
pTask
==
NULL
)
{
tqError
(
"vgId:%d failed to acquire stream task:0x%x during stream recover, task may have been destroyed"
,
...
...
@@ -1167,12 +1189,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return
-
1
;
}
// do recovery step
1
// do recovery step1
const
char
*
id
=
pTask
->
id
.
idStr
;
const
char
*
pStatus
=
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
);
tqDebug
(
"s-task:%s start
history data scan
stage(step 1), status:%s"
,
id
,
pStatus
);
tqDebug
(
"s-task:%s start
scan-history
stage(step 1), status:%s"
,
id
,
pStatus
);
int64_t
st
=
taosGetTimestampMs
();
if
(
pTask
->
tsInfo
.
step1Start
==
0
)
{
ASSERT
(
pTask
->
status
.
pauseAllowed
==
false
);
pTask
->
tsInfo
.
step1Start
=
taosGetTimestampMs
();
if
(
pTask
->
info
.
fillHistory
==
1
)
{
streamTaskEnablePause
(
pTask
);
}
}
else
{
tqDebug
(
"s-task:%s resume from paused, start ts:%"
PRId64
,
pTask
->
id
.
idStr
,
pTask
->
tsInfo
.
step1Start
);
}
// we have to continue retrying to successfully execute the scan history task.
int8_t
schedStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
.
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
,
...
...
@@ -1185,31 +1215,21 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return
0
;
}
ASSERT
(
pTask
->
status
.
pauseAllowed
==
false
);
if
(
pTask
->
info
.
fillHistory
==
1
)
{
streamTaskEnablePause
(
pTask
);
}
if
(
!
streamTaskRecoverScanStep1Finished
(
pTask
))
{
streamSourceScanHistoryData
(
pTask
);
}
// disable the pause when handling the step2 scan of tsdb data.
// the whole next procedure cann't be stopped.
// todo fix it: the following procedure should be executed completed and then shutdown when trying to close vnode.
if
(
pTask
->
info
.
fillHistory
==
1
)
{
streamTaskDisablePause
(
pTask
);
ASSERT
(
pTask
->
status
.
pauseAllowed
==
true
);
}
if
(
streamTaskShouldStop
(
&
pTask
->
status
)
||
streamTaskShouldPause
(
&
pTask
->
status
))
{
tqDebug
(
"s-task:%s is dropped or paused, abort recover in step1"
,
id
);
streamSourceScanHistoryData
(
pTask
);
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__PAUSE
)
{
double
el
=
(
taosGetTimestampMs
()
-
pTask
->
tsInfo
.
step1Start
)
/
1000
.
0
;
tqDebug
(
"s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d"
,
pTask
->
id
.
idStr
,
el
,
TASK_SCHED_STATUS__INACTIVE
);
atomic_store_8
(
&
pTask
->
status
.
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
return
0
;
}
double
el
=
(
taosGetTimestampMs
()
-
st
)
/
1000
.
0
;
// the following procedure should be executed, no matter status is stop/pause or not
double
el
=
(
taosGetTimestampMs
()
-
pTask
->
tsInfo
.
step1Start
)
/
1000
.
0
;
tqDebug
(
"s-task:%s scan-history stage(step 1) ended, elapsed time:%.2fs"
,
id
,
el
);
if
(
pTask
->
info
.
fillHistory
)
{
...
...
@@ -1217,77 +1237,71 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask
*
pStreamTask
=
NULL
;
bool
done
=
false
;
if
(
!
pReq
->
igUntreated
&&
!
streamTaskRecoverScanStep1Finished
(
pTask
))
{
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
pStreamTask
=
streamMetaAcquireTask
(
pMeta
,
pTask
->
streamTaskId
.
taskId
);
if
(
pStreamTask
==
NULL
)
{
qError
(
"failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s"
,
pTask
->
streamTaskId
.
taskId
,
pTask
->
id
.
idStr
);
// 1. get the related stream task
pStreamTask
=
streamMetaAcquireTask
(
pMeta
,
pTask
->
streamTaskId
.
taskId
);
if
(
pStreamTask
==
NULL
)
{
// todo delete this task, if the related stream task is dropped
qError
(
"failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s"
,
pTask
->
streamTaskId
.
taskId
,
pTask
->
id
.
idStr
);
pTask
->
status
.
taskStatus
=
TASK_STATUS__DROPPING
;
tqDebug
(
"s-task:%s fill-history task set status to be dropping"
,
id
);
tqDebug
(
"s-task:%s fill-history task set status to be dropping"
,
id
);
streamMetaSaveTask
(
pMeta
,
pTask
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
return
-
1
;
}
streamMetaUnregisterTask
(
pMeta
,
pTask
->
id
.
taskId
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
return
-
1
;
}
ASSERT
(
pStreamTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
);
ASSERT
(
pStreamTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
);
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
// wait for the
stream task get ready for scan history data
while
(
pStreamTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
tqDebug
(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms"
,
id
,
pTask
->
info
.
taskLevel
,
pStreamTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
pStreamTask
->
status
.
taskStatus
));
taosMsleep
(
100
);
}
// 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
//
stream task get ready for scan history data
while
(
pStreamTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
tqDebug
(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms"
,
id
,
pTask
->
info
.
taskLevel
,
pStreamTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
pStreamTask
->
status
.
taskStatus
));
taosMsleep
(
100
);
}
// now we can stop the stream task execution
streamTaskHalt
(
pStreamTask
);
tqDebug
(
"s-task:%s level:%d sched-status:%d is halt by fill-history task:%s"
,
pStreamTask
->
id
.
idStr
,
pStreamTask
->
info
.
taskLevel
,
pStreamTask
->
status
.
schedStatus
,
id
);
// now we can stop the stream task execution
streamTaskHalt
(
pStreamTask
);
// if it's an source task, extract the last version in wal.
pRange
=
&
pTask
->
dataRange
.
range
;
int64_t
latestVer
=
walReaderGetCurrentVer
(
pStreamTask
->
exec
.
pWalReader
);
done
=
streamHistoryTaskSetVerRangeStep2
(
pTask
,
latestVer
);
}
tqDebug
(
"s-task:%s level:%d sched-status:%d is halt by fill-history task:%s"
,
pStreamTask
->
id
.
idStr
,
pStreamTask
->
info
.
taskLevel
,
pStreamTask
->
status
.
schedStatus
,
id
);
// if it's an source task, extract the last version in wal.
pRange
=
&
pTask
->
dataRange
.
range
;
int64_t
latestVer
=
walReaderGetCurrentVer
(
pStreamTask
->
exec
.
pWalReader
);
done
=
streamHistoryTaskSetVerRangeStep2
(
pTask
,
latestVer
);
if
(
done
)
{
pTask
->
tsInfo
.
step2Start
=
taosGetTimestampMs
();
streamTaskEndScanWAL
(
pTask
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
}
else
{
if
(
!
streamTaskRecoverScanStep1Finished
(
pTask
))
{
STimeWindow
*
pWindow
=
&
pTask
->
dataRange
.
window
;
tqDebug
(
"s-task:%s level:%d verRange:%"
PRId64
" - %"
PRId64
" window:%"
PRId64
"-%"
PRId64
", do secondary scan-history from WAL after halt the related stream task:%s"
,
id
,
pTask
->
info
.
taskLevel
,
pRange
->
minVer
,
pRange
->
maxVer
,
pWindow
->
skey
,
pWindow
->
ekey
,
id
);
ASSERT
(
pTask
->
status
.
schedStatus
==
TASK_SCHED_STATUS__WAITING
);
pTask
->
tsInfo
.
step2Start
=
taosGetTimestampMs
();
streamSetParamForStreamScannerStep2
(
pTask
,
pRange
,
pWindow
);
}
STimeWindow
*
pWindow
=
&
pTask
->
dataRange
.
window
;
tqDebug
(
"s-task:%s level:%d verRange:%"
PRId64
" - %"
PRId64
" window:%"
PRId64
"-%"
PRId64
", do secondary scan-history from WAL after halt the related stream task:%s"
,
id
,
pTask
->
info
.
taskLevel
,
pRange
->
minVer
,
pRange
->
maxVer
,
pWindow
->
skey
,
pWindow
->
ekey
,
pStreamTask
->
id
.
idStr
);
ASSERT
(
pTask
->
status
.
schedStatus
==
TASK_SCHED_STATUS__WAITING
);
if
(
!
streamTaskRecoverScanStep2Finished
(
pTask
))
{
pTask
->
status
.
taskStatus
=
TASK_STATUS__SCAN_HISTORY_WAL
;
if
(
streamTaskShouldStop
(
&
pTask
->
status
)
||
streamTaskShouldPause
(
&
pTask
->
status
))
{
tqDebug
(
"s-task:%s is dropped or paused, abort recover in step1"
,
id
);
streamMetaReleaseTask
(
pMeta
,
pTask
);
return
0
;
}
pTask
->
tsInfo
.
step2Start
=
taosGetTimestampMs
();
streamSetParamForStreamScannerStep2
(
pTask
,
pRange
,
pWindow
);
int64_t
dstVer
=
pTask
->
dataRange
.
range
.
minVer
-
1
;
int64_t
dstVer
=
pTask
->
dataRange
.
range
.
minVer
-
1
;
pTask
->
chkInfo
.
currentVer
=
dstVer
;
walReaderSetSkipToVersion
(
pTask
->
exec
.
pWalReader
,
dstVer
);
tqDebug
(
"s-task:%s wal reader start scan from WAL ver:%"
PRId64
", set sched-status:%d"
,
id
,
dstVer
,
TASK_SCHED_STATUS__INACTIVE
);
}
pTask
->
chkInfo
.
currentVer
=
dstVer
;
walReaderSetSkipToVersion
(
pTask
->
exec
.
pWalReader
,
dstVer
);
tqDebug
(
"s-task:%s wal reader start scan WAL verRange:%"
PRId64
"-%"
PRId64
", set sched-status:%d"
,
id
,
dstVer
,
pTask
->
dataRange
.
range
.
maxVer
,
TASK_SCHED_STATUS__INACTIVE
);
atomic_store_8
(
&
pTask
->
status
.
schedStatus
,
TASK_SCHED_STATUS__INACTIVE
);
// set the fill-history task to be normal
if
(
pTask
->
info
.
fillHistory
==
1
)
{
streamSetStatusNormal
(
pTask
);
}
// 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
// 5. resume the related stream task.
streamMetaReleaseTask
(
pMeta
,
pTask
);
...
...
@@ -1304,7 +1318,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if
(
pTask
->
historyTaskId
.
taskId
==
0
)
{
*
pWindow
=
(
STimeWindow
){
INT64_MIN
,
INT64_MAX
};
tqDebug
(
"s-task:%s scanhistory in stream time window completed, no related fill-history task, reset the time "
"s-task:%s scan
-
history in stream time window completed, no related fill-history task, reset the time "
"window:%"
PRId64
" - %"
PRId64
,
id
,
pWindow
->
skey
,
pWindow
->
ekey
);
qResetStreamInfoTimeWindow
(
pTask
->
exec
.
pExecutor
);
...
...
@@ -1500,7 +1514,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
pTask
!=
NULL
)
{
// even in halt status, the data in inputQ must be processed
int8_t
st
=
pTask
->
status
.
taskStatus
;
if
(
st
==
TASK_STATUS__NORMAL
||
st
==
TASK_STATUS__SCAN_HISTORY
||
st
==
TASK_STATUS__SCAN_HISTORY_WAL
)
{
if
(
st
==
TASK_STATUS__NORMAL
||
st
==
TASK_STATUS__SCAN_HISTORY
/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/
)
{
tqDebug
(
"vgId:%d s-task:%s start to process block from inputQ, last chk point:%"
PRId64
,
vgId
,
pTask
->
id
.
idStr
,
pTask
->
chkInfo
.
version
);
streamProcessRunReq
(
pTask
);
...
...
@@ -1637,7 +1651,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId
,
pTask
->
id
.
idStr
,
pTask
->
chkInfo
.
currentVer
,
sversion
,
pTask
->
status
.
schedStatus
);
}
if
(
level
==
TASK_LEVEL__SOURCE
&&
pTask
->
info
.
fillHistory
)
{
if
(
level
==
TASK_LEVEL__SOURCE
&&
pTask
->
info
.
fillHistory
&&
pTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY
)
{
streamStartRecoverTask
(
pTask
,
igUntreated
);
}
else
if
(
level
==
TASK_LEVEL__SOURCE
&&
(
taosQueueItemSize
(
pTask
->
inputQueue
->
queue
)
==
0
))
{
tqStartStreamTasks
(
pTq
);
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
a1203e40
...
...
@@ -338,7 +338,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
taosArrayDestroy
(
tbUidList
);
return
-
1
;
}
tq
Debug
(
"vgId:%d, tq try to get ctb for stb subscribe, suid:%"
PRId64
,
pVnode
->
config
.
vgId
,
handle
->
execHandle
.
execTb
.
suid
);
tq
Info
(
"vgId:%d, tq try to get ctb for stb subscribe, suid:%"
PRId64
,
pVnode
->
config
.
vgId
,
handle
->
execHandle
.
execTb
.
suid
);
handle
->
execHandle
.
pTqReader
=
tqReaderOpen
(
pVnode
);
tqReaderSetTbUidList
(
handle
->
execHandle
.
pTqReader
,
tbUidList
,
NULL
);
taosArrayDestroy
(
tbUidList
);
...
...
@@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
if
(
buildHandle
(
pTq
,
handle
)
<
0
){
return
-
1
;
}
tq
Debug
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
tq
Info
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
return
taosHashPut
(
pTq
->
pHandle
,
handle
->
subKey
,
strlen
(
handle
->
subKey
),
handle
,
sizeof
(
STqHandle
));
}
...
...
@@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
if
(
buildHandle
(
pTq
,
handle
)
<
0
){
return
-
1
;
}
tq
Debug
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
tq
Info
(
"tq restore %s consumer %"
PRId64
" vgId:%d"
,
handle
->
subKey
,
handle
->
consumerId
,
vgId
);
return
taosHashPut
(
pTq
->
pHandle
,
handle
->
subKey
,
strlen
(
handle
->
subKey
),
handle
,
sizeof
(
STqHandle
));
}
...
...
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
a1203e40
...
...
@@ -78,12 +78,12 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy
(
pHandle
->
msg
->
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
pHandle
->
msg
->
contLen
=
pMsg
->
contLen
;
int32_t
ret
=
taosHashPut
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
),
&
pHandle
,
POINTER_BYTES
);
tq
Debug
(
"vgId:%d data is over, ret:%d, consumerId:0x%"
PRIx64
", register to pHandle:%p, pCont:%p, len:%d"
,
vgId
,
ret
,
tq
Info
(
"vgId:%d data is over, ret:%d, consumerId:0x%"
PRIx64
", register to pHandle:%p, pCont:%p, len:%d"
,
vgId
,
ret
,
pHandle
->
consumerId
,
pHandle
,
pHandle
->
msg
->
pCont
,
pHandle
->
msg
->
contLen
);
return
0
;
}
int
32_t
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
handle
)
{
int
tqUnregisterPushHandle
(
STQ
*
pTq
,
void
*
handle
)
{
STqHandle
*
pHandle
=
(
STqHandle
*
)
handle
;
int32_t
vgId
=
TD_VID
(
pTq
->
pVnode
);
...
...
@@ -91,7 +91,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
return
0
;
}
int32_t
ret
=
taosHashRemove
(
pTq
->
pPushMgr
,
pHandle
->
subKey
,
strlen
(
pHandle
->
subKey
));
tq
Debug
(
"vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%"
PRIx64
,
vgId
,
pHandle
,
ret
,
pHandle
->
consumerId
);
tq
Info
(
"vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%"
PRIx64
,
vgId
,
pHandle
,
ret
,
pHandle
->
consumerId
);
if
(
pHandle
->
msg
!=
NULL
)
{
// tqPushDataRsp(pHandle, vgId);
...
...
source/dnode/vnode/src/tq/tqRestore.c
浏览文件 @
a1203e40
...
...
@@ -211,7 +211,7 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
static
void
checkForFillHistoryVerRange
(
SStreamTask
*
pTask
,
int64_t
ver
)
{
if
((
pTask
->
info
.
fillHistory
==
1
)
&&
ver
>
pTask
->
dataRange
.
range
.
maxVer
)
{
qWarn
(
"s-task:%s fill-history scan WAL, currentVer:%"
PRId64
"reach the maximum ver:%"
PRId64
qWarn
(
"s-task:%s fill-history scan WAL, currentVer:%"
PRId64
"
reach the maximum ver:%"
PRId64
", not scan wal anymore, set the transfer state flag"
,
pTask
->
id
.
idStr
,
ver
,
pTask
->
dataRange
.
range
.
maxVer
);
pTask
->
status
.
transferState
=
true
;
...
...
@@ -251,19 +251,19 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t
status
=
pTask
->
status
.
taskStatus
;
// non-source or fill-history tasks don't need to response the WAL scan action.
if
(
pTask
->
info
.
taskLevel
!=
TASK_LEVEL__SOURCE
)
{
if
(
(
pTask
->
info
.
taskLevel
!=
TASK_LEVEL__SOURCE
)
||
(
pTask
->
status
.
downstreamReady
==
0
)
)
{
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
}
if
(
status
!=
TASK_STATUS__NORMAL
&&
status
!=
TASK_STATUS__SCAN_HISTORY_WAL
)
{
if
(
status
!=
TASK_STATUS__NORMAL
)
{
tqDebug
(
"s-task:%s not ready for new submit block from wal, status:%s"
,
pTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
status
));
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
}
if
((
pTask
->
info
.
fillHistory
==
1
)
&&
pTask
->
status
.
transferState
)
{
ASSERT
(
status
==
TASK_STATUS__
SCAN_HISTORY_W
AL
);
ASSERT
(
status
==
TASK_STATUS__
NORM
AL
);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug
(
"s-task:%s fill-history reach the maximum ver:%"
PRId64
", not scan wal anymore"
,
pTask
->
id
.
idStr
,
pTask
->
dataRange
.
range
.
maxVer
);
...
...
source/dnode/vnode/src/tq/tqUtil.c
浏览文件 @
a1203e40
...
...
@@ -317,6 +317,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// the offset value can not be monotonious increase??
offset
=
reqOffset
;
}
else
{
uError
(
"req offset type is 0"
);
return
TSDB_CODE_TMQ_INVALID_MSG
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
a1203e40
...
...
@@ -1028,55 +1028,7 @@ int32_t tsdbCacheGetBatch(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCache
return
code
;
}
/*
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
int32_t code = 0;
SLRUCache *pCache = pTsdb->lruCache;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = NULL;
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUHandle *h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
taosThreadMutexLock(&pTsdb->lruMutex);
h = taosLRUCacheLookup(pCache, key, ROCKS_KEY_LEN);
if (!h) {
pLastCol = tsdbCacheLoadCol(pTsdb, pr, pr->pSlotIds[i], uid, cid, ltype);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}
LRUStatus status = taosLRUCacheInsert(pCache, key, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, &h,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
}
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h);
SLastCol lastCol = *pLastCol;
reallocVarData(&lastCol.colVal);
if (h) {
taosLRUCacheRelease(pCache, h, false);
}
taosArrayPush(pLastArray, &lastCol);
}
return code;
}
*/
int32_t
tsdbCacheDel
(
STsdb
*
pTsdb
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
// fetch schema
...
...
@@ -1108,6 +1060,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
char
**
values_list
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
size_t
*
values_list_sizes
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
size_t
));
char
**
errs
=
taosMemoryCalloc
(
num_keys
*
2
,
sizeof
(
char
*
));
taosThreadMutexLock
(
&
pTsdb
->
lruMutex
);
taosThreadMutexLock
(
&
pTsdb
->
rCache
.
rMutex
);
rocksMayWrite
(
pTsdb
,
true
,
false
,
false
);
rocksdb_multi_get
(
pTsdb
->
rCache
.
db
,
pTsdb
->
rCache
.
readoptions
,
num_keys
*
2
,
(
const
char
*
const
*
)
keys_list
,
...
...
@@ -1137,7 +1090,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksdb_free
(
values_list
[
i
]);
rocksdb_free
(
values_list
[
i
+
num_keys
]);
taosThreadMutexLock
(
&
pTsdb
->
lruMutex
);
//
taosThreadMutexLock(&pTsdb->lruMutex);
LRUHandle
*
h
=
taosLRUCacheLookup
(
pTsdb
->
lruCache
,
keys_list
[
i
],
klen
);
if
(
h
)
{
...
...
@@ -1159,7 +1112,7 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
}
taosLRUCacheErase
(
pTsdb
->
lruCache
,
keys_list
[
num_keys
+
i
],
klen
);
taosThreadMutexUnlock
(
&
pTsdb
->
lruMutex
);
//
taosThreadMutexUnlock(&pTsdb->lruMutex);
}
for
(
int
i
=
0
;
i
<
num_keys
;
++
i
)
{
taosMemoryFree
(
keys_list
[
i
]);
...
...
@@ -1171,6 +1124,8 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
rocksMayWrite
(
pTsdb
,
true
,
false
,
true
);
taosThreadMutexUnlock
(
&
pTsdb
->
lruMutex
);
_exit:
taosMemoryFree
(
pTSchema
);
...
...
@@ -1311,62 +1266,7 @@ int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return
code
;
}
/*
int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
int32_t code = 0;
char key[32] = {0};
int keyLen = 0;
// getTableCacheKey(uid, "lr", key, &keyLen);
getTableCacheKey(uid, 0, key, &keyLen);
LRUHandle *h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
}
// getTableCacheKey(uid, "l", key, &keyLen);
getTableCacheKey(uid, 1, key, &keyLen);
h = taosLRUCacheLookup(pCache, key, keyLen);
if (h) {
SArray *pLast = (SArray *)taosLRUCacheValue(pCache, h);
bool invalidate = false;
int16_t nCol = taosArrayGetSize(pLast);
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLast, iCol);
if (eKey >= tTsVal->ts) {
invalidate = true;
break;
}
}
if (invalidate) {
taosLRUCacheRelease(pCache, h, true);
} else {
taosLRUCacheRelease(pCache, h, false);
}
// void taosLRUCacheErase(SLRUCache * cache, const void *key, size_t keyLen);
}
return code;
}
*/
int32_t
tsdbCacheInsertLastrow
(
SLRUCache
*
pCache
,
STsdb
*
pTsdb
,
tb_uid_t
uid
,
TSDBROW
*
row
,
bool
dup
)
{
int32_t
code
=
0
;
STSRow
*
cacheRow
=
NULL
;
...
...
@@ -1977,9 +1877,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto
_err
;
}
loadDataTomb
(
state
->
pr
,
state
->
pr
->
pFileReader
);
state
->
pr
->
pCurFileSet
=
state
->
pFileSet
;
loadDataTomb
(
state
->
pr
,
state
->
pr
->
pFileReader
);
}
if
(
!
state
->
pIndexList
)
{
...
...
@@ -2008,15 +1908,18 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
int
indexSize
=
TARRAY_SIZE
(
state
->
pIndexList
);
if
(
indexSize
<=
0
)
{
clearLastFileSet
(
state
);
state
->
state
=
SFSNEXTROW_FILESET
;
goto
_next_fileset
;
goto
_check_stt_data
;
}
state
->
state
=
SFSNEXTROW_INDEXLIST
;
state
->
iBrinIndex
=
indexSize
;
}
_check_stt_data:
if
(
state
->
pFileSet
!=
state
->
pr
->
pCurFileSet
)
{
state
->
pr
->
pCurFileSet
=
state
->
pFileSet
;
}
code
=
lastIterOpen
(
&
state
->
lastIter
,
state
->
pFileSet
,
state
->
pTsdb
,
state
->
pTSchema
,
state
->
suid
,
state
->
uid
,
state
->
pr
,
state
->
lastTs
,
aCols
,
nCols
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
a1203e40
...
...
@@ -476,8 +476,8 @@ void vnodeClose(SVnode *pVnode) {
tsem_wait
(
&
pVnode
->
canCommit
);
vnodeSyncClose
(
pVnode
);
vnodeQueryClose
(
pVnode
);
walClose
(
pVnode
->
pWal
);
tqClose
(
pVnode
->
pTq
);
walClose
(
pVnode
->
pWal
);
if
(
pVnode
->
pTsdb
)
tsdbClose
(
&
pVnode
->
pTsdb
);
smaClose
(
pVnode
->
pSma
);
if
(
pVnode
->
pMeta
)
metaClose
(
&
pVnode
->
pMeta
);
...
...
source/libs/executor/src/executor.c
浏览文件 @
a1203e40
...
...
@@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return
NULL
;
}
qResetStreamInfoTimeWindow
(
pTaskInfo
);
return
pTaskInfo
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
a1203e40
...
...
@@ -1550,10 +1550,86 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
}
}
static
int32_t
setBlockIntoRes
(
SStreamScanInfo
*
pInfo
,
const
SSDataBlock
*
pBlock
,
bool
filter
)
{
static
void
doBlockDataWindowFilter
(
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
STimeWindow
*
pWindow
,
const
char
*
id
)
{
if
(
pWindow
->
skey
!=
INT64_MIN
||
pWindow
->
ekey
!=
INT64_MAX
)
{
bool
*
p
=
taosMemoryCalloc
(
pBlock
->
info
.
rows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
if
(
pWindow
->
skey
!=
INT64_MIN
)
{
qDebug
(
"%s filter for additional history window, skey:%"
PRId64
,
id
,
pWindow
->
skey
);
ASSERT
(
pCol
->
pData
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
>=
pWindow
->
skey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
else
if
(
pWindow
->
ekey
!=
INT64_MAX
)
{
qDebug
(
"%s filter for additional history window, ekey:%"
PRId64
,
id
,
pWindow
->
ekey
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
<=
pWindow
->
ekey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
taosMemoryFree
(
p
);
}
}
// re-build the delete block, ONLY according to the split timestamp
static
void
rebuildDeleteBlockData
(
SSDataBlock
*
pBlock
,
int64_t
skey
,
const
char
*
id
)
{
if
(
skey
==
INT64_MIN
)
{
return
;
}
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
bool
*
p
=
taosMemoryCalloc
(
numOfRows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pSrcStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
uint64_t
*
tsStartCol
=
(
uint64_t
*
)
pSrcStartCol
->
pData
;
SColumnInfoData
*
pSrcEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
uint64_t
*
tsEndCol
=
(
uint64_t
*
)
pSrcEndCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
i
++
)
{
if
(
tsStartCol
[
i
]
<
skey
)
{
tsStartCol
[
i
]
=
skey
;
}
if
(
tsEndCol
[
i
]
>=
skey
)
{
p
[
i
]
=
true
;
}
else
{
// this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified
=
true
;
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
qDebug
(
"%s re-build delete datablock, start key revised to:%"
PRId64
", rows:%"
PRId64
,
id
,
skey
,
pBlock
->
info
.
rows
);
taosMemoryFree
(
p
);
}
static
int32_t
setBlockIntoRes
(
SStreamScanInfo
*
pInfo
,
const
SSDataBlock
*
pBlock
,
STimeWindow
*
pTimeWindow
,
bool
filter
)
{
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
SOperatorInfo
*
pOperator
=
pInfo
->
pStreamScanOp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
...
...
@@ -1593,7 +1669,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
pBlockInfo
->
rows
,
GET_TASKID
(
pTaskInfo
)
,
&
pTableScanInfo
->
base
.
metaCache
);
pBlockInfo
->
rows
,
id
,
&
pTableScanInfo
->
base
.
metaCache
);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_PAR_TABLE_NOT_EXIST
)
{
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
...
...
@@ -1608,8 +1684,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter
(
pInfo
->
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
}
// filter the block extracted from WAL files, according to the time window apply additional time window filter
doBlockDataWindowFilter
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
,
pTimeWindow
,
id
);
pInfo
->
pRes
->
info
.
dataLoad
=
1
;
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
if
(
pInfo
->
pRes
->
info
.
rows
==
0
)
{
return
0
;
}
calBlockTbName
(
pInfo
,
pInfo
->
pRes
);
return
0
;
...
...
@@ -1666,7 +1748,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug
(
"doQueueScan get data from log %"
PRId64
" rows, version:%"
PRId64
,
pRes
->
info
.
rows
,
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
blockDataCleanup
(
pInfo
->
pRes
);
setBlockIntoRes
(
pInfo
,
pRes
,
true
);
STimeWindow
defaultWindow
=
{.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
setBlockIntoRes
(
pInfo
,
pRes
,
&
defaultWindow
,
true
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
return
pInfo
->
pRes
;
}
...
...
@@ -1775,80 +1858,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
}
}
static
void
doBlockDataWindowFilter
(
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
STimeWindow
*
pWindow
,
const
char
*
id
)
{
if
(
pWindow
->
skey
!=
INT64_MIN
||
pWindow
->
ekey
!=
INT64_MAX
)
{
bool
*
p
=
taosMemoryCalloc
(
pBlock
->
info
.
rows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
if
(
pWindow
->
skey
!=
INT64_MIN
)
{
qDebug
(
"%s filter for additional history window, skey:%"
PRId64
,
id
,
pWindow
->
skey
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
>=
pWindow
->
skey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
else
if
(
pWindow
->
ekey
!=
INT64_MAX
)
{
qDebug
(
"%s filter for additional history window, ekey:%"
PRId64
,
id
,
pWindow
->
ekey
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
<=
pWindow
->
ekey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
taosMemoryFree
(
p
);
}
}
// re-build the delete block, ONLY according to the split timestamp
static
void
rebuildDeleteBlockData
(
SSDataBlock
*
pBlock
,
int64_t
skey
,
const
char
*
id
)
{
if
(
skey
==
INT64_MIN
)
{
return
;
}
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
bool
*
p
=
taosMemoryCalloc
(
numOfRows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pSrcStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
uint64_t
*
tsStartCol
=
(
uint64_t
*
)
pSrcStartCol
->
pData
;
SColumnInfoData
*
pSrcEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
uint64_t
*
tsEndCol
=
(
uint64_t
*
)
pSrcEndCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
i
++
)
{
if
(
tsStartCol
[
i
]
<
skey
)
{
tsStartCol
[
i
]
=
skey
;
}
if
(
tsEndCol
[
i
]
>=
skey
)
{
p
[
i
]
=
true
;
}
else
{
// this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified
=
true
;
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
qDebug
(
"%s re-build delete datablock, start key revised to:%"
PRId64
", rows:%"
PRId64
,
id
,
skey
,
pBlock
->
info
.
rows
);
taosMemoryFree
(
p
);
}
static
SSDataBlock
*
doStreamScan
(
SOperatorInfo
*
pOperator
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -2121,8 +2130,7 @@ FETCH_NEXT_BLOCK:
return
pInfo
->
pUpdateRes
;
}
SSDataBlock
*
pBlock
=
pInfo
->
pRes
;
SDataBlockInfo
*
pBlockInfo
=
&
pBlock
->
info
;
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
int32_t
totalBlocks
=
taosArrayGetSize
(
pInfo
->
pBlockLists
);
NEXT_SUBMIT_BLK:
...
...
@@ -2146,21 +2154,23 @@ FETCH_NEXT_BLOCK:
}
}
blockDataCleanup
(
p
Block
);
blockDataCleanup
(
p
Info
->
pRes
);
while
(
pAPI
->
tqReaderFn
.
tqNextBlockImpl
(
pInfo
->
tqReader
,
id
))
{
SSDataBlock
*
pRes
=
NULL
;
int32_t
code
=
pAPI
->
tqReaderFn
.
tqRetrieveBlock
(
pInfo
->
tqReader
,
&
pRes
,
id
);
qDebug
(
"retrieve data from submit completed code:%s, rows:%"
PRId64
" %s"
,
tstrerror
(
code
),
pRes
->
info
.
rows
,
id
);
qDebug
(
"retrieve data from submit completed code:%s rows:%"
PRId64
" %s"
,
tstrerror
(
code
),
pRes
->
info
.
rows
,
id
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pRes
->
info
.
rows
==
0
)
{
qDebug
(
"retrieve data failed, try next block in submit block, %s"
,
id
);
continue
;
}
setBlockIntoRes
(
pInfo
,
pRes
,
false
);
setBlockIntoRes
(
pInfo
,
pRes
,
&
pStreamInfo
->
fillHistoryWindow
,
false
);
if
(
pInfo
->
pRes
->
info
.
rows
==
0
)
{
continue
;
}
if
(
pInfo
->
pCreateTbRes
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
...
...
@@ -2168,13 +2178,8 @@ FETCH_NEXT_BLOCK:
return
pInfo
->
pCreateTbRes
;
}
// apply additional time window filter
doBlockDataWindowFilter
(
pBlock
,
pInfo
->
primaryTsIndex
,
&
pStreamInfo
->
fillHistoryWindow
,
id
);
pBlock
->
info
.
dataLoad
=
1
;
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primaryTsIndex
);
doCheckUpdate
(
pInfo
,
pBlockInfo
->
window
.
ekey
,
pBlock
);
doFilter
(
pBlock
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
doCheckUpdate
(
pInfo
,
pBlockInfo
->
window
.
ekey
,
pInfo
->
pRes
);
doFilter
(
pInfo
->
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
int64_t
numOfUpdateRes
=
pInfo
->
pUpdateDataRes
->
info
.
rows
;
qDebug
(
"%s %"
PRId64
" rows in datablock, update res:%"
PRId64
,
id
,
pBlockInfo
->
rows
,
numOfUpdateRes
);
...
...
@@ -2196,7 +2201,7 @@ FETCH_NEXT_BLOCK:
qDebug
(
"stream scan completed, and return source rows:%"
PRId64
", %s"
,
pBlockInfo
->
rows
,
id
);
if
(
pBlockInfo
->
rows
>
0
)
{
return
p
Block
;
return
p
Info
->
pRes
;
}
if
(
pInfo
->
pUpdateDataRes
->
info
.
rows
>
0
)
{
...
...
@@ -2587,7 +2592,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo
->
igCheckUpdate
=
pTableScanNode
->
igCheckUpdate
;
pInfo
->
igExpired
=
pTableScanNode
->
igExpired
;
pInfo
->
twAggSup
.
maxTs
=
INT64_MIN
;
pInfo
->
pState
=
NULL
;
pInfo
->
pState
=
pTaskInfo
->
streamInfo
.
pState
;
pInfo
->
stateStore
=
pTaskInfo
->
storageAPI
.
stateStore
;
pInfo
->
readerFn
=
pTaskInfo
->
storageAPI
.
tqReaderFn
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
a1203e40
...
...
@@ -3736,7 +3736,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
setSessionOutputBuf
(
pAggSup
,
pSeKeyBuf
[
i
].
win
.
skey
,
pSeKeyBuf
[
i
].
win
.
ekey
,
pSeKeyBuf
[
i
].
groupId
,
&
winInfo
);
int32_t
winNum
=
compactSessionWindow
(
pOperator
,
&
winInfo
,
pInfo
->
pStUpdated
,
pInfo
->
pStDeleted
,
true
);
if
(
winNum
>
0
)
{
saveSessionOutputBuf
(
pAggSup
,
&
winInfo
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveResult
(
winInfo
,
pInfo
->
pStUpdated
);
}
else
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
...
...
@@ -3747,9 +3746,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
getSessionHashKey
(
&
winInfo
.
sessionWin
,
&
key
);
tSimpleHashPut
(
pAggSup
->
pResultRows
,
&
key
,
sizeof
(
SSessionKey
),
&
winInfo
,
sizeof
(
SResultWindowInfo
));
}
}
else
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
winInfo
.
pOutputBuf
,
&
pAggSup
->
stateStore
);
}
saveSessionOutputBuf
(
pAggSup
,
&
winInfo
);
}
taosMemoryFree
(
pBuf
);
...
...
@@ -4398,7 +4396,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
setStateOutputBuf
(
pAggSup
,
pSeKeyBuf
[
i
].
win
.
skey
,
pSeKeyBuf
[
i
].
groupId
,
NULL
,
&
curInfo
,
&
nextInfo
);
if
(
compareStateKey
(
curInfo
.
pStateKey
,
nextInfo
.
pStateKey
))
{
compactStateWindow
(
pOperator
,
&
curInfo
.
winInfo
,
&
nextInfo
.
winInfo
,
pInfo
->
pSeUpdated
,
pInfo
->
pSeUpdated
);
saveSessionOutputBuf
(
pAggSup
,
&
curInfo
.
winInfo
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveResult
(
curInfo
.
winInfo
,
pInfo
->
pSeUpdated
);
}
else
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
...
...
@@ -4409,14 +4406,12 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
getSessionHashKey
(
&
curInfo
.
winInfo
.
sessionWin
,
&
key
);
tSimpleHashPut
(
pAggSup
->
pResultRows
,
&
key
,
sizeof
(
SSessionKey
),
&
curInfo
.
winInfo
,
sizeof
(
SResultWindowInfo
));
}
}
else
if
(
IS_VALID_SESSION_WIN
(
nextInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
if
(
IS_VALID_SESSION_WIN
(
curInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
curInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
if
(
IS_VALID_SESSION_WIN
(
nextInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
saveSessionOutputBuf
(
pAggSup
,
&
curInfo
.
winInfo
);
}
}
taosMemoryFree
(
pBuf
);
...
...
source/libs/parser/inc/parUtil.h
浏览文件 @
a1203e40
...
...
@@ -118,6 +118,12 @@ int32_t getDnodeListFromCache(SParseMetaCache* pMetaCache, SArray** pDnodes);
void
destoryParseMetaCache
(
SParseMetaCache
*
pMetaCache
,
bool
request
);
SNode
*
createSelectStmtImpl
(
bool
isDistinct
,
SNodeList
*
pProjectionList
,
SNode
*
pTable
);
/**
* @brief return a - b with overflow check
* @retval val range between [INT64_MIN, INT64_MAX]
*/
int64_t
int64SafeSub
(
int64_t
a
,
int64_t
b
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
a1203e40
...
...
@@ -3296,23 +3296,25 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode*
if
(
NULL
==
pInterval
)
{
return
TSDB_CODE_SUCCESS
;
}
int64_t
timeRange
=
TABS
(
pFill
->
timeRange
.
skey
-
pFill
->
timeRange
.
ekey
);
int64_t
timeRange
=
0
;
int64_t
intervalRange
=
0
;
if
(
IS_CALENDAR_TIME_DURATION
(
pInterval
->
unit
))
{
int64_t
f
=
1
;
if
(
pInterval
->
unit
==
'n'
)
{
f
=
30LL
*
MILLISECOND_PER_DAY
;
}
else
if
(
pInterval
->
unit
==
'y'
)
{
f
=
365LL
*
MILLISECOND_PER_DAY
;
}
intervalRange
=
pInterval
->
datum
.
i
*
f
;
}
else
{
intervalRange
=
pInterval
->
datum
.
i
;
}
if
((
timeRange
/
intervalRange
)
>=
MAX_INTERVAL_TIME_WINDOW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
);
if
(
!
pCxt
->
createStream
)
{
int64_t
res
=
int64SafeSub
(
pFill
->
timeRange
.
skey
,
pFill
->
timeRange
.
ekey
);
timeRange
=
res
<
0
?
res
==
INT64_MIN
?
INT64_MAX
:
-
res
:
res
;
if
(
IS_CALENDAR_TIME_DURATION
(
pInterval
->
unit
))
{
int64_t
f
=
1
;
if
(
pInterval
->
unit
==
'n'
)
{
f
=
30LL
*
MILLISECOND_PER_DAY
;
}
else
if
(
pInterval
->
unit
==
'y'
)
{
f
=
365LL
*
MILLISECOND_PER_DAY
;
}
intervalRange
=
pInterval
->
datum
.
i
*
f
;
}
else
{
intervalRange
=
pInterval
->
datum
.
i
;
}
if
((
timeRange
/
intervalRange
)
>=
MAX_INTERVAL_TIME_WINDOW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
);
}
}
return
TSDB_CODE_SUCCESS
;
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
a1203e40
...
...
@@ -1142,3 +1142,18 @@ void destoryParseMetaCache(SParseMetaCache* pMetaCache, bool request) {
taosHashCleanup
(
pMetaCache
->
pTableIndex
);
taosHashCleanup
(
pMetaCache
->
pTableCfg
);
}
int64_t
int64SafeSub
(
int64_t
a
,
int64_t
b
)
{
int64_t
res
=
(
uint64_t
)
a
-
(
uint64_t
)
b
;
if
(
a
>=
0
&&
b
<
0
)
{
if
((
uint64_t
)
res
>
(
uint64_t
)
INT64_MAX
)
{
// overflow
res
=
INT64_MAX
;
}
}
else
if
(
a
<
0
&&
b
>
0
&&
res
>=
0
)
{
// underflow
res
=
INT64_MIN
;
}
return
res
;
}
source/libs/stream/src/stream.c
浏览文件 @
a1203e40
...
...
@@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return
-
1
;
}
qDebug
(
"s-task:%s
data block enqueue, current(blocks:%d, size:%.2fMiB)
"
,
pTask
->
id
.
idStr
,
total
,
size
);
qDebug
(
"s-task:%s
blockdata enqueue, total in queue:%d, size:%.2fMiB
"
,
pTask
->
id
.
idStr
,
total
,
size
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
a1203e40
...
...
@@ -172,6 +172,12 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
bool
finished
=
false
;
while
(
1
)
{
if
(
streamTaskShouldPause
(
&
pTask
->
status
))
{
double
el
=
(
taosGetTimestampMs
()
-
pTask
->
tsInfo
.
step1Start
)
/
1000
.
0
;
qDebug
(
"s-task:%s paused from the scan-history task, elapsed time:%.2fsec"
,
pTask
->
id
.
idStr
,
el
);
return
0
;
}
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -404,6 +410,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState
(
pTask
);
streamTaskReloadState
(
pStreamTask
);
// clear the link between fill-history task and stream task info
pStreamTask
->
historyTaskId
.
taskId
=
0
;
streamTaskResumeFromHalt
(
pStreamTask
);
qDebug
(
"s-task:%s fill-history task set status to be dropping, save the state into disk"
,
pTask
->
id
.
idStr
);
...
...
@@ -414,6 +422,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// save to disk
taosWLockLatch
(
&
pMeta
->
lock
);
streamMetaSaveTask
(
pMeta
,
pStreamTask
);
if
(
streamMetaCommit
(
pMeta
)
<
0
)
{
// persist to disk
...
...
@@ -615,7 +624,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here
if
(
taosQueueEmpty
(
pTask
->
inputQueue
->
queue
))
{
// fill-history WAL scan has completed
if
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__SCAN_HISTORY_WAL
&&
pTask
->
status
.
transferState
==
true
)
{
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SOURCE
&&
pTask
->
status
.
transferState
==
true
)
{
streamTaskRecoverSetAllStepFinished
(
pTask
);
streamTaskEndScanWAL
(
pTask
);
}
else
{
...
...
source/libs/stream/src/streamRecover.c
浏览文件 @
a1203e40
...
...
@@ -85,6 +85,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
if
(
pTask
->
info
.
fillHistory
)
{
streamSetParamForScanHistory
(
pTask
);
}
streamTaskEnablePause
(
pTask
);
streamTaskScanHistoryPrepare
(
pTask
);
}
else
if
(
pTask
->
info
.
taskLevel
==
TASK_LEVEL__SINK
)
{
qDebug
(
"s-task:%s sink task do nothing to handle scan-history"
,
pTask
->
id
.
idStr
);
...
...
@@ -839,7 +840,7 @@ void streamTaskPause(SStreamTask* pTask) {
return
;
}
while
(
!
pTask
->
status
.
pauseAllowed
||
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__HALT
))
{
while
(
!
pTask
->
status
.
pauseAllowed
||
(
pTask
->
status
.
taskStatus
==
TASK_STATUS__HALT
))
{
status
=
pTask
->
status
.
taskStatus
;
if
(
status
==
TASK_STATUS__DROPPING
)
{
qDebug
(
"vgId:%d s-task:%s task already dropped, do nothing"
,
pMeta
->
vgId
,
pTask
->
id
.
idStr
);
...
...
@@ -856,8 +857,19 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep
(
100
);
}
// todo: use the lock of the task.
taosWLockLatch
(
&
pMeta
->
lock
);
status
=
pTask
->
status
.
taskStatus
;
if
(
status
==
TASK_STATUS__DROPPING
||
status
==
TASK_STATUS__STOP
)
{
taosWUnLockLatch
(
&
pMeta
->
lock
);
qDebug
(
"vgId:%d s-task:%s task already dropped/stopped/paused, do nothing"
,
pMeta
->
vgId
,
pTask
->
id
.
idStr
);
return
;
}
atomic_store_8
(
&
pTask
->
status
.
keepTaskStatus
,
pTask
->
status
.
taskStatus
);
atomic_store_8
(
&
pTask
->
status
.
taskStatus
,
TASK_STATUS__PAUSE
);
taosWUnLockLatch
(
&
pMeta
->
lock
);
int64_t
el
=
taosGetTimestampMs
()
-
st
;
qDebug
(
"vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms"
,
pMeta
->
vgId
,
pTask
->
id
.
idStr
,
...
...
source/libs/wal/src/walRead.c
浏览文件 @
a1203e40
...
...
@@ -371,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
pRead
->
pWal
->
vers
.
appliedVer
);
// TODO: valid ver
// if (ver > pRead->pWal->vers.applied
Ver) {
//
return -1;
//
}
if
(
ver
>
pRead
->
pWal
->
vers
.
commit
Ver
)
{
return
-
1
;
}
if
(
pRead
->
curVersion
!=
ver
)
{
code
=
walReaderSeekVer
(
pRead
,
ver
);
...
...
source/util/CMakeLists.txt
浏览文件 @
a1203e40
...
...
@@ -5,6 +5,13 @@ if (DEFINED GRANT_CFG_INCLUDE_DIR)
add_definitions
(
-DGRANTS_CFG
)
endif
()
IF
(
${
ASSERT_NOT_CORE
}
)
ADD_DEFINITIONS
(
-DASSERT_NOT_CORE
)
MESSAGE
(
STATUS
"disable assert core"
)
ELSE
()
MESSAGE
(
STATUS
"enable assert core"
)
ENDIF
(
${
ASSERT_NOT_CORE
}
)
target_include_directories
(
util
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/util"
...
...
source/util/src/terror.c
浏览文件 @
a1203e40
...
...
@@ -643,6 +643,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_CONSUMER_ERROR
,
"Consumer error, to see log"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE
,
"Topic num out of range"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE
,
"Group num out of range 100"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_SAME_COMMITTED_VALUE
,
"Same committed value"
)
// stream
TAOS_DEFINE_ERROR
(
TSDB_CODE_STREAM_TASK_NOT_EXIST
,
"Stream task not exist"
)
...
...
source/util/src/tlog.c
浏览文件 @
a1203e40
...
...
@@ -76,7 +76,11 @@ static int32_t tsDaylightActive; /* Currently in daylight saving time. */
bool
tsLogEmbedded
=
0
;
bool
tsAsyncLog
=
true
;
#ifdef ASSERT_NOT_CORE
bool
tsAssert
=
false
;
#else
bool
tsAssert
=
true
;
#endif
int32_t
tsNumOfLogLines
=
10000000
;
int32_t
tsLogKeepDays
=
0
;
LogFp
tsLogFp
=
NULL
;
...
...
tests/parallel_test/cases.task
浏览文件 @
a1203e40
...
...
@@ -105,7 +105,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
#
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
...
...
@@ -345,6 +345,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/smaTest.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sma_index.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml_TS-3724.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/spread.py
...
...
@@ -450,7 +451,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
#
,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
#,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
...
...
tests/system-test/1-insert/opentsdb_json_taosc_insert.py
浏览文件 @
a1203e40
...
...
@@ -24,6 +24,8 @@ import threading
import
json
class
TDTestCase
:
updatecfgDict
=
{
'clientCfg'
:
{
'smlDot2Underline'
:
0
}}
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
...
...
tests/system-test/1-insert/opentsdb_telnet_line_taosc_insert.py
浏览文件 @
a1203e40
...
...
@@ -28,6 +28,8 @@ if platform.system().lower() == 'windows':
sys
.
stdout
=
io
.
TextIOWrapper
(
sys
.
stdout
.
buffer
,
encoding
=
'utf8'
)
class
TDTestCase
:
updatecfgDict
=
{
'clientCfg'
:
{
'smlDot2Underline'
:
0
}}
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
...
...
tests/system-test/2-query/sml.py
浏览文件 @
a1203e40
...
...
@@ -15,7 +15,7 @@ sys.path.append("./7-tmq")
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'clientCfg'
:
{
'smlChildTableName'
:
'dataModelName'
,
'fqdn'
:
'localhost'
},
'fqdn'
:
'localhost'
}
updatecfgDict
=
{
'clientCfg'
:
{
'smlChildTableName'
:
'dataModelName'
,
'fqdn'
:
'localhost'
,
'smlDot2Underline'
:
0
},
'fqdn'
:
'localhost'
}
print
(
"===================: "
,
updatecfgDict
)
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
...
...
@@ -101,6 +101,15 @@ class TDTestCase:
tdSql
.
query
(
f
"desc
{
dbname
}
.macylr"
)
tdSql
.
checkRows
(
25
)
tdSql
.
query
(
f
"select * from ts3724.`.stb2`"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select * from ts3724.`stb.2`"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select * from ts3724.`stb2.`"
)
tdSql
.
checkRows
(
1
)
return
def
run
(
self
):
...
...
tests/system-test/2-query/sml_TS-3724.py
0 → 100644
浏览文件 @
a1203e40
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
updatecfgDict
=
{
'clientCfg'
:
{
'smlChildTableName'
:
'dataModelName'
,
'fqdn'
:
'localhost'
,
'smlTsDefaultName'
:
"times"
},
'fqdn'
:
'localhost'
}
print
(
"===================: "
,
updatecfgDict
)
def
init
(
self
,
conn
,
logSql
,
replicaVar
=
1
):
self
.
replicaVar
=
int
(
replicaVar
)
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
True
)
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
checkContent
(
self
,
dbname
=
"sml_db"
):
simClientCfg
=
"%s/taos.cfg"
%
tdDnodes
.
getSimCfgPath
()
buildPath
=
tdCom
.
getBuildPath
()
cmdStr
=
'%s/build/bin/sml_test %s'
%
(
buildPath
,
simClientCfg
)
print
(
"cmdStr:"
,
cmdStr
)
tdLog
.
info
(
cmdStr
)
ret
=
os
.
system
(
cmdStr
)
if
ret
!=
0
:
tdLog
.
info
(
"sml_test ret != 0"
)
tdSql
.
query
(
f
"select * from ts3303.stb2"
)
tdSql
.
query
(
f
"select * from ts3303.meters"
)
# tdSql.execute('use sml_db')
tdSql
.
query
(
f
"select * from
{
dbname
}
.t_b7d815c9222ca64cdf2614c61de8f211"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
'2016-01-01 08:00:07.000'
)
tdSql
.
checkData
(
0
,
1
,
2000
)
tdSql
.
checkData
(
0
,
2
,
200
)
tdSql
.
checkData
(
0
,
3
,
15
)
tdSql
.
checkData
(
0
,
4
,
24.5208
)
tdSql
.
checkData
(
0
,
5
,
28.09377
)
tdSql
.
checkData
(
0
,
6
,
428
)
tdSql
.
checkData
(
0
,
7
,
0
)
tdSql
.
checkData
(
0
,
8
,
304
)
tdSql
.
checkData
(
0
,
9
,
0
)
tdSql
.
checkData
(
0
,
10
,
25
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.readings"
)
tdSql
.
checkRows
(
9
)
tdSql
.
query
(
f
"select distinct tbname from
{
dbname
}
.readings"
)
tdSql
.
checkRows
(
4
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.t_0799064f5487946e5d22164a822acfc8 order by times"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
3
,
"kk"
)
tdSql
.
checkData
(
1
,
3
,
""
)
tdSql
.
query
(
f
"select distinct tbname from
{
dbname
}
.`sys_if_bytes_out`"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.t_fc70dec6677d4277c5d9799c4da806da order by times"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
1.300000000
)
tdSql
.
checkData
(
1
,
1
,
13.000000000
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.`sys_procs_running`"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
1
,
42.000000000
)
tdSql
.
checkData
(
0
,
2
,
"web01"
)
tdSql
.
query
(
f
"select distinct tbname from
{
dbname
}
.`sys_cpu_nice`"
)
tdSql
.
checkRows
(
3
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.`sys_cpu_nice` order by times"
)
tdSql
.
checkRows
(
4
)
tdSql
.
checkData
(
0
,
1
,
13.000000000
)
tdSql
.
checkData
(
0
,
2
,
"web01"
)
tdSql
.
checkData
(
0
,
3
,
None
)
tdSql
.
checkData
(
0
,
4
,
"lga"
)
tdSql
.
checkData
(
1
,
1
,
9.000000000
)
tdSql
.
checkData
(
1
,
2
,
"web02"
)
tdSql
.
checkData
(
3
,
3
,
"t1"
)
tdSql
.
checkData
(
0
,
4
,
"lga"
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.macylr"
)
tdSql
.
checkRows
(
2
)
tdSql
.
query
(
f
"select * from
{
dbname
}
.qelhxo"
)
tdSql
.
checkRows
(
5
)
tdSql
.
query
(
f
"desc
{
dbname
}
.macylr"
)
tdSql
.
checkRows
(
25
)
tdSql
.
query
(
f
"select * from ts3724._stb2"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select * from ts3724.stb_2"
)
tdSql
.
checkRows
(
1
)
tdSql
.
query
(
f
"select * from ts3724.stb2_"
)
tdSql
.
checkRows
(
1
)
return
def
run
(
self
):
tdSql
.
prepare
()
self
.
checkContent
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
浏览文件 @
a1203e40
...
...
@@ -222,9 +222,9 @@ class TDTestCase:
actConsumeTotalRows
=
resultList
[
0
]
if
not
(
actConsumeTotalRows
>
0
and
actConsumeTotalRows
<
totalRowsInserted
):
if
not
(
actConsumeTotalRows
>
=
0
and
actConsumeTotalRows
<=
totalRowsInserted
):
tdLog
.
info
(
"act consume rows: %d"
%
(
actConsumeTotalRows
))
tdLog
.
info
(
"and second consume rows should be between
0 and %d
"
%
(
totalRowsInserted
))
tdLog
.
info
(
"and second consume rows should be between
[0 and %d]
"
%
(
totalRowsInserted
))
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
time
.
sleep
(
10
)
...
...
utils/test/c/sml_test.c
浏览文件 @
a1203e40
...
...
@@ -1522,6 +1522,36 @@ int sml_ts2385_Test() {
return
code
;
}
int
sml_ts3724_Test
()
{
TAOS
*
taos
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
TAOS_RES
*
pRes
=
taos_query
(
taos
,
"drop database if exists ts3724"
);
taos_free_result
(
pRes
);
pRes
=
taos_query
(
taos
,
"create database if not exists ts3724"
);
taos_free_result
(
pRes
);
const
char
*
sql
[]
=
{
"stb.2,t1=1 f1=283i32 1632299372000"
,
".stb2,t1=1 f1=106i32 1632299378000"
,
"stb2.,t1=1 f1=106i32 1632299378000"
,
};
pRes
=
taos_query
(
taos
,
"use ts3724"
);
taos_free_result
(
pRes
);
pRes
=
taos_schemaless_insert
(
taos
,
(
char
**
)
sql
,
sizeof
(
sql
)
/
sizeof
(
sql
[
0
]),
TSDB_SML_LINE_PROTOCOL
,
TSDB_SML_TIMESTAMP_MILLI_SECONDS
);
int
code
=
taos_errno
(
pRes
);
printf
(
"%s result0:%s
\n
"
,
__FUNCTION__
,
taos_errstr
(
pRes
));
taos_free_result
(
pRes
);
taos_close
(
taos
);
return
code
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
==
2
)
{
taos_options
(
TSDB_OPTION_CONFIGDIR
,
argv
[
1
]);
...
...
@@ -1579,5 +1609,8 @@ int main(int argc, char *argv[]) {
ASSERT
(
!
ret
);
ret
=
sml_19221_Test
();
ASSERT
(
!
ret
);
ret
=
sml_ts3724_Test
();
ASSERT
(
!
ret
);
return
ret
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录