Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
a803f240
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a803f240
编写于
11月 16, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-2066]<feature>: pass error code when commit failed
上级
47d6e311
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
349 addition
and
302 deletion
+349
-302
src/inc/tsdb.h
src/inc/tsdb.h
+1
-1
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+337
-0
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+2
-1
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+1
-298
src/util/src/tkvstore.c
src/util/src/tkvstore.c
+2
-0
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+6
-2
未找到文件。
src/inc/tsdb.h
浏览文件 @
a803f240
...
...
@@ -46,7 +46,7 @@ extern "C" {
typedef
struct
{
void
*
appH
;
void
*
cqH
;
int
(
*
notifyStatus
)(
void
*
,
int
status
);
int
(
*
notifyStatus
)(
void
*
,
int
status
,
int
eno
);
int
(
*
eventCallBack
)(
void
*
);
void
*
(
*
cqCreateFunc
)(
void
*
handle
,
uint64_t
uid
,
int
sid
,
char
*
sqlStr
,
STSchema
*
pSchema
);
void
(
*
cqDropFunc
)(
void
*
handle
);
...
...
src/tsdb/src/tsdbCommit.c
0 → 100644
浏览文件 @
a803f240
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbMain.h"
static
int
tsdbCommitTSData
(
STsdbRepo
*
pRepo
);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
,
int
eno
);
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
tsdbInfo
(
"vgId:%d start to commit! keyFirst %"
PRId64
" keyLast %"
PRId64
" numOfRows %"
PRId64
" meta rows: %d"
,
REPO_ID
(
pRepo
),
pMem
->
keyFirst
,
pMem
->
keyLast
,
pMem
->
numOfRows
,
listNEles
(
pMem
->
actList
));
// Commit to update meta file
if
(
tsdbCommitMeta
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d error occurs while committing META data since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Create the iterator to read from cache
if
(
tsdbCommitTSData
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d error occurs while committing TS data since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
tsdbFitRetention
(
pRepo
);
tsdbInfo
(
"vgId:%d commit over, succeed"
,
REPO_ID
(
pRepo
));
tsdbEndCommit
(
pRepo
,
TSDB_CODE_SUCCESS
);
return
NULL
;
_err:
ASSERT
(
terrno
!=
TSDB_CODE_SUCCESS
);
tsdbInfo
(
"vgId:%d commit over, failed"
,
REPO_ID
(
pRepo
));
tsdbEndCommit
(
pRepo
,
terrno
);
return
NULL
;
}
static
int
tsdbCommitTSData
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
SDataCols
*
pDataCols
=
NULL
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SCommitIter
*
iters
=
NULL
;
SRWHelper
whelper
=
{
0
};
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
if
(
pMem
->
numOfRows
<=
0
)
return
0
;
iters
=
tsdbCreateCommitIters
(
pRepo
);
if
(
iters
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create commit iterator since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
if
(
tsdbInitWriteHelper
(
&
whelper
,
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to init write helper since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
if
((
pDataCols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pCfg
->
maxRowsPerFileBlock
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbError
(
"vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s"
,
REPO_ID
(
pRepo
),
pMeta
->
maxCols
,
pMeta
->
maxRowBytes
,
pCfg
->
maxRowsPerFileBlock
,
tstrerror
(
terrno
));
goto
_err
;
}
int
sfid
=
(
int
)(
TSDB_KEY_FILEID
(
pMem
->
keyFirst
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
int
efid
=
(
int
)(
TSDB_KEY_FILEID
(
pMem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
// Loop to commit to each file
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
if
(
tsdbCommitToFile
(
pRepo
,
fid
,
iters
,
&
whelper
,
pDataCols
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit to file %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_err
;
}
}
tdFreeDataCols
(
pDataCols
);
tsdbDestroyCommitIters
(
iters
,
pMem
->
maxTables
);
tsdbDestroyHelper
(
&
whelper
);
return
0
;
_err:
tdFreeDataCols
(
pDataCols
);
tsdbDestroyCommitIters
(
iters
,
pMem
->
maxTables
);
tsdbDestroyHelper
(
&
whelper
);
return
-
1
;
}
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SActObj
*
pAct
=
NULL
;
SActCont
*
pCont
=
NULL
;
if
(
listNEles
(
pMem
->
actList
)
<=
0
)
return
0
;
if
(
tdKVStoreStartCommit
(
pMeta
->
pStore
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit data while start commit meta since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
SListNode
*
pNode
=
NULL
;
while
((
pNode
=
tdListPopHead
(
pMem
->
actList
))
!=
NULL
)
{
pAct
=
(
SActObj
*
)
pNode
->
data
;
if
(
pAct
->
act
==
TSDB_UPDATE_META
)
{
pCont
=
(
SActCont
*
)
POINTER_SHIFT
(
pAct
,
sizeof
(
SActObj
));
if
(
tdUpdateKVStoreRecord
(
pMeta
->
pStore
,
pAct
->
uid
,
(
void
*
)(
pCont
->
cont
),
pCont
->
len
)
<
0
)
{
tsdbError
(
"vgId:%d failed to update meta with uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
pAct
->
uid
,
tstrerror
(
terrno
));
tdKVStoreEndCommit
(
pMeta
->
pStore
);
goto
_err
;
}
}
else
if
(
pAct
->
act
==
TSDB_DROP_META
)
{
if
(
tdDropKVStoreRecord
(
pMeta
->
pStore
,
pAct
->
uid
)
<
0
)
{
tsdbError
(
"vgId:%d failed to drop meta with uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
pAct
->
uid
,
tstrerror
(
terrno
));
tdKVStoreEndCommit
(
pMeta
->
pStore
);
goto
_err
;
}
}
else
{
ASSERT
(
false
);
}
}
if
(
tdKVStoreEndCommit
(
pMeta
->
pStore
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit data while end commit meta since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
return
0
;
_err:
return
-
1
;
}
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
,
int
eno
)
{
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_OVER
,
eno
);
sem_post
(
&
(
pRepo
->
readyToCommit
));
}
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
TSKEY
nextKey
=
tsdbNextIterKey
((
iters
+
i
)
->
pIter
);
if
(
nextKey
!=
TSDB_DATA_TIMESTAMP_NULL
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
1
;
}
return
0
;
}
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
)
{
char
*
dataDir
=
NULL
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
NULL
;
SMemTable
*
pMem
=
pRepo
->
imem
;
bool
newLast
=
false
;
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
// Check if there are data to commit to this file
int
hasDataToCommit
=
tsdbHasDataToCommit
(
iters
,
pMem
->
maxTables
,
minKey
,
maxKey
);
if
(
!
hasDataToCommit
)
{
tsdbDebug
(
"vgId:%d no data to commit to file %d"
,
REPO_ID
(
pRepo
),
fid
);
return
0
;
}
// Create and open files for commit
dataDir
=
tsdbGetDataDirName
(
pRepo
->
rootDir
);
if
(
dataDir
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
if
((
pGroup
=
tsdbCreateFGroupIfNeed
(
pRepo
,
dataDir
,
fid
))
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create file group %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_err
;
}
// Open files for write/read
if
(
tsdbSetAndOpenHelperFile
(
pHelper
,
pGroup
)
<
0
)
{
tsdbError
(
"vgId:%d failed to set helper file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
newLast
=
TSDB_NLAST_FILE_OPENED
(
pHelper
);
if
(
tsdbLoadCompIdx
(
pHelper
,
NULL
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load SCompIdx part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Loop to commit data in each table
for
(
int
tid
=
1
;
tid
<
pMem
->
maxTables
;
tid
++
)
{
SCommitIter
*
pIter
=
iters
+
tid
;
if
(
pIter
->
pTable
==
NULL
)
continue
;
taosRLockLatch
(
&
(
pIter
->
pTable
->
latch
));
if
(
tsdbSetHelperTable
(
pHelper
,
pIter
->
pTable
,
pRepo
)
<
0
)
goto
_err
;
if
(
pIter
->
pIter
!=
NULL
)
{
if
(
tdInitDataCols
(
pDataCols
,
tsdbGetTableSchemaImpl
(
pIter
->
pTable
,
false
,
false
,
-
1
))
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
if
(
tsdbCommitTableData
(
pHelper
,
pIter
,
pDataCols
,
maxKey
)
<
0
)
{
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
tsdbError
(
"vgId:%d failed to write data of table %s tid %d uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pIter
->
pTable
),
TABLE_TID
(
pIter
->
pTable
),
TABLE_UID
(
pIter
->
pTable
),
tstrerror
(
terrno
));
goto
_err
;
}
}
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
// Move the last block to the new .l file if neccessary
if
(
tsdbMoveLastBlockIfNeccessary
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to move last block, since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Write the SCompBlock part
if
(
tsdbWriteCompInfo
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to write compInfo part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
}
if
(
tsdbWriteCompIdx
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d failed to write compIdx part to file %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_err
;
}
tfree
(
dataDir
);
tsdbCloseHelperFile
(
pHelper
,
0
,
pGroup
);
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
(
void
)
rename
(
helperNewHeadF
(
pHelper
)
->
fname
,
helperHeadF
(
pHelper
)
->
fname
);
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
info
=
helperNewHeadF
(
pHelper
)
->
info
;
if
(
newLast
)
{
(
void
)
rename
(
helperNewLastF
(
pHelper
)
->
fname
,
helperLastF
(
pHelper
)
->
fname
);
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
].
info
=
helperNewLastF
(
pHelper
)
->
info
;
}
else
{
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
].
info
=
helperLastF
(
pHelper
)
->
info
;
}
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
].
info
=
helperDataF
(
pHelper
)
->
info
;
pthread_rwlock_unlock
(
&
(
pFileH
->
fhlock
));
return
0
;
_err:
tfree
(
dataDir
);
tsdbCloseHelperFile
(
pHelper
,
1
,
NULL
);
return
-
1
;
}
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SCommitIter
*
iters
=
(
SCommitIter
*
)
calloc
(
pMem
->
maxTables
,
sizeof
(
SCommitIter
));
if
(
iters
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
if
(
tsdbRLockRepoMeta
(
pRepo
)
<
0
)
goto
_err
;
// reference all tables
for
(
int
i
=
0
;
i
<
pMem
->
maxTables
;
i
++
)
{
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbRefTable
(
pMeta
->
tables
[
i
]);
iters
[
i
].
pTable
=
pMeta
->
tables
[
i
];
}
}
if
(
tsdbUnlockRepoMeta
(
pRepo
)
<
0
)
goto
_err
;
for
(
int
i
=
0
;
i
<
pMem
->
maxTables
;
i
++
)
{
if
((
iters
[
i
].
pTable
!=
NULL
)
&&
(
pMem
->
tData
[
i
]
!=
NULL
)
&&
(
TABLE_UID
(
iters
[
i
].
pTable
)
==
pMem
->
tData
[
i
]
->
uid
))
{
if
((
iters
[
i
].
pIter
=
tSkipListCreateIter
(
pMem
->
tData
[
i
]
->
pData
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
tSkipListIterNext
(
iters
[
i
].
pIter
);
}
}
return
iters
;
_err:
tsdbDestroyCommitIters
(
iters
,
pMem
->
maxTables
);
return
NULL
;
}
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
)
{
if
(
iters
==
NULL
)
return
;
for
(
int
i
=
1
;
i
<
maxTables
;
i
++
)
{
if
(
iters
[
i
].
pTable
!=
NULL
)
{
tsdbUnRefTable
(
iters
[
i
].
pTable
);
tSkipListDestroyIter
(
iters
[
i
].
pIter
);
}
}
free
(
iters
);
}
src/tsdb/src/tsdbFile.c
浏览文件 @
a803f240
...
...
@@ -256,7 +256,8 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
pFileH
->
pFGroup
[
pFileH
->
nFGroups
++
]
=
fGroup
;
qsort
((
void
*
)(
pFileH
->
pFGroup
),
pFileH
->
nFGroups
,
sizeof
(
SFileGroup
),
compFGroup
);
pthread_rwlock_unlock
(
&
pFileH
->
fhlock
);
return
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
);
pGroup
=
tsdbSearchFGroup
(
pFileH
,
fid
,
TD_EQ
);
ASSERT
(
pGroup
!=
NULL
);
}
return
pGroup
;
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
a803f240
...
...
@@ -23,12 +23,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static
STableData
*
tsdbNewTableData
(
STsdbCfg
*
pCfg
,
STable
*
pTable
);
static
void
tsdbFreeTableData
(
STableData
*
pTableData
);
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
);
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
);
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SDataRow
row
);
static
int
tsdbInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
...
...
@@ -215,7 +209,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
sem_wait
(
&
(
pRepo
->
readyToCommit
));
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
);
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
,
TSDB_CODE_SUCCESS
);
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
pRepo
->
imem
=
pRepo
->
mem
;
pRepo
->
mem
=
NULL
;
...
...
@@ -355,68 +349,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return
0
;
}
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
SDataCols
*
pDataCols
=
NULL
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SCommitIter
*
iters
=
NULL
;
SRWHelper
whelper
=
{
0
};
ASSERT
(
pMem
!=
NULL
);
tsdbInfo
(
"vgId:%d start to commit! keyFirst %"
PRId64
" keyLast %"
PRId64
" numOfRows %"
PRId64
,
REPO_ID
(
pRepo
),
pMem
->
keyFirst
,
pMem
->
keyLast
,
pMem
->
numOfRows
);
// Create the iterator to read from cache
if
(
pMem
->
numOfRows
>
0
)
{
iters
=
tsdbCreateCommitIters
(
pRepo
);
if
(
iters
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create commit iterator since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_exit
;
}
if
(
tsdbInitWriteHelper
(
&
whelper
,
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to init write helper since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_exit
;
}
if
((
pDataCols
=
tdNewDataCols
(
pMeta
->
maxRowBytes
,
pMeta
->
maxCols
,
pCfg
->
maxRowsPerFileBlock
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbError
(
"vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s"
,
REPO_ID
(
pRepo
),
pMeta
->
maxCols
,
pMeta
->
maxRowBytes
,
pCfg
->
maxRowsPerFileBlock
,
tstrerror
(
terrno
));
goto
_exit
;
}
int
sfid
=
(
int
)(
TSDB_KEY_FILEID
(
pMem
->
keyFirst
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
int
efid
=
(
int
)(
TSDB_KEY_FILEID
(
pMem
->
keyLast
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
// Loop to commit to each file
for
(
int
fid
=
sfid
;
fid
<=
efid
;
fid
++
)
{
if
(
tsdbCommitToFile
(
pRepo
,
fid
,
iters
,
&
whelper
,
pDataCols
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit to file %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_exit
;
}
}
}
// Commit to update meta file
if
(
tsdbCommitMeta
(
pRepo
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit data while committing meta data since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_exit
;
}
tsdbFitRetention
(
pRepo
);
_exit:
tdFreeDataCols
(
pDataCols
);
tsdbDestroyCommitIters
(
iters
,
pMem
->
maxTables
);
tsdbDestroyHelper
(
&
whelper
);
tsdbInfo
(
"vgId:%d commit over"
,
pRepo
->
config
.
tsdbId
);
tsdbEndCommit
(
pRepo
);
return
NULL
;
}
// ---------------- LOCAL FUNCTIONS ----------------
static
SMemTable
*
tsdbNewMemTable
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
...
...
@@ -508,240 +440,11 @@ static void tsdbFreeTableData(STableData *pTableData) {
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
)
{
return
dataRowTuple
((
SDataRow
)
data
);
}
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SActObj
*
pAct
=
NULL
;
SActCont
*
pCont
=
NULL
;
if
(
listNEles
(
pMem
->
actList
)
>
0
)
{
if
(
tdKVStoreStartCommit
(
pMeta
->
pStore
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit data while start commit meta since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
SListNode
*
pNode
=
NULL
;
while
((
pNode
=
tdListPopHead
(
pMem
->
actList
))
!=
NULL
)
{
pAct
=
(
SActObj
*
)
pNode
->
data
;
if
(
pAct
->
act
==
TSDB_UPDATE_META
)
{
pCont
=
(
SActCont
*
)
POINTER_SHIFT
(
pAct
,
sizeof
(
SActObj
));
if
(
tdUpdateKVStoreRecord
(
pMeta
->
pStore
,
pAct
->
uid
,
(
void
*
)(
pCont
->
cont
),
pCont
->
len
)
<
0
)
{
tsdbError
(
"vgId:%d failed to update meta with uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
pAct
->
uid
,
tstrerror
(
terrno
));
tdKVStoreEndCommit
(
pMeta
->
pStore
);
goto
_err
;
}
}
else
if
(
pAct
->
act
==
TSDB_DROP_META
)
{
if
(
tdDropKVStoreRecord
(
pMeta
->
pStore
,
pAct
->
uid
)
<
0
)
{
tsdbError
(
"vgId:%d failed to drop meta with uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
pAct
->
uid
,
tstrerror
(
terrno
));
tdKVStoreEndCommit
(
pMeta
->
pStore
);
goto
_err
;
}
}
else
{
ASSERT
(
false
);
}
}
if
(
tdKVStoreEndCommit
(
pMeta
->
pStore
)
<
0
)
{
tsdbError
(
"vgId:%d failed to commit data while end commit meta since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
}
return
0
;
_err:
return
-
1
;
}
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
)
{
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_OVER
);
sem_post
(
&
(
pRepo
->
readyToCommit
));
}
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
for
(
int
i
=
0
;
i
<
nIters
;
i
++
)
{
TSKEY
nextKey
=
tsdbNextIterKey
((
iters
+
i
)
->
pIter
);
if
(
nextKey
!=
TSDB_DATA_TIMESTAMP_NULL
&&
(
nextKey
>=
minKey
&&
nextKey
<=
maxKey
))
return
1
;
}
return
0
;
}
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
)
{
*
minKey
=
fileId
*
daysPerFile
*
tsMsPerDay
[
precision
];
*
maxKey
=
*
minKey
+
daysPerFile
*
tsMsPerDay
[
precision
]
-
1
;
}
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
)
{
char
*
dataDir
=
NULL
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
NULL
;
SMemTable
*
pMem
=
pRepo
->
imem
;
bool
newLast
=
false
;
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
// Check if there are data to commit to this file
int
hasDataToCommit
=
tsdbHasDataToCommit
(
iters
,
pMem
->
maxTables
,
minKey
,
maxKey
);
if
(
!
hasDataToCommit
)
{
tsdbDebug
(
"vgId:%d no data to commit to file %d"
,
REPO_ID
(
pRepo
),
fid
);
return
0
;
}
// Create and open files for commit
dataDir
=
tsdbGetDataDirName
(
pRepo
->
rootDir
);
if
(
dataDir
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
if
((
pGroup
=
tsdbCreateFGroupIfNeed
(
pRepo
,
dataDir
,
fid
))
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create file group %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_err
;
}
// Open files for write/read
if
(
tsdbSetAndOpenHelperFile
(
pHelper
,
pGroup
)
<
0
)
{
tsdbError
(
"vgId:%d failed to set helper file since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
newLast
=
TSDB_NLAST_FILE_OPENED
(
pHelper
);
if
(
tsdbLoadCompIdx
(
pHelper
,
NULL
)
<
0
)
{
tsdbError
(
"vgId:%d failed to load SCompIdx part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Loop to commit data in each table
for
(
int
tid
=
1
;
tid
<
pMem
->
maxTables
;
tid
++
)
{
SCommitIter
*
pIter
=
iters
+
tid
;
if
(
pIter
->
pTable
==
NULL
)
continue
;
taosRLockLatch
(
&
(
pIter
->
pTable
->
latch
));
if
(
tsdbSetHelperTable
(
pHelper
,
pIter
->
pTable
,
pRepo
)
<
0
)
goto
_err
;
if
(
pIter
->
pIter
!=
NULL
)
{
if
(
tdInitDataCols
(
pDataCols
,
tsdbGetTableSchemaImpl
(
pIter
->
pTable
,
false
,
false
,
-
1
))
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
if
(
tsdbCommitTableData
(
pHelper
,
pIter
,
pDataCols
,
maxKey
)
<
0
)
{
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
tsdbError
(
"vgId:%d failed to write data of table %s tid %d uid %"
PRIu64
" since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pIter
->
pTable
),
TABLE_TID
(
pIter
->
pTable
),
TABLE_UID
(
pIter
->
pTable
),
tstrerror
(
terrno
));
goto
_err
;
}
}
taosRUnLockLatch
(
&
(
pIter
->
pTable
->
latch
));
// Move the last block to the new .l file if neccessary
if
(
tsdbMoveLastBlockIfNeccessary
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to move last block, since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
// Write the SCompBlock part
if
(
tsdbWriteCompInfo
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d, failed to write compInfo part since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
goto
_err
;
}
}
if
(
tsdbWriteCompIdx
(
pHelper
)
<
0
)
{
tsdbError
(
"vgId:%d failed to write compIdx part to file %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_err
;
}
tfree
(
dataDir
);
tsdbCloseHelperFile
(
pHelper
,
0
,
pGroup
);
pthread_rwlock_wrlock
(
&
(
pFileH
->
fhlock
));
(
void
)
rename
(
helperNewHeadF
(
pHelper
)
->
fname
,
helperHeadF
(
pHelper
)
->
fname
);
pGroup
->
files
[
TSDB_FILE_TYPE_HEAD
].
info
=
helperNewHeadF
(
pHelper
)
->
info
;
if
(
newLast
)
{
(
void
)
rename
(
helperNewLastF
(
pHelper
)
->
fname
,
helperLastF
(
pHelper
)
->
fname
);
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
].
info
=
helperNewLastF
(
pHelper
)
->
info
;
}
else
{
pGroup
->
files
[
TSDB_FILE_TYPE_LAST
].
info
=
helperLastF
(
pHelper
)
->
info
;
}
pGroup
->
files
[
TSDB_FILE_TYPE_DATA
].
info
=
helperDataF
(
pHelper
)
->
info
;
pthread_rwlock_unlock
(
&
(
pFileH
->
fhlock
));
return
0
;
_err:
tfree
(
dataDir
);
tsdbCloseHelperFile
(
pHelper
,
1
,
NULL
);
return
-
1
;
}
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SCommitIter
*
iters
=
(
SCommitIter
*
)
calloc
(
pMem
->
maxTables
,
sizeof
(
SCommitIter
));
if
(
iters
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
if
(
tsdbRLockRepoMeta
(
pRepo
)
<
0
)
goto
_err
;
// reference all tables
for
(
int
i
=
0
;
i
<
pMem
->
maxTables
;
i
++
)
{
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbRefTable
(
pMeta
->
tables
[
i
]);
iters
[
i
].
pTable
=
pMeta
->
tables
[
i
];
}
}
if
(
tsdbUnlockRepoMeta
(
pRepo
)
<
0
)
goto
_err
;
for
(
int
i
=
0
;
i
<
pMem
->
maxTables
;
i
++
)
{
if
((
iters
[
i
].
pTable
!=
NULL
)
&&
(
pMem
->
tData
[
i
]
!=
NULL
)
&&
(
TABLE_UID
(
iters
[
i
].
pTable
)
==
pMem
->
tData
[
i
]
->
uid
))
{
if
((
iters
[
i
].
pIter
=
tSkipListCreateIter
(
pMem
->
tData
[
i
]
->
pData
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
tSkipListIterNext
(
iters
[
i
].
pIter
);
}
}
return
iters
;
_err:
tsdbDestroyCommitIters
(
iters
,
pMem
->
maxTables
);
return
NULL
;
}
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
)
{
if
(
iters
==
NULL
)
return
;
for
(
int
i
=
1
;
i
<
maxTables
;
i
++
)
{
if
(
iters
[
i
].
pTable
!=
NULL
)
{
tsdbUnRefTable
(
iters
[
i
].
pTable
);
tSkipListDestroyIter
(
iters
[
i
].
pIter
);
}
}
free
(
iters
);
}
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
)
{
ASSERT
(
pMemTable
->
maxTables
<
maxTables
);
...
...
src/util/src/tkvstore.c
浏览文件 @
a803f240
...
...
@@ -236,6 +236,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
rInfo
.
offset
=
lseek
(
pStore
->
fd
,
0
,
SEEK_CUR
);
if
(
rInfo
.
offset
<
0
)
{
uError
(
"failed to lseek file %s since %s"
,
pStore
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
...
...
@@ -254,6 +255,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
if
(
taosWrite
(
pStore
->
fd
,
cont
,
contLen
)
<
contLen
)
{
uError
(
"failed to write %d bytes to file %s since %s"
,
contLen
,
pStore
->
fname
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
a803f240
...
...
@@ -30,7 +30,7 @@
static
SHashObj
*
tsVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
);
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
,
int
eno
);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
fileName
,
int64_t
*
fileId
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
...
...
@@ -590,9 +590,13 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
}
// TODO: this is a simple implement
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
)
{
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
,
int
eno
)
{
SVnodeObj
*
pVnode
=
arg
;
if
(
eno
!=
TSDB_CODE_SUCCESS
)
{
// TODO: deal with the error here
}
if
(
status
==
TSDB_STATUS_COMMIT_START
)
{
pVnode
->
fversion
=
pVnode
->
version
;
vDebug
(
"vgId:%d, start commit, fver:%"
PRIu64
" vver:%"
PRIu64
,
pVnode
->
vgId
,
pVnode
->
fversion
,
pVnode
->
version
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录