Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
e28f1139
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e28f1139
编写于
7月 24, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/vnode' of
https://github.com/taosdata/TDengine
into feature/vnode
上级
c9f72ce6
3403c4d3
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
167 addition
and
134 deletion
+167
-134
src/inc/tsdb.h
src/inc/tsdb.h
+0
-2
src/kit/taosmigrate/taosmigrateVnodeCfg.c
src/kit/taosmigrate/taosmigrateVnodeCfg.c
+12
-14
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+7
-1
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+1
-1
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+20
-59
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+64
-21
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+36
-10
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+11
-10
src/tsdb/tests/tsdbTests.cpp
src/tsdb/tests/tsdbTests.cpp
+1
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+15
-15
未找到文件。
src/inc/tsdb.h
浏览文件 @
e28f1139
...
...
@@ -53,14 +53,12 @@ typedef struct {
int32_t
tsdbId
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
maxTables
;
// maximum number of tables this repository can have
int32_t
daysPerFile
;
// day per file sharding policy
int32_t
keep
;
// day of data to keep
int32_t
keep1
;
int32_t
keep2
;
int32_t
minRowsPerFileBlock
;
// minimum rows per file block
int32_t
maxRowsPerFileBlock
;
// maximum rows per file block
int32_t
commitTime
;
int8_t
precision
;
int8_t
compression
;
}
STsdbCfg
;
...
...
src/kit/taosmigrate/taosmigrateVnodeCfg.c
浏览文件 @
e28f1139
...
...
@@ -37,14 +37,12 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
cfgVersion
\"
: %d,
\n
"
,
pVnode
->
cfgVersion
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
cacheBlockSize
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
cacheBlockSize
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
totalBlocks
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
totalBlocks
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
maxTables
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
maxTables
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysPerFile
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
daysPerFile
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysToKeep
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
keep
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysToKeep1
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
keep1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysToKeep2
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
keep2
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
minRowsPerFileBlock
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
minRowsPerFileBlock
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
maxRowsPerFileBlock
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
maxRowsPerFileBlock
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
commitTime
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
commitTime
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
precision
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
precision
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
compression
\"
: %d,
\n
"
,
pVnode
->
tsdbCfg
.
compression
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
walLevel
\"
: %d,
\n
"
,
pVnode
->
walCfg
.
walLevel
);
...
...
@@ -136,12 +134,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
}
pVnode
->
tsdbCfg
.
totalBlocks
=
totalBlocks
->
valueint
;
cJSON
*
maxTables
=
cJSON_GetObjectItem
(
root
,
"maxTables"
);
if
(
!
maxTables
||
maxTables
->
type
!=
cJSON_Number
)
{
printf
(
"vgId:%d, failed to read vnode cfg, maxTables not found
\n
"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
pVnode
->
tsdbCfg
.
maxTables
=
maxTables
->
valueint
;
//
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
//
if (!maxTables || maxTables->type != cJSON_Number) {
//
printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
//
goto PARSE_OVER;
//
}
//
pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON
*
daysPerFile
=
cJSON_GetObjectItem
(
root
,
"daysPerFile"
);
if
(
!
daysPerFile
||
daysPerFile
->
type
!=
cJSON_Number
)
{
...
...
@@ -185,12 +183,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
}
pVnode
->
tsdbCfg
.
maxRowsPerFileBlock
=
maxRowsPerFileBlock
->
valueint
;
cJSON
*
commitTime
=
cJSON_GetObjectItem
(
root
,
"commitTime"
);
if
(
!
commitTime
||
commitTime
->
type
!=
cJSON_Number
)
{
printf
(
"vgId:%d, failed to read vnode cfg, commitTime not found
\n
"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
pVnode
->
tsdbCfg
.
commitTime
=
(
int8_t
)
commitTime
->
valueint
;
//
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
//
if (!commitTime || commitTime->type != cJSON_Number) {
//
printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
//
goto PARSE_OVER;
//
}
//
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON
*
precision
=
cJSON_GetObjectItem
(
root
,
"precision"
);
if
(
!
precision
||
precision
->
type
!=
cJSON_Number
)
{
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
e28f1139
...
...
@@ -70,6 +70,7 @@ typedef struct {
pthread_rwlock_t
rwLock
;
int32_t
nTables
;
int32_t
maxTables
;
STable
**
tables
;
SList
*
superList
;
SHashObj
*
uidMap
;
...
...
@@ -111,9 +112,11 @@ typedef struct {
typedef
struct
{
T_REF_DECLARE
();
SRWLatch
latch
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
int64_t
numOfRows
;
int32_t
maxTables
;
STableData
**
tData
;
SList
*
actList
;
SList
*
bufBlockList
;
...
...
@@ -304,6 +307,7 @@ typedef struct {
// Operations
// ------------------ tsdbMeta.c
#define TSDB_INIT_NTABLES 1024
#define TABLE_TYPE(t) (t)->type
#define TABLE_NAME(t) (t)->name
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
...
...
@@ -395,6 +399,7 @@ int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
int
tsdbRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbUnRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemTable
**
pMem
,
SMemTable
**
pIMem
);
void
tsdbUnTakeMemSnapShot
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
);
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
);
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
...
...
@@ -429,7 +434,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
void
tsdbFreeFileH
(
STsdbFileH
*
pFileH
);
int
tsdbOpenFileH
(
STsdbRepo
*
pRepo
);
void
tsdbCloseFileH
(
STsdbRepo
*
pRepo
);
SFileGroup
*
tsdbCreateFGroupIfNeed
(
STsdbRepo
*
pRepo
,
char
*
dataDir
,
int
fid
,
int
maxTables
);
SFileGroup
*
tsdbCreateFGroupIfNeed
(
STsdbRepo
*
pRepo
,
char
*
dataDir
,
int
fid
);
void
tsdbInitFileGroupIter
(
STsdbFileH
*
pFileH
,
SFileGroupIter
*
pIter
,
int
direction
);
void
tsdbSeekFileGroupIter
(
SFileGroupIter
*
pIter
,
int
fid
);
SFileGroup
*
tsdbGetFileGroupNext
(
SFileGroupIter
*
pIter
);
...
...
@@ -511,6 +516,7 @@ void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname
int
tsdbLockRepo
(
STsdbRepo
*
pRepo
);
int
tsdbUnlockRepo
(
STsdbRepo
*
pRepo
);
char
*
tsdbGetDataDirName
(
char
*
rootDir
);
int
tsdbGetNextMaxTables
(
int
tid
);
STsdbMeta
*
tsdbGetMeta
(
TSDB_REPO_T
*
pRepo
);
STsdbFileH
*
tsdbGetFile
(
TSDB_REPO_T
*
pRepo
);
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
e28f1139
...
...
@@ -149,7 +149,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
}
}
SFileGroup
*
tsdbCreateFGroupIfNeed
(
STsdbRepo
*
pRepo
,
char
*
dataDir
,
int
fid
,
int
maxTables
)
{
SFileGroup
*
tsdbCreateFGroupIfNeed
(
STsdbRepo
*
pRepo
,
char
*
dataDir
,
int
fid
)
{
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
if
(
pFileH
->
nFGroups
>=
pFileH
->
maxFGroups
)
return
NULL
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
e28f1139
...
...
@@ -62,7 +62,6 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo);
static
int
tsdbInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
static
void
tsdbAlterCompression
(
STsdbRepo
*
pRepo
,
int8_t
compression
);
static
int
tsdbAlterKeep
(
STsdbRepo
*
pRepo
,
int32_t
keep
);
static
int
tsdbAlterMaxTables
(
STsdbRepo
*
pRepo
,
int32_t
maxTables
);
static
int
tsdbAlterCacheTotalBlocks
(
STsdbRepo
*
pRepo
,
int
totalBlocks
);
static
int
keyFGroupCompFunc
(
const
void
*
key
,
const
void
*
fgroup
);
static
int
tsdbEncodeCfg
(
void
**
buf
,
STsdbCfg
*
pCfg
);
...
...
@@ -85,10 +84,10 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
if
(
tsdbSetRepoEnv
(
rootDir
,
pCfg
)
<
0
)
return
-
1
;
tsdbDebug
(
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d
maxTables %d
daysPerFile %d keep "
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d"
,
pCfg
->
tsdbId
,
pCfg
->
cacheBlockSize
,
pCfg
->
totalBlocks
,
pCfg
->
maxTables
,
pCfg
->
daysPerFile
,
pCfg
->
keep
,
pCfg
->
m
inRowsPerFileBlock
,
pCfg
->
m
axRowsPerFileBlock
,
pCfg
->
precision
,
pCfg
->
compression
);
pCfg
->
tsdbId
,
pCfg
->
cacheBlockSize
,
pCfg
->
totalBlocks
,
pCfg
->
daysPerFile
,
pCfg
->
keep
,
pCfg
->
minRowsPerFileBlock
,
pCfg
->
maxRowsPerFileBlock
,
pCfg
->
precision
,
pCfg
->
compression
);
return
0
;
}
...
...
@@ -307,13 +306,6 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
tsdbAlterCacheTotalBlocks
(
pRepo
,
pCfg
->
totalBlocks
);
configChanged
=
true
;
}
if
(
pRCfg
->
maxTables
!=
pCfg
->
maxTables
)
{
if
(
tsdbAlterMaxTables
(
pRepo
,
pCfg
->
maxTables
)
<
0
)
{
tsdbError
(
"vgId:%d failed to configure repo when alter maxTables since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
return
-
1
;
}
configChanged
=
true
;
}
if
(
configChanged
)
{
if
(
tsdbSaveConfig
(
pRepo
->
rootDir
,
&
pRepo
->
config
)
<
0
)
{
...
...
@@ -385,6 +377,18 @@ char *tsdbGetDataDirName(char *rootDir) {
return
fname
;
}
int
tsdbGetNextMaxTables
(
int
tid
)
{
ASSERT
(
tid
>=
1
&&
tid
<=
TSDB_MAX_TABLES
);
int
maxTables
=
TSDB_INIT_NTABLES
;
while
(
true
)
{
maxTables
=
MIN
(
maxTables
,
TSDB_MAX_TABLES
);
if
(
tid
<=
maxTables
)
break
;
maxTables
*=
2
;
}
return
maxTables
+
1
;
}
STsdbMeta
*
tsdbGetMeta
(
TSDB_REPO_T
*
pRepo
)
{
return
((
STsdbRepo
*
)
pRepo
)
->
tsdbMeta
;
}
STsdbFileH
*
tsdbGetFile
(
TSDB_REPO_T
*
pRepo
)
{
return
((
STsdbRepo
*
)
pRepo
)
->
tsdbFileH
;
}
STsdbRepoInfo
*
tsdbGetStatus
(
TSDB_REPO_T
*
pRepo
)
{
return
NULL
;
}
...
...
@@ -417,17 +421,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
goto
_err
;
}
// Check maxTables
if
(
pCfg
->
maxTables
==
-
1
)
{
pCfg
->
maxTables
=
TSDB_DEFAULT_TABLES
+
1
;
}
else
{
if
(
pCfg
->
maxTables
-
1
<
TSDB_MIN_TABLES
||
pCfg
->
maxTables
-
1
>
TSDB_MAX_TABLES
)
{
tsdbError
(
"vgId:%d invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d"
,
pCfg
->
tsdbId
,
pCfg
->
maxTables
-
1
,
TSDB_MIN_TABLES
,
TSDB_MAX_TABLES
);
goto
_err
;
}
}
// Check daysPerFile
if
(
pCfg
->
daysPerFile
==
-
1
)
{
pCfg
->
daysPerFile
=
TSDB_DEFAULT_DAYS_PER_FILE
;
...
...
@@ -713,6 +706,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
int64_t
points
=
0
;
ASSERT
(
pBlock
->
tid
<
pMeta
->
maxTables
);
STable
*
pTable
=
pMeta
->
tables
[
pBlock
->
tid
];
ASSERT
(
pTable
!=
NULL
&&
TABLE_UID
(
pTable
)
==
pBlock
->
uid
);
...
...
@@ -779,7 +773,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
}
static
int
tsdbRestoreInfo
(
STsdbRepo
*
pRepo
)
{
// TODO
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pFGroup
=
NULL
;
...
...
@@ -792,7 +785,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
tsdbInitFileGroupIter
(
pFileH
,
&
iter
,
TSDB_ORDER_DESC
);
while
((
pFGroup
=
tsdbGetFileGroupNext
(
&
iter
))
!=
NULL
)
{
if
(
tsdbSetAndOpenHelperFile
(
&
rhelper
,
pFGroup
)
<
0
)
goto
_err
;
for
(
int
i
=
1
;
i
<
p
Repo
->
config
.
maxTables
;
i
++
)
{
for
(
int
i
=
1
;
i
<
p
Meta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
==
NULL
)
continue
;
tsdbSetHelperTable
(
&
rhelper
,
pTable
,
pRepo
);
...
...
@@ -868,36 +861,6 @@ static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
return
0
;
}
static
int
tsdbAlterMaxTables
(
STsdbRepo
*
pRepo
,
int32_t
maxTables
)
{
// TODO
int
oldMaxTables
=
pRepo
->
config
.
maxTables
;
if
(
oldMaxTables
<
pRepo
->
config
.
maxTables
)
{
terrno
=
TSDB_CODE_TDB_INVALID_ACTION
;
return
-
1
;
}
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
pMeta
->
tables
=
realloc
(
pMeta
->
tables
,
maxTables
*
sizeof
(
STable
*
));
memset
(
&
pMeta
->
tables
[
oldMaxTables
],
0
,
sizeof
(
STable
*
)
*
(
maxTables
-
oldMaxTables
));
pRepo
->
config
.
maxTables
=
maxTables
;
if
(
pRepo
->
mem
)
{
pRepo
->
mem
->
tData
=
realloc
(
pRepo
->
mem
->
tData
,
maxTables
*
sizeof
(
STableData
*
));
memset
(
POINTER_SHIFT
(
pRepo
->
mem
->
tData
,
sizeof
(
STableData
*
)
*
oldMaxTables
),
0
,
sizeof
(
STableData
*
)
*
(
maxTables
-
oldMaxTables
));
}
if
(
pRepo
->
imem
)
{
pRepo
->
imem
->
tData
=
realloc
(
pRepo
->
imem
->
tData
,
maxTables
*
sizeof
(
STableData
*
));
memset
(
POINTER_SHIFT
(
pRepo
->
imem
->
tData
,
sizeof
(
STableData
*
)
*
oldMaxTables
),
0
,
sizeof
(
STableData
*
)
*
(
maxTables
-
oldMaxTables
));
}
tsdbDebug
(
"vgId:%d, tsdb maxTables is changed from %d to %d!"
,
pRepo
->
config
.
tsdbId
,
oldMaxTables
,
maxTables
);
return
0
;
}
static
int
keyFGroupCompFunc
(
const
void
*
key
,
const
void
*
fgroup
)
{
int
fid
=
*
(
int
*
)
key
;
SFileGroup
*
pFGroup
=
(
SFileGroup
*
)
fgroup
;
...
...
@@ -914,7 +877,6 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
tlen
+=
taosEncodeVariantI32
(
buf
,
pCfg
->
tsdbId
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pCfg
->
cacheBlockSize
);
tlen
+=
taosEncodeVariantI32
(
buf
,
pCfg
->
totalBlocks
);
tlen
+=
taosEncodeVariantI32
(
buf
,
pCfg
->
maxTables
);
tlen
+=
taosEncodeVariantI32
(
buf
,
pCfg
->
daysPerFile
);
tlen
+=
taosEncodeVariantI32
(
buf
,
pCfg
->
keep
);
tlen
+=
taosEncodeVariantI32
(
buf
,
pCfg
->
keep1
);
...
...
@@ -931,7 +893,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pCfg
->
tsdbId
));
buf
=
taosDecodeFixedI32
(
buf
,
&
(
pCfg
->
cacheBlockSize
));
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pCfg
->
totalBlocks
));
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pCfg
->
maxTables
));
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pCfg
->
daysPerFile
));
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pCfg
->
keep
));
buf
=
taosDecodeVariantI32
(
buf
,
&
(
pCfg
->
keep1
));
...
...
@@ -1037,7 +998,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
pBlock
->
schemaLen
=
htonl
(
pBlock
->
schemaLen
);
pBlock
->
numOfRows
=
htons
(
pBlock
->
numOfRows
);
if
(
pBlock
->
tid
<=
0
||
pBlock
->
tid
>=
p
Repo
->
config
.
maxTables
)
{
if
(
pBlock
->
tid
<=
0
||
pBlock
->
tid
>=
p
Meta
->
maxTables
)
{
tsdbError
(
"vgId:%d failed to get table to insert data, uid %"
PRIu64
" tid %d"
,
REPO_ID
(
pRepo
),
pBlock
->
uid
,
pBlock
->
tid
);
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
...
...
@@ -1120,7 +1081,7 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
static
void
tsdbStartStream
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
p
Repo
->
config
.
maxTables
;
i
++
)
{
for
(
int
i
=
0
;
i
<
p
Meta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
...
...
@@ -1133,7 +1094,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
static
void
tsdbStopStream
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
p
Repo
->
config
.
maxTables
;
i
++
)
{
for
(
int
i
=
0
;
i
<
p
Meta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
(
*
pRepo
->
appH
.
cqDropFunc
)(
pTable
->
cqhandle
);
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
e28f1139
...
...
@@ -21,7 +21,7 @@
static
FORCE_INLINE
STsdbBufBlock
*
tsdbGetCurrBufBlock
(
STsdbRepo
*
pRepo
);
static
void
tsdbFreeBytes
(
STsdbRepo
*
pRepo
,
void
*
ptr
,
int
bytes
);
static
SMemTable
*
tsdbNewMemTable
(
STsdb
Cfg
*
pCfg
);
static
SMemTable
*
tsdbNewMemTable
(
STsdb
Repo
*
pRepo
);
static
void
tsdbFreeMemTable
(
SMemTable
*
pMemTable
);
static
STableData
*
tsdbNewTableData
(
STsdbCfg
*
pCfg
,
STable
*
pTable
);
static
void
tsdbFreeTableData
(
STableData
*
pTableData
);
...
...
@@ -30,13 +30,15 @@ static void * tsdbCommitData(void *arg);
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
static
void
tsdbEndCommit
(
STsdbRepo
*
pRepo
);
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
int
tsdbCommitToFile
(
STsdbRepo
*
pRepo
,
int
fid
,
SCommitIter
*
iters
,
SRWHelper
*
pHelper
,
SDataCols
*
pDataCols
);
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
);
static
void
tsdbDestroyCommitIters
(
SCommitIter
*
iters
,
int
maxTables
);
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
);
// ---------------- INTERNAL FUNCTIONS ----------------
int
tsdbInsertRowToMem
(
STsdbRepo
*
pRepo
,
SDataRow
row
,
STable
*
pTable
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
int32_t
level
=
0
;
int32_t
headSize
=
0
;
TSKEY
key
=
dataRowKey
(
row
);
...
...
@@ -45,7 +47,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
SSkipList
*
pSList
=
NULL
;
int
bytes
=
0
;
if
(
pMemTable
!=
NULL
&&
pMemTable
->
tData
[
TABLE_TID
(
pTable
)]
!=
NULL
&&
if
(
pMemTable
!=
NULL
&&
TABLE_TID
(
pTable
)
<
pMemTable
->
maxTables
&&
pMemTable
->
tData
[
TABLE_TID
(
pTable
)]
!=
NULL
&&
pMemTable
->
tData
[
TABLE_TID
(
pTable
)]
->
uid
==
TABLE_UID
(
pTable
))
{
pTableData
=
pMemTable
->
tData
[
TABLE_TID
(
pTable
)];
pSList
=
pTableData
->
pData
;
...
...
@@ -66,13 +68,20 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
// Operations above may change pRepo->mem, retake those values
ASSERT
(
pRepo
->
mem
!=
NULL
);
pMemTable
=
pRepo
->
mem
;
if
(
TABLE_TID
(
pTable
)
>=
pMemTable
->
maxTables
)
{
if
(
tsdbAdjustMemMaxTables
(
pMemTable
,
pMeta
->
maxTables
)
<
0
)
return
-
1
;;
}
pTableData
=
pMemTable
->
tData
[
TABLE_TID
(
pTable
)];
if
(
pTableData
==
NULL
||
pTableData
->
uid
!=
TABLE_UID
(
pTable
))
{
if
(
pTableData
!=
NULL
)
{
// destroy the table skiplist (may have race condition problem)
taosWLockLatch
(
&
(
pMemTable
->
latch
));
pMemTable
->
tData
[
TABLE_TID
(
pTable
)]
=
NULL
;
tsdbFreeTableData
(
pTableData
);
taosWUnLockLatch
(
&
(
pMemTable
->
latch
));
}
pTableData
=
tsdbNewTableData
(
pCfg
,
pTable
);
if
(
pTableData
==
NULL
)
{
tsdbError
(
"vgId:%d failed to insert row with key %"
PRId64
...
...
@@ -122,7 +131,6 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
int
ref
=
T_REF_DEC
(
pMemTable
);
tsdbDebug
(
"vgId:%d unref memtable %p ref %d"
,
REPO_ID
(
pRepo
),
pMemTable
,
ref
);
if
(
ref
==
0
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbBufPool
*
pBufPool
=
pRepo
->
pPool
;
SListNode
*
pNode
=
NULL
;
...
...
@@ -139,7 +147,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
}
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
for
(
int
i
=
0
;
i
<
p
Cfg
->
maxTables
;
i
++
)
{
for
(
int
i
=
0
;
i
<
p
MemTable
->
maxTables
;
i
++
)
{
if
(
pMemTable
->
tData
[
i
]
!=
NULL
)
{
tsdbFreeTableData
(
pMemTable
->
tData
[
i
]);
}
...
...
@@ -161,11 +169,24 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
tsdbRefMemTable
(
pRepo
,
*
pIMem
);
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
tsdbDebug
(
"vgId:%d take memory snapshot, pMem %p pIMem %p"
,
REPO_ID
(
pRepo
),
*
pMem
,
*
pIMem
);
if
(
*
pMem
!=
NULL
)
taosRLockLatch
(
&
((
*
pMem
)
->
latch
));
tsdbDebug
(
"vgId:%d take memory snapshot, pMem %p pIMem %p"
,
REPO_ID
(
pRepo
),
*
pMem
,
*
pIMem
);
return
0
;
}
void
tsdbUnTakeMemSnapShot
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMem
,
SMemTable
*
pIMem
)
{
if
(
pMem
!=
NULL
)
{
taosRUnLockLatch
(
&
(
pMem
->
latch
));
tsdbUnRefMemTable
(
pRepo
,
pMem
);
}
if
(
pIMem
!=
NULL
)
{
tsdbUnRefMemTable
(
pRepo
,
pIMem
);
}
}
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbBufBlock
*
pBufBlock
=
tsdbGetCurrBufBlock
(
pRepo
);
...
...
@@ -182,7 +203,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
}
if
(
pRepo
->
mem
==
NULL
)
{
SMemTable
*
pMemTable
=
tsdbNewMemTable
(
&
pRepo
->
config
);
SMemTable
*
pMemTable
=
tsdbNewMemTable
(
pRepo
);
if
(
pMemTable
==
NULL
)
return
NULL
;
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
{
...
...
@@ -329,7 +350,9 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
listNEles
(
pRepo
->
mem
->
bufBlockList
),
pBufBlock
->
offset
,
pBufBlock
->
remain
);
}
static
SMemTable
*
tsdbNewMemTable
(
STsdbCfg
*
pCfg
)
{
static
SMemTable
*
tsdbNewMemTable
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SMemTable
*
pMemTable
=
(
SMemTable
*
)
calloc
(
1
,
sizeof
(
*
pMemTable
));
if
(
pMemTable
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -340,7 +363,8 @@ static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
pMemTable
->
keyLast
=
0
;
pMemTable
->
numOfRows
=
0
;
pMemTable
->
tData
=
(
STableData
**
)
calloc
(
pCfg
->
maxTables
,
sizeof
(
STableData
*
));
pMemTable
->
maxTables
=
pMeta
->
maxTables
;
pMemTable
->
tData
=
(
STableData
**
)
calloc
(
pMemTable
->
maxTables
,
sizeof
(
STableData
*
));
if
(
pMemTable
->
tData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -398,9 +422,6 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
goto
_err
;
}
// TODO: operation here should not be here, remove it
pTableData
->
pData
->
level
=
1
;
return
pTableData
;
_err:
...
...
@@ -473,7 +494,7 @@ static void *tsdbCommitData(void *arg) {
_exit:
tdFreeDataCols
(
pDataCols
);
tsdbDestroyCommitIters
(
iters
,
p
Cfg
->
maxTables
);
tsdbDestroyCommitIters
(
iters
,
p
Mem
->
maxTables
);
tsdbDestroyHelper
(
&
whelper
);
tsdbEndCommit
(
pRepo
);
tsdbInfo
(
"vgId:%d commit over"
,
pRepo
->
config
.
tsdbId
);
...
...
@@ -552,12 +573,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
SFileGroup
*
pGroup
=
NULL
;
SMemTable
*
pMem
=
pRepo
->
imem
;
TSKEY
minKey
=
0
,
maxKey
=
0
;
tsdbGetFidKeyRange
(
pCfg
->
daysPerFile
,
pCfg
->
precision
,
fid
,
&
minKey
,
&
maxKey
);
// Check if there are data to commit to this file
int
hasDataToCommit
=
tsdbHasDataToCommit
(
iters
,
p
Cfg
->
maxTables
,
minKey
,
maxKey
);
int
hasDataToCommit
=
tsdbHasDataToCommit
(
iters
,
p
Mem
->
maxTables
,
minKey
,
maxKey
);
if
(
!
hasDataToCommit
)
{
tsdbDebug
(
"vgId:%d no data to commit to file %d"
,
REPO_ID
(
pRepo
),
fid
);
return
0
;
...
...
@@ -570,7 +592,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
return
-
1
;
}
if
((
pGroup
=
tsdbCreateFGroupIfNeed
(
pRepo
,
dataDir
,
fid
,
pCfg
->
maxTables
))
==
NULL
)
{
if
((
pGroup
=
tsdbCreateFGroupIfNeed
(
pRepo
,
dataDir
,
fid
))
==
NULL
)
{
tsdbError
(
"vgId:%d failed to create file group %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
goto
_err
;
}
...
...
@@ -582,7 +604,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
}
// Loop to commit data in each table
for
(
int
tid
=
1
;
tid
<
p
Cfg
->
maxTables
;
tid
++
)
{
for
(
int
tid
=
1
;
tid
<
p
Mem
->
maxTables
;
tid
++
)
{
SCommitIter
*
pIter
=
iters
+
tid
;
if
(
pIter
->
pTable
==
NULL
)
continue
;
...
...
@@ -643,11 +665,10 @@ _err:
}
static
SCommitIter
*
tsdbCreateCommitIters
(
STsdbRepo
*
pRepo
)
{
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
SMemTable
*
pMem
=
pRepo
->
imem
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SCommitIter
*
iters
=
(
SCommitIter
*
)
calloc
(
p
Cfg
->
maxTables
,
sizeof
(
SCommitIter
));
SCommitIter
*
iters
=
(
SCommitIter
*
)
calloc
(
p
Mem
->
maxTables
,
sizeof
(
SCommitIter
));
if
(
iters
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
...
...
@@ -656,7 +677,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
if
(
tsdbRLockRepoMeta
(
pRepo
)
<
0
)
goto
_err
;
// reference all tables
for
(
int
i
=
0
;
i
<
p
Cfg
->
maxTables
;
i
++
)
{
for
(
int
i
=
0
;
i
<
p
Mem
->
maxTables
;
i
++
)
{
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbRefTable
(
pMeta
->
tables
[
i
]);
iters
[
i
].
pTable
=
pMeta
->
tables
[
i
];
...
...
@@ -665,7 +686,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
if
(
tsdbUnlockRepoMeta
(
pRepo
)
<
0
)
goto
_err
;
for
(
int
i
=
0
;
i
<
p
Cfg
->
maxTables
;
i
++
)
{
for
(
int
i
=
0
;
i
<
p
Mem
->
maxTables
;
i
++
)
{
if
((
iters
[
i
].
pTable
!=
NULL
)
&&
(
pMem
->
tData
[
i
]
!=
NULL
)
&&
(
TABLE_UID
(
iters
[
i
].
pTable
)
==
pMem
->
tData
[
i
]
->
uid
))
{
if
((
iters
[
i
].
pIter
=
tSkipListCreateIter
(
pMem
->
tData
[
i
]
->
pData
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
...
...
@@ -679,7 +700,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
return
iters
;
_err:
tsdbDestroyCommitIters
(
iters
,
p
Cfg
->
maxTables
);
tsdbDestroyCommitIters
(
iters
,
p
Mem
->
maxTables
);
return
NULL
;
}
...
...
@@ -694,4 +715,26 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
}
free
(
iters
);
}
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
)
{
ASSERT
(
pMemTable
->
maxTables
<
maxTables
);
STableData
**
pTableData
=
(
STableData
**
)
calloc
(
maxTables
,
sizeof
(
STableData
*
));
if
(
pTableData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
((
void
*
)
pTableData
,
(
void
*
)
pMemTable
->
tData
,
sizeof
(
STableData
*
)
*
pMemTable
->
maxTables
);
STableData
**
tData
=
pMemTable
->
tData
;
taosWLockLatch
(
&
(
pMemTable
->
latch
));
pMemTable
->
maxTables
=
maxTables
;
pMemTable
->
tData
=
pTableData
;
taosWUnLockLatch
(
&
(
pMemTable
->
latch
));
tfree
(
tData
);
return
0
;
}
\ No newline at end of file
src/tsdb/src/tsdbMeta.c
浏览文件 @
e28f1139
...
...
@@ -49,6 +49,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
static
void
*
tsdbInsertTableAct
(
STsdbRepo
*
pRepo
,
int8_t
act
,
void
*
buf
,
STable
*
pTable
);
static
int
tsdbRemoveTableFromStore
(
STsdbRepo
*
pRepo
,
STable
*
pTable
);
static
int
tsdbRmTableFromMeta
(
STsdbRepo
*
pRepo
,
STable
*
pTable
);
static
int
tsdbAdjustMetaTables
(
STsdbRepo
*
pRepo
,
int
tid
);
// ------------------ OUTER FUNCTIONS ------------------
int
tsdbCreateTable
(
TSDB_REPO_T
*
repo
,
STableCfg
*
pCfg
)
{
...
...
@@ -60,13 +61,13 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
int
tid
=
pCfg
->
tableId
.
tid
;
STable
*
pTable
=
NULL
;
if
(
tid
<
0
||
tid
>=
pRepo
->
config
.
maxTables
)
{
if
(
tid
<
1
||
tid
>
TSDB_MAX_TABLES
)
{
tsdbError
(
"vgId:%d failed to create table since invalid tid %d"
,
REPO_ID
(
pRepo
),
tid
);
terrno
=
TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO
;
goto
_err
;
}
if
(
pMeta
->
tables
[
tid
]
!=
NULL
)
{
if
(
tid
<
pMeta
->
maxTables
&&
pMeta
->
tables
[
tid
]
!=
NULL
)
{
if
(
TABLE_UID
(
pMeta
->
tables
[
tid
])
==
pCfg
->
tableId
.
uid
)
{
tsdbError
(
"vgId:%d table %s already exists, tid %d uid %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
));
...
...
@@ -422,7 +423,8 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto
_err
;
}
pMeta
->
tables
=
(
STable
**
)
calloc
(
pCfg
->
maxTables
,
sizeof
(
STable
*
));
pMeta
->
maxTables
=
TSDB_INIT_NTABLES
+
1
;
pMeta
->
tables
=
(
STable
**
)
calloc
(
pMeta
->
maxTables
,
sizeof
(
STable
*
));
if
(
pMeta
->
tables
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -434,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto
_err
;
}
pMeta
->
uidMap
=
taosHashInit
(
pCfg
->
maxTables
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
pMeta
->
uidMap
=
taosHashInit
(
TSDB_INIT_NTABLES
*
1
.
1
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
);
if
(
pMeta
->
uidMap
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
...
...
@@ -484,14 +486,13 @@ _err:
}
int
tsdbCloseMeta
(
STsdbRepo
*
pRepo
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SListNode
*
pNode
=
NULL
;
STable
*
pTable
=
NULL
;
if
(
pMeta
==
NULL
)
return
0
;
tdCloseKVStore
(
pMeta
->
pStore
);
for
(
int
i
=
1
;
i
<
p
Cfg
->
maxTables
;
i
++
)
{
for
(
int
i
=
1
;
i
<
p
Meta
->
maxTables
;
i
++
)
{
tsdbFreeTable
(
pMeta
->
tables
[
i
]);
}
...
...
@@ -624,9 +625,8 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
static
void
tsdbOrgMeta
(
void
*
pHandle
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
pHandle
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
for
(
int
i
=
1
;
i
<
p
Cfg
->
maxTables
;
i
++
)
{
for
(
int
i
=
1
;
i
<
p
Meta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
!=
NULL
&&
pTable
->
type
==
TSDB_CHILD_TABLE
)
{
tsdbAddTableIntoIndex
(
pMeta
,
pTable
,
true
);
...
...
@@ -781,6 +781,9 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
goto
_err
;
}
}
else
{
if
(
TABLE_TID
(
pTable
)
>=
pMeta
->
maxTables
)
{
if
(
tsdbAdjustMetaTables
(
pRepo
,
TABLE_TID
(
pTable
))
<
0
)
goto
_err
;
}
if
(
TABLE_TYPE
(
pTable
)
==
TSDB_CHILD_TABLE
&&
addIdx
)
{
// add STABLE to the index
if
(
tsdbAddTableIntoIndex
(
pMeta
,
pTable
,
true
)
<
0
)
{
tsdbDebug
(
"vgId:%d failed to add table %s to meta while add table to index since %s"
,
REPO_ID
(
pRepo
),
...
...
@@ -788,6 +791,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
goto
_err
;
}
}
ASSERT
(
TABLE_TID
(
pTable
)
<
pMeta
->
maxTables
);
pMeta
->
tables
[
TABLE_TID
(
pTable
)]
=
pTable
;
pMeta
->
nTables
++
;
}
...
...
@@ -827,7 +831,6 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
SListIter
lIter
=
{
0
};
SListNode
*
pNode
=
NULL
;
STable
*
tTable
=
NULL
;
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
STSchema
*
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
int
maxCols
=
schemaNCols
(
pSchema
);
...
...
@@ -860,7 +863,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
if
(
maxCols
==
pMeta
->
maxCols
||
maxRowBytes
==
pMeta
->
maxRowBytes
)
{
maxCols
=
0
;
maxRowBytes
=
0
;
for
(
int
i
=
0
;
i
<
p
Cfg
->
maxTables
;
i
++
)
{
for
(
int
i
=
0
;
i
<
p
Meta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
!=
NULL
)
{
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
...
...
@@ -1266,5 +1269,28 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
tsdbRemoveTableFromMeta
(
pRepo
,
pTable
,
true
,
true
);
}
return
0
;
}
static
int
tsdbAdjustMetaTables
(
STsdbRepo
*
pRepo
,
int
tid
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
ASSERT
(
tid
>=
pMeta
->
maxTables
);
int
maxTables
=
tsdbGetNextMaxTables
(
tid
);
STable
**
tables
=
(
STable
**
)
calloc
(
maxTables
,
sizeof
(
STable
*
));
if
(
tables
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
((
void
*
)
tables
,
(
void
*
)
pMeta
->
tables
,
sizeof
(
STable
*
)
*
pMeta
->
maxTables
);
pMeta
->
maxTables
=
maxTables
;
STable
**
tTables
=
pMeta
->
tables
;
pMeta
->
tables
=
tables
;
tfree
(
tTables
);
tsdbDebug
(
"vgId:%d tsdb meta maxTables is adjusted as %d"
,
REPO_ID
(
pRepo
),
maxTables
);
return
0
;
}
\ No newline at end of file
src/tsdb/src/tsdbRead.c
浏览文件 @
e28f1139
...
...
@@ -317,17 +317,20 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
}
assert
(
pCheckInfo
->
iter
==
NULL
&&
pCheckInfo
->
iiter
==
NULL
);
if
(
pHandle
->
mem
&&
pHandle
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
// TODO: add uid check
if
(
pHandle
->
mem
&&
pCheckInfo
->
tableId
.
tid
<
pHandle
->
mem
->
maxTables
&&
pHandle
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
pCheckInfo
->
iter
=
tSkipListCreateIterFromVal
(
pHandle
->
mem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
->
pData
,
(
const
char
*
)
&
pCheckInfo
->
lastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
(
const
char
*
)
&
pCheckInfo
->
lastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
}
if
(
pHandle
->
imem
&&
pHandle
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
if
(
pHandle
->
imem
&&
pCheckInfo
->
tableId
.
tid
<
pHandle
->
imem
->
maxTables
&&
pHandle
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
!=
NULL
)
{
pCheckInfo
->
iiter
=
tSkipListCreateIterFromVal
(
pHandle
->
imem
->
tData
[
pCheckInfo
->
tableId
.
tid
]
->
pData
,
(
const
char
*
)
&
pCheckInfo
->
lastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
(
const
char
*
)
&
pCheckInfo
->
lastKey
,
TSDB_DATA_TYPE_TIMESTAMP
,
order
);
}
// both iterators are NULL, no data in buffer right now
if
(
pCheckInfo
->
iter
==
NULL
&&
pCheckInfo
->
iiter
==
NULL
)
{
return
false
;
...
...
@@ -1529,7 +1532,6 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
static
bool
doHasDataInBuffer
(
STsdbQueryHandle
*
pQueryHandle
)
{
size_t
numOfTables
=
taosArrayGetSize
(
pQueryHandle
->
pTableCheckInfo
);
assert
(
numOfTables
<=
((
STsdbRepo
*
)
pQueryHandle
->
pTsdb
)
->
config
.
maxTables
);
while
(
pQueryHandle
->
activeIndex
<
numOfTables
)
{
if
(
hasMoreDataInCache
(
pQueryHandle
))
{
...
...
@@ -2418,8 +2420,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tfree
(
pQueryHandle
->
statis
);
// todo check error
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pQueryHandle
->
mem
);
tsdbUnRefMemTable
(
pQueryHandle
->
pTsdb
,
pQueryHandle
->
imem
);
tsdbUnTakeMemSnapShot
(
pQueryHandle
->
pTsdb
,
pQueryHandle
->
mem
,
pQueryHandle
->
imem
);
tsdbDestroyHelper
(
&
pQueryHandle
->
rhelper
);
...
...
src/tsdb/tests/tsdbTests.cpp
浏览文件 @
e28f1139
...
...
@@ -98,7 +98,7 @@ static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, i
pCfg
->
tsdbId
=
tsdbId
;
pCfg
->
cacheBlockSize
=
cacheBlockSize
;
pCfg
->
totalBlocks
=
totalBlocks
;
pCfg
->
maxTables
=
maxTables
;
//
pCfg->maxTables = maxTables;
pCfg
->
daysPerFile
=
daysPerFile
;
pCfg
->
keep
=
keep
;
pCfg
->
minRowsPerFileBlock
=
minRows
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
e28f1139
...
...
@@ -123,7 +123,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg
.
tsdbId
=
pVnodeCfg
->
cfg
.
vgId
;
tsdbCfg
.
cacheBlockSize
=
pVnodeCfg
->
cfg
.
cacheBlockSize
;
tsdbCfg
.
totalBlocks
=
pVnodeCfg
->
cfg
.
totalBlocks
;
tsdbCfg
.
maxTables
=
pVnodeCfg
->
cfg
.
maxTables
;
//
tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
tsdbCfg
.
daysPerFile
=
pVnodeCfg
->
cfg
.
daysPerFile
;
tsdbCfg
.
keep
=
pVnodeCfg
->
cfg
.
daysToKeep
;
tsdbCfg
.
minRowsPerFileBlock
=
pVnodeCfg
->
cfg
.
minRowsPerFileBlock
;
...
...
@@ -630,14 +630,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
cfgVersion
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
cfgVersion
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
cacheBlockSize
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
cacheBlockSize
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
totalBlocks
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
totalBlocks
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
maxTables
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
maxTables
);
//
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysPerFile
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
daysPerFile
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysToKeep
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
daysToKeep
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysToKeep1
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
daysToKeep1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
daysToKeep2
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
daysToKeep2
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
minRowsPerFileBlock
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
minRowsPerFileBlock
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
maxRowsPerFileBlock
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
maxRowsPerFileBlock
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
commitTime
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
commitTime
);
//
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
precision
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
precision
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
compression
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
compression
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
walLevel
\"
: %d,
\n
"
,
pVnodeCfg
->
cfg
.
walLevel
);
...
...
@@ -729,12 +729,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode
->
tsdbCfg
.
totalBlocks
=
totalBlocks
->
valueint
;
cJSON
*
maxTables
=
cJSON_GetObjectItem
(
root
,
"maxTables"
);
if
(
!
maxTables
||
maxTables
->
type
!=
cJSON_Number
)
{
vError
(
"vgId:%d, failed to read vnode cfg, maxTables not found"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
pVnode
->
tsdbCfg
.
maxTables
=
maxTables
->
valueint
;
//
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
//
if (!maxTables || maxTables->type != cJSON_Number) {
//
vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
//
goto PARSE_OVER;
//
}
//
pVnode->tsdbCfg.maxTables = maxTables->valueint;
cJSON
*
daysPerFile
=
cJSON_GetObjectItem
(
root
,
"daysPerFile"
);
if
(
!
daysPerFile
||
daysPerFile
->
type
!=
cJSON_Number
)
{
...
...
@@ -778,12 +778,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
}
pVnode
->
tsdbCfg
.
maxRowsPerFileBlock
=
maxRowsPerFileBlock
->
valueint
;
cJSON
*
commitTime
=
cJSON_GetObjectItem
(
root
,
"commitTime"
);
if
(
!
commitTime
||
commitTime
->
type
!=
cJSON_Number
)
{
vError
(
"vgId:%d, failed to read vnode cfg, commitTime not found"
,
pVnode
->
vgId
);
goto
PARSE_OVER
;
}
pVnode
->
tsdbCfg
.
commitTime
=
(
int8_t
)
commitTime
->
valueint
;
//
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
//
if (!commitTime || commitTime->type != cJSON_Number) {
//
vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
//
goto PARSE_OVER;
//
}
//
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
cJSON
*
precision
=
cJSON_GetObjectItem
(
root
,
"precision"
);
if
(
!
precision
||
precision
->
type
!=
cJSON_Number
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录