Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3b058023
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3b058023
编写于
12月 17, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/dnode3
上级
7bf10f1a
dbafa13b
变更
15
隐藏空白更改
内联
并排
Showing
15 changed file
with
343 addition
and
295 deletion
+343
-295
include/dnode/vnode/meta/meta.h
include/dnode/vnode/meta/meta.h
+4
-3
include/dnode/vnode/tsdb/tsdb.h
include/dnode/vnode/tsdb/tsdb.h
+3
-1
include/util/tmacro.h
include/util/tmacro.h
+42
-0
source/dnode/vnode/impl/CMakeLists.txt
source/dnode/vnode/impl/CMakeLists.txt
+1
-1
source/dnode/vnode/impl/inc/vnodeBufferPool.h
source/dnode/vnode/impl/inc/vnodeBufferPool.h
+2
-0
source/dnode/vnode/impl/inc/vnodeMAF.h
source/dnode/vnode/impl/inc/vnodeMAF.h
+32
-0
source/dnode/vnode/impl/src/vnodeBufferPool.c
source/dnode/vnode/impl/src/vnodeBufferPool.c
+48
-177
source/dnode/vnode/impl/src/vnodeMain.c
source/dnode/vnode/impl/src/vnodeMain.c
+6
-4
source/dnode/vnode/impl/src/vnodeRequest.c
source/dnode/vnode/impl/src/vnodeRequest.c
+2
-1
source/dnode/vnode/impl/test/vBenchmarkTest.cpp
source/dnode/vnode/impl/test/vBenchmarkTest.cpp
+2
-0
source/dnode/vnode/impl/test/vnodeApiTests.cpp
source/dnode/vnode/impl/test/vnodeApiTests.cpp
+175
-90
source/dnode/vnode/meta/inc/metaDef.h
source/dnode/vnode/meta/inc/metaDef.h
+1
-1
source/dnode/vnode/meta/src/metaMain.c
source/dnode/vnode/meta/src/metaMain.c
+9
-8
source/dnode/vnode/tq/src/tq.c
source/dnode/vnode/tq/src/tq.c
+7
-1
source/dnode/vnode/tsdb/src/tsdbMain.c
source/dnode/vnode/tsdb/src/tsdbMain.c
+9
-8
未找到文件。
include/dnode/vnode/meta/meta.h
浏览文件 @
3b058023
...
...
@@ -16,6 +16,7 @@
#ifndef _TD_META_H_
#define _TD_META_H_
#include "mallocator.h"
#include "os.h"
#include "trow.h"
...
...
@@ -71,7 +72,7 @@ typedef struct STbCfg {
}
STbCfg
;
// SMeta operations
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
p
Options
);
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
p
MetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaRemove
(
const
char
*
path
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
...
...
@@ -79,8 +80,8 @@ int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int
metaCommit
(
SMeta
*
pMeta
);
// Options
void
metaOptionsInit
(
SMetaCfg
*
p
Options
);
void
metaOptionsClear
(
SMetaCfg
*
p
Options
);
void
metaOptionsInit
(
SMetaCfg
*
p
MetaCfg
);
void
metaOptionsClear
(
SMetaCfg
*
p
MetaCfg
);
// STbCfg
#define META_INIT_STB_CFG(NAME, TTL, KEEP, SUID, PSCHEMA, PTAGSCHEMA) \
...
...
include/dnode/vnode/tsdb/tsdb.h
浏览文件 @
3b058023
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include "mallocator.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -25,7 +27,7 @@ typedef struct STsdb STsdb;
typedef
struct
STsdbCfg
STsdbCfg
;
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
pTsdbCfg
);
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitMsg
*
pMsg
);
...
...
include/util/tmacro.h
0 → 100644
浏览文件 @
3b058023
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_UTIL_MACRO_H_
#define _TD_UTIL_MACRO_H_
#include "os.h"
#ifdef __cplusplus
extern
"C"
{
#endif
// Module init/clear MACRO definitions
#define TD_MOD_UNINITIALIZED 0
#define TD_MOD_INITIALIZED 1
#define TD_MOD_UNCLEARD 0
#define TD_MOD_CLEARD 1
typedef
int8_t
td_mode_flag_t
;
#define TD_CHECK_AND_SET_MODE_INIT(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNINITIALIZED, TD_MOD_INITIALIZED)
#define TD_CHECK_AND_SET_MOD_CLEAR(FLAG) atomic_val_compare_exchange_8((FLAG), TD_MOD_UNCLEARD, TD_MOD_CLEARD)
#ifdef __cplusplus
}
#endif
#endif
/*_TD_UTIL_MACRO_H_*/
\ No newline at end of file
source/dnode/vnode/impl/CMakeLists.txt
浏览文件 @
3b058023
...
...
@@ -19,5 +19,5 @@ target_link_libraries(
# test
if
(
${
BUILD_TEST
}
)
#
add_subdirectory(test)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
\ No newline at end of file
source/dnode/vnode/impl/inc/vnodeBufferPool.h
浏览文件 @
3b058023
...
...
@@ -32,6 +32,8 @@ int vnodeBufPoolRecycle(SVnode *pVnode);
void
*
vnodeMalloc
(
SVnode
*
pVnode
,
uint64_t
size
);
bool
vnodeBufPoolIsFull
(
SVnode
*
pVnode
);
SMemAllocatorFactory
*
vBufPoolGetMAF
(
SVnode
*
pVnode
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/impl/inc/vnodeMAF.h
0 → 100644
浏览文件 @
3b058023
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MAF_H_
#define _TD_VNODE_MAF_H_
#include "vnode.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int
vnodeOpenMAF
(
SVnode
*
pVnode
);
void
vnodeCloseMAF
(
SVnode
*
pVnode
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VNODE_MAF_H_*/
\ No newline at end of file
source/dnode/vnode/impl/src/vnodeBufferPool.c
浏览文件 @
3b058023
...
...
@@ -24,10 +24,13 @@ struct SVBufPool {
TD_DLIST
(
SVMemAllocator
)
free
;
TD_DLIST
(
SVMemAllocator
)
incycle
;
SVMemAllocator
*
inuse
;
// MAF for submodules
// SMemAllocatorFactory maf
;
// MAF for submodules
to use
SMemAllocatorFactory
*
pMAF
;
};
static
SMemAllocator
*
vBufPoolCreateMA
(
SMemAllocatorFactory
*
pMAF
);
static
void
vBufPoolDestroyMA
(
SMemAllocatorFactory
*
pMAF
,
SMemAllocator
*
pMA
);
int
vnodeOpenBufPool
(
SVnode
*
pVnode
)
{
uint64_t
capacity
;
...
...
@@ -54,6 +57,15 @@ int vnodeOpenBufPool(SVnode *pVnode) {
tDListAppend
(
&
(
pVnode
->
pBufPool
->
free
),
pVMA
);
}
pVnode
->
pBufPool
->
pMAF
=
(
SMemAllocatorFactory
*
)
malloc
(
sizeof
(
SMemAllocatorFactory
));
if
(
pVnode
->
pBufPool
->
pMAF
==
NULL
)
{
// TODO: handle error
return
-
1
;
}
pVnode
->
pBufPool
->
pMAF
->
impl
=
pVnode
;
pVnode
->
pBufPool
->
pMAF
->
create
=
vBufPoolCreateMA
;
pVnode
->
pBufPool
->
pMAF
->
destroy
=
vBufPoolDestroyMA
;
return
0
;
}
...
...
@@ -125,195 +137,54 @@ bool vnodeBufPoolIsFull(SVnode *pVnode) {
return
vmaIsFull
(
pVnode
->
pBufPool
->
inuse
);
}
#if 0
typedef enum {
// Heap allocator
E_V_HEAP_ALLOCATOR = 0,
// Arena allocator
E_V_ARENA_ALLOCATOR
} EVMemAllocatorT;
typedef struct {
/* TODO */
} SVHeapAllocator;
typedef struct SVArenaNode {
struct SVArenaNode *prev;
uint64_t size;
void * ptr;
char data[];
} SVArenaNode;
typedef struct {
uint64_t ssize; // step size
uint64_t lsize; // limit size
SVArenaNode *inuse;
SVArenaNode node;
} SVArenaAllocator;
SMemAllocatorFactory
*
vBufPoolGetMAF
(
SVnode
*
pVnode
)
{
return
pVnode
->
pBufPool
->
pMAF
;
}
/* ------------------------ STATIC METHODS ------------------------ */
typedef
struct
{
SVnode * pVnode;
S
ListNode *pNode
;
SVnode
*
pVnode
;
S
VMemAllocator
*
pVMA
;
}
SVMAWrapper
;
static
FORCE_INLINE
void
*
vmaMaloocCb
(
SMemAllocator
*
pMA
,
uint64_t
size
)
{
SVMAWrapper
*
pWrapper
=
(
SVMAWrapper
*
)(
pMA
->
impl
);
static SListNode * vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type);
static void vBufPoolFreeNode(SListNode *pNode);
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma);
static void * vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size);
/* ------------------------ STATIC METHODS ------------------------ */
static SListNode *vBufPoolNewNode(uint64_t capacity, EVMemAllocatorT type) {
SListNode * pNode;
SVMemAllocator *pvma;
uint64_t msize;
uint64_t ssize = 4096; // TODO
uint64_t lsize = 1024; // TODO
msize = sizeof(SListNode) + sizeof(SVMemAllocator);
if (type == E_V_ARENA_ALLOCATOR) {
msize += capacity;
}
pNode = (SListNode *)calloc(1, msize);
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
pvma = (SVMemAllocator *)(pNode->data);
pvma->capacity = capacity;
pvma->type = type;
switch (type) {
case E_V_ARENA_ALLOCATOR:
vArenaAllocatorInit(&(pvma->vaa), capacity, ssize, lsize);
break;
case E_V_HEAP_ALLOCATOR:
// vHeapAllocatorInit(&(pvma->vha));
break;
default:
ASSERT(0);
}
return pNode;
}
static void vBufPoolFreeNode(SListNode *pNode) {
SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data);
switch (pvma->type) {
case E_V_ARENA_ALLOCATOR:
vArenaAllocatorClear(&(pvma->vaa));
break;
case E_V_HEAP_ALLOCATOR:
// vHeapAllocatorClear(&(pvma->vha));
break;
default:
break;
}
free(pNode);
return
vmaMalloc
(
pWrapper
->
pVMA
,
size
);
}
static void *vBufPoolMalloc(SVMemAllocator *pvma, uint64_t size) {
void *ptr = NULL;
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
if (POINTER_DISTANCE(pvaa->inuse->ptr, pvaa->inuse->data) + size > pvaa->inuse->size) {
SVArenaNode *pNode = (SVArenaNode *)malloc(sizeof(*pNode) + MAX(size, pvaa->ssize));
if (pNode == NULL) {
// TODO: handle error
return NULL;
}
pNode->prev = pvaa->inuse;
pNode->size = MAX(size, pvaa->ssize);
pNode->ptr = pNode->data;
pvaa->inuse = pNode;
}
ptr = pvaa->inuse->ptr;
pvaa->inuse->ptr = POINTER_SHIFT(ptr, size);
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
/* TODO */
}
return ptr;
}
// TODO: Add atomic operations here
static
SMemAllocator
*
vBufPoolCreateMA
(
SMemAllocatorFactory
*
pMAF
)
{
SMemAllocator
*
pMA
;
SVnode
*
pVnode
=
(
SVnode
*
)(
pMAF
->
impl
);
SVMAWrapper
*
pWrapper
;
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pmaf) {
SVnode * pVnode;
SMemAllocator * pma;
SVMemAllocator *pvma;
SVMAWrapper * pvmaw;
pVnode = (SVnode *)(pmaf->impl);
pma = (SMemAllocator *)calloc(1, sizeof(*pma) + sizeof(SVMAWrapper));
if (pma == NULL) {
// TODO: handle error
pMA
=
(
SMemAllocator
*
)
calloc
(
1
,
sizeof
(
*
pMA
)
+
sizeof
(
SVMAWrapper
));
if
(
pMA
==
NULL
)
{
return
NULL
;
}
pvmaw = (SVMAWrapper *)POINTER_SHIFT(pma, sizeof(*pma));
// No allocator used currently
if (pVnode->pBufPool->inuse == NULL) {
while (listNEles(&(pVnode->pBufPool->free)) == 0) {
// TODO: wait until all released ro kill query
// tsem_wait();
ASSERT(0);
}
pVnode->pBufPool->inuse = tdListPopHead(&(pVnode->pBufPool->free));
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
T_REF_INIT_VAL(pvma, 1);
} else {
pvma = (SVMemAllocator *)(pVnode->pBufPool->inuse->data);
}
T_REF_INC(pvma);
pVnode
->
pBufPool
->
inuse
->
_ref
.
val
++
;
pWrapper
=
POINTER_SHIFT
(
pMA
,
sizeof
(
*
pMA
));
pWrapper
->
pVnode
=
pVnode
;
pWrapper
->
pVMA
=
pVnode
->
pBufPool
->
inuse
;
pvmaw->pVnode = pVnode;
pvmaw->pNode = pVnode->pBufPool->inuse;
pMA
->
impl
=
pWrapper
;
pMA
->
malloc
=
vmaMaloocCb
;
pMA
->
calloc
=
NULL
;
pMA
->
realloc
=
NULL
;
pMA
->
free
=
NULL
;
pMA
->
usage
=
NULL
;
pma->impl = pvmaw;
pma->malloc = NULL;
pma->calloc = NULL; /* TODO */
pma->realloc = NULL; /* TODO */
pma->free = NULL; /* TODO */
pma->usage = NULL; /* TODO */
return pma;
return
pMA
;
}
static void vBufPoolDestroyMA(SMemAllocatorFactory *pmaf, SMemAllocator *pma) { /* TODO */
SVnode * pVnode = (SVnode *)(pmaf->impl);
SListNode * pNode = ((SVMAWrapper *)(pma->impl))->pNode;
SVMemAllocator *pvma = (SVMemAllocator *)(pNode->data);
if (T_REF_DEC(pvma) == 0) {
if (pvma->type == E_V_ARENA_ALLOCATOR) {
SVArenaAllocator *pvaa = &(pvma->vaa);
while (pvaa->inuse != &(pvaa->node)) {
SVArenaNode *pNode = pvaa->inuse;
pvaa->inuse = pNode->prev;
/* code */
}
pvaa->inuse->ptr = pvaa->inuse->data;
} else if (pvma->type == E_V_HEAP_ALLOCATOR) {
} else {
ASSERT(0);
}
static
void
vBufPoolDestroyMA
(
SMemAllocatorFactory
*
pMAF
,
SMemAllocator
*
pMA
)
{
SVMAWrapper
*
pWrapper
=
(
SVMAWrapper
*
)(
pMA
->
impl
);
SVnode
*
pVnode
=
pWrapper
->
pVnode
;
SVMemAllocator
*
pVMA
=
pWrapper
->
pVMA
;
// Move node from incycle to free
tdListAppendNode(&(pVnode->pBufPool->free), tdListPopNode(&(pVnode->pBufPool->incycle), pNode));
// tsem_post(); todo: sem_post
free
(
pMA
);
if
(
--
pVMA
->
_ref
.
val
==
0
)
{
tDListPop
(
&
(
pVnode
->
pBufPool
->
incycle
),
pVMA
);
tDListAppend
(
&
(
pVnode
->
pBufPool
->
free
),
pVMA
);
}
}
#endif
\ No newline at end of file
}
\ No newline at end of file
source/dnode/vnode/impl/src/vnodeMain.c
浏览文件 @
3b058023
...
...
@@ -94,7 +94,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open meta
sprintf
(
dir
,
"%s/meta"
,
pVnode
->
path
);
pVnode
->
pMeta
=
metaOpen
(
dir
,
&
(
pVnode
->
config
.
metaCfg
));
pVnode
->
pMeta
=
metaOpen
(
dir
,
&
(
pVnode
->
config
.
metaCfg
)
,
vBufPoolGetMAF
(
pVnode
)
);
if
(
pVnode
->
pMeta
==
NULL
)
{
// TODO: handle error
return
-
1
;
...
...
@@ -102,7 +102,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb
sprintf
(
dir
,
"%s/tsdb"
,
pVnode
->
path
);
pVnode
->
pTsdb
=
tsdbOpen
(
dir
,
&
(
pVnode
->
config
.
tsdbCfg
));
pVnode
->
pTsdb
=
tsdbOpen
(
dir
,
&
(
pVnode
->
config
.
tsdbCfg
)
,
vBufPoolGetMAF
(
pVnode
)
);
if
(
pVnode
->
pTsdb
==
NULL
)
{
// TODO: handle error
return
-
1
;
...
...
@@ -110,7 +110,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// TODO: Open TQ
sprintf
(
dir
,
"%s/tq"
,
pVnode
->
path
);
pVnode
->
pTq
=
tqOpen
(
dir
,
&
(
pVnode
->
config
.
tqCfg
),
NULL
,
NULL
);
pVnode
->
pTq
=
tqOpen
(
dir
,
&
(
pVnode
->
config
.
tqCfg
),
NULL
,
vBufPoolGetMAF
(
pVnode
)
);
if
(
pVnode
->
pTq
==
NULL
)
{
// TODO: handle error
return
-
1
;
...
...
@@ -131,7 +131,9 @@ static int vnodeOpenImpl(SVnode *pVnode) {
static
void
vnodeCloseImpl
(
SVnode
*
pVnode
)
{
if
(
pVnode
)
{
vnodeCloseBufPool
(
pVnode
);
tsdbClose
(
pVnode
->
pTsdb
);
metaClose
(
pVnode
->
pMeta
);
tsdbClose
(
pVnode
->
pTsdb
);
tqClose
(
pVnode
->
pTq
);
walClose
(
pVnode
->
pWal
);
}
}
\ No newline at end of file
source/dnode/vnode/impl/src/vnodeRequest.c
浏览文件 @
3b058023
...
...
@@ -25,9 +25,10 @@ int vnodeBuildReq(void **buf, const SVnodeReq *pReq, uint8_t type) {
switch
(
type
)
{
case
TSDB_MSG_TYPE_CREATE_TABLE
:
tsize
+=
vnodeBuildCreateTableReq
(
buf
,
&
(
pReq
->
ctReq
));
break
;
case
TSDB_MSG_TYPE_SUBMIT
:
/* code */
break
;
default:
break
;
}
...
...
source/dnode/vnode/impl/test/vBenchmarkTest.cpp
0 → 100644
浏览文件 @
3b058023
// https://stackoverflow.com/questions/8565666/benchmarking-with-googletest
// https://github.com/google/benchmark
\ No newline at end of file
source/dnode/vnode/impl/test/vnodeApiTests.cpp
浏览文件 @
3b058023
...
...
@@ -14,7 +14,7 @@
#include "vnode.h"
static
STSchema
*
c
reateBasicSchema
()
{
static
STSchema
*
vtC
reateBasicSchema
()
{
STSchemaBuilder
sb
;
STSchema
*
pSchema
=
NULL
;
...
...
@@ -32,7 +32,7 @@ static STSchema *createBasicSchema() {
return
pSchema
;
}
static
STSchema
*
c
reateBasicTagSchema
()
{
static
STSchema
*
vtC
reateBasicTagSchema
()
{
STSchemaBuilder
sb
;
STSchema
*
pSchema
=
NULL
;
...
...
@@ -50,7 +50,7 @@ static STSchema *createBasicTagSchema() {
return
pSchema
;
}
static
SKVRow
c
reateBasicTag
()
{
static
SKVRow
vtC
reateBasicTag
()
{
SKVRowBuilder
rb
;
SKVRow
pTag
;
...
...
@@ -71,118 +71,203 @@ static SKVRow createBasicTag() {
return
pTag
;
}
#if 0
TEST(vnodeApiTest, test_create_table_encode_and_decode_function) {
tb_uid_t suid = 1638166374163;
STSchema *pSchema = createBasicSchema();
STSchema *pTagSchema = createBasicTagSchema();
char tbname[128] = "st";
char * buffer = new char[1024];
void * pBuf = (void *)buffer;
static
void
vtBuildCreateStbReq
(
tb_uid_t
suid
,
char
*
tbname
,
SRpcMsg
**
ppMsg
)
{
SRpcMsg
*
pMsg
;
STSchema
*
pSchema
;
STSchema
*
pTagSchema
;
int
zs
;
void
*
pBuf
;
pSchema
=
vtCreateBasicSchema
();
pTagSchema
=
vtCreateBasicTagSchema
();
SVnodeReq
vCreateSTbReq
=
VNODE_INIT_CREATE_STB_REQ
(
tbname
,
UINT32_MAX
,
UINT32_MAX
,
suid
,
pSchema
,
pTagSchema
);
zs
=
vnodeBuildReq
(
NULL
,
&
vCreateSTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
pMsg
=
(
SRpcMsg
*
)
malloc
(
sizeof
(
SRpcMsg
)
+
zs
);
pMsg
->
msgType
=
TSDB_MSG_TYPE_CREATE_TABLE
;
pMsg
->
contLen
=
zs
;
pMsg
->
pCont
=
POINTER_SHIFT
(
pMsg
,
sizeof
(
SRpcMsg
));
pBuf
=
pMsg
->
pCont
;
vnodeBuildReq
(
&
pBuf
,
&
vCreateSTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
META_CLEAR_TB_CFG
(
&
vCreateSTbReq
);
SVnodeReq decoded_req;
tdFreeSchema
(
pSchema
);
tdFreeSchema
(
pTagSchema
);
*
ppMsg
=
pMsg
;
}
vnodeParseReq(buffer, &decoded_req, TSDB_MSG_TYPE_CREATE_TABLE);
static
void
vtBuildCreateCtbReq
(
tb_uid_t
suid
,
char
*
tbname
,
SRpcMsg
**
ppMsg
)
{
SRpcMsg
*
pMsg
;
int
tz
;
SKVRow
pTag
=
vtCreateBasicTag
();
int k = 10;
SVnodeReq
vCreateCTbReq
=
VNODE_INIT_CREATE_CTB_REQ
(
tbname
,
UINT32_MAX
,
UINT32_MAX
,
suid
,
pTag
);
tz
=
vnodeBuildReq
(
NULL
,
&
vCreateCTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
pMsg
=
(
SRpcMsg
*
)
malloc
(
sizeof
(
SRpcMsg
)
+
tz
);
pMsg
->
msgType
=
TSDB_MSG_TYPE_CREATE_TABLE
;
pMsg
->
contLen
=
tz
;
pMsg
->
pCont
=
POINTER_SHIFT
(
pMsg
,
sizeof
(
*
pMsg
));
void
*
pBuf
=
pMsg
->
pCont
;
vnodeBuildReq
(
&
pBuf
,
&
vCreateCTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
META_CLEAR_TB_CFG
(
&
vCreateCTbReq
);
free
(
pTag
);
*
ppMsg
=
pMsg
;
}
#endif
TEST
(
vnodeApiTest
,
vnodeOpen_vnodeClose_test
)
{
vnodeDestroy
(
"vnode1"
);
static
void
vtBuildCreateNtbReq
(
char
*
tbname
,
SRpcMsg
**
ppMsg
)
{
// TODO
}
static
void
vtBuildSubmitReq
(
SRpcMsg
**
ppMsg
)
{
SRpcMsg
*
pMsg
;
SSubmitMsg
*
pSubmitMsg
;
SSubmitBlk
*
pSubmitBlk
;
int
tz
=
1024
;
// TODO
pMsg
=
(
SRpcMsg
*
)
malloc
(
sizeof
(
*
pMsg
)
+
tz
);
pMsg
->
msgType
=
TSDB_MSG_TYPE_SUBMIT
;
pMsg
->
contLen
=
tz
;
pMsg
->
pCont
=
POINTER_SHIFT
(
pMsg
,
sizeof
(
*
pMsg
));
// For submit msg header
pSubmitMsg
=
(
SSubmitMsg
*
)(
pMsg
->
pCont
);
// pSubmitMsg->header.contLen = 0;
// pSubmitMsg->header.vgId = 0;
// pSubmitMsg->length = 0;
pSubmitMsg
->
numOfBlocks
=
1
;
// For submit blk
pSubmitBlk
=
(
SSubmitBlk
*
)(
pSubmitMsg
->
blocks
);
pSubmitBlk
->
uid
=
0
;
pSubmitBlk
->
tid
=
0
;
pSubmitBlk
->
padding
=
0
;
pSubmitBlk
->
sversion
=
0
;
pSubmitBlk
->
dataLen
=
0
;
pSubmitBlk
->
numOfRows
=
0
;
// For row batch
*
ppMsg
=
pMsg
;
}
static
void
vtClearMsgBatch
(
SArray
*
pMsgArr
)
{
SRpcMsg
*
pMsg
;
for
(
size_t
i
=
0
;
i
<
taosArrayGetSize
(
pMsgArr
);
i
++
)
{
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pMsgArr
,
i
);
free
(
pMsg
);
}
taosArrayClear
(
pMsgArr
);
}
TEST
(
vnodeApiTest
,
vnode_simple_create_table_test
)
{
tb_uid_t
suid
=
1638166374163
;
SRpcMsg
*
pMsg
;
SArray
*
pMsgArr
=
NULL
;
SVnode
*
pVnode
;
int
rcode
;
int
ntables
=
1000000
;
int
batch
=
10
;
char
tbname
[
128
];
pMsgArr
=
(
SArray
*
)
taosArrayInit
(
batch
,
sizeof
(
pMsg
));
vnodeDestroy
(
"vnode1"
);
GTEST_ASSERT_GE
(
vnodeInit
(
2
),
0
);
// C
reate and open a vnode
SVnode
*
pVnode
=
vnodeOpen
(
"vnode1"
,
NULL
);
// C
REATE AND OPEN A VNODE
pVnode
=
vnodeOpen
(
"vnode1"
,
NULL
);
ASSERT_NE
(
pVnode
,
nullptr
);
tb_uid_t
suid
=
1638166374163
;
{
// Create a super table
STSchema
*
pSchema
=
createBasicSchema
();
STSchema
*
pTagSchema
=
createBasicTagSchema
();
char
tbname
[
128
]
=
"st"
;
// CREATE A SUPER TABLE
sprintf
(
tbname
,
"st"
);
vtBuildCreateStbReq
(
suid
,
tbname
,
&
pMsg
);
taosArrayPush
(
pMsgArr
,
&
pMsg
);
rcode
=
vnodeProcessWMsgs
(
pVnode
,
pMsgArr
);
ASSERT_EQ
(
rcode
,
0
);
vtClearMsgBatch
(
pMsgArr
);
// CREATE A LOT OF CHILD TABLES
for
(
int
i
=
0
;
i
<
ntables
/
batch
;
i
++
)
{
// Build request batch
for
(
int
j
=
0
;
j
<
batch
;
j
++
)
{
sprintf
(
tbname
,
"ct%d"
,
i
*
batch
+
j
+
1
);
vtBuildCreateCtbReq
(
suid
,
tbname
,
&
pMsg
);
taosArrayPush
(
pMsgArr
,
&
pMsg
);
}
SArray
*
pMsgs
=
(
SArray
*
)
taosArrayInit
(
1
,
sizeof
(
SRpcMsg
*
));
SVnodeReq
vCreateSTbReq
=
VNODE_INIT_CREATE_STB_REQ
(
tbname
,
UINT32_MAX
,
UINT32_MAX
,
suid
,
pSchema
,
pTagSchema
);
// Process request batch
rcode
=
vnodeProcessWMsgs
(
pVnode
,
pMsgArr
);
ASSERT_EQ
(
rcode
,
0
);
int
zs
=
vnodeBuildReq
(
NULL
,
&
vCreateSTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
malloc
(
sizeof
(
SRpcMsg
)
+
zs
);
pMsg
->
msgType
=
TSDB_MSG_TYPE_CREATE_TABLE
;
pMsg
->
contLen
=
zs
;
pMsg
->
pCont
=
POINTER_SHIFT
(
pMsg
,
sizeof
(
SRpcMsg
));
// Clear request batch
vtClearMsgBatch
(
pMsgArr
);
}
// CLOSE THE VNODE
vnodeClose
(
pVnode
);
vnodeClear
();
void
*
pBuf
=
pMsg
->
pCont
;
taosArrayDestroy
(
pMsgArr
);
}
vnodeBuildReq
(
&
pBuf
,
&
vCreateSTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
META_CLEAR_TB_CFG
(
&
vCreateSTbReq
);
TEST
(
vnodeApiTest
,
vnode_simple_insert_test
)
{
const
char
*
vname
=
"vnode2"
;
char
tbname
[
128
];
tb_uid_t
suid
=
1638166374163
;
SRpcMsg
*
pMsg
;
SArray
*
pMsgArr
;
int
rcode
;
SVnode
*
pVnode
;
int
batch
=
1
;
int
loop
=
1000000
;
taosArrayPush
(
pMsgs
,
&
(
pMsg
));
pMsgArr
=
(
SArray
*
)
taosArrayInit
(
0
,
sizeof
(
pMsg
));
vnodeProcessWMsgs
(
pVnode
,
pMsgs
);
vnodeDestroy
(
vname
);
free
(
pMsg
);
taosArrayDestroy
(
pMsgs
);
tdFreeSchema
(
pSchema
);
tdFreeSchema
(
pTagSchema
);
}
GTEST_ASSERT_GE
(
vnodeInit
(
2
),
0
);
{
// Create some child tables
int
ntables
=
1000000
;
int
batch
=
10
;
for
(
int
i
=
0
;
i
<
ntables
/
batch
;
i
++
)
{
SArray
*
pMsgs
=
(
SArray
*
)
taosArrayInit
(
batch
,
sizeof
(
SRpcMsg
*
));
for
(
int
j
=
0
;
j
<
batch
;
j
++
)
{
SKVRow
pTag
=
createBasicTag
();
char
tbname
[
128
];
sprintf
(
tbname
,
"tb%d"
,
i
*
batch
+
j
);
SVnodeReq
vCreateCTbReq
=
VNODE_INIT_CREATE_CTB_REQ
(
tbname
,
UINT32_MAX
,
UINT32_MAX
,
suid
,
pTag
);
int
tz
=
vnodeBuildReq
(
NULL
,
&
vCreateCTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
malloc
(
sizeof
(
SRpcMsg
)
+
tz
);
pMsg
->
msgType
=
TSDB_MSG_TYPE_CREATE_TABLE
;
pMsg
->
contLen
=
tz
;
pMsg
->
pCont
=
POINTER_SHIFT
(
pMsg
,
sizeof
(
*
pMsg
));
void
*
pBuf
=
pMsg
->
pCont
;
vnodeBuildReq
(
&
pBuf
,
&
vCreateCTbReq
,
TSDB_MSG_TYPE_CREATE_TABLE
);
META_CLEAR_TB_CFG
(
&
vCreateCTbReq
);
free
(
pTag
);
taosArrayPush
(
pMsgs
,
&
(
pMsg
));
}
vnodeProcessWMsgs
(
pVnode
,
pMsgs
);
for
(
int
j
=
0
;
j
<
batch
;
j
++
)
{
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayPop
(
pMsgs
);
free
(
pMsg
);
}
taosArrayDestroy
(
pMsgs
);
// std::cout << "the " << i << "th batch is created" << std::endl;
// Open a vnode
pVnode
=
vnodeOpen
(
vname
,
NULL
);
GTEST_ASSERT_NE
(
pVnode
,
nullptr
);
// 1. CREATE A SUPER TABLE
sprintf
(
tbname
,
"st"
);
vtBuildCreateStbReq
(
suid
,
tbname
,
&
pMsg
);
taosArrayPush
(
pMsgArr
,
&
pMsg
);
rcode
=
vnodeProcessWMsgs
(
pVnode
,
pMsgArr
);
GTEST_ASSERT_EQ
(
rcode
,
0
);
vtClearMsgBatch
(
pMsgArr
);
// 2. CREATE A CHILD TABLE
sprintf
(
tbname
,
"t0"
);
vtBuildCreateCtbReq
(
suid
,
tbname
,
&
pMsg
);
taosArrayPush
(
pMsgArr
,
&
pMsg
);
rcode
=
vnodeProcessWMsgs
(
pVnode
,
pMsgArr
);
GTEST_ASSERT_EQ
(
rcode
,
0
);
vtClearMsgBatch
(
pMsgArr
);
// 3. WRITE A LOT OF TIME-SERIES DATA
for
(
int
j
=
0
;
j
<
loop
;
j
++
)
{
for
(
int
i
=
0
;
i
<
batch
;
i
++
)
{
vtBuildSubmitReq
(
&
pMsg
);
taosArrayPush
(
pMsgArr
,
&
pMsg
);
}
rcode
=
vnodeProcessWMsgs
(
pVnode
,
pMsgArr
);
GTEST_ASSERT_EQ
(
rcode
,
0
);
vtClearMsgBatch
(
pMsgArr
);
}
// Close the vnode
vnodeClose
(
pVnode
);
vnodeClear
();
}
TEST
(
vnodeApiTest
,
DISABLED_vnode_process_create_table
)
{
STSchema
*
pSchema
=
NULL
;
STSchema
*
pTagSchema
=
NULL
;
char
stname
[
15
];
SVCreateTableReq
pReq
=
META_INIT_STB_CFG
(
stname
,
UINT32_MAX
,
UINT32_MAX
,
0
,
pSchema
,
pTagSchema
);
int
k
=
10
;
META_CLEAR_TB_CFG
(
pReq
);
}
taosArrayDestroy
(
pMsgArr
);
}
\ No newline at end of file
source/dnode/vnode/meta/inc/metaDef.h
浏览文件 @
3b058023
...
...
@@ -34,7 +34,7 @@ extern "C" {
struct
SMeta
{
char
*
path
;
SMetaCfg
options
;
SMetaDB
*
pDB
;
SMetaDB
*
pDB
;
SMetaIdx
*
pIdx
;
SMetaCache
*
pCache
;
STbUidGenerator
uidGnrt
;
...
...
source/dnode/vnode/meta/src/metaMain.c
浏览文件 @
3b058023
...
...
@@ -17,27 +17,27 @@
#include "metaDef.h"
static
SMeta
*
metaNew
(
const
char
*
path
,
const
SMetaCfg
*
pMeta
Options
);
static
SMeta
*
metaNew
(
const
char
*
path
,
const
SMetaCfg
*
pMeta
Cfg
,
SMemAllocatorFactory
*
pMAF
);
static
void
metaFree
(
SMeta
*
pMeta
);
static
int
metaOpenImpl
(
SMeta
*
pMeta
);
static
void
metaCloseImpl
(
SMeta
*
pMeta
);
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMeta
Options
)
{
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMeta
Cfg
,
SMemAllocatorFactory
*
pMAF
)
{
SMeta
*
pMeta
=
NULL
;
// Set default options
if
(
pMeta
Options
==
NULL
)
{
pMeta
Options
=
&
defaultMetaOptions
;
if
(
pMeta
Cfg
==
NULL
)
{
pMeta
Cfg
=
&
defaultMetaOptions
;
}
// Validate the options
if
(
metaValidateOptions
(
pMeta
Options
)
<
0
)
{
if
(
metaValidateOptions
(
pMeta
Cfg
)
<
0
)
{
// TODO: deal with error
return
NULL
;
}
// Allocate handle
pMeta
=
metaNew
(
path
,
pMeta
Options
);
pMeta
=
metaNew
(
path
,
pMeta
Cfg
,
pMAF
);
if
(
pMeta
==
NULL
)
{
// TODO: handle error
return
NULL
;
...
...
@@ -65,7 +65,7 @@ void metaClose(SMeta *pMeta) {
void
metaRemove
(
const
char
*
path
)
{
taosRemoveDir
(
path
);
}
/* ------------------------ STATIC METHODS ------------------------ */
static
SMeta
*
metaNew
(
const
char
*
path
,
const
SMetaCfg
*
pMeta
Options
)
{
static
SMeta
*
metaNew
(
const
char
*
path
,
const
SMetaCfg
*
pMeta
Cfg
,
SMemAllocatorFactory
*
pMAF
)
{
SMeta
*
pMeta
;
size_t
psize
=
strlen
(
path
);
...
...
@@ -80,7 +80,8 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaOptions) {
return
NULL
;
}
metaOptionsCopy
(
&
(
pMeta
->
options
),
pMetaOptions
);
metaOptionsCopy
(
&
(
pMeta
->
options
),
pMetaCfg
);
pMeta
->
pmaf
=
pMAF
;
return
pMeta
;
};
...
...
source/dnode/vnode/tq/src/tq.c
浏览文件 @
3b058023
...
...
@@ -63,7 +63,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
return
pTq
;
}
static
int
tqProtoCheck
(
TmqMsgHead
*
pMsg
)
{
return
pMsg
->
protoVer
==
0
;
}
void
tqClose
(
STQ
*
pTq
)
{
// TODO
}
static
int
tqProtoCheck
(
TmqMsgHead
*
pMsg
)
{
return
pMsg
->
protoVer
==
0
;
}
static
int
tqAckOneTopic
(
STqBufferHandle
*
bHandle
,
TmqOneAck
*
pAck
,
STqQueryMsg
**
ppQuery
)
{
// clean old item and move forward
...
...
source/dnode/vnode/tsdb/src/tsdbMain.c
浏览文件 @
3b058023
...
...
@@ -15,27 +15,27 @@
#include "tsdbDef.h"
static
STsdb
*
tsdbNew
(
const
char
*
path
,
const
STsdbCfg
*
pTsdb
Options
);
static
STsdb
*
tsdbNew
(
const
char
*
path
,
const
STsdbCfg
*
pTsdb
Cfg
,
SMemAllocatorFactory
*
pMAF
);
static
void
tsdbFree
(
STsdb
*
pTsdb
);
static
int
tsdbOpenImpl
(
STsdb
*
pTsdb
);
static
void
tsdbCloseImpl
(
STsdb
*
pTsdb
);
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
pTsdb
Options
)
{
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
pTsdb
Cfg
,
SMemAllocatorFactory
*
pMAF
)
{
STsdb
*
pTsdb
=
NULL
;
// Set default TSDB Options
if
(
pTsdb
Options
==
NULL
)
{
pTsdb
Options
=
&
defautlTsdbOptions
;
if
(
pTsdb
Cfg
==
NULL
)
{
pTsdb
Cfg
=
&
defautlTsdbOptions
;
}
// Validate the options
if
(
tsdbValidateOptions
(
pTsdb
Options
)
<
0
)
{
if
(
tsdbValidateOptions
(
pTsdb
Cfg
)
<
0
)
{
// TODO: handle error
return
NULL
;
}
// Create the handle
pTsdb
=
tsdbNew
(
path
,
pTsdb
Options
);
pTsdb
=
tsdbNew
(
path
,
pTsdb
Cfg
,
pMAF
);
if
(
pTsdb
==
NULL
)
{
// TODO: handle error
return
NULL
;
...
...
@@ -62,7 +62,7 @@ void tsdbClose(STsdb *pTsdb) {
void
tsdbRemove
(
const
char
*
path
)
{
taosRemoveDir
(
path
);
}
/* ------------------------ STATIC METHODS ------------------------ */
static
STsdb
*
tsdbNew
(
const
char
*
path
,
const
STsdbCfg
*
pTsdb
Options
)
{
static
STsdb
*
tsdbNew
(
const
char
*
path
,
const
STsdbCfg
*
pTsdb
Cfg
,
SMemAllocatorFactory
*
pMAF
)
{
STsdb
*
pTsdb
=
NULL
;
pTsdb
=
(
STsdb
*
)
calloc
(
1
,
sizeof
(
STsdb
));
...
...
@@ -72,7 +72,8 @@ static STsdb *tsdbNew(const char *path, const STsdbCfg *pTsdbOptions) {
}
pTsdb
->
path
=
strdup
(
path
);
tsdbOptionsCopy
(
&
(
pTsdb
->
options
),
pTsdbOptions
);
tsdbOptionsCopy
(
&
(
pTsdb
->
options
),
pTsdbCfg
);
pTsdb
->
pmaf
=
pMAF
;
return
pTsdb
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录