Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2268a1e9
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2268a1e9
编写于
10月 14, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix some converity scan problem
上级
9a024590
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
158 addition
and
99 deletion
+158
-99
include/util/trbtree.h
include/util/trbtree.h
+2
-4
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+8
-1
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+3
-6
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+2
-0
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+4
-4
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+5
-1
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
+44
-29
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+46
-24
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+3
-1
source/dnode/vnode/src/vnd/vnodeBufPool.c
source/dnode/vnode/src/vnd/vnodeBufPool.c
+9
-9
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+7
-4
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSnapshot.c
source/dnode/vnode/src/vnd/vnodeSnapshot.c
+6
-3
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+4
-1
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+3
-3
source/util/src/trbtree.c
source/util/src/trbtree.c
+6
-6
source/util/src/tskiplist.c
source/util/src/tskiplist.c
+4
-1
未找到文件。
include/util/trbtree.h
浏览文件 @
2268a1e9
...
...
@@ -26,7 +26,7 @@ typedef struct SRBTree SRBTree;
typedef
struct
SRBTreeNode
SRBTreeNode
;
typedef
struct
SRBTreeIter
SRBTreeIter
;
typedef
int32_t
(
*
tRBTreeCmprFn
)(
const
void
*
,
const
void
*
);
typedef
int32_t
(
*
tRBTreeCmprFn
)(
const
SRBTreeNode
*
,
const
SRBTreeNode
*
);
// SRBTree =============================================
#define tRBTreeMin(T) ((T)->min == ((T)->NIL) ? NULL : (T)->min)
...
...
@@ -36,7 +36,7 @@ void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn);
SRBTreeNode
*
tRBTreePut
(
SRBTree
*
pTree
,
SRBTreeNode
*
z
);
void
tRBTreeDrop
(
SRBTree
*
pTree
,
SRBTreeNode
*
z
);
SRBTreeNode
*
tRBTreeDropByKey
(
SRBTree
*
pTree
,
void
*
pKey
);
SRBTreeNode
*
tRBTreeGet
(
SRBTree
*
pTree
,
void
*
pKey
);
SRBTreeNode
*
tRBTreeGet
(
SRBTree
*
pTree
,
const
SRBTreeNode
*
pKeyNode
);
// SRBTreeIter =============================================
#define tRBTreeIterCreate(tree, ascend) \
...
...
@@ -53,8 +53,6 @@ struct SRBTreeNode {
SRBTreeNode
*
right
;
};
#define RBTREE_NODE_PAYLOAD(N) ((const void *)&(N)[1])
struct
SRBTree
{
tRBTreeCmprFn
cmprFn
;
int64_t
n
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
2268a1e9
...
...
@@ -396,12 +396,19 @@ _exit:
return
code
;
}
static
int32_t
tDataIterCmprFn
(
const
SRBTreeNode
*
n1
,
const
SRBTreeNode
*
n2
)
{
SDataIter
*
pIter1
=
(
SDataIter
*
)((
uint8_t
*
)
n1
-
offsetof
(
SDataIter
,
n
));
SDataIter
*
pIter2
=
(
SDataIter
*
)((
uint8_t
*
)
n2
-
offsetof
(
SDataIter
,
n
));
return
tRowInfoCmprFn
(
&
pIter1
->
r
,
&
pIter2
->
r
);
}
static
int32_t
tsdbOpenCommitIter
(
SCommitter
*
pCommitter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
pCommitter
->
pIter
=
NULL
;
tRBTreeCreate
(
&
pCommitter
->
rbt
,
t
RowInfo
CmprFn
);
tRBTreeCreate
(
&
pCommitter
->
rbt
,
t
DataIter
CmprFn
);
// memory
TSDBKEY
tKey
=
{.
ts
=
pCommitter
->
minKey
,
.
version
=
VERSION_MIN
};
...
...
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
2268a1e9
...
...
@@ -610,9 +610,6 @@ int32_t tsdbFSRollback(STsdbFS *pFS) {
ASSERT
(
0
);
return
code
;
_err:
return
code
;
}
int32_t
tsdbFSUpsertDelFile
(
STsdbFS
*
pFS
,
SDelFile
*
pDelFile
)
{
...
...
@@ -866,7 +863,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
nRef
=
atomic_sub_fetch_32
(
&
fSet
.
pSmaF
->
nRef
,
1
);
if
(
nRef
==
0
)
{
tsdbSmaFileName
(
pTsdb
,
pSetOld
->
diskId
,
pSetOld
->
fid
,
fSet
.
pSmaF
,
fname
);
taosRemoveFile
(
fname
);
(
void
)
taosRemoveFile
(
fname
);
taosMemoryFree
(
fSet
.
pSmaF
);
}
}
else
{
...
...
@@ -877,7 +874,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) {
// stt
if
(
sameDisk
)
{
if
(
pSetNew
->
nSttF
>
pSetOld
->
nSttF
)
{
ASSERT
(
pSetNew
->
nSttF
=
pSetOld
->
nSttF
+
1
);
ASSERT
(
pSetNew
->
nSttF
=
=
pSetOld
->
nSttF
+
1
);
pSetOld
->
aSttF
[
pSetOld
->
nSttF
]
=
(
SSttFile
*
)
taosMemoryMalloc
(
sizeof
(
SSttFile
));
if
(
pSetOld
->
aSttF
[
pSetOld
->
nSttF
]
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -1104,7 +1101,7 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
ASSERT
(
nRef
>=
0
);
if
(
nRef
==
0
)
{
tsdbDelFileName
(
pTsdb
,
pFS
->
pDelFile
,
fname
);
taosRemoveFile
(
fname
);
(
void
)
taosRemoveFile
(
fname
);
taosMemoryFree
(
pFS
->
pDelFile
);
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
2268a1e9
...
...
@@ -559,6 +559,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
// backward put first data
row
.
pTSRow
=
tGetSubmitBlkNext
(
&
blkIter
);
if
(
row
.
pTSRow
==
NULL
)
return
code
;
key
.
ts
=
row
.
pTSRow
->
ts
;
nRow
++
;
tbDataMovePosTo
(
pTbData
,
pos
,
&
key
,
SL_MOVE_BACKWARD
);
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
2268a1e9
...
...
@@ -504,9 +504,9 @@ _exit:
SRowInfo
*
tLDataIterGet
(
SLDataIter
*
pIter
)
{
return
&
pIter
->
rInfo
;
}
// SMergeTree =================================================
static
FORCE_INLINE
int32_t
tLDataIterCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SLDataIter
*
pIter1
=
(
SLDataIter
*
)(((
uint8_t
*
)
p1
)
-
sizeof
(
SRBTreeN
ode
));
SLDataIter
*
pIter2
=
(
SLDataIter
*
)(((
uint8_t
*
)
p2
)
-
sizeof
(
SRBTreeN
ode
));
static
FORCE_INLINE
int32_t
tLDataIterCmprFn
(
const
SRBTreeNode
*
p1
,
const
SRBTreeNode
*
p2
)
{
SLDataIter
*
pIter1
=
(
SLDataIter
*
)(((
uint8_t
*
)
p1
)
-
offsetof
(
SLDataIter
,
n
ode
));
SLDataIter
*
pIter2
=
(
SLDataIter
*
)(((
uint8_t
*
)
p2
)
-
offsetof
(
SLDataIter
,
n
ode
));
TSDBKEY
key1
=
TSDBROW_KEY
(
&
pIter1
->
rInfo
.
row
);
TSDBKEY
key2
=
TSDBROW_KEY
(
&
pIter2
->
rInfo
.
row
);
...
...
@@ -583,7 +583,7 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
// compare with min in RB Tree
pIter
=
(
SLDataIter
*
)
tRBTreeMin
(
&
pMTree
->
rbt
);
if
(
pMTree
->
pIter
&&
pIter
)
{
int32_t
c
=
pMTree
->
rbt
.
cmprFn
(
RBTREE_NODE_PAYLOAD
(
&
pMTree
->
pIter
->
node
),
RBTREE_NODE_PAYLOAD
(
&
pIter
->
node
)
);
int32_t
c
=
pMTree
->
rbt
.
cmprFn
(
&
pMTree
->
pIter
->
node
,
&
pIter
->
node
);
if
(
c
>
0
)
{
tRBTreePut
(
&
pMTree
->
rbt
,
(
SRBTreeNode
*
)
pMTree
->
pIter
);
pMTree
->
pIter
=
NULL
;
...
...
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
2268a1e9
...
...
@@ -87,9 +87,13 @@ _err:
int
tsdbClose
(
STsdb
**
pTsdb
)
{
if
(
*
pTsdb
)
{
taosThreadRwlock
Destroy
(
&
(
*
pTsdb
)
->
rwLock
);
taosThreadRwlock
Wrlock
(
&
(
*
pTsdb
)
->
rwLock
);
tsdbMemTableDestroy
((
*
pTsdb
)
->
mem
);
(
*
pTsdb
)
->
mem
=
NULL
;
taosThreadRwlockUnlock
(
&
(
*
pTsdb
)
->
rwLock
);
taosThreadRwlockDestroy
(
&
(
*
pTsdb
)
->
rwLock
);
tsdbFSClose
(
*
pTsdb
);
tsdbCloseCache
(
*
pTsdb
);
taosMemoryFreeClear
(
*
pTsdb
);
...
...
source/dnode/vnode/src/tsdb/tsdbReaderWriter.c
浏览文件 @
2268a1e9
...
...
@@ -18,7 +18,7 @@
// =============== PAGE-WISE FILE ===============
static
int32_t
tsdbOpenFile
(
const
char
*
path
,
int32_t
szPage
,
int32_t
flag
,
STsdbFD
**
ppFD
)
{
int32_t
code
=
0
;
STsdbFD
*
pFD
;
STsdbFD
*
pFD
=
NULL
;
*
ppFD
=
NULL
;
...
...
@@ -35,6 +35,7 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
pFD
->
pFD
=
taosOpenFile
(
path
,
flag
);
if
(
pFD
->
pFD
==
NULL
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
taosMemoryFree
(
pFD
);
goto
_exit
;
}
pFD
->
szPage
=
szPage
;
...
...
@@ -42,11 +43,15 @@ static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsd
pFD
->
pBuf
=
taosMemoryCalloc
(
1
,
szPage
);
if
(
pFD
->
pBuf
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
taosCloseFile
(
&
pFD
->
pFD
);
taosMemoryFree
(
pFD
);
goto
_exit
;
}
if
(
taosStatFile
(
path
,
&
pFD
->
szFile
,
NULL
)
<
0
)
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
taosMemoryFree
(
pFD
->
pBuf
);
taosCloseFile
(
&
pFD
->
pFD
);
taosMemoryFree
(
pFD
);
goto
_exit
;
}
ASSERT
(
pFD
->
szFile
%
szPage
==
0
);
...
...
@@ -59,10 +64,12 @@ _exit:
static
void
tsdbCloseFile
(
STsdbFD
**
ppFD
)
{
STsdbFD
*
pFD
=
*
ppFD
;
taosMemoryFree
(
pFD
->
pBuf
);
taosCloseFile
(
&
pFD
->
pFD
);
taosMemoryFree
(
pFD
);
*
ppFD
=
NULL
;
if
(
pFD
)
{
taosMemoryFree
(
pFD
->
pBuf
);
taosCloseFile
(
&
pFD
->
pFD
);
taosMemoryFree
(
pFD
);
*
ppFD
=
NULL
;
}
}
static
int32_t
tsdbWriteFilePage
(
STsdbFD
*
pFD
)
{
...
...
@@ -443,7 +450,7 @@ int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *p
pBlockIdx
->
size
=
size
;
pHeadFile
->
size
+=
size
;
tsdbTrace
(
"vgId:%d, write block, file ID:%d commit ID:%
d
suid:%"
PRId64
" uid:%"
PRId64
" offset:%"
PRId64
tsdbTrace
(
"vgId:%d, write block, file ID:%d commit ID:%
"
PRId64
"
suid:%"
PRId64
" uid:%"
PRId64
" offset:%"
PRId64
" size:%"
PRId64
" nItem:%d"
,
TD_VID
(
pWriter
->
pTsdb
->
pVnode
),
pWriter
->
wSet
.
fid
,
pHeadFile
->
commitID
,
pBlockIdx
->
suid
,
pBlockIdx
->
uid
,
pBlockIdx
->
offset
,
pBlockIdx
->
size
,
mDataBlk
->
nItem
);
...
...
@@ -457,7 +464,7 @@ _err:
int32_t
tsdbWriteSttBlk
(
SDataFWriter
*
pWriter
,
SArray
*
aSttBlk
)
{
int32_t
code
=
0
;
SSttFile
*
pSttFile
=
&
pWriter
->
fStt
[
pWriter
->
wSet
.
nSttF
-
1
];
int64_t
size
;
int64_t
size
=
0
;
int64_t
n
;
// check
...
...
@@ -906,10 +913,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
taosMemoryFree
(
*
ppReader
);
*
ppReader
=
NULL
;
return
code
;
_err:
tsdbError
(
"vgId:%d, data file reader close failed since %s"
,
TD_VID
((
*
ppReader
)
->
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
int32_t
tsdbReadBlockIdx
(
SDataFReader
*
pReader
,
SArray
*
aBlockIdx
)
{
...
...
@@ -1289,16 +1292,17 @@ _exit:
// SDelFWriter ====================================================
int32_t
tsdbDelFWriterOpen
(
SDelFWriter
**
ppWriter
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
char
fname
[
TSDB_FILENAME_LEN
];
uint8_t
hdr
[
TSDB_FHDR_SIZE
]
=
{
0
};
SDelFWriter
*
pDelFWriter
;
SDelFWriter
*
pDelFWriter
=
NULL
;
int64_t
n
;
// alloc
pDelFWriter
=
(
SDelFWriter
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pDelFWriter
));
if
(
pDelFWriter
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
pDelFWriter
->
pTsdb
=
pTsdb
;
pDelFWriter
->
fDel
=
*
pFile
;
...
...
@@ -1306,21 +1310,28 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb
tsdbDelFileName
(
pTsdb
,
pFile
,
fname
);
code
=
tsdbOpenFile
(
fname
,
pTsdb
->
pVnode
->
config
.
tsdbPageSize
,
TD_FILE_READ
|
TD_FILE_WRITE
|
TD_FILE_CREATE
,
&
pDelFWriter
->
pWriteH
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
// update header
code
=
tsdbWriteFile
(
pDelFWriter
->
pWriteH
,
0
,
hdr
,
TSDB_FHDR_SIZE
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
pDelFWriter
->
fDel
.
size
=
TSDB_FHDR_SIZE
;
pDelFWriter
->
fDel
.
offset
=
0
;
*
ppWriter
=
pDelFWriter
;
return
code
;
_err:
tsdbError
(
"vgId:%d, failed to open del file writer since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
ppWriter
=
NULL
;
_exit:
if
(
code
)
{
if
(
pDelFWriter
)
{
taosMemoryFree
(
pDelFWriter
);
tsdbCloseFile
(
&
pDelFWriter
->
pWriteH
);
}
*
ppWriter
=
NULL
;
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
errno
));
}
else
{
*
ppWriter
=
pDelFWriter
;
}
return
code
;
}
...
...
@@ -1456,15 +1467,15 @@ struct SDelFReader {
int32_t
tsdbDelFReaderOpen
(
SDelFReader
**
ppReader
,
SDelFile
*
pFile
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
char
fname
[
TSDB_FILENAME_LEN
];
SDelFReader
*
pDelFReader
;
int64_t
n
;
SDelFReader
*
pDelFReader
=
NULL
;
// alloc
pDelFReader
=
(
SDelFReader
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pDelFReader
));
if
(
pDelFReader
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_e
rr
;
goto
_e
xit
;
}
// open impl
...
...
@@ -1473,14 +1484,18 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
tsdbDelFileName
(
pTsdb
,
pFile
,
fname
);
code
=
tsdbOpenFile
(
fname
,
pTsdb
->
pVnode
->
config
.
tsdbPageSize
,
TD_FILE_READ
,
&
pDelFReader
->
pReadH
);
if
(
code
)
goto
_err
;
*
ppReader
=
pDelFReader
;
return
code
;
if
(
code
)
{
taosMemoryFree
(
pDelFReader
);
goto
_exit
;
}
_err:
tsdbError
(
"vgId:%d, del file reader open failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
*
ppReader
=
NULL
;
_exit:
if
(
code
)
{
*
ppReader
=
NULL
;
tsdbError
(
"vgId:%d %s failed at %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
}
else
{
*
ppReader
=
pDelFReader
;
}
return
code
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
2268a1e9
...
...
@@ -67,6 +67,13 @@ extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern
int32_t
tsdbReadDataBlockEx
(
SDataFReader
*
pReader
,
SDataBlk
*
pDataBlk
,
SBlockData
*
pBlockData
);
extern
int32_t
tsdbUpdateTableSchema
(
SMeta
*
pMeta
,
int64_t
suid
,
int64_t
uid
,
SSkmInfo
*
pSkmInfo
);
static
int32_t
tFDataIterCmprFn
(
const
SRBTreeNode
*
pNode1
,
const
SRBTreeNode
*
pNode2
)
{
SFDataIter
*
pIter1
=
(
SFDataIter
*
)(((
uint8_t
*
)
pNode1
)
-
offsetof
(
SFDataIter
,
n
));
SFDataIter
*
pIter2
=
(
SFDataIter
*
)(((
uint8_t
*
)
pNode2
)
-
offsetof
(
SFDataIter
,
n
));
return
tRowInfoCmprFn
(
&
pIter1
->
rInfo
,
&
pIter2
->
rInfo
);
}
static
int32_t
tsdbSnapReadOpenFile
(
STsdbSnapReader
*
pReader
)
{
int32_t
code
=
0
;
...
...
@@ -79,7 +86,7 @@ static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) {
if
(
code
)
goto
_err
;
pReader
->
pIter
=
NULL
;
tRBTreeCreate
(
&
pReader
->
rbt
,
t
RowInfo
CmprFn
);
tRBTreeCreate
(
&
pReader
->
rbt
,
t
FDataIter
CmprFn
);
// .data file
SFDataIter
*
pIter
=
&
pReader
->
aFDataIter
[
0
];
...
...
@@ -421,7 +428,7 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) {
n
+=
tPutDelData
((
*
ppData
)
+
n
,
pDelData
);
}
tsdbInfo
(
"vgId:%d, vnode snapshot tsdb read del data for %s, suid:%"
PRId64
" uid:%
d
"
PRId64
" size:%d"
,
tsdbInfo
(
"vgId:%d, vnode snapshot tsdb read del data for %s, suid:%"
PRId64
" uid:%"
PRId64
" size:%d"
,
TD_VID
(
pTsdb
->
pVnode
),
pTsdb
->
path
,
pDelIdx
->
suid
,
pDelIdx
->
uid
,
size
);
break
;
...
...
@@ -431,7 +438,7 @@ _exit:
return
code
;
_err:
tsdbError
(
"vgId:%d, vnode snapshot tsdb read del for %s failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
pTsdb
->
p
Vnode
,
tsdbError
(
"vgId:%d, vnode snapshot tsdb read del for %s failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
pTsdb
->
p
ath
,
tstrerror
(
code
));
return
code
;
}
...
...
@@ -1247,20 +1254,21 @@ _err:
// APIs
int32_t
tsdbSnapWriterOpen
(
STsdb
*
pTsdb
,
int64_t
sver
,
int64_t
ever
,
STsdbSnapWriter
**
ppWriter
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
STsdbSnapWriter
*
pWriter
=
NULL
;
// alloc
pWriter
=
(
STsdbSnapWriter
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pWriter
));
if
(
pWriter
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
pWriter
->
pTsdb
=
pTsdb
;
pWriter
->
sver
=
sver
;
pWriter
->
ever
=
ever
;
code
=
tsdbFSCopy
(
pTsdb
,
&
pWriter
->
fs
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
// config
pWriter
->
minutes
=
pTsdb
->
keepCfg
.
days
;
...
...
@@ -1272,7 +1280,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
// SNAP_DATA_TSDB
code
=
tBlockDataCreate
(
&
pWriter
->
bData
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
pWriter
->
fid
=
INT32_MIN
;
pWriter
->
id
=
(
TABLEID
){
0
};
...
...
@@ -1280,53 +1288,67 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
pWriter
->
dReader
.
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pWriter
->
dReader
.
aBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
code
=
tBlockDataCreate
(
&
pWriter
->
dReader
.
bData
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
// Writer
pWriter
->
dWriter
.
aBlockIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pWriter
->
dWriter
.
aBlockIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
pWriter
->
dWriter
.
aSttBlk
=
taosArrayInit
(
0
,
sizeof
(
SSttBlk
));
if
(
pWriter
->
dWriter
.
aSttBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
code
=
tBlockDataCreate
(
&
pWriter
->
dWriter
.
bData
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
code
=
tBlockDataCreate
(
&
pWriter
->
dWriter
.
sData
);
if
(
code
)
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
// SNAP_DATA_DEL
pWriter
->
aDelIdxR
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
));
if
(
pWriter
->
aDelIdxR
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
pWriter
->
aDelData
=
taosArrayInit
(
0
,
sizeof
(
SDelData
));
if
(
pWriter
->
aDelData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
pWriter
->
aDelIdxW
=
taosArrayInit
(
0
,
sizeof
(
SDelIdx
));
if
(
pWriter
->
aDelIdxW
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
)
;
}
*
ppWriter
=
pWriter
;
tsdbInfo
(
"vgId:%d, tsdb snapshot writer open for %s succeed"
,
TD_VID
(
pTsdb
->
pVnode
),
pTsdb
->
path
);
return
code
;
_err:
tsdbError
(
"vgId:%d, tsdb snapshot writer open for %s failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
pTsdb
->
path
,
tstrerror
(
code
));
*
ppWriter
=
NULL
;
_exit:
if
(
code
)
{
tsdbError
(
"vgId:%d %s failed at line %d since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
__func__
,
lino
,
tstrerror
(
code
));
*
ppWriter
=
NULL
;
if
(
pWriter
)
{
if
(
pWriter
->
aDelIdxW
)
taosArrayDestroy
(
pWriter
->
aDelIdxW
);
if
(
pWriter
->
aDelData
)
taosArrayDestroy
(
pWriter
->
aDelData
);
if
(
pWriter
->
aDelIdxR
)
taosArrayDestroy
(
pWriter
->
aDelIdxR
);
tBlockDataDestroy
(
&
pWriter
->
dWriter
.
sData
,
1
);
tBlockDataDestroy
(
&
pWriter
->
dWriter
.
bData
,
1
);
if
(
pWriter
->
dWriter
.
aSttBlk
)
taosArrayDestroy
(
pWriter
->
dWriter
.
aSttBlk
);
if
(
pWriter
->
dWriter
.
aBlockIdx
)
taosArrayDestroy
(
pWriter
->
dWriter
.
aBlockIdx
);
tBlockDataDestroy
(
&
pWriter
->
dReader
.
bData
,
1
);
if
(
pWriter
->
dReader
.
aBlockIdx
)
taosArrayDestroy
(
pWriter
->
dReader
.
aBlockIdx
);
tBlockDataDestroy
(
&
pWriter
->
bData
,
1
);
tsdbFSDestroy
(
&
pWriter
->
fs
);
taosMemoryFree
(
pWriter
);
}
}
else
{
tsdbDebug
(
"vgId:%d, tsdb snapshot writer open for %s succeed"
,
TD_VID
(
pTsdb
->
pVnode
),
pTsdb
->
path
);
*
ppWriter
=
pWriter
;
}
return
code
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
2268a1e9
...
...
@@ -34,7 +34,9 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
}
// loop to insert
tInitSubmitMsgIter
(
pMsg
,
&
msgIter
);
if
(
tInitSubmitMsgIter
(
pMsg
,
&
msgIter
)
<
0
)
{
return
-
1
;
}
while
(
true
)
{
SSubmitBlkRsp
r
=
{
0
};
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
...
...
source/dnode/vnode/src/vnd/vnodeBufPool.c
浏览文件 @
2268a1e9
...
...
@@ -141,17 +141,17 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
}
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
)
{
uint8_t
*
ptr
=
(
uint8_t
*
)
p
;
SVBufPoolNode
*
pNode
;
//
uint8_t *ptr = (uint8_t *)p;
//
SVBufPoolNode *pNode;
if
(
ptr
<
pPool
->
node
.
data
||
ptr
>=
pPool
->
node
.
data
+
pPool
->
node
.
size
)
{
pNode
=
&
((
SVBufPoolNode
*
)
p
)[
-
1
];
*
pNode
->
pnext
=
pNode
->
prev
;
pNode
->
prev
->
pnext
=
pNode
->
pnext
;
//
if (ptr < pPool->node.data || ptr >= pPool->node.data + pPool->node.size) {
//
pNode = &((SVBufPoolNode *)p)[-1];
//
*pNode->pnext = pNode->prev;
//
pNode->prev->pnext = pNode->pnext;
pPool
->
size
=
pPool
->
size
-
sizeof
(
*
pNode
)
-
pNode
->
size
;
taosMemoryFree
(
pNode
);
}
//
pPool->size = pPool->size - sizeof(*pNode) - pNode->size;
//
taosMemoryFree(pNode);
//
}
}
void
vnodeBufPoolRef
(
SVBufPool
*
pPool
)
{
...
...
source/dnode/vnode/src/vnd/vnodeModule.c
浏览文件 @
2268a1e9
...
...
@@ -46,11 +46,17 @@ int vnodeInit(int nthreads) {
return
0
;
}
vnodeGlobal
.
stop
=
0
;
taosThreadMutexInit
(
&
vnodeGlobal
.
mutex
,
NULL
);
taosThreadCondInit
(
&
vnodeGlobal
.
hasTask
,
NULL
);
taosThreadMutexLock
(
&
vnodeGlobal
.
mutex
);
vnodeGlobal
.
stop
=
0
;
vnodeGlobal
.
queue
.
next
=
&
vnodeGlobal
.
queue
;
vnodeGlobal
.
queue
.
prev
=
&
vnodeGlobal
.
queue
;
taosThreadMutexUnlock
(
&
(
vnodeGlobal
.
mutex
));
vnodeGlobal
.
nthreads
=
nthreads
;
vnodeGlobal
.
threads
=
taosMemoryCalloc
(
nthreads
,
sizeof
(
TdThread
));
if
(
vnodeGlobal
.
threads
==
NULL
)
{
...
...
@@ -59,9 +65,6 @@ int vnodeInit(int nthreads) {
return
-
1
;
}
taosThreadMutexInit
(
&
vnodeGlobal
.
mutex
,
NULL
);
taosThreadCondInit
(
&
vnodeGlobal
.
hasTask
,
NULL
);
for
(
int
i
=
0
;
i
<
nthreads
;
i
++
)
{
taosThreadCreate
(
&
(
vnodeGlobal
.
threads
[
i
]),
NULL
,
loop
,
NULL
);
}
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
2268a1e9
...
...
@@ -38,7 +38,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
if
(
taosMkDir
(
path
))
{
return
TAOS_SYSTEM_ERROR
(
errno
);
}
s
trcpy
(
dir
,
path
);
s
nprintf
(
dir
,
TSDB_FILENAME_LEN
,
"%s"
,
path
);
}
if
(
pCfg
)
{
...
...
@@ -51,7 +51,7 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info
.
state
.
commitID
=
0
;
if
(
vnodeSaveInfo
(
dir
,
&
info
)
<
0
||
vnodeCommitInfo
(
dir
,
&
info
)
<
0
)
{
vError
(
"vgId:%d, failed to save vnode config since %s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
vError
(
"vgId:%d, failed to save vnode config since %s"
,
pCfg
?
pCfg
->
vgId
:
0
,
tstrerror
(
terrno
));
return
-
1
;
}
...
...
source/dnode/vnode/src/vnd/vnodeSnapshot.c
浏览文件 @
2268a1e9
...
...
@@ -166,7 +166,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
if
(
*
ppData
)
{
goto
_exit
;
}
else
{
pReader
->
tq
Handle
Done
=
1
;
pReader
->
tq
Offset
Done
=
1
;
code
=
tqOffsetReaderClose
(
&
pReader
->
pTqOffsetReader
);
if
(
code
)
goto
_err
;
}
...
...
@@ -219,7 +219,7 @@ _exit:
return
code
;
_err:
vError
(
"vgId:% vnode snapshot read failed since %s"
,
TD_VID
(
pReader
->
pVnode
),
tstrerror
(
code
));
vError
(
"vgId:%
d
vnode snapshot read failed since %s"
,
TD_VID
(
pReader
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
...
...
@@ -260,7 +260,10 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
// commit it
code
=
vnodeCommit
(
pVnode
);
if
(
code
)
goto
_err
;
if
(
code
)
{
taosMemoryFree
(
pWriter
);
goto
_err
;
}
// inc commit ID
pVnode
->
state
.
commitID
++
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
2268a1e9
...
...
@@ -68,7 +68,10 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int64_t
ctime
=
taosGetTimestampMs
();
tb_uid_t
uid
;
tInitSubmitMsgIter
(
pSubmitReq
,
&
msgIter
);
if
(
tInitSubmitMsgIter
(
pSubmitReq
,
&
msgIter
)
<
0
)
{
code
=
terrno
;
goto
_err
;
}
for
(;;)
{
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
2268a1e9
...
...
@@ -34,9 +34,9 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
static
int
tdbPagerWritePageToJournal
(
SPager
*
pPager
,
SPage
*
pPage
);
static
int
tdbPagerWritePageToDB
(
SPager
*
pPager
,
SPage
*
pPage
);
static
FORCE_INLINE
int32_t
pageCmpFn
(
const
void
*
lhs
,
const
void
*
rhs
)
{
SPage
*
pPageL
=
(
SPage
*
)(((
uint8_t
*
)
lhs
)
-
sizeof
(
SRBTreeN
ode
));
SPage
*
pPageR
=
(
SPage
*
)(((
uint8_t
*
)
rhs
)
-
sizeof
(
SRBTreeN
ode
));
static
FORCE_INLINE
int32_t
pageCmpFn
(
const
SRBTreeNode
*
lhs
,
const
SRBTreeNode
*
rhs
)
{
SPage
*
pPageL
=
(
SPage
*
)(((
uint8_t
*
)
lhs
)
-
offsetof
(
SPage
,
n
ode
));
SPage
*
pPageR
=
(
SPage
*
)(((
uint8_t
*
)
rhs
)
-
offsetof
(
SPage
,
n
ode
));
SPgno
pgnoL
=
TDB_PAGE_PGNO
(
pPageL
);
SPgno
pgnoR
=
TDB_PAGE_PGNO
(
pPageR
);
...
...
source/util/src/trbtree.c
浏览文件 @
2268a1e9
...
...
@@ -219,7 +219,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
while
(
temp
!=
pTree
->
NIL
)
{
y
=
temp
;
int32_t
c
=
pTree
->
cmprFn
(
RBTREE_NODE_PAYLOAD
(
z
),
RBTREE_NODE_PAYLOAD
(
temp
)
);
int32_t
c
=
pTree
->
cmprFn
(
z
,
temp
);
if
(
c
<
0
)
{
temp
=
temp
->
left
;
}
else
if
(
c
>
0
)
{
...
...
@@ -232,7 +232,7 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
if
(
y
==
pTree
->
NIL
)
{
pTree
->
root
=
z
;
}
else
if
(
pTree
->
cmprFn
(
RBTREE_NODE_PAYLOAD
(
z
),
RBTREE_NODE_PAYLOAD
(
y
)
)
<
0
)
{
}
else
if
(
pTree
->
cmprFn
(
z
,
y
)
<
0
)
{
y
->
left
=
z
;
}
else
{
y
->
right
=
z
;
...
...
@@ -245,10 +245,10 @@ SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z) {
tRBTreePutFix
(
pTree
,
z
);
// update min/max node
if
(
pTree
->
min
==
pTree
->
NIL
||
pTree
->
cmprFn
(
RBTREE_NODE_PAYLOAD
(
pTree
->
min
),
RBTREE_NODE_PAYLOAD
(
z
)
)
>
0
)
{
if
(
pTree
->
min
==
pTree
->
NIL
||
pTree
->
cmprFn
(
pTree
->
min
,
z
)
>
0
)
{
pTree
->
min
=
z
;
}
if
(
pTree
->
max
==
pTree
->
NIL
||
pTree
->
cmprFn
(
RBTREE_NODE_PAYLOAD
(
pTree
->
max
),
RBTREE_NODE_PAYLOAD
(
z
)
)
<
0
)
{
if
(
pTree
->
max
==
pTree
->
NIL
||
pTree
->
cmprFn
(
pTree
->
max
,
z
)
<
0
)
{
pTree
->
max
=
z
;
}
pTree
->
n
++
;
...
...
@@ -309,11 +309,11 @@ SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey) {
return
pNode
;
}
SRBTreeNode
*
tRBTreeGet
(
SRBTree
*
pTree
,
void
*
pKey
)
{
SRBTreeNode
*
tRBTreeGet
(
SRBTree
*
pTree
,
const
SRBTreeNode
*
pKeyNode
)
{
SRBTreeNode
*
pNode
=
pTree
->
root
;
while
(
pNode
!=
pTree
->
NIL
)
{
int32_t
c
=
pTree
->
cmprFn
(
pKey
,
RBTREE_NODE_PAYLOAD
(
pNode
)
);
int32_t
c
=
pTree
->
cmprFn
(
pKey
Node
,
pNode
);
if
(
c
<
0
)
{
pNode
=
pNode
->
left
;
...
...
source/util/src/tskiplist.c
浏览文件 @
2268a1e9
...
...
@@ -145,7 +145,10 @@ void tSkipListPutBatchByIter(SSkipList *pSkipList, void *iter, iter_next_fn_t it
tSkipListWLock
(
pSkipList
);
void
*
pData
=
iterate
(
iter
);
if
(
pData
==
NULL
)
return
;
if
(
pData
==
NULL
)
{
tSkipListUnlock
(
pSkipList
);
return
;
}
// backward to put the first data
hasDup
=
tSkipListGetPosToPut
(
pSkipList
,
backward
,
pData
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录