Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5614b970
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
5614b970
编写于
12月 13, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/vnode
上级
556c7912
97344d84
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
967 addition
and
244 deletion
+967
-244
include/libs/wal/wal.h
include/libs/wal/wal.h
+24
-23
include/util/tarray.h
include/util/tarray.h
+15
-0
include/util/tchecksum.h
include/util/tchecksum.h
+1
-1
source/libs/index/inc/index_fst.h
source/libs/index/inc/index_fst.h
+19
-18
source/libs/index/inc/index_fst_automation.h
source/libs/index/inc/index_fst_automation.h
+11
-1
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+8
-0
source/libs/index/src/index_fst_automation.c
source/libs/index/src/index_fst_automation.c
+17
-8
source/libs/parser/inc/insertParser.h
source/libs/parser/inc/insertParser.h
+8
-0
source/libs/parser/src/insertParser.c
source/libs/parser/src/insertParser.c
+3
-9
source/libs/parser/test/insertTest.cpp
source/libs/parser/test/insertTest.cpp
+88
-0
source/libs/parser/test/mockCatalog.cpp
source/libs/parser/test/mockCatalog.cpp
+52
-0
source/libs/parser/test/mockCatalog.h
source/libs/parser/test/mockCatalog.h
+27
-0
source/libs/parser/test/mockCatalogService.cpp
source/libs/parser/test/mockCatalogService.cpp
+188
-0
source/libs/parser/test/mockCatalogService.h
source/libs/parser/test/mockCatalogService.h
+63
-0
source/libs/parser/test/parserMain.cpp
source/libs/parser/test/parserMain.cpp
+41
-0
source/libs/parser/test/tokenizerTest.cpp
source/libs/parser/test/tokenizerTest.cpp
+0
-5
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+30
-0
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+12
-0
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+10
-2
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+2
-30
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+34
-11
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+196
-111
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+108
-24
source/util/src/tarray.c
source/util/src/tarray.c
+10
-1
未找到文件。
include/libs/wal/wal.h
浏览文件 @
5614b970
...
...
@@ -38,6 +38,24 @@ typedef enum {
TAOS_WAL_FSYNC
=
2
}
EWalType
;
typedef
struct
SWalReadHead
{
int8_t
sver
;
uint8_t
msgType
;
int8_t
reserved
[
2
];
int32_t
len
;
int64_t
version
;
char
cont
[];
}
SWalReadHead
;
typedef
struct
{
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int32_t
retentionPeriod
;
// secs
int32_t
rollPeriod
;
// secs
int64_t
segSize
;
EWalType
walLevel
;
// wal level
}
SWalCfg
;
typedef
struct
{
//union {
//uint32_t info;
...
...
@@ -47,25 +65,11 @@ typedef struct {
//uint32_t reserved : 24;
//};
//};
int8_t
sver
;
uint8_t
msgType
;
int8_t
reserved
[
2
];
int32_t
len
;
int64_t
version
;
uint32_t
signature
;
uint32_t
cksumHead
;
uint32_t
cksumBody
;
char
cont
[]
;
SWalReadHead
head
;
}
SWalHead
;
typedef
struct
{
int32_t
vgId
;
int32_t
fsyncPeriod
;
// millisecond
int32_t
rollPeriod
;
int64_t
segSize
;
EWalType
walLevel
;
// wal level
}
SWalCfg
;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_NOSUFFIX_LEN 20
...
...
@@ -80,7 +84,7 @@ typedef struct {
//#define WAL_FILE_NUM 1 // 3
#define WAL_FILESET_MAX 128
#define WAL_IDX_ENTRY_SIZE
(sizeof(int64_t)*2)
#define WAL_IDX_ENTRY_SIZE (sizeof(int64_t)*2)
#define WAL_CUR_POS_WRITABLE 1
#define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4
...
...
@@ -103,21 +107,17 @@ typedef struct SWal {
//write tfd
int64_t
writeLogTfd
;
int64_t
writeIdxTfd
;
//read tfd
int64_t
readLogTfd
;
int64_t
readIdxTfd
;
//current version
int64_t
curVersion
;
//wal lifecycle
int64_t
firstVersion
;
int64_t
snapshotVersion
;
int64_t
commitVersion
;
int64_t
lastVersion
;
//snapshotting version
int64_t
snapshottingVer
;
//roll status
int64_t
lastRollSeq
;
//file set
int32_t
writeCur
;
int32_t
readCur
;
SArray
*
fileInfoSet
;
//ctl
int32_t
curStatus
;
...
...
@@ -148,7 +148,8 @@ int32_t walCommit(SWal *, int64_t ver);
// truncate after
int32_t
walRollback
(
SWal
*
,
int64_t
ver
);
// notify that previous logs can be pruned safely
int32_t
walTakeSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walBeginTakeSnapshot
(
SWal
*
,
int64_t
ver
);
int32_t
walEndTakeSnapshot
(
SWal
*
);
//int32_t walDataCorrupted(SWal*);
// read
...
...
include/util/tarray.h
浏览文件 @
5614b970
...
...
@@ -153,6 +153,13 @@ void taosArraySet(SArray* pArray, size_t index, void* pData);
*/
void
taosArrayPopFrontBatch
(
SArray
*
pArray
,
size_t
cnt
);
/**
* remove some data entry from front
* @param pArray
* @param cnt
*/
void
taosArrayPopTailBatch
(
SArray
*
pArray
,
size_t
cnt
);
/**
* remove data entry of the given index
* @param pArray
...
...
@@ -213,6 +220,14 @@ void taosArraySortString(SArray* pArray, __compar_fn_t comparFn);
*/
void
*
taosArraySearch
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int
flags
);
/**
* search the array, return index of the element
* @param pArray
* @param compar
* @param key
*/
int32_t
taosArraySearchIdx
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int
flags
);
/**
* search the array
* @param pArray
...
...
include/util/tchecksum.h
浏览文件 @
5614b970
...
...
@@ -39,7 +39,7 @@ static FORCE_INLINE int taosCalcChecksumAppend(TSCKSUM csi, uint8_t *stream, uin
}
static
FORCE_INLINE
int
taosCheckChecksum
(
const
uint8_t
*
stream
,
uint32_t
ssize
,
TSCKSUM
checksum
)
{
return
(
checksum
=
=
(
*
crc32c
)(
0
,
stream
,
(
size_t
)
ssize
));
return
(
checksum
!
=
(
*
crc32c
)(
0
,
stream
,
(
size_t
)
ssize
));
}
static
FORCE_INLINE
int
taosCheckChecksumWhole
(
const
uint8_t
*
stream
,
uint32_t
ssize
)
{
...
...
source/libs/index/inc/index_fst.h
浏览文件 @
5614b970
...
...
@@ -26,10 +26,24 @@ extern "C" {
#include "index_fst_counting_writer.h"
#include "index_fst_automation.h"
#define OUTPUT_PREFIX(a, b) ((a) > (b) ? (b) : (a)
typedef
struct
Fst
Fst
;
typedef
struct
FstNode
FstNode
;
#define OUTPUT_PREFIX(a, b) ((a) > (b) ? (b) : (a)
typedef
enum
{
Included
,
Excluded
,
Unbounded
}
FstBound
;
typedef
struct
FstBoundWithData
{
FstSlice
data
;
FstBound
type
;
}
FstBoundWithData
;
typedef
struct
FstStreamBuilder
{
Fst
*
fst
;
AutomationCtx
*
aut
;
FstBoundWithData
*
min
;
FstBoundWithData
*
max
;
}
FstStreamBuilder
,
FstStreamWithStateBuilder
;
typedef
struct
FstRange
{
uint64_t
start
;
...
...
@@ -39,16 +53,9 @@ typedef struct FstRange {
typedef
enum
{
GE
,
GT
,
LE
,
LT
}
RangeType
;
typedef
enum
{
OneTransNext
,
OneTrans
,
AnyTrans
,
EmptyFinal
}
State
;
typedef
enum
{
Ordered
,
OutOfOrdered
,
DuplicateKey
}
OrderType
;
typedef
enum
{
Included
,
Excluded
,
Unbounded
}
FstBound
;
typedef
struct
FstBoundWithData
{
FstSlice
data
;
FstBound
type
;
}
FstBoundWithData
;
FstBoundWithData
*
fstBoundStateCreate
(
FstBound
type
,
FstSlice
*
data
);
bool
fstBoundWithDataExceededBy
(
FstBoundWithData
*
bound
,
FstSlice
*
slice
);
bool
fstBoundWithDataIsEmpty
(
FstBoundWithData
*
bound
);
...
...
@@ -60,8 +67,6 @@ typedef struct FstOutput {
Output
out
;
}
FstOutput
;
/*
*
* UnFinished node and helper function
...
...
@@ -275,6 +280,8 @@ FstNode* fstGetRoot(Fst *fst);
FstType
fstGetType
(
Fst
*
fst
);
CompiledAddr
fstGetRootAddr
(
Fst
*
fst
);
Output
fstEmptyFinalOutput
(
Fst
*
fst
,
bool
*
null
);
FstStreamBuilder
*
fstSearch
(
Fst
*
fst
,
AutomationCtx
*
ctx
);
FstStreamWithStateBuilder
*
fstSearchWithState
(
Fst
*
fst
,
AutomationCtx
*
ctx
);
bool
fstVerify
(
Fst
*
fst
);
...
...
@@ -298,7 +305,7 @@ typedef struct StreamWithState {
FstOutput
emptyOutput
;
SArray
*
stack
;
// <StreamState>
FstBoundWithData
*
endAt
;
}
StreamWithState
;
}
StreamWithState
;
typedef
struct
StreamWithStateResult
{
FstSlice
data
;
...
...
@@ -314,14 +321,8 @@ typedef void* (*StreamCallback)(void *);
StreamWithState
*
streamWithStateCreate
(
Fst
*
fst
,
AutomationCtx
*
automation
,
FstBoundWithData
*
min
,
FstBoundWithData
*
max
)
;
void
streamWithStateDestroy
(
StreamWithState
*
sws
);
bool
streamWithStateSeekMin
(
StreamWithState
*
sws
,
FstBoundWithData
*
min
);
StreamWithStateResult
*
streamWithStateNextWith
(
StreamWithState
*
sws
,
StreamCallback
callback
);
typedef
struct
FstStreamBuilder
{
Fst
*
fst
;
AutomationCtx
*
aut
;
FstBoundWithData
*
min
;
FstBoundWithData
*
max
;
}
FstStreamBuilder
;
StreamWithStateResult
*
streamWithStateNextWith
(
StreamWithState
*
sws
,
StreamCallback
callback
);
FstStreamBuilder
*
fstStreamBuilderCreate
(
Fst
*
fst
,
AutomationCtx
*
aut
);
// set up bound range
...
...
source/libs/index/inc/index_fst_automation.h
浏览文件 @
5614b970
...
...
@@ -38,8 +38,18 @@ typedef struct Complement {
// automation
typedef
struct
AutomationCtx
{
AutomationType
type
;
void
*
data
;
}
AutomationCtx
;
typedef
enum
StartWithStateKind
{
Done
,
Running
}
StartWithStateKind
;
typedef
struct
StartWithStateValue
{
StartWithStateKind
kind
;
void
*
value
;
}
StartWithStateValue
;
typedef
struct
AutomationFunc
{
void
*
(
*
start
)(
AutomationCtx
*
ctx
)
;
bool
(
*
isMatch
)(
AutomationCtx
*
ctx
,
void
*
);
...
...
@@ -50,7 +60,7 @@ typedef struct AutomationFunc {
}
AutomationFunc
;
AutomationCtx
*
automCtxCreate
(
void
*
data
,
AutomationType
type
);
void
autoCtxDestroy
(
AutomationCtx
*
ctx
);
void
auto
m
CtxDestroy
(
AutomationCtx
*
ctx
);
extern
AutomationFunc
automFuncs
[];
#ifdef __cplusplus
...
...
source/libs/index/src/index_fst.c
浏览文件 @
5614b970
...
...
@@ -1090,6 +1090,12 @@ bool fstGet(Fst *fst, FstSlice *b, Output *out) {
return
true
;
}
FstStreamBuilder
*
fstSearch
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
FstStreamWithStateBuilder
*
fstSearchWithState
(
Fst
*
fst
,
AutomationCtx
*
ctx
)
{
return
fstStreamBuilderCreate
(
fst
,
ctx
);
}
FstNode
*
fstGetRoot
(
Fst
*
fst
)
{
if
(
fst
->
root
!=
NULL
)
{
...
...
@@ -1440,3 +1446,5 @@ FstStreamBuilder *fstStreamBuilderRange(FstStreamBuilder *b, FstSlice *val, Rang
source/libs/index/src/index_fst_automation.c
浏览文件 @
5614b970
...
...
@@ -17,8 +17,9 @@
// prefix query, impl later
static
void
*
prefixStart
(
AutomationCtx
*
ctx
)
{
return
NULL
;
static
void
*
prefixStart
(
AutomationCtx
*
ctx
)
{
StartWithStateValue
*
data
=
(
StartWithStateValue
*
)(
ctx
->
data
);
return
data
;
};
static
bool
prefixIsMatch
(
AutomationCtx
*
ctx
,
void
*
data
)
{
return
true
;
...
...
@@ -82,16 +83,24 @@ AutomationCtx* automCtxCreate(void *data, AutomationType type) {
AutomationCtx
*
ctx
=
calloc
(
1
,
sizeof
(
AutomationCtx
));
if
(
ctx
==
NULL
)
{
return
NULL
;
}
ctx
->
type
=
type
;
if
(
ctx
->
type
==
AUTOMATION_PREFIX
)
{
}
else
if
(
ctx
->
type
==
AUTMMATION_MATCH
)
{
if
(
type
==
AUTOMATION_PREFIX
)
{
StartWithStateValue
*
swsv
=
(
StartWithStateValue
*
)
calloc
(
1
,
sizeof
(
StartWithStateValue
));
swsv
->
kind
=
Done
;
swsv
->
value
=
NULL
;
ctx
->
data
=
(
void
*
)
swsv
;
}
else
if
(
type
==
AUTMMATION_MATCH
)
{
}
else
{
// add more search type
}
ctx
->
type
=
type
;
return
ctx
;
}
void
autoCtxDestroy
(
AutomationCtx
*
ctx
)
{
void
automCtxDestroy
(
AutomationCtx
*
ctx
)
{
if
(
ctx
->
type
==
AUTOMATION_PREFIX
)
{
free
(
ctx
->
data
);
}
else
if
(
ctx
->
type
==
AUTMMATION_MATCH
)
{
}
free
(
ctx
);
}
source/libs/parser/inc/insertParser.h
浏览文件 @
5614b970
...
...
@@ -16,8 +16,16 @@
#ifndef TDENGINE_INSERTPARSER_H
#define TDENGINE_INSERTPARSER_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "parser.h"
int32_t
parseInsertSql
(
SParseContext
*
pContext
,
SInsertStmtInfo
**
pInfo
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_INSERTPARSER_H
source/libs/parser/src/insertParser.c
浏览文件 @
5614b970
...
...
@@ -145,16 +145,10 @@ static int32_t toInt64(const char* z, int16_t type, int32_t n, int64_t* value, b
}
static
int32_t
createInsertStmtInfo
(
SInsertStmtInfo
**
pInsertInfo
)
{
SInsertStmtInfo
*
i
nfo
=
calloc
(
1
,
sizeof
(
SQueryStmtInfo
));
if
(
NULL
==
i
nfo
)
{
*
pInsertI
nfo
=
calloc
(
1
,
sizeof
(
SQueryStmtInfo
));
if
(
NULL
==
*
pInsertI
nfo
)
{
return
TSDB_CODE_TSC_OUT_OF_MEMORY
;
}
// info->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
// if (NULL == info->pTableBlockHashList) {
// tfree(info);
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
*
pInsertInfo
=
info
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -808,7 +802,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
// no data in the sql string anymore.
if
(
sToken
.
n
==
0
)
{
if
(
0
==
pCxt
->
totalNum
)
{
return
TSDB_CODE_TSC_INVALID_OPERATION
;
return
buildInvalidOperationMsg
(
&
pCxt
->
msg
,
"no data in sql"
);
;
}
break
;
}
...
...
source/libs/parser/test/insertTest.cpp
0 → 100644
浏览文件 @
5614b970
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include "insertParser.h"
#include "mockCatalog.h"
using
namespace
std
;
using
namespace
testing
;
namespace
{
string
toString
(
int32_t
code
)
{
return
tstrerror
(
code
);
}
}
// syntax:
// INSERT INTO
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
// VALUES (field1_value, ...) [(field1_value2, ...) ...] | FILE csv_file_path
// [...];
class
InsertTest
:
public
Test
{
protected:
void
bind
(
const
char
*
sql
)
{
reset
();
cxt
.
pSql
=
sql
;
cxt
.
sqlLen
=
strlen
(
sql
);
}
int32_t
run
()
{
code
=
parseInsertSql
(
&
cxt
,
&
res
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
cout
<<
"code:"
<<
toString
(
code
)
<<
", msg:"
<<
errMagBuf
<<
endl
;
}
return
code
;
}
SInsertStmtInfo
*
reslut
()
{
return
res
;
}
private:
static
const
int
max_err_len
=
1024
;
void
reset
()
{
memset
(
&
cxt
,
0
,
sizeof
(
cxt
));
memset
(
errMagBuf
,
0
,
max_err_len
);
cxt
.
pMsg
=
errMagBuf
;
cxt
.
msgLen
=
max_err_len
;
code
=
TSDB_CODE_SUCCESS
;
res
=
nullptr
;
}
char
errMagBuf
[
max_err_len
];
SParseContext
cxt
;
int32_t
code
;
SInsertStmtInfo
*
res
;
};
// INSERT INTO tb_name VALUES (field1_value, ...)
TEST_F
(
InsertTest
,
simpleTest
)
{
bind
(
"insert into .. values (...)"
);
ASSERT_EQ
(
run
(),
TSDB_CODE_SUCCESS
);
SInsertStmtInfo
*
res
=
reslut
();
// todo check
}
TEST_F
(
InsertTest
,
toleranceTest
)
{
bind
(
"insert into"
);
ASSERT_NE
(
run
(),
TSDB_CODE_SUCCESS
);
bind
(
"insert into t"
);
ASSERT_NE
(
run
(),
TSDB_CODE_SUCCESS
);
}
source/libs/parser/test/mockCatalog.cpp
0 → 100644
浏览文件 @
5614b970
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mockCatalog.h"
#include <iostream>
void
generateMetaData
(
MockCatalogService
*
mcs
)
{
{
ITableBuilder
&
builder
=
mcs
->
createTableBuilder
(
"test"
,
"t1"
,
TSDB_NORMAL_TABLE
,
MockCatalogService
::
numOfDataTypes
)
.
setPrecision
(
TSDB_TIME_PRECISION_MILLI
).
setVgid
(
1
).
addColumn
(
"ts"
,
TSDB_DATA_TYPE_TIMESTAMP
);
for
(
int32_t
i
=
0
;
i
<
MockCatalogService
::
numOfDataTypes
;
++
i
)
{
if
(
TSDB_DATA_TYPE_NULL
==
tDataTypes
[
i
].
type
)
{
continue
;
}
builder
=
builder
.
addColumn
(
"c"
+
std
::
to_string
(
i
+
1
),
tDataTypes
[
i
].
type
);
}
builder
.
done
();
}
{
ITableBuilder
&
builder
=
mcs
->
createTableBuilder
(
"test"
,
"st1"
,
TSDB_SUPER_TABLE
,
MockCatalogService
::
numOfDataTypes
,
2
)
.
setPrecision
(
TSDB_TIME_PRECISION_MILLI
).
setVgid
(
2
).
addColumn
(
"ts"
,
TSDB_DATA_TYPE_TIMESTAMP
);
for
(
int32_t
i
=
0
;
i
<
MockCatalogService
::
numOfDataTypes
;
++
i
)
{
if
(
TSDB_DATA_TYPE_NULL
==
tDataTypes
[
i
].
type
)
{
continue
;
}
builder
=
builder
.
addColumn
(
"c"
+
std
::
to_string
(
i
+
1
),
tDataTypes
[
i
].
type
);
}
builder
.
done
();
}
mcs
->
showTables
();
}
struct
SCatalog
*
getCatalogHandle
(
const
SEpSet
*
pMgmtEps
)
{
return
mockCatalogService
->
getCatalogHandle
(
pMgmtEps
);
}
int32_t
catalogGetMetaData
(
struct
SCatalog
*
pCatalog
,
const
SMetaReq
*
pMetaReq
,
SMetaData
*
pMetaData
)
{
return
mockCatalogService
->
catalogGetMetaData
(
pCatalog
,
pMetaReq
,
pMetaData
);
}
source/libs/parser/test/mockCatalog.h
0 → 100644
浏览文件 @
5614b970
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MOCK_CATALOG_H
#define MOCK_CATALOG_H
#include "mockCatalogService.h"
void
generateMetaData
(
MockCatalogService
*
mcs
);
// mock
struct
SCatalog
*
getCatalogHandle
(
const
SEpSet
*
pMgmtEps
);
int32_t
catalogGetMetaData
(
struct
SCatalog
*
pCatalog
,
const
SMetaReq
*
pMetaReq
,
SMetaData
*
pMetaData
);
#endif // MOCK_CATALOG_H
source/libs/parser/test/mockCatalogService.cpp
0 → 100644
浏览文件 @
5614b970
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mockCatalogService.h"
#include <iomanip>
#include <iostream>
#include <map>
#include "ttypes.h"
std
::
unique_ptr
<
MockCatalogService
>
mockCatalogService
;
class
TableBuilder
:
public
ITableBuilder
{
public:
virtual
TableBuilder
&
addColumn
(
const
std
::
string
&
name
,
int8_t
type
,
int32_t
bytes
)
{
assert
(
colIndex_
<
meta_
->
tableInfo
.
numOfTags
+
meta_
->
tableInfo
.
numOfColumns
);
SSchema
*
col
=
meta_
->
schema
+
colIndex_
;
col
->
type
=
type
;
col
->
colId
=
colIndex_
++
;
col
->
bytes
=
bytes
;
strcpy
(
col
->
name
,
name
.
c_str
());
return
*
this
;
}
virtual
TableBuilder
&
setVgid
(
int16_t
vgid
)
{
meta_
->
vgId
=
vgid
;
return
*
this
;
}
virtual
TableBuilder
&
setPrecision
(
uint8_t
precision
)
{
meta_
->
tableInfo
.
precision
=
precision
;
return
*
this
;
}
virtual
void
done
()
{
meta_
->
tableInfo
.
rowSize
=
rowsize_
;
}
private:
friend
class
MockCatalogServiceImpl
;
static
std
::
unique_ptr
<
TableBuilder
>
createTableBuilder
(
int8_t
tableType
,
int32_t
numOfColumns
,
int32_t
numOfTags
)
{
STableMeta
*
meta
=
(
STableMeta
*
)
std
::
calloc
(
1
,
sizeof
(
STableMeta
)
+
sizeof
(
SSchema
)
*
(
numOfColumns
+
numOfTags
));
if
(
nullptr
==
meta
)
{
throw
std
::
bad_alloc
();
}
meta
->
tableType
=
tableType
;
meta
->
tableInfo
.
numOfTags
=
numOfTags
;
meta
->
tableInfo
.
numOfColumns
=
numOfColumns
;
return
std
::
unique_ptr
<
TableBuilder
>
(
new
TableBuilder
(
meta
));
}
TableBuilder
(
STableMeta
*
meta
)
:
colIndex_
(
0
),
rowsize_
(
0
),
meta_
(
meta
)
{
}
STableMeta
*
table
()
{
return
meta_
;
}
int32_t
colIndex_
;
int32_t
rowsize_
;
STableMeta
*
meta_
;
};
class
MockCatalogServiceImpl
{
public:
static
const
int32_t
numOfDataTypes
=
sizeof
(
tDataTypes
)
/
sizeof
(
tDataTypes
[
0
]);
MockCatalogServiceImpl
()
{
}
struct
SCatalog
*
getCatalogHandle
(
const
SEpSet
*
pMgmtEps
)
{
return
(
struct
SCatalog
*
)
0x01
;
}
int32_t
catalogGetMetaData
(
struct
SCatalog
*
pCatalog
,
const
SMetaReq
*
pMetaReq
,
SMetaData
*
pMetaData
)
{
return
0
;
}
TableBuilder
&
createTableBuilder
(
const
std
::
string
&
db
,
const
std
::
string
&
tbname
,
int8_t
tableType
,
int32_t
numOfColumns
,
int32_t
numOfTags
)
{
builder_
=
TableBuilder
::
createTableBuilder
(
tableType
,
numOfColumns
,
numOfTags
);
meta_
[
db
][
tbname
].
reset
(
builder_
->
table
());
meta_
[
db
][
tbname
]
->
uid
=
id_
++
;
return
*
(
builder_
.
get
());
}
void
showTables
()
const
{
// number of forward fills
#define NOF(n) ((n) / 2)
// number of backward fills
#define NOB(n) ((n) % 2 ? (n) / 2 + 1 : (n) / 2)
// center aligned
#define CA(n, s) std::setw(NOF((n) - (s).length())) << "" << (s) << std::setw(NOB((n) - (s).length())) << "" << "|"
// string field length
#define SFL 20
// string field header
#define SH(h) CA(SFL, std::string(h))
// string field
#define SF(n) CA(SFL, n)
// integer field length
#define IFL 10
// integer field header
#define IH(i) CA(IFL, std::string(i))
// integer field
#define IF(i) CA(IFL, std::to_string(i))
// split line
#define SL(sn, in) std::setfill('=') << std::setw((sn) * (SFL + 1) + (in) * (IFL + 1)) << "" << std::setfill(' ')
for
(
const
auto
&
db
:
meta_
)
{
std
::
cout
<<
SH
(
"Database"
)
<<
SH
(
"Table"
)
<<
SH
(
"Type"
)
<<
SH
(
"Precision"
)
<<
IH
(
std
::
string
(
"Vgid"
))
<<
std
::
endl
;
std
::
cout
<<
SL
(
4
,
1
)
<<
std
::
endl
;
for
(
const
auto
&
table
:
db
.
second
)
{
std
::
cout
<<
SF
(
db
.
first
)
<<
SF
(
table
.
first
)
<<
SF
(
ttToString
(
table
.
second
->
tableType
))
<<
SF
(
pToString
(
table
.
second
->
tableInfo
.
precision
))
<<
IF
(
table
.
second
->
vgId
)
<<
std
::
endl
;
// int16_t numOfFields = table.second->tableInfo.numOfTags + table.second->tableInfo.numOfColumns;
// for (int16_t i = 0; i < numOfFields; ++i) {
// const SSchema* schema = table.second->schema + i;
// std::cout << schema->name << " " << schema->type << " " << schema->bytes << std::endl;
// }
}
}
}
private:
std
::
string
ttToString
(
int8_t
tableType
)
const
{
switch
(
tableType
)
{
case
TSDB_SUPER_TABLE
:
return
"super table"
;
case
TSDB_CHILD_TABLE
:
return
"child table"
;
case
TSDB_NORMAL_TABLE
:
return
"normal table"
;
default:
return
"unknown"
;
}
}
std
::
string
pToString
(
uint8_t
precision
)
const
{
switch
(
precision
)
{
case
TSDB_TIME_PRECISION_MILLI
:
return
"millisecond"
;
case
TSDB_TIME_PRECISION_MICRO
:
return
"microsecond"
;
case
TSDB_TIME_PRECISION_NANO
:
return
"nanosecond"
;
default:
return
"unknown"
;
}
}
uint64_t
id_
;
std
::
unique_ptr
<
TableBuilder
>
builder_
;
std
::
map
<
std
::
string
,
std
::
map
<
std
::
string
,
std
::
shared_ptr
<
STableMeta
>
>
>
meta_
;
};
MockCatalogService
::
MockCatalogService
()
:
impl_
(
new
MockCatalogServiceImpl
())
{
}
MockCatalogService
::~
MockCatalogService
()
{
}
struct
SCatalog
*
MockCatalogService
::
getCatalogHandle
(
const
SEpSet
*
pMgmtEps
)
{
return
impl_
->
getCatalogHandle
(
pMgmtEps
);
}
int32_t
MockCatalogService
::
catalogGetMetaData
(
struct
SCatalog
*
pCatalog
,
const
SMetaReq
*
pMetaReq
,
SMetaData
*
pMetaData
)
{
return
impl_
->
catalogGetMetaData
(
pCatalog
,
pMetaReq
,
pMetaData
);
}
ITableBuilder
&
MockCatalogService
::
createTableBuilder
(
const
std
::
string
&
db
,
const
std
::
string
&
tbname
,
int8_t
tableType
,
int32_t
numOfColumns
,
int32_t
numOfTags
)
{
return
impl_
->
createTableBuilder
(
db
,
tbname
,
tableType
,
numOfColumns
,
numOfTags
);
}
void
MockCatalogService
::
showTables
()
const
{
impl_
->
showTables
();
}
\ No newline at end of file
source/libs/parser/test/mockCatalogService.h
0 → 100644
浏览文件 @
5614b970
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef MOCK_CATALOG_SERVICE_H
#define MOCK_CATALOG_SERVICE_H
#include <memory>
#include <string>
#include "catalog.h"
class
ITableBuilder
{
public:
ITableBuilder
&
addTag
(
const
std
::
string
&
name
,
int8_t
type
)
{
return
addColumn
(
name
,
type
,
tDataTypes
[
type
].
bytes
);
}
ITableBuilder
&
addTag
(
const
std
::
string
&
name
,
int8_t
type
,
int32_t
bytes
)
{
return
addColumn
(
name
,
type
,
bytes
);
}
ITableBuilder
&
addColumn
(
const
std
::
string
&
name
,
int8_t
type
)
{
return
addColumn
(
name
,
type
,
tDataTypes
[
type
].
bytes
);
}
virtual
ITableBuilder
&
addColumn
(
const
std
::
string
&
name
,
int8_t
type
,
int32_t
bytes
)
=
0
;
virtual
ITableBuilder
&
setVgid
(
int16_t
vgid
)
=
0
;
virtual
ITableBuilder
&
setPrecision
(
uint8_t
precision
)
=
0
;
virtual
void
done
()
=
0
;
};
class
MockCatalogServiceImpl
;
class
MockCatalogService
{
public:
static
const
int32_t
numOfDataTypes
=
sizeof
(
tDataTypes
)
/
sizeof
(
tDataTypes
[
0
]);
MockCatalogService
();
~
MockCatalogService
();
struct
SCatalog
*
getCatalogHandle
(
const
SEpSet
*
pMgmtEps
);
int32_t
catalogGetMetaData
(
struct
SCatalog
*
pCatalog
,
const
SMetaReq
*
pMetaReq
,
SMetaData
*
pMetaData
);
ITableBuilder
&
createTableBuilder
(
const
std
::
string
&
db
,
const
std
::
string
&
tbname
,
int8_t
tableType
,
int32_t
numOfColumns
,
int32_t
numOfTags
=
0
);
void
showTables
()
const
;
private:
std
::
unique_ptr
<
MockCatalogServiceImpl
>
impl_
;
};
extern
std
::
unique_ptr
<
MockCatalogService
>
mockCatalogService
;
#endif // MOCK_CATALOG_SERVICE_H
source/libs/parser/test/parserMain.cpp
0 → 100644
浏览文件 @
5614b970
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string>
#include <gtest/gtest.h>
#include "mockCatalog.h"
class
ParserEnv
:
public
testing
::
Environment
{
public:
virtual
void
SetUp
()
{
mockCatalogService
.
reset
(
new
MockCatalogService
());
generateMetaData
(
mockCatalogService
.
get
());
}
virtual
void
TearDown
()
{
mockCatalogService
.
reset
();
}
ParserEnv
()
{}
virtual
~
ParserEnv
()
{}
};
int
main
(
int
argc
,
char
*
argv
[])
{
testing
::
AddGlobalTestEnvironment
(
new
ParserEnv
());
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
source/libs/parser/test/tokenizerTest.cpp
浏览文件 @
5614b970
...
...
@@ -79,11 +79,6 @@ static void _init_tvariant_nchar(SVariant* t) {
t
->
nLen
=
twcslen
(
t
->
wpz
);
}
int
main
(
int
argc
,
char
**
argv
)
{
testing
::
InitGoogleTest
(
&
argc
,
argv
);
return
RUN_ALL_TESTS
();
}
TEST
(
testCase
,
validateToken_test
)
{
char
t01
[]
=
"abc"
;
EXPECT_EQ
(
testValidateName
(
t01
),
TSDB_CODE_SUCCESS
);
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
5614b970
...
...
@@ -18,6 +18,7 @@
#include "wal.h"
#include "compare.h"
#include "tchecksum.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -32,6 +33,11 @@ typedef struct WalFileInfo {
int64_t
fileSize
;
}
WalFileInfo
;
typedef
struct
WalIdxEntry
{
int64_t
ver
;
int64_t
offset
;
}
WalIdxEntry
;
static
inline
int32_t
compareWalFileInfo
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
WalFileInfo
*
pInfoLeft
=
(
WalFileInfo
*
)
pLeft
;
WalFileInfo
*
pInfoRight
=
(
WalFileInfo
*
)
pRight
;
...
...
@@ -79,6 +85,26 @@ static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) {
return
sprintf
(
buf
,
"%s/%"
PRId64
"."
WAL_INDEX_SUFFIX
,
pWal
->
path
,
fileFirstVer
);
}
static
inline
int
walValidHeadCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
&
pHead
->
head
,
sizeof
(
SWalReadHead
),
pHead
->
cksumHead
);
}
static
inline
int
walValidBodyCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
->
head
.
cont
,
pHead
->
head
.
len
,
pHead
->
cksumBody
);
}
static
inline
int
walValidCksum
(
SWalHead
*
pHead
,
void
*
body
,
int64_t
bodyLen
)
{
return
walValidHeadCksum
(
pHead
)
&&
walValidBodyCksum
(
pHead
);
}
static
inline
uint32_t
walCalcHeadCksum
(
SWalHead
*
pHead
)
{
return
taosCalcChecksum
(
0
,
(
uint8_t
*
)
&
pHead
->
head
,
sizeof
(
SWalReadHead
));
}
static
inline
uint32_t
walCalcBodyCksum
(
const
void
*
body
,
uint32_t
len
)
{
return
taosCalcChecksum
(
0
,
(
uint8_t
*
)
body
,
len
);
}
int
walReadMeta
(
SWal
*
pWal
);
int
walWriteMeta
(
SWal
*
pWal
);
int
walRollFileInfo
(
SWal
*
pWal
);
...
...
@@ -87,6 +113,10 @@ char* walMetaSerialize(SWal* pWal);
int
walMetaDeserialize
(
SWal
*
pWal
,
const
char
*
bytes
);
//meta section end
//seek section
int
walChangeFile
(
SWal
*
pWal
,
int64_t
ver
);
//seek section end
int64_t
walGetSeq
();
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
);
int
walRoll
(
SWal
*
pWal
);
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
5614b970
...
...
@@ -24,6 +24,18 @@
#include <libgen.h>
#include <regex.h>
int64_t
walGetFirstVer
(
SWal
*
pWal
)
{
return
pWal
->
firstVersion
;
}
int64_t
walGetSnaphostVer
(
SWal
*
pWal
)
{
return
pWal
->
snapshotVersion
;
}
int64_t
walGetLastVer
(
SWal
*
pWal
)
{
return
pWal
->
lastVersion
;
}
int
walRollFileInfo
(
SWal
*
pWal
)
{
int64_t
ts
=
taosGetTimestampSec
();
...
...
source/libs/wal/src/walMgmt.c
浏览文件 @
5614b970
...
...
@@ -82,6 +82,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
pWal
->
writeLogTfd
=
-
1
;
pWal
->
writeIdxTfd
=
-
1
;
pWal
->
writeCur
=
-
1
;
//set config
pWal
->
vgId
=
pCfg
->
vgId
;
...
...
@@ -90,13 +91,20 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal
->
segSize
=
pCfg
->
segSize
;
pWal
->
level
=
pCfg
->
walLevel
;
//init status
//init version info
pWal
->
firstVersion
=
-
1
;
pWal
->
commitVersion
=
-
1
;
pWal
->
snapshotVersion
=
-
1
;
pWal
->
lastVersion
=
-
1
;
pWal
->
snapshottingVer
=
-
1
;
//init status
pWal
->
lastRollSeq
=
-
1
;
//init write buffer
memset
(
&
pWal
->
head
,
0
,
sizeof
(
SWalHead
));
pWal
->
head
.
sver
=
0
;
pWal
->
head
.
head
.
sver
=
0
;
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
));
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
);
...
...
source/libs/wal/src/walRead.c
浏览文件 @
5614b970
...
...
@@ -15,19 +15,6 @@
#include "walInt.h"
#include "tfile.h"
#include "tchecksum.h"
static
inline
int
walValidHeadCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)
-
sizeof
(
uint32_t
)
*
2
,
pHead
->
cksumHead
);
}
static
inline
int
walValidBodyCksum
(
SWalHead
*
pHead
)
{
return
taosCheckChecksum
((
uint8_t
*
)
pHead
->
cont
,
pHead
->
len
,
pHead
->
cksumBody
);
}
static
int
walValidCksum
(
SWalHead
*
pHead
,
void
*
body
,
int64_t
bodyLen
)
{
return
walValidHeadCksum
(
pHead
)
&&
walValidBodyCksum
(
pHead
);
}
int32_t
walRead
(
SWal
*
pWal
,
SWalHead
**
ppHead
,
int64_t
ver
)
{
int
code
;
...
...
@@ -49,13 +36,13 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
if
(
walValidHeadCksum
(
*
ppHead
)
!=
0
)
{
return
-
1
;
}
void
*
ptr
=
realloc
(
*
ppHead
,
sizeof
(
SWalHead
)
+
(
*
ppHead
)
->
len
);
void
*
ptr
=
realloc
(
*
ppHead
,
sizeof
(
SWalHead
)
+
(
*
ppHead
)
->
head
.
len
);
if
(
ptr
==
NULL
)
{
free
(
*
ppHead
);
*
ppHead
=
NULL
;
return
-
1
;
}
if
(
tfRead
(
pWal
->
writeLogTfd
,
(
*
ppHead
)
->
cont
,
(
*
ppHead
)
->
len
)
!=
(
*
ppHead
)
->
len
)
{
if
(
tfRead
(
pWal
->
writeLogTfd
,
(
*
ppHead
)
->
head
.
cont
,
(
*
ppHead
)
->
head
.
len
)
!=
(
*
ppHead
)
->
head
.
len
)
{
return
-
1
;
}
//TODO: endian compatibility processing after read
...
...
@@ -69,18 +56,3 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int32_t
walReadWithFp
(
SWal
*
pWal
,
FWalWrite
writeFp
,
int64_t
verStart
,
int32_t
readNum
)
{
return
0
;
}
int64_t
walGetFirstVer
(
SWal
*
pWal
)
{
if
(
pWal
==
NULL
)
return
0
;
return
pWal
->
firstVersion
;
}
int64_t
walGetSnaphostVer
(
SWal
*
pWal
)
{
if
(
pWal
==
NULL
)
return
0
;
return
pWal
->
snapshotVersion
;
}
int64_t
walGetLastVer
(
SWal
*
pWal
)
{
if
(
pWal
==
NULL
)
return
0
;
return
pWal
->
lastVersion
;
}
source/libs/wal/src/wal
Index
.c
→
source/libs/wal/src/wal
Seek
.c
浏览文件 @
5614b970
...
...
@@ -43,12 +43,35 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
if
(
code
!=
0
)
{
return
-
1
;
}
/*pWal->curLogOffset = readBuf[1];*/
pWal
->
curVersion
=
ver
;
return
code
;
}
static
int
walChangeFile
(
SWal
*
pWal
,
int64_t
ver
)
{
int
walChangeFileToLast
(
SWal
*
pWal
)
{
int64_t
idxTfd
,
logTfd
;
WalFileInfo
*
pRet
=
taosArrayGetLast
(
pWal
->
fileInfoSet
);
ASSERT
(
pRet
!=
NULL
);
int64_t
fileFirstVer
=
pRet
->
firstVer
;
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
idxTfd
<
0
)
{
return
-
1
;
}
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
return
-
1
;
}
//switch file
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
writeLogTfd
=
logTfd
;
//change status
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
;
return
0
;
}
int
walChangeFile
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
int64_t
idxTfd
,
logTfd
;
char
fnameStr
[
WAL_FILE_LEN
];
...
...
@@ -86,21 +109,21 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
return
code
;
}
int
walGetVerOffset
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
;
return
0
;
}
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
;
if
(
(
!
(
pWal
->
curStatus
&
WAL_CUR_FAILED
))
&&
ver
==
pWal
->
cur
Version
)
{
if
(
ver
==
pWal
->
last
Version
)
{
return
0
;
}
if
(
ver
>
pWal
->
lastVersion
)
{
//TODO: some records are skipped
return
-
1
;
}
if
(
ver
<
pWal
->
firstVersion
)
{
//TODO: try to seek pruned log
if
(
ver
>
pWal
->
lastVersion
||
ver
<
pWal
->
firstVersion
)
{
return
-
1
;
}
if
(
ver
<
pWal
->
snapshotVersion
)
{
//TODO: se
ek snapshotted log, invalid in some cases
//TODO: se
t flag to prevent roll back
}
if
(
ver
<
walGetCurFileFirstVer
(
pWal
)
||
(
ver
>
walGetCurFileLastVer
(
pWal
)))
{
code
=
walChangeFile
(
pWal
,
ver
);
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
5614b970
...
...
@@ -21,65 +21,6 @@
#include "tfile.h"
#include "walInt.h"
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
ver
);
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
snapshotVersion
<=
pWal
->
commitVersion
);
ASSERT
(
pWal
->
commitVersion
<=
pWal
->
lastVersion
);
ASSERT
(
ver
>=
pWal
->
commitVersion
);
ASSERT
(
ver
<=
pWal
->
lastVersion
);
pWal
->
commitVersion
=
ver
;
return
0
;
}
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
//TODO: ftruncate
ASSERT
(
ver
>
pWal
->
commitVersion
);
ASSERT
(
ver
<=
pWal
->
lastVersion
);
//seek position
walSeekVer
(
pWal
,
ver
);
walFtruncate
(
pWal
,
ver
);
return
0
;
}
int32_t
walTakeSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
pWal
->
snapshotVersion
=
ver
;
int
ts
=
taosGetTimestampSec
();
int
deleteCnt
=
0
;
int64_t
newTotSize
=
pWal
->
totSize
;
WalFileInfo
tmp
;
tmp
.
firstVer
=
ver
;
//mark files safe to delete
WalFileInfo
*
pInfo
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmp
,
compareWalFileInfo
,
TD_LE
);
//iterate files, until the searched result
for
(
WalFileInfo
*
iter
=
pWal
->
fileInfoSet
->
pData
;
iter
<
pInfo
;
iter
++
)
{
if
(
pWal
->
totSize
>
pWal
->
retentionSize
||
iter
->
closeTs
+
pWal
->
retentionPeriod
>
ts
)
{
//delete according to file size or close time
deleteCnt
++
;
newTotSize
-=
iter
->
fileSize
;
}
}
char
fnameStr
[
WAL_FILE_LEN
];
//remove file
for
(
int
i
=
0
;
i
<
deleteCnt
;
i
++
)
{
WalFileInfo
*
pInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
i
);
walBuildLogName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
}
//save snapshot ver, commit ver
//make new array, remove files
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
deleteCnt
);
pWal
->
totSize
=
newTotSize
;
return
0
;
}
#if 0
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name, int64_t fileId);
...
...
@@ -172,6 +113,185 @@ void walRemoveAllOldFiles(void *handle) {
}
#endif
int32_t
walCommit
(
SWal
*
pWal
,
int64_t
ver
)
{
ASSERT
(
pWal
->
commitVersion
>=
pWal
->
snapshotVersion
);
ASSERT
(
pWal
->
commitVersion
<=
pWal
->
lastVersion
);
if
(
ver
<
pWal
->
commitVersion
||
ver
>
pWal
->
lastVersion
)
{
return
-
1
;
}
pWal
->
commitVersion
=
ver
;
return
0
;
}
int32_t
walRollback
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
;
char
fnameStr
[
WAL_FILE_LEN
];
if
(
ver
==
pWal
->
lastVersion
)
{
return
0
;
}
if
(
ver
>
pWal
->
lastVersion
||
ver
<
pWal
->
commitVersion
)
{
return
-
1
;
}
pthread_mutex_lock
(
&
pWal
->
mutex
);
//find correct file
if
(
ver
<
walGetLastFileFirstVer
(
pWal
))
{
//close current files
tfClose
(
pWal
->
writeIdxTfd
);
tfClose
(
pWal
->
writeLogTfd
);
//open old files
code
=
walChangeFile
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
//delete files
int
fileSetSize
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
for
(
int
i
=
pWal
->
writeCur
;
i
<
fileSetSize
;
i
++
)
{
walBuildLogName
(
pWal
,
((
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
i
))
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
walBuildIdxName
(
pWal
,
((
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
i
))
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
}
//pop from fileInfoSet
taosArraySetSize
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
+
1
);
}
walBuildIdxName
(
pWal
,
walGetCurFileFirstVer
(
pWal
),
fnameStr
);
int64_t
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
//change to deserialize function
if
(
idxTfd
<
0
)
{
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
int
idxOff
=
(
ver
-
walGetCurFileFirstVer
(
pWal
))
*
WAL_IDX_ENTRY_SIZE
;
code
=
tfLseek
(
idxTfd
,
idxOff
,
SEEK_SET
);
if
(
code
<
0
)
{
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
//read idx file and get log file pos
//TODO:change to deserialize function
WalIdxEntry
entry
;
if
(
tfRead
(
idxTfd
,
&
entry
,
sizeof
(
WalIdxEntry
))
!=
sizeof
(
WalIdxEntry
))
{
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
ASSERT
(
entry
.
ver
==
ver
);
walBuildLogName
(
pWal
,
walGetCurFileFirstVer
(
pWal
),
fnameStr
);
int64_t
logTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
//TODO
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
code
=
tfLseek
(
logTfd
,
entry
.
offset
,
SEEK_SET
);
if
(
code
<
0
)
{
//TODO
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
-
1
;
}
//validate offset
SWalHead
head
;
ASSERT
(
tfValid
(
logTfd
));
int
size
=
tfRead
(
logTfd
,
&
head
,
sizeof
(
SWalHead
));
if
(
size
!=
sizeof
(
SWalHead
))
{
return
-
1
;
}
code
=
walValidHeadCksum
(
&
head
);
ASSERT
(
code
==
0
);
if
(
code
!=
0
)
{
return
-
1
;
}
if
(
head
.
head
.
version
!=
ver
)
{
//TODO
return
-
1
;
}
//truncate old files
code
=
tfFtruncate
(
logTfd
,
entry
.
offset
);
if
(
code
<
0
)
{
return
-
1
;
}
code
=
tfFtruncate
(
idxTfd
,
idxOff
);
if
(
code
<
0
)
{
return
-
1
;
}
pWal
->
lastVersion
=
ver
-
1
;
((
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
))
->
lastVer
=
ver
-
1
;
((
WalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
))
->
fileSize
=
entry
.
offset
;
//unlock
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
0
;
}
int32_t
walBeginTakeSnapshot
(
SWal
*
pWal
,
int64_t
ver
)
{
pWal
->
snapshottingVer
=
ver
;
//check file rolling
if
(
pWal
->
retentionPeriod
==
0
)
{
walRoll
(
pWal
);
}
return
0
;
}
int32_t
walEndTakeSnapshot
(
SWal
*
pWal
)
{
int64_t
ver
=
pWal
->
snapshottingVer
;
if
(
ver
==
-
1
)
return
-
1
;
pWal
->
snapshotVersion
=
ver
;
int
ts
=
taosGetTimestampSec
();
int
deleteCnt
=
0
;
int64_t
newTotSize
=
pWal
->
totSize
;
WalFileInfo
tmp
;
tmp
.
firstVer
=
ver
;
//find files safe to delete
WalFileInfo
*
pInfo
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmp
,
compareWalFileInfo
,
TD_LE
);
if
(
ver
>=
pInfo
->
lastVer
)
{
pInfo
++
;
}
//iterate files, until the searched result
for
(
WalFileInfo
*
iter
=
pWal
->
fileInfoSet
->
pData
;
iter
<
pInfo
;
iter
++
)
{
if
(
pWal
->
totSize
>
pWal
->
retentionSize
||
iter
->
closeTs
+
pWal
->
retentionPeriod
>
ts
)
{
//delete according to file size or close time
deleteCnt
++
;
newTotSize
-=
iter
->
fileSize
;
}
}
char
fnameStr
[
WAL_FILE_LEN
];
//remove file
for
(
int
i
=
0
;
i
<
deleteCnt
;
i
++
)
{
WalFileInfo
*
pInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
i
);
walBuildLogName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
walBuildIdxName
(
pWal
,
pInfo
->
firstVer
,
fnameStr
);
remove
(
fnameStr
);
}
//make new array, remove files
taosArrayPopFrontBatch
(
pWal
->
fileInfoSet
,
deleteCnt
);
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
pWal
->
firstVersion
=
-
1
;
}
else
{
pWal
->
firstVersion
=
((
WalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
0
))
->
firstVer
;
}
pWal
->
totSize
=
newTotSize
;
pWal
->
snapshottingVer
=
-
1
;
//save snapshot ver, commit ver
int
code
=
walWriteMeta
(
pWal
);
if
(
code
!=
0
)
{
return
-
1
;
}
return
0
;
}
int
walRoll
(
SWal
*
pWal
)
{
int
code
=
0
;
if
(
pWal
->
writeIdxTfd
!=
-
1
)
{
...
...
@@ -211,6 +331,7 @@ int walRoll(SWal *pWal) {
//switch file
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
writeLogTfd
=
logTfd
;
pWal
->
writeCur
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
-
1
;
//change status
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
&
WAL_CUR_POS_WRITABLE
;
...
...
@@ -218,32 +339,6 @@ int walRoll(SWal *pWal) {
return
0
;
}
int
walChangeFileToLast
(
SWal
*
pWal
)
{
int64_t
idxTfd
,
logTfd
;
WalFileInfo
*
pRet
=
taosArrayGetLast
(
pWal
->
fileInfoSet
);
ASSERT
(
pRet
!=
NULL
);
int64_t
fileFirstVer
=
pRet
->
firstVer
;
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
idxTfd
<
0
)
{
return
-
1
;
}
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
if
(
logTfd
<
0
)
{
return
-
1
;
}
//switch file
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
writeLogTfd
=
logTfd
;
//change status
pWal
->
curVersion
=
fileFirstVer
;
pWal
->
curStatus
=
WAL_CUR_FILE_WRITABLE
;
return
0
;
}
static
int
walWriteIndex
(
SWal
*
pWal
,
int64_t
ver
,
int64_t
offset
)
{
int
code
=
0
;
//get index file
...
...
@@ -253,9 +348,11 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
wError
(
"vgId:%d, file:%"
PRId64
".idx, failed to open since %s"
,
pWal
->
vgId
,
walGetLastFileFirstVer
(
pWal
),
strerror
(
errno
));
return
code
;
}
int64_t
writeBuf
[
2
]
=
{
ver
,
offset
};
int
size
=
tfWrite
(
pWal
->
writeIdxTfd
,
writeBuf
,
sizeof
(
writeBuf
));
if
(
size
!=
sizeof
(
writeBuf
))
{
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
walGetCurFileFirstVer
(
pWal
),
fnameStr
);
WalIdxEntry
entry
=
{
.
ver
=
ver
,
.
offset
=
offset
};
int
size
=
tfWrite
(
pWal
->
writeIdxTfd
,
&
entry
,
sizeof
(
WalIdxEntry
));
if
(
size
!=
sizeof
(
WalIdxEntry
))
{
return
-
1
;
}
return
0
;
...
...
@@ -270,13 +367,14 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
if
(
index
==
pWal
->
lastVersion
+
1
)
{
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
{
pWal
->
firstVersion
=
index
;
code
=
walRoll
(
pWal
);
ASSERT
(
code
==
0
);
}
else
{
int64_t
passed
=
walGetSeq
()
-
pWal
->
lastRollSeq
;
if
(
pWal
->
rollPeriod
!=
-
1
&&
passed
>
pWal
->
rollPeriod
)
{
if
(
pWal
->
rollPeriod
!=
-
1
&&
p
Wal
->
rollPeriod
!=
0
&&
p
assed
>
pWal
->
rollPeriod
)
{
walRoll
(
pWal
);
}
else
if
(
pWal
->
segSize
!=
-
1
&&
walGetLastFileSize
(
pWal
)
>
pWal
->
segSize
)
{
}
else
if
(
pWal
->
segSize
!=
-
1
&&
pWal
->
segSize
!=
0
&&
walGetLastFileSize
(
pWal
)
>
pWal
->
segSize
)
{
walRoll
(
pWal
);
}
}
...
...
@@ -287,16 +385,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
}
/*if (!tfValid(pWal->curLogTfd)) return 0;*/
pWal
->
head
.
version
=
index
;
pWal
->
head
.
signature
=
WAL_SIGNATURE
;
pWal
->
head
.
len
=
bodyLen
;
pWal
->
head
.
msgType
=
msgType
;
pWal
->
head
.
cksumHead
=
taosCalcChecksum
(
0
,
(
const
uint8_t
*
)
&
pWal
->
head
,
sizeof
(
SWalHead
)
-
sizeof
(
uint32_t
)
*
2
);
pWal
->
head
.
cksumBody
=
taosCalcChecksum
(
0
,
(
const
uint8_t
*
)
&
body
,
bodyLen
);
pthread_mutex_lock
(
&
pWal
->
mutex
);
pWal
->
head
.
head
.
version
=
index
;
pWal
->
head
.
head
.
len
=
bodyLen
;
pWal
->
head
.
head
.
msgType
=
msgType
;
pWal
->
head
.
cksumHead
=
walCalcHeadCksum
(
&
pWal
->
head
);
pWal
->
head
.
cksumBody
=
walCalcBodyCksum
(
body
,
bodyLen
);
if
(
tfWrite
(
pWal
->
writeLogTfd
,
&
pWal
->
head
,
sizeof
(
SWalHead
))
!=
sizeof
(
SWalHead
))
{
//ftruncate
...
...
@@ -312,6 +407,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
code
=
walWriteIndex
(
pWal
,
index
,
walGetCurFileOffset
(
pWal
));
if
(
code
!=
0
)
{
//TODO
return
-
1
;
}
//set status
...
...
@@ -326,8 +422,6 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
}
void
walFsync
(
SWal
*
pWal
,
bool
forceFsync
)
{
if
(
pWal
==
NULL
||
!
tfValid
(
pWal
->
writeLogTfd
))
return
;
if
(
forceFsync
||
(
pWal
->
level
==
TAOS_WAL_FSYNC
&&
pWal
->
fsyncPeriod
==
0
))
{
wTrace
(
"vgId:%d, fileId:%"
PRId64
".log, do fsync"
,
pWal
->
vgId
,
walGetCurFileFirstVer
(
pWal
));
if
(
tfFsync
(
pWal
->
writeLogTfd
)
<
0
)
{
...
...
@@ -408,7 +502,7 @@ static int walValidateOffset(SWal* pWal, int64_t ver) {
int
code
=
0
;
SWalHead
*
pHead
=
NULL
;
code
=
(
int
)
walRead
(
pWal
,
&
pHead
,
ver
);
if
(
pHead
->
version
!=
ver
)
{
if
(
pHead
->
head
.
version
!=
ver
)
{
return
-
1
;
}
return
0
;
...
...
@@ -428,15 +522,6 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) {
return
0
;
}
static
void
walFtruncate
(
SWal
*
pWal
,
int64_t
ver
)
{
int64_t
tfd
=
pWal
->
writeLogTfd
;
tfFtruncate
(
tfd
,
ver
);
tfFsync
(
tfd
);
tfd
=
pWal
->
writeIdxTfd
;
tfFtruncate
(
tfd
,
ver
*
WAL_IDX_ENTRY_SIZE
);
tfFsync
(
tfd
);
}
#if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
5614b970
...
...
@@ -36,6 +36,36 @@ class WalCleanEnv : public ::testing::Test {
const
char
*
pathName
=
"/tmp/wal_test"
;
};
class
WalCleanDeleteEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
int
code
=
walInit
();
ASSERT
(
code
==
0
);
}
static
void
TearDownTestCase
()
{
walCleanUp
();
}
void
SetUp
()
override
{
taosRemoveDir
(
pathName
);
SWalCfg
*
pCfg
=
(
SWalCfg
*
)
malloc
(
sizeof
(
SWal
));
memset
(
pCfg
,
0
,
sizeof
(
SWalCfg
));
pCfg
->
retentionPeriod
=
0
;
pCfg
->
walLevel
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
pCfg
);
ASSERT
(
pWal
!=
NULL
);
}
void
TearDown
()
override
{
walClose
(
pWal
);
pWal
=
NULL
;
}
SWal
*
pWal
=
NULL
;
const
char
*
pathName
=
"/tmp/wal_test"
;
};
class
WalKeepEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
...
...
@@ -110,40 +140,94 @@ TEST_F(WalCleanEnv, removeOldMeta) {
ASSERT
(
code
==
0
);
}
TEST_F
(
WalKeepEnv
,
readOldMeta
)
{
int
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walRollFileInfo
(
pWal
);
ASSERT
(
code
==
0
);
code
=
walWriteMeta
(
pWal
);
ASSERT
(
code
==
0
);
char
*
oldss
=
walMetaSerialize
(
pWal
);
TearDown
();
SetUp
();
code
=
walReadMeta
(
pWal
);
ASSERT
(
code
==
0
);
char
*
newss
=
walMetaSerialize
(
pWal
);
int
len
=
strlen
(
oldss
);
ASSERT_EQ
(
len
,
strlen
(
newss
));
for
(
int
i
=
0
;
i
<
len
;
i
++
)
{
EXPECT_EQ
(
oldss
[
i
],
newss
[
i
]);
//TEST_F(WalKeepEnv, readOldMeta) {
//int code = walRollFileInfo(pWal);
//ASSERT(code == 0);
//code = walWriteMeta(pWal);
//ASSERT(code == 0);
//code = walRollFileInfo(pWal);
//ASSERT(code == 0);
//code = walWriteMeta(pWal);
//ASSERT(code == 0);
//char*oldss = walMetaSerialize(pWal);
//TearDown();
//SetUp();
//code = walReadMeta(pWal);
//ASSERT(code == 0);
//char* newss = walMetaSerialize(pWal);
//int len = strlen(oldss);
//ASSERT_EQ(len, strlen(newss));
//for(int i = 0; i < len; i++) {
//EXPECT_EQ(oldss[i], newss[i]);
//}
//}
TEST_F
(
WalCleanEnv
,
write
)
{
const
char
*
ranStr
=
"tvapq02tcp"
;
const
int
len
=
strlen
(
ranStr
);
int
code
;
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
code
=
walWrite
(
pWal
,
i
,
i
+
1
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
lastVersion
,
i
);
code
=
walWrite
(
pWal
,
i
+
2
,
i
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
-
1
);
ASSERT_EQ
(
pWal
->
lastVersion
,
i
);
}
code
=
walWriteMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
TEST_F
(
Wal
KeepEnv
,
write
)
{
TEST_F
(
Wal
CleanEnv
,
rollback
)
{
const
char
*
ranStr
=
"tvapq02tcp"
;
const
int
len
=
strlen
(
ranStr
);
int
code
;
for
(
int
i
=
0
;
i
<
10
;
i
++
)
{
code
=
walWrite
(
pWal
,
i
,
i
+
1
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
0
);
code
=
walWrite
(
pWal
,
i
+
2
,
i
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
-
1
);
ASSERT_EQ
(
pWal
->
lastVersion
,
i
);
}
code
=
walRollback
(
pWal
,
5
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
lastVersion
,
4
);
code
=
walRollback
(
pWal
,
3
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
lastVersion
,
2
);
code
=
walWriteMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
TEST_F
(
WalCleanDeleteEnv
,
roll
)
{
const
char
*
ranStr
=
"tvapq02tcp"
;
const
int
len
=
strlen
(
ranStr
);
int
code
;
int
i
;
for
(
i
=
0
;
i
<
100
;
i
++
)
{
code
=
walWrite
(
pWal
,
i
,
0
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
0
);
ASSERT_EQ
(
pWal
->
lastVersion
,
i
);
code
=
walCommit
(
pWal
,
i
);
ASSERT_EQ
(
pWal
->
commitVersion
,
i
);
}
walBeginTakeSnapshot
(
pWal
,
i
-
1
);
ASSERT_EQ
(
pWal
->
snapshottingVer
,
i
-
1
);
walEndTakeSnapshot
(
pWal
);
ASSERT_EQ
(
pWal
->
snapshotVersion
,
i
-
1
);
ASSERT_EQ
(
pWal
->
snapshottingVer
,
-
1
);
code
=
walWrite
(
pWal
,
5
,
0
,
(
void
*
)
ranStr
,
len
);
ASSERT_NE
(
code
,
0
);
for
(;
i
<
200
;
i
++
)
{
code
=
walWrite
(
pWal
,
i
,
0
,
(
void
*
)
ranStr
,
len
);
ASSERT_EQ
(
code
,
0
);
code
=
walCommit
(
pWal
,
i
);
ASSERT_EQ
(
pWal
->
commitVersion
,
i
);
}
code
=
walWriteMeta
(
pWal
);
ASSERT_EQ
(
code
,
0
);
}
source/util/src/tarray.c
浏览文件 @
5614b970
...
...
@@ -241,12 +241,16 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
assert
(
cnt
<=
pArray
->
size
);
pArray
->
size
=
pArray
->
size
-
cnt
;
if
(
pArray
->
size
==
0
)
{
pArray
->
size
=
0
;
return
;
}
memmove
(
pArray
->
pData
,
(
char
*
)
pArray
->
pData
+
cnt
*
pArray
->
elemSize
,
pArray
->
size
);
}
void
taosArrayPopTailBatch
(
SArray
*
pArray
,
size_t
cnt
)
{
assert
(
cnt
<=
pArray
->
size
);
pArray
->
size
=
pArray
->
size
-
cnt
;
}
void
taosArrayRemove
(
SArray
*
pArray
,
size_t
index
)
{
assert
(
index
<
pArray
->
size
);
...
...
@@ -329,6 +333,11 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
return
taosbsearch
(
key
,
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
,
flags
);
}
int32_t
taosArraySearchIdx
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int
flags
)
{
void
*
item
=
taosArraySearch
(
pArray
,
key
,
comparFn
,
flags
);
return
(
int32_t
)((
char
*
)
item
-
(
char
*
)
pArray
->
pData
)
/
pArray
->
elemSize
;
}
void
taosArraySortString
(
SArray
*
pArray
,
__compar_fn_t
comparFn
)
{
assert
(
pArray
!=
NULL
);
qsort
(
pArray
->
pData
,
pArray
->
size
,
pArray
->
elemSize
,
comparFn
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录