Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
df616a5f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
df616a5f
编写于
7月 19, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
7月 19, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15125 from taosdata/fix/hzcheng_3.0
fix: r/w concurrency
上级
b71c3d52
024f2221
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
230 addition
and
140 deletion
+230
-140
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+23
-30
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+8
-7
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+22
-20
source/dnode/vnode/src/tsdb/tsdbCache.c
source/dnode/vnode/src/tsdb/tsdbCache.c
+17
-8
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+35
-13
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+58
-0
source/dnode/vnode/src/tsdb/tsdbOpen.c
source/dnode/vnode/src/tsdb/tsdbOpen.c
+4
-28
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+13
-7
source/dnode/vnode/src/tsdb/tsdbRetention.c
source/dnode/vnode/src/tsdb/tsdbRetention.c
+5
-5
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
+9
-9
source/dnode/vnode/src/vnd/vnodeBufPool.c
source/dnode/vnode/src/vnd/vnodeBufPool.c
+26
-3
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+6
-10
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+4
-0
未找到文件。
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
df616a5f
...
...
@@ -174,6 +174,10 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg);
int32_t
tsdbMemTableCreate
(
STsdb
*
pTsdb
,
SMemTable
**
ppMemTable
);
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
);
void
tsdbGetTbDataFromMemTable
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
STbData
**
ppTbData
);
void
tsdbRefMemTable
(
SMemTable
*
pMemTable
);
void
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
);
int32_t
tsdbTakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
**
ppMem
,
SMemTable
**
ppIMem
);
void
tsdbUntakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
);
// STbDataIter
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
);
void
*
tsdbTbDataIterDestroy
(
STbDataIter
*
pIter
);
...
...
@@ -273,23 +277,14 @@ typedef struct {
}
SRtn
;
struct
STsdb
{
char
*
path
;
SVnode
*
pVnode
;
TdThreadMutex
mutex
;
bool
repoLocked
;
STsdbKeepCfg
keepCfg
;
SMemTable
*
mem
;
SMemTable
*
imem
;
SRtn
rtn
;
STsdbFS
*
fs
;
SLRUCache
*
lruCache
;
};
struct
STable
{
uint64_t
suid
;
uint64_t
uid
;
STSchema
*
pSchema
;
// latest schema
STSchema
*
pCacheSchema
;
// cached cache
char
*
path
;
SVnode
*
pVnode
;
STsdbKeepCfg
keepCfg
;
TdThreadRwlock
rwLock
;
SMemTable
*
mem
;
SMemTable
*
imem
;
STsdbFS
*
pFS
;
SLRUCache
*
lruCache
;
};
struct
TSDBKEY
{
...
...
@@ -330,21 +325,19 @@ struct STbData {
};
struct
SMemTable
{
SRWLatch
latch
;
STsdb
*
pTsdb
;
int32_t
nRef
;
TSKEY
minKey
;
TSKEY
maxKey
;
int64_t
minVersion
;
int64_t
maxVersion
;
int64_t
nRow
;
int64_t
nDel
;
SArray
*
aTbData
;
// SArray<STbData*>
SRWLatch
latch
;
STsdb
*
pTsdb
;
SVBufPool
*
pPool
;
volatile
int32_t
nRef
;
TSKEY
minKey
;
TSKEY
maxKey
;
int64_t
minVersion
;
int64_t
maxVersion
;
int64_t
nRow
;
int64_t
nDel
;
SArray
*
aTbData
;
// SArray<STbData*>
};
int
tsdbLockRepo
(
STsdb
*
pTsdb
);
int
tsdbUnlockRepo
(
STsdb
*
pTsdb
);
struct
TSDBROW
{
int8_t
type
;
// 0 for row from tsRow, 1 for row from block data
union
{
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
df616a5f
...
...
@@ -62,12 +62,13 @@ struct SVBufPoolNode {
};
struct
SVBufPool
{
SVBufPool
*
next
;
int64_t
nRef
;
int64_t
size
;
uint8_t
*
ptr
;
SVBufPoolNode
*
pTail
;
SVBufPoolNode
node
;
SVBufPool
*
next
;
SVnode
*
pVnode
;
volatile
int32_t
nRef
;
int64_t
size
;
uint8_t
*
ptr
;
SVBufPoolNode
*
pTail
;
SVBufPoolNode
node
;
};
int32_t
vnodeOpenBufPool
(
SVnode
*
pVnode
,
int64_t
size
);
...
...
@@ -78,7 +79,7 @@ void vnodeBufPoolReset(SVBufPool* pPool);
int32_t
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
int32_t
vnodeGetTableMeta
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int
vnodeGetTableCfg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int
vnodeGetTableCfg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
// vnodeCommit.c
int32_t
vnodeBegin
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
df616a5f
...
...
@@ -77,6 +77,8 @@ typedef struct SSnapDataHdr SSnapDataHdr;
// vnd.h
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
int
size
);
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
);
void
vnodeBufPoolRef
(
SVBufPool
*
pPool
);
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
);
// meta
typedef
struct
SMCtbCursor
SMCtbCursor
;
...
...
@@ -247,26 +249,26 @@ struct STsdbKeepCfg {
};
struct
SVnode
{
char
*
path
;
SVnodeCfg
config
;
SVState
state
;
STfs
*
pTfs
;
SMsgCb
msgCb
;
SVBufPool
*
pPool
;
SVBufPool
*
inUse
;
SVBufPool
*
onCommit
;
SVBufPool
*
onRecycl
e
;
SMeta
*
pMeta
;
SSma
*
pSma
;
STsdb
*
pTsdb
;
SWal
*
pWal
;
STQ
*
pTq
;
SSink
*
pSink
;
tsem_t
canCommit
;
int64_t
sync
;
int32_t
blockCount
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
char
*
path
;
SVnodeCfg
config
;
SVState
state
;
STfs
*
pTfs
;
SMsgCb
msgCb
;
TdThreadMutex
mutex
;
TdThreadCond
poolNotEmpty
;
SVBufPool
*
pPool
;
SVBufPool
*
inUs
e
;
SMeta
*
pMeta
;
SSma
*
pSma
;
STsdb
*
pTsdb
;
SWal
*
pWal
;
STQ
*
pTq
;
SSink
*
pSink
;
tsem_t
canCommit
;
int64_t
sync
;
int32_t
blockCount
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
...
...
source/dnode/vnode/src/tsdb/tsdbCache.c
浏览文件 @
df616a5f
...
...
@@ -464,7 +464,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
switch
(
state
->
state
)
{
case
SFSNEXTROW_FS
:
state
->
aDFileSet
=
state
->
pTsdb
->
fs
->
cState
->
aDFileSet
;
state
->
aDFileSet
=
state
->
pTsdb
->
pFS
->
cState
->
aDFileSet
;
state
->
nFileSet
=
taosArrayGetSize
(
state
->
aDFileSet
);
state
->
iFileSet
=
state
->
nFileSet
;
...
...
@@ -793,6 +793,9 @@ typedef struct {
TSDBROW
memRow
,
imemRow
,
fsRow
;
TsdbNextRowState
input
[
3
];
SMemTable
*
pMemTable
;
SMemTable
*
pIMemTable
;
STsdb
*
pTsdb
;
}
CacheNextRowIter
;
static
int32_t
nextRowIterOpen
(
CacheNextRowIter
*
pIter
,
tb_uid_t
uid
,
STsdb
*
pTsdb
)
{
...
...
@@ -800,21 +803,25 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs
tb_uid_t
suid
=
getTableSuidByUid
(
uid
,
pTsdb
);
tsdbTakeMemSnapshot
(
pTsdb
,
&
pIter
->
pMemTable
,
&
pIter
->
pIMemTable
);
STbData
*
pMem
=
NULL
;
if
(
p
Tsdb
->
mem
)
{
tsdbGetTbDataFromMemTable
(
p
Tsdb
->
mem
,
suid
,
uid
,
&
pMem
);
if
(
p
Iter
->
pMemTable
)
{
tsdbGetTbDataFromMemTable
(
p
Iter
->
pMemTable
,
suid
,
uid
,
&
pMem
);
}
STbData
*
pIMem
=
NULL
;
if
(
p
Tsdb
->
imem
)
{
tsdbGetTbDataFromMemTable
(
p
Tsdb
->
imem
,
suid
,
uid
,
&
pIMem
);
if
(
p
Iter
->
pIMemTable
)
{
tsdbGetTbDataFromMemTable
(
p
Iter
->
pIMemTable
,
suid
,
uid
,
&
pIMem
);
}
pIter
->
pTsdb
=
pTsdb
;
pIter
->
pSkyline
=
taosArrayInit
(
32
,
sizeof
(
TSDBKEY
));
SDelIdx
delIdx
;
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
cState
);
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
;
...
...
@@ -878,6 +885,8 @@ static int32_t nextRowIterClose(CacheNextRowIter *pIter) {
taosArrayDestroy
(
pIter
->
pSkyline
);
}
tsdbUntakeMemSnapshot
(
pIter
->
pTsdb
,
pIter
->
pMemTable
,
pIter
->
pIMemTable
);
return
code
;
_err:
return
code
;
...
...
@@ -1189,7 +1198,7 @@ static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, bool *dup, STSRow **ppRo
SDelIdx
delIdx
;
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
cState
);
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
;
...
...
@@ -1377,7 +1386,7 @@ static int32_t mergeLast(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray) {
SDelIdx
delIdx
;
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
cState
);
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
df616a5f
...
...
@@ -64,9 +64,26 @@ int32_t tsdbBegin(STsdb *pTsdb) {
if
(
!
pTsdb
)
return
code
;
code
=
tsdbMemTableCreate
(
pTsdb
,
&
pTsdb
->
mem
);
SMemTable
*
pMemTable
;
code
=
tsdbMemTableCreate
(
pTsdb
,
&
pMemTable
);
if
(
code
)
goto
_err
;
// lock
code
=
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
pTsdb
->
mem
=
pMemTable
;
// unlock
code
=
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
return
code
;
_err:
...
...
@@ -83,9 +100,11 @@ int32_t tsdbCommit(STsdb *pTsdb) {
// check
if
(
pMemTable
->
nRow
==
0
&&
pMemTable
->
nDel
==
0
)
{
// TODO: lock?
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
pTsdb
->
mem
=
NULL
;
tsdbMemTableDestroy
(
pMemTable
);
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMemTable
);
goto
_exit
;
}
...
...
@@ -139,7 +158,7 @@ static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
goto
_err
;
}
SDelFile
*
pDelFileR
=
pTsdb
->
fs
->
nState
->
pDelFile
;
SDelFile
*
pDelFileR
=
pTsdb
->
pFS
->
nState
->
pDelFile
;
if
(
pDelFileR
)
{
code
=
tsdbDelFReaderOpen
(
&
pCommitter
->
pDelFReader
,
pDelFileR
,
pTsdb
,
NULL
);
if
(
code
)
goto
_err
;
...
...
@@ -228,7 +247,7 @@ static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
code
=
tsdbUpdateDelFileHdr
(
pCommitter
->
pDelFWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFSStateUpsertDelFile
(
pTsdb
->
fs
->
nState
,
&
pCommitter
->
pDelFWriter
->
fDel
);
code
=
tsdbFSStateUpsertDelFile
(
pTsdb
->
pFS
->
nState
,
&
pCommitter
->
pDelFWriter
->
fDel
);
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
&
pCommitter
->
pDelFWriter
,
1
);
...
...
@@ -263,7 +282,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
taosArrayClear
(
pCommitter
->
aBlockIdx
);
tMapDataReset
(
&
pCommitter
->
oBlockMap
);
tBlockDataReset
(
&
pCommitter
->
oBlockData
);
pRSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
fs
->
nState
,
pCommitter
->
commitFid
,
TD_EQ
);
pRSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
pFS
->
nState
,
pCommitter
->
commitFid
,
TD_EQ
);
if
(
pRSet
)
{
code
=
tsdbDataFReaderOpen
(
&
pCommitter
->
pReader
,
pTsdb
,
pRSet
);
if
(
code
)
goto
_err
;
...
...
@@ -836,7 +855,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
if
(
code
)
goto
_err
;
// upsert SDFileSet
code
=
tsdbFSStateUpsertDFileSet
(
pCommitter
->
pTsdb
->
fs
->
nState
,
tsdbDataFWriterGetWSet
(
pCommitter
->
pWriter
));
code
=
tsdbFSStateUpsertDFileSet
(
pCommitter
->
pTsdb
->
pFS
->
nState
,
tsdbDataFWriterGetWSet
(
pCommitter
->
pWriter
));
if
(
code
)
goto
_err
;
// close and sync
...
...
@@ -941,10 +960,10 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
memset
(
pCommitter
,
0
,
sizeof
(
*
pCommitter
));
ASSERT
(
pTsdb
->
mem
&&
pTsdb
->
imem
==
NULL
);
// lock(
);
taosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
// unlock(
);
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
pCommitter
->
pTsdb
=
pTsdb
;
pCommitter
->
commitID
=
pTsdb
->
pVnode
->
state
.
commitID
;
...
...
@@ -954,7 +973,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
pCommitter
->
maxRow
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
maxRows
;
pCommitter
->
cmprAlg
=
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
compression
;
code
=
tsdbFSBegin
(
pTsdb
->
fs
);
code
=
tsdbFSBegin
(
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
return
code
;
...
...
@@ -1135,13 +1154,16 @@ static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
SMemTable
*
pMemTable
=
pTsdb
->
imem
;
if
(
eno
==
0
)
{
code
=
tsdbFSCommit
(
pTsdb
->
fs
);
code
=
tsdbFSCommit
(
pTsdb
->
pFS
);
}
else
{
code
=
tsdbFSRollback
(
pTsdb
->
fs
);
code
=
tsdbFSRollback
(
pTsdb
->
pFS
);
}
t
sdbMemTableDestroy
(
pMemTable
);
t
aosThreadRwlockWrlock
(
&
pTsdb
->
rwLock
);
pTsdb
->
imem
=
NULL
;
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
tsdbUnrefMemTable
(
pMemTable
);
tsdbInfo
(
"vgId:%d tsdb end commit"
,
TD_VID
(
pTsdb
->
pVnode
));
return
code
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
df616a5f
...
...
@@ -41,6 +41,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
}
taosInitRWLatch
(
&
pMemTable
->
latch
);
pMemTable
->
pTsdb
=
pTsdb
;
pMemTable
->
pPool
=
pTsdb
->
pVnode
->
inUse
;
pMemTable
->
nRef
=
1
;
pMemTable
->
minKey
=
TSKEY_MAX
;
pMemTable
->
maxKey
=
TSKEY_MIN
;
...
...
@@ -54,6 +55,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree
(
pMemTable
);
goto
_err
;
}
vnodeBufPoolRef
(
pMemTable
->
pPool
);
*
ppMemTable
=
pMemTable
;
return
code
;
...
...
@@ -65,6 +67,7 @@ _err:
void
tsdbMemTableDestroy
(
SMemTable
*
pMemTable
)
{
if
(
pMemTable
)
{
vnodeBufPoolUnRef
(
pMemTable
->
pPool
);
taosArrayDestroy
(
pMemTable
->
aTbData
);
taosMemoryFree
(
pMemTable
);
}
...
...
@@ -590,3 +593,58 @@ _err:
}
int32_t
tsdbGetNRowsInTbData
(
STbData
*
pTbData
)
{
return
pTbData
->
sl
.
size
;
}
void
tsdbRefMemTable
(
SMemTable
*
pMemTable
)
{
int32_t
nRef
=
atomic_fetch_add_32
(
&
pMemTable
->
nRef
,
1
);
ASSERT
(
nRef
>
0
);
}
void
tsdbUnrefMemTable
(
SMemTable
*
pMemTable
)
{
int32_t
nRef
=
atomic_sub_fetch_32
(
&
pMemTable
->
nRef
,
1
);
if
(
nRef
==
0
)
{
tsdbMemTableDestroy
(
pMemTable
);
}
}
int32_t
tsdbTakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
**
ppMem
,
SMemTable
**
ppIMem
)
{
int32_t
code
=
0
;
// lock
code
=
taosThreadRwlockRdlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
// take snapshot
*
ppMem
=
pTsdb
->
mem
;
*
ppIMem
=
pTsdb
->
imem
;
if
(
*
ppMem
)
{
tsdbRefMemTable
(
*
ppMem
);
}
if
(
*
ppIMem
)
{
tsdbRefMemTable
(
*
ppIMem
);
}
// unlock
code
=
taosThreadRwlockUnlock
(
&
pTsdb
->
rwLock
);
if
(
code
)
{
code
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_exit
;
}
_exit:
return
code
;
}
void
tsdbUntakeMemSnapshot
(
STsdb
*
pTsdb
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
)
{
if
(
pMem
)
{
tsdbUnrefMemTable
(
pMem
);
}
if
(
pIMem
)
{
tsdbUnrefMemTable
(
pIMem
);
}
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbOpen.c
浏览文件 @
df616a5f
...
...
@@ -54,8 +54,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
sprintf
(
pTsdb
->
path
,
"%s%s%s"
,
pVnode
->
path
,
TD_DIRSEP
,
dir
);
taosRealPath
(
pTsdb
->
path
,
NULL
,
slen
);
pTsdb
->
pVnode
=
pVnode
;
pTsdb
->
repoLocked
=
false
;
taosThreadMutexInit
(
&
pTsdb
->
mutex
,
NULL
);
taosThreadRwlockInit
(
&
pTsdb
->
rwLock
,
NULL
);
if
(
!
pKeepCfg
)
{
tsdbSetKeepCfg
(
&
pTsdb
->
keepCfg
,
&
pVnode
->
config
.
tsdbCfg
);
}
else
{
...
...
@@ -67,7 +66,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
tfsMkdir
(
pVnode
->
pTfs
,
pTsdb
->
path
);
// open tsdb
if
(
tsdbFSOpen
(
pTsdb
,
&
pTsdb
->
fs
)
<
0
)
{
if
(
tsdbFSOpen
(
pTsdb
,
&
pTsdb
->
pFS
)
<
0
)
{
goto
_err
;
}
...
...
@@ -88,33 +87,10 @@ _err:
int
tsdbClose
(
STsdb
**
pTsdb
)
{
if
(
*
pTsdb
)
{
taosThread
MutexDestroy
(
&
(
*
pTsdb
)
->
mutex
);
tsdbFSClose
((
*
pTsdb
)
->
fs
);
taosThread
RwlockDestroy
(
&
(
*
pTsdb
)
->
rwLock
);
tsdbFSClose
((
*
pTsdb
)
->
pFS
);
tsdbCloseCache
((
*
pTsdb
)
->
lruCache
);
taosMemoryFreeClear
(
*
pTsdb
);
}
return
0
;
}
int
tsdbLockRepo
(
STsdb
*
pTsdb
)
{
int
code
=
taosThreadMutexLock
(
&
pTsdb
->
mutex
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, failed to lock tsdb since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
pTsdb
->
repoLocked
=
true
;
return
0
;
}
int
tsdbUnlockRepo
(
STsdb
*
pTsdb
)
{
// ASSERT(IS_REPO_LOCKED(pTsdb));
pTsdb
->
repoLocked
=
false
;
int
code
=
taosThreadMutexUnlock
(
&
pTsdb
->
mutex
);
if
(
code
!=
0
)
{
tsdbError
(
"vgId:%d, failed to unlock tsdb since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
return
0
;
}
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
df616a5f
...
...
@@ -118,6 +118,8 @@ struct STsdbReader {
char
*
idStr
;
// query info handle, for debug purpose
int32_t
type
;
// query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo
suppInfo
;
SMemTable
*
pMem
;
SMemTable
*
pIMem
;
SIOCostSummary
cost
;
STSchema
*
pSchema
;
...
...
@@ -1880,8 +1882,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
int32_t
backward
=
(
!
ASCENDING_TRAVERSE
(
pReader
->
order
));
STbData
*
d
=
NULL
;
if
(
pReader
->
p
Tsdb
->
m
em
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
Tsdb
->
m
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
d
);
if
(
pReader
->
p
M
em
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
M
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
d
);
if
(
d
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
d
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1901,8 +1903,8 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea
}
STbData
*
di
=
NULL
;
if
(
pReader
->
p
Tsdb
->
im
em
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
Tsdb
->
im
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
di
);
if
(
pReader
->
p
IM
em
!=
NULL
)
{
tsdbGetTbDataFromMemTable
(
pReader
->
p
IM
em
,
pReader
->
suid
,
pBlockScanInfo
->
uid
,
&
di
);
if
(
di
!=
NULL
)
{
code
=
tsdbTbDataIterCreate
(
di
,
&
startKey
,
backward
,
&
pBlockScanInfo
->
iiter
.
iter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1938,7 +1940,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
SArray
*
pDelData
=
taosArrayInit
(
4
,
sizeof
(
SDelData
));
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
cState
);
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
cState
);
if
(
pDelFile
)
{
SDelFReader
*
pDelFReader
=
NULL
;
code
=
tsdbDelFReaderOpen
(
&
pDelFReader
,
pDelFile
,
pTsdb
,
NULL
);
...
...
@@ -2828,7 +2830,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
SDataBlockIter
*
pBlockIter
=
&
pReader
->
status
.
blockIter
;
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
fs
->
cState
;
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
pFS
->
cState
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pFState
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
...
...
@@ -2842,6 +2844,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
}
}
tsdbTakeMemSnapshot
(
pReader
->
pTsdb
,
&
pReader
->
pMem
,
&
pReader
->
pIMem
);
tsdbDebug
(
"%p total numOfTable:%d in this query %s"
,
pReader
,
numOfTables
,
pReader
->
idStr
);
return
code
;
...
...
@@ -2857,6 +2861,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
SBlockLoadSuppInfo
*
pSupInfo
=
&
pReader
->
suppInfo
;
tsdbUntakeMemSnapshot
(
pReader
->
pTsdb
,
pReader
->
pMem
,
pReader
->
pIMem
);
taosMemoryFreeClear
(
pSupInfo
->
plist
);
taosMemoryFree
(
pSupInfo
->
colIds
);
...
...
@@ -3075,7 +3081,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
tsdbDataFReaderClose
(
&
pReader
->
pFileReader
);
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
fs
->
cState
;
STsdbFSState
*
pFState
=
pReader
->
pTsdb
->
pFS
->
cState
;
initFilesetIterator
(
&
pReader
->
status
.
fileIter
,
pFState
,
pReader
->
order
,
pReader
->
idStr
);
resetDataBlockIterator
(
&
pReader
->
status
.
blockIter
,
pReader
->
order
);
resetDataBlockScanInfo
(
pReader
->
status
.
pTableMap
);
...
...
source/dnode/vnode/src/tsdb/tsdbRetention.c
浏览文件 @
df616a5f
...
...
@@ -20,10 +20,10 @@ static int32_t tsdbDoRetentionImpl(STsdb *pTsdb, int64_t now, int8_t try, int8_t
STsdbFSState
*
pState
;
if
(
try
)
{
pState
=
pTsdb
->
fs
->
cState
;
pState
=
pTsdb
->
pFS
->
cState
;
*
canDo
=
0
;
}
else
{
pState
=
pTsdb
->
fs
->
nState
;
pState
=
pTsdb
->
pFS
->
nState
;
}
for
(
int32_t
iSet
=
0
;
iSet
<
taosArrayGetSize
(
pState
->
aDFileSet
);
iSet
++
)
{
...
...
@@ -83,7 +83,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if
(
!
canDo
)
goto
_exit
;
// begin
code
=
tsdbFSBegin
(
pTsdb
->
fs
);
code
=
tsdbFSBegin
(
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
// do retention
...
...
@@ -91,7 +91,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) {
if
(
code
)
goto
_err
;
// commit
code
=
tsdbFSCommit
(
pTsdb
->
fs
);
code
=
tsdbFSCommit
(
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
_exit:
...
...
@@ -99,6 +99,6 @@ _exit:
_err:
tsdbError
(
"vgId:%d tsdb do retention failed since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbFSRollback
(
pTsdb
->
fs
);
tsdbFSRollback
(
pTsdb
->
pFS
);
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbSnapshot.c
浏览文件 @
df616a5f
...
...
@@ -45,7 +45,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
while
(
true
)
{
if
(
pReader
->
pDataFReader
==
NULL
)
{
SDFileSet
*
pSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
fs
->
cState
,
pReader
->
fid
,
TD_GT
);
SDFileSet
*
pSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
pFS
->
cState
,
pReader
->
fid
,
TD_GT
);
if
(
pSet
==
NULL
)
goto
_exit
;
...
...
@@ -159,7 +159,7 @@ _err:
static
int32_t
tsdbSnapReadDel
(
STsdbSnapReader
*
pReader
,
uint8_t
**
ppData
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pReader
->
pTsdb
;
SDelFile
*
pDelFile
=
pTsdb
->
fs
->
cState
->
pDelFile
;
SDelFile
*
pDelFile
=
pTsdb
->
pFS
->
cState
->
pDelFile
;
if
(
pReader
->
pDelFReader
==
NULL
)
{
if
(
pDelFile
==
NULL
)
{
...
...
@@ -798,7 +798,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) {
code
=
tsdbWriteBlockIdx
(
pWriter
->
pDataFWriter
,
pWriter
->
aBlockIdxW
,
NULL
);
if
(
code
)
goto
_err
;
code
=
tsdbFSStateUpsertDFileSet
(
pTsdb
->
fs
->
nState
,
tsdbDataFWriterGetWSet
(
pWriter
->
pDataFWriter
));
code
=
tsdbFSStateUpsertDFileSet
(
pTsdb
->
pFS
->
nState
,
tsdbDataFWriterGetWSet
(
pWriter
->
pDataFWriter
));
if
(
code
)
goto
_err
;
code
=
tsdbDataFWriterClose
(
&
pWriter
->
pDataFWriter
,
1
);
...
...
@@ -843,7 +843,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
pWriter
->
fid
=
fid
;
// read
SDFileSet
*
pSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
fs
->
nState
,
fid
,
TD_EQ
);
SDFileSet
*
pSet
=
tsdbFSStateGetDFileSet
(
pTsdb
->
pFS
->
nState
,
fid
,
TD_EQ
);
if
(
pSet
)
{
code
=
tsdbDataFReaderOpen
(
&
pWriter
->
pDataFReader
,
pTsdb
,
pSet
);
if
(
code
)
goto
_err
;
...
...
@@ -907,7 +907,7 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32
STsdb
*
pTsdb
=
pWriter
->
pTsdb
;
if
(
pWriter
->
pDelFWriter
==
NULL
)
{
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
fs
->
nState
);
SDelFile
*
pDelFile
=
tsdbFSStateGetDelFile
(
pTsdb
->
pFS
->
nState
);
// reader
if
(
pDelFile
)
{
...
...
@@ -1017,7 +1017,7 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) {
code
=
tsdbUpdateDelFileHdr
(
pWriter
->
pDelFWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFSStateUpsertDelFile
(
pTsdb
->
fs
->
nState
,
&
pWriter
->
pDelFWriter
->
fDel
);
code
=
tsdbFSStateUpsertDelFile
(
pTsdb
->
pFS
->
nState
,
&
pWriter
->
pDelFWriter
->
fDel
);
if
(
code
)
goto
_err
;
code
=
tsdbDelFWriterClose
(
&
pWriter
->
pDelFWriter
,
1
);
...
...
@@ -1096,7 +1096,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr
goto
_err
;
}
code
=
tsdbFSBegin
(
pTsdb
->
fs
);
code
=
tsdbFSBegin
(
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
*
ppWriter
=
pWriter
;
...
...
@@ -1113,7 +1113,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
STsdbSnapWriter
*
pWriter
=
*
ppWriter
;
if
(
rollback
)
{
code
=
tsdbFSRollback
(
pWriter
->
pTsdb
->
fs
);
code
=
tsdbFSRollback
(
pWriter
->
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
}
else
{
code
=
tsdbSnapWriteDataEnd
(
pWriter
);
...
...
@@ -1122,7 +1122,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) {
code
=
tsdbSnapWriteDelEnd
(
pWriter
);
if
(
code
)
goto
_err
;
code
=
tsdbFSCommit
(
pWriter
->
pTsdb
->
fs
);
code
=
tsdbFSCommit
(
pWriter
->
pTsdb
->
pFS
);
if
(
code
)
goto
_err
;
}
...
...
source/dnode/vnode/src/vnd/vnodeBufPool.c
浏览文件 @
df616a5f
...
...
@@ -17,7 +17,7 @@
/* ------------------------ STRUCTURES ------------------------ */
static
int
vnodeBufPoolCreate
(
int64_t
size
,
SVBufPool
**
ppPool
);
static
int
vnodeBufPoolCreate
(
SVnode
*
pVnode
,
int64_t
size
,
SVBufPool
**
ppPool
);
static
int
vnodeBufPoolDestroy
(
SVBufPool
*
pPool
);
int
vnodeOpenBufPool
(
SVnode
*
pVnode
,
int64_t
size
)
{
...
...
@@ -28,7 +28,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
// create pool
ret
=
vnodeBufPoolCreate
(
size
,
&
pPool
);
ret
=
vnodeBufPoolCreate
(
pVnode
,
size
,
&
pPool
);
if
(
ret
<
0
)
{
vError
(
"vgId:%d, failed to open vnode buffer pool since %s"
,
TD_VID
(
pVnode
),
tstrerror
(
terrno
));
vnodeCloseBufPool
(
pVnode
);
...
...
@@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
}
// STATIC METHODS -------------------
static
int
vnodeBufPoolCreate
(
int64_t
size
,
SVBufPool
**
ppPool
)
{
static
int
vnodeBufPoolCreate
(
SVnode
*
pVnode
,
int64_t
size
,
SVBufPool
**
ppPool
)
{
SVBufPool
*
pPool
;
pPool
=
taosMemoryMalloc
(
sizeof
(
SVBufPool
)
+
size
);
...
...
@@ -130,6 +130,7 @@ static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) {
}
pPool
->
next
=
NULL
;
pPool
->
pVnode
=
pVnode
;
pPool
->
nRef
=
0
;
pPool
->
size
=
0
;
pPool
->
ptr
=
pPool
->
node
.
data
;
...
...
@@ -146,4 +147,26 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset
(
pPool
);
taosMemoryFree
(
pPool
);
return
0
;
}
void
vnodeBufPoolRef
(
SVBufPool
*
pPool
)
{
int32_t
nRef
=
atomic_fetch_add_32
(
&
pPool
->
nRef
,
1
);
ASSERT
(
nRef
>
0
);
}
void
vnodeBufPoolUnRef
(
SVBufPool
*
pPool
)
{
int32_t
nRef
=
atomic_sub_fetch_32
(
&
pPool
->
nRef
,
1
);
if
(
nRef
==
0
)
{
SVnode
*
pVnode
=
pPool
->
pVnode
;
vnodeBufPoolReset
(
pPool
);
taosThreadMutexLock
(
&
pVnode
->
mutex
);
pPool
->
next
=
pVnode
->
pPool
;
pVnode
->
pPool
=
pPool
;
taosThreadCondSignal
(
&
pVnode
->
poolNotEmpty
);
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
}
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
df616a5f
...
...
@@ -15,7 +15,7 @@
#include "vnd.h"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME
"vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static
int
vnodeEncodeInfo
(
const
SVnodeInfo
*
pInfo
,
char
**
ppData
);
...
...
@@ -27,18 +27,18 @@ static void vnodeWaitCommit(SVnode *pVnode);
int
vnodeBegin
(
SVnode
*
pVnode
)
{
// alloc buffer pool
/* pthread_mutex_lock(); */
taosThreadMutexLock
(
&
pVnode
->
mutex
);
while
(
pVnode
->
pPool
==
NULL
)
{
/* pthread_cond_wait(); */
taosThreadCondWait
(
&
pVnode
->
poolNotEmpty
,
&
pVnode
->
mutex
);
}
pVnode
->
inUse
=
pVnode
->
pPool
;
pVnode
->
inUse
->
nRef
=
1
;
pVnode
->
pPool
=
pVnode
->
inUse
->
next
;
pVnode
->
inUse
->
next
=
NULL
;
/* ref pVnode->inUse buffer pool */
/* pthread_mutex_unlock(); */
taosThreadMutexUnlock
(
&
pVnode
->
mutex
);
pVnode
->
state
.
commitID
++
;
// begin meta
...
...
@@ -217,7 +217,7 @@ int vnodeCommit(SVnode *pVnode) {
vInfo
(
"vgId:%d, start to commit, commit ID:%"
PRId64
" version:%"
PRId64
,
TD_VID
(
pVnode
),
pVnode
->
state
.
commitID
,
pVnode
->
state
.
applied
);
pVnode
->
onCommit
=
pVnode
->
inUse
;
vnodeBufPoolUnRef
(
pVnode
->
inUse
)
;
pVnode
->
inUse
=
NULL
;
// save info
...
...
@@ -284,10 +284,6 @@ int vnodeCommit(SVnode *pVnode) {
// apply the commit (TODO)
walEndSnapshot
(
pVnode
->
pWal
);
vnodeBufPoolReset
(
pVnode
->
onCommit
);
pVnode
->
onCommit
->
next
=
pVnode
->
pPool
;
pVnode
->
pPool
=
pVnode
->
onCommit
;
pVnode
->
onCommit
=
NULL
;
vInfo
(
"vgId:%d, commit over"
,
TD_VID
(
pVnode
));
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
df616a5f
...
...
@@ -89,6 +89,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
tsem_init
(
&
pVnode
->
syncSem
,
0
,
0
);
tsem_init
(
&
(
pVnode
->
canCommit
),
0
,
1
);
taosThreadMutexInit
(
&
pVnode
->
mutex
,
NULL
);
taosThreadCondInit
(
&
pVnode
->
poolNotEmpty
,
NULL
);
// open buffer pool
if
(
vnodeOpenBufPool
(
pVnode
,
pVnode
->
config
.
isHeap
?
0
:
pVnode
->
config
.
szBuf
/
3
)
<
0
)
{
...
...
@@ -195,6 +197,8 @@ void vnodeClose(SVnode *pVnode) {
// destroy handle
tsem_destroy
(
&
(
pVnode
->
canCommit
));
tsem_destroy
(
&
pVnode
->
syncSem
);
taosThreadCondDestroy
(
&
pVnode
->
poolNotEmpty
);
taosThreadMutexDestroy
(
&
pVnode
->
mutex
);
taosMemoryFree
(
pVnode
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录