Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f392ab38
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f392ab38
编写于
5月 23, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/mnode
上级
3ff366cd
d1035938
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
267 addition
and
737 deletion
+267
-737
cmake/cmake.define
cmake/cmake.define
+2
-2
example/src/tstream.c
example/src/tstream.c
+1
-3
include/common/tmsg.h
include/common/tmsg.h
+2
-2
include/libs/index/index.h
include/libs/index/index.h
+8
-0
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+6
-13
include/util/tencode.h
include/util/tencode.h
+3
-3
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-2
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+2
-0
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+1
-3
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-1
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+1
-2
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+2
-6
source/dnode/vnode/src/meta/metaTable.c
source/dnode/vnode/src/meta/metaTable.c
+0
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+48
-418
source/dnode/vnode/src/tsdb/tsdbWrite.c
source/dnode/vnode/src/tsdb/tsdbWrite.c
+1
-2
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+59
-37
source/libs/executor/inc/indexoperator.h
source/libs/executor/inc/indexoperator.h
+0
-35
source/libs/index/CMakeLists.txt
source/libs/index/CMakeLists.txt
+3
-0
source/libs/index/src/indexFilter.c
source/libs/index/src/indexFilter.c
+19
-18
source/libs/index/test/index_executor_tests.cpp
source/libs/index/test/index_executor_tests.cpp
+0
-0
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+6
-4
source/libs/scheduler/src/schFlowCtrl.c
source/libs/scheduler/src/schFlowCtrl.c
+16
-17
source/libs/scheduler/src/scheduler.c
source/libs/scheduler/src/scheduler.c
+69
-4
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+14
-161
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
cmake/cmake.define
浏览文件 @
f392ab38
...
@@ -71,8 +71,8 @@ ELSE ()
...
@@ -71,8 +71,8 @@ ELSE ()
ENDIF ()
ENDIF ()
IF (${SANITIZER} MATCHES "true")
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -
static-libasan -
g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -
static-libasan -
g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -g3")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE ()
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
...
...
example/src/tstream.c
浏览文件 @
f392ab38
...
@@ -82,9 +82,7 @@ int32_t create_stream() {
...
@@ -82,9 +82,7 @@ int32_t create_stream() {
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes
=
taos_query
(
pRes
=
taos_query
(
pConn
,
pConn
,
"create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)"
);
"create stream stream1 trigger at_once into outstb as select _wstartts, min(k), max(k), sum(k) as sum_of_k "
"from tu1 interval(10m)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create stream stream1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
return
-
1
;
...
...
include/common/tmsg.h
浏览文件 @
f392ab38
...
@@ -244,12 +244,12 @@ typedef struct {
...
@@ -244,12 +244,12 @@ typedef struct {
const
void
*
pMsg
;
const
void
*
pMsg
;
}
SSubmitMsgIter
;
}
SSubmitMsgIter
;
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tInitSubmitMsgIter
(
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tGetSubmitMsgNext
(
SSubmitMsgIter
*
pIter
,
SSubmitBlk
**
pPBlock
);
int32_t
tInitSubmitBlkIter
(
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
int32_t
tInitSubmitBlkIter
(
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
STSRow
*
tGetSubmitBlkNext
(
SSubmitBlkIter
*
pIter
);
// for debug
// for debug
int32_t
tPrintFixedSchemaSubmitReq
(
const
SSubmitReq
*
pReq
,
STSchema
*
pSchema
);
int32_t
tPrintFixedSchemaSubmitReq
(
SSubmitReq
*
pReq
,
STSchema
*
pSchema
);
typedef
struct
{
typedef
struct
{
int32_t
code
;
int32_t
code
;
...
...
include/libs/index/index.h
浏览文件 @
f392ab38
...
@@ -16,9 +16,11 @@
...
@@ -16,9 +16,11 @@
#ifndef _TD_INDEX_H_
#ifndef _TD_INDEX_H_
#define _TD_INDEX_H_
#define _TD_INDEX_H_
#include "nodes.h"
#include "os.h"
#include "os.h"
#include "taoserror.h"
#include "taoserror.h"
#include "tarray.h"
#include "tarray.h"
#include "tglobal.h"
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
...
@@ -189,6 +191,12 @@ void indexTermDestroy(SIndexTerm* p);
...
@@ -189,6 +191,12 @@ void indexTermDestroy(SIndexTerm* p);
*/
*/
void
indexInit
();
void
indexInit
();
/* index filter */
typedef
enum
{
SFLT_NOT_INDEX
,
SFLT_COARSE_INDEX
,
SFLT_ACCURATE_INDEX
}
SIdxFltStatus
;
SIdxFltStatus
idxGetFltStatus
(
SNode
*
pFilterNode
);
int32_t
doFilterTag
(
const
SNode
*
pFilterNode
,
SArray
*
result
);
/*
/*
* destory index env
* destory index env
*
*
...
...
include/libs/stream/tstream.h
浏览文件 @
f392ab38
...
@@ -114,17 +114,12 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
...
@@ -114,17 +114,12 @@ static FORCE_INLINE void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit)
int32_t
streamDataBlockEncode
(
void
**
buf
,
const
SStreamDataBlock
*
pOutput
);
int32_t
streamDataBlockEncode
(
void
**
buf
,
const
SStreamDataBlock
*
pOutput
);
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
);
void
*
streamDataBlockDecode
(
const
void
*
buf
,
SStreamDataBlock
*
pInput
);
typedef
struct
{
void
*
inputHandle
;
void
*
executor
;
}
SStreamRunner
;
typedef
struct
{
typedef
struct
{
int8_t
parallelizable
;
int8_t
parallelizable
;
char
*
qmsg
;
char
*
qmsg
;
// followings are not applicable to encoder and decoder
// followings are not applicable to encoder and decoder
int8_t
numOfRunners
;
void
*
inputHandle
;
SStreamRunner
*
runners
;
void
*
executor
;
}
STaskExec
;
}
STaskExec
;
typedef
struct
{
typedef
struct
{
...
@@ -320,17 +315,15 @@ int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
...
@@ -320,17 +315,15 @@ int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamEnqueueDataBlk
(
SStreamTask
*
pTask
,
SStreamDataBlock
*
input
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
int32_t
streamDequeueOutput
(
SStreamTask
*
pTask
,
void
**
output
);
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
);
int32_t
streamTaskRun
(
SStreamTask
*
pTask
);
int32_t
streamTaskRun
(
SStreamTask
*
pTask
);
int32_t
streamTaskHandleInput
(
SStreamTask
*
pTask
,
void
*
data
);
int32_t
streamTaskHandleInput
(
SStreamTask
*
pTask
,
void
*
data
);
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
);
int32_t
stream
Task
ProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
stream
Task
ProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
);
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
);
int32_t
stream
Task
ProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
);
int32_t
stream
Task
ProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
);
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
include/util/tencode.h
浏览文件 @
f392ab38
...
@@ -317,7 +317,7 @@ static FORCE_INLINE int32_t tDecodeI16v(SDecoder* pCoder, int16_t* val) {
...
@@ -317,7 +317,7 @@ static FORCE_INLINE int32_t tDecodeI16v(SDecoder* pCoder, int16_t* val) {
if
(
tDecodeU16v
(
pCoder
,
&
tval
)
<
0
)
{
if
(
tDecodeU16v
(
pCoder
,
&
tval
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
*
val
=
ZIGZAGD
(
int16_t
,
tval
);
if
(
val
)
*
val
=
ZIGZAGD
(
int16_t
,
tval
);
return
0
;
return
0
;
}
}
...
@@ -331,7 +331,7 @@ static FORCE_INLINE int32_t tDecodeI32v(SDecoder* pCoder, int32_t* val) {
...
@@ -331,7 +331,7 @@ static FORCE_INLINE int32_t tDecodeI32v(SDecoder* pCoder, int32_t* val) {
if
(
tDecodeU32v
(
pCoder
,
&
tval
)
<
0
)
{
if
(
tDecodeU32v
(
pCoder
,
&
tval
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
*
val
=
ZIGZAGD
(
int32_t
,
tval
);
if
(
val
)
*
val
=
ZIGZAGD
(
int32_t
,
tval
);
return
0
;
return
0
;
}
}
...
@@ -345,7 +345,7 @@ static FORCE_INLINE int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val) {
...
@@ -345,7 +345,7 @@ static FORCE_INLINE int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val) {
if
(
tDecodeU64v
(
pCoder
,
&
tval
)
<
0
)
{
if
(
tDecodeU64v
(
pCoder
,
&
tval
)
<
0
)
{
return
-
1
;
return
-
1
;
}
}
*
val
=
ZIGZAGD
(
int64_t
,
tval
);
if
(
val
)
*
val
=
ZIGZAGD
(
int64_t
,
tval
);
return
0
;
return
0
;
}
}
...
...
source/common/src/tmsg.c
浏览文件 @
f392ab38
...
@@ -28,7 +28,7 @@
...
@@ -28,7 +28,7 @@
#undef TD_MSG_SEG_CODE_
#undef TD_MSG_SEG_CODE_
#include "tmsgdef.h"
#include "tmsgdef.h"
int32_t
tInitSubmitMsgIter
(
const
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
int32_t
tInitSubmitMsgIter
(
SSubmitReq
*
pMsg
,
SSubmitMsgIter
*
pIter
)
{
if
(
pMsg
==
NULL
)
{
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
terrno
=
TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP
;
return
-
1
;
return
-
1
;
...
@@ -102,7 +102,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
...
@@ -102,7 +102,7 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
}
}
}
}
int32_t
tPrintFixedSchemaSubmitReq
(
const
SSubmitReq
*
pReq
,
STSchema
*
pTschema
)
{
int32_t
tPrintFixedSchemaSubmitReq
(
SSubmitReq
*
pReq
,
STSchema
*
pTschema
)
{
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitMsgIter
msgIter
=
{
0
};
if
(
tInitSubmitMsgIter
(
pReq
,
&
msgIter
)
<
0
)
return
-
1
;
if
(
tInitSubmitMsgIter
(
pReq
,
&
msgIter
)
<
0
)
return
-
1
;
while
(
true
)
{
while
(
true
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
f392ab38
...
@@ -113,6 +113,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
...
@@ -113,6 +113,8 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
SRpcMsg
*
pMsg
=
*
(
SRpcMsg
**
)
taosArrayGet
(
pArray
,
i
);
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
};
SRpcMsg
rsp
=
{.
info
=
pMsg
->
info
};
vnodePreprocessReq
(
pVnode
->
pImpl
,
pMsg
);
int32_t
ret
=
syncPropose
(
vnodeGetSyncHandle
(
pVnode
->
pImpl
),
pMsg
,
false
);
int32_t
ret
=
syncPropose
(
vnodeGetSyncHandle
(
pVnode
->
pImpl
),
pMsg
,
false
);
if
(
ret
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
if
(
ret
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
dTrace
(
"msg:%p, is redirect since not leader, vgId:%d "
,
pMsg
,
pVnode
->
vgId
);
dTrace
(
"msg:%p, is redirect since not leader, vgId:%d "
,
pMsg
,
pVnode
->
vgId
);
...
...
source/dnode/snode/src/snode.c
浏览文件 @
f392ab38
...
@@ -57,9 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
...
@@ -57,9 +57,7 @@ void sndMetaDelete(SStreamMeta *pMeta) {
}
}
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
for
(
int
i
=
0
;
i
<
pTask
->
exec
.
numOfRunners
;
i
++
)
{
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
NULL
);
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
NULL
);
}
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
}
}
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
f392ab38
...
@@ -51,7 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
...
@@ -51,7 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodeClose
(
SVnode
*
pVnode
);
void
vnodeClose
(
SVnode
*
pVnode
);
int32_t
vnodePreprocess
WriteReqs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
,
int64_t
*
version
);
int32_t
vnodePreprocess
Req
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
);
int32_t
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessWriteReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
int64_t
version
,
SRpcMsg
*
pRsp
);
int32_t
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessCMsg
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
int32_t
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
f392ab38
...
@@ -173,8 +173,7 @@ typedef struct {
...
@@ -173,8 +173,7 @@ typedef struct {
}
STqExec
;
}
STqExec
;
struct
STQ
{
struct
STQ
{
char
*
path
;
char
*
path
;
// STqMetaStore* tqMeta;
SHashObj
*
pushMgr
;
// consumerId -> STqExec*
SHashObj
*
pushMgr
;
// consumerId -> STqExec*
SHashObj
*
execs
;
// subKey -> STqExec
SHashObj
*
execs
;
// subKey -> STqExec
SHashObj
*
pStreamTasks
;
SHashObj
*
pStreamTasks
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
f392ab38
...
@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
...
@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int
tsdbClose
(
STsdb
**
pTsdb
);
int
tsdbClose
(
STsdb
**
pTsdb
);
int
tsdbBegin
(
STsdb
*
pTsdb
);
int
tsdbBegin
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
const
SSubmitReq
*
pMsg
);
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
int64_t
version
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbInsertTableData
(
STsdb
*
pTsdb
,
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkRsp
*
pRsp
);
int
tsdbInsertTableData
(
STsdb
*
pTsdb
,
SSubmitMsgIter
*
pMsgIter
,
SSubmitBlk
*
pBlock
,
SSubmitBlkRsp
*
pRsp
);
tsdbReaderT
*
tsdbQueryTables
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
tsdbReaderT
*
tsdbQueryTables
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
...
@@ -123,11 +123,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
...
@@ -123,11 +123,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
#if 0
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
SSubmitReq
*
data
);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
#endif
int32_t
tqProcessStreamTriggerNew
(
STQ
*
pTq
,
SSubmitReq
*
data
);
int32_t
tqProcessTaskRunReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRunReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskDispatchReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessTaskRecoverReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
...
...
source/dnode/vnode/src/meta/metaTable.c
浏览文件 @
f392ab38
...
@@ -223,9 +223,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
...
@@ -223,9 +223,6 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
terrno
=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
terrno
=
TSDB_CODE_TDB_TABLE_ALREADY_EXIST
;
metaReaderClear
(
&
mr
);
metaReaderClear
(
&
mr
);
return
-
1
;
return
-
1
;
}
else
{
pReq
->
uid
=
tGenIdPI64
();
pReq
->
ctime
=
taosGetTimestampMs
();
}
}
metaReaderClear
(
&
mr
);
metaReaderClear
(
&
mr
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f392ab38
...
@@ -36,15 +36,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
...
@@ -36,15 +36,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
/*ASSERT(0);*/
/*ASSERT(0);*/
/*}*/
/*}*/
#if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq);
return NULL;
}
#endif
pTq
->
execs
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_ENTRY_LOCK
);
pTq
->
execs
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_ENTRY_LOCK
);
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
@@ -65,48 +56,6 @@ void tqClose(STQ* pTq) {
...
@@ -65,48 +56,6 @@ void tqClose(STQ* pTq) {
// TODO
// TODO
}
}
static
void
tdSRowDemo
()
{
#define DEMO_N_COLS 3
int16_t
schemaVersion
=
0
;
int32_t
numOfCols
=
DEMO_N_COLS
;
// ts + int
SRowBuilder
rb
=
{
0
};
SSchema
schema
[
DEMO_N_COLS
]
=
{
{.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
colId
=
1
,
.
name
=
"ts"
,
.
bytes
=
8
,
.
flags
=
COL_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
2
,
.
name
=
"c1"
,
.
bytes
=
4
,
.
flags
=
COL_SMA_ON
},
{.
type
=
TSDB_DATA_TYPE_INT
,
.
colId
=
3
,
.
name
=
"c2"
,
.
bytes
=
4
,
.
flags
=
COL_SMA_ON
}};
SSchema
*
pSchema
=
schema
;
STSchema
*
pTSChema
=
tdGetSTSChemaFromSSChema
(
&
pSchema
,
numOfCols
);
tdSRowInit
(
&
rb
,
schemaVersion
);
tdSRowSetTpInfo
(
&
rb
,
numOfCols
,
pTSChema
->
flen
);
int32_t
maxLen
=
TD_ROW_MAX_BYTES_FROM_SCHEMA
(
pTSChema
);
void
*
row
=
taosMemoryCalloc
(
1
,
maxLen
);
// make sure the buffer is enough
// set row buf
tdSRowResetBuf
(
&
rb
,
row
);
for
(
int32_t
idx
=
0
;
idx
<
pTSChema
->
numOfCols
;
++
idx
)
{
STColumn
*
pColumn
=
pTSChema
->
columns
+
idx
;
if
(
idx
==
0
)
{
int64_t
tsKey
=
1651234567
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
tsKey
,
true
,
pColumn
->
offset
,
idx
);
}
else
if
(
idx
==
1
)
{
int32_t
val1
=
10
;
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NORM
,
&
val1
,
true
,
pColumn
->
offset
,
idx
);
}
else
{
tdAppendColValToRow
(
&
rb
,
pColumn
->
colId
,
pColumn
->
type
,
TD_VTYPE_NONE
,
NULL
,
true
,
pColumn
->
offset
,
idx
);
}
}
// print
tdSRowPrint
(
row
,
pTSChema
,
__func__
);
taosMemoryFree
(
pTSChema
);
}
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
)
{
int32_t
tqUpdateTbUidList
(
STQ
*
pTq
,
const
SArray
*
tbUidList
,
bool
isAdd
)
{
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
while
(
1
)
{
while
(
1
)
{
...
@@ -261,32 +210,20 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
...
@@ -261,32 +210,20 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
}
}
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
int
tqPushMsg
(
STQ
*
pTq
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
ver
)
{
if
(
msgType
!=
TDMT_VND_SUBMIT
)
return
0
;
if
(
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
// make sure msgType == TDMT_VND_SUBMIT
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
if
(
tdUpdateExpireWindow
(
pTq
->
pVnode
->
pSma
,
msg
,
ver
)
!=
0
)
{
// TODO error handle
return
-
1
;
}
}
void
*
data
=
taosMemoryMalloc
(
msgLen
);
if
(
data
==
NULL
)
{
if
(
taosHashGetSize
(
pTq
->
pStreamTasks
)
==
0
)
return
0
;
return
-
1
;
}
memcpy
(
data
,
msg
,
msgLen
);
void
*
data
=
taosMemoryMalloc
(
msgLen
);
tqProcessStreamTrigger
(
pTq
,
data
);
if
(
data
==
NULL
)
{
return
-
1
;
}
}
memcpy
(
data
,
msg
,
msgLen
);
tqProcessStreamTriggerNew
(
pTq
,
data
);
#if 0
SRpcMsg req = {
.msgType = TDMT_VND_STREAM_TRIGGER,
.pCont = data,
.contLen = msgLen,
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &req);
#endif
return
0
;
return
0
;
}
}
...
@@ -685,213 +622,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
...
@@ -685,213 +622,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return
0
;
return
0
;
}
}
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
fetchOffset = pReq->currentOffset + 1;
}
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
SMqPollRspV2 rspV2 = {0};
rspV2.dataLen = 0;
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch);
while (consumerEpoch < reqEpoch) {
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
}
STqTopic* pTopic = NULL;
int32_t topicSz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < topicSz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
break;
}
}
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
tqDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
TD_VID(pTq->pVnode));
rspV2.reqOffset = pReq->currentOffset;
rspV2.skipLogNum = 0;
while (1) {
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) {
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
break;
}
SWalReadHead* pHead;
if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
break;
}
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pTopic->buffer.output[workerId].task;
ASSERT(task);
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts;
if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(false);
}
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
}
taosArrayPush(pRes, pDataBlock);
}
if (taosArrayGetSize(pRes) == 0) {
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++;
rspV2.skipLogNum++;
taosArrayDestroy(pRes);
continue;
}
rspV2.rspOffset = fetchOffset;
int32_t blockSz = taosArrayGetSize(pRes);
int32_t dataBlockStrLen = 0;
for (int32_t i = 0; i < blockSz; i++) {
SSDataBlock* pBlock = taosArrayGet(pRes, i);
dataBlockStrLen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
}
void* dataBlockBuf = taosMemoryMalloc(dataBlockStrLen);
if (dataBlockBuf == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
}
rspV2.blockData = dataBlockBuf;
int32_t pos;
rspV2.blockPos = taosArrayInit(blockSz, sizeof(int32_t));
for (int32_t i = 0; i < blockSz; i++) {
pos = 0;
SSDataBlock* pBlock = taosArrayGet(pRes, i);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)dataBlockBuf;
pRetrieve->useconds = 0;
pRetrieve->precision = 0;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pBlock->info.rows);
blockCompressEncode(pBlock, pRetrieve->data, &pos, pBlock->info.numOfCols, false);
taosArrayPush(rspV2.blockPos, &rspV2.dataLen);
int32_t totLen = sizeof(SRetrieveTableRsp) + pos;
pRetrieve->compLen = htonl(totLen);
rspV2.dataLen += totLen;
dataBlockBuf = POINTER_SHIFT(dataBlockBuf, totLen);
}
ASSERT(POINTER_DISTANCE(dataBlockBuf, rspV2.blockData) <= dataBlockStrLen);
int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(msgLen);
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2);
/*rsp.pBlockData = pRes;*/
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
SRpcMsg resp = {.info = pMsg->info, pCont = buf, .contLen = msgLen, .code = 0};
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(&resp);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rspV2.skipLogNum++;
}
}
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
rspV2.rspOffset = fetchOffset - 1;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRspV2(&abuf, &rspV2);
SRpcMsg resp = {.info = pMsg->info, .pCont = buf, .contLen = tlen, .code = 0};
tmsgSendRsp(&resp);
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/
return 0;
}
#endif
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
int32_t
tqProcessVgDeleteReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SMqVDeleteReq
*
pReq
=
(
SMqVDeleteReq
*
)
msg
;
SMqVDeleteReq
*
pReq
=
(
SMqVDeleteReq
*
)
msg
;
...
@@ -981,7 +711,18 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
...
@@ -981,7 +711,18 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
==
0
);
ASSERT
(
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
msg
)
==
0
);
}
}
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int32_t
parallel
)
{
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msg
,
msgLen
);
if
(
tDecodeSStreamTask
(
&
decoder
,
pTask
)
<
0
)
{
ASSERT
(
0
);
}
tDecoderClear
(
&
decoder
);
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
inputStatus
=
TASK_INPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
pTask
->
outputStatus
=
TASK_OUTPUT_STATUS__NORMAL
;
...
@@ -994,57 +735,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
...
@@ -994,57 +735,19 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
goto
FAIL
;
goto
FAIL
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
// expand runners
// expand runners
pTask
->
exec
.
numOfRunners
=
parallel
;
STqReadHandle
*
pStreamReader
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
pTask
->
exec
.
runners
=
taosMemoryCalloc
(
parallel
,
sizeof
(
SStreamRunner
));
SReadHandle
handle
=
{
if
(
pTask
->
exec
.
runners
==
NULL
)
{
.
reader
=
pStreamReader
,
goto
FAIL
;
.
meta
=
pTq
->
pVnode
->
pMeta
,
}
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
for
(
int32_t
i
=
0
;
i
<
parallel
;
i
++
)
{
.
vnode
=
pTq
->
pVnode
,
STqReadHandle
*
pStreamReader
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
};
SReadHandle
handle
=
{
pTask
->
exec
.
inputHandle
=
pStreamReader
;
.
reader
=
pStreamReader
,
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
.
meta
=
pTq
->
pVnode
->
pMeta
,
ASSERT
(
pTask
->
exec
.
executor
);
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
.
vnode
=
pTq
->
pVnode
,
};
pTask
->
exec
.
runners
[
i
].
inputHandle
=
pStreamReader
;
pTask
->
exec
.
runners
[
i
].
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
handle
);
ASSERT
(
pTask
->
exec
.
runners
[
i
].
executor
);
}
}
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tqTableSink
;
}
return
0
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
-
1
;
}
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SStreamTask
*
pTask
=
taosMemoryCalloc
(
1
,
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
return
-
1
;
}
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
msg
,
msgLen
);
if
(
tDecodeSStreamTask
(
&
decoder
,
pTask
)
<
0
)
{
ASSERT
(
0
);
}
tDecoderClear
(
&
decoder
);
// exec
if
(
tqExpandTask
(
pTq
,
pTask
,
4
)
<
0
)
{
ASSERT
(
0
);
}
}
// sink
// sink
...
@@ -1052,8 +755,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
...
@@ -1052,8 +755,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
=
smaHandleRes
;
pTask
->
smaSink
.
smaSink
=
smaHandleRes
;
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
pTask
->
tbSink
.
vnode
=
pTq
->
pVnode
;
pTask
->
tbSink
.
tbSinkFunc
=
tqTableSink
;
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
ASSERT
(
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
);
pTask
->
tbSink
.
pTSchema
=
pTask
->
tbSink
.
pTSchema
=
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
tdGetSTSChemaFromSSChema
(
&
pTask
->
tbSink
.
pSchemaWrapper
->
pSchema
,
pTask
->
tbSink
.
pSchemaWrapper
->
nCols
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
ASSERT
(
pTask
->
tbSink
.
pTSchema
);
...
@@ -1061,94 +768,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
...
@@ -1061,94 +768,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
taosHashPut
(
pTq
->
pStreamTasks
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
SStreamTask
));
return
0
;
}
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
)
{
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pTq
->
pStreamTasks
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SStreamTask
*
pTask
=
(
SStreamTask
*
)
pIter
;
if
(
streamExecTask
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
data
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
,
workerId
)
<
0
)
{
// TODO
}
}
return
0
;
}
#if 0
int32_t tqProcessStreamTriggerNew(STQ* pTq, SSubmitReq* data) {
SStreamDataSubmit* pSubmit = NULL;
// build data
pSubmit = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM);
if (pSubmit == NULL) return -1;
pSubmit->dataRef = taosMemoryMalloc(sizeof(int32_t));
if (pSubmit->dataRef == NULL) goto FAIL;
*pSubmit->dataRef = 1;
pSubmit->data = data;
pSubmit->type = STREAM_INPUT__DATA_BLOCK;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = (SStreamTask*)pIter;
if (pTask->inputType == TASK_INPUT_TYPE__SUMBIT_BLOCK) {
streamEnqueueDataSubmit(pTask, pSubmit);
// TODO cal back pressure
}
// check run
int8_t execStatus = atomic_load_8(&pTask->status);
if (execStatus == TASK_STATUS__IDLE || execStatus == TASK_STATUS__CLOSING) {
SStreamTaskRunReq* pReq = taosMemoryMalloc(sizeof(SStreamTaskRunReq));
if (pReq == NULL) continue;
// TODO: do we need htonl?
pReq->head.vgId = pTq->pVnode->config.vgId;
pReq->streamId = pTask->streamId;
pReq->taskId = pTask->taskId;
SRpcMsg msg = {
.msgType = 0,
.pCont = pReq,
.contLen = sizeof(SStreamTaskRunReq),
};
tmsgPutToQueue(&pTq->pVnode->msgCb, FETCH_QUEUE, &msg);
}
}
streamDataSubmitRefDec(pSubmit);
return
0
;
return
0
;
FAIL:
FAIL:
if (pSubmit) {
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if (pSubmit->dataRef) {
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
taosMemoryFree(pSubmit->dataRef);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
}
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
taosFreeQitem(pSubmit);
if
(
pTask
)
taosMemoryFree
(
pTask
);
}
return
-
1
;
return
-
1
;
}
}
#endif
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
)
{
SStreamTaskExecReq
req
;
tDecodeSStreamTaskExecReq
(
msg
,
&
req
);
int32_t
taskId
=
req
.
taskId
;
ASSERT
(
taskId
);
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
ASSERT
(
pTask
);
if
(
streamExecTask
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
req
.
data
,
STREAM_DATA_TYPE_SSDATA_BLOCK
,
workerId
)
<
0
)
{
// TODO
}
return
0
;
}
int32_t
tqProcessStreamTrigger
New
(
STQ
*
pTq
,
SSubmitReq
*
pReq
)
{
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
SSubmitReq
*
pReq
)
{
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
bool
failed
=
false
;
bool
failed
=
false
;
...
@@ -1234,7 +864,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -1234,7 +864,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchReq
*
pReq
=
pMsg
->
pCont
;
SStreamDispatchReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessDispatchReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
streamProcessDispatchReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
return
0
;
return
0
;
}
}
...
@@ -1242,7 +872,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -1242,7 +872,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq
*
pReq
=
pMsg
->
pCont
;
SStreamTaskRecoverReq
*
pReq
=
pMsg
->
pCont
;
int32_t
taskId
=
pReq
->
taskId
;
int32_t
taskId
=
pReq
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessRecoverReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
streamProcessRecoverReq
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pReq
,
pMsg
);
return
0
;
return
0
;
}
}
...
@@ -1250,7 +880,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -1250,7 +880,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp
*
pRsp
=
pMsg
->
pCont
;
SStreamDispatchRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
taskId
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
streamProcessDispatchRsp
(
pTask
,
&
pTq
->
pVnode
->
msgCb
,
pRsp
);
return
0
;
return
0
;
}
}
...
@@ -1258,6 +888,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -1258,6 +888,6 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
SStreamTaskRecoverRsp
*
pRsp
=
pMsg
->
pCont
;
int32_t
taskId
=
pRsp
->
taskId
;
int32_t
taskId
=
pRsp
->
taskId
;
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
SStreamTask
*
pTask
=
taosHashGet
(
pTq
->
pStreamTasks
,
&
taskId
,
sizeof
(
int32_t
));
stream
Task
ProcessRecoverRsp
(
pTask
,
pRsp
);
streamProcessRecoverRsp
(
pTask
,
pRsp
);
return
0
;
return
0
;
}
}
source/dnode/vnode/src/tsdb/tsdbWrite.c
浏览文件 @
f392ab38
...
@@ -85,7 +85,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
...
@@ -85,7 +85,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdb *pTsdb, tb_uid_t uid, STSRow *ro
return
0
;
return
0
;
}
}
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
const
SSubmitReq
*
pMsg
)
{
int
tsdbScanAndConvertSubmitMsg
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
)
{
ASSERT
(
pMsg
!=
NULL
);
ASSERT
(
pMsg
!=
NULL
);
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
// STsdbMeta * pMeta = pTsdb->tsdbMeta;
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitMsgIter
msgIter
=
{
0
};
...
@@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
...
@@ -150,7 +150,6 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg) {
return
-
1
;
return
-
1
;
}
}
}
}
}
}
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
return
-
1
;
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
return
-
1
;
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
f392ab38
...
@@ -24,26 +24,66 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
...
@@ -24,26 +24,66 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
static
int
vnodeProcessSubmitReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessSubmitReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int32_t
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessCreateTSmaReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int
len
,
SRpcMsg
*
pRsp
);
static
int
vnodeProcessCreateTSmaReq
(
SVnode
*
pVnode
,
int64_t
version
,
void
*
pReq
,
int
len
,
SRpcMsg
*
pRsp
);
int
vnodePreprocessWriteReqs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
,
int64_t
*
version
)
{
int32_t
vnodePreprocessReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
)
{
#if 0
SDecoder
dc
=
{
0
};
SRpcMsg *pMsg;
SRpcMsg *pRpc;
*version = pVnode->state.processed;
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
pRpc = pMsg;
// set request version
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
return -1;
}
}
walFsync(pVnode->pWal, false);
switch
(
pMsg
->
msgType
)
{
case
TDMT_VND_CREATE_TABLE
:
{
int64_t
ctime
=
taosGetTimestampMs
();
int32_t
nReqs
;
tDecoderInit
(
&
dc
,
(
uint8_t
*
)
pMsg
->
pCont
+
sizeof
(
SMsgHead
),
pMsg
->
contLen
-
sizeof
(
SMsgHead
));
tStartDecode
(
&
dc
);
tDecodeI32v
(
&
dc
,
&
nReqs
);
for
(
int32_t
iReq
=
0
;
iReq
<
nReqs
;
iReq
++
)
{
tb_uid_t
uid
=
tGenIdPI64
();
tStartDecode
(
&
dc
);
tDecodeI32v
(
&
dc
,
NULL
);
*
(
int64_t
*
)(
dc
.
data
+
dc
.
pos
)
=
uid
;
*
(
int64_t
*
)(
dc
.
data
+
dc
.
pos
+
8
)
=
ctime
;
tEndDecode
(
&
dc
);
}
tEndDecode
(
&
dc
);
tDecoderClear
(
&
dc
);
}
break
;
case
TDMT_VND_SUBMIT
:
{
SSubmitMsgIter
msgIter
=
{
0
};
SSubmitReq
*
pSubmitReq
=
(
SSubmitReq
*
)
pMsg
->
pCont
;
SSubmitBlk
*
pBlock
=
NULL
;
int64_t
ctime
=
taosGetTimestampMs
();
tb_uid_t
uid
;
tInitSubmitMsgIter
(
pSubmitReq
,
&
msgIter
);
for
(;;)
{
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
if
(
msgIter
.
schemaLen
>
0
)
{
uid
=
tGenIdPI64
();
tDecoderInit
(
&
dc
,
pBlock
->
data
,
msgIter
.
schemaLen
);
tStartDecode
(
&
dc
);
tDecodeI32v
(
&
dc
,
NULL
);
*
(
int64_t
*
)(
dc
.
data
+
dc
.
pos
)
=
uid
;
*
(
int64_t
*
)(
dc
.
data
+
dc
.
pos
+
8
)
=
ctime
;
pBlock
->
uid
=
htobe64
(
uid
);
tEndDecode
(
&
dc
);
tDecoderClear
(
&
dc
);
}
}
}
break
;
default:
break
;
}
#endif
return
0
;
return
0
;
}
}
...
@@ -106,13 +146,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
...
@@ -106,13 +146,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
}
}
}
break
;
}
break
;
#if 0
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pMsg->contLen - sizeof(SMsgHead),
0) < 0) {
}
} break;
#endif
case
TDMT_VND_ALTER_VNODE
:
case
TDMT_VND_ALTER_VNODE
:
break
;
break
;
default:
default:
...
@@ -195,17 +228,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
...
@@ -195,17 +228,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case
TDMT_VND_TASK_RECOVER_RSP
:
case
TDMT_VND_TASK_RECOVER_RSP
:
return
tqProcessTaskRecoverRsp
(
pVnode
->
pTq
,
pMsg
);
return
tqProcessTaskRecoverRsp
(
pVnode
->
pTq
,
pMsg
);
#if 0
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0);
case TDMT_VND_STREAM_TRIGGER:{
// refactor, avoid double free
int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0);
pMsg->pCont = NULL;
return code;
}
#endif
case
TDMT_VND_QUERY_HEARTBEAT
:
case
TDMT_VND_QUERY_HEARTBEAT
:
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
return
qWorkerProcessHbMsg
(
pVnode
,
pVnode
->
pQuery
,
pMsg
);
default:
default:
...
@@ -675,7 +697,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
...
@@ -675,7 +697,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
goto
_exit
;
goto
_exit
;
}
}
for
(
int
i
=
0
;;)
{
for
(;;)
{
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
tGetSubmitMsgNext
(
&
msgIter
,
&
pBlock
);
if
(
pBlock
==
NULL
)
break
;
if
(
pBlock
==
NULL
)
break
;
...
...
source/libs/executor/inc/indexoperator.h
已删除
100644 → 0
浏览文件 @
3ff366cd
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _INDEX_OPERATOR_H
#define _INDEX_OPERATOR_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "nodes.h"
#include "tglobal.h"
typedef
enum
{
SFLT_NOT_INDEX
,
SFLT_COARSE_INDEX
,
SFLT_ACCURATE_INDEX
}
SIdxFltStatus
;
SIdxFltStatus
idxGetFltStatus
(
SNode
*
pFilterNode
);
// construct tag filter operator later
int32_t
doFilterTag
(
const
SNode
*
pFilterNode
,
SArray
*
result
);
#ifdef __cplusplus
}
#endif
#endif
/*INDEX_OPERATOR_*/
source/libs/index/CMakeLists.txt
浏览文件 @
f392ab38
...
@@ -12,6 +12,9 @@ target_link_libraries(
...
@@ -12,6 +12,9 @@ target_link_libraries(
PUBLIC os
PUBLIC os
PUBLIC util
PUBLIC util
PUBLIC common
PUBLIC common
PUBLIC nodes
PUBLIC scalar
PUBLIC function
)
)
if
(
${
BUILD_WITH_LUCENE
}
)
if
(
${
BUILD_WITH_LUCENE
}
)
...
...
source/libs/
executor/src/indexoperato
r.c
→
source/libs/
index/src/indexFilte
r.c
浏览文件 @
f392ab38
...
@@ -13,10 +13,11 @@
...
@@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "indexoperator.h"
#include "executorimpl.h"
#include "index.h"
#include "index.h"
#include "indexInt.h"
#include "nodes.h"
#include "nodes.h"
#include "querynodes.h"
#include "scalar.h"
#include "tdatablock.h"
#include "tdatablock.h"
// clang-format off
// clang-format off
...
@@ -69,9 +70,9 @@ typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output)
...
@@ -69,9 +70,9 @@ typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output)
static
sif_func_t
sifNullFunc
=
NULL
;
static
sif_func_t
sifNullFunc
=
NULL
;
// typedef struct SIFWalkParm
// typedef struct SIFWalkParm
// construct tag filter operator later
// construct tag filter operator later
static
void
destroyTagFilterOperatorInfo
(
void
*
param
)
{
//
static void destroyTagFilterOperatorInfo(void *param) {
STagFilterOperatorInfo
*
pInfo
=
(
STagFilterOperatorInfo
*
)
param
;
//
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
}
//
}
static
void
sifFreeParam
(
SIFParam
*
param
)
{
static
void
sifFreeParam
(
SIFParam
*
param
)
{
if
(
param
==
NULL
)
return
;
if
(
param
==
NULL
)
return
;
...
@@ -178,13 +179,13 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
...
@@ -178,13 +179,13 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
case
QUERY_NODE_NODE_LIST
:
{
case
QUERY_NODE_NODE_LIST
:
{
SNodeListNode
*
nl
=
(
SNodeListNode
*
)
node
;
SNodeListNode
*
nl
=
(
SNodeListNode
*
)
node
;
if
(
LIST_LENGTH
(
nl
->
pNodeList
)
<=
0
)
{
if
(
LIST_LENGTH
(
nl
->
pNodeList
)
<=
0
)
{
q
Error
(
"invalid length for node:%p, length: %d"
,
node
,
LIST_LENGTH
(
nl
->
pNodeList
));
index
Error
(
"invalid length for node:%p, length: %d"
,
node
,
LIST_LENGTH
(
nl
->
pNodeList
));
SIF_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SIF_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
SIF_ERR_RET
(
scalarGenerateSetFromList
((
void
**
)
&
param
->
pFilter
,
node
,
nl
->
dataType
.
type
));
SIF_ERR_RET
(
scalarGenerateSetFromList
((
void
**
)
&
param
->
pFilter
,
node
,
nl
->
dataType
.
type
));
if
(
taosHashPut
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
,
param
,
sizeof
(
*
param
)))
{
if
(
taosHashPut
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
,
param
,
sizeof
(
*
param
)))
{
taosHashCleanup
(
param
->
pFilter
);
taosHashCleanup
(
param
->
pFilter
);
q
Error
(
"taosHashPut nodeList failed, size:%d"
,
(
int32_t
)
sizeof
(
*
param
));
index
Error
(
"taosHashPut nodeList failed, size:%d"
,
(
int32_t
)
sizeof
(
*
param
));
SIF_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SIF_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
break
;
break
;
...
@@ -194,7 +195,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
...
@@ -194,7 +195,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
case
QUERY_NODE_LOGIC_CONDITION
:
{
case
QUERY_NODE_LOGIC_CONDITION
:
{
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
);
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
if
(
NULL
==
res
)
{
q
Error
(
"no result for node, type:%d, node:%p"
,
nodeType
(
node
),
node
);
index
Error
(
"no result for node, type:%d, node:%p"
,
nodeType
(
node
),
node
);
SIF_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SIF_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
}
*
param
=
*
res
;
*
param
=
*
res
;
...
@@ -210,7 +211,7 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
...
@@ -210,7 +211,7 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
int32_t
code
=
0
;
int32_t
code
=
0
;
int32_t
nParam
=
sifGetOperParamNum
(
node
->
opType
);
int32_t
nParam
=
sifGetOperParamNum
(
node
->
opType
);
if
(
NULL
==
node
->
pLeft
||
(
nParam
==
2
&&
NULL
==
node
->
pRight
))
{
if
(
NULL
==
node
->
pLeft
||
(
nParam
==
2
&&
NULL
==
node
->
pRight
))
{
q
Error
(
"invalid operation node, left: %p, rigth: %p"
,
node
->
pLeft
,
node
->
pRight
);
index
Error
(
"invalid operation node, left: %p, rigth: %p"
,
node
->
pLeft
,
node
->
pRight
);
SIF_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SIF_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
SIFParam
*
paramList
=
taosMemoryCalloc
(
nParam
,
sizeof
(
SIFParam
));
SIFParam
*
paramList
=
taosMemoryCalloc
(
nParam
,
sizeof
(
SIFParam
));
...
@@ -232,7 +233,7 @@ static int32_t sifInitParamList(SIFParam **params, SNodeList *nodeList, SIFCtx *
...
@@ -232,7 +233,7 @@ static int32_t sifInitParamList(SIFParam **params, SNodeList *nodeList, SIFCtx *
int32_t
code
=
0
;
int32_t
code
=
0
;
SIFParam
*
tParams
=
taosMemoryCalloc
(
nodeList
->
length
,
sizeof
(
SIFParam
));
SIFParam
*
tParams
=
taosMemoryCalloc
(
nodeList
->
length
,
sizeof
(
SIFParam
));
if
(
tParams
==
NULL
)
{
if
(
tParams
==
NULL
)
{
q
Error
(
"failed to calloc, nodeList: %p"
,
nodeList
);
index
Error
(
"failed to calloc, nodeList: %p"
,
nodeList
);
SIF_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SIF_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -252,7 +253,7 @@ _return:
...
@@ -252,7 +253,7 @@ _return:
SIF_RET
(
code
);
SIF_RET
(
code
);
}
}
static
int32_t
sifExecFunction
(
SFunctionNode
*
node
,
SIFCtx
*
ctx
,
SIFParam
*
output
)
{
static
int32_t
sifExecFunction
(
SFunctionNode
*
node
,
SIFCtx
*
ctx
,
SIFParam
*
output
)
{
q
Error
(
"index-filter not support buildin function"
);
index
Error
(
"index-filter not support buildin function"
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
static
int32_t
sifDoIndex
(
SIFParam
*
left
,
SIFParam
*
right
,
int8_t
operType
,
SIFParam
*
output
)
{
static
int32_t
sifDoIndex
(
SIFParam
*
left
,
SIFParam
*
right
,
int8_t
operType
,
SIFParam
*
output
)
{
...
@@ -390,8 +391,8 @@ _return:
...
@@ -390,8 +391,8 @@ _return:
static
int32_t
sifExecLogic
(
SLogicConditionNode
*
node
,
SIFCtx
*
ctx
,
SIFParam
*
output
)
{
static
int32_t
sifExecLogic
(
SLogicConditionNode
*
node
,
SIFCtx
*
ctx
,
SIFParam
*
output
)
{
if
(
NULL
==
node
->
pParameterList
||
node
->
pParameterList
->
length
<=
0
)
{
if
(
NULL
==
node
->
pParameterList
||
node
->
pParameterList
->
length
<=
0
)
{
q
Error
(
"invalid logic parameter list, list:%p, paramNum:%d"
,
node
->
pParameterList
,
index
Error
(
"invalid logic parameter list, list:%p, paramNum:%d"
,
node
->
pParameterList
,
node
->
pParameterList
?
node
->
pParameterList
->
length
:
0
);
node
->
pParameterList
?
node
->
pParameterList
->
length
:
0
);
return
TSDB_CODE_QRY_INVALID_INPUT
;
return
TSDB_CODE_QRY_INVALID_INPUT
;
}
}
...
@@ -485,7 +486,7 @@ EDealRes sifCalcWalker(SNode *node, void *context) {
...
@@ -485,7 +486,7 @@ EDealRes sifCalcWalker(SNode *node, void *context) {
return
sifWalkOper
(
node
,
ctx
);
return
sifWalkOper
(
node
,
ctx
);
}
}
q
Error
(
"invalid node type for index filter calculating, type:%d"
,
nodeType
(
node
));
index
Error
(
"invalid node type for index filter calculating, type:%d"
,
nodeType
(
node
));
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
return
DEAL_RES_ERROR
;
}
}
...
@@ -509,7 +510,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
...
@@ -509,7 +510,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
SIFCtx
ctx
=
{.
code
=
0
,
.
noExec
=
false
};
SIFCtx
ctx
=
{.
code
=
0
,
.
noExec
=
false
};
ctx
.
pRes
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
ctx
.
pRes
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
if
(
NULL
==
ctx
.
pRes
)
{
q
Error
(
"index-filter failed to taosHashInit"
);
index
Error
(
"index-filter failed to taosHashInit"
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
...
@@ -519,7 +520,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
...
@@ -519,7 +520,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
if
(
pDst
)
{
if
(
pDst
)
{
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
if
(
res
==
NULL
)
{
if
(
res
==
NULL
)
{
q
Error
(
"no valid res in hash, node:(%p), type(%d)"
,
(
void
*
)
&
pNode
,
nodeType
(
pNode
));
index
Error
(
"no valid res in hash, node:(%p), type(%d)"
,
(
void
*
)
&
pNode
,
nodeType
(
pNode
));
SIF_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SIF_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
}
taosArrayAddAll
(
pDst
->
result
,
res
->
result
);
taosArrayAddAll
(
pDst
->
result
,
res
->
result
);
...
@@ -539,7 +540,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
...
@@ -539,7 +540,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
SIFCtx
ctx
=
{.
code
=
0
,
.
noExec
=
true
};
SIFCtx
ctx
=
{.
code
=
0
,
.
noExec
=
true
};
ctx
.
pRes
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
ctx
.
pRes
=
taosHashInit
(
4
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
if
(
NULL
==
ctx
.
pRes
)
{
q
Error
(
"index-filter failed to taosHashInit"
);
index
Error
(
"index-filter failed to taosHashInit"
);
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
}
...
@@ -549,7 +550,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
...
@@ -549,7 +550,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
SIFParam
*
res
=
(
SIFParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
if
(
res
==
NULL
)
{
if
(
res
==
NULL
)
{
q
Error
(
"no valid res in hash, node:(%p), type(%d)"
,
(
void
*
)
&
pNode
,
nodeType
(
pNode
));
index
Error
(
"no valid res in hash, node:(%p), type(%d)"
,
(
void
*
)
&
pNode
,
nodeType
(
pNode
));
SIF_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
SIF_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
}
*
status
=
res
->
status
;
*
status
=
res
->
status
;
...
...
source/libs/
executor
/test/index_executor_tests.cpp
→
source/libs/
index
/test/index_executor_tests.cpp
浏览文件 @
f392ab38
文件已移动
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
f392ab38
...
@@ -132,7 +132,7 @@ typedef struct SSchLevel {
...
@@ -132,7 +132,7 @@ typedef struct SSchLevel {
int32_t
taskSucceed
;
int32_t
taskSucceed
;
int32_t
taskNum
;
int32_t
taskNum
;
int32_t
taskLaunchedNum
;
int32_t
taskLaunchedNum
;
SHashObj
*
flowCtrl
;
// key is ep, element is SSchFlowControl
int32_t
taskDoneNum
;
SArray
*
subTasks
;
// Element is SQueryTask
SArray
*
subTasks
;
// Element is SQueryTask
}
SSchLevel
;
}
SSchLevel
;
...
@@ -175,11 +175,13 @@ typedef struct SSchJob {
...
@@ -175,11 +175,13 @@ typedef struct SSchJob {
SArray
*
levels
;
// starting from 0. SArray<SSchLevel>
SArray
*
levels
;
// starting from 0. SArray<SSchLevel>
SNodeList
*
subPlans
;
// subplan pointer copied from DAG, no need to free it in scheduler
SNodeList
*
subPlans
;
// subplan pointer copied from DAG, no need to free it in scheduler
SArray
*
dataSrcTasks
;
// SArray<SQueryTask*>
int32_t
levelIdx
;
int32_t
levelIdx
;
SEpSet
dataSrcEps
;
SEpSet
dataSrcEps
;
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
execTasks
;
// executing tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
succTasks
;
// succeed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
SHashObj
*
failTasks
;
// failed tasks, key:taskid, value:SQueryTask*
SHashObj
*
flowCtrl
;
// key is ep, element is SSchFlowControl
SExplainCtx
*
explainCtx
;
SExplainCtx
*
explainCtx
;
int8_t
status
;
int8_t
status
;
...
@@ -200,7 +202,7 @@ typedef struct SSchJob {
...
@@ -200,7 +202,7 @@ typedef struct SSchJob {
extern
SSchedulerMgmt
schMgmt
;
extern
SSchedulerMgmt
schMgmt
;
#define SCH_TASK_READY_
TO_L
UNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_TASK_READY_
FOR_LA
UNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children))
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
#define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0)
...
@@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt;
...
@@ -223,7 +225,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LE
AF_TASK(_job, _task) && SCH_IS_LE
VEL_UNFINISHED((_task)->level))
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) (_job)->attr.queryJob = ((type) != SUBPLAN_TYPE_MODIFY)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
...
@@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task);
...
@@ -261,7 +263,7 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
);
int32_t
schBuildAndSendMsg
(
SSchJob
*
job
,
SSchTask
*
task
,
SQueryNodeAddr
*
addr
,
int32_t
msgType
);
SSchJob
*
schAcquireJob
(
int64_t
refId
);
SSchJob
*
schAcquireJob
(
int64_t
refId
);
int32_t
schReleaseJob
(
int64_t
refId
);
int32_t
schReleaseJob
(
int64_t
refId
);
void
schFreeFlowCtrl
(
SSch
Level
*
pLevel
);
void
schFreeFlowCtrl
(
SSch
Job
*
pJob
);
int32_t
schCheckJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
);
int32_t
schCheckJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
);
int32_t
schDecTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schDecTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
);
int32_t
schCheckIncTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
enough
);
int32_t
schCheckIncTaskFlowQuota
(
SSchJob
*
pJob
,
SSchTask
*
pTask
,
bool
*
enough
);
...
...
source/libs/scheduler/src/schFlowCtrl.c
浏览文件 @
f392ab38
...
@@ -19,13 +19,13 @@
...
@@ -19,13 +19,13 @@
#include "catalog.h"
#include "catalog.h"
#include "tref.h"
#include "tref.h"
void
schFreeFlowCtrl
(
SSch
Level
*
pLevel
)
{
void
schFreeFlowCtrl
(
SSch
Job
*
pJob
)
{
if
(
NULL
==
p
Level
->
flowCtrl
)
{
if
(
NULL
==
p
Job
->
flowCtrl
)
{
return
;
return
;
}
}
SSchFlowControl
*
ctrl
=
NULL
;
SSchFlowControl
*
ctrl
=
NULL
;
void
*
pIter
=
taosHashIterate
(
p
Level
->
flowCtrl
,
NULL
);
void
*
pIter
=
taosHashIterate
(
p
Job
->
flowCtrl
,
NULL
);
while
(
pIter
)
{
while
(
pIter
)
{
ctrl
=
(
SSchFlowControl
*
)
pIter
;
ctrl
=
(
SSchFlowControl
*
)
pIter
;
...
@@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) {
...
@@ -33,11 +33,11 @@ void schFreeFlowCtrl(SSchLevel *pLevel) {
taosArrayDestroy
(
ctrl
->
taskList
);
taosArrayDestroy
(
ctrl
->
taskList
);
}
}
pIter
=
taosHashIterate
(
p
Level
->
flowCtrl
,
pIter
);
pIter
=
taosHashIterate
(
p
Job
->
flowCtrl
,
pIter
);
}
}
taosHashCleanup
(
p
Level
->
flowCtrl
);
taosHashCleanup
(
p
Job
->
flowCtrl
);
p
Level
->
flowCtrl
=
NULL
;
p
Job
->
flowCtrl
=
NULL
;
}
}
int32_t
schCheckJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
)
{
int32_t
schCheckJobNeedFlowCtrl
(
SSchJob
*
pJob
,
SSchLevel
*
pLevel
)
{
...
@@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
...
@@ -47,9 +47,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
}
}
int32_t
sum
=
0
;
int32_t
sum
=
0
;
int32_t
taskNum
=
taosArrayGetSize
(
pJob
->
dataSrcTasks
);
for
(
int32_t
i
=
0
;
i
<
pLevel
->
taskNum
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
taskNum
;
++
i
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
sub
Tasks
,
i
);
SSchTask
*
pTask
=
*
(
SSchTask
**
)
taosArrayGet
(
pJob
->
dataSrc
Tasks
,
i
);
sum
+=
pTask
->
plan
->
execNodeStat
.
tableNum
;
sum
+=
pTask
->
plan
->
execNodeStat
.
tableNum
;
}
}
...
@@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
...
@@ -59,9 +59,9 @@ int32_t schCheckJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
p
Level
->
flowCtrl
=
taosHashInit
(
pLevel
->
taskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
p
Job
->
flowCtrl
=
taosHashInit
(
pJob
->
taskNum
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_ENTRY_LOCK
);
if
(
NULL
==
p
Level
->
flowCtrl
)
{
if
(
NULL
==
p
Job
->
flowCtrl
)
{
SCH_JOB_ELOG
(
"taosHashInit %d flowCtrl failed"
,
p
Level
->
taskNum
);
SCH_JOB_ELOG
(
"taosHashInit %d flowCtrl failed"
,
p
Job
->
taskNum
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
...
@@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
...
@@ -78,7 +78,7 @@ int32_t schDecTaskFlowQuota(SSchJob *pJob, SSchTask *pTask) {
int32_t
code
=
0
;
int32_t
code
=
0
;
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
p
Level
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
p
Job
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
if
(
NULL
==
ctrl
)
{
if
(
NULL
==
ctrl
)
{
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
...
@@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
...
@@ -110,11 +110,11 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough) {
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
do
{
do
{
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
p
Level
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
p
Job
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
if
(
NULL
==
ctrl
)
{
if
(
NULL
==
ctrl
)
{
SSchFlowControl
nctrl
=
{.
tableNumSum
=
pTask
->
plan
->
execNodeStat
.
tableNum
,
.
execTaskNum
=
1
};
SSchFlowControl
nctrl
=
{.
tableNumSum
=
pTask
->
plan
->
execNodeStat
.
tableNum
,
.
execTaskNum
=
1
};
code
=
taosHashPut
(
p
Level
->
flowCtrl
,
ep
,
sizeof
(
SEp
),
&
nctrl
,
sizeof
(
nctrl
));
code
=
taosHashPut
(
p
Job
->
flowCtrl
,
ep
,
sizeof
(
SEp
),
&
nctrl
,
sizeof
(
nctrl
));
if
(
code
)
{
if
(
code
)
{
if
(
HASH_NODE_EXIST
(
code
))
{
if
(
HASH_NODE_EXIST
(
code
))
{
continue
;
continue
;
...
@@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
...
@@ -273,10 +273,9 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET
(
schDecTaskFlowQuota
(
pJob
,
pTask
));
SCH_ERR_RET
(
schDecTaskFlowQuota
(
pJob
,
pTask
));
SSchLevel
*
pLevel
=
pTask
->
level
;
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
SEp
*
ep
=
SCH_GET_CUR_EP
(
&
pTask
->
plan
->
execNode
);
SSchFlowControl
*
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
p
Level
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
SSchFlowControl
*
ctrl
=
(
SSchFlowControl
*
)
taosHashGet
(
p
Job
->
flowCtrl
,
ep
,
sizeof
(
SEp
));
if
(
NULL
==
ctrl
)
{
if
(
NULL
==
ctrl
)
{
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
SCH_TASK_ELOG
(
"taosHashGet node from flowCtrl failed, fqdn:%s, port:%d"
,
ep
->
fqdn
,
ep
->
port
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
...
...
source/libs/scheduler/src/scheduler.c
浏览文件 @
f392ab38
...
@@ -391,6 +391,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -391,6 +391,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG
(
"taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_TASK_ELOG
(
"taosArrayPush childTask failed, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SCH_TASK_DLOG
(
"children info, the %d child TID %"
PRIx64
,
n
,
(
*
childTask
)
->
taskId
);
}
}
if
(
parentNum
>
0
)
{
if
(
parentNum
>
0
)
{
...
@@ -423,6 +425,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
...
@@ -423,6 +425,8 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
SCH_TASK_ELOG
(
"taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_TASK_ELOG
(
"taosArrayPush parentTask failed, level:%d, taskIdx:%d, childIdx:%d"
,
i
,
m
,
n
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SCH_TASK_DLOG
(
"parents info, the %d parent TID %"
PRIx64
,
n
,
(
*
parentTask
)
->
taskId
);
}
}
SCH_TASK_DLOG
(
"level:%d, parentNum:%d, childNum:%d"
,
i
,
parentNum
,
childNum
);
SCH_TASK_DLOG
(
"level:%d, parentNum:%d, childNum:%d"
,
i
,
parentNum
,
childNum
);
...
@@ -464,6 +468,17 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
...
@@ -464,6 +468,17 @@ int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *ad
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schRecordQueryDataSrc
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
SCH_IS_DATA_SRC_QRY_TASK
(
pTask
))
{
return
TSDB_CODE_SUCCESS
;
}
taosArrayPush
(
pJob
->
dataSrcTasks
,
&
pTask
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
schValidateAndBuildJob
(
SQueryPlan
*
pDag
,
SSchJob
*
pJob
)
{
int32_t
schValidateAndBuildJob
(
SQueryPlan
*
pDag
,
SSchJob
*
pJob
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
pJob
->
queryId
=
pDag
->
queryId
;
pJob
->
queryId
=
pDag
->
queryId
;
...
@@ -473,6 +488,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
...
@@ -473,6 +488,11 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
SCH_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
}
pJob
->
dataSrcTasks
=
taosArrayInit
(
pDag
->
numOfSubplans
,
POINTER_BYTES
);
if
(
NULL
==
pJob
->
dataSrcTasks
)
{
SCH_ERR_RET
(
TSDB_CODE_OUT_OF_MEMORY
);
}
int32_t
levelNum
=
(
int32_t
)
LIST_LENGTH
(
pDag
->
pSubplans
);
int32_t
levelNum
=
(
int32_t
)
LIST_LENGTH
(
pDag
->
pSubplans
);
if
(
levelNum
<=
0
)
{
if
(
levelNum
<=
0
)
{
SCH_JOB_ELOG
(
"invalid level num:%d"
,
levelNum
);
SCH_JOB_ELOG
(
"invalid level num:%d"
,
levelNum
);
...
@@ -551,6 +571,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
...
@@ -551,6 +571,8 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
}
SCH_ERR_JRET
(
schRecordQueryDataSrc
(
pJob
,
p
));
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
,
POINTER_BYTES
))
{
if
(
0
!=
taosHashPut
(
planToTask
,
&
plan
,
POINTER_BYTES
,
&
p
,
POINTER_BYTES
))
{
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_TASK_ELOG
(
"taosHashPut to planToTaks failed, taskIdx:%d"
,
n
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
SCH_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
...
@@ -629,6 +651,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
...
@@ -629,6 +651,17 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
schRemoveTaskFromExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
code
=
taosHashRemove
(
pJob
->
execTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
));
if
(
code
)
{
SCH_TASK_ELOG
(
"task failed to rm from execTask list, code:%x"
,
code
);
SCH_ERR_RET
(
TSDB_CODE_SCH_INTERNAL_ERROR
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
schPushTaskToExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
schPushTaskToExecList
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
code
=
taosHashPut
(
pJob
->
execTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
),
&
pTask
,
POINTER_BYTES
);
int32_t
code
=
taosHashPut
(
pJob
->
execTasks
,
&
pTask
->
taskId
,
sizeof
(
pTask
->
taskId
),
&
pTask
,
POINTER_BYTES
);
if
(
0
!=
code
)
{
if
(
0
!=
code
)
{
...
@@ -774,6 +807,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
...
@@ -774,6 +807,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
int32_t
schHandleTaskRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
schHandleTaskRetry
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
atomic_sub_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
atomic_sub_fetch_32
(
&
pTask
->
level
->
taskLaunchedNum
,
1
);
SCH_ERR_RET
(
schRemoveTaskFromExecList
(
pJob
,
pTask
));
SCH_SET_TASK_STATUS
(
pTask
,
JOB_TASK_STATUS_NOT_START
);
if
(
SCH_TASK_NEED_FLOW_CTRL
(
pJob
,
pTask
))
{
if
(
SCH_TASK_NEED_FLOW_CTRL
(
pJob
,
pTask
))
{
SCH_ERR_RET
(
schDecTaskFlowQuota
(
pJob
,
pTask
));
SCH_ERR_RET
(
schDecTaskFlowQuota
(
pJob
,
pTask
));
SCH_ERR_RET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
SCH_ERR_RET
(
schLaunchTasksInFlowCtrlList
(
pJob
,
pTask
));
...
@@ -947,6 +983,32 @@ _return:
...
@@ -947,6 +983,32 @@ _return:
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
errCode
));
SCH_RET
(
schProcessOnJobFailure
(
pJob
,
errCode
));
}
}
int32_t
schLaunchNextLevelTasks
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
if
(
!
SCH_IS_QUERY_JOB
(
pJob
))
{
return
TSDB_CODE_SUCCESS
;
}
SSchLevel
*
pLevel
=
pTask
->
level
;
int32_t
doneNum
=
atomic_add_fetch_32
(
&
pLevel
->
taskDoneNum
,
1
);
if
(
doneNum
==
pLevel
->
taskNum
)
{
pJob
->
levelIdx
--
;
pLevel
=
taosArrayGet
(
pJob
->
levels
,
pJob
->
levelIdx
);
for
(
int32_t
i
=
0
;
i
<
pLevel
->
taskNum
;
++
i
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
i
);
if
(
pTask
->
children
&&
taosArrayGetSize
(
pTask
->
children
)
>
0
)
{
continue
;
}
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
pTask
));
}
}
return
TSDB_CODE_SUCCESS
;
}
// Note: no more task error processing, handled in function internal
// Note: no more task error processing, handled in function internal
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
int32_t
schProcessOnTaskSuccess
(
SSchJob
*
pJob
,
SSchTask
*
pTask
)
{
bool
moved
=
false
;
bool
moved
=
false
;
...
@@ -1015,11 +1077,13 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
...
@@ -1015,11 +1077,13 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
qSetSubplanExecutionNode
(
par
->
plan
,
pTask
->
plan
->
id
.
groupId
,
&
source
);
qSetSubplanExecutionNode
(
par
->
plan
,
pTask
->
plan
->
id
.
groupId
,
&
source
);
SCH_UNLOCK
(
SCH_WRITE
,
&
par
->
lock
);
SCH_UNLOCK
(
SCH_WRITE
,
&
par
->
lock
);
if
(
SCH_TASK_READY_
TO_L
UNCH
(
readyNum
,
par
))
{
if
(
SCH_TASK_READY_
FOR_LA
UNCH
(
readyNum
,
par
))
{
SCH_ERR_RET
(
schLaunchTask
Impl
(
pJob
,
par
));
SCH_ERR_RET
(
schLaunchTask
(
pJob
,
par
));
}
}
}
}
SCH_ERR_RET
(
schLaunchNextLevelTasks
(
pJob
,
pTask
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
_return:
_return:
...
@@ -2400,8 +2464,6 @@ void schFreeJobImpl(void *job) {
...
@@ -2400,8 +2464,6 @@ void schFreeJobImpl(void *job) {
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfLevels
;
++
i
)
{
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
SSchLevel
*
pLevel
=
taosArrayGet
(
pJob
->
levels
,
i
);
schFreeFlowCtrl
(
pLevel
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
int32_t
numOfTasks
=
taosArrayGetSize
(
pLevel
->
subTasks
);
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
for
(
int32_t
j
=
0
;
j
<
numOfTasks
;
++
j
)
{
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
SSchTask
*
pTask
=
taosArrayGet
(
pLevel
->
subTasks
,
j
);
...
@@ -2411,12 +2473,15 @@ void schFreeJobImpl(void *job) {
...
@@ -2411,12 +2473,15 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy
(
pLevel
->
subTasks
);
taosArrayDestroy
(
pLevel
->
subTasks
);
}
}
schFreeFlowCtrl
(
pJob
);
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
execTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
failTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosHashCleanup
(
pJob
->
succTasks
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
levels
);
taosArrayDestroy
(
pJob
->
nodeList
);
taosArrayDestroy
(
pJob
->
nodeList
);
taosArrayDestroy
(
pJob
->
dataSrcTasks
);
qExplainFreeCtx
(
pJob
->
explainCtx
);
qExplainFreeCtx
(
pJob
->
explainCtx
);
...
...
source/libs/stream/src/tstream.c
浏览文件 @
f392ab38
...
@@ -134,7 +134,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
...
@@ -134,7 +134,7 @@ int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input) {
}
}
static
int32_t
streamTaskExecImpl
(
SStreamTask
*
pTask
,
void
*
data
,
SArray
*
pRes
)
{
static
int32_t
streamTaskExecImpl
(
SStreamTask
*
pTask
,
void
*
data
,
SArray
*
pRes
)
{
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
void
*
exec
=
pTask
->
exec
.
executor
;
// set input
// set input
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
if
(
pTask
->
inputType
==
STREAM_INPUT__DATA_SUBMIT
)
{
...
@@ -171,12 +171,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
...
@@ -171,12 +171,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
}
}
// TODO: handle version
// TODO: handle version
int32_t
stream
TaskExec2
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
stream
Exec
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
SArray
*
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
return
-
1
;
if
(
pRes
==
NULL
)
return
-
1
;
while
(
1
)
{
while
(
1
)
{
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
int8_t
execStatus
=
atomic_val_compare_exchange_8
(
&
pTask
->
status
,
TASK_STATUS__IDLE
,
TASK_STATUS__EXECUTING
);
void
*
exec
=
pTask
->
exec
.
runners
[
0
].
executor
;
void
*
exec
=
pTask
->
exec
.
executor
;
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
if
(
execStatus
==
TASK_STATUS__IDLE
)
{
// first run, from qall, handle failure from last exec
// first run, from qall, handle failure from last exec
while
(
1
)
{
while
(
1
)
{
...
@@ -278,7 +278,7 @@ FAIL:
...
@@ -278,7 +278,7 @@ FAIL:
return
-
1
;
return
-
1
;
}
}
int32_t
stream
Task
Sink
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamSink
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
bool
firstRun
=
1
;
bool
firstRun
=
1
;
while
(
1
)
{
while
(
1
)
{
SStreamDataBlock
*
pBlock
=
NULL
;
SStreamDataBlock
*
pBlock
=
NULL
;
...
@@ -407,7 +407,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
...
@@ -407,7 +407,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
return
0
;
return
0
;
}
}
int32_t
stream
Task
ProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
int32_t
streamProcessDispatchReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchReq
*
pReq
,
SRpcMsg
*
pRsp
)
{
// 1. handle input
// 1. handle input
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
streamTaskEnqueue
(
pTask
,
pReq
,
pRsp
);
...
@@ -415,172 +415,42 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
...
@@ -415,172 +415,42 @@ int32_t streamTaskProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStream
// 2.1. idle: exec
// 2.1. idle: exec
// 2.2. executing: return
// 2.2. executing: return
// 2.3. closing: keep trying
// 2.3. closing: keep trying
stream
TaskExec2
(
pTask
,
pMsgCb
);
stream
Exec
(
pTask
,
pMsgCb
);
// 3. handle output
// 3. handle output
// 3.1 check and set status
// 3.1 check and set status
// 3.2 dispatch / sink
// 3.2 dispatch / sink
stream
Task
Sink
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
return
0
;
}
}
int32_t
stream
Task
ProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
int32_t
streamProcessDispatchRsp
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamDispatchRsp
*
pRsp
)
{
atomic_store_8
(
&
pTask
->
inputStatus
,
pRsp
->
inputStatus
);
atomic_store_8
(
&
pTask
->
inputStatus
,
pRsp
->
inputStatus
);
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
if
(
pRsp
->
inputStatus
==
TASK_INPUT_STATUS__BLOCKED
)
{
// TODO: init recover timer
// TODO: init recover timer
}
}
// continue dispatch
// continue dispatch
stream
Task
Sink
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
return
0
;
}
}
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
int32_t
streamTaskProcessRunReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
)
{
stream
TaskExec2
(
pTask
,
pMsgCb
);
stream
Exec
(
pTask
,
pMsgCb
);
stream
Task
Sink
(
pTask
,
pMsgCb
);
streamSink
(
pTask
,
pMsgCb
);
return
0
;
return
0
;
}
}
int32_t
stream
Task
ProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
int32_t
streamProcessRecoverReq
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
SStreamTaskRecoverReq
*
pReq
,
SRpcMsg
*
pMsg
)
{
//
//
return
0
;
return
0
;
}
}
int32_t
stream
Task
ProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
int32_t
streamProcessRecoverRsp
(
SStreamTask
*
pTask
,
SStreamTaskRecoverRsp
*
pRsp
)
{
//
//
return
0
;
return
0
;
}
}
int32_t
streamExecTask
(
SStreamTask
*
pTask
,
SMsgCb
*
pMsgCb
,
const
void
*
input
,
int32_t
inputType
,
int32_t
workId
)
{
SArray
*
pRes
=
NULL
;
// source
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
&&
pTask
->
sourceType
!=
TASK_SOURCE__SCAN
)
return
0
;
// exec
if
(
pTask
->
execType
!=
TASK_EXEC__NONE
)
{
ASSERT
(
workId
<
pTask
->
exec
.
numOfRunners
);
void
*
exec
=
pTask
->
exec
.
runners
[
workId
].
executor
;
pRes
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pRes
==
NULL
)
{
return
-
1
;
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
qSetStreamInput
(
exec
,
input
,
inputType
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
}
}
else
if
(
inputType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
const
SArray
*
blocks
=
(
const
SArray
*
)
input
;
/*int32_t sz = taosArrayGetSize(blocks);*/
/*for (int32_t i = 0; i < sz; i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
/*qSetStreamInput(exec, pBlock, inputType);*/
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
}
/*}*/
}
else
{
ASSERT
(
0
);
}
}
else
{
ASSERT
(
inputType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
);
pRes
=
(
SArray
*
)
input
;
}
if
(
pRes
==
NULL
||
taosArrayGetSize
(
pRes
)
==
0
)
return
0
;
// sink
if
(
pTask
->
sinkType
==
TASK_SINK__TABLE
)
{
// blockDebugShowData(pRes);
pTask
->
tbSink
.
tbSinkFunc
(
pTask
,
pTask
->
tbSink
.
vnode
,
0
,
pRes
);
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__SMA
)
{
pTask
->
smaSink
.
smaSink
(
pTask
->
ahandle
,
pTask
->
smaSink
.
smaId
,
pRes
);
//
}
else
if
(
pTask
->
sinkType
==
TASK_SINK__FETCH
)
{
//
}
else
{
ASSERT
(
pTask
->
sinkType
==
TASK_SINK__NONE
);
}
// dispatch
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__INPLACE
)
{
SRpcMsg
dispatchMsg
=
{
0
};
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
NULL
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
int32_t
qType
;
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_PIPE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_PIPE_EXEC
)
{
qType
=
FETCH_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_MERGE_EXEC
||
pTask
->
dispatchMsgType
==
TDMT_SND_TASK_MERGE_EXEC
)
{
qType
=
MERGE_QUEUE
;
}
else
if
(
pTask
->
dispatchMsgType
==
TDMT_VND_TASK_WRITE_EXEC
)
{
qType
=
WRITE_QUEUE
;
}
else
{
ASSERT
(
0
);
}
tmsgPutToQueue
(
pMsgCb
,
qType
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__FIXED
)
{
SRpcMsg
dispatchMsg
=
{
0
};
SEpSet
*
pEpSet
=
NULL
;
if
(
streamBuildExecMsg
(
pTask
,
pRes
,
&
dispatchMsg
,
&
pEpSet
)
<
0
)
{
ASSERT
(
0
);
return
-
1
;
}
tmsgSendReq
(
pEpSet
,
&
dispatchMsg
);
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SHashObj
*
pShuffleRes
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
if
(
pShuffleRes
==
NULL
)
{
return
-
1
;
}
int32_t
sz
=
taosArrayGetSize
(
pRes
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pRes
,
i
);
SArray
*
pArray
=
taosHashGet
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
));
if
(
pArray
==
NULL
)
{
pArray
=
taosArrayInit
(
0
,
sizeof
(
SSDataBlock
));
if
(
pArray
==
NULL
)
{
return
-
1
;
}
taosHashPut
(
pShuffleRes
,
&
pDataBlock
->
info
.
groupId
,
sizeof
(
int64_t
),
&
pArray
,
sizeof
(
void
*
));
}
taosArrayPush
(
pArray
,
pDataBlock
);
}
if
(
streamShuffleDispatch
(
pTask
,
pMsgCb
,
pShuffleRes
)
<
0
)
{
return
-
1
;
}
}
else
{
ASSERT
(
pTask
->
dispatchType
==
TASK_DISPATCH__NONE
);
}
return
0
;
}
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
)
{
int32_t
tEncodeSStreamTaskExecReq
(
void
**
buf
,
const
SStreamTaskExecReq
*
pReq
)
{
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
streamId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
streamId
);
...
@@ -607,20 +477,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
...
@@ -607,20 +477,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
pTask
->
streamId
=
streamId
;
pTask
->
streamId
=
streamId
;
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
status
=
TASK_STATUS__IDLE
;
pTask
->
inputQ
=
taosOpenQueue
();
pTask
->
outputQ
=
taosOpenQueue
();
pTask
->
inputQAll
=
taosAllocateQall
();
pTask
->
outputQAll
=
taosAllocateQall
();
if
(
pTask
->
inputQ
==
NULL
||
pTask
->
outputQ
==
NULL
||
pTask
->
inputQAll
==
NULL
||
pTask
->
outputQAll
==
NULL
)
goto
FAIL
;
return
pTask
;
return
pTask
;
FAIL:
if
(
pTask
->
inputQ
)
taosCloseQueue
(
pTask
->
inputQ
);
if
(
pTask
->
outputQ
)
taosCloseQueue
(
pTask
->
outputQ
);
if
(
pTask
->
inputQAll
)
taosFreeQall
(
pTask
->
inputQAll
);
if
(
pTask
->
outputQAll
)
taosFreeQall
(
pTask
->
outputQAll
);
if
(
pTask
)
taosMemoryFree
(
pTask
);
return
NULL
;
}
}
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
int32_t
tEncodeSStreamTask
(
SEncoder
*
pEncoder
,
const
SStreamTask
*
pTask
)
{
...
@@ -722,11 +579,7 @@ void tFreeSStreamTask(SStreamTask* pTask) {
...
@@ -722,11 +579,7 @@ void tFreeSStreamTask(SStreamTask* pTask) {
taosCloseQueue
(
pTask
->
outputQ
);
taosCloseQueue
(
pTask
->
outputQ
);
// TODO
// TODO
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
if
(
pTask
->
exec
.
qmsg
)
taosMemoryFree
(
pTask
->
exec
.
qmsg
);
for
(
int32_t
i
=
0
;
i
<
pTask
->
exec
.
numOfRunners
;
i
++
)
{
qDestroyTask
(
pTask
->
exec
.
executor
);
qDestroyTask
(
pTask
->
exec
.
runners
[
i
].
executor
);
}
taosMemoryFree
(
pTask
->
exec
.
runners
);
/*taosMemoryFree(pTask->executor);*/
taosMemoryFree
(
pTask
);
taosMemoryFree
(
pTask
);
}
}
...
...
taos-tools
@
2c4a1c83
比较
788929bd
...
2c4a1c83
Subproject commit
788929bdc475d264d8306ceff30f7df006fd18d8
Subproject commit
2c4a1c83322b983881aea93ec2b51e7df826125a
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录