Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
12f716b3
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
“bab5f8209e529d390c97e5240bfdeb2fb1d77200”上不存在“source/dnode/mnode/impl/src/mnodeMnode.c”
提交
12f716b3
编写于
3月 28, 2022
作者:
A
Alex Duan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TS-238-D]<fix>(tsdb): deal block readonly modify delete status
上级
6185cef9
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
391 addition
and
258 deletion
+391
-258
src/client/src/tscServer.c
src/client/src/tscServer.c
+11
-9
src/inc/taosmsg.h
src/inc/taosmsg.h
+4
-0
src/tsdb/inc/tsdbTruncate.h
src/tsdb/inc/tsdbTruncate.h
+2
-5
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+4
-2
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+15
-10
src/tsdb/src/tsdbTruncate.c
src/tsdb/src/tsdbTruncate.c
+355
-232
未找到文件。
src/client/src/tscServer.c
浏览文件 @
12f716b3
...
@@ -3331,7 +3331,7 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S
...
@@ -3331,7 +3331,7 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S
tscDebug
(
"0x%"
PRIx64
" table deldata submit msg built, numberOfEP:%d"
,
pSql
->
self
,
pSql
->
epSet
.
numOfEps
);
tscDebug
(
"0x%"
PRIx64
" table deldata submit msg built, numberOfEP:%d"
,
pSql
->
self
,
pSql
->
epSet
.
numOfEps
);
// set payload
// set payload
size_t
payloadLen
=
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
)
+
sizeof
(
SSubmitBlk
)
+
sizeof
(
SControlData
);
size_t
payloadLen
=
sizeof
(
SMsgDesc
)
+
sizeof
(
SSubmitMsg
)
+
sizeof
(
SSubmitBlk
)
+
sizeof
(
SControlData
)
+
sizeof
(
int32_t
)
;
int32_t
ret
=
tscAllocPayload
(
pCmd
,
payloadLen
);
int32_t
ret
=
tscAllocPayload
(
pCmd
,
payloadLen
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
return
ret
;
...
@@ -3351,22 +3351,24 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S
...
@@ -3351,22 +3351,24 @@ int buildTableDelDataMsg(SSqlObj* pSql, SSqlCmd* pCmd, SQueryInfo* pQueryInfo, S
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
pMsgDesc
->
numOfVnodes
=
htonl
(
1
);
// SSubmitMsg
// SSubmitMsg
int32_t
size
=
pCmd
->
payloadLen
-
sizeof
(
SMsgDesc
);
int32_t
size
=
pCmd
->
payloadLen
-
sizeof
(
SMsgDesc
);
pSubmitMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pSubmitMsg
->
header
.
vgId
=
htonl
(
pTableMeta
->
vgId
);
pSubmitMsg
->
header
.
contLen
=
htonl
(
size
);
pSubmitMsg
->
header
.
contLen
=
htonl
(
size
);
pSubmitMsg
->
length
=
pSubmitMsg
->
header
.
contLen
;
pSubmitMsg
->
length
=
pSubmitMsg
->
header
.
contLen
;
pSubmitMsg
->
numOfBlocks
=
htonl
(
1
);
pSubmitMsg
->
numOfBlocks
=
htonl
(
1
);
// SSubmitBlk
// SSubmitBlk
pSubmitBlk
->
flag
=
FLAG_BLK_CONTROL
;
// this is control block
pSubmitBlk
->
flag
=
FLAG_BLK_CONTROL
;
// this is control block
pSubmitBlk
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
pSubmitBlk
->
tid
=
htonl
(
pTableMeta
->
id
.
tid
);
pSubmitBlk
->
uid
=
htobe64
(
pTableMeta
->
id
.
uid
);
pSubmitBlk
->
uid
=
htobe64
(
pTableMeta
->
id
.
uid
);
pSubmitBlk
->
numOfRows
=
htons
(
1
);
pSubmitBlk
->
numOfRows
=
htons
(
1
);
pSubmitBlk
->
schemaLen
=
0
;
// only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached
pSubmitBlk
->
schemaLen
=
0
;
// only server return TSDB_CODE_TDB_TABLE_RECONFIGURE need schema attached
pSubmitBlk
->
sversion
=
htonl
(
pTableMeta
->
sversion
);
pSubmitBlk
->
sversion
=
htonl
(
pTableMeta
->
sversion
);
pSubmitBlk
->
dataLen
=
htonl
(
sizeof
(
SControlData
));
pSubmitBlk
->
dataLen
=
htonl
(
sizeof
(
SControlData
)
+
sizeof
(
int32_t
));
// SControlData
// SControlData
pControlData
->
command
=
htonl
(
CMD_DELETE_DATA
);
pControlData
->
command
=
htonl
(
CMD_DELETE_DATA
);
pControlData
->
win
.
skey
=
htobe64
(
pQueryInfo
->
window
.
skey
);
pControlData
->
win
.
skey
=
htobe64
(
pQueryInfo
->
window
.
skey
);
pControlData
->
win
.
ekey
=
htobe64
(
pQueryInfo
->
window
.
ekey
);
pControlData
->
win
.
ekey
=
htobe64
(
pQueryInfo
->
window
.
ekey
);
pControlData
->
tnum
=
htonl
(
1
);
pControlData
->
tids
[
0
]
=
htonl
(
pTableMeta
->
id
.
tid
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
...
src/inc/taosmsg.h
浏览文件 @
12f716b3
...
@@ -1005,9 +1005,13 @@ typedef struct {
...
@@ -1005,9 +1005,13 @@ typedef struct {
#define CMD_DELETE_DATA 0x00000001
#define CMD_DELETE_DATA 0x00000001
#define CMD_TRUNCATE 0x00000002
#define CMD_TRUNCATE 0x00000002
#define GET_CTLDATA_SIZE(p) (sizeof(SControlData) + p->tnum * sizeof(int32_t))
typedef
struct
SControlData
{
typedef
struct
SControlData
{
uint32_t
command
;
// see define CMD_???
uint32_t
command
;
// see define CMD_???
STimeWindow
win
;
STimeWindow
win
;
int32_t
tnum
;
// tids nums
int32_t
tids
[];
// delete table tid
}
SControlData
;
}
SControlData
;
enum
{
enum
{
...
...
src/tsdb/inc/tsdbTruncate.h
浏览文件 @
12f716b3
...
@@ -20,16 +20,13 @@ extern "C" {
...
@@ -20,16 +20,13 @@ extern "C" {
#endif
#endif
// SControlData addition information
// SControlData addition information
#define GET_CTLINFO_SIZE(p) (sizeof(SControlDataInfo) + p.ctlData.tnum * sizeof(int32_t))
typedef
struct
{
typedef
struct
{
SControlData
ctlData
;
// addition info
// addition info
uint64_t
uid
;
// table unique id
int32_t
tid
;
// table id
tsem_t
*
pSem
;
tsem_t
*
pSem
;
bool
memNull
;
// pRepo->mem is NULL, this is true
bool
memNull
;
// pRepo->mem is NULL, this is true
uint64_t
*
uids
;
// delete table
int32_t
uidCount
;
SShellSubmitRspMsg
*
pRsp
;
SShellSubmitRspMsg
*
pRsp
;
SControlData
ctlData
;
}
SControlDataInfo
;
}
SControlDataInfo
;
// -------- interface ---------
// -------- interface ---------
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
12f716b3
...
@@ -240,7 +240,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
...
@@ -240,7 +240,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
pBlkIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pIdxA
,
i
);
pBlkIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pIdxA
,
i
);
size
=
tsdbEncodeSBlockIdx
(
NULL
,
pBlkIdx
);
size
=
tsdbEncodeSBlockIdx
(
NULL
,
pBlkIdx
);
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
+
size
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
+
size
)
<
0
)
return
-
1
;
void
*
ptr
=
POINTER_SHIFT
(
*
ppBuf
,
tlen
);
void
*
ptr
=
POINTER_SHIFT
(
*
ppBuf
,
tlen
);
tsdbEncodeSBlockIdx
(
&
ptr
,
pBlkIdx
);
tsdbEncodeSBlockIdx
(
&
ptr
,
pBlkIdx
);
...
@@ -249,7 +250,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
...
@@ -249,7 +250,8 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
}
}
tlen
+=
sizeof
(
TSCKSUM
);
tlen
+=
sizeof
(
TSCKSUM
);
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
)
<
0
)
return
-
1
;
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
)
<
0
)
return
-
1
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)(
*
ppBuf
),
tlen
);
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)(
*
ppBuf
),
tlen
);
if
(
tsdbAppendDFile
(
pHeadf
,
*
ppBuf
,
tlen
,
&
offset
)
<
tlen
)
{
if
(
tsdbAppendDFile
(
pHeadf
,
*
ppBuf
,
tlen
,
&
offset
)
<
tlen
)
{
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
12f716b3
...
@@ -1122,7 +1122,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
...
@@ -1122,7 +1122,6 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r
// Control Data
// Control Data
int32_t
tsdbInsertControlData
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
SShellSubmitRspMsg
*
pRsp
,
tsem_t
**
ppSem
)
{
int32_t
tsdbInsertControlData
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
SShellSubmitRspMsg
*
pRsp
,
tsem_t
**
ppSem
)
{
int32_t
ret
=
TSDB_CODE_SUCCESS
;
int32_t
ret
=
TSDB_CODE_SUCCESS
;
assert
(
pBlock
->
dataLen
==
sizeof
(
SControlData
));
SControlData
*
pCtlData
=
(
SControlData
*
)
pBlock
->
data
;
SControlData
*
pCtlData
=
(
SControlData
*
)
pBlock
->
data
;
// INIT SEM FOR ASYNC WAIT COMMIT RESULT
// INIT SEM FOR ASYNC WAIT COMMIT RESULT
...
@@ -1136,15 +1135,18 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
...
@@ -1136,15 +1135,18 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
// anti-serialize
// anti-serialize
pCtlData
->
command
=
htonl
(
pCtlData
->
command
);
pCtlData
->
command
=
htonl
(
pCtlData
->
command
);
pCtlData
->
tnum
=
htonl
(
pCtlData
->
tnum
);
pCtlData
->
win
.
skey
=
htobe64
(
pCtlData
->
win
.
skey
);
pCtlData
->
win
.
skey
=
htobe64
(
pCtlData
->
win
.
skey
);
pCtlData
->
win
.
ekey
=
htobe64
(
pCtlData
->
win
.
ekey
);
pCtlData
->
win
.
ekey
=
htobe64
(
pCtlData
->
win
.
ekey
);
for
(
int32_t
i
=
0
;
i
<
pCtlData
->
tnum
;
i
++
)
{
pCtlData
->
tids
[
i
]
=
htonl
(
pCtlData
->
tids
[
i
]);
}
// server data set
// server data set
SControlDataInfo
*
pNew
=
(
SControlDataInfo
*
)
tmalloc
(
sizeof
(
SControlDataInfo
));
size_t
nsize
=
sizeof
(
SControlDataInfo
)
+
pCtlData
->
tnum
*
sizeof
(
int32_t
);
memset
(
pNew
,
0
,
sizeof
(
SControlDataInfo
));
SControlDataInfo
*
pNew
=
(
SControlDataInfo
*
)
tmalloc
(
nsize
);
pNew
->
ctlData
=
*
pCtlData
;
memset
(
pNew
,
0
,
nsize
);
pNew
->
uid
=
pBlock
->
uid
;
memcpy
(
&
pNew
->
ctlData
,
pCtlData
,
GET_CTLDATA_SIZE
(
pCtlData
));
pNew
->
tid
=
pBlock
->
tid
;
pNew
->
pRsp
=
pRsp
;
pNew
->
pRsp
=
pRsp
;
if
(
ppSem
)
if
(
ppSem
)
pNew
->
pSem
=
*
ppSem
;
pNew
->
pSem
=
*
ppSem
;
...
@@ -1155,9 +1157,12 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
...
@@ -1155,9 +1157,12 @@ int32_t tsdbInsertControlData(STsdbRepo* pRepo, SSubmitBlk* pBlock, SShellSubmit
}
}
// if async post failed , must set wait event ppSem NULL
// if async post failed , must set wait event ppSem NULL
if
(
ret
!=
TSDB_CODE_SUCCESS
&&
ppSem
)
{
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
tsem_destroy
(
*
ppSem
);
if
(
*
ppSem
)
{
*
ppSem
=
NULL
;
tsem_destroy
(
*
ppSem
);
*
ppSem
=
NULL
;
}
tfree
(
pNew
);
}
}
return
ret
;
return
ret
;
...
...
src/tsdb/src/tsdbTruncate.c
浏览文件 @
12f716b3
...
@@ -15,6 +15,18 @@
...
@@ -15,6 +15,18 @@
#include "tsdbint.h"
#include "tsdbint.h"
#include "tsdbTruncate.h"
#include "tsdbTruncate.h"
enum
{
TSDB_NO_TRUNCATE
,
TSDB_IN_TRUNCATE
,
TSDB_WAITING_TRUNCATE
,
};
enum
BlockSolve
{
BLOCK_RETAIN
=
0
,
BLOCK_MODIFY
,
BLOCK_DELETE
};
typedef
struct
{
typedef
struct
{
STable
*
pTable
;
STable
*
pTable
;
SBlockIdx
*
pBlkIdx
;
SBlockIdx
*
pBlkIdx
;
...
@@ -30,51 +42,46 @@ typedef struct {
...
@@ -30,51 +42,46 @@ typedef struct {
SDFileSet
wSet
;
SDFileSet
wSet
;
SArray
*
aBlkIdx
;
SArray
*
aBlkIdx
;
SArray
*
aSupBlk
;
SArray
*
aSupBlk
;
SArray
*
aSubBlk
;
SDataCols
*
pDCols
;
SDataCols
*
pDCols
;
SControlDataInfo
*
pCtlInfo
;
SControlDataInfo
*
pCtlInfo
;
}
STruncateH
;
}
STruncateH
;
#define TSDB_TRUNCATE_WSET(prh) (&((prh)->wSet))
#define TSDB_TRUNCATE_REPO(prh) TSDB_READ_REPO(&((prh)->readh))
#define TSDB_TRUNCATE_WSET(ptru) (&((ptru)->wSet))
#define TSDB_TRUNCATE_HEAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_HEAD)
#define TSDB_TRUNCATE_REPO(ptru) TSDB_READ_REPO(&((ptru)->readh))
#define TSDB_TRUNCATE_DATA_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_DATA)
#define TSDB_TRUNCATE_HEAD_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_HEAD)
#define TSDB_TRUNCATE_LAST_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_LAST)
#define TSDB_TRUNCATE_DATA_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_DATA)
#define TSDB_TRUNCATE_SMAD_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAD)
#define TSDB_TRUNCATE_LAST_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_LAST)
#define TSDB_TRUNCATE_SMAL_FILE(prh) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(prh), TSDB_FILE_SMAL)
#define TSDB_TRUNCATE_SMAD_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_SMAD)
#define TSDB_TRUNCATE_BUF(prh) TSDB_READ_BUF(&((prh)->readh))
#define TSDB_TRUNCATE_SMAL_FILE(ptru) TSDB_DFILE_IN_SET(TSDB_TRUNCATE_WSET(ptru), TSDB_FILE_SMAL)
#define TSDB_TRUNCATE_COMP_BUF(prh) TSDB_READ_COMP_BUF(&((prh)->readh))
#define TSDB_TRUNCATE_BUF(ptru) TSDB_READ_BUF(&((ptru)->readh))
#define TSDB_TRUNCATE_EXBUF(prh) TSDB_READ_EXBUF(&((prh)->readh))
#define TSDB_TRUNCATE_COMP_BUF(ptru) TSDB_READ_COMP_BUF(&((ptru)->readh))
#define TSDB_TRUNCATE_EXBUF(ptru) TSDB_READ_EXBUF(&((ptru)->readh))
static
void
tsdbStartTruncate
(
STsdbRepo
*
pRepo
);
static
void
tsdbStartTruncate
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndTruncate
(
STsdbRepo
*
pRepo
,
int
eno
);
static
void
tsdbEndTruncate
(
STsdbRepo
*
pRepo
,
int
eno
);
static
int
tsdbTruncateMeta
(
STsdbRepo
*
pRepo
);
static
int
tsdbTruncateMeta
(
STsdbRepo
*
pRepo
);
static
int
tsdbTruncateTSData
(
STsdbRepo
*
pRepo
,
SControlDataInfo
*
pCtlInfo
);
static
int
tsdbTruncateTSData
(
STsdbRepo
*
pRepo
,
SControlDataInfo
*
pCtlInfo
);
static
int
tsdb
TruncateFSet
(
STruncateH
*
prh
,
SDFileSet
*
pSet
);
static
int
tsdb
FSetTruncate
(
STruncateH
*
ptru
,
SDFileSet
*
pSet
);
static
int
tsdb
DeleteFSet
(
STruncateH
*
prh
,
SDFileSet
*
pSet
);
static
int
tsdb
FSetDelete
(
STruncateH
*
ptru
,
SDFileSet
*
pSet
);
static
int
tsdbInitTruncateH
(
STruncateH
*
p
rh
,
STsdbRepo
*
pRepo
);
static
int
tsdbInitTruncateH
(
STruncateH
*
p
tru
,
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyTruncateH
(
STruncateH
*
p
rh
);
static
void
tsdbDestroyTruncateH
(
STruncateH
*
p
tru
);
static
int
tsdbInitTruncateTblArray
(
STruncateH
*
p
rh
);
static
int
tsdbInitTruncateTblArray
(
STruncateH
*
p
tru
);
static
void
tsdbDestroyTruncateTblArray
(
STruncateH
*
p
rh
);
static
void
tsdbDestroyTruncateTblArray
(
STruncateH
*
p
tru
);
static
int
tsdbCacheFSetIndex
(
STruncateH
*
p
rh
);
static
int
tsdbCacheFSetIndex
(
STruncateH
*
p
tru
);
static
int
tsdbTruncateCache
(
STsdbRepo
*
pRepo
,
void
*
param
);
static
int
tsdbTruncateCache
(
STsdbRepo
*
pRepo
,
void
*
param
);
static
int
tsdb
TruncateFSetInit
(
STruncateH
*
prh
,
SDFileSet
*
pSet
);
static
int
tsdb
FSetInit
(
STruncateH
*
ptru
,
SDFileSet
*
pSet
);
static
void
tsdbTruncateFSetEnd
(
STruncateH
*
p
rh
);
static
void
tsdbTruncateFSetEnd
(
STruncateH
*
p
tru
);
static
int
tsdbTruncateFSetImpl
(
STruncateH
*
p
rh
);
static
int
tsdbTruncateFSetImpl
(
STruncateH
*
p
tru
);
static
int
tsdb
DeleteFSetImpl
(
STruncateH
*
prh
);
static
int
tsdb
FSetDeleteImpl
(
STruncateH
*
ptru
);
static
bool
tsdbBlockInterleaved
(
STruncateH
*
prh
,
SBlock
*
pBlock
);
static
int
tsdbBlockSolve
(
STruncateH
*
ptru
,
SBlock
*
pBlock
);
static
int
tsdbWriteBlockTo
RightFile
(
STruncateH
*
prh
,
STable
*
pTable
,
SDataCols
*
pDCols
,
void
**
ppBuf
,
static
int
tsdbWriteBlockTo
File
(
STruncateH
*
ptru
,
STable
*
pTable
,
SDataCols
*
pDCols
,
void
**
ppBuf
,
void
**
ppCBuf
,
void
**
ppExBuf
);
void
**
ppCBuf
,
void
**
ppExBuf
,
SBlock
*
pBlock
);
static
int
tsdbTruncateImplCommon
(
STsdbRepo
*
pRepo
,
SControlDataInfo
*
pCtlInfo
);
static
int
tsdbTruncateImplCommon
(
STsdbRepo
*
pRepo
,
SControlDataInfo
*
pCtlInfo
);
enum
{
TSDB_NO_TRUNCATE
,
TSDB_IN_TRUNCATE
,
TSDB_WAITING_TRUNCATE
,
};
// delete
// delete
int
tsdbControlDelete
(
STsdbRepo
*
pRepo
,
SControlDataInfo
*
pCtlInfo
)
{
int
tsdbControlDelete
(
STsdbRepo
*
pRepo
,
SControlDataInfo
*
pCtlInfo
)
{
int
ret
=
TSDB_CODE_SUCCESS
;
int
ret
=
TSDB_CODE_SUCCESS
;
...
@@ -175,7 +182,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
...
@@ -175,7 +182,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
STruncateH
truncateH
=
{
0
};
STruncateH
truncateH
=
{
0
};
SDFileSet
*
pSet
=
NULL
;
SDFileSet
*
pSet
=
NULL
;
tsdbDebug
(
"vgId:%d start to truncate TS data for %
"
PRIu64
,
REPO_ID
(
pRepo
),
pCtlInfo
->
uid
);
tsdbDebug
(
"vgId:%d start to truncate TS data for %
d"
,
REPO_ID
(
pRepo
),
pCtlInfo
->
ctlData
.
tids
[
0
]
);
if
(
tsdbInitTruncateH
(
&
truncateH
,
pRepo
)
<
0
)
{
if
(
tsdbInitTruncateH
(
&
truncateH
,
pRepo
)
<
0
)
{
return
-
1
;
return
-
1
;
...
@@ -213,13 +220,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
...
@@ -213,13 +220,13 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
#endif
#endif
if
(
pCtlInfo
->
ctlData
.
command
==
CMD_TRUNCATE
)
{
if
(
pCtlInfo
->
ctlData
.
command
==
CMD_TRUNCATE
)
{
if
(
tsdb
TruncateFSet
(
&
truncateH
,
pSet
)
<
0
)
{
if
(
tsdb
FSetTruncate
(
&
truncateH
,
pSet
)
<
0
)
{
tsdbDestroyTruncateH
(
&
truncateH
);
tsdbDestroyTruncateH
(
&
truncateH
);
tsdbError
(
"vgId:%d failed to truncate table in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to truncate table in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
}
else
if
(
pCtlInfo
->
ctlData
.
command
==
CMD_DELETE_DATA
)
{
}
else
if
(
pCtlInfo
->
ctlData
.
command
==
CMD_DELETE_DATA
)
{
if
(
tsdb
DeleteFSet
(
&
truncateH
,
pSet
)
<
0
)
{
if
(
tsdb
FSetDelete
(
&
truncateH
,
pSet
)
<
0
)
{
tsdbDestroyTruncateH
(
&
truncateH
);
tsdbDestroyTruncateH
(
&
truncateH
);
tsdbError
(
"vgId:%d failed to truncate data in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to truncate data in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
...
@@ -235,68 +242,68 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
...
@@ -235,68 +242,68 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
return
0
;
return
0
;
}
}
static
int
tsdb
DeleteFSet
(
STruncateH
*
prh
,
SDFileSet
*
pSet
)
{
static
int
tsdb
FSetDelete
(
STruncateH
*
ptru
,
SDFileSet
*
pSet
)
{
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
p
rh
);
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
p
tru
);
SDiskID
did
=
{
0
};
SDiskID
did
=
{
0
};
tsdbDebug
(
"vgId:%d start to truncate data in FSET %d on level %d id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tsdbDebug
(
"vgId:%d start to truncate data in FSET %d on level %d id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
if
(
tsdb
TruncateFSetInit
(
prh
,
pSet
)
<
0
)
{
if
(
tsdb
FSetInit
(
ptru
,
pSet
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
// Create new fset as deleted fset
// Create new fset as deleted fset
tfsAllocDisk
(
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
p
rh
->
rtn
)),
&
(
did
.
level
),
&
(
did
.
id
));
tfsAllocDisk
(
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
p
tru
->
rtn
)),
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
tsdbError
(
"vgId:%d failed to truncate data in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to truncate data in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
tsdbInitDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
),
did
,
REPO_ID
(
pRepo
),
TSDB_FSET_FID
(
pSet
),
tsdbInitDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
),
did
,
REPO_ID
(
pRepo
),
TSDB_FSET_FID
(
pSet
),
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_LATEST_FSET_VER
);
FS_TXN_VERSION
(
REPO_FS
(
pRepo
)),
TSDB_LATEST_FSET_VER
);
if
(
tsdbCreateDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
),
true
)
<
0
)
{
if
(
tsdbCreateDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
),
true
)
<
0
)
{
tsdbError
(
"vgId:%d failed to truncate data in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to truncate data in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
if
(
tsdb
DeleteFSetImpl
(
prh
)
<
0
)
{
if
(
tsdb
FSetDeleteImpl
(
ptru
)
<
0
)
{
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbRemoveDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbRemoveDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbDebug
(
"vgId:%d FSET %d truncate data over"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
tsdbDebug
(
"vgId:%d FSET %d truncate data over"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
0
;
return
0
;
}
}
static
int
tsdb
TruncateFSet
(
STruncateH
*
prh
,
SDFileSet
*
pSet
)
{
static
int
tsdb
FSetTruncate
(
STruncateH
*
ptru
,
SDFileSet
*
pSet
)
{
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
p
rh
);
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
p
tru
);
SDiskID
did
=
{
0
};
SDiskID
did
=
{
0
};
SDFileSet
*
pWSet
=
TSDB_TRUNCATE_WSET
(
p
rh
);
SDFileSet
*
pWSet
=
TSDB_TRUNCATE_WSET
(
p
tru
);
tsdbDebug
(
"vgId:%d start to truncate table in FSET %d on level %d id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tsdbDebug
(
"vgId:%d start to truncate table in FSET %d on level %d id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
if
(
tsdb
TruncateFSetInit
(
prh
,
pSet
)
<
0
)
{
if
(
tsdb
FSetInit
(
ptru
,
pSet
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
// Create new fset as truncated fset
// Create new fset as truncated fset
tfsAllocDisk
(
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
p
rh
->
rtn
)),
&
(
did
.
level
),
&
(
did
.
id
));
tfsAllocDisk
(
tsdbGetFidLevel
(
pSet
->
fid
,
&
(
p
tru
->
rtn
)),
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
tsdbError
(
"vgId:%d failed to truncate table in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbError
(
"vgId:%d failed to truncate table in FSET %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
tstrerror
(
terrno
));
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
...
@@ -320,81 +327,90 @@ static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet) {
...
@@ -320,81 +327,90 @@ static int tsdbTruncateFSet(STruncateH *prh, SDFileSet *pSet) {
return
-
1
;
return
-
1
;
}
}
if
(
tsdbTruncateFSetImpl
(
p
rh
)
<
0
)
{
if
(
tsdbTruncateFSetImpl
(
p
tru
)
<
0
)
{
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbRemoveDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbRemoveDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
TSDB_TRUNCATE_WSET
(
p
rh
));
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbDebug
(
"vgId:%d FSET %d truncate table over"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
tsdbDebug
(
"vgId:%d FSET %d truncate table over"
,
REPO_ID
(
pRepo
),
pSet
->
fid
);
tsdbTruncateFSetEnd
(
p
rh
);
tsdbTruncateFSetEnd
(
p
tru
);
return
0
;
return
0
;
}
}
static
int
tsdbInitTruncateH
(
STruncateH
*
p
rh
,
STsdbRepo
*
pRepo
)
{
static
int
tsdbInitTruncateH
(
STruncateH
*
p
tru
,
STsdbRepo
*
pRepo
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
memset
(
p
rh
,
0
,
sizeof
(
*
prh
));
memset
(
p
tru
,
0
,
sizeof
(
*
ptru
));
TSDB_FSET_SET_CLOSED
(
TSDB_TRUNCATE_WSET
(
p
rh
));
TSDB_FSET_SET_CLOSED
(
TSDB_TRUNCATE_WSET
(
p
tru
));
tsdbGetRtnSnap
(
pRepo
,
&
(
p
rh
->
rtn
));
tsdbGetRtnSnap
(
pRepo
,
&
(
p
tru
->
rtn
));
tsdbFSIterInit
(
&
(
p
rh
->
fsIter
),
REPO_FS
(
pRepo
),
TSDB_FS_ITER_FORWARD
);
tsdbFSIterInit
(
&
(
p
tru
->
fsIter
),
REPO_FS
(
pRepo
),
TSDB_FS_ITER_FORWARD
);
if
(
tsdbInitReadH
(
&
(
p
rh
->
readh
),
pRepo
)
<
0
)
{
if
(
tsdbInitReadH
(
&
(
p
tru
->
readh
),
pRepo
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
if
(
tsdbInitTruncateTblArray
(
p
rh
)
<
0
)
{
if
(
tsdbInitTruncateTblArray
(
p
tru
)
<
0
)
{
tsdbDestroyTruncateH
(
p
rh
);
tsdbDestroyTruncateH
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
p
rh
->
aBlkIdx
=
taosArrayInit
(
1024
,
sizeof
(
SBlockIdx
));
p
tru
->
aBlkIdx
=
taosArrayInit
(
1024
,
sizeof
(
SBlockIdx
));
if
(
p
rh
->
aBlkIdx
==
NULL
)
{
if
(
p
tru
->
aBlkIdx
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyTruncateH
(
p
rh
);
tsdbDestroyTruncateH
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
p
rh
->
aSupBlk
=
taosArrayInit
(
1024
,
sizeof
(
SBlock
));
p
tru
->
aSupBlk
=
taosArrayInit
(
1024
,
sizeof
(
SBlock
));
if
(
p
rh
->
aSupBlk
==
NULL
)
{
if
(
p
tru
->
aSupBlk
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyTruncateH
(
p
rh
);
tsdbDestroyTruncateH
(
p
tru
);
return
-
1
;
return
-
1
;
}
}
p
rh
->
pDCols
=
tdNewDataCols
(
0
,
pCfg
->
maxRowsPerFileBlock
);
p
tru
->
aSubBlk
=
taosArrayInit
(
20
,
sizeof
(
SBlock
)
);
if
(
p
rh
->
pDCols
==
NULL
)
{
if
(
p
tru
->
aSubBlk
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyTruncateH
(
prh
);
tsdbDestroyTruncateH
(
ptru
);
return
-
1
;
}
ptru
->
pDCols
=
tdNewDataCols
(
0
,
pCfg
->
maxRowsPerFileBlock
);
if
(
ptru
->
pDCols
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbDestroyTruncateH
(
ptru
);
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
void
tsdbDestroyTruncateH
(
STruncateH
*
prh
)
{
static
void
tsdbDestroyTruncateH
(
STruncateH
*
ptru
)
{
prh
->
pDCols
=
tdFreeDataCols
(
prh
->
pDCols
);
ptru
->
pDCols
=
tdFreeDataCols
(
ptru
->
pDCols
);
prh
->
aSupBlk
=
taosArrayDestroy
(
&
prh
->
aSupBlk
);
ptru
->
aSupBlk
=
taosArrayDestroy
(
&
ptru
->
aSupBlk
);
prh
->
aBlkIdx
=
taosArrayDestroy
(
&
prh
->
aBlkIdx
);
ptru
->
aSubBlk
=
taosArrayDestroy
(
&
ptru
->
aSubBlk
);
tsdbDestroyTruncateTblArray
(
prh
);
ptru
->
aBlkIdx
=
taosArrayDestroy
(
&
ptru
->
aBlkIdx
);
tsdbDestroyReadH
(
&
(
prh
->
readh
));
tsdbDestroyTruncateTblArray
(
ptru
);
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
prh
));
tsdbDestroyReadH
(
&
(
ptru
->
readh
));
tsdbCloseDFileSet
(
TSDB_TRUNCATE_WSET
(
ptru
));
}
}
static
int
tsdbInitTruncateTblArray
(
STruncateH
*
prh
)
{
// init tbl array with pRepo->meta
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
prh
);
static
int
tsdbInitTruncateTblArray
(
STruncateH
*
ptru
)
{
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
ptru
);
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
if
(
tsdbRLockRepoMeta
(
pRepo
)
<
0
)
return
-
1
;
if
(
tsdbRLockRepoMeta
(
pRepo
)
<
0
)
return
-
1
;
p
rh
->
tblArray
=
taosArrayInit
(
pMeta
->
maxTables
,
sizeof
(
STableTruncateH
));
p
tru
->
tblArray
=
taosArrayInit
(
pMeta
->
maxTables
,
sizeof
(
STableTruncateH
));
if
(
p
rh
->
tblArray
==
NULL
)
{
if
(
p
tru
->
tblArray
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbUnlockRepoMeta
(
pRepo
);
tsdbUnlockRepoMeta
(
pRepo
);
return
-
1
;
return
-
1
;
...
@@ -402,13 +418,13 @@ static int tsdbInitTruncateTblArray(STruncateH *prh) {
...
@@ -402,13 +418,13 @@ static int tsdbInitTruncateTblArray(STruncateH *prh) {
// Note here must start from 0
// Note here must start from 0
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
++
i
)
{
STableTruncateH
ch
=
{
0
};
STableTruncateH
tbl
=
{
0
};
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbRefTable
(
pMeta
->
tables
[
i
]);
tsdbRefTable
(
pMeta
->
tables
[
i
]);
ch
.
pTable
=
pMeta
->
tables
[
i
];
tbl
.
pTable
=
pMeta
->
tables
[
i
];
}
}
if
(
taosArrayPush
(
p
rh
->
tblArray
,
&
ch
)
==
NULL
)
{
if
(
taosArrayPush
(
p
tru
->
tblArray
,
&
tbl
)
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbUnlockRepoMeta
(
pRepo
);
tsdbUnlockRepoMeta
(
pRepo
);
return
-
1
;
return
-
1
;
...
@@ -419,46 +435,46 @@ static int tsdbInitTruncateTblArray(STruncateH *prh) {
...
@@ -419,46 +435,46 @@ static int tsdbInitTruncateTblArray(STruncateH *prh) {
return
0
;
return
0
;
}
}
static
void
tsdbDestroyTruncateTblArray
(
STruncateH
*
p
rh
)
{
static
void
tsdbDestroyTruncateTblArray
(
STruncateH
*
p
tru
)
{
STableTruncateH
*
p
Handle
=
NULL
;
STableTruncateH
*
p
Item
=
NULL
;
if
(
p
rh
->
tblArray
==
NULL
)
return
;
if
(
p
tru
->
tblArray
==
NULL
)
return
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
p
rh
->
tblArray
);
++
i
)
{
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
p
tru
->
tblArray
);
++
i
)
{
p
Handle
=
(
STableTruncateH
*
)
taosArrayGet
(
prh
->
tblArray
,
i
);
p
Item
=
(
STableTruncateH
*
)
taosArrayGet
(
ptru
->
tblArray
,
i
);
if
(
p
Handle
->
pTable
)
{
if
(
p
Item
->
pTable
)
{
tsdbUnRefTable
(
p
Handle
->
pTable
);
tsdbUnRefTable
(
p
Item
->
pTable
);
}
}
tfree
(
p
Handle
->
pInfo
);
tfree
(
p
Item
->
pInfo
);
}
}
p
rh
->
tblArray
=
taosArrayDestroy
(
&
prh
->
tblArray
);
p
tru
->
tblArray
=
taosArrayDestroy
(
&
ptru
->
tblArray
);
}
}
static
int
tsdbCacheFSetIndex
(
STruncateH
*
p
rh
)
{
static
int
tsdbCacheFSetIndex
(
STruncateH
*
p
tru
)
{
SReadH
*
pReadH
=
&
(
p
rh
->
readh
);
SReadH
*
pReadH
=
&
(
p
tru
->
readh
);
if
(
tsdbLoadBlockIdx
(
pReadH
)
<
0
)
{
if
(
tsdbLoadBlockIdx
(
pReadH
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
size_t
tblArraySize
=
taosArrayGetSize
(
prh
->
tblArray
);
size_t
cnt
=
taosArrayGetSize
(
ptru
->
tblArray
);
for
(
size_t
tid
=
1
;
tid
<
tblArraySize
;
++
tid
)
{
for
(
size_t
tid
=
1
;
tid
<
cnt
;
++
tid
)
{
STableTruncateH
*
p
Handle
=
(
STableTruncateH
*
)
taosArrayGet
(
prh
->
tblArray
,
tid
);
STableTruncateH
*
p
Item
=
(
STableTruncateH
*
)
taosArrayGet
(
ptru
->
tblArray
,
tid
);
p
Handle
->
pBlkIdx
=
NULL
;
p
Item
->
pBlkIdx
=
NULL
;
if
(
pHandle
->
pTable
==
NULL
)
continue
;
if
(
pItem
->
pTable
==
NULL
)
if
(
tsdbSetReadTable
(
pReadH
,
pHandle
->
pTable
)
<
0
)
{
continue
;
if
(
tsdbSetReadTable
(
pReadH
,
pItem
->
pTable
)
<
0
)
return
-
1
;
return
-
1
;
}
if
(
pReadH
->
pBlkIdx
==
NULL
)
continue
;
if
(
pReadH
->
pBlkIdx
==
NULL
)
continue
;
pItem
->
bIndex
=
*
(
pReadH
->
pBlkIdx
);
pHandle
->
bIndex
=
*
(
pReadH
->
pBlkIdx
);
pItem
->
pBlkIdx
=
&
(
pItem
->
bIndex
);
pHandle
->
pBlkIdx
=
&
(
pHandle
->
bIndex
);
uint32_t
originLen
=
0
;
uint32_t
originLen
=
0
;
if
(
tsdbLoadBlockInfo
(
pReadH
,
(
void
**
)(
&
(
p
Handle
->
pInfo
)),
&
originLen
)
<
0
)
{
if
(
tsdbLoadBlockInfo
(
pReadH
,
(
void
**
)(
&
(
p
Item
->
pInfo
)),
&
originLen
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
}
}
...
@@ -466,39 +482,30 @@ static int tsdbCacheFSetIndex(STruncateH *prh) {
...
@@ -466,39 +482,30 @@ static int tsdbCacheFSetIndex(STruncateH *prh) {
return
0
;
return
0
;
}
}
static
int
tsdb
TruncateFSetInit
(
STruncateH
*
prh
,
SDFileSet
*
pSet
)
{
static
int
tsdb
FSetInit
(
STruncateH
*
ptru
,
SDFileSet
*
pSet
)
{
taosArrayClear
(
p
rh
->
aBlkIdx
);
taosArrayClear
(
p
tru
->
aBlkIdx
);
taosArrayClear
(
p
rh
->
aSupBlk
);
taosArrayClear
(
p
tru
->
aSupBlk
);
if
(
tsdbSetAndOpenReadFSet
(
&
(
prh
->
readh
),
pSet
)
<
0
)
{
// open
if
(
tsdbSetAndOpenReadFSet
(
&
(
ptru
->
readh
),
pSet
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
if
(
tsdbCacheFSetIndex
(
prh
)
<
0
)
{
// load index to cache
tsdbCloseAndUnsetFSet
(
&
(
prh
->
readh
));
if
(
tsdbCacheFSetIndex
(
ptru
)
<
0
)
{
tsdbCloseAndUnsetFSet
(
&
(
ptru
->
readh
));
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
void
tsdbTruncateFSetEnd
(
STruncateH
*
p
rh
)
{
tsdbCloseAndUnsetFSet
(
&
(
prh
->
readh
));
}
static
void
tsdbTruncateFSetEnd
(
STruncateH
*
p
tru
)
{
tsdbCloseAndUnsetFSet
(
&
(
ptru
->
readh
));
}
static
bool
tsdbBlockInterleaved
(
STruncateH
*
prh
,
SBlock
*
pBlock
)
{
// STruncateTblMsg *pMsg = (STruncateTblMsg *)prh->param;
// for (uint16_t i = 0; i < pMsg->nSpan; ++i) {
// STimeWindow tw = pMsg->span[i];
// if (!(pBlock->keyFirst > tw.ekey || pBlock->keyLast < tw.skey)) {
// return true;
// }
// }
// return false;
return
true
;
}
static
int32_t
tsdbFilterDataCols
(
STruncateH
*
p
rh
,
SDataCols
*
pSrcDCols
)
{
static
int32_t
tsdbFilterDataCols
(
STruncateH
*
p
tru
,
SDataCols
*
pSrcDCols
)
{
SDataCols
*
pDstDCols
=
p
rh
->
pDCols
;
SDataCols
*
pDstDCols
=
p
tru
->
pDCols
;
SControlData
*
pCtlData
=
&
p
rh
->
pCtlInfo
->
ctlData
;
SControlData
*
pCtlData
=
&
p
tru
->
pCtlInfo
->
ctlData
;
tdResetDataCols
(
pDstDCols
);
tdResetDataCols
(
pDstDCols
);
pDstDCols
->
maxCols
=
pSrcDCols
->
maxCols
;
pDstDCols
->
maxCols
=
pSrcDCols
->
maxCols
;
...
@@ -518,44 +525,52 @@ static int32_t tsdbFilterDataCols(STruncateH *prh, SDataCols *pSrcDCols) {
...
@@ -518,44 +525,52 @@ static int32_t tsdbFilterDataCols(STruncateH *prh, SDataCols *pSrcDCols) {
pDstDCols
->
maxPoints
,
0
);
pDstDCols
->
maxPoints
,
0
);
}
}
}
}
++
pDstDCols
->
numOfRows
;
++
pDstDCols
->
numOfRows
;
}
}
return
0
;
return
0
;
}
}
static
int
tsdbTruncateFSetImpl
(
STruncateH
*
prh
)
{
// table in delete list
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
prh
);
bool
tableInDel
(
STruncateH
*
ptru
,
int32_t
tid
)
{
// SReadH * pReadh = &(prh->readh);
for
(
int32_t
i
=
0
;
i
<
ptru
->
pCtlInfo
->
ctlData
.
tnum
;
i
++
)
{
SBlockIdx
*
pBlkIdx
=
NULL
;
if
(
tid
==
ptru
->
pCtlInfo
->
ctlData
.
tids
[
i
])
void
**
ppBuf
=
&
(
TSDB_TRUNCATE_BUF
(
prh
));
return
true
;
// void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(prh));
}
// void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(prh));
return
false
;
}
taosArrayClear
(
prh
->
aBlkIdx
);
static
int
tsdbTruncateFSetImpl
(
STruncateH
*
ptru
)
{
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
ptru
);
// SReadH * pReadh = &(ptru->readh);
SBlockIdx
*
pBlkIdx
=
NULL
;
void
**
ppBuf
=
&
(
TSDB_TRUNCATE_BUF
(
ptru
));
// void ** ppCBuf = &(TSDB_TRUNCATE_COMP_BUF(ptru));
// void ** ppExBuf = &(TSDB_TRUNCATE_EXBUF(ptru));
for
(
size_t
tid
=
1
;
tid
<
taosArrayGetSize
(
prh
->
tblArray
);
++
tid
)
{
taosArrayClear
(
ptru
->
aBlkIdx
);
STableTruncateH
*
pHandle
=
(
STableTruncateH
*
)
taosArrayGet
(
prh
->
tblArray
,
tid
);
pBlkIdx
=
pHandle
->
pBlkIdx
;
if
(
pHandle
->
pTable
==
NULL
||
pHandle
->
pBlkIdx
==
NULL
)
continue
;
for
(
size_t
tid
=
1
;
tid
<
taosArrayGetSize
(
ptru
->
tblArray
);
++
tid
)
{
STableTruncateH
*
pItem
=
(
STableTruncateH
*
)
taosArrayGet
(
ptru
->
tblArray
,
tid
);
pBlkIdx
=
pItem
->
pBlkIdx
;
taosArrayClear
(
prh
->
aSupBlk
)
;
if
(
pItem
->
pTable
==
NULL
||
pItem
->
pBlkIdx
==
NULL
)
continue
;
uint64_t
uid
=
pHandle
->
pTable
->
tableId
.
uid
;
taosArrayClear
(
ptru
->
aSupBlk
)
;
if
(
uid
!=
prh
->
pCtlInfo
->
uid
)
{
if
(
!
tableInDel
(
ptru
,
tid
)
)
{
if
((
pBlkIdx
->
numOfBlocks
>
0
)
&&
(
taosArrayPush
(
p
rh
->
aBlkIdx
,
(
const
void
*
)(
pBlkIdx
))
==
NULL
))
{
if
((
pBlkIdx
->
numOfBlocks
>
0
)
&&
(
taosArrayPush
(
p
tru
->
aBlkIdx
,
(
const
void
*
)(
pBlkIdx
))
==
NULL
))
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
}
else
{
}
else
{
// Loop to mark delete flag for each block data
// Loop to mark delete flag for each block data
tsdbDebug
(
"vgId:%d
uid %"
PRIu64
" matched to truncate"
,
REPO_ID
(
pRepo
),
u
id
);
tsdbDebug
(
"vgId:%d
tid %ld matched to truncate"
,
REPO_ID
(
pRepo
),
t
id
);
// for (int i = 0; i < p
Handle
->pBlkIdx->numOfBlocks; ++i) {
// for (int i = 0; i < p
Item
->pBlkIdx->numOfBlocks; ++i) {
// SBlock *pBlock = p
Handle
->pInfo->blocks + i;
// SBlock *pBlock = p
Item
->pInfo->blocks + i;
// if (tsdbWriteBlockTo
RightFile(prh, pHandle->pTable, prh
->pDCols, ppBuf, ppCBuf, ppExBuf) <
// if (tsdbWriteBlockTo
File(ptru, pItem->pTable, ptru
->pDCols, ppBuf, ppCBuf, ppExBuf) <
// 0) {
// 0) {
// return -1;
// return -1;
// }
// }
...
@@ -563,126 +578,234 @@ static int tsdbTruncateFSetImpl(STruncateH *prh) {
...
@@ -563,126 +578,234 @@ static int tsdbTruncateFSetImpl(STruncateH *prh) {
}
}
}
}
if
(
tsdbWriteBlockIdx
(
TSDB_TRUNCATE_HEAD_FILE
(
p
rh
),
prh
->
aBlkIdx
,
ppBuf
)
<
0
)
{
if
(
tsdbWriteBlockIdx
(
TSDB_TRUNCATE_HEAD_FILE
(
p
tru
),
ptru
->
aBlkIdx
,
ppBuf
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
int
tsdbDeleteFSetImpl
(
STruncateH
*
prh
)
{
// if pBlock is border block return true else return false
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
prh
);
static
int
tsdbBlockSolve
(
STruncateH
*
ptru
,
SBlock
*
pBlock
)
{
// STsdbCfg * pCfg = REPO_CFG(pRepo);
// delete window
SReadH
*
pReadh
=
&
(
prh
->
readh
);
STimeWindow
*
pdel
=
&
ptru
->
pCtlInfo
->
ctlData
.
win
;
SBlockIdx
blkIdx
=
{
0
};
void
**
ppBuf
=
&
(
TSDB_TRUNCATE_BUF
(
prh
));
void
**
ppCBuf
=
&
(
TSDB_TRUNCATE_COMP_BUF
(
prh
));
void
**
ppExBuf
=
&
(
TSDB_TRUNCATE_EXBUF
(
prh
));
// int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear
(
prh
->
aBlkIdx
);
// do nothing for no delete
if
(
pBlock
->
keyFirst
>
pdel
->
ekey
||
pBlock
->
keyLast
<
pdel
->
skey
)
return
BLOCK_RETAIN
;
for
(
size_t
tid
=
1
;
tid
<
taosArrayGetSize
(
prh
->
tblArray
);
++
tid
)
{
// border block
STableTruncateH
*
pHandle
=
(
STableTruncateH
*
)
taosArrayGet
(
prh
->
tblArray
,
tid
);
if
(
pBlock
->
keyFirst
<=
pdel
->
skey
||
pBlock
->
keyLast
>=
pdel
->
ekey
)
STSchema
*
pSchema
=
NULL
;
return
BLOCK_MODIFY
;
if
(
pHandle
->
pTable
==
NULL
||
pHandle
->
pBlkIdx
==
NULL
)
continue
;
// need del
return
BLOCK_DELETE
;
}
if
((
pSchema
=
tsdbGetTableSchemaImpl
(
pHandle
->
pTable
,
true
,
true
,
-
1
,
-
1
))
==
NULL
)
{
// remove del block from pBlockInfo
return
-
1
;
int
tsdbRemoveDelBlocks
(
STruncateH
*
ptru
,
STableTruncateH
*
pItem
)
{
// loop
int
numOfBlocks
=
pItem
->
pBlkIdx
->
numOfBlocks
;
int
from
=
-
1
;
int
delAll
=
0
;
for
(
int
i
=
numOfBlocks
-
1
;
i
>=
0
;
--
i
)
{
SBlock
*
pBlock
=
pItem
->
pInfo
->
blocks
+
i
;
int32_t
solve
=
tsdbBlockSolve
(
ptru
,
pBlock
);
bool
doDel
=
false
;
if
(
solve
==
BLOCK_DELETE
)
{
if
(
from
==
-
1
)
from
=
i
;
if
(
i
==
0
)
doDel
=
true
;
}
else
{
if
(
from
!=
-
1
)
doDel
=
true
;
}
}
taosArrayClear
(
prh
->
aSupBlk
);
// do del
if
(
doDel
)
{
int
delCnt
=
from
-
i
+
1
;
memmove
(
pBlock
,
pItem
->
pInfo
->
blocks
+
i
+
delCnt
,
sizeof
(
SBlock
)
*
delCnt
);
delAll
+=
delCnt
;
}
}
uint64_t
uid
=
pHandle
->
pTable
->
tableId
.
uid
;
// set value
// if(uid != pMsg->uid) {
pItem
->
pBlkIdx
->
numOfBlocks
-=
delAll
;
// TODO: copy the block data directly
// }
if
((
tdInitDataCols
(
prh
->
pDCols
,
pSchema
)
<
0
)
||
(
tdInitDataCols
(
pReadh
->
pDCols
[
0
],
pSchema
)
<
0
)
||
return
delAll
;
(
tdInitDataCols
(
pReadh
->
pDCols
[
1
],
pSchema
)
<
0
))
{
}
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tdFreeSchema
(
pSchema
);
static
void
tsdbAddBlock
(
STruncateH
*
ptru
,
STableTruncateH
*
pItem
,
SBlock
*
pBlock
)
{
return
-
1
;
taosArrayPush
(
ptru
->
aSubBlk
,
(
const
void
*
)
pBlock
);
// have sub block
if
(
pBlock
->
numOfSubBlocks
>
1
)
{
SBlock
*
jBlock
=
POINTER_SHIFT
(
pItem
->
pInfo
,
pBlock
->
offset
);;
for
(
int
j
=
0
;
j
<
pBlock
->
numOfSubBlocks
;
j
++
)
{
taosArrayPush
(
ptru
->
aSubBlk
,
(
const
void
*
)
jBlock
++
);
}
}
}
}
// need modify blocks
static
int
tsdbModifyBlocks
(
STruncateH
*
ptru
,
STableTruncateH
*
pItem
)
{
SReadH
*
pReadh
=
&
(
ptru
->
readh
);
void
**
ppBuf
=
&
(
TSDB_TRUNCATE_BUF
(
ptru
));
void
**
ppCBuf
=
&
(
TSDB_TRUNCATE_COMP_BUF
(
ptru
));
void
**
ppExBuf
=
&
(
TSDB_TRUNCATE_EXBUF
(
ptru
));
STSchema
*
pSchema
=
NULL
;
SBlockIdx
blkIdx
=
{
0
};
// get pSchema for del table
if
((
pSchema
=
tsdbGetTableSchemaImpl
(
pItem
->
pTable
,
true
,
true
,
-
1
,
-
1
))
==
NULL
)
{
return
-
1
;
}
if
((
tdInitDataCols
(
ptru
->
pDCols
,
pSchema
)
<
0
)
||
(
tdInitDataCols
(
pReadh
->
pDCols
[
0
],
pSchema
)
<
0
)
||
(
tdInitDataCols
(
pReadh
->
pDCols
[
1
],
pSchema
)
<
0
))
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tdFreeSchema
(
pSchema
);
tdFreeSchema
(
pSchema
);
return
-
1
;
}
tdFreeSchema
(
pSchema
);
// Loop to truncate each block data
// delete block
for
(
int
i
=
0
;
i
<
pHandle
->
pBlkIdx
->
numOfBlocks
;
++
i
)
{
tsdbRemoveDelBlocks
(
ptru
,
pItem
);
SBlock
*
pBlock
=
pHandle
->
pInfo
->
blocks
+
i
;
if
(
pItem
->
pBlkIdx
->
numOfBlocks
==
0
)
{
// all blocks were deleted
return
TSDB_CODE_SUCCESS
;
}
// Copy the Blocks directly if TS is not interleaved.
taosArrayClear
(
ptru
->
aSupBlk
);
if
(
!
tsdbBlockInterleaved
(
prh
,
pBlock
))
{
taosArrayClear
(
ptru
->
aSubBlk
);
// tsdbWriteBlockAndDataToFile();
continue
;
}
// Otherwise load the block data and copy the specific rows.
// Loop to truncate each block data
if
(
tsdbLoadBlockData
(
pReadh
,
pBlock
,
pHandle
->
pInfo
)
<
0
)
{
for
(
int
i
=
0
;
i
<
pItem
->
pBlkIdx
->
numOfBlocks
;
++
i
)
{
return
-
1
;
SBlock
*
pBlock
=
pItem
->
pInfo
->
blocks
+
i
;
}
int32_t
solve
=
tsdbBlockSolve
(
ptru
,
pBlock
);
if
(
uid
==
prh
->
pCtlInfo
->
uid
)
{
if
(
solve
==
BLOCK_RETAIN
)
{
tsdbFilterDataCols
(
prh
,
pReadh
->
pDCols
[
0
]);
tsdbAddBlock
(
ptru
,
pItem
,
pBlock
);
tsdbDebug
(
"vgId:%d uid %"
PRIu64
" matched, filter block data from rows %d to %d rows"
,
REPO_ID
(
pRepo
),
uid
,
continue
;
pReadh
->
pDCols
[
0
]
->
numOfRows
,
prh
->
pDCols
->
numOfRows
);
}
if
(
prh
->
pDCols
->
numOfRows
<=
0
)
continue
;
if
(
tsdbWriteBlockToRightFile
(
prh
,
pHandle
->
pTable
,
prh
->
pDCols
,
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
}
else
{
tsdbDebug
(
"vgId:%d uid %"
PRIu64
" not matched, copy block data directly
\n
"
,
REPO_ID
(
pRepo
),
uid
);
if
(
tsdbWriteBlockToRightFile
(
prh
,
pHandle
->
pTable
,
pReadh
->
pDCols
[
0
],
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
}
}
if
(
tsdbWriteBlockInfoImpl
(
TSDB_TRUNCATE_HEAD_FILE
(
prh
),
pHandle
->
pTable
,
prh
->
aSupBlk
,
NULL
,
// border block need load to delete no-use data
ppBuf
,
&
blkIdx
)
<
0
)
{
if
(
tsdbLoadBlockData
(
pReadh
,
pBlock
,
pItem
->
pInfo
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
if
((
blkIdx
.
numOfBlocks
>
0
)
&&
(
taosArrayPush
(
prh
->
aBlkIdx
,
(
const
void
*
)(
&
blkIdx
))
==
NULL
))
{
tsdbFilterDataCols
(
ptru
,
pReadh
->
pDCols
[
0
]);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
if
(
ptru
->
pDCols
->
numOfRows
<=
0
)
{
continue
;
}
SBlock
newBlock
=
{
0
};
if
(
tsdbWriteBlockToFile
(
ptru
,
pItem
->
pTable
,
ptru
->
pDCols
,
ppBuf
,
ppCBuf
,
ppExBuf
,
&
newBlock
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
// add new block to info
tsdbAddBlock
(
ptru
,
pItem
,
&
newBlock
);
}
// write block info for each table
if
(
tsdbWriteBlockInfoImpl
(
TSDB_TRUNCATE_HEAD_FILE
(
ptru
),
pItem
->
pTable
,
ptru
->
aSupBlk
,
ptru
->
aSubBlk
,
ppBuf
,
&
blkIdx
)
<
0
)
{
return
-
1
;
}
}
if
(
tsdbWriteBlockIdx
(
TSDB_TRUNCATE_HEAD_FILE
(
prh
),
prh
->
aBlkIdx
,
ppBuf
)
<
0
)
{
// each table's blkIdx
if
(
blkIdx
.
numOfBlocks
>
0
&&
taosArrayPush
(
ptru
->
aBlkIdx
,
(
const
void
*
)(
&
blkIdx
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
int
tsdbWriteBlockToRightFile
(
STruncateH
*
prh
,
STable
*
pTable
,
SDataCols
*
pDCols
,
void
**
ppBuf
,
// keep intact blocks info and write to head file then save offset to blkIdx
void
**
ppCBuf
,
void
**
ppExBuf
)
{
static
int
tsdbKeepIntactBlocks
(
STruncateH
*
ptru
,
STableTruncateH
*
pItem
)
{
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
prh
);
// init
SBlockIdx
blkIdx
=
{
0
};
taosArrayClear
(
ptru
->
aSupBlk
);
taosArrayClear
(
ptru
->
aSubBlk
);
for
(
int32_t
i
=
0
;
i
<
pItem
->
pBlkIdx
->
numOfBlocks
;
i
++
)
{
SBlock
*
pBlock
=
pItem
->
pInfo
->
blocks
+
i
;
tsdbAddBlock
(
ptru
,
pItem
,
pBlock
);
}
// write block info for one table
void
**
ppBuf
=
&
(
TSDB_TRUNCATE_BUF
(
ptru
));
int32_t
ret
=
tsdbWriteBlockInfoImpl
(
TSDB_TRUNCATE_HEAD_FILE
(
ptru
),
pItem
->
pTable
,
ptru
->
aSupBlk
,
ptru
->
aSubBlk
,
ppBuf
,
&
blkIdx
);
if
(
ret
!=
TSDB_CODE_SUCCESS
)
{
return
ret
;
}
// each table's blkIdx
if
(
blkIdx
.
numOfBlocks
>
0
&&
taosArrayPush
(
ptru
->
aBlkIdx
,
(
const
void
*
)
&
blkIdx
)
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
return
ret
;
}
static
int
tsdbFSetDeleteImpl
(
STruncateH
*
ptru
)
{
void
**
ppBuf
=
&
(
TSDB_TRUNCATE_BUF
(
ptru
));
int32_t
ret
=
TSDB_CODE_SUCCESS
;
// 1.INIT
taosArrayClear
(
ptru
->
aBlkIdx
);
for
(
size_t
tid
=
1
;
tid
<
taosArrayGetSize
(
ptru
->
tblArray
);
++
tid
)
{
STableTruncateH
*
pItem
=
(
STableTruncateH
*
)
taosArrayGet
(
ptru
->
tblArray
,
tid
);
// no table in this tid position
if
(
pItem
->
pTable
==
NULL
||
pItem
->
pBlkIdx
==
NULL
)
continue
;
// 2.WRITE INFO OF EACH TABLE BLOCK INFO TO HEAD FILE
if
(
tableInDel
(
ptru
,
tid
))
{
// modify blocks info and write to head file then save offset to blkIdx
ret
=
tsdbModifyBlocks
(
ptru
,
pItem
);
}
else
{
// keep intact blocks info and write to head file then save offset to blkIdx
ret
=
tsdbKeepIntactBlocks
(
ptru
,
pItem
);
}
if
(
ret
!=
TSDB_CODE_SUCCESS
)
return
ret
;
}
// tid for
// 3.WRITE INDEX OF ALL TABLE'S BLOCK TO HEAD FILE
if
(
tsdbWriteBlockIdx
(
TSDB_TRUNCATE_HEAD_FILE
(
ptru
),
ptru
->
aBlkIdx
,
ppBuf
)
<
0
)
{
return
-
1
;
}
return
ret
;
}
static
int
tsdbWriteBlockToFile
(
STruncateH
*
ptru
,
STable
*
pTable
,
SDataCols
*
pDCols
,
void
**
ppBuf
,
void
**
ppCBuf
,
void
**
ppExBuf
,
SBlock
*
pBlock
)
{
STsdbRepo
*
pRepo
=
TSDB_TRUNCATE_REPO
(
ptru
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SDFile
*
pDFile
=
NULL
;
SDFile
*
pDFile
=
NULL
;
bool
isLast
=
false
;
bool
isLast
=
false
;
SBlock
block
=
{
0
};
ASSERT
(
pDCols
->
numOfRows
>
0
);
ASSERT
(
pDCols
->
numOfRows
>
0
);
if
(
pDCols
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
)
{
if
(
pDCols
->
numOfRows
<
pCfg
->
minRowsPerFileBlock
)
{
pDFile
=
TSDB_TRUNCATE_LAST_FILE
(
p
rh
);
pDFile
=
TSDB_TRUNCATE_LAST_FILE
(
p
tru
);
isLast
=
true
;
isLast
=
true
;
}
else
{
}
else
{
pDFile
=
TSDB_TRUNCATE_DATA_FILE
(
p
rh
);
pDFile
=
TSDB_TRUNCATE_DATA_FILE
(
p
tru
);
isLast
=
false
;
isLast
=
false
;
}
}
if
(
tsdbWriteBlockImpl
(
pRepo
,
pTable
,
pDFile
,
if
(
tsdbWriteBlockImpl
(
pRepo
,
pTable
,
pDFile
,
isLast
?
TSDB_TRUNCATE_SMAL_FILE
(
prh
)
:
TSDB_TRUNCATE_SMAD_FILE
(
prh
),
pDCols
,
isLast
?
TSDB_TRUNCATE_SMAL_FILE
(
ptru
)
:
TSDB_TRUNCATE_SMAD_FILE
(
ptru
),
pDCols
,
&
block
,
isLast
,
true
,
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
pBlock
,
isLast
,
true
,
ppBuf
,
ppCBuf
,
ppExBuf
)
<
0
)
{
return
-
1
;
}
if
(
taosArrayPush
(
prh
->
aSupBlk
,
(
void
*
)(
&
block
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录