Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9f4744c6
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9f4744c6
编写于
4月 08, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
b0184c20
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
322 addition
and
7 deletion
+322
-7
include/common/tmsg.h
include/common/tmsg.h
+37
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/libs/catalog/catalog.h
include/libs/catalog/catalog.h
+4
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+92
-0
source/dnode/mgmt/mm/mmHandle.c
source/dnode/mgmt/mm/mmHandle.c
+1
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+90
-0
source/dnode/mnode/impl/src/mndInfoSchema.c
source/dnode/mnode/impl/src/mndInfoSchema.c
+5
-1
source/libs/catalog/src/catalog.c
source/libs/catalog/src/catalog.c
+47
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+2
-0
source/libs/qcom/src/querymsg.c
source/libs/qcom/src/querymsg.c
+38
-2
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+3
-3
未找到文件。
include/common/tmsg.h
浏览文件 @
9f4744c6
...
...
@@ -512,6 +512,7 @@ typedef struct {
int32_t
maxRows
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int32_t
ttl
;
int8_t
walLevel
;
int8_t
precision
;
// time resolution
int8_t
compression
;
...
...
@@ -521,6 +522,7 @@ typedef struct {
int8_t
cacheLastRow
;
int8_t
ignoreExist
;
int8_t
streamMode
;
int8_t
singleSTable
;
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
// SRetention
}
SCreateDbReq
;
...
...
@@ -586,6 +588,41 @@ int32_t tSerializeSUseDbRspImp(SCoder* pEncoder, const SUseDbRsp* pRsp);
int32_t
tDeserializeSUseDbRspImp
(
SCoder
*
pDecoder
,
SUseDbRsp
*
pRsp
);
void
tFreeSUsedbRsp
(
SUseDbRsp
*
pRsp
);
typedef
struct
{
char
db
[
TSDB_DB_FNAME_LEN
];
}
SDbCfgReq
;
int32_t
tSerializeSDbCfgReq
(
void
*
buf
,
int32_t
bufLen
,
SDbCfgReq
*
pReq
);
int32_t
tDeserializeSDbCfgReq
(
void
*
buf
,
int32_t
bufLen
,
SDbCfgReq
*
pReq
);
typedef
struct
{
int32_t
numOfVgroups
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRows
;
int32_t
maxRows
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int32_t
ttl
;
int8_t
walLevel
;
int8_t
precision
;
int8_t
compression
;
int8_t
replications
;
int8_t
quorum
;
int8_t
update
;
int8_t
cacheLastRow
;
int8_t
streamMode
;
int8_t
singleSTable
;
}
SDbCfgRsp
;
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
);
int32_t
tDeserializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
SDbCfgRsp
*
pRsp
);
typedef
struct
{
int32_t
rowNum
;
}
SQnodeListReq
;
...
...
include/common/tmsgdef.h
浏览文件 @
9f4744c6
...
...
@@ -155,6 +155,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_STREAM
,
"mnode-create-stream"
,
SCMCreateStreamReq
,
SCMCreateStreamRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_STREAM
,
"mnode-alter-stream"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_STREAM
,
"mnode-drop-stream"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_DB_CFG
,
"mnode-get-db-cfg"
,
NULL
,
NULL
)
// Requests handled by VNODE
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
...
...
include/libs/catalog/catalog.h
浏览文件 @
9f4744c6
...
...
@@ -77,6 +77,8 @@ typedef struct SDbVgVersion {
int32_t
numOfTable
;
// unit is TSDB_TABLE_NUM_UNIT
}
SDbVgVersion
;
typedef
SDbCfgRsp
SDbCfgInfo
;
int32_t
catalogInit
(
SCatalogCfg
*
cfg
);
/**
...
...
@@ -217,6 +219,8 @@ int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stable
int32_t
catalogGetExpiredDBs
(
SCatalog
*
pCatalog
,
SDbVgVersion
**
dbs
,
uint32_t
*
num
);
int32_t
catalogGetDBCfg
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
,
SDbCfgInfo
*
pDbCfg
);
/**
* Destroy catalog and relase all resources
...
...
source/common/src/tmsg.c
浏览文件 @
9f4744c6
...
...
@@ -1515,6 +1515,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if
(
tEncodeI32
(
&
encoder
,
pReq
->
maxRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
commitTime
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
ttl
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
walLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
precision
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
compression
)
<
0
)
return
-
1
;
...
...
@@ -1524,6 +1525,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if
(
tEncodeI8
(
&
encoder
,
pReq
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
ignoreExist
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
streamMode
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
singleSTable
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
numOfRetensions
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numOfRetensions
;
++
i
)
{
SRetention
*
pRetension
=
taosArrayGet
(
pReq
->
pRetensions
,
i
);
...
...
@@ -1556,6 +1558,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
maxRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
commitTime
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
ttl
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
walLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
precision
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
compression
)
<
0
)
return
-
1
;
...
...
@@ -1565,6 +1568,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
ignoreExist
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
streamMode
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
singleSTable
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
numOfRetensions
)
<
0
)
return
-
1
;
pReq
->
pRetensions
=
taosArrayInit
(
pReq
->
numOfRetensions
,
sizeof
(
SRetention
));
if
(
pReq
->
pRetensions
==
NULL
)
{
...
...
@@ -1942,6 +1946,94 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
taosArrayDestroy
(
pRsp
->
pArray
);
}
int32_t
tSerializeSDbCfgReq
(
void
*
buf
,
int32_t
bufLen
,
SDbCfgReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
&
encoder
,
pReq
->
db
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSDbCfgReq
(
void
*
buf
,
int32_t
bufLen
,
SDbCfgReq
*
pReq
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
&
decoder
,
pReq
->
db
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
numOfVgroups
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
cacheBlockSize
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
totalBlocks
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
daysPerFile
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
daysToKeep0
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
daysToKeep1
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
daysToKeep2
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
minRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
maxRows
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
commitTime
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
walLevel
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
precision
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
compression
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
replications
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
quorum
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
update
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pRsp
->
streamMode
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
SDbCfgRsp
*
pRsp
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
numOfVgroups
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
cacheBlockSize
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
totalBlocks
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
daysPerFile
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
daysToKeep0
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
daysToKeep1
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
daysToKeep2
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
minRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
maxRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
commitTime
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
fsyncPeriod
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
walLevel
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
precision
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
compression
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
replications
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
quorum
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
update
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
cacheLastRow
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pRsp
->
streamMode
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSShowReq
(
void
*
buf
,
int32_t
bufLen
,
SShowReq
*
pReq
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
...
...
source/dnode/mgmt/mm/mmHandle.c
浏览文件 @
9f4744c6
...
...
@@ -146,6 +146,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_GET_SUB_EP
,
mmProcessReadMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_STREAM
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dndSetMsgHandle
(
pWrapper
,
TDMT_MND_GET_DB_CFG
,
mmProcessReadMsg
,
DEFAULT_HANDLE
);
// Requests handled by VNODE
dndSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CONN_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
9f4744c6
...
...
@@ -260,6 +260,7 @@ typedef struct {
int32_t
maxRows
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int32_t
ttl
;
int8_t
walLevel
;
int8_t
precision
;
int8_t
compression
;
...
...
@@ -268,6 +269,7 @@ typedef struct {
int8_t
update
;
int8_t
cacheLastRow
;
int8_t
streamMode
;
int8_t
singleSTable
;
int32_t
numOfRetensions
;
SArray
*
pRetensions
;
}
SDbCfg
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
9f4744c6
...
...
@@ -40,6 +40,7 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
static
int32_t
mndGetDbMeta
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveDbs
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextDb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessGetDbCfgReq
(
SNodeMsg
*
pReq
);
int32_t
mndInitDb
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_DB
,
...
...
@@ -56,6 +57,7 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_USE_DB
,
mndProcessUseDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SYNC_DB
,
mndProcessSyncDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_COMPACT_DB
,
mndProcessCompactDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_DB_CFG
,
mndProcessGetDbCfgReq
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndGetDbMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndRetrieveDbs
);
...
...
@@ -268,6 +270,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if
(
pCfg
->
minRows
>
pCfg
->
maxRows
)
return
-
1
;
if
(
pCfg
->
commitTime
<
TSDB_MIN_COMMIT_TIME
||
pCfg
->
commitTime
>
TSDB_MAX_COMMIT_TIME
)
return
-
1
;
if
(
pCfg
->
fsyncPeriod
<
TSDB_MIN_FSYNC_PERIOD
||
pCfg
->
fsyncPeriod
>
TSDB_MAX_FSYNC_PERIOD
)
return
-
1
;
if
(
pCfg
->
ttl
<
TSDB_MIN_DB_TTL_OPTION
&&
pCfg
->
ttl
!=
TSDB_DEFAULT_DB_TTL_OPTION
)
return
-
1
;
if
(
pCfg
->
walLevel
<
TSDB_MIN_WAL_LEVEL
||
pCfg
->
walLevel
>
TSDB_MAX_WAL_LEVEL
)
return
-
1
;
if
(
pCfg
->
precision
<
TSDB_MIN_PRECISION
&&
pCfg
->
precision
>
TSDB_MAX_PRECISION
)
return
-
1
;
if
(
pCfg
->
compression
<
TSDB_MIN_COMP_LEVEL
||
pCfg
->
compression
>
TSDB_MAX_COMP_LEVEL
)
return
-
1
;
...
...
@@ -278,6 +281,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if
(
pCfg
->
update
<
TSDB_MIN_DB_UPDATE
||
pCfg
->
update
>
TSDB_MAX_DB_UPDATE
)
return
-
1
;
if
(
pCfg
->
cacheLastRow
<
TSDB_MIN_DB_CACHE_LAST_ROW
||
pCfg
->
cacheLastRow
>
TSDB_MAX_DB_CACHE_LAST_ROW
)
return
-
1
;
if
(
pCfg
->
streamMode
<
TSDB_MIN_DB_STREAM_MODE
||
pCfg
->
streamMode
>
TSDB_MAX_DB_STREAM_MODE
)
return
-
1
;
if
(
pCfg
->
singleSTable
<
TSDB_MIN_DB_SINGLE_STABLE_OPTION
||
pCfg
->
streamMode
>
TSDB_MAX_DB_SINGLE_STABLE_OPTION
)
return
-
1
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -293,6 +297,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
maxRows
<
0
)
pCfg
->
maxRows
=
TSDB_DEFAULT_MAX_ROW_FBLOCK
;
if
(
pCfg
->
commitTime
<
0
)
pCfg
->
commitTime
=
TSDB_DEFAULT_COMMIT_TIME
;
if
(
pCfg
->
fsyncPeriod
<
0
)
pCfg
->
fsyncPeriod
=
TSDB_DEFAULT_FSYNC_PERIOD
;
if
(
pCfg
->
ttl
<
0
)
pCfg
->
ttl
=
TSDB_DEFAULT_DB_TTL_OPTION
;
if
(
pCfg
->
walLevel
<
0
)
pCfg
->
walLevel
=
TSDB_DEFAULT_WAL_LEVEL
;
if
(
pCfg
->
precision
<
0
)
pCfg
->
precision
=
TSDB_DEFAULT_PRECISION
;
if
(
pCfg
->
compression
<
0
)
pCfg
->
compression
=
TSDB_DEFAULT_COMP_LEVEL
;
...
...
@@ -301,6 +306,7 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if
(
pCfg
->
update
<
0
)
pCfg
->
update
=
TSDB_DEFAULT_DB_UPDATE_OPTION
;
if
(
pCfg
->
cacheLastRow
<
0
)
pCfg
->
cacheLastRow
=
TSDB_DEFAULT_CACHE_LAST_ROW
;
if
(
pCfg
->
streamMode
<
0
)
pCfg
->
streamMode
=
TSDB_DEFAULT_DB_STREAM_MODE
;
if
(
pCfg
->
singleSTable
<
0
)
pCfg
->
singleSTable
=
TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION
;
if
(
pCfg
->
numOfRetensions
<
0
)
pCfg
->
numOfRetensions
=
0
;
}
...
...
@@ -437,6 +443,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
.
maxRows
=
pCreate
->
maxRows
,
.
commitTime
=
pCreate
->
commitTime
,
.
fsyncPeriod
=
pCreate
->
fsyncPeriod
,
.
ttl
=
pCreate
->
ttl
,
.
walLevel
=
pCreate
->
walLevel
,
.
precision
=
pCreate
->
precision
,
.
compression
=
pCreate
->
compression
,
...
...
@@ -445,6 +452,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SNodeMsg *pReq, SCreateDbReq *pCreate
.
update
=
pCreate
->
update
,
.
cacheLastRow
=
pCreate
->
cacheLastRow
,
.
streamMode
=
pCreate
->
streamMode
,
.
singleSTable
=
pCreate
->
singleSTable
,
};
dbObj
.
cfg
.
numOfRetensions
=
pCreate
->
numOfRetensions
;
...
...
@@ -730,6 +738,71 @@ ALTER_DB_OVER:
return
code
;
}
static
int32_t
mndProcessGetDbCfgReq
(
SNodeMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
int32_t
code
=
-
1
;
SDbObj
*
pDb
=
NULL
;
SDbCfgReq
cfgReq
=
{
0
};
SDbCfgRsp
cfgRsp
=
{
0
};
if
(
tDeserializeSDbCfgReq
(
pReq
->
rpcMsg
.
pCont
,
pReq
->
rpcMsg
.
contLen
,
&
cfgReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
GET_DB_CFG_OVER
;
}
pDb
=
mndAcquireDb
(
pMnode
,
cfgReq
.
db
);
if
(
pDb
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DB_NOT_EXIST
;
goto
GET_DB_CFG_OVER
;
}
cfgRsp
.
numOfVgroups
=
pDb
->
cfg
.
numOfVgroups
;
cfgRsp
.
cacheBlockSize
=
pDb
->
cfg
.
cacheBlockSize
;
cfgRsp
.
totalBlocks
=
pDb
->
cfg
.
totalBlocks
;
cfgRsp
.
daysPerFile
=
pDb
->
cfg
.
daysPerFile
;
cfgRsp
.
daysToKeep0
=
pDb
->
cfg
.
daysToKeep0
;
cfgRsp
.
daysToKeep1
=
pDb
->
cfg
.
daysToKeep1
;
cfgRsp
.
daysToKeep2
=
pDb
->
cfg
.
daysToKeep2
;
cfgRsp
.
minRows
=
pDb
->
cfg
.
minRows
;
cfgRsp
.
maxRows
=
pDb
->
cfg
.
maxRows
;
cfgRsp
.
commitTime
=
pDb
->
cfg
.
commitTime
;
cfgRsp
.
fsyncPeriod
=
pDb
->
cfg
.
fsyncPeriod
;
cfgRsp
.
ttl
=
pDb
->
cfg
.
ttl
;
cfgRsp
.
walLevel
=
pDb
->
cfg
.
walLevel
;
cfgRsp
.
precision
=
pDb
->
cfg
.
precision
;
cfgRsp
.
compression
=
pDb
->
cfg
.
compression
;
cfgRsp
.
replications
=
pDb
->
cfg
.
replications
;
cfgRsp
.
quorum
=
pDb
->
cfg
.
quorum
;
cfgRsp
.
update
=
pDb
->
cfg
.
update
;
cfgRsp
.
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
cfgRsp
.
streamMode
=
pDb
->
cfg
.
streamMode
;
cfgRsp
.
singleSTable
=
pDb
->
cfg
.
singleSTable
;
int32_t
contLen
=
tSerializeSDbCfgRsp
(
NULL
,
0
,
&
cfgRsp
);
void
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
GET_DB_CFG_OVER
;
}
tSerializeSDbCfgRsp
(
pRsp
,
contLen
,
&
cfgRsp
);
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
contLen
;
GET_DB_CFG_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_MND_ACTION_IN_PROGRESS
)
{
mError
(
"db:%s, failed to get cfg since %s"
,
cfgReq
.
db
,
terrstr
());
}
mndReleaseDb
(
pMnode
,
pDb
);
return
code
;
}
static
int32_t
mndSetDropDbRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
)
{
SSdbRaw
*
pRedoRaw
=
mndDbActionEncode
(
pDb
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
...
...
@@ -1509,6 +1582,23 @@ static void dumpDbInfoToPayload(char *data, SDbObj *pDb, SShowObj *pShow, int32_
STR_WITH_SIZE_TO_VARSTR
(
pWrite
,
prec
,
2
);
cols
++
;
pWrite
=
getDataPosition
(
data
,
pShow
,
cols
,
rows
,
rowCapacity
);
*
(
int32_t
*
)
pWrite
=
pDb
->
cfg
.
ttl
;
cols
++
;
pWrite
=
getDataPosition
(
data
,
pShow
,
cols
,
rows
,
rowCapacity
);
*
(
int8_t
*
)
pWrite
=
pDb
->
cfg
.
singleSTable
;
cols
++
;
pWrite
=
getDataPosition
(
data
,
pShow
,
cols
,
rows
,
rowCapacity
);
*
(
int8_t
*
)
pWrite
=
pDb
->
cfg
.
streamMode
;
cols
++
;
pWrite
=
getDataPosition
(
data
,
pShow
,
cols
,
rows
,
rowCapacity
);
char
*
status
=
"ready"
;
STR_WITH_SIZE_TO_VARSTR
(
pWrite
,
status
,
strlen
(
status
));
cols
++
;
// pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity);
// *(int8_t *)pWrite = pDb->cfg.update;
}
...
...
source/dnode/mnode/impl/src/mndInfoSchema.c
浏览文件 @
9f4744c6
...
...
@@ -63,7 +63,11 @@ static const SInfosTableSchema userDBSchema[] = {
{.
name
=
"fsync"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"comp"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
{.
name
=
"cachelast"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
{.
name
=
"precision"
,
.
bytes
=
3
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"precision"
,
.
bytes
=
2
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"ttl"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"single_stable"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
{.
name
=
"stream_mode"
,
.
bytes
=
1
,
.
type
=
TSDB_DATA_TYPE_TINYINT
},
{.
name
=
"status"
,
.
bytes
=
10
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
// {.name = "update", .bytes = 1, .type =
// TSDB_DATA_TYPE_TINYINT}, // disable update
};
...
...
source/libs/catalog/src/catalog.c
浏览文件 @
9f4744c6
...
...
@@ -569,6 +569,44 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgGetDBCfgFromMnode
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
,
SDbCfgInfo
*
out
)
{
char
*
msg
=
NULL
;
int32_t
msgLen
=
0
;
ctgDebug
(
"try to get db cfg from mnode, dbFName:%s"
,
dbFName
);
int32_t
code
=
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]((
void
*
)
dbFName
,
&
msg
,
0
,
&
msgLen
);
if
(
code
)
{
ctgError
(
"Build get db cfg msg failed, code:%x, db:%s"
,
code
,
dbFName
);
CTG_ERR_RET
(
code
);
}
SRpcMsg
rpcMsg
=
{
.
msgType
=
TDMT_MND_GET_DB_CFG
,
.
pCont
=
msg
,
.
contLen
=
msgLen
,
};
SRpcMsg
rpcRsp
=
{
0
};
rpcSendRecv
(
pRpc
,
(
SEpSet
*
)
pMgmtEps
,
&
rpcMsg
,
&
rpcRsp
);
if
(
TSDB_CODE_SUCCESS
!=
rpcRsp
.
code
)
{
ctgError
(
"error rsp for get db cfg, error:%s, db:%s"
,
tstrerror
(
rpcRsp
.
code
),
dbFName
);
CTG_ERR_RET
(
rpcRsp
.
code
);
}
code
=
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)](
out
,
rpcRsp
.
pCont
,
rpcRsp
.
contLen
);
if
(
code
)
{
ctgError
(
"Process get db cfg rsp failed, code:%x, db:%s"
,
code
,
dbFName
);
CTG_ERR_RET
(
code
);
}
ctgDebug
(
"Got db cfg from mnode, dbFName:%s"
,
dbFName
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
ctgIsTableMetaExistInCache
(
SCatalog
*
pCtg
,
char
*
dbFName
,
char
*
tbName
,
int32_t
*
exist
)
{
if
(
NULL
==
pCtg
->
dbCache
)
{
*
exist
=
0
;
...
...
@@ -2137,7 +2175,6 @@ _return:
CTG_RET
(
code
);
}
int32_t
catalogInit
(
SCatalogCfg
*
cfg
)
{
if
(
gCtgMgmt
.
pCluster
)
{
qError
(
"catalog already initialized"
);
...
...
@@ -2717,6 +2754,15 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num)
CTG_API_LEAVE
(
ctgMetaRentGet
(
&
pCtg
->
dbRent
,
(
void
**
)
dbs
,
num
,
sizeof
(
SDbVgVersion
)));
}
int32_t
catalogGetDBCfg
(
SCatalog
*
pCtg
,
void
*
pRpc
,
const
SEpSet
*
pMgmtEps
,
const
char
*
dbFName
,
SDbCfgInfo
*
pDbCfg
)
{
CTG_API_ENTER
();
if
(
NULL
==
pCtg
||
NULL
==
pRpc
||
NULL
==
pMgmtEps
||
NULL
==
dbFName
||
NULL
==
pDbCfg
)
{
CTG_API_LEAVE
(
TSDB_CODE_CTG_INVALID_INPUT
);
}
CTG_API_LEAVE
(
ctgGetDBCfgFromMnode
(
pCtg
,
pRpc
,
pMgmtEps
,
dbFName
,
pDbCfg
));
}
void
catalogDestroy
(
void
)
{
qInfo
(
"start to destroy catalog"
);
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
9f4744c6
...
...
@@ -984,6 +984,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS
pReq
->
cacheLastRow
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pCachelast
,
TSDB_DEFAULT_CACHE_LAST_ROW
);
pReq
->
ignoreExist
=
pStmt
->
ignoreExists
;
pReq
->
streamMode
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pStreamMode
,
TSDB_DEFAULT_DB_STREAM_MODE_OPTION
);
pReq
->
ttl
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pTtl
,
TSDB_DEFAULT_DB_TTL_OPTION
);
pReq
->
singleSTable
=
GET_OPTION_VAL
(
pStmt
->
pOptions
->
pSingleStable
,
TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION
);
return
buildCreateDbRetentions
(
pStmt
->
pOptions
->
pRetentions
,
pReq
);
}
...
...
source/libs/qcom/src/querymsg.c
浏览文件 @
9f4744c6
...
...
@@ -121,6 +121,23 @@ int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryBuildGetDBCfgMsg
(
void
*
input
,
char
**
msg
,
int32_t
msgSize
,
int32_t
*
msgLen
)
{
if
(
NULL
==
msg
||
NULL
==
msgLen
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
SDbCfgReq
dbCfgReq
=
{
0
};
strcpy
(
dbCfgReq
.
db
,
input
);
int32_t
bufLen
=
tSerializeSDbCfgReq
(
NULL
,
0
,
&
dbCfgReq
);
void
*
pBuf
=
rpcMallocCont
(
bufLen
);
tSerializeSDbCfgReq
(
pBuf
,
bufLen
,
&
dbCfgReq
);
*
msg
=
pBuf
;
*
msgLen
=
bufLen
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
queryProcessUseDBRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SUseDbOutput
*
pOut
=
output
;
...
...
@@ -309,17 +326,36 @@ PROCESS_QLIST_OVER:
return
code
;
}
int32_t
queryProcessGetDbCfgRsp
(
void
*
output
,
char
*
msg
,
int32_t
msgSize
)
{
SDbCfgRsp
out
=
{
0
};
if
(
NULL
==
output
||
NULL
==
msg
||
msgSize
<=
0
)
{
return
TSDB_CODE_TSC_INVALID_INPUT
;
}
if
(
tDeserializeSDbCfgRsp
(
msg
,
msgSize
,
&
out
)
!=
0
)
{
qError
(
"tDeserializeSDbCfgRsp failed, msgSize:%d"
,
msgSize
);
return
TSDB_CODE_INVALID_MSG
;
}
memcpy
(
output
,
&
out
,
sizeof
(
out
));
return
TSDB_CODE_SUCCESS
;
}
void
initQueryModuleMsgHandle
()
{
queryBuildMsg
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryBuildTableMetaReqMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryBuildUseDbMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryBuildUseDbMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_QNODE_LIST
)]
=
queryBuildQnodeListMsg
;
queryBuildMsg
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]
=
queryBuildGetDBCfgMsg
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_VND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
queryProcessTableMetaRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryProcessUseDBRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
queryProcessUseDBRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_QNODE_LIST
)]
=
queryProcessQnodeListRsp
;
queryProcessMsgRsp
[
TMSG_INDEX
(
TDMT_MND_GET_DB_CFG
)]
=
queryProcessGetDbCfgRsp
;
}
#pragma GCC diagnostic pop
source/libs/scheduler/src/scheduler.c
浏览文件 @
9f4744c6
...
...
@@ -1251,8 +1251,8 @@ int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **
return
TSDB_CODE_SUCCESS
;
}
int32_t
schUpdateTaskExecNodeHandle
(
SSchTask
*
pTask
,
void
*
handle
)
{
if
(
NULL
==
pTask
->
execNodes
||
taosArrayGetSize
(
pTask
->
execNodes
)
>
1
||
taosArrayGetSize
(
pTask
->
execNodes
)
<=
0
)
{
int32_t
schUpdateTaskExecNodeHandle
(
SSchTask
*
pTask
,
void
*
handle
,
int32_t
rspCode
)
{
if
(
rspCode
||
NULL
==
pTask
->
execNodes
||
taosArrayGetSize
(
pTask
->
execNodes
)
>
1
||
taosArrayGetSize
(
pTask
->
execNodes
)
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1293,7 +1293,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
SCH_TASK_DLOG
(
"rsp msg received, type:%s, handle:%p, code:%s"
,
TMSG_INFO
(
msgType
),
pMsg
->
handle
,
tstrerror
(
rspCode
));
SCH_SET_TASK_HANDLE
(
pTask
,
pMsg
->
handle
);
schUpdateTaskExecNodeHandle
(
pTask
,
pMsg
->
handle
);
schUpdateTaskExecNodeHandle
(
pTask
,
pMsg
->
handle
,
rspCode
);
SCH_ERR_JRET
(
schHandleResponseMsg
(
pJob
,
pTask
,
msgType
,
pMsg
->
pData
,
pMsg
->
len
,
rspCode
));
_return:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录