Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4935b146
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4935b146
编写于
6月 08, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
6月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13600 from taosdata/feat/row_refact
feat: vnode multi-version
上级
0d665b57
aa6f8f0e
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
364 addition
and
1388 deletion
+364
-1388
include/util/talgo.h
include/util/talgo.h
+3
-4
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+0
-2
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+0
-51
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-13
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+321
-83
source/dnode/vnode/src/tsdb/tsdbCommit2.c
source/dnode/vnode/src/tsdb/tsdbCommit2.c
+0
-436
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+0
-177
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
+0
-530
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+0
-27
source/util/src/talgo.c
source/util/src/talgo.c
+31
-65
source/util/test/CMakeLists.txt
source/util/test/CMakeLists.txt
+8
-0
未找到文件。
include/util/talgo.h
浏览文件 @
4935b146
...
...
@@ -65,7 +65,7 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __
* @param flags
* @return
*/
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int
64_t
nmemb
,
int64_t
size
,
__compar_fn_t
fn
,
int32_t
flags
);
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int
32_t
nmemb
,
int32_t
size
,
__compar_fn_t
compar
,
int32_t
flags
);
/**
* adjust heap
...
...
@@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
* @return
*/
void
taosheapadjust
(
void
*
base
,
int32_t
size
,
int32_t
start
,
int32_t
end
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
);
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
);
/**
* sort heap to make sure it is a max/min root heap
...
...
@@ -97,8 +97,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
* @param maxroot: if heap is max root heap
* @return
*/
void
taosheapsort
(
void
*
base
,
int32_t
size
,
int32_t
len
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
bool
maxroot
);
void
taosheapsort
(
void
*
base
,
int32_t
size
,
int32_t
len
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
bool
maxroot
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
4935b146
...
...
@@ -36,12 +36,10 @@ target_sources(
# tsdb
"src/tsdb/tsdbCommit.c"
# "src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbOpen.c"
"src/tsdb/tsdbMemTable.c"
# "src/tsdb/tsdbMemTable2.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbWrite.c"
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
4935b146
...
...
@@ -58,28 +58,6 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, ST
bool
tsdbTbDataIterNext
(
STbDataIter
*
pIter
);
bool
tsdbTbDataIterGet
(
STbDataIter
*
pIter
,
TSDBROW
*
pRow
);
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
// tsdbMemTable2.c ==============================================================================================
// typedef struct SMemTable2 SMemTable2;
// typedef struct SMemData SMemData;
// typedef struct SMemDataIter SMemDataIter;
// int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable2 **ppMemTable);
// void tsdbMemTableDestroy2(SMemTable2 *pMemTable);
// int32_t tsdbInsertTableData2(STsdb *pTsdb, int64_t version, SVSubmitBlk *pSubmitBlk);
// int32_t tsdbDeleteTableData2(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
// /* SMemDataIter */
// void tsdbMemDataIterOpen(SMemData *pMemData, TSDBKEY *pKey, int8_t backward, SMemDataIter *pIter);
// bool tsdbMemDataIterNext(SMemDataIter *pIter);
// void tsdbMemDataIterGet(SMemDataIter *pIter, TSDBROW **ppRow);
// // tsdbCommit2.c ==============================================================================================
// int32_t tsdbBegin2(STsdb *pTsdb);
// int32_t tsdbCommit2(STsdb *pTsdb);
// tsdbFile.c ==============================================================================================
typedef
int32_t
TSDB_FILE_T
;
typedef
struct
SDFInfo
SDFInfo
;
...
...
@@ -700,17 +678,6 @@ typedef struct {
TSKEY
eKey
;
}
SDelInfo
;
struct
SMemTable2
{
STsdb
*
pTsdb
;
int32_t
nRef
;
TSDBKEY
minKey
;
TSDBKEY
maxKey
;
int64_t
nRows
;
int64_t
nDelOp
;
SArray
*
aSkmInfo
;
SArray
*
aMemData
;
};
static
FORCE_INLINE
int
tsdbKeyCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
TSDBKEY
*
pKey1
=
(
TSDBKEY
*
)
p1
;
TSDBKEY
*
pKey2
=
(
TSDBKEY
*
)
p2
;
...
...
@@ -730,24 +697,6 @@ static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
return
0
;
}
struct
SMemData
{
tb_uid_t
suid
;
tb_uid_t
uid
;
TSDBKEY
minKey
;
TSDBKEY
maxKey
;
SDelOp
*
delOpHead
;
SDelOp
*
delOpTail
;
SMemSkipList
sl
;
};
struct
SMemDataIter
{
STbData
*
pMemData
;
int8_t
backward
;
TSDBROW
*
pRow
;
SMemSkipListNode
*
pNode
;
// current node
TSDBROW
row
;
};
struct
STbDataIter
{
STbData
*
pTbData
;
int8_t
backward
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
4935b146
...
...
@@ -112,7 +112,7 @@ int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
// tsdb
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
,
const
char
*
dir
,
STsdbKeepCfg
*
pKeepCfg
);
int
tsdbClose
(
STsdb
**
pTsdb
);
int
tsdbBegin
(
STsdb
*
pTsdb
);
int
32_t
tsdbBegin
(
STsdb
*
pTsdb
);
int32_t
tsdbCommit
(
STsdb
*
pTsdb
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
...
...
@@ -161,18 +161,6 @@ int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
void
tdUidStoreDestory
(
STbUidStore
*
pStore
);
void
*
tdUidStoreFree
(
STbUidStore
*
pStore
);
#if 0
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
int32_t tsdbRegisterRSma(STsdb* pTsdb, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
int32_t tsdbFetchTbUidList(STsdb* pTsdb, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
#endif
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
dstType
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
4935b146
...
...
@@ -28,6 +28,8 @@ typedef struct {
int
niters
;
// memory iterators
SCommitIter
*
iters
;
bool
isRFileSet
;
// read and commit FSET
int32_t
fid
;
SDFileSet
*
pSet
;
SReadH
readh
;
SDFileSet
wSet
;
bool
isDFileSame
;
...
...
@@ -58,8 +60,12 @@ typedef struct {
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static
void
tsdbStartCommit
(
STsdb
*
pRepo
);
static
void
tsdbEndCommit
(
STsdb
*
pTsdb
,
int
eno
);
static
int32_t
tsdbCommitData
(
SCommitH
*
pCommith
);
static
int32_t
tsdbCommitDel
(
SCommitH
*
pCommith
);
static
int32_t
tsdbCommitCache
(
SCommitH
*
pCommith
);
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitH
*
pCHandle
);
static
int32_t
tsdbEndCommit
(
SCommitH
*
pCHandle
,
int
eno
);
static
int
tsdbInitCommitH
(
SCommitH
*
pCommith
,
STsdb
*
pRepo
);
static
void
tsdbSeekCommitIter
(
SCommitH
*
pCommith
,
TSKEY
key
);
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
);
...
...
@@ -67,7 +73,6 @@ static void tsdbDestroyCommitH(SCommitH *pCommith);
static
int32_t
tsdbCreateCommitIters
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
);
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
);
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
int
tsdbCommitToTable
(
SCommitH
*
pCommith
,
int
tid
);
static
bool
tsdbCommitIsSameFile
(
SCommitH
*
pCommith
,
int
bidx
);
...
...
@@ -88,8 +93,11 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
static
int
tsdbWriteBlockIdx
(
SDFile
*
pHeadf
,
SArray
*
pIdxA
,
void
**
ppBuf
);
static
int
tsdbApplyRtnOnFSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
static
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
int
tsdbBegin
(
STsdb
*
pTsdb
)
{
int
32_t
tsdbBegin
(
STsdb
*
pTsdb
)
{
if
(
!
pTsdb
)
return
0
;
SMemTable
*
pMem
;
...
...
@@ -112,15 +120,50 @@ int32_t tsdbCommit(STsdb *pTsdb) {
pTsdb
->
mem
=
NULL
;
// start commit
tsdbStartCommit
(
pTsdb
);
if
(
tsdbInitCommitH
(
&
commith
,
pTsdb
)
<
0
)
{
return
-
1
;
code
=
tsdbStartCommit
(
pTsdb
,
&
commith
);
if
(
code
)
{
goto
_err
;
}
// commit impl
code
=
tsdbCommitData
(
&
commith
);
if
(
code
)
{
goto
_err
;
}
code
=
tsdbCommitDel
(
&
commith
);
if
(
code
)
{
goto
_err
;
}
code
=
tsdbCommitCache
(
&
commith
);
if
(
code
)
{
goto
_err
;
}
// end commit
code
=
tsdbEndCommit
(
&
commith
,
0
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
tsdbError
(
"vgId:%d failed to commit since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitData
(
SCommitH
*
pCommith
)
{
int32_t
fid
;
SDFileSet
*
pSet
=
NULL
;
int32_t
code
=
0
;
STsdb
*
pTsdb
=
TSDB_COMMIT_REPO
(
pCommith
);
// Skip expired memory data and expired FSET
tsdbSeekCommitIter
(
&
commith
,
commith
.
rtn
.
minKey
);
while
((
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
))))
{
if
(
pSet
->
fid
<
commith
.
rtn
.
minFid
)
{
tsdbSeekCommitIter
(
pCommith
,
pCommith
->
rtn
.
minKey
);
while
((
pSet
=
tsdbFSIterNext
(
&
(
pCommith
->
fsIter
))))
{
if
(
pSet
->
fid
<
pCommith
->
rtn
.
minFid
)
{
tsdbInfo
(
"vgId:%d, FSET %d on level %d disk id %d expires, remove it"
,
REPO_ID
(
pTsdb
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
));
}
else
{
...
...
@@ -129,7 +172,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
}
// commit
fid
=
tsdbNextCommitFid
(
&
(
commith
)
);
fid
=
tsdbNextCommitFid
(
pCommith
);
while
(
true
)
{
// Loop over both on disk and memory
if
(
pSet
==
NULL
&&
fid
==
TSDB_IVLD_FID
)
break
;
...
...
@@ -137,12 +180,12 @@ int32_t tsdbCommit(STsdb *pTsdb) {
if
(
pSet
&&
(
fid
==
TSDB_IVLD_FID
||
pSet
->
fid
<
fid
))
{
// Only has existing FSET but no memory data to commit in this
// existing FSET, only check if file in correct retention
if
(
tsdbApplyRtnOnFSet
(
pTsdb
,
pSet
,
&
(
commith
.
rtn
))
<
0
)
{
tsdbDestroyCommitH
(
&
c
ommith
);
if
(
tsdbApplyRtnOnFSet
(
TSDB_COMMIT_REPO
(
pCommith
),
pSet
,
&
(
pCommith
->
rtn
))
<
0
)
{
tsdbDestroyCommitH
(
pC
ommith
);
return
-
1
;
}
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
));
pSet
=
tsdbFSIterNext
(
&
(
pCommith
->
fsIter
));
}
else
{
// Has memory data to commit
SDFileSet
*
pCSet
;
...
...
@@ -156,22 +199,30 @@ int32_t tsdbCommit(STsdb *pTsdb) {
// Commit to an existing FSET
pCSet
=
pSet
;
cfid
=
pSet
->
fid
;
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
));
pSet
=
tsdbFSIterNext
(
&
(
pCommith
->
fsIter
));
}
if
(
tsdbCommitToFile
(
&
c
ommith
,
pCSet
,
cfid
)
<
0
)
{
tsdbDestroyCommitH
(
&
c
ommith
);
if
(
tsdbCommitToFile
(
pC
ommith
,
pCSet
,
cfid
)
<
0
)
{
tsdbDestroyCommitH
(
pC
ommith
);
return
-
1
;
}
fid
=
tsdbNextCommitFid
(
&
c
ommith
);
fid
=
tsdbNextCommitFid
(
pC
ommith
);
}
}
// end commit
tsdbDestroyCommitH
(
&
commith
);
tsdbEndCommit
(
pTsdb
,
TSDB_CODE_SUCCESS
);
return
code
;
}
static
int32_t
tsdbCommitDel
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitCache
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
...
...
@@ -216,16 +267,6 @@ static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
return
0
;
}
// int tsdbPrepareCommit(STsdb *pTsdb) {
// if (pTsdb->mem == NULL) return 0;
// ASSERT(pTsdb->imem == NULL);
// pTsdb->imem = pTsdb->mem;
// pTsdb->mem = NULL;
// return 0;
// }
void
tsdbGetRtnSnap
(
STsdb
*
pRepo
,
SRtn
*
pRtn
)
{
STsdbKeepCfg
*
pCfg
=
REPO_KEEP_CFG
(
pRepo
);
TSKEY
minKey
,
midKey
,
maxKey
,
now
;
...
...
@@ -243,19 +284,32 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
pRtn
->
minFid
,
pRtn
->
midFid
,
pRtn
->
maxFid
);
}
static
void
tsdbStartCommit
(
STsdb
*
pRepo
)
{
SMemTable
*
pMem
=
pRepo
->
imem
;
static
int32_t
tsdbStartCommit
(
STsdb
*
pTsdb
,
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
tsdbInfo
(
"vgId:%d, start to commit"
,
REPO_ID
(
pTsdb
));
if
(
tsdbInitCommitH
(
pCHandle
,
pTsdb
)
<
0
)
{
return
-
1
;
}
tsdb
Info
(
"vgId:%d, start to commit"
,
REPO_ID
(
pRepo
)
);
tsdb
StartFSTxn
(
pTsdb
,
0
,
0
);
tsdbStartFSTxn
(
pRepo
,
0
,
0
)
;
return
code
;
}
static
void
tsdbEndCommit
(
STsdb
*
pTsdb
,
int
eno
)
{
static
int32_t
tsdbEndCommit
(
SCommitH
*
pCHandle
,
int
eno
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
TSDB_COMMIT_REPO
(
pCHandle
);
tsdbDestroyCommitH
(
pCHandle
);
tsdbEndFSTxn
(
pTsdb
);
tsdbMemTableDestroy
(
pTsdb
->
imem
);
pTsdb
->
imem
=
NULL
;
tsdbInfo
(
"vgId:%d, commit over, %s"
,
REPO_ID
(
pTsdb
),
(
eno
==
TSDB_CODE_SUCCESS
)
?
"succeed"
:
"failed"
);
return
code
;
}
static
int
tsdbInitCommitH
(
SCommitH
*
pCommith
,
STsdb
*
pRepo
)
{
...
...
@@ -354,34 +408,73 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
));
}
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
static
int32_t
tsdbCommitToFileStart
(
SCommitH
*
pCHandle
,
SDFileSet
*
pSet
,
int32_t
fid
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCHandle
);
STsdbKeepCfg
*
pCfg
=
REPO_KEEP_CFG
(
pRepo
);
ASSERT
(
pSet
==
NULL
||
pSet
->
fid
==
fid
);
tsdbResetCommitFile
(
pCommith
);
tsdbGetFidKeyRange
(
pCfg
->
days
,
pCfg
->
precision
,
fid
,
&
(
pCommith
->
minKey
),
&
(
pCommith
->
maxKey
));
pCHandle
->
fid
=
fid
;
pCHandle
->
pSet
=
pSet
;
pCHandle
->
isRFileSet
=
false
;
pCHandle
->
isDFileSame
=
false
;
pCHandle
->
isLFileSame
=
false
;
taosArrayClear
(
pCHandle
->
aBlkIdx
);
tsdbGetFidKeyRange
(
pCfg
->
days
,
pCfg
->
precision
,
fid
,
&
(
pCHandle
->
minKey
),
&
(
pCHandle
->
maxKey
));
code
=
tsdbSetAndOpenCommitFile
(
pCHandle
,
pSet
,
fid
);
// Set and open files
if
(
tsdbSetAndOpenCommitFile
(
pCommith
,
pSet
,
fid
)
<
0
)
{
return
code
;
}
static
int32_t
tsdbCommitToFileImpl
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitToFileEnd
(
SCommitH
*
pCommith
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
0
)
{
tsdbError
(
"vgId:%d, failed to write SBlockIdx part to FSET %d since %s"
,
REPO_ID
(
pRepo
),
pCommith
->
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pCommith
->
pSet
);
return
-
1
;
}
#if 0
// Loop to commit each table data
for (int tid = 0; tid < pCommith->niters; tid++) {
SCommitIter *pIter = pCommith->iters + tid;
if (pIter->pTable == NULL) continue;
if
(
tsdbUpdateDFileSetHeader
(
&
(
pCommith
->
wSet
))
<
0
)
{
tsdbError
(
"vgId:%d, failed to update FSET %d header since %s"
,
REPO_ID
(
pRepo
),
pCommith
->
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pCommith
->
pSet
);
return
-
1
;
}
if (tsdbCommitToTable(pCommith, tid) < 0) {
tsdbCloseCommitFile(pCommith, true);
// revert the file change
tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return -1;
}
// Close commit file
tsdbCloseCommitFile
(
pCommith
,
false
);
if
(
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
&
(
pCommith
->
wSet
))
<
0
)
{
return
-
1
;
}
#endif
return
code
;
}
static
int32_t
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
int32_t
code
=
0
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
STsdbKeepCfg
*
pCfg
=
REPO_KEEP_CFG
(
pRepo
);
// commit to file start
code
=
tsdbCommitToFileStart
(
pCommith
,
pSet
,
fid
);
if
(
code
)
{
goto
_err
;
}
// Loop to commit each table data in mem and file
int
mIter
=
0
,
fIter
=
0
;
int
nBlkIdx
=
taosArrayGetSize
(
pCommith
->
readh
.
aBlkIdx
);
...
...
@@ -426,31 +519,16 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
}
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
0
)
{
tsdbError
(
"vgId:%d, failed to write SBlockIdx part to FSET %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
}
if
(
tsdbUpdateDFileSetHeader
(
&
(
pCommith
->
wSet
))
<
0
)
{
tsdbError
(
"vgId:%d, failed to update FSET %d header since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
// commit to file end
code
=
tsdbCommitToFileEnd
(
pCommith
);
if
(
code
)
{
goto
_err
;
}
// Close commit file
tsdbCloseCommitFile
(
pCommith
,
false
);
if
(
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
&
(
pCommith
->
wSet
))
<
0
)
{
return
-
1
;
}
return
code
;
return
0
;
_err:
return
code
;
}
static
int32_t
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
...
...
@@ -507,13 +585,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
pCommith
->
niters
=
0
;
}
static
void
tsdbResetCommitFile
(
SCommitH
*
pCommith
)
{
pCommith
->
isRFileSet
=
false
;
pCommith
->
isDFileSame
=
false
;
pCommith
->
isLFileSame
=
false
;
taosArrayClear
(
pCommith
->
aBlkIdx
);
}
static
int
tsdbSetAndOpenCommitFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
SDiskID
did
;
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
...
...
@@ -1590,4 +1661,171 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
}
return
false
;
}
static
int
tsdbAppendTableRowToCols
(
STsdb
*
pTsdb
,
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
STSRow
*
row
,
bool
merge
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
TD_ROW_SVER
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTsdb
,
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendSTSRowToDataCol
(
row
,
*
ppSchema
,
pCols
,
merge
);
}
return
0
;
}
static
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
// only fetch lastKey from mem data as file data not used in this function actually
TSKEY
lastKey
=
TSKEY_INITIAL_VAL
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
STSRow
*
row
=
NULL
;
SMergeInfo
mInfo
;
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
// query handle)
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
// 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
// 2. rowKey - would dup since Multi-Version supported
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
#if 1
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
// TODO: support delete function
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
if
(
keepDup
)
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
else
{
// discard
}
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
}
else
{
// fkey == rowKey
if
(
isRowDel
)
{
// TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
}
else
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
#endif
}
if
(
pCols
&&
(
lastKey
!=
TSKEY_INITIAL_VAL
))
{
++
pCols
->
numOfRows
;
}
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbCommit2.c
已删除
100644 → 0
浏览文件 @
0d665b57
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
typedef
struct
{
SMemTable2
*
pMemTable
;
int32_t
minutes
;
int8_t
precision
;
TSKEY
nCommitKey
;
int32_t
fid
;
TSKEY
minKey
;
TSKEY
maxKey
;
SReadH
readh
;
SDFileSet
wSet
;
SArray
*
aBlkIdx
;
SArray
*
aSupBlk
;
SArray
*
aSubBlk
;
SArray
*
aDelInfo
;
}
SCommitH
;
static
int32_t
tsdbCommitStart
(
SCommitH
*
pCHandle
,
STsdb
*
pTsdb
);
static
int32_t
tsdbCommitEnd
(
SCommitH
*
pCHandle
);
static
int32_t
tsdbCommitImpl
(
SCommitH
*
pCHandle
);
int32_t
tsdbBegin2
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
ASSERT
(
pTsdb
->
mem
==
NULL
);
code
=
tsdbMemTableCreate2
(
pTsdb
,
(
SMemTable2
**
)
&
pTsdb
->
mem
);
if
(
code
)
{
tsdbError
(
"vgId:%d failed to begin TSDB since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
goto
_exit
;
}
_exit:
return
code
;
}
int32_t
tsdbCommit2
(
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
SCommitH
ch
=
{
0
};
// start to commit
code
=
tsdbCommitStart
(
&
ch
,
pTsdb
);
if
(
code
)
{
goto
_exit
;
}
// commit
code
=
tsdbCommitImpl
(
&
ch
);
if
(
code
)
{
goto
_err
;
}
// end commit
code
=
tsdbCommitEnd
(
&
ch
);
if
(
code
)
{
goto
_exit
;
}
_exit:
return
code
;
_err:
tsdbError
(
"vgId:%d failed to commit since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
static
int32_t
tsdbCommitStart
(
SCommitH
*
pCHandle
,
STsdb
*
pTsdb
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
tsdbInfo
(
"vgId:%d start to commit"
,
TD_VID
(
pTsdb
->
pVnode
));
// switch to commit
ASSERT
(
pTsdb
->
imem
==
NULL
&&
pTsdb
->
mem
);
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
// open handle
pCHandle
->
pMemTable
=
pMemTable
;
pCHandle
->
minutes
=
pTsdb
->
keepCfg
.
days
;
pCHandle
->
precision
=
pTsdb
->
keepCfg
.
precision
;
pCHandle
->
nCommitKey
=
pMemTable
->
minKey
.
ts
;
code
=
tsdbInitReadH
(
&
pCHandle
->
readh
,
pTsdb
);
if
(
code
)
{
goto
_err
;
}
pCHandle
->
aBlkIdx
=
taosArrayInit
(
0
,
sizeof
(
SBlockIdx
));
if
(
pCHandle
->
aBlkIdx
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCHandle
->
aSupBlk
=
taosArrayInit
(
0
,
sizeof
(
SBlock
));
if
(
pCHandle
->
aSupBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCHandle
->
aSubBlk
=
taosArrayInit
(
0
,
sizeof
(
SBlock
));
if
(
pCHandle
->
aSubBlk
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pCHandle
->
aDelInfo
=
taosArrayInit
(
0
,
sizeof
(
SDelInfo
));
if
(
pCHandle
->
aDelInfo
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
// start FS transaction
tsdbStartFSTxn
(
pTsdb
,
0
,
0
);
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCHandle
->
pMemTable
->
pTsdb
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
imem
;
// end transaction
code
=
tsdbEndFSTxn
(
pTsdb
);
if
(
code
)
{
goto
_err
;
}
// close handle
taosArrayClear
(
pCHandle
->
aDelInfo
);
taosArrayClear
(
pCHandle
->
aSubBlk
);
taosArrayClear
(
pCHandle
->
aSupBlk
);
taosArrayClear
(
pCHandle
->
aBlkIdx
);
tsdbDestroyReadH
(
&
pCHandle
->
readh
);
// destroy memtable (todo: unref it)
pTsdb
->
imem
=
NULL
;
tsdbMemTableDestroy2
(
pMemTable
);
tsdbInfo
(
"vgId:%d commit over"
,
TD_VID
(
pTsdb
->
pVnode
));
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitTableStart
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitTableEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitTable
(
SCommitH
*
pCHandle
,
SMemData
*
pMemData
,
SBlockIdx
*
pBlockIdx
)
{
int32_t
code
=
0
;
SMemDataIter
iter
=
{
0
};
// commit table start
code
=
tsdbCommitTableStart
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit table impl
if
(
pMemData
&&
pBlockIdx
)
{
// TODO
}
else
if
(
pMemData
)
{
// TODO
}
else
{
// TODO
}
// commit table end
code
=
tsdbCommitTableEnd
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbTableIdCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
TABLEID
*
pId1
=
(
TABLEID
*
)
p1
;
TABLEID
*
pId2
=
(
TABLEID
*
)
p2
;
if
(
pId1
->
suid
<
pId2
->
suid
)
{
return
-
1
;
}
else
if
(
pId1
->
suid
>
pId2
->
suid
)
{
return
1
;
}
if
(
pId1
->
uid
<
pId2
->
uid
)
{
return
-
1
;
}
else
if
(
pId1
->
uid
>
pId2
->
uid
)
{
return
1
;
}
return
0
;
}
static
int32_t
tsdbWriteBlockIdx
(
SDFile
*
pFile
,
SArray
*
pArray
,
uint8_t
**
ppBuf
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitFileStart
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
STsdb
*
pTsdb
=
pCHandle
->
pMemTable
->
pTsdb
;
SDFileSet
*
pSet
=
NULL
;
taosArrayClear
(
pCHandle
->
aBlkIdx
);
return
code
;
}
static
int32_t
tsdbCommitFileEnd
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitFile
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
SMemData
*
pMemData
;
SBlockIdx
*
pBlockIdx
;
int32_t
iMemData
;
int32_t
nMemData
;
int32_t
iBlockIdx
;
int32_t
nBlockIdx
;
// commit file start
code
=
tsdbCommitFileStart
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit file impl
iMemData
=
0
;
nMemData
=
taosArrayGetSize
(
pCHandle
->
pMemTable
->
aMemData
);
iBlockIdx
=
0
;
nBlockIdx
=
0
;
// todo
for
(;;)
{
if
(
iMemData
>=
nMemData
&&
iBlockIdx
>=
nBlockIdx
)
break
;
pMemData
=
NULL
;
pBlockIdx
=
NULL
;
if
(
iMemData
<
nMemData
)
{
pMemData
=
(
SMemData
*
)
taosArrayGetP
(
pCHandle
->
pMemTable
->
aMemData
,
iMemData
);
}
if
(
iBlockIdx
<
nBlockIdx
)
{
// pBlockIdx = ;
}
if
(
pMemData
&&
pBlockIdx
)
{
int32_t
c
=
tsdbTableIdCmprFn
(
pMemData
,
pBlockIdx
);
if
(
c
<
0
)
{
iMemData
++
;
pBlockIdx
=
NULL
;
}
else
if
(
c
==
0
)
{
iMemData
++
;
iBlockIdx
++
;
}
else
{
iBlockIdx
++
;
pMemData
=
NULL
;
}
}
else
{
if
(
pMemData
)
{
iMemData
++
;
}
else
{
iBlockIdx
++
;
}
}
code
=
tsdbCommitTable
(
pCHandle
,
pMemData
,
pBlockIdx
);
if
(
code
)
{
goto
_err
;
}
}
// commit file end
code
=
tsdbCommitFileEnd
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitData
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
int32_t
fid
;
if
(
pCHandle
->
pMemTable
->
nRows
==
0
)
goto
_exit
;
// loop to commit to each file
for
(;;)
{
if
(
pCHandle
->
nCommitKey
==
TSKEY_MAX
)
break
;
pCHandle
->
fid
=
TSDB_KEY_FID
(
pCHandle
->
nCommitKey
,
pCHandle
->
minutes
,
pCHandle
->
precision
);
tsdbGetFidKeyRange
(
pCHandle
->
minutes
,
pCHandle
->
precision
,
pCHandle
->
fid
,
&
pCHandle
->
minKey
,
&
pCHandle
->
maxKey
);
code
=
tsdbCommitFile
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
}
_exit:
return
code
;
_err:
return
code
;
}
static
int32_t
delInfoCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SDelInfo
*
pDelInfo1
=
(
SDelInfo
*
)
p1
;
SDelInfo
*
pDelInfo2
=
(
SDelInfo
*
)
p2
;
if
(
pDelInfo1
->
suid
<
pDelInfo2
->
suid
)
{
return
-
1
;
}
else
if
(
pDelInfo1
->
suid
>
pDelInfo2
->
suid
)
{
return
1
;
}
if
(
pDelInfo1
->
uid
<
pDelInfo2
->
uid
)
{
return
-
1
;
}
else
if
(
pDelInfo1
->
uid
>
pDelInfo2
->
uid
)
{
return
1
;
}
if
(
pDelInfo1
->
version
<
pDelInfo2
->
version
)
{
return
-
1
;
}
else
if
(
pDelInfo1
->
version
>
pDelInfo2
->
version
)
{
return
1
;
}
return
0
;
}
static
int32_t
tsdbCommitDelete
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
SDelInfo
delInfo
;
SMemData
*
pMemData
;
if
(
pCHandle
->
pMemTable
->
nDelOp
==
0
)
goto
_exit
;
// load del array (todo)
// loop to append SDelInfo
for
(
int32_t
iMemData
=
0
;
iMemData
<
taosArrayGetSize
(
pCHandle
->
pMemTable
->
aMemData
);
iMemData
++
)
{
pMemData
=
(
SMemData
*
)
taosArrayGetP
(
pCHandle
->
pMemTable
->
aMemData
,
iMemData
);
for
(
SDelOp
*
pDelOp
=
pMemData
->
delOpHead
;
pDelOp
;
pDelOp
=
pDelOp
->
pNext
)
{
delInfo
=
(
SDelInfo
){.
suid
=
pMemData
->
suid
,
.
uid
=
pMemData
->
uid
,
.
version
=
pDelOp
->
version
,
.
sKey
=
pDelOp
->
sKey
,
.
eKey
=
pDelOp
->
eKey
};
if
(
taosArrayPush
(
pCHandle
->
aDelInfo
,
&
delInfo
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
}
}
taosArraySort
(
pCHandle
->
aDelInfo
,
delInfoCmprFn
);
// write to new file
_exit:
return
code
;
_err:
return
code
;
}
static
int32_t
tsdbCommitCache
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// TODO
return
code
;
}
static
int32_t
tsdbCommitImpl
(
SCommitH
*
pCHandle
)
{
int32_t
code
=
0
;
// commit data
code
=
tsdbCommitData
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit delete
code
=
tsdbCommitDelete
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
// commit cache if need (todo)
if
(
0
)
{
code
=
tsdbCommitCache
(
pCHandle
);
if
(
code
)
{
goto
_err
;
}
}
return
code
;
_err:
return
code
;
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
4935b146
...
...
@@ -188,23 +188,6 @@ _err:
return
code
;
}
static
int
tsdbAppendTableRowToCols
(
STsdb
*
pTsdb
,
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
STSRow
*
row
,
bool
merge
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
TD_ROW_SVER
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTsdb
,
pTable
,
false
,
false
,
TD_ROW_SVER
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendSTSRowToDataCol
(
row
,
*
ppSchema
,
pCols
,
merge
);
}
return
0
;
}
int32_t
tsdbTbDataIterCreate
(
STbData
*
pTbData
,
TSDBKEY
*
pFrom
,
int8_t
backward
,
STbDataIter
**
ppIter
)
{
int32_t
code
=
0
;
...
...
@@ -310,166 +293,6 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) {
return
true
;
}
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int
tsdbLoadDataFromCache
(
STsdb
*
pTsdb
,
STable
*
pTable
,
STbDataIter
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
// only fetch lastKey from mem data as file data not used in this function actually
TSKEY
lastKey
=
TSKEY_INITIAL_VAL
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
STSRow
*
row
=
NULL
;
SMergeInfo
mInfo
;
// TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by
// query handle)
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
// 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols
// 2. rowKey - would dup since Multi-Version supported
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
#if 1
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
// TODO: support delete function
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
if
(
keepDup
)
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
else
{
// discard
}
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
}
else
{
// fkey == rowKey
if
(
isRowDel
)
{
// TODO: support delete function(How to stands for delete in file? rowVersion = -1?)
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
if
(
lastKey
!=
rowKey
)
{
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
rowKey
);
if
(
pCols
)
{
if
(
lastKey
!=
TSKEY_INITIAL_VAL
)
{
++
pCols
->
numOfRows
;
}
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
false
);
}
lastKey
=
rowKey
;
}
else
{
tsdbAppendTableRowToCols
(
pTsdb
,
pTable
,
pCols
,
&
pSchema
,
row
,
true
);
}
}
else
{
pMergeInfo
->
keyFirst
=
TMIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
TMAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
tsdbTbDataIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
TD_ROW_KEY
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
TD_ROW_KEY
(
row
);
isRowDel
=
TD_ROW_IS_DELETED
(
row
);
}
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
#endif
}
if
(
pCols
&&
(
lastKey
!=
TSKEY_INITIAL_VAL
))
{
++
pCols
->
numOfRows
;
}
return
0
;
}
static
int32_t
tsdbGetOrCreateTbData
(
SMemTable
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
STbData
**
ppTbData
)
{
int32_t
code
=
0
;
int32_t
idx
=
0
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable2.c
已删除
100644 → 0
浏览文件 @
0d665b57
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
typedef
struct
{
tb_uid_t
uid
;
STSchema
*
pTSchema
;
}
SSkmInfo
;
#define SL_MAX_LEVEL 5
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable2
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
);
static
int
memDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
);
static
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
static
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
);
static
int8_t
tsdbMemSkipListRandLevel
(
SMemSkipList
*
pSl
);
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable2
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
);
static
void
memDataMovePosTo
(
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBKEY
*
pKey
,
int32_t
flags
);
// SMemTable ==============================================
int32_t
tsdbMemTableCreate2
(
STsdb
*
pTsdb
,
SMemTable2
**
ppMemTable
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
NULL
;
pMemTable
=
(
SMemTable2
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
pMemTable
));
if
(
pMemTable
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pMemTable
->
pTsdb
=
pTsdb
;
pMemTable
->
nRef
=
1
;
pMemTable
->
minKey
=
(
TSDBKEY
){.
version
=
INT64_MAX
,
.
ts
=
TSKEY_MAX
};
pMemTable
->
maxKey
=
(
TSDBKEY
){.
version
=
-
1
,
.
ts
=
TSKEY_MIN
};
pMemTable
->
nRows
=
0
;
pMemTable
->
nDelOp
=
0
;
pMemTable
->
aMemData
=
taosArrayInit
(
512
,
sizeof
(
SMemData
*
));
if
(
pMemTable
->
aMemData
==
NULL
)
{
taosMemoryFree
(
pMemTable
);
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
*
ppMemTable
=
pMemTable
;
return
code
;
_err:
*
ppMemTable
=
NULL
;
return
code
;
}
void
tsdbMemTableDestroy2
(
SMemTable2
*
pMemTable
)
{
taosArrayDestroyEx
(
pMemTable
->
aMemData
,
NULL
/*TODO*/
);
taosMemoryFree
(
pMemTable
);
}
int32_t
tsdbInsertTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
TSDBROW
row
=
{.
version
=
version
};
ASSERT
(
pMemTable
);
ASSERT
(
pSubmitBlk
->
nData
>
0
);
{
// check if table exists (todo)
}
code
=
tsdbGetOrCreateMemData
(
pMemTable
,
pSubmitBlk
->
suid
,
pSubmitBlk
->
uid
,
&
pMemData
);
if
(
code
)
{
tsdbError
(
"vgId:%d, failed to create/get table data since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
goto
_err
;
}
// do insert
code
=
tsdbInsertTableDataImpl
(
pMemTable
,
pMemData
,
version
,
pSubmitBlk
);
if
(
code
)
{
goto
_err
;
}
return
code
;
_err:
return
code
;
}
int32_t
tsdbDeleteTableData2
(
STsdb
*
pTsdb
,
int64_t
version
,
tb_uid_t
suid
,
tb_uid_t
uid
,
TSKEY
sKey
,
TSKEY
eKey
)
{
int32_t
code
=
0
;
SMemTable2
*
pMemTable
=
(
SMemTable2
*
)
pTsdb
->
mem
;
// TODO
SMemData
*
pMemData
;
SVBufPool
*
pPool
=
pTsdb
->
pVnode
->
inUse
;
ASSERT
(
pMemTable
);
{
// check if table exists (todo)
}
code
=
tsdbGetOrCreateMemData
(
pMemTable
,
suid
,
uid
,
&
pMemData
);
if
(
code
)
{
goto
_err
;
}
// do delete
SDelOp
*
pDelOp
=
(
SDelOp
*
)
vnodeBufPoolMalloc
(
pPool
,
sizeof
(
*
pDelOp
));
if
(
pDelOp
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pDelOp
->
version
=
version
;
pDelOp
->
sKey
=
sKey
;
pDelOp
->
eKey
=
eKey
;
pDelOp
->
pNext
=
NULL
;
if
(
pMemData
->
delOpHead
==
NULL
)
{
ASSERT
(
pMemData
->
delOpTail
==
NULL
);
pMemData
->
delOpHead
=
pMemData
->
delOpTail
=
pDelOp
;
}
else
{
pMemData
->
delOpTail
->
pNext
=
pDelOp
;
pMemData
->
delOpTail
=
pDelOp
;
}
{
// update the state of pMemTable, pMemData, last and lastrow (todo)
}
pMemTable
->
nDelOp
++
;
tsdbDebug
(
"vgId:%d, delete data from table suid:%"
PRId64
" uid:%"
PRId64
" sKey:%"
PRId64
" eKey:%"
PRId64
" since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
suid
,
uid
,
sKey
,
eKey
,
tstrerror
(
code
));
return
code
;
_err:
tsdbError
(
"vgId:%d, failed to delete data from table suid:%"
PRId64
" uid:%"
PRId64
" sKey:%"
PRId64
" eKey:%"
PRId64
" since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
suid
,
uid
,
sKey
,
eKey
,
tstrerror
(
code
));
return
code
;
}
void
tsdbMemDataIterOpen
(
SMemData
*
pMemData
,
TSDBKEY
*
pKey
,
int8_t
backward
,
SMemDataIter
*
pIter
)
{
SMemSkipListNode
*
pos
[
SL_MAX_LEVEL
];
pIter
->
pMemData
=
pMemData
;
pIter
->
backward
=
backward
;
pIter
->
pRow
=
NULL
;
if
(
pKey
==
NULL
)
{
// create from head or tail
if
(
backward
)
{
pIter
->
pNode
=
SL_NODE_BACKWARD
(
pMemData
->
sl
.
pTail
,
0
);
}
else
{
pIter
->
pNode
=
SL_NODE_FORWARD
(
pMemData
->
sl
.
pHead
,
0
);
}
}
else
{
// create from a key
if
(
backward
)
{
memDataMovePosTo
(
pMemData
,
pos
,
pKey
,
SL_MOVE_BACKWARD
);
pIter
->
pNode
=
SL_NODE_BACKWARD
(
pos
[
0
],
0
);
}
else
{
memDataMovePosTo
(
pMemData
,
pos
,
pKey
,
0
);
pIter
->
pNode
=
SL_NODE_FORWARD
(
pos
[
0
],
0
);
}
}
}
bool
tsdbMemDataIterNext
(
SMemDataIter
*
pIter
)
{
SMemSkipListNode
*
pHead
=
pIter
->
pMemData
->
sl
.
pHead
;
SMemSkipListNode
*
pTail
=
pIter
->
pMemData
->
sl
.
pTail
;
pIter
->
pRow
=
NULL
;
if
(
pIter
->
backward
)
{
ASSERT
(
pIter
->
pNode
!=
pTail
);
if
(
pIter
->
pNode
==
pHead
)
{
return
false
;
}
pIter
->
pNode
=
SL_NODE_BACKWARD
(
pIter
->
pNode
,
0
);
if
(
pIter
->
pNode
==
pHead
)
{
return
false
;
}
}
else
{
ASSERT
(
pIter
->
pNode
!=
pHead
);
if
(
pIter
->
pNode
==
pTail
)
{
return
false
;
}
pIter
->
pNode
=
SL_NODE_FORWARD
(
pIter
->
pNode
,
0
);
if
(
pIter
->
pNode
==
pTail
)
{
return
false
;
}
}
return
true
;
}
void
tsdbMemDataIterGet
(
SMemDataIter
*
pIter
,
TSDBROW
**
ppRow
)
{
if
(
pIter
->
pRow
)
{
*
ppRow
=
pIter
->
pRow
;
}
else
{
SMemSkipListNode
*
pHead
=
pIter
->
pMemData
->
sl
.
pHead
;
SMemSkipListNode
*
pTail
=
pIter
->
pMemData
->
sl
.
pTail
;
if
(
pIter
->
backward
)
{
ASSERT
(
pIter
->
pNode
!=
pTail
);
if
(
pIter
->
pNode
==
pHead
)
{
*
ppRow
=
NULL
;
}
else
{
tGetTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pIter
->
pNode
),
&
pIter
->
row
);
*
ppRow
=
&
pIter
->
row
;
}
}
else
{
ASSERT
(
pIter
->
pNode
!=
pHead
);
if
(
pIter
->
pNode
==
pTail
)
{
*
ppRow
=
NULL
;
}
else
{
tGetTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pIter
->
pNode
),
&
pIter
->
row
);
*
ppRow
=
&
pIter
->
row
;
}
}
}
}
static
int32_t
tsdbGetOrCreateMemData
(
SMemTable2
*
pMemTable
,
tb_uid_t
suid
,
tb_uid_t
uid
,
SMemData
**
ppMemData
)
{
int32_t
code
=
0
;
int32_t
idx
=
0
;
SMemData
*
pMemDataT
=
&
(
SMemData
){.
suid
=
suid
,
.
uid
=
uid
};
SMemData
*
pMemData
=
NULL
;
SVBufPool
*
pPool
=
pMemTable
->
pTsdb
->
pVnode
->
inUse
;
int8_t
maxLevel
=
pMemTable
->
pTsdb
->
pVnode
->
config
.
tsdbCfg
.
slLevel
;
// get
idx
=
taosArraySearchIdx
(
pMemTable
->
aMemData
,
&
pMemDataT
,
memDataPCmprFn
,
TD_GE
);
if
(
idx
>=
0
)
{
pMemData
=
(
SMemData
*
)
taosArrayGet
(
pMemTable
->
aMemData
,
idx
);
if
(
memDataPCmprFn
(
&
pMemDataT
,
&
pMemData
)
==
0
)
goto
_exit
;
}
// create
pMemData
=
vnodeBufPoolMalloc
(
pPool
,
sizeof
(
*
pMemData
)
+
SL_NODE_SIZE
(
maxLevel
)
*
2
);
if
(
pMemData
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
pMemData
->
suid
=
suid
;
pMemData
->
uid
=
uid
;
pMemData
->
minKey
=
(
TSDBKEY
){.
version
=
INT64_MAX
,
.
ts
=
TSKEY_MAX
};
pMemData
->
maxKey
=
(
TSDBKEY
){.
version
=
-
1
,
.
ts
=
TSKEY_MIN
};
pMemData
->
delOpHead
=
pMemData
->
delOpTail
=
NULL
;
pMemData
->
sl
.
seed
=
taosRand
();
pMemData
->
sl
.
size
=
0
;
pMemData
->
sl
.
maxLevel
=
maxLevel
;
pMemData
->
sl
.
level
=
0
;
pMemData
->
sl
.
pHead
=
(
SMemSkipListNode
*
)
&
pMemData
[
1
];
pMemData
->
sl
.
pTail
=
(
SMemSkipListNode
*
)
POINTER_SHIFT
(
pMemData
->
sl
.
pHead
,
SL_NODE_SIZE
(
maxLevel
));
pMemData
->
sl
.
pHead
->
level
=
maxLevel
;
pMemData
->
sl
.
pTail
->
level
=
maxLevel
;
for
(
int8_t
iLevel
=
0
;
iLevel
<
pMemData
->
sl
.
maxLevel
;
iLevel
++
)
{
SL_NODE_FORWARD
(
pMemData
->
sl
.
pHead
,
iLevel
)
=
pMemData
->
sl
.
pTail
;
SL_NODE_BACKWARD
(
pMemData
->
sl
.
pHead
,
iLevel
)
=
NULL
;
SL_NODE_BACKWARD
(
pMemData
->
sl
.
pTail
,
iLevel
)
=
pMemData
->
sl
.
pHead
;
SL_NODE_FORWARD
(
pMemData
->
sl
.
pTail
,
iLevel
)
=
NULL
;
}
if
(
idx
<
0
)
idx
=
0
;
if
(
taosArrayInsert
(
pMemTable
->
aMemData
,
idx
,
&
pMemData
)
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_err
;
}
_exit:
*
ppMemData
=
pMemData
;
return
code
;
_err:
*
ppMemData
=
NULL
;
return
code
;
}
static
int
memDataPCmprFn
(
const
void
*
p1
,
const
void
*
p2
)
{
SMemData
*
pMemData1
=
*
(
SMemData
**
)
p1
;
SMemData
*
pMemData2
=
*
(
SMemData
**
)
p2
;
if
(
pMemData1
->
suid
<
pMemData2
->
suid
)
{
return
-
1
;
}
else
if
(
pMemData1
->
suid
>
pMemData2
->
suid
)
{
return
1
;
}
if
(
pMemData1
->
uid
<
pMemData2
->
uid
)
{
return
-
1
;
}
else
if
(
pMemData1
->
uid
>
pMemData2
->
uid
)
{
return
1
;
}
return
0
;
}
static
int32_t
tPutTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
)
{
int32_t
n
=
0
;
n
+=
tPutI64
(
p
?
p
+
n
:
p
,
pRow
->
version
);
n
+=
tPutTSRow
(
p
?
p
+
n
:
p
,
&
pRow
->
tsRow
);
return
n
;
}
static
int32_t
tGetTSDBRow
(
uint8_t
*
p
,
TSDBROW
*
pRow
)
{
int32_t
n
=
0
;
n
+=
tGetI64
(
p
+
n
,
&
pRow
->
version
);
n
+=
tGetTSRow
(
p
+
n
,
&
pRow
->
tsRow
);
return
n
;
}
static
FORCE_INLINE
int8_t
tsdbMemSkipListRandLevel
(
SMemSkipList
*
pSl
)
{
int8_t
level
=
1
;
int8_t
tlevel
=
TMIN
(
pSl
->
maxLevel
,
pSl
->
level
+
1
);
const
uint32_t
factor
=
4
;
while
((
taosRandR
(
&
pSl
->
seed
)
%
factor
)
==
0
&&
level
<
tlevel
)
{
level
++
;
}
return
level
;
}
static
void
memDataMovePosTo
(
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBKEY
*
pKey
,
int32_t
flags
)
{
SMemSkipListNode
*
px
;
SMemSkipListNode
*
pn
;
TSDBKEY
*
pTKey
;
int
c
;
int
backward
=
flags
&
SL_MOVE_BACKWARD
;
int
fromPos
=
flags
&
SL_MOVE_FROM_POS
;
if
(
backward
)
{
px
=
pMemData
->
sl
.
pTail
;
for
(
int8_t
iLevel
=
pMemData
->
sl
.
maxLevel
-
1
;
iLevel
>=
pMemData
->
sl
.
level
;
iLevel
--
)
{
pos
[
iLevel
]
=
px
;
}
if
(
pMemData
->
sl
.
level
)
{
if
(
fromPos
)
px
=
pos
[
pMemData
->
sl
.
level
-
1
];
for
(
int8_t
iLevel
=
pMemData
->
sl
.
level
-
1
;
iLevel
>=
0
;
iLevel
--
)
{
pn
=
SL_NODE_BACKWARD
(
px
,
iLevel
);
while
(
pn
!=
pMemData
->
sl
.
pHead
)
{
pTKey
=
(
TSDBKEY
*
)
SL_NODE_DATA
(
pn
);
c
=
tsdbKeyCmprFn
(
pTKey
,
pKey
);
if
(
c
<=
0
)
{
break
;
}
else
{
px
=
pn
;
pn
=
SL_NODE_BACKWARD
(
px
,
iLevel
);
}
}
pos
[
iLevel
]
=
px
;
}
}
}
else
{
px
=
pMemData
->
sl
.
pHead
;
for
(
int8_t
iLevel
=
pMemData
->
sl
.
maxLevel
-
1
;
iLevel
>=
pMemData
->
sl
.
level
;
iLevel
--
)
{
pos
[
iLevel
]
=
px
;
}
if
(
pMemData
->
sl
.
level
)
{
if
(
fromPos
)
px
=
pos
[
pMemData
->
sl
.
level
-
1
];
for
(
int8_t
iLevel
=
pMemData
->
sl
.
level
-
1
;
iLevel
>=
0
;
iLevel
--
)
{
pn
=
SL_NODE_FORWARD
(
px
,
iLevel
);
while
(
pn
!=
pMemData
->
sl
.
pHead
)
{
pTKey
=
(
TSDBKEY
*
)
SL_NODE_DATA
(
pn
);
c
=
tsdbKeyCmprFn
(
pTKey
,
pKey
);
if
(
c
>=
0
)
{
break
;
}
else
{
px
=
pn
;
pn
=
SL_NODE_FORWARD
(
px
,
iLevel
);
}
}
pos
[
iLevel
]
=
px
;
}
}
}
}
static
int32_t
memDataDoPut
(
SMemTable2
*
pMemTable
,
SMemData
*
pMemData
,
SMemSkipListNode
**
pos
,
TSDBROW
*
pRow
,
int8_t
forward
)
{
int32_t
code
=
0
;
int8_t
level
;
SMemSkipListNode
*
pNode
;
SVBufPool
*
pPool
=
pMemTable
->
pTsdb
->
pVnode
->
inUse
;
// node
level
=
tsdbMemSkipListRandLevel
(
&
pMemData
->
sl
);
pNode
=
(
SMemSkipListNode
*
)
vnodeBufPoolMalloc
(
pPool
,
SL_NODE_SIZE
(
level
)
+
tPutTSDBRow
(
NULL
,
pRow
));
if
(
pNode
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_exit
;
}
pNode
->
level
=
level
;
for
(
int8_t
iLevel
=
0
;
iLevel
<
level
;
iLevel
++
)
{
SL_NODE_FORWARD
(
pNode
,
iLevel
)
=
NULL
;
SL_NODE_BACKWARD
(
pNode
,
iLevel
)
=
NULL
;
}
tPutTSDBRow
((
uint8_t
*
)
SL_NODE_DATA
(
pNode
),
pRow
);
// put
for
(
int8_t
iLevel
=
0
;
iLevel
<
pNode
->
level
;
iLevel
++
)
{
SMemSkipListNode
*
px
=
pos
[
iLevel
];
if
(
forward
)
{
SMemSkipListNode
*
pNext
=
SL_NODE_FORWARD
(
px
,
iLevel
);
SL_NODE_FORWARD
(
pNode
,
iLevel
)
=
pNext
;
SL_NODE_BACKWARD
(
pNode
,
iLevel
)
=
px
;
SL_NODE_BACKWARD
(
pNext
,
iLevel
)
=
pNode
;
SL_NODE_FORWARD
(
px
,
iLevel
)
=
pNode
;
}
else
{
SMemSkipListNode
*
pPrev
=
SL_NODE_BACKWARD
(
px
,
iLevel
);
SL_NODE_FORWARD
(
pNode
,
iLevel
)
=
px
;
SL_NODE_BACKWARD
(
pNode
,
iLevel
)
=
pPrev
;
SL_NODE_FORWARD
(
pPrev
,
iLevel
)
=
pNode
;
SL_NODE_BACKWARD
(
px
,
iLevel
)
=
pNode
;
}
}
pMemData
->
sl
.
size
++
;
if
(
pMemData
->
sl
.
level
<
pNode
->
level
)
{
pMemData
->
sl
.
level
=
pNode
->
level
;
}
_exit:
return
code
;
}
static
int32_t
tsdbInsertTableDataImpl
(
SMemTable2
*
pMemTable
,
SMemData
*
pMemData
,
int64_t
version
,
SVSubmitBlk
*
pSubmitBlk
)
{
int32_t
code
=
0
;
int32_t
n
=
0
;
uint8_t
*
p
=
pSubmitBlk
->
pData
;
int32_t
nRow
=
0
;
TSDBROW
row
=
{.
version
=
version
};
SMemSkipListNode
*
pos
[
SL_MAX_LEVEL
];
ASSERT
(
pSubmitBlk
->
nData
);
// backward put first data
n
+=
tGetTSRow
(
p
+
n
,
&
row
.
tsRow
);
ASSERT
(
n
<=
pSubmitBlk
->
nData
);
memDataMovePosTo
(
pMemData
,
pos
,
&
(
TSDBKEY
){.
version
=
version
,
.
ts
=
row
.
tsRow
.
ts
},
SL_MOVE_BACKWARD
);
code
=
memDataDoPut
(
pMemTable
,
pMemData
,
pos
,
&
row
,
0
);
if
(
code
)
{
goto
_exit
;
}
nRow
++
;
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemData
->
minKey
)
<
0
)
{
pMemData
->
minKey
=
*
(
TSDBKEY
*
)
&
row
;
}
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemTable
->
minKey
)
<
0
)
{
pMemTable
->
minKey
=
*
(
TSDBKEY
*
)
&
row
;
}
// forward put rest
for
(
int8_t
iLevel
=
0
;
iLevel
<
pMemData
->
sl
.
maxLevel
;
iLevel
++
)
{
pos
[
iLevel
]
=
SL_NODE_BACKWARD
(
pos
[
iLevel
],
iLevel
);
}
while
(
n
<
pSubmitBlk
->
nData
)
{
n
+=
tGetTSRow
(
p
+
n
,
&
row
.
tsRow
);
ASSERT
(
n
<=
pSubmitBlk
->
nData
);
memDataMovePosTo
(
pMemData
,
pos
,
&
(
TSDBKEY
){.
version
=
version
,
.
ts
=
row
.
tsRow
.
ts
},
SL_MOVE_FROM_POS
);
code
=
memDataDoPut
(
pMemTable
,
pMemData
,
pos
,
&
row
,
1
);
if
(
code
)
{
goto
_exit
;
}
nRow
++
;
}
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemData
->
maxKey
)
>
0
)
{
pMemData
->
maxKey
=
*
(
TSDBKEY
*
)
&
row
;
}
if
(
tsdbKeyCmprFn
((
TSDBKEY
*
)
&
row
,
&
pMemTable
->
maxKey
)
>
0
)
{
pMemTable
->
maxKey
=
*
(
TSDBKEY
*
)
&
row
;
}
pMemTable
->
nRows
+=
nRow
;
_exit:
return
code
;
}
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
4935b146
...
...
@@ -196,33 +196,6 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
}
else
{
pReadh
->
pBlkIdx
=
(
SBlockIdx
*
)
p
;
}
// size_t size = taosArrayGetSize(pReadh->aBlkIdx);
// if (size > 0) {
// while (true) {
// if (pReadh->cidx >= size) {
// pReadh->pBlkIdx = NULL;
// break;
// }
// SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
// if (pBlkIdx->uid == TABLE_TID(pTable)) {
// if (pBlkIdx->uid == TABLE_UID(pTable)) {
// pReadh->pBlkIdx = pBlkIdx;
// } else {
// pReadh->pBlkIdx = NULL;
// }
// pReadh->cidx++;
// break;
// } else if (pBlkIdx->uid > TABLE_TID(pTable)) {
// pReadh->pBlkIdx = NULL;
// break;
// } else {
// pReadh->cidx++;
// }
// }
// } else {
// pReadh->pBlkIdx = NULL;
// }
return
0
;
}
...
...
source/util/src/talgo.c
浏览文件 @
4935b146
...
...
@@ -158,82 +158,48 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __
taosMemoryFreeClear
(
buf
);
}
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int64_t
nmemb
,
int64_t
size
,
__compar_fn_t
compar
,
int32_t
flags
)
{
// TODO: need to check the correctness of this function
int32_t
l
=
0
;
int32_t
r
=
(
int32_t
)
nmemb
;
int32_t
idx
=
0
;
int32_t
comparison
;
void
*
taosbsearch
(
const
void
*
key
,
const
void
*
base
,
int32_t
nmemb
,
int32_t
size
,
__compar_fn_t
compar
,
int32_t
flags
)
{
uint8_t
*
p
;
int32_t
lidx
;
int32_t
ridx
;
int32_t
midx
;
int32_t
c
;
if
(
nmemb
<=
0
)
return
NULL
;
lidx
=
0
;
ridx
=
nmemb
-
1
;
while
(
lidx
<=
ridx
)
{
midx
=
(
lidx
+
ridx
)
/
2
;
p
=
(
uint8_t
*
)
base
+
size
*
midx
;
c
=
compar
(
key
,
p
);
if
(
c
==
0
)
{
break
;
}
else
if
(
c
<
0
)
{
ridx
=
midx
-
1
;
}
else
{
lidx
=
midx
+
1
;
}
}
if
(
flags
==
TD_EQ
)
{
return
bsearch
(
key
,
base
,
nmemb
,
size
,
compar
)
;
return
c
?
NULL
:
p
;
}
else
if
(
flags
==
TD_GE
)
{
if
(
nmemb
<=
0
)
return
NULL
;
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
0
))
<=
0
)
return
elePtrAt
(
base
,
size
,
0
);
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
nmemb
-
1
))
>
0
)
return
NULL
;
while
(
l
<
r
)
{
idx
=
(
l
+
r
)
/
2
;
comparison
=
(
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
));
if
(
comparison
<
0
)
{
r
=
idx
;
}
else
if
(
comparison
>
0
)
{
l
=
idx
+
1
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
);
}
}
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
))
<
0
)
{
return
elePtrAt
(
base
,
size
,
idx
);
}
else
{
if
(
idx
+
1
>
nmemb
-
1
)
{
return
NULL
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
+
1
);
}
}
return
(
c
<=
0
)
?
p
:
(
midx
+
1
<
nmemb
?
p
+
size
:
NULL
);
}
else
if
(
flags
==
TD_LE
)
{
if
(
nmemb
<=
0
)
return
NULL
;
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
nmemb
-
1
))
>=
0
)
return
elePtrAt
(
base
,
size
,
nmemb
-
1
);
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
0
))
<
0
)
return
NULL
;
while
(
l
<
r
)
{
idx
=
(
l
+
r
)
/
2
;
comparison
=
(
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
));
if
(
comparison
<
0
)
{
r
=
idx
;
}
else
if
(
comparison
>
0
)
{
l
=
idx
+
1
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
);
}
}
if
((
*
compar
)(
key
,
elePtrAt
(
base
,
size
,
idx
))
>
0
)
{
return
elePtrAt
(
base
,
size
,
idx
);
}
else
{
if
(
idx
==
0
)
{
return
NULL
;
}
else
{
return
elePtrAt
(
base
,
size
,
idx
-
1
);
}
}
return
(
c
>=
0
)
?
p
:
(
midx
>
0
?
p
-
size
:
NULL
);
}
else
{
assert
(
0
);
return
NULL
;
ASSERT
(
0
);
}
return
NULL
;
}
void
taosheapadjust
(
void
*
base
,
int32_t
size
,
int32_t
start
,
int32_t
end
,
const
void
*
parcompar
,
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
)
{
__ext_compar_fn_t
compar
,
char
*
buf
,
bool
maxroot
)
{
int32_t
parent
;
int32_t
child
;
char
*
tmp
=
NULL
;
char
*
tmp
=
NULL
;
if
(
buf
==
NULL
)
{
tmp
=
taosMemoryMalloc
(
size
);
}
else
{
...
...
@@ -288,7 +254,7 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar,
bool
maxroot
)
{
int32_t
i
;
char
*
buf
=
taosMemoryCalloc
(
1
,
size
);
char
*
buf
=
taosMemoryCalloc
(
1
,
size
);
if
(
buf
==
NULL
)
{
return
;
}
...
...
source/util/test/CMakeLists.txt
浏览文件 @
4935b146
...
...
@@ -67,4 +67,12 @@ target_link_libraries(bloomFilterTest os util gtest_main)
add_test
(
NAME bloomFilterTest
COMMAND bloomFilterTest
)
# taosbsearchTest
add_executable
(
taosbsearchTest
"taosbsearchTest.cpp"
)
target_link_libraries
(
taosbsearchTest os util gtest_main
)
add_test
(
NAME taosbsearchTest
COMMAND taosbsearchTest
)
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录