Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fa46811a
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
fa46811a
编写于
1月 11, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/qnode
上级
e105d952
c0dcee34
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
623 addition
and
167 deletion
+623
-167
include/libs/wal/wal.h
include/libs/wal/wal.h
+2
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+1
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+4
-1
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+4
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+1
-1
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+6
-8
source/dnode/mnode/impl/test/db/db.cpp
source/dnode/mnode/impl/test/db/db.cpp
+4
-15
source/dnode/vnode/meta/src/metaIdx.c
source/dnode/vnode/meta/src/metaIdx.c
+3
-7
source/libs/index/src/index.c
source/libs/index/src/index.c
+41
-37
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+18
-4
source/libs/index/src/index_fst.c
source/libs/index/src/index_fst.c
+1
-0
source/libs/index/src/index_fst_counting_writer.c
source/libs/index/src/index_fst_counting_writer.c
+10
-4
source/libs/index/src/index_tfile.c
source/libs/index/src/index_tfile.c
+13
-3
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+18
-5
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+177
-25
source/libs/wal/src/walMgmt.c
source/libs/wal/src/walMgmt.c
+5
-1
source/libs/wal/src/walSeek.c
source/libs/wal/src/walSeek.c
+44
-32
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+12
-13
source/libs/wal/test/walMetaTest.cpp
source/libs/wal/test/walMetaTest.cpp
+120
-0
source/util/src/tarray.c
source/util/src/tarray.c
+1
-1
source/util/test/arrayTest.cpp
source/util/test/arrayTest.cpp
+32
-0
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+5
-4
tests/script/sh/deploy.sh
tests/script/sh/deploy.sh
+1
-1
tests/script/sim/db/basic1.sim
tests/script/sim/db/basic1.sim
+1
-2
tests/script/sim/db/basic6.sim
tests/script/sim/db/basic6.sim
+0
-0
tests/script/sim/db/error1.sim
tests/script/sim/db/error1.sim
+99
-0
tests/script/sim/dnode/basic1.sim
tests/script/sim/dnode/basic1.sim
+0
-0
tests/script/sim/table/basic1.sim
tests/script/sim/table/basic1.sim
+0
-0
tests/script/sim/user/basic1.sim
tests/script/sim/user/basic1.sim
+0
-0
未找到文件。
include/libs/wal/wal.h
浏览文件 @
fa46811a
...
...
@@ -71,6 +71,7 @@ extern int32_t wDebugFlag;
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1
...
...
@@ -98,6 +99,7 @@ typedef struct {
}
SWalCfg
;
typedef
struct
{
uint64_t
magic
;
uint32_t
cksumHead
;
uint32_t
cksumBody
;
SWalReadHead
head
;
...
...
source/common/src/tglobal.c
浏览文件 @
fa46811a
...
...
@@ -37,7 +37,7 @@ uint16_t tsServerPort = 6030;
int32_t
tsStatusInterval
=
1
;
// second
int8_t
tsEnableTelemetryReporting
=
0
;
char
tsEmail
[
TSDB_FQDN_LEN
]
=
{
0
};
int32_t
tsNumOfSupportVnodes
=
1
6
;
int32_t
tsNumOfSupportVnodes
=
1
28
;
// common
int32_t
tsRpcTimer
=
300
;
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
fa46811a
...
...
@@ -463,7 +463,7 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
pCreate
->
commitTime
=
htonl
(
pCreate
->
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pCreate
->
fsyncPeriod
);
mDebug
(
"db:%s, start to create
"
,
pCreate
->
db
);
mDebug
(
"db:%s, start to create
, vgroups:%d"
,
pCreate
->
db
,
pCreate
->
numOfVgroups
);
SDbObj
*
pDb
=
mndAcquireDb
(
pMnode
,
pCreate
->
db
);
if
(
pDb
!=
NULL
)
{
...
...
@@ -476,6 +476,9 @@ static int32_t mndProcessCreateDbReq(SMnodeMsg *pReq) {
mError
(
"db:%s, failed to create since %s"
,
pCreate
->
db
,
terrstr
());
return
-
1
;
}
}
else
if
(
terrno
!=
TSDB_CODE_MND_DB_NOT_EXIST
)
{
mError
(
"db:%s, failed to create since %s"
,
pCreate
->
db
,
terrstr
());
return
-
1
;
}
SUserObj
*
pOperUser
=
mndAcquireUser
(
pMnode
,
pReq
->
user
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
fa46811a
...
...
@@ -204,7 +204,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOldStb, SStbObj *pNewStb
SStbObj
*
mndAcquireStb
(
SMnode
*
pMnode
,
char
*
stbName
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SStbObj
*
pStb
=
sdbAcquire
(
pSdb
,
SDB_STB
,
stbName
);
if
(
pStb
==
NULL
)
{
if
(
pStb
==
NULL
&&
terrno
==
TSDB_CODE_SDB_OBJ_NOT_THERE
)
{
terrno
=
TSDB_CODE_MND_STB_NOT_EXIST
;
}
return
pStb
;
...
...
@@ -513,9 +513,11 @@ static int32_t mndProcesSMCreateStbReq(SMnodeMsg *pMsg) {
return
0
;
}
else
{
terrno
=
TSDB_CODE_MND_STB_ALREADY_EXIST
;
mError
(
"
d
b:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
mError
(
"
st
b:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
}
}
else
if
(
terrno
!=
TSDB_CODE_MND_STB_NOT_EXIST
)
{
mError
(
"stb:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
}
// topic should have different name with stb
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
fa46811a
...
...
@@ -809,7 +809,7 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
mDebug
(
"trans:%d, stage from undoLog to rollback"
,
pTrans
->
id
);
continueExec
=
true
;
}
else
{
m
Debug
(
"trans:%d, stage keep on undoLog since %s"
,
pTrans
->
id
,
terrstr
());
m
Error
(
"trans:%d, stage keep on undoLog since %s"
,
pTrans
->
id
,
terrstr
());
continueExec
=
false
;
}
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
fa46811a
...
...
@@ -273,15 +273,10 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
SDnodeObj
*
pDnode
=
pObj
;
SArray
*
pArray
=
p1
;
pDnode
->
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
int64_t
curMs
=
taosGetTimestampMs
();
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pDnode
,
curMs
);
if
(
online
&&
pDnode
->
numOfSupportVnodes
>
0
)
{
taosArrayPush
(
pArray
,
pDnode
);
}
bool
isMnode
=
mndIsMnode
(
pMnode
,
pDnode
->
id
);
bool
isMnode
=
mndIsMnode
(
pMnode
,
pDnode
->
id
);
pDnode
->
numOfVnodes
=
mndGetVnodesNum
(
pMnode
,
pDnode
->
id
);
mDebug
(
"dnode:%d, vnodes:%d supportVnodes:%d isMnode:%d online:%d"
,
pDnode
->
id
,
pDnode
->
numOfVnodes
,
pDnode
->
numOfSupportVnodes
,
isMnode
,
online
);
...
...
@@ -290,6 +285,9 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
pDnode
->
numOfVnodes
++
;
}
if
(
online
&&
pDnode
->
numOfSupportVnodes
>
0
)
{
taosArrayPush
(
pArray
,
pDnode
);
}
return
true
;
}
...
...
@@ -311,7 +309,7 @@ static SArray *mndBuildDnodesArray(SMnode *pMnode) {
static
int32_t
mndCompareDnodeVnodes
(
SDnodeObj
*
pDnode1
,
SDnodeObj
*
pDnode2
)
{
float
d1Score
=
(
float
)
pDnode1
->
numOfVnodes
/
pDnode1
->
numOfSupportVnodes
;
float
d2Score
=
(
float
)
pDnode2
->
numOfVnodes
/
pDnode2
->
numOfSupportVnodes
;
return
d1Score
>
d2Score
?
1
:
0
;
return
d1Score
>
=
d2Score
?
1
:
0
;
}
static
int32_t
mndGetAvailableDnode
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SArray
*
pArray
)
{
...
...
source/dnode/mnode/impl/test/db/db.cpp
浏览文件 @
fa46811a
...
...
@@ -13,28 +13,17 @@
class
MndTestDb
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestSuite
()
{
test
.
Init
(
"/tmp/mnode_test_db"
,
9030
);
const
char
*
fqdn
=
"localhost"
;
const
char
*
firstEp
=
"localhost:9030"
;
static
void
SetUpTestSuite
()
{
test
.
Init
(
"/tmp/mnode_test_db"
,
9030
);
}
static
void
TearDownTestSuite
()
{
test
.
Cleanup
();
}
server2
.
Start
(
"/tmp/mnode_test_db2"
,
fqdn
,
9031
,
firstEp
);
}
static
void
TearDownTestSuite
()
{
server2
.
Stop
();
test
.
Cleanup
();
}
static
Testbase
test
;
static
TestServer
server2
;
static
Testbase
test
;
public:
void
SetUp
()
override
{}
void
TearDown
()
override
{}
};
Testbase
MndTestDb
::
test
;
TestServer
MndTestDb
::
server2
;
Testbase
MndTestDb
::
test
;
TEST_F
(
MndTestDb
,
01
_ShowDb
)
{
test
.
SendShowMetaReq
(
TSDB_MGMT_TABLE_DB
,
""
);
...
...
source/dnode/vnode/meta/src/metaIdx.c
浏览文件 @
fa46811a
...
...
@@ -49,9 +49,7 @@ int metaOpenIdx(SMeta *pMeta) {
#ifdef USE_INVERTED_INDEX
SIndexOpts
opts
;
if
(
indexOpen
(
&
opts
,
pMeta
->
path
,
&
pMeta
->
pIdx
->
pIdx
)
!=
0
)
{
return
-
1
;
}
if
(
indexOpen
(
&
opts
,
pMeta
->
path
,
&
pMeta
->
pIdx
->
pIdx
)
!=
0
)
{
return
-
1
;
}
#endif
return
0
;
...
...
@@ -67,16 +65,14 @@ void metaCloseIdx(SMeta *pMeta) { /* TODO */
#ifdef USE_INVERTED_INDEX
SIndexOpts
opts
;
if
(
indexClose
(
pMeta
->
pIdx
->
pIdx
)
!=
0
)
{
return
-
1
;
}
if
(
indexClose
(
pMeta
->
pIdx
->
pIdx
)
!=
0
)
{
return
-
1
;
}
#endif
}
int
metaSaveTableToIdx
(
SMeta
*
pMeta
,
const
STbCfg
*
pTbCfg
)
{
#ifdef USE_INVERTED_INDEX
if
(
pTbCfgs
-
type
==
META_CHILD_TABLE
)
{
if
(
pTbCfgs
->
type
==
META_CHILD_TABLE
)
{
char
buf
[
8
]
=
{
0
};
int16_t
colId
=
(
kvRowColIdx
(
pTbCfg
->
ctbCfg
.
pTag
))[
0
].
colId
;
sprintf
(
buf
,
"%d"
,
colId
);
// colname
...
...
source/libs/index/src/index.c
浏览文件 @
fa46811a
...
...
@@ -59,6 +59,10 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static
int
indexGenTFile
(
SIndex
*
index
,
IndexCache
*
cache
,
SArray
*
batch
);
// merge cache and tfile by opera type
static
void
indexMergeCacheAndTFile
(
SArray
*
result
,
IterateValue
*
icache
,
IterateValue
*
iTfv
);
static
void
indexMergeSameKey
(
SArray
*
result
,
TFileValue
*
tv
);
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
...
...
@@ -385,6 +389,27 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
taosArrayPush
(
result
,
&
tv
);
}
}
static
void
indexMergeCacheAndTFile
(
SArray
*
result
,
IterateValue
*
cv
,
IterateValue
*
tv
)
{
// opt
char
*
colVal
=
(
cv
!=
NULL
)
?
cv
->
colVal
:
tv
->
colVal
;
// design merge-algorithm later, too complicated to handle all kind of situation
TFileValue
*
tfv
=
tfileValueCreate
(
colVal
);
if
(
cv
!=
NULL
)
{
if
(
cv
->
type
==
ADD_VALUE
)
{
taosArrayAddAll
(
tfv
->
tableId
,
cv
->
val
);
}
else
if
(
cv
->
type
==
DEL_VALUE
)
{
}
else
if
(
cv
->
type
==
UPDATE_VALUE
)
{
}
else
{
// do nothing
}
}
if
(
tv
!=
NULL
)
{
// opt later
taosArrayAddAll
(
tfv
->
tableId
,
tv
->
val
);
}
indexMergeSameKey
(
result
,
tfv
);
}
static
void
indexDestroyTempResult
(
SArray
*
result
)
{
int32_t
sz
=
result
?
taosArrayGetSize
(
result
)
:
0
;
for
(
size_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -411,51 +436,30 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool
cn
=
cacheIter
?
cacheIter
->
next
(
cacheIter
)
:
false
;
bool
tn
=
tfileIter
?
tfileIter
->
next
(
tfileIter
)
:
false
;
while
(
cn
==
true
&&
tn
==
true
)
{
IterateValue
*
cv
=
cacheIter
->
getValue
(
cacheIter
);
IterateValue
*
tv
=
tfileIter
->
getValue
(
tfileIter
);
// dump value
int
comp
=
strcmp
(
cv
->
colVal
,
tv
->
colVal
);
while
(
cn
==
true
||
tn
==
true
)
{
IterateValue
*
cv
=
(
cn
==
true
)
?
cacheIter
->
getValue
(
cacheIter
)
:
NULL
;
IterateValue
*
tv
=
(
tn
==
true
)
?
tfileIter
->
getValue
(
tfileIter
)
:
NULL
;
int
comp
=
0
;
if
(
cn
==
true
&&
tn
==
true
)
{
comp
=
strcmp
(
cv
->
colVal
,
tv
->
colVal
);
}
else
if
(
cn
==
true
)
{
comp
=
-
1
;
}
else
{
comp
=
1
;
}
if
(
comp
==
0
)
{
TFileValue
*
tfv
=
tfileValueCreate
(
cv
->
colVal
);
taosArrayAddAll
(
tfv
->
tableId
,
cv
->
val
);
taosArrayAddAll
(
tfv
->
tableId
,
tv
->
val
);
indexMergeSameKey
(
result
,
tfv
);
indexMergeCacheAndTFile
(
result
,
cv
,
tv
);
cn
=
cacheIter
->
next
(
cacheIter
);
tn
=
tfileIter
->
next
(
tfileIter
);
continue
;
}
else
if
(
comp
<
0
)
{
TFileValue
*
tfv
=
tfileValueCreate
(
cv
->
colVal
);
taosArrayAddAll
(
tfv
->
tableId
,
cv
->
val
);
indexMergeSameKey
(
result
,
tfv
);
// copy to final Result;
indexMergeCacheAndTFile
(
result
,
cv
,
NULL
);
cn
=
cacheIter
->
next
(
cacheIter
);
}
else
{
TFileValue
*
tfv
=
tfileValueCreate
(
tv
->
colVal
);
taosArrayAddAll
(
tfv
->
tableId
,
tv
->
val
);
indexMergeSameKey
(
result
,
tfv
);
// copy to final result
indexMergeCacheAndTFile
(
result
,
NULL
,
tv
);
tn
=
tfileIter
->
next
(
tfileIter
);
}
}
while
(
cn
==
true
)
{
IterateValue
*
cv
=
cacheIter
->
getValue
(
cacheIter
);
TFileValue
*
tfv
=
tfileValueCreate
(
cv
->
colVal
);
taosArrayAddAll
(
tfv
->
tableId
,
cv
->
val
);
indexMergeSameKey
(
result
,
tfv
);
cn
=
cacheIter
->
next
(
cacheIter
);
}
while
(
tn
==
true
)
{
IterateValue
*
tv
=
tfileIter
->
getValue
(
tfileIter
);
TFileValue
*
tfv
=
tfileValueCreate
(
tv
->
colVal
);
taosArrayAddAll
(
tfv
->
tableId
,
tv
->
val
);
indexMergeSameKey
(
result
,
tfv
);
tn
=
tfileIter
->
next
(
tfileIter
);
}
int
ret
=
indexGenTFile
(
sIdx
,
pCache
,
result
);
indexDestroyTempResult
(
result
);
...
...
@@ -503,7 +507,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) {
tfileWriterClose
(
tw
);
TFileReader
*
reader
=
tfileReaderOpen
(
sIdx
->
path
,
cache
->
suid
,
version
,
cache
->
colName
);
if
(
reader
==
NULL
)
{
goto
END
;
}
if
(
reader
==
NULL
)
{
return
-
1
;
}
TFileHeader
*
header
=
&
reader
->
header
;
ICacheKey
key
=
{.
suid
=
cache
->
suid
,
.
colName
=
header
->
colName
,
.
nColName
=
strlen
(
header
->
colName
)};
...
...
source/libs/index/src/index_cache.c
浏览文件 @
fa46811a
...
...
@@ -217,9 +217,9 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
// set value
ct
->
uid
=
uid
;
ct
->
operaType
=
term
->
operType
;
// ugly code, refactor later
int64_t
estimate
=
sizeof
(
ct
)
+
strlen
(
ct
->
colVal
);
pthread_mutex_lock
(
&
pCache
->
mtx
);
pCache
->
occupiedMem
+=
estimate
;
indexCacheMakeRoomForWrite
(
pCache
);
...
...
@@ -331,7 +331,6 @@ static char* indexCacheTermGet(const void* pData) {
static
int32_t
indexCacheTermCompare
(
const
void
*
l
,
const
void
*
r
)
{
CacheTerm
*
lt
=
(
CacheTerm
*
)
l
;
CacheTerm
*
rt
=
(
CacheTerm
*
)
r
;
// compare colVal
int32_t
cmp
=
strcmp
(
lt
->
colVal
,
rt
->
colVal
);
if
(
cmp
==
0
)
{
return
rt
->
version
-
lt
->
version
;
}
...
...
@@ -359,17 +358,32 @@ static bool indexCacheIteratorNext(Iterate* itera) {
IterateValue
*
iv
=
&
itera
->
val
;
iterateValueDestroy
(
iv
,
false
);
// IterateValue* iv = &itera->val;
// IterateValue tIterVal = {.colVal = NULL, .val = taosArrayInit(1, sizeof(uint64_t))};
bool
next
=
tSkipListIterNext
(
iter
);
if
(
next
)
{
SSkipListNode
*
node
=
tSkipListIterGet
(
iter
);
CacheTerm
*
ct
=
(
CacheTerm
*
)
SL_GET_NODE_DATA
(
node
);
// equal func
// if (iv->colVal != NULL && ct->colVal != NULL) {
// if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) }
//} else {
// tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1);
// tIterval.colVal = tstrdup(ct->colVal);
//}
iv
->
type
=
ct
->
operaType
;
iv
->
colVal
=
calloc
(
1
,
strlen
(
ct
->
colVal
)
+
1
);
memcpy
(
iv
->
colVal
,
ct
->
colVal
,
strlen
(
ct
->
colVal
));
iv
->
colVal
=
tstrdup
(
ct
->
colVal
);
// iv->colVal = calloc(1, strlen(ct->colVal) + 1);
// memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
taosArrayPush
(
iv
->
val
,
&
ct
->
uid
);
}
// IterateValue* iv = &itera->val;
// iterateValueDestroy(iv, true);
//*iv = tIterVal;
return
next
;
}
...
...
source/libs/index/src/index_fst.c
浏览文件 @
fa46811a
...
...
@@ -936,6 +936,7 @@ Fst* fstCreate(FstSlice* slice) {
len
-=
sizeof
(
checkSum
);
taosDecodeFixedU32
(
buf
+
len
,
&
checkSum
);
if
(
taosCheckChecksum
(
buf
,
len
,
checkSum
))
{
indexError
(
"index file is corrupted"
);
// verify fst
return
NULL
;
}
...
...
source/libs/index/src/index_fst_counting_writer.c
浏览文件 @
fa46811a
...
...
@@ -60,9 +60,10 @@ static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t off
return
nRead
;
}
static
int
writeCtxGetSize
(
WriterCtx
*
ctx
)
{
if
(
ctx
->
type
==
TFile
&&
ctx
->
file
.
readOnly
)
{
// refactor later
return
ctx
->
file
.
size
;
if
(
ctx
->
type
==
TFile
)
{
struct
stat
fstat
;
stat
(
ctx
->
file
.
buf
,
&
fstat
);
return
fstat
.
st_size
;
}
return
0
;
}
...
...
@@ -88,7 +89,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
if
(
readOnly
==
false
)
{
// ctx->file.fd = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
ctx
->
file
.
fd
=
tfOpenCreateWriteAppend
(
path
);
tfFtruncate
(
ctx
->
file
.
fd
,
0
);
struct
stat
fstat
;
stat
(
path
,
&
fstat
);
ctx
->
file
.
size
=
fstat
.
st_size
;
...
...
@@ -138,6 +139,11 @@ void writerCtxDestroy(WriterCtx* ctx, bool remove) {
munmap
(
ctx
->
file
.
ptr
,
ctx
->
file
.
size
);
#endif
}
if
(
ctx
->
file
.
readOnly
==
false
)
{
struct
stat
fstat
;
stat
(
ctx
->
file
.
buf
,
&
fstat
);
// indexError("write file size: %d", (int)(fstat.st_size));
}
if
(
remove
)
{
unlink
(
ctx
->
file
.
buf
);
}
}
free
(
ctx
);
...
...
source/libs/index/src/index_tfile.c
浏览文件 @
fa46811a
...
...
@@ -147,21 +147,22 @@ TFileReader* tfileReaderCreate(WriterCtx* ctx) {
reader
->
ctx
=
ctx
;
if
(
0
!=
tfileReaderVerify
(
reader
))
{
tfileReaderDestroy
(
reader
);
indexError
(
"invalid tfile, suid: %"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
);
tfileReaderDestroy
(
reader
);
return
NULL
;
}
// T_REF_INC(reader);
if
(
0
!=
tfileReaderLoadHeader
(
reader
))
{
tfileReaderDestroy
(
reader
);
indexError
(
"failed to load index header, suid: %"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
);
tfileReaderDestroy
(
reader
);
return
NULL
;
}
if
(
0
!=
tfileReaderLoadFst
(
reader
))
{
indexError
(
"failed to load index fst, suid: %"
PRIu64
", colName: %s, errno: %d"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
,
errno
);
tfileReaderDestroy
(
reader
);
indexError
(
"failed to load index fst, suid: %"
PRIu64
", colName: %s"
,
reader
->
header
.
suid
,
reader
->
header
.
colName
);
return
NULL
;
}
...
...
@@ -303,6 +304,8 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) {
}
else
{
// indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset,
// (int)taosArrayGetSize(v->tableId));
// indexInfo("tfile write data size: %d", tw->ctx->size(tw->ctx));
}
}
fstBuilderFinish
(
tw
->
fb
);
...
...
@@ -485,7 +488,9 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
static
int
tfileWriteFstOffset
(
TFileWriter
*
tw
,
int32_t
offset
)
{
int32_t
fstOffset
=
offset
+
sizeof
(
tw
->
header
.
fstOffset
);
tw
->
header
.
fstOffset
=
fstOffset
;
if
(
sizeof
(
fstOffset
)
!=
tw
->
ctx
->
write
(
tw
->
ctx
,
(
char
*
)
&
fstOffset
,
sizeof
(
fstOffset
)))
{
return
-
1
;
}
indexInfo
(
"tfile write fst offset: %d"
,
tw
->
ctx
->
size
(
tw
->
ctx
));
tw
->
offset
+=
sizeof
(
fstOffset
);
return
0
;
}
...
...
@@ -495,8 +500,11 @@ static int tfileWriteHeader(TFileWriter* writer) {
TFileHeader
*
header
=
&
writer
->
header
;
memcpy
(
buf
,
(
char
*
)
header
,
sizeof
(
buf
));
indexInfo
(
"tfile pre write header size: %d"
,
writer
->
ctx
->
size
(
writer
->
ctx
));
int
nwrite
=
writer
->
ctx
->
write
(
writer
->
ctx
,
buf
,
sizeof
(
buf
));
if
(
sizeof
(
buf
)
!=
nwrite
)
{
return
-
1
;
}
indexInfo
(
"tfile after write header size: %d"
,
writer
->
ctx
->
size
(
writer
->
ctx
));
writer
->
offset
=
nwrite
;
return
0
;
}
...
...
@@ -521,6 +529,8 @@ static int tfileWriteFooter(TFileWriter* write) {
void
*
pBuf
=
(
void
*
)
buf
;
taosEncodeFixedU64
((
void
**
)(
void
*
)
&
pBuf
,
tfileMagicNumber
);
int
nwrite
=
write
->
ctx
->
write
(
write
->
ctx
,
buf
,
strlen
(
buf
));
indexInfo
(
"tfile write footer size: %d"
,
write
->
ctx
->
size
(
write
->
ctx
));
assert
(
nwrite
==
sizeof
(
tfileMagicNumber
));
return
nwrite
;
}
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
fa46811a
...
...
@@ -17,11 +17,11 @@
#define _TD_WAL_INT_H_
#include "compare.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "wal.h"
#include "taoserror.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -40,6 +40,19 @@ typedef struct WalIdxEntry {
int64_t
offset
;
}
SWalIdxEntry
;
static
inline
int
tSerializeWalIdxEntry
(
void
**
buf
,
SWalIdxEntry
*
pIdxEntry
)
{
int
tlen
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pIdxEntry
->
ver
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pIdxEntry
->
offset
);
return
0
;
}
static
inline
void
*
tDeserializeWalIdxEntry
(
void
*
buf
,
SWalIdxEntry
*
pIdxEntry
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pIdxEntry
->
ver
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pIdxEntry
->
offset
);
return
buf
;
}
static
inline
int32_t
compareWalFileInfo
(
const
void
*
pLeft
,
const
void
*
pRight
)
{
SWalFileInfo
*
pInfoLeft
=
(
SWalFileInfo
*
)
pLeft
;
SWalFileInfo
*
pInfoRight
=
(
SWalFileInfo
*
)
pRight
;
...
...
@@ -130,12 +143,12 @@ int walMetaDeserialize(SWal* pWal, const char* bytes);
// meta section end
// seek section
int
walChange
Fil
e
(
SWal
*
pWal
,
int64_t
ver
);
int
wal
ChangeFileToLast
(
SWal
*
pWal
);
int
walChange
Writ
e
(
SWal
*
pWal
,
int64_t
ver
);
int
wal
SetWrite
(
SWal
*
pWal
);
// seek section end
int64_t
walGetSeq
();
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
);
int
walSeek
Write
Ver
(
SWal
*
pWal
,
int64_t
ver
);
int
walRoll
(
SWal
*
pWal
);
#ifdef __cplusplus
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
fa46811a
...
...
@@ -13,11 +13,9 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
#include "tfile.h"
#include "tref.h"
#include "walInt.h"
...
...
@@ -34,13 +32,98 @@ static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return
sprintf
(
buf
,
"%s/meta-ver%d"
,
pWal
->
path
,
metaVer
);
}
void
*
tmemmem
(
char
*
haystack
,
int
hlen
,
char
*
needle
,
int
nlen
)
{
char
*
limit
;
if
(
nlen
==
0
||
hlen
<
nlen
)
{
return
false
;
}
limit
=
haystack
+
hlen
-
nlen
+
1
;
while
((
haystack
=
(
char
*
)
memchr
(
haystack
,
needle
[
0
],
limit
-
haystack
))
!=
NULL
)
{
if
(
memcmp
(
haystack
,
needle
,
nlen
)
==
0
)
{
return
haystack
;
}
haystack
++
;
}
return
NULL
;
}
static
inline
int64_t
walScanLogGetLastVer
(
SWal
*
pWal
)
{
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
int
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
ASSERT
(
sz
>
0
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
i
);
}
SWalFileInfo
*
pLastFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
sz
-
1
);
char
fnameStr
[
WAL_FILE_LEN
];
walBuildLogName
(
pWal
,
pLastFileInfo
->
firstVer
,
fnameStr
);
struct
stat
statbuf
;
stat
(
fnameStr
,
&
statbuf
);
int
readSize
=
MIN
(
WAL_MAX_SIZE
+
2
,
statbuf
.
st_size
);
pLastFileInfo
->
fileSize
=
statbuf
.
st_size
;
FileFd
fd
=
taosOpenFileRead
(
fnameStr
);
if
(
fd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
uint64_t
magic
=
WAL_MAGIC
;
char
*
buf
=
malloc
(
readSize
+
5
);
if
(
buf
==
NULL
)
{
taosCloseFile
(
fd
);
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
taosLSeekFile
(
fd
,
-
readSize
,
SEEK_END
);
if
(
readSize
!=
taosReadFile
(
fd
,
buf
,
readSize
))
{
free
(
buf
);
taosCloseFile
(
fd
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
char
*
haystack
=
buf
;
char
*
found
=
NULL
;
char
*
candidate
;
while
((
candidate
=
tmemmem
(
haystack
,
readSize
-
(
haystack
-
buf
),
(
char
*
)
&
magic
,
sizeof
(
uint64_t
)))
!=
NULL
)
{
// read and validate
SWalHead
*
logContent
=
(
SWalHead
*
)
candidate
;
if
(
walValidHeadCksum
(
logContent
)
==
0
&&
walValidBodyCksum
(
logContent
)
==
0
)
{
found
=
candidate
;
}
haystack
=
candidate
+
1
;
}
if
(
found
==
buf
)
{
SWalHead
*
logContent
=
(
SWalHead
*
)
found
;
if
(
walValidHeadCksum
(
logContent
)
!=
0
||
walValidBodyCksum
(
logContent
)
!=
0
)
{
// file has to be deleted
free
(
buf
);
taosCloseFile
(
fd
);
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
}
taosCloseFile
(
fd
);
SWalHead
*
lastEntry
=
(
SWalHead
*
)
found
;
return
lastEntry
->
head
.
version
;
}
int
walCheckAndRepairMeta
(
SWal
*
pWal
)
{
// load log files, get first/snapshot/last version info
const
char
*
logPattern
=
"^[0-9]+.log$"
;
const
char
*
idxPattern
=
"^[0-9]+.idx$"
;
regex_t
logRegPattern
;
regex_t
idxRegPattern
;
SArray
*
pLog
Array
=
taosArrayInit
(
8
,
sizeof
(
int64_t
));
SArray
*
pLog
InfoArray
=
taosArrayInit
(
8
,
sizeof
(
SWalFileInfo
));
regcomp
(
&
logRegPattern
,
logPattern
,
REG_EXTENDED
);
regcomp
(
&
idxRegPattern
,
idxPattern
,
REG_EXTENDED
);
...
...
@@ -51,20 +134,78 @@ int walCheckAndRepairMeta(SWal* pWal) {
return
-
1
;
}
// scan log files and build new meta
struct
dirent
*
ent
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
char
*
name
=
basename
(
ent
->
d_name
);
int
code
=
regexec
(
&
logRegPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
int64_t
firstVer
;
sscanf
(
name
,
"%"
PRId64
".log"
,
&
firstVer
);
taosArrayPush
(
pLogArray
,
&
firstVer
);
SWalFileInfo
fileInfo
;
memset
(
&
fileInfo
,
-
1
,
sizeof
(
SWalFileInfo
));
sscanf
(
name
,
"%"
PRId64
".log"
,
&
fileInfo
.
firstVer
);
//get lastVer
//get size
taosArrayPush
(
pLogInfoArray
,
&
fileInfo
);
}
}
// load meta
// if not match, or meta missing
regfree
(
&
logRegPattern
);
regfree
(
&
idxRegPattern
);
taosArraySort
(
pLogInfoArray
,
compareWalFileInfo
);
int
oldSz
=
0
;
if
(
pWal
->
fileInfoSet
)
{
oldSz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
);
}
int
newSz
=
taosArrayGetSize
(
pLogInfoArray
);
// case 1. meta file not exist / cannot be parsed
if
(
oldSz
<
newSz
)
{
for
(
int
i
=
oldSz
;
i
<
newSz
;
i
++
)
{
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pLogInfoArray
,
i
);
taosArrayPush
(
pWal
->
fileInfoSet
,
pFileInfo
);
}
pWal
->
writeCur
=
newSz
-
1
;
pWal
->
vers
.
firstVer
=
((
SWalFileInfo
*
)
taosArrayGet
(
pLogInfoArray
,
0
))
->
firstVer
;
pWal
->
vers
.
lastVer
=
walScanLogGetLastVer
(
pWal
);
((
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
))
->
lastVer
=
pWal
->
vers
.
lastVer
;
ASSERT
(
pWal
->
vers
.
lastVer
!=
-
1
);
int
code
=
walSaveMeta
(
pWal
);
if
(
code
<
0
)
{
taosArrayDestroy
(
pLogInfoArray
);
return
-
1
;
}
}
// case 2. versions in meta not match log
// or some log not included in meta
// (e.g. program killed)
//
// case 3. other corrupt cases
//
#if 0
int sz = taosArrayGetSize(pLogInfoArray);
for (int i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i);
if (i == 0 && pFileInfo->firstVer != walGetFirstVer(pWal)) {
//repair
}
if (i > 0) {
SWalFileInfo* pLastFileInfo = taosArrayGet(pLogInfoArray, i-1);
if (pLastFileInfo->lastVer != pFileInfo->firstVer) {
}
}
}
#endif
// get last version of this file
//
// rebuild meta
taosArrayDestroy
(
pLogInfoArray
);
return
0
;
}
...
...
@@ -87,6 +228,7 @@ int walRollFileInfo(SWal* pWal) {
// TODO: change to emplace back
SWalFileInfo
*
pNewInfo
=
malloc
(
sizeof
(
SWalFileInfo
));
if
(
pNewInfo
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
pNewInfo
->
firstVer
=
pWal
->
vers
.
lastVer
+
1
;
...
...
@@ -94,7 +236,7 @@ int walRollFileInfo(SWal* pWal) {
pNewInfo
->
createTs
=
ts
;
pNewInfo
->
closeTs
=
-
1
;
pNewInfo
->
fileSize
=
0
;
taosArrayPush
(
p
Wal
->
fileInfoSet
,
pNewInfo
);
taosArrayPush
(
p
Array
,
pNewInfo
);
free
(
pNewInfo
);
return
0
;
}
...
...
@@ -108,7 +250,16 @@ char* walMetaSerialize(SWal* pWal) {
cJSON
*
pFiles
=
cJSON_CreateArray
();
cJSON
*
pField
;
if
(
pRoot
==
NULL
||
pMeta
==
NULL
||
pFiles
==
NULL
)
{
// TODO
if
(
pRoot
)
{
cJSON_Delete
(
pRoot
);
}
if
(
pMeta
)
{
cJSON_Delete
(
pMeta
);
}
if
(
pFiles
)
{
cJSON_Delete
(
pFiles
);
}
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
NULL
;
}
cJSON_AddItemToObject
(
pRoot
,
"meta"
,
pMeta
);
...
...
@@ -221,18 +372,18 @@ int walSaveMeta(SWal* pWal) {
int
metaVer
=
walFindCurMetaVer
(
pWal
);
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
+
1
,
fnameStr
);
int
metaTfd
=
tfOpen
CreateWrite
(
fnameStr
);
if
(
meta
Tf
d
<
0
)
{
FileFd
metaFd
=
taosOpenFile
CreateWrite
(
fnameStr
);
if
(
meta
F
d
<
0
)
{
return
-
1
;
}
char
*
serialized
=
walMetaSerialize
(
pWal
);
int
len
=
strlen
(
serialized
);
if
(
len
!=
t
fWrite
(
metaTf
d
,
serialized
,
len
))
{
if
(
len
!=
t
aosWriteFile
(
metaF
d
,
serialized
,
len
))
{
// TODO:clean file
return
-
1
;
}
t
fClose
(
metaTf
d
);
t
aosCloseFile
(
metaF
d
);
// delete old file
if
(
metaVer
>
-
1
)
{
walBuildMetaName
(
pWal
,
metaVer
,
fnameStr
);
...
...
@@ -247,7 +398,7 @@ int walLoadMeta(SWal* pWal) {
// find existing meta file
int
metaVer
=
walFindCurMetaVer
(
pWal
);
if
(
metaVer
==
-
1
)
{
return
0
;
return
-
1
;
}
char
fnameStr
[
WAL_FILE_LEN
];
walBuildMetaName
(
pWal
,
metaVer
,
fnameStr
);
...
...
@@ -257,23 +408,24 @@ int walLoadMeta(SWal* pWal) {
int
size
=
statbuf
.
st_size
;
char
*
buf
=
malloc
(
size
+
5
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_WAL_OUT_OF_MEMORY
;
return
-
1
;
}
memset
(
buf
,
0
,
size
+
5
);
int
tfd
=
tfOpenRead
(
fnameStr
);
if
(
tfRead
(
tfd
,
buf
,
size
)
!=
size
)
{
tfClose
(
tfd
);
free
(
buf
);
FileFd
fd
=
taosOpenFileRead
(
fnameStr
);
if
(
fd
<
0
)
{
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
// load into fileInfoSet
int
code
=
walMetaDeserialize
(
pWal
,
buf
);
if
(
code
!=
0
)
{
tfClose
(
tfd
);
if
(
taosReadFile
(
fd
,
buf
,
size
)
!=
size
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosCloseFile
(
fd
);
free
(
buf
);
return
-
1
;
}
tfClose
(
tfd
);
// load into fileInfoSet
int
code
=
walMetaDeserialize
(
pWal
,
buf
);
taosCloseFile
(
fd
);
free
(
buf
);
return
0
;
return
code
;
}
source/libs/wal/src/walMgmt.c
浏览文件 @
fa46811a
...
...
@@ -106,6 +106,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
// init write buffer
memset
(
&
pWal
->
writeHead
,
0
,
sizeof
(
SWalHead
));
pWal
->
writeHead
.
head
.
headVer
=
WAL_HEAD_VER
;
pWal
->
writeHead
.
magic
=
WAL_MAGIC
;
if
(
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
)
<
0
)
{
taosArrayDestroy
(
pWal
->
fileInfoSet
);
...
...
@@ -121,7 +122,9 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
return
NULL
;
}
if
(
walLoadMeta
(
pWal
)
<
0
&&
walCheckAndRepairMeta
(
pWal
)
<
0
)
{
walLoadMeta
(
pWal
);
if
(
walCheckAndRepairMeta
(
pWal
)
<
0
)
{
taosRemoveRef
(
tsWal
.
refSetId
,
pWal
->
refId
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
taosArrayDestroy
(
pWal
->
fileInfoSet
);
...
...
@@ -130,6 +133,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
}
if
(
walCheckAndRepairIdx
(
pWal
)
<
0
)
{
}
wDebug
(
"vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d"
,
pWal
->
cfg
.
vgId
,
pWal
,
pWal
->
cfg
.
level
,
...
...
source/libs/wal/src/walSeek.c
浏览文件 @
fa46811a
...
...
@@ -20,7 +20,7 @@
#include "tref.h"
#include "walInt.h"
static
int
walSeek
Fil
ePos
(
SWal
*
pWal
,
int64_t
ver
)
{
static
int
walSeek
Writ
ePos
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
int64_t
idxTfd
=
pWal
->
writeIdxTfd
;
...
...
@@ -41,7 +41,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return
-
1
;
}
ASSERT
(
entry
.
ver
==
ver
);
code
=
tfLseek
(
logTfd
,
entry
.
offset
,
SEEK_
CUR
);
code
=
tfLseek
(
logTfd
,
entry
.
offset
,
SEEK_
SET
);
if
(
code
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -49,7 +49,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) {
return
code
;
}
int
wal
ChangeFileToLast
(
SWal
*
pWal
)
{
int
wal
SetWrite
(
SWal
*
pWal
)
{
int64_t
idxTfd
,
logTfd
;
SWalFileInfo
*
pRet
=
taosArrayGetLast
(
pWal
->
fileInfoSet
);
ASSERT
(
pRet
!=
NULL
);
...
...
@@ -57,13 +57,13 @@ int walChangeFileToLast(SWal* pWal) {
char
fnameStr
[
WAL_FILE_LEN
];
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpen
ReadWrite
(
fnameStr
);
idxTfd
=
tfOpen
CreateWriteAppend
(
fnameStr
);
if
(
idxTfd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpen
ReadWrite
(
fnameStr
);
logTfd
=
tfOpen
CreateWriteAppend
(
fnameStr
);
if
(
logTfd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -74,46 +74,57 @@ int walChangeFileToLast(SWal* pWal) {
return
0
;
}
int
walChange
Fil
e
(
SWal
*
pWal
,
int64_t
ver
)
{
int
walChange
Writ
e
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
=
0
;
int64_t
idxTfd
,
logTfd
;
char
fnameStr
[
WAL_FILE_LEN
];
code
=
tfClose
(
pWal
->
writeLogTfd
);
if
(
code
!=
0
)
{
// TODO
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
if
(
pWal
->
writeLogTfd
!=
-
1
)
{
code
=
tfClose
(
pWal
->
writeLogTfd
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
code
=
tfClose
(
pWal
->
writeIdxTfd
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
if
(
pWal
->
writeIdxTfd
!=
-
1
)
{
code
=
tfClose
(
pWal
->
writeIdxTfd
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
}
SWalFileInfo
tmpInfo
;
tmpInfo
.
firstVer
=
ver
;
// bsearch in fileSet
SWalFileInfo
*
pRet
=
taosArraySearch
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
pRet
!=
NULL
);
int64_t
fileFirstVer
=
pRet
->
firstVer
;
// closed
if
(
taosArrayGetLast
(
pWal
->
fileInfoSet
)
!=
pRet
)
{
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenRead
(
fnameStr
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenRead
(
fnameStr
);
}
else
{
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenReadWrite
(
fnameStr
);
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenReadWrite
(
fnameStr
);
int32_t
idx
=
taosArraySearchIdx
(
pWal
->
fileInfoSet
,
&
tmpInfo
,
compareWalFileInfo
,
TD_LE
);
ASSERT
(
idx
!=
-
1
);
SWalFileInfo
*
pFileInfo
=
taosArrayGet
(
pWal
->
fileInfoSet
,
idx
);
/*ASSERT(pFileInfo != NULL);*/
int64_t
fileFirstVer
=
pFileInfo
->
firstVer
;
walBuildIdxName
(
pWal
,
fileFirstVer
,
fnameStr
);
idxTfd
=
tfOpenCreateWriteAppend
(
fnameStr
);
if
(
idxTfd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pWal
->
writeIdxTfd
=
-
1
;
return
-
1
;
}
walBuildLogName
(
pWal
,
fileFirstVer
,
fnameStr
);
logTfd
=
tfOpenCreateWriteAppend
(
fnameStr
);
if
(
logTfd
<
0
)
{
tfClose
(
idxTfd
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
pWal
->
writeLogTfd
=
-
1
;
return
-
1
;
}
pWal
->
writeLogTfd
=
logTfd
;
pWal
->
writeIdxTfd
=
idxTfd
;
pWal
->
writeCur
=
idx
;
return
fileFirstVer
;
}
int
walSeekVer
(
SWal
*
pWal
,
int64_t
ver
)
{
int
walSeek
Write
Ver
(
SWal
*
pWal
,
int64_t
ver
)
{
int
code
;
if
(
ver
==
pWal
->
vers
.
lastVer
)
{
return
0
;
...
...
@@ -123,14 +134,15 @@ int walSeekVer(SWal* pWal, int64_t ver) {
return
-
1
;
}
if
(
ver
<
pWal
->
vers
.
snapshotVer
)
{
}
if
(
ver
<
walGetCurFileFirstVer
(
pWal
)
||
(
ver
>
walGetCurFileLastVer
(
pWal
)))
{
code
=
walChange
Fil
e
(
pWal
,
ver
);
code
=
walChange
Writ
e
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
}
code
=
walSeek
Fil
ePos
(
pWal
,
ver
);
code
=
walSeek
Writ
ePos
(
pWal
,
ver
);
if
(
code
!=
0
)
{
return
-
1
;
}
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
fa46811a
...
...
@@ -46,12 +46,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// find correct file
if
(
ver
<
walGetLastFileFirstVer
(
pWal
))
{
// close current files
tfClose
(
pWal
->
writeIdxTfd
);
tfClose
(
pWal
->
writeLogTfd
);
// open old files
code
=
walChangeFile
(
pWal
,
ver
);
if
(
code
!=
0
)
{
// change current files
code
=
walChangeWrite
(
pWal
,
ver
);
if
(
code
<
0
)
{
return
-
1
;
}
...
...
@@ -166,7 +163,8 @@ int32_t walEndSnapshot(SWal *pWal) {
}
// iterate files, until the searched result
for
(
SWalFileInfo
*
iter
=
pWal
->
fileInfoSet
->
pData
;
iter
<
pInfo
;
iter
++
)
{
if
(
pWal
->
totSize
>
pWal
->
cfg
.
retentionSize
||
iter
->
closeTs
+
pWal
->
cfg
.
retentionPeriod
>
ts
)
{
if
((
pWal
->
cfg
.
retentionSize
!=
-
1
&&
pWal
->
totSize
>
pWal
->
cfg
.
retentionSize
)
||
(
pWal
->
cfg
.
retentionPeriod
!=
-
1
&&
iter
->
closeTs
+
pWal
->
cfg
.
retentionPeriod
>
ts
))
{
// delete according to file size or close time
deleteCnt
++
;
newTotSize
-=
iter
->
fileSize
;
...
...
@@ -191,13 +189,12 @@ int32_t walEndSnapshot(SWal *pWal) {
pWal
->
vers
.
firstVer
=
((
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
0
))
->
firstVer
;
}
pWal
->
writeCur
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
-
1
;
;
pWal
->
totSize
=
newTotSize
;
pWal
->
vers
.
verInSnapshotting
=
-
1
;
// save snapshot ver, commit ver
int
code
=
walSaveMeta
(
pWal
);
if
(
code
!=
0
)
{
if
(
code
<
0
)
{
return
-
1
;
}
...
...
@@ -225,18 +222,17 @@ int walRoll(SWal *pWal) {
walBuildIdxName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
idxTfd
=
tfOpenCreateWriteAppend
(
fnameStr
);
if
(
idxTfd
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
walBuildLogName
(
pWal
,
newFileFirstVersion
,
fnameStr
);
logTfd
=
tfOpenCreateWriteAppend
(
fnameStr
);
if
(
logTfd
<
0
)
{
ASSERT
(
0
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
code
=
walRollFileInfo
(
pWal
);
if
(
code
!=
0
)
{
ASSERT
(
0
);
return
-
1
;
}
...
...
@@ -291,8 +287,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
ASSERT
(
pWal
->
writeCur
>=
0
);
pthread_mutex_lock
(
&
pWal
->
mutex
);
if
(
pWal
->
writeIdxTfd
==
-
1
||
pWal
->
writeLogTfd
==
-
1
)
{
walChangeFileToLast
(
pWal
);
walSetWrite
(
pWal
);
tfLseek
(
pWal
->
writeLogTfd
,
0
,
SEEK_END
);
tfLseek
(
pWal
->
writeIdxTfd
,
0
,
SEEK_END
);
}
pWal
->
writeHead
.
head
.
version
=
index
;
...
...
source/libs/wal/test/walMetaTest.cpp
浏览文件 @
fa46811a
...
...
@@ -107,6 +107,43 @@ class WalKeepEnv : public ::testing::Test {
const
char
*
pathName
=
"/tmp/wal_test"
;
};
class
WalRetentionEnv
:
public
::
testing
::
Test
{
protected:
static
void
SetUpTestCase
()
{
int
code
=
walInit
();
ASSERT
(
code
==
0
);
}
static
void
TearDownTestCase
()
{
walCleanUp
();
}
void
walResetEnv
()
{
TearDown
();
taosRemoveDir
(
pathName
);
SetUp
();
}
void
SetUp
()
override
{
SWalCfg
cfg
;
cfg
.
rollPeriod
=
-
1
,
cfg
.
segSize
=
-
1
,
cfg
.
retentionPeriod
=
-
1
,
cfg
.
retentionSize
=
0
,
cfg
.
rollPeriod
=
0
,
cfg
.
vgId
=
0
,
cfg
.
level
=
TAOS_WAL_FSYNC
;
pWal
=
walOpen
(
pathName
,
&
cfg
);
ASSERT
(
pWal
!=
NULL
);
}
void
TearDown
()
override
{
walClose
(
pWal
);
pWal
=
NULL
;
}
SWal
*
pWal
=
NULL
;
const
char
*
pathName
=
"/tmp/wal_test"
;
};
TEST_F
(
WalCleanEnv
,
createNew
)
{
walRollFileInfo
(
pWal
);
ASSERT
(
pWal
->
fileInfoSet
!=
NULL
);
...
...
@@ -283,3 +320,86 @@ TEST_F(WalKeepEnv, readHandleRead) {
}
}
}
TEST_F
(
WalRetentionEnv
,
repairMeta1
)
{
walResetEnv
();
int
code
;
int
i
;
for
(
i
=
0
;
i
<
100
;
i
++
)
{
char
newStr
[
100
];
sprintf
(
newStr
,
"%s-%d"
,
ranStr
,
i
);
int
len
=
strlen
(
newStr
);
code
=
walWrite
(
pWal
,
i
,
0
,
newStr
,
len
);
ASSERT_EQ
(
code
,
0
);
}
TearDown
();
//getchar();
char
buf
[
100
];
sprintf
(
buf
,
"%s/meta-ver%d"
,
pathName
,
0
);
remove
(
buf
);
sprintf
(
buf
,
"%s/meta-ver%d"
,
pathName
,
1
);
remove
(
buf
);
SetUp
();
//getchar();
ASSERT_EQ
(
pWal
->
vers
.
lastVer
,
99
);
SWalReadHandle
*
pRead
=
walOpenReadHandle
(
pWal
);
ASSERT
(
pRead
!=
NULL
);
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ver
=
rand
()
%
100
;
code
=
walReadWithHandle
(
pRead
,
ver
);
ASSERT_EQ
(
code
,
0
);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ
(
pRead
->
pHead
->
head
.
version
,
ver
);
ASSERT_EQ
(
pRead
->
curVersion
,
ver
+
1
);
char
newStr
[
100
];
sprintf
(
newStr
,
"%s-%d"
,
ranStr
,
ver
);
int
len
=
strlen
(
newStr
);
ASSERT_EQ
(
pRead
->
pHead
->
head
.
len
,
len
);
for
(
int
j
=
0
;
j
<
len
;
j
++
)
{
EXPECT_EQ
(
newStr
[
j
],
pRead
->
pHead
->
head
.
body
[
j
]);
}
}
for
(
i
=
100
;
i
<
200
;
i
++
)
{
char
newStr
[
100
];
sprintf
(
newStr
,
"%s-%d"
,
ranStr
,
i
);
int
len
=
strlen
(
newStr
);
code
=
walWrite
(
pWal
,
i
,
0
,
newStr
,
len
);
ASSERT_EQ
(
code
,
0
);
}
for
(
int
i
=
0
;
i
<
1000
;
i
++
)
{
int
ver
=
rand
()
%
200
;
code
=
walReadWithHandle
(
pRead
,
ver
);
ASSERT_EQ
(
code
,
0
);
// printf("rrbody: \n");
// for(int i = 0; i < pRead->pHead->head.len; i++) {
// printf("%d ", pRead->pHead->head.body[i]);
//}
// printf("\n");
ASSERT_EQ
(
pRead
->
pHead
->
head
.
version
,
ver
);
ASSERT_EQ
(
pRead
->
curVersion
,
ver
+
1
);
char
newStr
[
100
];
sprintf
(
newStr
,
"%s-%d"
,
ranStr
,
ver
);
int
len
=
strlen
(
newStr
);
ASSERT_EQ
(
pRead
->
pHead
->
head
.
len
,
len
);
for
(
int
j
=
0
;
j
<
len
;
j
++
)
{
EXPECT_EQ
(
newStr
[
j
],
pRead
->
pHead
->
head
.
body
[
j
]);
}
}
}
source/util/src/tarray.c
浏览文件 @
fa46811a
...
...
@@ -342,7 +342,7 @@ void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t compa
int32_t
taosArraySearchIdx
(
const
SArray
*
pArray
,
const
void
*
key
,
__compar_fn_t
comparFn
,
int
flags
)
{
void
*
item
=
taosArraySearch
(
pArray
,
key
,
comparFn
,
flags
);
return
(
int32_t
)((
char
*
)
item
-
(
char
*
)
pArray
->
pData
)
/
pArray
->
elemSize
;
return
item
==
NULL
?
-
1
:
(
int32_t
)((
char
*
)
item
-
(
char
*
)
pArray
->
pData
)
/
pArray
->
elemSize
;
}
void
taosArraySortString
(
SArray
*
pArray
,
__compar_fn_t
comparFn
)
{
...
...
source/util/test/arrayTest.cpp
浏览文件 @
fa46811a
...
...
@@ -4,6 +4,7 @@
#include <random>
#include "tarray.h"
#include "tcompare.h"
namespace
{
...
...
@@ -48,3 +49,34 @@ static void remove_batch_test() {
TEST
(
arrayTest
,
array_list_test
)
{
remove_batch_test
();
}
TEST
(
arrayTest
,
array_search_test
)
{
SArray
*
pa
=
(
SArray
*
)
taosArrayInit
(
4
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
10
;
i
<
20
;
++
i
)
{
int32_t
a
=
i
;
taosArrayPush
(
pa
,
&
a
);
}
for
(
int
i
=
0
;
i
<
30
;
i
++
)
{
int32_t
k
=
i
;
int32_t
*
pRet
=
(
int32_t
*
)
taosArraySearch
(
pa
,
&
k
,
compareInt32Val
,
TD_GE
);
int32_t
idx
=
taosArraySearchIdx
(
pa
,
&
k
,
compareInt32Val
,
TD_GE
);
if
(
pRet
==
NULL
)
{
ASSERT_EQ
(
idx
,
-
1
);
}
else
{
ASSERT_EQ
(
taosArrayGet
(
pa
,
idx
),
pRet
);
}
pRet
=
(
int32_t
*
)
taosArraySearch
(
pa
,
&
k
,
compareInt32Val
,
TD_LE
);
idx
=
taosArraySearchIdx
(
pa
,
&
k
,
compareInt32Val
,
TD_LE
);
if
(
pRet
==
NULL
)
{
ASSERT_EQ
(
idx
,
-
1
);
}
else
{
ASSERT_EQ
(
taosArrayGet
(
pa
,
idx
),
pRet
);
}
}
}
tests/script/jenkins/basic.txt
浏览文件 @
fa46811a
...
...
@@ -2,15 +2,16 @@
#======================b1-start===============
# ---- user
./test.sh -f
general
/user/basic1.sim
./test.sh -f
sim
/user/basic1.sim
# ---- db
./test.sh -f general/db/basic1.sim
./test.sh -f sim/db/basic1.sim
./test.sh -f sim/db/error1.sim
# ---- table
./test.sh -f
general
/table/basic1.sim
./test.sh -f
sim
/table/basic1.sim
# ---- dnode
./test.sh -f
unique
/dnode/basic1.sim
./test.sh -f
sim
/dnode/basic1.sim
#======================b1-end===============
tests/script/sh/deploy.sh
浏览文件 @
fa46811a
...
...
@@ -120,7 +120,7 @@ echo "firstEp ${HOSTNAME}:7100" >> $TAOS_CFG
echo
"secondEp
${
HOSTNAME
}
:7200"
>>
$TAOS_CFG
echo
"fqdn
${
HOSTNAME
}
"
>>
$TAOS_CFG
echo
"serverPort
${
NODE
}
"
>>
$TAOS_CFG
echo
"supportVnodes 1
6
"
>>
$TAOS_CFG
echo
"supportVnodes 1
28
"
>>
$TAOS_CFG
echo
"dataDir
$DATA_DIR
"
>>
$TAOS_CFG
echo
"logDir
$LOG_DIR
"
>>
$TAOS_CFG
echo
"debugFlag 0"
>>
$TAOS_CFG
...
...
tests/script/
general
/db/basic1.sim
→
tests/script/
sim
/db/basic1.sim
浏览文件 @
fa46811a
...
...
@@ -85,7 +85,6 @@ if $data02 != 2 then
return -1
endi
return
system sh/exec.sh -n dnode1 -s stop -x SIGKILL
system sh/exec.sh -n dnode1 -s start
...
...
@@ -104,4 +103,4 @@ if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
tests/script/
general/db/basic
.sim
→
tests/script/
sim/db/basic6
.sim
浏览文件 @
fa46811a
文件已移动
tests/script/sim/db/error1.sim
0 → 100644
浏览文件 @
fa46811a
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode2 -s start
sql connect
print ========== create dnodes
sql create dnode $hostname port 7200
$x = 0
create1:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql show dnodes
if $data4_2 != ready then
goto create1
endi
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
print =============== create database
sql_error create database d1 vgroups 4
print ========== start dnode2
system sh/exec.sh -n dnode2 -s start
print =============== re-create database
$x = 0
re-create1:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql create database d1 vgroups 2 -x re-create1
sql show databases
if $rows != 1 then
return -1
endi
if $data00 != d1 then
return -1
endi
if $data02 != 2 then
return -1
endi
if $data03 != 0 then
return -1
endi
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
print =============== create database
sql_error drop database d1
print ========== start dnode2
system sh/exec.sh -n dnode2 -s start
print =============== re-create database
$x = 0
re-create2:
$x = $x + 1
sleep 1000
if $x == 10 then
return -1
endi
sql create database d1 vgroups 5 -x re-create2
sql show databases
if $rows != 1 then
return -1
endi
if $data00 != d1 then
return -1
endi
if $data02 != 5 then
return -1
endi
if $data03 != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
tests/script/
unique
/dnode/basic1.sim
→
tests/script/
sim
/dnode/basic1.sim
浏览文件 @
fa46811a
文件已移动
tests/script/
general
/table/basic1.sim
→
tests/script/
sim
/table/basic1.sim
浏览文件 @
fa46811a
文件已移动
tests/script/
general
/user/basic1.sim
→
tests/script/
sim
/user/basic1.sim
浏览文件 @
fa46811a
文件已移动
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录