Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
97ca0968
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
97ca0968
编写于
9月 19, 2022
作者:
5
54liuyao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'feature/stream' into feature/TD-18820
上级
955838d1
756a2d3d
变更
20
隐藏空白更改
内联
并排
Showing
20 changed file
with
308 addition
and
98 deletion
+308
-98
cmake/taostools_CMakeLists.txt.in
cmake/taostools_CMakeLists.txt.in
+1
-1
include/libs/stream/streamState.h
include/libs/stream/streamState.h
+1
-1
include/util/taoserror.h
include/util/taoserror.h
+2
-0
source/client/src/TMQConnector.c
source/client/src/TMQConnector.c
+2
-2
source/client/src/clientHb.c
source/client/src/clientHb.c
+9
-3
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-0
source/common/src/tglobal.c
source/common/src/tglobal.c
+19
-20
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+5
-2
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+89
-5
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+4
-7
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
+1
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+2
-2
source/libs/nodes/src/nodesMsgFuncs.c
source/libs/nodes/src/nodesMsgFuncs.c
+149
-48
source/libs/planner/test/planTestUtil.cpp
source/libs/planner/test/planTestUtil.cpp
+2
-1
source/libs/stream/src/streamState.c
source/libs/stream/src/streamState.c
+7
-2
source/libs/tdb/src/db/tdbBtree.c
source/libs/tdb/src/db/tdbBtree.c
+4
-0
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+2
-2
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+4
-1
source/util/src/terror.c
source/util/src/terror.c
+2
-0
未找到文件。
cmake/taostools_CMakeLists.txt.in
浏览文件 @
97ca0968
...
@@ -2,7 +2,7 @@
...
@@ -2,7 +2,7 @@
# taos-tools
# taos-tools
ExternalProject_Add(taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG
125c77a
GIT_TAG
509ec72
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
#BUILD_IN_SOURCE TRUE
...
...
include/libs/stream/streamState.h
浏览文件 @
97ca0968
...
@@ -34,7 +34,7 @@ typedef struct {
...
@@ -34,7 +34,7 @@ typedef struct {
TXN
txn
;
TXN
txn
;
}
SStreamState
;
}
SStreamState
;
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
);
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
,
bool
specPath
);
void
streamStateClose
(
SStreamState
*
pState
);
void
streamStateClose
(
SStreamState
*
pState
);
int32_t
streamStateBegin
(
SStreamState
*
pState
);
int32_t
streamStateBegin
(
SStreamState
*
pState
);
int32_t
streamStateCommit
(
SStreamState
*
pState
);
int32_t
streamStateCommit
(
SStreamState
*
pState
);
...
...
include/util/taoserror.h
浏览文件 @
97ca0968
...
@@ -619,6 +619,8 @@ int32_t* taosGetErrno();
...
@@ -619,6 +619,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_INVALID_SCHEMA TAOS_DEF_ERROR_CODE(0, 0x3157)
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158)
#define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159)
#define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160)
//index
//index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
...
...
source/client/src/TMQConnector.c
浏览文件 @
97ca0968
...
@@ -212,7 +212,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
...
@@ -212,7 +212,7 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async
(
tmq
,
res
,
commit_cb
,
consumer
);
tmq_commit_async
(
tmq
,
res
,
commit_cb
,
consumer
);
}
}
JNIEXPORT
int
JNICALL
Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp
(
JNIEnv
*
env
,
jobject
jobj
,
jlong
jtmq
)
{
JNIEXPORT
j
int
JNICALL
Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp
(
JNIEnv
*
env
,
jobject
jobj
,
jlong
jtmq
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
jtmq
;
tmq_t
*
tmq
=
(
tmq_t
*
)
jtmq
;
if
(
tmq
==
NULL
)
{
if
(
tmq
==
NULL
)
{
jniError
(
"jobj:%p, tmq is closed"
,
jobj
);
jniError
(
"jobj:%p, tmq is closed"
,
jobj
);
...
@@ -222,7 +222,7 @@ JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(
...
@@ -222,7 +222,7 @@ JNIEXPORT int JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqUnsubscribeImp(
return
tmq_unsubscribe
((
tmq_t
*
)
tmq
);
return
tmq_unsubscribe
((
tmq_t
*
)
tmq
);
}
}
JNIEXPORT
int
JNICALL
Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp
(
JNIEnv
*
env
,
jobject
jobj
,
JNIEXPORT
j
int
JNICALL
Java_com_taosdata_jdbc_tmq_TMQConnector_tmqConsumerCloseImp
(
JNIEnv
*
env
,
jobject
jobj
,
jlong
jtmq
)
{
jlong
jtmq
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
jtmq
;
tmq_t
*
tmq
=
(
tmq_t
*
)
jtmq
;
if
(
tmq
==
NULL
)
{
if
(
tmq
==
NULL
)
{
...
...
source/client/src/clientHb.c
浏览文件 @
97ca0968
...
@@ -878,12 +878,18 @@ int hbMgrInit() {
...
@@ -878,12 +878,18 @@ int hbMgrInit() {
clientHbMgr
.
appHbMgrs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
clientHbMgr
.
appHbMgrs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
TdThreadMutexAttr
attr
=
{
0
};
TdThreadMutexAttr
attr
=
{
0
};
taosThreadMutexAttrSetType
(
&
attr
,
PTHREAD_MUTEX_RECURSIVE
);
int
ret
=
taosThreadMutexAttrInit
(
&
attr
);
int
ret
=
taosThreadMutexAttrInit
(
&
attr
);
assert
(
ret
==
0
);
assert
(
ret
==
0
);
taosThreadMutexInit
(
&
clientHbMgr
.
lock
,
&
attr
);
ret
=
taosThreadMutexAttrSetType
(
&
attr
,
PTHREAD_MUTEX_RECURSIVE
);
taosThreadMutexAttrDestroy
(
&
attr
);
assert
(
ret
==
0
);
ret
=
taosThreadMutexInit
(
&
clientHbMgr
.
lock
,
&
attr
);
assert
(
ret
==
0
);
ret
=
taosThreadMutexAttrDestroy
(
&
attr
);
assert
(
ret
==
0
);
// init handle funcs
// init handle funcs
hbMgrInitHandle
();
hbMgrInitHandle
();
...
...
source/common/src/tdatablock.c
浏览文件 @
97ca0968
...
@@ -1446,6 +1446,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
...
@@ -1446,6 +1446,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
int32_t
payloadSize
=
pageSize
-
blockDataGetSerialMetaSize
(
numOfCols
);
int32_t
payloadSize
=
pageSize
-
blockDataGetSerialMetaSize
(
numOfCols
);
int32_t
rowSize
=
pBlock
->
info
.
rowSize
;
int32_t
rowSize
=
pBlock
->
info
.
rowSize
;
int32_t
nRows
=
payloadSize
/
rowSize
;
int32_t
nRows
=
payloadSize
/
rowSize
;
ASSERT
(
nRows
>=
1
);
// the true value must be less than the value of nRows
// the true value must be less than the value of nRows
int32_t
additional
=
0
;
int32_t
additional
=
0
;
...
...
source/common/src/tglobal.c
浏览文件 @
97ca0968
...
@@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1;
...
@@ -58,7 +58,7 @@ int32_t tsNumOfMnodeFetchThreads = 1;
int32_t
tsNumOfMnodeReadThreads
=
1
;
int32_t
tsNumOfMnodeReadThreads
=
1
;
int32_t
tsNumOfVnodeQueryThreads
=
4
;
int32_t
tsNumOfVnodeQueryThreads
=
4
;
int32_t
tsNumOfVnodeStreamThreads
=
2
;
int32_t
tsNumOfVnodeStreamThreads
=
2
;
int32_t
tsNumOfVnodeFetchThreads
=
4
;
int32_t
tsNumOfVnodeFetchThreads
=
1
;
int32_t
tsNumOfVnodeWriteThreads
=
2
;
int32_t
tsNumOfVnodeWriteThreads
=
2
;
int32_t
tsNumOfVnodeSyncThreads
=
2
;
int32_t
tsNumOfVnodeSyncThreads
=
2
;
int32_t
tsNumOfVnodeRsmaThreads
=
2
;
int32_t
tsNumOfVnodeRsmaThreads
=
2
;
...
@@ -365,8 +365,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -365,8 +365,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
tsNumOfVnodeStreamThreads
=
TMAX
(
tsNumOfVnodeStreamThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeStreamThreads"
,
tsNumOfVnodeStreamThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeStreamThreads"
,
tsNumOfVnodeStreamThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeFetchThreads
=
tsNumOfCores
/
4
;
tsNumOfVnodeFetchThreads
=
1
;
tsNumOfVnodeFetchThreads
=
TMAX
(
tsNumOfVnodeFetchThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeFetchThreads"
,
tsNumOfVnodeFetchThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfVnodeFetchThreads"
,
tsNumOfVnodeFetchThreads
,
4
,
1024
,
0
)
!=
0
)
return
-
1
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
tsNumOfVnodeWriteThreads
=
tsNumOfCores
;
...
@@ -385,9 +384,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
...
@@ -385,9 +384,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads
=
TMAX
(
tsNumOfQnodeQueryThreads
,
4
);
tsNumOfQnodeQueryThreads
=
TMAX
(
tsNumOfQnodeQueryThreads
,
4
);
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeQueryThreads"
,
tsNumOfQnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
if
(
cfgAddInt32
(
pCfg
,
"numOfQnodeQueryThreads"
,
tsNumOfQnodeQueryThreads
,
1
,
1024
,
0
)
!=
0
)
return
-
1
;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads
=
tsNumOfCores
/
4
;
tsNumOfSnodeSharedThreads
=
tsNumOfCores
/
4
;
tsNumOfSnodeSharedThreads
=
TRANGE
(
tsNumOfSnodeSharedThreads
,
2
,
4
);
tsNumOfSnodeSharedThreads
=
TRANGE
(
tsNumOfSnodeSharedThreads
,
2
,
4
);
...
@@ -527,15 +526,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
...
@@ -527,15 +526,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem
->
stype
=
stype
;
pItem
->
stype
=
stype
;
}
}
/*
/*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2;
tsNumOfQnodeFetchThreads = numOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype;
pItem->stype = stype;
}
}
*/
*/
pItem
=
cfgGetItem
(
tsCfg
,
"numOfSnodeSharedThreads"
);
pItem
=
cfgGetItem
(
tsCfg
,
"numOfSnodeSharedThreads"
);
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
if
(
pItem
!=
NULL
&&
pItem
->
stype
==
CFG_STYPE_DEFAULT
)
{
...
@@ -693,7 +692,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
...
@@ -693,7 +692,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeSyncThreads"
)
->
i32
;
tsNumOfVnodeSyncThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeSyncThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeSharedThreads"
)
->
i32
;
tsNumOfSnodeSharedThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeSharedThreads"
)
->
i32
;
tsNumOfSnodeUniqueThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeUniqueThreads"
)
->
i32
;
tsNumOfSnodeUniqueThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeUniqueThreads"
)
->
i32
;
tsRpcQueueMemoryAllowed
=
cfgGetItem
(
pCfg
,
"rpcQueueMemoryAllowed"
)
->
i64
;
tsRpcQueueMemoryAllowed
=
cfgGetItem
(
pCfg
,
"rpcQueueMemoryAllowed"
)
->
i64
;
...
@@ -941,10 +940,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
...
@@ -941,10 +940,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
tsNumOfVnodeRsmaThreads
=
cfgGetItem
(
pCfg
,
"numOfVnodeRsmaThreads"
)
->
i32
;
}
else
if
(
strcasecmp
(
"numOfQnodeQueryThreads"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"numOfQnodeQueryThreads"
,
name
)
==
0
)
{
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
tsNumOfQnodeQueryThreads
=
cfgGetItem
(
pCfg
,
"numOfQnodeQueryThreads"
)
->
i32
;
/*
/*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/
*/
}
else
if
(
strcasecmp
(
"numOfSnodeSharedThreads"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"numOfSnodeSharedThreads"
,
name
)
==
0
)
{
tsNumOfSnodeSharedThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeSharedThreads"
)
->
i32
;
tsNumOfSnodeSharedThreads
=
cfgGetItem
(
pCfg
,
"numOfSnodeSharedThreads"
)
->
i32
;
}
else
if
(
strcasecmp
(
"numOfSnodeUniqueThreads"
,
name
)
==
0
)
{
}
else
if
(
strcasecmp
(
"numOfSnodeUniqueThreads"
,
name
)
==
0
)
{
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
97ca0968
...
@@ -146,6 +146,7 @@ struct SRSmaInfoItem {
...
@@ -146,6 +146,7 @@ struct SRSmaInfoItem {
uint16_t
nScanned
;
uint16_t
nScanned
;
int32_t
maxDelay
;
// ms
int32_t
maxDelay
;
// ms
tmr_h
tmrId
;
tmr_h
tmrId
;
void
*
pStreamState
;
};
};
struct
SRSmaInfo
{
struct
SRSmaInfo
{
...
@@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
...
@@ -224,8 +225,10 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
int32_t
tdRSmaProcessExecImpl
(
SSma
*
pSma
,
ERsmaExecType
type
);
int32_t
tdRSmaProcessExecImpl
(
SSma
*
pSma
,
ERsmaExecType
type
);
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
,
SHashObj
*
pInfoHash
);
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
,
SHashObj
*
pInfoHash
);
int32_t
tdRSmaProcessRestoreImpl
(
SSma
*
pSma
,
int8_t
type
,
int64_t
qtaskFileVer
);
int32_t
tdRSmaProcessRestoreImpl
(
SSma
*
pSma
,
int8_t
type
,
int64_t
qtaskFileVer
);
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vid
,
int64_t
version
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFileName
(
int32_t
vgId
,
int64_t
version
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vid
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullName
(
int32_t
vgId
,
int64_t
version
,
const
char
*
path
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullPath
(
int32_t
vgId
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
);
void
tdRSmaQTaskInfoGetFullPathEx
(
int32_t
vgId
,
tb_uid_t
suid
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
);
static
FORCE_INLINE
void
tdRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
)
{
static
FORCE_INLINE
void
tdRefRSmaInfo
(
SSma
*
pSma
,
SRSmaInfo
*
pRSmaInfo
)
{
int32_t
ref
=
T_REF_INC
(
pRSmaInfo
);
int32_t
ref
=
T_REF_INC
(
pRSmaInfo
);
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
97ca0968
...
@@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path,
...
@@ -92,6 +92,18 @@ void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path,
tdGetVndFileName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
TD_QTASKINFO_FNAME_PREFIX
,
version
,
outputName
);
tdGetVndFileName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
TD_QTASKINFO_FNAME_PREFIX
,
version
,
outputName
);
}
}
void
tdRSmaQTaskInfoGetFullPath
(
int32_t
vgId
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
)
{
tdGetVndDirName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
true
,
outputName
);
int32_t
rsmaLen
=
strlen
(
outputName
);
snprintf
(
outputName
+
rsmaLen
,
TSDB_FILENAME_LEN
-
rsmaLen
,
"%"
PRIi8
,
level
);
}
void
tdRSmaQTaskInfoGetFullPathEx
(
int32_t
vgId
,
tb_uid_t
suid
,
int8_t
level
,
const
char
*
path
,
char
*
outputName
)
{
tdGetVndDirName
(
vgId
,
path
,
VNODE_RSMA_DIR
,
true
,
outputName
);
int32_t
rsmaLen
=
strlen
(
outputName
);
snprintf
(
outputName
+
rsmaLen
,
TSDB_FILENAME_LEN
-
rsmaLen
,
"%"
PRIi64
"%s%"
PRIi8
,
suid
,
TD_DIRSEP
,
level
);
}
static
FORCE_INLINE
int32_t
tdRSmaQTaskInfoContLen
(
int32_t
lenWithHead
)
{
static
FORCE_INLINE
int32_t
tdRSmaQTaskInfoContLen
(
int32_t
lenWithHead
)
{
return
lenWithHead
-
RSMA_QTASKINFO_HEAD_LEN
;
return
lenWithHead
-
RSMA_QTASKINFO_HEAD_LEN
;
}
}
...
@@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
...
@@ -130,6 +142,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
taosTmrStopA
(
&
pItem
->
tmrId
);
taosTmrStopA
(
&
pItem
->
tmrId
);
}
}
if
(
isDeepFree
&&
pItem
->
pStreamState
)
{
streamStateClose
(
pItem
->
pStreamState
);
}
if
(
isDeepFree
&&
pInfo
->
taskInfo
[
i
])
{
if
(
isDeepFree
&&
pInfo
->
taskInfo
[
i
])
{
tdRSmaQTaskInfoFree
(
&
pInfo
->
taskInfo
[
i
],
SMA_VID
(
pSma
),
i
+
1
);
tdRSmaQTaskInfoFree
(
&
pInfo
->
taskInfo
[
i
],
SMA_VID
(
pSma
),
i
+
1
);
}
else
{
}
else
{
...
@@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
...
@@ -290,12 +306,33 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
STsdbCfg
*
pTsdbCfg
=
SMA_TSDB_CFG
(
pSma
);
STsdbCfg
*
pTsdbCfg
=
SMA_TSDB_CFG
(
pSma
);
SVnode
*
pVnode
=
pSma
->
pVnode
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
char
taskInfDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
void
*
pStreamState
=
NULL
;
// set the backend of stream state
tdRSmaQTaskInfoGetFullPathEx
(
TD_VID
(
pVnode
),
pRSmaInfo
->
suid
,
idx
+
1
,
tfsGetPrimaryPath
(
pVnode
->
pTfs
),
taskInfDir
);
if
(
!
taosCheckExistFile
(
taskInfDir
))
{
char
*
s
=
strdup
(
taskInfDir
);
if
(
taosMulMkDir
(
taosDirName
(
s
))
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
taosMemoryFree
(
s
);
return
TSDB_CODE_FAILED
;
}
taosMemoryFree
(
s
);
}
pStreamState
=
streamStateOpen
(
taskInfDir
,
NULL
,
true
);
if
(
!
pStreamState
)
{
terrno
=
TSDB_CODE_RSMA_STREAM_STATE_OPEN
;
return
TSDB_CODE_FAILED
;
}
SReadHandle
handle
=
{
SReadHandle
handle
=
{
.
meta
=
pVnode
->
pMeta
,
.
meta
=
pVnode
->
pMeta
,
.
vnode
=
pVnode
,
.
vnode
=
pVnode
,
.
initTqReader
=
1
,
.
initTqReader
=
1
,
.
pStateBackend
=
pStreamState
,
};
};
pRSmaInfo
->
taskInfo
[
idx
]
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
&
handle
);
pRSmaInfo
->
taskInfo
[
idx
]
=
qCreateStreamExecTaskInfo
(
param
->
qmsg
[
idx
],
&
handle
);
if
(
!
pRSmaInfo
->
taskInfo
[
idx
])
{
if
(
!
pRSmaInfo
->
taskInfo
[
idx
])
{
terrno
=
TSDB_CODE_RSMA_QTASKINFO_CREATE
;
terrno
=
TSDB_CODE_RSMA_QTASKINFO_CREATE
;
...
@@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
...
@@ -303,6 +340,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
}
}
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
SRSmaInfoItem
*
pItem
=
&
(
pRSmaInfo
->
items
[
idx
]);
pItem
->
triggerStat
=
TASK_TRIGGER_STAT_ACTIVE
;
// fetch the data when reboot
pItem
->
triggerStat
=
TASK_TRIGGER_STAT_ACTIVE
;
// fetch the data when reboot
pItem
->
pStreamState
=
pStreamState
;
if
(
param
->
maxdelay
[
idx
]
<
TSDB_MIN_ROLLUP_MAX_DELAY
)
{
if
(
param
->
maxdelay
[
idx
]
<
TSDB_MIN_ROLLUP_MAX_DELAY
)
{
int64_t
msInterval
=
int64_t
msInterval
=
convertTimeFromPrecisionToUnit
(
pRetention
[
idx
+
1
].
freq
,
pTsdbCfg
->
precision
,
TIME_UNIT_MILLISECOND
);
convertTimeFromPrecisionToUnit
(
pRetention
[
idx
+
1
].
freq
,
pTsdbCfg
->
precision
,
TIME_UNIT_MILLISECOND
);
...
@@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
...
@@ -322,7 +360,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pItem
->
fetchLevel
=
pItem
->
level
;
pItem
->
fetchLevel
=
pItem
->
level
;
taosTmrReset
(
tdRSmaFetchTrigger
,
RSMA_FETCH_INTERVAL
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
taosTmrReset
(
tdRSmaFetchTrigger
,
RSMA_FETCH_INTERVAL
,
pItem
,
smaMgmt
.
tmrHandle
,
&
pItem
->
tmrId
);
smaInfo
(
"vgId:%d, item:%p table:%"
PRIi64
" level:%"
PRIi8
" maxdelay:%"
PRIi64
" watermark:%"
PRIi64
smaInfo
(
"vgId:%d, item:%p table:%"
PRIi64
" level:%"
PRIi8
" maxdelay:%"
PRIi64
" watermark:%"
PRIi64
", finally maxdelay:%"
PRIi32
,
", finally maxdelay:%"
PRIi32
,
...
@@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
...
@@ -1226,16 +1263,17 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer)
if
(
tdRSmaRestoreQTaskInfoInit
(
pSma
,
&
nTables
)
<
0
)
{
if
(
tdRSmaRestoreQTaskInfoInit
(
pSma
,
&
nTables
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
if
(
nTables
<=
0
)
{
if
(
nTables
<=
0
)
{
smaDebug
(
"vgId:%d, no need to restore rsma task %"
PRIi8
" since no tables"
,
SMA_VID
(
pSma
),
type
);
smaDebug
(
"vgId:%d, no need to restore rsma task %"
PRIi8
" since no tables"
,
SMA_VID
(
pSma
),
type
);
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
#if 0
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
if (tdRSmaRestoreQTaskInfoReload(pSma, type, qtaskFileVer) < 0) {
goto _err;
goto _err;
}
}
#endif
// step 3: reload ts data from checkpoint
// step 3: reload ts data from checkpoint
if
(
tdRSmaRestoreTSDataReload
(
pSma
)
<
0
)
{
if
(
tdRSmaRestoreTSDataReload
(
pSma
)
<
0
)
{
...
@@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte
...
@@ -1440,6 +1478,50 @@ static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIte
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
tdRSmaPersistExecImpl
(
SRSmaStat
*
pRSmaStat
,
SHashObj
*
pInfoHash
)
{
SSma
*
pSma
=
pRSmaStat
->
pSma
;
SVnode
*
pVnode
=
pSma
->
pVnode
;
int32_t
vid
=
SMA_VID
(
pSma
);
if
(
taosHashGetSize
(
pInfoHash
)
<=
0
)
{
return
TSDB_CODE_SUCCESS
;
}
int64_t
fsMaxVer
=
tdRSmaFSMaxVer
(
pSma
,
pRSmaStat
);
if
(
pRSmaStat
->
commitAppliedVer
<=
fsMaxVer
)
{
smaDebug
(
"vgId:%d, rsma persist, no need as applied %"
PRIi64
" not larger than fsMaxVer %"
PRIi64
,
vid
,
pRSmaStat
->
commitAppliedVer
,
fsMaxVer
);
return
TSDB_CODE_SUCCESS
;
}
void
*
infoHash
=
NULL
;
while
((
infoHash
=
taosHashIterate
(
pInfoHash
,
infoHash
)))
{
SRSmaInfo
*
pRSmaInfo
=
*
(
SRSmaInfo
**
)
infoHash
;
if
(
RSMA_INFO_IS_DEL
(
pRSmaInfo
))
{
continue
;
}
for
(
int32_t
i
=
0
;
i
<
TSDB_RETENTION_L2
;
++
i
)
{
SRSmaInfoItem
*
pItem
=
RSMA_INFO_ITEM
(
pRSmaInfo
,
i
);
if
(
pItem
&&
pItem
->
pStreamState
)
{
if
(
streamStateCommit
(
pItem
->
pStreamState
)
<
0
)
{
terrno
=
TSDB_CODE_RSMA_STREAM_STATE_COMMIT
;
goto
_err
;
}
smaDebug
(
"vgId:%d, rsma persist, stream state commit success, table %"
PRIi64
" level %d"
,
vid
,
pRSmaInfo
->
suid
,
i
+
1
);
}
}
}
return
TSDB_CODE_SUCCESS
;
_err:
smaError
(
"vgId:%d, rsma persist failed since %s"
,
vid
,
terrstr
());
return
TSDB_CODE_FAILED
;
}
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
SSma *pSma = pRSmaStat->pSma;
SSma *pSma = pRSmaStat->pSma;
SVnode *pVnode = pSma->pVnode;
SVnode *pVnode = pSma->pVnode;
...
@@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
...
@@ -1459,7 +1541,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
pRSmaStat
->
commitAppliedVer
,
fsMaxVer
);
pRSmaStat->commitAppliedVer, fsMaxVer);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
}
...
@@ -1579,6 +1661,8 @@ _err:
...
@@ -1579,6 +1661,8 @@ _err:
return TSDB_CODE_FAILED;
return TSDB_CODE_FAILED;
}
}
#endif
/**
/**
* @brief trigger to get rsma result in async mode
* @brief trigger to get rsma result in async mode
*
*
...
@@ -1926,7 +2010,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
...
@@ -1926,7 +2010,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
if
((
pEnv
->
flag
&
SMA_ENV_FLG_CLOSE
)
&&
(
atomic_load_64
(
&
pRSmaStat
->
nBufItems
)
<=
0
))
{
if
((
pEnv
->
flag
&
SMA_ENV_FLG_CLOSE
)
&&
(
atomic_load_64
(
&
pRSmaStat
->
nBufItems
)
<=
0
))
{
smaDebug
(
"vgId:%d, exec task end, flag:%"
PRIi8
", nBufItems:%"
PRIi64
,
SMA_VID
(
pSma
),
pEnv
->
flag
,
smaDebug
(
"vgId:%d, exec task end, flag:%"
PRIi8
", nBufItems:%"
PRIi64
,
SMA_VID
(
pSma
),
pEnv
->
flag
,
atomic_load_64
(
&
pRSmaStat
->
nBufItems
));
atomic_load_64
(
&
pRSmaStat
->
nBufItems
));
break
;
break
;
}
}
}
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
97ca0968
...
@@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
...
@@ -141,11 +141,8 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockData
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockDataLen
)
==
pRsp
->
blockNum
);
if
(
pRsp
->
withSchema
)
{
ASSERT
(
!
pRsp
->
withSchema
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
pRsp
->
blockNum
);
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
}
else
{
ASSERT
(
taosArrayGetSize
(
pRsp
->
blockSchema
)
==
0
);
}
if
(
pRsp
->
reqOffset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pRsp
->
reqOffset
.
type
==
TMQ_OFFSET__LOG
)
{
if
(
pRsp
->
blockNum
>
0
)
{
if
(
pRsp
->
blockNum
>
0
)
{
...
@@ -760,7 +757,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
...
@@ -760,7 +757,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor
// expand executor
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
);
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
,
false
);
if
(
pTask
->
pState
==
NULL
)
{
if
(
pTask
->
pState
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -774,7 +771,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
...
@@ -774,7 +771,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
executor
);
ASSERT
(
pTask
->
exec
.
executor
);
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
);
pTask
->
pState
=
streamStateOpen
(
pTq
->
pStreamMeta
->
path
,
pTask
,
false
);
if
(
pTask
->
pState
==
NULL
)
{
if
(
pTask
->
pState
==
NULL
)
{
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/vnode/src/tsdb/tsdbMergeTree.c
浏览文件 @
97ca0968
...
@@ -320,6 +320,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
...
@@ -320,6 +320,7 @@ void tLDataIterNextBlock(SLDataIter *pIter) {
pIter
->
pSttBlk
=
NULL
;
pIter
->
pSttBlk
=
NULL
;
if
(
index
!=
-
1
)
{
if
(
index
!=
-
1
)
{
pIter
->
iSttBlk
=
index
;
pIter
->
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
pIter
->
iSttBlk
);
pIter
->
pSttBlk
=
(
SSttBlk
*
)
taosArrayGet
(
pIter
->
pBlockLoadInfo
->
aSttBlk
,
pIter
->
iSttBlk
);
}
}
}
}
...
...
source/libs/executor/src/executil.c
浏览文件 @
97ca0968
...
@@ -989,7 +989,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
...
@@ -989,7 +989,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
if
(
pNode
->
output
)
{
if
(
pNode
->
output
)
{
(
*
numOfOutputCols
)
+=
1
;
(
*
numOfOutputCols
)
+=
1
;
}
else
{
}
else
if
(
info
!=
NULL
)
{
// select distinct tbname from stb where tbname='abc';
info
->
output
=
false
;
info
->
output
=
false
;
}
}
}
}
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
97ca0968
...
@@ -5297,12 +5297,12 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
...
@@ -5297,12 +5297,12 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
}
}
static
void
doModeAdd
(
SModeInfo
*
pInfo
,
char
*
data
)
{
static
void
doModeAdd
(
SModeInfo
*
pInfo
,
char
*
data
)
{
int32_t
hashKeyBytes
=
IS_
VA
R_DATA_TYPE
(
pInfo
->
colType
)
?
varDataTLen
(
data
)
:
pInfo
->
colBytes
;
int32_t
hashKeyBytes
=
IS_
ST
R_DATA_TYPE
(
pInfo
->
colType
)
?
varDataTLen
(
data
)
:
pInfo
->
colBytes
;
SModeItem
**
pHashItem
=
taosHashGet
(
pInfo
->
pHash
,
data
,
hashKeyBytes
);
SModeItem
**
pHashItem
=
taosHashGet
(
pInfo
->
pHash
,
data
,
hashKeyBytes
);
if
(
pHashItem
==
NULL
)
{
if
(
pHashItem
==
NULL
)
{
int32_t
size
=
sizeof
(
SModeItem
)
+
pInfo
->
colBytes
;
int32_t
size
=
sizeof
(
SModeItem
)
+
pInfo
->
colBytes
;
SModeItem
*
pItem
=
(
SModeItem
*
)(
pInfo
->
pItems
+
pInfo
->
numOfPoints
*
size
);
SModeItem
*
pItem
=
(
SModeItem
*
)(
pInfo
->
pItems
+
pInfo
->
numOfPoints
*
size
);
memcpy
(
pItem
->
data
,
data
,
pInfo
->
col
Bytes
);
memcpy
(
pItem
->
data
,
data
,
hashKey
Bytes
);
pItem
->
count
+=
1
;
pItem
->
count
+=
1
;
taosHashPut
(
pInfo
->
pHash
,
data
,
hashKeyBytes
,
&
pItem
,
sizeof
(
SModeItem
*
));
taosHashPut
(
pInfo
->
pHash
,
data
,
hashKeyBytes
,
&
pItem
,
sizeof
(
SModeItem
*
));
...
...
source/libs/nodes/src/nodesMsgFuncs.c
浏览文件 @
97ca0968
...
@@ -17,6 +17,18 @@
...
@@ -17,6 +17,18 @@
#include "plannodes.h"
#include "plannodes.h"
#include "tdatablock.h"
#include "tdatablock.h"
#ifndef htonll
#define htonll(x) \
(((int64_t)x & 0x00000000000000ff) << 7 * 8) | (((int64_t)x & 0x000000000000ff00) << 5 * 8) | \
(((int64_t)x & 0x0000000000ff0000) << 3 * 8) | (((int64_t)x & 0x00000000ff000000) << 1 * 8) | \
(((int64_t)x & 0x000000ff00000000) >> 1 * 8) | (((int64_t)x & 0x0000ff0000000000) >> 3 * 8) | \
(((int64_t)x & 0x00ff000000000000) >> 5 * 8) | (((int64_t)x & 0xff00000000000000) >> 7 * 8)
#define ntohll(x) htonll(x)
#endif
#define NODES_MSG_DEFAULT_LEN 1024
#define NODES_MSG_DEFAULT_LEN 1024
#define TLV_TYPE_ARRAY_ELEM 0
#define TLV_TYPE_ARRAY_ELEM 0
...
@@ -86,8 +98,8 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV
...
@@ -86,8 +98,8 @@ static int32_t tlvEncodeImpl(STlvEncoder* pEncoder, int16_t type, const void* pV
pEncoder
->
allocSize
=
pEncoder
->
allocSize
*
2
;
pEncoder
->
allocSize
=
pEncoder
->
allocSize
*
2
;
}
}
STlv
*
pTlv
=
(
STlv
*
)(
pEncoder
->
pBuf
+
pEncoder
->
offset
);
STlv
*
pTlv
=
(
STlv
*
)(
pEncoder
->
pBuf
+
pEncoder
->
offset
);
pTlv
->
type
=
type
;
pTlv
->
type
=
htons
(
type
)
;
pTlv
->
len
=
len
;
pTlv
->
len
=
htonl
(
len
)
;
memcpy
(
pTlv
->
value
,
pValue
,
len
);
memcpy
(
pTlv
->
value
,
pValue
,
len
);
pEncoder
->
offset
+=
tlvLen
;
pEncoder
->
offset
+=
tlvLen
;
++
(
pEncoder
->
tlvCount
);
++
(
pEncoder
->
tlvCount
);
...
@@ -117,26 +129,32 @@ static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) {
...
@@ -117,26 +129,32 @@ static int32_t tlvEncodeValueI8(STlvEncoder* pEncoder, int8_t value) {
}
}
static
int32_t
tlvEncodeI16
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int16_t
value
)
{
static
int32_t
tlvEncodeI16
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int16_t
value
)
{
value
=
htons
(
value
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeValueI16
(
STlvEncoder
*
pEncoder
,
int16_t
value
)
{
static
int32_t
tlvEncodeValueI16
(
STlvEncoder
*
pEncoder
,
int16_t
value
)
{
value
=
htons
(
value
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeI32
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int32_t
value
)
{
static
int32_t
tlvEncodeI32
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int32_t
value
)
{
value
=
htonl
(
value
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeValueI32
(
STlvEncoder
*
pEncoder
,
int32_t
value
)
{
static
int32_t
tlvEncodeValueI32
(
STlvEncoder
*
pEncoder
,
int32_t
value
)
{
value
=
htonl
(
value
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeI64
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int64_t
value
)
{
static
int32_t
tlvEncodeI64
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int64_t
value
)
{
value
=
htonll
(
value
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeValueI64
(
STlvEncoder
*
pEncoder
,
int64_t
value
)
{
static
int32_t
tlvEncodeValueI64
(
STlvEncoder
*
pEncoder
,
int64_t
value
)
{
value
=
htonll
(
value
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
}
}
...
@@ -149,34 +167,44 @@ static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) {
...
@@ -149,34 +167,44 @@ static int32_t tlvEncodeValueU8(STlvEncoder* pEncoder, uint8_t value) {
}
}
static
int32_t
tlvEncodeU16
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
uint16_t
value
)
{
static
int32_t
tlvEncodeU16
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
uint16_t
value
)
{
value
=
htons
(
value
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeValueU16
(
STlvEncoder
*
pEncoder
,
uint16_t
value
)
{
static
int32_t
tlvEncodeValueU16
(
STlvEncoder
*
pEncoder
,
uint16_t
value
)
{
value
=
htons
(
value
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeU64
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
uint64_t
value
)
{
static
int32_t
tlvEncodeU64
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
uint64_t
value
)
{
value
=
htonll
(
value
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeValueU64
(
STlvEncoder
*
pEncoder
,
uint64_t
value
)
{
static
int32_t
tlvEncodeValueU64
(
STlvEncoder
*
pEncoder
,
uint64_t
value
)
{
value
=
htonll
(
value
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeDouble
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
double
value
)
{
static
int32_t
tlvEncodeDouble
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
double
value
)
{
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
int64_t
temp
=
*
(
int64_t
*
)
&
value
;
temp
=
htonll
(
temp
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
temp
,
sizeof
(
temp
));
}
}
static
int32_t
tlvEncodeValueDouble
(
STlvEncoder
*
pEncoder
,
double
value
)
{
static
int32_t
tlvEncodeValueDouble
(
STlvEncoder
*
pEncoder
,
double
value
)
{
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
int64_t
temp
=
*
(
int64_t
*
)
&
value
;
temp
=
htonll
(
temp
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
temp
,
sizeof
(
temp
));
}
}
static
int32_t
tlvEncodeEnum
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int32_t
value
)
{
static
int32_t
tlvEncodeEnum
(
STlvEncoder
*
pEncoder
,
int16_t
type
,
int32_t
value
)
{
value
=
htonl
(
value
);
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
return
tlvEncodeImpl
(
pEncoder
,
type
,
&
value
,
sizeof
(
value
));
}
}
static
int32_t
tlvEncodeValueEnum
(
STlvEncoder
*
pEncoder
,
int32_t
value
)
{
static
int32_t
tlvEncodeValueEnum
(
STlvEncoder
*
pEncoder
,
int32_t
value
)
{
value
=
htonl
(
value
);
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
return
tlvEncodeValueImpl
(
pEncoder
,
&
value
,
sizeof
(
value
));
}
}
...
@@ -197,7 +225,7 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV
...
@@ -197,7 +225,7 @@ static int32_t tlvEncodeCStr(STlvEncoder* pEncoder, int16_t type, const char* pV
static
int32_t
tlvEncodeValueCStr
(
STlvEncoder
*
pEncoder
,
const
char
*
pValue
)
{
static
int32_t
tlvEncodeValueCStr
(
STlvEncoder
*
pEncoder
,
const
char
*
pValue
)
{
int16_t
len
=
strlen
(
pValue
);
int16_t
len
=
strlen
(
pValue
);
int32_t
code
=
tlvEncodeValueI
mpl
(
pEncoder
,
&
len
,
sizeof
(
len
)
);
int32_t
code
=
tlvEncodeValueI
16
(
pEncoder
,
len
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeValueImpl
(
pEncoder
,
pValue
,
len
);
code
=
tlvEncodeValueImpl
(
pEncoder
,
pValue
,
len
);
}
}
...
@@ -218,8 +246,8 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
...
@@ -218,8 +246,8 @@ static int32_t tlvEncodeObj(STlvEncoder* pEncoder, int16_t type, FToMsg func, co
int32_t
code
=
func
(
pObj
,
pEncoder
);
int32_t
code
=
func
(
pObj
,
pEncoder
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
STlv
*
pTlv
=
(
STlv
*
)(
pEncoder
->
pBuf
+
start
);
STlv
*
pTlv
=
(
STlv
*
)(
pEncoder
->
pBuf
+
start
);
pTlv
->
type
=
type
;
pTlv
->
type
=
htons
(
type
)
;
pTlv
->
len
=
pEncoder
->
offset
-
start
-
sizeof
(
STlv
);
pTlv
->
len
=
htonl
(
pEncoder
->
offset
-
start
-
sizeof
(
STlv
)
);
}
}
++
(
pEncoder
->
tlvCount
);
++
(
pEncoder
->
tlvCount
);
return
code
;
return
code
;
...
@@ -236,8 +264,8 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun
...
@@ -236,8 +264,8 @@ static int32_t tlvEncodeObjArray(STlvEncoder* pEncoder, int16_t type, FToMsg fun
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
STlv
*
pTlv
=
(
STlv
*
)(
pEncoder
->
pBuf
+
start
);
STlv
*
pTlv
=
(
STlv
*
)(
pEncoder
->
pBuf
+
start
);
pTlv
->
type
=
type
;
pTlv
->
type
=
htons
(
type
)
;
pTlv
->
len
=
pEncoder
->
offset
-
start
-
sizeof
(
STlv
);
pTlv
->
len
=
htonl
(
pEncoder
->
offset
-
start
-
sizeof
(
STlv
)
);
}
}
}
}
return
code
;
return
code
;
...
@@ -259,6 +287,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
...
@@ -259,6 +287,8 @@ static int32_t tlvGetNextTlv(STlvDecoder* pDecoder, STlv** pTlv) {
}
}
*
pTlv
=
(
STlv
*
)(
pDecoder
->
pBuf
+
pDecoder
->
offset
);
*
pTlv
=
(
STlv
*
)(
pDecoder
->
pBuf
+
pDecoder
->
offset
);
(
*
pTlv
)
->
type
=
ntohs
((
*
pTlv
)
->
type
);
(
*
pTlv
)
->
len
=
ntohl
((
*
pTlv
)
->
len
);
if
((
*
pTlv
)
->
len
+
pDecoder
->
offset
>
pDecoder
->
bufSize
)
{
if
((
*
pTlv
)
->
len
+
pDecoder
->
offset
>
pDecoder
->
bufSize
)
{
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -291,22 +321,52 @@ static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) {
...
@@ -291,22 +321,52 @@ static int32_t tlvDecodeValueI8(STlvDecoder* pDecoder, int8_t* pValue) {
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
}
}
static
int32_t
tlvDecodeI16
(
STlv
*
pTlv
,
int16_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeI16
(
STlv
*
pTlv
,
int16_t
*
pValue
)
{
int32_t
code
=
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohs
(
*
pValue
);
}
return
code
;
}
static
int32_t
tlvDecodeValueI16
(
STlvDecoder
*
pDecoder
,
int16_t
*
pValue
)
{
static
int32_t
tlvDecodeValueI16
(
STlvDecoder
*
pDecoder
,
int16_t
*
pValue
)
{
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
int32_t
code
=
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohs
(
*
pValue
);
}
return
code
;
}
}
static
int32_t
tlvDecodeI32
(
STlv
*
pTlv
,
int32_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeI32
(
STlv
*
pTlv
,
int32_t
*
pValue
)
{
int32_t
code
=
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohl
(
*
pValue
);
}
return
code
;
}
static
int32_t
tlvDecodeValueI32
(
STlvDecoder
*
pDecoder
,
int32_t
*
pValue
)
{
static
int32_t
tlvDecodeValueI32
(
STlvDecoder
*
pDecoder
,
int32_t
*
pValue
)
{
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
int32_t
code
=
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohl
(
*
pValue
);
}
return
code
;
}
}
static
int32_t
tlvDecodeI64
(
STlv
*
pTlv
,
int64_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeI64
(
STlv
*
pTlv
,
int64_t
*
pValue
)
{
int32_t
code
=
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohll
(
*
pValue
);
}
return
code
;
}
static
int32_t
tlvDecodeValueI64
(
STlvDecoder
*
pDecoder
,
int64_t
*
pValue
)
{
static
int32_t
tlvDecodeValueI64
(
STlvDecoder
*
pDecoder
,
int64_t
*
pValue
)
{
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
int32_t
code
=
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohll
(
*
pValue
);
}
return
code
;
}
}
static
int32_t
tlvDecodeU8
(
STlv
*
pTlv
,
uint8_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeU8
(
STlv
*
pTlv
,
uint8_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
...
@@ -315,22 +375,54 @@ static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) {
...
@@ -315,22 +375,54 @@ static int32_t tlvDecodeValueU8(STlvDecoder* pDecoder, uint8_t* pValue) {
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
}
}
static
int32_t
tlvDecodeU16
(
STlv
*
pTlv
,
uint16_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeU16
(
STlv
*
pTlv
,
uint16_t
*
pValue
)
{
int32_t
code
=
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohs
(
*
pValue
);
}
return
code
;
}
static
int32_t
tlvDecodeValueU16
(
STlvDecoder
*
pDecoder
,
uint16_t
*
pValue
)
{
static
int32_t
tlvDecodeValueU16
(
STlvDecoder
*
pDecoder
,
uint16_t
*
pValue
)
{
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
int32_t
code
=
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohs
(
*
pValue
);
}
return
code
;
}
}
static
int32_t
tlvDecodeU64
(
STlv
*
pTlv
,
uint64_t
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeU64
(
STlv
*
pTlv
,
uint64_t
*
pValue
)
{
int32_t
code
=
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohll
(
*
pValue
);
}
return
code
;
}
static
int32_t
tlvDecodeValueU64
(
STlvDecoder
*
pDecoder
,
uint64_t
*
pValue
)
{
static
int32_t
tlvDecodeValueU64
(
STlvDecoder
*
pDecoder
,
uint64_t
*
pValue
)
{
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
int32_t
code
=
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
ntohll
(
*
pValue
);
}
return
code
;
}
}
static
int32_t
tlvDecodeDouble
(
STlv
*
pTlv
,
double
*
pValue
)
{
return
tlvDecodeImpl
(
pTlv
,
pValue
,
sizeof
(
*
pValue
));
}
static
int32_t
tlvDecodeDouble
(
STlv
*
pTlv
,
double
*
pValue
)
{
int64_t
temp
=
0
;
int32_t
code
=
tlvDecodeI64
(
pTlv
,
&
temp
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
*
(
double
*
)
&
temp
;
}
return
code
;
}
static
int32_t
tlvDecodeValueDouble
(
STlvDecoder
*
pDecoder
,
double
*
pValue
)
{
static
int32_t
tlvDecodeValueDouble
(
STlvDecoder
*
pDecoder
,
double
*
pValue
)
{
return
tlvDecodeValueImpl
(
pDecoder
,
pValue
,
sizeof
(
*
pValue
));
int64_t
temp
=
0
;
int32_t
code
=
tlvDecodeValueI64
(
pDecoder
,
&
temp
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pValue
=
*
(
double
*
)
&
temp
;
}
return
code
;
}
}
static
int32_t
convertIntegerType
(
int32_t
value
,
void
*
pValue
,
int16_t
len
)
{
static
int32_t
convertIntegerType
(
int32_t
value
,
void
*
pValue
,
int16_t
len
)
{
...
@@ -2462,33 +2554,54 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
...
@@ -2462,33 +2554,54 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
return
code
;
return
code
;
}
}
enum
{
enum
{
PHY_INTERVAL_CODE_WINDOW
=
1
,
PHY_INTERVAL_CODE_INLINE_ATTRS
};
PHY_INTERVAL_CODE_WINDOW
=
1
,
PHY_INTERVAL_CODE_INTERVAL
,
static
int32_t
physiIntervalNodeInlineToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
PHY_INTERVAL_CODE_OFFSET
,
const
SIntervalPhysiNode
*
pNode
=
(
const
SIntervalPhysiNode
*
)
pObj
;
PHY_INTERVAL_CODE_SLIDING
,
PHY_INTERVAL_CODE_INTERVAL_UNIT
,
int32_t
code
=
tlvEncodeValueI64
(
pEncoder
,
pNode
->
interval
);
PHY_INTERVAL_CODE_SLIDING_UNIT
if
(
TSDB_CODE_SUCCESS
==
code
)
{
};
code
=
tlvEncodeValueI64
(
pEncoder
,
pNode
->
offset
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeValueI64
(
pEncoder
,
pNode
->
sliding
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeValueI8
(
pEncoder
,
pNode
->
intervalUnit
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncodeValueI8
(
pEncoder
,
pNode
->
slidingUnit
);
}
return
code
;
}
static
int32_t
physiIntervalNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
static
int32_t
physiIntervalNodeToMsg
(
const
void
*
pObj
,
STlvEncoder
*
pEncoder
)
{
const
SIntervalPhysiNode
*
pNode
=
(
const
SIntervalPhysiNode
*
)
pObj
;
const
SIntervalPhysiNode
*
pNode
=
(
const
SIntervalPhysiNode
*
)
pObj
;
int32_t
code
=
tlvEncodeObj
(
pEncoder
,
PHY_INTERVAL_CODE_WINDOW
,
physiWindowNodeToMsg
,
&
pNode
->
window
);
int32_t
code
=
tlvEncodeObj
(
pEncoder
,
PHY_INTERVAL_CODE_WINDOW
,
physiWindowNodeToMsg
,
&
pNode
->
window
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlvEncode
I64
(
pEncoder
,
PHY_INTERVAL_CODE_INTERVAL
,
pNode
->
interval
);
code
=
tlvEncode
Obj
(
pEncoder
,
PHY_INTERVAL_CODE_INLINE_ATTRS
,
physiIntervalNodeInlineToMsg
,
pNode
);
}
}
return
code
;
}
static
int32_t
msgToPhysiIntervalNodeInline
(
STlvDecoder
*
pDecoder
,
void
*
pObj
)
{
SIntervalPhysiNode
*
pNode
=
(
SIntervalPhysiNode
*
)
pObj
;
int32_t
code
=
tlvDecodeValueI64
(
pDecoder
,
&
pNode
->
interval
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlv
EncodeI64
(
pEncoder
,
PHY_INTERVAL_CODE_OFFSET
,
pNode
->
offset
);
code
=
tlv
DecodeValueI64
(
pDecoder
,
&
pNode
->
offset
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlv
EncodeI64
(
pEncoder
,
PHY_INTERVAL_CODE_SLIDING
,
pNode
->
sliding
);
code
=
tlv
DecodeValueI64
(
pDecoder
,
&
pNode
->
sliding
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlv
EncodeI8
(
pEncoder
,
PHY_INTERVAL_CODE_INTERVAL_UNIT
,
pNode
->
intervalUnit
);
code
=
tlv
DecodeValueI8
(
pDecoder
,
&
pNode
->
intervalUnit
);
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tlv
EncodeI8
(
pEncoder
,
PHY_INTERVAL_CODE_SLIDING_UNIT
,
pNode
->
slidingUnit
);
code
=
tlv
DecodeValueI8
(
pDecoder
,
&
pNode
->
slidingUnit
);
}
}
return
code
;
return
code
;
...
@@ -2504,20 +2617,8 @@ static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) {
...
@@ -2504,20 +2617,8 @@ static int32_t msgToPhysiIntervalNode(STlvDecoder* pDecoder, void* pObj) {
case
PHY_INTERVAL_CODE_WINDOW
:
case
PHY_INTERVAL_CODE_WINDOW
:
code
=
tlvDecodeObjFromTlv
(
pTlv
,
msgToPhysiWindowNode
,
&
pNode
->
window
);
code
=
tlvDecodeObjFromTlv
(
pTlv
,
msgToPhysiWindowNode
,
&
pNode
->
window
);
break
;
break
;
case
PHY_INTERVAL_CODE_INTERVAL
:
case
PHY_INTERVAL_CODE_INLINE_ATTRS
:
code
=
tlvDecodeI64
(
pTlv
,
&
pNode
->
interval
);
code
=
tlvDecodeObjFromTlv
(
pTlv
,
msgToPhysiIntervalNodeInline
,
pNode
);
break
;
case
PHY_INTERVAL_CODE_OFFSET
:
code
=
tlvDecodeI64
(
pTlv
,
&
pNode
->
offset
);
break
;
case
PHY_INTERVAL_CODE_SLIDING
:
code
=
tlvDecodeI64
(
pTlv
,
&
pNode
->
sliding
);
break
;
case
PHY_INTERVAL_CODE_INTERVAL_UNIT
:
code
=
tlvDecodeI8
(
pTlv
,
&
pNode
->
intervalUnit
);
break
;
case
PHY_INTERVAL_CODE_SLIDING_UNIT
:
code
=
tlvDecodeI8
(
pTlv
,
&
pNode
->
slidingUnit
);
break
;
break
;
default:
default:
break
;
break
;
...
...
source/libs/planner/test/planTestUtil.cpp
浏览文件 @
97ca0968
...
@@ -473,10 +473,11 @@ class PlannerTestBaseImpl {
...
@@ -473,10 +473,11 @@ class PlannerTestBaseImpl {
cout
<<
"nodesNodeToMsg: "
cout
<<
"nodesNodeToMsg: "
<<
chrono
::
duration_cast
<
chrono
::
microseconds
>
(
chrono
::
steady_clock
::
now
()
-
start
).
count
()
<<
"us"
<<
endl
;
<<
chrono
::
duration_cast
<
chrono
::
microseconds
>
(
chrono
::
steady_clock
::
now
()
-
start
).
count
()
<<
"us"
<<
endl
;
string
copyStr
(
pStr
,
len
);
SNode
*
pNode
=
NULL
;
SNode
*
pNode
=
NULL
;
char
*
pNewStr
=
NULL
;
char
*
pNewStr
=
NULL
;
int32_t
newlen
=
0
;
int32_t
newlen
=
0
;
DO_WITH_THROW
(
nodesMsgToNode
,
pStr
,
len
,
&
pNode
)
DO_WITH_THROW
(
nodesMsgToNode
,
copyStr
.
c_str
()
,
len
,
&
pNode
)
DO_WITH_THROW
(
nodesNodeToMsg
,
pNode
,
&
pNewStr
,
&
newlen
)
DO_WITH_THROW
(
nodesNodeToMsg
,
pNode
,
&
pNewStr
,
&
newlen
)
if
(
newlen
!=
len
||
0
!=
memcmp
(
pStr
,
pNewStr
,
len
))
{
if
(
newlen
!=
len
||
0
!=
memcmp
(
pStr
,
pNewStr
,
len
))
{
cout
<<
"nodesNodeToMsg error!!!!!!!!!!!!!! len = "
<<
len
<<
", newlen = "
<<
newlen
<<
endl
;
cout
<<
"nodesNodeToMsg error!!!!!!!!!!!!!! len = "
<<
len
<<
", newlen = "
<<
newlen
<<
endl
;
...
...
source/libs/stream/src/streamState.c
浏览文件 @
97ca0968
...
@@ -18,14 +18,19 @@
...
@@ -18,14 +18,19 @@
#include "tcommon.h"
#include "tcommon.h"
#include "ttimer.h"
#include "ttimer.h"
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
)
{
SStreamState
*
streamStateOpen
(
char
*
path
,
SStreamTask
*
pTask
,
bool
specPath
)
{
SStreamState
*
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
SStreamState
*
pState
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamState
));
if
(
pState
==
NULL
)
{
if
(
pState
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
NULL
;
}
}
char
statePath
[
300
];
char
statePath
[
300
];
sprintf
(
statePath
,
"%s/%d"
,
path
,
pTask
->
taskId
);
if
(
!
specPath
)
{
sprintf
(
statePath
,
"%s/%d"
,
path
,
pTask
->
taskId
);
}
else
{
memcpy
(
statePath
,
path
,
300
);
}
if
(
tdbOpen
(
statePath
,
4096
,
256
,
&
pState
->
db
)
<
0
)
{
if
(
tdbOpen
(
statePath
,
4096
,
256
,
&
pState
->
db
)
<
0
)
{
goto
_err
;
goto
_err
;
}
}
...
...
source/libs/tdb/src/db/tdbBtree.c
浏览文件 @
97ca0968
...
@@ -841,6 +841,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
...
@@ -841,6 +841,10 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
// copy content to the parent page
// copy content to the parent page
tdbBtreeInitPage
(
pParent
,
&
(
SBtreeInitPageArg
){.
flags
=
flags
,
.
pBt
=
pBt
},
0
);
tdbBtreeInitPage
(
pParent
,
&
(
SBtreeInitPageArg
){.
flags
=
flags
,
.
pBt
=
pBt
},
0
);
tdbPageCopy
(
pNews
[
0
],
pParent
,
1
);
tdbPageCopy
(
pNews
[
0
],
pParent
,
1
);
if
(
!
TDB_BTREE_PAGE_IS_LEAF
(
pNews
[
0
]))
{
((
SIntHdr
*
)(
pParent
->
pData
))
->
pgno
=
((
SIntHdr
*
)(
pNews
[
0
]
->
pData
))
->
pgno
;
}
}
}
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
for
(
int
i
=
0
;
i
<
3
;
i
++
)
{
...
...
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
97ca0968
...
@@ -260,7 +260,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
...
@@ -260,7 +260,7 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage
->
isDirty
=
0
;
pPage
->
isDirty
=
0
;
//
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tRBTreeDrop
(
&
pPager
->
rbt
,
(
SRBTreeNode
*
)
pPage
);
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
}
...
@@ -353,7 +353,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
...
@@ -353,7 +353,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
pPage
->
isDirty
=
0
;
pPage
->
isDirty
=
0
;
//
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tRBTreeDrop
(
&
pPager
->
rbt
,
(
SRBTreeNode
*
)
pPage
);
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
tdbPCacheRelease
(
pPager
->
pCache
,
pPage
,
pTxn
);
}
}
...
...
source/libs/wal/src/walMeta.c
浏览文件 @
97ca0968
...
@@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) {
...
@@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) {
char
*
walMetaSerialize
(
SWal
*
pWal
)
{
char
*
walMetaSerialize
(
SWal
*
pWal
)
{
char
buf
[
30
];
char
buf
[
30
];
ASSERT
(
pWal
->
fileInfoSet
);
ASSERT
(
pWal
->
fileInfoSet
);
int
sz
=
pWal
->
fileInfoSet
->
size
;
int
sz
=
taosArrayGetSize
(
pWal
->
fileInfoSet
)
;
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pMeta
=
cJSON_CreateObject
();
cJSON
*
pMeta
=
cJSON_CreateObject
();
cJSON
*
pFiles
=
cJSON_CreateArray
();
cJSON
*
pFiles
=
cJSON_CreateArray
();
...
@@ -384,8 +384,10 @@ static int walFindCurMetaVer(SWal* pWal) {
...
@@ -384,8 +384,10 @@ static int walFindCurMetaVer(SWal* pWal) {
int
code
=
regexec
(
&
walMetaRegexPattern
,
name
,
0
,
NULL
,
0
);
int
code
=
regexec
(
&
walMetaRegexPattern
,
name
,
0
,
NULL
,
0
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
sscanf
(
name
,
"meta-ver%d"
,
&
metaVer
);
sscanf
(
name
,
"meta-ver%d"
,
&
metaVer
);
wDebug
(
"vgId:%d, wal find current meta: %s is the meta file, ver %d"
,
pWal
->
cfg
.
vgId
,
name
,
metaVer
);
break
;
break
;
}
}
wDebug
(
"vgId:%d, wal find current meta: %s is not meta file"
,
pWal
->
cfg
.
vgId
,
name
);
}
}
taosCloseDir
(
&
pDir
);
taosCloseDir
(
&
pDir
);
regfree
(
&
walMetaRegexPattern
);
regfree
(
&
walMetaRegexPattern
);
...
@@ -422,6 +424,7 @@ int walLoadMeta(SWal* pWal) {
...
@@ -422,6 +424,7 @@ int walLoadMeta(SWal* pWal) {
// find existing meta file
// find existing meta file
int
metaVer
=
walFindCurMetaVer
(
pWal
);
int
metaVer
=
walFindCurMetaVer
(
pWal
);
if
(
metaVer
==
-
1
)
{
if
(
metaVer
==
-
1
)
{
wDebug
(
"vgId:%d wal find meta ver %d"
,
pWal
->
cfg
.
vgId
,
metaVer
);
return
-
1
;
return
-
1
;
}
}
char
fnameStr
[
WAL_FILE_LEN
];
char
fnameStr
[
WAL_FILE_LEN
];
...
...
source/util/src/terror.c
浏览文件 @
97ca0968
...
@@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m
...
@@ -621,6 +621,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is m
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_EMPTY_INFO
,
"Rsma info is empty"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_EMPTY_INFO
,
"Rsma info is empty"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_SCHEMA
,
"Rsma invalid schema"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_SCHEMA
,
"Rsma invalid schema"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_REGEX_MATCH
,
"Rsma regex match"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_REGEX_MATCH
,
"Rsma regex match"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_STREAM_STATE_OPEN
,
"Rsma stream state open"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_STREAM_STATE_COMMIT
,
"Rsma stream state commit"
)
//index
//index
TAOS_DEFINE_ERROR
(
TSDB_CODE_INDEX_REBUILDING
,
"Index is rebuilding"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_INDEX_REBUILDING
,
"Index is rebuilding"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录