Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f739e7c3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f739e7c3
编写于
4月 06, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
4月 06, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11258 from taosdata/feature/meta
Feature/meta
上级
af32cfca
70810ddb
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
471 addition
and
360 deletion
+471
-360
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+1
-1
source/dnode/vnode/src/inc/tsdbReadImpl.h
source/dnode/vnode/src/inc/tsdbReadImpl.h
+16
-16
source/dnode/vnode/src/meta/metaCommit.c
source/dnode/vnode/src/meta/metaCommit.c
+0
-21
source/dnode/vnode/src/meta/metaTDBImpl.c
source/dnode/vnode/src/meta/metaTDBImpl.c
+130
-17
source/dnode/vnode/src/meta/metaTbUid.c
source/dnode/vnode/src/meta/metaTbUid.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+0
-1
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+16
-15
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-0
source/libs/tdb/src/btree/tdbBtreeBalance.c
source/libs/tdb/src/btree/tdbBtreeBalance.c
+0
-14
source/libs/tdb/src/btree/tdbBtreeCommon.c
source/libs/tdb/src/btree/tdbBtreeCommon.c
+0
-14
source/libs/tdb/src/btree/tdbBtreeDelete.c
source/libs/tdb/src/btree/tdbBtreeDelete.c
+0
-14
source/libs/tdb/src/btree/tdbBtreeInsert.c
source/libs/tdb/src/btree/tdbBtreeInsert.c
+0
-14
source/libs/tdb/src/btree/tdbBtreeInt.h
source/libs/tdb/src/btree/tdbBtreeInt.h
+0
-27
source/libs/tdb/src/btree/tdbBtreeOpen.c
source/libs/tdb/src/btree/tdbBtreeOpen.c
+0
-14
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+36
-32
source/libs/tdb/src/db/tdbDb.c
source/libs/tdb/src/db/tdbDb.c
+3
-3
source/libs/tdb/src/db/tdbEnv.c
source/libs/tdb/src/db/tdbEnv.c
+5
-5
source/libs/tdb/src/db/tdbPCache.c
source/libs/tdb/src/db/tdbPCache.c
+76
-46
source/libs/tdb/src/db/tdbPage.c
source/libs/tdb/src/db/tdbPage.c
+1
-1
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+36
-24
source/libs/tdb/src/db/tdbTxn.c
source/libs/tdb/src/db/tdbTxn.c
+12
-24
source/libs/tdb/src/db/tdbUtil.c
source/libs/tdb/src/db/tdbUtil.c
+13
-0
source/libs/tdb/src/inc/tdbBtree.h
source/libs/tdb/src/inc/tdbBtree.h
+3
-2
source/libs/tdb/src/inc/tdbDb.h
source/libs/tdb/src/inc/tdbDb.h
+1
-1
source/libs/tdb/src/inc/tdbEnv.h
source/libs/tdb/src/inc/tdbEnv.h
+3
-3
source/libs/tdb/src/inc/tdbInt.h
source/libs/tdb/src/inc/tdbInt.h
+15
-0
source/libs/tdb/src/inc/tdbOs.h
source/libs/tdb/src/inc/tdbOs.h
+11
-9
source/libs/tdb/src/inc/tdbPCache.h
source/libs/tdb/src/inc/tdbPCache.h
+4
-3
source/libs/tdb/src/inc/tdbPager.h
source/libs/tdb/src/inc/tdbPager.h
+8
-5
source/libs/tdb/src/inc/tdbTxn.h
source/libs/tdb/src/inc/tdbTxn.h
+3
-7
source/libs/tdb/src/inc/tdbUtil.h
source/libs/tdb/src/inc/tdbUtil.h
+6
-2
source/libs/tdb/test/CMakeLists.txt
source/libs/tdb/test/CMakeLists.txt
+5
-1
source/libs/tdb/test/tdbTest.cpp
source/libs/tdb/test/tdbTest.cpp
+41
-22
source/libs/tdb/test/tdbUtilTest.cpp
source/libs/tdb/test/tdbUtilTest.cpp
+20
-0
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
f739e7c3
set
(
META_DB_IMPL_LIST
"BDB"
"TDB"
)
set
(
META_DB_IMPL
"
B
DB"
CACHE STRING
"Use BDB as the default META implementation"
)
set
(
META_DB_IMPL
"
T
DB"
CACHE STRING
"Use BDB as the default META implementation"
)
set_property
(
CACHE META_DB_IMPL PROPERTY STRINGS
${
META_DB_IMPL_LIST
}
)
if
(
META_DB_IMPL IN_LIST META_DB_IMPL_LIST
)
...
...
source/dnode/vnode/src/inc/tsdbReadImpl.h
浏览文件 @
f739e7c3
...
...
@@ -17,12 +17,12 @@
#define _TD_TSDB_READ_IMPL_H_
#include "os.h"
#include "tcommon.h"
#include "tfs.h"
#include "tsdb.h"
#include "tsdbFile.h"
#include "tskiplist.h"
#include "tsdbMemory.h"
#include "t
common
.h"
#include "t
skiplist
.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -31,7 +31,6 @@ extern "C" {
typedef
struct
SReadH
SReadH
;
typedef
struct
{
int32_t
tid
;
uint32_t
len
;
uint32_t
offset
;
uint32_t
hasLast
:
2
;
...
...
@@ -81,7 +80,7 @@ typedef struct {
TSKEY
keyLast
;
}
SBlockV0
;
#define SBlock
SBlockV0 // latest SBlock definition
#define SBlock SBlockV0 // latest SBlock definition
#endif
...
...
@@ -165,19 +164,19 @@ typedef struct {
typedef
void
SAggrBlkData
;
// SBlockCol cols[];
struct
SReadH
{
STsdb
*
pRepo
;
STsdb
*
pRepo
;
SDFileSet
rSet
;
// FSET to read
SArray
*
aBlkIdx
;
// SBlockIdx array
STable
*
pTable
;
// table to read
SBlockIdx
*
pBlkIdx
;
// current reading table SBlockIdx
SArray
*
aBlkIdx
;
// SBlockIdx array
STable
*
pTable
;
// table to read
SBlockIdx
*
pBlkIdx
;
// current reading table SBlockIdx
int
cidx
;
SBlockInfo
*
pBlkInfo
;
SBlockData
*
pBlkData
;
// Block info
SBlockInfo
*
pBlkInfo
;
SBlockData
*
pBlkData
;
// Block info
SAggrBlkData
*
pAggrBlkData
;
// Aggregate Block info
SDataCols
*
pDCols
[
2
];
void
*
pBuf
;
// buffer
void
*
pCBuf
;
// compression buffer
void
*
pExBuf
;
// extra buffer
SDataCols
*
pDCols
[
2
];
void
*
pBuf
;
// buffer
void
*
pCBuf
;
// compression buffer
void
*
pExBuf
;
// extra buffer
};
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
...
...
@@ -222,14 +221,15 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
);
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlockInfo
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
void
tsdbGetBlockStatis
(
SReadH
*
pReadh
,
SDataStatis
*
pStatis
,
int
numOfCols
,
SBlock
*
pBlock
);
static
FORCE_INLINE
int
tsdbMakeRoom
(
void
**
ppBuf
,
size_t
size
)
{
void
*
pBuf
=
*
ppBuf
;
void
*
pBuf
=
*
ppBuf
;
size_t
tsize
=
taosTSizeof
(
pBuf
);
if
(
tsize
<
size
)
{
...
...
source/dnode/vnode/src/meta/metaCommit.c
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
int
metaCommit
(
SMeta
*
pMeta
)
{
// TODO
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaTDBImpl.c
浏览文件 @
f739e7c3
...
...
@@ -16,15 +16,28 @@
#include "metaDef.h"
#include "tdbInt.h"
typedef
struct
SPoolMem
{
int64_t
size
;
struct
SPoolMem
*
prev
;
struct
SPoolMem
*
next
;
}
SPoolMem
;
static
SPoolMem
*
openPool
();
static
void
clearPool
(
SPoolMem
*
pPool
);
static
void
closePool
(
SPoolMem
*
pPool
);
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
);
static
void
poolFree
(
void
*
arg
,
void
*
ptr
);
struct
SMetaDB
{
TENV
*
pEnv
;
TDB
*
pTbDB
;
TDB
*
pSchemaDB
;
TDB
*
pNameIdx
;
TDB
*
pStbIdx
;
TDB
*
pNtbIdx
;
TDB
*
pCtbIdx
;
TXN
txn
;
TENV
*
pEnv
;
TDB
*
pTbDB
;
TDB
*
pSchemaDB
;
TDB
*
pNameIdx
;
TDB
*
pStbIdx
;
TDB
*
pNtbIdx
;
TDB
*
pCtbIdx
;
SPoolMem
*
pPool
;
};
typedef
struct
__attribute__
((
__packed__
))
{
...
...
@@ -167,12 +180,19 @@ int metaOpenDB(SMeta *pMeta) {
return
-
1
;
}
pMetaDb
->
pPool
=
openPool
();
tdbTxnOpen
(
&
pMetaDb
->
txn
,
0
,
poolMalloc
,
poolFree
,
pMetaDb
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pMetaDb
->
pEnv
,
NULL
);
pMeta
->
pDB
=
pMetaDb
;
return
0
;
}
void
metaCloseDB
(
SMeta
*
pMeta
)
{
if
(
pMeta
->
pDB
)
{
tdbCommit
(
pMeta
->
pDB
->
pEnv
,
&
pMeta
->
pDB
->
txn
);
tdbTxnClose
(
&
pMeta
->
pDB
->
txn
);
clearPool
(
pMeta
->
pDB
->
pPool
);
tdbDbClose
(
pMeta
->
pDB
->
pCtbIdx
);
tdbDbClose
(
pMeta
->
pDB
->
pNtbIdx
);
tdbDbClose
(
pMeta
->
pDB
->
pStbIdx
);
...
...
@@ -206,13 +226,21 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
uid
=
metaGenerateUid
(
pMeta
);
}
// check name and uid unique
if
(
tdbDbGet
(
pMetaDb
->
pTbDB
,
&
uid
,
sizeof
(
uid
),
NULL
,
NULL
)
==
0
)
{
return
-
1
;
}
if
(
tdbDbGet
(
pMetaDb
->
pNameIdx
,
pTbCfg
->
name
,
strlen
(
pTbCfg
->
name
)
+
1
,
NULL
,
NULL
)
==
0
)
{
return
-
1
;
}
// save to table.db
pKey
=
&
uid
;
kLen
=
sizeof
(
uid
);
pVal
=
pBuf
=
buf
;
metaEncodeTbInfo
(
&
pBuf
,
pTbCfg
);
vLen
=
POINTER_DISTANCE
(
pBuf
,
buf
);
ret
=
tdbDbInsert
(
pMetaDb
->
pTbDB
,
pKey
,
kLen
,
pVal
,
vLen
);
ret
=
tdbDbInsert
(
pMetaDb
->
pTbDB
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -226,15 +254,15 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
schemaWrapper
.
nCols
=
pTbCfg
->
stbCfg
.
nCols
;
schemaWrapper
.
pSchema
=
pTbCfg
->
stbCfg
.
pSchema
;
schemaWrapper
.
pSchema
Ex
=
pTbCfg
->
stbCfg
.
pSchema
;
}
else
{
schemaWrapper
.
nCols
=
pTbCfg
->
ntbCfg
.
nCols
;
schemaWrapper
.
pSchema
=
pTbCfg
->
ntbCfg
.
pSchema
;
schemaWrapper
.
pSchema
Ex
=
pTbCfg
->
ntbCfg
.
pSchema
;
}
pVal
=
pBuf
=
buf
;
metaEncodeSchemaEx
(
&
pBuf
,
&
schemaWrapper
);
vLen
=
POINTER_DISTANCE
(
pBuf
,
buf
);
ret
=
tdbDbInsert
(
pMetaDb
->
pSchemaDB
,
pKey
,
kLen
,
pVal
,
vLen
);
ret
=
tdbDbInsert
(
pMetaDb
->
pSchemaDB
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMeta
->
pDB
->
txn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -248,7 +276,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen
=
nameLen
+
1
+
sizeof
(
uid
);
pVal
=
NULL
;
vLen
=
0
;
ret
=
tdbDbInsert
(
pMetaDb
->
pNameIdx
,
pKey
,
kLen
,
pVal
,
vLen
);
ret
=
tdbDbInsert
(
pMetaDb
->
pNameIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -259,7 +287,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen
=
sizeof
(
uid
);
pVal
=
NULL
;
vLen
=
0
;
ret
=
tdbDbInsert
(
pMetaDb
->
pStbIdx
,
pKey
,
kLen
,
pVal
,
vLen
);
ret
=
tdbDbInsert
(
pMetaDb
->
pStbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -270,7 +298,7 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen
=
sizeof
(
ctbIdxKey
);
pVal
=
NULL
;
vLen
=
0
;
ret
=
tdbDbInsert
(
pMetaDb
->
pCtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
);
ret
=
tdbDbInsert
(
pMetaDb
->
pCtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -279,12 +307,16 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
kLen
=
sizeof
(
uid
);
pVal
=
NULL
;
vLen
=
0
;
ret
=
tdbDbInsert
(
pMetaDb
->
pNtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
);
ret
=
tdbDbInsert
(
pMetaDb
->
pNtbIdx
,
pKey
,
kLen
,
pVal
,
vLen
,
&
pMetaDb
->
txn
);
if
(
ret
<
0
)
{
return
-
1
;
}
}
if
(
pMeta
->
pDB
->
pPool
->
size
>
0
)
{
metaCommit
(
pMeta
);
}
return
0
;
}
...
...
@@ -349,7 +381,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
}
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
)
{
return
*
metaGetTableSchemaImpl
(
pMeta
,
uid
,
sver
,
isinline
,
false
);
return
metaGetTableSchemaImpl
(
pMeta
,
uid
,
sver
,
isinline
,
false
);
}
static
SSchemaWrapper
*
metaGetTableSchemaImpl
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
,
bool
isGetEx
)
{
...
...
@@ -523,7 +555,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
return
0
;
}
pCtbIdxKey
=
pCtbCur
->
p
Val
;
pCtbIdxKey
=
pCtbCur
->
p
Key
;
return
pCtbIdxKey
->
uid
;
}
...
...
@@ -701,3 +733,84 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
}
return
buf
;
}
int
metaCommit
(
SMeta
*
pMeta
)
{
TXN
*
pTxn
=
&
pMeta
->
pDB
->
txn
;
// Commit current txn
tdbCommit
(
pMeta
->
pDB
->
pEnv
,
pTxn
);
tdbTxnClose
(
pTxn
);
clearPool
(
pMeta
->
pDB
->
pPool
);
// start a new txn
tdbTxnOpen
(
&
pMeta
->
pDB
->
txn
,
0
,
poolMalloc
,
poolFree
,
pMeta
->
pDB
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pMeta
->
pDB
->
pEnv
,
pTxn
);
return
0
;
}
static
SPoolMem
*
openPool
()
{
SPoolMem
*
pPool
=
(
SPoolMem
*
)
tdbOsMalloc
(
sizeof
(
*
pPool
));
pPool
->
prev
=
pPool
->
next
=
pPool
;
pPool
->
size
=
0
;
return
pPool
;
}
static
void
clearPool
(
SPoolMem
*
pPool
)
{
SPoolMem
*
pMem
;
do
{
pMem
=
pPool
->
next
;
if
(
pMem
==
pPool
)
break
;
pMem
->
next
->
prev
=
pMem
->
prev
;
pMem
->
prev
->
next
=
pMem
->
next
;
pPool
->
size
-=
pMem
->
size
;
tdbOsFree
(
pMem
);
}
while
(
1
);
assert
(
pPool
->
size
==
0
);
}
static
void
closePool
(
SPoolMem
*
pPool
)
{
clearPool
(
pPool
);
tdbOsFree
(
pPool
);
}
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
)
{
void
*
ptr
=
NULL
;
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
pMem
=
(
SPoolMem
*
)
tdbOsMalloc
(
sizeof
(
*
pMem
)
+
size
);
if
(
pMem
==
NULL
)
{
assert
(
0
);
}
pMem
->
size
=
sizeof
(
*
pMem
)
+
size
;
pMem
->
next
=
pPool
->
next
;
pMem
->
prev
=
pPool
;
pPool
->
next
->
prev
=
pMem
;
pPool
->
next
=
pMem
;
pPool
->
size
+=
pMem
->
size
;
ptr
=
(
void
*
)(
&
pMem
[
1
]);
return
ptr
;
}
static
void
poolFree
(
void
*
arg
,
void
*
ptr
)
{
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
pMem
=
&
(((
SPoolMem
*
)
ptr
)[
-
1
]);
pMem
->
next
->
prev
=
pMem
->
prev
;
pMem
->
prev
->
next
=
pMem
->
next
;
pPool
->
size
-=
pMem
->
size
;
tdbOsFree
(
pMem
);
}
source/dnode/vnode/src/meta/metaTbUid.c
浏览文件 @
f739e7c3
...
...
@@ -27,5 +27,5 @@ void metaCloseUidGnrt(SMeta *pMeta) { /* TODO */
tb_uid_t
metaGenerateUid
(
SMeta
*
pMeta
)
{
// Generate a new table UID
return
tGenIdPI
32
();
return
tGenIdPI
64
();
}
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
f739e7c3
...
...
@@ -701,7 +701,6 @@ int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray
// Set pIdx
pBlock
=
taosArrayGetLast
(
pSupA
);
pIdx
->
tid
=
TABLE_TID
(
pTable
);
pIdx
->
uid
=
TABLE_UID
(
pTable
);
pIdx
->
hasLast
=
pBlock
->
last
?
1
:
0
;
pIdx
->
maxKey
=
pBlock
->
keyLast
;
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
f739e7c3
...
...
@@ -98,7 +98,7 @@ int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet) {
void
tsdbCloseAndUnsetFSet
(
SReadH
*
pReadh
)
{
tsdbResetReadFile
(
pReadh
);
}
int
tsdbLoadBlockIdx
(
SReadH
*
pReadh
)
{
SDFile
*
pHeadf
=
TSDB_READ_HEAD_FILE
(
pReadh
);
SDFile
*
pHeadf
=
TSDB_READ_HEAD_FILE
(
pReadh
);
SBlockIdx
blkIdx
;
ASSERT
(
taosArrayGetSize
(
pReadh
->
aBlkIdx
)
==
0
);
...
...
@@ -149,8 +149,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
}
tsize
++
;
ASSERT
(
tsize
==
1
||
((
SBlockIdx
*
)
taosArrayGet
(
pReadh
->
aBlkIdx
,
tsize
-
2
))
->
tid
<
((
SBlockIdx
*
)
taosArrayGet
(
pReadh
->
aBlkIdx
,
tsize
-
1
))
->
tid
);
//
ASSERT(tsize == 1 || ((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 2))->tid <
//
((SBlockIdx *)taosArrayGet(pReadh->aBlkIdx, tsize - 1))->tid);
}
return
0
;
...
...
@@ -180,7 +180,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
}
SBlockIdx
*
pBlkIdx
=
taosArrayGet
(
pReadh
->
aBlkIdx
,
pReadh
->
cidx
);
if
(
pBlkIdx
->
t
id
==
TABLE_TID
(
pTable
))
{
if
(
pBlkIdx
->
u
id
==
TABLE_TID
(
pTable
))
{
if
(
pBlkIdx
->
uid
==
TABLE_UID
(
pTable
))
{
pReadh
->
pBlkIdx
=
pBlkIdx
;
}
else
{
...
...
@@ -188,7 +188,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
}
pReadh
->
cidx
++
;
break
;
}
else
if
(
pBlkIdx
->
t
id
>
TABLE_TID
(
pTable
))
{
}
else
if
(
pBlkIdx
->
u
id
>
TABLE_TID
(
pTable
))
{
pReadh
->
pBlkIdx
=
NULL
;
break
;
}
else
{
...
...
@@ -205,7 +205,7 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
)
{
ASSERT
(
pReadh
->
pBlkIdx
!=
NULL
);
SDFile
*
pHeadf
=
TSDB_READ_HEAD_FILE
(
pReadh
);
SDFile
*
pHeadf
=
TSDB_READ_HEAD_FILE
(
pReadh
);
SBlockIdx
*
pBlkIdx
=
pReadh
->
pBlkIdx
;
if
(
tsdbSeekDFile
(
pHeadf
,
pBlkIdx
->
offset
,
SEEK_SET
)
<
0
)
{
...
...
@@ -237,7 +237,7 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
return
-
1
;
}
ASSERT
(
pBlkIdx
->
tid
==
pReadh
->
pBlkInfo
->
tid
&&
pBlkIdx
->
uid
==
pReadh
->
pBlkInfo
->
uid
);
//
ASSERT(pBlkIdx->tid == pReadh->pBlkInfo->tid && pBlkIdx->uid == pReadh->pBlkInfo->uid);
if
(
pTarget
)
{
memcpy
(
pTarget
,
(
void
*
)(
pReadh
->
pBlkInfo
),
pBlkIdx
->
len
);
...
...
@@ -275,7 +275,8 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
return
0
;
}
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
)
{
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfColsIds
)
{
ASSERT
(
pBlock
->
numOfSubBlocks
>
0
);
int8_t
update
=
pReadh
->
pRepo
->
config
.
update
;
...
...
@@ -388,7 +389,7 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
)
{
int
tlen
=
0
;
tlen
+=
taosEncodeVariantI32
(
buf
,
pIdx
->
tid
);
//
tlen += taosEncodeVariantI32(buf, pIdx->tid);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
len
);
tlen
+=
taosEncodeVariantU32
(
buf
,
pIdx
->
offset
);
tlen
+=
taosEncodeFixedU8
(
buf
,
pIdx
->
hasLast
);
...
...
@@ -404,7 +405,7 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
uint32_t
numOfBlocks
=
0
;
uint64_t
value
=
0
;
if
((
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pIdx
->
tid
)))
==
NULL
)
return
NULL
;
//
if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
pIdx
->
len
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeVariantU32
(
buf
,
&
(
pIdx
->
offset
)))
==
NULL
)
return
NULL
;
if
((
buf
=
taosDecodeFixedU8
(
buf
,
&
(
hasLast
)))
==
NULL
)
return
NULL
;
...
...
@@ -538,9 +539,9 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat
pDataCols
->
numOfRows
=
pBlock
->
numOfRows
;
// Recover the data
int
ccol
=
0
;
// loop iter for SBlockCol object
int
dcol
=
0
;
// loop iter for SDataCols object
int
nBitmaps
=
(
int
)
TD_BITMAP_BYTES
(
pBlock
->
numOfRows
);
int
ccol
=
0
;
// loop iter for SBlockCol object
int
dcol
=
0
;
// loop iter for SDataCols object
int
nBitmaps
=
(
int
)
TD_BITMAP_BYTES
(
pBlock
->
numOfRows
);
SBlockCol
*
pBlockCol
=
NULL
;
while
(
dcol
<
pDataCols
->
numOfCols
)
{
SDataCol
*
pDataCol
=
&
(
pDataCols
->
cols
[
dcol
]);
...
...
@@ -686,7 +687,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
ASSERT
(
pBlock
->
numOfSubBlocks
==
0
||
pBlock
->
numOfSubBlocks
==
1
);
ASSERT
(
colIds
[
0
]
==
PRIMARYKEY_TIMESTAMP_COL_ID
);
SDFile
*
pDFile
=
(
pBlock
->
last
)
?
TSDB_READ_LAST_FILE
(
pReadh
)
:
TSDB_READ_DATA_FILE
(
pReadh
);
SDFile
*
pDFile
=
(
pBlock
->
last
)
?
TSDB_READ_LAST_FILE
(
pReadh
)
:
TSDB_READ_DATA_FILE
(
pReadh
);
SBlockCol
blockCol
=
{
0
};
tdResetDataCols
(
pDataCols
);
...
...
@@ -700,7 +701,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *
int
ccol
=
0
;
for
(
int
i
=
0
;
i
<
numOfColIds
;
i
++
)
{
int16_t
colId
=
colIds
[
i
];
SDataCol
*
pDataCol
=
NULL
;
SDataCol
*
pDataCol
=
NULL
;
SBlockCol
*
pBlockCol
=
NULL
;
while
(
true
)
{
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
f739e7c3
...
...
@@ -47,7 +47,7 @@ int vnodeSyncCommit(SVnode *pVnode) {
static
int
vnodeCommit
(
void
*
arg
)
{
SVnode
*
pVnode
=
(
SVnode
*
)
arg
;
metaCommit
(
pVnode
->
pMeta
);
//
metaCommit(pVnode->pMeta);
tqCommit
(
pVnode
->
pTq
);
tsdbCommit
(
pVnode
->
pTsdb
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f739e7c3
...
...
@@ -297,6 +297,10 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) {
idata
.
info
.
slotId
=
pDescNode
->
slotId
;
idata
.
info
.
precision
=
pDescNode
->
dataType
.
precision
;
if
(
IS_VAR_DATA_TYPE
(
idata
.
info
.
type
))
{
pBlock
->
info
.
hasVarCol
=
true
;
}
taosArrayPush
(
pBlock
->
pDataBlock
,
&
idata
);
}
...
...
source/libs/tdb/src/btree/tdbBtreeBalance.c
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
source/libs/tdb/src/btree/tdbBtreeCommon.c
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
source/libs/tdb/src/btree/tdbBtreeDelete.c
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
source/libs/tdb/src/btree/tdbBtreeInsert.c
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
source/libs/tdb/src/btree/tdbBtreeInt.h
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TDB_BTREE_INT_H_
#define _TDB_BTREE_INT_H_
#ifdef __cplusplus
extern
"C"
{
#endif
#ifdef __cplusplus
}
#endif
#endif
/*_TDB_BTREE_INT_H_*/
\ No newline at end of file
source/libs/tdb/src/btree/tdbBtreeOpen.c
已删除
100644 → 0
浏览文件 @
af32cfca
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
f739e7c3
...
...
@@ -127,7 +127,7 @@ int tdbBtreeClose(SBTree *pBt) {
return
0
;
}
int
tdbBtreeInsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
)
{
int
tdbBtreeInsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
)
{
SBTC
btc
;
SCell
*
pCell
;
void
*
pBuf
;
...
...
@@ -137,7 +137,7 @@ int tdbBtreeInsert(SBTree *pBt, const void *pKey, int kLen, const void *pVal, in
int
idx
;
int
c
;
tdbBtcOpen
(
&
btc
,
pBt
);
tdbBtcOpen
(
&
btc
,
pBt
,
pTxn
);
// move to the position to insert
ret
=
tdbBtcMoveTo
(
&
btc
,
pKey
,
kLen
,
&
c
);
...
...
@@ -225,7 +225,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
void
*
pTVal
=
NULL
;
SCellDecoder
cd
;
tdbBtcOpen
(
&
btc
,
pBt
);
tdbBtcOpen
(
&
btc
,
pBt
,
NULL
);
ret
=
tdbBtcMoveTo
(
&
btc
,
pKey
,
kLen
,
&
cret
);
if
(
ret
<
0
)
{
...
...
@@ -233,7 +233,7 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
ASSERT
(
0
);
}
if
(
cret
)
{
if
(
btc
.
idx
<
0
||
cret
)
{
tdbBtcClose
(
&
btc
);
return
-
1
;
}
...
...
@@ -253,15 +253,17 @@ int tdbBtreePGet(SBTree *pBt, const void *pKey, int kLen, void **ppKey, int *pkL
memcpy
(
*
ppKey
,
cd
.
pKey
,
cd
.
kLen
);
}
pTVal
=
TDB_REALLOC
(
*
ppVal
,
cd
.
vLen
);
if
(
pTVal
==
NULL
)
{
tdbBtcClose
(
&
btc
);
ASSERT
(
0
);
return
-
1
;
if
(
ppVal
)
{
pTVal
=
TDB_REALLOC
(
*
ppVal
,
cd
.
vLen
);
if
(
pTVal
==
NULL
)
{
tdbBtcClose
(
&
btc
);
ASSERT
(
0
);
return
-
1
;
}
*
ppVal
=
pTVal
;
*
vLen
=
cd
.
vLen
;
memcpy
(
*
ppVal
,
cd
.
pVal
,
cd
.
vLen
);
}
*
ppVal
=
pTVal
;
*
vLen
=
cd
.
vLen
;
memcpy
(
*
ppVal
,
cd
.
pVal
,
cd
.
vLen
);
tdbBtcClose
(
&
btc
);
...
...
@@ -297,7 +299,8 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
{
// 1. TODO: Search the main DB to check if the DB exists
pgno
=
0
;
ret
=
tdbPagerOpenDB
(
pBt
->
pPager
,
&
pgno
,
true
);
ASSERT
(
ret
==
0
);
}
if
(
pgno
!=
0
)
{
...
...
@@ -307,13 +310,13 @@ static int tdbBtreeOpenImpl(SBTree *pBt) {
// Try to create a new database
SBtreeInitPageArg
zArg
=
{.
flags
=
TDB_BTREE_ROOT
|
TDB_BTREE_LEAF
,
.
pBt
=
pBt
};
ret
=
tdbPagerNewPage
(
pBt
->
pPager
,
&
pgno
,
&
pPage
,
tdbBtreeZeroPage
,
&
zArg
);
ret
=
tdbPagerNewPage
(
pBt
->
pPager
,
&
pgno
,
&
pPage
,
tdbBtreeZeroPage
,
&
zArg
,
NULL
);
if
(
ret
<
0
)
{
return
-
1
;
}
// TODO: here still has problem
tdbPagerReturnPage
(
pBt
->
pPager
,
pPage
);
tdbPagerReturnPage
(
pBt
->
pPager
,
pPage
,
NULL
);
ASSERT
(
pgno
!=
0
);
pBt
->
root
=
pgno
;
...
...
@@ -385,7 +388,7 @@ static int tdbBtreeZeroPage(SPage *pPage, void *arg) {
}
// TDB_BTREE_BALANCE =====================
static
int
tdbBtreeBalanceDeeper
(
SBTree
*
pBt
,
SPage
*
pRoot
,
SPage
**
ppChild
)
{
static
int
tdbBtreeBalanceDeeper
(
SBTree
*
pBt
,
SPage
*
pRoot
,
SPage
**
ppChild
,
TXN
*
pTxn
)
{
SPager
*
pPager
;
SPage
*
pChild
;
SPgno
pgnoChild
;
...
...
@@ -402,7 +405,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
// Allocate a new child page
zArg
.
flags
=
TDB_FLAG_REMOVE
(
flags
,
TDB_BTREE_ROOT
);
zArg
.
pBt
=
pBt
;
ret
=
tdbPagerNewPage
(
pPager
,
&
pgnoChild
,
&
pChild
,
tdbBtreeZeroPage
,
&
zArg
);
ret
=
tdbPagerNewPage
(
pPager
,
&
pgnoChild
,
&
pChild
,
tdbBtreeZeroPage
,
&
zArg
,
pTxn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -436,7 +439,7 @@ static int tdbBtreeBalanceDeeper(SBTree *pBt, SPage *pRoot, SPage **ppChild) {
return
0
;
}
static
int
tdbBtreeBalanceNonRoot
(
SBTree
*
pBt
,
SPage
*
pParent
,
int
idx
)
{
static
int
tdbBtreeBalanceNonRoot
(
SBTree
*
pBt
,
SPage
*
pParent
,
int
idx
,
TXN
*
pTxn
)
{
int
ret
;
int
nOlds
;
...
...
@@ -477,7 +480,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
pgno
=
*
(
SPgno
*
)
pCell
;
}
ret
=
tdbPagerFetchPage
(
pBt
->
pPager
,
pgno
,
pOlds
+
i
,
tdbBtreeInitPage
,
pBt
);
ret
=
tdbPagerFetchPage
(
pBt
->
pPager
,
pgno
,
pOlds
+
i
,
tdbBtreeInitPage
,
pBt
,
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -640,7 +643,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
}
else
{
iarg
.
pBt
=
pBt
;
iarg
.
flags
=
flags
;
ret
=
tdbPagerNewPage
(
pBt
->
pPager
,
&
pgno
,
pNews
+
iNew
,
tdbBtreeZeroPage
,
&
iarg
);
ret
=
tdbPagerNewPage
(
pBt
->
pPager
,
&
pgno
,
pNews
+
iNew
,
tdbBtreeZeroPage
,
&
iarg
,
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
}
...
...
@@ -767,9 +770,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
// TODO: here is not corrent for drop case
for
(
int
i
=
0
;
i
<
nNews
;
i
++
)
{
if
(
i
<
nOlds
)
{
tdbPagerReturnPage
(
pBt
->
pPager
,
pOlds
[
i
]);
tdbPagerReturnPage
(
pBt
->
pPager
,
pOlds
[
i
]
,
pTxn
);
}
else
{
tdbPagerReturnPage
(
pBt
->
pPager
,
pNews
[
i
]);
tdbPagerReturnPage
(
pBt
->
pPager
,
pNews
[
i
]
,
pTxn
);
}
}
...
...
@@ -805,7 +808,7 @@ static int tdbBtreeBalance(SBTC *pBtc) {
// ignore the case of empty
if
(
pPage
->
nOverflow
==
0
)
break
;
ret
=
tdbBtreeBalanceDeeper
(
pBtc
->
pBt
,
pPage
,
&
(
pBtc
->
pgStack
[
1
]));
ret
=
tdbBtreeBalanceDeeper
(
pBtc
->
pBt
,
pPage
,
&
(
pBtc
->
pgStack
[
1
])
,
pBtc
->
pTxn
);
if
(
ret
<
0
)
{
return
-
1
;
}
...
...
@@ -819,12 +822,12 @@ static int tdbBtreeBalance(SBTC *pBtc) {
// Generalized balance step
pParent
=
pBtc
->
pgStack
[
iPage
-
1
];
ret
=
tdbBtreeBalanceNonRoot
(
pBtc
->
pBt
,
pParent
,
pBtc
->
idxStack
[
pBtc
->
iPage
-
1
]);
ret
=
tdbBtreeBalanceNonRoot
(
pBtc
->
pBt
,
pParent
,
pBtc
->
idxStack
[
pBtc
->
iPage
-
1
]
,
pBtc
->
pTxn
);
if
(
ret
<
0
)
{
return
-
1
;
}
tdbPagerReturnPage
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
);
tdbPagerReturnPage
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
,
pBtc
->
pTxn
);
pBtc
->
iPage
--
;
pBtc
->
pPage
=
pBtc
->
pgStack
[
pBtc
->
iPage
];
...
...
@@ -1024,11 +1027,12 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
// TDB_BTREE_CELL
// TDB_BTREE_CURSOR =====================
int
tdbBtcOpen
(
SBTC
*
pBtc
,
SBTree
*
pBt
)
{
int
tdbBtcOpen
(
SBTC
*
pBtc
,
SBTree
*
pBt
,
TXN
*
pTxn
)
{
pBtc
->
pBt
=
pBt
;
pBtc
->
iPage
=
-
1
;
pBtc
->
pPage
=
NULL
;
pBtc
->
idx
=
-
1
;
pBtc
->
pTxn
=
pTxn
;
return
0
;
}
...
...
@@ -1045,7 +1049,7 @@ int tdbBtcMoveToFirst(SBTC *pBtc) {
if
(
pBtc
->
iPage
<
0
)
{
// move a clean cursor
ret
=
tdbPagerFetchPage
(
pPager
,
pBt
->
root
,
&
(
pBtc
->
pPage
),
tdbBtreeInitPage
,
pBt
);
ret
=
tdbPagerFetchPage
(
pPager
,
pBt
->
root
,
&
(
pBtc
->
pPage
),
tdbBtreeInitPage
,
pBt
,
pBtc
->
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -1110,7 +1114,7 @@ int tdbBtcMoveToLast(SBTC *pBtc) {
if
(
pBtc
->
iPage
<
0
)
{
// move a clean cursor
ret
=
tdbPagerFetchPage
(
pPager
,
pBt
->
root
,
&
(
pBtc
->
pPage
),
tdbBtreeInitPage
,
pBt
);
ret
=
tdbPagerFetchPage
(
pPager
,
pBt
->
root
,
&
(
pBtc
->
pPage
),
tdbBtreeInitPage
,
pBt
,
pBtc
->
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -1284,7 +1288,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
pBtc
->
pPage
=
NULL
;
pBtc
->
idx
=
-
1
;
ret
=
tdbPagerFetchPage
(
pBtc
->
pBt
->
pPager
,
pgno
,
&
pBtc
->
pPage
,
tdbBtreeInitPage
,
pBtc
->
pBt
);
ret
=
tdbPagerFetchPage
(
pBtc
->
pBt
->
pPager
,
pgno
,
&
pBtc
->
pPage
,
tdbBtreeInitPage
,
pBtc
->
pBt
,
pBtc
->
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -1296,7 +1300,7 @@ static int tdbBtcMoveDownward(SBTC *pBtc) {
static
int
tdbBtcMoveUpward
(
SBTC
*
pBtc
)
{
if
(
pBtc
->
iPage
==
0
)
return
-
1
;
tdbPagerReturnPage
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
);
tdbPagerReturnPage
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
,
pBtc
->
pTxn
);
pBtc
->
iPage
--
;
pBtc
->
pPage
=
pBtc
->
pgStack
[
pBtc
->
iPage
];
...
...
@@ -1319,7 +1323,7 @@ static int tdbBtcMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
if
(
pBtc
->
iPage
<
0
)
{
// move from a clear cursor
ret
=
tdbPagerFetchPage
(
pPager
,
pBt
->
root
,
&
(
pBtc
->
pPage
),
tdbBtreeInitPage
,
pBt
);
ret
=
tdbPagerFetchPage
(
pPager
,
pBt
->
root
,
&
(
pBtc
->
pPage
),
tdbBtreeInitPage
,
pBt
,
pBtc
->
pTxn
);
if
(
ret
<
0
)
{
// TODO
ASSERT
(
0
);
...
...
@@ -1456,7 +1460,7 @@ int tdbBtcClose(SBTC *pBtc) {
for
(;;)
{
ASSERT
(
pBtc
->
pPage
);
tdbPagerReturnPage
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
);
tdbPagerReturnPage
(
pBtc
->
pBt
->
pPager
,
pBtc
->
pPage
,
pBtc
->
pTxn
);
pBtc
->
iPage
--
;
if
(
pBtc
->
iPage
<
0
)
break
;
...
...
source/libs/tdb/src/db/tdbDb.c
浏览文件 @
f739e7c3
...
...
@@ -75,8 +75,8 @@ int tdbDbDrop(TDB *pDb) {
return
0
;
}
int
tdbDbInsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
)
{
return
tdbBtreeInsert
(
pDb
->
pBt
,
pKey
,
keyLen
,
pVal
,
valLen
);
int
tdbDbInsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
)
{
return
tdbBtreeInsert
(
pDb
->
pBt
,
pKey
,
keyLen
,
pVal
,
valLen
,
pTxn
);
}
int
tdbDbGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
)
{
...
...
@@ -97,7 +97,7 @@ int tdbDbcOpen(TDB *pDb, TDBC **ppDbc) {
return
-
1
;
}
tdbBtcOpen
(
&
pDbc
->
btc
,
pDb
->
pBt
);
tdbBtcOpen
(
&
pDbc
->
btc
,
pDb
->
pBt
,
NULL
);
// TODO: move to first now, we can move to any key-value
// and in any direction, design new APIs.
...
...
source/libs/tdb/src/db/tdbEnv.c
浏览文件 @
f739e7c3
...
...
@@ -73,12 +73,12 @@ int tdbEnvClose(TENV *pEnv) {
return
0
;
}
int
tdbBegin
(
TENV
*
pEnv
)
{
int
tdbBegin
(
TENV
*
pEnv
,
TXN
*
pTxn
)
{
SPager
*
pPager
;
int
ret
;
for
(
pPager
=
pEnv
->
pgrList
;
pPager
;
pPager
=
pPager
->
pNext
)
{
ret
=
tdbPagerBegin
(
pPager
);
ret
=
tdbPagerBegin
(
pPager
,
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -88,12 +88,12 @@ int tdbBegin(TENV *pEnv) {
return
0
;
}
int
tdbCommit
(
TENV
*
pEnv
)
{
int
tdbCommit
(
TENV
*
pEnv
,
TXN
*
pTxn
)
{
SPager
*
pPager
;
int
ret
;
for
(
pPager
=
pEnv
->
pgrList
;
pPager
;
pPager
=
pPager
->
pNext
)
{
ret
=
tdbPagerCommit
(
pPager
);
ret
=
tdbPagerCommit
(
pPager
,
pTxn
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
...
...
@@ -103,7 +103,7 @@ int tdbCommit(TENV *pEnv) {
return
0
;
}
int
tdbRollback
(
TENV
*
pEnv
)
{
int
tdbRollback
(
TENV
*
pEnv
,
TXN
*
pTxn
)
{
ASSERT
(
0
);
return
0
;
}
...
...
source/libs/tdb/src/db/tdbPCache.c
浏览文件 @
f739e7c3
...
...
@@ -18,6 +18,7 @@ struct SPCache {
int
pageSize
;
int
cacheSize
;
tdb_mutex_t
mutex
;
SPage
*
pList
;
int
nFree
;
SPage
*
pFree
;
int
nPage
;
...
...
@@ -35,16 +36,17 @@ struct SPCache {
#define PAGE_IS_PINNED(pPage) ((pPage)->pLruNext == NULL)
static
int
tdbPCacheOpenImpl
(
SPCache
*
pCache
);
static
void
tdbPCacheInitLock
(
SPCache
*
pCache
);
static
void
tdbPCacheClearLock
(
SPCache
*
pCache
);
static
void
tdbPCacheLock
(
SPCache
*
pCache
);
static
void
tdbPCacheUnlock
(
SPCache
*
pCache
);
static
bool
tdbPCacheLocked
(
SPCache
*
pCache
);
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
bool
alcNewPage
);
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
);
static
void
tdbPCachePinPage
(
SPCache
*
pCache
,
SPage
*
pPage
);
static
void
tdbPCacheRemovePageFromHash
(
SPCache
*
pCache
,
SPage
*
pPage
);
static
void
tdbPCacheAddPageToHash
(
SPCache
*
pCache
,
SPage
*
pPage
);
static
void
tdbPCacheUnpinPage
(
SPCache
*
pCache
,
SPage
*
pPage
);
static
int
tdbPCacheCloseImpl
(
SPCache
*
pCache
);
static
void
tdbPCacheInitLock
(
SPCache
*
pCache
)
{
tdbMutexInit
(
&
(
pCache
->
mutex
),
NULL
);
}
static
void
tdbPCacheDestroyLock
(
SPCache
*
pCache
)
{
tdbMutexDestroy
(
&
(
pCache
->
mutex
));
}
static
void
tdbPCacheLock
(
SPCache
*
pCache
)
{
tdbMutexLock
(
&
(
pCache
->
mutex
));
}
static
void
tdbPCacheUnlock
(
SPCache
*
pCache
)
{
tdbMutexUnlock
(
&
(
pCache
->
mutex
));
}
int
tdbPCacheOpen
(
int
pageSize
,
int
cacheSize
,
SPCache
**
ppCache
)
{
SPCache
*
pCache
;
...
...
@@ -69,16 +71,19 @@ int tdbPCacheOpen(int pageSize, int cacheSize, SPCache **ppCache) {
}
int
tdbPCacheClose
(
SPCache
*
pCache
)
{
/* TODO */
if
(
pCache
)
{
tdbPCacheCloseImpl
(
pCache
);
tdbOsFree
(
pCache
);
}
return
0
;
}
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
bool
alcNewPage
)
{
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
tdbPCacheLock
(
pCache
);
pPage
=
tdbPCacheFetchImpl
(
pCache
,
pPgid
,
alcNewPage
);
pPage
=
tdbPCacheFetchImpl
(
pCache
,
pPgid
,
pTxn
);
if
(
pPage
)
{
TDB_REF_PAGE
(
pPage
);
}
...
...
@@ -88,32 +93,40 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, bool alcNewPage) {
return
pPage
;
}
void
tdbPCacheRelease
(
SPCache
*
pCache
,
SPage
*
pPage
)
{
void
tdbPCacheRelease
(
SPCache
*
pCache
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
i32
nRef
;
nRef
=
TDB_UNREF_PAGE
(
pPage
);
ASSERT
(
nRef
>=
0
);
if
(
nRef
==
0
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
tdbPCacheLock
(
pCache
);
// test the nRef again to make sure
// it is safe th handle the page
nRef
=
TDB_GET_PAGE_REF
(
pPage
);
if
(
nRef
==
0
)
{
if
(
pPage
->
isLocal
)
{
tdbPCacheUnpinPage
(
pCache
,
pPage
);
}
else
{
// remove from hash
tdbPCacheRemovePageFromHash
(
pCache
,
pPage
);
// free the page
if
(
pTxn
&&
pTxn
->
xFree
)
{
tdbPageDestroy
(
pPage
,
pTxn
->
xFree
,
pTxn
->
xArg
);
}
}
}
tdbPCacheUnlock
(
pCache
);
}
}
static
void
tdbPCacheInitLock
(
SPCache
*
pCache
)
{
tdbMutexInit
(
&
(
pCache
->
mutex
),
NULL
);
}
static
void
tdbPCacheClearLock
(
SPCache
*
pCache
)
{
tdbMutexDestroy
(
&
(
pCache
->
mutex
));
}
static
void
tdbPCacheLock
(
SPCache
*
pCache
)
{
tdbMutexLock
(
&
(
pCache
->
mutex
));
}
static
void
tdbPCacheUnlock
(
SPCache
*
pCache
)
{
tdbMutexUnlock
(
&
(
pCache
->
mutex
));
}
static
bool
tdbPCacheLocked
(
SPCache
*
pCache
)
{
assert
(
0
);
// TODO
return
true
;
}
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
)
{
return
pCache
->
pageSize
;
}
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
bool
alcNewPage
)
{
static
SPage
*
tdbPCacheFetchImpl
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
)
{
int
ret
;
SPage
*
pPage
;
// 1. Search the hash table
...
...
@@ -123,10 +136,10 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
pPage
=
pPage
->
pHashNext
;
}
if
(
pPage
||
!
alcNewPage
)
{
if
(
pPage
)
{
tdbPCachePinPage
(
pCache
,
pPage
);
}
if
(
pPage
)
{
// TODO: the page need to be copied and
// replaced the page in hash table
tdbPCachePinPage
(
pCache
,
pPage
);
return
pPage
;
}
...
...
@@ -145,7 +158,20 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
tdbPCachePinPage
(
pCache
,
pPage
);
}
// 4. Try a stress allocation (TODO)
// 4. Try a create new page
if
(
!
pPage
&&
pTxn
&&
pTxn
->
xMalloc
)
{
ret
=
tdbPageCreate
(
pCache
->
pageSize
,
&
pPage
,
pTxn
->
xMalloc
,
pTxn
->
xArg
);
if
(
ret
<
0
)
{
// TODO
ASSERT
(
0
);
return
NULL
;
}
// init the page fields
pPage
->
isAnchor
=
0
;
pPage
->
isLocal
=
0
;
TDB_INIT_PAGE_REF
(
pPage
);
}
// 5. Page here are just created from a free list
// or by recycling or allocated streesly,
...
...
@@ -154,6 +180,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, bool alcNe
memcpy
(
&
(
pPage
->
pgid
),
pPgid
,
sizeof
(
*
pPgid
));
pPage
->
pLruNext
=
NULL
;
pPage
->
pPager
=
NULL
;
// TODO: allocated page may not add to hash
tdbPCacheAddPageToHash
(
pCache
,
pPage
);
}
...
...
@@ -173,25 +201,17 @@ static void tdbPCachePinPage(SPCache *pCache, SPage *pPage) {
static
void
tdbPCacheUnpinPage
(
SPCache
*
pCache
,
SPage
*
pPage
)
{
i32
nRef
;
tdbPCacheLock
(
pCache
);
ASSERT
(
!
pPage
->
isDirty
);
ASSERT
(
TDB_GET_PAGE_REF
(
pPage
)
==
0
);
nRef
=
TDB_GET_PAGE_REF
(
pPage
);
ASSERT
(
nRef
>=
0
);
if
(
nRef
==
0
)
{
// Add the page to LRU list
ASSERT
(
pPage
->
pLruNext
==
NULL
);
ASSERT
(
pPage
->
pLruNext
==
NULL
);
pPage
->
pLruPrev
=
&
(
pCache
->
lru
);
pPage
->
pLruNext
=
pCache
->
lru
.
pLruNext
;
pCache
->
lru
.
pLruNext
->
pLruPrev
=
pPage
;
pCache
->
lru
.
pLruNext
=
pPage
;
}
pPage
->
pLruPrev
=
&
(
pCache
->
lru
);
pPage
->
pLruNext
=
pCache
->
lru
.
pLruNext
;
pCache
->
lru
.
pLruNext
->
pLruPrev
=
pPage
;
pCache
->
lru
.
pLruNext
=
pPage
;
pCache
->
nRecyclable
++
;
tdbPCacheUnlock
(
pCache
);
}
static
void
tdbPCacheRemovePageFromHash
(
SPCache
*
pCache
,
SPage
*
pPage
)
{
...
...
@@ -238,13 +258,14 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
// pPage->pgid = 0;
pPage
->
isAnchor
=
0
;
pPage
->
isLocal
Page
=
1
;
pPage
->
isLocal
=
1
;
TDB_INIT_PAGE_REF
(
pPage
);
pPage
->
pHashNext
=
NULL
;
pPage
->
pLruNext
=
NULL
;
pPage
->
pLruPrev
=
NULL
;
pPage
->
pDirtyNext
=
NULL
;
// add page to free list
pPage
->
pFreeNext
=
pCache
->
pFree
;
pCache
->
pFree
=
pPage
;
pCache
->
nFree
++
;
...
...
@@ -268,4 +289,13 @@ static int tdbPCacheOpenImpl(SPCache *pCache) {
return
0
;
}
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
)
{
return
pCache
->
pageSize
;
}
static
int
tdbPCacheCloseImpl
(
SPCache
*
pCache
)
{
SPage
*
pPage
;
for
(
pPage
=
pCache
->
pList
;
pPage
;
pPage
=
pCache
->
pList
)
{
pCache
->
pList
=
pPage
->
pCacheNext
;
tdbPageDestroy
(
pPage
,
NULL
,
NULL
);
}
tdbPCacheDestroyLock
(
pCache
);
}
source/libs/tdb/src/db/tdbPage.c
浏览文件 @
f739e7c3
...
...
@@ -278,7 +278,7 @@ static int tdbPageAllocate(SPage *pPage, int szCell, SCell **ppCell) {
// 2. Try to allocate from the page free list
cellFree
=
TDB_PAGE_FCELL
(
pPage
);
ASSERT
(
cellFree
==
0
||
cellFree
>
pPage
->
pFreeEnd
-
pPage
->
pData
);
ASSERT
(
cellFree
==
0
||
cellFree
>
=
pPage
->
pFreeEnd
-
pPage
->
pData
);
if
(
cellFree
&&
pPage
->
pFreeEnd
-
pPage
->
pFreeStart
>=
TDB_PAGE_OFFSET_SIZE
(
pPage
))
{
SCell
*
pPrevFreeCell
=
NULL
;
int
szPrevFreeCell
;
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
f739e7c3
...
...
@@ -27,7 +27,6 @@ TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct")
#define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
static
int
tdbPagerAllocPage
(
SPager
*
pPager
,
SPgno
*
ppgno
);
static
int
tdbPagerInitPage
(
SPager
*
pPager
,
SPage
*
pPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
,
u8
loadPage
);
static
int
tdbPagerWritePageToJournal
(
SPager
*
pPager
,
SPage
*
pPage
);
static
int
tdbPagerWritePageToDB
(
SPager
*
pPager
,
SPage
*
pPage
);
...
...
@@ -77,6 +76,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
// pPager->jfd = -1;
pPager
->
pageSize
=
tdbPCacheGetPageSize
(
pCache
);
// pPager->dbOrigSize
ret
=
tdbGetFileSize
(
pPager
->
fd
,
pPager
->
pageSize
,
&
(
pPager
->
dbOrigSize
));
*
ppPager
=
pPager
;
return
0
;
...
...
@@ -92,26 +93,32 @@ int tdbPagerOpenDB(SPager *pPager, SPgno *ppgno, bool toCreate) {
SPage
*
pPage
;
int
ret
;
{
// TODO: try to search the main DB to get the page number
if
(
pPager
->
dbOrigSize
>
0
)
{
pgno
=
1
;
}
else
{
pgno
=
0
;
}
// if (pgno == 0 && toCreate) {
// ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
// if (ret < 0) {
// return -1;
// }
{
// TODO: try to search the main DB to get the page number
// pgno = 0;
}
// if (pgno == 0 && toCreate) {
// ret = tdbPagerAllocPage(pPager, &pPage, &pgno);
// if (ret < 0) {
// return -1;
// }
// // TODO: Need to zero the page
// // TODO: Need to zero the page
// ret = tdbPagerWrite(pPager, pPage);
// if (ret < 0) {
// return -1;
// }
// }
// ret = tdbPagerWrite(pPager, pPage);
// if (ret < 0) {
// return -1;
// }
// }
*
ppgno
=
pgno
;
*
ppgno
=
pgno
;
return
0
;
}
...
...
@@ -157,7 +164,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
return
0
;
}
int
tdbPagerBegin
(
SPager
*
pPager
)
{
int
tdbPagerBegin
(
SPager
*
pPager
,
TXN
*
pTxn
)
{
if
(
pPager
->
inTran
)
{
return
0
;
}
...
...
@@ -175,7 +182,7 @@ int tdbPagerBegin(SPager *pPager) {
return
0
;
}
int
tdbPagerCommit
(
SPager
*
pPager
)
{
int
tdbPagerCommit
(
SPager
*
pPager
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
int
ret
;
...
...
@@ -204,7 +211,7 @@ int tdbPagerCommit(SPager *pPager) {
pPage
->
isDirty
=
0
;
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
);
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
// sync the db file
...
...
@@ -219,7 +226,8 @@ int tdbPagerCommit(SPager *pPager) {
return
0
;
}
int
tdbPagerFetchPage
(
SPager
*
pPager
,
SPgno
pgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
)
{
int
tdbPagerFetchPage
(
SPager
*
pPager
,
SPgno
pgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
,
TXN
*
pTxn
)
{
SPage
*
pPage
;
SPgid
pgid
;
int
ret
;
...
...
@@ -227,7 +235,7 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache
memcpy
(
&
pgid
,
pPager
->
fid
,
TDB_FILE_ID_LEN
);
pgid
.
pgno
=
pgno
;
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
1
);
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
pTxn
);
if
(
pPage
==
NULL
)
{
return
-
1
;
}
...
...
@@ -247,7 +255,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno pgno, SPage **ppPage, int (*initPage
return
0
;
}
int
tdbPagerNewPage
(
SPager
*
pPager
,
SPgno
*
ppgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
)
{
int
tdbPagerNewPage
(
SPager
*
pPager
,
SPgno
*
ppgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
,
TXN
*
pTxn
)
{
int
ret
;
SPage
*
pPage
;
SPgid
pgid
;
...
...
@@ -255,6 +264,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Allocate a page number
ret
=
tdbPagerAllocPage
(
pPager
,
ppgno
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -263,8 +273,9 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Fetch a page container from the page cache
memcpy
(
&
pgid
,
pPager
->
fid
,
TDB_FILE_ID_LEN
);
pgid
.
pgno
=
*
ppgno
;
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
1
);
pPage
=
tdbPCacheFetch
(
pPager
->
pCache
,
&
pgid
,
pTxn
);
if
(
pPage
==
NULL
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -273,6 +284,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
// Initialize the page if need
ret
=
tdbPagerInitPage
(
pPager
,
pPage
,
initPage
,
arg
,
0
);
if
(
ret
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -283,7 +295,7 @@ int tdbPagerNewPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage
return
0
;
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
);
}
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
)
{
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
static
int
tdbPagerAllocFreePage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
// TODO: Allocate a page from the free list
...
...
@@ -295,7 +307,7 @@ static int tdbPagerAllocNewPage(SPager *pPager, SPgno *ppgno) {
return
0
;
}
static
int
tdbPagerAllocPage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
int
tdbPagerAllocPage
(
SPager
*
pPager
,
SPgno
*
ppgno
)
{
int
ret
;
*
ppgno
=
0
;
...
...
source/libs/tdb/src/db/tdbTxn.c
浏览文件 @
f739e7c3
...
...
@@ -15,29 +15,17 @@
#include "tdbInt.h"
// int tdbTxnBegin(TENV *pEnv) {
// // TODO
// return 0;
// }
int
tdbTxnOpen
(
TXN
*
pTxn
,
int64_t
txnid
,
void
*
(
*
xMalloc
)(
void
*
,
size_t
),
void
(
*
xFree
)(
void
*
,
void
*
),
void
*
xArg
,
int
flags
)
{
// not support read-committed version at the moment
ASSERT
(
flags
==
0
||
flags
==
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
// int tdbTxnCommit(TENV *pEnv) {
// SPager *pPager = NULL;
// int ret;
pTxn
->
flags
=
flags
;
pTxn
->
txnId
=
txnid
;
pTxn
->
xMalloc
=
xMalloc
;
pTxn
->
xFree
=
xFree
;
pTxn
->
xArg
=
xArg
;
return
0
;
}
// for (;;) {
// break;
// ret = tdbPagerCommit(pPager);
// if (ret < 0) {
// ASSERT(0);
// return -1;
// }
// }
// // TODO
// return 0;
// }
// int tdbTxnRollback(TENV *pEnv) {
// // TODO
// return 0;
// }
\ No newline at end of file
int
tdbTxnClose
(
TXN
*
pTxn
)
{
return
0
;
}
\ No newline at end of file
source/libs/tdb/src/db/tdbUtil.c
浏览文件 @
f739e7c3
...
...
@@ -30,5 +30,18 @@ int tdbGnrtFileID(const char *fname, uint8_t *fileid, bool unique) {
((
uint64_t
*
)
fileid
)[
2
]
=
taosRand
();
}
return
0
;
}
int
tdbGetFileSize
(
tdb_fd_t
fd
,
int
szPage
,
SPgno
*
size
)
{
int
ret
;
int64_t
szBytes
;
ret
=
tdbOsFileSize
(
fd
,
&
szBytes
);
if
(
ret
<
0
)
{
return
-
1
;
}
*
size
=
szBytes
/
szPage
;
return
0
;
}
\ No newline at end of file
source/libs/tdb/src/inc/tdbBtree.h
浏览文件 @
f739e7c3
...
...
@@ -35,17 +35,18 @@ struct SBTC {
int
idx
;
int
idxStack
[
BTREE_MAX_DEPTH
+
1
];
SPage
*
pgStack
[
BTREE_MAX_DEPTH
+
1
];
TXN
*
pTxn
;
};
// SBTree
int
tdbBtreeOpen
(
int
keyLen
,
int
valLen
,
SPager
*
pFile
,
FKeyComparator
kcmpr
,
SBTree
**
ppBt
);
int
tdbBtreeClose
(
SBTree
*
pBt
);
int
tdbBtreeInsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
);
int
tdbBtreeInsert
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
const
void
*
pVal
,
int
vLen
,
TXN
*
pTxn
);
int
tdbBtreeGet
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbBtreePGet
(
SBTree
*
pBt
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
// SBTC
int
tdbBtcOpen
(
SBTC
*
p
Cur
,
SBTree
*
pBt
);
int
tdbBtcOpen
(
SBTC
*
p
Btc
,
SBTree
*
pBt
,
TXN
*
pTxn
);
int
tdbBtcMoveToFirst
(
SBTC
*
pBtc
);
int
tdbBtcMoveToLast
(
SBTC
*
pBtc
);
int
tdbBtreeNext
(
SBTC
*
pBtc
,
void
**
ppKey
,
int
*
kLen
,
void
**
ppVal
,
int
*
vLen
);
...
...
source/libs/tdb/src/inc/tdbDb.h
浏览文件 @
f739e7c3
...
...
@@ -27,7 +27,7 @@ typedef struct STDBC TDBC;
int
tdbDbOpen
(
const
char
*
fname
,
int
keyLen
,
int
valLen
,
FKeyComparator
keyCmprFn
,
TENV
*
pEnv
,
TDB
**
ppDb
);
int
tdbDbClose
(
TDB
*
pDb
);
int
tdbDbDrop
(
TDB
*
pDb
);
int
tdbDbInsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
);
int
tdbDbInsert
(
TDB
*
pDb
,
const
void
*
pKey
,
int
keyLen
,
const
void
*
pVal
,
int
valLen
,
TXN
*
pTxn
);
int
tdbDbGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppVal
,
int
*
vLen
);
int
tdbDbPGet
(
TDB
*
pDb
,
const
void
*
pKey
,
int
kLen
,
void
**
ppKey
,
int
*
pkLen
,
void
**
ppVal
,
int
*
vLen
);
...
...
source/libs/tdb/src/inc/tdbEnv.h
浏览文件 @
f739e7c3
...
...
@@ -33,9 +33,9 @@ typedef struct STEnv {
int
tdbEnvOpen
(
const
char
*
rootDir
,
int
pageSize
,
int
cacheSize
,
TENV
**
ppEnv
);
int
tdbEnvClose
(
TENV
*
pEnv
);
int
tdbBegin
(
TENV
*
pEnv
);
int
tdbCommit
(
TENV
*
pEnv
);
int
tdbRollback
(
TENV
*
pEnv
);
int
tdbBegin
(
TENV
*
pEnv
,
TXN
*
pTxn
);
int
tdbCommit
(
TENV
*
pEnv
,
TXN
*
pTxn
);
int
tdbRollback
(
TENV
*
pEnv
,
TXN
*
pTxn
);
void
tdbEnvAddPager
(
TENV
*
pEnv
,
SPager
*
pPager
);
void
tdbEnvRemovePager
(
TENV
*
pEnv
,
SPager
*
pPager
);
...
...
source/libs/tdb/src/inc/tdbInt.h
浏览文件 @
f739e7c3
...
...
@@ -111,6 +111,21 @@ typedef struct SPager SPager;
typedef
struct
SPCache
SPCache
;
typedef
struct
SPage
SPage
;
// transaction
#define TDB_TXN_WRITE 0x1
#define TDB_TXN_READ_UNCOMMITTED 0x2
typedef
struct
STxn
{
int
flags
;
i64
txnId
;
void
*
(
*
xMalloc
)(
void
*
,
size_t
);
void
(
*
xFree
)(
void
*
,
void
*
);
void
*
xArg
;
}
TXN
;
#define TDB_TXN_IS_WRITE(PTXN) ((PTXN)->flags & TDB_TXN_WRITE)
#define TDB_TXN_IS_READ(PTXN) (!TDB_TXN_IS_WRITE(PTXN))
#define TDB_TXN_IS_READ_UNCOMMITTED(PTXN) ((PTXN)->flags & TDB_TXN_READ_UNCOMMITTED)
#include "tdbOs.h"
#include "tdbUtil.h"
...
...
source/libs/tdb/src/inc/tdbOs.h
浏览文件 @
f739e7c3
...
...
@@ -46,13 +46,14 @@ typedef TdFilePtr tdb_fd_t;
#define tdbOsOpen(PATH, OPTION, MODE) taosOpenFile((PATH), (OPTION))
#define tdbOsClose(FD) taosCloseFile(&(FD))
#define tdbOsRead taosReadFile
#define tdbOsPRead taosPReadFile
#define tdbOsWrite taosWriteFile
#define tdbOsFSync taosFsyncFile
#define tdbOsLSeek taosLSeekFile
#define tdbOsRemove remove
#define tdbOsClose(FD) taosCloseFile(&(FD))
#define tdbOsRead taosReadFile
#define tdbOsPRead taosPReadFile
#define tdbOsWrite taosWriteFile
#define tdbOsFSync taosFsyncFile
#define tdbOsLSeek taosLSeekFile
#define tdbOsRemove remove
#define tdbOsFileSize(FD, PSIZE) taosFStatFile(FD, PSIZE, NULL)
/* directory */
#define tdbOsMkdir taosMkDir
...
...
@@ -110,10 +111,11 @@ i64 tdbOsWrite(tdb_fd_t fd, const void *pData, i64 nBytes);
#define tdbOsFSync fsync
#define tdbOsLSeek lseek
#define tdbOsRemove remove
#define tdbOsFileSize(FD, PSIZE)
/* directory */
#define tdbOsMkdir
mkdir
#define tdbOsRmdir
rmdir
#define tdbOsMkdir mkdir
#define tdbOsRmdir rmdir
// For threads and lock -----------------
/* spin lock */
...
...
source/libs/tdb/src/inc/tdbPCache.h
浏览文件 @
f739e7c3
...
...
@@ -22,9 +22,10 @@ extern "C" {
#define TDB_PCACHE_PAGE \
u8 isAnchor; \
u8 isLocal
Page;
\
u8 isLocal
;
\
u8 isDirty; \
i32 nRef; \
SPage *pCacheNext; \
SPage *pFreeNext; \
SPage *pHashNext; \
SPage *pLruNext; \
...
...
@@ -47,8 +48,8 @@ extern "C" {
int
tdbPCacheOpen
(
int
pageSize
,
int
cacheSize
,
SPCache
**
ppCache
);
int
tdbPCacheClose
(
SPCache
*
pCache
);
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
bool
alcNewPage
);
void
tdbPCacheRelease
(
SPCache
*
pCache
,
SPage
*
pPage
);
SPage
*
tdbPCacheFetch
(
SPCache
*
pCache
,
const
SPgid
*
pPgid
,
TXN
*
pTxn
);
void
tdbPCacheRelease
(
SPCache
*
pCache
,
SPage
*
pPage
,
TXN
*
pTxn
);
int
tdbPCacheGetPageSize
(
SPCache
*
pCache
);
#ifdef __cplusplus
...
...
source/libs/tdb/src/inc/tdbPager.h
浏览文件 @
f739e7c3
...
...
@@ -40,11 +40,14 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager);
int
tdbPagerClose
(
SPager
*
pPager
);
int
tdbPagerOpenDB
(
SPager
*
pPager
,
SPgno
*
ppgno
,
bool
toCreate
);
int
tdbPagerWrite
(
SPager
*
pPager
,
SPage
*
pPage
);
int
tdbPagerBegin
(
SPager
*
pPager
);
int
tdbPagerCommit
(
SPager
*
pPager
);
int
tdbPagerFetchPage
(
SPager
*
pPager
,
SPgno
pgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
);
int
tdbPagerNewPage
(
SPager
*
pPager
,
SPgno
*
ppgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
);
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
);
int
tdbPagerBegin
(
SPager
*
pPager
,
TXN
*
pTxn
);
int
tdbPagerCommit
(
SPager
*
pPager
,
TXN
*
pTxn
);
int
tdbPagerFetchPage
(
SPager
*
pPager
,
SPgno
pgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
,
TXN
*
pTxn
);
int
tdbPagerNewPage
(
SPager
*
pPager
,
SPgno
*
ppgno
,
SPage
**
ppPage
,
int
(
*
initPage
)(
SPage
*
,
void
*
),
void
*
arg
,
TXN
*
pTxn
);
void
tdbPagerReturnPage
(
SPager
*
pPager
,
SPage
*
pPage
,
TXN
*
pTxn
);
int
tdbPagerAllocPage
(
SPager
*
pPager
,
SPgno
*
ppgno
);
#ifdef __cplusplus
}
...
...
source/libs/tdb/src/inc/tdbTxn.h
浏览文件 @
f739e7c3
...
...
@@ -20,13 +20,9 @@
extern
"C"
{
#endif
typedef
struct
STxn
TXN
;
struct
STxn
{
u64
txnId
;
void
*
(
*
xMalloc
)(
void
*
,
int
);
void
*
xArg
;
};
int
tdbTxnOpen
(
TXN
*
pTxn
,
int64_t
txnid
,
void
*
(
*
xMalloc
)(
void
*
,
size_t
),
void
(
*
xFree
)(
void
*
,
void
*
),
void
*
xArg
,
int
flags
);
int
tdbTxnClose
(
TXN
*
pTxn
);
#ifdef __cplusplus
}
...
...
source/libs/tdb/src/inc/tdbUtil.h
浏览文件 @
f739e7c3
...
...
@@ -29,6 +29,7 @@ extern "C" {
#define TDB_ROUND8(x) (((x) + 7) & ~7)
int
tdbGnrtFileID
(
const
char
*
fname
,
uint8_t
*
fileid
,
bool
unique
);
int
tdbGetFileSize
(
tdb_fd_t
fd
,
int
szPage
,
SPgno
*
size
);
#define TDB_REALLOC(PTR, SIZE) \
({ \
...
...
@@ -83,15 +84,18 @@ static inline int tdbPutVarInt(u8 *p, int v) {
static
inline
int
tdbGetVarInt
(
const
u8
*
p
,
int
*
v
)
{
int
n
=
0
;
int
tv
=
0
;
int
t
;
for
(;;)
{
if
(
p
[
n
]
<=
0x7f
)
{
tv
=
(
tv
<<
7
)
|
p
[
n
];
t
=
p
[
n
];
tv
|=
(
t
<<
(
7
*
n
));
n
++
;
break
;
}
tv
=
(
tv
<<
7
)
|
(
p
[
n
]
&
0x7f
);
t
=
p
[
n
]
&
0x7f
;
tv
|=
(
t
<<
(
7
*
n
));
n
++
;
}
...
...
source/libs/tdb/test/CMakeLists.txt
浏览文件 @
f739e7c3
# tdbTest
add_executable
(
tdbTest
"tdbTest.cpp"
)
target_link_libraries
(
tdbTest tdb gtest gtest_main
)
\ No newline at end of file
target_link_libraries
(
tdbTest tdb gtest gtest_main
)
# tdbUtilTest
add_executable
(
tdbUtilTest
"tdbUtilTest.cpp"
)
target_link_libraries
(
tdbUtilTest tdb gtest gtest_main
)
\ No newline at end of file
source/libs/tdb/test/tdbTest.cpp
浏览文件 @
f739e7c3
...
...
@@ -19,7 +19,7 @@ static SPoolMem *openPool() {
return
pPool
;
}
static
void
cl
ose
Pool
(
SPoolMem
*
pPool
)
{
static
void
cl
ear
Pool
(
SPoolMem
*
pPool
)
{
SPoolMem
*
pMem
;
do
{
...
...
@@ -35,13 +35,14 @@ static void closePool(SPoolMem *pPool) {
}
while
(
1
);
assert
(
pPool
->
size
==
0
);
}
static
void
closePool
(
SPoolMem
*
pPool
)
{
clearPool
(
pPool
);
tdbOsFree
(
pPool
);
}
#define clearPool closePool
static
void
*
poolMalloc
(
void
*
arg
,
int
size
)
{
static
void
*
poolMalloc
(
void
*
arg
,
size_t
size
)
{
void
*
ptr
=
NULL
;
SPoolMem
*
pPool
=
(
SPoolMem
*
)
arg
;
SPoolMem
*
pMem
;
...
...
@@ -118,7 +119,8 @@ TEST(tdb_test, simple_test) {
TENV
*
pEnv
;
TDB
*
pDb
;
FKeyComparator
compFunc
;
int
nData
=
50000000
;
int
nData
=
10000000
;
TXN
txn
;
// Open Env
ret
=
tdbEnvOpen
(
"tdb"
,
4096
,
64
,
&
pEnv
);
...
...
@@ -130,26 +132,43 @@ TEST(tdb_test, simple_test) {
GTEST_ASSERT_EQ
(
ret
,
0
);
{
char
key
[
64
];
char
val
[
64
];
{
// Insert some data
for
(
int
i
=
1
;
i
<=
nData
;)
{
tdbBegin
(
pEnv
);
for
(
int
k
=
0
;
k
<
2000
;
k
++
)
{
sprintf
(
key
,
"key%d"
,
i
);
sprintf
(
val
,
"value%d"
,
i
);
ret
=
tdbDbInsert
(
pDb
,
key
,
strlen
(
key
),
val
,
strlen
(
val
));
GTEST_ASSERT_EQ
(
ret
,
0
);
i
++
;
}
tdbCommit
(
pEnv
);
char
key
[
64
];
char
val
[
64
];
int64_t
poolLimit
=
4096
;
// 1M pool limit
int64_t
txnid
=
0
;
SPoolMem
*
pPool
;
// open the pool
pPool
=
openPool
();
// start a transaction
txnid
++
;
tdbTxnOpen
(
&
txn
,
txnid
,
poolMalloc
,
poolFree
,
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pEnv
,
&
txn
);
for
(
int
iData
=
1
;
iData
<=
nData
;
iData
++
)
{
sprintf
(
key
,
"key%d"
,
iData
);
sprintf
(
val
,
"value%d"
,
iData
);
ret
=
tdbDbInsert
(
pDb
,
key
,
strlen
(
key
),
val
,
strlen
(
val
),
&
txn
);
GTEST_ASSERT_EQ
(
ret
,
0
);
// if pool is full, commit the transaction and start a new one
if
(
pPool
->
size
>=
poolLimit
)
{
// commit current transaction
tdbCommit
(
pEnv
,
&
txn
);
tdbTxnClose
(
&
txn
);
// start a new transaction
clearPool
(
pPool
);
txnid
++
;
tdbTxnOpen
(
&
txn
,
txnid
,
poolMalloc
,
poolFree
,
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
tdbBegin
(
pEnv
,
&
txn
);
}
}
tdbCommit
(
pEnv
);
// commit the transaction
tdbCommit
(
pEnv
,
&
txn
);
tdbTxnClose
(
&
txn
);
{
// Query the data
void
*
pVal
=
NULL
;
...
...
source/libs/tdb/test/tdbUtilTest.cpp
0 → 100644
浏览文件 @
f739e7c3
#include <gtest/gtest.h>
#include "tdbInt.h"
#include <string>
TEST
(
tdb_util_test
,
simple_test
)
{
int
vEncode
=
5000
;
int
vDecode
;
int
nEncode
;
int
nDecode
;
u8
buffer
[
128
];
nEncode
=
tdbPutVarInt
(
buffer
,
vEncode
);
nDecode
=
tdbGetVarInt
(
buffer
,
&
vDecode
);
GTEST_ASSERT_EQ
(
nEncode
,
nDecode
);
GTEST_ASSERT_EQ
(
vEncode
,
vDecode
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录