Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e6ff854e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e6ff854e
编写于
6月 11, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/mnode
上级
30117293
c1addc44
变更
37
展开全部
隐藏空白更改
内联
并排
Showing
37 changed file
with
343 addition
and
2644 deletion
+343
-2644
include/common/tcommon.h
include/common/tcommon.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+23
-40
include/common/tmsgdef.h
include/common/tmsgdef.h
+0
-1
include/util/taoserror.h
include/util/taoserror.h
+13
-3
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+1
-2
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+2
-2
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+27
-18
source/common/src/tmsg.c
source/common/src/tmsg.c
+12
-84
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+26
-25
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+31
-25
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+37
-59
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+4
-0
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+0
-1
source/dnode/vnode/src/inc/sma.h
source/dnode/vnode/src/inc/sma.h
+5
-58
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-2
source/dnode/vnode/src/meta/metaSma.c
source/dnode/vnode/src/meta/metaSma.c
+3
-3
source/dnode/vnode/src/sma/sma.c
source/dnode/vnode/src/sma/sma.c
+1
-17
source/dnode/vnode/src/sma/smaEnv.c
source/dnode/vnode/src/sma/smaEnv.c
+2
-86
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+5
-5
source/dnode/vnode/src/sma/smaTDBImpl.c
source/dnode/vnode/src/sma/smaTDBImpl.c
+0
-130
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+0
-1037
source/dnode/vnode/src/sma/smaTimeRange2.c
source/dnode/vnode/src/sma/smaTimeRange2.c
+18
-936
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+0
-3
source/dnode/vnode/src/tsdb/tsdbCommit.c
source/dnode/vnode/src/tsdb/tsdbCommit.c
+1
-1
source/dnode/vnode/src/tsdb/tsdbMemTable.c
source/dnode/vnode/src/tsdb/tsdbMemTable.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+2
-4
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+1
-1
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+21
-36
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+1
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-3
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+7
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+47
-33
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+12
-2
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+4
-2
source/util/src/terror.c
source/util/src/terror.c
+28
-18
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
include/common/tcommon.h
浏览文件 @
e6ff854e
...
...
@@ -50,6 +50,7 @@ typedef enum EStreamType {
STREAM_INVERT
,
STREAM_REPROCESS
,
STREAM_INVALID
,
STREAM_GET_ALL
,
}
EStreamType
;
typedef
struct
{
...
...
include/common/tmsg.h
浏览文件 @
e6ff854e
...
...
@@ -1496,6 +1496,7 @@ typedef struct {
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_MAX_DELAY 3
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
...
...
@@ -2335,24 +2336,26 @@ typedef struct {
}
SVgEpSet
;
typedef
struct
{
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
int32_t
dstVgId
;
char
indexName
[
TSDB_INDEX_NAME_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int32_t
numOfVgroups
;
int64_t
indexUid
;
tb_uid_t
tableUid
;
// super/child/common table uid
int64_t
interval
;
int64_t
offset
;
// use unit by precision of DB
int64_t
sliding
;
char
*
expr
;
// sma expression
char
*
tagsFilter
;
SVgEpSet
*
pVgEpSet
;
}
STSma
;
// Time-range-wise SMA
int8_t
version
;
// for compatibility(default 0)
int8_t
intervalUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
slidingUnit
;
// MACRO: TIME_UNIT_XXX
int8_t
timezoneInt
;
// sma data expired if timezone changes.
int32_t
dstVgId
;
char
indexName
[
TSDB_INDEX_NAME_LEN
];
int32_t
exprLen
;
int32_t
tagsFilterLen
;
int64_t
indexUid
;
tb_uid_t
tableUid
;
// super/child/common table uid
tb_uid_t
dstTbUid
;
// for dstVgroup
int64_t
interval
;
int64_t
offset
;
// use unit by precision of DB
int64_t
sliding
;
char
*
dstTbName
;
// for dstVgroup
char
*
expr
;
// sma expression
char
*
tagsFilter
;
SSchemaWrapper
schemaRow
;
// for dstVgroup
SSchemaWrapper
schemaTag
;
// for dstVgroup
}
STSma
;
// Time-range-wise SMA
typedef
STSma
SVCreateTSmaReq
;
...
...
@@ -2436,27 +2439,6 @@ static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) {
return
0
;
}
typedef
struct
{
int64_t
indexUid
;
STimeWindow
queryWindow
;
}
SVGetTsmaExpWndsReq
;
#define SMA_WNDS_EXPIRE_FLAG (0x1)
#define SMA_WNDS_IS_EXPIRE(flag) (((flag)&SMA_WNDS_EXPIRE_FLAG) != 0)
#define SMA_WNDS_SET_EXPIRE(flag) ((flag) |= SMA_WNDS_EXPIRE_FLAG)
typedef
struct
{
int64_t
indexUid
;
int8_t
flags
;
// 0x1 all window expired
int32_t
numExpWnds
;
TSKEY
wndSKeys
[];
}
SVGetTsmaExpWndsRsp
;
int32_t
tEncodeSVGetTSmaExpWndsReq
(
SEncoder
*
pCoder
,
const
SVGetTsmaExpWndsReq
*
pReq
);
int32_t
tDecodeSVGetTsmaExpWndsReq
(
SDecoder
*
pCoder
,
SVGetTsmaExpWndsReq
*
pReq
);
int32_t
tEncodeSVGetTSmaExpWndsRsp
(
SEncoder
*
pCoder
,
const
SVGetTsmaExpWndsRsp
*
pReq
);
int32_t
tDecodeSVGetTsmaExpWndsRsp
(
SDecoder
*
pCoder
,
SVGetTsmaExpWndsRsp
*
pReq
);
typedef
struct
{
int
idx
;
}
SMCreateFullTextReq
;
...
...
@@ -2516,7 +2498,8 @@ typedef struct {
int32_t
tSerializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
const
STableIndexRsp
*
pRsp
);
int32_t
tDeserializeSTableIndexRsp
(
void
*
buf
,
int32_t
bufLen
,
STableIndexRsp
*
pRsp
);
void
tFreeSTableIndexInfo
(
void
*
pInfo
);
void
tFreeSTableIndexInfo
(
void
*
pInfo
);
typedef
struct
{
int8_t
mqMsgType
;
...
...
include/common/tmsgdef.h
浏览文件 @
e6ff854e
...
...
@@ -190,7 +190,6 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_CANCEL_SMA
,
"vnode-cancel-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DROP_SMA
,
"vnode-drop-sma"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_SUBMIT_RSMA
,
"vnode-submit-rsma"
,
SSubmitReq
,
SSubmitRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_GET_TSMA_EXP_WNDS
,
"vnode-get-tsma-expired-windows"
,
SVGetTsmaExpWndsReq
,
SVGetTsmaExpWndsRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_DELETE
,
"delete-data"
,
SVDeleteReq
,
SVDeleteRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_CONFIG
,
"alter-config"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_ALTER_REPLICA
,
"alter-replica"
,
NULL
,
NULL
)
...
...
include/util/taoserror.h
浏览文件 @
e6ff854e
...
...
@@ -352,9 +352,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0619)
#define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x061A)
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x061B)
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x061C)
#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x061D)
#define TSDB_CODE_TDB_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x061E)
// query
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
...
...
@@ -685,6 +682,19 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SML_INVALID_DATA TAOS_DEF_ERROR_CODE(0, 0x3002)
#define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003)
//tsma
#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3100)
#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3101)
#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3102)
#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103)
#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3104)
#define TSDB_CODE_TSMA_RM_SKEY_IN_HASH TAOS_DEF_ERROR_CODE(0, 0x3105)
//rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientImpl.c
浏览文件 @
e6ff854e
...
...
@@ -689,16 +689,15 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
};
SAppInstInfo
*
pAppInfo
=
getAppInfo
(
pRequest
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
qCreateQueryPlan
(
&
cxt
,
&
pRequest
->
body
.
pDag
,
pNodeList
);
tscError
(
"0x%"
PRIx64
" failed to create query plan, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
schedulerAsyncExecJob
(
pAppInfo
->
pTransporter
,
pNodeList
,
pRequest
->
body
.
pDag
,
&
pRequest
->
body
.
queryJob
,
pRequest
->
sqlstr
,
pRequest
->
metric
.
start
,
schedulerExecCb
,
pRequest
);
}
else
{
tscError
(
"0x%"
PRIx64
" failed to create query plan, code:%s 0x%"
PRIx64
,
pRequest
->
self
,
tstrerror
(
code
),
pRequest
->
requestId
);
pRequest
->
body
.
queryFp
(
pRequest
->
body
.
param
,
pRequest
,
code
);
}
...
...
source/client/test/clientTests.cpp
浏览文件 @
e6ff854e
...
...
@@ -778,9 +778,9 @@ TEST(testCase, async_api_test) {
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
ASSERT_NE
(
pConn
,
nullptr
);
taos_query
(
pConn
,
"use
t
est"
);
taos_query
(
pConn
,
"use
n
est"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"
desc abc1.tu
"
);
TAOS_RES
*
pRes
=
taos_query
(
pConn
,
"
select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;
"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
...
...
source/common/src/tdatablock.c
浏览文件 @
e6ff854e
...
...
@@ -294,7 +294,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, in
int32_t
colDataAssign
(
SColumnInfoData
*
pColumnInfoData
,
const
SColumnInfoData
*
pSource
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
!=
NULL
&&
pSource
!=
NULL
&&
pColumnInfoData
->
info
.
type
==
pSource
->
info
.
type
);
if
(
numOfRows
=
=
0
)
{
if
(
numOfRows
<
=
0
)
{
return
numOfRows
;
}
...
...
@@ -1239,6 +1239,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
return
NULL
;
}
if
(
pSrc
->
pData
==
NULL
)
{
continue
;
}
colDataAssign
(
pDst
,
pSrc
,
pDataBlock
->
info
.
rows
);
}
...
...
@@ -1631,25 +1634,31 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
break
;
default:
if
(
pColInfoData
->
info
.
type
<
TSDB_DATA_TYPE_MAX
&&
pColInfoData
->
info
.
type
>
TSDB_DATA_TYPE_NULL
)
{
char
tv
[
8
]
=
{
0
};
if
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v
=
0
;
GET_TYPED_DATA
(
v
,
float
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
}
else
if
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
}
else
if
(
IS_SIGNED_NUMERIC_TYPE
(
pColInfoData
->
info
.
type
))
{
int64_t
v
=
0
;
GET_TYPED_DATA
(
v
,
int64_t
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
if
(
pCol
->
type
==
pColInfoData
->
info
.
type
)
{
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
+
k
,
pCol
->
type
,
TD_VTYPE_NORM
,
var
,
true
,
offset
,
k
);
}
else
{
uint64_t
v
=
0
;
GET_TYPED_DATA
(
v
,
uint64_t
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
char
tv
[
8
]
=
{
0
};
if
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_FLOAT
)
{
float
v
=
0
;
GET_TYPED_DATA
(
v
,
float
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
}
else
if
(
pColInfoData
->
info
.
type
==
TSDB_DATA_TYPE_DOUBLE
)
{
double
v
=
0
;
GET_TYPED_DATA
(
v
,
double
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
}
else
if
(
IS_SIGNED_NUMERIC_TYPE
(
pColInfoData
->
info
.
type
))
{
int64_t
v
=
0
;
GET_TYPED_DATA
(
v
,
int64_t
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
}
else
{
uint64_t
v
=
0
;
GET_TYPED_DATA
(
v
,
uint64_t
,
pColInfoData
->
info
.
type
,
var
);
SET_TYPED_DATA
(
&
tv
,
pCol
->
type
,
v
);
}
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
+
k
,
pCol
->
type
,
TD_VTYPE_NORM
,
tv
,
true
,
offset
,
k
);
}
tdAppendColValToRow
(
&
rb
,
PRIMARYKEY_TIMESTAMP_COL_ID
+
k
,
pCol
->
type
,
TD_VTYPE_NORM
,
tv
,
true
,
offset
,
k
);
}
else
{
uError
(
"the column type %"
PRIi16
" is undefined
\n
"
,
pColInfoData
->
info
.
type
);
TASSERT
(
0
);
...
...
source/common/src/tmsg.c
浏览文件 @
e6ff854e
...
...
@@ -3877,9 +3877,10 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if
(
tEncodeCStr
(
pCoder
,
pSma
->
indexName
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSma
->
exprLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSma
->
tagsFilterLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pSma
->
numOfVgroups
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSma
->
indexUid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSma
->
tableUid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSma
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pCoder
,
pSma
->
dstTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSma
->
interval
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSma
->
offset
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pSma
->
sliding
)
<
0
)
return
-
1
;
...
...
@@ -3889,17 +3890,10 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if
(
pSma
->
tagsFilterLen
>
0
)
{
if
(
tEncodeCStr
(
pCoder
,
pSma
->
tagsFilter
)
<
0
)
return
-
1
;
}
for
(
int32_t
v
=
0
;
v
<
pSma
->
numOfVgroups
;
++
v
)
{
if
(
tEncodeI32
(
pCoder
,
pSma
->
pVgEpSet
[
v
].
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pSma
->
pVgEpSet
[
v
].
epSet
.
inUse
)
<
0
)
return
-
1
;
int8_t
numOfEps
=
pSma
->
pVgEpSet
[
v
].
epSet
.
numOfEps
;
if
(
tEncodeI8
(
pCoder
,
numOfEps
)
<
0
)
return
-
1
;
for
(
int32_t
n
=
0
;
n
<
numOfEps
;
++
n
)
{
const
SEp
*
pEp
=
&
pSma
->
pVgEpSet
[
v
].
epSet
.
eps
[
n
];
if
(
tEncodeCStr
(
pCoder
,
pEp
->
fqdn
)
<
0
)
return
-
1
;
if
(
tEncodeU16
(
pCoder
,
pEp
->
port
)
<
0
)
return
-
1
;
}
}
tEncodeSSchemaWrapper
(
pCoder
,
&
pSma
->
schemaRow
);
tEncodeSSchemaWrapper
(
pCoder
,
&
pSma
->
schemaTag
);
return
0
;
}
...
...
@@ -3907,14 +3901,15 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
version
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
intervalUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
slidingUnit
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
timezoneInt
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
dstVgId
)
<
0
)
return
-
1
;
if
(
tDecodeCStrTo
(
pCoder
,
pSma
->
indexName
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
exprLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
tagsFilterLen
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
numOfVgroups
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
indexUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
tableUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
dstTbUid
)
<
0
)
return
-
1
;
if
(
tDecodeCStr
(
pCoder
,
&
pSma
->
dstTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
interval
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
offset
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pSma
->
sliding
)
<
0
)
return
-
1
;
...
...
@@ -3928,27 +3923,9 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
}
else
{
pSma
->
tagsFilter
=
NULL
;
}
if
(
pSma
->
numOfVgroups
>
0
)
{
pSma
->
pVgEpSet
=
(
SVgEpSet
*
)
tDecoderMalloc
(
pCoder
,
pSma
->
numOfVgroups
*
sizeof
(
SVgEpSet
));
if
(
!
pSma
->
pVgEpSet
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
memset
(
pSma
->
pVgEpSet
,
0
,
pSma
->
numOfVgroups
*
sizeof
(
SVgEpSet
));
for
(
int32_t
v
=
0
;
v
<
pSma
->
numOfVgroups
;
++
v
)
{
if
(
tDecodeI32
(
pCoder
,
&
pSma
->
pVgEpSet
[
v
].
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
pVgEpSet
[
v
].
epSet
.
inUse
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pSma
->
pVgEpSet
[
v
].
epSet
.
numOfEps
)
<
0
)
return
-
1
;
int8_t
numOfEps
=
pSma
->
pVgEpSet
[
v
].
epSet
.
numOfEps
;
for
(
int32_t
n
=
0
;
n
<
numOfEps
;
++
n
)
{
SEp
*
pEp
=
&
pSma
->
pVgEpSet
[
v
].
epSet
.
eps
[
n
];
if
(
tDecodeCStrTo
(
pCoder
,
pEp
->
fqdn
)
<
0
)
return
-
1
;
if
(
tDecodeU16
(
pCoder
,
&
pEp
->
port
)
<
0
)
return
-
1
;
}
}
}
// only needed in dstVgroup
tDecodeSSchemaWrapperEx
(
pCoder
,
&
pSma
->
schemaRow
);
tDecodeSSchemaWrapperEx
(
pCoder
,
&
pSma
->
schemaTag
);
return
0
;
}
...
...
@@ -3991,55 +3968,6 @@ int32_t tDecodeSVDropTSmaReq(SDecoder *pCoder, SVDropTSmaReq *pReq) {
return
0
;
}
int32_t
tEncodeSVGetTSmaExpWndsReq
(
SEncoder
*
pCoder
,
const
SVGetTsmaExpWndsReq
*
pReq
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
indexUid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
queryWindow
.
skey
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
queryWindow
.
ekey
)
<
0
)
return
-
1
;
tEndEncode
(
pCoder
);
return
0
;
}
int32_t
tDecodeSVGetTsmaExpWndsReq
(
SDecoder
*
pCoder
,
SVGetTsmaExpWndsReq
*
pReq
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
indexUid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
queryWindow
.
skey
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
queryWindow
.
ekey
)
<
0
)
return
-
1
;
tEndDecode
(
pCoder
);
return
0
;
}
int32_t
tEncodeSVGetTSmaExpWndsRsp
(
SEncoder
*
pCoder
,
const
SVGetTsmaExpWndsRsp
*
pReq
)
{
if
(
tStartEncode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pCoder
,
pReq
->
indexUid
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pCoder
,
pReq
->
flags
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pCoder
,
pReq
->
numExpWnds
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numExpWnds
;
++
i
)
{
if
(
tEncodeI64
(
pCoder
,
pReq
->
wndSKeys
[
i
])
<
0
)
return
-
1
;
}
tEndEncode
(
pCoder
);
return
0
;
}
int32_t
tDecodeSVGetTsmaExpWndsRsp
(
SDecoder
*
pCoder
,
SVGetTsmaExpWndsRsp
*
pReq
)
{
if
(
tStartDecode
(
pCoder
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
indexUid
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pCoder
,
&
pReq
->
flags
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pCoder
,
&
pReq
->
numExpWnds
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pReq
->
numExpWnds
;
++
i
)
{
if
(
tDecodeI64
(
pCoder
,
&
pReq
->
wndSKeys
[
i
])
<
0
)
return
-
1
;
}
tEndDecode
(
pCoder
);
return
0
;
}
int32_t
tSerializeSVDeleteReq
(
void
*
buf
,
int32_t
bufLen
,
SVDeleteReq
*
pReq
)
{
int32_t
headLen
=
sizeof
(
SMsgHead
);
if
(
buf
!=
NULL
)
{
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
e6ff854e
...
...
@@ -298,31 +298,32 @@ typedef struct {
}
SVgObj
;
typedef
struct
{
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
stb
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createdTime
;
int64_t
uid
;
int64_t
stbUid
;
int64_t
dbUid
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int8_t
timezone
;
int32_t
dstVgId
;
// for stream
int64_t
dstTbUid
;
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int32_t
exprLen
;
// strlen + 1
int32_t
tagsFilterLen
;
int32_t
sqlLen
;
int32_t
astLen
;
int32_t
numOfVgroups
;
char
*
expr
;
char
*
tagsFilter
;
char
*
sql
;
char
*
ast
;
SVgEpSet
*
pVgEpSet
;
char
name
[
TSDB_TABLE_FNAME_LEN
];
char
stb
[
TSDB_TABLE_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
char
dstTbName
[
TSDB_TABLE_FNAME_LEN
];
int64_t
createdTime
;
int64_t
uid
;
int64_t
stbUid
;
int64_t
dbUid
;
int64_t
dstTbUid
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
int8_t
timezone
;
int32_t
dstVgId
;
// for stream
int64_t
interval
;
int64_t
offset
;
int64_t
sliding
;
int32_t
exprLen
;
// strlen + 1
int32_t
tagsFilterLen
;
int32_t
sqlLen
;
int32_t
astLen
;
char
*
expr
;
char
*
tagsFilter
;
char
*
sql
;
char
*
ast
;
SSchemaWrapper
schemaRow
;
// for dstVgroup
SSchemaWrapper
schemaTag
;
// for dstVgroup
}
SSmaObj
;
typedef
struct
{
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
e6ff854e
...
...
@@ -151,33 +151,36 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, STrans* pTrans, SStreamObj*
ASSERT
(
pDb
);
if
(
mndExtractDbInfo
(
pMnode
,
pDb
,
&
pTask
->
shuffleDispatcher
.
dbInfo
,
NULL
)
<
0
)
{
sdbRelease
(
pMnode
->
pSdb
,
pDb
);
SArray
*
pVgs
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
sz
=
taosArrayGetSize
(
pVgs
);
SArray
*
sinkLv
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
int32_t
sinkLvSize
=
taosArrayGetSize
(
sinkLv
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
pVgs
,
i
);
for
(
int32_t
j
=
0
;
j
<
sinkLvSize
;
j
++
)
{
SStreamTask
*
pLastLevelTask
=
taosArrayGetP
(
sinkLv
,
j
);
if
(
pLastLevelTask
->
nodeId
==
pVgInfo
->
vgId
)
{
pVgInfo
->
taskId
=
pLastLevelTask
->
taskId
;
break
;
}
ASSERT
(
0
);
return
-
1
;
}
sdbRelease
(
pMnode
->
pSdb
,
pDb
);
SArray
*
pVgs
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
sz
=
taosArrayGetSize
(
pVgs
);
SArray
*
sinkLv
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
int32_t
sinkLvSize
=
taosArrayGetSize
(
sinkLv
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
pVgs
,
i
);
for
(
int32_t
j
=
0
;
j
<
sinkLvSize
;
j
++
)
{
SStreamTask
*
pLastLevelTask
=
taosArrayGetP
(
sinkLv
,
j
);
if
(
pLastLevelTask
->
nodeId
==
pVgInfo
->
vgId
)
{
pVgInfo
->
taskId
=
pLastLevelTask
->
taskId
;
ASSERT
(
pVgInfo
->
taskId
!=
0
);
break
;
}
}
}
else
{
pTask
->
dispatchType
=
TASK_DISPATCH__FIXED
;
pTask
->
dispatchMsgType
=
TDMT_STREAM_TASK_DISPATCH
;
SArray
*
pArray
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
// one sink only
ASSERT
(
taosArrayGetSize
(
pArray
)
==
1
);
SStreamTask
*
lastLevelTask
=
taosArrayGetP
(
pArray
,
0
);
pTask
->
fixedEpDispatcher
.
taskId
=
lastLevelTask
->
taskId
;
pTask
->
fixedEpDispatcher
.
nodeId
=
lastLevelTask
->
nodeId
;
pTask
->
fixedEpDispatcher
.
epSet
=
lastLevelTask
->
epSet
;
}
}
else
{
pTask
->
dispatchType
=
TASK_DISPATCH__FIXED
;
pTask
->
dispatchMsgType
=
TDMT_STREAM_TASK_DISPATCH
;
SArray
*
pArray
=
taosArrayGetP
(
pStream
->
tasks
,
0
);
// one sink only
ASSERT
(
taosArrayGetSize
(
pArray
)
==
1
);
SStreamTask
*
lastLevelTask
=
taosArrayGetP
(
pArray
,
0
);
pTask
->
fixedEpDispatcher
.
taskId
=
lastLevelTask
->
taskId
;
pTask
->
fixedEpDispatcher
.
nodeId
=
lastLevelTask
->
nodeId
;
pTask
->
fixedEpDispatcher
.
epSet
=
lastLevelTask
->
epSet
;
}
return
0
;
}
...
...
@@ -379,7 +382,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pFinalTask
->
inputType
=
TASK_INPUT_TYPE__DATA_BLOCK
;
// dispatch
mndAddDispatcherToInnerTask
(
pMnode
,
pTrans
,
pStream
,
pFinalTask
);
if
(
mndAddDispatcherToInnerTask
(
pMnode
,
pTrans
,
pStream
,
pFinalTask
)
<
0
)
{
qDestroyQueryPlan
(
pPlan
);
return
-
1
;
}
// exec
pFinalTask
->
execType
=
TASK_EXEC__PIPE
;
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
e6ff854e
...
...
@@ -26,6 +26,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tname.h"
#define TSDB_SMA_VER_NUMBER 1
...
...
@@ -82,10 +83,12 @@ static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
name
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
stb
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
db
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pSma
->
dstTbName
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
createdTime
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
uid
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
stbUid
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
dbUid
,
_OVER
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pSma
->
dstTbUid
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSma
->
intervalUnit
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSma
->
slidingUnit
,
_OVER
)
SDB_SET_INT8
(
pRaw
,
dataPos
,
pSma
->
timezone
,
_OVER
)
...
...
@@ -147,10 +150,12 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
name
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
stb
,
TSDB_TABLE_FNAME_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
db
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pSma
->
dstTbName
,
TSDB_DB_FNAME_LEN
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
createdTime
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
uid
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
stbUid
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
dbUid
,
_OVER
)
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pSma
->
dstTbUid
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSma
->
intervalUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSma
->
slidingUnit
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pSma
->
timezone
,
_OVER
)
...
...
@@ -260,14 +265,17 @@ static void *mndBuildVCreateSmaReq(SMnode *pMnode, SVgObj *pVgroup, SSmaObj *pSm
req
.
tagsFilterLen
=
pSma
->
tagsFilterLen
;
req
.
indexUid
=
pSma
->
uid
;
req
.
tableUid
=
pSma
->
stbUid
;
req
.
dstVgId
=
pSma
->
dstVgId
;
req
.
dstTbUid
=
pSma
->
dstTbUid
;
req
.
interval
=
pSma
->
interval
;
req
.
offset
=
pSma
->
offset
;
req
.
sliding
=
pSma
->
sliding
;
req
.
expr
=
pSma
->
expr
;
req
.
tagsFilter
=
pSma
->
tagsFilter
;
req
.
numOfVgroups
=
pSma
->
numOfVgroups
;
req
.
pVgEpSet
=
pSma
->
pVgEpSet
;
req
.
schemaRow
=
pSma
->
schemaRow
;
req
.
schemaTag
=
pSma
->
schemaTag
;
req
.
dstTbName
=
pSma
->
dstTbName
;
// get length
int32_t
ret
=
0
;
tEncodeSize
(
tEncodeSVCreateTSmaReq
,
&
req
,
contLen
,
ret
);
...
...
@@ -425,14 +433,30 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
mndReleaseDnode
(
pMnode
,
pDnode
);
// todo add sma info here
SVgEpSet
*
pVgEpSet
=
NULL
;
int32_t
numOfVgroups
=
0
;
if
(
mndSmaGetVgEpSet
(
pMnode
,
pDb
,
&
pVgEpSet
,
&
numOfVgroups
)
!=
0
)
{
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pSma
->
ast
,
&
pAst
)
<
0
)
{
return
-
1
;
}
if
(
qExtractResultSchema
(
pAst
,
&
pSma
->
schemaRow
.
nCols
,
&
pSma
->
schemaRow
.
pSchema
)
!=
0
)
{
nodesDestroyNode
(
pAst
);
return
-
1
;
}
nodesDestroyNode
(
pAst
);
pSma
->
schemaRow
.
version
=
1
;
// TODO: the schemaTag generated by qExtractResultXXX later.
pSma
->
schemaTag
.
nCols
=
1
;
pSma
->
schemaTag
.
version
=
1
;
pSma
->
schemaTag
.
pSchema
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchema
));
if
(
!
pSma
->
schemaTag
.
pSchema
)
{
return
-
1
;
}
pSma
->
schemaTag
.
pSchema
[
0
].
type
=
TSDB_DATA_TYPE_BIGINT
;
pSma
->
schemaTag
.
pSchema
[
0
].
bytes
=
TYPE_BYTES
[
TSDB_DATA_TYPE_BIGINT
];
pSma
->
schemaTag
.
pSchema
[
0
].
colId
=
pSma
->
schemaRow
.
nCols
+
PRIMARYKEY_TIMESTAMP_COL_ID
;
pSma
->
schemaTag
.
pSchema
[
0
].
flags
=
0
;
snprintf
(
pSma
->
schemaTag
.
pSchema
[
0
].
name
,
TSDB_COL_NAME_LEN
,
"groupId"
);
pSma
->
pVgEpSet
=
pVgEpSet
;
pSma
->
numOfVgroups
=
numOfVgroups
;
int32_t
smaContLen
=
0
;
void
*
pSmaReq
=
mndBuildVCreateSmaReq
(
pMnode
,
pVgroup
,
pSma
,
&
smaContLen
);
...
...
@@ -464,12 +488,15 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
memcpy
(
smaObj
.
db
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
smaObj
.
createdTime
=
taosGetTimestampMs
();
smaObj
.
uid
=
mndGenerateUid
(
pCreate
->
name
,
TSDB_TABLE_FNAME_LEN
);
char
resultTbName
[
TSDB_TABLE_FNAME_LEN
+
16
]
=
{
0
};
snprintf
(
resultTbName
,
TSDB_TABLE_FNAME_LEN
+
16
,
"td.tsma.rst.tb.%s"
,
pCreate
->
name
);
memcpy
(
smaObj
.
dstTbName
,
resultTbName
,
TSDB_TABLE_FNAME_LEN
);
smaObj
.
dstTbUid
=
mndGenerateUid
(
smaObj
.
dstTbName
,
TSDB_TABLE_FNAME_LEN
);
smaObj
.
stbUid
=
pStb
->
uid
;
smaObj
.
dbUid
=
pStb
->
dbUid
;
smaObj
.
intervalUnit
=
pCreate
->
intervalUnit
;
smaObj
.
slidingUnit
=
pCreate
->
slidingUnit
;
smaObj
.
timezone
=
pCreate
->
timezone
;
smaObj
.
dstVgId
=
pCreate
->
dstVgId
;
smaObj
.
interval
=
pCreate
->
interval
;
smaObj
.
offset
=
pCreate
->
offset
;
smaObj
.
sliding
=
pCreate
->
sliding
;
...
...
@@ -1087,53 +1114,4 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
static
void
mndCancelGetNextSma
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
static
int32_t
mndSmaGetVgEpSet
(
SMnode
*
pMnode
,
SDbObj
*
pDb
,
SVgEpSet
**
ppVgEpSet
,
int32_t
*
numOfVgroups
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
void
*
pIter
=
NULL
;
SVgEpSet
*
pVgEpSet
=
NULL
;
int32_t
nAllocVgs
=
16
;
int32_t
nVgs
=
0
;
pVgEpSet
=
taosMemoryCalloc
(
nAllocVgs
,
sizeof
(
SVgEpSet
));
if
(
!
pVgEpSet
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pVgroup
->
dbUid
!=
pDb
->
uid
)
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
}
if
(
nVgs
>=
nAllocVgs
)
{
void
*
p
=
taosMemoryRealloc
(
pVgEpSet
,
nAllocVgs
*
2
*
sizeof
(
SVgEpSet
));
if
(
!
p
)
{
taosMemoryFree
(
pVgEpSet
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pVgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pVgEpSet
=
(
SVgEpSet
*
)
p
;
nAllocVgs
*=
2
;
}
(
pVgEpSet
+
nVgs
)
->
vgId
=
pVgroup
->
vgId
;
(
pVgEpSet
+
nVgs
)
->
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
++
nVgs
;
sdbRelease
(
pSdb
,
pVgroup
);
}
*
ppVgEpSet
=
pVgEpSet
;
*
numOfVgroups
=
nVgs
;
return
0
;
}
}
\ No newline at end of file
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
e6ff854e
...
...
@@ -252,8 +252,12 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
}
if
(
qExtractResultSchema
(
pAst
,
(
int32_t
*
)
&
pStream
->
outputSchema
.
nCols
,
&
pStream
->
outputSchema
.
pSchema
)
!=
0
)
{
nodesDestroyNode
(
pAst
);
return
-
1
;
}
// free
nodesDestroyNode
(
pAst
);
#if 0
printf("|");
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
e6ff854e
...
...
@@ -28,7 +28,6 @@ target_sources(
# sma
"src/sma/sma.c"
"src/sma/smaTDBImpl.c"
"src/sma/smaEnv.c"
"src/sma/smaOpen.c"
"src/sma/smaRollup.c"
...
...
source/dnode/vnode/src/inc/sma.h
浏览文件 @
e6ff854e
...
...
@@ -43,35 +43,17 @@ typedef struct SRSmaInfo SRSmaInfo;
struct
SSmaEnv
{
TdThreadRwlock
lock
;
int8_t
type
;
TXN
txn
;
void
*
pPool
;
// SPoolMem
SDiskID
did
;
TDB
*
dbEnv
;
// TODO: If it's better to put it in smaIndex level?
char
*
path
;
// relative path
SSmaStat
*
pStat
;
};
#define SMA_ENV_LOCK(env) ((env)->lock)
#define SMA_ENV_TYPE(env) ((env)->type)
#define SMA_ENV_DID(env) ((env)->did)
#define SMA_ENV_ENV(env) ((env)->dbEnv)
#define SMA_ENV_PATH(env) ((env)->path)
#define SMA_ENV_STAT(env) ((env)->pStat)
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
struct
SSmaStatItem
{
/**
* @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service.
* - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from
* Streaming Module or TSDB local persistence.
* - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open,
* without information about its previous state.
* - TSDB_SMA_STAT_DROPPED: 1)sma dropped
* N.B. only applicable to tsma
*/
int8_t
state
;
// ETsdbSmaStat
SHashObj
*
expiredWindows
;
// key: skey of time window, value: version
STSma
*
pTSma
;
// cache schema
int8_t
state
;
// ETsdbSmaStat
STSma
*
pTSma
;
// cache schema
};
struct
SSmaStat
{
...
...
@@ -84,29 +66,6 @@ struct SSmaStat {
#define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
struct
SSmaKey
{
TSKEY
skey
;
int64_t
groupId
;
};
typedef
struct
SDBFile
SDBFile
;
struct
SDBFile
{
int32_t
fid
;
TTB
*
pDB
;
char
*
path
;
};
int32_t
tdSmaBeginCommit
(
SSmaEnv
*
pEnv
);
int32_t
tdSmaEndCommit
(
SSmaEnv
*
pEnv
);
int32_t
smaOpenDBEnv
(
TDB
**
ppEnv
,
const
char
*
path
);
int32_t
smaCloseDBEnv
(
TDB
*
pEnv
);
int32_t
smaOpenDBF
(
TDB
*
pEnv
,
SDBFile
*
pDBF
);
int32_t
smaCloseDBF
(
SDBFile
*
pDBF
);
int32_t
smaSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
pKey
,
int32_t
keyLen
,
void
*
pVal
,
int32_t
valLen
,
TXN
*
txn
);
void
*
smaGetSmaDataByKey
(
SDBFile
*
pDBF
,
const
void
*
pKey
,
int32_t
keyLen
,
int32_t
*
valLen
);
void
tdDestroySmaEnv
(
SSmaEnv
*
pSmaEnv
);
void
*
tdFreeSmaEnv
(
SSmaEnv
*
pSmaEnv
);
#if 0
...
...
@@ -114,13 +73,6 @@ int32_t tbGetTSmaStatus(SSma *pSma, STSma *param, void *result);
int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin);
#endif
static
FORCE_INLINE
int32_t
tdEncodeTSmaKey
(
int64_t
groupId
,
TSKEY
tsKey
,
void
**
pData
)
{
int32_t
len
=
0
;
len
+=
taosEncodeFixedI64
(
pData
,
tsKey
);
len
+=
taosEncodeFixedI64
(
pData
,
groupId
);
return
len
;
}
int32_t
tdInitSma
(
SSma
*
pSma
);
int32_t
tdDropTSma
(
SSma
*
pSma
,
char
*
pMsg
);
int32_t
tdDropTSmaData
(
SSma
*
pSma
,
int64_t
indexUid
);
...
...
@@ -128,13 +80,11 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg);
int32_t
tdRefSmaStat
(
SSma
*
pSma
,
SSmaStat
*
pStat
);
int32_t
tdUnRefSmaStat
(
SSma
*
pSma
,
SSmaStat
*
pStat
);
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
);
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
bool
onlyCheck
);
int32_t
tdLockSma
(
SSma
*
pSma
);
int32_t
tdUnLockSma
(
SSma
*
pSma
);
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
static
FORCE_INLINE
int16_t
tdTSmaAdd
(
SSma
*
pSma
,
int16_t
n
)
{
return
atomic_add_fetch_16
(
&
SMA_TSMA_NUM
(
pSma
),
n
);
}
static
FORCE_INLINE
int16_t
tdTSmaSub
(
SSma
*
pSma
,
int16_t
n
)
{
return
atomic_sub_fetch_16
(
&
SMA_TSMA_NUM
(
pSma
),
n
);
}
...
...
@@ -219,11 +169,8 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDisk
void
*
tdFreeRSmaInfo
(
SRSmaInfo
*
pInfo
);
int32_t
tdProcessTSmaCreateImpl
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
pMsg
);
int32_t
tdUpdateExpiredWindowImpl
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t
tdGetTSmaDataImpl
(
SSma
*
pSma
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
);
int32_t
tdGetTSmaDaysImpl
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
);
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tdProcessTSmaGetDaysImpl
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
e6ff854e
...
...
@@ -150,7 +150,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t
smaOpen
(
SVnode
*
pVnode
);
int32_t
smaClose
(
SSma
*
pSma
);
int32_t
tdUpdateExpireWindow
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tdProcessTSmaCreate
(
SSma
*
pSma
,
int64_t
version
,
const
char
*
msg
);
int32_t
tdProcessTSmaInsert
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
);
...
...
@@ -227,7 +226,7 @@ struct SVnode {
SQHandle
*
pQuery
;
};
#define TD_VID(PVNODE) (
PVNODE)->config.vgId
#define TD_VID(PVNODE) (
(PVNODE)->config.vgId)
#define VND_TSDB(vnd) ((vnd)->pTsdb)
#define VND_RSMA0(vnd) ((vnd)->pTsdb)
...
...
source/dnode/vnode/src/meta/metaSma.c
浏览文件 @
e6ff854e
...
...
@@ -34,13 +34,13 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
SMetaReader
mr
=
{
0
};
// validate req
// save smaIndex
metaReaderInit
(
&
mr
,
pMeta
,
0
);
if
(
metaGetTableEntryByUid
(
&
mr
,
pCfg
->
indexUid
)
==
0
)
{
// TODO: just for pass case
#if 1
terrno
=
TSDB_CODE_T
DB_T
SMA_ALREADY_EXIST
;
terrno
=
TSDB_CODE_TSMA_ALREADY_EXIST
;
metaReaderClear
(
&
mr
);
return
-
1
;
return
-
1
;
// don't goto _err;
#else
metaReaderClear
(
&
mr
);
return
0
;
...
...
source/dnode/vnode/src/sma/sma.c
浏览文件 @
e6ff854e
...
...
@@ -36,25 +36,9 @@ int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) {
return
code
;
}
int32_t
tdUpdateExpireWindow
(
SSma
*
pSma
,
const
SSubmitReq
*
pMsg
,
int64_t
version
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tdUpdateExpiredWindowImpl
(
pSma
,
pMsg
,
version
))
<
0
)
{
smaWarn
(
"vgId:%d, update expired sma window failed since %s"
,
SMA_VID
(
pSma
),
tstrerror
(
terrno
));
}
return
code
;
}
int32_t
tdGetTSmaData
(
SSma
*
pSma
,
char
*
pData
,
int64_t
indexUid
,
TSKEY
querySKey
,
int32_t
nMaxResult
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tdGetTSmaDataImpl
(
pSma
,
pData
,
indexUid
,
querySKey
,
nMaxResult
))
<
0
)
{
smaWarn
(
"vgId:%d, get tsma data failed since %s"
,
SMA_VID
(
pSma
),
tstrerror
(
terrno
));
}
return
code
;
}
int32_t
smaGetTSmaDays
(
SVnodeCfg
*
pCfg
,
void
*
pCont
,
uint32_t
contLen
,
int32_t
*
days
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
td
GetTSma
DaysImpl
(
pCfg
,
pCont
,
contLen
,
days
))
<
0
)
{
if
((
code
=
td
ProcessTSmaGet
DaysImpl
(
pCfg
,
pCont
,
contLen
,
days
))
<
0
)
{
smaWarn
(
"vgId:%d, get tsma days failed since %s"
,
pCfg
->
vgId
,
tstrerror
(
terrno
));
}
smaDebug
(
"vgId:%d, get tsma days %d"
,
pCfg
->
vgId
,
*
days
);
...
...
source/dnode/vnode/src/sma/smaEnv.c
浏览文件 @
e6ff854e
...
...
@@ -151,31 +151,11 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path,
return
NULL
;
}
ASSERT
(
path
&&
(
strlen
(
path
)
>
0
));
SMA_ENV_PATH
(
pEnv
)
=
strdup
(
path
);
if
(
!
SMA_ENV_PATH
(
pEnv
))
{
tdFreeSmaEnv
(
pEnv
);
return
NULL
;
}
SMA_ENV_DID
(
pEnv
)
=
did
;
if
(
tdInitSmaStat
(
&
SMA_ENV_STAT
(
pEnv
),
smaType
)
!=
TSDB_CODE_SUCCESS
)
{
tdFreeSmaEnv
(
pEnv
);
return
NULL
;
}
char
aname
[
TSDB_FILENAME_LEN
]
=
{
0
};
tfsAbsoluteName
(
SMA_TFS
(
pSma
),
did
,
path
,
aname
);
if
(
smaOpenDBEnv
(
&
pEnv
->
dbEnv
,
aname
)
!=
TSDB_CODE_SUCCESS
)
{
tdFreeSmaEnv
(
pEnv
);
return
NULL
;
}
if
(
!
(
pEnv
->
pPool
=
openPool
()))
{
tdFreeSmaEnv
(
pEnv
);
return
NULL
;
}
return
pEnv
;
}
...
...
@@ -205,10 +185,7 @@ void tdDestroySmaEnv(SSmaEnv *pSmaEnv) {
if
(
pSmaEnv
)
{
tdDestroySmaState
(
pSmaEnv
->
pStat
,
SMA_ENV_TYPE
(
pSmaEnv
));
taosMemoryFreeClear
(
pSmaEnv
->
pStat
);
taosMemoryFreeClear
(
pSmaEnv
->
path
);
taosThreadRwlockDestroy
(
&
(
pSmaEnv
->
lock
));
smaCloseDBEnv
(
pSmaEnv
->
dbEnv
);
closePool
(
pSmaEnv
->
pPool
);
}
}
...
...
@@ -242,7 +219,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
}
/**
* 1. Lazy mode utilized when init SSmaStat to update expire
d
window(or hungry mode when tdNew).
* 1. Lazy mode utilized when init SSmaStat to update expire window(or hungry mode when tdNew).
* 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if
* tdInitSmaStat invoked in other multithread environment later.
*/
...
...
@@ -280,7 +257,6 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if
(
pSmaStatItem
)
{
tDestroyTSma
(
pSmaStatItem
->
pTSma
);
taosMemoryFreeClear
(
pSmaStatItem
->
pTSma
);
taosHashCleanup
(
pSmaStatItem
->
expiredWindows
);
taosMemoryFreeClear
(
pSmaStatItem
);
}
return
NULL
;
...
...
@@ -341,7 +317,7 @@ int32_t tdUnLockSma(SSma *pSma) {
return
0
;
}
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
)
{
int32_t
tdCheckAndInitSmaEnv
(
SSma
*
pSma
,
int8_t
smaType
,
bool
onlyCheck
)
{
SSmaEnv
*
pEnv
=
NULL
;
// return if already init
...
...
@@ -399,63 +375,3 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) {
return
TSDB_CODE_SUCCESS
;
};
int32_t
tdSmaBeginCommit
(
SSmaEnv
*
pEnv
)
{
TXN
*
pTxn
=
&
pEnv
->
txn
;
// start a new txn
tdbTxnOpen
(
pTxn
,
0
,
poolMalloc
,
poolFree
,
pEnv
->
pPool
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
if
(
tdbBegin
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
smaWarn
(
"tdSma tdb begin commit fail"
);
return
-
1
;
}
return
0
;
}
int32_t
tdSmaEndCommit
(
SSmaEnv
*
pEnv
)
{
TXN
*
pTxn
=
&
pEnv
->
txn
;
// Commit current txn
if
(
tdbCommit
(
pEnv
->
dbEnv
,
pTxn
)
!=
0
)
{
smaWarn
(
"tdSma tdb end commit fail"
);
return
-
1
;
}
tdbTxnClose
(
pTxn
);
clearPool
(
pEnv
->
pPool
);
return
0
;
}
#if 0
/**
* @brief Get the start TS key of the last data block of one interval/sliding.
*
* @param pSma
* @param param
* @param result
* @return int32_t
* 1) Return 0 and fill the result if the check procedure is normal;
* 2) Return -1 if error occurs during the check procedure.
*/
int32_t tdGetTSmaStatus(SSma *pSma, void *smaIndex, void *result) {
const char *procedure = "";
if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) {
return -1;
}
// fill the result
return TSDB_CODE_SUCCESS;
}
/**
* @brief Remove the tSma data files related to param between pWin.
*
* @param pSma
* @param param
* @param pWin
* @return int32_t
*/
int32_t tdRemoveTSmaData(SSma *pSma, void *smaIndex, STimeWindow *pWin) {
// for ("tSmaFiles of param-interval-sliding between pWin") {
// // remove the tSmaFile
// }
return TSDB_CODE_SUCCESS;
}
#endif
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
e6ff854e
...
...
@@ -65,7 +65,7 @@ static FORCE_INLINE int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SA
pRSmaInfo
=
taosHashGet
(
SMA_STAT_INFO_HASH
(
pStat
),
suid
,
sizeof
(
tb_uid_t
));
if
(
!
pRSmaInfo
||
!
(
pRSmaInfo
=
*
(
SRSmaInfo
**
)
pRSmaInfo
))
{
smaError
(
"vgId:%d, failed to get rsma info for uid:%"
PRIi64
,
SMA_VID
(
pSma
),
*
suid
);
terrno
=
TSDB_CODE_
TDB_INVALID_SMA
_STAT
;
terrno
=
TSDB_CODE_
RSMA_INVALID
_STAT
;
return
TSDB_CODE_FAILED
;
}
...
...
@@ -132,7 +132,7 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
SSmaStat
*
pStat
=
SMA_ENV_STAT
(
pEnv
);
SHashObj
*
infoHash
=
NULL
;
if
(
!
pStat
||
!
(
infoHash
=
SMA_STAT_INFO_HASH
(
pStat
)))
{
terrno
=
TSDB_CODE_
TDB_INVALID_SMA
_STAT
;
terrno
=
TSDB_CODE_
RSMA_INVALID
_STAT
;
return
TSDB_CODE_FAILED
;
}
...
...
@@ -167,13 +167,13 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
*/
int32_t
tdProcessRSmaCreate
(
SVnode
*
pVnode
,
SVCreateStbReq
*
pReq
)
{
SSma
*
pSma
=
pVnode
->
pSma
;
SMeta
*
pMeta
=
pVnode
->
pMeta
;
SMsgCb
*
pMsgCb
=
&
pVnode
->
msgCb
;
if
(
!
pReq
->
rollup
)
{
smaTrace
(
"vgId:%d, return directly since no rollup for stable %s %"
PRIi64
,
SMA_VID
(
pSma
),
pReq
->
name
,
pReq
->
suid
);
return
TSDB_CODE_SUCCESS
;
}
SMeta
*
pMeta
=
pVnode
->
pMeta
;
SMsgCb
*
pMsgCb
=
&
pVnode
->
msgCb
;
SRSmaParam
*
param
=
&
pReq
->
pRSmaParam
;
if
((
param
->
qmsg1Len
==
0
)
&&
(
param
->
qmsg2Len
==
0
))
{
...
...
@@ -181,7 +181,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
return
TSDB_CODE_SUCCESS
;
}
if
(
tdCheckAndInitSmaEnv
(
pSma
,
TSDB_SMA_TYPE_ROLLUP
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
tdCheckAndInitSmaEnv
(
pSma
,
TSDB_SMA_TYPE_ROLLUP
,
false
)
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_TDB_INIT_FAILED
;
return
TSDB_CODE_FAILED
;
}
...
...
source/dnode/vnode/src/sma/smaTDBImpl.c
已删除
100644 → 0
浏览文件 @
30117293
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#include "sma.h"
int32_t
smaOpenDBEnv
(
TDB
**
ppEnv
,
const
char
*
path
)
{
int
ret
=
0
;
if
(
path
==
NULL
)
return
-
1
;
ret
=
tdbOpen
(
path
,
4096
,
256
,
ppEnv
);
// use as param
if
(
ret
!=
0
)
{
smaError
(
"failed to create tsdb db env, ret = %d"
,
ret
);
return
-
1
;
}
return
0
;
}
int32_t
smaCloseDBEnv
(
TDB
*
pEnv
)
{
return
tdbClose
(
pEnv
);
}
static
inline
int
tdSmaKeyCmpr
(
const
void
*
arg1
,
int
len1
,
const
void
*
arg2
,
int
len2
)
{
const
SSmaKey
*
pKey1
=
(
const
SSmaKey
*
)
arg1
;
const
SSmaKey
*
pKey2
=
(
const
SSmaKey
*
)
arg2
;
ASSERT
(
len1
==
len2
&&
len1
==
sizeof
(
SSmaKey
));
if
(
pKey1
->
skey
<
pKey2
->
skey
)
{
return
-
1
;
}
else
if
(
pKey1
->
skey
>
pKey2
->
skey
)
{
return
1
;
}
if
(
pKey1
->
groupId
<
pKey2
->
groupId
)
{
return
-
1
;
}
else
if
(
pKey1
->
groupId
>
pKey2
->
groupId
)
{
return
1
;
}
return
0
;
}
static
int32_t
smaOpenDBDb
(
TTB
**
ppDB
,
TDB
*
pEnv
,
const
char
*
pFName
)
{
tdb_cmpr_fn_t
compFunc
;
// Create a database
compFunc
=
tdSmaKeyCmpr
;
if
(
tdbTbOpen
(
pFName
,
-
1
,
-
1
,
compFunc
,
pEnv
,
ppDB
)
<
0
)
{
return
-
1
;
}
return
0
;
}
static
int32_t
smaCloseDBDb
(
TTB
*
pDB
)
{
return
tdbTbClose
(
pDB
);
}
int32_t
smaOpenDBF
(
TDB
*
pEnv
,
SDBFile
*
pDBF
)
{
// TEnv is shared by a group of SDBFile
if
(
!
pEnv
||
!
pDBF
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
// Open DBF
if
(
smaOpenDBDb
(
&
(
pDBF
->
pDB
),
pEnv
,
pDBF
->
path
)
<
0
)
{
smaError
(
"failed to open DBF: %s"
,
pDBF
->
path
);
smaCloseDBDb
(
pDBF
->
pDB
);
return
-
1
;
}
return
0
;
}
int32_t
smaCloseDBF
(
SDBFile
*
pDBF
)
{
int32_t
ret
=
0
;
if
(
pDBF
->
pDB
)
{
ret
=
smaCloseDBDb
(
pDBF
->
pDB
);
pDBF
->
pDB
=
NULL
;
}
taosMemoryFreeClear
(
pDBF
->
path
);
return
ret
;
}
int32_t
smaSaveSmaToDB
(
SDBFile
*
pDBF
,
void
*
pKey
,
int32_t
keyLen
,
void
*
pVal
,
int32_t
valLen
,
TXN
*
txn
)
{
int32_t
ret
;
printf
(
"save tsma data into %s, keyLen:%d valLen:%d txn:%p
\n
"
,
pDBF
->
path
,
keyLen
,
valLen
,
txn
);
ret
=
tdbTbUpsert
(
pDBF
->
pDB
,
pKey
,
keyLen
,
pVal
,
valLen
,
txn
);
if
(
ret
<
0
)
{
smaError
(
"failed to upsert tsma data into db, ret = %d"
,
ret
);
return
-
1
;
}
return
0
;
}
void
*
smaGetSmaDataByKey
(
SDBFile
*
pDBF
,
const
void
*
pKey
,
int32_t
keyLen
,
int32_t
*
valLen
)
{
void
*
pVal
=
NULL
;
int
ret
;
ret
=
tdbTbGet
(
pDBF
->
pDB
,
pKey
,
keyLen
,
&
pVal
,
valLen
);
if
(
ret
<
0
)
{
smaError
(
"failed to get tsma data from db, ret = %d"
,
ret
);
return
NULL
;
}
ASSERT
(
*
valLen
>=
0
);
// TODO: lock?
// TODO: Would the key/value be destoryed during return the data?
// TODO: How about the key is updated while value length is changed? The original value buffer would be freed
// automatically?
return
pVal
;
}
\ No newline at end of file
source/dnode/vnode/src/sma/smaTimeRange.c
已删除
100644 → 0
浏览文件 @
30117293
此差异已折叠。
点击以展开。
source/dnode/vnode/src/sma/smaTimeRange2.c
浏览文件 @
e6ff854e
此差异已折叠。
点击以展开。
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
e6ff854e
...
...
@@ -238,9 +238,6 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
// TODO handle sma error
}
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
return
-
1
;
...
...
source/dnode/vnode/src/tsdb/tsdbCommit.c
浏览文件 @
e6ff854e
...
...
@@ -150,7 +150,7 @@ int32_t tsdbCommit(STsdb *pTsdb) {
return
code
;
_err:
tsdbError
(
"vgId:%d failed to commit since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
tsdbError
(
"vgId:%d
,
failed to commit since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
tstrerror
(
code
));
return
code
;
}
...
...
source/dnode/vnode/src/tsdb/tsdbMemTable.c
浏览文件 @
e6ff854e
...
...
@@ -176,13 +176,13 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable
->
nDelOp
++
;
tsdbError
(
"vgId:%d delete data from table suid:%"
PRId64
" uid:%"
PRId64
" skey:%"
PRId64
" eKey:%"
PRId64
tsdbError
(
"vgId:%d
,
delete data from table suid:%"
PRId64
" uid:%"
PRId64
" skey:%"
PRId64
" eKey:%"
PRId64
" since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
suid
,
uid
,
sKey
,
eKey
,
tstrerror
(
code
));
return
code
;
_err:
tsdbError
(
"vgId:%d failed to delete data from table suid:%"
PRId64
" uid:%"
PRId64
" skey:%"
PRId64
" eKey:%"
PRId64
tsdbError
(
"vgId:%d
,
failed to delete data from table suid:%"
PRId64
" uid:%"
PRId64
" skey:%"
PRId64
" eKey:%"
PRId64
" since %s"
,
TD_VID
(
pTsdb
->
pVnode
),
suid
,
uid
,
sKey
,
eKey
,
tstrerror
(
code
));
return
code
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
e6ff854e
...
...
@@ -225,6 +225,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace
(
"message in fetch queue is processing"
);
char
*
msgstr
=
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
));
int32_t
msgLen
=
pMsg
->
contLen
-
sizeof
(
SMsgHead
);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_FETCH
:
return
qWorkerProcessFetchMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
,
0
);
...
...
@@ -236,13 +237,10 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
return
qWorkerProcessDropMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
,
0
);
case
TDMT_VND_TABLE_META
:
return
vnodeGetTableMeta
(
pVnode
,
pMsg
);
case
TDMT_VND_CONSUME
:
return
tqProcessPollReq
(
pVnode
->
pTq
,
pMsg
,
pInfo
->
workerId
);
case
TDMT_STREAM_TASK_RUN
:
return
tqProcessTaskRunReq
(
pVnode
->
pTq
,
pMsg
);
case
TDMT_STREAM_TASK_DISPATCH
:
...
...
@@ -279,7 +277,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
// blockDebugShowData(data);
// blockDebugShowData(data
, __func__
);
tdProcessTSmaInsert
(((
SVnode
*
)
pVnode
)
->
pSma
,
smaId
,
(
const
char
*
)
data
);
}
...
...
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
e6ff854e
...
...
@@ -373,7 +373,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
pTsdb
->
pTfs
=
tfsOpen
(
&
pDisks
,
numOfDisks
);
EXPECT_NE
(
pTsdb
->
pTfs
,
nullptr
);
// generate SSubmitReq msg and update expire
d
window
// generate SSubmitReq msg and update expire window
int16_t
schemaVer
=
0
;
uint32_t
mockRowLen
=
sizeof
(
STSRow
);
uint32_t
mockRowNum
=
2
;
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
e6ff854e
...
...
@@ -759,7 +759,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scan
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
bool
needFree
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
SqlFunctionCtx
*
createSqlFunctionCtx
(
SExprInfo
*
pExprInfo
,
int32_t
numOfOutput
,
int32_t
**
rowCellInfoOffset
);
void
relocateColumnData
(
SSDataBlock
*
pBlock
,
const
SArray
*
pColMatchInfo
,
SArray
*
pCols
);
void
initExecTimeWindowInfo
(
SColumnInfoData
*
pColData
,
STimeWindow
*
pQueryWindow
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
e6ff854e
...
...
@@ -1819,9 +1819,9 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO
}
}
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
,
bool
needFree
);
static
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
,
bool
needFree
)
{
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
)
{
if
(
pFilterNode
==
NULL
)
{
return
;
}
...
...
@@ -1840,30 +1840,29 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, bool needFree) {
bool
keep
=
filterExecute
(
filter
,
pBlock
,
&
rowRes
,
NULL
,
param1
.
numOfCols
);
filterFreeInfo
(
filter
);
extractQualifiedTupleByFilterResult
(
pBlock
,
rowRes
,
keep
,
needFree
);
extractQualifiedTupleByFilterResult
(
pBlock
,
rowRes
,
keep
);
blockDataUpdateTsWindow
(
pBlock
,
0
);
}
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
,
bool
needFree
)
{
void
extractQualifiedTupleByFilterResult
(
SSDataBlock
*
pBlock
,
const
int8_t
*
rowRes
,
bool
keep
)
{
if
(
keep
)
{
return
;
}
if
(
rowRes
!=
NULL
)
{
SSDataBlock
*
px
=
createOneDataBlock
(
pBlock
,
false
);
blockDataEnsureCapacity
(
px
,
pBlock
->
info
.
rows
);
SSDataBlock
*
px
=
createOneDataBlock
(
pBlock
,
true
);
int32_t
totalRows
=
pBlock
->
info
.
rows
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
SColumnInfoData
*
pDst
=
taosArrayGet
(
px
->
pDataBlock
,
i
);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
SColumnInfoData
*
pSrc
=
taosArrayGet
(
px
->
pDataBlock
,
i
);
SColumnInfoData
*
pDst
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
// it is a reserved column for scalar function, and no data in this column yet.
if
(
p
Src
->
pData
==
NULL
)
{
if
(
p
Dst
->
pData
==
NULL
)
{
continue
;
}
colInfoDataCleanup
(
pDst
,
pBlock
->
info
.
rows
);
int32_t
numOfRows
=
0
;
for
(
int32_t
j
=
0
;
j
<
totalRows
;
++
j
)
{
if
(
rowRes
[
j
]
==
0
)
{
...
...
@@ -1883,20 +1882,8 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
}
else
{
ASSERT
(
pBlock
->
info
.
rows
==
numOfRows
);
}
SColumnInfoData
tmp
=
*
pSrc
;
*
pSrc
=
*
pDst
;
*
pDst
=
tmp
;
if
(
!
needFree
)
{
if
(
IS_VAR_DATA_TYPE
(
pDst
->
info
.
type
))
{
// this elements do not need free
pDst
->
varmeta
.
offset
=
NULL
;
}
else
{
pDst
->
nullbitmap
=
NULL
;
}
pDst
->
pData
=
NULL
;
}
}
blockDataDestroy
(
px
);
// fix memory leak
}
else
{
// do nothing
...
...
@@ -2137,11 +2124,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows,
}
int32_t
doFillTimeIntervalGapsInResults
(
struct
SFillInfo
*
pFillInfo
,
SSDataBlock
*
pBlock
,
int32_t
capacity
)
{
// for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
// SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
// p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
// }
int32_t
numOfRows
=
(
int32_t
)
taosFillResultDataBlock
(
pFillInfo
,
pBlock
,
capacity
-
pBlock
->
info
.
rows
);
pBlock
->
info
.
rows
+=
numOfRows
;
...
...
@@ -2896,7 +2878,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
return
seqLoadRemoteData
(
pOperator
);
}
else
{
return
concurrentlyLoadRemoteDataImpl
(
pOperator
,
pExchangeInfo
,
pTaskInfo
);
// return concurrentlyLoadRemoteData(pOperator);
}
}
...
...
@@ -2922,9 +2903,14 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
initExchangeOperator
(
SExchangePhysiNode
*
pExNode
,
SExchangeInfo
*
pInfo
)
{
static
int32_t
initExchangeOperator
(
SExchangePhysiNode
*
pExNode
,
SExchangeInfo
*
pInfo
,
const
char
*
id
)
{
size_t
numOfSources
=
LIST_LENGTH
(
pExNode
->
pSrcEndPoints
);
if
(
numOfSources
==
0
)
{
qError
(
"%s invalid number: %d of sources in exchange operator"
,
id
,
(
int32_t
)
numOfSources
);
return
TSDB_CODE_INVALID_PARA
;
}
pInfo
->
pSources
=
taosArrayInit
(
numOfSources
,
sizeof
(
SDownstreamSourceNode
));
pInfo
->
pSourceDataInfo
=
taosArrayInit
(
numOfSources
,
sizeof
(
SSourceDataInfo
));
if
(
pInfo
->
pSourceDataInfo
==
NULL
||
pInfo
->
pSources
==
NULL
)
{
...
...
@@ -2946,7 +2932,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto
_error
;
}
int32_t
code
=
initExchangeOperator
(
pExNode
,
pInfo
);
int32_t
code
=
initExchangeOperator
(
pExNode
,
pInfo
,
GET_TASKID
(
pTaskInfo
)
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
...
...
@@ -2975,7 +2961,7 @@ _error:
taosMemoryFreeClear
(
pInfo
);
taosMemoryFreeClear
(
pOperator
);
pTaskInfo
->
code
=
TSDB_CODE_OUT_OF_MEMORY
;
pTaskInfo
->
code
=
code
;
return
NULL
;
}
...
...
@@ -3744,7 +3730,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp
(
pTaskInfo
->
env
,
code
);
}
doFilter
(
pProjectInfo
->
pFilterNode
,
pBlock
,
true
);
doFilter
(
pProjectInfo
->
pFilterNode
,
pBlock
);
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
...
...
@@ -5263,7 +5249,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
(
*
pTaskInfo
)
->
pRoot
=
createOperatorTree
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
queryId
,
taskId
,
&
(
*
pTaskInfo
)
->
tableqinfoList
,
pPlan
->
pTagCond
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
code
=
terrno
;
code
=
(
*
pTaskInfo
)
->
code
;
goto
_complete
;
}
...
...
@@ -5276,7 +5262,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
_complete:
taosMemoryFreeClear
(
*
pTaskInfo
);
terrno
=
code
;
return
code
;
}
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
e6ff854e
...
...
@@ -359,7 +359,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
while
(
1
)
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
doFilter
(
pInfo
->
pCondition
,
pRes
,
true
);
doFilter
(
pInfo
->
pCondition
,
pRes
);
bool
hasRemain
=
hashRemainDataInGroupInfo
(
&
pInfo
->
groupResInfo
);
if
(
!
hasRemain
)
{
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
e6ff854e
...
...
@@ -267,7 +267,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
int64_t
st
=
taosGetTimestampMs
();
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
,
false
);
doFilter
(
pTableScanInfo
->
pFilterNode
,
pBlock
);
int64_t
et
=
taosGetTimestampMs
();
pTableScanInfo
->
readRecorder
.
filterTime
+=
(
et
-
st
);
...
...
@@ -948,7 +948,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
);
}
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
,
false
);
doFilter
(
pInfo
->
pCondition
,
pInfo
->
pRes
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
break
;
}
...
...
@@ -1716,7 +1716,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
pRes
->
info
.
rows
=
count
;
doFilter
(
pInfo
->
pFilterNode
,
pRes
,
true
);
doFilter
(
pInfo
->
pFilterNode
,
pRes
);
pOperator
->
resultInfo
.
totalRows
+=
pRes
->
info
.
rows
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
e6ff854e
...
...
@@ -296,7 +296,10 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
}
SSDataBlock
*
getMultiwaySortedBlockData
(
SSortHandle
*
pHandle
,
SSDataBlock
*
pDataBlock
,
int32_t
capacity
,
SArray
*
pColMatchInfo
,
SMultiwaySortMergeOperatorInfo
*
pInfo
)
{
SArray
*
pColMatchInfo
,
SOperatorInfo
*
pOperator
)
{
SMultiwaySortMergeOperatorInfo
*
pInfo
=
pOperator
->
info
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
blockDataCleanup
(
pDataBlock
);
SSDataBlock
*
p
=
tsortGetSortedDataBlock
(
pHandle
);
...
...
@@ -354,6 +357,8 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
}
blockDataDestroy
(
p
);
qDebug
(
"%s get sorted row blocks, rows:%d"
,
GET_TASKID
(
pTaskInfo
),
pDataBlock
->
info
.
rows
);
return
(
pDataBlock
->
info
.
rows
>
0
)
?
pDataBlock
:
NULL
;
}
...
...
@@ -371,7 +376,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
}
SSDataBlock
*
pBlock
=
getMultiwaySortedBlockData
(
pInfo
->
pSortHandle
,
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
,
p
Info
);
pOperator
->
resultInfo
.
capacity
,
pInfo
->
pColMatchInfo
,
p
Operator
);
if
(
pBlock
!=
NULL
)
{
pOperator
->
resultInfo
.
totalRows
+=
pBlock
->
info
.
rows
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
e6ff854e
...
...
@@ -750,14 +750,15 @@ int64_t getReskey(void* data, int32_t index) {
return
*
(
int64_t
*
)
pos
->
key
;
}
static
int32_t
saveResult
(
SResultRow
*
result
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
static
int32_t
saveResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
result
->
win
.
skey
,
TSDB_ORDER_DESC
,
getReskey
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
ts
,
TSDB_ORDER_DESC
,
getReskey
);
if
(
index
==
-
1
)
{
index
=
0
;
}
else
{
TSKEY
resTs
=
getReskey
(
pUpdated
,
index
);
if
(
resTs
<
result
->
win
.
skey
)
{
if
(
resTs
<
ts
)
{
index
++
;
}
else
{
return
TSDB_CODE_SUCCESS
;
...
...
@@ -769,14 +770,18 @@ static int32_t saveResult(SResultRow* result, uint64_t groupId, SArray* pUpdated
return
TSDB_CODE_OUT_OF_MEMORY
;
}
newPos
->
groupId
=
groupId
;
newPos
->
pos
=
(
SResultRowPosition
){.
pageId
=
result
->
pageId
,
.
offset
=
result
->
offset
};
*
(
int64_t
*
)
newPos
->
key
=
result
->
win
.
skey
;
newPos
->
pos
=
(
SResultRowPosition
){.
pageId
=
pageId
,
.
offset
=
offset
};
*
(
int64_t
*
)
newPos
->
key
=
ts
;
if
(
taosArrayInsert
(
pUpdated
,
index
,
&
newPos
)
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
saveResultRow
(
SResultRow
*
result
,
uint64_t
groupId
,
SArray
*
pUpdated
)
{
return
saveResult
(
result
->
win
.
skey
,
result
->
pageId
,
result
->
offset
,
groupId
,
pUpdated
);
}
static
void
removeResult
(
SArray
*
pUpdated
,
TSKEY
key
)
{
int32_t
size
=
taosArrayGetSize
(
pUpdated
);
int32_t
index
=
binarySearch
(
pUpdated
,
size
,
key
,
TSDB_ORDER_DESC
,
getReskey
);
...
...
@@ -819,7 +824,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
)
{
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveResult
(
pResult
,
tableGroupId
,
pUpdated
);
saveResult
Row
(
pResult
,
tableGroupId
,
pUpdated
);
}
}
...
...
@@ -869,7 +874,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
)
{
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveResult
(
pResult
,
tableGroupId
,
pUpdated
);
saveResult
Row
(
pResult
,
tableGroupId
,
pUpdated
);
}
}
...
...
@@ -1243,6 +1248,23 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, SInterva
}
}
static
int32_t
getAllIntervalWindow
(
SHashObj
*
pHashMap
,
SArray
*
resWins
)
{
void
*
pIte
=
NULL
;
size_t
keyLen
=
0
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
void
*
key
=
taosHashGetKey
(
pIte
,
&
keyLen
);
uint64_t
groupId
=
*
(
uint64_t
*
)
key
;
ASSERT
(
keyLen
==
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
)));
TSKEY
ts
=
*
(
int64_t
*
)((
char
*
)
key
+
sizeof
(
uint64_t
));
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
int32_t
code
=
saveResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
resWins
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
closeIntervalWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
SArray
*
closeWins
)
{
void
*
pIte
=
NULL
;
...
...
@@ -1259,16 +1281,12 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
char
keyBuf
[
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
))];
SET_RES_WINDOW_KEY
(
keyBuf
,
&
ts
,
sizeof
(
TSKEY
),
groupId
);
taosHashRemove
(
pHashMap
,
keyBuf
,
keyLen
);
SResKeyPos
*
pos
=
taosMemoryMalloc
(
sizeof
(
SResKeyPos
)
+
sizeof
(
uint64_t
));
if
(
pos
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pos
->
groupId
=
groupId
;
pos
->
pos
=
*
(
SResultRowPosition
*
)
pIte
;
*
(
int64_t
*
)
pos
->
key
=
ts
;
if
(
pSup
->
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
&&
!
taosArrayPush
(
closeWins
,
&
pos
))
{
taosMemoryFree
(
pos
);
return
TSDB_CODE_OUT_OF_MEMORY
;
SResultRowPosition
*
pPos
=
(
SResultRowPosition
*
)
pIte
;
if
(
pSup
->
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
int32_t
code
=
saveResult
(
ts
,
pPos
->
pageId
,
pPos
->
offset
,
groupId
,
closeWins
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
}
}
}
...
...
@@ -1296,8 +1314,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
SArray
*
pClosed
=
taosArrayInit
(
4
,
POINTER_BYTES
);
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
...
...
@@ -1316,19 +1332,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doClearWindows
(
&
pInfo
->
aggSup
,
&
pInfo
->
binfo
,
&
pInfo
->
interval
,
0
,
pOperator
->
numOfExprs
,
pBlock
,
NULL
);
qDebug
(
"%s clear existed time window results for updates checked"
,
GET_TASKID
(
pTaskInfo
));
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_MAX_DELAY
)
{
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdated
);
continue
;
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
MAIN_SCAN
,
pUpdated
);
}
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pUpdated
);
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pClosed
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pClosed
,
pInfo
->
binfo
.
rowCellInfoOffset
);
taosArrayAddAll
(
pUpdated
,
pClosed
);
taosArrayDestroy
(
pClosed
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pInfo
->
binfo
.
rowCellInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
...
...
@@ -1898,7 +1913,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pSDataBlock
->
info
,
tsCols
,
startPos
,
nextWin
.
ekey
,
binarySearchForKey
,
NULL
,
TSDB_ORDER_ASC
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdated
)
{
saveResult
(
pResult
,
tableGroupId
,
pUpdated
);
saveResult
Row
(
pResult
,
tableGroupId
,
pUpdated
);
}
// window start(end) key interpolation
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos,
...
...
@@ -1993,7 +2008,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
SArray
*
pClosed
=
taosArrayInit
(
4
,
POINTER_BYTES
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
...
...
@@ -2042,7 +2056,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
copyUpdateDataBlock
(
pInfo
->
pUpdateRes
,
pBlock
,
pInfo
->
primaryTsIndex
);
taosArrayDestroy
(
pUpWins
);
break
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_GET_ALL
&&
isFinalInterval
(
pInfo
)
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_MAX_DELAY
)
{
getAllIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
pUpdated
);
continue
;
}
if
(
isFinalInterval
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
...
...
@@ -2064,13 +2083,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
if
(
isFinalInterval
(
pInfo
))
{
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pClosed
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pClosed
,
pInfo
->
binfo
.
rowCellInfoOffset
);
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
taosArrayAddAll
(
pUpdated
,
pClosed
);
}
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pUpdated
);
}
taosArrayDestroy
(
pClosed
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
aggSup
.
pResultBuf
,
pUpdated
,
pInfo
->
binfo
.
rowCellInfoOffset
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
...
...
source/libs/executor/src/tsort.c
浏览文件 @
e6ff854e
...
...
@@ -227,6 +227,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
for
(
int32_t
i
=
0
;
i
<
cmpParam
->
numOfSources
;
++
i
)
{
SSortSource
*
pSource
=
cmpParam
->
pSources
[
i
];
pSource
->
src
.
pBlock
=
pHandle
->
fetchfp
(
pSource
->
param
);
// set current source id done
if
(
pSource
->
src
.
pBlock
==
NULL
)
{
pSource
->
src
.
rowIndex
=
-
1
;
++
pHandle
->
numOfCompletedSources
;
...
...
@@ -426,8 +428,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
double
sortPass
=
floorl
(
log2
(
numOfSources
)
/
log2
(
pHandle
->
numOfPages
));
pHandle
->
totalElapsed
=
taosGetTimestampUs
()
-
pHandle
->
startTs
;
qDebug
(
"%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"
PRIzu
", sort elapsed:%"
PRId64
", total elapsed:%"
PRId64
,
pHandle
->
idStr
,
(
int32_t
)
(
sortPass
+
1
),
pHandle
->
pBuf
?
getTotalBufSize
(
pHandle
->
pBuf
)
:
0
,
pHandle
->
sortElapsed
,
pHandle
->
totalElapsed
);
if
(
sortPass
>
0
)
{
size_t
s
=
pHandle
->
pBuf
?
getTotalBufSize
(
pHandle
->
pBuf
)
:
0
;
qDebug
(
"%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"
PRIzu
", sort elapsed:%"
PRId64
", total elapsed:%"
PRId64
,
pHandle
->
idStr
,
(
int32_t
)(
sortPass
+
1
),
s
,
pHandle
->
sortElapsed
,
pHandle
->
totalElapsed
);
}
else
{
qDebug
(
"%s ordered source:%"
PRIzu
", available buf:%d, no need internal sort"
,
pHandle
->
idStr
,
numOfSources
,
pHandle
->
numOfPages
);
}
int32_t
numOfRows
=
blockDataGetCapacityInRow
(
pHandle
->
pDataBlock
,
pHandle
->
pageSize
);
blockDataEnsureCapacity
(
pHandle
->
pDataBlock
,
numOfRows
);
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
e6ff854e
...
...
@@ -100,7 +100,6 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
.
upstreamNodeId
=
pTask
->
nodeId
,
.
blockNum
=
blockNum
,
};
qInfo
(
"dispatch from task %d (child id %d)"
,
pTask
->
taskId
,
pTask
->
childId
);
req
.
data
=
taosArrayInit
(
blockNum
,
sizeof
(
void
*
));
req
.
dataLen
=
taosArrayInit
(
blockNum
,
sizeof
(
int32_t
));
...
...
@@ -142,11 +141,14 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
break
;
}
}
ASSERT
(
vgId
!=
0
);
}
ASSERT
(
vgId
!=
0
);
req
.
taskId
=
downstreamTaskId
;
qInfo
(
"dispatch from task %d (child id %d) to down stream task %d in vnode %d"
,
pTask
->
taskId
,
pTask
->
childId
,
downstreamTaskId
,
vgId
);
// serialize
int32_t
tlen
;
tEncodeSize
(
tEncodeStreamDispatchReq
,
&
req
,
tlen
,
code
);
...
...
source/util/src/terror.c
浏览文件 @
e6ff854e
...
...
@@ -353,9 +353,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag valu
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_CACHE_LAST_ROW
,
"TSDB no cache last row data"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TABLE_RECREATED
,
"Table re-created"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR
,
"TDB env open error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_NO_SMA_INDEX_IN_META
,
"No sma index in meta"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_INVALID_SMA_STAT
,
"Invalid sma state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TDB_TSMA_ALREADY_EXIST
,
"TSMA already exists"
)
// query
...
...
@@ -537,25 +534,38 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DELETE_WHERE, "The DELETE statemen
TAOS_DEFINE_ERROR
(
TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG
,
"The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes"
)
//planner
TAOS_DEFINE_ERROR
(
TSDB_CODE_PLAN_INTERNAL_ERROR
,
"Planner internal error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_PLAN_INTERNAL_ERROR
,
"Planner internal error"
)
//udf
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_STOPPING
,
"udf is stopping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_PIPE_READ_ERR
,
"udf pipe read error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_PIPE_CONNECT_ERR
,
"udf pipe connect error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_PIPE_NO_PIPE
,
"udf no pipe"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_LOAD_UDF_FAILURE
,
"udf load failure"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_STATE
,
"udf invalid state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_INPUT
,
"udf invalid function input"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_NO_FUNC_HANDLE
,
"udf no function handle"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_BUFSIZE
,
"udf invalid bufsize"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_OUTPUT_TYPE
,
"udf invalid output type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_STOPPING
,
"udf is stopping"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_PIPE_READ_ERR
,
"udf pipe read error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_PIPE_CONNECT_ERR
,
"udf pipe connect error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_PIPE_NO_PIPE
,
"udf no pipe"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_LOAD_UDF_FAILURE
,
"udf load failure"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_STATE
,
"udf invalid state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_INPUT
,
"udf invalid function input"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_NO_FUNC_HANDLE
,
"udf no function handle"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_BUFSIZE
,
"udf invalid bufsize"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_UDF_INVALID_OUTPUT_TYPE
,
"udf invalid output type"
)
//schemaless
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_PROTOCOL_TYPE
,
"Invalid line protocol type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_PRECISION_TYPE
,
"Invalid timestamp precision type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_DATA
,
"Invalid data type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_DB_CONF
,
"Invalid schemaless db config"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_PROTOCOL_TYPE
,
"Invalid line protocol type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_PRECISION_TYPE
,
"Invalid timestamp precision type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_DATA
,
"Invalid data type"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SML_INVALID_DB_CONF
,
"Invalid schemaless db config"
)
//tsma
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_ALREADY_EXIST
,
"Tsma already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_NO_INDEX_IN_META
,
"No tsma index in meta"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_INVALID_ENV
,
"Invalid tsma env"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_INVALID_STAT
,
"Invalid tsma state"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_NO_INDEX_IN_CACHE
,
"No tsma index in cache"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TSMA_RM_SKEY_IN_HASH
,
"Rm tsma skey in cache"
)
//rsma
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_ENV
,
"Invalid rsma env"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_RSMA_INVALID_STAT
,
"Invalid rsma state"
)
#ifdef TAOS_ERROR_C
};
...
...
taos-tools
@
717f5aaa
比较
932da0f4
...
717f5aaa
Subproject commit
932da0f4cac013c2eded824d1d4d01cfa6168fa3
Subproject commit
717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录