Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a9a24a14
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a9a24a14
编写于
4月 14, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into feature/node
上级
aa736030
a654ece8
变更
23
显示空白变更内容
内联
并排
Showing
23 changed file
with
578 addition
and
628 deletion
+578
-628
.devcontainer/Dockerfile
.devcontainer/Dockerfile
+1
-1
.devcontainer/devcontainer.json
.devcontainer/devcontainer.json
+1
-1
packaging/install.sh
packaging/install.sh
+7
-7
source/client/src/clientMain.c
source/client/src/clientMain.c
+17
-22
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+1
-1
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+2
-6
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+6
-80
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+41
-18
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+59
-42
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+139
-117
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+92
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+7
-131
source/dnode/vnode/src/meta/metaCache.c
source/dnode/vnode/src/meta/metaCache.c
+0
-40
source/dnode/vnode/src/meta/metaCfg.c
source/dnode/vnode/src/meta/metaCfg.c
+0
-29
source/dnode/vnode/src/meta/metaMain.c
source/dnode/vnode/src/meta/metaMain.c
+0
-22
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+0
-16
source/dnode/vnode/src/meta/metaTbCfg.c
source/dnode/vnode/src/meta/metaTbCfg.c
+0
-48
source/dnode/vnode/src/meta/metaTbTag.c
source/dnode/vnode/src/meta/metaTbTag.c
+0
-16
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+2
-8
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+158
-0
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+22
-5
tests/script/tsim/query/charScalarFunction.sim
tests/script/tsim/query/charScalarFunction.sim
+15
-14
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+8
-4
未找到文件。
.devcontainer/Dockerfile
浏览文件 @
a9a24a14
...
@@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
...
@@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
# [Optional] Uncomment this section to install additional packages.
# [Optional] Uncomment this section to install additional packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here>
# && apt-get -y install --no-install-recommends <your-package-list-here>
RUN
apt-get update
&&
apt-get
-y
install
tree vim
RUN
apt-get update
&&
apt-get
-y
install
tree vim
tmux
.devcontainer/devcontainer.json
浏览文件 @
a9a24a14
...
@@ -32,7 +32,7 @@
...
@@ -32,7 +32,7 @@
//
Use
'forwardPorts'
to
make
a
list
of
ports
inside
the
container
available
locally.
//
Use
'forwardPorts'
to
make
a
list
of
ports
inside
the
container
available
locally.
//
"forwardPorts"
:
[],
//
"forwardPorts"
:
[],
//
Use
'postCreateCommand'
to
run
commands
after
the
container
is
created.
//
Use
'postCreateCommand'
to
run
commands
after
the
container
is
created.
//
"postCreateCommand"
:
"gcc -v
"
,
"postCreateCommand"
:
"wget https://raw.githubusercontent.com/hzcheng/config/master/.tmux.conf -P /root
"
,
//
Comment
out
connect
as
root
instead.
More
info:
https://aka.ms/vscode-remote/containers/non-root.
//
Comment
out
connect
as
root
instead.
More
info:
https://aka.ms/vscode-remote/containers/non-root.
"remoteUser"
:
"root"
"remoteUser"
:
"root"
}
}
\ No newline at end of file
packaging/install.sh
浏览文件 @
a9a24a14
...
@@ -460,14 +460,14 @@ function install_service_on_systemd() {
...
@@ -460,14 +460,14 @@ function install_service_on_systemd() {
}
}
function
install_service
()
{
function
install_service
()
{
if
((
${
service_mod
}
==
0
))
;
then
#
if ((${service_mod}==0)); then
install_service_on_systemd
#
install_service_on_systemd
elif
((
${
service_mod
}
==
1
))
;
then
#
elif ((${service_mod}==1)); then
install_service_on_sysvinit
#
install_service_on_sysvinit
else
#
else
# must manual stop taosd
#
# must manual stop taosd
kill_process taosd
kill_process taosd
fi
#
fi
}
}
function
install_TDengine
()
{
function
install_TDengine
()
{
...
...
source/client/src/clientMain.c
浏览文件 @
a9a24a14
...
@@ -463,7 +463,16 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
...
@@ -463,7 +463,16 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if
(
res
==
NULL
)
{
if
(
res
==
NULL
)
{
return
0
;
return
0
;
}
}
if
(
TD_RES_QUERY
(
res
))
{
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
);
if
(
pResultInfo
==
NULL
)
return
-
1
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
...
@@ -480,20 +489,6 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
...
@@ -480,20 +489,6 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
return
0
;
return
0
;
}
else
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
);
if
(
pResultInfo
==
NULL
)
return
-
1
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
(
*
pData
)
=
(
void
*
)
pResultInfo
->
pData
;
return
0
;
}
else
{
ASSERT
(
0
);
return
-
1
;
}
}
}
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
)
{
int
*
taos_get_column_data_offset
(
TAOS_RES
*
res
,
int
columnIndex
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
a9a24a14
...
@@ -299,7 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
...
@@ -299,7 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
goto
_OVER
;
goto
_OVER
;
}
}
if
(
vnodeInit
()
!=
0
)
{
if
(
vnodeInit
(
tsNumOfCommitThreads
)
!=
0
)
{
dError
(
"failed to init vnode since %s"
,
terrstr
());
dError
(
"failed to init vnode since %s"
,
terrstr
());
goto
_OVER
;
goto
_OVER
;
}
}
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
a9a24a14
...
@@ -10,21 +10,17 @@ target_sources(
...
@@ -10,21 +10,17 @@ target_sources(
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeInt.c"
"src/vnd/vnodeInt.c"
"src/vnd/vnodeMain.c"
"src/vnd/vnodeMain.c"
"src/vnd/vnodeMgr.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeWrite.c"
"src/vnd/vnodeWrite.c"
"src/vnd/vnodeModule.c"
# "src/vnd/vnodeMgr.c"
# meta
# meta
# "src/meta/metaBDBImpl.c"
# "src/meta/metaBDBImpl.c"
"src/meta/metaCache.c"
"src/meta/metaCfg.c"
"src/meta/metaIdx.c"
"src/meta/metaIdx.c"
"src/meta/metaMain.c"
"src/meta/metaMain.c"
"src/meta/metaQuery.c"
"src/meta/metaTable.c"
"src/meta/metaTable.c"
"src/meta/metaTbCfg.c"
"src/meta/metaTbTag.c"
"src/meta/metaTbUid.c"
"src/meta/metaTbUid.c"
"src/meta/metaTDBImpl.c"
"src/meta/metaTDBImpl.c"
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
a9a24a14
...
@@ -61,33 +61,14 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
...
@@ -61,33 +61,14 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
// meta
// meta
typedef
struct
SMeta
SMeta
;
// todo: remove
typedef
struct
SMeta
SMeta
;
// todo: remove
typedef
struct
SMTbCursor
SMTbCursor
;
// todo: remove
typedef
struct
SMTbCursor
SMTbCursor
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
// todo: remove
typedef
struct
SMSmaCursor
SMSmaCursor
;
// todo: remove
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
typedef
SVCreateTbReq
STbCfg
;
typedef
SVCreateTbReq
STbCfg
;
typedef
SVCreateTSmaReq
SSmaCfg
;
typedef
SVCreateTSmaReq
SSmaCfg
;
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
);
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
// tsdb
// tsdb
typedef
struct
STsdb
STsdb
;
typedef
struct
STsdb
STsdb
;
...
@@ -98,18 +79,7 @@ typedef void *tsdbReaderT;
...
@@ -98,18 +79,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
,
STfs
*
pTfs
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
);
uint64_t
taskId
);
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
...
@@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn
...
@@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
// tq
// tq
enum
{
TQ_STREAM_TOKEN__DATA
=
1
,
TQ_STREAM_TOKEN__WATERMARK
,
TQ_STREAM_TOKEN__CHECKPOINT
,
};
typedef
struct
STqReadHandle
STqReadHandle
;
typedef
struct
STqReadHandle
STqReadHandle
;
...
@@ -153,9 +113,6 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn
...
@@ -153,9 +113,6 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
);
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
);
// need to reposition
// need to reposition
typedef
struct
SMgmtWrapper
SMgmtWrapper
;
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
// structs
// structs
struct
SMetaCfg
{
struct
SMetaCfg
{
...
@@ -202,21 +159,6 @@ struct SVnodeCfg {
...
@@ -202,21 +159,6 @@ struct SVnodeCfg {
int8_t
hashMethod
;
int8_t
hashMethod
;
};
};
struct
STqReadHandle
{
int64_t
ver
;
int64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
// SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
};
struct
SDataStatis
{
struct
SDataStatis
{
int16_t
colId
;
int16_t
colId
;
int16_t
maxIndex
;
int16_t
maxIndex
;
...
@@ -241,22 +183,6 @@ typedef struct {
...
@@ -241,22 +183,6 @@ typedef struct {
uint64_t
uid
;
uint64_t
uid
;
}
STableKeyInfo
;
}
STableKeyInfo
;
typedef
struct
STable
{
uint64_t
tid
;
uint64_t
uid
;
STSchema
*
pSchema
;
}
STable
;
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
7
];
union
{
void
*
data
;
int64_t
wmTs
;
int64_t
checkpointId
;
};
}
STqStreamToken
;
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
a9a24a14
...
@@ -23,6 +23,22 @@ extern "C" {
...
@@ -23,6 +23,22 @@ extern "C" {
typedef
struct
SMetaCache
SMetaCache
;
typedef
struct
SMetaCache
SMetaCache
;
typedef
struct
SMetaIdx
SMetaIdx
;
typedef
struct
SMetaIdx
SMetaIdx
;
typedef
struct
SMetaDB
SMetaDB
;
typedef
struct
SMetaDB
SMetaDB
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMSmaCursor
SMSmaCursor
;
// metaDebug ==================
// clang-format off
#define metaFatal(...) do { if (metaDebugFlag & DEBUG_FATAL) { taosPrintLog("META FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define metaError(...) do { if (metaDebugFlag & DEBUG_ERROR) { taosPrintLog("META ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define metaWarn(...) do { if (metaDebugFlag & DEBUG_WARN) { taosPrintLog("META WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define metaInfo(...) do { if (metaDebugFlag & DEBUG_INFO) { taosPrintLog("META ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define metaDebug(...) do { if (metaDebugFlag & DEBUG_DEBUG) { taosPrintLog("META ", DEBUG_DEBUG, metaDebugFlag, __VA_ARGS__); }} while(0)
#define metaTrace(...) do { if (metaDebugFlag & DEBUG_TRACE) { taosPrintLog("META ", DEBUG_TRACE, metaDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaClose
(
SMeta
*
pMeta
);
...
@@ -34,6 +50,18 @@ int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
...
@@ -34,6 +50,18 @@ int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
// SMetaDB
// SMetaDB
int
metaOpenDB
(
SMeta
*
pMeta
);
int
metaOpenDB
(
SMeta
*
pMeta
);
...
@@ -47,11 +75,6 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
...
@@ -47,11 +75,6 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
int
metaOpenCache
(
SMeta
*
pMeta
);
int
metaOpenCache
(
SMeta
*
pMeta
);
void
metaCloseCache
(
SMeta
*
pMeta
);
void
metaCloseCache
(
SMeta
*
pMeta
);
// SMetaCfg
extern
const
SMetaCfg
defaultMetaOptions
;
// int metaValidateOptions(const SMetaCfg*);
void
metaOptionsCopy
(
SMetaCfg
*
pDest
,
const
SMetaCfg
*
pSrc
);
// SMetaIdx
// SMetaIdx
int
metaOpenIdx
(
SMeta
*
pMeta
);
int
metaOpenIdx
(
SMeta
*
pMeta
);
void
metaCloseIdx
(
SMeta
*
pMeta
);
void
metaCloseIdx
(
SMeta
*
pMeta
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
a9a24a14
...
@@ -20,48 +20,21 @@
...
@@ -20,48 +20,21 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
// tqInt.h
// tqDebug ===================
#define tqFatal(...) \
// clang-format off
{ \
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (tqDebugFlag & DEBUG_FATAL) { \
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \
#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
}
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqError(...) \
// clang-format on
{ \
if (tqDebugFlag & DEBUG_ERROR) { \
enum
{
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
TQ_STREAM_TOKEN__DATA
=
1
,
} \
TQ_STREAM_TOKEN__WATERMARK
,
}
TQ_STREAM_TOKEN__CHECKPOINT
,
};
#define tqWarn(...) \
{ \
if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define tqInfo(...) \
{ \
if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define tqDebug(...) \
{ \
if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
} \
}
#define tqTrace(...) \
{ \
if (tqDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \
} \
}
#define TQ_BUFFER_SIZE 4
#define TQ_BUFFER_SIZE 4
...
@@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
...
@@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
struct
STqReadHandle
{
int64_t
ver
;
int64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
// SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
};
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
7
];
union
{
void
*
data
;
int64_t
wmTs
;
int64_t
checkpointId
;
};
}
STqStreamToken
;
typedef
struct
{
typedef
struct
{
int16_t
ver
;
int16_t
ver
;
int16_t
action
;
int16_t
action
;
...
@@ -248,6 +246,25 @@ typedef struct {
...
@@ -248,6 +246,25 @@ typedef struct {
static
STqPushMgmt
tqPushMgmt
;
static
STqPushMgmt
tqPushMgmt
;
// init once
int
tqInit
();
void
tqCleanUp
();
// open in each vnode
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
,
SMeta
*
pMeta
,
STqCfg
*
tqConfig
,
SMemAllocatorFactory
*
allocFac
);
void
tqClose
(
STQ
*
);
// required by vnode
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
version
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
int32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
a9a24a14
...
@@ -20,10 +20,46 @@
...
@@ -20,10 +20,46 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
// tsdbDebug ================
// clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef
struct
SSmaStat
SSmaStat
;
typedef
struct
SSmaStat
SSmaStat
;
typedef
struct
SSmaEnv
SSmaEnv
;
typedef
struct
SSmaEnv
SSmaEnv
;
typedef
struct
SSmaEnvs
SSmaEnvs
;
typedef
struct
SSmaEnvs
SSmaEnvs
;
typedef
struct
STable
{
uint64_t
tid
;
uint64_t
uid
;
STSchema
*
pSchema
;
}
STable
;
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
,
STfs
*
pTfs
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
typedef
enum
{
typedef
enum
{
TSDB_FILE_HEAD
=
0
,
// .head
TSDB_FILE_HEAD
=
0
,
// .head
TSDB_FILE_DATA
,
// .data
TSDB_FILE_DATA
,
// .data
...
@@ -93,7 +129,7 @@ typedef struct STsdbMemTable {
...
@@ -93,7 +129,7 @@ typedef struct STsdbMemTable {
SMemAllocator
*
pMA
;
SMemAllocator
*
pMA
;
// Container
// Container
SSkipList
*
pSlIdx
;
// SSkiplist<STbData>
SSkipList
*
pSlIdx
;
// SSkiplist<STbData>
SHashObj
*
pHashIdx
;
SHashObj
*
pHashIdx
;
}
STsdbMemTable
;
}
STsdbMemTable
;
typedef
struct
{
typedef
struct
{
...
@@ -105,16 +141,16 @@ typedef struct {
...
@@ -105,16 +141,16 @@ typedef struct {
// ==================
// ==================
typedef
struct
{
typedef
struct
{
STsdbFSMeta
meta
;
// FS meta
STsdbFSMeta
meta
;
// FS meta
SArray
*
df
;
// data file array
SArray
*
df
;
// data file array
SArray
*
sf
;
// sma data file array v2f1900.index_name_1
SArray
*
sf
;
// sma data file array v2f1900.index_name_1
}
SFSStatus
;
}
SFSStatus
;
typedef
struct
{
typedef
struct
{
TdThreadRwlock
lock
;
TdThreadRwlock
lock
;
SFSStatus
*
cstatus
;
// current status
SFSStatus
*
cstatus
;
// current status
SHashObj
*
metaCache
;
// meta cache
SHashObj
*
metaCache
;
// meta cache
SHashObj
*
metaCacheComp
;
// meta cache for compact
SHashObj
*
metaCacheComp
;
// meta cache for compact
bool
intxn
;
bool
intxn
;
SFSStatus
*
nstatus
;
// new status
SFSStatus
*
nstatus
;
// new status
}
STsdbFS
;
}
STsdbFS
;
...
@@ -123,15 +159,15 @@ struct STsdb {
...
@@ -123,15 +159,15 @@ struct STsdb {
int32_t
vgId
;
int32_t
vgId
;
bool
repoLocked
;
bool
repoLocked
;
TdThreadMutex
mutex
;
TdThreadMutex
mutex
;
char
*
path
;
char
*
path
;
STsdbCfg
config
;
STsdbCfg
config
;
STsdbMemTable
*
mem
;
STsdbMemTable
*
mem
;
STsdbMemTable
*
imem
;
STsdbMemTable
*
imem
;
SRtn
rtn
;
SRtn
rtn
;
SMemAllocatorFactory
*
pmaf
;
SMemAllocatorFactory
*
pmaf
;
STsdbFS
*
fs
;
STsdbFS
*
fs
;
SMeta
*
pMeta
;
SMeta
*
pMeta
;
STfs
*
pTfs
;
STfs
*
pTfs
;
SSmaEnvs
smaEnvs
;
SSmaEnvs
smaEnvs
;
};
};
...
@@ -153,16 +189,6 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock,
...
@@ -153,16 +189,6 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock,
return
pTable
->
pSchema
;
return
pTable
->
pSchema
;
}
}
// tsdbLog
extern
int32_t
tsdbDebugFlag
;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// tsdbMemTable.h
// tsdbMemTable.h
typedef
struct
{
typedef
struct
{
int
rowsInserted
;
int
rowsInserted
;
...
@@ -174,10 +200,10 @@ typedef struct {
...
@@ -174,10 +200,10 @@ typedef struct {
TSKEY
keyLast
;
TSKEY
keyLast
;
}
SMergeInfo
;
}
SMergeInfo
;
static
void
*
taosTMalloc
(
size_t
size
);
static
void
*
taosTMalloc
(
size_t
size
);
static
void
*
taosTCalloc
(
size_t
nmemb
,
size_t
size
);
static
void
*
taosTCalloc
(
size_t
nmemb
,
size_t
size
);
static
void
*
taosTRealloc
(
void
*
ptr
,
size_t
size
);
static
void
*
taosTRealloc
(
void
*
ptr
,
size_t
size
);
static
void
*
taosTZfree
(
void
*
ptr
);
static
void
*
taosTZfree
(
void
*
ptr
);
static
size_t
taosTSizeof
(
void
*
ptr
);
static
size_t
taosTSizeof
(
void
*
ptr
);
static
void
taosTMemset
(
void
*
ptr
,
int
c
);
static
void
taosTMemset
(
void
*
ptr
,
int
c
);
...
@@ -406,8 +432,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
...
@@ -406,8 +432,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
);
int
tsdbSetReadTable
(
SReadH
*
pReadh
,
STable
*
pTable
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
);
int
tsdbLoadBlockInfo
(
SReadH
*
pReadh
,
void
*
pTarget
);
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlockInfo
);
int
tsdbLoadBlockData
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlockInfo
);
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
col
Ids
,
int
tsdbLoadBlockDataCols
(
SReadH
*
pReadh
,
SBlock
*
pBlock
,
SBlockInfo
*
pBlkInfo
,
const
int16_t
*
colIds
,
int
numOfCols
Ids
,
int
numOfColsIds
,
bool
mergeBitmap
);
bool
mergeBitmap
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbLoadBlockStatis
(
SReadH
*
pReadh
,
SBlock
*
pBlock
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
int
tsdbEncodeSBlockIdx
(
void
**
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
void
*
tsdbDecodeSBlockIdx
(
void
*
buf
,
SBlockIdx
*
pIdx
);
...
@@ -448,7 +474,7 @@ static FORCE_INLINE void *taosTMalloc(size_t size) {
...
@@ -448,7 +474,7 @@ static FORCE_INLINE void *taosTMalloc(size_t size) {
static
FORCE_INLINE
void
*
taosTCalloc
(
size_t
nmemb
,
size_t
size
)
{
static
FORCE_INLINE
void
*
taosTCalloc
(
size_t
nmemb
,
size_t
size
)
{
size_t
tsize
=
nmemb
*
size
;
size_t
tsize
=
nmemb
*
size
;
void
*
ret
=
taosTMalloc
(
tsize
);
void
*
ret
=
taosTMalloc
(
tsize
);
if
(
ret
==
NULL
)
return
NULL
;
if
(
ret
==
NULL
)
return
NULL
;
taosTMemset
(
ret
,
0
);
taosTMemset
(
ret
,
0
);
...
@@ -459,14 +485,14 @@ static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)(
...
@@ -459,14 +485,14 @@ static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)(
static
FORCE_INLINE
void
taosTMemset
(
void
*
ptr
,
int
c
)
{
memset
(
ptr
,
c
,
taosTSizeof
(
ptr
));
}
static
FORCE_INLINE
void
taosTMemset
(
void
*
ptr
,
int
c
)
{
memset
(
ptr
,
c
,
taosTSizeof
(
ptr
));
}
static
FORCE_INLINE
void
*
taosTRealloc
(
void
*
ptr
,
size_t
size
)
{
static
FORCE_INLINE
void
*
taosTRealloc
(
void
*
ptr
,
size_t
size
)
{
if
(
ptr
==
NULL
)
return
taosTMalloc
(
size
);
if
(
ptr
==
NULL
)
return
taosTMalloc
(
size
);
if
(
size
<=
taosTSizeof
(
ptr
))
return
ptr
;
if
(
size
<=
taosTSizeof
(
ptr
))
return
ptr
;
void
*
tptr
=
(
void
*
)((
char
*
)
ptr
-
sizeof
(
size_t
));
void
*
tptr
=
(
void
*
)((
char
*
)
ptr
-
sizeof
(
size_t
));
size_t
tsize
=
size
+
sizeof
(
size_t
);
size_t
tsize
=
size
+
sizeof
(
size_t
);
void
*
tptr1
=
taosMemoryRealloc
(
tptr
,
tsize
);
void
*
tptr1
=
taosMemoryRealloc
(
tptr
,
tsize
);
if
(
tptr1
==
NULL
)
return
NULL
;
if
(
tptr1
==
NULL
)
return
NULL
;
tptr
=
tptr1
;
tptr
=
tptr1
;
...
@@ -475,9 +501,9 @@ static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
...
@@ -475,9 +501,9 @@ static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
return
(
void
*
)((
char
*
)
tptr
+
sizeof
(
size_t
));
return
(
void
*
)((
char
*
)
tptr
+
sizeof
(
size_t
));
}
}
static
FORCE_INLINE
void
*
taosTZfree
(
void
*
ptr
)
{
static
FORCE_INLINE
void
*
taosTZfree
(
void
*
ptr
)
{
if
(
ptr
)
{
if
(
ptr
)
{
taosMemoryFree
((
void
*
)((
char
*
)
ptr
-
sizeof
(
size_t
)));
taosMemoryFree
((
void
*
)((
char
*
)
ptr
-
sizeof
(
size_t
)));
}
}
return
NULL
;
return
NULL
;
}
}
...
@@ -576,19 +602,18 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
...
@@ -576,19 +602,18 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
}
}
}
}
void
tsdbInitDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
SDiskID
did
,
int
fid
,
uint32_t
ver
,
TSDB_FILE_T
ftype
);
void
tsdbInitDFileEx
(
SDFile
*
pDFile
,
SDFile
*
pODFile
);
int
tsdbEncodeSDFile
(
void
**
buf
,
SDFile
*
pDFile
);
void
*
tsdbDecodeSDFile
(
STsdb
*
pRepo
,
void
*
buf
,
SDFile
*
pDFile
);
int
tsdbCreateDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
bool
updateHeader
,
TSDB_FILE_T
fType
);
int
tsdbUpdateDFileHeader
(
SDFile
*
pDFile
);
int
tsdbLoadDFileHeader
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
);
int
tsdbParseDFilename
(
const
char
*
fname
,
int
*
vid
,
int
*
fid
,
TSDB_FILE_T
*
ftype
,
uint32_t
*
version
);
void
tsdbInitDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
SDiskID
did
,
int
fid
,
uint32_t
ver
,
TSDB_FILE_T
ftype
);
static
FORCE_INLINE
void
tsdbSetDFileInfo
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
)
{
pDFile
->
info
=
*
pInfo
;
}
void
tsdbInitDFileEx
(
SDFile
*
pDFile
,
SDFile
*
pODFile
);
int
tsdbEncodeSDFile
(
void
**
buf
,
SDFile
*
pDFile
);
void
*
tsdbDecodeSDFile
(
STsdb
*
pRepo
,
void
*
buf
,
SDFile
*
pDFile
);
int
tsdbCreateDFile
(
STsdb
*
pRepo
,
SDFile
*
pDFile
,
bool
updateHeader
,
TSDB_FILE_T
fType
);
int
tsdbUpdateDFileHeader
(
SDFile
*
pDFile
);
int
tsdbLoadDFileHeader
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
);
int
tsdbParseDFilename
(
const
char
*
fname
,
int
*
vid
,
int
*
fid
,
TSDB_FILE_T
*
ftype
,
uint32_t
*
version
);
static
FORCE_INLINE
void
tsdbSetDFileInfo
(
SDFile
*
pDFile
,
SDFInfo
*
pInfo
)
{
pDFile
->
info
=
*
pInfo
;
}
static
FORCE_INLINE
int
tsdbOpenDFile
(
SDFile
*
pDFile
,
int
flags
)
{
static
FORCE_INLINE
int
tsdbOpenDFile
(
SDFile
*
pDFile
,
int
flags
)
{
ASSERT
(
!
TSDB_FILE_OPENED
(
pDFile
));
ASSERT
(
!
TSDB_FILE_OPENED
(
pDFile
));
pDFile
->
pFile
=
taosOpenFile
(
TSDB_FILE_FULL_NAME
(
pDFile
),
flags
);
pDFile
->
pFile
=
taosOpenFile
(
TSDB_FILE_FULL_NAME
(
pDFile
),
flags
);
...
@@ -600,14 +625,14 @@ static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
...
@@ -600,14 +625,14 @@ static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
return
0
;
return
0
;
}
}
static
FORCE_INLINE
void
tsdbCloseDFile
(
SDFile
*
pDFile
)
{
static
FORCE_INLINE
void
tsdbCloseDFile
(
SDFile
*
pDFile
)
{
if
(
TSDB_FILE_OPENED
(
pDFile
))
{
if
(
TSDB_FILE_OPENED
(
pDFile
))
{
taosCloseFile
(
&
pDFile
->
pFile
);
taosCloseFile
(
&
pDFile
->
pFile
);
TSDB_FILE_SET_CLOSED
(
pDFile
);
TSDB_FILE_SET_CLOSED
(
pDFile
);
}
}
}
}
static
FORCE_INLINE
int64_t
tsdbSeekDFile
(
SDFile
*
pDFile
,
int64_t
offset
,
int
whence
)
{
static
FORCE_INLINE
int64_t
tsdbSeekDFile
(
SDFile
*
pDFile
,
int64_t
offset
,
int
whence
)
{
// ASSERT(TSDB_FILE_OPENED(pDFile));
// ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t
loffset
=
taosLSeekFile
(
TSDB_FILE_PFILE
(
pDFile
),
offset
,
whence
);
int64_t
loffset
=
taosLSeekFile
(
TSDB_FILE_PFILE
(
pDFile
),
offset
,
whence
);
...
@@ -619,7 +644,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh
...
@@ -619,7 +644,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh
return
loffset
;
return
loffset
;
}
}
static
FORCE_INLINE
int64_t
tsdbWriteDFile
(
SDFile
*
pDFile
,
void
*
buf
,
int64_t
nbyte
)
{
static
FORCE_INLINE
int64_t
tsdbWriteDFile
(
SDFile
*
pDFile
,
void
*
buf
,
int64_t
nbyte
)
{
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
int64_t
nwrite
=
taosWriteFile
(
pDFile
->
pFile
,
buf
,
nbyte
);
int64_t
nwrite
=
taosWriteFile
(
pDFile
->
pFile
,
buf
,
nbyte
);
...
@@ -631,11 +656,11 @@ static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nb
...
@@ -631,11 +656,11 @@ static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nb
return
nwrite
;
return
nwrite
;
}
}
static
FORCE_INLINE
void
tsdbUpdateDFileMagic
(
SDFile
*
pDFile
,
void
*
pCksm
)
{
static
FORCE_INLINE
void
tsdbUpdateDFileMagic
(
SDFile
*
pDFile
,
void
*
pCksm
)
{
pDFile
->
info
.
magic
=
taosCalcChecksum
(
pDFile
->
info
.
magic
,
(
uint8_t
*
)(
pCksm
),
sizeof
(
TSCKSUM
));
pDFile
->
info
.
magic
=
taosCalcChecksum
(
pDFile
->
info
.
magic
,
(
uint8_t
*
)(
pCksm
),
sizeof
(
TSCKSUM
));
}
}
static
FORCE_INLINE
int
tsdbAppendDFile
(
SDFile
*
pDFile
,
void
*
buf
,
int64_t
nbyte
,
int64_t
*
offset
)
{
static
FORCE_INLINE
int
tsdbAppendDFile
(
SDFile
*
pDFile
,
void
*
buf
,
int64_t
nbyte
,
int64_t
*
offset
)
{
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
int64_t
toffset
;
int64_t
toffset
;
...
@@ -659,9 +684,9 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
...
@@ -659,9 +684,9 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
return
(
int
)
nbyte
;
return
(
int
)
nbyte
;
}
}
static
FORCE_INLINE
int
tsdbRemoveDFile
(
SDFile
*
pDFile
)
{
return
tfsRemoveFile
(
TSDB_FILE_F
(
pDFile
));
}
static
FORCE_INLINE
int
tsdbRemoveDFile
(
SDFile
*
pDFile
)
{
return
tfsRemoveFile
(
TSDB_FILE_F
(
pDFile
));
}
static
FORCE_INLINE
int64_t
tsdbReadDFile
(
SDFile
*
pDFile
,
void
*
buf
,
int64_t
nbyte
)
{
static
FORCE_INLINE
int64_t
tsdbReadDFile
(
SDFile
*
pDFile
,
void
*
buf
,
int64_t
nbyte
)
{
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
ASSERT
(
TSDB_FILE_OPENED
(
pDFile
));
int64_t
nread
=
taosReadFile
(
pDFile
->
pFile
,
buf
,
nbyte
);
int64_t
nread
=
taosReadFile
(
pDFile
->
pFile
,
buf
,
nbyte
);
...
@@ -673,7 +698,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby
...
@@ -673,7 +698,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby
return
nread
;
return
nread
;
}
}
static
FORCE_INLINE
int
tsdbCopyDFile
(
SDFile
*
pSrc
,
SDFile
*
pDest
)
{
static
FORCE_INLINE
int
tsdbCopyDFile
(
SDFile
*
pSrc
,
SDFile
*
pDest
)
{
if
(
tfsCopyFile
(
TSDB_FILE_F
(
pSrc
),
TSDB_FILE_F
(
pDest
))
<
0
)
{
if
(
tfsCopyFile
(
TSDB_FILE_F
(
pSrc
),
TSDB_FILE_F
(
pDest
))
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
return
-
1
;
...
@@ -719,24 +744,24 @@ typedef struct {
...
@@ -719,24 +744,24 @@ typedef struct {
} \
} \
} while (0);
} while (0);
void
tsdbInitDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SDiskID
did
,
int
fid
,
uint32_t
ver
);
void
tsdbInitDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
SDiskID
did
,
int
fid
,
uint32_t
ver
);
void
tsdbInitDFileSetEx
(
SDFileSet
*
pSet
,
SDFileSet
*
pOSet
);
void
tsdbInitDFileSetEx
(
SDFileSet
*
pSet
,
SDFileSet
*
pOSet
);
int
tsdbEncodeDFileSet
(
void
**
buf
,
SDFileSet
*
pSet
);
int
tsdbEncodeDFileSet
(
void
**
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSet
(
STsdb
*
pRepo
,
void
*
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSet
(
STsdb
*
pRepo
,
void
*
buf
,
SDFileSet
*
pSet
);
int
tsdbEncodeDFileSetEx
(
void
**
buf
,
SDFileSet
*
pSet
);
int
tsdbEncodeDFileSetEx
(
void
**
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSetEx
(
void
*
buf
,
SDFileSet
*
pSet
);
void
*
tsdbDecodeDFileSetEx
(
void
*
buf
,
SDFileSet
*
pSet
);
int
tsdbApplyDFileSetChange
(
SDFileSet
*
from
,
SDFileSet
*
to
);
int
tsdbApplyDFileSetChange
(
SDFileSet
*
from
,
SDFileSet
*
to
);
int
tsdbCreateDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
bool
updateHeader
);
int
tsdbCreateDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
,
bool
updateHeader
);
int
tsdbUpdateDFileSetHeader
(
SDFileSet
*
pSet
);
int
tsdbUpdateDFileSetHeader
(
SDFileSet
*
pSet
);
int
tsdbScanAndTryFixDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
);
int
tsdbScanAndTryFixDFileSet
(
STsdb
*
pRepo
,
SDFileSet
*
pSet
);
static
FORCE_INLINE
void
tsdbCloseDFileSet
(
SDFileSet
*
pSet
)
{
static
FORCE_INLINE
void
tsdbCloseDFileSet
(
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
tsdbCloseDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
tsdbCloseDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
}
}
static
FORCE_INLINE
int
tsdbOpenDFileSet
(
SDFileSet
*
pSet
,
int
flags
)
{
static
FORCE_INLINE
int
tsdbOpenDFileSet
(
SDFileSet
*
pSet
,
int
flags
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
if
(
tsdbOpenDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
flags
)
<
0
)
{
if
(
tsdbOpenDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
),
flags
)
<
0
)
{
tsdbCloseDFileSet
(
pSet
);
tsdbCloseDFileSet
(
pSet
);
...
@@ -746,13 +771,13 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
...
@@ -746,13 +771,13 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
return
0
;
return
0
;
}
}
static
FORCE_INLINE
void
tsdbRemoveDFileSet
(
SDFileSet
*
pSet
)
{
static
FORCE_INLINE
void
tsdbRemoveDFileSet
(
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
(
void
)
tsdbRemoveDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
(
void
)
tsdbRemoveDFile
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
));
}
}
}
}
static
FORCE_INLINE
int
tsdbCopyDFileSet
(
SDFileSet
*
pSrc
,
SDFileSet
*
pDest
)
{
static
FORCE_INLINE
int
tsdbCopyDFileSet
(
SDFileSet
*
pSrc
,
SDFileSet
*
pDest
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
if
(
tsdbCopyDFile
(
TSDB_DFILE_IN_SET
(
pSrc
,
ftype
),
TSDB_DFILE_IN_SET
(
pDest
,
ftype
))
<
0
)
{
if
(
tsdbCopyDFile
(
TSDB_DFILE_IN_SET
(
pSrc
,
ftype
),
TSDB_DFILE_IN_SET
(
pDest
,
ftype
))
<
0
)
{
tsdbRemoveDFileSet
(
pDest
);
tsdbRemoveDFileSet
(
pDest
);
...
@@ -763,12 +788,12 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
...
@@ -763,12 +788,12 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
return
0
;
return
0
;
}
}
static
FORCE_INLINE
void
tsdbGetFidKeyRange
(
int
days
,
int8_t
precision
,
int
fid
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
)
{
static
FORCE_INLINE
void
tsdbGetFidKeyRange
(
int
days
,
int8_t
precision
,
int
fid
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
)
{
*
minKey
=
fid
*
days
*
tsTickPerDay
[
precision
];
*
minKey
=
fid
*
days
*
tsTickPerDay
[
precision
];
*
maxKey
=
*
minKey
+
days
*
tsTickPerDay
[
precision
]
-
1
;
*
maxKey
=
*
minKey
+
days
*
tsTickPerDay
[
precision
]
-
1
;
}
}
static
FORCE_INLINE
bool
tsdbFSetIsOk
(
SDFileSet
*
pSet
)
{
static
FORCE_INLINE
bool
tsdbFSetIsOk
(
SDFileSet
*
pSet
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
for
(
TSDB_FILE_T
ftype
=
0
;
ftype
<
TSDB_FILE_MAX
;
ftype
++
)
{
if
(
TSDB_FILE_IS_BAD
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
)))
{
if
(
TSDB_FILE_IS_BAD
(
TSDB_DFILE_IN_SET
(
pSet
,
ftype
)))
{
return
false
;
return
false
;
...
@@ -817,7 +842,7 @@ typedef struct {
...
@@ -817,7 +842,7 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int
direction
;
int
direction
;
uint64_t
version
;
// current FS version
uint64_t
version
;
// current FS version
STsdbFS
*
pfs
;
STsdbFS
*
pfs
;
int
index
;
// used to position next fset when version the same
int
index
;
// used to position next fset when version the same
int
fid
;
// used to seek when version is changed
int
fid
;
// used to seek when version is changed
SDFileSet
*
pSet
;
SDFileSet
*
pSet
;
...
@@ -827,7 +852,7 @@ typedef struct {
...
@@ -827,7 +852,7 @@ typedef struct {
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS
*
tsdbNewFS
(
const
STsdbCfg
*
pCfg
);
STsdbFS
*
tsdbNewFS
(
const
STsdbCfg
*
pCfg
);
void
*
tsdbFreeFS
(
STsdbFS
*
pfs
);
void
*
tsdbFreeFS
(
STsdbFS
*
pfs
);
int
tsdbOpenFS
(
STsdb
*
pRepo
);
int
tsdbOpenFS
(
STsdb
*
pRepo
);
void
tsdbCloseFS
(
STsdb
*
pRepo
);
void
tsdbCloseFS
(
STsdb
*
pRepo
);
void
tsdbStartFSTxn
(
STsdb
*
pRepo
,
int64_t
pointsAdd
,
int64_t
storageAdd
);
void
tsdbStartFSTxn
(
STsdb
*
pRepo
,
int64_t
pointsAdd
,
int64_t
storageAdd
);
...
@@ -872,7 +897,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
...
@@ -872,7 +897,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
// tsdbSma
// tsdbSma
// #define TSDB_SMA_TEST // remove after test finished
// #define TSDB_SMA_TEST // remove after test finished
// struct SSmaEnv {
// struct SSmaEnv {
// TdThreadRwlock lock;
// TdThreadRwlock lock;
// SDiskID did;
// SDiskID did;
...
@@ -888,7 +912,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
...
@@ -888,7 +912,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
// #define SMA_ENV_STAT(env) ((env)->pStat)
// #define SMA_ENV_STAT(env) ((env)->pStat)
// #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
// #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
// void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
// void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
// void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
// void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
// #if 0
// #if 0
...
@@ -935,5 +958,4 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
...
@@ -935,5 +958,4 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
}
}
#endif
#endif
#endif
/*_TD_VNODE_TSDB_H_*/
#endif
/*_TD_VNODE_TSDB_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/vnd.h
0 → 100644
浏览文件 @
a9a24a14
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VND_H_
#define _TD_VND_H_
#ifdef __cplusplus
extern
"C"
{
#endif
// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// vnodeModule ====================
int
vnodeScheduleTask
(
int
(
*
execute
)(
void
*
),
void
*
arg
);
// vnodeQuery ====================
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
#if 1
// SVBufPool
int
vnodeOpenBufPool
(
SVnode
*
pVnode
);
void
vnodeCloseBufPool
(
SVnode
*
pVnode
);
int
vnodeBufPoolSwitch
(
SVnode
*
pVnode
);
int
vnodeBufPoolRecycle
(
SVnode
*
pVnode
);
void
*
vnodeMalloc
(
SVnode
*
pVnode
,
uint64_t
size
);
bool
vnodeBufPoolIsFull
(
SVnode
*
pVnode
);
SMemAllocatorFactory
*
vBufPoolGetMAF
(
SVnode
*
pVnode
);
// SVMemAllocator
typedef
struct
SVArenaNode
{
TD_SLIST_NODE
(
SVArenaNode
);
uint64_t
size
;
// current node size
void
*
ptr
;
char
data
[];
}
SVArenaNode
;
typedef
struct
SVMemAllocator
{
T_REF_DECLARE
()
TD_DLIST_NODE
(
SVMemAllocator
);
uint64_t
capacity
;
uint64_t
ssize
;
uint64_t
lsize
;
SVArenaNode
*
pNode
;
TD_SLIST
(
SVArenaNode
)
nlist
;
}
SVMemAllocator
;
SVMemAllocator
*
vmaCreate
(
uint64_t
capacity
,
uint64_t
ssize
,
uint64_t
lsize
);
void
vmaDestroy
(
SVMemAllocator
*
pVMA
);
void
vmaReset
(
SVMemAllocator
*
pVMA
);
void
*
vmaMalloc
(
SVMemAllocator
*
pVMA
,
uint64_t
size
);
void
vmaFree
(
SVMemAllocator
*
pVMA
,
void
*
ptr
);
bool
vmaIsFull
(
SVMemAllocator
*
pVMA
);
// vnodeCfg.h
extern
const
SVnodeCfg
defaultVnodeOptions
;
int
vnodeValidateOptions
(
const
SVnodeCfg
*
);
void
vnodeOptionsCopy
(
SVnodeCfg
*
pDest
,
const
SVnodeCfg
*
pSrc
);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int
vnodeSyncCommit
(
SVnode
*
pVnode
);
int
vnodeAsyncCommit
(
SVnode
*
pVnode
);
#endif
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VND_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
a9a24a14
...
@@ -35,36 +35,21 @@
...
@@ -35,36 +35,21 @@
#include "tstream.h"
#include "tstream.h"
#include "ttime.h"
#include "ttime.h"
#include "ttimer.h"
#include "ttimer.h"
#include "vnode.h"
#include "wal.h"
#include "wal.h"
#include "vnode.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
typedef
struct
SMeta
SMeta
;
typedef
struct
STsdb
STsdb
;
typedef
struct
STQ
STQ
;
typedef
struct
STQ
STQ
;
typedef
struct
SVState
SVState
;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorkerMgmt
SQHandle
;
typedef
struct
SQWorkerMgmt
SQHandle
;
typedef
struct
SVnodeTask
{
TD_DLIST_NODE
(
SVnodeTask
);
void
*
arg
;
int
(
*
execute
)(
void
*
);
}
SVnodeTask
;
typedef
struct
SVnodeMgr
{
td_mode_flag_t
vnodeInitFlag
;
// For commit
bool
stop
;
uint16_t
nthreads
;
TdThread
*
threads
;
TdThreadMutex
mutex
;
TdThreadCond
hasTask
;
TD_DLIST
(
SVnodeTask
)
queue
;
}
SVnodeMgr
;
typedef
struct
{
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
streamType
;
// sma or other
int8_t
dstType
;
int8_t
dstType
;
...
@@ -80,8 +65,6 @@ typedef struct {
...
@@ -80,8 +65,6 @@ typedef struct {
SHashObj
*
pHash
;
// streamId -> SStreamSinkInfo
SHashObj
*
pHash
;
// streamId -> SStreamSinkInfo
}
SSink
;
}
SSink
;
extern
SVnodeMgr
vnodeMgr
;
// SVState
// SVState
struct
SVState
{
struct
SVState
{
int64_t
processed
;
int64_t
processed
;
...
@@ -106,118 +89,11 @@ struct SVnode {
...
@@ -106,118 +89,11 @@ struct SVnode {
STfs
*
pTfs
;
STfs
*
pTfs
;
};
};
int
vnodeScheduleTask
(
SVnodeTask
*
task
);
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
#define vFatal(...) \
do { \
if (vDebugFlag & DEBUG_FATAL) { \
taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
} while (0)
#define vError(...) \
do { \
if (vDebugFlag & DEBUG_ERROR) { \
taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
} while (0)
#define vWarn(...) \
do { \
if (vDebugFlag & DEBUG_WARN) { \
taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
} while (0)
#define vInfo(...) \
do { \
if (vDebugFlag & DEBUG_INFO) { \
taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
} while (0)
#define vDebug(...) \
do { \
if (vDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("VND ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define vTrace(...) \
do { \
if (vDebugFlag & DEBUG_TRACE) { \
taosPrintLog("VND ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); \
} \
} while (0)
// vnodeCfg.h
extern
const
SVnodeCfg
defaultVnodeOptions
;
int
vnodeValidateOptions
(
const
SVnodeCfg
*
);
void
vnodeOptionsCopy
(
SVnodeCfg
*
pDest
,
const
SVnodeCfg
*
pSrc
);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int
vnodeSyncCommit
(
SVnode
*
pVnode
);
int
vnodeAsyncCommit
(
SVnode
*
pVnode
);
// SVBufPool
int
vnodeOpenBufPool
(
SVnode
*
pVnode
);
void
vnodeCloseBufPool
(
SVnode
*
pVnode
);
int
vnodeBufPoolSwitch
(
SVnode
*
pVnode
);
int
vnodeBufPoolRecycle
(
SVnode
*
pVnode
);
void
*
vnodeMalloc
(
SVnode
*
pVnode
,
uint64_t
size
);
bool
vnodeBufPoolIsFull
(
SVnode
*
pVnode
);
SMemAllocatorFactory
*
vBufPoolGetMAF
(
SVnode
*
pVnode
);
// SVMemAllocator
typedef
struct
SVArenaNode
{
TD_SLIST_NODE
(
SVArenaNode
);
uint64_t
size
;
// current node size
void
*
ptr
;
char
data
[];
}
SVArenaNode
;
typedef
struct
SVMemAllocator
{
T_REF_DECLARE
()
TD_DLIST_NODE
(
SVMemAllocator
);
uint64_t
capacity
;
uint64_t
ssize
;
uint64_t
lsize
;
SVArenaNode
*
pNode
;
TD_SLIST
(
SVArenaNode
)
nlist
;
}
SVMemAllocator
;
SVMemAllocator
*
vmaCreate
(
uint64_t
capacity
,
uint64_t
ssize
,
uint64_t
lsize
);
void
vmaDestroy
(
SVMemAllocator
*
pVMA
);
void
vmaReset
(
SVMemAllocator
*
pVMA
);
void
*
vmaMalloc
(
SVMemAllocator
*
pVMA
,
uint64_t
size
);
void
vmaFree
(
SVMemAllocator
*
pVMA
,
void
*
ptr
);
bool
vmaIsFull
(
SVMemAllocator
*
pVMA
);
// init once
int
tqInit
();
void
tqCleanUp
();
// open in each vnode
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
,
SMeta
*
pMeta
,
STqCfg
*
tqConfig
,
SMemAllocatorFactory
*
allocFac
);
void
tqClose
(
STQ
*
);
// required by vnode
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
version
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
// sma
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
#include "vnd.h"
#include "meta.h"
#include "meta.h"
#include "tsdb.h"
#include "tsdb.h"
...
...
source/dnode/vnode/src/meta/metaCache.c
已删除
100644 → 0
浏览文件 @
aa736030
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
struct
SMetaCache
{
// TODO
};
int
metaOpenCache
(
SMeta
*
pMeta
)
{
// TODO
// if (pMeta->options.lruSize) {
// pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
// if (pMeta->pCache == NULL) {
// // TODO: handle error
// return -1;
// }
// }
return
0
;
}
void
metaCloseCache
(
SMeta
*
pMeta
)
{
// if (pMeta->pCache) {
// rocksdb_cache_destroy(pMeta->pCache);
// pMeta->pCache = NULL;
// }
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaCfg.c
已删除
100644 → 0
浏览文件 @
aa736030
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
const
SMetaCfg
defaultMetaOptions
=
{.
lruSize
=
0
};
/* ------------------------ EXPOSED METHODS ------------------------ */
int
metaValidateOptions
(
const
SMetaCfg
*
pMetaOptions
)
{
// TODO
return
0
;
}
void
metaOptionsCopy
(
SMetaCfg
*
pDest
,
const
SMetaCfg
*
pSrc
)
{
memcpy
(
pDest
,
pSrc
,
sizeof
(
*
pSrc
));
}
/* ------------------------ STATIC METHODS ------------------------ */
\ No newline at end of file
source/dnode/vnode/src/meta/metaMain.c
浏览文件 @
a9a24a14
...
@@ -25,17 +25,6 @@ static void metaCloseImpl(SMeta *pMeta);
...
@@ -25,17 +25,6 @@ static void metaCloseImpl(SMeta *pMeta);
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
)
{
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
)
{
SMeta
*
pMeta
=
NULL
;
SMeta
*
pMeta
=
NULL
;
// Set default options
if
(
pMetaCfg
==
NULL
)
{
pMetaCfg
=
&
defaultMetaOptions
;
}
// // Validate the options
// if (metaValidateOptions(pMetaCfg) < 0) {
// // TODO: deal with error
// return NULL;
// }
// Allocate handle
// Allocate handle
pMeta
=
metaNew
(
path
,
pMetaCfg
,
pMAF
);
pMeta
=
metaNew
(
path
,
pMetaCfg
,
pMAF
);
if
(
pMeta
==
NULL
)
{
if
(
pMeta
==
NULL
)
{
...
@@ -80,9 +69,6 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorF
...
@@ -80,9 +69,6 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorF
return
NULL
;
return
NULL
;
}
}
metaOptionsCopy
(
&
(
pMeta
->
options
),
pMetaCfg
);
pMeta
->
pmaf
=
pMAF
;
return
pMeta
;
return
pMeta
;
};
};
...
@@ -94,13 +80,6 @@ static void metaFree(SMeta *pMeta) {
...
@@ -94,13 +80,6 @@ static void metaFree(SMeta *pMeta) {
}
}
static
int
metaOpenImpl
(
SMeta
*
pMeta
)
{
static
int
metaOpenImpl
(
SMeta
*
pMeta
)
{
// Open meta cache
if
(
metaOpenCache
(
pMeta
)
<
0
)
{
// TODO: handle error
metaCloseImpl
(
pMeta
);
return
-
1
;
}
// Open meta db
// Open meta db
if
(
metaOpenDB
(
pMeta
)
<
0
)
{
if
(
metaOpenDB
(
pMeta
)
<
0
)
{
// TODO: handle error
// TODO: handle error
...
@@ -129,5 +108,4 @@ static void metaCloseImpl(SMeta *pMeta) {
...
@@ -129,5 +108,4 @@ static void metaCloseImpl(SMeta *pMeta) {
metaCloseUidGnrt
(
pMeta
);
metaCloseUidGnrt
(
pMeta
);
metaCloseIdx
(
pMeta
);
metaCloseIdx
(
pMeta
);
metaCloseDB
(
pMeta
);
metaCloseDB
(
pMeta
);
metaCloseCache
(
pMeta
);
}
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaQuery.c
已删除
100644 → 0
浏览文件 @
aa736030
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
\ No newline at end of file
source/dnode/vnode/src/meta/metaTbCfg.c
已删除
100644 → 0
浏览文件 @
aa736030
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
int
metaValidateTbCfg
(
SMeta
*
pMeta
,
const
STbCfg
*
pTbOptions
)
{
// TODO
return
0
;
}
size_t
metaEncodeTbObjFromTbOptions
(
const
STbCfg
*
pTbOptions
,
void
*
pBuf
,
size_t
bsize
)
{
void
**
ppBuf
=
&
pBuf
;
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU8
(
ppBuf
,
pTbOptions
->
type
);
tlen
+=
taosEncodeString
(
ppBuf
,
pTbOptions
->
name
);
tlen
+=
taosEncodeFixedU32
(
ppBuf
,
pTbOptions
->
ttl
);
switch
(
pTbOptions
->
type
)
{
case
META_SUPER_TABLE
:
tlen
+=
taosEncodeFixedU64
(
ppBuf
,
pTbOptions
->
stbCfg
.
suid
);
tlen
+=
tdEncodeSchema
(
ppBuf
,
(
STSchema
*
)
pTbOptions
->
stbCfg
.
pTagSchema
);
// TODO: encode schema version array
break
;
case
META_CHILD_TABLE
:
tlen
+=
taosEncodeFixedU64
(
ppBuf
,
pTbOptions
->
ctbCfg
.
suid
);
break
;
case
META_NORMAL_TABLE
:
// TODO: encode schema version array
break
;
default:
break
;
}
return
tlen
;
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaTbTag.c
已删除
100644 → 0
浏览文件 @
aa736030
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
a9a24a14
...
@@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
...
@@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit
(
pVnode
);
vnodeWaitCommit
(
pVnode
);
vnodeBufPoolSwitch
(
pVnode
);
vnodeBufPoolSwitch
(
pVnode
);
SVnodeTask
*
pTask
=
(
SVnodeTask
*
)
taosMemoryMalloc
(
sizeof
(
*
pTask
));
pTask
->
execute
=
vnodeCommit
;
// TODO
pTask
->
arg
=
pVnode
;
// TODO
tsdbPrepareCommit
(
pVnode
->
pTsdb
);
tsdbPrepareCommit
(
pVnode
->
pTsdb
);
// metaPrepareCommit(pVnode->pMeta);
// walPreapareCommit(pVnode->pWal);
vnodeScheduleTask
(
pTask
);
vnodeScheduleTask
(
vnodeCommit
,
pVnode
);
return
0
;
return
0
;
}
}
...
...
source/dnode/vnode/src/vnd/vnodeM
gr
.c
→
source/dnode/vnode/src/vnd/vnodeM
odule
.c
浏览文件 @
a9a24a14
...
@@ -15,31 +15,55 @@
...
@@ -15,31 +15,55 @@
#include "vnodeInt.h"
#include "vnodeInt.h"
SVnodeMgr
vnodeMgr
=
{.
vnodeInitFlag
=
TD_MOD_UNINITIALIZED
};
typedef
struct
SVnodeTask
SVnodeTask
;
struct
SVnodeTask
{
SVnodeTask
*
next
;
SVnodeTask
*
prev
;
int
(
*
execute
)(
void
*
);
void
*
arg
;
};
struct
SVnodeGlobal
{
int8_t
init
;
int8_t
stop
;
int
nthreads
;
TdThread
*
threads
;
TdThreadMutex
mutex
;
TdThreadCond
hasTask
;
SVnodeTask
queue
;
};
struct
SVnodeGlobal
vnodeGlobal
;
static
void
*
loop
(
void
*
arg
);
static
void
*
loop
(
void
*
arg
);
int
vnodeInit
()
{
int
vnodeInit
(
int
nthreads
)
{
if
(
TD_CHECK_AND_SET_MODE_INIT
(
&
(
vnodeMgr
.
vnodeInitFlag
))
==
TD_MOD_INITIALIZED
)
{
int8_t
init
;
int
ret
;
init
=
atomic_val_compare_exchange_8
(
&
(
vnodeGlobal
.
init
),
0
,
1
);
if
(
init
)
{
return
0
;
return
0
;
}
}
vnodeMgr
.
stop
=
false
;
vnodeGlobal
.
stop
=
0
;
vnodeGlobal
.
queue
.
next
=
&
vnodeGlobal
.
queue
;
vnodeGlobal
.
queue
.
prev
=
&
vnodeGlobal
.
queue
;
// Start commit handers
vnodeGlobal
.
nthreads
=
nthreads
;
vnodeMgr
.
nthreads
=
tsNumOfCommitThreads
;
vnodeGlobal
.
threads
=
taosMemoryCalloc
(
nthreads
,
sizeof
(
TdThread
));
vnodeMgr
.
threads
=
taosMemoryCalloc
(
vnodeMgr
.
nthreads
,
sizeof
(
TdThread
));
if
(
vnodeGlobal
.
threads
==
NULL
)
{
if
(
vnodeMgr
.
threads
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
vError
(
"failed to init vnode module since: %s"
,
tstrerror
(
terrno
));
return
-
1
;
return
-
1
;
}
}
taosThreadMutexInit
(
&
(
vnodeMgr
.
mutex
),
NULL
);
taosThreadMutexInit
(
&
vnodeGlobal
.
mutex
,
NULL
);
taosThreadCondInit
(
&
(
vnodeMgr
.
hasTask
),
NULL
);
taosThreadCondInit
(
&
vnodeGlobal
.
hasTask
,
NULL
);
TD_DLIST_INIT
(
&
(
vnodeMgr
.
queue
));
for
(
uint16_t
i
=
0
;
i
<
vnodeMgr
.
nthreads
;
i
++
)
{
for
(
int
i
=
0
;
i
<
nthreads
;
i
++
)
{
taosThreadCreate
(
&
(
vnodeMgr
.
threads
[
i
]),
NULL
,
loop
,
NULL
);
taosThreadCreate
(
&
(
vnodeGlobal
.
threads
[
i
]),
NULL
,
loop
,
NULL
);
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
}
if
(
walInit
()
<
0
)
{
if
(
walInit
()
<
0
)
{
...
@@ -50,62 +74,83 @@ int vnodeInit() {
...
@@ -50,62 +74,83 @@ int vnodeInit() {
}
}
void
vnodeCleanup
()
{
void
vnodeCleanup
()
{
if
(
TD_CHECK_AND_SET_MOD_CLEAR
(
&
(
vnodeMgr
.
vnodeInitFlag
))
==
TD_MOD_UNINITIALIZED
)
{
int8_t
init
;
return
;
}
init
=
atomic_val_compare_exchange_8
(
&
(
vnodeGlobal
.
init
),
1
,
0
);
if
(
init
==
0
)
return
;
//
Stop commit handler
//
set stop
taosThreadMutexLock
(
&
(
vnode
Mgr
.
mutex
));
taosThreadMutexLock
(
&
(
vnode
Global
.
mutex
));
vnode
Mgr
.
stop
=
true
;
vnode
Global
.
stop
=
1
;
taosThreadCondBroadcast
(
&
(
vnode
Mgr
.
hasTask
));
taosThreadCondBroadcast
(
&
(
vnode
Global
.
hasTask
));
taosThreadMutexUnlock
(
&
(
vnode
Mgr
.
mutex
));
taosThreadMutexUnlock
(
&
(
vnode
Global
.
mutex
));
for
(
uint16_t
i
=
0
;
i
<
vnodeMgr
.
nthreads
;
i
++
)
{
// wait for threads
taosThreadJoin
(
vnodeMgr
.
threads
[
i
],
NULL
);
for
(
int
i
=
0
;
i
<
vnodeGlobal
.
nthreads
;
i
++
)
{
taosThreadJoin
(
vnodeGlobal
.
threads
[
i
],
NULL
);
}
}
taosMemoryFreeClear
(
vnodeMgr
.
threads
);
// clear source
taosThreadCondDestroy
(
&
(
vnodeMgr
.
hasTask
));
taosMemoryFreeClear
(
vnodeGlobal
.
threads
);
taosThreadMutexDestroy
(
&
(
vnodeMgr
.
mutex
));
taosThreadCondDestroy
(
&
(
vnodeGlobal
.
hasTask
));
taosThreadMutexDestroy
(
&
(
vnodeGlobal
.
mutex
));
}
}
int
vnodeScheduleTask
(
SVnodeTask
*
pTask
)
{
int
vnodeScheduleTask
(
int
(
*
execute
)(
void
*
),
void
*
arg
)
{
taosThreadMutexLock
(
&
(
vnodeMgr
.
mutex
))
;
SVnodeTask
*
pTask
;
TD_DLIST_APPEND
(
&
(
vnodeMgr
.
queue
),
pTask
);
ASSERT
(
!
vnodeGlobal
.
stop
);
taosThreadCondSignal
(
&
(
vnodeMgr
.
hasTask
));
pTask
=
taosMemoryMalloc
(
sizeof
(
*
pTask
));
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pTask
->
execute
=
execute
;
pTask
->
arg
=
arg
;
taosThreadMutexUnlock
(
&
(
vnodeMgr
.
mutex
));
taosThreadMutexLock
(
&
(
vnodeGlobal
.
mutex
));
pTask
->
next
=
&
vnodeGlobal
.
queue
;
pTask
->
prev
=
vnodeGlobal
.
queue
.
prev
;
vnodeGlobal
.
queue
.
prev
->
next
=
pTask
;
vnodeGlobal
.
queue
.
prev
=
pTask
;
taosThreadCondSignal
(
&
(
vnodeGlobal
.
hasTask
));
taosThreadMutexUnlock
(
&
(
vnodeGlobal
.
mutex
));
return
0
;
return
0
;
}
}
/* ------------------------ STATIC METHODS ------------------------ */
/* ------------------------ STATIC METHODS ------------------------ */
static
void
*
loop
(
void
*
arg
)
{
static
void
*
loop
(
void
*
arg
)
{
SVnodeTask
*
pTask
;
int
ret
;
setThreadName
(
"vnode-commit"
);
setThreadName
(
"vnode-commit"
);
SVnodeTask
*
pTask
;
for
(;;)
{
for
(;;)
{
taosThreadMutexLock
(
&
(
vnode
Mgr
.
mutex
));
taosThreadMutexLock
(
&
(
vnode
Global
.
mutex
));
for
(;;)
{
for
(;;)
{
pTask
=
TD_DLIST_HEAD
(
&
(
vnodeMgr
.
queue
));
pTask
=
vnodeGlobal
.
queue
.
next
;
if
(
pTask
==
NULL
)
{
if
(
pTask
==
&
vnodeGlobal
.
queue
)
{
if
(
vnodeMgr
.
stop
)
{
// no task
taosThreadMutexUnlock
(
&
(
vnodeMgr
.
mutex
));
if
(
vnodeGlobal
.
stop
)
{
taosThreadMutexUnlock
(
&
(
vnodeGlobal
.
mutex
));
return
NULL
;
return
NULL
;
}
else
{
}
else
{
taosThreadCondWait
(
&
(
vnode
Mgr
.
hasTask
),
&
(
vnodeMgr
.
mutex
));
taosThreadCondWait
(
&
(
vnode
Global
.
hasTask
),
&
(
vnodeGlobal
.
mutex
));
}
}
}
else
{
}
else
{
TD_DLIST_POP
(
&
(
vnodeMgr
.
queue
),
pTask
);
// has task
pTask
->
prev
->
next
=
pTask
->
next
;
pTask
->
next
->
prev
=
pTask
->
prev
;
break
;
break
;
}
}
}
}
taosThreadMutexUnlock
(
&
(
vnode
Mgr
.
mutex
));
taosThreadMutexUnlock
(
&
(
vnode
Global
.
mutex
));
(
*
(
pTask
->
execute
))
(
pTask
->
arg
);
pTask
->
execute
(
pTask
->
arg
);
taosMemoryFree
(
pTask
);
taosMemoryFree
(
pTask
);
}
}
...
...
source/libs/scalar/src/sclfunc.c
浏览文件 @
a9a24a14
...
@@ -331,6 +331,19 @@ static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCo
...
@@ -331,6 +331,19 @@ static int32_t concatCopyHelper(const char *input, char *output, bool hasNcharCo
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
static
int32_t
getNumOfNullEntries
(
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
int32_t
numOfNulls
=
0
;
if
(
!
pColumnInfoData
->
hasNull
)
{
return
numOfNulls
;
}
for
(
int
i
=
0
;
i
<
numOfRows
;
++
i
)
{
if
(
pColumnInfoData
->
varmeta
.
offset
[
i
]
==
-
1
)
{
numOfNulls
++
;
}
}
return
numOfNulls
;
}
int32_t
concatFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
int32_t
concatFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
if
(
inputNum
<
2
||
inputNum
>
8
)
{
// concat accpet 2-8 input strings
if
(
inputNum
<
2
||
inputNum
>
8
)
{
// concat accpet 2-8 input strings
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
...
@@ -363,10 +376,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
...
@@ -363,10 +376,12 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
if
(
hasNcharCol
&&
(
GET_PARAM_TYPE
(
&
pInput
[
i
])
==
TSDB_DATA_TYPE_VARCHAR
))
{
if
(
hasNcharCol
&&
(
GET_PARAM_TYPE
(
&
pInput
[
i
])
==
TSDB_DATA_TYPE_VARCHAR
))
{
factor
=
TSDB_NCHAR_SIZE
;
factor
=
TSDB_NCHAR_SIZE
;
}
}
int32_t
numOfNulls
=
getNumOfNullEntries
(
pInputData
[
i
],
pInput
[
i
].
numOfRows
);
if
(
pInput
[
i
].
numOfRows
==
1
)
{
if
(
pInput
[
i
].
numOfRows
==
1
)
{
inputLen
+=
(
pInputData
[
i
]
->
varmeta
.
length
-
VARSTR_HEADER_SIZE
)
*
factor
*
numOfRows
;
inputLen
+=
(
pInputData
[
i
]
->
varmeta
.
length
-
VARSTR_HEADER_SIZE
)
*
factor
*
(
numOfRows
-
numOfNulls
)
;
}
else
{
}
else
{
inputLen
+=
pInputData
[
i
]
->
varmeta
.
length
-
numOfRows
*
VARSTR_HEADER_SIZE
;
inputLen
+=
pInputData
[
i
]
->
varmeta
.
length
-
(
numOfRows
-
numOfNulls
)
*
VARSTR_HEADER_SIZE
;
}
}
}
}
...
@@ -444,13 +459,15 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
...
@@ -444,13 +459,15 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
if
(
hasNcharCol
&&
(
GET_PARAM_TYPE
(
&
pInput
[
i
])
==
TSDB_DATA_TYPE_VARCHAR
))
{
if
(
hasNcharCol
&&
(
GET_PARAM_TYPE
(
&
pInput
[
i
])
==
TSDB_DATA_TYPE_VARCHAR
))
{
factor
=
TSDB_NCHAR_SIZE
;
factor
=
TSDB_NCHAR_SIZE
;
}
}
int32_t
numOfNulls
=
getNumOfNullEntries
(
pInputData
[
i
],
pInput
[
i
].
numOfRows
);
if
(
i
==
0
)
{
if
(
i
==
0
)
{
// calculate required separator space
// calculate required separator space
inputLen
+=
(
pInputData
[
0
]
->
varmeta
.
length
-
VARSTR_HEADER_SIZE
)
*
numOfRows
*
(
inputNum
-
2
)
*
factor
;
inputLen
+=
(
pInputData
[
0
]
->
varmeta
.
length
-
VARSTR_HEADER_SIZE
)
*
(
numOfRows
-
numOfNulls
)
*
(
inputNum
-
2
)
*
factor
;
}
else
if
(
pInput
[
i
].
numOfRows
==
1
)
{
}
else
if
(
pInput
[
i
].
numOfRows
==
1
)
{
inputLen
+=
(
pInputData
[
i
]
->
varmeta
.
length
-
VARSTR_HEADER_SIZE
)
*
numOfRows
*
factor
;
inputLen
+=
(
pInputData
[
i
]
->
varmeta
.
length
-
VARSTR_HEADER_SIZE
)
*
(
numOfRows
-
numOfNulls
)
*
factor
;
}
else
{
}
else
{
inputLen
+=
pInputData
[
i
]
->
varmeta
.
length
-
numOfRows
*
VARSTR_HEADER_SIZE
;
inputLen
+=
pInputData
[
i
]
->
varmeta
.
length
-
(
numOfRows
-
numOfNulls
)
*
VARSTR_HEADER_SIZE
;
}
}
}
}
...
...
tests/script/tsim/query/charScalarFunction.sim
浏览文件 @
a9a24a14
...
@@ -266,20 +266,21 @@ if $data11 != NULL then
...
@@ -266,20 +266,21 @@ if $data11 != NULL then
return -1
return -1
endi
endi
# print ====> select c2, c3 , concat(c2,c3) from ctb6
print ====> select c2, c3 , concat(c2,c3) from ctb6
# sql select c2, c3 , concat(c2,c3) from ctb6
sql select c2, c3 , concat(c2,c3) from ctb6
# print ====> rows: $rows
print ====> rows: $rows
# print ====> $data00 $data01 $data02
print ====> $data00 $data01 $data02
# print ====> $data10 $data11 $data12
print ====> $data10 $data11 $data12
# if $rows != 2 then
if $rows != 2 then
# return -1
return -1
# endi
endi
# if $data02 != 中文测试01中文测试01 then
if $data02 != 中文测试1中文测试2 then
# return -1
return -1
# endi
endi
# if $data12 != NULL then
if $data12 != NULL then
# return -1
return -1
# endi
endi
return
print ====> select c2, c3 , concat(c2,c3) from ntb6
print ====> select c2, c3 , concat(c2,c3) from ntb6
sql select c2, c3 , concat(c2,c3) from ntb6
sql select c2, c3 , concat(c2,c3) from ntb6
...
...
tests/test/c/tmqDemo.c
浏览文件 @
a9a24a14
...
@@ -13,8 +13,6 @@
...
@@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#define ALLOW_FORBID_FUNC
#include <assert.h>
#include <assert.h>
#include <dirent.h>
#include <dirent.h>
#include <stdio.h>
#include <stdio.h>
...
@@ -36,7 +34,12 @@
...
@@ -36,7 +34,12 @@
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024)
enum
_RUN_MODE
{
TMQ_RUN_INSERT_AND_CONSUME
,
TMQ_RUN_ONLY_INSERT
,
TMQ_RUN_ONLY_CONSUME
,
TMQ_RUN_MODE_BUTT
};
enum
_RUN_MODE
{
TMQ_RUN_INSERT_AND_CONSUME
,
TMQ_RUN_ONLY_INSERT
,
TMQ_RUN_ONLY_CONSUME
,
TMQ_RUN_MODE_BUTT
,
};
typedef
struct
{
typedef
struct
{
char
dbName
[
32
];
char
dbName
[
32
];
...
@@ -338,6 +341,7 @@ tmq_t* build_consumer() {
...
@@ -338,6 +341,7 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_conf_set
(
conf
,
"td.connect.db"
,
g_stConfInfo
.
dbName
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
tmq_t
*
tmq
=
tmq_consumer_new1
(
conf
,
NULL
,
0
);
assert
(
tmq
);
tmq_conf_destroy
(
conf
);
tmq_conf_destroy
(
conf
);
return
tmq
;
return
tmq
;
}
}
...
@@ -596,7 +600,7 @@ void printParaIntoFile() {
...
@@ -596,7 +600,7 @@ void printParaIntoFile() {
g_fp
=
pFile
;
g_fp
=
pFile
;
time_t
tTime
=
taosGetTimestampSec
();
time_t
tTime
=
taosGetTimestampSec
();
struct
tm
tm
=
*
localtime
(
&
tTime
);
struct
tm
tm
=
*
taosLocalTime
(
&
tTime
,
NULL
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"###################################################################
\n
"
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
taosFprintfFile
(
pFile
,
"# configDir: %s
\n
"
,
configDir
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录