Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
42c6f019
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
42c6f019
编写于
4月 26, 2022
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact vnode includes
上级
08942d5a
变更
43
隐藏空白更改
内联
并排
Showing
43 changed file
with
172 addition
and
250 deletion
+172
-250
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+0
-2
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+4
-4
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+6
-28
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+2
-10
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+2
-14
source/dnode/vnode/src/inc/tsdbSma.h
source/dnode/vnode/src/inc/tsdbSma.h
+1
-9
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+21
-5
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+55
-12
source/dnode/vnode/src/inc/vnodeSync.h
source/dnode/vnode/src/inc/vnodeSync.h
+0
-44
source/dnode/vnode/src/meta/metaCommit.c
source/dnode/vnode/src/meta/metaCommit.c
+1
-1
source/dnode/vnode/src/meta/metaEntry.c
source/dnode/vnode/src/meta/metaEntry.c
+1
-1
source/dnode/vnode/src/meta/metaIdx.c
source/dnode/vnode/src/meta/metaIdx.c
+1
-1
source/dnode/vnode/src/meta/metaOpen.c
source/dnode/vnode/src/meta/metaOpen.c
+1
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+6
-6
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+23
-23
source/dnode/vnode/src/tq/tqCommit.c
source/dnode/vnode/src/tq/tqCommit.c
+1
-1
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+2
-2
source/dnode/vnode/src/tq/tqOffset.c
source/dnode/vnode/src/tq/tqOffset.c
+1
-1
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbCommit2.c
source/dnode/vnode/src/tsdb/tsdbCommit2.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbCompact.c
source/dnode/vnode/src/tsdb/tsdbCompact.c
+5
-6
source/dnode/vnode/src/tsdb/tsdbFS.c
source/dnode/vnode/src/tsdb/tsdbFS.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbFile.c
source/dnode/vnode/src/tsdb/tsdbFile.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMain.c
source/dnode/vnode/src/tsdb/tsdbMain.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbOptions.c
source/dnode/vnode/src/tsdb/tsdbOptions.c
+0
-34
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+4
-5
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbScan.c
source/dnode/vnode/src/tsdb/tsdbScan.c
+1
-2
source/dnode/vnode/src/tsdb/tsdbSma.c
source/dnode/vnode/src/tsdb/tsdbSma.c
+2
-2
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
+3
-3
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+1
-1
source/dnode/vnode/src/vnd/vnodeBufPool.c
source/dnode/vnode/src/vnd/vnodeBufPool.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCfg.c
source/dnode/vnode/src/vnd/vnodeCfg.c
+1
-1
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+2
-2
source/dnode/vnode/src/vnd/vnodeInt.c
source/dnode/vnode/src/vnd/vnodeInt.c
+1
-1
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+1
-1
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+1
-2
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+3
-3
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+1
-5
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+6
-5
未找到文件。
source/dnode/vnode/CMakeLists.txt
浏览文件 @
42c6f019
...
...
@@ -19,7 +19,6 @@ target_sources(
"src/meta/metaOpen.c"
"src/meta/metaIdx.c"
"src/meta/metaTable.c"
"src/meta/metaTDBImpl.c"
"src/meta/metaQuery.c"
"src/meta/metaCommit.c"
"src/meta/metaEntry.c"
...
...
@@ -33,7 +32,6 @@ target_sources(
"src/tsdb/tsdbFS.c"
"src/tsdb/tsdbMain.c"
"src/tsdb/tsdbMemTable.c"
"src/tsdb/tsdbOptions.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
"src/tsdb/tsdbScan.c"
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
42c6f019
...
...
@@ -74,7 +74,7 @@ typedef struct SMeta SMeta; // todo: remove
typedef
struct
SMetaReader
SMetaReader
;
typedef
struct
SMetaEntry
SMetaEntry
;
void
metaReaderInit
(
SMetaReader
*
pReader
,
S
Vnode
*
pVnode
,
int32_t
flags
);
void
metaReaderInit
(
SMetaReader
*
pReader
,
S
Meta
*
pMeta
,
int32_t
flags
);
void
metaReaderClear
(
SMetaReader
*
pReader
);
int
metaReadNext
(
SMetaReader
*
pReader
);
...
...
@@ -90,8 +90,8 @@ int metaTbCursorNext(SMTbCursor *pTbCur);
#endif
// tsdb
typedef
struct
STsdb
STsdb
;
typedef
void
*
tsdbReaderT
;
typedef
struct
STsdb
STsdb
;
typedef
void
*
tsdbReaderT
;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
...
...
@@ -111,7 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void
tsdbRetrieveDataBlockInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SDataBlockInfo
*
pBlockInfo
);
int32_t
tsdbRetrieveDataBlockStatisInfo
(
tsdbReaderT
*
pTsdbReadHandle
,
SColumnDataAgg
**
pBlockStatis
);
SArray
*
tsdbRetrieveDataBlock
(
tsdbReaderT
*
pTsdbReadHandle
,
SArray
*
pColumnIdList
);
void
tsdbResetReadHandle
(
tsdbReaderT
queryHandle
,
SQueryTableDataCond
*
pCond
);
void
tsdbResetReadHandle
(
tsdbReaderT
queryHandle
,
SQueryTableDataCond
*
pCond
);
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
42c6f019
...
...
@@ -16,13 +16,14 @@
#ifndef _TD_VNODE_META_H_
#define _TD_VNODE_META_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SMetaIdx
SMetaIdx
;
typedef
struct
SMetaDB
SMetaDB
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMSmaCursor
SMSmaCursor
;
// metaDebug ==================
...
...
@@ -36,22 +37,16 @@ typedef struct SMSmaCursor SMSmaCursor;
// clang-format on
// metaOpen ==================
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
);
int
metaClose
(
SMeta
*
pMeta
);
// metaEntry ==================
int
metaEncodeEntry
(
SCoder
*
pCoder
,
const
SMetaEntry
*
pME
);
int
metaDecodeEntry
(
SCoder
*
pCoder
,
SMetaEntry
*
pME
);
// metaTable ==================
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaDropSTable
(
SMeta
*
pMeta
,
int64_t
verison
,
SVDropStbReq
*
pReq
);
int
metaCreateTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateTbReq
*
pReq
);
// metaQuery ==================
int
metaGetTableEntryByVersion
(
SMetaReader
*
pReader
,
int64_t
version
,
tb_uid_t
uid
);
int
metaGetTableEntryByUid
(
SMetaReader
*
pReader
,
tb_uid_t
uid
);
int
metaGetTableEntryByName
(
SMetaReader
*
pReader
,
const
char
*
name
);
// metaIdx ==================
int
metaOpenIdx
(
SMeta
*
pMeta
);
...
...
@@ -60,8 +55,6 @@ int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions);
int
metaRemoveTableFromIdx
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
// metaCommit ==================
int
metaBegin
(
SMeta
*
pMeta
);
static
FORCE_INLINE
tb_uid_t
metaGenerateUid
(
SMeta
*
pMeta
)
{
return
tGenIdPI64
();
}
struct
SMeta
{
...
...
@@ -106,27 +99,12 @@ typedef struct {
}
STtlIdxKey
;
#if 1
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
// int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
#ifndef META_REFACT
// SMetaDB
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
42c6f019
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_
#include "vnodeInt.h"
#include "executor.h"
#include "os.h"
#include "tcache.h"
...
...
@@ -237,17 +239,7 @@ int tqInit();
void
tqCleanUp
();
// open in each vnode
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
);
void
tqClose
(
STQ
*
);
// required by vnode
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessVgChangeReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
int32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
42c6f019
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_VNODE_TSDB_H_
#define _TD_VNODE_TSDB_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -43,7 +45,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKe
TKEY
*
filterKeys
,
int
nFilterKeys
,
bool
keepDup
,
SMergeInfo
*
pMergeInfo
);
// tsdbCommit ================
int
tsdbBegin
(
STsdb
*
pTsdb
);
#if 1
...
...
@@ -60,16 +61,9 @@ struct STable {
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
);
int
tsdbClose
(
STsdb
*
pTsdb
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
...
...
@@ -237,12 +231,6 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
return
TD_ROW_KEY
(
row
);
}
// tsdbOptions
extern
const
STsdbCfg
defautlTsdbOptions
;
int
tsdbValidateOptions
(
const
STsdbCfg
*
);
void
tsdbOptionsCopy
(
STsdbCfg
*
pDest
,
const
STsdbCfg
*
pSrc
);
// tsdbReadImpl
typedef
struct
SReadH
SReadH
;
...
...
source/dnode/vnode/src/inc/tsdbSma.h
浏览文件 @
42c6f019
...
...
@@ -16,9 +16,7 @@
#ifndef _TD_VNODE_TSDB_SMA_H_
#define _TD_VNODE_TSDB_SMA_H_
#include "os.h"
#include "thash.h"
#include "tmsg.h"
#include "tsdb.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -32,12 +30,6 @@ struct STbDdlH {
__tb_ddl_fn_t
fp
;
};
typedef
struct
{
tb_uid_t
suid
;
SArray
*
tbUids
;
SHashObj
*
uidHash
;
}
STbUidStore
;
static
FORCE_INLINE
int32_t
tsdbUidStoreInit
(
STbUidStore
**
pStore
)
{
ASSERT
(
*
pStore
==
NULL
);
*
pStore
=
taosMemoryCalloc
(
1
,
sizeof
(
STbUidStore
));
...
...
source/dnode/vnode/src/inc/vnd.h
浏览文件 @
42c6f019
...
...
@@ -16,6 +16,10 @@
#ifndef _TD_VND_H_
#define _TD_VND_H_
#include "sync.h"
#include "syncTools.h"
#include "vnodeInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -58,11 +62,9 @@ struct SVBufPool {
SVBufPoolNode
node
;
};
int
vnodeOpenBufPool
(
SVnode
*
pVnode
,
int64_t
size
);
int
vnodeCloseBufPool
(
SVnode
*
pVnode
);
void
vnodeBufPoolReset
(
SVBufPool
*
pPool
);
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
int
size
);
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
);
int
vnodeOpenBufPool
(
SVnode
*
pVnode
,
int64_t
size
);
int
vnodeCloseBufPool
(
SVnode
*
pVnode
);
void
vnodeBufPoolReset
(
SVBufPool
*
pPool
);
// vnodeQuery ====================
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
...
...
@@ -79,6 +81,20 @@ int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int
vnodeSyncCommit
(
SVnode
*
pVnode
);
int
vnodeAsyncCommit
(
SVnode
*
pVnode
);
// vnodeCommit ====================
int32_t
vnodeSyncOpen
(
SVnode
*
pVnode
,
char
*
path
);
int32_t
vnodeSyncStart
(
SVnode
*
pVnode
);
void
vnodeSyncClose
(
SVnode
*
pVnode
);
void
vnodeSyncSetQ
(
SVnode
*
pVnode
,
void
*
qHandle
);
void
vnodeSyncSetRpc
(
SVnode
*
pVnode
,
void
*
rpcHandle
);
int32_t
vnodeSyncEqMsg
(
void
*
qHandle
,
SRpcMsg
*
pMsg
);
int32_t
vnodeSendMsg
(
void
*
rpcHandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
void
vnodeSyncCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
vnodeSyncPreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
vnodeSyncRollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
int32_t
vnodeSyncGetSnapshotCb
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
SSyncFSM
*
syncVnodeMakeFsm
();
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
42c6f019
...
...
@@ -59,6 +59,55 @@ typedef struct SQWorkerMgmt SQHandle;
#define VNODE_TQ_DIR "tq"
#define VNODE_WAL_DIR "wal"
// vnd.h
void
*
vnodeBufPoolMalloc
(
SVBufPool
*
pPool
,
int
size
);
void
vnodeBufPoolFree
(
SVBufPool
*
pPool
,
void
*
p
);
// meta
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
STbUidStore
STbUidStore
;
int
metaOpen
(
SVnode
*
pVnode
,
SMeta
**
ppMeta
);
int
metaClose
(
SMeta
*
pMeta
);
int
metaBegin
(
SMeta
*
pMeta
);
int
metaCommit
(
SMeta
*
pMeta
);
int
metaCreateSTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateStbReq
*
pReq
);
int
metaCreateTable
(
SMeta
*
pMeta
,
int64_t
version
,
SVCreateTbReq
*
pReq
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
int
metaGetTableEntryByUid
(
SMetaReader
*
pReader
,
tb_uid_t
uid
);
int
metaGetTableEntryByName
(
SMetaReader
*
pReader
,
const
char
*
name
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
// tsdb
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
);
int
tsdbClose
(
STsdb
*
pTsdb
);
int
tsdbBegin
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
// tq
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
);
void
tqClose
(
STQ
*
);
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessVgChangeReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
dstType
;
...
...
@@ -106,6 +155,12 @@ struct SVnode {
SQHandle
*
pQuery
;
};
struct
STbUidStore
{
tb_uid_t
suid
;
SArray
*
tbUids
;
SHashObj
*
uidHash
;
};
#define TD_VID(PVNODE) (PVNODE)->config.vgId
typedef
struct
STbDdlH
STbDdlH
;
...
...
@@ -113,18 +168,6 @@ typedef struct STbDdlH STbDdlH;
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
#include "vnd.h"
#include "meta.h"
#include "tsdb.h"
#include "tq.h"
#include "vnodeSync.h"
#include "tsdbSma.h"
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/vnodeSync.h
已删除
100644 → 0
浏览文件 @
08942d5a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_SYNC_H_
#define _TD_VNODE_SYNC_H_
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
vnodeSyncOpen
(
SVnode
*
pVnode
,
char
*
path
);
int32_t
vnodeSyncStart
(
SVnode
*
pVnode
);
void
vnodeSyncClose
(
SVnode
*
pVnode
);
void
vnodeSyncSetQ
(
SVnode
*
pVnode
,
void
*
qHandle
);
void
vnodeSyncSetRpc
(
SVnode
*
pVnode
,
void
*
rpcHandle
);
int32_t
vnodeSyncEqMsg
(
void
*
qHandle
,
SRpcMsg
*
pMsg
);
int32_t
vnodeSendMsg
(
void
*
rpcHandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
void
vnodeSyncCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
vnodeSyncPreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
vnodeSyncRollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
int32_t
vnodeSyncGetSnapshotCb
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
SSyncFSM
*
syncVnodeMakeFsm
();
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VNODE_SYNC_H_*/
source/dnode/vnode/src/meta/metaCommit.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
meta
.h"
static
FORCE_INLINE
void
*
metaMalloc
(
void
*
pPool
,
size_t
size
)
{
return
vnodeBufPoolMalloc
((
SVBufPool
*
)
pPool
,
size
);
}
static
FORCE_INLINE
void
metaFree
(
void
*
pPool
,
void
*
p
)
{
vnodeBufPoolFree
((
SVBufPool
*
)
pPool
,
p
);
}
...
...
source/dnode/vnode/src/meta/metaEntry.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
meta
.h"
int
metaEncodeEntry
(
SCoder
*
pCoder
,
const
SMetaEntry
*
pME
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
...
...
source/dnode/vnode/src/meta/metaIdx.c
浏览文件 @
42c6f019
...
...
@@ -16,7 +16,7 @@
#ifdef USE_INVERTED_INDEX
#include "index.h"
#endif
#include "
vnodeInt
.h"
#include "
meta
.h"
struct
SMetaIdx
{
#ifdef USE_INVERTED_INDEX
...
...
source/dnode/vnode/src/meta/metaOpen.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
meta
.h"
static
int
tbDbKeyCmpr
(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
int
kLen2
);
static
int
skmDbKeyCmpr
(
const
void
*
pKey1
,
int
kLen1
,
const
void
*
pKey2
,
int
kLen2
);
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
42c6f019
...
...
@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
meta
.h"
void
metaReaderInit
(
SMetaReader
*
pReader
,
S
Vnode
*
pVnode
,
int32_t
flags
)
{
void
metaReaderInit
(
SMetaReader
*
pReader
,
S
Meta
*
pMeta
,
int32_t
flags
)
{
memset
(
pReader
,
0
,
sizeof
(
*
pReader
));
pReader
->
flags
=
flags
;
pReader
->
pMeta
=
p
Vnode
->
p
Meta
;
pReader
->
pMeta
=
pMeta
;
}
void
metaReaderClear
(
SMetaReader
*
pReader
)
{
...
...
@@ -91,7 +91,7 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
return
NULL
;
}
metaReaderInit
(
&
pTbCur
->
mr
,
pMeta
->
pVnode
,
0
);
metaReaderInit
(
&
pTbCur
->
mr
,
pMeta
,
0
);
tdbDbcOpen
(
pMeta
->
pUidIdx
,
&
pTbCur
->
pDbc
);
...
...
@@ -122,7 +122,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
}
metaGetTableEntryByVersion
(
&
pTbCur
->
mr
,
*
(
int64_t
*
)
pTbCur
->
pVal
,
*
(
tb_uid_t
*
)
pTbCur
->
pKey
);
if
(
pTbCur
->
mr
.
me
.
type
==
META
_SUPER_TABLE
)
{
if
(
pTbCur
->
mr
.
me
.
type
==
TSDB
_SUPER_TABLE
)
{
continue
;
}
...
...
@@ -234,7 +234,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
STSchemaBuilder
sb
=
{
0
};
SSchema
*
pSchema
;
metaReaderInit
(
&
mr
,
pMeta
->
pVnode
,
0
);
metaReaderInit
(
&
mr
,
pMeta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
uid
);
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
meta
.h"
static
int
metaHandleEntry
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
static
int
metaSaveToTbDb
(
SMeta
*
pMeta
,
const
SMetaEntry
*
pME
);
...
...
@@ -37,7 +37,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaReader
mr
=
{
0
};
// validate req
metaReaderInit
(
&
mr
,
pMeta
->
pVnode
,
0
);
metaReaderInit
(
&
mr
,
pMeta
,
0
);
if
(
metaGetTableEntryByName
(
&
mr
,
pReq
->
name
)
==
0
)
{
// TODO: just for pass case
#if 0
...
...
@@ -91,7 +91,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
pReq
->
ctime
=
taosGetTimestampSec
();
// validate req
metaReaderInit
(
&
mr
,
pMeta
->
pVnode
,
0
);
metaReaderInit
(
&
mr
,
pMeta
,
0
);
if
(
metaGetTableEntryByName
(
&
mr
,
pReq
->
name
)
==
0
)
{
terrno
=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
metaReaderClear
(
&
mr
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tq
.h"
int32_t
tqInit
()
{
//
...
...
@@ -176,9 +176,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
atomic_store_ptr
(
&
pExec
->
pushHandle
.
handle
,
NULL
);
taosWUnLockLatch
(
&
pExec
->
pushHandle
.
lock
);
v
Debug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pExec
->
pushHandle
.
consumerId
,
pExec
->
pushHandle
.
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
tq
Debug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pExec
->
pushHandle
.
consumerId
,
pExec
->
pushHandle
.
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
// TODO destroy
taosArrayDestroy
(
rsp
.
blockData
);
...
...
@@ -390,8 +390,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset
=
pReq
->
currentOffset
+
1
;
}
v
Debug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
currentOffset
,
fetchOffset
);
tq
Debug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
currentOffset
,
fetchOffset
);
STqExec
*
pExec
=
taosHashGet
(
pTq
->
execs
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
ASSERT
(
pExec
);
...
...
@@ -418,16 +418,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pExec
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
v
Debug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerEpoch
,
reqEpoch
);
tq
Debug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerEpoch
,
reqEpoch
);
break
;
}
taosThreadMutexLock
(
&
pExec
->
pWalReader
->
mutex
);
if
(
walFetchHead
(
pExec
->
pWalReader
,
fetchOffset
,
pHeadWithCkSum
)
<
0
)
{
v
Debug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
);
tq
Debug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
);
taosThreadMutexUnlock
(
&
pExec
->
pWalReader
->
mutex
);
break
;
}
...
...
@@ -448,7 +448,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
v
Debug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
tq
Debug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
#if 0
...
...
@@ -476,8 +476,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
#endif
v
Debug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHead
->
msgType
);
tq
Debug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHead
->
msgType
);
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
SSubmitReq
*
pCont
=
(
SSubmitReq
*
)
&
pHead
->
body
;
...
...
@@ -591,8 +591,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
v
Debug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerId
,
pReq
->
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
tq
Debug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerId
,
pReq
->
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
// TODO destroy
taosArrayDestroy
(
rsp
.
blockData
);
...
...
@@ -618,7 +618,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset = pReq->currentOffset + 1;
}
v
Debug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
tq
Debug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRspV2 rspV2 = {0};
...
...
@@ -660,7 +660,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
}
v
Debug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
tq
Debug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
TD_VID(pTq->pVnode));
rspV2.reqOffset = pReq->currentOffset;
...
...
@@ -671,7 +671,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) {
v
Debug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
tq
Debug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break;
}
...
...
@@ -680,11 +680,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
v
Debug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
tq
Debug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
break;
}
v
Debug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
tq
Debug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
...
...
@@ -709,7 +709,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if (taosArrayGetSize(pRes) == 0) {
v
Debug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
tq
Debug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++;
rspV2.skipLogNum++;
...
...
@@ -770,7 +770,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf;
pMsg->contLen = msgLen;
pMsg->code = 0;
v
Debug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
tq
Debug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
...
...
@@ -804,7 +804,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
v
Debug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
tq
Debug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/
...
...
source/dnode/vnode/src/tq/tqCommit.c
浏览文件 @
42c6f019
...
...
@@ -13,4 +13,4 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tq
.h"
source/dnode/vnode/src/tq/tqMetaStore.c
浏览文件 @
42c6f019
...
...
@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tq
.h"
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
...
...
@@ -86,7 +86,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F
}
strcpy
(
pMeta
->
dirPath
,
path
);
char
*
name
=
taosMemoryMalloc
(
pathLen
+
10
)
;
char
*
name
=
taosMemoryMalloc
(
pathLen
+
10
)
;
strcpy
(
name
,
path
);
if
(
!
taosDirExist
(
name
)
&&
taosMkDir
(
name
)
!=
0
)
{
...
...
source/dnode/vnode/src/tq/tqOffset.c
浏览文件 @
42c6f019
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "
vnodeInt
.h"
#include "
tq
.h"
enum
ETqOffsetPersist
{
TQ_OFFSET_PERSIST__LAZY
=
1
,
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tq
.h"
STqReadHandle
*
tqInitSubmitMsgScanner
(
SMeta
*
pMeta
)
{
STqReadHandle
*
pReadHandle
=
taosMemoryMalloc
(
sizeof
(
STqReadHandle
));
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
#define TSDB_MAX_SUBBLOCKS 8
...
...
source/dnode/vnode/src/tsdb/tsdbCommit2.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
int
tsdbBegin
(
STsdb
*
pTsdb
)
{
STsdbMemTable
*
pMem
;
...
...
source/dnode/vnode/src/tsdb/tsdbCompact.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "tsdb
int
.h"
#include "tsdb.h"
typedef struct {
STable * pTable;
...
...
@@ -33,15 +33,15 @@ typedef struct {
SDataCols *pDataCols;
} SCompactH;
#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet))
#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh))
#define TSDB_COMPACT_WSET(pComph)
(&((pComph)->wSet))
#define TSDB_COMPACT_REPO(pComph)
TSDB_READ_REPO(&((pComph)->readh))
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
#define TSDB_COMPACT_BUF(pComph)
TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph)
TSDB_READ_COMP_BUF(&((pComph)->readh))
static int tsdbAsyncCompact(STsdbRepo *pRepo);
static void tsdbStartCompact(STsdbRepo *pRepo);
...
...
@@ -531,4 +531,3 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
#endif
source/dnode/vnode/src/tsdb/tsdbFS.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
typedef
enum
{
TSDB_TXN_TEMP_FILE
=
0
,
TSDB_TXN_CURR_FILE
}
TSDB_TXN_FILE_T
;
static
const
char
*
tsdbTxnFname
[]
=
{
"current.t"
,
"current"
};
...
...
source/dnode/vnode/src/tsdb/tsdbFile.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
static
const
char
*
TSDB_FNAME_SUFFIX
[]
=
{
"head"
,
// TSDB_FILE_HEAD
...
...
source/dnode/vnode/src/tsdb/tsdbMain.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
int
tsdbOpen
(
SVnode
*
pVnode
,
STsdb
**
ppTsdb
)
{
STsdb
*
pTsdb
=
NULL
;
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
static
STbData
*
tsdbNewTbData
(
tb_uid_t
uid
);
static
void
tsdbFreeTbData
(
STbData
*
pTbData
);
...
...
source/dnode/vnode/src/tsdb/tsdbOptions.c
已删除
100644 → 0
浏览文件 @
08942d5a
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
const
STsdbCfg
defautlTsdbOptions
=
{.
precision
=
0
,
.
lruCacheSize
=
0
,
.
days
=
10
,
.
minRows
=
100
,
.
maxRows
=
4096
,
.
keep2
=
3650
,
.
keep0
=
3650
,
.
keep1
=
3650
,
.
update
=
0
,
.
compression
=
TWO_STAGE_COMP
};
int
tsdbValidateOptions
(
const
STsdbCfg
*
pTsdbOptions
)
{
// TODO
return
0
;
}
void
tsdbOptionsCopy
(
STsdbCfg
*
pDest
,
const
STsdbCfg
*
pSrc
)
{
memcpy
(
pDest
,
pSrc
,
sizeof
(
STsdbCfg
));
}
\ No newline at end of file
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
...
...
@@ -3618,7 +3618,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
SColIndex
*
pColIndex
,
int32_t
numOfCols
,
uint64_t
reqId
,
uint64_t
taskId
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
(
(
SMeta
*
)
pMeta
)
->
pVnode
,
0
);
metaReaderInit
(
&
mr
,
(
SMeta
*
)
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
uid
)
<
0
)
{
tsdbError
(
"%p failed to get stable, uid:%"
PRIu64
", TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
pMeta
,
uid
,
taskId
,
reqId
);
...
...
@@ -3628,7 +3628,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
tsdbDebug
(
"%p succeed to get stable, uid:%"
PRIu64
", TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
pMeta
,
uid
,
taskId
,
reqId
);
}
if
(
mr
.
me
.
type
!=
META
_SUPER_TABLE
)
{
if
(
mr
.
me
.
type
!=
TSDB
_SUPER_TABLE
)
{
tsdbError
(
"%p query normal tag not allowed, uid:%"
PRIu64
", TID:0x%"
PRIx64
" QID:0x%"
PRIx64
,
pMeta
,
uid
,
taskId
,
reqId
);
terrno
=
TSDB_CODE_OPS_NOT_SUPPORT
;
// basically, this error is caused by invalid sql issued by client
...
...
@@ -3687,10 +3687,9 @@ int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
)
{
SMeta
*
metaP
=
(
SMeta
*
)
pMeta
;
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
metaP
->
pVnode
,
0
);
metaReaderInit
(
&
mr
,
(
SMeta
*
)
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
uid
)
<
0
)
{
terrno
=
TSDB_CODE_TDB_INVALID_TABLE_ID
;
...
...
source/dnode/vnode/src/tsdb/tsdbReadImpl.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
#define TSDB_KEY_COL_OFFSET 0
...
...
source/dnode/vnode/src/tsdb/tsdbScan.c
浏览文件 @
42c6f019
...
...
@@ -13,9 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#if 0
#include "tsdb
int
.h"
#include "tsdb.h"
#ifndef _TSDB_PLUGINS
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
...
...
source/dnode/vnode/src/tsdb/tsdbSma.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
#include "tsdbSma.h"
#include "tsdb.h"
static
const
char
*
TSDB_SMA_DNAME
[]
=
{
""
,
// TSDB_SMA_TYPE_BLOCK
...
...
@@ -678,7 +679,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
return
TSDB_CODE_FAILED
;
}
if
(
tsdbCheckAndInitSmaEnv
(
pTsdb
,
TSDB_SMA_TYPE_TIME_RANGE
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
...
...
source/dnode/vnode/src/tsdb/tsdbTDBImpl.c
浏览文件 @
42c6f019
...
...
@@ -15,14 +15,14 @@
#define ALLOW_FORBID_FUNC
#include "
vnodeInt
.h"
#include "
tsdb
.h"
int32_t
tsdbOpenDBEnv
(
TENV
**
ppEnv
,
const
char
*
path
)
{
int
ret
=
0
;
int
ret
=
0
;
if
(
path
==
NULL
)
return
-
1
;
ret
=
tdbEnvOpen
(
path
,
4096
,
256
,
ppEnv
);
// use as param
ret
=
tdbEnvOpen
(
path
,
4096
,
256
,
ppEnv
);
// use as param
if
(
ret
!=
0
)
{
tsdbError
(
"Failed to create tsdb db env, ret = %d"
,
ret
);
...
...
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
vnodeInt
.h"
#include "
tsdb
.h"
static
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
...
...
source/dnode/vnode/src/vnd/vnodeBufPool.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
odeInt
.h"
#include "vn
d
.h"
/* ------------------------ STRUCTURES ------------------------ */
...
...
source/dnode/vnode/src/vnd/vnodeCfg.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
odeInt
.h"
#include "vn
d
.h"
const
SVnodeCfg
vnodeCfgDefault
=
{
.
vgId
=
-
1
,
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
odeInt
.h"
#include "vn
d
.h"
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
...
...
@@ -175,7 +175,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit
(
pVnode
);
// vnodeBufPoolSwitch(pVnode);
tsdbPrepareCommit
(
pVnode
->
pTsdb
);
//
tsdbPrepareCommit(pVnode->pTsdb);
vnodeScheduleTask
(
vnodeCommitImpl
,
pVnode
);
...
...
source/dnode/vnode/src/vnd/vnodeInt.c
浏览文件 @
42c6f019
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "vn
odeInt
.h"
#include "vn
d
.h"
// #include "vnodeInt.h"
int32_t
vnodeAlter
(
SVnode
*
pVnode
,
const
SVnodeCfg
*
pCfg
)
{
return
0
;
}
...
...
source/dnode/vnode/src/vnd/vnodeModule.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
odeInt
.h"
#include "vn
d
.h"
typedef
struct
SVnodeTask
SVnodeTask
;
struct
SVnodeTask
{
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
42c6f019
...
...
@@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
#include "vnodeSync.h"
#include "vnd.h"
int
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
...
...
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
42c6f019
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vn
odeInt
.h"
#include "vn
d
.h"
int
vnodeQueryOpen
(
SVnode
*
pVnode
)
{
return
qWorkerInit
(
NODE_TYPE_VNODE
,
TD_VID
(
pVnode
),
NULL
,
(
void
**
)
&
pVnode
->
pQuery
,
&
pVnode
->
msgCb
);
...
...
@@ -51,7 +51,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
}
// query meta
metaReaderInit
(
&
mer1
,
pVnode
,
0
);
metaReaderInit
(
&
mer1
,
pVnode
->
pMeta
,
0
);
if
(
metaGetTableEntryByName
(
&
mer1
,
infoReq
.
tbName
)
<
0
)
{
goto
_exit
;
...
...
@@ -67,7 +67,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
schemaTag
=
mer1
.
me
.
stbEntry
.
schemaTag
;
metaRsp
.
suid
=
mer1
.
me
.
uid
;
}
else
if
(
mer1
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
metaReaderInit
(
&
mer2
,
pVnode
,
0
);
metaReaderInit
(
&
mer2
,
pVnode
->
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mer2
,
mer1
.
me
.
ctbEntry
.
suid
)
<
0
)
goto
_exit
;
strcpy
(
metaRsp
.
stbName
,
mer2
.
me
.
name
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
42c6f019
...
...
@@ -13,9 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync.h"
#include "syncTools.h"
#include "vnodeInt.h"
#include "vnd.h"
static
int
vnodeProcessCreateStbReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessAlterStbReq
(
SVnode
*
pVnode
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
...
...
@@ -201,9 +199,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// sync integration
int
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
if
(
syncEnvIsStart
())
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pVnode
->
sync
);
assert
(
pSyncNode
!=
NULL
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
42c6f019
...
...
@@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync.h"
#include "syncTools.h"
#include "tmsgcb.h"
#include "vnodeInt.h"
#include "vnd.h"
// #include "sync.h"
// #include "syncTools.h"
// #include "tmsgcb.h"
// #include "vnodeInt.h"
// sync integration
...
...
@@ -113,7 +114,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
beginIndex
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
SVnode
*
pVnode
=
(
SVnode
*
)(
pFsm
->
data
);
SVnode
*
pVnode
=
(
SVnode
*
)(
pFsm
->
data
);
SyncApplyMsg
*
pSyncApplyMsg
=
syncApplyMsgBuild2
(
pMsg
,
pVnode
->
config
.
vgId
,
&
cbMeta
);
SRpcMsg
applyMsg
;
syncApplyMsg2RpcMsg
(
pSyncApplyMsg
,
&
applyMsg
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录