Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
cf54ec55
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
cf54ec55
编写于
7月 27, 2022
作者:
L
Liu Jicong
提交者:
GitHub
7月 27, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15475 from taosdata/feature/stream
refactor(stream): remove option
上级
e326c1cf
904ec81b
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
51 addition
and
44 deletion
+51
-44
include/libs/executor/executor.h
include/libs/executor/executor.h
+2
-12
include/libs/stream/tstream.h
include/libs/stream/tstream.h
+1
-1
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-1
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+2
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+5
-5
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-1
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+6
-12
source/libs/stream/src/streamDispatch.c
source/libs/stream/src/streamDispatch.c
+18
-4
source/libs/stream/src/streamExec.c
source/libs/stream/src/streamExec.c
+4
-4
source/libs/stream/src/streamTask.c
source/libs/stream/src/streamTask.c
+2
-2
source/libs/wal/inc/walInt.h
source/libs/wal/inc/walInt.h
+6
-0
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+3
-2
未找到文件。
include/libs/executor/executor.h
浏览文件 @
cf54ec55
...
@@ -64,17 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
...
@@ -64,17 +64,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle
* @param SReadHandle
* @return
* @return
*/
*/
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
,
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
readers
,
int32_t
*
numOfCols
,
SSchemaWrapper
**
pSchema
);
SSchemaWrapper
**
pSchema
);
/**
* Set the input data block for the stream scan.
* @param tinfo
* @param input
* @param type
* @return
*/
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
,
bool
assignUid
);
/**
/**
* Set multiple input data blocks for the stream scan.
* Set multiple input data blocks for the stream scan.
...
@@ -84,7 +74,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool
...
@@ -84,7 +74,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool
* @param type
* @param type
* @return
* @return
*/
*/
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
);
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
);
/**
/**
* Update the table id list, add or remove.
* Update the table id list, add or remove.
...
...
include/libs/stream/tstream.h
浏览文件 @
cf54ec55
...
@@ -270,7 +270,7 @@ typedef struct SStreamTask {
...
@@ -270,7 +270,7 @@ typedef struct SStreamTask {
int64_t
startVer
;
int64_t
startVer
;
int64_t
checkpointVer
;
int64_t
checkpointVer
;
int64_t
processedVer
;
int64_t
processedVer
;
int32_t
numOfVgroups
;
//
int32_t numOfVgroups;
// children info
// children info
SArray
*
childEpInfo
;
// SArray<SStreamChildEpInfo*>
SArray
*
childEpInfo
;
// SArray<SStreamChildEpInfo*>
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
cf54ec55
...
@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
...
@@ -89,7 +89,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
int32_t
code
=
vnodeProcessFetchMsg
(
pVnode
->
pImpl
,
pMsg
,
pInfo
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
if
(
terrno
!=
0
)
code
=
terrno
;
dGError
(
"vgId:%d, msg:%p failed to stream since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
dGError
(
"vgId:%d, msg:%p failed to
process
stream since %s"
,
pVnode
->
vgId
,
pMsg
,
terrstr
());
vmSendRsp
(
pMsg
,
code
);
vmSendRsp
(
pMsg
,
code
);
}
}
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
cf54ec55
...
@@ -391,10 +391,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
...
@@ -391,10 +391,12 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
// exec
// exec
pInnerTask
->
execType
=
TASK_EXEC__PIPE
;
pInnerTask
->
execType
=
TASK_EXEC__PIPE
;
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pSourceDb);
sdbRelease(pSdb, pSourceDb);
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
#endif
if
(
tsSchedStreamToSnode
)
{
if
(
tsSchedStreamToSnode
)
{
SSnodeObj
*
pSnode
=
mndSchedFetchOneSnode
(
pMnode
);
SSnodeObj
*
pSnode
=
mndSchedFetchOneSnode
(
pMnode
);
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
cf54ec55
...
@@ -611,8 +611,8 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
...
@@ -611,8 +611,8 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
goto
_err
;
goto
_err
;
}
}
smaDebug
(
"vgId:%d, process submit req for rsma table %"
PRIi64
" level %"
PRIi8
" version:%"
PRIi64
,
SMA_VID
(
pSma
)
,
smaDebug
(
"vgId:%d, process submit req for rsma table %"
PRIi64
" level %"
PRIi8
" version:%"
PRIi64
,
suid
,
pItem
->
level
,
output
->
info
.
version
);
SMA_VID
(
pSma
),
suid
,
pItem
->
level
,
output
->
info
.
version
);
taosMemoryFreeClear
(
pReq
);
taosMemoryFreeClear
(
pReq
);
taosArrayClear
(
pResult
);
taosArrayClear
(
pResult
);
...
@@ -644,7 +644,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
...
@@ -644,7 +644,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
smaDebug
(
"vgId:%d, execute rsma %"
PRIi8
" task for qTaskInfo:%p suid:%"
PRIu64
,
SMA_VID
(
pSma
),
level
,
pItem
->
taskInfo
,
suid
);
pItem
->
taskInfo
,
suid
);
if
(
qSet
StreamInput
(
pItem
->
taskInfo
,
pMsg
,
inputType
,
tru
e
)
<
0
)
{
// INPUT__DATA_SUBMIT
if
(
qSet
MultiStreamInput
(
pItem
->
taskInfo
,
pMsg
,
1
,
inputTyp
e
)
<
0
)
{
// INPUT__DATA_SUBMIT
smaError
(
"vgId:%d, rsma % "
PRIi8
" qSetStreamInput failed since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
smaError
(
"vgId:%d, rsma % "
PRIi8
" qSetStreamInput failed since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
return
TSDB_CODE_FAILED
;
return
TSDB_CODE_FAILED
;
}
}
...
@@ -1329,7 +1329,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
...
@@ -1329,7 +1329,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
tdRefRSmaInfo
(
pSma
,
pRSmaInfo
);
tdRefRSmaInfo
(
pSma
,
pRSmaInfo
);
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
SSDataBlock
dataBlock
=
{.
info
.
type
=
STREAM_GET_ALL
};
qSet
StreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
STREAM_INPUT__DATA_BLOCK
,
false
);
qSet
MultiStreamInput
(
pItem
->
taskInfo
,
&
dataBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
);
tdRSmaFetchAndSubmitResult
(
pItem
,
pRSmaInfo
->
pTSchema
,
pRSmaInfo
->
suid
,
pStat
,
STREAM_INPUT__DATA_BLOCK
);
tdRSmaFetchAndSubmitResult
(
pItem
,
pRSmaInfo
->
pTSchema
,
pRSmaInfo
->
suid
,
pStat
,
STREAM_INPUT__DATA_BLOCK
);
tdUnRefRSmaInfo
(
pSma
,
pRSmaInfo
);
tdUnRefRSmaInfo
(
pSma
,
pRSmaInfo
);
...
@@ -1356,4 +1356,4 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
...
@@ -1356,4 +1356,4 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
_end:
_end:
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
tdReleaseSmaRef
(
smaMgmt
.
rsetId
,
pItem
->
refId
,
__func__
,
__LINE__
);
}
}
\ No newline at end of file
source/dnode/vnode/src/tq/tq.c
浏览文件 @
cf54ec55
...
@@ -653,7 +653,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
...
@@ -653,7 +653,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
}
else
{
}
else
{
SReadHandle
mgHandle
=
{
SReadHandle
mgHandle
=
{
.
vnode
=
NULL
,
.
vnode
=
NULL
,
.
numOfVgroups
=
pTask
->
numOfVgroups
,
.
numOfVgroups
=
(
int32_t
)
taosArrayGetSize
(
pTask
->
childEpInfo
)
,
};
};
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
mgHandle
);
pTask
->
exec
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
exec
.
qmsg
,
&
mgHandle
);
}
}
...
...
source/libs/executor/src/executor.c
浏览文件 @
cf54ec55
...
@@ -30,8 +30,7 @@ static void cleanupRefPool() {
...
@@ -30,8 +30,7 @@ static void cleanupRefPool() {
taosCloseRef
(
ref
);
taosCloseRef
(
ref
);
}
}
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
,
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
char
*
id
)
{
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
...
@@ -44,12 +43,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
...
@@ -44,12 +43,12 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
return
TSDB_CODE_QRY_APP_ERROR
;
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
numOfBlocks
,
type
,
assignUid
,
id
);
return
doSetStreamBlock
(
pOperator
->
pDownstream
[
0
],
input
,
numOfBlocks
,
type
,
id
);
}
else
{
}
else
{
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
status
=
OP_NOT_OPENED
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
pInfo
->
assignBlockUid
=
assignUid
;
/*pInfo->assignBlockUid = assignUid;*/
// TODO: if a block was set but not consumed,
// TODO: if a block was set but not consumed,
// prevent setting a different type of block
// prevent setting a different type of block
...
@@ -95,11 +94,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
...
@@ -95,11 +94,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
}
}
}
}
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
,
bool
assignUid
)
{
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
)
{
return
qSetMultiStreamInput
(
tinfo
,
input
,
1
,
type
,
assignUid
);
}
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
,
bool
assignUid
)
{
if
(
tinfo
==
NULL
)
{
if
(
tinfo
==
NULL
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
...
@@ -110,8 +105,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
...
@@ -110,8 +105,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
SExecTaskInfo
*
pTaskInfo
=
(
SExecTaskInfo
*
)
tinfo
;
int32_t
code
=
int32_t
code
=
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
**
)
pBlocks
,
numOfBlocks
,
type
,
GET_TASKID
(
pTaskInfo
));
doSetStreamBlock
(
pTaskInfo
->
pRoot
,
(
void
**
)
pBlocks
,
numOfBlocks
,
type
,
assignUid
,
GET_TASKID
(
pTaskInfo
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
qError
(
"%s failed to set the stream block data"
,
GET_TASKID
(
pTaskInfo
));
}
else
{
}
else
{
...
@@ -337,7 +331,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
...
@@ -337,7 +331,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
}
}
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
,
pSinkParam
);
code
=
dsCreateDataSinker
(
pSubplan
->
pDataSink
,
handle
,
pSinkParam
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFreeClear
(
pSinkParam
);
taosMemoryFreeClear
(
pSinkParam
);
}
}
}
}
...
...
source/libs/stream/src/streamDispatch.c
浏览文件 @
cf54ec55
...
@@ -227,6 +227,8 @@ int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq,
...
@@ -227,6 +227,8 @@ int32_t streamDispatchOneReq(SStreamTask* pTask, const SStreamDispatchReq* pReq,
msg
.
pCont
=
buf
;
msg
.
pCont
=
buf
;
msg
.
msgType
=
pTask
->
dispatchMsgType
;
msg
.
msgType
=
pTask
->
dispatchMsgType
;
qDebug
(
"dispatch from task %d to task %d node %d"
,
pTask
->
taskId
,
pReq
->
taskId
,
vgId
);
tmsgSendReq
(
pEpSet
,
&
msg
);
tmsgSendReq
(
pEpSet
,
&
msg
);
code
=
0
;
code
=
0
;
...
@@ -281,8 +283,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
...
@@ -281,8 +283,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
return
code
;
return
code
;
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
}
else
if
(
pTask
->
dispatchType
==
TASK_DISPATCH__SHUFFLE
)
{
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
rspCnt
=
atomic_load_32
(
&
pTask
->
shuffleDispatcher
.
waitingRspCnt
);
ASSERT
(
pTask
->
shuffleDispatcher
.
waitingRspCnt
==
0
);
ASSERT
(
rspCnt
==
0
);
SArray
*
vgInfo
=
pTask
->
shuffleDispatcher
.
dbInfo
.
pVgroupInfos
;
int32_t
vgSz
=
taosArrayGetSize
(
vgInfo
);
int32_t
vgSz
=
taosArrayGetSize
(
vgInfo
);
SStreamDispatchReq
*
pReqs
=
taosMemoryCalloc
(
vgSz
,
sizeof
(
SStreamDispatchReq
));
SStreamDispatchReq
*
pReqs
=
taosMemoryCalloc
(
vgSz
,
sizeof
(
SStreamDispatchReq
));
if
(
pReqs
==
NULL
)
{
if
(
pReqs
==
NULL
)
{
...
@@ -301,7 +305,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
...
@@ -301,7 +305,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if
(
pReqs
[
i
].
data
==
NULL
||
pReqs
[
i
].
dataLen
==
NULL
)
{
if
(
pReqs
[
i
].
data
==
NULL
||
pReqs
[
i
].
dataLen
==
NULL
)
{
goto
FAIL_SHUFFLE_DISPATCH
;
goto
FAIL_SHUFFLE_DISPATCH
;
}
}
SVgroupInfo
*
pVgInfo
=
taosArrayGet
(
vgInfo
,
i
);
pReqs
[
i
].
taskId
=
pVgInfo
->
taskId
;
}
}
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pData
->
blocks
,
i
);
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pData
->
blocks
,
i
);
char
*
ctbName
=
buildCtbNameByGroupId
(
pTask
->
shuffleDispatcher
.
stbFullName
,
pDataBlock
->
info
.
groupId
);
char
*
ctbName
=
buildCtbNameByGroupId
(
pTask
->
shuffleDispatcher
.
stbFullName
,
pDataBlock
->
info
.
groupId
);
...
@@ -309,6 +316,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
...
@@ -309,6 +316,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
// TODO: get hash function by hashMethod
// TODO: get hash function by hashMethod
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
uint32_t
hashValue
=
MurmurHash3_32
(
ctbName
,
strlen
(
ctbName
));
taosMemoryFree
(
ctbName
);
bool
found
=
false
;
// TODO: optimize search
// TODO: optimize search
int32_t
j
;
int32_t
j
;
for
(
j
=
0
;
j
<
vgSz
;
j
++
)
{
for
(
j
=
0
;
j
<
vgSz
;
j
++
)
{
...
@@ -318,12 +328,17 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
...
@@ -318,12 +328,17 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if
(
streamAddBlockToDispatchMsg
(
pDataBlock
,
&
pReqs
[
j
])
<
0
)
{
if
(
streamAddBlockToDispatchMsg
(
pDataBlock
,
&
pReqs
[
j
])
<
0
)
{
goto
FAIL_SHUFFLE_DISPATCH
;
goto
FAIL_SHUFFLE_DISPATCH
;
}
}
pReqs
[
j
].
taskId
=
pVgInfo
->
taskId
;
if
(
pReqs
[
j
].
blockNum
==
0
)
{
atomic_add_fetch_32
(
&
pTask
->
shuffleDispatcher
.
waitingRspCnt
,
1
);
}
pReqs
[
j
].
blockNum
++
;
pReqs
[
j
].
blockNum
++
;
found
=
true
;
break
;
break
;
}
}
}
}
ASSERT
(
found
);
}
}
for
(
int32_t
i
=
0
;
i
<
vgSz
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
vgSz
;
i
++
)
{
if
(
pReqs
[
i
].
blockNum
>
0
)
{
if
(
pReqs
[
i
].
blockNum
>
0
)
{
// send
// send
...
@@ -331,7 +346,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
...
@@ -331,7 +346,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
if
(
streamDispatchOneReq
(
pTask
,
&
pReqs
[
i
],
pVgInfo
->
vgId
,
&
pVgInfo
->
epSet
)
<
0
)
{
if
(
streamDispatchOneReq
(
pTask
,
&
pReqs
[
i
],
pVgInfo
->
vgId
,
&
pVgInfo
->
epSet
)
<
0
)
{
goto
FAIL_SHUFFLE_DISPATCH
;
goto
FAIL_SHUFFLE_DISPATCH
;
}
}
pTask
->
shuffleDispatcher
.
waitingRspCnt
++
;
}
}
}
}
code
=
0
;
code
=
0
;
...
...
source/libs/stream/src/streamExec.c
浏览文件 @
cf54ec55
...
@@ -22,22 +22,22 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
...
@@ -22,22 +22,22 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
SStreamQueueItem
*
pItem
=
(
SStreamQueueItem
*
)
data
;
SStreamQueueItem
*
pItem
=
(
SStreamQueueItem
*
)
data
;
if
(
pItem
->
type
==
STREAM_INPUT__GET_RES
)
{
if
(
pItem
->
type
==
STREAM_INPUT__GET_RES
)
{
SStreamTrigger
*
pTrigger
=
(
SStreamTrigger
*
)
data
;
SStreamTrigger
*
pTrigger
=
(
SStreamTrigger
*
)
data
;
qSetMultiStreamInput
(
exec
,
pTrigger
->
pBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
,
false
);
qSetMultiStreamInput
(
exec
,
pTrigger
->
pBlock
,
1
,
STREAM_INPUT__DATA_BLOCK
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_SUBMIT
)
{
ASSERT
(
pTask
->
isDataScan
);
ASSERT
(
pTask
->
isDataScan
);
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
SStreamDataSubmit
*
pSubmit
=
(
SStreamDataSubmit
*
)
data
;
qDebug
(
"task %d %p set submit input %p %p %d 1"
,
pTask
->
taskId
,
pTask
,
pSubmit
,
pSubmit
->
data
,
*
pSubmit
->
dataRef
);
qDebug
(
"task %d %p set submit input %p %p %d 1"
,
pTask
->
taskId
,
pTask
,
pSubmit
,
pSubmit
->
data
,
*
pSubmit
->
dataRef
);
qSet
StreamInput
(
exec
,
pSubmit
->
data
,
STREAM_INPUT__DATA_SUBMIT
,
false
);
qSet
MultiStreamInput
(
exec
,
pSubmit
->
data
,
1
,
STREAM_INPUT__DATA_SUBMIT
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
}
else
if
(
pItem
->
type
==
STREAM_INPUT__DATA_BLOCK
||
pItem
->
type
==
STREAM_INPUT__DATA_RETRIEVE
)
{
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
SStreamDataBlock
*
pBlock
=
(
SStreamDataBlock
*
)
data
;
SArray
*
blocks
=
pBlock
->
blocks
;
SArray
*
blocks
=
pBlock
->
blocks
;
qDebug
(
"task %d %p set ssdata input"
,
pTask
->
taskId
,
pTask
);
qDebug
(
"task %d %p set ssdata input"
,
pTask
->
taskId
,
pTask
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__DATA_BLOCK
,
false
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__DATA_BLOCK
);
}
else
if
(
pItem
->
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
}
else
if
(
pItem
->
type
==
STREAM_INPUT__MERGED_SUBMIT
)
{
SStreamMergedSubmit
*
pMerged
=
(
SStreamMergedSubmit
*
)
data
;
SStreamMergedSubmit
*
pMerged
=
(
SStreamMergedSubmit
*
)
data
;
SArray
*
blocks
=
pMerged
->
reqs
;
SArray
*
blocks
=
pMerged
->
reqs
;
qDebug
(
"task %d %p set submit input (merged), batch num: %d"
,
pTask
->
taskId
,
pTask
,
(
int32_t
)
blocks
->
size
);
qDebug
(
"task %d %p set submit input (merged), batch num: %d"
,
pTask
->
taskId
,
pTask
,
(
int32_t
)
blocks
->
size
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__MERGED_SUBMIT
,
false
);
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_INPUT__MERGED_SUBMIT
);
}
else
{
}
else
{
ASSERT
(
0
);
ASSERT
(
0
);
}
}
...
...
source/libs/stream/src/streamTask.c
浏览文件 @
cf54ec55
...
@@ -64,7 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
...
@@ -64,7 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
numOfVgroups
)
<
0
)
return
-
1
;
/*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/
int32_t
epSz
=
taosArrayGetSize
(
pTask
->
childEpInfo
);
int32_t
epSz
=
taosArrayGetSize
(
pTask
->
childEpInfo
);
if
(
tEncodeI32
(
pEncoder
,
epSz
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
epSz
)
<
0
)
return
-
1
;
...
@@ -119,7 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
...
@@ -119,7 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
selfChildId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
nodeId
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
epSet
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
numOfVgroups
)
<
0
)
return
-
1
;
/*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/
int32_t
epSz
;
int32_t
epSz
;
if
(
tDecodeI32
(
pDecoder
,
&
epSz
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
epSz
)
<
0
)
return
-
1
;
...
...
source/libs/wal/inc/walInt.h
浏览文件 @
cf54ec55
...
@@ -61,26 +61,31 @@ static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight)
...
@@ -61,26 +61,31 @@ static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight)
}
}
static
inline
int64_t
walGetLastFileSize
(
SWal
*
pWal
)
{
static
inline
int64_t
walGetLastFileSize
(
SWal
*
pWal
)
{
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
return
0
;
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
return
pInfo
->
fileSize
;
return
pInfo
->
fileSize
;
}
}
static
inline
int64_t
walGetLastFileFirstVer
(
SWal
*
pWal
)
{
static
inline
int64_t
walGetLastFileFirstVer
(
SWal
*
pWal
)
{
if
(
taosArrayGetSize
(
pWal
->
fileInfoSet
)
==
0
)
return
-
1
;
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGetLast
(
pWal
->
fileInfoSet
);
return
pInfo
->
firstVer
;
return
pInfo
->
firstVer
;
}
}
static
inline
int64_t
walGetCurFileFirstVer
(
SWal
*
pWal
)
{
static
inline
int64_t
walGetCurFileFirstVer
(
SWal
*
pWal
)
{
if
(
pWal
->
writeCur
==
-
1
)
return
-
1
;
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
pInfo
->
firstVer
;
return
pInfo
->
firstVer
;
}
}
static
inline
int64_t
walGetCurFileLastVer
(
SWal
*
pWal
)
{
static
inline
int64_t
walGetCurFileLastVer
(
SWal
*
pWal
)
{
if
(
pWal
->
writeCur
==
-
1
)
return
-
1
;
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
pInfo
->
firstVer
;
return
pInfo
->
firstVer
;
}
}
static
inline
int64_t
walGetCurFileOffset
(
SWal
*
pWal
)
{
static
inline
int64_t
walGetCurFileOffset
(
SWal
*
pWal
)
{
if
(
pWal
->
writeCur
==
-
1
)
return
-
1
;
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
SWalFileInfo
*
pInfo
=
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
pInfo
->
fileSize
;
return
pInfo
->
fileSize
;
}
}
...
@@ -88,6 +93,7 @@ static inline int64_t walGetCurFileOffset(SWal* pWal) {
...
@@ -88,6 +93,7 @@ static inline int64_t walGetCurFileOffset(SWal* pWal) {
static
inline
bool
walCurFileClosed
(
SWal
*
pWal
)
{
return
taosArrayGetSize
(
pWal
->
fileInfoSet
)
!=
pWal
->
writeCur
;
}
static
inline
bool
walCurFileClosed
(
SWal
*
pWal
)
{
return
taosArrayGetSize
(
pWal
->
fileInfoSet
)
!=
pWal
->
writeCur
;
}
static
inline
SWalFileInfo
*
walGetCurFileInfo
(
SWal
*
pWal
)
{
static
inline
SWalFileInfo
*
walGetCurFileInfo
(
SWal
*
pWal
)
{
if
(
pWal
->
writeCur
==
-
1
)
return
NULL
;
return
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
return
(
SWalFileInfo
*
)
taosArrayGet
(
pWal
->
fileInfoSet
,
pWal
->
writeCur
);
}
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
cf54ec55
...
@@ -297,8 +297,9 @@
...
@@ -297,8 +297,9 @@
# --- sma
# --- sma
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/drop_sma.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim
./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
# temp disable
./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
#./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim
#./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim
# --- valgrind
# --- valgrind
./test.sh -f tsim/valgrind/checkError1.sim
./test.sh -f tsim/valgrind/checkError1.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录