Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
fa66a89b
TDengine
项目概览
taosdata
/
TDengine
12 个月 前同步成功
通知
1180
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
fa66a89b
编写于
12月 20, 2021
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/scheduler
上级
5579b50a
67ed87e2
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
875 addition
and
217 deletion
+875
-217
cmake/cmake.options
cmake/cmake.options
+7
-0
include/common/taosmsg.h
include/common/taosmsg.h
+13
-8
include/dnode/vnode/tsdb/tsdb.h
include/dnode/vnode/tsdb/tsdb.h
+10
-10
include/libs/index/index.h
include/libs/index/index.h
+17
-5
include/util/mallocator.h
include/util/mallocator.h
+3
-0
include/util/taoserror.h
include/util/taoserror.h
+12
-11
include/util/tdef.h
include/util/tdef.h
+1
-1
source/dnode/mgmt/impl/src/dndVnodes.c
source/dnode/mgmt/impl/src/dndVnodes.c
+52
-36
source/dnode/mgmt/impl/test/CMakeLists.txt
source/dnode/mgmt/impl/test/CMakeLists.txt
+1
-1
source/dnode/mgmt/impl/test/db/db.cpp
source/dnode/mgmt/impl/test/db/db.cpp
+1
-0
source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt
source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt
+27
-0
source/dnode/mgmt/impl/test/vgroup/vgroup.cpp
source/dnode/mgmt/impl/test/vgroup/vgroup.cpp
+224
-0
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+2
-2
source/dnode/mnode/impl/inc/mndVgroup.h
source/dnode/mnode/impl/inc/mndVgroup.h
+3
-1
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+51
-1
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+6
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+162
-46
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+78
-3
source/dnode/vnode/impl/src/vnodeCommit.c
source/dnode/vnode/impl/src/vnodeCommit.c
+8
-0
source/dnode/vnode/tq/src/tq.c
source/dnode/vnode/tq/src/tq.c
+1
-1
source/dnode/vnode/tsdb/src/tsdbCommit.c
source/dnode/vnode/tsdb/src/tsdbCommit.c
+12
-0
source/libs/index/CMakeLists.txt
source/libs/index/CMakeLists.txt
+5
-1
source/libs/index/inc/indexInt.h
source/libs/index/inc/indexInt.h
+11
-12
source/libs/index/inc/index_cache.h
source/libs/index/inc/index_cache.h
+4
-4
source/libs/index/src/index.c
source/libs/index/src/index.c
+69
-46
source/libs/index/src/index_cache.c
source/libs/index/src/index_cache.c
+15
-17
source/libs/index/test/CMakeLists.txt
source/libs/index/test/CMakeLists.txt
+1
-1
source/libs/index/test/indexTests.cc
source/libs/index/test/indexTests.cc
+78
-8
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
cmake/cmake.options
浏览文件 @
fa66a89b
...
...
@@ -37,6 +37,7 @@ option(
off
)
option(
BUILD_WITH_NURAFT
"If build with NuRaft"
...
...
@@ -54,3 +55,9 @@ option(
"If use doxygen build documents"
OFF
)
option(
BUILD_WITH_INVERTEDINDEX
"If use invertedIndex"
ON
)
include/common/taosmsg.h
浏览文件 @
fa66a89b
...
...
@@ -752,31 +752,36 @@ typedef struct {
}
SReplica
;
typedef
struct
{
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
vgId
;
int32_t
dnodeId
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
uint64_t
dbUid
;
int32_t
cacheBlockSize
;
int32_t
totalBlocks
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
minRows
;
int32_t
maxRows
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int8_t
reserved
[
16
]
;
int8_t
walLevel
;
int8_t
precision
;
int8_t
compression
;
int8_t
cacheLastRow
;
int8_t
update
;
int8_t
walLevel
;
int8_t
quorum
;
int8_t
update
;
int8_t
cacheLastRow
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SCreateVnodeMsg
,
SAlterVnodeMsg
;
typedef
struct
{
int32_t
vgId
;
int32_t
vgId
;
int32_t
dnodeId
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
uint64_t
dbUid
;
}
SDropVnodeMsg
,
SSyncVnodeMsg
,
SCompactVnodeMsg
;
typedef
struct
{
...
...
include/dnode/vnode/tsdb/tsdb.h
浏览文件 @
fa66a89b
...
...
@@ -23,27 +23,27 @@ extern "C" {
#endif
// TYPES EXPOSED
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbCfg
STsdbCfg
;
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdbCfg
{
uint64_t
lruCacheSize
;
uint32_t
keep0
;
uint32_t
keep1
;
uint32_t
keep2
;
}
STsdbCfg
;
// STsdb
STsdb
*
tsdbOpen
(
const
char
*
path
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitMsg
*
pMsg
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
// STsdbCfg
int
tsdbOptionsInit
(
STsdbCfg
*
);
void
tsdbOptionsClear
(
STsdbCfg
*
);
/* ------------------------ STRUCT DEFINITIONS ------------------------ */
struct
STsdbCfg
{
uint64_t
lruCacheSize
;
uint32_t
keep0
;
uint32_t
keep1
;
uint32_t
keep2
;
};
#ifdef __cplusplus
}
#endif
...
...
include/libs/index/index.h
浏览文件 @
fa66a89b
...
...
@@ -24,6 +24,7 @@ extern "C" {
#endif
typedef
struct
SIndex
SIndex
;
typedef
struct
SIndexTerm
SIndexTerm
;
typedef
struct
SIndexOpts
SIndexOpts
;
typedef
struct
SIndexMultiTermQuery
SIndexMultiTermQuery
;
typedef
struct
SArray
SIndexMultiTerm
;
...
...
@@ -35,7 +36,7 @@ typedef enum {
ADD_INDEX
,
// add index on specify column
DROP_INDEX
,
// drop existed index
DROP_SATBLE
// drop stable
}
SIndex
ColumnType
;
}
SIndex
OperOnColumn
;
typedef
enum
{
MUST
=
0
,
SHOULD
=
1
,
NOT
=
2
}
EIndexOperatorType
;
typedef
enum
{
QUERY_TERM
=
0
,
QUERY_PREFIX
=
1
,
QUERY_SUFFIX
=
2
,
QUERY_REGEX
=
3
}
EIndexQueryType
;
...
...
@@ -45,12 +46,12 @@ typedef enum { QUERY_TERM = 0, QUERY_PREFIX = 1, QUERY_SUFFIX = 2,QUERY_REGEX =
*/
SIndexMultiTermQuery
*
indexMultiTermQueryCreate
(
EIndexOperatorType
oper
);
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
);
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
const
char
*
field
,
int32_t
nFields
,
const
char
*
value
,
int32_t
nValue
,
EIndexQueryType
type
);
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
SIndexTerm
*
term
,
EIndexQueryType
type
);
/*
* @param:
* @param:
*/
SIndex
*
indexOpen
(
SIndexOpts
*
opt
,
const
char
*
path
);
int
indexOpen
(
SIndexOpts
*
opt
,
const
char
*
path
,
SIndex
**
index
);
void
indexClose
(
SIndex
*
index
);
int
indexPut
(
SIndex
*
index
,
SIndexMultiTerm
*
terms
,
int
uid
);
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
);
...
...
@@ -61,8 +62,8 @@ int indexRebuild(SIndex *index, SIndexOpts *opt);
* @param
*/
SIndexMultiTerm
*
indexMultiTermCreate
();
int
indexMultiTermAdd
(
SIndexMultiTerm
*
terms
,
const
char
*
field
,
int32_t
nFields
,
const
char
*
value
,
int32_t
nValue
);
void
indexMultiTermDestroy
(
SIndexMultiTerm
*
terms
);
int
indexMultiTermAdd
(
SIndexMultiTerm
*
terms
,
SIndexTerm
*
term
);
void
indexMultiTermDestroy
(
SIndexMultiTerm
*
terms
);
/*
* @param:
* @param:
...
...
@@ -70,6 +71,17 @@ void indexMultiTermDestroy(SIndexMultiTerm *terms);
SIndexOpts
*
indexOptsCreate
();
void
indexOptsDestroy
(
SIndexOpts
*
opts
);
/*
* @param:
* @param:
*/
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
operType
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
);
void
indexTermDestroy
(
SIndexTerm
*
p
);
#ifdef __cplusplus
}
#endif
...
...
include/util/mallocator.h
浏览文件 @
fa66a89b
...
...
@@ -39,6 +39,9 @@ typedef struct SMemAllocator {
TD_MEM_ALCT
(
SMemAllocator
);
}
SMemAllocator
;
#define tMalloc(pMA, SIZE) TD_MA_MALLOC(PMA, SIZE)
#define tFree(pMA, PTR) TD_MA_FREE(PMA, PTR)
typedef
struct
SMemAllocatorFactory
{
void
*
impl
;
SMemAllocator
*
(
*
create
)(
struct
SMemAllocatorFactory
*
);
...
...
include/util/taoserror.h
浏览文件 @
fa66a89b
...
...
@@ -120,17 +120,18 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input")
// mnode-common
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300)
#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301)
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302)
#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303)
#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304)
#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305)
#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306)
#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307)
#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308)
#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309)
#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A)
#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B)
// mnode-show
#define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310)
...
...
include/util/tdef.h
浏览文件 @
fa66a89b
...
...
@@ -231,7 +231,7 @@ do { \
#define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value
#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth
#define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES
64
#define TSDB_MIN_VNODES
16
#define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 4096
...
...
source/dnode/mgmt/impl/src/dndVnodes.c
浏览文件 @
fa66a89b
...
...
@@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg);
static
SVnodeObj
*
dndAcquireVnode
(
SDnode
*
pDnode
,
int32_t
vgId
)
{
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SVnodeObj
*
pVnode
=
NULL
;
SVnodeObj
*
pVnode
=
NULL
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -107,23 +107,23 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
if
(
pVnode
!=
NULL
)
{
dTrace
(
"vgId:%d, acquire vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
return
pVnode
;
}
static
void
dndReleaseVnode
(
SDnode
*
pDnode
,
SVnodeObj
*
pVnode
)
{
if
(
pVnode
==
NULL
)
return
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
int32_t
refCount
=
0
;
taosRLockLatch
(
&
pMgmt
->
latch
);
if
(
pVnode
!=
NULL
)
{
refCount
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
}
int32_t
refCount
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
taosRUnLockLatch
(
&
pMgmt
->
latch
);
if
(
pVnode
!=
NULL
)
{
dTrace
(
"vgId:%d, release vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
dTrace
(
"vgId:%d, release vnode, refCount:%d"
,
pVnode
->
vgId
,
refCount
);
}
static
int32_t
dndCreateVnodeWrapper
(
SDnode
*
pDnode
,
int32_t
vgId
,
char
*
path
,
SVnode
*
pImpl
)
{
...
...
@@ -457,7 +457,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) {
pMgmt
->
totalVnodes
=
numOfVnodes
;
int32_t
threadNum
=
tsN
umOfCores
;
int32_t
threadNum
=
pDnode
->
opt
.
n
umOfCores
;
int32_t
vnodesPerThread
=
numOfVnodes
/
threadNum
+
1
;
SVnodeThread
*
threads
=
calloc
(
threadNum
,
sizeof
(
SVnodeThread
));
...
...
@@ -525,33 +525,49 @@ static void dndCloseVnodes(SDnode *pDnode) {
static
int32_t
dndParseCreateVnodeReq
(
SRpcMsg
*
rpcMsg
,
int32_t
*
vgId
,
SVnodeCfg
*
pCfg
)
{
SCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
*
vgId
=
htonl
(
pCreate
->
vgId
);
pCreate
->
vgId
=
htonl
(
pCreate
->
vgId
);
pCreate
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
pCreate
->
dbUid
=
htobe64
(
pCreate
->
dbUid
);
pCreate
->
cacheBlockSize
=
htonl
(
pCreate
->
cacheBlockSize
);
pCreate
->
totalBlocks
=
htonl
(
pCreate
->
totalBlocks
);
pCreate
->
daysPerFile
=
htonl
(
pCreate
->
daysPerFile
);
pCreate
->
daysToKeep0
=
htonl
(
pCreate
->
daysToKeep0
);
pCreate
->
daysToKeep1
=
htonl
(
pCreate
->
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pCreate
->
daysToKeep2
);
pCreate
->
minRows
=
htonl
(
pCreate
->
minRows
);
pCreate
->
maxRows
=
htonl
(
pCreate
->
maxRows
);
pCreate
->
commitTime
=
htonl
(
pCreate
->
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pCreate
->
fsyncPeriod
);
for
(
int
r
=
0
;
r
<
pCreate
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pCreate
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
pReplica
->
id
);
pReplica
->
port
=
htons
(
pReplica
->
port
);
}
*
vgId
=
pCreate
->
vgId
;
#if 0
tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN);
pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCfg->totalBlocks = htonl(pCreate->totalBlocks);
pCfg->daysPerFile = htonl(pCreate->daysPerFile);
pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0);
pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock);
pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock);
pCfg->precision = pCreate->precision;
pCfg->compression = pCreate->compression;
pCfg->cacheLastRow = pCreate->cacheLastRow;
pCfg->update = pCreate->update;
pCfg->quorum = pCreate->quorum;
pCfg->replica = pCreate->replica;
pCfg->walLevel = pCreate->walLevel;
pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod);
for (int32_t i = 0; i < pCfg->replica; ++i) {
pCfg->replicas[i].port = htons(pCreate->replicas[i].port);
tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN);
}
pCfg->wsize = pCreate->cacheBlockSize;
pCfg->ssize = pCreate->cacheBlockSize;
pCfg->wsize = pCreate->cacheBlockSize;
pCfg->lsize = pCreate->cacheBlockSize;
pCfg->isHeapAllocator = true;
pCfg->ttl = 4;
pCfg->keep = pCreate->daysToKeep0;
pCfg->isWeak = true;
pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0;
pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2;
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize;
pCfg->metaCfg.lruSize = pCreate->cacheBlockSize;
pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod;
pCfg->walCfg.level = pCreate->walLevel;
pCfg->walCfg.retentionPeriod = 10;
pCfg->walCfg.retentionSize = 128;
pCfg->walCfg.rollPeriod = 128;
pCfg->walCfg.segSize = 128;
pCfg->walCfg.vgId = pCreate->vgId;
#endif
return
0
;
}
...
...
@@ -1016,7 +1032,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) {
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
SMWorkerPool
*
pPool
=
&
pMgmt
->
writePool
;
pPool
->
name
=
"vnode-write"
;
pPool
->
max
=
tsN
umOfCores
;
pPool
->
max
=
pDnode
->
opt
.
n
umOfCores
;
if
(
tMWorkerInit
(
pPool
)
!=
0
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -1050,7 +1066,7 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) {
}
static
int32_t
dndInitVnodeSyncWorker
(
SDnode
*
pDnode
)
{
int32_t
maxThreads
=
tsN
umOfCores
/
2
;
int32_t
maxThreads
=
pDnode
->
opt
.
n
umOfCores
/
2
;
if
(
maxThreads
<
1
)
maxThreads
=
1
;
SVnodesMgmt
*
pMgmt
=
&
pDnode
->
vmgmt
;
...
...
source/dnode/mgmt/impl/test/CMakeLists.txt
浏览文件 @
fa66a89b
...
...
@@ -15,6 +15,6 @@ add_subdirectory(stb)
# add_subdirectory(telem)
# add_subdirectory(trans)
add_subdirectory
(
user
)
#
add_subdirectory(vgroup)
add_subdirectory
(
vgroup
)
# add_subdirectory(common)
source/dnode/mgmt/impl/test/db/db.cpp
浏览文件 @
fa66a89b
...
...
@@ -232,6 +232,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
// taosMsleep(1000000);
}
SendTheCheckShowMetaMsg
(
TSDB_MGMT_TABLE_DB
,
"show databases"
,
17
,
NULL
);
...
...
source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt
0 → 100644
浏览文件 @
fa66a89b
add_executable
(
dnode_test_vgroup
""
)
target_sources
(
dnode_test_vgroup
PRIVATE
"vgroup.cpp"
"../sut/deploy.cpp"
)
target_link_libraries
(
dnode_test_vgroup
PUBLIC dnode
PUBLIC util
PUBLIC os
PUBLIC gtest_main
)
target_include_directories
(
dnode_test_vgroup
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/server/dnode/mgmt"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../../inc"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../sut"
)
add_test
(
NAME dnode_test_vgroup
COMMAND dnode_test_vgroup
)
source/dnode/mgmt/impl/test/vgroup/vgroup.cpp
0 → 100644
浏览文件 @
fa66a89b
/**
* @file db.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module vgroup-msg tests
* @version 0.1
* @date 2021-12-20
*
* @copyright Copyright (c) 2021
*
*/
#include "deploy.h"
class
DndTestVgroup
:
public
::
testing
::
Test
{
protected:
static
SServer
*
CreateServer
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
,
const
char
*
firstEp
)
{
SServer
*
pServer
=
createServer
(
path
,
fqdn
,
port
,
firstEp
);
ASSERT
(
pServer
);
return
pServer
;
}
static
void
SetUpTestSuite
()
{
initLog
(
"/tmp/tdlog"
);
const
char
*
fqdn
=
"localhost"
;
const
char
*
firstEp
=
"localhost:9150"
;
pServer
=
CreateServer
(
"/tmp/dnode_test_vgroup"
,
fqdn
,
9150
,
firstEp
);
pClient
=
createClient
(
"root"
,
"taosdata"
,
fqdn
,
9150
);
taosMsleep
(
1100
);
}
static
void
TearDownTestSuite
()
{
stopServer
(
pServer
);
dropClient
(
pClient
);
pServer
=
NULL
;
pClient
=
NULL
;
}
static
SServer
*
pServer
;
static
SClient
*
pClient
;
static
int32_t
connId
;
public:
void
SetUp
()
override
{}
void
TearDown
()
override
{}
void
SendTheCheckShowMetaMsg
(
int8_t
showType
,
const
char
*
showName
,
int32_t
columns
,
const
char
*
db
)
{
SShowMsg
*
pShow
=
(
SShowMsg
*
)
rpcMallocCont
(
sizeof
(
SShowMsg
));
pShow
->
type
=
showType
;
if
(
db
!=
NULL
)
{
strcpy
(
pShow
->
db
,
db
);
}
SRpcMsg
showRpcMsg
=
{
0
};
showRpcMsg
.
pCont
=
pShow
;
showRpcMsg
.
contLen
=
sizeof
(
SShowMsg
);
showRpcMsg
.
msgType
=
TSDB_MSG_TYPE_SHOW
;
sendMsg
(
pClient
,
&
showRpcMsg
);
ASSERT_NE
(
pClient
->
pRsp
,
nullptr
);
ASSERT_EQ
(
pClient
->
pRsp
->
code
,
0
);
ASSERT_NE
(
pClient
->
pRsp
->
pCont
,
nullptr
);
SShowRsp
*
pShowRsp
=
(
SShowRsp
*
)
pClient
->
pRsp
->
pCont
;
ASSERT_NE
(
pShowRsp
,
nullptr
);
pShowRsp
->
showId
=
htonl
(
pShowRsp
->
showId
);
pMeta
=
&
pShowRsp
->
tableMeta
;
pMeta
->
numOfTags
=
htonl
(
pMeta
->
numOfTags
);
pMeta
->
numOfColumns
=
htonl
(
pMeta
->
numOfColumns
);
pMeta
->
sversion
=
htonl
(
pMeta
->
sversion
);
pMeta
->
tversion
=
htonl
(
pMeta
->
tversion
);
pMeta
->
tuid
=
htobe64
(
pMeta
->
tuid
);
pMeta
->
suid
=
htobe64
(
pMeta
->
suid
);
showId
=
pShowRsp
->
showId
;
EXPECT_NE
(
pShowRsp
->
showId
,
0
);
EXPECT_STREQ
(
pMeta
->
tbFname
,
showName
);
EXPECT_EQ
(
pMeta
->
numOfTags
,
0
);
EXPECT_EQ
(
pMeta
->
numOfColumns
,
columns
);
EXPECT_EQ
(
pMeta
->
precision
,
0
);
EXPECT_EQ
(
pMeta
->
tableType
,
0
);
EXPECT_EQ
(
pMeta
->
update
,
0
);
EXPECT_EQ
(
pMeta
->
sversion
,
0
);
EXPECT_EQ
(
pMeta
->
tversion
,
0
);
EXPECT_EQ
(
pMeta
->
tuid
,
0
);
EXPECT_EQ
(
pMeta
->
suid
,
0
);
}
void
CheckSchema
(
int32_t
index
,
int8_t
type
,
int32_t
bytes
,
const
char
*
name
)
{
SSchema
*
pSchema
=
&
pMeta
->
pSchema
[
index
];
pSchema
->
bytes
=
htonl
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
type
);
EXPECT_EQ
(
pSchema
->
bytes
,
bytes
);
EXPECT_STREQ
(
pSchema
->
name
,
name
);
}
void
SendThenCheckShowRetrieveMsg
(
int32_t
rows
)
{
SRetrieveTableMsg
*
pRetrieve
=
(
SRetrieveTableMsg
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
pRetrieve
->
showId
=
htonl
(
showId
);
pRetrieve
->
free
=
0
;
SRpcMsg
retrieveRpcMsg
=
{
0
};
retrieveRpcMsg
.
pCont
=
pRetrieve
;
retrieveRpcMsg
.
contLen
=
sizeof
(
SRetrieveTableMsg
);
retrieveRpcMsg
.
msgType
=
TSDB_MSG_TYPE_SHOW_RETRIEVE
;
sendMsg
(
pClient
,
&
retrieveRpcMsg
);
ASSERT_NE
(
pClient
->
pRsp
,
nullptr
);
ASSERT_EQ
(
pClient
->
pRsp
->
code
,
0
);
ASSERT_NE
(
pClient
->
pRsp
->
pCont
,
nullptr
);
pRetrieveRsp
=
(
SRetrieveTableRsp
*
)
pClient
->
pRsp
->
pCont
;
ASSERT_NE
(
pRetrieveRsp
,
nullptr
);
pRetrieveRsp
->
numOfRows
=
htonl
(
pRetrieveRsp
->
numOfRows
);
pRetrieveRsp
->
useconds
=
htobe64
(
pRetrieveRsp
->
useconds
);
pRetrieveRsp
->
compLen
=
htonl
(
pRetrieveRsp
->
compLen
);
EXPECT_EQ
(
pRetrieveRsp
->
numOfRows
,
rows
);
EXPECT_EQ
(
pRetrieveRsp
->
useconds
,
0
);
// EXPECT_EQ(pRetrieveRsp->completed, completed);
EXPECT_EQ
(
pRetrieveRsp
->
precision
,
TSDB_TIME_PRECISION_MILLI
);
EXPECT_EQ
(
pRetrieveRsp
->
compressed
,
0
);
EXPECT_EQ
(
pRetrieveRsp
->
compLen
,
0
);
pData
=
pRetrieveRsp
->
data
;
pos
=
0
;
}
void
CheckInt8
(
int8_t
val
)
{
int8_t
data
=
*
((
int8_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int8_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckInt16
(
int16_t
val
)
{
int16_t
data
=
*
((
int16_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int16_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckInt32
(
int32_t
val
)
{
int32_t
data
=
*
((
int32_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int32_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckInt64
(
int64_t
val
)
{
int64_t
data
=
*
((
int64_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int64_t
);
EXPECT_EQ
(
data
,
val
);
}
void
CheckTimestamp
()
{
int64_t
data
=
*
((
int64_t
*
)(
pData
+
pos
));
pos
+=
sizeof
(
int64_t
);
EXPECT_GT
(
data
,
0
);
}
void
CheckBinary
(
const
char
*
val
,
int32_t
len
)
{
pos
+=
sizeof
(
VarDataLenT
);
char
*
data
=
(
char
*
)(
pData
+
pos
);
pos
+=
len
;
EXPECT_STREQ
(
data
,
val
);
}
int32_t
showId
;
STableMetaMsg
*
pMeta
;
SRetrieveTableRsp
*
pRetrieveRsp
;
char
*
pData
;
int32_t
pos
;
};
SServer
*
DndTestVgroup
::
pServer
;
SClient
*
DndTestVgroup
::
pClient
;
int32_t
DndTestVgroup
::
connId
;
TEST_F
(
DndTestVgroup
,
01
_Create_Restart_Drop_Vnode
)
{
{
SCreateVnodeMsg
*
pReq
=
(
SCreateVnodeMsg
*
)
rpcMallocCont
(
sizeof
(
SCreateVnodeMsg
));
pReq
->
vgId
=
htonl
(
2
);
pReq
->
dnodeId
=
htonl
(
1
);
strcpy
(
pReq
->
db
,
"1.d1"
);
pReq
->
dbUid
=
htobe64
(
9527
);
pReq
->
cacheBlockSize
=
htonl
(
16
);
pReq
->
totalBlocks
=
htonl
(
10
);
pReq
->
daysPerFile
=
htonl
(
10
);
pReq
->
daysToKeep0
=
htonl
(
3650
);
pReq
->
daysToKeep1
=
htonl
(
3650
);
pReq
->
daysToKeep2
=
htonl
(
3650
);
pReq
->
minRows
=
htonl
(
100
);
pReq
->
minRows
=
htonl
(
4096
);
pReq
->
commitTime
=
htonl
(
3600
);
pReq
->
fsyncPeriod
=
htonl
(
3000
);
pReq
->
walLevel
=
1
;
pReq
->
precision
=
0
;
pReq
->
compression
=
2
;
pReq
->
replica
=
1
;
pReq
->
quorum
=
1
;
pReq
->
update
=
0
;
pReq
->
cacheLastRow
=
0
;
pReq
->
selfIndex
=
0
;
for
(
int
r
=
0
;
r
<
pReq
->
replica
;
++
r
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
r
];
pReplica
->
id
=
htonl
(
1
);
pReplica
->
port
=
htons
(
9150
);
}
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SCreateVnodeMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_CREATE_VNODE_IN
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
taosMsleep
(
1000000
);
}
}
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
fa66a89b
...
...
@@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
,
void
*
pMsg
);
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
void
mndTransApply
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
STransMsg
*
pMsg
,
int32_t
code
);
char
*
mndTransStageStr
(
ETrnStage
stage
);
...
...
source/dnode/mnode/impl/inc/mndVgroup.h
浏览文件 @
fa66a89b
...
...
@@ -28,7 +28,9 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void
mndReleaseVgroup
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
);
int32_t
mndAllocVgroup
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgObj
**
ppVgroups
);
SSdbRaw
*
mndVgroupActionEncode
(
SVgObj
*
pVgroup
);
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
SCreateVnodeMsg
*
mndBuildCreateVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
SDropVnodeMsg
*
mndBuildDropVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
fa66a89b
...
...
@@ -285,10 +285,60 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg
}
static
int32_t
mndSetRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
for
(
int
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
++
v
)
{
SVgObj
*
pVgroup
=
pVgroups
+
v
;
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
return
-
1
;
}
SEpSet
epset
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SCreateVnodeMsg
*
pMsg
=
mndBuildCreateVnodeMsg
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
if
(
pMsg
==
NULL
)
{
return
-
1
;
}
if
(
mndTransAppendRedoAction
(
pTrans
,
&
epset
,
TSDB_MSG_TYPE_ALTER_VNODE_IN
,
sizeof
(
SCreateVnodeMsg
),
pMsg
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
}
}
return
0
;
}
static
int32_t
mndSetUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDbObj
*
pDb
,
SVgObj
*
pVgroups
)
{
for
(
int
v
=
0
;
v
<
pDb
->
cfg
.
numOfVgroups
;
++
v
)
{
SVgObj
*
pVgroup
=
pVgroups
+
v
;
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
replica
;
++
vn
)
{
SVnodeGid
*
pVgid
=
pVgroup
->
vnodeGid
+
vn
;
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
return
-
1
;
}
SEpSet
epset
=
mndGetDnodeEpset
(
pDnode
);
mndReleaseDnode
(
pMnode
,
pDnode
);
SDropVnodeMsg
*
pMsg
=
mndBuildDropVnodeMsg
(
pMnode
,
pDnode
,
pDb
,
pVgroup
);
if
(
pMsg
==
NULL
)
{
return
-
1
;
}
if
(
mndTransAppendUndoAction
(
pTrans
,
&
epset
,
TSDB_MSG_TYPE_DROP_VNODE_IN
,
sizeof
(
SDropVnodeMsg
),
pMsg
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
}
}
return
0
;
}
...
...
@@ -644,7 +694,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
return
-
1
;
}
int32_t
contLen
=
sizeof
(
SUseDbRsp
)
+
pDb
->
cfg
.
numOfVgroups
*
sizeof
(
SVgroupInfo
);
int32_t
contLen
=
sizeof
(
SUseDbRsp
)
+
pDb
->
cfg
.
numOfVgroups
*
sizeof
(
SVgroupInfo
);
SUseDbRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
fa66a89b
...
...
@@ -180,8 +180,12 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj
}
SDnodeObj
*
mndAcquireDnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
return
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
dnodeId
);
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SDnodeObj
*
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
dnodeId
);
if
(
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
return
pDnode
;
}
void
mndReleaseDnode
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
)
{
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
fa66a89b
...
...
@@ -21,6 +21,13 @@
#define TSDB_TRN_ARRAY_SIZE 8
#define TSDB_TRN_RESERVE_SIZE 64
typedef
struct
{
SEpSet
epSet
;
int8_t
msgType
;
int32_t
contLen
;
void
*
pCont
;
}
STransAction
;
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
);
static
SSdbRow
*
mndTransActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndTransActionInsert
(
SSdb
*
pSdb
,
STrans
*
pTrans
);
...
...
@@ -29,9 +36,12 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
static
void
mndTransSetRpcHandle
(
STrans
*
pTrans
,
void
*
rpcHandle
);
static
void
mndTransSendRpcRsp
(
STrans
*
pTrans
,
int32_t
code
);
static
int32_t
mndTransAppendArray
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
void
mndTransDropArray
(
SArray
*
pArray
);
static
int32_t
mndTransExecuteArray
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransAppendLog
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
);
static
void
mndTransDropLogs
(
SArray
*
pArray
);
static
void
mndTransDropActions
(
SArray
*
pArray
);
static
int32_t
mndTransExecuteLogs
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
SArray
*
pArray
);
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
...
...
@@ -58,7 +68,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void
mndCleanupTrans
(
SMnode
*
pMnode
)
{}
static
SSdbRaw
*
mndTransActionEncode
(
STrans
*
pTrans
)
{
int32_t
rawDataLen
=
16
*
sizeof
(
int32_t
)
+
TSDB_TRN_RESERVE_SIZE
;
int32_t
rawDataLen
=
sizeof
(
STrans
)
+
TSDB_TRN_RESERVE_SIZE
;
int32_t
redoLogNum
=
taosArrayGetSize
(
pTrans
->
redoLogs
);
int32_t
undoLogNum
=
taosArrayGetSize
(
pTrans
->
undoLogs
);
int32_t
commitLogNum
=
taosArrayGetSize
(
pTrans
->
commitLogs
);
...
...
@@ -80,6 +90,16 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
rawDataLen
+=
sdbGetRawTotalSize
(
pTmp
);
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
redoActions
,
i
);
rawDataLen
+=
(
sizeof
(
STransAction
)
+
pAction
->
contLen
);
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
undoActions
,
i
);
rawDataLen
+=
(
sizeof
(
STransAction
)
+
pAction
->
contLen
);
}
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TRANS
,
TSDB_TRANS_VER
,
rawDataLen
);
if
(
pRaw
==
NULL
)
{
mError
(
"trans:%d, failed to alloc raw since %s"
,
pTrans
->
id
,
terrstr
());
...
...
@@ -116,6 +136,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pTmp
,
len
)
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
redoActions
,
i
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
));
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgType
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pAction
->
pCont
,
pAction
->
contLen
);
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
undoActions
,
i
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
pAction
->
epSet
,
sizeof
(
SEpSet
));
SDB_SET_INT8
(
pRaw
,
dataPos
,
pAction
->
msgType
)
SDB_SET_INT32
(
pRaw
,
dataPos
,
pAction
->
contLen
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
pAction
->
pCont
,
pAction
->
contLen
);
}
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_TRN_RESERVE_SIZE
)
SDB_SET_DATALEN
(
pRaw
,
dataPos
);
mTrace
(
"trans:%d, encode to raw:%p, len:%d"
,
pTrans
->
id
,
pRaw
,
dataPos
);
...
...
@@ -147,8 +183,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
...
@@ -175,42 +211,77 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for
(
int32_t
i
=
0
;
i
<
redoLogNum
;
++
i
)
{
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoLogs
,
&
pData
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
undoLogNum
;
++
i
)
{
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
undoLogs
,
&
pData
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
commitLogNum
;
++
i
)
{
int32_t
dataLen
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
dataLen
)
char
*
pData
=
malloc
(
dataLen
);
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
pData
,
dataLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
commitLogs
,
&
pData
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
break
;
}
}
for
(
int32_t
i
=
0
;
i
<
redoActionNum
;
++
i
)
{
STransAction
action
=
{
0
};
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
));
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
action
.
msgType
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
action
.
contLen
)
action
.
pCont
=
malloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
action
.
pCont
,
action
.
contLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
redoActions
,
&
action
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
}
for
(
int32_t
i
=
0
;
i
<
undoActionNum
;
++
i
)
{
STransAction
action
=
{
0
};
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
));
SDB_GET_INT8
(
pRaw
,
pRow
,
dataPos
,
&
action
.
msgType
)
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
action
.
contLen
)
action
.
pCont
=
malloc
(
action
.
contLen
);
if
(
action
.
pCont
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
pRow
,
dataPos
,
action
.
pCont
,
action
.
contLen
);
void
*
ret
=
taosArrayPush
(
pTrans
->
undoActions
,
&
action
);
if
(
ret
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TRANS_DECODE_OVER
;
}
}
...
...
@@ -237,11 +308,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
static
int32_t
mndTransActionDelete
(
SSdb
*
pSdb
,
STrans
*
pTrans
)
{
mTrace
(
"trans:%d, perform delete action, stage:%s"
,
pTrans
->
id
,
mndTransStageStr
(
pTrans
->
stage
));
mndTransDrop
Array
(
pTrans
->
redoLogs
);
mndTransDrop
Array
(
pTrans
->
undoLogs
);
mndTransDrop
Array
(
pTrans
->
commitLogs
);
mndTransDropA
rray
(
pTrans
->
redoActions
);
mndTransDropA
rray
(
pTrans
->
undoActions
);
mndTransDrop
Logs
(
pTrans
->
redoLogs
);
mndTransDrop
Logs
(
pTrans
->
undoLogs
);
mndTransDrop
Logs
(
pTrans
->
commitLogs
);
mndTransDropA
ctions
(
pTrans
->
redoActions
);
mndTransDropA
ctions
(
pTrans
->
undoActions
);
return
0
;
}
...
...
@@ -274,6 +345,8 @@ char *mndTransStageStr(ETrnStage stage) {
return
"rollback"
;
case
TRN_STAGE_RETRY
:
return
"retry"
;
case
TRN_STAGE_OVER
:
return
"stop"
;
default:
return
"undefined"
;
}
...
...
@@ -305,8 +378,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
pTrans
->
redoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
commitLogs
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
void
*
));
pTrans
->
redoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
pTrans
->
undoActions
=
taosArrayInit
(
TSDB_TRN_ARRAY_SIZE
,
sizeof
(
STransAction
));
if
(
pTrans
->
redoLogs
==
NULL
||
pTrans
->
undoLogs
==
NULL
||
pTrans
->
commitLogs
==
NULL
||
pTrans
->
redoActions
==
NULL
||
pTrans
->
undoActions
==
NULL
)
{
...
...
@@ -319,7 +392,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return
pTrans
;
}
static
void
mndTransDrop
Array
(
SArray
*
pArray
)
{
static
void
mndTransDrop
Logs
(
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
SSdbRaw
*
pRaw
=
taosArrayGetP
(
pArray
,
i
);
tfree
(
pRaw
);
...
...
@@ -328,12 +401,21 @@ static void mndTransDropArray(SArray *pArray) {
taosArrayDestroy
(
pArray
);
}
static
void
mndTransDropActions
(
SArray
*
pArray
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
++
i
)
{
STransAction
*
pAction
=
taosArrayGet
(
pArray
,
i
);
free
(
pAction
->
pCont
);
}
taosArrayDestroy
(
pArray
);
}
void
mndTransDrop
(
STrans
*
pTrans
)
{
mndTransDrop
Array
(
pTrans
->
redoLogs
);
mndTransDrop
Array
(
pTrans
->
undoLogs
);
mndTransDrop
Array
(
pTrans
->
commitLogs
);
mndTransDropA
rray
(
pTrans
->
redoActions
);
mndTransDropA
rray
(
pTrans
->
undoActions
);
mndTransDrop
Logs
(
pTrans
->
redoLogs
);
mndTransDrop
Logs
(
pTrans
->
undoLogs
);
mndTransDrop
Logs
(
pTrans
->
commitLogs
);
mndTransDropA
ctions
(
pTrans
->
redoActions
);
mndTransDropA
ctions
(
pTrans
->
undoActions
);
mDebug
(
"trans:%d, data:%p is dropped"
,
pTrans
->
id
,
pTrans
);
tfree
(
pTrans
);
...
...
@@ -344,7 +426,7 @@ static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) {
mTrace
(
"trans:%d, set rpc handle:%p"
,
pTrans
->
id
,
rpcHandle
);
}
static
int32_t
mndTransAppend
Array
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
)
{
static
int32_t
mndTransAppend
Log
(
SArray
*
pArray
,
SSdbRaw
*
pRaw
)
{
if
(
pArray
==
NULL
||
pRaw
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -360,32 +442,44 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) {
}
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppend
Array
(
pTrans
->
redoLogs
,
pRaw
);
int32_t
code
=
mndTransAppend
Log
(
pTrans
->
redoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to redo logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
}
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppend
Array
(
pTrans
->
undoLogs
,
pRaw
);
int32_t
code
=
mndTransAppend
Log
(
pTrans
->
undoLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to undo logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
}
int32_t
mndTransAppendCommitlog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
)
{
int32_t
code
=
mndTransAppend
Array
(
pTrans
->
commitLogs
,
pRaw
);
int32_t
code
=
mndTransAppend
Log
(
pTrans
->
commitLogs
,
pRaw
);
mTrace
(
"trans:%d, raw:%p append to commit logs, code:0x%x"
,
pTrans
->
id
,
pRaw
,
code
);
return
code
;
}
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
void
*
pMsg
)
{
int32_t
code
=
mndTransAppendArray
(
pTrans
->
redoActions
,
pMsg
);
mTrace
(
"trans:%d, msg:%p append to redo actions"
,
pTrans
->
id
,
pMsg
);
static
int32_t
mndTransAppendAction
(
SArray
*
pArray
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
)
{
STransAction
action
=
{.
epSet
=
*
pEpSet
,
.
msgType
=
msgType
,
.
contLen
=
contLen
,
.
pCont
=
pCont
};
void
*
ptr
=
taosArrayPush
(
pArray
,
&
action
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
return
0
;
}
int32_t
mndTransAppendRedoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
)
{
int32_t
code
=
mndTransAppendAction
(
pTrans
->
redoActions
,
pEpSet
,
msgType
,
contLen
,
pCont
);
mTrace
(
"trans:%d, msg:%s len:%d append to redo actions"
,
pTrans
->
id
,
taosMsg
[
msgType
],
contLen
);
return
code
;
}
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
void
*
pMsg
)
{
int32_t
code
=
mndTransAppendA
rray
(
pTrans
->
undoActions
,
pMsg
);
mTrace
(
"trans:%d, msg:%
p append to undo actions"
,
pTrans
->
id
,
pMsg
);
int32_t
mndTransAppendUndoAction
(
STrans
*
pTrans
,
SEpSet
*
pEpSet
,
int8_t
msgType
,
int32_t
contLen
,
void
*
pCont
)
{
int32_t
code
=
mndTransAppendA
ction
(
pTrans
->
undoActions
,
pEpSet
,
msgType
,
contLen
,
pCont
);
mTrace
(
"trans:%d, msg:%
s len:%d append to undo actions"
,
pTrans
->
id
,
taosMsg
[
msgType
],
contLen
);
return
code
;
}
...
...
@@ -502,7 +596,7 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
// todo
}
static
int32_t
mndTransExecute
Array
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
static
int32_t
mndTransExecute
Logs
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
arraySize
=
taosArrayGetSize
(
pArray
);
...
...
@@ -520,7 +614,7 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
static
int32_t
mndTransExecuteRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
redoLogs
)
!=
0
)
{
code
=
mndTransExecute
Array
(
pMnode
,
pTrans
->
redoLogs
);
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
redoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute redo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
...
...
@@ -534,7 +628,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
static
int32_t
mndTransExecuteUndoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
undoLogs
)
!=
0
)
{
code
=
mndTransExecute
Array
(
pMnode
,
pTrans
->
undoLogs
);
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
undoLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute undo logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
...
...
@@ -548,7 +642,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
static
int32_t
mndTransExecuteCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
code
=
0
;
if
(
taosArrayGetSize
(
pTrans
->
commitLogs
)
!=
0
)
{
code
=
mndTransExecute
Array
(
pMnode
,
pTrans
->
commitLogs
);
code
=
mndTransExecute
Logs
(
pMnode
,
pTrans
->
commitLogs
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to execute commit logs since %s"
,
pTrans
->
id
,
terrstr
())
}
else
{
...
...
@@ -559,18 +653,40 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
return
code
;
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoActions
)
!=
0
)
{
mTrace
(
"trans:%d, execute redo actions finished"
,
pTrans
->
id
);
static
int32_t
mndTransExecuteActions
(
SMnode
*
pMnode
,
SArray
*
pArray
)
{
#if 0
int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
#else
return
0
;
#endif
}
static
int32_t
mndTransExecuteRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
redoActions
)
<=
0
)
return
0
;
mTrace
(
"trans:%d, start to execute redo actions"
,
pTrans
->
id
);
return
mndTransExecuteActions
(
pMnode
,
pTrans
->
redoActions
);
}
static
int32_t
mndTransExecuteUndoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
taosArrayGetSize
(
pTrans
->
undoActions
)
!=
0
)
{
mTrace
(
"trans:%d, execute undo actions finished"
,
pTrans
->
id
);
}
return
0
;
if
(
taosArrayGetSize
(
pTrans
->
undoActions
)
<=
0
)
return
0
;
mTrace
(
"trans:%d, start to execute undo actions"
,
pTrans
->
id
);
return
mndTransExecuteActions
(
pMnode
,
pTrans
->
undoActions
)
;
}
static
int32_t
mndTransPerformPrepareStage
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
fa66a89b
...
...
@@ -24,9 +24,10 @@
#define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_RESERVE_SIZE 64
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOldVgroup
,
SVgObj
*
pNewVgroup
);
static
SSdbRow
*
mndVgroupActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndVgroupActionInsert
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionDelete
(
SSdb
*
pSdb
,
SVgObj
*
pVgroup
);
static
int32_t
mndVgroupActionUpdate
(
SSdb
*
pSdb
,
SVgObj
*
pOldVgroup
,
SVgObj
*
pNewVgroup
);
static
int32_t
mndProcessCreateVnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessAlterVnodeRsp
(
SMnodeMsg
*
pMsg
);
...
...
@@ -156,6 +157,80 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease
(
pSdb
,
pVgroup
);
}
SCreateVnodeMsg
*
mndBuildCreateVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SCreateVnodeMsg
*
pCreate
=
malloc
(
sizeof
(
SCreateVnodeMsg
));
if
(
pCreate
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pCreate
->
dnodeId
=
htonl
(
pDnode
->
id
);
pCreate
->
vgId
=
htonl
(
pVgroup
->
vgId
);
memcpy
(
pCreate
->
db
,
pDb
->
name
,
TSDB_FULL_DB_NAME_LEN
);
pCreate
->
dbUid
=
htobe64
(
pDb
->
uid
);
pCreate
->
cacheBlockSize
=
htonl
(
pDb
->
cfg
.
cacheBlockSize
);
pCreate
->
totalBlocks
=
htonl
(
pDb
->
cfg
.
totalBlocks
);
pCreate
->
daysPerFile
=
htonl
(
pDb
->
cfg
.
daysPerFile
);
pCreate
->
daysToKeep0
=
htonl
(
pDb
->
cfg
.
daysToKeep0
);
pCreate
->
daysToKeep1
=
htonl
(
pDb
->
cfg
.
daysToKeep1
);
pCreate
->
daysToKeep2
=
htonl
(
pDb
->
cfg
.
daysToKeep2
);
pCreate
->
minRows
=
htonl
(
pDb
->
cfg
.
minRows
);
pCreate
->
maxRows
=
htonl
(
pDb
->
cfg
.
maxRows
);
pCreate
->
commitTime
=
htonl
(
pDb
->
cfg
.
commitTime
);
pCreate
->
fsyncPeriod
=
htonl
(
pDb
->
cfg
.
fsyncPeriod
);
pCreate
->
walLevel
=
pDb
->
cfg
.
walLevel
;
pCreate
->
precision
=
pDb
->
cfg
.
precision
;
pCreate
->
compression
=
pDb
->
cfg
.
compression
;
pCreate
->
quorum
=
pDb
->
cfg
.
quorum
;
pCreate
->
update
=
pDb
->
cfg
.
update
;
pCreate
->
cacheLastRow
=
pDb
->
cfg
.
cacheLastRow
;
pCreate
->
replica
=
pVgroup
->
replica
;
pCreate
->
selfIndex
=
-
1
;
for
(
int32_t
v
=
0
;
v
<
pVgroup
->
replica
;
++
v
)
{
SReplica
*
pReplica
=
&
pCreate
->
replicas
[
v
];
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
v
];
SDnodeObj
*
pVgidDnode
=
mndAcquireDnode
(
pMnode
,
pVgid
->
dnodeId
);
if
(
pVgidDnode
==
NULL
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
pReplica
->
id
=
htonl
(
pVgidDnode
->
id
);
pReplica
->
port
=
htons
(
pVgidDnode
->
port
);
memcpy
(
pReplica
->
fqdn
,
pVgidDnode
->
fqdn
,
TSDB_FQDN_LEN
);
mndReleaseDnode
(
pMnode
,
pVgidDnode
);
if
(
pDnode
->
id
==
pVgid
->
dnodeId
)
{
pCreate
->
selfIndex
=
v
;
}
}
if
(
pCreate
->
selfIndex
==
-
1
)
{
free
(
pCreate
);
terrno
=
TSDB_CODE_MND_APP_ERROR
;
return
NULL
;
}
return
pCreate
;
}
SDropVnodeMsg
*
mndBuildDropVnodeMsg
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
SDropVnodeMsg
*
pDrop
=
malloc
(
sizeof
(
SDropVnodeMsg
));
if
(
pDrop
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pDrop
->
dnodeId
=
htonl
(
pDnode
->
id
);
pDrop
->
vgId
=
htonl
(
pVgroup
->
vgId
);
memcpy
(
pDrop
->
db
,
pDb
->
name
,
TSDB_FULL_DB_NAME_LEN
);
pDrop
->
dbUid
=
htobe64
(
pDb
->
uid
);
return
pDrop
;
}
static
int32_t
mndGetAvailableDnode
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
allocedVnodes
=
0
;
...
...
source/dnode/vnode/impl/src/vnodeCommit.c
浏览文件 @
fa66a89b
...
...
@@ -25,6 +25,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
pTask
->
execute
=
vnodeCommit
;
// TODO
pTask
->
arg
=
pVnode
;
// TODO
tsdbPrepareCommit
(
pVnode
->
pTsdb
);
// metaPrepareCommit(pVnode->pMeta);
// walPreapareCommit(pVnode->pWal);
vnodeScheduleTask
(
pTask
);
return
0
;
}
...
...
@@ -32,6 +36,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
int
vnodeCommit
(
void
*
arg
)
{
SVnode
*
pVnode
=
(
SVnode
*
)
arg
;
metaCommit
(
pVnode
->
pMeta
);
tqCommit
(
pVnode
->
pTq
);
tsdbCommit
(
pVnode
->
pTq
);
vnodeBufPoolRecycle
(
pVnode
);
// TODO
return
0
;
...
...
source/dnode/vnode/tq/src/tq.c
浏览文件 @
fa66a89b
...
...
@@ -50,7 +50,7 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
pTq
->
tqConfig
=
tqConfig
;
pTq
->
tqLogReader
=
tqLogReader
;
pTq
->
tqMemRef
.
pAlloctorFactory
=
allocFac
;
pTq
->
tqMemRef
.
pAllocator
=
allocFac
->
create
(
allocFac
);
//
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if
(
pTq
->
tqMemRef
.
pAllocator
==
NULL
)
{
// TODO
}
...
...
source/dnode/vnode/tsdb/src/tsdbCommit.c
浏览文件 @
fa66a89b
...
...
@@ -15,7 +15,19 @@
#include "tsdbDef.h"
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
)
{
if
(
pTsdb
->
mem
==
NULL
)
return
0
;
// tsem_wait(&(pTsdb->canCommit));
ASSERT
(
pTsdb
->
imem
==
NULL
);
pTsdb
->
imem
=
pTsdb
->
mem
;
pTsdb
->
mem
=
NULL
;
}
int
tsdbCommit
(
STsdb
*
pTsdb
)
{
// TODO
pTsdb
->
imem
=
NULL
;
// tsem_post(&(pTsdb->canCommit));
return
0
;
}
\ No newline at end of file
source/libs/index/CMakeLists.txt
浏览文件 @
fa66a89b
...
...
@@ -22,9 +22,13 @@ if (${BUILD_WITH_LUCENE})
index
PUBLIC lucene++
)
endif
(
${
BUILD_WITH_LUCENE
}
)
if
(
${
BUILD_WITH_INVERTEDINDEX
}
)
add_definitions
(
-DUSE_INVERTED_INDEX
)
endif
(
${
BUILD_WITH_INVERTEDINDEX
}
)
if
(
${
BUILD_TEST
}
)
add_subdirectory
(
test
)
endif
(
${
BUILD_TEST
}
)
...
...
source/libs/index/inc/indexInt.h
浏览文件 @
fa66a89b
...
...
@@ -37,10 +37,10 @@ struct SIndex {
#endif
void
*
cache
;
void
*
tindex
;
SHashObj
*
field
Obj
;
// < field name, field id>
SHashObj
*
col
Obj
;
// < field name, field id>
int64_t
suid
;
// current super table id, -1 is normal table
int
field
Id
;
// field id allocated to cache
int
col
Id
;
// field id allocated to cache
int32_t
cVersion
;
// current version allocated to cache
pthread_mutex_t
mtx
;
};
...
...
@@ -60,22 +60,21 @@ struct SIndexMultiTermQuery {
// field and key;
typedef
struct
SIndexTerm
{
uint8_t
type
;
// term data type, str/interger/json
char
*
key
;
int32_t
nKey
;
char
*
val
;
int32_t
nVal
;
int64_t
suid
;
SIndexOperOnColumn
operType
;
// oper type, add/del/update
uint8_t
colType
;
// term data type, str/interger/json
char
*
colName
;
int32_t
nColName
;
char
*
colVal
;
int32_t
nColVal
;
}
SIndexTerm
;
typedef
struct
SIndexTermQuery
{
SIndexTerm
*
field_value
;
EIndexQueryType
t
ype
;
SIndexTerm
*
term
;
EIndexQueryType
qT
ype
;
}
SIndexTermQuery
;
SIndexTerm
*
indexTermCreate
(
const
char
*
key
,
int32_t
nKey
,
const
char
*
val
,
int32_t
nVal
);
void
indexTermDestroy
(
SIndexTerm
*
p
);
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("index FATAL ", 255, __VA_ARGS__); }} while(0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("index ERROR ", 255, __VA_ARGS__); }} while(0)
...
...
source/libs/index/inc/index_cache.h
浏览文件 @
fa66a89b
...
...
@@ -38,13 +38,13 @@ typedef struct IndexCache {
//
IndexCache
*
indexCacheCreate
();
void
indexCacheDestroy
(
IndexCache
*
cache
);
void
indexCacheDestroy
(
void
*
cache
);
int
indexCachePut
(
IndexCache
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
int
indexCachePut
(
void
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
);
int
indexCacheGet
(
IndexCache
*
cache
,
uint64_t
*
rst
);
int
indexCacheSearch
(
IndexCache
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
int
indexCacheGet
(
void
*
cache
,
uint64_t
*
rst
);
int
indexCacheSearch
(
void
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
);
#ifdef __cplusplus
}
...
...
source/libs/index/src/index.c
浏览文件 @
fa66a89b
...
...
@@ -22,11 +22,10 @@
#endif
typedef
struct
SIdx
Field
Info
{
int
field
Id
;
// generated by index internal
typedef
struct
SIdx
Col
Info
{
int
col
Id
;
// generated by index internal
int
cVersion
;
int
type
;
// field type
}
SIdxFieldInfo
;
}
SIdxColInfo
;
static
pthread_once_t
isInit
=
PTHREAD_ONCE_INIT
;
static
void
indexInit
();
...
...
@@ -38,22 +37,25 @@ static int indexMergeCacheIntoTindex(struct SIndex *sIdx) {
indexWarn
(
"suid %"
PRIu64
" merge cache into tindex"
,
sIdx
->
suid
);
return
0
;
}
SIndex
*
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
)
{
int
indexOpen
(
SIndexOpts
*
opts
,
const
char
*
path
,
SIndex
**
index
)
{
pthread_once
(
&
isInit
,
indexInit
);
SIndex
*
sIdx
=
calloc
(
1
,
sizeof
(
SIndex
));
if
(
sIdx
==
NULL
)
{
return
-
1
;
}
#ifdef USE_LUCENE
index_t
*
index
=
index_open
(
path
);
sIdx
->
index
=
index
;
#endif
sIdx
->
cache
=
(
void
*
)
indexCacheCreate
();
sIdx
->
tindex
=
NULL
;
sIdx
->
field
Obj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
field
Id
=
1
;
sIdx
->
col
Obj
=
taosHashInit
(
8
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
true
,
HASH_ENTRY_LOCK
);
sIdx
->
col
Id
=
1
;
sIdx
->
cVersion
=
1
;
pthread_mutex_init
(
&
sIdx
->
mtx
,
NULL
);
return
sIdx
;
*
index
=
sIdx
;
return
0
;
}
void
indexClose
(
SIndex
*
sIdx
)
{
...
...
@@ -61,14 +63,17 @@ void indexClose(SIndex *sIdx) {
index_close
(
sIdex
->
index
);
sIdx
->
index
=
NULL
;
#endif
#ifdef USE_INVERTED_INDEX
indexCacheDestroy
(
sIdx
->
cache
);
taosHashCleanup
(
sIdx
->
field
Obj
);
taosHashCleanup
(
sIdx
->
col
Obj
);
pthread_mutex_destroy
(
&
sIdx
->
mtx
);
#endif
free
(
sIdx
);
return
;
}
int
indexPut
(
SIndex
*
index
,
S
Array
*
fVals
,
int
uid
)
{
int
indexPut
(
SIndex
*
index
,
S
IndexMultiTerm
*
fVals
,
int
uid
)
{
#ifdef USE_LUCENE
index_document_t
*
doc
=
index_document_create
();
...
...
@@ -86,32 +91,38 @@ int indexPut(SIndex *index, SArray* fVals, int uid) {
index_document_destroy
(
doc
);
#endif
#ifdef USE_INVERTED_INDEX
//TODO(yihao): reduce the lock range
pthread_mutex_lock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdx
FieldInfo
*
fi
=
taosHashGet
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
);
SIdx
ColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
if
(
fi
==
NULL
)
{
SIdx
FieldInfo
tfi
=
{.
fieldId
=
index
->
fieldId
,
.
type
=
p
->
type
};
SIdx
ColInfo
tfi
=
{.
colId
=
index
->
colId
};
index
->
cVersion
++
;
index
->
field
Id
++
;
taosHashPut
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
,
&
tfi
,
sizeof
(
tfi
));
index
->
col
Id
++
;
taosHashPut
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
,
&
tfi
,
sizeof
(
tfi
));
}
else
{
//TODO, del
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
fVals
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
fVals
,
i
);
SIdx
FieldInfo
*
fi
=
taosHashGet
(
index
->
fieldObj
,
p
->
key
,
p
->
nKey
);
SIdx
ColInfo
*
fi
=
taosHashGet
(
index
->
colObj
,
p
->
colName
,
p
->
nColName
);
assert
(
fi
!=
NULL
);
int32_t
fieldId
=
fi
->
fieldId
;
int32_t
colType
=
fi
->
type
;
int32_t
colId
=
fi
->
colId
;
int32_t
version
=
index
->
cVersion
;
int
ret
=
indexCachePut
(
index
->
cache
,
colId
,
p
->
colType
,
p
->
colVal
,
p
->
nColVal
,
version
,
uid
,
p
->
operType
);
if
(
ret
!=
0
)
{
return
ret
;
}
}
pthread_mutex_unlock
(
&
index
->
mtx
);
return
1
;
#endif
return
0
;
}
int
indexSearch
(
SIndex
*
index
,
SIndexMultiTermQuery
*
multiQuerys
,
SArray
*
result
)
{
#ifdef USE_LUCENE
...
...
@@ -148,16 +159,26 @@ int indexSearch(SIndex *index, SIndexMultiTermQuery *multiQuerys, SArray *result
free
(
fields
);
free
(
keys
);
free
(
types
);
#endif
#ifdef USE_INVERTED_INDEX
#endif
return
1
;
}
int
indexDelete
(
SIndex
*
index
,
SIndexMultiTermQuery
*
query
)
{
#ifdef USE_INVERTED_INDEX
#endif
return
1
;
}
int
indexRebuild
(
SIndex
*
index
,
SIndexOpts
*
opts
);
int
indexRebuild
(
SIndex
*
index
,
SIndexOpts
*
opts
)
{
#ifdef USE_INVERTED_INDEX
#endif
}
SIndexOpts
*
indexOptsCreate
()
{
...
...
@@ -184,53 +205,55 @@ SIndexMultiTermQuery *indexMultiTermQueryCreate(EIndexOperatorType opera) {
void
indexMultiTermQueryDestroy
(
SIndexMultiTermQuery
*
pQuery
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pQuery
->
query
);
i
++
)
{
SIndexTermQuery
*
p
=
(
SIndexTermQuery
*
)
taosArrayGet
(
pQuery
->
query
,
i
);
indexTermDestroy
(
p
->
field_value
);
indexTermDestroy
(
p
->
term
);
}
taosArrayDestroy
(
pQuery
->
query
);
free
(
pQuery
);
};
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
const
char
*
field
,
int32_t
nFields
,
const
char
*
value
,
int32_t
nValue
,
EIndexQueryType
type
){
SIndexTerm
*
t
=
indexTermCreate
(
field
,
nFields
,
value
,
nValue
);
if
(
t
==
NULL
)
{
return
-
1
;}
SIndexTermQuery
q
=
{.
type
=
type
,
.
field_value
=
t
};
int
indexMultiTermQueryAdd
(
SIndexMultiTermQuery
*
pQuery
,
SIndexTerm
*
term
,
EIndexQueryType
qType
){
SIndexTermQuery
q
=
{.
qType
=
qType
,
.
term
=
term
};
taosArrayPush
(
pQuery
->
query
,
&
q
);
return
0
;
}
SIndexTerm
*
indexTermCreate
(
const
char
*
key
,
int32_t
nKey
,
const
char
*
val
,
int32_t
nVal
)
{
SIndexTerm
*
t
=
(
SIndexTerm
*
)
malloc
(
sizeof
(
SIndexTerm
));
t
->
key
=
(
char
*
)
calloc
(
nKey
+
1
,
1
);
memcpy
(
t
->
key
,
key
,
nKey
);
t
->
nKey
=
nKey
;
SIndexTerm
*
indexTermCreate
(
int64_t
suid
,
SIndexOperOnColumn
oper
,
uint8_t
colType
,
const
char
*
colName
,
int32_t
nColName
,
const
char
*
colVal
,
int32_t
nColVal
)
{
SIndexTerm
*
t
=
(
SIndexTerm
*
)
calloc
(
1
,
(
sizeof
(
SIndexTerm
)));
if
(
t
==
NULL
)
{
return
NULL
;
}
t
->
suid
=
suid
;
t
->
operType
=
oper
;
t
->
colType
=
colType
;
t
->
colName
=
(
char
*
)
calloc
(
1
,
nColName
+
1
);
memcpy
(
t
->
colName
,
colName
,
nColName
);
t
->
nColName
=
nColName
;
t
->
val
=
(
char
*
)
calloc
(
nVal
+
1
,
1
);
memcpy
(
t
->
val
,
val
,
n
Val
);
t
->
n
Val
=
n
Val
;
t
->
colVal
=
(
char
*
)
calloc
(
1
,
nColVal
+
1
);
memcpy
(
t
->
colVal
,
colVal
,
nCol
Val
);
t
->
n
ColVal
=
nCol
Val
;
return
t
;
}
void
indexTermDestroy
(
SIndexTerm
*
p
)
{
free
(
p
->
key
);
free
(
p
->
v
al
);
free
(
p
->
colName
);
free
(
p
->
colV
al
);
free
(
p
);
}
S
Array
*
indexMultiTermCreate
()
{
S
IndexMultiTerm
*
indexMultiTermCreate
()
{
return
taosArrayInit
(
4
,
sizeof
(
SIndexTerm
*
));
}
int
indexMultiTermAdd
(
SArray
*
array
,
const
char
*
field
,
int32_t
nField
,
const
char
*
val
,
int32_t
nVal
)
{
SIndexTerm
*
term
=
indexTermCreate
(
field
,
nField
,
val
,
nVal
);
if
(
term
==
NULL
)
{
return
-
1
;
}
taosArrayPush
(
array
,
&
term
);
int
indexMultiTermAdd
(
SIndexMultiTerm
*
terms
,
SIndexTerm
*
term
)
{
taosArrayPush
(
terms
,
&
term
);
return
0
;
}
void
indexMultiTermDestroy
(
S
Array
*
array
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
array
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
array
,
i
);
void
indexMultiTermDestroy
(
S
IndexMultiTerm
*
terms
)
{
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
terms
);
i
++
)
{
SIndexTerm
*
p
=
taosArrayGetP
(
terms
,
i
);
indexTermDestroy
(
p
);
}
taosArrayDestroy
(
array
);
taosArrayDestroy
(
terms
);
}
void
indexInit
()
{
...
...
source/libs/index/src/index_cache.c
浏览文件 @
fa66a89b
...
...
@@ -16,7 +16,7 @@
#include "index_cache.h"
#include "tcompare.h"
#define MAX_INDEX_KEY_LEN
128
// test only, change later
#define MAX_INDEX_KEY_LEN
256
// test only, change later
static
char
*
getIndexKey
(
const
void
*
pData
)
{
return
NULL
;
...
...
@@ -96,16 +96,19 @@ IndexCache *indexCacheCreate() {
}
void
indexCacheDestroy
(
IndexCache
*
cache
)
{
if
(
cache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
cache
->
skiplist
);
free
(
cache
);
void
indexCacheDestroy
(
void
*
cache
)
{
IndexCache
*
pCache
=
cache
;
if
(
pCache
==
NULL
)
{
return
;
}
tSkipListDestroy
(
pCache
->
skiplist
);
free
(
pCache
);
}
int
indexCachePut
(
IndexCache
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
int
indexCachePut
(
void
*
cache
,
int16_t
fieldId
,
int16_t
fieldType
,
const
char
*
fieldValue
,
int32_t
fvLen
,
uint32_t
version
,
uint64_t
uid
,
int8_t
operType
)
{
if
(
cache
==
NULL
)
{
return
-
1
;}
IndexCache
*
pCache
=
cache
;
// encode data
int32_t
total
=
sizeof
(
int32_t
)
+
sizeof
(
fieldId
)
+
sizeof
(
fieldType
)
+
sizeof
(
fvLen
)
+
fvLen
+
sizeof
(
version
)
+
sizeof
(
uid
)
+
sizeof
(
operType
);
...
...
@@ -135,20 +138,15 @@ int indexCachePut(IndexCache *cache, int16_t fieldId, int16_t fieldType, const c
memcpy
(
p
,
&
operType
,
sizeof
(
operType
));
p
+=
sizeof
(
operType
);
tSkipListPut
(
c
ache
->
skiplist
,
(
void
*
)
buf
);
tSkipListPut
(
pC
ache
->
skiplist
,
(
void
*
)
buf
);
// encode end
}
int
indexCacheDel
(
IndexCache
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
int
indexCacheDel
(
void
*
cache
,
int32_t
fieldId
,
const
char
*
fieldValue
,
int32_t
fvlen
,
uint64_t
uid
,
int8_t
operType
)
{
IndexCache
*
pCache
=
cache
;
return
0
;
}
int
indexCacheSearch
(
IndexCache
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
)
{
int
indexCacheSearch
(
void
*
cache
,
SIndexMultiTermQuery
*
query
,
SArray
*
result
)
{
return
0
;
}
source/libs/index/test/CMakeLists.txt
浏览文件 @
fa66a89b
add_executable
(
indexTest
""
)
target_sources
(
indexTest
PRIVATE
"indexTests.c
pp
"
"indexTests.c
c
"
)
target_include_directories
(
indexTest
PUBLIC
...
...
source/libs/index/test/indexTests.c
pp
→
source/libs/index/test/indexTests.c
c
浏览文件 @
fa66a89b
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <gtest/gtest.h>
#include <string>
#include <iostream>
...
...
@@ -61,7 +75,7 @@ class FstReadMemory {
// add later
bool
Search
(
AutomationCtx
*
ctx
,
std
::
vector
<
uint64_t
>
&
result
)
{
FstStreamBuilder
*
sb
=
fstSearch
(
_fst
,
ctx
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithState
*
st
=
streamBuilderIntoStream
(
sb
);
StreamWithStateResult
*
rt
=
NULL
;
while
((
rt
=
streamWithStateNextWith
(
st
,
NULL
))
!=
NULL
)
{
...
...
@@ -279,15 +293,71 @@ void validateFst() {
delete
m
;
}
class
IndexEnv
:
public
::
testing
::
Test
{
protected:
virtual
void
SetUp
()
{
taosRemoveDir
(
path
);
opts
=
indexOptsCreate
();
int
ret
=
indexOpen
(
opts
,
path
,
&
index
);
assert
(
ret
==
0
);
}
virtual
void
TearDown
()
{
indexClose
(
index
);
indexOptsDestroy
(
opts
);
}
const
char
*
path
=
"/tmp/tindex"
;
SIndexOpts
*
opts
;
SIndex
*
index
;
};
TEST_F
(
IndexEnv
,
testPut
)
{
// single index column
{
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello world"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
indexMultiTermAdd
(
terms
,
term
);
for
(
size_t
i
=
0
;
i
<
100
;
i
++
)
{
int
tableId
=
i
;
int
ret
=
indexPut
(
index
,
terms
,
tableId
);
assert
(
ret
==
0
);
}
indexMultiTermDestroy
(
terms
);
}
// multi index column
{
SIndexMultiTerm
*
terms
=
indexMultiTermCreate
();
{
std
::
string
colName
(
"tag1"
),
colVal
(
"Hello world"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
}
{
std
::
string
colName
(
"tag2"
),
colVal
(
"Hello world"
);
SIndexTerm
*
term
=
indexTermCreate
(
0
,
ADD_VALUE
,
TSDB_DATA_TYPE_BINARY
,
colName
.
c_str
(),
colName
.
size
(),
colVal
.
c_str
(),
colVal
.
size
());
indexMultiTermAdd
(
terms
,
term
);
}
for
(
int
i
=
0
;
i
<
100
;
i
++
)
{
int
tableId
=
i
;
int
ret
=
indexPut
(
index
,
terms
,
tableId
);
assert
(
ret
==
0
);
}
indexMultiTermDestroy
(
terms
);
}
//
}
int
main
(
int
argc
,
char
**
argv
)
{
checkFstPerf
();
//checkFstPrefixSearch();
return
1
;
TEST_F
(
IndexEnv
,
testDel
)
{
}
//TEST(IndexFstBuilder, IndexFstInput) {
//
//}
source/util/src/terror.c
浏览文件 @
fa66a89b
...
...
@@ -130,6 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range")
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSC_INVALID_INPUT
,
"Invalid tsc input"
)
// mnode-common
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_APP_ERROR
,
"Mnode internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_NOT_READY
,
"Cluster not ready"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MSG_NOT_PROCESSED
,
"Message not processed"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACTION_IN_PROGRESS
,
"Message is progressing"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录