Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3fdb868a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3fdb868a
编写于
1月 08, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more progress
上级
4c1e95b5
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
281 addition
and
154 deletion
+281
-154
source/dnode/vnode/tsdb/inc/tsdbCompact.h
source/dnode/vnode/tsdb/inc/tsdbCompact.h
+5
-5
source/dnode/vnode/tsdb/inc/tsdbDef.h
source/dnode/vnode/tsdb/inc/tsdbDef.h
+4
-0
source/dnode/vnode/tsdb/inc/tsdbMemTable.h
source/dnode/vnode/tsdb/inc/tsdbMemTable.h
+21
-1
source/dnode/vnode/tsdb/src/tsdbCommit.c
source/dnode/vnode/tsdb/src/tsdbCommit.c
+109
-131
source/dnode/vnode/tsdb/src/tsdbCompact.c
source/dnode/vnode/tsdb/src/tsdbCompact.c
+3
-0
source/dnode/vnode/tsdb/src/tsdbMemTable.c
source/dnode/vnode/tsdb/src/tsdbMemTable.c
+139
-15
source/dnode/vnode/tsdb/src/tsdbReadImpl.c
source/dnode/vnode/tsdb/src/tsdbReadImpl.c
+0
-2
未找到文件。
source/dnode/vnode/tsdb/inc/tsdbCompact.h
浏览文件 @
3fdb868a
...
@@ -12,6 +12,8 @@
...
@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#if 0
#ifndef _TD_TSDB_COMPACT_H_
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
...
@@ -19,14 +21,12 @@
...
@@ -19,14 +21,12 @@
extern "C" {
extern "C" {
#endif
#endif
#if 0
void *tsdbCompactImpl(STsdbRepo *pRepo);
void *tsdbCompactImpl(STsdbRepo *pRepo);
#endif
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
#endif /* _TD_TSDB_COMPACT_H_ */
#endif /* _TD_TSDB_COMPACT_H_ */
#endif
\ No newline at end of file
source/dnode/vnode/tsdb/inc/tsdbDef.h
浏览文件 @
3fdb868a
...
@@ -54,6 +54,10 @@ struct STsdb {
...
@@ -54,6 +54,10 @@ struct STsdb {
#define REPO_CFG(r) (&(r)->config)
#define REPO_CFG(r) (&(r)->config)
#define REPO_FS(r) (&(r)->fs)
#define REPO_FS(r) (&(r)->fs)
static
FORCE_INLINE
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int32_t
version
)
{
return
pTable
->
pSchema
;
}
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/vnode/tsdb/inc/tsdbMemTable.h
浏览文件 @
3fdb868a
...
@@ -22,6 +22,16 @@
...
@@ -22,6 +22,16 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
typedef
struct
{
int
rowsInserted
;
int
rowsUpdated
;
int
rowsDeleteSucceed
;
int
rowsDeleteFailed
;
int
nOperations
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SMergeInfo
;
typedef
struct
STbData
{
typedef
struct
STbData
{
tb_uid_t
uid
;
tb_uid_t
uid
;
TSKEY
keyMin
;
TSKEY
keyMin
;
...
@@ -42,10 +52,20 @@ typedef struct STsdbMemTable {
...
@@ -42,10 +52,20 @@ typedef struct STsdbMemTable {
SHashObj
*
pHashIdx
;
SHashObj
*
pHashIdx
;
}
STsdbMemTable
;
}
STsdbMemTable
;
STsdbMemTable
*
tsdbNewMemTable
(
STsdb
*
pTsdb
);
STsdbMemTable
*
tsdbNewMemTable
(
STsdb
*
pTsdb
);
void
tsdbFreeMemTable
(
STsdb
*
pTsdb
,
STsdbMemTable
*
pMemTable
);
void
tsdbFreeMemTable
(
STsdb
*
pTsdb
,
STsdbMemTable
*
pMemTable
);
int
tsdbMemTableInsert
(
STsdb
*
pTsdb
,
STsdbMemTable
*
pMemTable
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
);
int
tsdbMemTableInsert
(
STsdb
*
pTsdb
,
STsdbMemTable
*
pMemTable
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
static
FORCE_INLINE
SMemRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
(
SMemRow
)
SL_GET_NODE_DATA
(
node
);
}
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/vnode/tsdb/src/tsdbCommit.c
浏览文件 @
3fdb868a
...
@@ -60,6 +60,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
...
@@ -60,6 +60,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
);
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitH
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitH
(
SCommitH
*
pCommith
);
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
);
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
);
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
);
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
);
int
tsdbApplyRtnOnFSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
)
{
int
tsdbApplyRtnOnFSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
)
{
...
@@ -127,7 +128,6 @@ int tsdbCommit(STsdb *pRepo) {
...
@@ -127,7 +128,6 @@ int tsdbCommit(STsdb *pRepo) {
return
-
1
;
return
-
1
;
}
}
#if 0
// Skip expired memory data and expired FSET
// Skip expired memory data and expired FSET
tsdbSeekCommitIter
(
&
commith
,
commith
.
rtn
.
minKey
);
tsdbSeekCommitIter
(
&
commith
,
commith
.
rtn
.
minKey
);
while
((
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
))))
{
while
((
pSet
=
tsdbFSIterNext
(
&
(
commith
.
fsIter
))))
{
...
@@ -138,6 +138,7 @@ int tsdbCommit(STsdb *pRepo) {
...
@@ -138,6 +138,7 @@ int tsdbCommit(STsdb *pRepo) {
break
;
break
;
}
}
}
}
#if 0
// Loop to commit to each file
// Loop to commit to each file
fid = tsdbNextCommitFid(&(commith));
fid = tsdbNextCommitFid(&(commith));
...
@@ -280,28 +281,28 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
...
@@ -280,28 +281,28 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
}
}
}
static
int
tsdbNextCommitFid
(
SCommitH
*
pCommith
)
{
//
static int tsdbNextCommitFid(SCommitH *pCommith) {
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
//
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
//
STsdbCfg *pCfg = REPO_CFG(pRepo);
int
fid
=
TSDB_IVLD_FID
;
//
int fid = TSDB_IVLD_FID;
for
(
int
i
=
0
;
i
<
pCommith
->
niters
;
i
++
)
{
//
for (int i = 0; i < pCommith->niters; i++) {
SCommitIter
*
pIter
=
pCommith
->
iters
+
i
;
//
SCommitIter *pIter = pCommith->iters + i;
if
(
pIter
->
pTable
==
NULL
||
pIter
->
pIter
==
NULL
)
continue
;
//
if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
TSKEY
nextKey
=
tsdbNextIterKey
(
pIter
->
pIter
);
//
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
if
(
nextKey
==
TSDB_DATA_TIMESTAMP_NULL
)
{
//
if (nextKey == TSDB_DATA_TIMESTAMP_NULL) {
continue
;
//
continue;
}
else
{
//
} else {
int
tfid
=
(
int
)(
TSDB_KEY_FID
(
nextKey
,
pCfg
->
daysPerFile
,
pCfg
->
precision
));
//
int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision));
if
(
fid
==
TSDB_IVLD_FID
||
fid
>
tfid
)
{
//
if (fid == TSDB_IVLD_FID || fid > tfid) {
fid
=
tfid
;
//
fid = tfid;
}
//
}
}
//
}
}
//
}
return
fid
;
//
return fid;
}
//
}
static
void
tsdbDestroyCommitH
(
SCommitH
*
pCommith
)
{
static
void
tsdbDestroyCommitH
(
SCommitH
*
pCommith
)
{
pCommith
->
pDataCols
=
tdFreeDataCols
(
pCommith
->
pDataCols
);
pCommith
->
pDataCols
=
tdFreeDataCols
(
pCommith
->
pDataCols
);
...
@@ -313,117 +314,109 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
...
@@ -313,117 +314,109 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
));
tsdbCloseDFileSet
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
));
}
}
static
int
tsdbCommitToFile
(
SCommitH
*
pCommith
,
SDFileSet
*
pSet
,
int
fid
)
{
// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
// STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
// STsdbCfg *pCfg = REPO_CFG(pRepo);
ASSERT
(
pSet
==
NULL
||
pSet
->
fid
==
fid
);
// ASSERT(pSet == NULL || pSet->fid == fid);
tsdbResetCommitFile
(
pCommith
);
// tsdbResetCommitFile(pCommith);
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
(
pCommith
->
minKey
),
&
(
pCommith
->
maxKey
));
// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
// Set and open files
// // Set and open files
if
(
tsdbSetAndOpenCommitFile
(
pCommith
,
pSet
,
fid
)
<
0
)
{
// if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
return
-
1
;
// return -1;
}
// }
// Loop to commit each table data
// // Loop to commit each table data
for
(
int
tid
=
1
;
tid
<
pCommith
->
niters
;
tid
++
)
{
// for (int tid = 1; tid < pCommith->niters; tid++) {
SCommitIter
*
pIter
=
pCommith
->
iters
+
tid
;
// SCommitIter *pIter = pCommith->iters + tid;
if
(
pIter
->
pTable
==
NULL
)
continue
;
// if (pIter->pTable == NULL) continue;
if
(
tsdbCommitToTable
(
pCommith
,
tid
)
<
0
)
{
// if (tsdbCommitToTable(pCommith, tid) < 0) {
tsdbCloseCommitFile
(
pCommith
,
true
);
// tsdbCloseCommitFile(pCommith, true);
// revert the file change
// // revert the file change
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
return
-
1
;
// return -1;
}
// }
}
// }
// if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith))))
// <
// 0) {
// tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// tsdbCloseCommitFile(pCommith, true);
// // revert the file change
// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
// return -1;
// }
// if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) {
// tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
// tsdbCloseCommitFile(pCommith, true);
// // revert the file change
// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet);
// return -1;
// }
// // Close commit file
// tsdbCloseCommitFile(pCommith, false);
// if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) {
// return -1;
// }
// return 0;
// }
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
0
)
{
STsdb
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
tsdbError
(
"vgId:%d failed to write SBlockIdx part to FSET %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
STsdbMemTable
*
pMem
=
pRepo
->
imem
;
tsdbCloseCommitFile
(
pCommith
,
true
);
SSkipListIterator
*
pSlIter
;
// revert the file change
SCommitIter
*
pCommitIter
;
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
SSkipListNode
*
pNode
;
STbData
*
pTbData
;
pCommith
->
niters
=
SL_SIZE
(
pMem
->
pSlIdx
);
pCommith
->
iters
=
(
SCommitIter
*
)
calloc
(
pCommith
->
niters
,
sizeof
(
SCommitIter
));
if
(
pCommith
->
iters
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
if
(
tsdbUpdateDFileSetHeader
(
&
(
pCommith
->
wSet
))
<
0
)
{
// Loop to create iters for each skiplist
tsdbError
(
"vgId:%d failed to update FSET %d header since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
pSlIter
=
tSkipListCreateIter
(
pMem
->
pSlIdx
);
tsdbCloseCommitFile
(
pCommith
,
true
);
if
(
pSlIter
==
NULL
)
{
// revert the file change
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbApplyDFileSetChange
(
TSDB_COMMIT_WRITE_FSET
(
pCommith
),
pSet
);
return
-
1
;
return
-
1
;
}
}
for
(
int
i
=
0
;
i
<
pCommith
->
niters
;
i
++
)
{
tSkipListIterNext
(
pSlIter
);
pNode
=
tSkipListIterGet
(
pSlIter
);
pTbData
=
(
STbData
*
)
pNode
->
pData
;
// Close commit file
pCommitIter
=
pCommith
->
iters
+
i
;
tsdbCloseCommitFile
(
pCommith
,
false
);
pCommitIter
->
pIter
=
tSkipListCreateIter
(
pTbData
->
pData
);
tSkipListIterNext
(
pCommitIter
->
pIter
);
if
(
tsdbUpdateDFileSet
(
REPO_FS
(
pRepo
),
&
(
pCommith
->
wSet
))
<
0
)
{
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
int
tsdbCreateCommitIters
(
SCommitH
*
pCommith
)
{
static
void
tsdbDestroyCommitIters
(
SCommitH
*
pCommith
)
{
#if 0
if
(
pCommith
->
iters
==
NULL
)
return
;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
pCommith->niters = pMem->maxTables;
pCommith->iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
if (pCommith->iters == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
// reference all tables
for (int i = 0; i < pMem->maxTables; i++) {
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
pCommith->iters[i].pTable = pMeta->tables[i];
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
for (int i = 0; i < pMem->maxTables; i++) {
if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) &&
(TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) {
if ((pCommith->iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tSkipListIterNext(pCommith->iters[i].pIter);
for
(
int
i
=
1
;
i
<
pCommith
->
niters
;
i
++
)
{
}
tSkipListDestroyIter
(
pCommith
->
iters
[
i
].
pIter
);
}
}
#endif
free
(
pCommith
->
iters
);
return
0
;
pCommith
->
iters
=
NULL
;
pCommith
->
niters
=
0
;
}
}
#if 0
#if 0
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
#include "tsdbint.h"
extern int32_t tsTsdbMetaCompactRatio;
extern int32_t tsTsdbMetaCompactRatio;
...
@@ -895,21 +888,6 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
...
@@ -895,21 +888,6 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
if (pCommith->iters == NULL) return;
for (int i = 1; i < pCommith->niters; i++) {
if (pCommith->iters[i].pTable != NULL) {
tsdbUnRefTable(pCommith->iters[i].pTable);
tSkipListDestroyIter(pCommith->iters[i].pIter);
}
}
free(pCommith->iters);
pCommith->iters = NULL;
pCommith->niters = 0;
}
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
...
...
source/dnode/vnode/tsdb/src/tsdbCompact.c
浏览文件 @
3fdb868a
...
@@ -12,6 +12,7 @@
...
@@ -12,6 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#if 0
#include "tsdbint.h"
#include "tsdbint.h"
typedef struct {
typedef struct {
...
@@ -528,3 +529,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
...
@@ -528,3 +529,5 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
return 0;
return 0;
}
}
#endif
source/dnode/vnode/tsdb/src/tsdbMemTable.c
浏览文件 @
3fdb868a
...
@@ -22,6 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData);
...
@@ -22,6 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData);
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
);
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
);
static
int
tsdbTbDataComp
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbTbDataComp
(
const
void
*
arg1
,
const
void
*
arg2
);
static
char
*
tsdbTbDataGetUid
(
const
void
*
arg
);
static
char
*
tsdbTbDataGetUid
(
const
void
*
arg
);
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SMemRow
row
);
STsdbMemTable
*
tsdbNewMemTable
(
STsdb
*
pTsdb
)
{
STsdbMemTable
*
tsdbNewMemTable
(
STsdb
*
pTsdb
)
{
STsdbMemTable
*
pMemTable
=
(
STsdbMemTable
*
)
calloc
(
1
,
sizeof
(
*
pMemTable
));
STsdbMemTable
*
pMemTable
=
(
STsdbMemTable
*
)
calloc
(
1
,
sizeof
(
*
pMemTable
));
...
@@ -103,6 +104,129 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg,
...
@@ -103,6 +104,129 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitMsg *pMsg,
return
0
;
return
0
;
}
}
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
SMemRow
row
=
NULL
;
SMergeInfo
mInfo
;
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
rowKey
);
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
}
}
else
{
if
(
isRowDel
)
{
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
rowKey
);
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
else
{
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
}
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
}
return
0
;
}
static
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitMsg
*
pMsg
)
{
static
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitMsg
*
pMsg
)
{
ASSERT
(
pMsg
!=
NULL
);
ASSERT
(
pMsg
!=
NULL
);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
...
@@ -313,6 +437,21 @@ static char *tsdbTbDataGetUid(const void *arg) {
...
@@ -313,6 +437,21 @@ static char *tsdbTbDataGetUid(const void *arg) {
STbData
*
pTbData
=
(
STbData
*
)
arg
;
STbData
*
pTbData
=
(
STbData
*
)
arg
;
return
(
char
*
)(
&
(
pTbData
->
uid
));
return
(
char
*
)(
&
(
pTbData
->
uid
));
}
}
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SMemRow
row
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
memRowVersion
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
memRowVersion
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendMemRowToDataCol
(
row
,
*
ppSchema
,
pCols
,
true
);
}
return
0
;
}
/* ------------------------ REFACTORING ------------------------ */
/* ------------------------ REFACTORING ------------------------ */
#if 0
#if 0
...
@@ -650,21 +789,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
...
@@ -650,21 +789,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
}
}
// ---------------- LOCAL FUNCTIONS ----------------
// ---------------- LOCAL FUNCTIONS ----------------
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) {
if (pCols) {
if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) {
*ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
if (*ppSchema == NULL) {
ASSERT(false);
return -1;
}
}
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0);
}
return 0;
}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) {
TSKEY now) {
...
...
source/dnode/vnode/tsdb/src/tsdbReadImpl.c
浏览文件 @
3fdb868a
...
@@ -25,7 +25,6 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3
...
@@ -25,7 +25,6 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int3
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
static
int
tsdbLoadBlockDataColsImpl
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SDataCols
*
pDataCols
,
int16_t
*
colIds
,
int
numOfColIds
);
int
numOfColIds
);
static
int
tsdbLoadColData
(
SReadH
*
pReadh
,
SDFile
*
pDFile
,
SBlock
*
pBlock
,
SBlockCol
*
pBlockCol
,
SDataCol
*
pDataCol
);
static
int
tsdbLoadColData
(
SReadH
*
pReadh
,
SDFile
*
pDFile
,
SBlock
*
pBlock
,
SBlockCol
*
pBlockCol
,
SDataCol
*
pDataCol
);
static
STSchema
*
tsdbGetTableSchemaImpl
(
STable
*
pTable
,
bool
lock
,
bool
copy
,
int32_t
version
)
{
return
NULL
;
}
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdb
*
pRepo
)
{
int
tsdbInitReadH
(
SReadH
*
pReadh
,
STsdb
*
pRepo
)
{
ASSERT
(
pReadh
!=
NULL
&&
pRepo
!=
NULL
);
ASSERT
(
pReadh
!=
NULL
&&
pRepo
!=
NULL
);
...
@@ -667,4 +666,3 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
...
@@ -667,4 +666,3 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc
return
0
;
return
0
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录