Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
28fa84b1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
28fa84b1
编写于
7月 11, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of
https://github.com/taosdata/TDengine
into feat/row_refact
上级
85b4896f
e41169ea
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
598 addition
and
130 deletion
+598
-130
include/common/tmsg.h
include/common/tmsg.h
+14
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+4
-3
include/util/tRealloc.h
include/util/tRealloc.h
+4
-3
source/common/src/tmsg.c
source/common/src/tmsg.c
+50
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+1
-0
source/dnode/mnode/impl/inc/mndPrivilege.h
source/dnode/mnode/impl/inc/mndPrivilege.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+75
-2
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+1
-2
source/dnode/mnode/impl/src/mndPrivilege.c
source/dnode/mnode/impl/src/mndPrivilege.c
+2
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+10
-10
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+1
-1
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+35
-33
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-0
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+8
-0
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+111
-0
source/dnode/vnode/src/tsdb/tsdbRetention.c
source/dnode/vnode/src/tsdb/tsdbRetention.c
+67
-5
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+115
-46
source/dnode/vnode/src/tsdb/tsdbUtil.c
source/dnode/vnode/src/tsdb/tsdbUtil.c
+31
-11
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+34
-5
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+6
-0
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+24
-6
tests/script/tsim/valgrind/checkError1.sim
tests/script/tsim/valgrind/checkError1.sim
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
28fa84b1
...
...
@@ -822,6 +822,20 @@ typedef struct {
int32_t
tSerializeSTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
STrimDbReq
*
pReq
);
int32_t
tDeserializeSTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
STrimDbReq
*
pReq
);
typedef
struct
{
int32_t
timestamp
;
}
SVTrimDbReq
;
int32_t
tSerializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
);
int32_t
tDeserializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
);
typedef
struct
{
int32_t
timestamp
;
}
SVDropTtlTableReq
;
int32_t
tSerializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
);
int32_t
tDeserializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
);
typedef
struct
{
int32_t
numOfVgroups
;
int32_t
numOfStables
;
...
...
include/common/tmsgdef.h
浏览文件 @
28fa84b1
...
...
@@ -199,9 +199,10 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_REPLICA
,
"alter-replica"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_CONFIRM
,
"alter-confirm"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_HASHRANGE
,
"alter-hashrange"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TTL_TABLE
,
"drop-ttl-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMMIT
,
"commit vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMPACT
,
"vnode-compact"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_TTL_TABLE
,
"vnode-drop-ttl-stb"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TRIM
,
"vnode-trim"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_COMMIT
,
"vnode-commit"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MAX_MSG
,
"vnd-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_SCH_MSG
)
...
...
include/util/tRealloc.h
浏览文件 @
28fa84b1
...
...
@@ -25,10 +25,11 @@ extern "C" {
static
FORCE_INLINE
int32_t
tRealloc
(
uint8_t
**
ppBuf
,
int64_t
size
)
{
int32_t
code
=
0
;
int64_t
bsize
=
0
;
uint8_t
*
pBuf
;
uint8_t
*
pBuf
=
NULL
;
if
(
*
ppBuf
)
{
bsize
=
*
(
int64_t
*
)((
*
ppBuf
)
-
sizeof
(
int64_t
));
pBuf
=
(
*
ppBuf
)
-
sizeof
(
int64_t
);
bsize
=
*
(
int64_t
*
)
pBuf
;
}
if
(
bsize
>=
size
)
goto
_exit
;
...
...
@@ -38,7 +39,7 @@ static FORCE_INLINE int32_t tRealloc(uint8_t **ppBuf, int64_t size) {
bsize
*=
2
;
}
pBuf
=
(
uint8_t
*
)
taosMemoryRealloc
(
*
ppBuf
?
(
*
ppBuf
)
-
sizeof
(
int64_t
)
:
*
p
pBuf
,
bsize
+
sizeof
(
int64_t
));
pBuf
=
(
uint8_t
*
)
taosMemoryRealloc
(
pBuf
,
bsize
+
sizeof
(
int64_t
));
if
(
pBuf
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
...
...
source/common/src/tmsg.c
浏览文件 @
28fa84b1
...
...
@@ -2672,6 +2672,56 @@ int32_t tDeserializeSTrimDbReq(void *buf, int32_t bufLen, STrimDbReq *pReq) {
return
0
;
}
int32_t
tSerializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSVTrimDbReq
(
void
*
buf
,
int32_t
bufLen
,
SVTrimDbReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSVDropTtlTableReq
(
void
*
buf
,
int32_t
bufLen
,
SVDropTtlTableReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
timestamp
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSDbCfgRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SDbCfgRsp
*
pRsp
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
28fa84b1
...
...
@@ -171,6 +171,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_USE_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_COMPACT_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TRIM_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_DB_CFG
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_VGROUP_LIST
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_REDISTRIBUTE_VGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
28fa84b1
...
...
@@ -371,6 +371,7 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_HASHRANGE
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_TRIM
,
vmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_VNODE
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_VNODE
,
vmPutMsgToMgmtQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mnode/impl/inc/mndPrivilege.h
浏览文件 @
28fa84b1
...
...
@@ -54,6 +54,7 @@ typedef enum {
MND_OPER_ALTER_DB
,
MND_OPER_DROP_DB
,
MND_OPER_COMPACT_DB
,
MND_OPER_TRIM_DB
,
MND_OPER_USE_DB
,
MND_OPER_WRITE_DB
,
MND_OPER_READ_DB
,
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
28fa84b1
...
...
@@ -42,6 +42,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
static
int32_t
mndProcessDropDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessUseDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessCompactDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessTrimDbReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndRetrieveDbs
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
);
static
void
mndCancelGetNextDb
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessGetDbCfgReq
(
SRpcMsg
*
pReq
);
...
...
@@ -62,6 +63,7 @@ int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_DB
,
mndProcessDropDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_USE_DB
,
mndProcessUseDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_COMPACT_DB
,
mndProcessCompactDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_TRIM_DB
,
mndProcessTrimDbReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_DB_CFG
,
mndProcessGetDbCfgReq
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_DB
,
mndRetrieveDbs
);
...
...
@@ -1268,6 +1270,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
return
0
;
}
static
int32_t
mndCompactDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
)
{
return
0
;
}
static
int32_t
mndProcessCompactDbReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
...
...
@@ -1279,7 +1283,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
goto
_OVER
;
}
mDebug
(
"db:%s, start to
sync
"
,
compactReq
.
db
);
mDebug
(
"db:%s, start to
compact
"
,
compactReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
compactReq
.
db
);
if
(
pDb
==
NULL
)
{
...
...
@@ -1290,7 +1294,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
goto
_OVER
;
}
// code = mndCompactDb(
);
code
=
mndCompactDb
(
pMnode
,
pDb
);
_OVER:
if
(
code
!=
0
)
{
...
...
@@ -1301,6 +1305,75 @@ _OVER:
return
code
;
}
static
int32_t
mndTrimDb
(
SMnode
*
pMnode
,
SDbObj
*
pDb
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SVTrimDbReq
trimReq
=
{.
timestamp
=
taosGetTimestampSec
()};
int32_t
reqLen
=
tSerializeSVTrimDbReq
(
NULL
,
0
,
&
trimReq
);
int32_t
contLen
=
reqLen
+
sizeof
(
SMsgHead
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
SMsgHead
*
pHead
=
rpcMallocCont
(
contLen
);
if
(
pHead
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pVgroup
);
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
tSerializeSVTrimDbReq
((
char
*
)
pHead
+
sizeof
(
SMsgHead
),
contLen
,
&
trimReq
);
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_VND_TRIM
,
.
pCont
=
pHead
,
.
contLen
=
contLen
};
SEpSet
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
int32_t
code
=
tmsgSendReq
(
&
epSet
,
&
rpcMsg
);
if
(
code
!=
0
)
{
mError
(
"vgId:%d, failed to send vnode-trim request to vnode since 0x%x"
,
pVgroup
->
vgId
,
code
);
}
else
{
mDebug
(
"vgId:%d, send vnode-trim request to vnode, time:%d"
,
pVgroup
->
vgId
,
trimReq
.
timestamp
);
}
sdbRelease
(
pSdb
,
pVgroup
);
}
return
0
;
}
static
int32_t
mndProcessTrimDbReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
int32_t
code
=
-
1
;
SDbObj
*
pDb
=
NULL
;
STrimDbReq
trimReq
=
{
0
};
if
(
tDeserializeSTrimDbReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
trimReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
_OVER
;
}
mDebug
(
"db:%s, start to trim"
,
trimReq
.
db
);
pDb
=
mndAcquireDb
(
pMnode
,
trimReq
.
db
);
if
(
pDb
==
NULL
)
{
goto
_OVER
;
}
if
(
mndCheckDbPrivilege
(
pMnode
,
pReq
->
info
.
conn
.
user
,
MND_OPER_TRIM_DB
,
pDb
)
!=
0
)
{
goto
_OVER
;
}
code
=
mndTrimDb
(
pMnode
,
pDb
);
_OVER:
if
(
code
!=
0
)
{
mError
(
"db:%s, failed to process trim db req since %s"
,
trimReq
.
db
,
terrstr
());
}
mndReleaseDb
(
pMnode
,
pDb
);
return
code
;
}
const
char
*
mndGetDbStr
(
const
char
*
src
)
{
char
*
pos
=
strstr
(
src
,
TS_PATH_DELIMITER
);
if
(
pos
!=
NULL
)
++
pos
;
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
28fa84b1
...
...
@@ -531,8 +531,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if
(
!
IsReq
(
pMsg
))
return
0
;
if
(
pMsg
->
msgType
==
TDMT_SCH_QUERY
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_QUERY
||
pMsg
->
msgType
==
TDMT_SCH_QUERY_CONTINUE
||
pMsg
->
msgType
==
TDMT_SCH_QUERY_HEARTBEAT
||
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_DROP_TASK
)
{
pMsg
->
msgType
==
TDMT_SCH_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_MERGE_FETCH
||
pMsg
->
msgType
==
TDMT_SCH_DROP_TASK
)
{
return
0
;
}
if
(
mndAcquireRpcRef
(
pMsg
->
info
.
node
)
==
0
)
return
0
;
...
...
source/dnode/mnode/impl/src/mndPrivilege.c
浏览文件 @
28fa84b1
...
...
@@ -155,7 +155,8 @@ int32_t mndCheckDbPrivilege(SMnode *pMnode, const char *user, EOperType operType
if
(
pUser
->
sysInfo
)
goto
_OVER
;
}
if
(
operType
==
MND_OPER_ALTER_DB
||
operType
==
MND_OPER_DROP_DB
||
operType
==
MND_OPER_COMPACT_DB
)
{
if
(
operType
==
MND_OPER_ALTER_DB
||
operType
==
MND_OPER_DROP_DB
||
operType
==
MND_OPER_COMPACT_DB
||
operType
==
MND_OPER_TRIM_DB
)
{
if
(
strcmp
(
pUser
->
user
,
pDb
->
createUser
)
==
0
&&
pUser
->
sysInfo
)
goto
_OVER
;
}
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
28fa84b1
...
...
@@ -813,16 +813,18 @@ int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
}
static
int32_t
mndProcessTtlTimer
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SVDropTtlTableReq
ttlReq
=
{.
timestamp
=
taosGetTimestampSec
()};
int32_t
reqLen
=
tSerializeSVDropTtlTableReq
(
NULL
,
0
,
&
ttlReq
);
int32_t
contLen
=
reqLen
+
sizeof
(
SMsgHead
);
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
int32_t
contLen
=
sizeof
(
SMsgHead
)
+
sizeof
(
int32_t
);
SMsgHead
*
pHead
=
rpcMallocCont
(
contLen
);
if
(
pHead
==
NULL
)
{
sdbCancelFetch
(
pSdb
,
pVgroup
);
...
...
@@ -831,17 +833,15 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
}
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
pVgroup
->
vgId
);
int32_t
t
=
taosGetTimestampSec
();
*
(
int32_t
*
)((
char
*
)
pHead
+
sizeof
(
SMsgHead
))
=
htonl
(
t
);
tSerializeSVDropTtlTableReq
((
char
*
)
pHead
+
sizeof
(
SMsgHead
),
contLen
,
&
ttlReq
);
SRpcMsg
rpcMsg
=
{.
msgType
=
TDMT_VND_DROP_TTL_TABLE
,
.
pCont
=
pHead
,
.
contLen
=
contLen
};
SEpSet
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
int32_t
code
=
tmsgSendReq
(
&
epSet
,
&
rpcMsg
);
if
(
code
!=
0
)
{
mError
(
"
failed to send ttl time seed, code:0x%x"
,
code
);
mError
(
"
vgId:%d, failed to send drop ttl table request to vnode since 0x%x"
,
pVgroup
->
vgId
,
code
);
}
else
{
mDebug
(
"
send ttl time seed success, time:%d"
,
t
);
mDebug
(
"
vgId:%d, send drop ttl table request to vnode, time:%d"
,
pVgroup
->
vgId
,
ttlReq
.
timestamp
);
}
sdbRelease
(
pSdb
,
pVgroup
);
}
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
28fa84b1
...
...
@@ -307,7 +307,7 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
contLen
+=
+
sizeof
(
SMsgHead
);
contLen
+=
sizeof
(
SMsgHead
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
if
(
pReq
==
NULL
)
{
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
28fa84b1
...
...
@@ -32,39 +32,38 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef
struct
TSDBROW
TSDBROW
;
typedef
struct
TABLEID
TABLEID
;
typedef
struct
TSDBKEY
TSDBKEY
;
typedef
struct
SDelData
SDelData
;
typedef
struct
SDelIdx
SDelIdx
;
typedef
struct
STbData
STbData
;
typedef
struct
SMemTable
SMemTable
;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
STable
STable
;
typedef
struct
SMapData
SMapData
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlockStatis
SBlockStatis
;
typedef
struct
SAggrBlkCol
SAggrBlkCol
;
typedef
struct
SColData
SColData
;
typedef
struct
SBlockDataHdr
SBlockDataHdr
;
typedef
struct
SBlockData
SBlockData
;
typedef
struct
SDelFile
SDelFile
;
typedef
struct
STsdbCacheFile
STsdbCacheFile
;
typedef
struct
SHeadFile
SHeadFile
;
typedef
struct
SDataFile
SDataFile
;
typedef
struct
SLastFile
SLastFile
;
typedef
struct
SSmaFile
SSmaFile
;
typedef
struct
SDFileSet
SDFileSet
;
typedef
struct
SDataFWriter
SDataFWriter
;
typedef
struct
SDataFReader
SDataFReader
;
typedef
struct
SDelFWriter
SDelFWriter
;
typedef
struct
SDelFReader
SDelFReader
;
typedef
struct
SRowIter
SRowIter
;
typedef
struct
STsdbFS
STsdbFS
;
typedef
struct
SRowMerger
SRowMerger
;
typedef
struct
STsdbFSState
STsdbFSState
;
typedef
struct
STsdbSnapHdr
STsdbSnapHdr
;
typedef
struct
TSDBROW
TSDBROW
;
typedef
struct
TABLEID
TABLEID
;
typedef
struct
TSDBKEY
TSDBKEY
;
typedef
struct
SDelData
SDelData
;
typedef
struct
SDelIdx
SDelIdx
;
typedef
struct
STbData
STbData
;
typedef
struct
SMemTable
SMemTable
;
typedef
struct
STbDataIter
STbDataIter
;
typedef
struct
STable
STable
;
typedef
struct
SMapData
SMapData
;
typedef
struct
SBlockIdx
SBlockIdx
;
typedef
struct
SBlock
SBlock
;
typedef
struct
SBlockStatis
SBlockStatis
;
typedef
struct
SAggrBlkCol
SAggrBlkCol
;
typedef
struct
SColData
SColData
;
typedef
struct
SBlockDataHdr
SBlockDataHdr
;
typedef
struct
SBlockData
SBlockData
;
typedef
struct
SDelFile
SDelFile
;
typedef
struct
SHeadFile
SHeadFile
;
typedef
struct
SDataFile
SDataFile
;
typedef
struct
SLastFile
SLastFile
;
typedef
struct
SSmaFile
SSmaFile
;
typedef
struct
SDFileSet
SDFileSet
;
typedef
struct
SDataFWriter
SDataFWriter
;
typedef
struct
SDataFReader
SDataFReader
;
typedef
struct
SDelFWriter
SDelFWriter
;
typedef
struct
SDelFReader
SDelFReader
;
typedef
struct
SRowIter
SRowIter
;
typedef
struct
STsdbFS
STsdbFS
;
typedef
struct
SRowMerger
SRowMerger
;
typedef
struct
STsdbFSState
STsdbFSState
;
typedef
struct
STsdbSnapHdr
STsdbSnapHdr
;
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_FHDR_SIZE 512
...
...
@@ -163,6 +162,7 @@ int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
// other
int32_t
tsdbKeyFid
(
TSKEY
key
,
int32_t
minutes
,
int8_t
precision
);
void
tsdbFidKeyRange
(
int32_t
fid
,
int32_t
minutes
,
int8_t
precision
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
int32_t
tsdbFidLevel
(
int32_t
fid
,
STsdbKeepCfg
*
pKeepCfg
,
int64_t
now
);
int32_t
tsdbBuildDeleteSkyline
(
SArray
*
aDelData
,
int32_t
sidx
,
int32_t
eidx
,
SArray
*
aSkyline
);
void
tsdbCalcColDataSMA
(
SColData
*
pColData
,
SColumnDataAgg
*
pColAgg
);
// tsdbMemTable ==============================================================================================
...
...
@@ -200,6 +200,7 @@ int32_t tsdbFSRollback(STsdbFS *pFS);
int32_t
tsdbFSStateUpsertDelFile
(
STsdbFSState
*
pState
,
SDelFile
*
pDelFile
);
int32_t
tsdbFSStateUpsertDFileSet
(
STsdbFSState
*
pState
,
SDFileSet
*
pSet
);
void
tsdbFSStateDeleteDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
);
SDelFile
*
tsdbFSStateGetDelFile
(
STsdbFSState
*
pState
);
SDFileSet
*
tsdbFSStateGetDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
);
// tsdbReaderWriter.c ==============================================================================================
...
...
@@ -213,6 +214,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
SBlockIdx
*
pBlockIdx
,
SBlock
*
pBlock
,
int8_t
cmprAlg
);
SDFileSet
*
tsdbDataFWriterGetWSet
(
SDataFWriter
*
pWriter
);
int32_t
tsdbDFileSetCopy
(
STsdb
*
pTsdb
,
SDFileSet
*
pSetFrom
,
SDFileSet
*
pSetTo
);
// SDataFReader
int32_t
tsdbDataFReaderOpen
(
SDataFReader
**
ppReader
,
STsdb
*
pTsdb
,
SDFileSet
*
pSet
);
int32_t
tsdbDataFReaderClose
(
SDataFReader
**
ppReader
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
28fa84b1
...
...
@@ -122,6 +122,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepC
int
tsdbClose
(
STsdb
**
pTsdb
);
int32_t
tsdbBegin
(
STsdb
*
pTsdb
);
int32_t
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int32_t
tsdbInsertTableData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
28fa84b1
...
...
@@ -688,6 +688,14 @@ _exit:
return
code
;
}
void
tsdbFSStateDeleteDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
)
{
int32_t
idx
;
idx
=
taosArraySearchIdx
(
pState
->
aDFileSet
,
&
(
SDFileSet
){.
fid
=
fid
},
tDFileSetCmprFn
,
TD_EQ
);
ASSERT
(
idx
>=
0
);
taosArrayRemove
(
pState
->
aDFileSet
,
idx
);
}
SDelFile
*
tsdbFSStateGetDelFile
(
STsdbFSState
*
pState
)
{
return
pState
->
pDelFile
;
}
SDFileSet
*
tsdbFSStateGetDFileSet
(
STsdbFSState
*
pState
,
int32_t
fid
)
{
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
28fa84b1
...
...
@@ -1913,3 +1913,114 @@ _err:
taosArrayDestroy
(
aBlockCol
);
return
code
;
}
int32_t
tsdbDFileSetCopy
(
STsdb
*
pTsdb
,
SDFileSet
*
pSetFrom
,
SDFileSet
*
pSetTo
)
{
int32_t
code
=
0
;
int64_t
n
;
int64_t
size
;
TdFilePtr
pOutFD
=
NULL
;
// TODO
TdFilePtr
PInFD
=
NULL
;
// TODO
char
fNameFrom
[
TSDB_FILENAME_LEN
];
char
fNameTo
[
TSDB_FILENAME_LEN
];
// head
tsdbDataFileName
(
pTsdb
,
pSetFrom
,
TSDB_HEAD_FILE
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
,
TSDB_HEAD_FILE
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fHead
.
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// data
tsdbDataFileName
(
pTsdb
,
pSetFrom
,
TSDB_DATA_FILE
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
,
TSDB_DATA_FILE
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fData
.
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// last
tsdbDataFileName
(
pTsdb
,
pSetFrom
,
TSDB_LAST_FILE
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
,
TSDB_LAST_FILE
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fLast
.
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
// sma
tsdbDataFileName
(
pTsdb
,
pSetFrom
,
TSDB_SMA_FILE
,
fNameFrom
);
tsdbDataFileName
(
pTsdb
,
pSetTo
,
TSDB_SMA_FILE
,
fNameTo
);
pOutFD
=
taosOpenFile
(
fNameTo
,
TD_FILE_WRITE
|
TD_FILE_CREATE
|
TD_FILE_TRUNC
);
if
(
pOutFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
PInFD
=
taosOpenFile
(
fNameFrom
,
TD_FILE_READ
);
if
(
PInFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
n
=
taosFSendFile
(
pOutFD
,
PInFD
,
0
,
pSetFrom
->
fSma
.
size
);
if
(
n
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
goto
_err
;
}
taosCloseFile
(
&
pOutFD
);
taosCloseFile
(
&
PInFD
);
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb DFileSet copy failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbRetention.c
浏览文件 @
28fa84b1
...
...
@@ -15,19 +15,80 @@
#include "tsdb.h"
static
int32_t
tsdbDoRetentionImpl
(
STsdb
*
pTsdb
,
int64_t
now
,
int8_t
try
,
int8_t
*
canDo
)
{
int32_t
code
=
0
;
STsdbFSState
*
pState
;
if
(
try
)
{
pState
=
pTsdb
->
fs
->
cState
;
*
canDo
=
0
;
}
else
{
pState
=
pTsdb
->
fs
->
nState
;
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pState
->
aDFileSet
);
iSet
++
)
{
SDFileSet
*
pDFileSet
=
(
SDFileSet
*
)
taosArrayGet
(
pState
->
aDFileSet
,
iSet
);
int32_t
expLevel
=
tsdbFidLevel
(
pDFileSet
->
fid
,
&
pTsdb
->
keepCfg
,
now
);
SDiskID
did
;
// check
if
(
expLevel
==
pDFileSet
->
diskId
.
id
)
continue
;
// delete or move
if
(
expLevel
<
0
)
{
if
(
try
)
{
*
canDo
=
1
;
}
else
{
tsdbFSStateDeleteDFileSet
(
pState
,
pDFileSet
->
fid
);
iSet
--
;
}
}
else
{
// alloc
if
(
tfsAllocDisk
(
pTsdb
->
pVnode
->
pTfs
,
expLevel
,
&
did
)
<
0
)
{
code
=
terrno
;
goto
_exit
;
}
if
(
did
.
level
==
pDFileSet
->
diskId
.
level
)
continue
;
if
(
try
)
{
*
canDo
=
1
;
}
else
{
// copy the file to new disk
SDFileSet
nDFileSet
=
*
pDFileSet
;
nDFileSet
.
diskId
=
did
;
tfsMkdirRecurAt
(
pTsdb
->
pVnode
->
pTfs
,
pTsdb
->
path
,
did
);
code
=
tsdbDFileSetCopy
(
pTsdb
,
pDFileSet
,
&
nDFileSet
);
if
(
code
)
goto
_exit
;
code
=
tsdbFSStateUpsertDFileSet
(
pState
,
&
nDFileSet
);
if
(
code
)
goto
_exit
;
}
}
}
_exit:
return
code
;
}
int32_t
tsdbDoRetention
(
STsdb
*
pTsdb
,
int64_t
now
)
{
int32_t
code
=
0
;
int8_t
canDo
;
// try
tsdbDoRetentionImpl
(
pTsdb
,
now
,
1
,
&
canDo
);
if
(
!
canDo
)
goto
_exit
;
// begin
code
=
tsdbFSBegin
(
pTsdb
->
fs
);
if
(
code
)
goto
_err
;
// do retention
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pTsdb
->
fs
->
nState
->
aDFileSet
);
iSet
++
)
{
SDFileSet
*
pDFileSet
=
(
SDFileSet
*
)
taosArrayGet
(
pTsdb
->
fs
->
nState
->
aDFileSet
,
iSet
);
// TODO
}
code
=
tsdbDoRetentionImpl
(
pTsdb
,
now
,
0
,
NULL
);
if
(
code
)
goto
_err
;
// commit
code
=
tsdbFSCommit
(
pTsdb
->
fs
);
...
...
@@ -38,5 +99,6 @@ _exit:
_err:
tsdbError
(
"vgId:%d tsdb do retention failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFSRollback
(
pTsdb
->
fs
);
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
28fa84b1
...
...
@@ -312,6 +312,9 @@ struct STsdbSnapWriter {
// config
int32_t
minutes
;
int8_t
precision
;
int32_t
minRow
;
int32_t
maxRow
;
int8_t
cmprAlg
;
// for data file
int32_t
fid
;
...
...
@@ -321,14 +324,18 @@ struct STsdbSnapWriter {
SBlockIdx
*
pBlockIdx
;
SMapData
mBlock
;
int32_t
iBlock
;
SBlock
*
pBlock
;
SBlock
block
;
SBlockData
blockData
;
int32_t
iRow
;
SDataFWriter
*
pDataFWriter
;
SArray
*
aBlockIdxN
;
SBlockIdx
*
pBlockIdxN
;
SBlockIdx
blockIdx
;
SMapData
mBlockN
;
SBlock
block
;
SBlock
*
pBlockN
;
SBlock
blockN
;
SBlockData
nBlockData
;
// for del file
...
...
@@ -394,13 +401,114 @@ _err:
return
code
;
}
static
int32_t
tsdbSnapWriteTableDataEnd
(
STsdbSnapWriter
*
pWrite
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbSnapWriteTableData
(
STsdbSnapWriter
*
pWriter
,
uint8_t
*
pData
,
uint32_t
nData
)
{
int32_t
code
=
0
;
TABLEID
id
=
{
0
};
// TODO
// skip
while
(
pWriter
->
pBlockIdx
&&
tTABLEIDCmprFn
(
&
id
,
pWriter
->
pBlockIdx
)
<
0
)
{
code
=
tsdbSnapWriteTableDataEnd
(
pWriter
);
if
(
code
)
goto
_err
;
pWriter
->
iBlockIdx
++
;
if
(
pWriter
->
iBlockIdx
<
taosArrayGetSize
(
pWriter
->
aBlockIdx
))
{
pWriter
->
pBlockIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pWriter
->
aBlockIdx
,
pWriter
->
iBlockIdx
);
}
else
{
pWriter
->
pBlockIdx
=
NULL
;
}
}
// new or merge
if
(
pWriter
->
pBlockIdx
==
NULL
||
tTABLEIDCmprFn
(
&
id
,
pWriter
->
pBlockIdx
)
<
0
)
{
int32_t
c
;
if
(
pWriter
->
pBlockIdxN
&&
((
c
=
tTABLEIDCmprFn
(
&
id
,
pWriter
->
pBlockIdxN
))
!=
0
))
{
ASSERT
(
c
>
0
);
code
=
tsdbSnapWriteTableDataEnd
(
pWriter
);
if
(
code
)
goto
_err
;
}
if
(
pWriter
->
pBlockIdxN
==
NULL
)
{
pWriter
->
pBlockIdx
=
&
pWriter
->
blockIdx
;
pWriter
->
pBlockIdx
->
suid
=
id
.
suid
;
pWriter
->
pBlockIdx
->
uid
=
id
.
uid
;
}
// loop to write the data
TSDBROW
*
pRow
=
NULL
;
// todo
int32_t
nRow
=
0
;
// todo
SBlockData
*
pBlockData
=
NULL
;
// todo
for
(
int32_t
iRow
=
0
;
iRow
<
nRow
;
iRow
++
)
{
code
=
tBlockDataAppendRow
(
&
pWriter
->
nBlockData
,
&
tsdbRowFromBlockData
(
pBlockData
,
iRow
),
NULL
);
if
(
code
)
goto
_err
;
if
(
pWriter
->
nBlockData
.
nRow
>
pWriter
->
maxRow
*
4
/
5
)
{
code
=
tsdbWriteBlockData
(
pWriter
->
pDataFWriter
,
&
pWriter
->
nBlockData
,
NULL
,
NULL
,
pWriter
->
pBlockIdxN
,
pWriter
->
pBlockN
,
pWriter
->
cmprAlg
);
if
(
code
)
goto
_err
;
}
}
}
else
{
// skip
while
(
true
)
{
if
(
pWriter
->
pBlock
==
NULL
)
break
;
if
(
pWriter
->
pBlock
->
last
)
break
;
if
(
tBlockCmprFn
(
&
(
SBlock
){.
minKey
=
{
0
},
.
maxKey
=
{
0
}},
pWriter
->
pBlock
)
>=
0
)
break
;
code
=
tMapDataPutItem
(
&
pWriter
->
mBlockN
,
pWriter
->
pBlock
,
tPutBlock
);
if
(
code
)
goto
_err
;
}
if
(
pWriter
->
pBlock
)
{
if
(
pWriter
->
pBlock
->
last
)
{
// load the last block and merge with the data (todo)
}
else
{
int32_t
c
=
tBlockCmprFn
(
&
(
SBlock
){
0
/*TODO*/
},
pWriter
->
pBlock
);
if
(
c
>
0
)
{
// commit until pWriter->pBlock (todo)
}
else
{
// load the block and merge with the data (todo)
}
}
}
else
{
int32_t
nRow
=
0
;
SBlockData
*
pBlockData
=
NULL
;
for
(
int32_t
iRow
=
0
;
iRow
<
nRow
;
iRow
++
)
{
code
=
tBlockDataAppendRow
(
&
pWriter
->
nBlockData
,
&
tsdbRowFromBlockData
(
pBlockData
,
iRow
),
NULL
);
if
(
code
)
goto
_err
;
if
(
pWriter
->
nBlockData
.
nRow
>=
pWriter
->
maxRow
*
4
/
5
)
{
code
=
tsdbWriteBlockData
(
pWriter
->
pDataFWriter
,
&
pWriter
->
nBlockData
,
NULL
,
NULL
,
pWriter
->
pBlockIdxN
,
pWriter
->
pBlockN
,
pWriter
->
cmprAlg
);
if
(
code
)
goto
_err
;
tBlockDataClearData
(
&
pWriter
->
nBlockData
);
}
}
}
}
return
code
;
_err:
tsdbError
(
"vgId:%d tsdb snapshot write table data failed since %s"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbSnapWriteData
(
STsdbSnapWriter
*
pWriter
,
uint8_t
*
pData
,
uint32_t
nData
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pWriter
->
pTsdb
;
int64_t
suid
=
0
;
// todo
int64_t
uid
=
0
;
// todo
int64_t
skey
;
// todo
int64_t
ekey
;
// todo
int64_t
skey
;
// todo
int64_t
ekey
;
// todo
int32_t
fid
=
tsdbKeyFid
(
skey
,
pWriter
->
minutes
,
pWriter
->
precision
);
ASSERT
(
fid
==
tsdbKeyFid
(
ekey
,
pWriter
->
minutes
,
pWriter
->
precision
));
...
...
@@ -440,47 +548,8 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
taosArrayClear
(
pWriter
->
aBlockIdxN
);
}
// process
TABLEID
id
=
{
0
};
// TODO
TSKEY
minKey
=
0
;
// TODO
TSKEY
maxKey
=
0
;
// TODO
while
(
true
)
{
if
(
pWriter
->
pBlockIdx
)
{
int32_t
c
=
tTABLEIDCmprFn
(
&
id
,
pWriter
->
pBlockIdx
);
if
(
c
==
0
)
{
}
else
if
(
c
<
0
)
{
// keep merge
}
else
{
// code = tsdbSnapWriteTableDataEnd(pWriter);
if
(
code
)
goto
_err
;
pWriter
->
iBlockIdx
++
;
if
(
pWriter
->
iBlockIdx
<
taosArrayGetSize
(
pWriter
->
aBlockIdx
))
{
pWriter
->
pBlockIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pWriter
->
aBlockIdx
,
pWriter
->
iBlockIdx
);
}
else
{
pWriter
->
pBlockIdx
=
NULL
;
}
if
(
pWriter
->
pBlockIdx
)
{
code
=
tsdbReadBlock
(
pWriter
->
pDataFReader
,
pWriter
->
pBlockIdx
,
&
pWriter
->
mBlock
,
NULL
);
if
(
code
)
goto
_err
;
}
}
}
else
{
int32_t
c
=
tTABLEIDCmprFn
(
&
id
,
&
pWriter
->
blockIdx
);
if
(
c
==
0
)
{
// merge commit the block data
}
else
if
(
c
>
0
)
{
// code = tsdbSnapWriteTableDataEnd(pWriter);
if
(
code
)
goto
_err
;
}
else
{
ASSERT
(
0
);
}
}
}
code
=
tsdbSnapWriteTableData
(
pWriter
,
pData
,
nData
);
if
(
code
)
goto
_err
;
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbUtil.c
浏览文件 @
28fa84b1
...
...
@@ -465,17 +465,37 @@ void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minK
*
maxKey
=
*
minKey
+
minutes
*
tsTickPerMin
[
precision
]
-
1
;
}
// int tsdFidLevel(int fid, TSKEY now, minute) {
// if (fid >= pRtn->maxFid) {
// return 0;
// } else if (fid >= pRtn->midFid) {
// return 1;
// } else if (fid >= pRtn->minFid) {
// return 2;
// } else {
// return -1;
// }
// }
int32_t
tsdbFidLevel
(
int32_t
fid
,
STsdbKeepCfg
*
pKeepCfg
,
int64_t
now
)
{
int32_t
aFid
[
3
];
TSKEY
key
;
if
(
pKeepCfg
->
precision
==
TSDB_TIME_PRECISION_MILLI
)
{
now
=
now
*
1000
;
}
else
if
(
pKeepCfg
->
precision
==
TSDB_TIME_PRECISION_MICRO
)
{
now
=
now
*
1000000l
;
}
else
if
(
pKeepCfg
->
precision
==
TSDB_TIME_PRECISION_NANO
)
{
now
=
now
*
1000000000l
;
}
else
{
ASSERT
(
0
);
}
key
=
now
-
pKeepCfg
->
keep0
*
tsTickPerMin
[
pKeepCfg
->
precision
];
aFid
[
0
]
=
tsdbKeyFid
(
key
,
pKeepCfg
->
days
,
pKeepCfg
->
precision
);
key
=
now
-
pKeepCfg
->
keep1
*
tsTickPerMin
[
pKeepCfg
->
precision
];
aFid
[
1
]
=
tsdbKeyFid
(
key
,
pKeepCfg
->
days
,
pKeepCfg
->
precision
);
key
=
now
-
pKeepCfg
->
keep2
*
tsTickPerMin
[
pKeepCfg
->
precision
];
aFid
[
2
]
=
tsdbKeyFid
(
key
,
pKeepCfg
->
days
,
pKeepCfg
->
precision
);
if
(
fid
>=
aFid
[
0
])
{
return
0
;
}
else
if
(
fid
>=
aFid
[
1
])
{
return
1
;
}
else
if
(
fid
>=
aFid
[
2
])
{
return
2
;
}
else
{
return
-
1
;
}
}
// TSDBROW ======================================================
void
tsdbRowGetColVal
(
TSDBROW
*
pRow
,
STSchema
*
pTSchema
,
int32_t
iCol
,
SColVal
*
pColVal
)
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
28fa84b1
...
...
@@ -28,7 +28,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
}
// create vnode env
if
(
tfsMkdir
(
pTfs
,
path
)
<
0
)
{
if
(
tfsMkdir
At
(
pTfs
,
path
,
(
SDiskID
){
0
}
)
<
0
)
{
vError
(
"vgId:%d, failed to create vnode since: %s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
28fa84b1
...
...
@@ -27,6 +27,7 @@ static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void
static
int32_t
vnodeProcessAlterHashRangeReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessAlterConfigReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDropTtlTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessTrimReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int32_t
vnodeProcessDeleteReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
int32_t
vnodePreProcessWriteMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
...
...
@@ -173,9 +174,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
case
TDMT_VND_DROP_TTL_TABLE
:
if
(
vnodeProcessDropTtlTbReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
break
;
case
TDMT_VND_CREATE_SMA
:
{
case
TDMT_VND_TRIM
:
if
(
vnodeProcessTrimReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
break
;
case
TDMT_VND_CREATE_SMA
:
if
(
vnodeProcessCreateTSmaReq
(
pVnode
,
version
,
pReq
,
len
,
pRsp
)
<
0
)
goto
_err
;
}
break
;
break
;
/* TSDB */
case
TDMT_VND_SUBMIT
:
if
(
vnodeProcessSubmitReq
(
pVnode
,
version
,
pMsg
->
pCont
,
pMsg
->
contLen
,
pRsp
)
<
0
)
goto
_err
;
...
...
@@ -347,13 +351,38 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp
->
precision
=
pVnode
->
config
.
tsdbCfg
.
precision
;
}
static
int32_t
vnodeProcessTrimReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
int32_t
code
=
0
;
SVTrimDbReq
trimReq
=
{
0
};
vInfo
(
"vgId:%d, trim vnode request will be processed, time:%d"
,
pVnode
->
config
.
vgId
,
trimReq
.
timestamp
);
// decode
if
(
tDeserializeSVTrimDbReq
(
pReq
,
len
,
&
trimReq
)
!=
0
)
{
code
=
TSDB_CODE_INVALID_MSG
;
goto
_exit
;
}
// process
code
=
tsdbDoRetention
(
pVnode
->
pTsdb
,
trimReq
.
timestamp
);
if
(
code
)
goto
_exit
;
_exit:
return
code
;
}
static
int32_t
vnodeProcessDropTtlTbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
)
{
SArray
*
tbUids
=
taosArrayInit
(
8
,
sizeof
(
int64_t
));
if
(
tbUids
==
NULL
)
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
t
=
ntohl
(
*
(
int32_t
*
)
pReq
);
vDebug
(
"rec ttl time:%d"
,
t
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
t
,
tbUids
);
SVDropTtlTableReq
ttlReq
=
{
0
};
if
(
tDeserializeSVDropTtlTableReq
(
pReq
,
len
,
&
ttlReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
goto
end
;
}
vInfo
(
"vgId:%d, drop ttl table req will be processed, time:%d"
,
pVnode
->
config
.
vgId
,
ttlReq
.
timestamp
);
int32_t
ret
=
metaTtlDropTable
(
pVnode
->
pMeta
,
ttlReq
.
timestamp
,
tbUids
);
if
(
ret
!=
0
)
{
goto
end
;
}
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
28fa84b1
...
...
@@ -311,6 +311,12 @@ void transCtxMerge(STransCtx* dst, STransCtx* src);
void
*
transCtxDumpVal
(
STransCtx
*
ctx
,
int32_t
key
);
void
*
transCtxDumpBrokenlinkVal
(
STransCtx
*
ctx
,
int32_t
*
msgType
);
// request list
typedef
struct
STransReq
{
queue
q
;
void
*
data
;
}
STransReq
;
// queue sending msgs
typedef
struct
{
SArray
*
q
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
28fa84b1
...
...
@@ -29,7 +29,7 @@ typedef struct {
typedef
struct
SSvrConn
{
T_REF_DECLARE
()
uv_tcp_t
*
pTcp
;
uv_write_t
pWriter
;
queue
wreqQueue
;
uv_timer_t
pTimer
;
queue
queue
;
...
...
@@ -331,8 +331,14 @@ void uvOnTimeoutCb(uv_timer_t* handle) {
}
void
uvOnSendCb
(
uv_write_t
*
req
,
int
status
)
{
SSvrConn
*
conn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
taosMemoryFree
(
req
);
STransReq
*
wreq
=
req
&&
req
->
data
?
req
->
data
:
NULL
;
SSvrConn
*
conn
=
req
&&
req
->
handle
?
req
->
handle
->
data
:
NULL
;
if
(
wreq
!=
NULL
&&
conn
!=
NULL
)
{
QUEUE_REMOVE
(
&
wreq
->
q
);
taosMemoryFree
(
wreq
->
data
);
taosMemoryFree
(
wreq
);
}
if
(
conn
==
NULL
)
return
;
if
(
status
==
0
)
{
...
...
@@ -437,12 +443,16 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) {
transRefSrvHandle
(
pConn
);
uv_write_t
*
req
=
taosMemoryCalloc
(
1
,
sizeof
(
uv_write_t
));
STransReq
*
wreq
=
taosMemoryCalloc
(
1
,
sizeof
(
STransReq
));
wreq
->
data
=
req
;
req
->
data
=
wreq
;
QUEUE_PUSH
(
&
pConn
->
wreqQueue
,
&
wreq
->
q
);
uv_write
(
req
,
(
uv_stream_t
*
)
pConn
->
pTcp
,
&
wb
,
1
,
uvOnSendCb
);
}
static
void
uvStartSendResp
(
SSvrMsg
*
smsg
)
{
// impl
SSvrConn
*
pConn
=
smsg
->
pConn
;
if
(
pConn
->
broken
==
true
)
{
// persist by
transFreeMsg
(
smsg
->
msg
.
pCont
);
...
...
@@ -639,8 +649,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_tcp_init
(
pThrd
->
loop
,
pConn
->
pTcp
);
pConn
->
pTcp
->
data
=
pConn
;
pConn
->
pWriter
.
data
=
pConn
;
transSetConnOption
((
uv_tcp_t
*
)
pConn
->
pTcp
);
if
(
uv_accept
(
q
,
(
uv_stream_t
*
)(
pConn
->
pTcp
))
==
0
)
{
...
...
@@ -748,6 +756,8 @@ static SSvrConn* createConn(void* hThrd) {
SWorkThrd
*
pThrd
=
hThrd
;
SSvrConn
*
pConn
=
(
SSvrConn
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSvrConn
));
QUEUE_INIT
(
&
pConn
->
wreqQueue
);
QUEUE_INIT
(
&
pConn
->
queue
);
QUEUE_PUSH
(
&
pThrd
->
conn
,
&
pConn
->
queue
);
...
...
@@ -823,6 +833,14 @@ static void uvDestroyConn(uv_handle_t* handle) {
SSvrMsg
*
msg
=
transQueueGet
(
&
conn
->
srvMsgs
,
i
);
destroySmsg
(
msg
);
}
while
(
!
QUEUE_IS_EMPTY
(
&
conn
->
wreqQueue
))
{
queue
*
h
=
QUEUE_HEAD
(
&
conn
->
wreqQueue
);
QUEUE_REMOVE
(
h
);
STransReq
*
req
=
QUEUE_DATA
(
h
,
STransReq
,
q
);
taosMemoryFree
(
req
->
data
);
taosMemoryFree
(
req
);
}
transQueueDestroy
(
&
conn
->
srvMsgs
);
QUEUE_REMOVE
(
&
conn
->
queue
);
...
...
tests/script/tsim/valgrind/checkError1.sim
浏览文件 @
28fa84b1
...
...
@@ -98,7 +98,7 @@ print ----> start to check if there are ERRORS in vagrind log file for each dnod
system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ]
if $system_content <=
0
then
if $system_content <=
2
then
return 0
endi
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录