Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
90512f6e
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
90512f6e
编写于
12月 10, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
17a3469a
bd408504
变更
23
隐藏空白更改
内联
并排
Showing
23 changed file
with
1100 addition
and
539 deletion
+1100
-539
include/libs/wal/wal.h
include/libs/wal/wal.h
+22
-15
include/util/tarray.h
include/util/tarray.h
+7
-0
include/util/tfile.h
include/util/tfile.h
+2
-0
include/util/tmd5.h
include/util/tmd5.h
+4
-4
include/util/tutil.h
include/util/tutil.h
+4
-4
source/dnode/vnode/impl/test/vnodeApiTests.cpp
source/dnode/vnode/impl/test/vnodeApiTests.cpp
+2
-2
source/dnode/vnode/meta/src/metaBDBImpl.c
source/dnode/vnode/meta/src/metaBDBImpl.c
+141
-163
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+17
-9
source/libs/index/test/indexTests.cpp
source/libs/index/test/indexTests.cpp
+155
-54
source/libs/transport/src/rpcMain.c
source/libs/transport/src/rpcMain.c
+12
-12
source/libs/wal/CMakeLists.txt
source/libs/wal/CMakeLists.txt
+5
-0
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+65
-1
source/libs/wal/src/walIndex.c
source/libs/wal/src/walIndex.c
+33
-28
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+219
-0
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+31
-38
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+44
-4
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+147
-63
source/libs/wal/test/CMakeLists.txt
source/libs/wal/test/CMakeLists.txt
+20
-0
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+149
-0
source/libs/wal/test/walTests.cpp
source/libs/wal/test/walTests.cpp
+0
-137
source/util/src/tarray.c
source/util/src/tarray.c
+10
-0
source/util/src/tfile.c
source/util/src/tfile.c
+6
-0
source/util/src/tmd5.c
source/util/src/tmd5.c
+5
-5
未找到文件。
include/libs/wal/wal.h
浏览文件 @
90512f6e
...
@@ -55,12 +55,14 @@ typedef struct {
...
@@ -55,12 +55,14 @@ typedef struct {
uint32_t
signature
;
uint32_t
signature
;
uint32_t
cksumHead
;
uint32_t
cksumHead
;
uint32_t
cksumBody
;
uint32_t
cksumBody
;
//
char cont[];
char
cont
[];
}
SWalHead
;
}
SWalHead
;
typedef
struct
{
typedef
struct
{
int32_t
vgId
;
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int32_t
fsyncPeriod
;
// millisecond
int32_t
rollPeriod
;
int64_t
segSize
;
EWalType
walLevel
;
// wal level
EWalType
walLevel
;
// wal level
}
SWalCfg
;
}
SWalCfg
;
...
@@ -87,36 +89,41 @@ typedef struct SWal {
...
@@ -87,36 +89,41 @@ typedef struct SWal {
// cfg
// cfg
int32_t
vgId
;
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int32_t
fsyncPeriod
;
// millisecond
int32_t
fsyncSeq
;
int32_t
rollPeriod
;
// second
int32_t
rollPeriod
;
// second
int64_t
segSize
;
int64_t
segSize
;
int64_t
retentionSize
;
int32_t
retentionPeriod
;
EWalType
level
;
EWalType
level
;
//total size
int64_t
totSize
;
//fsync seq
int32_t
fsyncSeq
;
//reference
//reference
int64_t
refId
;
int64_t
refId
;
//current tfd
//write tfd
int64_t
curLogTfd
;
int64_t
writeLogTfd
;
int64_t
curIdxTfd
;
int64_t
writeIdxTfd
;
//read tfd
int64_t
readLogTfd
;
int64_t
readIdxTfd
;
//current version
//current version
int64_t
curVersion
;
int64_t
curVersion
;
int64_t
curLogOffset
;
//wal lifecycle
//current file version
int64_t
curFileFirstVersion
;
int64_t
curFileLastVersion
;
//wal fileset version
int64_t
firstVersion
;
int64_t
firstVersion
;
int64_t
snapshotVersion
;
int64_t
snapshotVersion
;
int64_t
commitVersion
;
int64_t
lastVersion
;
int64_t
lastVersion
;
int64_t
lastFileName
;
//roll status
//roll status
int64_t
lastRollSeq
;
int64_t
lastRollSeq
;
int64_t
lastFileWriteSize
;
//file set
int32_t
writeCur
;
int32_t
readCur
;
SArray
*
fileInfoSet
;
//ctl
//ctl
int32_t
curStatus
;
int32_t
curStatus
;
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
//path
//path
char
path
[
WAL_PATH_LEN
];
char
path
[
WAL_PATH_LEN
];
//file set
SArray
*
fileSet
;
//reusable write head
//reusable write head
SWalHead
head
;
SWalHead
head
;
}
SWal
;
// WAL HANDLE
}
SWal
;
// WAL HANDLE
...
@@ -133,7 +140,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
...
@@ -133,7 +140,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void
walClose
(
SWal
*
);
void
walClose
(
SWal
*
);
// write
// write
int64_t
walWrite
(
SWal
*
,
int64_t
index
,
uint8_t
msgType
,
void
*
body
,
int32_t
bodyLen
);
int64_t
walWrite
(
SWal
*
,
int64_t
index
,
uint8_t
msgType
,
const
void
*
body
,
int32_t
bodyLen
);
void
walFsync
(
SWal
*
,
bool
force
);
void
walFsync
(
SWal
*
,
bool
force
);
// apis for lifecycle management
// apis for lifecycle management
...
...
include/util/tarray.h
浏览文件 @
90512f6e
...
@@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
...
@@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
*/
*/
void
taosArraySet
(
SArray
*
pArray
,
size_t
index
,
void
*
pData
);
void
taosArraySet
(
SArray
*
pArray
,
size_t
index
,
void
*
pData
);
/**
* remove some data entry from front
* @param pArray
* @param cnt
*/
void
taosArrayPopFrontBatch
(
SArray
*
pArray
,
size_t
cnt
);
/**
/**
* remove data entry of the given index
* remove data entry of the given index
* @param pArray
* @param pArray
...
...
include/util/tfile.h
浏览文件 @
90512f6e
...
@@ -16,6 +16,8 @@
...
@@ -16,6 +16,8 @@
#ifndef _TD_UTIL_FILE_H
#ifndef _TD_UTIL_FILE_H
#define _TD_UTIL_FILE_H
#define _TD_UTIL_FILE_H
#include "os.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
...
...
include/util/tmd5.h
浏览文件 @
90512f6e
...
@@ -30,10 +30,10 @@ typedef struct {
...
@@ -30,10 +30,10 @@ typedef struct {
uint32_t
buf
[
4
];
/* scratch buffer */
uint32_t
buf
[
4
];
/* scratch buffer */
uint8_t
in
[
64
];
/* input buffer */
uint8_t
in
[
64
];
/* input buffer */
uint8_t
digest
[
16
];
/* actual digest after MD5Final call */
uint8_t
digest
[
16
];
/* actual digest after MD5Final call */
}
MD5_CTX
;
}
T_
MD5_CTX
;
void
MD5Init
(
MD5_CTX
*
mdContext
);
void
tMD5Init
(
T_
MD5_CTX
*
mdContext
);
void
MD5Update
(
MD5_CTX
*
mdContext
,
uint8_t
*
inBuf
,
unsigned
int
inLen
);
void
tMD5Update
(
T_
MD5_CTX
*
mdContext
,
uint8_t
*
inBuf
,
unsigned
int
inLen
);
void
MD5Final
(
MD5_CTX
*
mdContext
);
void
tMD5Final
(
T_
MD5_CTX
*
mdContext
);
#endif
/*_TD_UTIL_MD5_H*/
#endif
/*_TD_UTIL_MD5_H*/
include/util/tutil.h
浏览文件 @
90512f6e
...
@@ -48,10 +48,10 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
...
@@ -48,10 +48,10 @@ void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
int32_t
taosGetFqdnPortFromEp
(
const
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
int32_t
taosGetFqdnPortFromEp
(
const
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
static
FORCE_INLINE
void
taosEncryptPass
(
uint8_t
*
inBuf
,
size_t
inLen
,
char
*
target
)
{
static
FORCE_INLINE
void
taosEncryptPass
(
uint8_t
*
inBuf
,
size_t
inLen
,
char
*
target
)
{
MD5_CTX
context
;
T_
MD5_CTX
context
;
MD5Init
(
&
context
);
t
MD5Init
(
&
context
);
MD5Update
(
&
context
,
inBuf
,
(
uint32_
t
)
inLen
);
tMD5Update
(
&
context
,
inBuf
,
(
unsigned
in
t
)
inLen
);
MD5Final
(
&
context
);
t
MD5Final
(
&
context
);
memcpy
(
target
,
context
.
digest
,
TSDB_KEY_LEN
);
memcpy
(
target
,
context
.
digest
,
TSDB_KEY_LEN
);
}
}
...
...
source/dnode/vnode/impl/test/vnodeApiTests.cpp
浏览文件 @
90512f6e
...
@@ -45,7 +45,7 @@ static SKVRow createBasicTag() {
...
@@ -45,7 +45,7 @@ static SKVRow createBasicTag() {
tdInitKVRowBuilder
(
&
rb
);
tdInitKVRowBuilder
(
&
rb
);
for
(
int
i
=
10
;
i
<
1
2
;
i
++
)
{
for
(
int
i
=
0
;
i
<
2
;
i
++
)
{
void
*
pVal
=
malloc
(
sizeof
(
VarDataLenT
)
+
strlen
(
"foo"
));
void
*
pVal
=
malloc
(
sizeof
(
VarDataLenT
)
+
strlen
(
"foo"
));
varDataLen
(
pVal
)
=
strlen
(
"foo"
);
varDataLen
(
pVal
)
=
strlen
(
"foo"
);
memcpy
(
varDataVal
(
pVal
),
"foo"
,
strlen
(
"foo"
));
memcpy
(
varDataVal
(
pVal
),
"foo"
,
strlen
(
"foo"
));
...
@@ -120,7 +120,7 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
...
@@ -120,7 +120,7 @@ TEST(vnodeApiTest, vnodeOpen_vnodeClose_test) {
{
{
// Create some child tables
// Create some child tables
int
ntables
=
100000
;
int
ntables
=
100000
0
;
int
batch
=
10
;
int
batch
=
10
;
for
(
int
i
=
0
;
i
<
ntables
/
batch
;
i
++
)
{
for
(
int
i
=
0
;
i
<
ntables
/
batch
;
i
++
)
{
SArray
*
pMsgs
=
(
SArray
*
)
taosArrayInit
(
batch
,
sizeof
(
SRpcMsg
*
));
SArray
*
pMsgs
=
(
SArray
*
)
taosArrayInit
(
batch
,
sizeof
(
SRpcMsg
*
));
...
...
source/dnode/vnode/meta/src/metaBDBImpl.c
浏览文件 @
90512f6e
...
@@ -20,6 +20,11 @@
...
@@ -20,6 +20,11 @@
#include "tcoding.h"
#include "tcoding.h"
#include "thash.h"
#include "thash.h"
typedef
struct
{
tb_uid_t
uid
;
int32_t
sver
;
}
SSchemaKey
;
struct
SMetaDB
{
struct
SMetaDB
{
// DB
// DB
DB
*
pTbDB
;
DB
*
pTbDB
;
...
@@ -39,13 +44,17 @@ static SMetaDB *metaNewDB();
...
@@ -39,13 +44,17 @@ static SMetaDB *metaNewDB();
static
void
metaFreeDB
(
SMetaDB
*
pDB
);
static
void
metaFreeDB
(
SMetaDB
*
pDB
);
static
int
metaOpenBDBEnv
(
DB_ENV
**
ppEnv
,
const
char
*
path
);
static
int
metaOpenBDBEnv
(
DB_ENV
**
ppEnv
,
const
char
*
path
);
static
void
metaCloseBDBEnv
(
DB_ENV
*
pEnv
);
static
void
metaCloseBDBEnv
(
DB_ENV
*
pEnv
);
static
int
metaOpenBDBDb
(
DB
**
ppDB
,
DB_ENV
*
pEnv
,
const
char
*
pFName
);
static
int
metaOpenBDBDb
(
DB
**
ppDB
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
bool
isDup
);
static
void
metaCloseBDBDb
(
DB
*
pDB
);
static
void
metaCloseBDBDb
(
DB
*
pDB
);
static
int
metaOpenBDBIdx
(
DB
**
ppIdx
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
DB
*
pDB
,
bdbIdxCbPtr
cbf
);
static
int
metaOpenBDBIdx
(
DB
**
ppIdx
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
DB
*
pDB
,
bdbIdxCbPtr
cbf
,
bool
isDup
);
static
void
metaCloseBDBIdx
(
DB
*
pIdx
);
static
int
metaNameIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaNameIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaStbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaStbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaNtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaNtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaCtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaCtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
);
static
int
metaEncodeTbInfo
(
void
**
buf
,
STbCfg
*
pTbCfg
);
static
void
*
metaDecodeTbInfo
(
void
*
buf
,
STbCfg
*
pTbCfg
);
static
void
metaClearTbCfg
(
STbCfg
*
pTbCfg
);
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code))
...
@@ -67,33 +76,33 @@ int metaOpenDB(SMeta *pMeta) {
...
@@ -67,33 +76,33 @@ int metaOpenDB(SMeta *pMeta) {
}
}
// Open DBs
// Open DBs
if
(
metaOpenBDBDb
(
&
(
pDB
->
pTbDB
),
pDB
->
pEvn
,
"meta.db"
)
<
0
)
{
if
(
metaOpenBDBDb
(
&
(
pDB
->
pTbDB
),
pDB
->
pEvn
,
"meta.db"
,
false
)
<
0
)
{
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
return
-
1
;
return
-
1
;
}
}
if
(
metaOpenBDBDb
(
&
(
pDB
->
pSchemaDB
),
pDB
->
pEvn
,
"meta.db"
)
<
0
)
{
if
(
metaOpenBDBDb
(
&
(
pDB
->
pSchemaDB
),
pDB
->
pEvn
,
"meta.db"
,
false
)
<
0
)
{
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
return
-
1
;
return
-
1
;
}
}
// Open Indices
// Open Indices
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pNameIdx
),
pDB
->
pEvn
,
"
index.db"
,
pDB
->
pTbDB
,
&
metaNameIdxCb
)
<
0
)
{
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pNameIdx
),
pDB
->
pEvn
,
"
name.index"
,
pDB
->
pTbDB
,
&
metaNameIdxCb
,
false
)
<
0
)
{
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
return
-
1
;
return
-
1
;
}
}
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pStbIdx
),
pDB
->
pEvn
,
"
index.db"
,
pDB
->
pTbDB
,
&
metaStbIdxCb
)
<
0
)
{
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pStbIdx
),
pDB
->
pEvn
,
"
stb.index"
,
pDB
->
pTbDB
,
&
metaStbIdxCb
,
false
)
<
0
)
{
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
return
-
1
;
return
-
1
;
}
}
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pNtbIdx
),
pDB
->
pEvn
,
"
index.db"
,
pDB
->
pTbDB
,
&
metaNtbIdxCb
)
<
0
)
{
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pNtbIdx
),
pDB
->
pEvn
,
"
ntb.index"
,
pDB
->
pTbDB
,
&
metaNtbIdxCb
,
false
)
<
0
)
{
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
return
-
1
;
return
-
1
;
}
}
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pCtbIdx
),
pDB
->
pEvn
,
"
index.db"
,
pDB
->
pTbDB
,
&
metaCtbIdxCb
)
<
0
)
{
if
(
metaOpenBDBIdx
(
&
(
pDB
->
pCtbIdx
),
pDB
->
pEvn
,
"
ctb.index"
,
pDB
->
pTbDB
,
&
metaCtbIdxCb
,
true
)
<
0
)
{
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
return
-
1
;
return
-
1
;
}
}
...
@@ -103,6 +112,12 @@ int metaOpenDB(SMeta *pMeta) {
...
@@ -103,6 +112,12 @@ int metaOpenDB(SMeta *pMeta) {
void
metaCloseDB
(
SMeta
*
pMeta
)
{
void
metaCloseDB
(
SMeta
*
pMeta
)
{
if
(
pMeta
->
pDB
)
{
if
(
pMeta
->
pDB
)
{
metaCloseBDBIdx
(
pMeta
->
pDB
->
pCtbIdx
);
metaCloseBDBIdx
(
pMeta
->
pDB
->
pNtbIdx
);
metaCloseBDBIdx
(
pMeta
->
pDB
->
pStbIdx
);
metaCloseBDBIdx
(
pMeta
->
pDB
->
pNameIdx
);
metaCloseBDBDb
(
pMeta
->
pDB
->
pSchemaDB
);
metaCloseBDBDb
(
pMeta
->
pDB
->
pTbDB
);
metaCloseBDBEnv
(
pMeta
->
pDB
->
pEvn
);
metaCloseBDBEnv
(
pMeta
->
pDB
->
pEvn
);
metaFreeDB
(
pMeta
->
pDB
);
metaFreeDB
(
pMeta
->
pDB
);
pMeta
->
pDB
=
NULL
;
pMeta
->
pDB
=
NULL
;
...
@@ -110,7 +125,60 @@ void metaCloseDB(SMeta *pMeta) {
...
@@ -110,7 +125,60 @@ void metaCloseDB(SMeta *pMeta) {
}
}
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
int
metaSaveTableToDB
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
)
{
// TODO
tb_uid_t
uid
;
char
buf
[
512
];
void
*
pBuf
;
DBT
key
,
value
;
STSchema
*
pSchema
=
NULL
;
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
uid
=
pTbCfg
->
stbCfg
.
suid
;
}
else
{
uid
=
metaGenerateUid
(
pMeta
);
}
{
// save table info
pBuf
=
buf
;
memset
(
&
key
,
0
,
sizeof
(
key
));
memset
(
&
value
,
0
,
sizeof
(
key
));
key
.
data
=
&
uid
;
key
.
size
=
sizeof
(
uid
);
metaEncodeTbInfo
(
&
pBuf
,
pTbCfg
);
value
.
data
=
buf
;
value
.
size
=
POINTER_DISTANCE
(
pBuf
,
buf
);
value
.
app_data
=
pTbCfg
;
pMeta
->
pDB
->
pTbDB
->
put
(
pMeta
->
pDB
->
pTbDB
,
NULL
,
&
key
,
&
value
,
0
);
}
// save schema
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
pSchema
=
pTbCfg
->
stbCfg
.
pSchema
;
}
else
if
(
pTbCfg
->
type
==
META_NORMAL_TABLE
)
{
pSchema
=
pTbCfg
->
ntbCfg
.
pSchema
;
}
if
(
pSchema
)
{
pBuf
=
buf
;
memset
(
&
key
,
0
,
sizeof
(
key
));
memset
(
&
value
,
0
,
sizeof
(
key
));
SSchemaKey
schemaKey
=
{
uid
,
schemaVersion
(
pSchema
)};
key
.
data
=
&
schemaKey
;
key
.
size
=
sizeof
(
schemaKey
);
tdEncodeSchema
(
&
pBuf
,
pSchema
);
value
.
data
=
buf
;
value
.
size
=
POINTER_DISTANCE
(
pBuf
,
buf
);
pMeta
->
pDB
->
pSchemaDB
->
put
(
pMeta
->
pDB
->
pSchemaDB
,
NULL
,
&
key
,
&
value
,
0
);
}
return
0
;
return
0
;
}
}
...
@@ -165,16 +233,24 @@ static void metaCloseBDBEnv(DB_ENV *pEnv) {
...
@@ -165,16 +233,24 @@ static void metaCloseBDBEnv(DB_ENV *pEnv) {
}
}
}
}
static
int
metaOpenBDBDb
(
DB
**
ppDB
,
DB_ENV
*
pEnv
,
const
char
*
pFName
)
{
static
int
metaOpenBDBDb
(
DB
**
ppDB
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
bool
isDup
)
{
int
ret
;
int
ret
;
DB
*
pDB
;
DB
*
pDB
;
ret
=
db_create
(
&
(
(
pDB
)),
(
pEnv
)
,
0
);
ret
=
db_create
(
&
(
pDB
),
pEnv
,
0
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
BDB_PERR
(
"Failed to create META DB"
,
ret
);
BDB_PERR
(
"Failed to create META DB"
,
ret
);
return
-
1
;
return
-
1
;
}
}
if
(
isDup
)
{
ret
=
pDB
->
set_flags
(
pDB
,
DB_DUPSORT
);
if
(
ret
!=
0
)
{
BDB_PERR
(
"Failed to set DB flags"
,
ret
);
return
-
1
;
}
}
ret
=
pDB
->
open
(
pDB
,
NULL
,
pFName
,
NULL
,
DB_BTREE
,
DB_CREATE
,
0
);
ret
=
pDB
->
open
(
pDB
,
NULL
,
pFName
,
NULL
,
DB_BTREE
,
DB_CREATE
,
0
);
if
(
ret
)
{
if
(
ret
)
{
BDB_PERR
(
"Failed to open META DB"
,
ret
);
BDB_PERR
(
"Failed to open META DB"
,
ret
);
...
@@ -192,11 +268,11 @@ static void metaCloseBDBDb(DB *pDB) {
...
@@ -192,11 +268,11 @@ static void metaCloseBDBDb(DB *pDB) {
}
}
}
}
static
int
metaOpenBDBIdx
(
DB
**
ppIdx
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
DB
*
pDB
,
bdbIdxCbPtr
cbf
)
{
static
int
metaOpenBDBIdx
(
DB
**
ppIdx
,
DB_ENV
*
pEnv
,
const
char
*
pFName
,
DB
*
pDB
,
bdbIdxCbPtr
cbf
,
bool
isDup
)
{
DB
*
pIdx
;
DB
*
pIdx
;
int
ret
;
int
ret
;
if
(
metaOpenBDBDb
(
ppIdx
,
pEnv
,
pFName
)
<
0
)
{
if
(
metaOpenBDBDb
(
ppIdx
,
pEnv
,
pFName
,
isDup
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -216,158 +292,70 @@ static void metaCloseBDBIdx(DB *pIdx) {
...
@@ -216,158 +292,70 @@ static void metaCloseBDBIdx(DB *pIdx) {
}
}
static
int
metaNameIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
static
int
metaNameIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
// TODO
STbCfg
*
pTbCfg
=
(
STbCfg
*
)(
pValue
->
app_data
);
return
0
;
}
static
int
metaStbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
memset
(
pSKey
,
0
,
sizeof
(
*
pSKey
));
// TODO
return
0
;
}
static
int
metaNtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
pSKey
->
data
=
pTbCfg
->
name
;
// TODO
pSKey
->
size
=
strlen
(
pTbCfg
->
name
);
return
0
;
}
static
int
metaCtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
// TODO
return
0
;
return
0
;
}
}
#if 0
static
int
metaStbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
typedef struct {
STbCfg
*
pTbCfg
=
(
STbCfg
*
)(
pValue
->
app_data
);
tb_uid_t uid;
int32_t sver;
} SSchemaKey;
static SMetaDB *metaNewDB();
static void metaFreeDB(SMetaDB *pDB);
static int metaCreateDBEnv(SMetaDB *pDB, const char *path);
static void metaDestroyDBEnv(SMetaDB *pDB);
static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey);
static void * metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey);
static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey);
static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema);
static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg);
static void * metaDecodeTbInfo(void *buf, STbCfg *pTbCfg);
static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg);
#define META_ASSOCIATE_IDX(pDB, pIdx, cbf) \
do { \
int ret = (pDB)->associate((pDB), NULL, (pIdx), (cbf), 0); \
if (ret != 0) { \
P_ERROR("Failed to associate META DB", ret); \
metaCloseDB(pMeta); \
} \
} while (0)
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
char buf[512];
void * pBuf;
DBT key = {0};
DBT value = {0};
SSchemaKey schemaKey;
tb_uid_t uid;
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
// Handle SUPER table
memset
(
pSKey
,
0
,
sizeof
(
*
pSKey
));
uid = pTbCfg->stbCfg.suid;
pSKey
->
data
=
pKey
->
data
;
pSKey
->
size
=
pKey
->
size
;
// Same table info
metaSaveTbInfo(pMeta->pDB->pStbDB, uid, pTbCfg);
// save schema
metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema);
{
// Create a super table DB and corresponding index DB
DB *pStbDB;
DB *pStbIdxDB;
META_OPEN_DB(pStbDB, pMeta->pDB->pEvn, "meta.db");
META_OPEN_DB(pStbIdxDB, pMeta->pDB->pEvn, "index.db");
// TODO META_ASSOCIATE_IDX();
}
} else if (pTbCfg->type == META_CHILD_TABLE) {
// Handle CHILD table
uid = metaGenerateUid(pMeta);
DB *pCTbDB = taosHashGet(pMeta->pDB->pCtbMap, &(pTbCfg->ctbCfg.suid), sizeof(pTbCfg->ctbCfg.suid));
if (pCTbDB == NULL) {
ASSERT(0);
}
metaSaveTbInfo(pCTbDB, uid, pTbCfg);
return
0
;
} else if (pTbCfg->type == META_NORMAL_TABLE) {
// Handle NORMAL table
uid = metaGenerateUid(pMeta);
metaSaveTbInfo(pMeta->pDB->pNtbDB, uid, pTbCfg);
metaPutSchema(pMeta, uid, pTbCfg->stbCfg.pSchema);
}
else
{
}
else
{
ASSERT(0)
;
return
DB_DONOTINDEX
;
}
}
return 0;
}
int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
// TODO
}
/* ------------------------ STATIC METHODS ------------------------ */
static int metaEncodeSchemaKey(void **buf, SSchemaKey *pSchemaKey) {
int tsize = 0;
tsize += taosEncodeFixedU64(buf, pSchemaKey->uid);
tsize += taosEncodeFixedI32(buf, pSchemaKey->sver);
return tsize;
}
}
static void *metaDecodeSchemaKey(void *buf, SSchemaKey *pSchemaKey) {
static
int
metaNtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
buf = taosDecodeFixedU64(buf, &(pSchemaKey->uid));
STbCfg
*
pTbCfg
=
(
STbCfg
*
)(
pValue
->
app_data
);
buf = taosDecodeFixedI32(buf, &(pSchemaKey->sver));
return buf;
if
(
pTbCfg
->
type
==
META_NORMAL_TABLE
)
{
}
memset
(
pSKey
,
0
,
sizeof
(
*
pSKey
));
pSKey
->
data
=
pKey
->
data
;
pSKey
->
size
=
pKey
->
size
;
static int metaNameIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
return
0
;
// TODO
}
else
{
return 0;
return
DB_DONOTINDEX
;
}
}
}
static int metaUidIdxCb(DB *sdbp, const DBT *pKey, const DBT *pValue, DBT *pSKey) {
static
int
metaCtbIdxCb
(
DB
*
pIdx
,
const
DBT
*
pKey
,
const
DBT
*
pValue
,
DBT
*
pSKey
)
{
// TODO
STbCfg
*
pTbCfg
=
(
STbCfg
*
)(
pValue
->
app_data
);
return 0;
DBT
*
pDbt
;
}
static void metaPutSchema(SMeta *pMeta, tb_uid_t uid, STSchema *pSchema) {
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
SSchemaKey skey;
pDbt
=
calloc
(
2
,
sizeof
(
DBT
));
char buf[256];
void * pBuf = buf;
DBT key = {0};
DBT value = {0};
skey.uid = uid;
// First key is suid
skey.sver = schemaVersion(pSchema);
pDbt
[
0
].
data
=
&
(
pTbCfg
->
ctbCfg
.
suid
);
pDbt
[
0
].
size
=
sizeof
(
pTbCfg
->
ctbCfg
.
suid
);
key.data = &skey;
// Second key is the first tag
key.size = sizeof(skey);
void
*
pTagVal
=
tdGetKVRowValOfCol
(
pTbCfg
->
ctbCfg
.
pTag
,
0
);
pDbt
[
1
].
data
=
varDataVal
(
pTagVal
);
pDbt
[
1
].
size
=
varDataLen
(
pTagVal
);
tdEncodeSchema(&pBuf, pSchema);
// Set index key
value.data = buf;
memset
(
pSKey
,
0
,
sizeof
(
*
pSKey
));
value.size = POINTER_DISTANCE(pBuf, buf);
pSKey
->
flags
=
DB_DBT_MULTIPLE
|
DB_DBT_APPMALLOC
;
pSKey
->
data
=
pDbt
;
pSKey
->
size
=
2
;
pMeta->pDB->pSchemaDB->put(pMeta->pDB->pSchemaDB, NULL, &key, &value, 0);
return
0
;
}
else
{
return
DB_DONOTINDEX
;
}
}
}
static
int
metaEncodeTbInfo
(
void
**
buf
,
STbCfg
*
pTbCfg
)
{
static
int
metaEncodeTbInfo
(
void
**
buf
,
STbCfg
*
pTbCfg
)
{
...
@@ -376,6 +364,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
...
@@ -376,6 +364,7 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
tsize
+=
taosEncodeString
(
buf
,
pTbCfg
->
name
);
tsize
+=
taosEncodeString
(
buf
,
pTbCfg
->
name
);
tsize
+=
taosEncodeFixedU32
(
buf
,
pTbCfg
->
ttl
);
tsize
+=
taosEncodeFixedU32
(
buf
,
pTbCfg
->
ttl
);
tsize
+=
taosEncodeFixedU32
(
buf
,
pTbCfg
->
keep
);
tsize
+=
taosEncodeFixedU32
(
buf
,
pTbCfg
->
keep
);
tsize
+=
taosEncodeFixedU8
(
buf
,
pTbCfg
->
type
);
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
tsize
+=
tdEncodeSchema
(
buf
,
pTbCfg
->
stbCfg
.
pTagSchema
);
tsize
+=
tdEncodeSchema
(
buf
,
pTbCfg
->
stbCfg
.
pTagSchema
);
...
@@ -391,10 +380,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
...
@@ -391,10 +380,10 @@ static int metaEncodeTbInfo(void **buf, STbCfg *pTbCfg) {
}
}
static
void
*
metaDecodeTbInfo
(
void
*
buf
,
STbCfg
*
pTbCfg
)
{
static
void
*
metaDecodeTbInfo
(
void
*
buf
,
STbCfg
*
pTbCfg
)
{
// TODO
buf
=
taosDecodeString
(
buf
,
&
(
pTbCfg
->
name
));
buf
=
taosDecodeString
(
buf
,
&
(
pTbCfg
->
name
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pTbCfg
->
ttl
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pTbCfg
->
ttl
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pTbCfg
->
keep
));
buf
=
taosDecodeFixedU32
(
buf
,
&
(
pTbCfg
->
keep
));
buf
=
taosDecodeFixedU8
(
buf
,
&
(
pTbCfg
->
type
));
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
buf
=
tdDecodeSchema
(
buf
,
&
(
pTbCfg
->
stbCfg
.
pTagSchema
));
buf
=
tdDecodeSchema
(
buf
,
&
(
pTbCfg
->
stbCfg
.
pTagSchema
));
...
@@ -408,22 +397,11 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
...
@@ -408,22 +397,11 @@ static void *metaDecodeTbInfo(void *buf, STbCfg *pTbCfg) {
return
buf
;
return
buf
;
}
}
static int metaSaveTbInfo(DB *pDB, tb_uid_t uid, STbCfg *pTbCfg) {
static
void
metaClearTbCfg
(
STbCfg
*
pTbCfg
)
{
DBT key = {0};
tfree
(
pTbCfg
->
name
);
DBT value = {0};
if
(
pTbCfg
->
type
==
META_SUPER_TABLE
)
{
char buf[512];
tdFreeSchema
(
pTbCfg
->
stbCfg
.
pTagSchema
);
void *pBuf = buf;
}
else
if
(
pTbCfg
->
type
==
META_CHILD_TABLE
)
{
tfree
(
pTbCfg
->
ctbCfg
.
pTag
);
key.data = &uid;
}
key.size = sizeof(uid);
}
\ No newline at end of file
metaEncodeTbInfo(&pBuf, pTbCfg);
value.data = buf;
value.size = POINTER_DISTANCE(pBuf, buf);
pDB->put(pDB, NULL, &key, &value, 0);
return 0;
}
#endif
\ No newline at end of file
source/libs/index/src/index_fst.c
浏览文件 @
90512f6e
...
@@ -297,15 +297,16 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil
...
@@ -297,15 +297,16 @@ void fstStateCompileForAnyTrans(FstCountingWriter *w, CompiledAddr addr, FstBuil
// any value greater than or equal to the number of transitions in
// any value greater than or equal to the number of transitions in
// this node indicates an absent transition.
// this node indicates an absent transition.
uint8_t
*
index
=
(
uint8_t
*
)
malloc
(
sizeof
(
uint8_t
)
*
256
);
uint8_t
*
index
=
(
uint8_t
*
)
malloc
(
sizeof
(
uint8_t
)
*
256
);
for
(
uint8_t
i
=
0
;
i
<
256
;
i
++
)
{
memset
(
index
,
255
,
sizeof
(
uint8_t
)
*
256
);
index
[
i
]
=
255
;
///for (uint8_t i = 0; i < 256; i++) {
}
// index[i] = 255;
///}
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
FstTransition
*
t
=
taosArrayGet
(
node
->
trans
,
i
);
FstTransition
*
t
=
taosArrayGet
(
node
->
trans
,
i
);
index
[
t
->
inp
]
=
i
;
index
[
t
->
inp
]
=
i
;
fstCountingWriterWrite
(
w
,
(
char
*
)
index
,
sizeof
(
index
));
//fstPackDeltaIn(w, addr, t->addr, tSize);
//fstPackDeltaIn(w, addr, t->addr, tSize);
}
}
fstCountingWriterWrite
(
w
,
(
char
*
)
index
,
256
);
free
(
index
);
free
(
index
);
}
}
fstCountingWriterWrite
(
w
,
(
char
*
)
&
packSizes
,
1
);
fstCountingWriterWrite
(
w
,
(
char
*
)
&
packSizes
,
1
);
...
@@ -478,6 +479,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
...
@@ -478,6 +479,7 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
return
0
;
return
0
;
}
}
FstSlice
*
slice
=
&
node
->
data
;
FstSlice
*
slice
=
&
node
->
data
;
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
uint64_t
at
=
node
->
start
uint64_t
at
=
node
->
start
-
fstStateNtransLen
(
s
)
-
fstStateNtransLen
(
s
)
-
1
// pack size
-
1
// pack size
...
@@ -485,7 +487,6 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
...
@@ -485,7 +487,6 @@ Output fstStateOutputForAnyTrans(FstState *s, FstNode *node, uint64_t i) {
-
(
i
*
oSizes
)
-
(
i
*
oSizes
)
-
oSizes
;
-
oSizes
;
uint8_t
*
data
=
fstSliceData
(
slice
,
NULL
);
return
unpackUint64
(
data
+
at
,
oSizes
);
return
unpackUint64
(
data
+
at
,
oSizes
);
}
}
...
@@ -555,6 +556,7 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack
...
@@ -555,6 +556,7 @@ Output fstStateFinalOutput(FstState *s, uint64_t version, FstSlice *slice, Pack
uint64_t
at
=
FST_SLICE_LEN
(
slice
)
uint64_t
at
=
FST_SLICE_LEN
(
slice
)
-
1
-
1
-
fstStateNtransLen
(
s
)
-
fstStateNtransLen
(
s
)
-
1
// pack size
-
fstStateTotalTransSize
(
s
,
version
,
sizes
,
nTrans
)
-
fstStateTotalTransSize
(
s
,
version
,
sizes
,
nTrans
)
-
(
nTrans
*
oSizes
)
-
(
nTrans
*
oSizes
)
-
oSizes
;
-
oSizes
;
...
@@ -587,7 +589,8 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
...
@@ -587,7 +589,8 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
FstSlice
t
=
fstSliceCopy
(
slice
,
start
,
end
-
1
);
FstSlice
t
=
fstSliceCopy
(
slice
,
start
,
end
-
1
);
int32_t
len
=
0
;
int32_t
len
=
0
;
uint8_t
*
data
=
fstSliceData
(
&
t
,
&
len
);
uint8_t
*
data
=
fstSliceData
(
&
t
,
&
len
);
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
int
i
=
0
;
for
(;
i
<
len
;
i
++
)
{
//uint8_t v = slice->data[slice->start + i];
//uint8_t v = slice->data[slice->start + i];
////slice->data[slice->start + i];
////slice->data[slice->start + i];
uint8_t
v
=
data
[
i
];
uint8_t
v
=
data
[
i
];
...
@@ -595,6 +598,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
...
@@ -595,6 +598,7 @@ uint64_t fstStateFindInput(FstState *s, FstNode *node, uint8_t b, bool *null) {
return
node
->
nTrans
-
i
-
1
;
// bug
return
node
->
nTrans
-
i
-
1
;
// bug
}
}
}
}
if
(
i
==
len
)
{
*
null
=
true
;
}
}
}
}
}
...
@@ -774,7 +778,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) {
...
@@ -774,7 +778,7 @@ FstBuilder *fstBuilderCreate(void *w, FstType ty) {
if
(
NULL
==
b
)
{
return
b
;
}
if
(
NULL
==
b
)
{
return
b
;
}
b
->
wrt
=
fstCountingWriterCreate
(
w
,
false
);
b
->
wrt
=
fstCountingWriterCreate
(
w
,
false
);
b
->
unfinished
=
fstUnFinishedNodesCreate
();
b
->
unfinished
=
fstUnFinishedNodesCreate
();
b
->
registry
=
fstRegistryCreate
(
10000
,
2
)
;
b
->
registry
=
fstRegistryCreate
(
10000
,
2
)
;
b
->
last
=
fstSliceCreate
(
NULL
,
0
);
b
->
last
=
fstSliceCreate
(
NULL
,
0
);
...
@@ -857,6 +861,7 @@ OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
...
@@ -857,6 +861,7 @@ OrderType fstBuilderCheckLastKey(FstBuilder *b, FstSlice bs, bool ckDup) {
return
OutOfOrdered
;
return
OutOfOrdered
;
}
}
// deep copy or not
// deep copy or not
fstSliceDestroy
(
&
b
->
last
);
b
->
last
=
fstSliceCopy
(
&
bs
,
input
->
start
,
input
->
end
);
b
->
last
=
fstSliceCopy
(
&
bs
,
input
->
start
,
input
->
end
);
}
}
return
Ordered
;
return
Ordered
;
...
@@ -1007,8 +1012,7 @@ Fst* fstCreate(FstSlice *slice) {
...
@@ -1007,8 +1012,7 @@ Fst* fstCreate(FstSlice *slice) {
uint64_t
fstLen
;
uint64_t
fstLen
;
len
-=
sizeof
(
fstLen
);
len
-=
sizeof
(
fstLen
);
taosDecodeFixedU64
(
buf
+
len
,
&
fstLen
);
taosDecodeFixedU64
(
buf
+
len
,
&
fstLen
);
//TODO(validat root addr)
//TODO(validate root addr)
//
Fst
*
fst
=
(
Fst
*
)
calloc
(
1
,
sizeof
(
Fst
));
Fst
*
fst
=
(
Fst
*
)
calloc
(
1
,
sizeof
(
Fst
));
if
(
fst
==
NULL
)
{
return
NULL
;
}
if
(
fst
==
NULL
)
{
return
NULL
;
}
...
@@ -1023,6 +1027,7 @@ Fst* fstCreate(FstSlice *slice) {
...
@@ -1023,6 +1027,7 @@ Fst* fstCreate(FstSlice *slice) {
fst
->
meta
->
len
=
fstLen
;
fst
->
meta
->
len
=
fstLen
;
fst
->
meta
->
checkSum
=
checkSum
;
fst
->
meta
->
checkSum
=
checkSum
;
fst
->
data
=
slice
;
fst
->
data
=
slice
;
return
fst
;
return
fst
;
FST_CREAT_FAILED:
FST_CREAT_FAILED:
...
@@ -1122,6 +1127,7 @@ FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) {
...
@@ -1122,6 +1127,7 @@ FstBoundWithData* fstBoundStateCreate(FstBound type, FstSlice *data) {
return
b
;
return
b
;
}
}
bool
fstBoundWithDataExceededBy
(
FstBoundWithData
*
bound
,
FstSlice
*
slice
)
{
bool
fstBoundWithDataExceededBy
(
FstBoundWithData
*
bound
,
FstSlice
*
slice
)
{
int
comp
=
fstSliceCompare
(
slice
,
&
bound
->
data
);
int
comp
=
fstSliceCompare
(
slice
,
&
bound
->
data
);
if
(
bound
->
type
==
Included
)
{
if
(
bound
->
type
==
Included
)
{
...
@@ -1374,7 +1380,9 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) {
...
@@ -1374,7 +1380,9 @@ FstStreamBuilder *fstStreamBuilderCreate(Fst *fst, Automation *aut) {
}
}
void
fstStreamBuilderDestroy
(
FstStreamBuilder
*
b
)
{
void
fstStreamBuilderDestroy
(
FstStreamBuilder
*
b
)
{
fstSliceDestroy
(
&
b
->
min
->
data
);
fstSliceDestroy
(
&
b
->
min
->
data
);
tfree
(
b
->
min
);
fstSliceDestroy
(
&
b
->
max
->
data
);
fstSliceDestroy
(
&
b
->
max
->
data
);
tfree
(
b
->
max
);
free
(
b
);
free
(
b
);
}
}
FstStreamBuilder
*
fstStreamBuilderRange
(
FstStreamBuilder
*
b
,
FstSlice
*
val
,
RangeType
type
)
{
FstStreamBuilder
*
fstStreamBuilderRange
(
FstStreamBuilder
*
b
,
FstSlice
*
val
,
RangeType
type
)
{
...
...
source/libs/index/test/indexTests.cpp
浏览文件 @
90512f6e
...
@@ -2,13 +2,79 @@
...
@@ -2,13 +2,79 @@
#include <string>
#include <string>
#include <iostream>
#include <iostream>
#include "index.h"
#include "index.h"
#include "tutil.h"
#include "indexInt.h"
#include "indexInt.h"
#include "index_fst.h"
#include "index_fst.h"
#include "index_fst_util.h"
#include "index_fst_util.h"
#include "index_fst_counting_writer.h"
#include "index_fst_counting_writer.h"
class
FstWriter
{
public:
FstWriter
()
{
_b
=
fstBuilderCreate
(
NULL
,
0
);
}
bool
Put
(
const
std
::
string
&
key
,
uint64_t
val
)
{
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstBuilderInsert
(
_b
,
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
~
FstWriter
()
{
fstBuilderFinish
(
_b
);
fstBuilderDestroy
(
_b
);
}
private:
FstBuilder
*
_b
;
};
class
FstReadMemory
{
public:
FstReadMemory
(
size_t
size
)
{
_w
=
fstCountingWriterCreate
(
NULL
,
true
);
_size
=
size
;
memset
((
void
*
)
&
_s
,
0
,
sizeof
(
_s
));
}
bool
init
()
{
char
*
buf
=
(
char
*
)
calloc
(
1
,
sizeof
(
char
)
*
_size
);
int
nRead
=
fstCountingWriterRead
(
_w
,
(
uint8_t
*
)
buf
,
_size
);
if
(
nRead
<=
0
)
{
return
false
;
}
_size
=
nRead
;
_s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
_size
);
_fst
=
fstCreate
(
&
_s
);
free
(
buf
);
return
_fst
!=
NULL
;
}
bool
Get
(
const
std
::
string
&
key
,
uint64_t
*
val
)
{
FstSlice
skey
=
fstSliceCreate
((
uint8_t
*
)
key
.
c_str
(),
key
.
size
());
bool
ok
=
fstGet
(
_fst
,
&
skey
,
val
);
fstSliceDestroy
(
&
skey
);
return
ok
;
}
bool
GetWithTimeCostUs
(
const
std
::
string
&
key
,
uint64_t
*
val
,
uint64_t
*
elapse
)
{
int64_t
s
=
taosGetTimestampUs
();
bool
ok
=
this
->
Get
(
key
,
val
);
int64_t
e
=
taosGetTimestampUs
();
*
elapse
=
e
-
s
;
return
ok
;
}
// add later
bool
Search
(
const
std
::
string
&
key
,
std
::
vector
<
uint64_t
>
&
result
)
{
return
true
;
}
~
FstReadMemory
()
{
fstCountingWriterDestroy
(
_w
);
fstSliceDestroy
(
&
_s
);
}
private:
FstCountingWriter
*
_w
;
Fst
*
_fst
;
FstSlice
_s
;
size_t
_size
;
};
//TEST(IndexTest, index_create_test) {
//TEST(IndexTest, index_create_test) {
// SIndexOpts *opts = indexOptsCreate();
// SIndexOpts *opts = indexOptsCreate();
...
@@ -62,69 +128,104 @@
...
@@ -62,69 +128,104 @@
// //
// //
//}
//}
int
main
(
int
argc
,
char
**
argv
)
{
// test write
FstBuilder
*
b
=
fstBuilderCreate
(
NULL
,
0
);
{
std
::
string
str
(
"aaa"
);
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)
str
.
c_str
(),
str
.
size
());
Output
val
=
1
;
fstBuilderInsert
(
b
,
key
,
val
);
}
//std::string str1("bcd");
//FstSlice key1 = fstSliceCreate((uint8_t *)str1.c_str(), str1.size());
//Output val2 = 10;
//
{
for
(
size_t
i
=
1
;
i
<
26
;
i
++
)
{
std
::
string
str
(
"aaa"
);
str
[
2
]
=
'a'
+
i
;
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)
str
.
c_str
(),
str
.
size
());
Output
val
=
0
;
fstBuilderInsert
(
b
,
key
,
val
);
}
}
fstBuilderFinish
(
b
);
fstBuilderDestroy
(
b
);
int
Performance_fstWriteRecords
(
FstWriter
*
b
)
{
std
::
string
str
(
"aa"
);
int
L
=
100
,
M
=
100
,
N
=
10
;
for
(
int
i
=
0
;
i
<
L
;
i
++
)
{
str
[
0
]
=
'a'
+
i
;
str
.
resize
(
2
);
for
(
int
j
=
0
;
j
<
M
;
j
++
)
{
str
[
1
]
=
'a'
+
j
;
str
.
resize
(
2
);
for
(
int
k
=
0
;
k
<
N
;
k
++
)
{
str
.
push_back
(
'a'
);
b
->
Put
(
str
,
k
);
printf
(
"(%d, %d, %d, %s)
\n
"
,
i
,
j
,
k
,
str
.
c_str
());
}
}
}
return
L
*
M
*
N
;
}
char
buf
[
64
*
1024
]
=
{
0
};
void
Performance_fstReadRecords
(
FstReadMemory
*
m
)
{
std
::
string
str
(
"a"
);
for
(
int
i
=
0
;
i
<
50
;
i
++
)
{
//std::string str("aa");
str
.
push_back
(
'a'
);
uint64_t
out
,
cost
;
bool
ok
=
m
->
GetWithTimeCostUs
(
str
,
&
out
,
&
cost
);
if
(
ok
==
true
)
{
printf
(
"success to get (%s, %"
PRId64
"), time cost: %"
PRId64
")
\n
"
,
str
.
c_str
(),
out
,
cost
);
}
else
{
printf
(
"failed to get(%s)
\n
"
,
str
.
c_str
());
}
}
}
void
checkFstPerf
()
{
FstWriter
*
fw
=
new
FstWriter
;
int64_t
s
=
taosGetTimestampUs
();
int
num
=
Performance_fstWriteRecords
(
fw
);
int64_t
e
=
taosGetTimestampUs
();
printf
(
"write %d record cost %"
PRId64
"us
\n
"
,
num
,
e
-
s
);
delete
fw
;
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
())
{
uint64_t
val
;
if
(
m
->
Get
(
"aaaaaaa"
,
&
val
))
{
std
::
cout
<<
"succes to Get val: "
<<
val
<<
std
::
endl
;
}
else
{
std
::
cout
<<
"failed to Get "
<<
std
::
endl
;
}
}
}
FstSlice
s
;
FstCountingWriter
*
w
=
fstCountingWriterCreate
(
NULL
,
true
);
void
validateFst
()
{
int
nRead
=
fstCountingWriterRead
(
w
,
(
uint8_t
*
)
buf
,
sizeof
(
buf
));
int
val
=
100
;
assert
(
nRead
<=
sizeof
(
buf
));
int
count
=
100
;
s
=
fstSliceCreate
((
uint8_t
*
)
buf
,
nRead
);
FstWriter
*
fw
=
new
FstWriter
;
fstCountingWriterDestroy
(
w
);
// write
{
std
::
string
key
(
"ab"
);
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
key
.
push_back
(
'a'
+
i
);
fw
->
Put
(
key
,
val
-
i
);
}
}
delete
fw
;
// read
FstReadMemory
*
m
=
new
FstReadMemory
(
1024
*
64
);
if
(
m
->
init
()
==
false
)
{
std
::
cout
<<
"init readMemory failed"
<<
std
::
endl
;
}
// test reader
Fst
*
fst
=
fstCreate
(
&
s
);
{
{
std
::
string
str
(
"aax"
);
std
::
string
key
(
"ab"
);
uint64_t
out
;
uint64_t
out
;
if
(
m
->
Get
(
key
,
&
out
))
{
printf
(
"success to get (%s, %"
PRId64
")
\n
"
,
key
.
c_str
(),
out
);
FstSlice
key
=
fstSliceCreate
((
uint8_t
*
)
str
.
c_str
(),
str
.
size
());
}
else
{
bool
ok
=
fstGet
(
fst
,
&
key
,
&
out
);
printf
(
"failed to get(%s)
\n
"
,
key
.
c_str
());
if
(
ok
==
true
)
{
}
printf
(
"val = %d
\n
"
,
out
);
for
(
int
i
=
0
;
i
<
count
;
i
++
)
{
//indexInfo("Get key-value success, %s, %d", str.c_str(), out);
key
.
push_back
(
'a'
+
i
);
}
else
{
if
(
m
->
Get
(
key
,
&
out
)
)
{
//indexError("Get key-value failed, %s", str.c_str());
assert
(
val
-
i
==
out
);
printf
(
"success to get (%s, %"
PRId64
")
\n
"
,
key
.
c_str
(),
out
);
}
else
{
printf
(
"failed to get(%s)
\n
"
,
key
.
c_str
());
}
}
}
}
fstSliceDestroy
(
&
s
);
}
delete
m
;
}
int
main
(
int
argc
,
char
**
argv
)
{
checkFstPerf
();
return
1
;
return
1
;
}
}
...
...
source/libs/transport/src/rpcMain.c
浏览文件 @
90512f6e
...
@@ -1523,14 +1523,14 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
...
@@ -1523,14 +1523,14 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
}
}
static
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
static
int
rpcAuthenticateMsg
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
MD5_CTX
context
;
T_
MD5_CTX
context
;
int
ret
=
-
1
;
int
ret
=
-
1
;
MD5Init
(
&
context
);
t
MD5Init
(
&
context
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
t
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
t
MD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
t
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Final
(
&
context
);
t
MD5Final
(
&
context
);
if
(
memcmp
(
context
.
digest
,
pAuth
,
sizeof
(
context
.
digest
))
==
0
)
ret
=
0
;
if
(
memcmp
(
context
.
digest
,
pAuth
,
sizeof
(
context
.
digest
))
==
0
)
ret
=
0
;
...
@@ -1538,13 +1538,13 @@ static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
...
@@ -1538,13 +1538,13 @@ static int rpcAuthenticateMsg(void *pMsg, int msgLen, void *pAuth, void *pKey) {
}
}
static
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
static
void
rpcBuildAuthHead
(
void
*
pMsg
,
int
msgLen
,
void
*
pAuth
,
void
*
pKey
)
{
MD5_CTX
context
;
T_
MD5_CTX
context
;
MD5Init
(
&
context
);
t
MD5Init
(
&
context
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
t
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
t
MD5Update
(
&
context
,
(
uint8_t
*
)
pMsg
,
msgLen
);
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
t
MD5Update
(
&
context
,
(
uint8_t
*
)
pKey
,
TSDB_KEY_LEN
);
MD5Final
(
&
context
);
t
MD5Final
(
&
context
);
memcpy
(
pAuth
,
context
.
digest
,
sizeof
(
context
.
digest
));
memcpy
(
pAuth
,
context
.
digest
,
sizeof
(
context
.
digest
));
}
}
...
...
source/libs/wal/CMakeLists.txt
浏览文件 @
90512f6e
...
@@ -8,6 +8,11 @@ target_include_directories(
...
@@ -8,6 +8,11 @@ target_include_directories(
target_link_libraries
(
target_link_libraries
(
wal
wal
PUBLIC cjson
PUBLIC os
PUBLIC os
PUBLIC util
PUBLIC util
)
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
source/libs/wal/inc/walInt.h
浏览文件 @
90512f6e
...
@@ -23,9 +23,73 @@
...
@@ -23,9 +23,73 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
int
walGetFile
(
SWal
*
pWal
,
int32_t
version
);
//meta section begin
typedef
struct
WalFileInfo
{
int64_t
firstVer
;
int64_t
lastVer
;
int64_t
createTs
;
int64_t
closeTs
;
int64_t
fileSize
;
}
WalFileInfo
;
static
inline
int32_t
compareWalFileInfo
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
WalFileInfo
*
pInfoLeft
=
(
WalFileInfo
*
)
pLeft
;
WalFileInfo
*
pInfoRight
=
(
WalFileInfo
*
)
pRight
;
return
compareInt64Val
(
&
pInfoLeft
->
firstVer
,
&
pInfoRight
->
firstVer
);
}
static
inline
int64_t
walGetLastFileSize
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
return
pInfo
->
fileSize
;
}
static
inline
int64_t
walGetLastFileFirstVer
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
return
pInfo
->
firstVer
;
}
static
inline
int64_t
walGetCurFileFirstVer
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
pInfo
->
firstVer
;
}
static
inline
int64_t
walGetCurFileLastVer
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
pInfo
->
firstVer
;
}
static
inline
int64_t
walGetCurFileOffset
(
SWal
*
pWal
)
{
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
pInfo
->
fileSize
;
}
static
inline
bool
walCurFileClosed
(
SWal
*
pWal
)
{
return
taosArrayGetSize
(
pWal
->
fileInfoSet
)
!=
pWal
->
writeCur
;
}
static
inline
WalFileInfo
*
walGetCurFileInfo
(
SWal
*
pWal
)
{
return
(
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
}
static
inline
int
walBuildLogName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/%"
PRId64
"."
WAL_LOG_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
static
inline
int
walBuildIdxName
(
SWal
*
pWal
,
int64_t
fileFirstVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/%"
PRId64
"."
WAL_INDEX_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
int
walReadMeta
(
SWal
*
pWal
);
int
walWriteMeta
(
SWal
*
pWal
);
int
walRollFileInfo
(
SWal
*
pWal
);
char
*
walMetaSerialize
(
SWal
*
pWal
);
int
walMetaDeserialize
(
SWal
*
pWal
,
const
char
*
bytes
);
//meta section end
int64_t
walGetSeq
();
int64_t
walGetSeq
();
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
);
int
walRoll
(
SWal
*
pWal
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/wal/src/walIndex.c
浏览文件 @
90512f6e
...
@@ -23,27 +23,27 @@
...
@@ -23,27 +23,27 @@
static
int
walSeekFilePos
(
SWal
*
pWal
,
int64_t
ver
)
{
static
int
walSeekFilePos
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
int
code
=
0
;
int64_t
idxTfd
=
pWal
->
cur
IdxTfd
;
int64_t
idxTfd
=
pWal
->
write
IdxTfd
;
int64_t
logTfd
=
pWal
->
cur
LogTfd
;
int64_t
logTfd
=
pWal
->
write
LogTfd
;
//seek position
//seek position
int64_t
offset
=
(
ver
-
pWal
->
curFileFirstVersion
)
*
WAL_IDX_ENTRY_SIZE
;
int64_t
offset
=
(
ver
-
walGetCurFileFirstVer
(
pWal
)
)
*
WAL_IDX_ENTRY_SIZE
;
code
=
tfLseek
(
idxTfd
,
offset
,
SEEK_SET
);
code
=
tfLseek
(
idxTfd
,
offset
,
SEEK_SET
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
-
1
;
}
}
int64_t
readBuf
[
2
];
int64_t
readBuf
[
2
];
code
=
tfRead
(
idxTfd
,
readBuf
,
sizeof
(
readBuf
));
code
=
tfRead
(
idxTfd
,
readBuf
,
sizeof
(
readBuf
));
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
-
1
;
}
}
//TODO:deserialize
//TODO:deserialize
ASSERT
(
readBuf
[
0
]
==
ver
);
ASSERT
(
readBuf
[
0
]
==
ver
);
code
=
tfLseek
(
logTfd
,
readBuf
[
1
],
SEEK_CUR
);
code
=
tfLseek
(
logTfd
,
readBuf
[
1
],
SEEK_CUR
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
return
-
1
;
}
}
pWal
->
curLogOffset
=
readBuf
[
1
];
/*pWal->curLogOffset = readBuf[1];*/
pWal
->
curVersion
=
ver
;
pWal
->
curVersion
=
ver
;
return
code
;
return
code
;
}
}
...
@@ -52,43 +52,43 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
...
@@ -52,43 +52,43 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
int
code
=
0
;
int
code
=
0
;
int64_t
idxTfd
,
logTfd
;
int64_t
idxTfd
,
logTfd
;
char
fnameStr
[
WAL_FILE_LEN
];
char
fnameStr
[
WAL_FILE_LEN
];
code
=
tfClose
(
pWal
->
cur
LogTfd
);
code
=
tfClose
(
pWal
->
write
LogTfd
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
//TODO
//TODO
}
}
code
=
tfClose
(
pWal
->
cur
IdxTfd
);
code
=
tfClose
(
pWal
->
write
IdxTfd
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
//TODO
//TODO
}
}
WalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
//bsearch in fileSet
//bsearch in fileSet
int64_t
*
pRet
=
taosArraySearch
(
pWal
->
fileSet
,
&
ver
,
compareInt64Val
,
TD_LE
);
WalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
ASSERT
(
pRet
!=
NULL
);
int64_t
fname
=
*
pRet
;
int64_t
fileFirstVer
=
pRet
->
firstVer
;
if
(
fname
<
pWal
->
lastFileName
)
{
//closed
if
(
taosArrayGetLast
(
pWal
->
fileInfoSet
)
!=
pRet
)
{
pWal
->
curStatus
&=
~
WAL_CUR_FILE_WRITABLE
;
pWal
->
curStatus
&=
~
WAL_CUR_FILE_WRITABLE
;
pWal
->
curFileLastVersion
=
pRet
[
1
]
-
1
;
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
fname
);
idxTfd
=
tfOpenRead
(
fnameStr
);
idxTfd
=
tfOpenRead
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
fname
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenRead
(
fnameStr
);
logTfd
=
tfOpenRead
(
fnameStr
);
}
else
{
}
else
{
pWal
->
curStatus
|=
WAL_CUR_FILE_WRITABLE
;
pWal
->
curStatus
|=
WAL_CUR_FILE_WRITABLE
;
pWal
->
curFileLastVersion
=
-
1
;
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
fname
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
fname
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
}
}
pWal
->
curFileFirstVersion
=
fname
;
pWal
->
writeLogTfd
=
logTfd
;
pWal
->
curLogTfd
=
logTfd
;
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
curIdxTfd
=
idxTfd
;
return
code
;
return
code
;
}
}
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
)
{
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
)
{
i
f
((
!
(
pWal
->
curStatus
&
WAL_CUR_FAILED
))
i
nt
code
;
&&
ver
==
pWal
->
curVersion
)
{
if
((
!
(
pWal
->
curStatus
&
WAL_CUR_FAILED
))
&&
ver
==
pWal
->
curVersion
)
{
return
0
;
return
0
;
}
}
if
(
ver
>
pWal
->
lastVersion
)
{
if
(
ver
>
pWal
->
lastVersion
)
{
...
@@ -102,11 +102,16 @@ int walSeekVer(SWal *pWal, int64_t ver) {
...
@@ -102,11 +102,16 @@ int walSeekVer(SWal *pWal, int64_t ver) {
if
(
ver
<
pWal
->
snapshotVersion
)
{
if
(
ver
<
pWal
->
snapshotVersion
)
{
//TODO: seek snapshotted log, invalid in some cases
//TODO: seek snapshotted log, invalid in some cases
}
}
if
(
ver
<
pWal
->
curFileFirstVersion
||
if
(
ver
<
walGetCurFileFirstVer
(
pWal
)
||
(
ver
>
walGetCurFileLastVer
(
pWal
)))
{
(
pWal
->
curFileLastVersion
!=
-
1
&&
ver
>
pWal
->
curFileLastVersion
))
{
code
=
walChangeFile
(
pWal
,
ver
);
walChangeFile
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
}
}
walSeekFilePos
(
pWal
,
ver
);
code
=
walSeekFilePos
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
return
0
;
return
0
;
}
}
source/libs/wal/src/walMeta.c
0 → 100644
浏览文件 @
90512f6e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tref.h"
#include "tfile.h"
#include "cJSON.h"
#include "walInt.h"
#include <libgen.h>
#include <regex.h>
int
walRollFileInfo
(
SWal
*
pWal
)
{
int64_t
ts
=
taosGetTimestampSec
();
SArray
*
pArray
=
pWal
->
fileInfoSet
;
if
(
taosArrayGetSize
(
pArray
)
!=
0
)
{
WalFileInfo
*
pInfo
=
taosArrayGetLast
(
pArray
);
pInfo
->
lastVer
=
pWal
->
lastVersion
;
pInfo
->
closeTs
=
ts
;
}
WalFileInfo
*
pNewInfo
=
malloc
(
sizeof
(
WalFileInfo
));
if
(
pNewInfo
==
NULL
)
{
return
-
1
;
}
pNewInfo
->
firstVer
=
pWal
->
lastVersion
+
1
;
pNewInfo
->
lastVer
=
-
1
;
pNewInfo
->
createTs
=
ts
;
pNewInfo
->
closeTs
=
-
1
;
pNewInfo
->
fileSize
=
0
;
taosArrayPush
(
pWal
->
fileInfoSet
,
pNewInfo
);
return
0
;
}
char
*
walMetaSerialize
(
SWal
*
pWal
)
{
char
buf
[
30
];
if
(
pWal
==
NULL
||
pWal
->
fileInfoSet
==
NULL
)
return
0
;
int
sz
=
pWal
->
fileInfoSet
->
size
;
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pMeta
=
cJSON_CreateObject
();
cJSON
*
pFiles
=
cJSON_CreateArray
();
cJSON
*
pField
;
if
(
pRoot
==
NULL
||
pMeta
==
NULL
||
pFiles
==
NULL
)
{
//TODO
return
NULL
;
}
cJSON_AddItemToObject
(
pRoot
,
"meta"
,
pMeta
);
sprintf
(
buf
,
"%"
PRId64
,
pWal
->
firstVersion
);
cJSON_AddStringToObject
(
pMeta
,
"firstVer"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pWal
->
snapshotVersion
);
cJSON_AddStringToObject
(
pMeta
,
"snapshotVer"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pWal
->
commitVersion
);
cJSON_AddStringToObject
(
pMeta
,
"commitVer"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pWal
->
lastVersion
);
cJSON_AddStringToObject
(
pMeta
,
"lastVer"
,
buf
);
cJSON_AddItemToObject
(
pRoot
,
"files"
,
pFiles
);
WalFileInfo
*
pData
=
pWal
->
fileInfoSet
->
pData
;
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
WalFileInfo
*
pInfo
=
&
pData
[
i
];
cJSON_AddItemToArray
(
pFiles
,
pField
=
cJSON_CreateObject
());
if
(
pField
==
NULL
)
{
cJSON_Delete
(
pRoot
);
return
NULL
;
}
//cjson only support int32_t or double
//string are used to prohibit the loss of precision
sprintf
(
buf
,
"%"
PRId64
,
pInfo
->
firstVer
);
cJSON_AddStringToObject
(
pField
,
"firstVer"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pInfo
->
lastVer
);
cJSON_AddStringToObject
(
pField
,
"lastVer"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pInfo
->
createTs
);
cJSON_AddStringToObject
(
pField
,
"createTs"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pInfo
->
closeTs
);
cJSON_AddStringToObject
(
pField
,
"closeTs"
,
buf
);
sprintf
(
buf
,
"%"
PRId64
,
pInfo
->
fileSize
);
cJSON_AddStringToObject
(
pField
,
"fileSize"
,
buf
);
}
return
cJSON_Print
(
pRoot
);
}
int
walMetaDeserialize
(
SWal
*
pWal
,
const
char
*
bytes
)
{
ASSERT
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
);
cJSON
*
pRoot
,
*
pMeta
,
*
pFiles
,
*
pInfoJson
,
*
pField
;
pRoot
=
cJSON_Parse
(
bytes
);
pMeta
=
cJSON_GetObjectItem
(
pRoot
,
"meta"
);
pField
=
cJSON_GetObjectItem
(
pMeta
,
"firstVer"
);
pWal
->
firstVersion
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pMeta
,
"snapshotVer"
);
pWal
->
snapshotVersion
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pMeta
,
"commitVer"
);
pWal
->
commitVersion
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pMeta
,
"lastVer"
);
pWal
->
lastVersion
=
atoll
(
cJSON_GetStringValue
(
pField
));
pFiles
=
cJSON_GetObjectItem
(
pRoot
,
"files"
);
int
sz
=
cJSON_GetArraySize
(
pFiles
);
//deserialize
SArray
*
pArray
=
taosArrayInit
(
sz
,
sizeof
(
WalFileInfo
));
WalFileInfo
*
pData
=
pArray
->
pData
;
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
cJSON
*
pInfoJson
=
cJSON_GetArrayItem
(
pFiles
,
i
);
WalFileInfo
*
pInfo
=
&
pData
[
i
];
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"firstVer"
);
pInfo
->
firstVer
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"lastVer"
);
pInfo
->
lastVer
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"createTs"
);
pInfo
->
createTs
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"closeTs"
);
pInfo
->
closeTs
=
atoll
(
cJSON_GetStringValue
(
pField
));
pField
=
cJSON_GetObjectItem
(
pInfoJson
,
"fileSize"
);
pInfo
->
fileSize
=
atoll
(
cJSON_GetStringValue
(
pField
));
}
taosArraySetSize
(
pArray
,
sz
);
pWal
->
fileInfoSet
=
pArray
;
return
0
;
}
static
inline
int
walBuildMetaName
(
SWal
*
pWal
,
int
metaVer
,
char
*
buf
)
{
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
static
int
walFindCurMetaVer
(
SWal
*
pWal
)
{
const
char
*
pattern
=
"^meta-ver[0-9]+$"
;
regex_t
walMetaRegexPattern
;
regcomp
(
&
walMetaRegexPattern
,
pattern
,
REG_EXTENDED
);
DIR
*
dir
=
opendir
(
pWal
->
path
);
if
(
dir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
struct
dirent
*
ent
;
//find existing meta-ver[x].json
int
metaVer
=
-
1
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
char
*
name
=
basename
(
ent
->
d_name
);
int
code
=
regexec
(
&
walMetaRegexPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
sscanf
(
name
,
"meta-ver%d"
,
&
metaVer
);
break
;
}
}
return
metaVer
;
}
int
walWriteMeta
(
SWal
*
pWal
)
{
int
metaVer
=
walFindCurMetaVer
(
pWal
);
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
+
1
,
fnameStr
);
int
metaTfd
=
tfOpenCreateWrite
(
fnameStr
);
if
(
metaTfd
<
0
)
{
return
-
1
;
}
char
*
serialized
=
walMetaSerialize
(
pWal
);
int
len
=
strlen
(
serialized
);
if
(
len
!=
tfWrite
(
metaTfd
,
serialized
,
len
))
{
//TODO:clean file
return
-
1
;
}
tfClose
(
metaTfd
);
//delete old file
if
(
metaVer
>
-
1
)
{
walBuildMetaName
(
pWal
,
metaVer
,
fnameStr
);
remove
(
fnameStr
);
}
return
0
;
}
int
walReadMeta
(
SWal
*
pWal
)
{
ASSERT
(
pWal
->
fileInfoSet
->
size
==
0
);
//find existing meta file
int
metaVer
=
walFindCurMetaVer
(
pWal
);
if
(
metaVer
==
-
1
)
{
return
0
;
}
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
,
fnameStr
);
//read metafile
struct
stat
statbuf
;
stat
(
fnameStr
,
&
statbuf
);
int
size
=
statbuf
.
st_size
;
char
*
buf
=
malloc
(
size
+
5
);
if
(
buf
==
NULL
)
{
return
-
1
;
}
int
tfd
=
tfOpenRead
(
fnameStr
);
if
(
tfRead
(
tfd
,
buf
,
size
)
!=
size
)
{
free
(
buf
);
return
-
1
;
}
//load into fileInfoSet
int
code
=
walMetaDeserialize
(
pWal
,
buf
);
if
(
code
!=
0
)
{
free
(
buf
);
return
-
1
;
}
free
(
buf
);
return
0
;
}
source/libs/wal/src/walMgmt.c
浏览文件 @
90512f6e
...
@@ -48,9 +48,15 @@ int32_t walInit() {
...
@@ -48,9 +48,15 @@ int32_t walInit() {
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tsWal
.
inited
,
0
,
1
);
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tsWal
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
if
(
old
==
1
)
return
0
;
int
code
=
tfInit
();
if
(
code
!=
0
)
{
wError
(
"failed to init tfile since %s"
,
tstrerror
(
code
));
atomic_store_8
(
&
tsWal
.
inited
,
0
);
return
code
;
}
tsWal
.
refSetId
=
taosOpenRef
(
TSDB_MIN_VNODES
,
walFreeObj
);
tsWal
.
refSetId
=
taosOpenRef
(
TSDB_MIN_VNODES
,
walFreeObj
);
int
code
=
walCreateThread
();
code
=
walCreateThread
();
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
wError
(
"failed to init wal module since %s"
,
tstrerror
(
code
));
wError
(
"failed to init wal module since %s"
,
tstrerror
(
code
));
atomic_store_8
(
&
tsWal
.
inited
,
0
);
atomic_store_8
(
&
tsWal
.
inited
,
0
);
...
@@ -64,43 +70,31 @@ int32_t walInit() {
...
@@ -64,43 +70,31 @@ int32_t walInit() {
void
walCleanUp
()
{
void
walCleanUp
()
{
walStopThread
();
walStopThread
();
taosCloseRef
(
tsWal
.
refSetId
);
taosCloseRef
(
tsWal
.
refSetId
);
atomic_store_8
(
&
tsWal
.
inited
,
0
);
wInfo
(
"wal module is cleaned up"
);
wInfo
(
"wal module is cleaned up"
);
}
}
static
int
walLoadFileset
(
SWal
*
pWal
)
{
DIR
*
dir
=
opendir
(
pWal
->
path
);
if
(
dir
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
-
1
;
}
struct
dirent
*
ent
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
char
*
name
=
ent
->
d_name
;
name
[
WAL_NOSUFFIX_LEN
]
=
0
;
//validate file name by regex matching
if
(
1
/* TODO:regex match */
)
{
int64_t
fnameInt64
=
atoll
(
name
);
taosArrayPush
(
pWal
->
fileSet
,
&
fnameInt64
);
}
}
taosArraySort
(
pWal
->
fileSet
,
compareInt64Val
);
return
0
;
}
SWal
*
walOpen
(
const
char
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
walOpen
(
const
char
*
path
,
SWalCfg
*
pCfg
)
{
SWal
*
pWal
=
malloc
(
sizeof
(
SWal
));
SWal
*
pWal
=
malloc
(
sizeof
(
SWal
));
if
(
pWal
==
NULL
)
{
if
(
pWal
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
return
NULL
;
}
}
pWal
->
writeLogTfd
=
-
1
;
pWal
->
writeIdxTfd
=
-
1
;
//set config
pWal
->
vgId
=
pCfg
->
vgId
;
pWal
->
vgId
=
pCfg
->
vgId
;
pWal
->
curLogTfd
=
-
1
;
pWal
->
curIdxTfd
=
-
1
;
pWal
->
level
=
pCfg
->
walLevel
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
pWal
->
fsyncPeriod
=
pCfg
->
fsyncPeriod
;
pWal
->
rollPeriod
=
pCfg
->
rollPeriod
;
pWal
->
segSize
=
pCfg
->
segSize
;
pWal
->
level
=
pCfg
->
walLevel
;
//init status
pWal
->
lastVersion
=
-
1
;
pWal
->
lastRollSeq
=
-
1
;
//init write buffer
memset
(
&
pWal
->
head
,
0
,
sizeof
(
SWalHead
));
memset
(
&
pWal
->
head
,
0
,
sizeof
(
SWalHead
));
pWal
->
head
.
sver
=
0
;
pWal
->
head
.
sver
=
0
;
...
@@ -120,7 +114,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
...
@@ -120,7 +114,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walFreeObj
(
pWal
);
walFreeObj
(
pWal
);
return
NULL
;
return
NULL
;
}
}
walLoadFileset
(
pWal
);
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
vgId
,
pWal
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
vgId
,
pWal
,
pWal
->
level
,
pWal
->
fsyncPeriod
);
...
@@ -151,10 +144,10 @@ void walClose(SWal *pWal) {
...
@@ -151,10 +144,10 @@ void walClose(SWal *pWal) {
if
(
pWal
==
NULL
)
return
;
if
(
pWal
==
NULL
)
return
;
pthread_mutex_lock
(
&
pWal
->
mutex
);
pthread_mutex_lock
(
&
pWal
->
mutex
);
tfClose
(
pWal
->
cur
LogTfd
);
tfClose
(
pWal
->
write
LogTfd
);
tfClose
(
pWal
->
cur
IdxTfd
);
tfClose
(
pWal
->
write
IdxTfd
);
taosArrayDestroy
(
pWal
->
fileSet
);
/*taosArrayDestroy(pWal->fileInfoSet);*/
pWal
->
fileSet
=
NULL
;
/*pWal->fileInfoSet = NULL;*/
pthread_mutex_unlock
(
&
pWal
->
mutex
);
pthread_mutex_unlock
(
&
pWal
->
mutex
);
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
}
}
...
@@ -164,8 +157,8 @@ static int32_t walInitObj(SWal *pWal) {
...
@@ -164,8 +157,8 @@ static int32_t walInitObj(SWal *pWal) {
wError
(
"vgId:%d, path:%s, failed to create directory since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
wError
(
"vgId:%d, path:%s, failed to create directory since %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
return
TAOS_SYSTEM_ERROR
(
errno
);
}
}
pWal
->
file
Set
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
pWal
->
file
InfoSet
=
taosArrayInit
(
0
,
sizeof
(
WalFileInfo
));
if
(
pWal
->
fileSet
==
NULL
)
{
if
(
pWal
->
file
Info
Set
==
NULL
)
{
wError
(
"vgId:%d, path:%s, failed to init taosArray %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
wError
(
"vgId:%d, path:%s, failed to init taosArray %s"
,
pWal
->
vgId
,
pWal
->
path
,
strerror
(
errno
));
return
TAOS_SYSTEM_ERROR
(
errno
);
return
TAOS_SYSTEM_ERROR
(
errno
);
}
}
...
@@ -178,10 +171,10 @@ static void walFreeObj(void *wal) {
...
@@ -178,10 +171,10 @@ static void walFreeObj(void *wal) {
SWal
*
pWal
=
wal
;
SWal
*
pWal
=
wal
;
wDebug
(
"vgId:%d, wal:%p is freed"
,
pWal
->
vgId
,
pWal
);
wDebug
(
"vgId:%d, wal:%p is freed"
,
pWal
->
vgId
,
pWal
);
tfClose
(
pWal
->
cur
LogTfd
);
tfClose
(
pWal
->
write
LogTfd
);
tfClose
(
pWal
->
cur
IdxTfd
);
tfClose
(
pWal
->
write
IdxTfd
);
taosArrayDestroy
(
pWal
->
fileSet
);
taosArrayDestroy
(
pWal
->
file
Info
Set
);
pWal
->
fileSet
=
NULL
;
pWal
->
file
Info
Set
=
NULL
;
pthread_mutex_destroy
(
&
pWal
->
mutex
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
tfree
(
pWal
);
tfree
(
pWal
);
}
}
...
@@ -208,9 +201,9 @@ static void walFsyncAll() {
...
@@ -208,9 +201,9 @@ static void walFsyncAll() {
while
(
pWal
)
{
while
(
pWal
)
{
if
(
walNeedFsync
(
pWal
))
{
if
(
walNeedFsync
(
pWal
))
{
wTrace
(
"vgId:%d, do fsync, level:%d seq:%d rseq:%d"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncSeq
,
atomic_load_32
(
&
tsWal
.
seq
));
wTrace
(
"vgId:%d, do fsync, level:%d seq:%d rseq:%d"
,
pWal
->
vgId
,
pWal
->
level
,
pWal
->
fsyncSeq
,
atomic_load_32
(
&
tsWal
.
seq
));
int32_t
code
=
tfFsync
(
pWal
->
cur
LogTfd
);
int32_t
code
=
tfFsync
(
pWal
->
write
LogTfd
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to fsync since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
code
));
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to fsync since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
)
,
strerror
(
code
));
}
}
}
}
pWal
=
taosIterateRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
pWal
=
taosIterateRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
...
...
source/libs/wal/src/walRead.c
浏览文件 @
90512f6e
...
@@ -13,16 +13,56 @@
...
@@ -13,16 +13,56 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "wal.h"
#include "walInt.h"
#include "tfile.h"
#include "tchecksum.h"
#include "tchecksum.h"
static
int
walValidateChecksum
(
SWalHead
*
pHead
,
void
*
body
,
int64_t
bodyLen
)
{
static
inline
int
walValidHeadCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)
-
sizeof
(
uint32_t
)
*
2
,
pHead
->
cksumHead
)
&&
return
taosCheckChecksum
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)
-
sizeof
(
uint32_t
)
*
2
,
pHead
->
cksumHead
);
taosCheckChecksum
(
body
,
bodyLen
,
pHead
->
cksumBody
);
}
}
static
inline
int
walValidBodyCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
->
cont
,
pHead
->
len
,
pHead
->
cksumBody
);
}
static
int
walValidCksum
(
SWalHead
*
pHead
,
void
*
body
,
int64_t
bodyLen
)
{
return
walValidHeadCksum
(
pHead
)
&&
walValidBodyCksum
(
pHead
);
}
int32_t
walRead
(
SWal
*
pWal
,
SWalHead
**
ppHead
,
int64_t
ver
)
{
int32_t
walRead
(
SWal
*
pWal
,
SWalHead
**
ppHead
,
int64_t
ver
)
{
int
code
;
code
=
walSeekVer
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
code
;
}
if
(
*
ppHead
==
NULL
)
{
void
*
ptr
=
realloc
(
*
ppHead
,
sizeof
(
SWalHead
));
if
(
ptr
==
NULL
)
{
return
-
1
;
}
*
ppHead
=
ptr
;
}
if
(
tfRead
(
pWal
->
writeLogTfd
,
*
ppHead
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
return
-
1
;
}
//TODO: endian compatibility processing after read
if
(
walValidHeadCksum
(
*
ppHead
)
!=
0
)
{
return
-
1
;
}
void
*
ptr
=
realloc
(
*
ppHead
,
sizeof
(
SWalHead
)
+
(
*
ppHead
)
->
len
);
if
(
ptr
==
NULL
)
{
free
(
*
ppHead
);
*
ppHead
=
NULL
;
return
-
1
;
}
if
(
tfRead
(
pWal
->
writeLogTfd
,
(
*
ppHead
)
->
cont
,
(
*
ppHead
)
->
len
)
!=
(
*
ppHead
)
->
len
)
{
return
-
1
;
}
//TODO: endian compatibility processing after read
if
(
walValidBodyCksum
(
*
ppHead
)
!=
0
)
{
return
-
1
;
}
return
0
;
return
0
;
}
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
90512f6e
...
@@ -21,29 +21,63 @@
...
@@ -21,29 +21,63 @@
#include "tfile.h"
#include "tfile.h"
#include "walInt.h"
#include "walInt.h"
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
ver
);
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
snapshotVersion
<=
pWal
->
commitVersion
);
ASSERT
(
pWal
->
commitVersion
<=
pWal
->
lastVersion
);
ASSERT
(
ver
>=
pWal
->
commitVersion
);
ASSERT
(
ver
<=
pWal
->
lastVersion
);
pWal
->
commitVersion
=
ver
;
return
0
;
return
0
;
}
}
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
//TODO: ftruncate
//TODO: ftruncate
ASSERT
(
ver
>
pWal
->
commitVersion
);
ASSERT
(
ver
<=
pWal
->
lastVersion
);
//seek position
walSeekVer
(
pWal
,
ver
);
walFtruncate
(
pWal
,
ver
);
return
0
;
return
0
;
}
}
int32_t
walTakeSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
int32_t
walTakeSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
pWal
->
snapshotVersion
=
ver
;
pWal
->
snapshotVersion
=
ver
;
int
ts
=
taosGetTimestampSec
();
int
deleteCnt
=
0
;
int64_t
newTotSize
=
pWal
->
totSize
;
WalFileInfo
tmp
;
tmp
.
firstVer
=
ver
;
//mark files safe to delete
//mark files safe to delete
int64_t
*
pRet
=
taosArraySearch
(
pWal
->
fileSet
,
&
ver
,
compareInt64Val
,
TD_LE
);
WalFileInfo
*
pInfo
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmp
,
compareWalFileInfo
,
TD_LE
);
if
(
pRet
!=
pWal
->
fileSet
->
pData
)
{
//iterate files, until the searched result
//delete files until less than retention size
for
(
WalFileInfo
*
iter
=
pWal
->
fileInfoSet
->
pData
;
iter
<
pInfo
;
iter
++
)
{
if
(
pWal
->
totSize
>
pWal
->
retentionSize
||
//find first file that exceeds retention time
iter
->
closeTs
+
pWal
->
retentionPeriod
>
ts
)
{
//delete according to file size or close time
deleteCnt
++
;
newTotSize
-=
iter
->
fileSize
;
}
}
char
fnameStr
[
WAL_FILE_LEN
];
//remove file
for
(
int
i
=
0
;
i
<
deleteCnt
;
i
++
)
{
WalFileInfo
*
pInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
i
);
walBuildLogName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
}
}
//delete files living longer than retention limit
//save snapshot ver, commit ver
//remove file from fileset
//make new array, remove files
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
deleteCnt
);
pWal
->
totSize
=
newTotSize
;
return
0
;
return
0
;
}
}
...
@@ -138,105 +172,122 @@ void walRemoveAllOldFiles(void *handle) {
...
@@ -138,105 +172,122 @@ void walRemoveAllOldFiles(void *handle) {
}
}
#endif
#endif
static
int
walRoll
(
SWal
*
pWal
)
{
int
walRoll
(
SWal
*
pWal
)
{
int
code
=
0
;
int
code
=
0
;
code
=
tfClose
(
pWal
->
curIdxTfd
);
if
(
pWal
->
writeIdxTfd
!=
-
1
)
{
if
(
code
!=
0
)
{
code
=
tfClose
(
pWal
->
writeIdxTfd
);
return
code
;
if
(
code
!=
0
)
{
return
-
1
;
}
}
}
code
=
tfClose
(
pWal
->
curLogTfd
);
if
(
pWal
->
writeLogTfd
!=
-
1
)
{
if
(
code
!=
0
)
{
code
=
tfClose
(
pWal
->
writeLogTfd
);
return
code
;
if
(
code
!=
0
)
{
return
-
1
;
}
}
}
int64_t
idxTfd
,
logTfd
;
int64_t
idxTfd
,
logTfd
;
//create new file
//create new file
int64_t
newFileFirstVersion
=
pWal
->
lastVersion
+
1
;
int64_t
newFileFirstVersion
=
pWal
->
lastVersion
+
1
;
char
fnameStr
[
WAL_FILE_LEN
];
char
fnameStr
[
WAL_FILE_LEN
];
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
newFileFirstVersion
);
walBuildIdxName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
idxTfd
=
tfOpenCreateWrite
(
fnameStr
);
idxTfd
=
tfOpenCreateWrite
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
newFileFirstVersion
);
if
(
idxTfd
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
walBuildLogName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
logTfd
=
tfOpenCreateWrite
(
fnameStr
);
logTfd
=
tfOpenCreateWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
code
=
walRollFileInfo
(
pWal
);
if
(
code
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
taosArrayPush
(
pWal
->
fileSet
,
&
newFileFirstVersion
);
//switch file
//switch file
pWal
->
cur
IdxTfd
=
idxTfd
;
pWal
->
write
IdxTfd
=
idxTfd
;
pWal
->
cur
LogTfd
=
logTfd
;
pWal
->
write
LogTfd
=
logTfd
;
//change status
//change status
pWal
->
curFileLastVersion
=
-
1
;
pWal
->
curFileFirstVersion
=
newFileFirstVersion
;
pWal
->
curVersion
=
newFileFirstVersion
;
pWal
->
curLogOffset
=
0
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
&
WAL_CUR_POS_WRITABLE
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
&
WAL_CUR_POS_WRITABLE
;
pWal
->
lastFileName
=
newFileFirstVersion
;
pWal
->
lastFileWriteSize
=
0
;
pWal
->
lastRollSeq
=
walGetSeq
();
pWal
->
lastRollSeq
=
walGetSeq
();
return
0
;
return
0
;
}
}
int
walChangeFileToLast
(
SWal
*
pWal
)
{
int
walChangeFileToLast
(
SWal
*
pWal
)
{
int64_t
idxTfd
,
logTfd
;
int64_t
idxTfd
,
logTfd
;
int64_t
*
pRet
=
taosArrayGetLast
(
pWal
->
file
Set
);
WalFileInfo
*
pRet
=
taosArrayGetLast
(
pWal
->
fileInfo
Set
);
ASSERT
(
pRet
!=
NULL
);
ASSERT
(
pRet
!=
NULL
);
int64_t
f
name
=
*
pRet
;
int64_t
f
ileFirstVer
=
pRet
->
firstVer
;
char
fnameStr
[
WAL_FILE_LEN
];
char
fnameStr
[
WAL_FILE_LEN
];
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_INDEX_SUFFIX
,
fname
);
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
sprintf
(
fnameStr
,
"%"
PRId64
"."
WAL_LOG_SUFFIX
,
fname
);
if
(
idxTfd
<
0
)
{
return
-
1
;
}
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
return
-
1
;
}
//switch file
//switch file
pWal
->
cur
IdxTfd
=
idxTfd
;
pWal
->
write
IdxTfd
=
idxTfd
;
pWal
->
cur
LogTfd
=
logTfd
;
pWal
->
write
LogTfd
=
logTfd
;
//change status
//change status
pWal
->
curFileLastVersion
=
-
1
;
pWal
->
curVersion
=
fileFirstVer
;
pWal
->
curFileFirstVersion
=
fname
;
pWal
->
curVersion
=
fname
;
pWal
->
curLogOffset
=
0
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
;
return
0
;
return
0
;
}
}
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
static
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
int
code
=
0
;
int
code
=
0
;
//get index file
//get index file
if
(
!
tfValid
(
pWal
->
cur
IdxTfd
))
{
if
(
!
tfValid
(
pWal
->
write
IdxTfd
))
{
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".idx, failed to open since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".idx, failed to open since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
),
strerror
(
errno
));
return
code
;
}
}
int64_t
writeBuf
[
2
]
=
{
ver
,
offset
};
int64_t
writeBuf
[
2
]
=
{
ver
,
offset
};
int
size
=
tfWrite
(
pWal
->
cur
IdxTfd
,
writeBuf
,
sizeof
(
writeBuf
));
int
size
=
tfWrite
(
pWal
->
write
IdxTfd
,
writeBuf
,
sizeof
(
writeBuf
));
if
(
size
!=
sizeof
(
writeBuf
))
{
if
(
size
!=
sizeof
(
writeBuf
))
{
//TODO:
return
-
1
;
}
}
return
0
;
return
0
;
}
}
int64_t
walWrite
(
SWal
*
pWal
,
int64_t
index
,
uint8_t
msgType
,
void
*
body
,
int32_t
bodyLen
)
{
int64_t
walWrite
(
SWal
*
pWal
,
int64_t
index
,
uint8_t
msgType
,
const
void
*
body
,
int32_t
bodyLen
)
{
if
(
pWal
==
NULL
)
return
-
1
;
if
(
pWal
==
NULL
)
return
-
1
;
int
code
=
0
;
// no wal
// no wal
if
(
pWal
->
level
==
TAOS_WAL_NOLOG
)
return
0
;
if
(
pWal
->
level
==
TAOS_WAL_NOLOG
)
return
0
;
if
(
index
==
pWal
->
lastVersion
+
1
)
{
if
(
index
==
pWal
->
lastVersion
+
1
)
{
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
if
(
passed
>
pWal
->
rollPeriod
)
{
code
=
walRoll
(
pWal
);
walRoll
(
pWal
);
ASSERT
(
code
==
0
);
}
else
if
(
pWal
->
lastFileWriteSize
>
pWal
->
segSize
)
{
walRoll
(
pWal
);
}
else
{
}
else
{
walChangeFileToLast
(
pWal
);
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
pWal
->
rollPeriod
!=
-
1
&&
passed
>
pWal
->
rollPeriod
)
{
walRoll
(
pWal
);
}
else
if
(
pWal
->
segSize
!=
-
1
&&
walGetLastFileSize
(
pWal
)
>
pWal
->
segSize
)
{
walRoll
(
pWal
);
}
}
}
}
else
{
}
else
{
//reject skip log or rewrite log
//reject skip log or rewrite log
//must truncate explicitly first
//must truncate explicitly first
return
-
1
;
return
-
1
;
}
}
if
(
!
tfValid
(
pWal
->
curLogTfd
))
return
0
;
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
pWal
->
head
.
version
=
index
;
pWal
->
head
.
version
=
index
;
int32_t
code
=
0
;
pWal
->
head
.
signature
=
WAL_SIGNATURE
;
pWal
->
head
.
signature
=
WAL_SIGNATURE
;
pWal
->
head
.
len
=
bodyLen
;
pWal
->
head
.
len
=
bodyLen
;
...
@@ -247,22 +298,27 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
...
@@ -247,22 +298,27 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
pthread_mutex_lock
(
&
pWal
->
mutex
);
pthread_mutex_lock
(
&
pWal
->
mutex
);
if
(
tfWrite
(
pWal
->
cur
LogTfd
,
&
pWal
->
head
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
if
(
tfWrite
(
pWal
->
write
LogTfd
,
&
pWal
->
head
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
//ftruncate
//ftruncate
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
)
,
strerror
(
errno
));
}
}
if
(
tfWrite
(
pWal
->
cur
LogTfd
,
&
body
,
bodyLen
)
!=
bodyLen
)
{
if
(
tfWrite
(
pWal
->
write
LogTfd
,
&
body
,
bodyLen
)
!=
bodyLen
)
{
//ftruncate
//ftruncate
code
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".log, failed to write since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
),
strerror
(
errno
));
}
code
=
walWriteIndex
(
pWal
,
index
,
walGetCurFileOffset
(
pWal
));
if
(
code
!=
0
)
{
//TODO
}
}
walWriteIndex
(
pWal
,
index
,
pWal
->
curLogOffset
);
pWal
->
curLogOffset
+=
sizeof
(
SWalHead
)
+
bodyLen
;
//set status
//set status
pWal
->
lastVersion
=
index
;
pWal
->
lastVersion
=
index
;
pWal
->
totSize
+=
sizeof
(
SWalHead
)
+
bodyLen
;
walGetCurFileInfo
(
pWal
)
->
lastVer
=
index
;
walGetCurFileInfo
(
pWal
)
->
fileSize
+=
sizeof
(
SWalHead
)
+
bodyLen
;
pthread_mutex_unlock
(
&
pWal
->
mutex
);
pthread_mutex_unlock
(
&
pWal
->
mutex
);
...
@@ -270,12 +326,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
...
@@ -270,12 +326,12 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t
}
}
void
walFsync
(
SWal
*
pWal
,
bool
forceFsync
)
{
void
walFsync
(
SWal
*
pWal
,
bool
forceFsync
)
{
if
(
pWal
==
NULL
||
!
tfValid
(
pWal
->
cur
LogTfd
))
return
;
if
(
pWal
==
NULL
||
!
tfValid
(
pWal
->
write
LogTfd
))
return
;
if
(
forceFsync
||
(
pWal
->
level
==
TAOS_WAL_FSYNC
&&
pWal
->
fsyncPeriod
==
0
))
{
if
(
forceFsync
||
(
pWal
->
level
==
TAOS_WAL_FSYNC
&&
pWal
->
fsyncPeriod
==
0
))
{
wTrace
(
"vgId:%d, fileId:%"
PRId64
".log, do fsync"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
);
wTrace
(
"vgId:%d, fileId:%"
PRId64
".log, do fsync"
,
pWal
->
vgId
,
walGetCurFileFirstVer
(
pWal
)
);
if
(
tfFsync
(
pWal
->
cur
LogTfd
)
<
0
)
{
if
(
tfFsync
(
pWal
->
write
LogTfd
)
<
0
)
{
wError
(
"vgId:%d, file:%"
PRId64
".log, fsync failed since %s"
,
pWal
->
vgId
,
pWal
->
curFileFirstVersion
,
strerror
(
errno
));
wError
(
"vgId:%d, file:%"
PRId64
".log, fsync failed since %s"
,
pWal
->
vgId
,
walGetCurFileFirstVer
(
pWal
)
,
strerror
(
errno
));
}
}
}
}
}
}
...
@@ -348,8 +404,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
...
@@ -348,8 +404,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
}
}
#endif
#endif
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
tfd
,
int64_t
offset
)
{
static
int
walValidateOffset
(
SWal
*
pWal
,
int64_t
ver
)
{
tfFtruncate
(
tfd
,
offset
);
int
code
=
0
;
SWalHead
*
pHead
=
NULL
;
code
=
(
int
)
walRead
(
pWal
,
&
pHead
,
ver
);
if
(
pHead
->
version
!=
ver
)
{
return
-
1
;
}
return
0
;
}
static
int64_t
walGetOffset
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
walSeekVer
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
code
=
walValidateOffset
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
return
0
;
}
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
ver
)
{
int64_t
tfd
=
pWal
->
writeLogTfd
;
tfFtruncate
(
tfd
,
ver
);
tfFsync
(
tfd
);
tfd
=
pWal
->
writeIdxTfd
;
tfFtruncate
(
tfd
,
ver
*
WAL_IDX_ENTRY_SIZE
);
tfFsync
(
tfd
);
tfFsync
(
tfd
);
}
}
...
...
source/libs/wal/test/CMakeLists.txt
0 → 100644
浏览文件 @
90512f6e
add_executable
(
walTest
""
)
target_sources
(
walTest
PRIVATE
"walMetaTest.cpp"
)
target_include_directories
(
walTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/wal"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
walTest
wal
gtest_main
)
enable_testing
()
add_test
(
NAME wal_test
COMMAND walTest
)
source/libs/wal/test/walMetaTest.cpp
0 → 100644
浏览文件 @
90512f6e
#include <gtest/gtest.h>
#include <cstring>
#include <iostream>
#include <queue>
#include "walInt.h"
class
WalCleanEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
int
code
=
walInit
();
ASSERT
(
code
==
0
);
}
static
void
TearDownTestCase
()
{
walCleanUp
();
}
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
SWalCfg
*
pCfg
=
(
SWalCfg
*
)
malloc
(
sizeof
(
SWal
));
memset
(
pCfg
,
0
,
sizeof
(
SWalCfg
));
pCfg
->
rollPeriod
=
-
1
;
pCfg
->
segSize
=
-
1
;
pCfg
->
walLevel
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
pCfg
);
ASSERT
(
pWal
!=
NULL
);
}
void
TearDown
()
override
{
walClose
(
pWal
);
pWal
=
NULL
;
}
SWal
*
pWal
=
NULL
;
const
char
*
pathName
=
"/tmp/wal_test"
;
};
class
WalKeepEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
int
code
=
walInit
();
ASSERT
(
code
==
0
);
}
static
void
TearDownTestCase
()
{
walCleanUp
();
}
void
SetUp
()
override
{
SWalCfg
*
pCfg
=
(
SWalCfg
*
)
malloc
(
sizeof
(
SWal
));
memset
(
pCfg
,
0
,
sizeof
(
SWalCfg
));
pCfg
->
rollPeriod
=
-
1
;
pCfg
->
segSize
=
-
1
;
pCfg
->
walLevel
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
pCfg
);
ASSERT
(
pWal
!=
NULL
);
}
void
TearDown
()
override
{
walClose
(
pWal
);
pWal
=
NULL
;
}
SWal
*
pWal
=
NULL
;
const
char
*
pathName
=
"/tmp/wal_test"
;
};
TEST_F
(
WalCleanEnv
,
createNew
)
{
walRollFileInfo
(
pWal
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
ASSERT_EQ
(
pWal
->
fileInfoSet
->
size
,
1
);
WalFileInfo
*
pInfo
=
(
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
ASSERT_EQ
(
pInfo
->
firstVer
,
0
);
ASSERT_EQ
(
pInfo
->
lastVer
,
-
1
);
ASSERT_EQ
(
pInfo
->
closeTs
,
-
1
);
ASSERT_EQ
(
pInfo
->
fileSize
,
0
);
}
TEST_F
(
WalCleanEnv
,
serialize
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
char
*
ss
=
walMetaSerialize
(
pWal
);
printf
(
"%s
\n
"
,
ss
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
}
TEST_F
(
WalCleanEnv
,
removeOldMeta
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
}
TEST_F
(
WalKeepEnv
,
readOldMeta
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
char
*
oldss
=
walMetaSerialize
(
pWal
);
TearDown
();
SetUp
();
code
=
walReadMeta
(
pWal
);
ASSERT
(
code
==
0
);
char
*
newss
=
walMetaSerialize
(
pWal
);
int
len
=
strlen
(
oldss
);
ASSERT_EQ
(
len
,
strlen
(
newss
));
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
EXPECT_EQ
(
oldss
[
i
],
newss
[
i
]);
}
}
TEST_F
(
WalKeepEnv
,
write
)
{
const
char
*
ranStr
=
"tvapq02tcp"
;
const
int
len
=
strlen
(
ranStr
);
int
code
;
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
code
=
walWrite
(
pWal
,
i
,
i
+
1
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
0
);
code
=
walWrite
(
pWal
,
i
+
2
,
i
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
-
1
);
}
code
=
walWriteMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
source/libs/wal/test/walTests.cpp
已删除
100644 → 0
浏览文件 @
17a3469a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
//#define _DEFAULT_SOURCE
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tfile.h"
int64_t
ver
=
0
;
void
*
pWal
=
NULL
;
int
writeToQueue
(
void
*
pVnode
,
void
*
data
,
int
type
,
void
*
pMsg
)
{
// do nothing
SWalHead
*
pHead
=
data
;
if
(
pHead
->
version
>
ver
)
ver
=
pHead
->
version
;
walWrite
(
pWal
,
pHead
);
return
0
;
}
int
main
(
int
argc
,
char
*
argv
[])
{
char
path
[
128
]
=
"/tmp/wal"
;
int
level
=
2
;
int
total
=
5
;
int
rows
=
10000
;
int
size
=
128
;
int
keep
=
0
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
tstrncpy
(
path
,
argv
[
++
i
],
sizeof
(
path
));
}
else
if
(
strcmp
(
argv
[
i
],
"-l"
)
==
0
&&
i
<
argc
-
1
)
{
level
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-r"
)
==
0
&&
i
<
argc
-
1
)
{
rows
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-k"
)
==
0
&&
i
<
argc
-
1
)
{
keep
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
&&
i
<
argc
-
1
)
{
total
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-s"
)
==
0
&&
i
<
argc
-
1
)
{
size
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-v"
)
==
0
&&
i
<
argc
-
1
)
{
ver
=
atoll
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
dDebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-p path]: wal file path default is:%s
\n
"
,
path
);
printf
(
" [-l level]: log level, default is:%d
\n
"
,
level
);
printf
(
" [-t total]: total wal files, default is:%d
\n
"
,
total
);
printf
(
" [-r rows]: rows of records per wal file, default is:%d
\n
"
,
rows
);
printf
(
" [-k keep]: keep the wal after closing, default is:%d
\n
"
,
keep
);
printf
(
" [-v version]: initial version, default is:%"
PRId64
"
\n
"
,
ver
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
dDebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
}
}
taosInitLog
(
"wal.log"
,
100000
,
10
);
tfInit
();
walInit
();
SWalCfg
walCfg
=
{
0
};
walCfg
.
walLevel
=
level
;
walCfg
.
keep
=
keep
;
pWal
=
walOpen
(
path
,
&
walCfg
);
if
(
pWal
==
NULL
)
{
printf
(
"failed to open wal
\n
"
);
exit
(
-
1
);
}
int
ret
=
walRestore
(
pWal
,
NULL
,
writeToQueue
);
if
(
ret
<
0
)
{
printf
(
"failed to restore wal
\n
"
);
exit
(
-
1
);
}
printf
(
"version starts from:%"
PRId64
"
\n
"
,
ver
);
int
contLen
=
sizeof
(
SWalHead
)
+
size
;
SWalHead
*
pHead
=
(
SWalHead
*
)
malloc
(
contLen
);
for
(
int
i
=
0
;
i
<
total
;
++
i
)
{
for
(
int
k
=
0
;
k
<
rows
;
++
k
)
{
pHead
->
version
=
++
ver
;
pHead
->
len
=
size
;
walWrite
(
pWal
,
pHead
);
}
printf
(
"renew a wal, i:%d
\n
"
,
i
);
walRenew
(
pWal
);
}
printf
(
"%d wal files are written
\n
"
,
total
);
int64_t
index
=
0
;
char
name
[
256
];
while
(
1
)
{
int
code
=
walGetWalFile
(
pWal
,
name
,
&
index
);
if
(
code
==
-
1
)
{
printf
(
"failed to get wal file, index:%"
PRId64
"
\n
"
,
index
);
break
;
}
printf
(
"index:%"
PRId64
" wal:%s
\n
"
,
index
,
name
);
if
(
code
==
0
)
break
;
}
getchar
();
walClose
(
pWal
);
walCleanUp
();
tfCleanup
();
return
0
;
}
source/util/src/tarray.c
浏览文件 @
90512f6e
...
@@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
...
@@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
memcpy
(
TARRAY_GET_ELEM
(
pArray
,
index
),
pData
,
pArray
->
elemSize
);
memcpy
(
TARRAY_GET_ELEM
(
pArray
,
index
),
pData
,
pArray
->
elemSize
);
}
}
void
taosArrayPopFrontBatch
(
SArray
*
pArray
,
size_t
cnt
)
{
assert
(
cnt
<=
pArray
->
size
);
pArray
->
size
=
pArray
->
size
-
cnt
;
if
(
pArray
->
size
==
0
)
{
pArray
->
size
=
0
;
return
;
}
memmove
(
pArray
->
pData
,
(
char
*
)
pArray
->
pData
+
cnt
*
pArray
->
elemSize
,
pArray
->
size
);
}
void
taosArrayRemove
(
SArray
*
pArray
,
size_t
index
)
{
void
taosArrayRemove
(
SArray
*
pArray
,
size_t
index
)
{
assert
(
index
<
pArray
->
size
);
assert
(
index
<
pArray
->
size
);
...
...
source/util/src/tfile.c
浏览文件 @
90512f6e
...
@@ -22,20 +22,26 @@
...
@@ -22,20 +22,26 @@
static
int32_t
tsFileRsetId
=
-
1
;
static
int32_t
tsFileRsetId
=
-
1
;
static
int8_t
tfInited
=
0
;
static
void
tfCloseFile
(
void
*
p
)
{
static
void
tfCloseFile
(
void
*
p
)
{
taosCloseFile
((
int32_t
)(
uintptr_t
)
p
);
taosCloseFile
((
int32_t
)(
uintptr_t
)
p
);
}
}
int32_t
tfInit
()
{
int32_t
tfInit
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tfInited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
tsFileRsetId
=
taosOpenRef
(
2000
,
tfCloseFile
);
tsFileRsetId
=
taosOpenRef
(
2000
,
tfCloseFile
);
if
(
tsFileRsetId
>
0
)
{
if
(
tsFileRsetId
>
0
)
{
return
0
;
return
0
;
}
else
{
}
else
{
atomic_store_8
(
&
tfInited
,
0
);
return
-
1
;
return
-
1
;
}
}
}
}
void
tfCleanup
()
{
void
tfCleanup
()
{
atomic_store_8
(
&
tfInited
,
0
);
if
(
tsFileRsetId
>=
0
)
taosCloseRef
(
tsFileRsetId
);
if
(
tsFileRsetId
>=
0
)
taosCloseRef
(
tsFileRsetId
);
tsFileRsetId
=
-
1
;
tsFileRsetId
=
-
1
;
}
}
...
...
source/util/src/tmd5.c
浏览文件 @
90512f6e
...
@@ -84,8 +84,8 @@ static uint8_t PADDING[64] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x
...
@@ -84,8 +84,8 @@ static uint8_t PADDING[64] = {0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x
/* The routine MD5Init initializes the message-digest context
/* The routine MD5Init initializes the message-digest context
mdContext. All fields are set to zero.
mdContext. All fields are set to zero.
*/
*/
void
MD5Init
(
MD5_CTX
*
mdContext
)
{
void
tMD5Init
(
T_
MD5_CTX
*
mdContext
)
{
memset
(
mdContext
,
0
,
sizeof
(
MD5_CTX
));
memset
(
mdContext
,
0
,
sizeof
(
T_
MD5_CTX
));
/* Load magic initialization constants. */
/* Load magic initialization constants. */
mdContext
->
buf
[
0
]
=
(
uint32_t
)
0x67452301
;
mdContext
->
buf
[
0
]
=
(
uint32_t
)
0x67452301
;
...
@@ -98,7 +98,7 @@ void MD5Init(MD5_CTX *mdContext) {
...
@@ -98,7 +98,7 @@ void MD5Init(MD5_CTX *mdContext) {
account for the presence of each of the characters inBuf[0..inLen-1]
account for the presence of each of the characters inBuf[0..inLen-1]
in the message whose digest is being computed.
in the message whose digest is being computed.
*/
*/
void
MD5Update
(
MD5_CTX
*
mdContext
,
uint8_t
*
inBuf
,
unsigned
int
inLen
)
{
void
tMD5Update
(
T_
MD5_CTX
*
mdContext
,
uint8_t
*
inBuf
,
unsigned
int
inLen
)
{
uint32_t
in
[
16
];
uint32_t
in
[
16
];
int
mdi
;
int
mdi
;
unsigned
int
i
,
ii
;
unsigned
int
i
,
ii
;
...
@@ -129,7 +129,7 @@ void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) {
...
@@ -129,7 +129,7 @@ void MD5Update(MD5_CTX *mdContext, uint8_t *inBuf, unsigned int inLen) {
/* The routine MD5Final terminates the message-digest computation and
/* The routine MD5Final terminates the message-digest computation and
ends with the desired message digest in mdContext->digest[0...15].
ends with the desired message digest in mdContext->digest[0...15].
*/
*/
void
MD5Final
(
MD5_CTX
*
mdContext
)
{
void
tMD5Final
(
T_
MD5_CTX
*
mdContext
)
{
uint32_t
in
[
16
];
uint32_t
in
[
16
];
int
mdi
;
int
mdi
;
unsigned
int
i
,
ii
;
unsigned
int
i
,
ii
;
...
@@ -144,7 +144,7 @@ void MD5Final(MD5_CTX *mdContext) {
...
@@ -144,7 +144,7 @@ void MD5Final(MD5_CTX *mdContext) {
/* pad out to 56 mod 64 */
/* pad out to 56 mod 64 */
padLen
=
(
mdi
<
56
)
?
(
56
-
mdi
)
:
(
120
-
mdi
);
padLen
=
(
mdi
<
56
)
?
(
56
-
mdi
)
:
(
120
-
mdi
);
MD5Update
(
mdContext
,
PADDING
,
padLen
);
t
MD5Update
(
mdContext
,
PADDING
,
padLen
);
/* append length in bits and transform */
/* append length in bits and transform */
for
(
i
=
0
,
ii
=
0
;
i
<
14
;
i
++
,
ii
+=
4
)
for
(
i
=
0
,
ii
=
0
;
i
<
14
;
i
++
,
ii
+=
4
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录