Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d68899e5
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
d68899e5
编写于
12月 15, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
more work
上级
2e643995
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
117 addition
and
1391 deletion
+117
-1391
include/dnode/vnode/tsdb/tsdb.h
include/dnode/vnode/tsdb/tsdb.h
+4
-6
include/util/tdef.h
include/util/tdef.h
+3
-1
source/dnode/vnode/impl/CMakeLists.txt
source/dnode/vnode/impl/CMakeLists.txt
+1
-1
source/dnode/vnode/tsdb/inc/tsdbDef.h
source/dnode/vnode/tsdb/inc/tsdbDef.h
+4
-0
source/dnode/vnode/tsdb/inc/tsdbMemTable.h
source/dnode/vnode/tsdb/inc/tsdbMemTable.h
+5
-4
source/dnode/vnode/tsdb/src/tsdbCommit.c
source/dnode/vnode/tsdb/src/tsdbCommit.c
+21
-0
source/dnode/vnode/tsdb/src/tsdbMemTable.c
source/dnode/vnode/tsdb/src/tsdbMemTable.c
+54
-1
source/dnode/vnode/tsdb/src/tsdbWrite.c
source/dnode/vnode/tsdb/src/tsdbWrite.c
+25
-0
src/tsdb/inc/tsdbMemTable.h
src/tsdb/inc/tsdbMemTable.h
+0
-97
src/tsdb/src/tsdbBuffer.c
src/tsdb/src/tsdbBuffer.c
+0
-214
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+0
-1067
未找到文件。
include/dnode/vnode/tsdb/tsdb.h
浏览文件 @
d68899e5
...
...
@@ -21,15 +21,14 @@ extern "C" {
#endif
// TYPES EXPOSED
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbCfg
STsdbCfg
;
typedef
struct
STsdbMemAllocator
STsdbMemAllocator
;
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbCfg
STsdbCfg
;
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
);
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
pTsdbCfg
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
void
*
pData
,
int
len
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitMsg
*
pMsg
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
...
...
@@ -41,7 +40,6 @@ struct STsdbCfg {
uint32_t
keep0
;
uint32_t
keep1
;
uint32_t
keep2
;
/* TODO */
};
#ifdef __cplusplus
...
...
include/util/tdef.h
浏览文件 @
d68899e5
...
...
@@ -25,7 +25,9 @@ extern "C" {
#define TSDB__packed
#define TSKEY int64_t
#define TSKEY_INITIAL_VAL INT64_MIN
#define TSKEY_MIN INT64_MIN
#define TSKEY_MAX (INT64_MAX - 1)
#define TSKEY_INITIAL_VAL TSKEY_MIN
// Bytes for each type.
extern
const
int32_t
TYPE_BYTES
[
15
];
...
...
source/dnode/vnode/impl/CMakeLists.txt
浏览文件 @
d68899e5
...
...
@@ -19,5 +19,5 @@ target_link_libraries(
# test
if
(
${
BUILD_TEST
}
)
#
add_subdirectory(test)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
source/dnode/vnode/tsdb/inc/tsdbDef.h
浏览文件 @
d68899e5
...
...
@@ -17,6 +17,8 @@
#define _TD_TSDB_DEF_H_
#include "mallocator.h"
#include "taosmsg.h"
#include "thash.h"
#include "tsdb.h"
#include "tsdbMemTable.h"
...
...
@@ -29,6 +31,8 @@ extern "C" {
struct
STsdb
{
char
*
path
;
STsdbCfg
options
;
STsdbMemTable
*
mem
;
STsdbMemTable
*
imem
;
SMemAllocatorFactory
*
pmaf
;
};
...
...
source/dnode/vnode/tsdb/inc/tsdbMemTable.h
浏览文件 @
d68899e5
...
...
@@ -22,10 +22,11 @@
extern
"C"
{
#endif
typedef
struct
SMemTable
{
/* TODO */
SMemAllocator
*
pma
;
}
SMemTable
;
typedef
struct
STsdbMemTable
STsdbMemTable
;
STsdbMemTable
*
tsdbNewMemTable
(
SMemAllocatorFactory
*
pMAF
);
void
tsdbFreeMemTable
(
SMemAllocatorFactory
*
pMAF
,
STsdbMemTable
*
pMemTable
);
int
tsdbInsertDataToMemTable
(
STsdbMemTable
*
pMemTable
,
SSubmitMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/tsdb/src/tsdbCommit.c
0 → 100644
浏览文件 @
d68899e5
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbDef.h"
int
tsdbCommit
(
STsdb
*
pTsdb
)
{
// TODO
return
0
;
}
\ No newline at end of file
source/dnode/vnode/tsdb/src/tsdbMemTable.c
浏览文件 @
d68899e5
...
...
@@ -11,4 +11,57 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "tsdbDef.h"
struct
STsdbMemTable
{
T_REF_DECLARE
()
SRWLatch
latch
;
TSKEY
keyMin
;
TSKEY
keyMax
;
uint32_t
nRow
;
SHashObj
*
pHash
;
SMemAllocator
*
pMA
;
};
STsdbMemTable
*
tsdbNewMemTable
(
SMemAllocatorFactory
*
pMAF
)
{
STsdbMemTable
*
pMemTable
;
SMemAllocator
*
pMA
;
pMA
=
(
*
pMAF
->
create
)(
pMAF
);
ASSERT
(
pMA
!=
NULL
);
pMemTable
=
(
STsdbMemTable
*
)((
*
pMA
->
malloc
)(
pMA
,
sizeof
(
*
pMemTable
)));
if
(
pMemTable
==
NULL
)
{
(
*
pMAF
->
destroy
)(
pMAF
,
pMA
);
return
NULL
;
}
T_REF_INIT_VAL
(
pMemTable
,
1
);
taosInitRWLatch
(
&
(
pMemTable
->
latch
));
pMemTable
->
keyMin
=
TSKEY_MAX
;
pMemTable
->
keyMax
=
TSKEY_MIN
;
pMemTable
->
nRow
=
0
;
pMemTable
->
pHash
=
NULL
;
/// TODO
pMemTable
->
pMA
=
pMA
;
// TODO
return
pMemTable
;
}
void
tsdbFreeMemTable
(
SMemAllocatorFactory
*
pMAF
,
STsdbMemTable
*
pMemTable
)
{
SMemAllocator
*
pMA
=
pMemTable
->
pMA
;
if
(
pMA
->
free
)
{
// TODO
ASSERT
(
0
);
}
(
*
pMAF
->
destroy
)(
pMAF
,
pMA
);
}
int
tsdbInsertDataToMemTable
(
STsdbMemTable
*
pMemTable
,
SSubmitMsg
*
pMsg
)
{
// TODO
return
0
;
}
\ No newline at end of file
source/dnode/vnode/tsdb/src/tsdbWrite.c
0 → 100644
浏览文件 @
d68899e5
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbDef.h"
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitMsg
*
pMsg
)
{
// Check if mem is there. If not, create one.
pTsdb
->
mem
=
tsdbNewMemTable
(
pTsdb
->
pmaf
);
if
(
pTsdb
->
mem
==
NULL
)
{
return
-
1
;
}
return
tsdbInsertDataToMemTable
(
pTsdb
->
mem
,
pMsg
);
}
\ No newline at end of file
src/tsdb/inc/tsdbMemTable.h
已删除
100644 → 0
浏览文件 @
2e643995
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_MEMTABLE_H_
#define _TD_TSDB_MEMTABLE_H_
typedef
struct
{
int
rowsInserted
;
int
rowsUpdated
;
int
rowsDeleteSucceed
;
int
rowsDeleteFailed
;
int
nOperations
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
}
SMergeInfo
;
typedef
struct
{
STable
*
pTable
;
SSkipListIterator
*
pIter
;
}
SCommitIter
;
struct
STableData
{
uint64_t
uid
;
TSKEY
keyFirst
;
TSKEY
keyLast
;
int64_t
numOfRows
;
SSkipList
*
pData
;
T_REF_DECLARE
()
};
enum
{
TSDB_UPDATE_META
,
TSDB_DROP_META
};
#ifdef WINDOWS
#pragma pack(push ,1)
typedef
struct
{
#else
typedef
struct
__attribute__
((
packed
)){
#endif
char
act
;
uint64_t
uid
;
}
SActObj
;
#ifdef WINDOWS
#pragma pack(pop)
#endif
typedef
struct
{
int
len
;
char
cont
[];
}
SActCont
;
int
tsdbRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbUnRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
);
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemSnapshot
*
pSnapshot
,
SArray
*
pATable
);
void
tsdbUnTakeMemSnapShot
(
STsdbRepo
*
pRepo
,
SMemSnapshot
*
pSnapshot
);
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
);
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
);
int
tsdbSyncCommitConfig
(
STsdbRepo
*
pRepo
);
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
SMemRow
tsdbNextIterRow
(
SSkipListIterator
*
pIter
)
{
if
(
pIter
==
NULL
)
return
NULL
;
SSkipListNode
*
node
=
tSkipListIterGet
(
pIter
);
if
(
node
==
NULL
)
return
NULL
;
return
(
SMemRow
)
SL_GET_NODE_DATA
(
node
);
}
static
FORCE_INLINE
TSKEY
tsdbNextIterKey
(
SSkipListIterator
*
pIter
)
{
SMemRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TSDB_DATA_TIMESTAMP_NULL
;
return
memRowKey
(
row
);
}
static
FORCE_INLINE
TKEY
tsdbNextIterTKey
(
SSkipListIterator
*
pIter
)
{
SMemRow
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
)
return
TKEY_NULL
;
return
memRowTKey
(
row
);
}
#endif
/* _TD_TSDB_MEMTABLE_H_ */
\ No newline at end of file
src/tsdb/src/tsdbBuffer.c
已删除
100644 → 0
浏览文件 @
2e643995
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdbint.h"
#include "tsdbHealth.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
// ---------------- INTERNAL FUNCTIONS ----------------
STsdbBufPool
*
tsdbNewBufPool
()
{
STsdbBufPool
*
pBufPool
=
(
STsdbBufPool
*
)
calloc
(
1
,
sizeof
(
*
pBufPool
));
if
(
pBufPool
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
int
code
=
pthread_cond_init
(
&
(
pBufPool
->
poolNotEmpty
),
NULL
);
if
(
code
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
pBufPool
->
bufBlockList
=
tdListNew
(
sizeof
(
STsdbBufBlock
*
));
if
(
pBufPool
->
bufBlockList
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
return
pBufPool
;
_err:
tsdbFreeBufPool
(
pBufPool
);
return
NULL
;
}
void
tsdbFreeBufPool
(
STsdbBufPool
*
pBufPool
)
{
if
(
pBufPool
)
{
if
(
pBufPool
->
bufBlockList
)
{
ASSERT
(
listNEles
(
pBufPool
->
bufBlockList
)
==
0
);
tdListFree
(
pBufPool
->
bufBlockList
);
}
pthread_cond_destroy
(
&
pBufPool
->
poolNotEmpty
);
free
(
pBufPool
);
}
}
int
tsdbOpenBufPool
(
STsdbRepo
*
pRepo
)
{
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
STsdbBufPool
*
pPool
=
pRepo
->
pPool
;
ASSERT
(
pPool
!=
NULL
);
pPool
->
bufBlockSize
=
pCfg
->
cacheBlockSize
*
1024
*
1024
;
// MB
pPool
->
tBufBlocks
=
pCfg
->
totalBlocks
;
pPool
->
nBufBlocks
=
0
;
pPool
->
nElasticBlocks
=
0
;
pPool
->
index
=
0
;
pPool
->
nRecycleBlocks
=
0
;
for
(
int
i
=
0
;
i
<
pCfg
->
totalBlocks
;
i
++
)
{
STsdbBufBlock
*
pBufBlock
=
tsdbNewBufBlock
(
pPool
->
bufBlockSize
);
if
(
pBufBlock
==
NULL
)
goto
_err
;
if
(
tdListAppend
(
pPool
->
bufBlockList
,
(
void
*
)(
&
pBufBlock
))
<
0
)
{
tsdbFreeBufBlock
(
pBufBlock
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
pPool
->
nBufBlocks
++
;
}
tsdbDebug
(
"vgId:%d buffer pool is opened! bufBlockSize:%d tBufBlocks:%d nBufBlocks:%d"
,
REPO_ID
(
pRepo
),
pPool
->
bufBlockSize
,
pPool
->
tBufBlocks
,
pPool
->
nBufBlocks
);
return
0
;
_err:
tsdbCloseBufPool
(
pRepo
);
return
-
1
;
}
void
tsdbCloseBufPool
(
STsdbRepo
*
pRepo
)
{
if
(
pRepo
==
NULL
)
return
;
STsdbBufPool
*
pBufPool
=
pRepo
->
pPool
;
STsdbBufBlock
*
pBufBlock
=
NULL
;
if
(
pBufPool
)
{
SListNode
*
pNode
=
NULL
;
while
((
pNode
=
tdListPopHead
(
pBufPool
->
bufBlockList
))
!=
NULL
)
{
tdListNodeGetData
(
pBufPool
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
tsdbFreeBufBlock
(
pBufBlock
);
free
(
pNode
);
}
}
tsdbDebug
(
"vgId:%d, buffer pool is closed"
,
REPO_ID
(
pRepo
));
}
SListNode
*
tsdbAllocBufBlockFromPool
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
!=
NULL
&&
pRepo
->
pPool
!=
NULL
);
ASSERT
(
IS_REPO_LOCKED
(
pRepo
));
STsdbBufPool
*
pBufPool
=
pRepo
->
pPool
;
while
(
POOL_IS_EMPTY
(
pBufPool
))
{
if
(
tsDeadLockKillQuery
)
{
// supply new Block
if
(
tsdbInsertNewBlock
(
pRepo
)
>
0
)
{
tsdbWarn
(
"vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d"
,
REPO_ID
(
pRepo
),
pBufPool
->
nElasticBlocks
,
pBufPool
->
bufBlockList
->
numOfEles
);
break
;
}
else
{
// no newBlock, kill query free
if
(
!
tsdbUrgeQueryFree
(
pRepo
))
tsdbWarn
(
"vgId:%d Urge query free thread start failed."
,
REPO_ID
(
pRepo
));
}
}
pRepo
->
repoLocked
=
false
;
pthread_cond_wait
(
&
(
pBufPool
->
poolNotEmpty
),
&
(
pRepo
->
mutex
));
pRepo
->
repoLocked
=
true
;
}
SListNode
*
pNode
=
tdListPopHead
(
pBufPool
->
bufBlockList
);
ASSERT
(
pNode
!=
NULL
);
STsdbBufBlock
*
pBufBlock
=
NULL
;
tdListNodeGetData
(
pBufPool
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
pBufBlock
->
blockId
=
pBufPool
->
index
++
;
pBufBlock
->
offset
=
0
;
pBufBlock
->
remain
=
pBufPool
->
bufBlockSize
;
tsdbDebug
(
"vgId:%d, buffer block is allocated, blockId:%"
PRId64
,
REPO_ID
(
pRepo
),
pBufBlock
->
blockId
);
return
pNode
;
}
// ---------------- LOCAL FUNCTIONS ----------------
STsdbBufBlock
*
tsdbNewBufBlock
(
int
bufBlockSize
)
{
STsdbBufBlock
*
pBufBlock
=
(
STsdbBufBlock
*
)
malloc
(
sizeof
(
*
pBufBlock
)
+
bufBlockSize
);
if
(
pBufBlock
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
pBufBlock
->
blockId
=
0
;
pBufBlock
->
offset
=
0
;
pBufBlock
->
remain
=
bufBlockSize
;
return
pBufBlock
;
}
void
tsdbFreeBufBlock
(
STsdbBufBlock
*
pBufBlock
)
{
tfree
(
pBufBlock
);
}
int
tsdbExpandPool
(
STsdbRepo
*
pRepo
,
int32_t
oldTotalBlocks
)
{
if
(
oldTotalBlocks
==
pRepo
->
config
.
totalBlocks
)
{
return
TSDB_CODE_SUCCESS
;
}
int
err
=
TSDB_CODE_SUCCESS
;
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
terrno
;
STsdbBufPool
*
pPool
=
pRepo
->
pPool
;
if
(
pRepo
->
config
.
totalBlocks
>
oldTotalBlocks
)
{
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
totalBlocks
-
oldTotalBlocks
;
i
++
)
{
STsdbBufBlock
*
pBufBlock
=
tsdbNewBufBlock
(
pPool
->
bufBlockSize
);
if
(
pBufBlock
==
NULL
)
goto
err
;
if
(
tdListAppend
(
pPool
->
bufBlockList
,
(
void
*
)(
&
pBufBlock
))
<
0
)
{
tsdbFreeBufBlock
(
pBufBlock
);
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
err
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
err
;
}
pPool
->
nBufBlocks
++
;
}
pthread_cond_signal
(
&
pPool
->
poolNotEmpty
);
}
else
{
pPool
->
nRecycleBlocks
=
oldTotalBlocks
-
pRepo
->
config
.
totalBlocks
;
}
err:
tsdbUnlockRepo
(
pRepo
);
return
err
;
}
void
tsdbRecycleBufferBlock
(
STsdbBufPool
*
pPool
,
SListNode
*
pNode
,
bool
bELastic
)
{
STsdbBufBlock
*
pBufBlock
=
NULL
;
tdListNodeGetData
(
pPool
->
bufBlockList
,
pNode
,
(
void
*
)(
&
pBufBlock
));
tsdbFreeBufBlock
(
pBufBlock
);
free
(
pNode
);
if
(
bELastic
)
{
pPool
->
nElasticBlocks
--
;
tsdbWarn
(
"pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d"
,
pPool
,
pPool
->
nElasticBlocks
,
pPool
->
bufBlockList
->
numOfEles
);
}
else
pPool
->
nBufBlocks
--
;
}
\ No newline at end of file
src/tsdb/src/tsdbMemTable.c
已删除
100644 → 0
浏览文件 @
2e643995
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdataformat.h"
#include "tfunctional.h"
#include "tsdbint.h"
#include "tskiplist.h"
#include "tsdbRowMergeBuf.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
SMemRow
row
;
}
SSubmitBlkIter
;
typedef
struct
{
int32_t
totalLen
;
int32_t
len
;
void
*
pMsg
;
}
SSubmitMsgIter
;
static
SMemTable
*
tsdbNewMemTable
(
STsdbRepo
*
pRepo
);
static
void
tsdbFreeMemTable
(
SMemTable
*
pMemTable
);
static
STableData
*
tsdbNewTableData
(
STsdbCfg
*
pCfg
,
STable
*
pTable
);
static
void
tsdbFreeTableData
(
STableData
*
pTableData
);
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
);
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
);
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SMemRow
row
);
static
int
tsdbInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
static
SMemRow
tsdbGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
static
int
tsdbScanAndConvertSubmitMsg
(
STsdbRepo
*
pRepo
,
SSubmitMsg
*
pMsg
);
static
int
tsdbInsertDataToTable
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
int32_t
*
affectedrows
);
static
int
tsdbInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
);
static
int
tsdbGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
static
int
tsdbCheckTableSchema
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
STable
*
pTable
);
static
int
tsdbUpdateTableLatestInfo
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SMemRow
row
);
static
FORCE_INLINE
int
tsdbCheckRowRange
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SMemRow
row
,
TSKEY
minKey
,
TSKEY
maxKey
,
TSKEY
now
);
int32_t
tsdbInsertData
(
STsdbRepo
*
repo
,
SSubmitMsg
*
pMsg
,
SShellSubmitRspMsg
*
pRsp
)
{
STsdbRepo
*
pRepo
=
repo
;
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
int32_t
affectedrows
=
0
;
if
(
tsdbScanAndConvertSubmitMsg
(
pRepo
,
pMsg
)
<
0
)
{
if
(
terrno
!=
TSDB_CODE_TDB_TABLE_RECONFIGURE
)
{
tsdbError
(
"vgId:%d failed to insert data since %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
}
return
-
1
;
}
tsdbInitSubmitMsgIter
(
pMsg
,
&
msgIter
);
while
(
true
)
{
tsdbGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
if
(
tsdbInsertDataToTable
(
pRepo
,
pBlock
,
&
affectedrows
)
<
0
)
{
return
-
1
;
}
}
if
(
pRsp
!=
NULL
)
pRsp
->
affectedRows
=
htonl
(
affectedrows
);
if
(
tsdbCheckCommit
(
pRepo
)
<
0
)
return
-
1
;
return
0
;
}
// ---------------- INTERNAL FUNCTIONS ----------------
int
tsdbRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
)
{
if
(
pMemTable
==
NULL
)
return
0
;
int
ref
=
T_REF_INC
(
pMemTable
);
tsdbDebug
(
"vgId:%d ref memtable %p ref %d"
,
REPO_ID
(
pRepo
),
pMemTable
,
ref
);
return
0
;
}
// Need to lock the repository
int
tsdbUnRefMemTable
(
STsdbRepo
*
pRepo
,
SMemTable
*
pMemTable
)
{
if
(
pMemTable
==
NULL
)
return
0
;
int
ref
=
T_REF_DEC
(
pMemTable
);
tsdbDebug
(
"vgId:%d unref memtable %p ref %d"
,
REPO_ID
(
pRepo
),
pMemTable
,
ref
);
if
(
ref
==
0
)
{
STsdbBufPool
*
pBufPool
=
pRepo
->
pPool
;
SListNode
*
pNode
=
NULL
;
bool
addNew
=
false
;
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
while
((
pNode
=
tdListPopHead
(
pMemTable
->
bufBlockList
))
!=
NULL
)
{
if
(
pBufPool
->
nRecycleBlocks
>
0
)
{
tsdbRecycleBufferBlock
(
pBufPool
,
pNode
,
false
);
pBufPool
->
nRecycleBlocks
-=
1
;
}
else
{
if
(
pBufPool
->
nElasticBlocks
>
0
&&
listNEles
(
pBufPool
->
bufBlockList
)
>
2
)
{
tsdbRecycleBufferBlock
(
pBufPool
,
pNode
,
true
);
}
else
{
tdListAppendNode
(
pBufPool
->
bufBlockList
,
pNode
);
addNew
=
true
;
}
}
}
if
(
addNew
)
{
int
code
=
pthread_cond_signal
(
&
pBufPool
->
poolNotEmpty
);
if
(
code
!=
0
)
{
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
tsdbError
(
"vgId:%d failed to signal pool not empty since %s"
,
REPO_ID
(
pRepo
),
strerror
(
code
));
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
return
-
1
;
}
}
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
for
(
int
i
=
0
;
i
<
pMemTable
->
maxTables
;
i
++
)
{
if
(
pMemTable
->
tData
[
i
]
!=
NULL
)
{
tsdbFreeTableData
(
pMemTable
->
tData
[
i
]);
}
}
tdListDiscard
(
pMemTable
->
actList
);
tdListDiscard
(
pMemTable
->
bufBlockList
);
tsdbFreeMemTable
(
pMemTable
);
}
return
0
;
}
int
tsdbTakeMemSnapshot
(
STsdbRepo
*
pRepo
,
SMemSnapshot
*
pSnapshot
,
SArray
*
pATable
)
{
memset
(
pSnapshot
,
0
,
sizeof
(
*
pSnapshot
));
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
pSnapshot
->
omem
=
pRepo
->
mem
;
pSnapshot
->
imem
=
pRepo
->
imem
;
tsdbRefMemTable
(
pRepo
,
pRepo
->
mem
);
tsdbRefMemTable
(
pRepo
,
pRepo
->
imem
);
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
if
(
pSnapshot
->
omem
)
{
taosRLockLatch
(
&
(
pSnapshot
->
omem
->
latch
));
pSnapshot
->
mem
=
&
(
pSnapshot
->
mtable
);
pSnapshot
->
mem
->
tData
=
(
STableData
**
)
calloc
(
pSnapshot
->
omem
->
maxTables
,
sizeof
(
STableData
*
));
if
(
pSnapshot
->
mem
->
tData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
taosRUnLockLatch
(
&
(
pSnapshot
->
omem
->
latch
));
tsdbUnRefMemTable
(
pRepo
,
pSnapshot
->
omem
);
tsdbUnRefMemTable
(
pRepo
,
pSnapshot
->
imem
);
pSnapshot
->
mem
=
NULL
;
pSnapshot
->
imem
=
NULL
;
pSnapshot
->
omem
=
NULL
;
return
-
1
;
}
pSnapshot
->
mem
->
keyFirst
=
pSnapshot
->
omem
->
keyFirst
;
pSnapshot
->
mem
->
keyLast
=
pSnapshot
->
omem
->
keyLast
;
pSnapshot
->
mem
->
numOfRows
=
pSnapshot
->
omem
->
numOfRows
;
pSnapshot
->
mem
->
maxTables
=
pSnapshot
->
omem
->
maxTables
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pATable
);
i
++
)
{
STable
*
pTable
=
*
(
STable
**
)
taosArrayGet
(
pATable
,
i
);
int32_t
tid
=
TABLE_TID
(
pTable
);
STableData
*
pTableData
=
(
tid
<
pSnapshot
->
omem
->
maxTables
)
?
pSnapshot
->
omem
->
tData
[
tid
]
:
NULL
;
if
((
pTableData
==
NULL
)
||
(
TABLE_UID
(
pTable
)
!=
pTableData
->
uid
))
continue
;
pSnapshot
->
mem
->
tData
[
tid
]
=
pTableData
;
T_REF_INC
(
pTableData
);
}
taosRUnLockLatch
(
&
(
pSnapshot
->
omem
->
latch
));
}
tsdbDebug
(
"vgId:%d take memory snapshot, pMem %p pIMem %p"
,
REPO_ID
(
pRepo
),
pSnapshot
->
omem
,
pSnapshot
->
imem
);
return
0
;
}
void
tsdbUnTakeMemSnapShot
(
STsdbRepo
*
pRepo
,
SMemSnapshot
*
pSnapshot
)
{
tsdbDebug
(
"vgId:%d untake memory snapshot, pMem %p pIMem %p"
,
REPO_ID
(
pRepo
),
pSnapshot
->
omem
,
pSnapshot
->
imem
);
if
(
pSnapshot
->
mem
)
{
ASSERT
(
pSnapshot
->
omem
!=
NULL
);
for
(
size_t
i
=
0
;
i
<
pSnapshot
->
mem
->
maxTables
;
i
++
)
{
STableData
*
pTableData
=
pSnapshot
->
mem
->
tData
[
i
];
if
(
pTableData
)
{
tsdbFreeTableData
(
pTableData
);
}
}
tfree
(
pSnapshot
->
mem
->
tData
);
tsdbUnRefMemTable
(
pRepo
,
pSnapshot
->
omem
);
}
tsdbUnRefMemTable
(
pRepo
,
pSnapshot
->
imem
);
pSnapshot
->
mem
=
NULL
;
pSnapshot
->
imem
=
NULL
;
pSnapshot
->
omem
=
NULL
;
}
void
*
tsdbAllocBytes
(
STsdbRepo
*
pRepo
,
int
bytes
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
STsdbBufBlock
*
pBufBlock
=
NULL
;
void
*
ptr
=
NULL
;
// Either allocate from buffer blocks or from SYSTEM memory pool
if
(
pRepo
->
mem
==
NULL
)
{
SMemTable
*
pMemTable
=
tsdbNewMemTable
(
pRepo
);
if
(
pMemTable
==
NULL
)
return
NULL
;
pRepo
->
mem
=
pMemTable
;
}
ASSERT
(
pRepo
->
mem
!=
NULL
);
pBufBlock
=
tsdbGetCurrBufBlock
(
pRepo
);
if
((
pRepo
->
mem
->
extraBuffList
!=
NULL
)
||
((
listNEles
(
pRepo
->
mem
->
bufBlockList
)
>=
pCfg
->
totalBlocks
/
3
)
&&
(
pBufBlock
->
remain
<
bytes
)))
{
// allocate from SYSTEM buffer pool
if
(
pRepo
->
mem
->
extraBuffList
==
NULL
)
{
pRepo
->
mem
->
extraBuffList
=
tdListNew
(
0
);
if
(
pRepo
->
mem
->
extraBuffList
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
}
ASSERT
(
pRepo
->
mem
->
extraBuffList
!=
NULL
);
SListNode
*
pNode
=
(
SListNode
*
)
malloc
(
sizeof
(
SListNode
)
+
bytes
);
if
(
pNode
==
NULL
)
{
if
(
listNEles
(
pRepo
->
mem
->
extraBuffList
)
==
0
)
{
tdListFree
(
pRepo
->
mem
->
extraBuffList
);
pRepo
->
mem
->
extraBuffList
=
NULL
;
}
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
pNode
->
next
=
pNode
->
prev
=
NULL
;
tdListAppendNode
(
pRepo
->
mem
->
extraBuffList
,
pNode
);
ptr
=
(
void
*
)(
pNode
->
data
);
tsdbTrace
(
"vgId:%d allocate %d bytes from SYSTEM buffer block"
,
REPO_ID
(
pRepo
),
bytes
);
}
else
{
// allocate from TSDB buffer pool
if
(
pBufBlock
==
NULL
||
pBufBlock
->
remain
<
bytes
)
{
ASSERT
(
listNEles
(
pRepo
->
mem
->
bufBlockList
)
<
pCfg
->
totalBlocks
/
3
);
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
NULL
;
SListNode
*
pNode
=
tsdbAllocBufBlockFromPool
(
pRepo
);
tdListAppendNode
(
pRepo
->
mem
->
bufBlockList
,
pNode
);
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
NULL
;
pBufBlock
=
tsdbGetCurrBufBlock
(
pRepo
);
}
ASSERT
(
pBufBlock
->
remain
>=
bytes
);
ptr
=
POINTER_SHIFT
(
pBufBlock
->
data
,
pBufBlock
->
offset
);
pBufBlock
->
offset
+=
bytes
;
pBufBlock
->
remain
-=
bytes
;
tsdbTrace
(
"vgId:%d allocate %d bytes from TSDB buffer block, nBlocks %d offset %d remain %d"
,
REPO_ID
(
pRepo
),
bytes
,
listNEles
(
pRepo
->
mem
->
bufBlockList
),
pBufBlock
->
offset
,
pBufBlock
->
remain
);
}
return
ptr
;
}
int
tsdbSyncCommitConfig
(
STsdbRepo
*
pRepo
)
{
ASSERT
(
pRepo
->
config_changed
==
true
);
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
if
(
pRepo
->
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbWarn
(
"vgId:%d try to commit config when TSDB not in good state: %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
}
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
tsdbScheduleCommit
(
pRepo
,
COMMIT_CONFIG_REQ
);
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
tsem_post
(
&
(
pRepo
->
readyToCommit
));
if
(
pRepo
->
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
pRepo
->
code
;
return
-
1
;
}
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
}
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
)
{
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
ASSERT
(
pRepo
->
imem
==
NULL
);
if
(
pRepo
->
mem
==
NULL
)
{
tsem_post
(
&
(
pRepo
->
readyToCommit
));
return
0
;
}
if
(
pRepo
->
code
!=
TSDB_CODE_SUCCESS
)
{
tsdbWarn
(
"vgId:%d try to commit when TSDB not in good state: %s"
,
REPO_ID
(
pRepo
),
tstrerror
(
terrno
));
}
if
(
pRepo
->
appH
.
notifyStatus
)
pRepo
->
appH
.
notifyStatus
(
pRepo
->
appH
.
appH
,
TSDB_STATUS_COMMIT_START
,
TSDB_CODE_SUCCESS
);
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
pRepo
->
imem
=
pRepo
->
mem
;
pRepo
->
mem
=
NULL
;
tsdbScheduleCommit
(
pRepo
,
COMMIT_REQ
);
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
return
0
;
}
int
tsdbSyncCommit
(
STsdbRepo
*
repo
)
{
STsdbRepo
*
pRepo
=
repo
;
tsdbAsyncCommit
(
pRepo
);
tsem_wait
(
&
(
pRepo
->
readyToCommit
));
tsem_post
(
&
(
pRepo
->
readyToCommit
));
if
(
pRepo
->
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
pRepo
->
code
;
return
-
1
;
}
else
{
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
}
}
/**
* This is an important function to load data or try to load data from memory skiplist iterator.
*
* This function load memory data until:
* 1. iterator ends
* 2. data key exceeds maxKey
* 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead
* 4. operations in pCols not exceeds its max capacity if pCols is given
*
* The function tries to procceed AS MUCH AS POSSIBLE.
*/
int
tsdbLoadDataFromCache
(
STable
*
pTable
,
SSkipListIterator
*
pIter
,
TSKEY
maxKey
,
int
maxRowsToRead
,
SDataCols
*
pCols
,
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
)
{
ASSERT
(
maxRowsToRead
>
0
&&
nFilterKeys
>=
0
);
if
(
pIter
==
NULL
)
return
0
;
STSchema
*
pSchema
=
NULL
;
TSKEY
rowKey
=
0
;
TSKEY
fKey
=
0
;
bool
isRowDel
=
false
;
int
filterIter
=
0
;
SMemRow
row
=
NULL
;
SMergeInfo
mInfo
;
if
(
pMergeInfo
==
NULL
)
pMergeInfo
=
&
mInfo
;
memset
(
pMergeInfo
,
0
,
sizeof
(
*
pMergeInfo
));
pMergeInfo
->
keyFirst
=
INT64_MAX
;
pMergeInfo
->
keyLast
=
INT64_MIN
;
if
(
pCols
)
tdResetDataCols
(
pCols
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
}
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
while
(
true
)
{
if
(
fKey
==
INT64_MAX
&&
rowKey
==
INT64_MAX
)
break
;
if
(
fKey
<
rowKey
)
{
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
fKey
);
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
else
if
(
fKey
>
rowKey
)
{
if
(
isRowDel
)
{
pMergeInfo
->
rowsDeleteFailed
++
;
}
else
{
if
(
pMergeInfo
->
rowsInserted
-
pMergeInfo
->
rowsDeleteSucceed
>=
maxRowsToRead
)
break
;
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsInserted
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
rowKey
);
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
}
}
else
{
if
(
isRowDel
)
{
ASSERT
(
!
keepDup
);
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsDeleteSucceed
++
;
pMergeInfo
->
nOperations
++
;
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
else
{
if
(
keepDup
)
{
if
(
pCols
&&
pMergeInfo
->
nOperations
>=
pCols
->
maxPoints
)
break
;
pMergeInfo
->
rowsUpdated
++
;
pMergeInfo
->
nOperations
++
;
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
rowKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
rowKey
);
tsdbAppendTableRowToCols
(
pTable
,
pCols
,
&
pSchema
,
row
);
}
else
{
pMergeInfo
->
keyFirst
=
MIN
(
pMergeInfo
->
keyFirst
,
fKey
);
pMergeInfo
->
keyLast
=
MAX
(
pMergeInfo
->
keyLast
,
fKey
);
}
}
tSkipListIterNext
(
pIter
);
row
=
tsdbNextIterRow
(
pIter
);
if
(
row
==
NULL
||
memRowKey
(
row
)
>
maxKey
)
{
rowKey
=
INT64_MAX
;
isRowDel
=
false
;
}
else
{
rowKey
=
memRowKey
(
row
);
isRowDel
=
memRowDeleted
(
row
);
}
filterIter
++
;
if
(
filterIter
>=
nFilterKeys
)
{
fKey
=
INT64_MAX
;
}
else
{
fKey
=
tdGetKey
(
filterKeys
[
filterIter
]);
}
}
}
return
0
;
}
// ---------------- LOCAL FUNCTIONS ----------------
static
SMemTable
*
tsdbNewMemTable
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SMemTable
*
pMemTable
=
(
SMemTable
*
)
calloc
(
1
,
sizeof
(
*
pMemTable
));
if
(
pMemTable
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
pMemTable
->
keyFirst
=
INT64_MAX
;
pMemTable
->
keyLast
=
0
;
pMemTable
->
numOfRows
=
0
;
pMemTable
->
maxTables
=
pMeta
->
maxTables
;
pMemTable
->
tData
=
(
STableData
**
)
calloc
(
pMemTable
->
maxTables
,
sizeof
(
STableData
*
));
if
(
pMemTable
->
tData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
pMemTable
->
actList
=
tdListNew
(
0
);
if
(
pMemTable
->
actList
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
pMemTable
->
bufBlockList
=
tdListNew
(
sizeof
(
STsdbBufBlock
*
));
if
(
pMemTable
->
bufBlockList
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
goto
_err
;
}
T_REF_INC
(
pMemTable
);
return
pMemTable
;
_err:
tsdbFreeMemTable
(
pMemTable
);
return
NULL
;
}
static
void
tsdbFreeMemTable
(
SMemTable
*
pMemTable
)
{
if
(
pMemTable
)
{
ASSERT
((
pMemTable
->
bufBlockList
==
NULL
)
?
true
:
(
listNEles
(
pMemTable
->
bufBlockList
)
==
0
));
ASSERT
((
pMemTable
->
actList
==
NULL
)
?
true
:
(
listNEles
(
pMemTable
->
actList
)
==
0
));
tdListFree
(
pMemTable
->
extraBuffList
);
tdListFree
(
pMemTable
->
bufBlockList
);
tdListFree
(
pMemTable
->
actList
);
tfree
(
pMemTable
->
tData
);
free
(
pMemTable
);
}
}
static
STableData
*
tsdbNewTableData
(
STsdbCfg
*
pCfg
,
STable
*
pTable
)
{
STableData
*
pTableData
=
(
STableData
*
)
calloc
(
1
,
sizeof
(
*
pTableData
));
if
(
pTableData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
NULL
;
}
pTableData
->
uid
=
TABLE_UID
(
pTable
);
pTableData
->
keyFirst
=
INT64_MAX
;
pTableData
->
keyLast
=
0
;
pTableData
->
numOfRows
=
0
;
uint8_t
skipListCreateFlags
;
if
(
pCfg
->
update
==
TD_ROW_DISCARD_UPDATE
)
skipListCreateFlags
=
SL_DISCARD_DUP_KEY
;
else
skipListCreateFlags
=
SL_UPDATE_DUP_KEY
;
pTableData
->
pData
=
tSkipListCreate
(
TSDB_DATA_SKIPLIST_LEVEL
,
TSDB_DATA_TYPE_TIMESTAMP
,
TYPE_BYTES
[
TSDB_DATA_TYPE_TIMESTAMP
],
tkeyComparFn
,
skipListCreateFlags
,
tsdbGetTsTupleKey
);
if
(
pTableData
->
pData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
free
(
pTableData
);
return
NULL
;
}
T_REF_INC
(
pTableData
);
return
pTableData
;
}
static
void
tsdbFreeTableData
(
STableData
*
pTableData
)
{
if
(
pTableData
)
{
int32_t
ref
=
T_REF_DEC
(
pTableData
);
if
(
ref
==
0
)
{
tSkipListDestroy
(
pTableData
->
pData
);
free
(
pTableData
);
}
}
}
static
char
*
tsdbGetTsTupleKey
(
const
void
*
data
)
{
return
memRowTuple
((
SMemRow
)
data
);
}
static
int
tsdbAdjustMemMaxTables
(
SMemTable
*
pMemTable
,
int
maxTables
)
{
ASSERT
(
pMemTable
->
maxTables
<
maxTables
);
STableData
**
pTableData
=
(
STableData
**
)
calloc
(
maxTables
,
sizeof
(
STableData
*
));
if
(
pTableData
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
memcpy
((
void
*
)
pTableData
,
(
void
*
)
pMemTable
->
tData
,
sizeof
(
STableData
*
)
*
pMemTable
->
maxTables
);
STableData
**
tData
=
pMemTable
->
tData
;
taosWLockLatch
(
&
(
pMemTable
->
latch
));
pMemTable
->
maxTables
=
maxTables
;
pMemTable
->
tData
=
pTableData
;
taosWUnLockLatch
(
&
(
pMemTable
->
latch
));
tfree
(
tData
);
return
0
;
}
static
int
tsdbAppendTableRowToCols
(
STable
*
pTable
,
SDataCols
*
pCols
,
STSchema
**
ppSchema
,
SMemRow
row
)
{
if
(
pCols
)
{
if
(
*
ppSchema
==
NULL
||
schemaVersion
(
*
ppSchema
)
!=
memRowVersion
(
row
))
{
*
ppSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
memRowVersion
(
row
));
if
(
*
ppSchema
==
NULL
)
{
ASSERT
(
false
);
return
-
1
;
}
}
tdAppendMemRowToDataCol
(
row
,
*
ppSchema
,
pCols
,
true
);
}
return
0
;
}
static
int
tsdbInitSubmitBlkIter
(
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
)
{
if
(
pBlock
->
dataLen
<=
0
)
return
-
1
;
pIter
->
totalLen
=
pBlock
->
dataLen
;
pIter
->
len
=
0
;
pIter
->
row
=
(
SMemRow
)(
pBlock
->
data
+
pBlock
->
schemaLen
);
return
0
;
}
static
SMemRow
tsdbGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
)
{
SMemRow
row
=
pIter
->
row
;
// firstly, get current row
if
(
row
==
NULL
)
return
NULL
;
pIter
->
len
+=
memRowTLen
(
row
);
if
(
pIter
->
len
>=
pIter
->
totalLen
)
{
// reach the end
pIter
->
row
=
NULL
;
}
else
{
pIter
->
row
=
(
char
*
)
row
+
memRowTLen
(
row
);
// secondly, move to next row
}
return
row
;
}
static
FORCE_INLINE
int
tsdbCheckRowRange
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SMemRow
row
,
TSKEY
minKey
,
TSKEY
maxKey
,
TSKEY
now
)
{
TSKEY
rowKey
=
memRowKey
(
row
);
if
(
rowKey
<
minKey
||
rowKey
>
maxKey
)
{
tsdbError
(
"vgId:%d table %s tid %d uid %"
PRIu64
" timestamp is out of range! now %"
PRId64
" minKey %"
PRId64
" maxKey %"
PRId64
" row key %"
PRId64
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
),
now
,
minKey
,
maxKey
,
rowKey
);
terrno
=
TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE
;
return
-
1
;
}
return
0
;
}
static
int
tsdbScanAndConvertSubmitMsg
(
STsdbRepo
*
pRepo
,
SSubmitMsg
*
pMsg
)
{
ASSERT
(
pMsg
!=
NULL
);
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitBlk
*
pBlock
=
NULL
;
SSubmitBlkIter
blkIter
=
{
0
};
SMemRow
row
=
NULL
;
TSKEY
now
=
taosGetTimestamp
(
pRepo
->
config
.
precision
);
TSKEY
minKey
=
now
-
tsTickPerDay
[
pRepo
->
config
.
precision
]
*
pRepo
->
config
.
keep
;
TSKEY
maxKey
=
now
+
tsTickPerDay
[
pRepo
->
config
.
precision
]
*
pRepo
->
config
.
daysPerFile
;
terrno
=
TSDB_CODE_SUCCESS
;
pMsg
->
length
=
htonl
(
pMsg
->
length
);
pMsg
->
numOfBlocks
=
htonl
(
pMsg
->
numOfBlocks
);
if
(
tsdbInitSubmitMsgIter
(
pMsg
,
&
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
if
(
tsdbGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
)
<
0
)
return
-
1
;
if
(
pBlock
==
NULL
)
break
;
pBlock
->
uid
=
htobe64
(
pBlock
->
uid
);
pBlock
->
tid
=
htonl
(
pBlock
->
tid
);
pBlock
->
sversion
=
htonl
(
pBlock
->
sversion
);
pBlock
->
dataLen
=
htonl
(
pBlock
->
dataLen
);
pBlock
->
schemaLen
=
htonl
(
pBlock
->
schemaLen
);
pBlock
->
numOfRows
=
htons
(
pBlock
->
numOfRows
);
if
(
pBlock
->
tid
<=
0
||
pBlock
->
tid
>=
pMeta
->
maxTables
)
{
tsdbError
(
"vgId:%d failed to get table to insert data, uid %"
PRIu64
" tid %d"
,
REPO_ID
(
pRepo
),
pBlock
->
uid
,
pBlock
->
tid
);
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
return
-
1
;
}
STable
*
pTable
=
pMeta
->
tables
[
pBlock
->
tid
];
if
(
pTable
==
NULL
||
TABLE_UID
(
pTable
)
!=
pBlock
->
uid
)
{
tsdbError
(
"vgId:%d failed to get table to insert data, uid %"
PRIu64
" tid %d"
,
REPO_ID
(
pRepo
),
pBlock
->
uid
,
pBlock
->
tid
);
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
return
-
1
;
}
if
(
TABLE_TYPE
(
pTable
)
==
TSDB_SUPER_TABLE
)
{
tsdbError
(
"vgId:%d invalid action trying to insert a super table %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
));
terrno
=
TSDB_CODE_TDB_INVALID_ACTION
;
return
-
1
;
}
// Check schema version and update schema if needed
if
(
tsdbCheckTableSchema
(
pRepo
,
pBlock
,
pTable
)
<
0
)
{
if
(
terrno
==
TSDB_CODE_TDB_TABLE_RECONFIGURE
)
{
continue
;
}
else
{
return
-
1
;
}
}
tsdbInitSubmitBlkIter
(
pBlock
,
&
blkIter
);
while
((
row
=
tsdbGetSubmitBlkNext
(
&
blkIter
))
!=
NULL
)
{
if
(
tsdbCheckRowRange
(
pRepo
,
pTable
,
row
,
minKey
,
maxKey
,
now
)
<
0
)
{
return
-
1
;
}
}
}
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
return
-
1
;
return
0
;
}
//row1 has higher priority
static
SMemRow
tsdbInsertDupKeyMerge
(
SMemRow
row1
,
SMemRow
row2
,
STsdbRepo
*
pRepo
,
STSchema
**
ppSchema1
,
STSchema
**
ppSchema2
,
STable
*
pTable
,
int32_t
*
pPoints
,
SMemRow
*
pLastRow
)
{
//for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows!
if
(
row1
==
NULL
&&
row2
==
NULL
&&
pRepo
->
config
.
update
==
TD_ROW_DISCARD_UPDATE
)
{
(
*
pPoints
)
++
;
return
NULL
;
}
if
(
row2
==
NULL
||
pRepo
->
config
.
update
!=
TD_ROW_PARTIAL_UPDATE
)
{
void
*
pMem
=
tsdbAllocBytes
(
pRepo
,
memRowTLen
(
row1
));
if
(
pMem
==
NULL
)
return
NULL
;
memRowCpy
(
pMem
,
row1
);
(
*
pPoints
)
++
;
*
pLastRow
=
pMem
;
return
pMem
;
}
STSchema
*
pSchema1
=
*
ppSchema1
;
STSchema
*
pSchema2
=
*
ppSchema2
;
SMergeBuf
*
pBuf
=
&
pRepo
->
mergeBuf
;
int
dv1
=
memRowVersion
(
row1
);
int
dv2
=
memRowVersion
(
row2
);
if
(
pSchema1
==
NULL
||
schemaVersion
(
pSchema1
)
!=
dv1
)
{
if
(
pSchema2
!=
NULL
&&
schemaVersion
(
pSchema2
)
==
dv1
)
{
*
ppSchema1
=
pSchema2
;
}
else
{
*
ppSchema1
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
memRowVersion
(
row1
));
}
pSchema1
=
*
ppSchema1
;
}
if
(
pSchema2
==
NULL
||
schemaVersion
(
pSchema2
)
!=
dv2
)
{
if
(
schemaVersion
(
pSchema1
)
==
dv2
)
{
pSchema2
=
pSchema1
;
}
else
{
*
ppSchema2
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
memRowVersion
(
row2
));
pSchema2
=
*
ppSchema2
;
}
}
SMemRow
tmp
=
tsdbMergeTwoRows
(
pBuf
,
row1
,
row2
,
pSchema1
,
pSchema2
);
void
*
pMem
=
tsdbAllocBytes
(
pRepo
,
memRowTLen
(
tmp
));
if
(
pMem
==
NULL
)
return
NULL
;
memRowCpy
(
pMem
,
tmp
);
(
*
pPoints
)
++
;
*
pLastRow
=
pMem
;
return
pMem
;
}
static
void
*
tsdbInsertDupKeyMergePacked
(
void
**
args
)
{
return
tsdbInsertDupKeyMerge
(
args
[
0
],
args
[
1
],
args
[
2
],
(
STSchema
**
)
&
args
[
3
],
(
STSchema
**
)
&
args
[
4
],
args
[
5
],
args
[
6
],
args
[
7
]);
}
static
void
tsdbSetupSkipListHookFns
(
SSkipList
*
pSkipList
,
STsdbRepo
*
pRepo
,
STable
*
pTable
,
int32_t
*
pPoints
,
SMemRow
*
pLastRow
)
{
if
(
pSkipList
->
insertHandleFn
==
NULL
)
{
tGenericSavedFunc
*
dupHandleSavedFunc
=
genericSavedFuncInit
((
GenericVaFunc
)
&
tsdbInsertDupKeyMergePacked
,
9
);
dupHandleSavedFunc
->
args
[
2
]
=
pRepo
;
dupHandleSavedFunc
->
args
[
3
]
=
NULL
;
dupHandleSavedFunc
->
args
[
4
]
=
NULL
;
dupHandleSavedFunc
->
args
[
5
]
=
pTable
;
pSkipList
->
insertHandleFn
=
dupHandleSavedFunc
;
}
pSkipList
->
insertHandleFn
->
args
[
6
]
=
pPoints
;
pSkipList
->
insertHandleFn
->
args
[
7
]
=
pLastRow
;
}
static
int
tsdbInsertDataToTable
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
int32_t
*
pAffectedRows
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
int32_t
points
=
0
;
STable
*
pTable
=
NULL
;
SSubmitBlkIter
blkIter
=
{
0
};
SMemTable
*
pMemTable
=
NULL
;
STableData
*
pTableData
=
NULL
;
STsdbCfg
*
pCfg
=
&
(
pRepo
->
config
);
tsdbInitSubmitBlkIter
(
pBlock
,
&
blkIter
);
if
(
blkIter
.
row
==
NULL
)
return
0
;
TSKEY
firstRowKey
=
memRowKey
(
blkIter
.
row
);
tsdbAllocBytes
(
pRepo
,
0
);
pMemTable
=
pRepo
->
mem
;
ASSERT
(
pMemTable
!=
NULL
);
ASSERT
(
pBlock
->
tid
<
pMeta
->
maxTables
);
pTable
=
pMeta
->
tables
[
pBlock
->
tid
];
ASSERT
(
pTable
!=
NULL
&&
TABLE_UID
(
pTable
)
==
pBlock
->
uid
);
if
(
TABLE_TID
(
pTable
)
>=
pMemTable
->
maxTables
)
{
if
(
tsdbAdjustMemMaxTables
(
pMemTable
,
pMeta
->
maxTables
)
<
0
)
{
return
-
1
;
}
}
pTableData
=
pMemTable
->
tData
[
TABLE_TID
(
pTable
)];
if
(
pTableData
==
NULL
||
pTableData
->
uid
!=
TABLE_UID
(
pTable
))
{
if
(
pTableData
!=
NULL
)
{
taosWLockLatch
(
&
(
pMemTable
->
latch
));
pMemTable
->
tData
[
TABLE_TID
(
pTable
)]
=
NULL
;
tsdbFreeTableData
(
pTableData
);
taosWUnLockLatch
(
&
(
pMemTable
->
latch
));
}
pTableData
=
tsdbNewTableData
(
pCfg
,
pTable
);
if
(
pTableData
==
NULL
)
{
tsdbError
(
"vgId:%d failed to insert data to table %s uid %"
PRId64
" tid %d since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
tstrerror
(
terrno
));
return
-
1
;
}
pRepo
->
mem
->
tData
[
TABLE_TID
(
pTable
)]
=
pTableData
;
}
ASSERT
((
pTableData
!=
NULL
)
&&
pTableData
->
uid
==
TABLE_UID
(
pTable
));
SMemRow
lastRow
=
NULL
;
int64_t
osize
=
SL_SIZE
(
pTableData
->
pData
);
tsdbSetupSkipListHookFns
(
pTableData
->
pData
,
pRepo
,
pTable
,
&
points
,
&
lastRow
);
tSkipListPutBatchByIter
(
pTableData
->
pData
,
&
blkIter
,
(
iter_next_fn_t
)
tsdbGetSubmitBlkNext
);
int64_t
dsize
=
SL_SIZE
(
pTableData
->
pData
)
-
osize
;
(
*
pAffectedRows
)
+=
points
;
if
(
lastRow
!=
NULL
)
{
TSKEY
lastRowKey
=
memRowKey
(
lastRow
);
if
(
pMemTable
->
keyFirst
>
firstRowKey
)
pMemTable
->
keyFirst
=
firstRowKey
;
pMemTable
->
numOfRows
+=
dsize
;
if
(
pTableData
->
keyFirst
>
firstRowKey
)
pTableData
->
keyFirst
=
firstRowKey
;
pTableData
->
numOfRows
+=
dsize
;
if
(
pMemTable
->
keyLast
<
lastRowKey
)
pMemTable
->
keyLast
=
lastRowKey
;
if
(
pTableData
->
keyLast
<
lastRowKey
)
pTableData
->
keyLast
=
lastRowKey
;
if
(
tsdbUpdateTableLatestInfo
(
pRepo
,
pTable
,
lastRow
)
<
0
)
{
return
-
1
;
}
}
STSchema
*
pSchema
=
tsdbGetTableSchemaByVersion
(
pTable
,
pBlock
->
sversion
);
pRepo
->
stat
.
pointsWritten
+=
points
*
schemaNCols
(
pSchema
);
pRepo
->
stat
.
totalStorage
+=
points
*
schemaVLen
(
pSchema
);
return
0
;
}
static
int
tsdbInitSubmitMsgIter
(
SSubmitMsg
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
return
-
1
;
}
pIter
->
totalLen
=
pMsg
->
length
;
pIter
->
len
=
0
;
pIter
->
pMsg
=
pMsg
;
if
(
pMsg
->
length
<=
TSDB_SUBMIT_MSG_HEAD_SIZE
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
return
-
1
;
}
return
0
;
}
static
int
tsdbGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
)
{
if
(
pIter
->
len
==
0
)
{
pIter
->
len
+=
TSDB_SUBMIT_MSG_HEAD_SIZE
;
}
else
{
SSubmitBlk
*
pSubmitBlk
=
(
SSubmitBlk
*
)
POINTER_SHIFT
(
pIter
->
pMsg
,
pIter
->
len
);
pIter
->
len
+=
(
sizeof
(
SSubmitBlk
)
+
pSubmitBlk
->
dataLen
+
pSubmitBlk
->
schemaLen
);
}
if
(
pIter
->
len
>
pIter
->
totalLen
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
*
pPBlock
=
NULL
;
return
-
1
;
}
*
pPBlock
=
(
pIter
->
len
==
pIter
->
totalLen
)
?
NULL
:
(
SSubmitBlk
*
)
POINTER_SHIFT
(
pIter
->
pMsg
,
pIter
->
len
);
return
0
;
}
static
int
tsdbCheckTableSchema
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
STable
*
pTable
)
{
ASSERT
(
pTable
!=
NULL
);
STSchema
*
pSchema
=
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
);
int
sversion
=
schemaVersion
(
pSchema
);
if
(
pBlock
->
sversion
==
sversion
)
{
return
0
;
}
else
{
if
(
TABLE_TYPE
(
pTable
)
==
TSDB_STREAM_TABLE
)
{
// stream table is not allowed to change schema
terrno
=
TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION
;
return
-
1
;
}
}
if
(
pBlock
->
sversion
>
sversion
)
{
// may need to update table schema
if
(
pBlock
->
schemaLen
>
0
)
{
tsdbDebug
(
"vgId:%d table %s tid %d uid %"
PRIu64
" schema version %d is out of data, client version %d, update..."
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
),
sversion
,
pBlock
->
sversion
);
ASSERT
(
pBlock
->
schemaLen
%
sizeof
(
STColumn
)
==
0
);
int
numOfCols
=
pBlock
->
schemaLen
/
sizeof
(
STColumn
);
STColumn
*
pTCol
=
(
STColumn
*
)
pBlock
->
data
;
STSchemaBuilder
schemaBuilder
=
{
0
};
if
(
tdInitTSchemaBuilder
(
&
schemaBuilder
,
pBlock
->
sversion
)
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbError
(
"vgId:%d failed to update schema of table %s since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
tstrerror
(
terrno
));
return
-
1
;
}
for
(
int
i
=
0
;
i
<
numOfCols
;
i
++
)
{
if
(
tdAddColToSchema
(
&
schemaBuilder
,
pTCol
[
i
].
type
,
htons
(
pTCol
[
i
].
colId
),
htons
(
pTCol
[
i
].
bytes
))
<
0
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tsdbError
(
"vgId:%d failed to update schema of table %s since %s"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
tstrerror
(
terrno
));
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
return
-
1
;
}
}
STSchema
*
pNSchema
=
tdGetSchemaFromBuilder
(
&
schemaBuilder
);
if
(
pNSchema
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
return
-
1
;
}
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
tsdbUpdateTableSchema
(
pRepo
,
pTable
,
pNSchema
,
true
);
}
else
{
tsdbDebug
(
"vgId:%d table %s tid %d uid %"
PRIu64
" schema version %d is out of data, client version %d, reconfigure..."
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
),
sversion
,
pBlock
->
sversion
);
terrno
=
TSDB_CODE_TDB_TABLE_RECONFIGURE
;
return
-
1
;
}
}
else
{
ASSERT
(
pBlock
->
sversion
>=
0
);
if
(
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
pBlock
->
sversion
)
==
NULL
)
{
tsdbError
(
"vgId:%d invalid submit schema version %d to table %s tid %d from client"
,
REPO_ID
(
pRepo
),
pBlock
->
sversion
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
));
terrno
=
TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION
;
return
-
1
;
}
}
return
0
;
}
static
void
updateTableLatestColumn
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SMemRow
row
)
{
tsdbDebug
(
"vgId:%d updateTableLatestColumn, %s row version:%d"
,
REPO_ID
(
pRepo
),
pTable
->
name
->
data
,
memRowVersion
(
row
));
STSchema
*
pSchema
=
tsdbGetTableLatestSchema
(
pTable
);
if
(
tsdbUpdateLastColSchema
(
pTable
,
pSchema
)
<
0
)
{
return
;
}
pSchema
=
tsdbGetTableSchemaByVersion
(
pTable
,
memRowVersion
(
row
));
if
(
pSchema
==
NULL
)
{
return
;
}
SDataCol
*
pLatestCols
=
pTable
->
lastCols
;
int32_t
kvIdx
=
0
;
for
(
int16_t
j
=
0
;
j
<
schemaNCols
(
pSchema
);
j
++
)
{
STColumn
*
pTCol
=
schemaColAt
(
pSchema
,
j
);
// ignore not exist colId
int16_t
idx
=
tsdbGetLastColumnsIndexByColId
(
pTable
,
pTCol
->
colId
);
if
(
idx
==
-
1
)
{
continue
;
}
void
*
value
=
NULL
;
value
=
tdGetMemRowDataOfColEx
(
row
,
pTCol
->
colId
,
(
int8_t
)
pTCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pSchema
->
columns
[
j
].
offset
,
&
kvIdx
);
if
((
value
==
NULL
)
||
isNull
(
value
,
pTCol
->
type
))
{
continue
;
}
SDataCol
*
pDataCol
=
&
(
pLatestCols
[
idx
]);
if
(
pDataCol
->
pData
==
NULL
)
{
pDataCol
->
pData
=
malloc
(
pTCol
->
bytes
);
pDataCol
->
bytes
=
pTCol
->
bytes
;
}
else
if
(
pDataCol
->
bytes
<
pTCol
->
bytes
)
{
pDataCol
->
pData
=
realloc
(
pDataCol
->
pData
,
pTCol
->
bytes
);
pDataCol
->
bytes
=
pTCol
->
bytes
;
}
// the actual value size
uint16_t
bytes
=
IS_VAR_DATA_TYPE
(
pTCol
->
type
)
?
varDataTLen
(
value
)
:
pTCol
->
bytes
;
// the actual data size CANNOT larger than column size
assert
(
pTCol
->
bytes
>=
bytes
);
memcpy
(
pDataCol
->
pData
,
value
,
bytes
);
//tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData);
pDataCol
->
ts
=
memRowKey
(
row
);
}
}
static
int
tsdbUpdateTableLatestInfo
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SMemRow
row
)
{
STsdbCfg
*
pCfg
=
&
pRepo
->
config
;
// if cacheLastRow config has been reset, free the lastRow
if
(
!
pCfg
->
cacheLastRow
&&
pTable
->
lastRow
!=
NULL
)
{
SMemRow
cachedLastRow
=
pTable
->
lastRow
;
TSDB_WLOCK_TABLE
(
pTable
);
pTable
->
lastRow
=
NULL
;
TSDB_WUNLOCK_TABLE
(
pTable
);
taosTZfree
(
cachedLastRow
);
}
if
(
tsdbGetTableLastKeyImpl
(
pTable
)
<=
memRowKey
(
row
))
{
if
(
CACHE_LAST_ROW
(
pCfg
)
||
pTable
->
lastRow
!=
NULL
)
{
SMemRow
nrow
=
pTable
->
lastRow
;
if
(
taosTSizeof
(
nrow
)
<
memRowTLen
(
row
))
{
SMemRow
orow
=
nrow
;
nrow
=
taosTMalloc
(
memRowTLen
(
row
));
if
(
nrow
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
memRowCpy
(
nrow
,
row
);
TSDB_WLOCK_TABLE
(
pTable
);
pTable
->
lastKey
=
memRowKey
(
row
);
pTable
->
lastRow
=
nrow
;
TSDB_WUNLOCK_TABLE
(
pTable
);
taosTZfree
(
orow
);
}
else
{
TSDB_WLOCK_TABLE
(
pTable
);
pTable
->
lastKey
=
memRowKey
(
row
);
memRowCpy
(
nrow
,
row
);
TSDB_WUNLOCK_TABLE
(
pTable
);
}
}
else
{
pTable
->
lastKey
=
memRowKey
(
row
);
}
if
(
CACHE_LAST_NULL_COLUMN
(
pCfg
))
{
updateTableLatestColumn
(
pRepo
,
pTable
,
row
);
}
}
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录