Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
88fd71bd
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
88fd71bd
编写于
7月 18, 2023
作者:
K
kailixu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into fix/TD-25236-3.0
上级
a154494f
2bb5ac94
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
103 addition
and
13 deletion
+103
-13
source/common/src/tglobal.c
source/common/src/tglobal.c
+0
-6
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+43
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-0
source/libs/command/src/command.c
source/libs/command/src/command.c
+25
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+32
-5
未找到文件。
source/common/src/tglobal.c
浏览文件 @
88fd71bd
...
...
@@ -1498,14 +1498,8 @@ void taosCfgDynamicOptions(const char *option, const char *value) {
if
(
strcasecmp
(
option
,
"keepTimeOffset"
)
==
0
)
{
int32_t
newKeepTimeOffset
=
atoi
(
value
);
if
(
newKeepTimeOffset
<
0
||
newKeepTimeOffset
>
23
)
{
uError
(
"failed to set keepTimeOffset from %d to %d. Valid range: [0, 23]"
,
tsKeepTimeOffset
,
newKeepTimeOffset
);
return
;
}
uInfo
(
"keepTimeOffset set from %d to %d"
,
tsKeepTimeOffset
,
newKeepTimeOffset
);
tsKeepTimeOffset
=
newKeepTimeOffset
;
return
;
}
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
88fd71bd
...
...
@@ -70,6 +70,8 @@ static void mndCancelGetNextConfig(SMnode *pMnode, void *pIter);
static
int32_t
mndRetrieveDnodes
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextDnode
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndMCfgGetValInt32
(
SMCfgDnodeReq
*
pInMCfgReq
,
int32_t
opLen
,
int32_t
*
pOutValue
);
int32_t
mndInitDnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
.
sdbType
=
SDB_DNODE
,
...
...
@@ -947,7 +949,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
goto
_OVER
;
}
mInfo
(
"dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s"
,
mInfo
(
"dnode:%d, start to drop, ep:%s:%d, force:%s, unsafe:%s"
,
dropReq
.
dnodeId
,
dropReq
.
fqdn
,
dropReq
.
port
,
dropReq
.
force
?
"true"
:
"false"
,
dropReq
.
unsafe
?
"true"
:
"false"
);
if
(
mndCheckOperPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_DROP_MNODE
)
!=
0
)
{
goto
_OVER
;
...
...
@@ -987,7 +989,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
int32_t
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
bool
isonline
=
mndIsDnodeOnline
(
pDnode
,
taosGetTimestampMs
());
if
(
isonline
&&
force
)
{
terrno
=
TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE
;
mError
(
"dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d"
,
pDnode
->
id
,
terrstr
(),
...
...
@@ -1060,6 +1062,20 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy
(
dcfgReq
.
config
,
"monitor"
);
snprintf
(
dcfgReq
.
value
,
TSDB_DNODE_VALUE_LEN
,
"%d"
,
flag
);
}
else
if
(
strncasecmp
(
cfgReq
.
config
,
"keeptimeoffset"
,
14
)
==
0
)
{
int32_t
optLen
=
strlen
(
"keeptimeoffset"
);
int32_t
flag
=
-
1
;
int32_t
code
=
mndMCfgGetValInt32
(
&
cfgReq
,
optLen
,
&
flag
);
if
(
code
<
0
)
return
code
;
if
(
flag
<
0
||
flag
>
23
)
{
mError
(
"dnode:%d, failed to config keepTimeOffset since value:%d. Valid range: [0, 23]"
,
cfgReq
.
dnodeId
,
flag
);
terrno
=
TSDB_CODE_INVALID_CFG
;
return
-
1
;
}
strcpy
(
dcfgReq
.
config
,
"keeptimeoffset"
);
snprintf
(
dcfgReq
.
value
,
TSDB_DNODE_VALUE_LEN
,
"%d"
,
flag
);
#ifdef TD_ENTERPRISE
}
else
if
(
strncasecmp
(
cfgReq
.
config
,
"activeCode"
,
10
)
==
0
||
strncasecmp
(
cfgReq
.
config
,
"cActiveCode"
,
11
)
==
0
)
{
int8_t
opt
=
strncasecmp
(
cfgReq
.
config
,
"a"
,
1
)
==
0
?
DND_ACTIVE_CODE
:
DND_CONN_ACTIVE_CODE
;
...
...
@@ -1292,3 +1308,28 @@ static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
// get int32_t value from 'SMCfgDnodeReq'
static
int32_t
mndMCfgGetValInt32
(
SMCfgDnodeReq
*
pMCfgReq
,
int32_t
opLen
,
int32_t
*
pOutValue
)
{
terrno
=
0
;
if
(
' '
!=
pMCfgReq
->
config
[
opLen
]
&&
0
!=
pMCfgReq
->
config
[
opLen
])
{
goto
_err
;
}
if
(
' '
==
pMCfgReq
->
config
[
opLen
])
{
// 'key value'
if
(
strlen
(
pMCfgReq
->
value
)
!=
0
)
goto
_err
;
*
pOutValue
=
atoi
(
pMCfgReq
->
config
+
opLen
+
1
);
}
else
{
// 'key' 'value'
if
(
strlen
(
pMCfgReq
->
value
)
==
0
)
goto
_err
;
*
pOutValue
=
atoi
(
pMCfgReq
->
value
);
}
return
0
;
_err:
mError
(
"dnode:%d, failed to config keeptimeoffset since invalid conf:%s"
,
pMCfgReq
->
dnodeId
,
pMCfgReq
->
config
);
terrno
=
TSDB_CODE_INVALID_CFG
;
return
-
1
;
}
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
88fd71bd
...
...
@@ -1738,6 +1738,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
SSchema
*
pSrcSchema
=
&
pStb
->
pColumns
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
@@ -1788,6 +1789,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchema
*
pSrcSchema
=
&
pStb
->
pColumns
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
@@ -1797,6 +1799,7 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
SSchema
*
pSrcSchema
=
&
pStb
->
pTags
[
i
];
memcpy
(
pSchema
->
name
,
pSrcSchema
->
name
,
TSDB_COL_NAME_LEN
);
pSchema
->
type
=
pSrcSchema
->
type
;
pSchema
->
flags
=
pSrcSchema
->
flags
;
pSchema
->
colId
=
pSrcSchema
->
colId
;
pSchema
->
bytes
=
pSrcSchema
->
bytes
;
}
...
...
source/libs/command/src/command.c
浏览文件 @
88fd71bd
...
...
@@ -615,6 +615,31 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg*
if
(
pCfg
->
ttl
>
0
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
" TTL %d"
,
pCfg
->
ttl
);
}
if
(
TSDB_SUPER_TABLE
==
pCfg
->
tableType
||
TSDB_NORMAL_TABLE
==
pCfg
->
tableType
)
{
int32_t
nSma
=
0
;
for
(
int32_t
i
=
0
;
i
<
pCfg
->
numOfColumns
;
++
i
)
{
if
(
IS_BSMA_ON
(
pCfg
->
pSchemas
+
i
))
{
++
nSma
;
}
}
if
(
nSma
<
pCfg
->
numOfColumns
)
{
bool
smaOn
=
false
;
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
" SMA("
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
numOfColumns
;
++
i
)
{
if
(
IS_BSMA_ON
(
pCfg
->
pSchemas
+
i
))
{
if
(
smaOn
)
{
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
",`%s`"
,
(
pCfg
->
pSchemas
+
i
)
->
name
);
}
else
{
smaOn
=
true
;
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
"`%s`"
,
(
pCfg
->
pSchemas
+
i
)
->
name
);
}
}
}
*
len
+=
sprintf
(
buf
+
VARSTR_HEADER_SIZE
+
*
len
,
")"
);
}
}
}
static
int32_t
setCreateTBResultIntoDataBlock
(
SSDataBlock
*
pBlock
,
SDbCfgInfo
*
pDbCfg
,
char
*
tbName
,
STableCfg
*
pCfg
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
88fd71bd
...
...
@@ -4036,7 +4036,7 @@ bool isEqualStateKey(SStateWindowInfo* pWin, char* pKeyData) {
bool
compareStateKey
(
void
*
data
,
void
*
key
)
{
if
(
!
data
||
!
key
)
{
return
tru
e
;
return
fals
e
;
}
SStateKeys
*
stateKey
=
(
SStateKeys
*
)
key
;
stateKey
->
pData
=
(
char
*
)
key
+
sizeof
(
SStateKeys
);
...
...
@@ -4062,7 +4062,13 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
if
(
code
==
TSDB_CODE_SUCCESS
&&
!
inWinRange
(
&
pAggSup
->
winRange
,
&
pCurWin
->
winInfo
.
sessionWin
.
win
))
{
code
=
TSDB_CODE_FAILED
;
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
pCurWin
->
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
pCurWin
->
winInfo
.
pOutputBuf
=
taosMemoryMalloc
(
size
);
pCurWin
->
winInfo
.
pOutputBuf
=
taosMemoryCalloc
(
1
,
size
);
pCurWin
->
pStateKey
=
(
SStateKeys
*
)((
char
*
)
pCurWin
->
winInfo
.
pOutputBuf
+
(
pAggSup
->
resultRowSize
-
pAggSup
->
stateKeySize
));
pCurWin
->
pStateKey
->
bytes
=
pAggSup
->
stateKeySize
-
sizeof
(
SStateKeys
);
pCurWin
->
pStateKey
->
type
=
pAggSup
->
stateKeyType
;
pCurWin
->
pStateKey
->
pData
=
(
char
*
)
pCurWin
->
pStateKey
+
sizeof
(
SStateKeys
);
pCurWin
->
pStateKey
->
isNull
=
false
;
}
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -4076,11 +4082,19 @@ void setStateOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
}
pNextWin
->
winInfo
.
sessionWin
=
pCurWin
->
winInfo
.
sessionWin
;
pNextWin
->
winInfo
.
pOutputBuf
=
NULL
;
SStreamStateCur
*
pCur
=
pAggSup
->
stateStore
.
streamStateSessionSeekKeyNext
(
pAggSup
->
pState
,
&
pCurWin
->
winInfo
.
sessionWin
)
;
code
=
pAggSup
->
stateStore
.
streamStateSessionGetKVByCur
(
pCur
,
&
pNextWin
->
winInfo
.
sessionWin
,
NULL
,
0
);
SStreamStateCur
*
pCur
=
pAggSup
->
stateStore
.
streamStateSessionSeekKeyNext
(
pAggSup
->
pState
,
&
pNextWin
->
winInfo
.
sessionWin
)
;
int32_t
nextSize
=
pAggSup
->
resultRowSize
;
code
=
pAggSup
->
stateStore
.
streamStateSessionGetKVByCur
(
pCur
,
&
pNextWin
->
winInfo
.
sessionWin
,
&
pNextWin
->
winInfo
.
pOutputBuf
,
&
nextSize
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SET_SESSION_WIN_INVALID
(
pNextWin
->
winInfo
);
}
else
{
pNextWin
->
pStateKey
=
(
SStateKeys
*
)((
char
*
)
pNextWin
->
winInfo
.
pOutputBuf
+
(
pAggSup
->
resultRowSize
-
pAggSup
->
stateKeySize
));
pNextWin
->
pStateKey
->
bytes
=
pAggSup
->
stateKeySize
-
sizeof
(
SStateKeys
);
pNextWin
->
pStateKey
->
type
=
pAggSup
->
stateKeyType
;
pNextWin
->
pStateKey
->
pData
=
(
char
*
)
pNextWin
->
pStateKey
+
sizeof
(
SStateKeys
);
pNextWin
->
pStateKey
->
isNull
=
false
;
pNextWin
->
winInfo
.
isOutput
=
true
;
}
pAggSup
->
stateStore
.
streamStateFreeCur
(
pCur
);
}
...
...
@@ -4156,6 +4170,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
SStateWindowInfo
curWin
=
{
0
};
SStateWindowInfo
nextWin
=
{
0
};
setStateOutputBuf
(
pAggSup
,
tsCols
[
i
],
groupId
,
pKeyData
,
&
curWin
,
&
nextWin
);
if
(
IS_VALID_SESSION_WIN
(
nextWin
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextWin
.
winInfo
.
pOutputBuf
,
&
pAPI
->
stateStore
);
}
setSessionWinOutputInfo
(
pSeUpdated
,
&
curWin
.
winInfo
);
winRows
=
updateStateWindowInfo
(
&
curWin
,
&
nextWin
,
tsCols
,
groupId
,
pKeyColInfo
,
rows
,
i
,
&
allEqual
,
pAggSup
->
pResultRows
,
pSeUpdated
,
pStDeleted
);
...
...
@@ -4346,9 +4363,19 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
for
(
int32_t
i
=
0
;
i
<
num
;
i
++
)
{
SStateWindowInfo
curInfo
=
{
0
};
SStateWindowInfo
nextInfo
=
{
0
};
SStateWindowInfo
dummy
=
{
0
};
setStateOutputBuf
(
pAggSup
,
pSeKeyBuf
[
i
].
win
.
skey
,
pSeKeyBuf
[
i
].
groupId
,
NULL
,
&
curInfo
,
&
nextInfo
);
if
(
compareStateKey
(
curInfo
.
pStateKey
,
nextInfo
.
pStateKey
))
{
compactStateWindow
(
pOperator
,
&
curInfo
.
winInfo
,
&
nextInfo
.
winInfo
,
pInfo
->
pStUpdated
,
pInfo
->
pStDeleted
);
saveResult
(
curInfo
.
winInfo
,
pInfo
->
pStUpdated
);
}
if
(
IS_VALID_SESSION_WIN
(
curInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
curInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
if
(
IS_VALID_SESSION_WIN
(
nextInfo
.
winInfo
))
{
releaseOutputBuf
(
pAggSup
->
pState
,
NULL
,
(
SResultRow
*
)
nextInfo
.
winInfo
.
pOutputBuf
,
&
pAggSup
->
pSessionAPI
->
stateStore
);
}
}
taosMemoryFree
(
pBuf
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录